"""
backfill_agg_trades.py — historical aggTrades backfill script (PostgreSQL edition).

Walks backwards from the earliest aggregate trade already stored for a symbol,
pulling older trades from the Binance USD-M futures REST API in batches and
inserting them into the ``agg_trades`` table.

Command-line flags (see ``main``):
    --days N     stop once trades older than N days have been reached
    --symbol S   backfill only symbol S (default: BTCUSDT and ETHUSDT)
    --check      only print the continuity report, do not fetch anything
"""

import argparse
import logging
import os
import sys
import time
from datetime import datetime, timezone

import requests
import psycopg2.extras

from db import get_sync_conn, init_schema, ensure_partitions

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "backfill.log")),
    ],
)
logger = logging.getLogger("backfill")

BINANCE_FAPI = "https://fapi.binance.com/fapi/v1"
HEADERS = {"User-Agent": "Mozilla/5.0 ArbitrageEngine/backfill"}
BATCH_SIZE = 1000
SLEEP_MS = 200  # PostgreSQL writes do not contend on a lock, so pull at full speed

# Maximum number of rows sent to PostgreSQL in a single INSERT statement.
_INSERT_PAGE_SIZE = 1000

# Number of HTTP attempts made for one REST call before giving up.
_MAX_FETCH_ATTEMPTS = 5


# ─── DB helpers ──────────────────────────────────────────────────

def get_earliest_agg_id(symbol: str) -> int | None:
    """Return the smallest aggregate-trade id known for *symbol*.

    The value recorded in ``agg_trades_meta.earliest_agg_id`` is preferred.
    When that row is missing or its value is falsy (NULL or 0), the function
    falls back to ``MIN(agg_id)`` over ``agg_trades``.

    Returns:
        The earliest agg id, or ``None`` when no trade is stored for *symbol*.
    """
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT earliest_agg_id FROM agg_trades_meta WHERE symbol = %s", (symbol,))
            row = cur.fetchone()
            # NOTE(review): truthiness (not ``is not None``) is kept on purpose;
            # a stored 0 is treated like "unset" and triggers the table scan.
            if row and row[0]:
                return row[0]
            # Fallback: scan agg_trades.
            cur.execute("SELECT MIN(agg_id) FROM agg_trades WHERE symbol = %s", (symbol,))
            row = cur.fetchone()
            return row[0] if row else None


def update_earliest_meta(symbol: str, agg_id: int, time_ms: int):
    """Record *agg_id* / *time_ms* as the earliest known trade for *symbol*.

    If a meta row already exists, the earliest columns are only ever lowered
    (``LEAST`` against the current value). Otherwise a new row is inserted in
    which both the "last" and the "earliest" columns are set to the given
    values.

    Args:
        symbol: Trading pair, e.g. ``"BTCUSDT"``.
        agg_id: Aggregate-trade id of the oldest trade just stored.
        time_ms: Timestamp of that trade in milliseconds.
    """
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT symbol FROM agg_trades_meta WHERE symbol = %s", (symbol,))
            if cur.fetchone():
                cur.execute(
                    """
                    UPDATE agg_trades_meta
                    SET earliest_agg_id = LEAST(%s, COALESCE(earliest_agg_id, %s)),
                        earliest_time_ms = LEAST(%s, COALESCE(earliest_time_ms, %s))
                    WHERE symbol = %s
                    """,
                    (agg_id, agg_id, time_ms, time_ms, symbol),
                )
            else:
                cur.execute(
                    """
                    INSERT INTO agg_trades_meta
                        (symbol, last_agg_id, last_time_ms, earliest_agg_id, earliest_time_ms)
                    VALUES (%s, %s, %s, %s, %s)
                    """,
                    (symbol, agg_id, time_ms, agg_id, time_ms),
                )
        conn.commit()


# ─── REST API ────────────────────────────────────────────────────

def _fetch_agg_trades_or_none(symbol: str, from_id: int | None = None, limit: int = 1000) -> list | None:
    """Fetch one page of aggTrades, distinguishing failure from "no data".

    Returns:
        The decoded list of trades (possibly empty) on success, or ``None``
        when every attempt failed.
    """
    params = {"symbol": symbol, "limit": limit}
    if from_id is not None:
        params["fromId"] = from_id

    for attempt in range(_MAX_FETCH_ATTEMPTS):
        try:
            r = requests.get(f"{BINANCE_FAPI}/aggTrades", params=params, headers=HEADERS, timeout=15)
            # 429 = rate limited, 418 = IP banned after ignoring 429s; both
            # need a long exponential backoff rather than a quick retry.
            if r.status_code in (418, 429):
                wait = min(60 * (2 ** attempt), 300)
                logger.warning(f"Rate limited ({r.status_code}), waiting {wait}s (attempt {attempt+1})")
                time.sleep(wait)
                continue
            if r.status_code != 200:
                logger.error(f"HTTP {r.status_code}: {r.text[:200]}")
                time.sleep(2)
                continue
            payload = r.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decode errors on requests versions whose
            # JSONDecodeError is not a RequestException subclass.
            logger.error(f"Request error: {e}, retrying in {2 ** attempt}s")
            time.sleep(2 ** attempt)
            continue

        if isinstance(payload, list):
            return payload
        # A 200 response that is not a list cannot be processed as trades.
        logger.error(f"Unexpected aggTrades payload: {str(payload)[:200]}")
        time.sleep(2)

    return None


def fetch_agg_trades(symbol: str, from_id: int | None = None, limit: int = 1000) -> list:
    """Fetch up to *limit* aggregate trades for *symbol* from Binance.

    Args:
        symbol: Trading pair, e.g. ``"BTCUSDT"``.
        from_id: Sent as the ``fromId`` query parameter when given; omitted
            from the request when ``None``.
        limit: Sent as the ``limit`` query parameter.

    Returns:
        The decoded JSON list of trades. An empty list is returned both when
        the API reports no trades and when all retry attempts failed.
    """
    trades = _fetch_agg_trades_or_none(symbol, from_id=from_id, limit=limit)
    return [] if trades is None else trades


def write_batch(symbol: str, trades: list) -> int:
    """Insert *trades* into ``agg_trades``, skipping rows that already exist.

    Each trade is a dict with keys ``a`` (agg id), ``p`` (price), ``q``
    (quantity), ``T`` (time in ms) and ``m`` (buyer-is-maker flag), which are
    mapped onto the columns of the INSERT below.

    Returns:
        The number of rows actually inserted, as reported by the cursor
        (conflicting rows are not counted).
    """
    if not trades:
        return 0

    # Defined in the project's db module; presumably makes sure the partitions
    # needed by the insert exist — confirm against db.ensure_partitions.
    ensure_partitions()

    values = [
        (t["a"], symbol, float(t["p"]), float(t["q"]), t["T"], 1 if t["m"] else 0)
        for t in trades
    ]

    inserted = 0
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            # execute_values splits its input into pages and cur.rowcount only
            # reflects the last statement executed, so send one page per call
            # and accumulate the count ourselves.
            for start in range(0, len(values), _INSERT_PAGE_SIZE):
                chunk = values[start:start + _INSERT_PAGE_SIZE]
                psycopg2.extras.execute_values(
                    cur,
                    "INSERT INTO agg_trades (agg_id, symbol, price, qty, time_ms, is_buyer_maker) "
                    "VALUES %s ON CONFLICT (time_ms, symbol, agg_id) DO NOTHING",
                    chunk,
                    template="(%s, %s, %s, %s, %s, %s)",
                    page_size=_INSERT_PAGE_SIZE,
                )
                inserted += cur.rowcount
        conn.commit()
    return inserted


# ─── Main logic ──────────────────────────────────────────────────

def backfill_symbol(symbol: str, target_days: int | None = None):
    """Backfill historical aggTrades for *symbol*, walking backwards in id.

    Starting from the earliest stored agg id (or from the most recent trade
    when the database holds nothing for the symbol), repeatedly fetches the
    ``BATCH_SIZE`` ids just below the current position, stores them and moves
    the position down.

    The loop ends when:
      * the API returns nothing, or nothing older than the current position;
      * the oldest fetched trade is at or before the target time
        (``target_days`` days ago), when ``target_days`` is truthy;
      * a fetch fails after all retries (logged as an error).

    Args:
        symbol: Trading pair, e.g. ``"BTCUSDT"``.
        target_days: How many days back to go; ``None`` or 0 means no limit.
    """
    total_inserted = 0
    total_requests = 0

    earliest = get_earliest_agg_id(symbol)
    if earliest is None:
        logger.info(f"[{symbol}] DB无数据,拉最新确定起点")
        trades = fetch_agg_trades(symbol, limit=1)
        if not trades:
            logger.error(f"[{symbol}] 无法获取起始数据")
            return
        earliest = trades[0]["a"]
        # The loop below only keeps ids strictly below the starting point, so
        # store the seed trade here or it would never be written.
        total_inserted += write_batch(symbol, trades[:1])

    # Cut-off in epoch milliseconds; 0 disables the time-based stop.
    target_ts = int((time.time() - target_days * 86400) * 1000) if target_days else 0

    logger.info(f"[{symbol}] 开始回补,当前最早agg_id={earliest}")

    current_from_id = earliest

    while True:
        fetch_from = max(0, current_from_id - BATCH_SIZE)
        trades = _fetch_agg_trades_or_none(symbol, from_id=fetch_from, limit=BATCH_SIZE)
        total_requests += 1

        if trades is None:
            # Retries exhausted: this is an outage, not the end of the data.
            logger.error(
                f"[{symbol}] 请求失败,回补中止: 已插入={total_inserted:,}, 总请求={total_requests}"
            )
            return

        if not trades:
            logger.info(f"[{symbol}] 无更多数据,回补结束")
            break

        # Keep only trades older than what has already been covered.
        new_trades = [t for t in trades if t["a"] < current_from_id]
        if not new_trades:
            logger.info(f"[{symbol}] 已到达最早数据,回补结束")
            break

        inserted = write_batch(symbol, new_trades)
        total_inserted += inserted

        oldest = min(new_trades, key=lambda x: x["a"])
        oldest_time = datetime.fromtimestamp(oldest["T"] / 1000, tz=timezone.utc)
        current_from_id = oldest["a"]
        update_earliest_meta(symbol, oldest["a"], oldest["T"])

        if total_requests % 50 == 0:
            logger.info(
                f"[{symbol}] 进度: {total_inserted:,} 条已插入, "
                f"当前位置: {oldest_time.strftime('%Y-%m-%d %H:%M')}, agg_id={current_from_id:,}, 请求数={total_requests}"
            )

        if target_ts and oldest["T"] <= target_ts:
            logger.info(f"[{symbol}] 已达到目标时间,回补结束")
            break

        time.sleep(SLEEP_MS / 1000)

    logger.info(f"[{symbol}] 回补完成: 总插入={total_inserted:,}, 总请求={total_requests}")


def check_continuity(symbol: str):
    """Log row count, id range and coverage ratio of stored trades for *symbol*.

    Coverage is ``row count / (max id - min id + 1)`` as a percentage, so 100%
    means no id is missing between the smallest and largest stored ids.
    """
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT COUNT(*), MIN(agg_id), MAX(agg_id) FROM agg_trades WHERE symbol = %s",
                (symbol,),
            )
            row = cur.fetchone()

    total, min_id, max_id = row
    if not total:
        logger.info(f"[{symbol}] 无数据")
        return

    span = max_id - min_id + 1
    coverage = total / span * 100 if span > 0 else 0
    logger.info(f"[{symbol}] 总条数={total:,}, ID范围={min_id:,}~{max_id:,}, 覆盖率={coverage:.2f}%")


def main():
    """Parse command-line arguments and run the backfill or the check."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--days", type=int, default=None)
    parser.add_argument("--symbol", type=str, default=None)
    parser.add_argument("--check", action="store_true")
    args = parser.parse_args()

    init_schema()

    symbols = [args.symbol] if args.symbol else ["BTCUSDT", "ETHUSDT"]

    if args.check:
        for sym in symbols:
            check_continuity(sym)
        return

    logger.info(f"=== 开始回补 === symbols={symbols}, days={args.days or '全量'}")
    for sym in symbols:
        backfill_symbol(sym, target_days=args.days)

    logger.info("=== 连续性检查 ===")
    for sym in symbols:
        check_continuity(sym)

    logger.info("=== 全部完成 ===")


if __name__ == "__main__":
    main()