From 5c38a2f9bfe0e09cc8fcec7f30ec05cd4f3dca00 Mon Sep 17 00:00:00 2001 From: root Date: Sat, 28 Feb 2026 05:24:16 +0000 Subject: [PATCH] feat: upgrade signal engine to V5.1 layered scoring --- backend/signal_engine.py | 163 ++++++++++++++++++++++++++++++++++----- 1 file changed, 145 insertions(+), 18 deletions(-) diff --git a/backend/signal_engine.py b/backend/signal_engine.py index 6e71706..bf23c31 100644 --- a/backend/signal_engine.py +++ b/backend/signal_engine.py @@ -20,7 +20,7 @@ import os import time from collections import deque from datetime import datetime, timezone -from typing import Optional +from typing import Any, Optional from db import get_sync_conn, init_schema @@ -51,6 +51,28 @@ ATR_LENGTH = 14 COOLDOWN_MS = 10 * 60 * 1000 +def fetch_market_indicators(symbol: str) -> dict: + """从PG读取最新的market_indicators数据""" + with get_sync_conn() as conn: + with conn.cursor() as cur: + indicators = {} + for ind_type in ["long_short_ratio", "top_trader_position", "open_interest_hist", "coinbase_premium"]: + cur.execute( + "SELECT value FROM market_indicators WHERE symbol=%s AND indicator_type=%s ORDER BY timestamp_ms DESC LIMIT 1", + (symbol, ind_type), + ) + row = cur.fetchone() + indicators[ind_type] = row[0] if row else None + return indicators + + +def to_float(value: Any) -> Optional[float]: + try: + return float(value) if value is not None else None + except (TypeError, ValueError): + return None + + # ─── 滚动窗口 ─────────────────────────────────────────────────── class TradeWindow: @@ -154,6 +176,8 @@ class SymbolState: self.last_processed_id = 0 self.warmup = True self.prev_cvd_fast = 0.0 + self.prev_cvd_fast_slope = 0.0 + self.market_indicators = fetch_market_indicators(symbol) self.last_signal_ts = 0 self.last_signal_dir = "" self.recent_large_trades: deque = deque() @@ -202,13 +226,17 @@ class SymbolState: self.update_large_trades(now_ms, p99) price = vwap if vwap > 0 else 0 cvd_fast_slope = cvd_fast - self.prev_cvd_fast + cvd_fast_accel = cvd_fast_slope - self.prev_cvd_fast_slope self.prev_cvd_fast = cvd_fast + self.prev_cvd_fast_slope = cvd_fast_slope result = { "cvd_fast": cvd_fast, "cvd_mid": cvd_mid, "cvd_day": self.win_day.cvd, "cvd_fast_slope": cvd_fast_slope, "atr": atr, "atr_pct": atr_pct, "vwap": vwap, "price": price, "p95": p95, "p99": p99, "signal": None, "direction": None, "score": 0, + "tier": None, + "factors": {}, } if self.warmup or price == 0 or atr == 0: @@ -216,28 +244,123 @@ class SymbolState: if now_ms - self.last_signal_ts < COOLDOWN_MS: return result - long_core = cvd_fast > 0 and cvd_fast_slope > 0 and cvd_mid > 0 and price > vwap - short_core = cvd_fast < 0 and cvd_fast_slope < 0 and cvd_mid < 0 and price < vwap - - if not long_core and not short_core: + if cvd_fast > 0 and cvd_mid > 0: + direction = "LONG" + elif cvd_fast < 0 and cvd_mid < 0: + direction = "SHORT" + else: return result - direction = "LONG" if long_core else "SHORT" - score = 0 - if atr_pct > 60: - score += 25 - has_adverse = any( + # V5.1 五层评分体系(总分100,方向层可因加速额外+5) + # 1) 方向层(45分 + 5加速分) + direction_score = 0 + if (direction == "LONG" and cvd_fast > 0) or (direction == "SHORT" and cvd_fast < 0): + direction_score += 15 + if (direction == "LONG" and cvd_mid > 0) or (direction == "SHORT" and cvd_mid < 0): + direction_score += 15 + has_adverse_p99 = any( (direction == "LONG" and lt[2] == 1) or (direction == "SHORT" and lt[2] == 0) for lt in self.recent_large_trades ) - if not has_adverse: - score += 20 + has_aligned_p99 = any( + (direction == "LONG" and lt[2] == 0) or (direction == "SHORT" and lt[2] == 1) + for lt in self.recent_large_trades + ) + if has_aligned_p99: + direction_score += 15 + elif not has_adverse_p99: + direction_score += 10 + accel_bonus = 0 + if (direction == "LONG" and cvd_fast_accel > 0) or (direction == "SHORT" and cvd_fast_accel < 0): + accel_bonus = 5 - result["signal"] = direction + # 2) 拥挤层(20分)- market_indicators缺失时给中间分 + long_short_ratio = to_float(self.market_indicators.get("long_short_ratio")) + if long_short_ratio is None: + ls_score = 5 + elif (direction == "SHORT" and long_short_ratio > 2.0) or (direction == "LONG" and long_short_ratio < 0.5): + ls_score = 10 + else: + ls_score = 5 + + top_trader_position = to_float(self.market_indicators.get("top_trader_position")) + if top_trader_position is None: + top_trader_score = 5 + else: + if direction == "LONG": + if top_trader_position >= 0.55: + top_trader_score = 10 + elif top_trader_position <= 0.45: + top_trader_score = 0 + else: + top_trader_score = 5 + else: + if top_trader_position <= 0.45: + top_trader_score = 10 + elif top_trader_position >= 0.55: + top_trader_score = 0 + else: + top_trader_score = 5 + crowding_score = ls_score + top_trader_score + + # 3) 环境层(15分) + oi_change = to_float(self.market_indicators.get("open_interest_hist")) + if oi_change is None: + environment_score = 10 + elif oi_change >= 0.03: + environment_score = 15 + elif oi_change > 0: + environment_score = 10 + else: + environment_score = 5 + + # 4) 确认层(15分) + confirmation_score = 15 if ((direction == "LONG" and cvd_fast > 0 and cvd_mid > 0) or (direction == "SHORT" and cvd_fast < 0 and cvd_mid < 0)) else 0 + + # 5) 辅助层(5分) + coinbase_premium = to_float(self.market_indicators.get("coinbase_premium")) + if coinbase_premium is None: + aux_score = 2 + elif (direction == "LONG" and coinbase_premium > 0.0005) or (direction == "SHORT" and coinbase_premium < -0.0005): + aux_score = 5 + elif abs(coinbase_premium) <= 0.0005: + aux_score = 2 + else: + aux_score = 0 + + total_score = direction_score + accel_bonus + crowding_score + environment_score + confirmation_score + aux_score + result["score"] = total_score result["direction"] = direction - result["score"] = score - self.last_signal_ts = now_ms - self.last_signal_dir = direction + result["factors"] = { + "direction": { + "score": direction_score, + "cvd_fast": 15 if ((direction == "LONG" and cvd_fast > 0) or (direction == "SHORT" and cvd_fast < 0)) else 0, + "cvd_mid": 15 if ((direction == "LONG" and cvd_mid > 0) or (direction == "SHORT" and cvd_mid < 0)) else 0, + "p99_flow": 15 if has_aligned_p99 else (10 if not has_adverse_p99 else 0), + "accel_bonus": accel_bonus, + }, + "crowding": {"score": crowding_score, "long_short_ratio": ls_score, "top_trader_position": top_trader_score}, + "environment": {"score": environment_score, "open_interest_hist": oi_change}, + "confirmation": {"score": confirmation_score}, + "auxiliary": {"score": aux_score, "coinbase_premium": coinbase_premium}, + } + + if total_score >= 85: + result["signal"] = direction + result["tier"] = "heavy" + elif total_score >= 75: + result["signal"] = direction + result["tier"] = "standard" + elif total_score >= 60: + result["signal"] = direction + result["tier"] = "light" + else: + result["signal"] = None + result["tier"] = None + + if result["signal"]: + self.last_signal_ts = now_ms + self.last_signal_dir = direction return result @@ -333,6 +456,7 @@ def main(): for t in new_trades: state.process_trade(t["agg_id"], t["time_ms"], t["price"], t["qty"], t["is_buyer_maker"]) + state.market_indicators = fetch_market_indicators(sym) result = state.evaluate_signal(now_ms) save_indicator(now_ms, sym, result) @@ -347,8 +471,11 @@ def main(): cycle += 1 if cycle % 60 == 0: for sym, state in states.items(): - r = state.evaluate_signal(now_ms) - logger.info(f"[{sym}] 状态: CVD_fast={r['cvd_fast']:.1f} CVD_mid={r['cvd_mid']:.1f} ATR={r['atr']:.2f}({r['atr_pct']:.0f}%) VWAP={r['vwap']:.1f}") + logger.info( + f"[{sym}] 状态: CVD_fast={state.win_fast.cvd:.1f} " + f"CVD_mid={state.win_mid.cvd:.1f} ATR={state.atr_calc.atr:.2f} " + f"({state.atr_calc.atr_percentile:.0f}%) VWAP={state.win_vwap.vwap:.1f}" + ) except Exception as e: logger.error(f"循环异常: {e}", exc_info=True)