356 lines
12 KiB
Python
356 lines
12 KiB
Python
"""
|
||
backfill_agg_trades.py — 历史aggTrades回补脚本
|
||
|
||
功能:
|
||
- 从当前DB最早agg_id向历史方向回补
|
||
- Binance REST API分页拉取,每次1000条
|
||
- INSERT OR IGNORE写入按月分表
|
||
- 断点续传:记录进度到agg_trades_meta
|
||
- 速率控制:sleep 200ms/请求,429自动退避
|
||
- BTC+ETH并行回补
|
||
|
||
用法:
|
||
python3 backfill_agg_trades.py [--days 30] [--symbol BTCUSDT]
|
||
"""
|
||
|
||
import argparse
|
||
import logging
|
||
import os
|
||
import sqlite3
|
||
import sys
|
||
import time
|
||
from datetime import datetime, timezone
|
||
|
||
import requests
|
||
|
||
# Log both to stdout and to ../backfill.log so the long-running backfill
# leaves a persistent trace alongside the database.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "backfill.log")),
    ],
)
logger = logging.getLogger("backfill")

# SQLite DB shared with the realtime collector, one directory above this script.
DB_PATH = os.path.join(os.path.dirname(__file__), "..", "arb.db")
# Binance USDT-M futures REST base URL.
BINANCE_FAPI = "https://fapi.binance.com/fapi/v1"
HEADERS = {"User-Agent": "Mozilla/5.0 ArbitrageEngine/backfill"}
# Maximum rows per aggTrades request (Binance API limit).
BATCH_SIZE = 1000
SLEEP_MS = 2000  # low priority: 2s per request, yields the DB lock to the realtime service
|
||
|
||
|
||
# ─── DB helpers ──────────────────────────────────────────────────
|
||
|
||
def get_conn() -> sqlite3.Connection:
    """Open the shared SQLite database with WAL journaling and Row access."""
    db = sqlite3.connect(DB_PATH, timeout=30)
    db.row_factory = sqlite3.Row
    # WAL + NORMAL sync: safe concurrent reads with the realtime collector.
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA synchronous=NORMAL"):
        db.execute(pragma)
    return db
|
||
|
||
|
||
def table_name(ts_ms: int) -> str:
    """Map a millisecond UTC timestamp to its monthly table name (agg_trades_YYYYMM)."""
    moment = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return "agg_trades_" + moment.strftime("%Y%m")
|
||
|
||
|
||
def ensure_table(conn: sqlite3.Connection, tname: str):
    """Create monthly trades table *tname* and its (symbol, time_ms) index if absent."""
    ddl = f"""
        CREATE TABLE IF NOT EXISTS {tname} (
            agg_id INTEGER PRIMARY KEY,
            symbol TEXT NOT NULL,
            price REAL NOT NULL,
            qty REAL NOT NULL,
            time_ms INTEGER NOT NULL,
            is_buyer_maker INTEGER NOT NULL
        )
    """
    conn.execute(ddl)
    conn.execute(
        f"CREATE INDEX IF NOT EXISTS idx_{tname}_symbol_time "
        f"ON {tname}(symbol, time_ms)"
    )
|
||
|
||
|
||
def ensure_meta(conn: sqlite3.Connection):
    """Create the agg_trades_meta progress table and migrate old schemas.

    The table records, per symbol, the newest (last_*) and oldest
    (earliest_*) ingested trade, used for resume-on-restart.

    Bug fixed: the original wrapped BOTH ALTER statements in one try, so
    when earliest_agg_id already existed (duplicate-column error) the
    second ALTER never ran and earliest_time_ms was silently never added.
    Each column now gets its own try, catching only the specific
    sqlite3.OperationalError instead of bare Exception.
    """
    conn.execute("""
        CREATE TABLE IF NOT EXISTS agg_trades_meta (
            symbol TEXT PRIMARY KEY,
            last_agg_id INTEGER,
            last_time_ms INTEGER,
            earliest_agg_id INTEGER,
            earliest_time_ms INTEGER
        )
    """)
    # Add earliest_* columns to pre-existing tables that lack them.
    for col in ("earliest_agg_id", "earliest_time_ms"):
        try:
            conn.execute(f"ALTER TABLE agg_trades_meta ADD COLUMN {col} INTEGER")
        except sqlite3.OperationalError:
            pass  # column already exists
|
||
|
||
|
||
def get_earliest_agg_id(conn: sqlite3.Connection, symbol: str) -> int | None:
|
||
"""查找DB中该symbol的最小agg_id"""
|
||
# 先查meta表
|
||
row = conn.execute(
|
||
"SELECT earliest_agg_id FROM agg_trades_meta WHERE symbol = ?", (symbol,)
|
||
).fetchone()
|
||
if row and row["earliest_agg_id"]:
|
||
return row["earliest_agg_id"]
|
||
|
||
# meta表没有,扫所有月表
|
||
tables = conn.execute(
|
||
"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'agg_trades_2%'"
|
||
).fetchall()
|
||
min_id = None
|
||
for t in tables:
|
||
r = conn.execute(
|
||
f"SELECT MIN(agg_id) as mid FROM {t['name']} WHERE symbol = ?", (symbol,)
|
||
).fetchone()
|
||
if r and r["mid"] is not None:
|
||
if min_id is None or r["mid"] < min_id:
|
||
min_id = r["mid"]
|
||
return min_id
|
||
|
||
|
||
def update_earliest_meta(conn: sqlite3.Connection, symbol: str, agg_id: int, time_ms: int):
    """Record the oldest backfilled (agg_id, time_ms) for *symbol*.

    Existing rows keep the minimum of the stored and new values; a row is
    inserted (seeding last_* with the same values) when the symbol has no
    meta entry yet. Commits the transaction.
    """
    exists = conn.execute(
        "SELECT symbol FROM agg_trades_meta WHERE symbol = ?", (symbol,)
    ).fetchone() is not None

    if exists:
        # Only move earliest_* backwards; COALESCE handles NULL columns.
        conn.execute(
            """
            UPDATE agg_trades_meta SET
                earliest_agg_id = MIN(?, COALESCE(earliest_agg_id, ?)),
                earliest_time_ms = MIN(?, COALESCE(earliest_time_ms, ?))
            WHERE symbol = ?
            """,
            (agg_id, agg_id, time_ms, time_ms, symbol),
        )
    else:
        conn.execute(
            """
            INSERT INTO agg_trades_meta (symbol, last_agg_id, last_time_ms, earliest_agg_id, earliest_time_ms)
            VALUES (?, ?, ?, ?, ?)
            """,
            (symbol, agg_id, time_ms, agg_id, time_ms),
        )
    conn.commit()
|
||
|
||
|
||
# ─── REST API ────────────────────────────────────────────────────
|
||
|
||
def fetch_agg_trades(symbol: str, from_id: int | None = None,
                     start_time: int | None = None, limit: int = 1000) -> list[dict]:
    """Fetch aggTrades from the Binance futures REST API.

    Pagination is driven by *from_id* (takes precedence) or *start_time*.
    Retries up to 5 times with exponential backoff. Returns [] when all
    retries are exhausted — callers treat that as "no data".

    Improvements over the original:
    - On HTTP 429 the server's Retry-After header is honored when present
      (still capped at 300s), falling back to the exponential backoff.
    - Retry exhaustion now logs an explicit error; previously the function
      fell through silently and the caller logged "no more data", masking
      persistent API failures.
    """
    params = {"symbol": symbol, "limit": limit}
    if from_id is not None:
        params["fromId"] = from_id
    elif start_time is not None:
        params["startTime"] = start_time

    for attempt in range(5):
        try:
            r = requests.get(
                f"{BINANCE_FAPI}/aggTrades",
                params=params, headers=HEADERS, timeout=15
            )
            if r.status_code == 429:
                retry_after = r.headers.get("Retry-After")
                if retry_after and retry_after.isdigit():
                    wait = min(int(retry_after), 300)
                else:
                    wait = min(60 * (2 ** attempt), 300)
                logger.warning(f"Rate limited (429), waiting {wait}s (attempt {attempt+1})")
                time.sleep(wait)
                continue
            if r.status_code != 200:
                logger.error(f"HTTP {r.status_code}: {r.text[:200]}")
                time.sleep(2)
                continue
            return r.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error: {e}, retrying in {2 ** attempt}s")
            time.sleep(2 ** attempt)
    logger.error(f"[{symbol}] fetch_agg_trades: all 5 attempts failed, returning empty batch")
    return []
|
||
|
||
|
||
def write_batch(conn: sqlite3.Connection, symbol: str, trades: list[dict]) -> int:
    """Insert raw aggTrade dicts into their monthly tables; return rows actually inserted.

    Each trade is routed by its timestamp field "T" (ms) via table_name();
    tables are created lazily once per batch.

    Bug fixed: INSERT OR IGNORE never raises IntegrityError for duplicate
    keys — it silently skips them — so the original's except clause was
    dead code and `inserted` over-counted ignored duplicates. The count is
    now derived from the connection's total_changes delta (DDL statements
    do not affect total_changes, only row-changing DML does).
    """
    tables_seen: set[str] = set()
    changes_before = conn.total_changes
    for t in trades:
        tname = table_name(t["T"])
        if tname not in tables_seen:
            ensure_table(conn, tname)
            tables_seen.add(tname)
        conn.execute(
            f"INSERT OR IGNORE INTO {tname} (agg_id, symbol, price, qty, time_ms, is_buyer_maker) "
            f"VALUES (?, ?, ?, ?, ?, ?)",
            (t["a"], symbol, float(t["p"]), float(t["q"]), t["T"], 1 if t["m"] else 0)
        )
    conn.commit()
    return conn.total_changes - changes_before
|
||
|
||
|
||
# ─── 主逻辑 ──────────────────────────────────────────────────────
|
||
|
||
def backfill_symbol(symbol: str, target_days: int | None = None) -> tuple[int, int, int] | None:
    """Backfill historical aggTrades for one symbol.

    Walks backwards from the earliest agg_id already stored (or from the
    newest exchange trade when the DB is empty), pulling BATCH_SIZE rows
    per request and writing them into the monthly tables, until either the
    first trade or the *target_days* time cutoff is reached.

    Returns (total_inserted, total_requests, rate_limit_hits), or None
    when no starting point could be determined.
    """
    conn = get_conn()
    ensure_meta(conn)

    earliest = get_earliest_agg_id(conn, symbol)
    if earliest is None:
        # Empty DB: bootstrap the cursor from the single most recent trade.
        logger.info(f"[{symbol}] DB无数据,从最新开始拉最近的数据确定起点")
        trades = fetch_agg_trades(symbol, limit=1)
        if not trades:
            logger.error(f"[{symbol}] 无法获取起始数据")
            return
        earliest = trades[0]["a"]
        logger.info(f"[{symbol}] 当前最新agg_id: {earliest}")

    # Target: how far back in wall-clock time to fill.
    if target_days:
        target_ts = int((time.time() - target_days * 86400) * 1000)
    else:
        target_ts = 0  # fill all the way to the first trade

    logger.info(f"[{symbol}] 开始回补,当前最早agg_id={earliest}")
    if target_days:
        target_dt = datetime.fromtimestamp(target_ts / 1000, tz=timezone.utc)
        logger.info(f"[{symbol}] 目标:回补到 {target_dt.strftime('%Y-%m-%d')} ({target_days}天前)")

    total_inserted = 0
    total_requests = 0
    # NOTE(review): rate_limit_hits is never incremented in this function —
    # 429 handling lives inside fetch_agg_trades — so logs always show 0.
    rate_limit_hits = 0
    current_from_id = earliest

    while True:
        # Binance's fromId paginates forward, so to move backwards we
        # request the BATCH_SIZE ids immediately *before* the current
        # cursor and keep only the ones older than it.
        fetch_from = max(0, current_from_id - BATCH_SIZE)

        trades = fetch_agg_trades(symbol, from_id=fetch_from, limit=BATCH_SIZE)
        total_requests += 1

        if not trades:
            logger.info(f"[{symbol}] 无更多数据,回补结束")
            break

        # Drop rows at/after the cursor (already covered by the realtime
        # collector or a previous iteration).
        new_trades = [t for t in trades if t["a"] < current_from_id]

        if not new_trades:
            # Nothing older exists — we reached the exchange's first trade.
            logger.info(f"[{symbol}] 已到达最早数据,回补结束")
            break

        inserted = write_batch(conn, symbol, new_trades)
        total_inserted += inserted

        oldest = min(new_trades, key=lambda x: x["a"])
        oldest_time = datetime.fromtimestamp(oldest["T"] / 1000, tz=timezone.utc)
        current_from_id = oldest["a"]

        # Persist the resume point so an interrupted run can continue.
        update_earliest_meta(conn, symbol, oldest["a"], oldest["T"])

        # Progress log every 50 batches.
        if total_requests % 50 == 0:
            logger.info(
                f"[{symbol}] 进度: {total_inserted:,} 条已插入, "
                f"当前位置: {oldest_time.strftime('%Y-%m-%d %H:%M')}, "
                f"agg_id={current_from_id:,}, "
                f"请求数={total_requests}, 429次数={rate_limit_hits}"
            )

        # Stop once we've filled back to the requested time cutoff.
        if target_ts and oldest["T"] <= target_ts:
            logger.info(f"[{symbol}] 已达到目标时间,回补结束")
            break

        time.sleep(SLEEP_MS / 1000)

    conn.close()

    logger.info(
        f"[{symbol}] 回补完成: "
        f"总插入={total_inserted:,}, 总请求={total_requests}, "
        f"429次数={rate_limit_hits}"
    )
    return total_inserted, total_requests, rate_limit_hits
|
||
|
||
|
||
def check_continuity(symbol: str):
    """Scan all monthly tables for *symbol* and report agg_id gaps/coverage."""
    conn = get_conn()
    month_tables = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'agg_trades_2%' ORDER BY name"
    ).fetchall()

    ids: list[int] = []
    for tbl in month_tables:
        rows = conn.execute(
            f"SELECT agg_id FROM {tbl['name']} WHERE symbol = ? ORDER BY agg_id",
            (symbol,)
        ).fetchall()
        ids.extend(row["agg_id"] for row in rows)

    conn.close()

    if not ids:
        logger.info(f"[{symbol}] 无数据")
        return

    ids.sort()
    gaps = 0
    gap_samples: list[tuple[int, int, int]] = []
    # Walk consecutive pairs; any jump > 1 is a gap (keep first 10 samples).
    for prev_id, cur_id in zip(ids, ids[1:]):
        missing = cur_id - prev_id - 1
        if missing > 0:
            gaps += 1
            if len(gap_samples) < 10:
                gap_samples.append((prev_id, cur_id, missing))

    total = len(ids)
    span = ids[-1] - ids[0] + 1
    coverage = total / span * 100 if span > 0 else 0

    logger.info(
        f"[{symbol}] 连续性检查: "
        f"总条数={total:,}, ID范围={ids[0]:,}~{ids[-1]:,}, "
        f"理论条数={span:,}, 覆盖率={coverage:.2f}%, 缺口数={gaps}"
    )
    for start, end, missing in gap_samples[:5]:
        logger.info(f"  缺口: {start} → {end} (缺{missing}条)")
|
||
|
||
|
||
# ─── CLI ─────────────────────────────────────────────────────────
|
||
|
||
def main():
    """CLI entry point: parse arguments, run backfill, then continuity checks."""
    ap = argparse.ArgumentParser(description="Backfill aggTrades from Binance")
    ap.add_argument("--days", type=int, default=None, help="回补天数(默认全量)")
    ap.add_argument("--symbol", type=str, default=None, help="指定symbol(默认BTC+ETH)")
    ap.add_argument("--check", action="store_true", help="仅检查连续性")
    opts = ap.parse_args()

    targets = [opts.symbol] if opts.symbol else ["BTCUSDT", "ETHUSDT"]

    # --check mode: report only, no network traffic.
    if opts.check:
        for sym in targets:
            check_continuity(sym)
        return

    logger.info(f"=== 开始回补 === symbols={targets}, days={opts.days or '全量'}")

    for sym in targets:
        logger.info(f"--- 回补 {sym} ---")
        outcome = backfill_symbol(sym, target_days=opts.days)
        if outcome:
            inserted, reqs, limits = outcome
            logger.info(f"[{sym}] 结果: 插入{inserted:,}条, {reqs}次请求, {limits}次429")

    logger.info("=== 回补完成,开始连续性检查 ===")
    for sym in targets:
        check_continuity(sym)

    logger.info("=== 全部完成 ===")


if __name__ == "__main__":
    main()
|