diff --git a/backend/signal_engine.py b/backend/signal_engine.py index cc7dde5..dff9345 100644 --- a/backend/signal_engine.py +++ b/backend/signal_engine.py @@ -18,6 +18,7 @@ signal_engine.py — V5 短线交易信号引擎(PostgreSQL版) import logging import os import time +import json from collections import deque from datetime import datetime, timezone from typing import Any, Optional @@ -36,6 +37,34 @@ logger = logging.getLogger("signal-engine") SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"] LOOP_INTERVAL = 15 # 秒(从5改15,CPU降60%,信号质量无影响) +STRATEGY_DIR = os.path.join(os.path.dirname(__file__), "strategies") +DEFAULT_STRATEGY_FILES = ["v51_baseline.json", "v52_8signals.json"] + + +def load_strategy_configs() -> list[dict]: + configs = [] + for filename in DEFAULT_STRATEGY_FILES: + path = os.path.join(STRATEGY_DIR, filename) + try: + with open(path, "r", encoding="utf-8") as f: + cfg = json.load(f) + if isinstance(cfg, dict) and cfg.get("name"): + configs.append(cfg) + except FileNotFoundError: + logger.warning(f"策略配置缺失: {path}") + except Exception as e: + logger.error(f"策略配置加载失败 {path}: {e}") + if not configs: + logger.warning("未加载到策略配置,回退到v51_baseline默认配置") + configs.append( + { + "name": "v51_baseline", + "threshold": 75, + "signals": ["cvd", "p99", "accel", "ls_ratio", "oi", "coinbase_premium"], + "tp_sl": {"sl_multiplier": 2.0, "tp1_multiplier": 1.5, "tp2_multiplier": 3.0}, + } + ) + return configs # ─── 模拟盘配置 ─────────────────────────────────────────────────── PAPER_TRADING_ENABLED = False # 开关(范总确认后通过API开启) @@ -85,7 +114,7 @@ def fetch_market_indicators(symbol: str) -> dict: with get_sync_conn() as conn: with conn.cursor() as cur: indicators = {} - for ind_type in ["long_short_ratio", "top_trader_position", "open_interest_hist", "coinbase_premium"]: + for ind_type in ["long_short_ratio", "top_trader_position", "open_interest_hist", "coinbase_premium", "funding_rate"]: cur.execute( "SELECT value FROM market_indicators WHERE symbol=%s AND indicator_type=%s ORDER BY timestamp_ms DESC LIMIT 1", (symbol, ind_type), @@ -111,6 +140,8 @@ def fetch_market_indicators(symbol: str) -> dict: indicators[ind_type] = float(val.get("sumOpenInterestValue", 0)) elif ind_type == "coinbase_premium": indicators[ind_type] = float(val.get("premium_pct", 0)) + elif ind_type == "funding_rate": + indicators[ind_type] = float(val.get("lastFundingRate", 0)) return indicators @@ -227,8 +258,8 @@ class SymbolState: self.prev_cvd_fast_slope = 0.0 self.prev_oi_value = 0.0 self.market_indicators = fetch_market_indicators(symbol) - self.last_signal_ts = 0 - self.last_signal_dir = "" + self.last_signal_ts: dict[str, int] = {} + self.last_signal_dir: dict[str, str] = {} self.recent_large_trades: deque = deque() def process_trade(self, agg_id: int, time_ms: int, price: float, qty: float, is_buyer_maker: int): @@ -268,9 +299,10 @@ class SymbolState: self.recent_large_trades.append((t[0], t[1], t[3])) seen.add(t[0]) - def evaluate_signal(self, now_ms: int) -> dict: + def build_evaluation_snapshot(self, now_ms: int) -> dict: cvd_fast = self.win_fast.cvd cvd_mid = self.win_mid.cvd + cvd_day = self.win_day.cvd vwap = self.win_vwap.vwap atr = self.atr_calc.atr atr_pct = self.atr_calc.atr_percentile @@ -282,11 +314,94 @@ class SymbolState: self.prev_cvd_fast = cvd_fast self.prev_cvd_fast_slope = cvd_fast_slope - result = { - "cvd_fast": cvd_fast, "cvd_mid": cvd_mid, "cvd_day": self.win_day.cvd, + oi_value = to_float(self.market_indicators.get("open_interest_hist")) + if oi_value is None or self.prev_oi_value == 0: + oi_change = 0.0 + environment_score = 10 + else: + oi_change = (oi_value - self.prev_oi_value) / self.prev_oi_value if self.prev_oi_value > 0 else 0.0 + if oi_change >= 0.03: + environment_score = 15 + elif oi_change > 0: + environment_score = 10 + else: + environment_score = 5 + if oi_value is not None and oi_value > 0: + self.prev_oi_value = oi_value + + return { + "cvd_fast": cvd_fast, + "cvd_mid": cvd_mid, + "cvd_day": cvd_day, + "vwap": vwap, + "atr": atr, + "atr_pct": atr_pct, + "p95": p95, + "p99": p99, + "price": price, "cvd_fast_slope": cvd_fast_slope, - "atr": atr, "atr_pct": atr_pct, "vwap": vwap, "price": price, - "p95": p95, "p99": p99, "signal": None, "direction": None, "score": 0, + "cvd_fast_accel": cvd_fast_accel, + "oi_change": oi_change, + "environment_score": environment_score, + "oi_value": oi_value, + } + + def fetch_recent_liquidations(self, window_ms: int = 300000): + """Fetch last 5min liquidation totals from liquidations table""" + now_ms = int(time.time() * 1000) + cutoff = now_ms - window_ms + with get_sync_conn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT + COALESCE(SUM(CASE WHEN side='SELL' THEN usd_value ELSE 0 END), 0) as long_liq, + COALESCE(SUM(CASE WHEN side='BUY' THEN usd_value ELSE 0 END), 0) as short_liq + FROM liquidations + WHERE symbol=%s AND trade_time >= %s + """, + (self.symbol, cutoff), + ) + row = cur.fetchone() + if row: + return {"long_usd": row[0], "short_usd": row[1]} + return None + + def evaluate_signal(self, now_ms: int, strategy_cfg: Optional[dict] = None, snapshot: Optional[dict] = None) -> dict: + strategy_cfg = strategy_cfg or {} + strategy_name = strategy_cfg.get("name", "v51_baseline") + strategy_threshold = int(strategy_cfg.get("threshold", 75)) + enabled_signals = set(strategy_cfg.get("signals", [])) + + snap = snapshot or self.build_evaluation_snapshot(now_ms) + cvd_fast = snap["cvd_fast"] + cvd_mid = snap["cvd_mid"] + vwap = snap["vwap"] + atr = snap["atr"] + atr_pct = snap["atr_pct"] + p95 = snap["p95"] + p99 = snap["p99"] + price = snap["price"] + cvd_fast_slope = snap["cvd_fast_slope"] + cvd_fast_accel = snap["cvd_fast_accel"] + oi_change = snap["oi_change"] + environment_score = snap["environment_score"] + + result = { + "strategy": strategy_name, + "cvd_fast": cvd_fast, + "cvd_mid": cvd_mid, + "cvd_day": snap["cvd_day"], + "cvd_fast_slope": cvd_fast_slope, + "atr": atr, + "atr_pct": atr_pct, + "vwap": vwap, + "price": price, + "p95": p95, + "p99": p99, + "signal": None, + "direction": None, + "score": 0, "tier": None, "factors": {}, } @@ -296,7 +411,8 @@ class SymbolState: # 判断倾向方向(用于评分展示,即使冷却或方向不一致也计算) no_direction = False - in_cooldown = (now_ms - self.last_signal_ts < COOLDOWN_MS) + last_signal_ts = self.last_signal_ts.get(strategy_name, 0) + in_cooldown = now_ms - last_signal_ts < COOLDOWN_MS if cvd_fast > 0 and cvd_mid > 0: direction = "LONG" @@ -326,8 +442,10 @@ class SymbolState: elif not has_adverse_p99: direction_score += 10 accel_bonus = 0 - if (direction == "LONG" and cvd_fast_accel > 0) or (direction == "SHORT" and cvd_fast_accel < 0): - accel_bonus = 5 + if "accel" in enabled_signals and ( + (direction == "LONG" and cvd_fast_accel > 0) or (direction == "SHORT" and cvd_fast_accel < 0) + ): + accel_bonus = int(strategy_cfg.get("accel_bonus", 5)) # 2) 拥挤层(20分)- market_indicators缺失时给中间分 long_short_ratio = to_float(self.market_indicators.get("long_short_ratio")) @@ -358,24 +476,53 @@ class SymbolState: top_trader_score = 5 crowding_score = ls_score + top_trader_score - # 3) 环境层(15分)— OI变化率 - oi_value = to_float(self.market_indicators.get("open_interest_hist")) - if oi_value is None or self.prev_oi_value == 0: - environment_score = 10 - oi_change = 0.0 - else: - oi_change = (oi_value - self.prev_oi_value) / self.prev_oi_value if self.prev_oi_value > 0 else 0 - if oi_change >= 0.03: - environment_score = 15 - elif oi_change > 0: - environment_score = 10 - else: - environment_score = 5 - if oi_value is not None and oi_value > 0: - self.prev_oi_value = oi_value + # Funding Rate scoring (拥挤层加分) + # Read from market_indicators table + funding_rate = to_float(self.market_indicators.get("funding_rate")) + fr_score = 0 + if "funding_rate" in enabled_signals and funding_rate is not None: + fr_abs = abs(funding_rate) + if fr_abs >= 0.001: # extreme ±0.1% + # Extreme: penalize if going WITH the crowd + if (direction == "LONG" and funding_rate > 0.001) or (direction == "SHORT" and funding_rate < -0.001): + fr_score = -5 + else: + fr_score = 5 + elif fr_abs >= 0.0003: # moderate ±0.03% + # Moderate: reward going AGAINST the crowd + if (direction == "LONG" and funding_rate < -0.0003) or (direction == "SHORT" and funding_rate > 0.0003): + fr_score = 5 + else: + fr_score = 0 # 4) 确认层(15分) - confirmation_score = 15 if ((direction == "LONG" and cvd_fast > 0 and cvd_mid > 0) or (direction == "SHORT" and cvd_fast < 0 and cvd_mid < 0)) else 0 + confirmation_score = 15 if ( + (direction == "LONG" and cvd_fast > 0 and cvd_mid > 0) + or (direction == "SHORT" and cvd_fast < 0 and cvd_mid < 0) + ) else 0 + + # Liquidation scoring (确认层加分) + liq_score = 0 + liq_data = None + if "liquidation" in enabled_signals: + liq_data = self.fetch_recent_liquidations() + if liq_data: + liq_long_usd = liq_data.get("long_usd", 0) + liq_short_usd = liq_data.get("short_usd", 0) + thresholds = {"BTCUSDT": 500000, "ETHUSDT": 200000, "XRPUSDT": 100000, "SOLUSDT": 100000} + threshold = thresholds.get(self.symbol, 100000) + total = liq_long_usd + liq_short_usd + if total >= threshold: + if liq_short_usd > 0 and liq_long_usd > 0: + ratio = liq_short_usd / liq_long_usd + elif liq_short_usd > 0: + ratio = float("inf") + else: + ratio = 0 + if ratio >= 2.0 and direction == "LONG": + liq_score = 5 + elif ratio <= 0.5 and direction == "SHORT": + liq_score = 5 # 5) 辅助层(5分) coinbase_premium = to_float(self.market_indicators.get("coinbase_premium")) @@ -388,7 +535,7 @@ class SymbolState: else: aux_score = 0 - total_score = direction_score + accel_bonus + crowding_score + environment_score + confirmation_score + aux_score + total_score = direction_score + accel_bonus + crowding_score + fr_score + environment_score + confirmation_score + liq_score + aux_score result["score"] = total_score result["direction"] = direction result["factors"] = { @@ -403,27 +550,31 @@ class SymbolState: "environment": {"score": environment_score, "open_interest_hist": oi_change}, "confirmation": {"score": confirmation_score}, "auxiliary": {"score": aux_score, "coinbase_premium": coinbase_premium}, + "funding_rate": {"score": fr_score, "value": funding_rate}, + "liquidation": { + "score": liq_score, + "long_usd": liq_data.get("long_usd", 0) if liq_data else 0, + "short_usd": liq_data.get("short_usd", 0) if liq_data else 0, + }, } # 始终输出direction供反向平仓判断(不受冷却限制) result["direction"] = direction if not no_direction else None - if total_score >= 85 and not no_direction and not in_cooldown: + heavy_threshold = max(strategy_threshold + 10, 85) + if total_score >= heavy_threshold and not no_direction and not in_cooldown: result["signal"] = direction result["tier"] = "heavy" - elif total_score >= 75 and not no_direction and not in_cooldown: + elif total_score >= strategy_threshold and not no_direction and not in_cooldown: result["signal"] = direction result["tier"] = "standard" - elif total_score >= 60 and not no_direction and not in_cooldown: - result["signal"] = direction - result["tier"] = "light" else: result["signal"] = None result["tier"] = None if result["signal"]: - self.last_signal_ts = now_ms - self.last_signal_dir = direction + self.last_signal_ts[strategy_name] = now_ms + self.last_signal_dir[strategy_name] = direction return result @@ -499,31 +650,60 @@ def save_indicator_1m(ts: int, symbol: str, result: dict): # ─── 模拟盘 ────────────────────────────────────────────────────── -def paper_open_trade(symbol: str, direction: str, price: float, score: int, tier: str, atr: float, now_ms: int, factors: dict = None): +def paper_open_trade( + symbol: str, + direction: str, + price: float, + score: int, + tier: str, + atr: float, + now_ms: int, + factors: dict = None, + strategy: str = "v51_baseline", + tp_sl: Optional[dict] = None, +): """模拟开仓""" import json as _json3 risk_atr = 0.7 * atr if risk_atr <= 0: return + sl_multiplier = float((tp_sl or {}).get("sl_multiplier", 2.0)) + tp1_multiplier = float((tp_sl or {}).get("tp1_multiplier", 1.5)) + tp2_multiplier = float((tp_sl or {}).get("tp2_multiplier", 3.0)) if direction == "LONG": - sl = price - 2.0 * risk_atr - tp1 = price + 1.5 * risk_atr - tp2 = price + 3.0 * risk_atr + sl = price - sl_multiplier * risk_atr + tp1 = price + tp1_multiplier * risk_atr + tp2 = price + tp2_multiplier * risk_atr else: - sl = price + 2.0 * risk_atr - tp1 = price - 1.5 * risk_atr - tp2 = price - 3.0 * risk_atr + sl = price + sl_multiplier * risk_atr + tp1 = price - tp1_multiplier * risk_atr + tp2 = price - tp2_multiplier * risk_atr with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( - "INSERT INTO paper_trades (symbol,direction,score,tier,entry_price,entry_ts,tp1_price,tp2_price,sl_price,atr_at_entry,score_factors) " - "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", - (symbol, direction, score, tier, price, now_ms, tp1, tp2, sl, atr, - _json3.dumps(factors) if factors else None) + "INSERT INTO paper_trades (symbol,direction,score,tier,entry_price,entry_ts,tp1_price,tp2_price,sl_price,atr_at_entry,score_factors,strategy) " + "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", + ( + symbol, + direction, + score, + tier, + price, + now_ms, + tp1, + tp2, + sl, + atr, + _json3.dumps(factors) if factors else None, + strategy, + ), ) conn.commit() - logger.info(f"[{symbol}] 📝 模拟开仓: {direction} @ {price:.2f} score={score} tier={tier} TP1={tp1:.2f} TP2={tp2:.2f} SL={sl:.2f}") + logger.info( + f"[{symbol}] 📝 模拟开仓: {direction} @ {price:.2f} score={score} tier={tier} strategy={strategy} " + f"TP1={tp1:.2f} TP2={tp2:.2f} SL={sl:.2f}" + ) def paper_check_positions(symbol: str, current_price: float, now_ms: int): @@ -620,32 +800,54 @@ def paper_check_positions(symbol: str, current_price: float, now_ms: int): conn.commit() -def paper_has_active_position(symbol: str) -> bool: +def paper_has_active_position(symbol: str, strategy: Optional[str] = None) -> bool: """检查该币种是否有活跃持仓""" with get_sync_conn() as conn: with conn.cursor() as cur: - cur.execute("SELECT COUNT(*) FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')", (symbol,)) + if strategy: + cur.execute( + "SELECT COUNT(*) FROM paper_trades WHERE symbol=%s AND strategy=%s AND status IN ('active','tp1_hit')", + (symbol, strategy), + ) + else: + cur.execute("SELECT COUNT(*) FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')", (symbol,)) return cur.fetchone()[0] > 0 -def paper_get_active_direction(symbol: str) -> str | None: +def paper_get_active_direction(symbol: str, strategy: Optional[str] = None) -> str | None: """获取该币种活跃持仓的方向,无持仓返回None""" with get_sync_conn() as conn: with conn.cursor() as cur: - cur.execute("SELECT direction FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit') LIMIT 1", (symbol,)) + if strategy: + cur.execute( + "SELECT direction FROM paper_trades WHERE symbol=%s AND strategy=%s AND status IN ('active','tp1_hit') LIMIT 1", + (symbol, strategy), + ) + else: + cur.execute( + "SELECT direction FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit') LIMIT 1", + (symbol,), + ) row = cur.fetchone() return row[0] if row else None -def paper_close_by_signal(symbol: str, current_price: float, now_ms: int): +def paper_close_by_signal(symbol: str, current_price: float, now_ms: int, strategy: Optional[str] = None): """反向信号平仓:按当前价平掉该币种所有活跃仓位""" with get_sync_conn() as conn: with conn.cursor() as cur: - cur.execute( - "SELECT id, direction, entry_price, tp1_hit, atr_at_entry " - "FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')", - (symbol,) - ) + if strategy: + cur.execute( + "SELECT id, direction, entry_price, tp1_hit, atr_at_entry " + "FROM paper_trades WHERE symbol=%s AND strategy=%s AND status IN ('active','tp1_hit')", + (symbol, strategy), + ) + else: + cur.execute( + "SELECT id, direction, entry_price, tp1_hit, atr_at_entry " + "FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')", + (symbol,), + ) positions = cur.fetchall() for pos in positions: pid, direction, entry_price, tp1_hit, atr_entry = pos @@ -661,7 +863,10 @@ def paper_close_by_signal(symbol: str, current_price: float, now_ms: int): "UPDATE paper_trades SET status='signal_flip', exit_price=%s, exit_ts=%s, pnl_r=%s WHERE id=%s", (current_price, now_ms, round(pnl_r, 4), pid) ) - logger.info(f"[{symbol}] 📝 反向信号平仓: {direction} @ {current_price:.2f} pnl={pnl_r:+.2f}R") + logger.info( + f"[{symbol}] 📝 反向信号平仓: {direction} @ {current_price:.2f} pnl={pnl_r:+.2f}R" + f"{f' strategy={strategy}' if strategy else ''}" + ) conn.commit() @@ -677,6 +882,11 @@ def paper_active_count() -> int: def main(): init_schema() + strategy_configs = load_strategy_configs() + strategy_names = [cfg.get("name", "unknown") for cfg in strategy_configs] + logger.info(f"已加载策略配置: {', '.join(strategy_names)}") + primary_strategy_name = "v52_8signals" if any(cfg.get("name") == "v52_8signals" for cfg in strategy_configs) else strategy_names[0] + states = {sym: SymbolState(sym) for sym in SYMBOLS} for sym, state in states.items(): @@ -699,36 +909,63 @@ def main(): state.process_trade(t["agg_id"], t["time_ms"], t["price"], t["qty"], t["is_buyer_maker"]) state.market_indicators = fetch_market_indicators(sym) - result = state.evaluate_signal(now_ms) - save_indicator(now_ms, sym, result) + snapshot = state.build_evaluation_snapshot(now_ms) + strategy_results: list[tuple[dict, dict]] = [] + for strategy_cfg in strategy_configs: + strategy_result = state.evaluate_signal(now_ms, strategy_cfg=strategy_cfg, snapshot=snapshot) + strategy_results.append((strategy_cfg, strategy_result)) + + primary_result = strategy_results[0][1] + for strategy_cfg, strategy_result in strategy_results: + if strategy_cfg.get("name") == primary_strategy_name: + primary_result = strategy_result + break + + save_indicator(now_ms, sym, primary_result) bar_1m = (now_ms // 60000) * 60000 if last_1m_save.get(sym) != bar_1m: - save_indicator_1m(now_ms, sym, result) + save_indicator_1m(now_ms, sym, primary_result) last_1m_save[sym] = bar_1m - # 反向信号平仓:基于direction(不受冷却限制),score>=60才触发 + # 反向信号平仓:按策略独立判断,score>=75才触发 if PAPER_TRADING_ENABLED and warmup_cycles <= 0: - eval_dir = result.get("direction") - existing_dir = paper_get_active_direction(sym) - if existing_dir and eval_dir and existing_dir != eval_dir and result["score"] >= 60: - paper_close_by_signal(sym, result["price"], now_ms) - logger.info(f"[{sym}] 📝 反向信号平仓: {existing_dir} → {eval_dir} (score={result['score']})") + for strategy_cfg, result in strategy_results: + strategy_name = strategy_cfg.get("name", "v51_baseline") + eval_dir = result.get("direction") + existing_dir = paper_get_active_direction(sym, strategy_name) + if existing_dir and eval_dir and existing_dir != eval_dir and result["score"] >= 75: + paper_close_by_signal(sym, result["price"], now_ms, strategy_name) + logger.info( + f"[{sym}] 📝 反向信号平仓[{strategy_name}]: {existing_dir} → {eval_dir} " + f"(score={result['score']})" + ) - if result.get("signal"): - logger.info(f"[{sym}] 🚨 信号: {result['signal']} score={result['score']} price={result['price']:.1f}") - # 模拟盘开仓(需开关开启 + 跳过冷启动) - if PAPER_TRADING_ENABLED and warmup_cycles <= 0: - if not paper_has_active_position(sym): - active_count = paper_active_count() - if active_count < PAPER_MAX_POSITIONS: - tier = result.get("tier", "standard") - paper_open_trade( - sym, result["signal"], result["price"], - result["score"], tier, - result["atr"], now_ms, - factors=result.get("factors") - ) + for strategy_cfg, result in strategy_results: + strategy_name = strategy_cfg.get("name", "v51_baseline") + if result.get("signal"): + logger.info( + f"[{sym}] 🚨 信号[{strategy_name}]: {result['signal']} " + f"score={result['score']} price={result['price']:.1f}" + ) + # 模拟盘开仓(需开关开启 + 跳过冷启动) + if PAPER_TRADING_ENABLED and warmup_cycles <= 0: + if not paper_has_active_position(sym, strategy_name): + active_count = paper_active_count() + if active_count < PAPER_MAX_POSITIONS: + tier = result.get("tier", "standard") + paper_open_trade( + sym, + result["signal"], + result["price"], + result["score"], + tier, + result["atr"], + now_ms, + factors=result.get("factors"), + strategy=strategy_name, + tp_sl=strategy_cfg.get("tp_sl"), + ) # 模拟盘持仓检查由paper_monitor.py通过WebSocket实时处理,这里不再检查