210 lines
7.7 KiB
Python
210 lines
7.7 KiB
Python
"""
|
||
backfill_agg_trades.py — 历史aggTrades回补脚本(PostgreSQL版)
|
||
"""
|
||
|
||
import argparse
|
||
import logging
|
||
import os
|
||
import sys
|
||
import time
|
||
from datetime import datetime, timezone
|
||
|
||
import requests
|
||
import psycopg2.extras
|
||
|
||
from db import get_sync_conn, init_schema, ensure_partitions
|
||
|
||
# Log to both the console and a file one directory above this script.
# encoding="utf-8" is required on the FileHandler: log messages contain
# non-ASCII (Chinese) text, and the platform-default encoding (e.g.
# cp1252 on Windows) would raise UnicodeEncodeError inside logging.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(
            os.path.join(os.path.dirname(__file__), "..", "backfill.log"),
            encoding="utf-8",
        ),
    ],
)
logger = logging.getLogger("backfill")

# Binance USDT-M futures REST base URL.
BINANCE_FAPI = "https://fapi.binance.com/fapi/v1"
HEADERS = {"User-Agent": "Mozilla/5.0 ArbitrageEngine/backfill"}
BATCH_SIZE = 1000  # max rows per aggTrades request (API page size)
SLEEP_MS = 500  # pause between requests: balances speed vs CPU load
|
||
|
||
|
||
# ─── DB helpers ──────────────────────────────────────────────────
|
||
|
||
def get_earliest_agg_id(symbol: str) -> int | None:
    """Return the earliest known agg_id for *symbol*, or None when no data.

    Prefers the cached value in ``agg_trades_meta``; falls back to a
    MIN() scan over ``agg_trades`` when no meta value is recorded.
    """
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT earliest_agg_id FROM agg_trades_meta WHERE symbol = %s", (symbol,))
            row = cur.fetchone()
            # `is not None` rather than truthiness: agg_id 0 is a valid
            # cached value and must not trigger the fallback table scan.
            if row and row[0] is not None:
                return row[0]
            # fallback: scan agg_trades
            cur.execute("SELECT MIN(agg_id) FROM agg_trades WHERE symbol = %s", (symbol,))
            row = cur.fetchone()
            return row[0] if row else None
|
||
|
||
|
||
def update_earliest_meta(symbol: str, agg_id: int, time_ms: int):
    """Record *agg_id*/*time_ms* as the earliest data seen for *symbol*.

    When a meta row already exists, its earliest fields are lowered via
    LEAST(); otherwise a fresh row is inserted seeded with these values.
    NOTE(review): the insert branch also seeds last_agg_id/last_time_ms
    with the same values — presumably maintained elsewhere; verify.
    """
    with get_sync_conn() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT symbol FROM agg_trades_meta WHERE symbol = %s", (symbol,))
            row_exists = cursor.fetchone() is not None
            if not row_exists:
                cursor.execute("""
                    INSERT INTO agg_trades_meta (symbol, last_agg_id, last_time_ms, earliest_agg_id, earliest_time_ms)
                    VALUES (%s, %s, %s, %s, %s)
                """, (symbol, agg_id, time_ms, agg_id, time_ms))
            else:
                cursor.execute("""
                    UPDATE agg_trades_meta SET
                        earliest_agg_id = LEAST(%s, COALESCE(earliest_agg_id, %s)),
                        earliest_time_ms = LEAST(%s, COALESCE(earliest_time_ms, %s))
                    WHERE symbol = %s
                """, (agg_id, agg_id, time_ms, time_ms, symbol))
            conn.commit()
|
||
|
||
|
||
# ─── REST API ────────────────────────────────────────────────────
|
||
|
||
def fetch_agg_trades(symbol: str, from_id: int | None = None, limit: int = 1000) -> list:
    """Fetch one page of aggTrades from the Binance futures REST API.

    Retries up to 5 times: exponential wait on HTTP 429 (capped at 300s),
    a fixed 2s wait on other non-200 responses, and exponential backoff on
    transport errors. Returns the decoded JSON list, or [] when every
    attempt fails.
    """
    params = {"symbol": symbol, "limit": limit}
    if from_id is not None:
        params["fromId"] = from_id

    for attempt in range(5):
        try:
            resp = requests.get(f"{BINANCE_FAPI}/aggTrades", params=params, headers=HEADERS, timeout=15)
            status = resp.status_code
            if status == 429:
                wait = min(60 * (2 ** attempt), 300)
                logger.warning(f"Rate limited (429), waiting {wait}s (attempt {attempt+1})")
                time.sleep(wait)
                continue
            if status != 200:
                logger.error(f"HTTP {status}: {resp.text[:200]}")
                time.sleep(2)
                continue
            # .json() stays inside the try: a decode failure raises a
            # RequestException subclass and is retried like a transport error.
            return resp.json()
        except requests.exceptions.RequestException as exc:
            logger.error(f"Request error: {exc}, retrying in {2 ** attempt}s")
            time.sleep(2 ** attempt)
    return []
|
||
|
||
|
||
def write_batch(symbol: str, trades: list) -> int:
    """Bulk-insert aggTrade dicts for *symbol*; return the inserted count.

    Duplicates on (time_ms, symbol, agg_id) are skipped via ON CONFLICT
    DO NOTHING, so re-fetching an overlapping page is harmless.
    """
    if not trades:
        return 0
    ensure_partitions()
    rows = [
        (trade["a"], symbol, float(trade["p"]), float(trade["q"]), trade["T"], 1 if trade["m"] else 0)
        for trade in trades
    ]
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            psycopg2.extras.execute_values(
                cur,
                "INSERT INTO agg_trades (agg_id, symbol, price, qty, time_ms, is_buyer_maker) "
                "VALUES %s ON CONFLICT (time_ms, symbol, agg_id) DO NOTHING",
                rows,
                template="(%s, %s, %s, %s, %s, %s)",
                page_size=1000,
            )
            # rowcount reflects the final execute_values page; batches here
            # are at most BATCH_SIZE (1000) rows, i.e. a single page.
            inserted = cur.rowcount
        conn.commit()
    return inserted
|
||
|
||
|
||
# ─── 主逻辑 ──────────────────────────────────────────────────────
|
||
|
||
def backfill_symbol(symbol: str, target_days: int | None = None):
    """Backfill aggTrades history for *symbol*, paging backwards in time.

    Starts from the earliest agg_id already in the DB (or the newest trade
    when the DB is empty) and repeatedly fetches the BATCH_SIZE trades
    preceding the current cursor until the API returns no older data or
    the optional *target_days* time cutoff is reached.
    """
    earliest = get_earliest_agg_id(symbol)
    if earliest is None:
        # Empty DB: fetch the single newest trade to anchor the backfill.
        logger.info(f"[{symbol}] DB无数据,拉最新确定起点")
        trades = fetch_agg_trades(symbol, limit=1)
        if not trades:
            logger.error(f"[{symbol}] 无法获取起始数据")
            return
        earliest = trades[0]["a"]

    # Cutoff timestamp in ms; 0 means no cutoff (full history).
    target_ts = int((time.time() - target_days * 86400) * 1000) if target_days else 0

    logger.info(f"[{symbol}] 开始回补,当前最早agg_id={earliest}")
    total_inserted = 0
    total_requests = 0
    current_from_id = earliest

    while True:
        # Request the BATCH_SIZE ids immediately before the cursor.
        fetch_from = max(0, current_from_id - BATCH_SIZE)
        trades = fetch_agg_trades(symbol, from_id=fetch_from, limit=BATCH_SIZE)
        total_requests += 1

        if not trades:
            logger.info(f"[{symbol}] 无更多数据,回补结束")
            break

        # Keep only trades strictly older than what the DB already holds.
        new_trades = [t for t in trades if t["a"] < current_from_id]
        if not new_trades:
            # The page contained nothing older: history is exhausted.
            logger.info(f"[{symbol}] 已到达最早数据,回补结束")
            break

        inserted = write_batch(symbol, new_trades)
        total_inserted += inserted

        # Advance the cursor to the oldest trade just stored, then persist
        # progress so an interrupted run resumes from here.
        oldest = min(new_trades, key=lambda x: x["a"])
        oldest_time = datetime.fromtimestamp(oldest["T"] / 1000, tz=timezone.utc)
        current_from_id = oldest["a"]
        update_earliest_meta(symbol, oldest["a"], oldest["T"])

        # Progress log every 50 requests.
        if total_requests % 50 == 0:
            logger.info(
                f"[{symbol}] 进度: {total_inserted:,} 条已插入, "
                f"当前位置: {oldest_time.strftime('%Y-%m-%d %H:%M')}, agg_id={current_from_id:,}, 请求数={total_requests}"
            )

        if target_ts and oldest["T"] <= target_ts:
            logger.info(f"[{symbol}] 已达到目标时间,回补结束")
            break

        # Throttle between requests (SLEEP_MS is in milliseconds).
        time.sleep(SLEEP_MS / 1000)

    logger.info(f"[{symbol}] 回补完成: 总插入={total_inserted:,}, 总请求={total_requests}")
|
||
|
||
|
||
def check_continuity(symbol: str):
    """Log row count, agg_id range, and ID coverage ratio for *symbol*.

    Coverage = rows / (max_id - min_id + 1); 100% means no gaps in the
    stored agg_id sequence.
    """
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*), MIN(agg_id), MAX(agg_id) FROM agg_trades WHERE symbol = %s", (symbol,))
            total, min_id, max_id = cur.fetchone()
    if not total:
        logger.info(f"[{symbol}] 无数据")
        return
    span = max_id - min_id + 1
    coverage = (total / span * 100) if span > 0 else 0
    logger.info(f"[{symbol}] 总条数={total:,}, ID范围={min_id:,}~{max_id:,}, 覆盖率={coverage:.2f}%")
|
||
|
||
|
||
def main():
    """CLI entry point: parse arguments, ensure schema, then backfill or check."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--days", type=int, default=None)
    parser.add_argument("--symbol", type=str, default=None)
    parser.add_argument("--check", action="store_true")
    args = parser.parse_args()

    init_schema()
    # Default symbol set unless one was given on the command line.
    symbols = ["BTCUSDT", "ETHUSDT"]
    if args.symbol:
        symbols = [args.symbol]

    if args.check:
        # Check-only mode: report continuity and exit.
        for symbol in symbols:
            check_continuity(symbol)
        return

    logger.info(f"=== 开始回补 === symbols={symbols}, days={args.days or '全量'}")
    for symbol in symbols:
        backfill_symbol(symbol, target_days=args.days)

    logger.info("=== 连续性检查 ===")
    for symbol in symbols:
        check_continuity(symbol)
    logger.info("=== 全部完成 ===")


if __name__ == "__main__":
    main()
|