diff --git a/backend/signal_engine.py b/backend/signal_engine.py index 6ea5c20..4c44ac7 100644 --- a/backend/signal_engine.py +++ b/backend/signal_engine.py @@ -70,6 +70,96 @@ def load_strategy_configs() -> list[dict]: ) return configs + +def load_strategy_configs_from_db() -> list[dict]: + """ + V5.4: 从 strategies 表读取 running 状态的策略配置。 + 把 DB 字段映射成现有 JSON 格式(保持与 JSON 文件完全兼容)。 + 失败时返回空列表,调用方应 fallback 到 JSON。 + 内存安全:每次读取只返回配置列表,无缓存,无大对象。 + """ + try: + with get_sync_conn() as conn: + with conn.cursor() as cur: + cur.execute(""" + SELECT + strategy_id::text, display_name, symbol, + cvd_fast_window, cvd_slow_window, + weight_direction, weight_env, weight_aux, weight_momentum, + entry_score, + gate_obi_enabled, obi_threshold, + gate_whale_enabled, whale_cvd_threshold, + gate_vol_enabled, atr_percentile_min, + gate_spot_perp_enabled, spot_perp_threshold, + sl_atr_multiplier, tp1_ratio, tp2_ratio, + timeout_minutes, flip_threshold, direction + FROM strategies + WHERE status = 'running' + ORDER BY created_at ASC + """) + rows = cur.fetchall() + + configs = [] + for row in rows: + (sid, display_name, symbol, + cvd_fast, cvd_slow, + w_dir, w_env, w_aux, w_mom, + entry_score, + gate_obi, obi_thr, + gate_whale, whale_thr, + gate_vol, atr_pct_min, + gate_spot, spot_thr, + sl_mult, tp1_r, tp2_r, + timeout_min, flip_thr, direction) = row + + # 把 display_name 映射回 legacy strategy name(用于兼容评分逻辑) + # legacy 策略用固定 UUID 识别 + LEGACY_UUID_MAP = { + "00000000-0000-0000-0000-000000000053": "v53", + "00000000-0000-0000-0000-000000000054": "v53_middle", + "00000000-0000-0000-0000-000000000055": "v53_fast", + } + strategy_name = LEGACY_UUID_MAP.get(sid, f"custom_{sid[:8]}") + + # 构造与 JSON 文件格式兼容的配置 dict + cfg = { + "name": strategy_name, + "strategy_id": sid, # V5.4 新增:用于写 strategy_id 到 DB + "strategy_name_snapshot": display_name, # V5.4 新增:写入时快照名称 + "symbol": symbol, + "direction": direction, + "cvd_fast_window": cvd_fast, + "cvd_slow_window": cvd_slow, + "threshold": entry_score, + "weights": { + "direction": w_dir, + "env": w_env, + "aux": w_aux, + "momentum": w_mom, + }, + "gates": { + "obi": {"enabled": gate_obi, "threshold": obi_thr}, + "whale": {"enabled": gate_whale, "threshold": whale_thr}, + "vol": {"enabled": gate_vol, "atr_percentile_min": atr_pct_min}, + "spot_perp": {"enabled": gate_spot, "threshold": spot_thr}, + }, + "tp_sl": { + "sl_multiplier": sl_mult, + "tp1_multiplier": tp1_r, + "tp2_multiplier": tp2_r, + }, + "timeout_minutes": timeout_min, + "flip_threshold": flip_thr, + } + configs.append(cfg) + + logger.info(f"[DB] 已加载 {len(configs)} 个策略配置: {[c['name'] for c in configs]}") + return configs + + except Exception as e: + logger.warning(f"[DB] load_strategy_configs_from_db 失败,将 fallback 到 JSON: {e}") + return [] + # ─── 模拟盘配置 ─────────────────────────────────────────────────── PAPER_TRADING_ENABLED = False # 总开关(兼容旧逻辑) PAPER_ENABLED_STRATEGIES = [] # 分策略开关: ["v51_baseline", "v52_8signals"] @@ -1018,7 +1108,8 @@ def fetch_new_trades(symbol: str, last_id: int) -> list: for r in cur.fetchall()] -def save_indicator(ts: int, symbol: str, result: dict, strategy: str = "v52_8signals"): +def save_indicator(ts: int, symbol: str, result: dict, strategy: str = "v52_8signals", + strategy_id: Optional[str] = None, strategy_name_snapshot: Optional[str] = None): with get_sync_conn() as conn: with conn.cursor() as cur: import json as _json3 @@ -1026,13 +1117,13 @@ def save_indicator(ts: int, symbol: str, result: dict, strategy: str = "v52_8sig cvd_fast_5m = result.get("cvd_fast_5m") # v53_fast 专用:5m窗口CVD,其他策略为None cur.execute( "INSERT INTO signal_indicators " - "(ts,symbol,strategy,cvd_fast,cvd_mid,cvd_day,cvd_fast_slope,atr_5m,atr_percentile,atr_value,vwap_30m,price,p95_qty,p99_qty,score,signal,factors,cvd_fast_5m) " - "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", + "(ts,symbol,strategy,cvd_fast,cvd_mid,cvd_day,cvd_fast_slope,atr_5m,atr_percentile,atr_value,vwap_30m,price,p95_qty,p99_qty,score,signal,factors,cvd_fast_5m,strategy_id,strategy_name_snapshot) " + "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", (ts, symbol, strategy, result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["cvd_fast_slope"], result["atr"], result["atr_pct"], result.get("atr_value", result["atr"]), result["vwap"], result["price"], result["p95"], result["p99"], result["score"], result.get("signal"), factors_json, - cvd_fast_5m) + cvd_fast_5m, strategy_id, strategy_name_snapshot) ) # 有信号时通知live_executor if result.get("signal"): @@ -1128,6 +1219,8 @@ def paper_open_trade( factors: dict = None, strategy: str = "v51_baseline", tp_sl: Optional[dict] = None, + strategy_id: Optional[str] = None, + strategy_name_snapshot: Optional[str] = None, ): """模拟开仓""" import json as _json3 @@ -1158,8 +1251,8 @@ def paper_open_trade( with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( - "INSERT INTO paper_trades (symbol,direction,score,tier,entry_price,entry_ts,tp1_price,tp2_price,sl_price,atr_at_entry,score_factors,strategy,risk_distance) " - "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", + "INSERT INTO paper_trades (symbol,direction,score,tier,entry_price,entry_ts,tp1_price,tp2_price,sl_price,atr_at_entry,score_factors,strategy,risk_distance,strategy_id,strategy_name_snapshot) " + "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", ( symbol, direction, @@ -1174,6 +1267,8 @@ def paper_open_trade( _json3.dumps(factors) if factors else None, strategy, risk_distance, + strategy_id, + strategy_name_snapshot, ), ) conn.commit() @@ -1370,7 +1465,11 @@ def start_realtime_ws(states: dict): def main(): init_schema() - strategy_configs = load_strategy_configs() + # V5.4: 优先从 DB 加载策略配置,失败 fallback 到 JSON + strategy_configs = load_strategy_configs_from_db() + if not strategy_configs: + logger.warning("[DB] 未能从 DB 加载策略配置,使用 JSON fallback") + strategy_configs = load_strategy_configs() strategy_names = [cfg.get("name", "unknown") for cfg in strategy_configs] logger.info(f"已加载策略配置: {', '.join(strategy_names)}") primary_strategy_name = "v52_8signals" if any(cfg.get("name") == "v52_8signals" for cfg in strategy_configs) else strategy_names[0] @@ -1409,7 +1508,10 @@ def main(): # 每个策略独立存储indicator for strategy_cfg, strategy_result in strategy_results: sname = strategy_cfg.get("name", "v51_baseline") - save_indicator(now_ms, sym, strategy_result, strategy=sname) + sid = strategy_cfg.get("strategy_id") + ssnap = strategy_cfg.get("strategy_name_snapshot") + save_indicator(now_ms, sym, strategy_result, strategy=sname, + strategy_id=sid, strategy_name_snapshot=ssnap) save_feature_event(now_ms, sym, strategy_result, strategy=sname) # 1m表仍用primary(图表用) @@ -1463,6 +1565,8 @@ def main(): factors=result.get("factors"), strategy=strategy_name, tp_sl=strategy_cfg.get("tp_sl"), + strategy_id=strategy_cfg.get("strategy_id"), + strategy_name_snapshot=strategy_cfg.get("strategy_name_snapshot"), ) # 模拟盘持仓检查由paper_monitor.py通过WebSocket实时处理,这里不再检查 @@ -1476,7 +1580,12 @@ def main(): if cycle % 10 == 0: old_strategies = list(PAPER_ENABLED_STRATEGIES) load_paper_config() - strategy_configs = load_strategy_configs() # A1: 热重载权重/阈值/TP/SL + # V5.4: 热重载优先读 DB,失败 fallback 到 JSON + new_configs = load_strategy_configs_from_db() + if new_configs: + strategy_configs = new_configs + else: + strategy_configs = load_strategy_configs() # A1: 热重载权重/阈值/TP/SL strategy_names = [cfg.get("name", "unknown") for cfg in strategy_configs] primary_strategy_name = "v52_8signals" if any(cfg.get("name") == "v52_8signals" for cfg in strategy_configs) else strategy_names[0] if list(PAPER_ENABLED_STRATEGIES) != old_strategies: