arbitrage-engine/backend/backfill_agg_trades.py

210 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
backfill_agg_trades.py — 历史aggTrades回补脚本PostgreSQL版
"""
import argparse
import logging
import os
import sys
import time
from datetime import datetime, timezone
import requests
import psycopg2.extras
from db import get_sync_conn, init_schema, ensure_partitions
# Log both to the console and to backfill.log, one directory above this file.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(
            os.path.join(os.path.dirname(__file__), "..", "backfill.log")
        ),
    ],
)
logger = logging.getLogger("backfill")

# Base URL of the Binance futures REST API (fapi).
BINANCE_FAPI = "https://fapi.binance.com/fapi/v1"
HEADERS = {"User-Agent": "Mozilla/5.0 ArbitrageEngine/backfill"}
# Number of trades requested per aggTrades call.
BATCH_SIZE = 1_000
# Pause between requests in milliseconds; balances throughput against CPU load.
SLEEP_MS = 500
# ─── DB helpers ──────────────────────────────────────────────────
def get_earliest_agg_id(symbol: str) -> int | None:
    """Return the smallest agg_id known for *symbol*, or None when there is none.

    The value cached in ``agg_trades_meta.earliest_agg_id`` is preferred.
    When that row is missing or its value is NULL/falsy, the function falls
    back to ``MIN(agg_id)`` over ``agg_trades``.
    """
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute("SELECT earliest_agg_id FROM agg_trades_meta WHERE symbol = %s", (symbol,))
        meta = cur.fetchone()
        cached = meta[0] if meta else None
        if cached:
            return cached

        # No usable meta value: derive it from the trades table instead.
        cur.execute("SELECT MIN(agg_id) FROM agg_trades WHERE symbol = %s", (symbol,))
        scanned = cur.fetchone()
        return scanned[0] if scanned else None
def update_earliest_meta(symbol: str, agg_id: int, time_ms: int):
    """Record (agg_id, time_ms) as the earliest known trade for *symbol*.

    If a meta row already exists, earliest_agg_id / earliest_time_ms are
    lowered to the given values (never raised). Otherwise a new row is
    inserted with last_* and earliest_* all seeded from this trade.
    The change is committed before returning.
    """
    update_sql = """
UPDATE agg_trades_meta SET
earliest_agg_id = LEAST(%s, COALESCE(earliest_agg_id, %s)),
earliest_time_ms = LEAST(%s, COALESCE(earliest_time_ms, %s))
WHERE symbol = %s
"""
    insert_sql = """
INSERT INTO agg_trades_meta (symbol, last_agg_id, last_time_ms, earliest_agg_id, earliest_time_ms)
VALUES (%s, %s, %s, %s, %s)
"""
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            # NOTE(review): check-then-write is not atomic; a concurrent writer
            # could create the row between the SELECT and the INSERT.
            cur.execute("SELECT symbol FROM agg_trades_meta WHERE symbol = %s", (symbol,))
            row_exists = bool(cur.fetchone())
            if not row_exists:
                cur.execute(insert_sql, (symbol, agg_id, time_ms, agg_id, time_ms))
            else:
                cur.execute(update_sql, (agg_id, agg_id, time_ms, time_ms, symbol))
        conn.commit()
# ─── REST API ────────────────────────────────────────────────────
def fetch_agg_trades(symbol: str, from_id: int | None = None, limit: int = 1000,
                     max_attempts: int = 5) -> list:
    """Fetch up to *limit* aggregated trades for *symbol* from Binance futures.

    Args:
        symbol: Trading pair, e.g. "BTCUSDT".
        from_id: If given, sent as ``fromId`` so results start at this agg_id.
            Otherwise the request carries no ``fromId``.
        limit: Maximum number of trades to request.
        max_attempts: How many HTTP attempts to make before giving up.

    Returns:
        The decoded JSON list on HTTP 200, or ``[]`` if every attempt failed.
        NOTE: ``[]`` is also what an empty result looks like. A give-up is
        therefore logged at ERROR level so it is not mistaken for
        "no more data".
    """
    params = {"symbol": symbol, "limit": limit}
    if from_id is not None:
        params["fromId"] = from_id
    for attempt in range(max_attempts):
        # Never sleep after the final attempt; there is nothing left to wait for.
        last_attempt = attempt == max_attempts - 1
        try:
            r = requests.get(f"{BINANCE_FAPI}/aggTrades", params=params, headers=HEADERS, timeout=15)
            if r.status_code in (418, 429):
                # 429 = rate limited; 418 = IP auto-banned for not backing off.
                # Honour the server's Retry-After hint when present, otherwise
                # use exponential backoff capped at 5 minutes.
                wait = min(60 * (2 ** attempt), 300)
                retry_after = r.headers.get("Retry-After")
                if retry_after and retry_after.strip().isdigit():
                    wait = int(retry_after.strip())
                logger.warning(f"Rate limited ({r.status_code}), waiting {wait}s (attempt {attempt+1})")
                if not last_attempt:
                    time.sleep(wait)
                continue
            if r.status_code != 200:
                logger.error(f"HTTP {r.status_code}: {r.text[:200]}")
                if not last_attempt:
                    time.sleep(2)
                continue
            return r.json()
        except requests.exceptions.RequestException as e:
            if last_attempt:
                logger.error(f"Request error: {e}")
            else:
                logger.error(f"Request error: {e}, retrying in {2 ** attempt}s")
                time.sleep(2 ** attempt)
    logger.error(f"[{symbol}] aggTrades fetch failed after {max_attempts} attempts (fromId={from_id}), returning []")
    return []
def write_batch(symbol: str, trades: list) -> int:
    """Insert raw Binance aggTrade dicts for *symbol* into ``agg_trades``.

    Each trade is read through the keys "a", "p", "q", "T" and "m", which map
    to agg_id, price, qty, time_ms and is_buyer_maker (stored as 1/0).
    Rows conflicting on (time_ms, symbol, agg_id) are skipped.

    Returns:
        The number of rows actually inserted (0 for an empty or None batch).
    """
    if not trades:
        return 0
    ensure_partitions()
    values = [(t["a"], symbol, float(t["p"]), float(t["q"]), t["T"], 1 if t["m"] else 0) for t in trades]
    page_size = 1000
    inserted = 0
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            # execute_values runs one INSERT per page, and cur.rowcount only
            # reflects the last statement. Page explicitly and sum the counts
            # so batches larger than page_size are reported correctly.
            for start in range(0, len(values), page_size):
                psycopg2.extras.execute_values(
                    cur,
                    "INSERT INTO agg_trades (agg_id, symbol, price, qty, time_ms, is_buyer_maker) "
                    "VALUES %s ON CONFLICT (time_ms, symbol, agg_id) DO NOTHING",
                    values[start:start + page_size],
                    template="(%s, %s, %s, %s, %s, %s)",
                    page_size=page_size,
                )
                inserted += cur.rowcount
        conn.commit()
    return inserted
# ─── Main logic ──────────────────────────────────────────────────
def backfill_symbol(symbol: str, target_days: int | None = None):
    """Backfill ``agg_trades`` for *symbol* by walking backwards through agg_ids.

    The walk starts just below the earliest agg_id already known (from
    get_earliest_agg_id). If the DB has nothing for the symbol, it starts from
    the latest trade returned by Binance, and that seed trade is written too.
    Each iteration:
      1. fetches up to BATCH_SIZE trades ending just below the current position;
      2. writes them;
      3. records the new earliest point in ``agg_trades_meta``.

    The walk stops when:
      - Binance returns nothing;
      - no trade older than the current position comes back; or
      - *target_days* is truthy and the oldest written trade is at least that
        many days old.
    """
    earliest = get_earliest_agg_id(symbol)
    if earliest is None:
        logger.info(f"[{symbol}] DB无数据拉最新确定起点")
        trades = fetch_agg_trades(symbol, limit=1)
        if not trades:
            logger.error(f"[{symbol}] 无法获取起始数据")
            return
        earliest = trades[0]["a"]
        # The seed trade is not stored yet. Start one id above it so the
        # `t["a"] < current_from_id` filter keeps it in the first batch
        # instead of silently leaving a one-trade gap.
        current_from_id = earliest + 1
    else:
        # `earliest` is already stored; only strictly older trades are new.
        current_from_id = earliest
    # 0 disables the time cutoff (walk until Binance has nothing older).
    target_ts = int((time.time() - target_days * 86400) * 1000) if target_days else 0
    logger.info(f"[{symbol}] 开始回补当前最早agg_id={earliest}")
    total_inserted = 0
    total_requests = 0
    while True:
        # Request the window of BATCH_SIZE ids that ends just below current_from_id.
        fetch_from = max(0, current_from_id - BATCH_SIZE)
        trades = fetch_agg_trades(symbol, from_id=fetch_from, limit=BATCH_SIZE)
        total_requests += 1
        if not trades:
            logger.info(f"[{symbol}] 无更多数据,回补结束")
            break
        new_trades = [t for t in trades if t["a"] < current_from_id]
        if not new_trades:
            logger.info(f"[{symbol}] 已到达最早数据,回补结束")
            break
        inserted = write_batch(symbol, new_trades)
        total_inserted += inserted
        oldest = min(new_trades, key=lambda x: x["a"])
        oldest_time = datetime.fromtimestamp(oldest["T"] / 1000, tz=timezone.utc)
        current_from_id = oldest["a"]
        # Persist progress after every batch so an interrupted run can resume.
        update_earliest_meta(symbol, oldest["a"], oldest["T"])
        if total_requests % 50 == 0:
            logger.info(
                f"[{symbol}] 进度: {total_inserted:,} 条已插入, "
                f"当前位置: {oldest_time.strftime('%Y-%m-%d %H:%M')}, agg_id={current_from_id:,}, 请求数={total_requests}"
            )
        if target_ts and oldest["T"] <= target_ts:
            logger.info(f"[{symbol}] 已达到目标时间,回补结束")
            break
        time.sleep(SLEEP_MS / 1000)
    logger.info(f"[{symbol}] 回补完成: 总插入={total_inserted:,}, 总请求={total_requests}")
def check_continuity(symbol: str):
    """Log how densely ``agg_trades`` covers *symbol*'s agg_id range.

    Coverage is the row count divided by (max_id - min_id + 1), expressed as
    a percentage. 100% means every id in the range is present, assuming no
    duplicate agg_ids. Logs a short notice and returns if there are no rows.
    """
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute("SELECT COUNT(*), MIN(agg_id), MAX(agg_id) FROM agg_trades WHERE symbol = %s", (symbol,))
        total, min_id, max_id = cur.fetchone()

    if not total:
        logger.info(f"[{symbol}] 无数据")
        return

    id_span = max_id - min_id + 1
    if id_span > 0:
        coverage = total / id_span * 100
    else:
        coverage = 0
    logger.info(f"[{symbol}] 总条数={total:,}, ID范围={min_id:,}~{max_id:,}, 覆盖率={coverage:.2f}%")
def main():
    """CLI entry point: backfill agg_trades, then report continuity.

    Options:
        --days N    cutoff passed to backfill_symbol as target_days
                    (omitted or 0: no time cutoff).
        --symbol S  process only S instead of BTCUSDT and ETHUSDT.
        --check     skip the backfill and only run the continuity check.

    The schema is initialised before either mode runs.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--days", type=int, default=None)
    parser.add_argument("--symbol", type=str, default=None)
    parser.add_argument("--check", action="store_true")
    args = parser.parse_args()

    init_schema()
    if args.symbol:
        symbols = [args.symbol]
    else:
        symbols = ["BTCUSDT", "ETHUSDT"]

    if args.check:
        for symbol in symbols:
            check_continuity(symbol)
        return

    logger.info(f"=== 开始回补 === symbols={symbols}, days={args.days or '全量'}")
    for symbol in symbols:
        backfill_symbol(symbol, target_days=args.days)

    logger.info("=== 连续性检查 ===")
    for symbol in symbols:
        check_continuity(symbol)
    logger.info("=== 全部完成 ===")


if __name__ == "__main__":
    main()