""" signal_engine.py — V5 短线交易信号引擎(PostgreSQL版) 架构: - 独立PM2进程,每5秒循环 - 内存滚动窗口计算指标(CVD/ATR/VWAP/大单阈值) - 启动时回灌历史数据(冷启动warmup) - 信号评估:核心3条件+加分3条件 - 输出:signal_indicators表 + signal_trades表 + Discord推送 指标: - CVD_fast (30m滚动) / CVD_mid (4h滚动) / CVD_day (UTC日内) - ATR (5m, 14周期) - VWAP_30m - 大单阈值 P95/P99 (24h滚动) """ import logging import os import time import json from collections import deque from datetime import datetime, timezone from typing import Any, Optional from db import get_sync_conn, init_schema logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ logging.StreamHandler(), logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "signal-engine.log")), ], ) logger = logging.getLogger("signal-engine") SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"] LOOP_INTERVAL = 15 # 秒(从5改15,CPU降60%,信号质量无影响) STRATEGY_DIR = os.path.join(os.path.dirname(__file__), "strategies") DEFAULT_STRATEGY_FILES = ["v51_baseline.json", "v52_8signals.json", "v53_alt.json", "v53_btc.json"] def load_strategy_configs() -> list[dict]: configs = [] for filename in DEFAULT_STRATEGY_FILES: path = os.path.join(STRATEGY_DIR, filename) try: with open(path, "r", encoding="utf-8") as f: cfg = json.load(f) if isinstance(cfg, dict) and cfg.get("name"): configs.append(cfg) except FileNotFoundError: logger.warning(f"策略配置缺失: {path}") except Exception as e: logger.error(f"策略配置加载失败 {path}: {e}") if not configs: logger.warning("未加载到策略配置,回退到v51_baseline默认配置") configs.append( { "name": "v51_baseline", "threshold": 75, "signals": ["cvd", "p99", "accel", "ls_ratio", "oi", "coinbase_premium"], "tp_sl": {"sl_multiplier": 2.0, "tp1_multiplier": 1.5, "tp2_multiplier": 3.0}, } ) return configs # ─── 模拟盘配置 ─────────────────────────────────────────────────── PAPER_TRADING_ENABLED = False # 总开关(兼容旧逻辑) PAPER_ENABLED_STRATEGIES = [] # 分策略开关: ["v51_baseline", "v52_8signals"] PAPER_INITIAL_BALANCE = 10000 # 虚拟初始资金 USDT PAPER_RISK_PER_TRADE = 0.02 # 单笔风险 2%(即200U) PAPER_MAX_POSITIONS = 4 # 每套策略最大同时持仓数 PAPER_TIER_MULTIPLIER = { # 档位仓位倍数 "light": 0.5, # 轻仓: 1% "standard": 1.0, # 标准: 2% "heavy": 1.5, # 加仓: 3% } PAPER_FEE_RATE = 0.0005 # Taker手续费 0.05%(开仓+平仓各一次) def load_paper_config(): """从配置文件加载模拟盘开关和参数""" global PAPER_TRADING_ENABLED, PAPER_ENABLED_STRATEGIES, PAPER_INITIAL_BALANCE, PAPER_RISK_PER_TRADE, PAPER_MAX_POSITIONS config_path = os.path.join(os.path.dirname(__file__), "paper_config.json") try: with open(config_path, "r") as f: import json as _json2 cfg = _json2.load(f) PAPER_TRADING_ENABLED = cfg.get("enabled", False) PAPER_ENABLED_STRATEGIES = cfg.get("enabled_strategies", []) PAPER_INITIAL_BALANCE = cfg.get("initial_balance", 10000) PAPER_RISK_PER_TRADE = cfg.get("risk_per_trade", 0.02) PAPER_MAX_POSITIONS = cfg.get("max_positions", 4) except FileNotFoundError: pass def is_strategy_enabled(strategy_name: str) -> bool: """检查某策略是否启用模拟盘""" if not PAPER_TRADING_ENABLED: return False # 如果enabled_strategies为空,走旧逻辑(全部启用) if not PAPER_ENABLED_STRATEGIES: return True return strategy_name in PAPER_ENABLED_STRATEGIES # ───────────────────────────────────────────────────────────────── # 窗口大小(毫秒) WINDOW_FAST = 30 * 60 * 1000 # 30分钟 WINDOW_MID = 4 * 3600 * 1000 # 4小时 WINDOW_DAY = 24 * 3600 * 1000 # 24小时 WINDOW_VWAP = 30 * 60 * 1000 # 30分钟 # ATR参数 ATR_PERIOD_MS = 5 * 60 * 1000 ATR_LENGTH = 14 # 信号冷却 COOLDOWN_MS = 10 * 60 * 1000 def fetch_market_indicators(symbol: str) -> dict: """从PG读取最新的market_indicators数据,解析JSONB提取关键数值""" import json as _json with get_sync_conn() as conn: with conn.cursor() as cur: indicators = {} ind_types = [ "long_short_ratio", "top_trader_position", "open_interest_hist", "coinbase_premium", "funding_rate", "obi_depth_10", "spot_perp_divergence", "tiered_cvd_whale", # Phase 2 ] for ind_type in ind_types: cur.execute( "SELECT value FROM market_indicators WHERE symbol=%s AND indicator_type=%s ORDER BY timestamp_ms DESC LIMIT 1", (symbol, ind_type), ) row = cur.fetchone() if not row or row[0] is None: indicators[ind_type] = None continue val = row[0] if isinstance(val, str): try: val = _json.loads(val) except Exception: indicators[ind_type] = None continue # 提取关键数值 if ind_type == "long_short_ratio": indicators[ind_type] = float(val.get("longShortRatio", 1.0)) elif ind_type == "top_trader_position": indicators[ind_type] = float(val.get("longAccount", 0.5)) elif ind_type == "open_interest_hist": indicators[ind_type] = float(val.get("sumOpenInterestValue", 0)) elif ind_type == "coinbase_premium": indicators[ind_type] = float(val.get("premium_pct", 0)) / 100.0 elif ind_type == "funding_rate": indicators[ind_type] = float(val.get("fundingRate", val.get("lastFundingRate", 0))) elif ind_type == "obi_depth_10": # obi范围[-1,1],正值=买压,负值=卖压 indicators[ind_type] = float(val.get("obi", 0)) elif ind_type == "spot_perp_divergence": # divergence = (spot - mark) / mark indicators[ind_type] = float(val.get("divergence", 0)) elif ind_type == "tiered_cvd_whale": # 巨鲸净CVD比率[-1,1],正值=净买入 indicators[ind_type] = float(val.get("whale_cvd_ratio", 0)) return indicators def to_float(value: Any) -> Optional[float]: try: return float(value) if value is not None else None except (TypeError, ValueError): return None # ─── 滚动窗口 ─────────────────────────────────────────────────── class TradeWindow: def __init__(self, window_ms: int): self.window_ms = window_ms self.trades: deque = deque() self.buy_vol = 0.0 self.sell_vol = 0.0 self.pq_sum = 0.0 self.q_sum = 0.0 def add(self, time_ms: int, qty: float, price: float, is_buyer_maker: int): self.trades.append((time_ms, qty, price, is_buyer_maker)) pq = price * qty self.pq_sum += pq self.q_sum += qty if is_buyer_maker == 0: self.buy_vol += qty else: self.sell_vol += qty def trim(self, now_ms: int): cutoff = now_ms - self.window_ms while self.trades and self.trades[0][0] < cutoff: t_ms, qty, price, ibm = self.trades.popleft() self.pq_sum -= price * qty self.q_sum -= qty if ibm == 0: self.buy_vol -= qty else: self.sell_vol -= qty @property def cvd(self) -> float: return self.buy_vol - self.sell_vol @property def vwap(self) -> float: return self.pq_sum / self.q_sum if self.q_sum > 0 else 0.0 class ATRCalculator: def __init__(self, period_ms: int = ATR_PERIOD_MS, length: int = ATR_LENGTH): self.period_ms = period_ms self.length = length self.candles: deque = deque(maxlen=length + 1) self.current_candle: Optional[dict] = None self.atr_history: deque = deque(maxlen=288) def update(self, time_ms: int, price: float): bar_ms = (time_ms // self.period_ms) * self.period_ms if self.current_candle is None or self.current_candle["bar"] != bar_ms: if self.current_candle is not None: self.candles.append(self.current_candle) self.current_candle = {"bar": bar_ms, "open": price, "high": price, "low": price, "close": price} else: c = self.current_candle c["high"] = max(c["high"], price) c["low"] = min(c["low"], price) c["close"] = price @property def atr(self) -> float: if len(self.candles) < 2: return 0.0 trs = [] candles_list = list(self.candles) for i in range(1, len(candles_list)): prev_close = candles_list[i-1]["close"] c = candles_list[i] tr = max(c["high"] - c["low"], abs(c["high"] - prev_close), abs(c["low"] - prev_close)) trs.append(tr) if not trs: return 0.0 atr_val = trs[0] for tr in trs[1:]: atr_val = (atr_val * (self.length - 1) + tr) / self.length return atr_val @property def atr_percentile(self) -> float: current = self.atr if current == 0: return 50.0 self.atr_history.append(current) if len(self.atr_history) < 10: return 50.0 sorted_hist = sorted(self.atr_history) rank = sum(1 for x in sorted_hist if x <= current) return (rank / len(sorted_hist)) * 100 # ─── FR历史最大值缓存(每小时更新)─────────────────────────────── _max_fr_cache: dict = {} # {symbol: max_abs_fr} _max_fr_updated: float = 0 def get_max_fr(symbol: str) -> float: """获取该币种历史最大|FR|,每小时刷新一次""" global _max_fr_cache, _max_fr_updated now = time.time() if now - _max_fr_updated > 3600 or symbol not in _max_fr_cache: try: with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( "SELECT symbol, MAX(ABS((value->>'fundingRate')::float)) as max_fr " "FROM market_indicators WHERE indicator_type='funding_rate' " "GROUP BY symbol" ) for row in cur.fetchall(): _max_fr_cache[row[0]] = row[1] if row[1] else 0.0001 _max_fr_updated = now except Exception as e: logger.warning(f"get_max_fr error: {e}") return _max_fr_cache.get(symbol, 0.0001) # 默认0.01%防除零 class SymbolState: def __init__(self, symbol: str): self.symbol = symbol self.win_fast = TradeWindow(WINDOW_FAST) self.win_mid = TradeWindow(WINDOW_MID) self.win_day = TradeWindow(WINDOW_DAY) self.win_vwap = TradeWindow(WINDOW_VWAP) self.atr_calc = ATRCalculator() self.last_processed_id = 0 self.last_trade_price = 0.0 self.warmup = True self.prev_cvd_fast = 0.0 self.prev_cvd_fast_slope = 0.0 self.prev_oi_value = 0.0 self.market_indicators = fetch_market_indicators(symbol) self.last_signal_ts: dict[str, int] = {} self.last_signal_dir: dict[str, str] = {} self.recent_large_trades: deque = deque() def process_trade(self, agg_id: int, time_ms: int, price: float, qty: float, is_buyer_maker: int): now_ms = time_ms self.win_fast.add(time_ms, qty, price, is_buyer_maker) self.win_mid.add(time_ms, qty, price, is_buyer_maker) self.win_day.add(time_ms, qty, price, is_buyer_maker) self.win_vwap.add(time_ms, qty, price, is_buyer_maker) self.atr_calc.update(time_ms, price) self.win_fast.trim(now_ms) self.win_mid.trim(now_ms) self.win_day.trim(now_ms) self.win_vwap.trim(now_ms) self.last_processed_id = agg_id self.last_trade_price = price # 最新成交价,用于entry_price def compute_p95_p99(self) -> tuple: if len(self.win_day.trades) < 100: return 5.0, 10.0 qtys = sorted([t[1] for t in self.win_day.trades]) n = len(qtys) p95 = qtys[int(n * 0.95)] p99 = qtys[int(n * 0.99)] if "BTC" in self.symbol: p95 = max(p95, 5.0); p99 = max(p99, 10.0) else: p95 = max(p95, 50.0); p99 = max(p99, 100.0) return p95, p99 def update_large_trades(self, now_ms: int, p99: float): cutoff = now_ms - 15 * 60 * 1000 while self.recent_large_trades and self.recent_large_trades[0][0] < cutoff: self.recent_large_trades.popleft() # 只检查新trade(避免重复添加) seen = set(t[0] for t in self.recent_large_trades) # time_ms作为去重key for t in self.win_fast.trades: if t[1] >= p99 and t[0] > cutoff and t[0] not in seen: self.recent_large_trades.append((t[0], t[1], t[3])) seen.add(t[0]) def build_evaluation_snapshot(self, now_ms: int) -> dict: cvd_fast = self.win_fast.cvd cvd_mid = self.win_mid.cvd cvd_day = self.win_day.cvd vwap = self.win_vwap.vwap atr = self.atr_calc.atr atr_pct = self.atr_calc.atr_percentile p95, p99 = self.compute_p95_p99() self.update_large_trades(now_ms, p99) price = self.last_trade_price if self.last_trade_price > 0 else vwap # 用最新成交价,非VWAP cvd_fast_slope = cvd_fast - self.prev_cvd_fast cvd_fast_accel = cvd_fast_slope - self.prev_cvd_fast_slope self.prev_cvd_fast = cvd_fast self.prev_cvd_fast_slope = cvd_fast_slope oi_value = to_float(self.market_indicators.get("open_interest_hist")) if oi_value is None or self.prev_oi_value == 0: oi_change = 0.0 environment_score = 10 else: oi_change = (oi_value - self.prev_oi_value) / self.prev_oi_value if self.prev_oi_value > 0 else 0.0 if oi_change >= 0.03: environment_score = 15 elif oi_change > 0: environment_score = 10 else: environment_score = 5 if oi_value is not None and oi_value > 0: self.prev_oi_value = oi_value return { "cvd_fast": cvd_fast, "cvd_mid": cvd_mid, "cvd_day": cvd_day, "vwap": vwap, "atr": atr, "atr_value": atr, # V5.3: ATR绝对值快照(用于feature_events落库) "atr_pct": atr_pct, "p95": p95, "p99": p99, "price": price, "cvd_fast_slope": cvd_fast_slope, "cvd_fast_accel": cvd_fast_accel, "oi_change": oi_change, "environment_score": environment_score, "oi_value": oi_value, } def fetch_recent_liquidations(self, window_ms: int = 300000): """Fetch last 5min liquidation totals from liquidations table""" now_ms = int(time.time() * 1000) cutoff = now_ms - window_ms with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT COALESCE(SUM(CASE WHEN side='LONG' THEN usd_value ELSE 0 END), 0) as long_liq, COALESCE(SUM(CASE WHEN side='SHORT' THEN usd_value ELSE 0 END), 0) as short_liq FROM liquidations WHERE symbol=%s AND trade_time >= %s """, (self.symbol, cutoff), ) row = cur.fetchone() if row: return {"long_usd": row[0], "short_usd": row[1]} return None def evaluate_signal(self, now_ms: int, strategy_cfg: Optional[dict] = None, snapshot: Optional[dict] = None) -> dict: strategy_cfg = strategy_cfg or {} strategy_name = strategy_cfg.get("name", "v51_baseline") track = strategy_cfg.get("track", "ALT") # ─── Track Router ─────────────────────────────────────────── # V5.3策略按track路由到专属评分逻辑 # v53_alt → ALT轨(ETH/XRP/SOL),55/25/15/5,删确认层 # v53_btc → BTC轨,gate-control逻辑 # v51/v52 → 原有代码路径(兼容,不修改) if track == "ALT" and strategy_name.startswith("v53"): # 检查symbol限制(v53_alt只跑ALT,不跑BTC) allowed_symbols = strategy_cfg.get("symbols", []) if allowed_symbols and self.symbol not in allowed_symbols: snap = snapshot or self.build_evaluation_snapshot(now_ms) return self._empty_result(strategy_name, snap) return self._evaluate_v53_alt(now_ms, strategy_cfg, snapshot) if track == "BTC" and strategy_name.startswith("v53"): # v53_btc只跑BTC if self.symbol != "BTCUSDT": snap = snapshot or self.build_evaluation_snapshot(now_ms) return self._empty_result(strategy_name, snap) return self._evaluate_v53_btc(now_ms, strategy_cfg, snapshot) # ─── 原有V5.1/V5.2评分逻辑(保持不变)──────────────────────── strategy_cfg = strategy_cfg or {} strategy_name = strategy_cfg.get("name", "v51_baseline") strategy_threshold = int(strategy_cfg.get("threshold", 75)) enabled_signals = set(strategy_cfg.get("signals", [])) snap = snapshot or self.build_evaluation_snapshot(now_ms) cvd_fast = snap["cvd_fast"] cvd_mid = snap["cvd_mid"] vwap = snap["vwap"] atr = snap["atr"] atr_pct = snap["atr_pct"] p95 = snap["p95"] p99 = snap["p99"] price = snap["price"] cvd_fast_slope = snap["cvd_fast_slope"] cvd_fast_accel = snap["cvd_fast_accel"] oi_change = snap["oi_change"] environment_score = snap["environment_score"] result = { "strategy": strategy_name, "cvd_fast": cvd_fast, "cvd_mid": cvd_mid, "cvd_day": snap["cvd_day"], "cvd_fast_slope": cvd_fast_slope, "atr": atr, "atr_pct": atr_pct, "vwap": vwap, "price": price, "p95": p95, "p99": p99, "signal": None, "direction": None, "score": 0, "tier": None, "factors": {}, } if self.warmup or price == 0 or atr == 0: return result # 判断倾向方向(用于评分展示,即使冷却或方向不一致也计算) no_direction = False last_signal_ts = self.last_signal_ts.get(strategy_name, 0) in_cooldown = now_ms - last_signal_ts < COOLDOWN_MS if cvd_fast > 0 and cvd_mid > 0: direction = "LONG" elif cvd_fast < 0 and cvd_mid < 0: direction = "SHORT" else: direction = "LONG" if cvd_fast > 0 else "SHORT" no_direction = True # V5.1 五层评分体系(总分100,方向层可因加速额外+5) # 1) 方向层(45分 + 5加速分) direction_score = 0 if (direction == "LONG" and cvd_fast > 0) or (direction == "SHORT" and cvd_fast < 0): direction_score += 15 if (direction == "LONG" and cvd_mid > 0) or (direction == "SHORT" and cvd_mid < 0): direction_score += 15 has_adverse_p99 = any( (direction == "LONG" and lt[2] == 1) or (direction == "SHORT" and lt[2] == 0) for lt in self.recent_large_trades ) has_aligned_p99 = any( (direction == "LONG" and lt[2] == 0) or (direction == "SHORT" and lt[2] == 1) for lt in self.recent_large_trades ) if has_aligned_p99: direction_score += 15 elif not has_adverse_p99: direction_score += 10 accel_bonus = 0 if "accel" in enabled_signals and ( (direction == "LONG" and cvd_fast_accel > 0) or (direction == "SHORT" and cvd_fast_accel < 0) ): accel_bonus = int(strategy_cfg.get("accel_bonus", 5)) # 2) 拥挤层(20分)- market_indicators缺失时给中间分 long_short_ratio = to_float(self.market_indicators.get("long_short_ratio")) if long_short_ratio is None: ls_score = 5 elif (direction == "SHORT" and long_short_ratio > 2.0) or (direction == "LONG" and long_short_ratio < 0.5): ls_score = 10 else: ls_score = 5 top_trader_position = to_float(self.market_indicators.get("top_trader_position")) if top_trader_position is None: top_trader_score = 5 else: if direction == "LONG": if top_trader_position >= 0.55: top_trader_score = 10 elif top_trader_position <= 0.45: top_trader_score = 0 else: top_trader_score = 5 else: if top_trader_position <= 0.45: top_trader_score = 10 elif top_trader_position >= 0.55: top_trader_score = 0 else: top_trader_score = 5 crowding_score = ls_score + top_trader_score # Funding Rate scoring (拥挤层加分) # Read from market_indicators table funding_rate = to_float(self.market_indicators.get("funding_rate")) fr_score = 0.0 if "funding_rate" in enabled_signals and funding_rate is not None and funding_rate != 0: max_fr = get_max_fr(self.symbol) fr_abs = abs(funding_rate) # 线性映射: raw = (|当前FR| / 历史最大FR) × 5, clamp到5 raw_score = min(fr_abs / max_fr * 5.0, 5.0) # FR评估"资金费率对当前方向有多有利",0~5分,不扣分 # 做多+FR负(空头付费给多头)=有利 → 给分 # 做空+FR正(多头付费给空头)=有利 → 给分 # 其他情况(FR方向不利) → 0分 if (direction == "LONG" and funding_rate < 0) or (direction == "SHORT" and funding_rate > 0): fr_score = round(raw_score, 2) # 有利,给分 else: fr_score = 0.0 # 不利,不给分也不扣分 # 4) 确认层(15分) confirmation_score = 15 if ( (direction == "LONG" and cvd_fast > 0 and cvd_mid > 0) or (direction == "SHORT" and cvd_fast < 0 and cvd_mid < 0) ) else 0 # Liquidation scoring (确认层加分) liq_score = 0 liq_data = None if "liquidation" in enabled_signals: liq_data = self.fetch_recent_liquidations() if liq_data: liq_long_usd = liq_data.get("long_usd", 0) liq_short_usd = liq_data.get("short_usd", 0) total = liq_long_usd + liq_short_usd if total > 0: # ratio = 空头清算 / 多头清算 # ratio高 = 空头爆仓多 = 有利于做多 if liq_long_usd > 0: ratio = liq_short_usd / liq_long_usd else: ratio = float("inf") if liq_short_usd > 0 else 1.0 # 梯度评分:ratio越极端分越高 def liq_ratio_to_score(r: float) -> int: if r >= 2.0: return 5 if r >= 1.5: return 3 if r >= 1.2: return 1 return 0 if direction == "LONG": # 做多:空头爆仓多(ratio高)有利 liq_score = liq_ratio_to_score(ratio) else: # 做空:多头爆仓多(ratio低=1/ratio高)有利 inv_ratio = (1.0 / ratio) if ratio > 0 else float("inf") liq_score = liq_ratio_to_score(inv_ratio) # 5) 辅助层(5分) coinbase_premium = to_float(self.market_indicators.get("coinbase_premium")) if coinbase_premium is None: aux_score = 2 elif (direction == "LONG" and coinbase_premium > 0.0005) or (direction == "SHORT" and coinbase_premium < -0.0005): aux_score = 5 elif abs(coinbase_premium) <= 0.0005: aux_score = 2 else: aux_score = 0 # 按策略配置的权重缩放各层分数 weights = strategy_cfg.get("weights", {}) w_direction = weights.get("direction", 45) w_crowding = weights.get("crowding", 20) w_fr = weights.get("funding_rate", 0) w_environment = weights.get("environment", 15) w_confirmation = weights.get("confirmation", 15) w_liq = weights.get("liquidation", 0) w_auxiliary = weights.get("auxiliary", 5) # 原始评分范围: direction 0~45, crowding 0~20, environment 0~15, confirmation 0~15, auxiliary 0~5 # FR 0~5 (or -5~5), Liq 0~5 # 缩放到配置权重 scaled_direction = min(round(direction_score / 45 * w_direction) + accel_bonus, w_direction) scaled_crowding = round(crowding_score / 20 * w_crowding) scaled_fr = fr_score if w_fr > 0 else 0 # FR already 0~5 range, matches w_fr=5 scaled_environment = round(environment_score / 15 * w_environment) scaled_confirmation = round(confirmation_score / 15 * w_confirmation) scaled_liq = liq_score if w_liq > 0 else 0 # Liq already 0~5 range, matches w_liq=5 scaled_auxiliary = round(aux_score / 5 * w_auxiliary) total_score = scaled_direction + scaled_crowding + scaled_fr + scaled_environment + scaled_confirmation + scaled_liq + scaled_auxiliary total_score = max(0, min(round(total_score, 1), 100)) # clamp 0~100, 保留1位小数 result["score"] = total_score result["direction"] = direction result["factors"] = { "direction": { "score": scaled_direction, "max": w_direction, "cvd_fast": 15 if ((direction == "LONG" and cvd_fast > 0) or (direction == "SHORT" and cvd_fast < 0)) else 0, "cvd_mid": 15 if ((direction == "LONG" and cvd_mid > 0) or (direction == "SHORT" and cvd_mid < 0)) else 0, "p99_flow": 15 if has_aligned_p99 else (10 if not has_adverse_p99 else 0), "accel_bonus": accel_bonus, }, "crowding": {"score": scaled_crowding, "max": w_crowding, "long_short_ratio": ls_score, "top_trader_position": top_trader_score}, "environment": {"score": scaled_environment, "max": w_environment, "open_interest_hist": oi_change}, "confirmation": {"score": scaled_confirmation, "max": w_confirmation}, "auxiliary": {"score": scaled_auxiliary, "max": w_auxiliary, "coinbase_premium": coinbase_premium}, "funding_rate": {"score": round(scaled_fr, 2), "max": w_fr, "value": funding_rate, "max_fr_hist": get_max_fr(self.symbol)}, "liquidation": { "score": scaled_liq, "max": w_liq, "long_usd": liq_data.get("long_usd", 0) if liq_data else 0, "short_usd": liq_data.get("short_usd", 0) if liq_data else 0, }, } # 始终输出direction供反向平仓判断(不受冷却限制) result["direction"] = direction if not no_direction else None heavy_threshold = max(strategy_threshold + 10, 85) if total_score >= heavy_threshold and not no_direction and not in_cooldown: result["signal"] = direction result["tier"] = "heavy" elif total_score >= strategy_threshold and not no_direction and not in_cooldown: result["signal"] = direction result["tier"] = "standard" else: result["signal"] = None result["tier"] = None if result["signal"]: self.last_signal_ts[strategy_name] = now_ms self.last_signal_dir[strategy_name] = direction return result def _empty_result(self, strategy_name: str, snap: dict) -> dict: """返回空评分结果(symbol不匹配track时使用)""" return { "strategy": strategy_name, "cvd_fast": snap["cvd_fast"], "cvd_mid": snap["cvd_mid"], "cvd_day": snap["cvd_day"], "cvd_fast_slope": snap["cvd_fast_slope"], "atr": snap["atr"], "atr_value": snap.get("atr_value", snap["atr"]), "atr_pct": snap["atr_pct"], "vwap": snap["vwap"], "price": snap["price"], "p95": snap["p95"], "p99": snap["p99"], "signal": None, "direction": None, "score": 0, "tier": None, "factors": {}, } def _evaluate_v53_alt(self, now_ms: int, strategy_cfg: dict, snapshot: Optional[dict] = None) -> dict: """ V5.3 ALT轨评分(ETH/XRP/SOL) 权重:Direction 55 | Crowding 25 | Environment 15 | Auxiliary 5 删除独立确认层,解决CVD双重计分问题 """ strategy_name = strategy_cfg.get("name", "v53_alt") strategy_threshold = int(strategy_cfg.get("threshold", 75)) flip_threshold = int(strategy_cfg.get("flip_threshold", 85)) snap = snapshot or self.build_evaluation_snapshot(now_ms) cvd_fast = snap["cvd_fast"] cvd_mid = snap["cvd_mid"] price = snap["price"] atr = snap["atr"] atr_value = snap.get("atr_value", atr) atr_pct = snap["atr_pct"] cvd_fast_accel = snap["cvd_fast_accel"] environment_score_raw = snap["environment_score"] result = self._empty_result(strategy_name, snap) if self.warmup or price == 0 or atr == 0: return result last_signal_ts = self.last_signal_ts.get(strategy_name, 0) in_cooldown = now_ms - last_signal_ts < COOLDOWN_MS # ── Direction Layer(55分)────────────────────────────────── # cvd_resonance(30分):fast+mid同向 = 有效方向 if cvd_fast > 0 and cvd_mid > 0: direction = "LONG" cvd_resonance = 30 no_direction = False elif cvd_fast < 0 and cvd_mid < 0: direction = "SHORT" cvd_resonance = 30 no_direction = False else: direction = "LONG" if cvd_fast > 0 else "SHORT" cvd_resonance = 0 no_direction = True # gate: 方向不一致 → 直接不开仓 # p99_flow_alignment(0/10/20分) has_adverse_p99 = any( (direction == "LONG" and lt[2] == 1) or (direction == "SHORT" and lt[2] == 0) for lt in self.recent_large_trades ) has_aligned_p99 = any( (direction == "LONG" and lt[2] == 0) or (direction == "SHORT" and lt[2] == 1) for lt in self.recent_large_trades ) if has_aligned_p99: p99_flow = 20 elif not has_adverse_p99: p99_flow = 10 else: p99_flow = 0 # cvd_accel_bonus(0/5分) accel_bonus = 5 if ( (direction == "LONG" and cvd_fast_accel > 0) or (direction == "SHORT" and cvd_fast_accel < 0) ) else 0 direction_score = min(cvd_resonance + p99_flow + accel_bonus, 55) # ── Crowding Layer(25分)────────────────────────────────── long_short_ratio = to_float(self.market_indicators.get("long_short_ratio")) if long_short_ratio is None: ls_score = 7 # 缺失给中间分 elif (direction == "SHORT" and long_short_ratio > 2.0) or (direction == "LONG" and long_short_ratio < 0.5): ls_score = 15 elif (direction == "SHORT" and long_short_ratio > 1.5) or (direction == "LONG" and long_short_ratio < 0.7): ls_score = 10 elif (direction == "SHORT" and long_short_ratio > 1.0) or (direction == "LONG" and long_short_ratio < 1.0): ls_score = 7 else: ls_score = 0 top_trader_position = to_float(self.market_indicators.get("top_trader_position")) if top_trader_position is None: top_trader_score = 5 else: if direction == "LONG": top_trader_score = 10 if top_trader_position >= 0.55 else (0 if top_trader_position <= 0.45 else 5) else: top_trader_score = 10 if top_trader_position <= 0.45 else (0 if top_trader_position >= 0.55 else 5) crowding_score = min(ls_score + top_trader_score, 25) # ── Environment Layer(15分)──────────────────────────────── environment_score = round(environment_score_raw / 15 * 15) # 已是0~15 # ── Auxiliary Layer(5分)────────────────────────────────── coinbase_premium = to_float(self.market_indicators.get("coinbase_premium")) if coinbase_premium is None: aux_score = 2 elif (direction == "LONG" and coinbase_premium > 0.0005) or (direction == "SHORT" and coinbase_premium < -0.0005): aux_score = 5 elif abs(coinbase_premium) <= 0.0005: aux_score = 2 else: aux_score = 0 total_score = min(direction_score + crowding_score + environment_score + aux_score, 100) total_score = max(0, round(total_score, 1)) result.update({ "score": total_score, "direction": direction if not no_direction else None, "atr_value": atr_value, "factors": { "track": "ALT", "direction": { "score": direction_score, "max": 55, "cvd_resonance": cvd_resonance, "p99_flow": p99_flow, "accel_bonus": accel_bonus, }, "crowding": { "score": crowding_score, "max": 25, "lsr_contrarian": ls_score, "top_trader_position": top_trader_score, }, "environment": {"score": environment_score, "max": 15}, "auxiliary": {"score": aux_score, "max": 5, "coinbase_premium": coinbase_premium}, }, }) if not no_direction and not in_cooldown: if total_score >= flip_threshold: result["signal"] = direction result["tier"] = "heavy" elif total_score >= strategy_threshold: result["signal"] = direction result["tier"] = "standard" if result["signal"]: self.last_signal_ts[strategy_name] = now_ms self.last_signal_dir[strategy_name] = direction return result def _evaluate_v53_btc(self, now_ms: int, strategy_cfg: dict, snapshot: Optional[dict] = None) -> dict: """ V5.3 BTC轨评分(gate-control逻辑) 不用线性总分,用条件门控+否决条件决定是否开仓 新特征(Phase 2采集后启用):tiered_cvd_whale, obi_depth_10, spot_perp_divergence 当前Phase1版本:用已有特征填充,为数据接入预留扩展点 """ strategy_name = strategy_cfg.get("name", "v53_btc") btc_gate = strategy_cfg.get("btc_gate", {}) min_vol = btc_gate.get("min_vol_threshold", 0.002) obi_veto = btc_gate.get("obi_veto_threshold", 0.30) spot_perp_veto = btc_gate.get("spot_perp_divergence_veto", 0.003) snap = snapshot or self.build_evaluation_snapshot(now_ms) cvd_fast = snap["cvd_fast"] cvd_mid = snap["cvd_mid"] price = snap["price"] atr = snap["atr"] atr_value = snap.get("atr_value", atr) atr_pct = snap["atr_pct"] result = self._empty_result(strategy_name, snap) if self.warmup or price == 0 or atr == 0: return result last_signal_ts = self.last_signal_ts.get(strategy_name, 0) in_cooldown = now_ms - last_signal_ts < COOLDOWN_MS block_reason = None # Gate 1: 波动率门控(atr_percent_1h = atr/price) atr_pct_price = atr / price if price > 0 else 0 if atr_pct_price < min_vol: block_reason = f"low_vol_regime({atr_pct_price:.4f}<{min_vol})" # Gate 2: 方向门控(CVD共振,BTC需要更严格) if not block_reason: if cvd_fast > 0 and cvd_mid > 0: direction = "LONG" elif cvd_fast < 0 and cvd_mid < 0: direction = "SHORT" else: block_reason = "no_direction_consensus" direction = "LONG" if cvd_fast > 0 else "SHORT" else: direction = "LONG" if cvd_fast > 0 else "SHORT" # Gate 3: OBI否决(Phase2接入obi_depth_10后生效) obi_raw = to_float(self.market_indicators.get("obi_depth_10")) if not block_reason and obi_raw is not None: # obi_raw: 正值=买单占优,负值=卖单占优,[-1,1] if direction == "LONG" and obi_raw < -obi_veto: block_reason = f"obi_imbalance_veto(obi={obi_raw:.3f})" elif direction == "SHORT" and obi_raw > obi_veto: block_reason = f"obi_imbalance_veto(obi={obi_raw:.3f})" # Gate 4: 期现背离否决(Phase2接入spot_perp_divergence后生效) spot_perp_div = to_float(self.market_indicators.get("spot_perp_divergence")) if not block_reason and spot_perp_div is not None: # spot_perp_div: 绝对背离率,如0.005=0.5% if abs(spot_perp_div) > spot_perp_veto: # 背离方向与信号方向相反时否决 if (direction == "LONG" and spot_perp_div < -spot_perp_veto) or \ (direction == "SHORT" and spot_perp_div > spot_perp_veto): block_reason = f"spot_perp_divergence_veto({spot_perp_div:.4f})" # 所有门控通过后,用ALT评分作为BTC综合评分(暂用,待Phase2换专属特征) gate_passed = block_reason is None # 复用ALT评分作为参考分(不影响门控决策,仅供记录) alt_result = self._evaluate_v53_alt(now_ms, strategy_cfg, snap) total_score = alt_result["score"] if gate_passed else 0 result.update({ "score": total_score, "direction": direction if gate_passed else None, "atr_value": atr_value, "factors": { "track": "BTC", "gate_passed": gate_passed, "block_reason": block_reason, "atr_pct_price": round(atr_pct_price, 5), "obi_raw": obi_raw, "spot_perp_div": spot_perp_div, "alt_score_ref": alt_result["score"], }, }) strategy_threshold = int(strategy_cfg.get("threshold", 75)) if gate_passed and not in_cooldown and total_score >= strategy_threshold: result["signal"] = direction result["tier"] = "standard" if result["signal"]: self.last_signal_ts[strategy_name] = now_ms self.last_signal_dir[strategy_name] = direction return result # ─── PG DB操作 ─────────────────────────────────────────────────── def load_historical(state: SymbolState, window_ms: int): now_ms = int(time.time() * 1000) start_ms = now_ms - window_ms count = 0 with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades " "WHERE symbol = %s AND time_ms >= %s ORDER BY agg_id ASC", (state.symbol, start_ms) ) while True: rows = cur.fetchmany(5000) if not rows: break for r in rows: state.process_trade(r[0], r[3], r[1], r[2], r[4]) count += 1 logger.info(f"[{state.symbol}] 冷启动完成: 加载{count:,}条历史数据 (窗口={window_ms//3600000}h)") state.warmup = False def fetch_new_trades(symbol: str, last_id: int) -> list: with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades " "WHERE symbol = %s AND agg_id > %s ORDER BY agg_id ASC LIMIT 10000", (symbol, last_id) ) return [{"agg_id": r[0], "price": r[1], "qty": r[2], "time_ms": r[3], "is_buyer_maker": r[4]} for r in cur.fetchall()] def save_indicator(ts: int, symbol: str, result: dict, strategy: str = "v52_8signals"): with get_sync_conn() as conn: with conn.cursor() as cur: import json as _json3 factors_json = _json3.dumps(result.get("factors")) if result.get("factors") else None cur.execute( "INSERT INTO signal_indicators " "(ts,symbol,strategy,cvd_fast,cvd_mid,cvd_day,cvd_fast_slope,atr_5m,atr_percentile,atr_value,vwap_30m,price,p95_qty,p99_qty,score,signal,factors) " "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", (ts, symbol, strategy, result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["cvd_fast_slope"], result["atr"], result["atr_pct"], result.get("atr_value", result["atr"]), result["vwap"], result["price"], result["p95"], result["p99"], result["score"], result.get("signal"), factors_json) ) # 有信号时通知live_executor if result.get("signal"): cur.execute("NOTIFY new_signal, %s", (f"{symbol}:{strategy}:{result['signal']}:{result['score']}",)) conn.commit() def save_indicator_1m(ts: int, symbol: str, result: dict): bar_ts = (ts // 60000) * 60000 with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute("SELECT id FROM signal_indicators_1m WHERE ts=%s AND symbol=%s", (bar_ts, symbol)) if cur.fetchone(): cur.execute( "UPDATE signal_indicators_1m SET cvd_fast=%s,cvd_mid=%s,cvd_day=%s,atr_5m=%s,vwap_30m=%s,price=%s,score=%s,signal=%s WHERE ts=%s AND symbol=%s", (result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["atr"], result["vwap"], result["price"], result["score"], result.get("signal"), bar_ts, symbol) ) else: cur.execute( "INSERT INTO signal_indicators_1m (ts,symbol,cvd_fast,cvd_mid,cvd_day,atr_5m,vwap_30m,price,score,signal) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", (bar_ts, symbol, result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["atr"], result["vwap"], result["price"], result["score"], result.get("signal")) ) conn.commit() # ─── 模拟盘 ────────────────────────────────────────────────────── def paper_open_trade( symbol: str, direction: str, price: float, score: int, tier: str, atr: float, now_ms: int, factors: dict = None, strategy: str = "v51_baseline", tp_sl: Optional[dict] = None, ): """模拟开仓""" import json as _json3 if atr <= 0: return sl_multiplier = float((tp_sl or {}).get("sl_multiplier", 2.0)) tp1_multiplier = float((tp_sl or {}).get("tp1_multiplier", 1.5)) tp2_multiplier = float((tp_sl or {}).get("tp2_multiplier", 3.0)) risk_distance = sl_multiplier * atr # 1R = SL距离 = sl_multiplier × ATR if direction == "LONG": sl = price - sl_multiplier * atr tp1 = price + tp1_multiplier * atr tp2 = price + tp2_multiplier * atr else: sl = price + sl_multiplier * atr tp1 = price - tp1_multiplier * atr tp2 = price - tp2_multiplier * atr with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( "INSERT INTO paper_trades (symbol,direction,score,tier,entry_price,entry_ts,tp1_price,tp2_price,sl_price,atr_at_entry,score_factors,strategy,risk_distance) " "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", ( symbol, direction, score, tier, price, now_ms, tp1, tp2, sl, atr, _json3.dumps(factors) if factors else None, strategy, risk_distance, ), ) conn.commit() logger.info( f"[{symbol}] 📝 模拟开仓: {direction} @ {price:.2f} score={score} tier={tier} strategy={strategy} " f"TP1={tp1:.2f} TP2={tp2:.2f} SL={sl:.2f}" ) def paper_has_active_position(symbol: str, strategy: Optional[str] = None) -> bool: """检查该币种是否有活跃持仓""" with get_sync_conn() as conn: with conn.cursor() as cur: if strategy: cur.execute( "SELECT COUNT(*) FROM paper_trades WHERE symbol=%s AND strategy=%s AND status IN ('active','tp1_hit')", (symbol, strategy), ) else: cur.execute("SELECT COUNT(*) FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')", (symbol,)) return cur.fetchone()[0] > 0 def paper_get_active_direction(symbol: str, strategy: Optional[str] = None) -> str | None: """获取该币种活跃持仓的方向,无持仓返回None""" with get_sync_conn() as conn: with conn.cursor() as cur: if strategy: cur.execute( "SELECT direction FROM paper_trades WHERE symbol=%s AND strategy=%s AND status IN ('active','tp1_hit') LIMIT 1", (symbol, strategy), ) else: cur.execute( "SELECT direction FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit') LIMIT 1", (symbol,), ) row = cur.fetchone() return row[0] if row else None def paper_close_by_signal(symbol: str, current_price: float, now_ms: int, strategy: Optional[str] = None): """反向信号平仓:按当前价平掉该币种所有活跃仓位""" with get_sync_conn() as conn: with conn.cursor() as cur: if strategy: cur.execute( "SELECT id, direction, entry_price, tp1_hit, atr_at_entry, risk_distance " "FROM paper_trades WHERE symbol=%s AND strategy=%s AND status IN ('active','tp1_hit')", (symbol, strategy), ) else: cur.execute( "SELECT id, direction, entry_price, tp1_hit, atr_at_entry, risk_distance " "FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')", (symbol,), ) positions = cur.fetchall() for pos in positions: pid, direction, entry_price, tp1_hit, atr_entry, rd_db = pos risk_distance = rd_db if rd_db and rd_db > 0 else abs(entry_price * 0.01) if direction == "LONG": pnl_r = (current_price - entry_price) / risk_distance if risk_distance > 0 else 0 else: pnl_r = (entry_price - current_price) / risk_distance if risk_distance > 0 else 0 # 扣手续费 fee_r = (2 * PAPER_FEE_RATE * entry_price) / risk_distance if risk_distance > 0 else 0 pnl_r -= fee_r cur.execute( "UPDATE paper_trades SET status='signal_flip', exit_price=%s, exit_ts=%s, pnl_r=%s WHERE id=%s", (current_price, now_ms, round(pnl_r, 4), pid) ) logger.info( f"[{symbol}] 📝 反向信号平仓: {direction} @ {current_price:.2f} pnl={pnl_r:+.2f}R" f"{f' strategy={strategy}' if strategy else ''}" ) conn.commit() def paper_active_count(strategy: Optional[str] = None) -> int: """当前活跃持仓总数(按策略独立计数)""" with get_sync_conn() as conn: with conn.cursor() as cur: if strategy: cur.execute("SELECT COUNT(*) FROM paper_trades WHERE strategy=%s AND status IN ('active','tp1_hit')", (strategy,)) else: cur.execute("SELECT COUNT(*) FROM paper_trades WHERE status IN ('active','tp1_hit')") return cur.fetchone()[0] # ─── 主循环 ────────────────────────────────────────────────────── def main(): init_schema() strategy_configs = load_strategy_configs() strategy_names = [cfg.get("name", "unknown") for cfg in strategy_configs] logger.info(f"已加载策略配置: {', '.join(strategy_names)}") primary_strategy_name = "v52_8signals" if any(cfg.get("name") == "v52_8signals" for cfg in strategy_configs) else strategy_names[0] states = {sym: SymbolState(sym) for sym in SYMBOLS} for sym, state in states.items(): load_historical(state, WINDOW_MID) logger.info("=== Signal Engine (PG) 启动完成 ===") last_1m_save = {} cycle = 0 warmup_cycles = 3 # 启动后跳过前3轮(45秒),避免冷启动信号开仓 while True: try: now_ms = int(time.time() * 1000) # 每轮重新加载配置(支持API热更新开关) load_paper_config() for sym, state in states.items(): new_trades = fetch_new_trades(sym, state.last_processed_id) for t in new_trades: state.process_trade(t["agg_id"], t["time_ms"], t["price"], t["qty"], t["is_buyer_maker"]) state.market_indicators = fetch_market_indicators(sym) snapshot = state.build_evaluation_snapshot(now_ms) strategy_results: list[tuple[dict, dict]] = [] for strategy_cfg in strategy_configs: strategy_result = state.evaluate_signal(now_ms, strategy_cfg=strategy_cfg, snapshot=snapshot) strategy_results.append((strategy_cfg, strategy_result)) # 每个策略独立存储indicator for strategy_cfg, strategy_result in strategy_results: sname = strategy_cfg.get("name", "v51_baseline") save_indicator(now_ms, sym, strategy_result, strategy=sname) # 1m表仍用primary(图表用) primary_result = strategy_results[0][1] for strategy_cfg, strategy_result in strategy_results: if strategy_cfg.get("name") == primary_strategy_name: primary_result = strategy_result break bar_1m = (now_ms // 60000) * 60000 if last_1m_save.get(sym) != bar_1m: save_indicator_1m(now_ms, sym, primary_result) last_1m_save[sym] = bar_1m # 反向信号平仓:按策略独立判断,score>=75才触发 if warmup_cycles <= 0: for strategy_cfg, result in strategy_results: strategy_name = strategy_cfg.get("name", "v51_baseline") if not is_strategy_enabled(strategy_name): continue eval_dir = result.get("direction") existing_dir = paper_get_active_direction(sym, strategy_name) if existing_dir and eval_dir and existing_dir != eval_dir and result["score"] >= 75: paper_close_by_signal(sym, result["price"], now_ms, strategy_name) logger.info( f"[{sym}] 📝 反向信号平仓[{strategy_name}]: {existing_dir} → {eval_dir} " f"(score={result['score']})" ) for strategy_cfg, result in strategy_results: strategy_name = strategy_cfg.get("name", "v51_baseline") if result.get("signal"): logger.info( f"[{sym}] 🚨 信号[{strategy_name}]: {result['signal']} " f"score={result['score']} price={result['price']:.1f}" ) # 模拟盘开仓(需该策略启用 + 跳过冷启动) if is_strategy_enabled(strategy_name) and warmup_cycles <= 0: if not paper_has_active_position(sym, strategy_name): active_count = paper_active_count(strategy_name) if active_count < PAPER_MAX_POSITIONS: tier = result.get("tier", "standard") paper_open_trade( sym, result["signal"], result["price"], result["score"], tier, result["atr"], now_ms, factors=result.get("factors"), strategy=strategy_name, tp_sl=strategy_cfg.get("tp_sl"), ) # 模拟盘持仓检查由paper_monitor.py通过WebSocket实时处理,这里不再检查 cycle += 1 if warmup_cycles > 0: warmup_cycles -= 1 if warmup_cycles == 0: logger.info("冷启动保护期结束,模拟盘开仓已启用") # 每60轮(约15分钟)热加载配置,不需要重启 if cycle % 60 == 0: old_strategies = list(PAPER_ENABLED_STRATEGIES) load_paper_config() strategy_configs = load_strategy_configs() # A1: 热重载权重/阈值/TP/SL strategy_names = [cfg.get("name", "unknown") for cfg in strategy_configs] primary_strategy_name = "v52_8signals" if any(cfg.get("name") == "v52_8signals" for cfg in strategy_configs) else strategy_names[0] if list(PAPER_ENABLED_STRATEGIES) != old_strategies: logger.info(f"📋 配置热加载: enabled_strategies={PAPER_ENABLED_STRATEGIES}") for sym, state in states.items(): logger.info( f"[{sym}] 状态: CVD_fast={state.win_fast.cvd:.1f} " f"CVD_mid={state.win_mid.cvd:.1f} ATR={state.atr_calc.atr:.2f} " f"({state.atr_calc.atr_percentile:.0f}%) VWAP={state.win_vwap.vwap:.1f}" ) except Exception as e: logger.error(f"循环异常: {e}", exc_info=True) time.sleep(LOOP_INTERVAL) if __name__ == "__main__": main()