feat: upgrade signal engine to V5.1 layered scoring

This commit is contained in:
root 2026-02-28 05:24:16 +00:00
parent a9bdec208f
commit 5c38a2f9bf

View File

@ -20,7 +20,7 @@ import os
import time import time
from collections import deque from collections import deque
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Optional from typing import Any, Optional
from db import get_sync_conn, init_schema from db import get_sync_conn, init_schema
@ -51,6 +51,28 @@ ATR_LENGTH = 14
COOLDOWN_MS = 10 * 60 * 1000 COOLDOWN_MS = 10 * 60 * 1000
def fetch_market_indicators(symbol: str) -> dict:
"""从PG读取最新的market_indicators数据"""
with get_sync_conn() as conn:
with conn.cursor() as cur:
indicators = {}
for ind_type in ["long_short_ratio", "top_trader_position", "open_interest_hist", "coinbase_premium"]:
cur.execute(
"SELECT value FROM market_indicators WHERE symbol=%s AND indicator_type=%s ORDER BY timestamp_ms DESC LIMIT 1",
(symbol, ind_type),
)
row = cur.fetchone()
indicators[ind_type] = row[0] if row else None
return indicators
def to_float(value: Any) -> Optional[float]:
try:
return float(value) if value is not None else None
except (TypeError, ValueError):
return None
# ─── 滚动窗口 ─────────────────────────────────────────────────── # ─── 滚动窗口 ───────────────────────────────────────────────────
class TradeWindow: class TradeWindow:
@ -154,6 +176,8 @@ class SymbolState:
self.last_processed_id = 0 self.last_processed_id = 0
self.warmup = True self.warmup = True
self.prev_cvd_fast = 0.0 self.prev_cvd_fast = 0.0
self.prev_cvd_fast_slope = 0.0
self.market_indicators = fetch_market_indicators(symbol)
self.last_signal_ts = 0 self.last_signal_ts = 0
self.last_signal_dir = "" self.last_signal_dir = ""
self.recent_large_trades: deque = deque() self.recent_large_trades: deque = deque()
@ -202,13 +226,17 @@ class SymbolState:
self.update_large_trades(now_ms, p99) self.update_large_trades(now_ms, p99)
price = vwap if vwap > 0 else 0 price = vwap if vwap > 0 else 0
cvd_fast_slope = cvd_fast - self.prev_cvd_fast cvd_fast_slope = cvd_fast - self.prev_cvd_fast
cvd_fast_accel = cvd_fast_slope - self.prev_cvd_fast_slope
self.prev_cvd_fast = cvd_fast self.prev_cvd_fast = cvd_fast
self.prev_cvd_fast_slope = cvd_fast_slope
result = { result = {
"cvd_fast": cvd_fast, "cvd_mid": cvd_mid, "cvd_day": self.win_day.cvd, "cvd_fast": cvd_fast, "cvd_mid": cvd_mid, "cvd_day": self.win_day.cvd,
"cvd_fast_slope": cvd_fast_slope, "cvd_fast_slope": cvd_fast_slope,
"atr": atr, "atr_pct": atr_pct, "vwap": vwap, "price": price, "atr": atr, "atr_pct": atr_pct, "vwap": vwap, "price": price,
"p95": p95, "p99": p99, "signal": None, "direction": None, "score": 0, "p95": p95, "p99": p99, "signal": None, "direction": None, "score": 0,
"tier": None,
"factors": {},
} }
if self.warmup or price == 0 or atr == 0: if self.warmup or price == 0 or atr == 0:
@ -216,28 +244,123 @@ class SymbolState:
if now_ms - self.last_signal_ts < COOLDOWN_MS: if now_ms - self.last_signal_ts < COOLDOWN_MS:
return result return result
long_core = cvd_fast > 0 and cvd_fast_slope > 0 and cvd_mid > 0 and price > vwap if cvd_fast > 0 and cvd_mid > 0:
short_core = cvd_fast < 0 and cvd_fast_slope < 0 and cvd_mid < 0 and price < vwap direction = "LONG"
elif cvd_fast < 0 and cvd_mid < 0:
if not long_core and not short_core: direction = "SHORT"
else:
return result return result
direction = "LONG" if long_core else "SHORT" # V5.1 五层评分体系总分100方向层可因加速额外+5
score = 0 # 1) 方向层45分 + 5加速分
if atr_pct > 60: direction_score = 0
score += 25 if (direction == "LONG" and cvd_fast > 0) or (direction == "SHORT" and cvd_fast < 0):
has_adverse = any( direction_score += 15
if (direction == "LONG" and cvd_mid > 0) or (direction == "SHORT" and cvd_mid < 0):
direction_score += 15
has_adverse_p99 = any(
(direction == "LONG" and lt[2] == 1) or (direction == "SHORT" and lt[2] == 0) (direction == "LONG" and lt[2] == 1) or (direction == "SHORT" and lt[2] == 0)
for lt in self.recent_large_trades for lt in self.recent_large_trades
) )
if not has_adverse: has_aligned_p99 = any(
score += 20 (direction == "LONG" and lt[2] == 0) or (direction == "SHORT" and lt[2] == 1)
for lt in self.recent_large_trades
)
if has_aligned_p99:
direction_score += 15
elif not has_adverse_p99:
direction_score += 10
accel_bonus = 0
if (direction == "LONG" and cvd_fast_accel > 0) or (direction == "SHORT" and cvd_fast_accel < 0):
accel_bonus = 5
result["signal"] = direction # 2) 拥挤层20分- market_indicators缺失时给中间分
long_short_ratio = to_float(self.market_indicators.get("long_short_ratio"))
if long_short_ratio is None:
ls_score = 5
elif (direction == "SHORT" and long_short_ratio > 2.0) or (direction == "LONG" and long_short_ratio < 0.5):
ls_score = 10
else:
ls_score = 5
top_trader_position = to_float(self.market_indicators.get("top_trader_position"))
if top_trader_position is None:
top_trader_score = 5
else:
if direction == "LONG":
if top_trader_position >= 0.55:
top_trader_score = 10
elif top_trader_position <= 0.45:
top_trader_score = 0
else:
top_trader_score = 5
else:
if top_trader_position <= 0.45:
top_trader_score = 10
elif top_trader_position >= 0.55:
top_trader_score = 0
else:
top_trader_score = 5
crowding_score = ls_score + top_trader_score
# 3) 环境层15分
oi_change = to_float(self.market_indicators.get("open_interest_hist"))
if oi_change is None:
environment_score = 10
elif oi_change >= 0.03:
environment_score = 15
elif oi_change > 0:
environment_score = 10
else:
environment_score = 5
# 4) 确认层15分
confirmation_score = 15 if ((direction == "LONG" and cvd_fast > 0 and cvd_mid > 0) or (direction == "SHORT" and cvd_fast < 0 and cvd_mid < 0)) else 0
# 5) 辅助层5分
coinbase_premium = to_float(self.market_indicators.get("coinbase_premium"))
if coinbase_premium is None:
aux_score = 2
elif (direction == "LONG" and coinbase_premium > 0.0005) or (direction == "SHORT" and coinbase_premium < -0.0005):
aux_score = 5
elif abs(coinbase_premium) <= 0.0005:
aux_score = 2
else:
aux_score = 0
total_score = direction_score + accel_bonus + crowding_score + environment_score + confirmation_score + aux_score
result["score"] = total_score
result["direction"] = direction result["direction"] = direction
result["score"] = score result["factors"] = {
self.last_signal_ts = now_ms "direction": {
self.last_signal_dir = direction "score": direction_score,
"cvd_fast": 15 if ((direction == "LONG" and cvd_fast > 0) or (direction == "SHORT" and cvd_fast < 0)) else 0,
"cvd_mid": 15 if ((direction == "LONG" and cvd_mid > 0) or (direction == "SHORT" and cvd_mid < 0)) else 0,
"p99_flow": 15 if has_aligned_p99 else (10 if not has_adverse_p99 else 0),
"accel_bonus": accel_bonus,
},
"crowding": {"score": crowding_score, "long_short_ratio": ls_score, "top_trader_position": top_trader_score},
"environment": {"score": environment_score, "open_interest_hist": oi_change},
"confirmation": {"score": confirmation_score},
"auxiliary": {"score": aux_score, "coinbase_premium": coinbase_premium},
}
if total_score >= 85:
result["signal"] = direction
result["tier"] = "heavy"
elif total_score >= 75:
result["signal"] = direction
result["tier"] = "standard"
elif total_score >= 60:
result["signal"] = direction
result["tier"] = "light"
else:
result["signal"] = None
result["tier"] = None
if result["signal"]:
self.last_signal_ts = now_ms
self.last_signal_dir = direction
return result return result
@ -333,6 +456,7 @@ def main():
for t in new_trades: for t in new_trades:
state.process_trade(t["agg_id"], t["time_ms"], t["price"], t["qty"], t["is_buyer_maker"]) state.process_trade(t["agg_id"], t["time_ms"], t["price"], t["qty"], t["is_buyer_maker"])
state.market_indicators = fetch_market_indicators(sym)
result = state.evaluate_signal(now_ms) result = state.evaluate_signal(now_ms)
save_indicator(now_ms, sym, result) save_indicator(now_ms, sym, result)
@ -347,8 +471,11 @@ def main():
cycle += 1 cycle += 1
if cycle % 60 == 0: if cycle % 60 == 0:
for sym, state in states.items(): for sym, state in states.items():
r = state.evaluate_signal(now_ms) logger.info(
logger.info(f"[{sym}] 状态: CVD_fast={r['cvd_fast']:.1f} CVD_mid={r['cvd_mid']:.1f} ATR={r['atr']:.2f}({r['atr_pct']:.0f}%) VWAP={r['vwap']:.1f}") f"[{sym}] 状态: CVD_fast={state.win_fast.cvd:.1f} "
f"CVD_mid={state.win_mid.cvd:.1f} ATR={state.atr_calc.atr:.2f} "
f"({state.atr_calc.atr_percentile:.0f}%) VWAP={state.win_vwap.vwap:.1f}"
)
except Exception as e: except Exception as e:
logger.error(f"循环异常: {e}", exc_info=True) logger.error(f"循环异常: {e}", exc_info=True)