"""backfill_agg_trades.py — backfill historical Binance USDⓈ-M futures aggTrades.

Features:
- Walks backwards in history starting from the earliest agg_id currently in the DB.
- Pages through the Binance REST API, BATCH_SIZE (1000) trades per request.
- Writes with INSERT OR IGNORE into monthly tables (agg_trades_YYYYMM).
- Resumable: the earliest backfilled position is stored in agg_trades_meta after
  every batch, so a re-run continues where the previous run stopped.
- Rate control: sleeps SLEEP_MS (200 ms) between requests; HTTP 429/418 responses
  trigger a backoff (honouring the Retry-After header when present).
- By default BTCUSDT and ETHUSDT are backfilled one after the other.

Usage:
    python3 backfill_agg_trades.py [--days 30] [--symbol BTCUSDT] [--check]
"""

import argparse
import logging
import os
import sqlite3
import sys
import time
from contextlib import closing
from datetime import datetime, timezone

import requests

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "backfill.log")),
    ],
)
logger = logging.getLogger("backfill")

DB_PATH = os.path.join(os.path.dirname(__file__), "..", "arb.db")
BINANCE_FAPI = "https://fapi.binance.com/fapi/v1"
HEADERS = {"User-Agent": "Mozilla/5.0 ArbitrageEngine/backfill"}
BATCH_SIZE = 1000
SLEEP_MS = 200  # pause between consecutive requests, in milliseconds
MAX_RETRIES = 5  # attempts per REST call before giving up
MONTH_TABLE_PATTERN = "agg_trades_2%"  # LIKE pattern for the monthly tables


# ─── DB helpers ──────────────────────────────────────────────────

def get_conn() -> sqlite3.Connection:
    """Open DB_PATH with WAL journaling, NORMAL sync and sqlite3.Row results."""
    conn = sqlite3.connect(DB_PATH, timeout=30)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=NORMAL")
    return conn


def table_name(ts_ms: int) -> str:
    """Return the monthly table name (agg_trades_YYYYMM, UTC) for an epoch-ms timestamp."""
    dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return f"agg_trades_{dt.strftime('%Y%m')}"


def ensure_table(conn: sqlite3.Connection, tname: str):
    """Create the monthly table *tname* and its (symbol, time_ms) index if missing."""
    # tname comes from table_name(), never from user input, so f-string SQL is safe here.
    conn.execute(f"""
        CREATE TABLE IF NOT EXISTS {tname} (
            agg_id INTEGER PRIMARY KEY,
            symbol TEXT NOT NULL,
            price REAL NOT NULL,
            qty REAL NOT NULL,
            time_ms INTEGER NOT NULL,
            is_buyer_maker INTEGER NOT NULL
        )
    """)
    conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{tname}_symbol_time ON {tname}(symbol, time_ms)")


def ensure_meta(conn: sqlite3.Connection):
    """Create agg_trades_meta, or migrate an older copy that lacks the earliest_* columns."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS agg_trades_meta (
            symbol TEXT PRIMARY KEY,
            last_agg_id INTEGER,
            last_time_ms INTEGER,
            earliest_agg_id INTEGER,
            earliest_time_ms INTEGER
        )
    """)
    # Add each missing column independently: a table that already has one of the
    # two columns must still receive the other one.
    existing = {row["name"] for row in conn.execute("PRAGMA table_info(agg_trades_meta)")}
    for col in ("earliest_agg_id", "earliest_time_ms"):
        if col not in existing:
            conn.execute(f"ALTER TABLE agg_trades_meta ADD COLUMN {col} INTEGER")


def _list_month_tables(conn: sqlite3.Connection) -> list[str]:
    """Return the names of all monthly aggTrades tables, sorted by name (i.e. by month)."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ? ORDER BY name",
        (MONTH_TABLE_PATTERN,),
    ).fetchall()
    return [r["name"] for r in rows]


def get_earliest_agg_id(conn: sqlite3.Connection, symbol: str) -> int | None:
    """Return the smallest known agg_id for *symbol*, or None if nothing is stored.

    The value recorded in agg_trades_meta wins; otherwise every monthly table is scanned.
    """
    row = conn.execute(
        "SELECT earliest_agg_id FROM agg_trades_meta WHERE symbol = ?", (symbol,)
    ).fetchone()
    if row is not None and row["earliest_agg_id"] is not None:
        return row["earliest_agg_id"]

    min_id = None
    for tname in _list_month_tables(conn):
        r = conn.execute(
            f"SELECT MIN(agg_id) AS mid FROM {tname} WHERE symbol = ?", (symbol,)
        ).fetchone()
        if r["mid"] is not None and (min_id is None or r["mid"] < min_id):
            min_id = r["mid"]
    return min_id


def update_earliest_meta(conn: sqlite3.Connection, symbol: str, agg_id: int, time_ms: int):
    """Lower the recorded earliest position of *symbol* to (agg_id, time_ms) and commit.

    Existing values are only ever decreased (MIN with the stored value). When the
    symbol has no meta row yet, one is inserted with all four position columns set
    to this trade.
    """
    row = conn.execute("SELECT symbol FROM agg_trades_meta WHERE symbol = ?", (symbol,)).fetchone()
    if row:
        conn.execute("""
            UPDATE agg_trades_meta
            SET earliest_agg_id = MIN(?, COALESCE(earliest_agg_id, ?)),
                earliest_time_ms = MIN(?, COALESCE(earliest_time_ms, ?))
            WHERE symbol = ?
        """, (agg_id, agg_id, time_ms, time_ms, symbol))
    else:
        # NOTE(review): last_agg_id/last_time_ms are seeded with the backfill position;
        # confirm this is what the component that reads last_agg_id expects.
        conn.execute("""
            INSERT INTO agg_trades_meta (symbol, last_agg_id, last_time_ms, earliest_agg_id, earliest_time_ms)
            VALUES (?, ?, ?, ?, ?)
        """, (symbol, agg_id, time_ms, agg_id, time_ms))
    conn.commit()


# ─── REST API ────────────────────────────────────────────────────

def fetch_agg_trades(symbol: str, from_id: int | None = None,
                     start_time: int | None = None, limit: int = 1000,
                     stats: dict | None = None) -> list[dict]:
    """Fetch aggTrades from the Binance futures REST API.

    Args:
        symbol: futures symbol, e.g. "BTCUSDT".
        from_id: first agg_id to return (takes precedence over start_time).
        start_time: epoch-ms start time, used only when from_id is None.
        limit: maximum number of trades to return.
        stats: optional dict updated in place. "rate_limit_hits" is incremented for
            every HTTP 429/418 response. "failed" is set to True when the call gives
            up and to False otherwise.

    Returns:
        The decoded JSON list of trades. Returns an empty list when all MAX_RETRIES
        attempts fail; check stats["failed"] to tell that apart from "no data".
    """
    if stats is None:
        stats = {}
    stats.setdefault("rate_limit_hits", 0)
    stats["failed"] = False

    params = {"symbol": symbol, "limit": limit}
    if from_id is not None:
        params["fromId"] = from_id
    elif start_time is not None:
        params["startTime"] = start_time

    for attempt in range(MAX_RETRIES):
        try:
            r = requests.get(
                f"{BINANCE_FAPI}/aggTrades", params=params, headers=HEADERS, timeout=15
            )
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error: {e}, retrying in {2 ** attempt}s")
            time.sleep(2 ** attempt)
            continue

        # 429 = rate limited; 418 = IP auto-banned after ignoring 429s. Both may carry
        # Retry-After (seconds), which is more accurate than our own backoff guess.
        if r.status_code in (418, 429):
            stats["rate_limit_hits"] += 1
            retry_after = r.headers.get("Retry-After", "")
            wait = int(retry_after) if retry_after.isdigit() else min(60 * (2 ** attempt), 300)
            logger.warning(f"Rate limited ({r.status_code}), waiting {wait}s (attempt {attempt+1})")
            time.sleep(wait)
            continue

        if r.status_code != 200:
            logger.error(f"HTTP {r.status_code}: {r.text[:200]}")
            time.sleep(2)
            continue

        try:
            return r.json()
        except ValueError as e:  # malformed body; requests' JSONDecodeError is a ValueError
            logger.error(f"Invalid JSON response: {e}")
            time.sleep(2)

    stats["failed"] = True
    logger.error(f"aggTrades request gave up after {MAX_RETRIES} attempts: params={params}")
    return []


def write_batch(conn: sqlite3.Connection, symbol: str, trades: list[dict]) -> int:
    """Insert raw aggTrades into their monthly tables and commit.

    Returns:
        The number of rows actually inserted; rows whose agg_id already exists are
        skipped by INSERT OR IGNORE and are not counted.
    """
    rows_by_table: dict[str, list[tuple]] = {}
    for t in trades:
        rows_by_table.setdefault(table_name(t["T"]), []).append(
            (t["a"], symbol, float(t["p"]), float(t["q"]), t["T"], 1 if t["m"] else 0)
        )

    inserted = 0
    for tname, rows in rows_by_table.items():
        ensure_table(conn, tname)
        before = conn.total_changes
        conn.executemany(
            f"INSERT OR IGNORE INTO {tname} (agg_id, symbol, price, qty, time_ms, is_buyer_maker) "
            f"VALUES (?, ?, ?, ?, ?, ?)",
            rows,
        )
        # total_changes only counts rows really written, so ignored duplicates drop out.
        inserted += conn.total_changes - before
    conn.commit()
    return inserted


# ─── Main logic ──────────────────────────────────────────────────

def backfill_symbol(symbol: str, target_days: int | None = None) -> tuple[int, int, int] | None:
    """Backfill one symbol's history, walking backwards from the earliest stored agg_id.

    Args:
        symbol: futures symbol, e.g. "BTCUSDT".
        target_days: stop once trades at least this many days old are reached;
            None (or 0) keeps going until the API returns nothing older.

    Returns:
        (rows inserted, paging requests made, HTTP 429/418 responses seen), or None
        when no starting agg_id could be determined.
    """
    stats = {"rate_limit_hits": 0, "failed": False}
    total_inserted = 0
    total_requests = 0

    with closing(get_conn()) as conn:
        ensure_meta(conn)

        earliest = get_earliest_agg_id(conn, symbol)
        if earliest is None:
            logger.info(f"[{symbol}] DB无数据,从最新开始拉最近的数据确定起点")
            # Without fromId/startTime the API returns the most recent trade(s).
            trades = fetch_agg_trades(symbol, limit=1, stats=stats)
            if not trades:
                logger.error(f"[{symbol}] 无法获取起始数据")
                return None
            earliest = trades[0]["a"]
            logger.info(f"[{symbol}] 当前最新agg_id: {earliest}")

        # Cutoff as epoch ms; 0 means "no cutoff, go as far back as possible".
        target_ts = int((time.time() - target_days * 86400) * 1000) if target_days else 0

        logger.info(f"[{symbol}] 开始回补,当前最早agg_id={earliest}")
        if target_days:
            target_dt = datetime.fromtimestamp(target_ts / 1000, tz=timezone.utc)
            logger.info(f"[{symbol}] 目标:回补到 {target_dt.strftime('%Y-%m-%d')} ({target_days}天前)")

        current_from_id = earliest
        while True:
            # fromId pages forward in time, so to move backwards we request the
            # BATCH_SIZE ids just below the current earliest id.
            fetch_from = max(0, current_from_id - BATCH_SIZE)
            trades = fetch_agg_trades(symbol, from_id=fetch_from, limit=BATCH_SIZE, stats=stats)
            total_requests += 1

            if not trades:
                if stats["failed"]:
                    # Progress is already in agg_trades_meta, so a re-run resumes here.
                    logger.error(
                        f"[{symbol}] 请求多次失败,回补中止 (agg_id={current_from_id:,}),重新运行可断点续传"
                    )
                else:
                    logger.info(f"[{symbol}] 无更多数据,回补结束")
                break

            # Ids >= current_from_id are already stored; keep only strictly older ones.
            new_trades = [t for t in trades if t["a"] < current_from_id]
            if not new_trades:
                logger.info(f"[{symbol}] 已到达最早数据,回补结束")
                break

            total_inserted += write_batch(conn, symbol, new_trades)

            oldest = min(new_trades, key=lambda t: t["a"])
            current_from_id = oldest["a"]
            update_earliest_meta(conn, symbol, oldest["a"], oldest["T"])

            # Progress log every 50 requests.
            if total_requests % 50 == 0:
                oldest_time = datetime.fromtimestamp(oldest["T"] / 1000, tz=timezone.utc)
                logger.info(
                    f"[{symbol}] 进度: {total_inserted:,} 条已插入, "
                    f"当前位置: {oldest_time.strftime('%Y-%m-%d %H:%M')}, "
                    f"agg_id={current_from_id:,}, "
                    f"请求数={total_requests}, 429次数={stats['rate_limit_hits']}"
                )

            if target_ts and oldest["T"] <= target_ts:
                logger.info(f"[{symbol}] 已达到目标时间,回补结束")
                break

            time.sleep(SLEEP_MS / 1000)

    rate_limit_hits = stats["rate_limit_hits"]
    logger.info(
        f"[{symbol}] 回补完成: "
        f"总插入={total_inserted:,}, 总请求={total_requests}, "
        f"429次数={rate_limit_hits}"
    )
    return total_inserted, total_requests, rate_limit_hits


def check_continuity(symbol: str):
    """Log agg_id coverage and gaps for *symbol* across all monthly tables.

    Ids are streamed from SQLite in ascending order rather than collected into a
    Python list, so memory use does not grow with the number of stored trades.
    At most the first 5 gaps are logged individually.
    """
    max_gaps_logged = 5
    total = 0
    gaps = 0
    gap_ranges: list[tuple[int, int, int]] = []
    first_id = prev_id = None

    with closing(get_conn()) as conn:
        tables = _list_month_tables(conn)
        if tables:
            union = " UNION ALL ".join(
                f"SELECT agg_id FROM {tname} WHERE symbol = ?" for tname in tables
            )
            cur = conn.execute(
                f"SELECT agg_id FROM ({union}) ORDER BY agg_id", (symbol,) * len(tables)
            )
            for row in cur:
                agg_id = row[0]
                if prev_id is None:
                    first_id = agg_id
                else:
                    missing = agg_id - prev_id - 1
                    if missing > 0:
                        gaps += 1
                        if len(gap_ranges) < max_gaps_logged:
                            gap_ranges.append((prev_id, agg_id, missing))
                prev_id = agg_id
                total += 1

    if total == 0:
        logger.info(f"[{symbol}] 无数据")
        return

    span = prev_id - first_id + 1
    coverage = total / span * 100 if span > 0 else 0
    logger.info(
        f"[{symbol}] 连续性检查: "
        f"总条数={total:,}, ID范围={first_id:,}~{prev_id:,}, "
        f"理论条数={span:,}, 覆盖率={coverage:.2f}%, 缺口数={gaps}"
    )
    for start, end, missing in gap_ranges:
        logger.info(f" 缺口: {start} → {end} (缺{missing}条)")


# ─── CLI ─────────────────────────────────────────────────────────

def main():
    """CLI entry point: backfill each requested symbol in turn, then check continuity."""
    parser = argparse.ArgumentParser(description="Backfill aggTrades from Binance")
    parser.add_argument("--days", type=int, default=None, help="回补天数(默认全量)")
    parser.add_argument("--symbol", type=str, default=None, help="指定symbol(默认BTC+ETH)")
    parser.add_argument("--check", action="store_true", help="仅检查连续性")
    args = parser.parse_args()

    symbols = [args.symbol] if args.symbol else ["BTCUSDT", "ETHUSDT"]

    if args.check:
        for sym in symbols:
            check_continuity(sym)
        return

    logger.info(f"=== 开始回补 === symbols={symbols}, days={args.days or '全量'}")

    for sym in symbols:
        logger.info(f"--- 回补 {sym} ---")
        result = backfill_symbol(sym, target_days=args.days)
        if result:
            inserted, reqs, limits = result
            logger.info(f"[{sym}] 结果: 插入{inserted:,}条, {reqs}次请求, {limits}次429")

    logger.info("=== 回补完成,开始连续性检查 ===")
    for sym in symbols:
        check_continuity(sym)

    logger.info("=== 全部完成 ===")


if __name__ == "__main__":
    main()