From abfdc637050794bd0b56723d038ff7a716662454 Mon Sep 17 00:00:00 2001 From: root Date: Sat, 28 Feb 2026 15:38:14 +0000 Subject: [PATCH] feat: liquidation_collector.py - Binance WS forceOrder realtime + 5min aggregation to market_indicators --- backend/liquidation_collector.py | 140 +++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 backend/liquidation_collector.py diff --git a/backend/liquidation_collector.py b/backend/liquidation_collector.py new file mode 100644 index 0000000..5cce746 --- /dev/null +++ b/backend/liquidation_collector.py @@ -0,0 +1,140 @@ +""" +清算数据采集器 — 币安WS forceOrder实时流 +存入 market_indicators 表,indicator_type = 'liquidation' + +每笔清算记录:symbol, side, price, qty, trade_time +每5分钟汇总一次:long_liq_usd, short_liq_usd, total_liq_usd, count +""" + +import asyncio +import json +import logging +import time +import websockets +from db import get_sync_conn + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") +logger = logging.getLogger("liquidation_collector") + +SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"] +WS_URL = "wss://fstream.binance.com/stream?streams=" + "/".join( + f"{s.lower()}@forceOrder" for s in SYMBOLS +) + +# 5分钟聚合窗口 +AGG_INTERVAL = 300 # seconds + + +def ensure_table(): + with get_sync_conn() as conn: + with conn.cursor() as cur: + cur.execute(""" + CREATE TABLE IF NOT EXISTS liquidations ( + id BIGSERIAL PRIMARY KEY, + symbol TEXT NOT NULL, + side TEXT NOT NULL, + price DOUBLE PRECISION NOT NULL, + qty DOUBLE PRECISION NOT NULL, + usd_value DOUBLE PRECISION NOT NULL, + trade_time BIGINT NOT NULL, + created_at TIMESTAMP DEFAULT NOW() + ); + CREATE INDEX IF NOT EXISTS idx_liquidations_symbol_time + ON liquidations(symbol, trade_time DESC); + """) + conn.commit() + logger.info("liquidations table ensured") + + +def save_liquidation(symbol: str, side: str, price: float, qty: float, usd_value: float, trade_time: int): + with get_sync_conn() as conn: + with conn.cursor() as cur: + cur.execute( + "INSERT INTO liquidations (symbol, side, price, qty, usd_value, trade_time) " + "VALUES (%s, %s, %s, %s, %s, %s)", + (symbol, side, price, qty, usd_value, trade_time) + ) + conn.commit() + + +def save_aggregated(symbol: str, ts_ms: int, long_liq_usd: float, short_liq_usd: float, count: int): + """每5分钟汇总存入market_indicators,供signal_engine读取""" + payload = json.dumps({ + "long_liq_usd": round(long_liq_usd, 2), + "short_liq_usd": round(short_liq_usd, 2), + "total_liq_usd": round(long_liq_usd + short_liq_usd, 2), + "count": count, + }) + with get_sync_conn() as conn: + with conn.cursor() as cur: + cur.execute( + "INSERT INTO market_indicators (symbol, indicator_type, timestamp_ms, value) " + "VALUES (%s, %s, %s, %s)", + (symbol, "liquidation", ts_ms, payload) + ) + conn.commit() + + +async def run(): + ensure_table() + + # 每个symbol的聚合缓冲 + agg = {s: {"long_usd": 0.0, "short_usd": 0.0, "count": 0, "window_start": int(time.time())} for s in SYMBOLS} + + while True: + try: + logger.info("Connecting to Binance forceOrder WS...") + async with websockets.connect(WS_URL, ping_interval=20, ping_timeout=10) as ws: + logger.info("Connected! Listening for liquidations...") + async for msg in ws: + data = json.loads(msg) + if "data" not in data: + continue + + order = data["data"]["o"] + symbol = order["s"] + side = order["S"] # BUY = short被清算, SELL = long被清算 + price = float(order["p"]) + qty = float(order["q"]) + usd_value = price * qty + trade_time = order["T"] + + # 清算方向:BUY=空头被爆仓, SELL=多头被爆仓 + liq_side = "SHORT" if side == "BUY" else "LONG" + + # 存入原始记录 + save_liquidation(symbol, liq_side, price, qty, usd_value, trade_time) + logger.info(f"[{symbol}] 💥 {liq_side} liquidation: {qty} @ ${price:.2f} = ${usd_value:,.0f}") + + # 聚合 + if symbol in agg: + buf = agg[symbol] + if liq_side == "LONG": + buf["long_usd"] += usd_value + else: + buf["short_usd"] += usd_value + buf["count"] += 1 + + # 检查是否到了5分钟聚合窗口 + now = int(time.time()) + for sym in SYMBOLS: + buf = agg[sym] + if now - buf["window_start"] >= AGG_INTERVAL: + if buf["count"] > 0: + save_aggregated(sym, now * 1000, buf["long_usd"], buf["short_usd"], buf["count"]) + logger.info(f"[{sym}] 📊 5min agg: long=${buf['long_usd']:,.0f} short=${buf['short_usd']:,.0f} count={buf['count']}") + # 即使没清算也写一条0记录,保持连贯 + elif now - buf["window_start"] >= AGG_INTERVAL: + save_aggregated(sym, now * 1000, 0, 0, 0) + buf["long_usd"] = 0.0 + buf["short_usd"] = 0.0 + buf["count"] = 0 + buf["window_start"] = now + + except Exception as e: + logger.error(f"WS error: {e}, reconnecting in 5s...") + await asyncio.sleep(5) + + +if __name__ == "__main__": + asyncio.run(run())