feat: liquidation_collector.py - Binance WS forceOrder realtime + 5min aggregation to market_indicators

This commit is contained in:
root 2026-02-28 15:38:14 +00:00
parent 6659c4524c
commit abfdc63705

View File

@ -0,0 +1,140 @@
"""
清算数据采集器 币安WS forceOrder实时流
存入 market_indicators indicator_type = 'liquidation'
每笔清算记录symbol, side, price, qty, trade_time
每5分钟汇总一次long_liq_usd, short_liq_usd, total_liq_usd, count
"""
import asyncio
import json
import logging
import time
import websockets
from db import get_sync_conn
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("liquidation_collector")
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
WS_URL = "wss://fstream.binance.com/stream?streams=" + "/".join(
f"{s.lower()}@forceOrder" for s in SYMBOLS
)
# 5分钟聚合窗口
AGG_INTERVAL = 300 # seconds
def ensure_table():
with get_sync_conn() as conn:
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS liquidations (
id BIGSERIAL PRIMARY KEY,
symbol TEXT NOT NULL,
side TEXT NOT NULL,
price DOUBLE PRECISION NOT NULL,
qty DOUBLE PRECISION NOT NULL,
usd_value DOUBLE PRECISION NOT NULL,
trade_time BIGINT NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_liquidations_symbol_time
ON liquidations(symbol, trade_time DESC);
""")
conn.commit()
logger.info("liquidations table ensured")
def save_liquidation(symbol: str, side: str, price: float, qty: float, usd_value: float, trade_time: int):
with get_sync_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO liquidations (symbol, side, price, qty, usd_value, trade_time) "
"VALUES (%s, %s, %s, %s, %s, %s)",
(symbol, side, price, qty, usd_value, trade_time)
)
conn.commit()
def save_aggregated(symbol: str, ts_ms: int, long_liq_usd: float, short_liq_usd: float, count: int):
"""每5分钟汇总存入market_indicators供signal_engine读取"""
payload = json.dumps({
"long_liq_usd": round(long_liq_usd, 2),
"short_liq_usd": round(short_liq_usd, 2),
"total_liq_usd": round(long_liq_usd + short_liq_usd, 2),
"count": count,
})
with get_sync_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO market_indicators (symbol, indicator_type, timestamp_ms, value) "
"VALUES (%s, %s, %s, %s)",
(symbol, "liquidation", ts_ms, payload)
)
conn.commit()
async def run():
ensure_table()
# 每个symbol的聚合缓冲
agg = {s: {"long_usd": 0.0, "short_usd": 0.0, "count": 0, "window_start": int(time.time())} for s in SYMBOLS}
while True:
try:
logger.info("Connecting to Binance forceOrder WS...")
async with websockets.connect(WS_URL, ping_interval=20, ping_timeout=10) as ws:
logger.info("Connected! Listening for liquidations...")
async for msg in ws:
data = json.loads(msg)
if "data" not in data:
continue
order = data["data"]["o"]
symbol = order["s"]
side = order["S"] # BUY = short被清算, SELL = long被清算
price = float(order["p"])
qty = float(order["q"])
usd_value = price * qty
trade_time = order["T"]
# 清算方向BUY=空头被爆仓, SELL=多头被爆仓
liq_side = "SHORT" if side == "BUY" else "LONG"
# 存入原始记录
save_liquidation(symbol, liq_side, price, qty, usd_value, trade_time)
logger.info(f"[{symbol}] 💥 {liq_side} liquidation: {qty} @ ${price:.2f} = ${usd_value:,.0f}")
# 聚合
if symbol in agg:
buf = agg[symbol]
if liq_side == "LONG":
buf["long_usd"] += usd_value
else:
buf["short_usd"] += usd_value
buf["count"] += 1
# 检查是否到了5分钟聚合窗口
now = int(time.time())
for sym in SYMBOLS:
buf = agg[sym]
if now - buf["window_start"] >= AGG_INTERVAL:
if buf["count"] > 0:
save_aggregated(sym, now * 1000, buf["long_usd"], buf["short_usd"], buf["count"])
logger.info(f"[{sym}] 📊 5min agg: long=${buf['long_usd']:,.0f} short=${buf['short_usd']:,.0f} count={buf['count']}")
# 即使没清算也写一条0记录保持连贯
elif now - buf["window_start"] >= AGG_INTERVAL:
save_aggregated(sym, now * 1000, 0, 0, 0)
buf["long_usd"] = 0.0
buf["short_usd"] = 0.0
buf["count"] = 0
buf["window_start"] = now
except Exception as e:
logger.error(f"WS error: {e}, reconnecting in 5s...")
await asyncio.sleep(5)
if __name__ == "__main__":
asyncio.run(run())