"""
backfill_agg_trades.py — historical aggTrades backfill script.

What it does:
    - Walks backwards in history from the earliest agg_id currently in the DB
    - Pages the Binance futures REST API, up to 1000 rows per request
    - Writes with INSERT OR IGNORE into per-month tables (agg_trades_YYYYMM)
    - Resumable: progress is persisted in the agg_trades_meta table
    - Rate control: ~200 ms sleep per request, exponential backoff on HTTP 429
    - Backfills BTCUSDT and ETHUSDT one after another (sequential, not parallel)

Usage:
    python3 backfill_agg_trades.py [--days 30] [--symbol BTCUSDT]
"""

import argparse
import logging
import os
import sqlite3
import sys
import time
from datetime import datetime, timezone

import requests

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "backfill.log")),
    ],
)
logger = logging.getLogger("backfill")

# DB lives one directory above this script.
DB_PATH = os.path.join(os.path.dirname(__file__), "..", "arb.db")
BINANCE_FAPI = "https://fapi.binance.com/fapi/v1"
HEADERS = {"User-Agent": "Mozilla/5.0 ArbitrageEngine/backfill"}
BATCH_SIZE = 1000   # Binance aggTrades max page size
SLEEP_MS = 200      # milliseconds of pause between requests


# ─── DB helpers ────────────────────────────────────────────────────────────

def get_conn() -> sqlite3.Connection:
    """Open the shared SQLite DB (WAL mode, Row factory, 30 s busy timeout)."""
    conn = sqlite3.connect(DB_PATH, timeout=30)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
    return conn


def table_name(ts_ms: int) -> str:
    """Return the monthly partition table name (agg_trades_YYYYMM, UTC) for a ms timestamp."""
    dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return f"agg_trades_{dt.strftime('%Y%m')}"


def ensure_table(conn: sqlite3.Connection, tname: str) -> None:
    """Create one monthly trade table plus its (symbol, time_ms) index if missing."""
    conn.execute(f"""
        CREATE TABLE IF NOT EXISTS {tname} (
            agg_id INTEGER PRIMARY KEY,
            symbol TEXT NOT NULL,
            price REAL NOT NULL,
            qty REAL NOT NULL,
            time_ms INTEGER NOT NULL,
            is_buyer_maker INTEGER NOT NULL
        )
    """)
    conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{tname}_symbol_time ON {tname}(symbol, time_ms)")


def ensure_meta(conn: sqlite3.Connection) -> None:
    """Create the agg_trades_meta progress table; migrate old schemas in place.

    The two earliest_* columns are added one at a time: the original chained
    both ALTERs in a single try-block, so a table that already had
    earliest_agg_id could never gain earliest_time_ms.
    """
    conn.execute("""
        CREATE TABLE IF NOT EXISTS agg_trades_meta (
            symbol TEXT PRIMARY KEY,
            last_agg_id INTEGER,
            last_time_ms INTEGER,
            earliest_agg_id INTEGER,
            earliest_time_ms INTEGER
        )
    """)
    for col in ("earliest_agg_id", "earliest_time_ms"):
        try:
            conn.execute(f"ALTER TABLE agg_trades_meta ADD COLUMN {col} INTEGER")
        except sqlite3.OperationalError:
            pass  # column already exists


def get_earliest_agg_id(conn: sqlite3.Connection, symbol: str) -> int | None:
    """Return the smallest stored agg_id for *symbol*, or None if nothing stored.

    Prefers the cached value in agg_trades_meta and falls back to scanning
    every monthly table. Compares against None explicitly so a legitimate
    agg_id of 0 is not mistaken for "missing" (the original used truthiness).
    """
    row = conn.execute(
        "SELECT earliest_agg_id FROM agg_trades_meta WHERE symbol = ?", (symbol,)
    ).fetchone()
    if row and row["earliest_agg_id"] is not None:
        return row["earliest_agg_id"]

    # Meta miss: scan all monthly partitions for the minimum.
    tables = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'agg_trades_2%'"
    ).fetchall()
    min_id: int | None = None
    for t in tables:
        r = conn.execute(
            f"SELECT MIN(agg_id) as mid FROM {t['name']} WHERE symbol = ?", (symbol,)
        ).fetchone()
        if r and r["mid"] is not None and (min_id is None or r["mid"] < min_id):
            min_id = r["mid"]
    return min_id


def update_earliest_meta(conn: sqlite3.Connection, symbol: str, agg_id: int, time_ms: int) -> None:
    """Upsert the per-symbol earliest-seen watermark, keeping the minimum on conflict."""
    conn.execute("""
        INSERT INTO agg_trades_meta (symbol, earliest_agg_id, earliest_time_ms)
        VALUES (?, ?, ?)
        ON CONFLICT(symbol) DO UPDATE SET
            earliest_agg_id = MIN(excluded.earliest_agg_id, COALESCE(agg_trades_meta.earliest_agg_id, excluded.earliest_agg_id)),
            earliest_time_ms = MIN(excluded.earliest_time_ms, COALESCE(agg_trades_meta.earliest_time_ms, excluded.earliest_time_ms))
    """, (symbol, agg_id, time_ms))
    conn.commit()


# ─── REST API ──────────────────────────────────────────────────────────────

def fetch_agg_trades(symbol: str, from_id: int | None = None,
                     start_time: int | None = None, limit: int = 1000) -> list[dict]:
    """Fetch one page of aggTrades from Binance futures.

    from_id takes precedence over start_time. Retries up to 5 times with
    backoff on 429 / transport errors. Returns [] when every attempt failed —
    note that callers treat [] as "no more data", so persistent API failure
    ends a backfill run as if it had completed; the exhaustion is now logged
    explicitly so it is at least visible.
    """
    params: dict = {"symbol": symbol, "limit": limit}
    if from_id is not None:
        params["fromId"] = from_id
    elif start_time is not None:
        params["startTime"] = start_time

    for attempt in range(5):
        try:
            r = requests.get(
                f"{BINANCE_FAPI}/aggTrades",
                params=params, headers=HEADERS, timeout=15
            )
            if r.status_code == 429:
                # Exponential backoff capped at 5 minutes.
                wait = min(60 * (2 ** attempt), 300)
                logger.warning(f"Rate limited (429), waiting {wait}s (attempt {attempt+1})")
                time.sleep(wait)
                continue
            if r.status_code != 200:
                logger.error(f"HTTP {r.status_code}: {r.text[:200]}")
                time.sleep(2)
                continue
            return r.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error: {e}, retrying in {2 ** attempt}s")
            time.sleep(2 ** attempt)
    logger.error(f"fetch_agg_trades({symbol}): all retries exhausted, returning empty page")
    return []


def write_batch(conn: sqlite3.Connection, symbol: str, trades: list[dict]) -> int:
    """Insert trades into their monthly tables; return the number actually inserted.

    Uses INSERT OR IGNORE so re-running over an already-covered range is a
    no-op. The count is taken from Cursor.rowcount, so ignored duplicates are
    NOT counted (the original unconditionally incremented and over-reported).
    The old `except sqlite3.IntegrityError` was dead code: OR IGNORE already
    suppresses constraint violations.
    """
    inserted = 0
    tables_ready: set[str] = set()
    for t in trades:
        tname = table_name(t["T"])
        if tname not in tables_ready:
            ensure_table(conn, tname)
            tables_ready.add(tname)
        cur = conn.execute(
            f"INSERT OR IGNORE INTO {tname} (agg_id, symbol, price, qty, time_ms, is_buyer_maker) "
            f"VALUES (?, ?, ?, ?, ?, ?)",
            (t["a"], symbol, float(t["p"]), float(t["q"]), t["T"], 1 if t["m"] else 0)
        )
        inserted += max(cur.rowcount, 0)
    conn.commit()
    return inserted
# ─── Main logic ────────────────────────────────────────────────────────────

def backfill_symbol(symbol: str, target_days: int | None = None):
    """Backfill history for one symbol, paging backwards from the earliest
    stored agg_id.

    target_days: stop once trades older than (now - target_days) are reached;
    None means keep going until Binance has no earlier data.

    Returns (total_inserted, total_requests, rate_limit_hits) after a run, or
    None when no starting point could be determined.
    """
    conn = get_conn()
    ensure_meta(conn)

    total_inserted = 0
    total_requests = 0
    # NOTE(review): this counter is never incremented — 429 handling lives
    # inside fetch_agg_trades; thread a counter through if the metric matters.
    rate_limit_hits = 0

    try:
        earliest = get_earliest_agg_id(conn, symbol)
        if earliest is None:
            # Empty DB: probe the newest trade to establish a starting point.
            logger.info(f"[{symbol}] DB无数据,从最新开始拉最近的数据确定起点")
            probe = fetch_agg_trades(symbol, limit=1)
            if not probe:
                logger.error(f"[{symbol}] 无法获取起始数据")
                return None
            earliest = probe[0]["a"]
            logger.info(f"[{symbol}] 当前最新agg_id: {earliest}")

        # Target timestamp in ms; 0 means "as far back as the exchange has".
        target_ts = int((time.time() - target_days * 86400) * 1000) if target_days else 0

        logger.info(f"[{symbol}] 开始回补,当前最早agg_id={earliest}")
        if target_days:
            target_dt = datetime.fromtimestamp(target_ts / 1000, tz=timezone.utc)
            logger.info(f"[{symbol}] 目标:回补到 {target_dt.strftime('%Y-%m-%d')} ({target_days}天前)")

        current_from_id = earliest

        while True:
            # Binance fromId pages *forwards*, so jump back one batch and fetch
            # forward from there. Anything with agg_id >= current_from_id is
            # already covered (stored, or handled by the live collector).
            fetch_from = max(0, current_from_id - BATCH_SIZE)

            trades = fetch_agg_trades(symbol, from_id=fetch_from, limit=BATCH_SIZE)
            total_requests += 1

            if not trades:
                # NOTE(review): also reached when fetch_agg_trades exhausted
                # its retries, so a persistent API outage looks like "done".
                logger.info(f"[{symbol}] 无更多数据,回补结束")
                break

            # Keep only trades strictly older than our current frontier.
            new_trades = [t for t in trades if t["a"] < current_from_id]
            if not new_trades:
                logger.info(f"[{symbol}] 已到达最早数据,回补结束")
                break

            total_inserted += write_batch(conn, symbol, new_trades)

            oldest = min(new_trades, key=lambda x: x["a"])
            oldest_time = datetime.fromtimestamp(oldest["T"] / 1000, tz=timezone.utc)
            current_from_id = oldest["a"]

            # Persist the watermark so an interrupted run can resume here.
            update_earliest_meta(conn, symbol, oldest["a"], oldest["T"])

            # Progress log every 50 batches.
            if total_requests % 50 == 0:
                logger.info(
                    f"[{symbol}] 进度: {total_inserted:,} 条已插入, "
                    f"当前位置: {oldest_time.strftime('%Y-%m-%d %H:%M')}, "
                    f"agg_id={current_from_id:,}, "
                    f"请求数={total_requests}, 429次数={rate_limit_hits}"
                )

            if target_ts and oldest["T"] <= target_ts:
                logger.info(f"[{symbol}] 已达到目标时间,回补结束")
                break

            time.sleep(SLEEP_MS / 1000)
    finally:
        # Close on every path — the original leaked the connection on the
        # early returns and on any exception raised mid-loop.
        conn.close()

    logger.info(
        f"[{symbol}] 回补完成: "
        f"总插入={total_inserted:,}, 总请求={total_requests}, "
        f"429次数={rate_limit_hits}"
    )
    return total_inserted, total_requests, rate_limit_hits


def check_continuity(symbol: str):
    """Check the stored agg_id sequence of *symbol* for gaps and log a summary.

    NOTE: loads every stored agg_id into one in-memory list — fine for
    moderate archives, but watch memory on very large ones.
    """
    conn = get_conn()
    tables = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'agg_trades_2%' ORDER BY name"
    ).fetchall()

    all_ids: list[int] = []
    for t in tables:
        rows = conn.execute(
            f"SELECT agg_id FROM {t['name']} WHERE symbol = ? ORDER BY agg_id",
            (symbol,)
        ).fetchall()
        all_ids.extend(r["agg_id"] for r in rows)

    conn.close()

    if not all_ids:
        logger.info(f"[{symbol}] 无数据")
        return

    all_ids.sort()
    gaps = 0
    gap_ranges: list[tuple[int, int, int]] = []  # (before, after, missing count)
    for prev, cur in zip(all_ids, all_ids[1:]):
        missing = cur - prev - 1
        if missing > 0:
            gaps += 1
            if len(gap_ranges) < 10:
                gap_ranges.append((prev, cur, missing))

    total = len(all_ids)
    span = all_ids[-1] - all_ids[0] + 1
    coverage = total / span * 100 if span > 0 else 0

    logger.info(
        f"[{symbol}] 连续性检查: "
        f"总条数={total:,}, ID范围={all_ids[0]:,}~{all_ids[-1]:,}, "
        f"理论条数={span:,}, 覆盖率={coverage:.2f}%, 缺口数={gaps}"
    )
    for start, end, missing in gap_ranges[:5]:
        logger.info(f"  缺口: {start} → {end} (缺{missing}条)")


# ─── CLI ───────────────────────────────────────────────────────────────────

def main():
    """Parse CLI args, backfill each requested symbol, then verify continuity."""
    parser = argparse.ArgumentParser(description="Backfill aggTrades from Binance")
    parser.add_argument("--days", type=int, default=None, help="回补天数(默认全量)")
    parser.add_argument("--symbol", type=str, default=None, help="指定symbol(默认BTC+ETH)")
    parser.add_argument("--check", action="store_true", help="仅检查连续性")
    args = parser.parse_args()

    symbols = [args.symbol] if args.symbol else ["BTCUSDT", "ETHUSDT"]

    # --check: report-only mode, no network traffic.
    if args.check:
        for sym in symbols:
            check_continuity(sym)
        return

    logger.info(f"=== 开始回补 === symbols={symbols}, days={args.days or '全量'}")

    for sym in symbols:
        logger.info(f"--- 回补 {sym} ---")
        result = backfill_symbol(sym, target_days=args.days)
        if result:
            inserted, reqs, limits = result
            logger.info(f"[{sym}] 结果: 插入{inserted:,}条, {reqs}次请求, {limits}次429")

    logger.info("=== 回补完成,开始连续性检查 ===")
    for sym in symbols:
        check_continuity(sym)

    logger.info("=== 全部完成 ===")


if __name__ == "__main__":
    main()