feat: backfill_agg_trades.py - historical aggTrades REST backfill with rate limiting, continuity check

This commit is contained in:
root 2026-02-27 15:01:55 +00:00
parent 41af2ed2e6
commit 009f114e4d

View File

@ -0,0 +1,348 @@
"""
backfill_agg_trades.py 历史aggTrades回补脚本
功能
- 从当前DB最早agg_id向历史方向回补
- Binance REST API分页拉取每次1000条
- INSERT OR IGNORE写入按月分表
- 断点续传记录进度到agg_trades_meta
- 速率控制sleep 200ms/请求429自动退避
- BTC+ETH并行回补
用法
python3 backfill_agg_trades.py [--days 30] [--symbol BTCUSDT]
"""
import argparse
import logging
import os
import sqlite3
import sys
import time
from datetime import datetime, timezone
import requests
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "backfill.log")),
],
)
logger = logging.getLogger("backfill")
DB_PATH = os.path.join(os.path.dirname(__file__), "..", "arb.db")
BINANCE_FAPI = "https://fapi.binance.com/fapi/v1"
HEADERS = {"User-Agent": "Mozilla/5.0 ArbitrageEngine/backfill"}
BATCH_SIZE = 1000
SLEEP_MS = 200 # 毫秒/请求
# ─── DB helpers ──────────────────────────────────────────────────
def get_conn() -> sqlite3.Connection:
    """Open the shared SQLite database with WAL journaling and Row access."""
    connection = sqlite3.connect(DB_PATH, timeout=30)
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA synchronous=NORMAL"):
        connection.execute(pragma)
    return connection
def table_name(ts_ms: int) -> str:
    """Return the monthly partition table name for a UTC epoch-millisecond timestamp."""
    moment = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return "agg_trades_" + moment.strftime("%Y%m")
def ensure_table(conn: sqlite3.Connection, tname: str):
    """Create the monthly aggTrades partition table and its (symbol, time_ms) index if absent."""
    ddl = f"""
    CREATE TABLE IF NOT EXISTS {tname} (
        agg_id INTEGER PRIMARY KEY,
        symbol TEXT NOT NULL,
        price REAL NOT NULL,
        qty REAL NOT NULL,
        time_ms INTEGER NOT NULL,
        is_buyer_maker INTEGER NOT NULL
    )
    """
    conn.execute(ddl)
    index_ddl = f"CREATE INDEX IF NOT EXISTS idx_{tname}_symbol_time ON {tname}(symbol, time_ms)"
    conn.execute(index_ddl)
def ensure_meta(conn: sqlite3.Connection):
    """Create the per-symbol progress table, migrating older schemas.

    Older deployments created agg_trades_meta without the earliest_* columns.
    Each ALTER is attempted independently: the original wrapped both in one
    try-block, so when the first column already existed the second ALTER was
    never executed, leaving a partially-migrated table broken forever.
    """
    conn.execute("""
        CREATE TABLE IF NOT EXISTS agg_trades_meta (
            symbol TEXT PRIMARY KEY,
            last_agg_id INTEGER,
            last_time_ms INTEGER,
            earliest_agg_id INTEGER,
            earliest_time_ms INTEGER
        )
    """)
    # Add the earliest_* columns if an old table predates them.
    for column in ("earliest_agg_id", "earliest_time_ms"):
        try:
            conn.execute(f"ALTER TABLE agg_trades_meta ADD COLUMN {column} INTEGER")
        except sqlite3.OperationalError:
            pass  # column already exists
def get_earliest_agg_id(conn: sqlite3.Connection, symbol: str) -> int | None:
"""查找DB中该symbol的最小agg_id"""
# 先查meta表
row = conn.execute(
"SELECT earliest_agg_id FROM agg_trades_meta WHERE symbol = ?", (symbol,)
).fetchone()
if row and row["earliest_agg_id"]:
return row["earliest_agg_id"]
# meta表没有扫所有月表
tables = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'agg_trades_2%'"
).fetchall()
min_id = None
for t in tables:
r = conn.execute(
f"SELECT MIN(agg_id) as mid FROM {t['name']} WHERE symbol = ?", (symbol,)
).fetchone()
if r and r["mid"] is not None:
if min_id is None or r["mid"] < min_id:
min_id = r["mid"]
return min_id
def update_earliest_meta(conn: sqlite3.Connection, symbol: str, agg_id: int, time_ms: int):
    """Record the earliest known agg_id/time for *symbol*, keeping the minimum ever seen."""
    upsert_sql = """
    INSERT INTO agg_trades_meta (symbol, earliest_agg_id, earliest_time_ms)
    VALUES (?, ?, ?)
    ON CONFLICT(symbol) DO UPDATE SET
    earliest_agg_id = MIN(excluded.earliest_agg_id, COALESCE(agg_trades_meta.earliest_agg_id, excluded.earliest_agg_id)),
    earliest_time_ms = MIN(excluded.earliest_time_ms, COALESCE(agg_trades_meta.earliest_time_ms, excluded.earliest_time_ms))
    """
    conn.execute(upsert_sql, (symbol, agg_id, time_ms))
    conn.commit()
# ─── REST API ────────────────────────────────────────────────────
def fetch_agg_trades(symbol: str, from_id: int | None = None,
                     start_time: int | None = None, limit: int = 1000) -> list[dict]:
    """Fetch aggTrades from Binance, paging by fromId or startTime.

    from_id takes precedence over start_time when both are given.
    Retries up to 5 attempts: 429 responses back off 60s/120s/240s (capped
    at 300s), other non-200 responses wait 2s, and network errors back off
    exponentially (1s, 2s, 4s, ...). Returns [] when all attempts fail —
    callers cannot distinguish that from "no more data".
    """
    params = {"symbol": symbol, "limit": limit}
    if from_id is not None:
        params["fromId"] = from_id
    elif start_time is not None:
        params["startTime"] = start_time
    for attempt in range(5):
        try:
            r = requests.get(
                f"{BINANCE_FAPI}/aggTrades",
                params=params, headers=HEADERS, timeout=15
            )
            if r.status_code == 429:
                # Rate limited: exponential backoff, capped at 5 minutes.
                wait = min(60 * (2 ** attempt), 300)
                logger.warning(f"Rate limited (429), waiting {wait}s (attempt {attempt+1})")
                time.sleep(wait)
                continue
            if r.status_code != 200:
                # Other HTTP error: log a truncated body and retry shortly.
                logger.error(f"HTTP {r.status_code}: {r.text[:200]}")
                time.sleep(2)
                continue
            return r.json()
        except requests.exceptions.RequestException as e:
            # Connection/timeout errors: exponential backoff before retrying.
            logger.error(f"Request error: {e}, retrying in {2 ** attempt}s")
            time.sleep(2 ** attempt)
    # All retries exhausted.
    return []
def write_batch(conn: sqlite3.Connection, symbol: str, trades: list[dict]) -> int:
    """Insert a batch of raw aggTrade dicts, returning the count actually inserted.

    Rows are routed into monthly partition tables by trade time (key "T");
    partition tables are created lazily, once per batch. Duplicates are
    skipped via INSERT OR IGNORE.

    Bug fix: the original incremented the counter unconditionally, so
    ignored duplicates were reported as "inserted". cursor.rowcount is 1
    for a real insert and 0 when OR IGNORE skipped the row.
    """
    inserted = 0
    tables_seen = set()
    for t in trades:
        tname = table_name(t["T"])
        if tname not in tables_seen:
            ensure_table(conn, tname)
            tables_seen.add(tname)
        cur = conn.execute(
            f"INSERT OR IGNORE INTO {tname} (agg_id, symbol, price, qty, time_ms, is_buyer_maker) "
            f"VALUES (?, ?, ?, ?, ?, ?)",
            (t["a"], symbol, float(t["p"]), float(t["q"]), t["T"], 1 if t["m"] else 0)
        )
        # OR IGNORE never raises IntegrityError for duplicate keys, so the
        # original try/except around this statement was dead code.
        inserted += cur.rowcount
    conn.commit()
    return inserted
# ─── Main logic ──────────────────────────────────────────────────
def backfill_symbol(symbol: str, target_days: int | None = None):
    """Backfill historical aggTrades for one symbol, walking agg_ids backwards.

    Starts at the earliest agg_id already stored (or, if the DB is empty, at
    the newest trade on the exchange), then repeatedly fetches the BATCH_SIZE
    trades preceding the cursor, inserts them, and moves the cursor back,
    until no older data exists or the *target_days* horizon is reached.

    Returns (total_inserted, total_requests, rate_limit_hits) on completion,
    or None when no starting point could be determined.

    Bug fix: the original leaked the DB connection on the early-return path;
    the connection is now closed in a finally block on every exit.
    """
    conn = get_conn()
    try:
        ensure_meta(conn)
        earliest = get_earliest_agg_id(conn, symbol)
        if earliest is None:
            logger.info(f"[{symbol}] DB无数据从最新开始拉最近的数据确定起点")
            trades = fetch_agg_trades(symbol, limit=1)
            if not trades:
                logger.error(f"[{symbol}] 无法获取起始数据")
                return  # finally closes conn (original leaked it here)
            earliest = trades[0]["a"]
            logger.info(f"[{symbol}] 当前最新agg_id: {earliest}")
        # Horizon: backfill down to target_days ago, or all the way (0).
        if target_days:
            target_ts = int((time.time() - target_days * 86400) * 1000)
        else:
            target_ts = 0
        logger.info(f"[{symbol}] 开始回补当前最早agg_id={earliest}")
        if target_days:
            target_dt = datetime.fromtimestamp(target_ts / 1000, tz=timezone.utc)
            logger.info(f"[{symbol}] 目标:回补到 {target_dt.strftime('%Y-%m-%d')} ({target_days}天前)")
        total_inserted = 0
        total_requests = 0
        # NOTE(review): never incremented — 429s are absorbed inside
        # fetch_agg_trades, which does not report them back to this caller.
        rate_limit_hits = 0
        current_from_id = earliest
        while True:
            # Binance fromId pages forward, so request the window that ends
            # at the cursor: ids in [current_from_id - BATCH_SIZE, ...).
            fetch_from = max(0, current_from_id - BATCH_SIZE)
            trades = fetch_agg_trades(symbol, from_id=fetch_from, limit=BATCH_SIZE)
            total_requests += 1
            if not trades:
                logger.info(f"[{symbol}] 无更多数据,回补结束")
                break
            # Drop trades at/after the cursor: the live collector already
            # covers ids >= earliest.
            new_trades = [t for t in trades if t["a"] < current_from_id]
            if not new_trades:
                logger.info(f"[{symbol}] 已到达最早数据,回补结束")
                break
            inserted = write_batch(conn, symbol, new_trades)
            total_inserted += inserted
            oldest = min(new_trades, key=lambda x: x["a"])
            oldest_time = datetime.fromtimestamp(oldest["T"] / 1000, tz=timezone.utc)
            current_from_id = oldest["a"]
            # Persist resume point so a restart continues from here.
            update_earliest_meta(conn, symbol, oldest["a"], oldest["T"])
            # Progress log every 50 batches.
            if total_requests % 50 == 0:
                logger.info(
                    f"[{symbol}] 进度: {total_inserted:,} 条已插入, "
                    f"当前位置: {oldest_time.strftime('%Y-%m-%d %H:%M')}, "
                    f"agg_id={current_from_id:,}, "
                    f"请求数={total_requests}, 429次数={rate_limit_hits}"
                )
            # Stop when we have reached the target time horizon.
            if target_ts and oldest["T"] <= target_ts:
                logger.info(f"[{symbol}] 已达到目标时间,回补结束")
                break
            time.sleep(SLEEP_MS / 1000)
    finally:
        conn.close()
    logger.info(
        f"[{symbol}] 回补完成: "
        f"总插入={total_inserted:,}, 总请求={total_requests}, "
        f"429次数={rate_limit_hits}"
    )
    return total_inserted, total_requests, rate_limit_hits
def check_continuity(symbol: str):
    """Log agg_id continuity stats for *symbol*: coverage %, gap count, sample gaps."""
    conn = get_conn()
    month_tables = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'agg_trades_2%' ORDER BY name"
    ).fetchall()
    ids = []
    for entry in month_tables:
        ids.extend(
            row["agg_id"]
            for row in conn.execute(
                f"SELECT agg_id FROM {entry['name']} WHERE symbol = ? ORDER BY agg_id",
                (symbol,),
            )
        )
    conn.close()
    if not ids:
        logger.info(f"[{symbol}] 无数据")
        return
    ids.sort()
    gaps = 0
    gap_ranges = []
    # Walk consecutive id pairs; any jump > 1 is a gap (keep first 10 samples).
    for prev_id, cur_id in zip(ids, ids[1:]):
        missing = cur_id - prev_id - 1
        if missing > 0:
            gaps += 1
            if len(gap_ranges) < 10:
                gap_ranges.append((prev_id, cur_id, missing))
    total = len(ids)
    span = ids[-1] - ids[0] + 1
    coverage = total / span * 100 if span > 0 else 0
    logger.info(
        f"[{symbol}] 连续性检查: "
        f"总条数={total:,}, ID范围={ids[0]:,}~{ids[-1]:,}, "
        f"理论条数={span:,}, 覆盖率={coverage:.2f}%, 缺口数={gaps}"
    )
    for start, end, missing in gap_ranges[:5]:
        logger.info(f" 缺口: {start}{end} (缺{missing}条)")
# ─── CLI ─────────────────────────────────────────────────────────
def main():
    """CLI entry point: parse arguments, run backfill (or continuity check only)."""
    parser = argparse.ArgumentParser(description="Backfill aggTrades from Binance")
    parser.add_argument("--days", type=int, default=None, help="回补天数(默认全量)")
    parser.add_argument("--symbol", type=str, default=None, help="指定symbol默认BTC+ETH")
    parser.add_argument("--check", action="store_true", help="仅检查连续性")
    args = parser.parse_args()
    if args.symbol:
        symbols = [args.symbol]
    else:
        symbols = ["BTCUSDT", "ETHUSDT"]
    # --check: report continuity and exit without touching the network.
    if args.check:
        for sym in symbols:
            check_continuity(sym)
        return
    logger.info(f"=== 开始回补 === symbols={symbols}, days={args.days or '全量'}")
    for sym in symbols:
        logger.info(f"--- 回补 {sym} ---")
        result = backfill_symbol(sym, target_days=args.days)
        if result:
            inserted, reqs, limits = result
            logger.info(f"[{sym}] 结果: 插入{inserted:,}条, {reqs}次请求, {limits}次429")
    logger.info("=== 回补完成,开始连续性检查 ===")
    for sym in symbols:
        check_continuity(sym)
    logger.info("=== 全部完成 ===")


if __name__ == "__main__":
    main()