arbitrage-engine/backend/liquidation_collector.py

141 lines
5.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
清算数据采集器 — 币安WS forceOrder实时流
存入 market_indicators 表indicator_type = 'liquidation'
每笔清算记录symbol, side, price, qty, trade_time
每5分钟汇总一次long_liq_usd, short_liq_usd, total_liq_usd, count
"""
import asyncio
import json
import logging
import time
import websockets
from db import get_sync_conn
# Root logging configuration: INFO and above, prefixed with timestamp and level.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("liquidation_collector")

# Symbols whose forceOrder (liquidation) streams are subscribed to.
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]

# Combined-stream URL: one "<lowercase symbol>@forceOrder" entry per symbol,
# joined with "/".
WS_URL = "wss://fstream.binance.com/stream?streams={}".format(
    "/".join([sym.lower() + "@forceOrder" for sym in SYMBOLS])
)

# Length of one aggregation window, in seconds (5 minutes).
AGG_INTERVAL = 300
# DDL executed by ensure_table(): the raw liquidation table plus an index on
# (symbol, trade_time DESC). Both statements are idempotent (IF NOT EXISTS).
_LIQUIDATIONS_DDL = """
CREATE TABLE IF NOT EXISTS liquidations (
id BIGSERIAL PRIMARY KEY,
symbol TEXT NOT NULL,
side TEXT NOT NULL,
price DOUBLE PRECISION NOT NULL,
qty DOUBLE PRECISION NOT NULL,
usd_value DOUBLE PRECISION NOT NULL,
trade_time BIGINT NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_liquidations_symbol_time
ON liquidations(symbol, trade_time DESC);
"""


def ensure_table():
    """Create the ``liquidations`` table and its index if they do not exist.

    Runs the DDL in a single ``execute`` call on a connection obtained from
    ``get_sync_conn()``, commits, and logs a confirmation line.
    """
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(_LIQUIDATIONS_DDL)
        conn.commit()
    logger.info("liquidations table ensured")
def save_liquidation(symbol: str, side: str, price: float, qty: float, usd_value: float, trade_time: int):
    """Insert one raw liquidation event into the ``liquidations`` table.

    Args:
        symbol: Contract symbol of the liquidated order.
        side: Liquidated position side; ``run()`` passes "LONG" or "SHORT".
        price: Order price.
        qty: Order quantity.
        usd_value: Notional value; ``run()`` passes ``price * qty``.
        trade_time: The order's ``T`` field as received from the stream
            (presumably epoch milliseconds — confirm against the exchange docs).
    """
    insert_sql = (
        "INSERT INTO liquidations (symbol, side, price, qty, usd_value, trade_time) "
        "VALUES (%s, %s, %s, %s, %s, %s)"
    )
    row = (symbol, side, price, qty, usd_value, trade_time)
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(insert_sql, row)
        conn.commit()
def save_aggregated(symbol: str, ts_ms: int, long_liq_usd: float, short_liq_usd: float, count: int):
    """Write one aggregated liquidation summary row to ``market_indicators``.

    The summary is serialized to a JSON string and stored in the ``value``
    column with ``indicator_type`` set to "liquidation". Per the original
    note, these rows are meant to be read by signal_engine.

    Args:
        symbol: Contract symbol the summary belongs to.
        ts_ms: Timestamp stored in ``timestamp_ms``; ``run()`` passes the
            flush time in milliseconds.
        long_liq_usd: Total notional of liquidated long positions in the window.
        short_liq_usd: Total notional of liquidated short positions in the window.
        count: Number of liquidation events in the window.
    """
    summary = {
        "long_liq_usd": round(long_liq_usd, 2),
        "short_liq_usd": round(short_liq_usd, 2),
        "total_liq_usd": round(long_liq_usd + short_liq_usd, 2),
        "count": count,
    }
    insert_sql = (
        "INSERT INTO market_indicators (symbol, indicator_type, timestamp_ms, value) "
        "VALUES (%s, %s, %s, %s)"
    )
    row = (symbol, "liquidation", ts_ms, json.dumps(summary))
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(insert_sql, row)
        conn.commit()
# Upper bound, in seconds, on how long the receive loop waits for a frame
# before re-checking the aggregation windows. Without this bound the windows
# would only be checked when a liquidation happens to arrive.
_WINDOW_POLL_SECONDS = 1.0


def _parse_force_order(raw):
    """Decode one combined-stream frame into a liquidation tuple.

    Returns ``(symbol, liq_side, price, qty, usd_value, trade_time)``, or
    ``None`` when the frame has no ``"data"`` envelope. Raises ``ValueError``,
    ``KeyError`` or ``TypeError`` when the frame is malformed.
    """
    frame = json.loads(raw)
    if "data" not in frame:
        return None
    order = frame["data"]["o"]
    price = float(order["p"])
    qty = float(order["q"])
    # A BUY order closes a short position (short liquidated); anything else
    # is treated as a liquidated long.
    liq_side = "SHORT" if order["S"] == "BUY" else "LONG"
    return order["s"], liq_side, price, qty, price * qty, order["T"]


def _record_liquidation(raw, agg):
    """Persist one raw frame and add it to its symbol's aggregation buffer."""
    try:
        parsed = _parse_force_order(raw)
    except (ValueError, KeyError, TypeError) as exc:
        # One bad frame should not tear down the connection.
        logger.warning("Skipping malformed forceOrder frame: %r", exc)
        return
    if parsed is None:
        return

    symbol, liq_side, price, qty, usd_value, trade_time = parsed
    save_liquidation(symbol, liq_side, price, qty, usd_value, trade_time)
    logger.info(f"[{symbol}] 💥 {liq_side} liquidation: {qty} @ ${price:.2f} = ${usd_value:,.0f}")

    buf = agg.get(symbol)
    if buf is None:
        # Raw row is stored, but only configured symbols are aggregated.
        return
    if liq_side == "LONG":
        buf["long_usd"] += usd_value
    else:
        buf["short_usd"] += usd_value
    buf["count"] += 1


def _flush_due_windows(agg, now):
    """Write and reset every buffer whose window is at least AGG_INTERVAL old."""
    for sym, buf in agg.items():
        if now - buf["window_start"] < AGG_INTERVAL:
            continue
        if buf["count"] > 0:
            save_aggregated(sym, now * 1000, buf["long_usd"], buf["short_usd"], buf["count"])
            logger.info(f"[{sym}] 📊 5min agg: long=${buf['long_usd']:,.0f} short=${buf['short_usd']:,.0f} count={buf['count']}")
        else:
            # Write a zero record even without liquidations so the series
            # stays continuous.
            save_aggregated(sym, now * 1000, 0, 0, 0)
        buf["long_usd"] = 0.0
        buf["short_usd"] = 0.0
        buf["count"] = 0
        buf["window_start"] = now


async def run():
    """Collect liquidations from the forceOrder stream until cancelled.

    Ensures the ``liquidations`` table exists, then loops forever: connect to
    ``WS_URL``, store every liquidation event as a raw row, accumulate
    per-symbol totals, and write a summary per symbol once its window is at
    least ``AGG_INTERVAL`` seconds old. Windows are checked after every
    received frame and at least every ``_WINDOW_POLL_SECONDS`` while the
    stream is idle, so zero summaries are written during quiet periods too.

    Any exception inside a connection (including the connection closing) is
    logged and followed by a 5 second pause before reconnecting. Aggregation
    buffers live outside the connection loop and survive reconnects.

    NOTE(review): the database helpers are synchronous and run on the event
    loop thread; a slow database will delay frame processing.
    """
    # Per-symbol aggregation buffers.
    agg = None
    ensure_table()
    agg = {
        s: {"long_usd": 0.0, "short_usd": 0.0, "count": 0, "window_start": int(time.time())}
        for s in SYMBOLS
    }
    while True:
        try:
            logger.info("Connecting to Binance forceOrder WS...")
            async with websockets.connect(WS_URL, ping_interval=20, ping_timeout=10) as ws:
                logger.info("Connected! Listening for liquidations...")
                while True:
                    try:
                        # Bounded wait so the window check below also runs
                        # when no frame arrives.
                        msg = await asyncio.wait_for(ws.recv(), timeout=_WINDOW_POLL_SECONDS)
                    except asyncio.TimeoutError:
                        msg = None
                    if msg is not None:
                        _record_liquidation(msg, agg)
                    _flush_due_windows(agg, int(time.time()))
        except Exception as e:
            logger.error(f"WS error: {e}, reconnecting in 5s...")
            await asyncio.sleep(5)
if __name__ == "__main__":
    # Script entry point: drive the collector coroutine on a fresh event loop.
    collector = run()
    asyncio.run(collector)