From 85db47e41f3538aad2fb476bebe24fc2a73395db Mon Sep 17 00:00:00 2001 From: root Date: Tue, 3 Mar 2026 13:59:34 +0000 Subject: [PATCH] feat: Phase 2 - realtime WebSocket OBI + spot/perp divergence for BTC gate-control, tiered whale CVD in-process --- backend/signal_engine.py | 142 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 138 insertions(+), 4 deletions(-) diff --git a/backend/signal_engine.py b/backend/signal_engine.py index 17b7c5c..5b73190 100644 --- a/backend/signal_engine.py +++ b/backend/signal_engine.py @@ -19,10 +19,14 @@ import logging import os import time import json +import threading +import asyncio from collections import deque from datetime import datetime, timezone from typing import Any, Optional +import websockets + from db import get_sync_conn, init_schema logging.basicConfig( @@ -312,6 +316,12 @@ class SymbolState: self.last_signal_ts: dict[str, int] = {} self.last_signal_dir: dict[str, str] = {} self.recent_large_trades: deque = deque() + # ── Phase 2 实时内存字段(由后台WebSocket协程更新)────────── + self.rt_obi: float = 0.0 # 订单簿失衡[-1,1] + self.rt_spot_perp_div: float = 0.0 # 期现背离(spot-mark)/mark + # tiered_cvd_whale:按成交额分档,实时累计(最近15分钟窗口) + self._whale_trades: deque = deque() # (time_ms, usd_val, is_sell) + self.WHALE_WINDOW_MS: int = 15 * 60 * 1000 # 15分钟 def process_trade(self, agg_id: int, time_ms: int, price: float, qty: float, is_buyer_maker: int): now_ms = time_ms @@ -327,6 +337,23 @@ class SymbolState: self.last_processed_id = agg_id self.last_trade_price = price # 最新成交价,用于entry_price + # tiered_cvd_whale 实时累计(>$100k 为巨鲸) + usd_val = price * qty + if usd_val >= 100_000: + self._whale_trades.append((time_ms, usd_val, bool(is_buyer_maker))) + # 修剪15分钟窗口 + cutoff = now_ms - self.WHALE_WINDOW_MS + while self._whale_trades and self._whale_trades[0][0] < cutoff: + self._whale_trades.popleft() + + @property + def whale_cvd_ratio(self) -> float: + """巨鲸净CVD比率[-1,1],基于最近15分钟>$100k成交""" + buy_usd = sum(t[1] for t in self._whale_trades if not t[2]) + sell_usd = sum(t[1] for t in self._whale_trades if t[2]) + total = buy_usd + sell_usd + return (buy_usd - sell_usd) / total if total > 0 else 0.0 + def compute_p95_p99(self) -> tuple: if len(self.win_day.trades) < 100: return 5.0, 10.0 @@ -886,8 +913,8 @@ class SymbolState: else: direction = "LONG" if cvd_fast > 0 else "SHORT" - # Gate 3: OBI否决(Phase2接入obi_depth_10后生效) - obi_raw = to_float(self.market_indicators.get("obi_depth_10")) + # Gate 3: OBI否决 — 优先用实时WebSocket值,回退DB值 + obi_raw = self.rt_obi if self.rt_obi != 0.0 else to_float(self.market_indicators.get("obi_depth_10")) if not block_reason and obi_raw is not None: # obi_raw: 正值=买单占优,负值=卖单占优,[-1,1] if direction == "LONG" and obi_raw < -obi_veto: @@ -895,8 +922,8 @@ class SymbolState: elif direction == "SHORT" and obi_raw > obi_veto: block_reason = f"obi_imbalance_veto(obi={obi_raw:.3f})" - # Gate 4: 期现背离否决(Phase2接入spot_perp_divergence后生效) - spot_perp_div = to_float(self.market_indicators.get("spot_perp_divergence")) + # Gate 4: 期现背离否决 — 优先用实时WebSocket值,回退DB值 + spot_perp_div = self.rt_spot_perp_div if self.rt_spot_perp_div != 0.0 else to_float(self.market_indicators.get("spot_perp_divergence")) if not block_reason and spot_perp_div is not None: # spot_perp_div: 绝对背离率,如0.005=0.5% if abs(spot_perp_div) > spot_perp_veto: @@ -909,6 +936,8 @@ class SymbolState: gate_passed = block_reason is None # 复用ALT评分作为参考分(不影响门控决策,仅供记录) + # whale_cvd_ratio 优先用实时计算值 + whale_cvd = self.whale_cvd_ratio if self._whale_trades else to_float(self.market_indicators.get("tiered_cvd_whale")) alt_result = self._evaluate_v53_alt(now_ms, strategy_cfg, snap) total_score = alt_result["score"] if gate_passed else 0 @@ -923,6 +952,7 @@ class SymbolState: "atr_pct_price": round(atr_pct_price, 5), "obi_raw": obi_raw, "spot_perp_div": spot_perp_div, + "whale_cvd_ratio": whale_cvd, "alt_score_ref": alt_result["score"], }, }) @@ -1153,6 +1183,107 @@ def paper_active_count(strategy: Optional[str] = None) -> int: return cur.fetchone()[0] +# ─── 实时 WebSocket 数据(OBI + 期现背离)──────────────────────── + +_REALTIME_STATES: dict = {} # symbol -> SymbolState,在main()里注入 + +async def _ws_obi_stream(symbol: str, state): + """订阅期货盘口深度流,实时更新 state.rt_obi""" + stream = f"{symbol.lower()}@depth10@100ms" + url = f"wss://fstream.binance.com/stream?streams={stream}" + while True: + try: + async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws: + logger.info(f"[RT-OBI] {symbol} WebSocket connected") + async for raw in ws: + data = json.loads(raw) + book = data.get("data", data) + bids = book.get("b", []) + asks = book.get("a", []) + bid_vol = sum(float(b[1]) for b in bids) + ask_vol = sum(float(a[1]) for a in asks) + total = bid_vol + ask_vol + state.rt_obi = (bid_vol - ask_vol) / total if total > 0 else 0.0 + except Exception as e: + logger.warning(f"[RT-OBI] {symbol} 断线重连: {e}") + await asyncio.sleep(3) + +async def _ws_spot_perp_stream(symbol: str, state): + """ + 订阅现货 bookTicker(最优买卖价)+ 期货 markPrice,计算期现背离 + spot_mid = (best_bid + best_ask) / 2 + divergence = (spot_mid - mark_price) / mark_price + """ + spot_stream = f"{symbol.lower()}@bookTicker" + perp_stream = f"{symbol.lower()}@markPrice@1s" + spot_url = f"wss://stream.binance.com:9443/stream?streams={spot_stream}" + perp_url = f"wss://fstream.binance.com/stream?streams={perp_stream}" + + spot_mid = [0.0] # mutable container for closure + mark_price = [0.0] + + async def read_spot(): + while True: + try: + async with websockets.connect(spot_url, ping_interval=20, ping_timeout=10) as ws: + logger.info(f"[RT-SPD] {symbol} spot WebSocket connected") + async for raw in ws: + d = json.loads(raw).get("data", json.loads(raw)) + bid = float(d.get("b", 0) or 0) + ask = float(d.get("a", 0) or 0) + if bid > 0 and ask > 0: + spot_mid[0] = (bid + ask) / 2 + if mark_price[0] > 0: + state.rt_spot_perp_div = (spot_mid[0] - mark_price[0]) / mark_price[0] + except Exception as e: + logger.warning(f"[RT-SPD] {symbol} spot 断线: {e}") + await asyncio.sleep(3) + + async def read_perp(): + while True: + try: + async with websockets.connect(perp_url, ping_interval=20, ping_timeout=10) as ws: + logger.info(f"[RT-SPD] {symbol} perp WebSocket connected") + async for raw in ws: + d = json.loads(raw).get("data", json.loads(raw)) + mp = float(d.get("p", 0) or d.get("markPrice", 0) or 0) + if mp > 0: + mark_price[0] = mp + if spot_mid[0] > 0: + state.rt_spot_perp_div = (spot_mid[0] - mark_price[0]) / mark_price[0] + except Exception as e: + logger.warning(f"[RT-SPD] {symbol} perp 断线: {e}") + await asyncio.sleep(3) + + await asyncio.gather(read_spot(), read_perp()) + +async def _realtime_ws_runner(states: dict): + """统一启动所有实时WebSocket协程,目前只给BTC""" + btc_state = states.get("BTCUSDT") + if btc_state is None: + return + await asyncio.gather( + _ws_obi_stream("BTCUSDT", btc_state), + _ws_spot_perp_stream("BTCUSDT", btc_state), + ) + +def start_realtime_ws(states: dict): + """在独立线程里跑asyncio event loop,驱动实时WebSocket采集""" + def _run(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(_realtime_ws_runner(states)) + except Exception as e: + logger.error(f"[RT-WS] event loop 异常: {e}") + finally: + loop.close() + + t = threading.Thread(target=_run, daemon=True, name="realtime-ws") + t.start() + logger.info("[RT-WS] 实时WebSocket后台线程已启动(BTC OBI + 期现背离)") + + # ─── 主循环 ────────────────────────────────────────────────────── def main(): @@ -1169,6 +1300,9 @@ def main(): logger.info("=== Signal Engine (PG) 启动完成 ===") + # 启动实时WebSocket后台线程(BTC OBI + 期现背离) + start_realtime_ws(states) + last_1m_save = {} cycle = 0 warmup_cycles = 3 # 启动后跳过前3轮(45秒),避免冷启动信号开仓