feat: Phase 2 - realtime WebSocket OBI + spot/perp divergence for BTC gate-control, tiered whale CVD in-process

This commit is contained in:
root 2026-03-03 13:59:34 +00:00
parent 17a387b6f4
commit 85db47e41f

View File

@ -19,10 +19,14 @@ import logging
import os import os
import time import time
import json import json
import threading
import asyncio
from collections import deque from collections import deque
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Any, Optional from typing import Any, Optional
import websockets
from db import get_sync_conn, init_schema from db import get_sync_conn, init_schema
logging.basicConfig( logging.basicConfig(
@ -312,6 +316,12 @@ class SymbolState:
self.last_signal_ts: dict[str, int] = {} self.last_signal_ts: dict[str, int] = {}
self.last_signal_dir: dict[str, str] = {} self.last_signal_dir: dict[str, str] = {}
self.recent_large_trades: deque = deque() self.recent_large_trades: deque = deque()
# ── Phase 2 实时内存字段由后台WebSocket协程更新──────────
self.rt_obi: float = 0.0 # 订单簿失衡[-1,1]
self.rt_spot_perp_div: float = 0.0 # 期现背离spot-mark)/mark
# tiered_cvd_whale按成交额分档实时累计最近15分钟窗口
self._whale_trades: deque = deque() # (time_ms, usd_val, is_sell)
self.WHALE_WINDOW_MS: int = 15 * 60 * 1000 # 15分钟
def process_trade(self, agg_id: int, time_ms: int, price: float, qty: float, is_buyer_maker: int): def process_trade(self, agg_id: int, time_ms: int, price: float, qty: float, is_buyer_maker: int):
now_ms = time_ms now_ms = time_ms
@ -327,6 +337,23 @@ class SymbolState:
self.last_processed_id = agg_id self.last_processed_id = agg_id
self.last_trade_price = price # 最新成交价用于entry_price self.last_trade_price = price # 最新成交价用于entry_price
# tiered_cvd_whale 实时累计(>$100k 为巨鲸)
usd_val = price * qty
if usd_val >= 100_000:
self._whale_trades.append((time_ms, usd_val, bool(is_buyer_maker)))
# 修剪15分钟窗口
cutoff = now_ms - self.WHALE_WINDOW_MS
while self._whale_trades and self._whale_trades[0][0] < cutoff:
self._whale_trades.popleft()
@property
def whale_cvd_ratio(self) -> float:
"""巨鲸净CVD比率[-1,1]基于最近15分钟>$100k成交"""
buy_usd = sum(t[1] for t in self._whale_trades if not t[2])
sell_usd = sum(t[1] for t in self._whale_trades if t[2])
total = buy_usd + sell_usd
return (buy_usd - sell_usd) / total if total > 0 else 0.0
def compute_p95_p99(self) -> tuple: def compute_p95_p99(self) -> tuple:
if len(self.win_day.trades) < 100: if len(self.win_day.trades) < 100:
return 5.0, 10.0 return 5.0, 10.0
@ -886,8 +913,8 @@ class SymbolState:
else: else:
direction = "LONG" if cvd_fast > 0 else "SHORT" direction = "LONG" if cvd_fast > 0 else "SHORT"
# Gate 3: OBI否决Phase2接入obi_depth_10后生效 # Gate 3: OBI否决 — 优先用实时WebSocket值回退DB值
obi_raw = to_float(self.market_indicators.get("obi_depth_10")) obi_raw = self.rt_obi if self.rt_obi != 0.0 else to_float(self.market_indicators.get("obi_depth_10"))
if not block_reason and obi_raw is not None: if not block_reason and obi_raw is not None:
# obi_raw: 正值=买单占优,负值=卖单占优,[-1,1] # obi_raw: 正值=买单占优,负值=卖单占优,[-1,1]
if direction == "LONG" and obi_raw < -obi_veto: if direction == "LONG" and obi_raw < -obi_veto:
@ -895,8 +922,8 @@ class SymbolState:
elif direction == "SHORT" and obi_raw > obi_veto: elif direction == "SHORT" and obi_raw > obi_veto:
block_reason = f"obi_imbalance_veto(obi={obi_raw:.3f})" block_reason = f"obi_imbalance_veto(obi={obi_raw:.3f})"
# Gate 4: 期现背离否决Phase2接入spot_perp_divergence后生效 # Gate 4: 期现背离否决 — 优先用实时WebSocket值回退DB值
spot_perp_div = to_float(self.market_indicators.get("spot_perp_divergence")) spot_perp_div = self.rt_spot_perp_div if self.rt_spot_perp_div != 0.0 else to_float(self.market_indicators.get("spot_perp_divergence"))
if not block_reason and spot_perp_div is not None: if not block_reason and spot_perp_div is not None:
# spot_perp_div: 绝对背离率如0.005=0.5% # spot_perp_div: 绝对背离率如0.005=0.5%
if abs(spot_perp_div) > spot_perp_veto: if abs(spot_perp_div) > spot_perp_veto:
@ -909,6 +936,8 @@ class SymbolState:
gate_passed = block_reason is None gate_passed = block_reason is None
# 复用ALT评分作为参考分不影响门控决策仅供记录 # 复用ALT评分作为参考分不影响门控决策仅供记录
# whale_cvd_ratio 优先用实时计算值
whale_cvd = self.whale_cvd_ratio if self._whale_trades else to_float(self.market_indicators.get("tiered_cvd_whale"))
alt_result = self._evaluate_v53_alt(now_ms, strategy_cfg, snap) alt_result = self._evaluate_v53_alt(now_ms, strategy_cfg, snap)
total_score = alt_result["score"] if gate_passed else 0 total_score = alt_result["score"] if gate_passed else 0
@ -923,6 +952,7 @@ class SymbolState:
"atr_pct_price": round(atr_pct_price, 5), "atr_pct_price": round(atr_pct_price, 5),
"obi_raw": obi_raw, "obi_raw": obi_raw,
"spot_perp_div": spot_perp_div, "spot_perp_div": spot_perp_div,
"whale_cvd_ratio": whale_cvd,
"alt_score_ref": alt_result["score"], "alt_score_ref": alt_result["score"],
}, },
}) })
@ -1153,6 +1183,107 @@ def paper_active_count(strategy: Optional[str] = None) -> int:
return cur.fetchone()[0] return cur.fetchone()[0]
# ─── 实时 WebSocket 数据OBI + 期现背离)────────────────────────
_REALTIME_STATES: dict = {} # symbol -> SymbolState在main()里注入
async def _ws_obi_stream(symbol: str, state):
"""订阅期货盘口深度流,实时更新 state.rt_obi"""
stream = f"{symbol.lower()}@depth10@100ms"
url = f"wss://fstream.binance.com/stream?streams={stream}"
while True:
try:
async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws:
logger.info(f"[RT-OBI] {symbol} WebSocket connected")
async for raw in ws:
data = json.loads(raw)
book = data.get("data", data)
bids = book.get("b", [])
asks = book.get("a", [])
bid_vol = sum(float(b[1]) for b in bids)
ask_vol = sum(float(a[1]) for a in asks)
total = bid_vol + ask_vol
state.rt_obi = (bid_vol - ask_vol) / total if total > 0 else 0.0
except Exception as e:
logger.warning(f"[RT-OBI] {symbol} 断线重连: {e}")
await asyncio.sleep(3)
async def _ws_spot_perp_stream(symbol: str, state):
"""
订阅现货 bookTicker最优买卖价+ 期货 markPrice计算期现背离
spot_mid = (best_bid + best_ask) / 2
divergence = (spot_mid - mark_price) / mark_price
"""
spot_stream = f"{symbol.lower()}@bookTicker"
perp_stream = f"{symbol.lower()}@markPrice@1s"
spot_url = f"wss://stream.binance.com:9443/stream?streams={spot_stream}"
perp_url = f"wss://fstream.binance.com/stream?streams={perp_stream}"
spot_mid = [0.0] # mutable container for closure
mark_price = [0.0]
async def read_spot():
while True:
try:
async with websockets.connect(spot_url, ping_interval=20, ping_timeout=10) as ws:
logger.info(f"[RT-SPD] {symbol} spot WebSocket connected")
async for raw in ws:
d = json.loads(raw).get("data", json.loads(raw))
bid = float(d.get("b", 0) or 0)
ask = float(d.get("a", 0) or 0)
if bid > 0 and ask > 0:
spot_mid[0] = (bid + ask) / 2
if mark_price[0] > 0:
state.rt_spot_perp_div = (spot_mid[0] - mark_price[0]) / mark_price[0]
except Exception as e:
logger.warning(f"[RT-SPD] {symbol} spot 断线: {e}")
await asyncio.sleep(3)
async def read_perp():
while True:
try:
async with websockets.connect(perp_url, ping_interval=20, ping_timeout=10) as ws:
logger.info(f"[RT-SPD] {symbol} perp WebSocket connected")
async for raw in ws:
d = json.loads(raw).get("data", json.loads(raw))
mp = float(d.get("p", 0) or d.get("markPrice", 0) or 0)
if mp > 0:
mark_price[0] = mp
if spot_mid[0] > 0:
state.rt_spot_perp_div = (spot_mid[0] - mark_price[0]) / mark_price[0]
except Exception as e:
logger.warning(f"[RT-SPD] {symbol} perp 断线: {e}")
await asyncio.sleep(3)
await asyncio.gather(read_spot(), read_perp())
async def _realtime_ws_runner(states: dict):
"""统一启动所有实时WebSocket协程目前只给BTC"""
btc_state = states.get("BTCUSDT")
if btc_state is None:
return
await asyncio.gather(
_ws_obi_stream("BTCUSDT", btc_state),
_ws_spot_perp_stream("BTCUSDT", btc_state),
)
def start_realtime_ws(states: dict):
"""在独立线程里跑asyncio event loop驱动实时WebSocket采集"""
def _run():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(_realtime_ws_runner(states))
except Exception as e:
logger.error(f"[RT-WS] event loop 异常: {e}")
finally:
loop.close()
t = threading.Thread(target=_run, daemon=True, name="realtime-ws")
t.start()
logger.info("[RT-WS] 实时WebSocket后台线程已启动BTC OBI + 期现背离)")
# ─── 主循环 ────────────────────────────────────────────────────── # ─── 主循环 ──────────────────────────────────────────────────────
def main(): def main():
@ -1169,6 +1300,9 @@ def main():
logger.info("=== Signal Engine (PG) 启动完成 ===") logger.info("=== Signal Engine (PG) 启动完成 ===")
# 启动实时WebSocket后台线程BTC OBI + 期现背离)
start_realtime_ws(states)
last_1m_save = {} last_1m_save = {}
cycle = 0 cycle = 0
warmup_cycles = 3 # 启动后跳过前3轮45秒避免冷启动信号开仓 warmup_cycles = 3 # 启动后跳过前3轮45秒避免冷启动信号开仓