""" strategy_scoring.py — V5 策略工厂统一评分逻辑 从原来的 signal_engine.py 中拆分出的 V5 评分与 Gate 逻辑: - evaluate_factory_strategy(state, now_ms, strategy_cfg, snapshot) → 单条策略(含工厂产出的 custom_*)的核心评分逻辑 - evaluate_signal(state, now_ms, strategy_cfg, snapshot) → 对外统一入口(仅支持 v53*/custom_*) 由 signal_engine / backtest 调用。V5.1/V5.2(v51_baseline / v52_8signals) 已在此模块中下线:若仍传入旧策略名,将返回空结果并打印 warning。 """ import logging from typing import Any, Optional from signal_state import SymbolState, to_float logger = logging.getLogger("strategy-scoring") def evaluate_factory_strategy( state: SymbolState, now_ms: int, strategy_cfg: dict, snapshot: Optional[dict] = None, ) -> dict: """ V5 策略工厂统一评分(BTC/ETH/XRP/SOL + custom_*) - 输入:动态 CVD 窗口 + 五门参数 + OI/拥挤/辅助指标 - 输出:score / signal / tier + 详细 factors - 支持:单币种策略(symbol)、方向限制(long_only/short_only/both)、自定义四层权重 """ strategy_name = strategy_cfg.get("name", "v53") strategy_threshold = int(strategy_cfg.get("threshold", 75)) flip_threshold = int(strategy_cfg.get("flip_threshold", 85)) # per-strategy 方向约束(long_only / short_only / both) dir_cfg_raw = (strategy_cfg.get("direction") or "both").lower() # 兼容策略工厂的 long_only / short_only 配置 if dir_cfg_raw in ("long_only", "only_long"): dir_cfg_raw = "long" elif dir_cfg_raw in ("short_only", "only_short"): dir_cfg_raw = "short" if dir_cfg_raw not in ("long", "short", "both"): dir_cfg_raw = "both" snap = snapshot or state.build_evaluation_snapshot(now_ms) # 按策略配置的 cvd_fast_window / cvd_slow_window 动态切片重算 CVD cvd_fast_window = strategy_cfg.get("cvd_fast_window", "30m") cvd_slow_window = strategy_cfg.get("cvd_slow_window", "4h") def _window_ms(code: str) -> int: if not isinstance(code, str) or len(code) < 2: return 30 * 60 * 1000 unit = code[-1] try: val = int(code[:-1]) except ValueError: return 30 * 60 * 1000 if unit == "m": return val * 60 * 1000 if unit == "h": return val * 3600 * 1000 return 30 * 60 * 1000 fast_ms = _window_ms(cvd_fast_window) slow_ms = _window_ms(cvd_slow_window) if cvd_fast_window == "30m" and cvd_slow_window == "4h": cvd_fast = snap["cvd_fast"] cvd_mid = snap["cvd_mid"] else: cutoff_fast = now_ms - fast_ms cutoff_slow = now_ms - slow_ms buy_f = sell_f = buy_m = sell_m = 0.0 src_fast = state.win_mid if fast_ms > state.win_fast.window_ms else state.win_fast for t_ms, qty, _price, ibm in src_fast.trades: if t_ms >= cutoff_fast: if ibm == 0: buy_f += qty else: sell_f += qty for t_ms, qty, _price, ibm in state.win_mid.trades: if t_ms >= cutoff_slow: if ibm == 0: buy_m += qty else: sell_m += qty cvd_fast = buy_f - sell_f cvd_mid = buy_m - sell_m price = snap["price"] atr = snap["atr"] atr_value = snap.get("atr_value", atr) cvd_fast_accel = snap["cvd_fast_accel"] environment_score_raw = snap["environment_score"] # 默认 result 零值(基础字段从 snapshot 填充,以兼容 save_indicator/save_feature_event) # 注意:cvd_fast/cvd_mid 会在后面覆盖为「按策略窗口重算」后的值, # 这里先用 snapshot 保证字段存在。 result = { "strategy": strategy_name, "cvd_fast": snap["cvd_fast"], "cvd_mid": snap["cvd_mid"], "cvd_day": snap["cvd_day"], "cvd_fast_slope": snap["cvd_fast_slope"], "atr": atr, "atr_value": atr_value, "atr_pct": snap["atr_pct"], "vwap": snap["vwap"], "price": price, "p95": snap["p95"], "p99": snap["p99"], "signal": None, "direction": None, "score": 0.0, "tier": None, "factors": {}, } if state.warmup or price == 0 or atr == 0: return result last_signal_ts = state.last_signal_ts.get(strategy_name, 0) COOLDOWN_MS = 10 * 60 * 1000 in_cooldown = now_ms - last_signal_ts < COOLDOWN_MS # ── 五门参数:优先读 DB config(V5.4),fallback 到 JSON symbol_gates ──── db_gates = strategy_cfg.get("gates") or {} symbol_gates = (strategy_cfg.get("symbol_gates") or {}).get(state.symbol, {}) gate_vol_enabled = db_gates.get("vol", {}).get("enabled", True) min_vol = float( db_gates.get("vol", {}).get("vol_atr_pct_min") or symbol_gates.get("min_vol_threshold", 0.002) ) gate_cvd_enabled = db_gates.get("cvd", {}).get("enabled", True) gate_whale_enabled = db_gates.get("whale", {}).get("enabled", True) whale_usd = float( db_gates.get("whale", {}).get("whale_usd_threshold") or symbol_gates.get("whale_threshold_usd", 50000) ) whale_flow_pct = float( db_gates.get("whale", {}).get("whale_flow_pct") or symbol_gates.get("whale_flow_threshold_pct", 0.5) ) gate_obi_enabled = db_gates.get("obi", {}).get("enabled", True) obi_veto = float( db_gates.get("obi", {}).get("threshold") or symbol_gates.get("obi_veto_threshold", 0.35) ) gate_spd_enabled = db_gates.get("spot_perp", {}).get("enabled", False) spd_veto = float( db_gates.get("spot_perp", {}).get("threshold") or symbol_gates.get("spot_perp_divergence_veto", 0.005) ) # 覆盖为按策略窗口重算后的 CVD(用于 signal_indicators 展示) result["cvd_fast"] = cvd_fast result["cvd_mid"] = cvd_mid gate_block = None # 门1:波动率下限(可关闭) atr_pct_price = atr / price if price > 0 else 0 # 市场状态(供复盘/优化使用,不直接改变默认策略行为) regime = "range" if atr_pct_price >= 0.012: regime = "crash" elif atr_pct_price >= 0.008: regime = "high_vol" elif abs(cvd_fast_accel) > 0 and abs(cvd_fast) > 0 and abs(cvd_mid) > 0: same_dir = (cvd_fast > 0 and cvd_mid > 0) or (cvd_fast < 0 and cvd_mid < 0) if same_dir and abs(cvd_fast_accel) > 10: regime = "trend" if gate_vol_enabled and atr_pct_price < min_vol: gate_block = f"low_vol({atr_pct_price:.4f}<{min_vol})" # 门2:CVD 共振(方向门,可关闭) no_direction = False cvd_resonance = 0 if cvd_fast > 0 and cvd_mid > 0: direction = "LONG" cvd_resonance = 30 no_direction = False elif cvd_fast < 0 and cvd_mid < 0: direction = "SHORT" cvd_resonance = 30 no_direction = False else: direction = "LONG" if cvd_fast > 0 else "SHORT" cvd_resonance = 0 if gate_cvd_enabled: no_direction = True if not gate_block: gate_block = "no_direction_consensus" else: no_direction = False # per-strategy 方向限制:long/short 仅限制开仓方向,不影响评分与指标快照 if dir_cfg_raw == "long" and direction == "SHORT": strategy_direction_allowed = False elif dir_cfg_raw == "short" and direction == "LONG": strategy_direction_allowed = False else: strategy_direction_allowed = True # 门3:鲸鱼否决(BTC 用 whale_cvd_ratio,ALT 用大单对立,可关闭) if gate_whale_enabled and not gate_block and not no_direction: if state.symbol == "BTCUSDT": whale_cvd = ( state.whale_cvd_ratio if state._whale_trades else to_float(state.market_indicators.get("tiered_cvd_whale")) or 0.0 ) if (direction == "LONG" and whale_cvd < -whale_flow_pct) or ( direction == "SHORT" and whale_cvd > whale_flow_pct ): gate_block = f"whale_cvd_veto({whale_cvd:.3f})" else: whale_adverse = any( (direction == "LONG" and lt[2] == 1 and lt[1] * price >= whale_usd) or (direction == "SHORT" and lt[2] == 0 and lt[1] * price >= whale_usd) for lt in state.recent_large_trades ) whale_aligned = any( (direction == "LONG" and lt[2] == 0 and lt[1] * price >= whale_usd) or (direction == "SHORT" and lt[2] == 1 and lt[1] * price >= whale_usd) for lt in state.recent_large_trades ) if whale_adverse and not whale_aligned: gate_block = f"whale_adverse(>${whale_usd/1000:.0f}k)" # 门4:OBI 否决(实时 WS 优先,fallback DB,可关闭) obi_raw = state.rt_obi if state.rt_obi != 0.0 else to_float( state.market_indicators.get("obi_depth_10") ) if gate_obi_enabled and not gate_block and not no_direction and obi_raw is not None: if direction == "LONG" and obi_raw < -obi_veto: gate_block = f"obi_veto({obi_raw:.3f}<-{obi_veto})" elif direction == "SHORT" and obi_raw > obi_veto: gate_block = f"obi_veto({obi_raw:.3f}>{obi_veto})" # 门5:期现背离否决(实时 WS 优先,fallback DB,可关闭) spot_perp_div = ( state.rt_spot_perp_div if state.rt_spot_perp_div != 0.0 else to_float( state.market_indicators.get("spot_perp_divergence") ) ) if gate_spd_enabled and not gate_block and not no_direction and spot_perp_div is not None: if (direction == "LONG" and spot_perp_div < -spd_veto) or ( direction == "SHORT" and spot_perp_div > spd_veto ): gate_block = f"spd_veto({spot_perp_div:.4f})" gate_passed = gate_block is None # ── Direction Layer(55 分,原始尺度)─────────────────────── has_adverse_p99 = any( (direction == "LONG" and lt[2] == 1) or (direction == "SHORT" and lt[2] == 0) for lt in state.recent_large_trades ) has_aligned_p99 = any( (direction == "LONG" and lt[2] == 0) or (direction == "SHORT" and lt[2] == 1) for lt in state.recent_large_trades ) p99_flow = 20 if has_aligned_p99 else (10 if not has_adverse_p99 else 0) accel_bonus = 5 if ( (direction == "LONG" and cvd_fast_accel > 0) or (direction == "SHORT" and cvd_fast_accel < 0) ) else 0 # v53_fast:accel 独立触发路径(不要求 cvd 双线同向) is_fast = strategy_name.endswith("fast") accel_independent_score = 0 if is_fast and not no_direction: accel_cfg = strategy_cfg.get("accel_independent", {}) if accel_cfg.get("enabled", False): accel_strong = ( (direction == "LONG" and cvd_fast_accel > 0 and has_aligned_p99) or (direction == "SHORT" and cvd_fast_accel < 0 and has_aligned_p99) ) if accel_strong: accel_independent_score = int( accel_cfg.get("min_direction_score", 35) ) direction_score = max( min(cvd_resonance + p99_flow + accel_bonus, 55), accel_independent_score, ) # ── Crowding Layer(25 分,原始尺度)─────────────────────── long_short_ratio = to_float(state.market_indicators.get("long_short_ratio")) if long_short_ratio is None: ls_score = 7 elif (direction == "SHORT" and long_short_ratio > 2.0) or ( direction == "LONG" and long_short_ratio < 0.5 ): ls_score = 15 elif (direction == "SHORT" and long_short_ratio > 1.5) or ( direction == "LONG" and long_short_ratio < 0.7 ): ls_score = 10 elif (direction == "SHORT" and long_short_ratio > 1.0) or ( direction == "LONG" and long_short_ratio < 1.0 ): ls_score = 7 else: ls_score = 0 top_trader_position = to_float(state.market_indicators.get("top_trader_position")) if top_trader_position is None: top_trader_score = 5 else: if direction == "LONG": top_trader_score = ( 10 if top_trader_position >= 0.55 else (0 if top_trader_position <= 0.45 else 5) ) else: top_trader_score = ( 10 if top_trader_position <= 0.45 else (0 if top_trader_position >= 0.55 else 5) ) crowding_score = min(ls_score + top_trader_score, 25) # ── Environment Layer(15 分,原始尺度)──────────────────── oi_base_score = round(environment_score_raw / 15 * 10) obi_raw = state.rt_obi if state.rt_obi != 0.0 else to_float( state.market_indicators.get("obi_depth_10") ) obi_bonus = 0 if is_fast and obi_raw is not None: obi_cfg = strategy_cfg.get("obi_scoring", {}) strong_thr = float(obi_cfg.get("strong_threshold", 0.30)) weak_thr = float(obi_cfg.get("weak_threshold", 0.15)) strong_sc = int(obi_cfg.get("strong_score", 5)) weak_sc = int(obi_cfg.get("weak_score", 3)) obi_aligned = (direction == "LONG" and obi_raw > 0) or ( direction == "SHORT" and obi_raw < 0 ) obi_abs = abs(obi_raw) if obi_aligned: if obi_abs >= strong_thr: obi_bonus = strong_sc elif obi_abs >= weak_thr: obi_bonus = weak_sc environment_score = ( min(oi_base_score + obi_bonus, 15) if is_fast else round(environment_score_raw / 15 * 15) ) # ── Auxiliary Layer(5 分,原始尺度)────────────────────── coinbase_premium = to_float(state.market_indicators.get("coinbase_premium")) if coinbase_premium is None: aux_score = 2 elif ( (direction == "LONG" and coinbase_premium > 0.0005) or (direction == "SHORT" and coinbase_premium < -0.0005) ): aux_score = 5 elif abs(coinbase_premium) <= 0.0005: aux_score = 2 else: aux_score = 0 # ── 根据策略权重缩放四层分数(direction/env/aux/momentum)──── weights_cfg = strategy_cfg.get("weights") or {} w_dir = float(weights_cfg.get("direction", 55)) w_env = float(weights_cfg.get("env", 25)) w_aux = float(weights_cfg.get("aux", 15)) w_mom = float(weights_cfg.get("momentum", 5)) total_w = w_dir + w_env + w_aux + w_mom if total_w <= 0: # Fallback 到默认 55/25/15/5 w_dir, w_env, w_aux, w_mom = 55.0, 25.0, 15.0, 5.0 total_w = 100.0 # 归一化到 100 分制 norm = 100.0 / total_w w_dir_eff = (w_dir + w_mom) * norm # 动量权重并入方向层 w_env_eff = w_env * norm w_aux_eff = w_aux * norm # 原始最大值:direction 55 + crowding 25 = 80 DIR_RAW_MAX = 55.0 CROWD_RAW_MAX = 25.0 ENV_RAW_MAX = 15.0 AUX_RAW_MAX = 5.0 DIR_PLUS_CROWD_RAW_MAX = DIR_RAW_MAX + CROWD_RAW_MAX # 把方向+拥挤总权重按 55:25 拆分 dir_max_scaled = w_dir_eff * (DIR_RAW_MAX / DIR_PLUS_CROWD_RAW_MAX) crowd_max_scaled = w_dir_eff * (CROWD_RAW_MAX / DIR_PLUS_CROWD_RAW_MAX) env_max_scaled = w_env_eff aux_max_scaled = w_aux_eff # 按原始分数比例缩放到新的权重上 def _scale(raw_score: float, raw_max: float, scaled_max: float) -> float: if raw_max <= 0 or scaled_max <= 0: return 0.0 return min(max(raw_score, 0) / raw_max * scaled_max, scaled_max) direction_score_scaled = _scale(direction_score, DIR_RAW_MAX, dir_max_scaled) crowding_score_scaled = _scale(crowding_score, CROWD_RAW_MAX, crowd_max_scaled) environment_score_scaled = _scale(environment_score, ENV_RAW_MAX, env_max_scaled) aux_score_scaled = _scale(aux_score, AUX_RAW_MAX, aux_max_scaled) total_score = min( direction_score_scaled + crowding_score_scaled + environment_score_scaled + aux_score_scaled, 100, ) total_score = max(0, round(total_score, 1)) if not gate_passed: total_score = 0 whale_cvd_display = ( state.whale_cvd_ratio if state._whale_trades else to_float(state.market_indicators.get("tiered_cvd_whale")) ) if state.symbol == "BTCUSDT" else None result.update( { "score": total_score, "direction": direction if (not no_direction and gate_passed) else None, "atr_value": atr_value, "cvd_fast_5m": cvd_fast if is_fast else None, "factors": { "track": "BTC" if state.symbol == "BTCUSDT" else "ALT", "regime": regime, "gate_passed": gate_passed, "gate_block": gate_block, "atr_pct_price": round(atr_pct_price, 5), "obi_raw": obi_raw, "spot_perp_div": spot_perp_div, "whale_cvd_ratio": whale_cvd_display, "direction": { "score": round(direction_score_scaled, 2), "max": round(dir_max_scaled, 2), "raw_score": direction_score, "raw_max": DIR_RAW_MAX, "cvd_resonance": cvd_resonance, "p99_flow": p99_flow, "accel_bonus": accel_bonus, }, "crowding": { "score": round(crowding_score_scaled, 2), "max": round(crowd_max_scaled, 2), "raw_score": crowding_score, "raw_max": CROWD_RAW_MAX, }, "environment": { "score": round(environment_score_scaled, 2), "max": round(env_max_scaled, 2), "raw_score": environment_score, "raw_max": ENV_RAW_MAX, "oi_change": snap["oi_change"], }, "auxiliary": { "score": round(aux_score_scaled, 2), "max": round(aux_max_scaled, 2), "raw_score": aux_score, "raw_max": AUX_RAW_MAX, "coinbase_premium": coinbase_premium, }, }, } ) # 赋值 tier/signal(和原逻辑一致) if total_score >= strategy_threshold and gate_passed and strategy_direction_allowed: result["signal"] = direction # tier 简化:score >= flip_threshold → heavy;否则 standard result["tier"] = "heavy" if total_score >= flip_threshold else "standard" state.last_signal_ts[strategy_name] = now_ms state.last_signal_dir[strategy_name] = direction return result def _empty_result(strategy_name: str, snap: dict) -> dict: """返回空评分结果(symbol 不匹配 / 无信号时使用)""" return { "strategy": strategy_name, "cvd_fast": snap["cvd_fast"], "cvd_mid": snap["cvd_mid"], "cvd_day": snap["cvd_day"], "cvd_fast_slope": snap["cvd_fast_slope"], "atr": snap["atr"], "atr_value": snap.get("atr_value", snap["atr"]), "atr_pct": snap["atr_pct"], "vwap": snap["vwap"], "price": snap["price"], "p95": snap["p95"], "p99": snap["p99"], "signal": None, "direction": None, "score": 0, "tier": None, "factors": {}, } def evaluate_signal( state: SymbolState, now_ms: int, strategy_cfg: Optional[dict] = None, snapshot: Optional[dict] = None, ) -> dict: """ 统一评分入口: - v53*/custom_* → evaluate_factory_strategy (V5.3/V5.4 策略工厂) - 其他策略(v51_baseline/v52_8signals 等)→ 视为已下线,返回空结果并记录 warning """ strategy_cfg = strategy_cfg or {} strategy_name = strategy_cfg.get("name", "v53") # v53 / custom_* 策略:走统一 V5 工厂打分 if strategy_name.startswith("v53") or strategy_name.startswith("custom_"): snap = snapshot or state.build_evaluation_snapshot(now_ms) # 单币种策略:如 cfg.symbol 存在,仅在该 symbol 上有效 strategy_symbol = strategy_cfg.get("symbol") if strategy_symbol and strategy_symbol != state.symbol: return _empty_result(strategy_name, snap) allowed_symbols = strategy_cfg.get("symbols", []) if allowed_symbols and state.symbol not in allowed_symbols: return _empty_result(strategy_name, snap) # 直接复用工厂评分核心逻辑,并确保基础字段完整 result = evaluate_factory_strategy(state, now_ms, strategy_cfg, snap) # 补充缺失的基础字段(以 snapshot 为准) base = _empty_result(strategy_name, snap) for k, v in base.items(): result.setdefault(k, v) return result # 非 v53/custom_ 策略:视为已下线,返回空结果并记录 warning snap = snapshot or state.build_evaluation_snapshot(now_ms) logger.warning( "[strategy_scoring] strategy '%s' 已下线 (仅支持 v53*/custom_*), 返回空结果", strategy_name, ) return _empty_result(strategy_name, snap)