""" signal_engine.py — V5 短线交易信号引擎(PostgreSQL版) 架构: - 独立PM2进程,每5秒循环 - 内存滚动窗口计算指标(CVD/ATR/VWAP/大单阈值) - 启动时回灌历史数据(冷启动warmup) - 信号评估:核心3条件+加分3条件 - 输出:signal_indicators表 + signal_trades表 + Discord推送 指标: - CVD_fast (30m滚动) / CVD_mid (4h滚动) / CVD_day (UTC日内) - ATR (5m, 14周期) - VWAP_30m - 大单阈值 P95/P99 (24h滚动) """ import logging import os import time import json import threading import asyncio from collections import deque from datetime import datetime, timezone from typing import Any, Optional import websockets from db import get_sync_conn, init_schema logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ logging.StreamHandler(), logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "signal-engine.log")), ], ) logger = logging.getLogger("signal-engine") SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"] LOOP_INTERVAL = 15 # 秒(从5改15,CPU降60%,信号质量无影响) STRATEGY_DIR = os.path.join(os.path.dirname(__file__), "strategies") DEFAULT_STRATEGY_FILES = ["v51_baseline.json", "v52_8signals.json", "v53.json", "v53_fast.json", "v53_middle.json"] def load_strategy_configs() -> list[dict]: configs = [] for filename in DEFAULT_STRATEGY_FILES: path = os.path.join(STRATEGY_DIR, filename) try: with open(path, "r", encoding="utf-8") as f: cfg = json.load(f) if isinstance(cfg, dict) and cfg.get("name"): configs.append(cfg) except FileNotFoundError: logger.warning(f"策略配置缺失: {path}") except Exception as e: logger.error(f"策略配置加载失败 {path}: {e}") if not configs: logger.warning("未加载到策略配置,回退到v51_baseline默认配置") configs.append( { "name": "v51_baseline", "threshold": 75, "signals": ["cvd", "p99", "accel", "ls_ratio", "oi", "coinbase_premium"], "tp_sl": {"sl_multiplier": 2.0, "tp1_multiplier": 1.5, "tp2_multiplier": 3.0}, } ) return configs def load_strategy_configs_from_db() -> list[dict]: """ V5.4: 从 strategies 表读取 running 状态的策略配置。 把 DB 字段映射成现有 JSON 格式(保持与 JSON 文件完全兼容)。 失败时返回空列表,调用方应 fallback 到 JSON。 内存安全:每次读取只返回配置列表,无缓存,无大对象。 """ try: with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute(""" SELECT strategy_id::text, display_name, symbol, cvd_fast_window, cvd_slow_window, weight_direction, weight_env, weight_aux, weight_momentum, entry_score, gate_obi_enabled, obi_threshold, gate_whale_enabled, whale_usd_threshold, whale_flow_pct, gate_vol_enabled, vol_atr_pct_min, gate_cvd_enabled, gate_spot_perp_enabled, spot_perp_threshold, sl_atr_multiplier, tp1_ratio, tp2_ratio, timeout_minutes, flip_threshold, direction FROM strategies WHERE status = 'running' ORDER BY created_at ASC """) rows = cur.fetchall() configs = [] for row in rows: (sid, display_name, symbol, cvd_fast, cvd_slow, w_dir, w_env, w_aux, w_mom, entry_score, gate_obi, obi_thr, gate_whale, whale_usd_thr, whale_flow_pct_val, gate_vol, vol_atr_pct, gate_cvd, gate_spot, spot_thr, sl_mult, tp1_r, tp2_r, timeout_min, flip_thr, direction) = row # 把 display_name 映射回 legacy strategy name(用于兼容评分逻辑) # legacy 策略用固定 UUID 识别 LEGACY_UUID_MAP = { "00000000-0000-0000-0000-000000000053": "v53", "00000000-0000-0000-0000-000000000054": "v53_middle", "00000000-0000-0000-0000-000000000055": "v53_fast", } strategy_name = LEGACY_UUID_MAP.get(sid, f"custom_{sid[:8]}") # 构造与 JSON 文件格式兼容的配置 dict cfg = { "name": strategy_name, "strategy_id": sid, # V5.4 新增:用于写 strategy_id 到 DB "strategy_name_snapshot": display_name, # V5.4 新增:写入时快照名称 "symbol": symbol, "direction": direction, "cvd_fast_window": cvd_fast, "cvd_slow_window": cvd_slow, "threshold": entry_score, "weights": { "direction": w_dir, "env": w_env, "aux": w_aux, "momentum": w_mom, }, "gates": { # 门1 波动率 "vol": {"enabled": gate_vol, "vol_atr_pct_min": float(vol_atr_pct or 0.002)}, # 门2 CVD共振 "cvd": {"enabled": gate_cvd}, # 门3 鲸鱼否决 "whale": {"enabled": gate_whale, "whale_usd_threshold": float(whale_usd_thr or 50000), "whale_flow_pct": float(whale_flow_pct_val or 0.5)}, # 门4 OBI否决 "obi": {"enabled": gate_obi, "threshold": float(obi_thr or 0.35)}, # 门5 期现背离 "spot_perp": {"enabled": gate_spot, "threshold": float(spot_thr or 0.005)}, }, "tp_sl": { "sl_multiplier": sl_mult, "tp1_multiplier": tp1_r, "tp2_multiplier": tp2_r, }, "timeout_minutes": timeout_min, "flip_threshold": flip_thr, } configs.append(cfg) logger.info(f"[DB] 已加载 {len(configs)} 个策略配置: {[c['name'] for c in configs]}") return configs except Exception as e: logger.warning(f"[DB] load_strategy_configs_from_db 失败,将 fallback 到 JSON: {e}") return [] # ─── 模拟盘配置 ─────────────────────────────────────────────────── PAPER_TRADING_ENABLED = False # 总开关(兼容旧逻辑) PAPER_ENABLED_STRATEGIES = [] # 分策略开关: ["v51_baseline", "v52_8signals"] PAPER_INITIAL_BALANCE = 10000 # 虚拟初始资金 USDT PAPER_RISK_PER_TRADE = 0.02 # 单笔风险 2%(即200U) PAPER_MAX_POSITIONS = 4 # 每套策略最大同时持仓数 PAPER_TIER_MULTIPLIER = { # 档位仓位倍数 "light": 0.5, # 轻仓: 1% "standard": 1.0, # 标准: 2% "heavy": 1.5, # 加仓: 3% } PAPER_FEE_RATE = 0.0005 # Taker手续费 0.05%(开仓+平仓各一次) def load_paper_config(): """从配置文件加载模拟盘开关和参数""" global PAPER_TRADING_ENABLED, PAPER_ENABLED_STRATEGIES, PAPER_INITIAL_BALANCE, PAPER_RISK_PER_TRADE, PAPER_MAX_POSITIONS config_path = os.path.join(os.path.dirname(__file__), "paper_config.json") try: with open(config_path, "r") as f: import json as _json2 cfg = _json2.load(f) PAPER_TRADING_ENABLED = cfg.get("enabled", False) PAPER_ENABLED_STRATEGIES = cfg.get("enabled_strategies", []) PAPER_INITIAL_BALANCE = cfg.get("initial_balance", 10000) PAPER_RISK_PER_TRADE = cfg.get("risk_per_trade", 0.02) PAPER_MAX_POSITIONS = cfg.get("max_positions", 4) except FileNotFoundError: pass def is_strategy_enabled(strategy_name: str) -> bool: """检查某策略是否启用模拟盘""" if not PAPER_TRADING_ENABLED: return False # 如果enabled_strategies为空,走旧逻辑(全部启用) if not PAPER_ENABLED_STRATEGIES: return True return strategy_name in PAPER_ENABLED_STRATEGIES # ───────────────────────────────────────────────────────────────── # 窗口大小(毫秒) WINDOW_FAST = 30 * 60 * 1000 # 30分钟 WINDOW_MID = 4 * 3600 * 1000 # 4小时 WINDOW_DAY = 24 * 3600 * 1000 # 24小时 WINDOW_VWAP = 30 * 60 * 1000 # 30分钟 # ATR参数 ATR_PERIOD_MS = 5 * 60 * 1000 ATR_LENGTH = 14 # 信号冷却 COOLDOWN_MS = 10 * 60 * 1000 def fetch_market_indicators(symbol: str) -> dict: """从PG读取最新的market_indicators数据,解析JSONB提取关键数值""" import json as _json with get_sync_conn() as conn: with conn.cursor() as cur: indicators = {} ind_types = [ "long_short_ratio", "top_trader_position", "open_interest_hist", "coinbase_premium", "funding_rate", "obi_depth_10", "spot_perp_divergence", "tiered_cvd_whale", # Phase 2 ] for ind_type in ind_types: cur.execute( "SELECT value FROM market_indicators WHERE symbol=%s AND indicator_type=%s ORDER BY timestamp_ms DESC LIMIT 1", (symbol, ind_type), ) row = cur.fetchone() if not row or row[0] is None: indicators[ind_type] = None continue val = row[0] if isinstance(val, str): try: val = _json.loads(val) except Exception: indicators[ind_type] = None continue # 提取关键数值 if ind_type == "long_short_ratio": indicators[ind_type] = float(val.get("longShortRatio", 1.0)) elif ind_type == "top_trader_position": indicators[ind_type] = float(val.get("longAccount", 0.5)) elif ind_type == "open_interest_hist": indicators[ind_type] = float(val.get("sumOpenInterestValue", 0)) elif ind_type == "coinbase_premium": indicators[ind_type] = float(val.get("premium_pct", 0)) / 100.0 elif ind_type == "funding_rate": indicators[ind_type] = float(val.get("fundingRate", val.get("lastFundingRate", 0))) elif ind_type == "obi_depth_10": # obi范围[-1,1],正值=买压,负值=卖压 indicators[ind_type] = float(val.get("obi", 0)) elif ind_type == "spot_perp_divergence": # divergence = (spot - mark) / mark indicators[ind_type] = float(val.get("divergence", 0)) elif ind_type == "tiered_cvd_whale": # 巨鲸净CVD比率[-1,1],正值=净买入 indicators[ind_type] = float(val.get("whale_cvd_ratio", 0)) return indicators def to_float(value: Any) -> Optional[float]: try: return float(value) if value is not None else None except (TypeError, ValueError): return None # ─── 滚动窗口 ─────────────────────────────────────────────────── class TradeWindow: def __init__(self, window_ms: int): self.window_ms = window_ms self.trades: deque = deque() self.buy_vol = 0.0 self.sell_vol = 0.0 self.pq_sum = 0.0 self.q_sum = 0.0 def add(self, time_ms: int, qty: float, price: float, is_buyer_maker: int): self.trades.append((time_ms, qty, price, is_buyer_maker)) pq = price * qty self.pq_sum += pq self.q_sum += qty if is_buyer_maker == 0: self.buy_vol += qty else: self.sell_vol += qty def trim(self, now_ms: int): cutoff = now_ms - self.window_ms while self.trades and self.trades[0][0] < cutoff: t_ms, qty, price, ibm = self.trades.popleft() self.pq_sum -= price * qty self.q_sum -= qty if ibm == 0: self.buy_vol -= qty else: self.sell_vol -= qty @property def cvd(self) -> float: return self.buy_vol - self.sell_vol @property def vwap(self) -> float: return self.pq_sum / self.q_sum if self.q_sum > 0 else 0.0 class ATRCalculator: def __init__(self, period_ms: int = ATR_PERIOD_MS, length: int = ATR_LENGTH): self.period_ms = period_ms self.length = length self.candles: deque = deque(maxlen=length + 1) self.current_candle: Optional[dict] = None self.atr_history: deque = deque(maxlen=288) def update(self, time_ms: int, price: float): bar_ms = (time_ms // self.period_ms) * self.period_ms if self.current_candle is None or self.current_candle["bar"] != bar_ms: if self.current_candle is not None: self.candles.append(self.current_candle) self.current_candle = {"bar": bar_ms, "open": price, "high": price, "low": price, "close": price} else: c = self.current_candle c["high"] = max(c["high"], price) c["low"] = min(c["low"], price) c["close"] = price @property def atr(self) -> float: if len(self.candles) < 2: return 0.0 trs = [] candles_list = list(self.candles) for i in range(1, len(candles_list)): prev_close = candles_list[i-1]["close"] c = candles_list[i] tr = max(c["high"] - c["low"], abs(c["high"] - prev_close), abs(c["low"] - prev_close)) trs.append(tr) if not trs: return 0.0 atr_val = trs[0] for tr in trs[1:]: atr_val = (atr_val * (self.length - 1) + tr) / self.length return atr_val @property def atr_percentile(self) -> float: current = self.atr if current == 0: return 50.0 self.atr_history.append(current) if len(self.atr_history) < 10: return 50.0 sorted_hist = sorted(self.atr_history) rank = sum(1 for x in sorted_hist if x <= current) return (rank / len(sorted_hist)) * 100 # ─── FR历史最大值缓存(每小时更新)─────────────────────────────── _max_fr_cache: dict = {} # {symbol: max_abs_fr} _max_fr_updated: float = 0 def get_max_fr(symbol: str) -> float: """获取该币种历史最大|FR|,每小时刷新一次""" global _max_fr_cache, _max_fr_updated now = time.time() if now - _max_fr_updated > 3600 or symbol not in _max_fr_cache: try: with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( "SELECT symbol, MAX(ABS((value->>'fundingRate')::float)) as max_fr " "FROM market_indicators WHERE indicator_type='funding_rate' " "GROUP BY symbol" ) for row in cur.fetchall(): _max_fr_cache[row[0]] = row[1] if row[1] else 0.0001 _max_fr_updated = now except Exception as e: logger.warning(f"get_max_fr error: {e}") return _max_fr_cache.get(symbol, 0.0001) # 默认0.01%防除零 class SymbolState: def __init__(self, symbol: str): self.symbol = symbol self.win_fast = TradeWindow(WINDOW_FAST) self.win_mid = TradeWindow(WINDOW_MID) self.win_day = TradeWindow(WINDOW_DAY) self.win_vwap = TradeWindow(WINDOW_VWAP) self.atr_calc = ATRCalculator() self.last_processed_id = 0 self.last_trade_price = 0.0 self.warmup = True self.prev_cvd_fast = 0.0 self.prev_cvd_fast_slope = 0.0 self.prev_oi_value = 0.0 self.market_indicators = fetch_market_indicators(symbol) self.last_signal_ts: dict[str, int] = {} self.last_signal_dir: dict[str, str] = {} self.recent_large_trades: deque = deque() # ── Phase 2 实时内存字段(由后台WebSocket协程更新)────────── self.rt_obi: float = 0.0 # 订单簿失衡[-1,1](实时WebSocket,所有symbol) self.rt_spot_perp_div: float = 0.0 # 期现背离(spot-mark)/mark(实时WebSocket,所有symbol) # tiered_cvd_whale:按成交额分档,实时累计(最近15分钟窗口) self._whale_trades: deque = deque() # (time_ms, usd_val, is_sell) self.WHALE_WINDOW_MS: int = 15 * 60 * 1000 # 15分钟 def process_trade(self, agg_id: int, time_ms: int, price: float, qty: float, is_buyer_maker: int): now_ms = time_ms self.win_fast.add(time_ms, qty, price, is_buyer_maker) self.win_mid.add(time_ms, qty, price, is_buyer_maker) self.win_day.add(time_ms, qty, price, is_buyer_maker) self.win_vwap.add(time_ms, qty, price, is_buyer_maker) self.atr_calc.update(time_ms, price) self.win_fast.trim(now_ms) self.win_mid.trim(now_ms) self.win_day.trim(now_ms) self.win_vwap.trim(now_ms) self.last_processed_id = agg_id self.last_trade_price = price # 最新成交价,用于entry_price # tiered_cvd_whale 实时累计(>$100k 为巨鲸) usd_val = price * qty if usd_val >= 100_000: self._whale_trades.append((time_ms, usd_val, bool(is_buyer_maker))) # 修剪15分钟窗口 cutoff = now_ms - self.WHALE_WINDOW_MS while self._whale_trades and self._whale_trades[0][0] < cutoff: self._whale_trades.popleft() @property def whale_cvd_ratio(self) -> float: """巨鲸净CVD比率[-1,1],基于最近15分钟>$100k成交""" buy_usd = sum(t[1] for t in self._whale_trades if not t[2]) sell_usd = sum(t[1] for t in self._whale_trades if t[2]) total = buy_usd + sell_usd return (buy_usd - sell_usd) / total if total > 0 else 0.0 def compute_p95_p99(self) -> tuple: if len(self.win_day.trades) < 100: return 5.0, 10.0 qtys = sorted([t[1] for t in self.win_day.trades]) n = len(qtys) p95 = qtys[int(n * 0.95)] p99 = qtys[int(n * 0.99)] if "BTC" in self.symbol: p95 = max(p95, 5.0); p99 = max(p99, 10.0) else: p95 = max(p95, 50.0); p99 = max(p99, 100.0) return p95, p99 def update_large_trades(self, now_ms: int, p99: float): cutoff = now_ms - 15 * 60 * 1000 while self.recent_large_trades and self.recent_large_trades[0][0] < cutoff: self.recent_large_trades.popleft() # 只检查新trade(避免重复添加) seen = set(t[0] for t in self.recent_large_trades) # time_ms作为去重key for t in self.win_fast.trades: if t[1] >= p99 and t[0] > cutoff and t[0] not in seen: self.recent_large_trades.append((t[0], t[1], t[3])) seen.add(t[0]) def build_evaluation_snapshot(self, now_ms: int) -> dict: cvd_fast = self.win_fast.cvd cvd_mid = self.win_mid.cvd cvd_day = self.win_day.cvd vwap = self.win_vwap.vwap atr = self.atr_calc.atr atr_pct = self.atr_calc.atr_percentile p95, p99 = self.compute_p95_p99() self.update_large_trades(now_ms, p99) price = self.last_trade_price if self.last_trade_price > 0 else vwap # 用最新成交价,非VWAP cvd_fast_slope = cvd_fast - self.prev_cvd_fast cvd_fast_accel = cvd_fast_slope - self.prev_cvd_fast_slope self.prev_cvd_fast = cvd_fast self.prev_cvd_fast_slope = cvd_fast_slope oi_value = to_float(self.market_indicators.get("open_interest_hist")) if oi_value is None or self.prev_oi_value == 0: oi_change = 0.0 environment_score = 10 else: oi_change = (oi_value - self.prev_oi_value) / self.prev_oi_value if self.prev_oi_value > 0 else 0.0 if oi_change >= 0.03: environment_score = 15 elif oi_change > 0: environment_score = 10 else: environment_score = 5 if oi_value is not None and oi_value > 0: self.prev_oi_value = oi_value return { "cvd_fast": cvd_fast, "cvd_mid": cvd_mid, "cvd_day": cvd_day, "vwap": vwap, "atr": atr, "atr_value": atr, # V5.3: ATR绝对值快照(用于feature_events落库) "atr_pct": atr_pct, "p95": p95, "p99": p99, "price": price, "cvd_fast_slope": cvd_fast_slope, "cvd_fast_accel": cvd_fast_accel, "oi_change": oi_change, "environment_score": environment_score, "oi_value": oi_value, } def fetch_recent_liquidations(self, window_ms: int = 300000): """Fetch last 5min liquidation totals from liquidations table""" now_ms = int(time.time() * 1000) cutoff = now_ms - window_ms with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT COALESCE(SUM(CASE WHEN side='LONG' THEN usd_value ELSE 0 END), 0) as long_liq, COALESCE(SUM(CASE WHEN side='SHORT' THEN usd_value ELSE 0 END), 0) as short_liq FROM liquidations WHERE symbol=%s AND trade_time >= %s """, (self.symbol, cutoff), ) row = cur.fetchone() if row: return {"long_usd": row[0], "short_usd": row[1]} return None def evaluate_signal(self, now_ms: int, strategy_cfg: Optional[dict] = None, snapshot: Optional[dict] = None) -> dict: strategy_cfg = strategy_cfg or {} strategy_name = strategy_cfg.get("name", "v51_baseline") track = strategy_cfg.get("track", "ALT") # ─── Track Router ─────────────────────────────────────────── # v53 → 统一评分(BTC/ETH/XRP/SOL) # v53_alt / v53_btc → 兼容旧策略名,转发到 _evaluate_v53() # v51/v52 → 原有代码路径(兼容,不修改) if strategy_name.startswith("v53"): allowed_symbols = strategy_cfg.get("symbols", []) if allowed_symbols and self.symbol not in allowed_symbols: snap = snapshot or self.build_evaluation_snapshot(now_ms) return self._empty_result(strategy_name, snap) return self._evaluate_v53(now_ms, strategy_cfg, snapshot) # ─── 原有V5.1/V5.2评分逻辑(保持不变)──────────────────────── strategy_cfg = strategy_cfg or {} strategy_name = strategy_cfg.get("name", "v51_baseline") strategy_threshold = int(strategy_cfg.get("threshold", 75)) enabled_signals = set(strategy_cfg.get("signals", [])) snap = snapshot or self.build_evaluation_snapshot(now_ms) cvd_fast = snap["cvd_fast"] cvd_mid = snap["cvd_mid"] vwap = snap["vwap"] atr = snap["atr"] atr_pct = snap["atr_pct"] p95 = snap["p95"] p99 = snap["p99"] price = snap["price"] cvd_fast_slope = snap["cvd_fast_slope"] cvd_fast_accel = snap["cvd_fast_accel"] oi_change = snap["oi_change"] environment_score = snap["environment_score"] result = { "strategy": strategy_name, "cvd_fast": cvd_fast, "cvd_mid": cvd_mid, "cvd_day": snap["cvd_day"], "cvd_fast_slope": cvd_fast_slope, "atr": atr, "atr_pct": atr_pct, "vwap": vwap, "price": price, "p95": p95, "p99": p99, "signal": None, "direction": None, "score": 0, "tier": None, "factors": {}, } if self.warmup or price == 0 or atr == 0: return result # 判断倾向方向(用于评分展示,即使冷却或方向不一致也计算) no_direction = False last_signal_ts = self.last_signal_ts.get(strategy_name, 0) in_cooldown = now_ms - last_signal_ts < COOLDOWN_MS if cvd_fast > 0 and cvd_mid > 0: direction = "LONG" elif cvd_fast < 0 and cvd_mid < 0: direction = "SHORT" else: direction = "LONG" if cvd_fast > 0 else "SHORT" no_direction = True # V5.1 五层评分体系(总分100,方向层可因加速额外+5) # 1) 方向层(45分 + 5加速分) direction_score = 0 if (direction == "LONG" and cvd_fast > 0) or (direction == "SHORT" and cvd_fast < 0): direction_score += 15 if (direction == "LONG" and cvd_mid > 0) or (direction == "SHORT" and cvd_mid < 0): direction_score += 15 has_adverse_p99 = any( (direction == "LONG" and lt[2] == 1) or (direction == "SHORT" and lt[2] == 0) for lt in self.recent_large_trades ) has_aligned_p99 = any( (direction == "LONG" and lt[2] == 0) or (direction == "SHORT" and lt[2] == 1) for lt in self.recent_large_trades ) if has_aligned_p99: direction_score += 15 elif not has_adverse_p99: direction_score += 10 accel_bonus = 0 if "accel" in enabled_signals and ( (direction == "LONG" and cvd_fast_accel > 0) or (direction == "SHORT" and cvd_fast_accel < 0) ): accel_bonus = int(strategy_cfg.get("accel_bonus", 5)) # 2) 拥挤层(20分)- market_indicators缺失时给中间分 long_short_ratio = to_float(self.market_indicators.get("long_short_ratio")) if long_short_ratio is None: ls_score = 5 elif (direction == "SHORT" and long_short_ratio > 2.0) or (direction == "LONG" and long_short_ratio < 0.5): ls_score = 10 else: ls_score = 5 top_trader_position = to_float(self.market_indicators.get("top_trader_position")) if top_trader_position is None: top_trader_score = 5 else: if direction == "LONG": if top_trader_position >= 0.55: top_trader_score = 10 elif top_trader_position <= 0.45: top_trader_score = 0 else: top_trader_score = 5 else: if top_trader_position <= 0.45: top_trader_score = 10 elif top_trader_position >= 0.55: top_trader_score = 0 else: top_trader_score = 5 crowding_score = ls_score + top_trader_score # Funding Rate scoring (拥挤层加分) # Read from market_indicators table funding_rate = to_float(self.market_indicators.get("funding_rate")) fr_score = 0.0 if "funding_rate" in enabled_signals and funding_rate is not None and funding_rate != 0: max_fr = get_max_fr(self.symbol) fr_abs = abs(funding_rate) # 线性映射: raw = (|当前FR| / 历史最大FR) × 5, clamp到5 raw_score = min(fr_abs / max_fr * 5.0, 5.0) # FR评估"资金费率对当前方向有多有利",0~5分,不扣分 # 做多+FR负(空头付费给多头)=有利 → 给分 # 做空+FR正(多头付费给空头)=有利 → 给分 # 其他情况(FR方向不利) → 0分 if (direction == "LONG" and funding_rate < 0) or (direction == "SHORT" and funding_rate > 0): fr_score = round(raw_score, 2) # 有利,给分 else: fr_score = 0.0 # 不利,不给分也不扣分 # 4) 确认层(15分) confirmation_score = 15 if ( (direction == "LONG" and cvd_fast > 0 and cvd_mid > 0) or (direction == "SHORT" and cvd_fast < 0 and cvd_mid < 0) ) else 0 # Liquidation scoring (确认层加分) liq_score = 0 liq_data = None if "liquidation" in enabled_signals: liq_data = self.fetch_recent_liquidations() if liq_data: liq_long_usd = liq_data.get("long_usd", 0) liq_short_usd = liq_data.get("short_usd", 0) total = liq_long_usd + liq_short_usd if total > 0: # ratio = 空头清算 / 多头清算 # ratio高 = 空头爆仓多 = 有利于做多 if liq_long_usd > 0: ratio = liq_short_usd / liq_long_usd else: ratio = float("inf") if liq_short_usd > 0 else 1.0 # 梯度评分:ratio越极端分越高 def liq_ratio_to_score(r: float) -> int: if r >= 2.0: return 5 if r >= 1.5: return 3 if r >= 1.2: return 1 return 0 if direction == "LONG": # 做多:空头爆仓多(ratio高)有利 liq_score = liq_ratio_to_score(ratio) else: # 做空:多头爆仓多(ratio低=1/ratio高)有利 inv_ratio = (1.0 / ratio) if ratio > 0 else float("inf") liq_score = liq_ratio_to_score(inv_ratio) # 5) 辅助层(5分) coinbase_premium = to_float(self.market_indicators.get("coinbase_premium")) if coinbase_premium is None: aux_score = 2 elif (direction == "LONG" and coinbase_premium > 0.0005) or (direction == "SHORT" and coinbase_premium < -0.0005): aux_score = 5 elif abs(coinbase_premium) <= 0.0005: aux_score = 2 else: aux_score = 0 # 按策略配置的权重缩放各层分数 weights = strategy_cfg.get("weights", {}) w_direction = weights.get("direction", 45) w_crowding = weights.get("crowding", 20) w_fr = weights.get("funding_rate", 0) w_environment = weights.get("environment", 15) w_confirmation = weights.get("confirmation", 15) w_liq = weights.get("liquidation", 0) w_auxiliary = weights.get("auxiliary", 5) # 原始评分范围: direction 0~45, crowding 0~20, environment 0~15, confirmation 0~15, auxiliary 0~5 # FR 0~5 (or -5~5), Liq 0~5 # 缩放到配置权重 scaled_direction = min(round(direction_score / 45 * w_direction) + accel_bonus, w_direction) scaled_crowding = round(crowding_score / 20 * w_crowding) scaled_fr = fr_score if w_fr > 0 else 0 # FR already 0~5 range, matches w_fr=5 scaled_environment = round(environment_score / 15 * w_environment) scaled_confirmation = round(confirmation_score / 15 * w_confirmation) scaled_liq = liq_score if w_liq > 0 else 0 # Liq already 0~5 range, matches w_liq=5 scaled_auxiliary = round(aux_score / 5 * w_auxiliary) total_score = scaled_direction + scaled_crowding + scaled_fr + scaled_environment + scaled_confirmation + scaled_liq + scaled_auxiliary total_score = max(0, min(round(total_score, 1), 100)) # clamp 0~100, 保留1位小数 result["score"] = total_score result["direction"] = direction result["factors"] = { "direction": { "score": scaled_direction, "max": w_direction, "cvd_fast": 15 if ((direction == "LONG" and cvd_fast > 0) or (direction == "SHORT" and cvd_fast < 0)) else 0, "cvd_mid": 15 if ((direction == "LONG" and cvd_mid > 0) or (direction == "SHORT" and cvd_mid < 0)) else 0, "p99_flow": 15 if has_aligned_p99 else (10 if not has_adverse_p99 else 0), "accel_bonus": accel_bonus, }, "crowding": {"score": scaled_crowding, "max": w_crowding, "long_short_ratio": ls_score, "top_trader_position": top_trader_score}, "environment": {"score": scaled_environment, "max": w_environment, "open_interest_hist": oi_change}, "confirmation": {"score": scaled_confirmation, "max": w_confirmation}, "auxiliary": {"score": scaled_auxiliary, "max": w_auxiliary, "coinbase_premium": coinbase_premium}, "funding_rate": {"score": round(scaled_fr, 2), "max": w_fr, "value": funding_rate, "max_fr_hist": get_max_fr(self.symbol)}, "liquidation": { "score": scaled_liq, "max": w_liq, "long_usd": liq_data.get("long_usd", 0) if liq_data else 0, "short_usd": liq_data.get("short_usd", 0) if liq_data else 0, }, } # 始终输出direction供反向平仓判断(不受冷却限制) result["direction"] = direction if not no_direction else None heavy_threshold = max(strategy_threshold + 10, 85) if total_score >= heavy_threshold and not no_direction and not in_cooldown: result["signal"] = direction result["tier"] = "heavy" elif total_score >= strategy_threshold and not no_direction and not in_cooldown: result["signal"] = direction result["tier"] = "standard" else: result["signal"] = None result["tier"] = None if result["signal"]: self.last_signal_ts[strategy_name] = now_ms self.last_signal_dir[strategy_name] = direction return result def _empty_result(self, strategy_name: str, snap: dict) -> dict: """返回空评分结果(symbol不匹配track时使用)""" return { "strategy": strategy_name, "cvd_fast": snap["cvd_fast"], "cvd_mid": snap["cvd_mid"], "cvd_day": snap["cvd_day"], "cvd_fast_slope": snap["cvd_fast_slope"], "atr": snap["atr"], "atr_value": snap.get("atr_value", snap["atr"]), "atr_pct": snap["atr_pct"], "vwap": snap["vwap"], "price": snap["price"], "p95": snap["p95"], "p99": snap["p99"], "signal": None, "direction": None, "score": 0, "tier": None, "factors": {}, } def _window_ms(window_str: str) -> int: """把CVD窗口字符串转换为毫秒,如 '5m'->300000, '1h'->3600000, '4h'->14400000""" window_str = (window_str or "30m").strip().lower() if window_str.endswith("h"): return int(window_str[:-1]) * 3600 * 1000 elif window_str.endswith("m"): return int(window_str[:-1]) * 60 * 1000 return 30 * 60 * 1000 # fallback 30min def _evaluate_v53(self, now_ms: int, strategy_cfg: dict, snapshot: Optional[dict] = None) -> dict: """ V5.3 统一评分(BTC/ETH/XRP/SOL) 架构:四层评分 55/25/15/5 + per-symbol 四门控制 - 门1:波动率下限(atr/price) - 门2:CVD共振(fast+mid同向) - 门3:OBI否决(实时WebSocket,fallback DB) - 门4:期现背离否决(实时WebSocket,fallback DB) BTC额外:whale_cvd_ratio(>$100k巨鲸CVD) """ strategy_name = strategy_cfg.get("name", "v53") strategy_threshold = int(strategy_cfg.get("threshold", 75)) flip_threshold = int(strategy_cfg.get("flip_threshold", 85)) snap = snapshot or self.build_evaluation_snapshot(now_ms) # 按策略配置的 cvd_fast_window / cvd_slow_window 动态切片重算CVD # 支持 5m/15m/30m/1h/4h 所有组合 cvd_fast_window = strategy_cfg.get("cvd_fast_window", "30m") cvd_slow_window = strategy_cfg.get("cvd_slow_window", "4h") fast_ms = _window_ms(cvd_fast_window) slow_ms = _window_ms(cvd_slow_window) # 默认窗口 (30m/4h) 直接用快照,否则从 trades 列表切片重算 if cvd_fast_window == "30m" and cvd_slow_window == "4h": cvd_fast = snap["cvd_fast"] cvd_mid = snap["cvd_mid"] else: cutoff_fast = now_ms - fast_ms cutoff_slow = now_ms - slow_ms buy_f = sell_f = buy_m = sell_m = 0.0 # fast: 从 win_fast (30min) 或 win_mid (4h) 中切片 src_fast = self.win_mid if fast_ms > WINDOW_FAST else self.win_fast for t_ms, qty, _price, ibm in src_fast.trades: if t_ms >= cutoff_fast: if ibm == 0: buy_f += qty else: sell_f += qty # slow: 从 win_mid (4h) 中切片 for t_ms, qty, _price, ibm in self.win_mid.trades: if t_ms >= cutoff_slow: if ibm == 0: buy_m += qty else: sell_m += qty cvd_fast = buy_f - sell_f cvd_mid = buy_m - sell_m price = snap["price"] atr = snap["atr"] atr_value = snap.get("atr_value", atr) cvd_fast_accel = snap["cvd_fast_accel"] environment_score_raw = snap["environment_score"] result = self._empty_result(strategy_name, snap) if self.warmup or price == 0 or atr == 0: return result last_signal_ts = self.last_signal_ts.get(strategy_name, 0) in_cooldown = now_ms - last_signal_ts < COOLDOWN_MS # ── 五门参数:优先读 DB config(V5.4),fallback 到 JSON symbol_gates ──── # DB config 来自 load_strategy_configs_from_db(),已映射到 strategy_cfg["gates"] db_gates = strategy_cfg.get("gates") or {} symbol_gates = (strategy_cfg.get("symbol_gates") or {}).get(self.symbol, {}) # 门1 波动率:vol_atr_pct_min(ATR%价格,如0.002=价格的0.2%) gate_vol_enabled = db_gates.get("vol", {}).get("enabled", True) min_vol = float(db_gates.get("vol", {}).get("vol_atr_pct_min") or symbol_gates.get("min_vol_threshold", 0.002)) # 门2 CVD共振:gate_cvd_enabled(快慢CVD必须同向) gate_cvd_enabled = db_gates.get("cvd", {}).get("enabled", True) # 门3 鲸鱼否决:whale_usd_threshold + whale_flow_pct(BTC) gate_whale_enabled = db_gates.get("whale", {}).get("enabled", True) whale_usd = float(db_gates.get("whale", {}).get("whale_usd_threshold") or symbol_gates.get("whale_threshold_usd", 50000)) whale_flow_pct = float(db_gates.get("whale", {}).get("whale_flow_pct") or symbol_gates.get("whale_flow_threshold_pct", 0.5)) / 100 # 门4 OBI否决:obi_threshold gate_obi_enabled = db_gates.get("obi", {}).get("enabled", True) obi_veto = float(db_gates.get("obi", {}).get("threshold") or symbol_gates.get("obi_veto_threshold", 0.35)) # 门5 期现背离:spot_perp_threshold gate_spd_enabled = db_gates.get("spot_perp", {}).get("enabled", False) spd_veto = float(db_gates.get("spot_perp", {}).get("threshold") or symbol_gates.get("spot_perp_divergence_veto", 0.005)) gate_block = None # 门1:波动率下限(可关闭) atr_pct_price = atr / price if price > 0 else 0 if gate_vol_enabled and atr_pct_price < min_vol: gate_block = f"low_vol({atr_pct_price:.4f}<{min_vol})" # 门2:CVD共振(方向门,可关闭) if cvd_fast > 0 and cvd_mid > 0: direction = "LONG" cvd_resonance = 30 no_direction = False elif cvd_fast < 0 and cvd_mid < 0: direction = "SHORT" cvd_resonance = 30 no_direction = False else: direction = "LONG" if cvd_fast > 0 else "SHORT" cvd_resonance = 0 if gate_cvd_enabled: no_direction = True if not gate_block: gate_block = "no_direction_consensus" else: no_direction = False # 门2关闭时,允许单线CVD方向 # 门3:鲸鱼否决(BTC用whale_cvd_ratio,ALT用大单对立,可关闭) if gate_whale_enabled and not gate_block and not no_direction: if self.symbol == "BTCUSDT": # BTC:巨鲸CVD净方向与信号方向冲突时否决 whale_cvd = self.whale_cvd_ratio if self._whale_trades else to_float(self.market_indicators.get("tiered_cvd_whale")) or 0.0 if (direction == "LONG" and whale_cvd < -whale_flow_pct) or \ (direction == "SHORT" and whale_cvd > whale_flow_pct): gate_block = f"whale_cvd_veto({whale_cvd:.3f})" else: # ALT:recent_large_trades 里有对立大单则否决 whale_adverse = any( (direction == "LONG" and lt[2] == 1 and lt[1] * price >= whale_usd) or (direction == "SHORT" and lt[2] == 0 and lt[1] * price >= whale_usd) for lt in self.recent_large_trades ) whale_aligned = any( (direction == "LONG" and lt[2] == 0 and lt[1] * price >= whale_usd) or (direction == "SHORT" and lt[2] == 1 and lt[1] * price >= whale_usd) for lt in self.recent_large_trades ) if whale_adverse and not whale_aligned: gate_block = f"whale_adverse(>${whale_usd/1000:.0f}k)" # 门4:OBI否决(实时WS优先,fallback DB,可关闭) obi_raw = self.rt_obi if self.rt_obi != 0.0 else to_float(self.market_indicators.get("obi_depth_10")) if gate_obi_enabled and not gate_block and not no_direction and obi_raw is not None: if direction == "LONG" and obi_raw < -obi_veto: gate_block = f"obi_veto({obi_raw:.3f}<-{obi_veto})" elif direction == "SHORT" and obi_raw > obi_veto: gate_block = f"obi_veto({obi_raw:.3f}>{obi_veto})" # 门5:期现背离否决(实时WS优先,fallback DB,可关闭) spot_perp_div = self.rt_spot_perp_div if self.rt_spot_perp_div != 0.0 else to_float(self.market_indicators.get("spot_perp_divergence")) if gate_spd_enabled and not gate_block and not no_direction and spot_perp_div is not None: if (direction == "LONG" and spot_perp_div < -spd_veto) or \ (direction == "SHORT" and spot_perp_div > spd_veto): gate_block = f"spd_veto({spot_perp_div:.4f})" gate_passed = gate_block is None # ── Direction Layer(55分)───────────────────────────────── has_adverse_p99 = any( (direction == "LONG" and lt[2] == 1) or (direction == "SHORT" and lt[2] == 0) for lt in self.recent_large_trades ) has_aligned_p99 = any( (direction == "LONG" and lt[2] == 0) or (direction == "SHORT" and lt[2] == 1) for lt in self.recent_large_trades ) p99_flow = 20 if has_aligned_p99 else (10 if not has_adverse_p99 else 0) accel_bonus = 5 if ( (direction == "LONG" and cvd_fast_accel > 0) or (direction == "SHORT" and cvd_fast_accel < 0) ) else 0 # v53_fast:accel 独立触发路径(不要求 cvd 双线同向) accel_independent_score = 0 if is_fast and not no_direction: accel_cfg = strategy_cfg.get("accel_independent", {}) if accel_cfg.get("enabled", False): # accel 足够大 + p99 大单同向 → 独立给分 accel_strong = ( (direction == "LONG" and cvd_fast_accel > 0 and has_aligned_p99) or (direction == "SHORT" and cvd_fast_accel < 0 and has_aligned_p99) ) if accel_strong: accel_independent_score = int(accel_cfg.get("min_direction_score", 35)) direction_score = max(min(cvd_resonance + p99_flow + accel_bonus, 55), accel_independent_score) # ── Crowding Layer(25分)───────────────────────────────── long_short_ratio = to_float(self.market_indicators.get("long_short_ratio")) if long_short_ratio is None: ls_score = 7 elif (direction == "SHORT" and long_short_ratio > 2.0) or (direction == "LONG" and long_short_ratio < 0.5): ls_score = 15 elif (direction == "SHORT" and long_short_ratio > 1.5) or (direction == "LONG" and long_short_ratio < 0.7): ls_score = 10 elif (direction == "SHORT" and long_short_ratio > 1.0) or (direction == "LONG" and long_short_ratio < 1.0): ls_score = 7 else: ls_score = 0 top_trader_position = to_float(self.market_indicators.get("top_trader_position")) if top_trader_position is None: top_trader_score = 5 else: if direction == "LONG": top_trader_score = 10 if top_trader_position >= 0.55 else (0 if top_trader_position <= 0.45 else 5) else: top_trader_score = 10 if top_trader_position <= 0.45 else (0 if top_trader_position >= 0.55 else 5) crowding_score = min(ls_score + top_trader_score, 25) # ── Environment Layer(15分)────────────────────────────── # OI变化率基础分(v53: 0~15) oi_base_score = round(environment_score_raw / 15 * 10) # 缩到10分 # v53_fast:OBI 正向加分(5分,放入 environment 层) obi_bonus = 0 if is_fast and obi_raw is not None: obi_cfg = strategy_cfg.get("obi_scoring", {}) strong_thr = float(obi_cfg.get("strong_threshold", 0.30)) weak_thr = float(obi_cfg.get("weak_threshold", 0.15)) strong_sc = int(obi_cfg.get("strong_score", 5)) weak_sc = int(obi_cfg.get("weak_score", 3)) obi_aligned = (direction == "LONG" and obi_raw > 0) or (direction == "SHORT" and obi_raw < 0) obi_abs = abs(obi_raw) if obi_aligned: if obi_abs >= strong_thr: obi_bonus = strong_sc elif obi_abs >= weak_thr: obi_bonus = weak_sc environment_score = min(oi_base_score + obi_bonus, 15) if is_fast else round(environment_score_raw / 15 * 15) # ── Auxiliary Layer(5分)──────────────────────────────── coinbase_premium = to_float(self.market_indicators.get("coinbase_premium")) if coinbase_premium is None: aux_score = 2 elif (direction == "LONG" and coinbase_premium > 0.0005) or (direction == "SHORT" and coinbase_premium < -0.0005): aux_score = 5 elif abs(coinbase_premium) <= 0.0005: aux_score = 2 else: aux_score = 0 total_score = min(direction_score + crowding_score + environment_score + aux_score, 100) total_score = max(0, round(total_score, 1)) if not gate_passed: total_score = 0 # whale_cvd for BTC display whale_cvd_display = (self.whale_cvd_ratio if self._whale_trades else to_float(self.market_indicators.get("tiered_cvd_whale"))) if self.symbol == "BTCUSDT" else None result.update({ "score": total_score, "direction": direction if (not no_direction and gate_passed) else None, "atr_value": atr_value, "cvd_fast_5m": cvd_fast if is_fast else None, # v53_fast: 存5m实算CVD "factors": { "track": "BTC" if self.symbol == "BTCUSDT" else "ALT", "gate_passed": gate_passed, "gate_block": gate_block, "atr_pct_price": round(atr_pct_price, 5), "obi_raw": obi_raw, "spot_perp_div": spot_perp_div, "whale_cvd_ratio": whale_cvd_display, "direction": { "score": direction_score, "max": 55, "cvd_resonance": cvd_resonance, "p99_flow": p99_flow, "accel_bonus": accel_bonus, }, "crowding": { "score": crowding_score, "max": 25, "lsr_contrarian": ls_score, "top_trader_position": top_trader_score, }, "environment": {"score": environment_score, "max": 15, "oi_base": oi_base_score if is_fast else environment_score, "obi_bonus": obi_bonus}, "auxiliary": {"score": aux_score, "max": 5, "coinbase_premium": coinbase_premium}, }, }) if not no_direction and gate_passed and not in_cooldown: if total_score >= flip_threshold: result["signal"] = direction result["tier"] = "heavy" elif total_score >= strategy_threshold: result["signal"] = direction result["tier"] = "standard" if result["signal"]: self.last_signal_ts[strategy_name] = now_ms self.last_signal_dir[strategy_name] = direction return result def _evaluate_v53_alt(self, now_ms: int, strategy_cfg: dict, snapshot: Optional[dict] = None) -> dict: """已废弃,由 _evaluate_v53() 统一处理,保留供兼容""" return self._evaluate_v53(now_ms, strategy_cfg, snapshot) def _evaluate_v53_btc(self, now_ms: int, strategy_cfg: dict, snapshot: Optional[dict] = None) -> dict: """已废弃,由 _evaluate_v53() 统一处理,保留供兼容""" return self._evaluate_v53(now_ms, strategy_cfg, snapshot) # ─── PG DB操作 ─────────────────────────────────────────────────── # ─── PG DB操作 ─────────────────────────────────────────────────── def load_historical(state: SymbolState, window_ms: int): now_ms = int(time.time() * 1000) start_ms = now_ms - window_ms count = 0 with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades " "WHERE symbol = %s AND time_ms >= %s ORDER BY agg_id ASC", (state.symbol, start_ms) ) while True: rows = cur.fetchmany(5000) if not rows: break for r in rows: state.process_trade(r[0], r[3], r[1], r[2], r[4]) count += 1 logger.info(f"[{state.symbol}] 冷启动完成: 加载{count:,}条历史数据 (窗口={window_ms//3600000}h)") state.warmup = False def fetch_new_trades(symbol: str, last_id: int) -> list: with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades " "WHERE symbol = %s AND agg_id > %s ORDER BY agg_id ASC LIMIT 10000", (symbol, last_id) ) return [{"agg_id": r[0], "price": r[1], "qty": r[2], "time_ms": r[3], "is_buyer_maker": r[4]} for r in cur.fetchall()] def save_indicator(ts: int, symbol: str, result: dict, strategy: str = "v52_8signals", strategy_id: Optional[str] = None, strategy_name_snapshot: Optional[str] = None): with get_sync_conn() as conn: with conn.cursor() as cur: import json as _json3 factors_json = _json3.dumps(result.get("factors")) if result.get("factors") else None cvd_fast_5m = result.get("cvd_fast_5m") # v53_fast 专用:5m窗口CVD,其他策略为None cur.execute( "INSERT INTO signal_indicators " "(ts,symbol,strategy,cvd_fast,cvd_mid,cvd_day,cvd_fast_slope,atr_5m,atr_percentile,atr_value,vwap_30m,price,p95_qty,p99_qty,score,signal,factors,cvd_fast_5m,strategy_id,strategy_name_snapshot) " "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", (ts, symbol, strategy, result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["cvd_fast_slope"], result["atr"], result["atr_pct"], result.get("atr_value", result["atr"]), result["vwap"], result["price"], result["p95"], result["p99"], result["score"], result.get("signal"), factors_json, cvd_fast_5m, strategy_id, strategy_name_snapshot) ) # 有信号时通知live_executor if result.get("signal"): cur.execute("NOTIFY new_signal, %s", (f"{symbol}:{strategy}:{result['signal']}:{result['score']}",)) conn.commit() def save_feature_event(ts: int, symbol: str, result: dict, strategy: str): """ V5.3 专用:每次评分后把 raw features + score 层写入 signal_feature_events。 只对 v53_alt / v53_btc 调用,其他策略跳过。 """ if not strategy.startswith("v53"): return f = result.get("factors") or {} track = f.get("track", "ALT") side = result.get("direction") or ("LONG" if result.get("score", 0) >= 0 else "SHORT") score_direction = (f.get("direction") or {}).get("score", 0) if track == "ALT" else (f.get("direction") or {}).get("score", 0) score_crowding = (f.get("crowding") or {}).get("score", 0) score_env = (f.get("environment") or {}).get("score", 0) score_aux = (f.get("auxiliary") or {}).get("score", 0) gate_passed = f.get("gate_passed", True) block_reason = f.get("gate_block") or f.get("block_reason") with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO signal_feature_events (ts, symbol, track, side, strategy, strategy_version, cvd_fast_raw, cvd_mid_raw, cvd_day_raw, cvd_fast_slope_raw, p95_qty_raw, p99_qty_raw, atr_value, atr_percentile, oi_delta_raw, ls_ratio_raw, top_pos_raw, coinbase_premium_raw, obi_raw, tiered_cvd_whale_raw, score_direction, score_crowding, score_environment, score_aux, score_total, gate_passed, block_reason, price) VALUES (%s,%s,%s,%s,%s,%s, %s,%s,%s,%s, %s,%s, %s,%s, %s,%s,%s,%s, %s,%s, %s,%s,%s,%s,%s, %s,%s,%s) """, ( ts, symbol, track, side, strategy, "v5.3", result.get("cvd_fast"), result.get("cvd_mid"), result.get("cvd_day"), result.get("cvd_fast_slope"), result.get("p95"), result.get("p99"), result.get("atr_value", result.get("atr")), result.get("atr_pct"), result.get("oi_delta"), result.get("ls_ratio"), result.get("top_trader_position"), (f.get("auxiliary") or {}).get("coinbase_premium"), f.get("obi_raw"), f.get("whale_cvd_ratio"), score_direction, score_crowding, score_env, score_aux, result.get("score", 0), gate_passed, block_reason, result.get("price"), ) ) conn.commit() def save_indicator_1m(ts: int, symbol: str, result: dict): bar_ts = (ts // 60000) * 60000 with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute("SELECT id FROM signal_indicators_1m WHERE ts=%s AND symbol=%s", (bar_ts, symbol)) if cur.fetchone(): cur.execute( "UPDATE signal_indicators_1m SET cvd_fast=%s,cvd_mid=%s,cvd_day=%s,atr_5m=%s,vwap_30m=%s,price=%s,score=%s,signal=%s WHERE ts=%s AND symbol=%s", (result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["atr"], result["vwap"], result["price"], result["score"], result.get("signal"), bar_ts, symbol) ) else: cur.execute( "INSERT INTO signal_indicators_1m (ts,symbol,cvd_fast,cvd_mid,cvd_day,atr_5m,vwap_30m,price,score,signal) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", (bar_ts, symbol, result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["atr"], result["vwap"], result["price"], result["score"], result.get("signal")) ) conn.commit() # ─── 模拟盘 ────────────────────────────────────────────────────── def paper_open_trade( symbol: str, direction: str, price: float, score: int, tier: str, atr: float, now_ms: int, factors: dict = None, strategy: str = "v51_baseline", tp_sl: Optional[dict] = None, strategy_id: Optional[str] = None, strategy_name_snapshot: Optional[str] = None, ): """模拟开仓""" import json as _json3 if atr <= 0: return sl_multiplier = float((tp_sl or {}).get("sl_multiplier", 2.0)) tp1_multiplier = float((tp_sl or {}).get("tp1_multiplier", 1.5)) tp2_multiplier = float((tp_sl or {}).get("tp2_multiplier", 3.0)) risk_distance = sl_multiplier * atr # 1R = SL距离 = sl_multiplier × ATR if direction == "LONG": sl = price - sl_multiplier * atr tp1 = price + tp1_multiplier * atr tp2 = price + tp2_multiplier * atr else: sl = price + sl_multiplier * atr tp1 = price - tp1_multiplier * atr tp2 = price - tp2_multiplier * atr # SL 合理性校验:实际距离必须在 risk_distance 的 80%~120% 范围内 actual_sl_dist = abs(sl - price) if actual_sl_dist < risk_distance * 0.8 or actual_sl_dist > risk_distance * 1.2: logger.error( f"[{symbol}] ⚠️ SL校验失败,拒绝开仓: direction={direction} price={price:.4f} " f"sl={sl:.4f} actual_dist={actual_sl_dist:.4f} expected={risk_distance:.4f} atr={atr:.4f}" ) return with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( "INSERT INTO paper_trades (symbol,direction,score,tier,entry_price,entry_ts,tp1_price,tp2_price,sl_price,atr_at_entry,score_factors,strategy,risk_distance,strategy_id,strategy_name_snapshot) " "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", ( symbol, direction, score, tier, price, now_ms, tp1, tp2, sl, atr, _json3.dumps(factors) if factors else None, strategy, risk_distance, strategy_id, strategy_name_snapshot, ), ) conn.commit() logger.info( f"[{symbol}] 📝 模拟开仓: {direction} @ {price:.2f} score={score} tier={tier} strategy={strategy} " f"TP1={tp1:.2f} TP2={tp2:.2f} SL={sl:.2f}" ) def paper_has_active_position(symbol: str, strategy: Optional[str] = None) -> bool: """检查该币种是否有活跃持仓""" with get_sync_conn() as conn: with conn.cursor() as cur: if strategy: cur.execute( "SELECT COUNT(*) FROM paper_trades WHERE symbol=%s AND strategy=%s AND status IN ('active','tp1_hit')", (symbol, strategy), ) else: cur.execute("SELECT COUNT(*) FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')", (symbol,)) return cur.fetchone()[0] > 0 def paper_get_active_direction(symbol: str, strategy: Optional[str] = None) -> str | None: """获取该币种活跃持仓的方向,无持仓返回None""" with get_sync_conn() as conn: with conn.cursor() as cur: if strategy: cur.execute( "SELECT direction FROM paper_trades WHERE symbol=%s AND strategy=%s AND status IN ('active','tp1_hit') LIMIT 1", (symbol, strategy), ) else: cur.execute( "SELECT direction FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit') LIMIT 1", (symbol,), ) row = cur.fetchone() return row[0] if row else None def paper_close_by_signal(symbol: str, current_price: float, now_ms: int, strategy: Optional[str] = None): """反向信号平仓:按当前价平掉该币种所有活跃仓位""" with get_sync_conn() as conn: with conn.cursor() as cur: if strategy: cur.execute( "SELECT id, direction, entry_price, tp1_hit, atr_at_entry, risk_distance " "FROM paper_trades WHERE symbol=%s AND strategy=%s AND status IN ('active','tp1_hit')", (symbol, strategy), ) else: cur.execute( "SELECT id, direction, entry_price, tp1_hit, atr_at_entry, risk_distance " "FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')", (symbol,), ) positions = cur.fetchall() for pos in positions: pid, direction, entry_price, tp1_hit, atr_entry, rd_db = pos risk_distance = rd_db if rd_db and rd_db > 0 else abs(entry_price * 0.01) if direction == "LONG": pnl_r = (current_price - entry_price) / risk_distance if risk_distance > 0 else 0 else: pnl_r = (entry_price - current_price) / risk_distance if risk_distance > 0 else 0 # 扣手续费 fee_r = (2 * PAPER_FEE_RATE * entry_price) / risk_distance if risk_distance > 0 else 0 pnl_r -= fee_r cur.execute( "UPDATE paper_trades SET status='signal_flip', exit_price=%s, exit_ts=%s, pnl_r=%s WHERE id=%s", (current_price, now_ms, round(pnl_r, 4), pid) ) logger.info( f"[{symbol}] 📝 反向信号平仓: {direction} @ {current_price:.2f} pnl={pnl_r:+.2f}R" f"{f' strategy={strategy}' if strategy else ''}" ) conn.commit() def paper_active_count(strategy: Optional[str] = None) -> int: """当前活跃持仓总数(按策略独立计数)""" with get_sync_conn() as conn: with conn.cursor() as cur: if strategy: cur.execute("SELECT COUNT(*) FROM paper_trades WHERE strategy=%s AND status IN ('active','tp1_hit')", (strategy,)) else: cur.execute("SELECT COUNT(*) FROM paper_trades WHERE status IN ('active','tp1_hit')") return cur.fetchone()[0] # ─── 实时 WebSocket 数据(OBI + 期现背离)──────────────────────── _REALTIME_STATES: dict = {} # symbol -> SymbolState,在main()里注入 async def _ws_obi_stream(symbol: str, state): """订阅期货盘口深度流,实时更新 state.rt_obi""" stream = f"{symbol.lower()}@depth10@100ms" url = f"wss://fstream.binance.com/stream?streams={stream}" while True: try: async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws: logger.info(f"[RT-OBI] {symbol} WebSocket connected") async for raw in ws: data = json.loads(raw) book = data.get("data", data) bids = book.get("b", []) asks = book.get("a", []) bid_vol = sum(float(b[1]) for b in bids) ask_vol = sum(float(a[1]) for a in asks) total = bid_vol + ask_vol state.rt_obi = (bid_vol - ask_vol) / total if total > 0 else 0.0 except Exception as e: logger.warning(f"[RT-OBI] {symbol} 断线重连: {e}") await asyncio.sleep(3) async def _ws_spot_perp_stream(symbol: str, state): """ 订阅现货 bookTicker(最优买卖价)+ 期货 markPrice,计算期现背离 spot_mid = (best_bid + best_ask) / 2 divergence = (spot_mid - mark_price) / mark_price """ spot_stream = f"{symbol.lower()}@bookTicker" perp_stream = f"{symbol.lower()}@markPrice@1s" spot_url = f"wss://stream.binance.com:9443/stream?streams={spot_stream}" perp_url = f"wss://fstream.binance.com/stream?streams={perp_stream}" spot_mid = [0.0] # mutable container for closure mark_price = [0.0] async def read_spot(): while True: try: async with websockets.connect(spot_url, ping_interval=20, ping_timeout=10) as ws: logger.info(f"[RT-SPD] {symbol} spot WebSocket connected") async for raw in ws: d = json.loads(raw).get("data", json.loads(raw)) bid = float(d.get("b", 0) or 0) ask = float(d.get("a", 0) or 0) if bid > 0 and ask > 0: spot_mid[0] = (bid + ask) / 2 if mark_price[0] > 0: state.rt_spot_perp_div = (spot_mid[0] - mark_price[0]) / mark_price[0] except Exception as e: logger.warning(f"[RT-SPD] {symbol} spot 断线: {e}") await asyncio.sleep(3) async def read_perp(): while True: try: async with websockets.connect(perp_url, ping_interval=20, ping_timeout=10) as ws: logger.info(f"[RT-SPD] {symbol} perp WebSocket connected") async for raw in ws: d = json.loads(raw).get("data", json.loads(raw)) mp = float(d.get("p", 0) or d.get("markPrice", 0) or 0) if mp > 0: mark_price[0] = mp if spot_mid[0] > 0: state.rt_spot_perp_div = (spot_mid[0] - mark_price[0]) / mark_price[0] except Exception as e: logger.warning(f"[RT-SPD] {symbol} perp 断线: {e}") await asyncio.sleep(3) await asyncio.gather(read_spot(), read_perp()) async def _realtime_ws_runner(states: dict): """统一启动所有实时WebSocket协程,BTC + ALT (ETH/XRP/SOL)""" coros = [] for sym, state in states.items(): # OBI流:所有symbol都接(perp depth10) coros.append(_ws_obi_stream(sym, state)) # 期现背离流:只有有现货+合约的币种(BTC/ETH/XRP/SOL都有) coros.append(_ws_spot_perp_stream(sym, state)) if coros: await asyncio.gather(*coros) def start_realtime_ws(states: dict): """在独立线程里跑asyncio event loop,驱动实时WebSocket采集""" def _run(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(_realtime_ws_runner(states)) except Exception as e: logger.error(f"[RT-WS] event loop 异常: {e}") finally: loop.close() t = threading.Thread(target=_run, daemon=True, name="realtime-ws") t.start() logger.info("[RT-WS] 实时WebSocket后台线程已启动(BTC OBI + 期现背离)") # ─── 主循环 ────────────────────────────────────────────────────── def main(): init_schema() # V5.4: 优先从 DB 加载策略配置,失败 fallback 到 JSON strategy_configs = load_strategy_configs_from_db() if not strategy_configs: logger.warning("[DB] 未能从 DB 加载策略配置,使用 JSON fallback") strategy_configs = load_strategy_configs() strategy_names = [cfg.get("name", "unknown") for cfg in strategy_configs] logger.info(f"已加载策略配置: {', '.join(strategy_names)}") primary_strategy_name = "v52_8signals" if any(cfg.get("name") == "v52_8signals" for cfg in strategy_configs) else strategy_names[0] states = {sym: SymbolState(sym) for sym in SYMBOLS} for sym, state in states.items(): load_historical(state, WINDOW_MID) logger.info("=== Signal Engine (PG) 启动完成 ===") # 启动实时WebSocket后台线程(BTC OBI + 期现背离) start_realtime_ws(states) last_1m_save = {} cycle = 0 warmup_cycles = 3 # 启动后跳过前3轮(45秒),避免冷启动信号开仓 while True: try: now_ms = int(time.time() * 1000) # 每轮重新加载配置(支持API热更新开关) load_paper_config() for sym, state in states.items(): new_trades = fetch_new_trades(sym, state.last_processed_id) for t in new_trades: state.process_trade(t["agg_id"], t["time_ms"], t["price"], t["qty"], t["is_buyer_maker"]) state.market_indicators = fetch_market_indicators(sym) snapshot = state.build_evaluation_snapshot(now_ms) strategy_results: list[tuple[dict, dict]] = [] for strategy_cfg in strategy_configs: strategy_result = state.evaluate_signal(now_ms, strategy_cfg=strategy_cfg, snapshot=snapshot) strategy_results.append((strategy_cfg, strategy_result)) # 每个策略独立存储indicator for strategy_cfg, strategy_result in strategy_results: sname = strategy_cfg.get("name", "v51_baseline") sid = strategy_cfg.get("strategy_id") ssnap = strategy_cfg.get("strategy_name_snapshot") save_indicator(now_ms, sym, strategy_result, strategy=sname, strategy_id=sid, strategy_name_snapshot=ssnap) save_feature_event(now_ms, sym, strategy_result, strategy=sname) # 1m表仍用primary(图表用) primary_result = strategy_results[0][1] for strategy_cfg, strategy_result in strategy_results: if strategy_cfg.get("name") == primary_strategy_name: primary_result = strategy_result break bar_1m = (now_ms // 60000) * 60000 if last_1m_save.get(sym) != bar_1m: save_indicator_1m(now_ms, sym, primary_result) last_1m_save[sym] = bar_1m # 反向信号平仓:按策略独立判断,score>=75才触发 if warmup_cycles <= 0: for strategy_cfg, result in strategy_results: strategy_name = strategy_cfg.get("name", "v51_baseline") if not is_strategy_enabled(strategy_name): continue eval_dir = result.get("direction") existing_dir = paper_get_active_direction(sym, strategy_name) if existing_dir and eval_dir and existing_dir != eval_dir and result["score"] >= 75: paper_close_by_signal(sym, result["price"], now_ms, strategy_name) logger.info( f"[{sym}] 📝 反向信号平仓[{strategy_name}]: {existing_dir} → {eval_dir} " f"(score={result['score']})" ) for strategy_cfg, result in strategy_results: strategy_name = strategy_cfg.get("name", "v51_baseline") if result.get("signal"): logger.info( f"[{sym}] 🚨 信号[{strategy_name}]: {result['signal']} " f"score={result['score']} price={result['price']:.1f}" ) # 模拟盘开仓(需该策略启用 + 跳过冷启动) if is_strategy_enabled(strategy_name) and warmup_cycles <= 0: if not paper_has_active_position(sym, strategy_name): active_count = paper_active_count(strategy_name) if active_count < PAPER_MAX_POSITIONS: tier = result.get("tier", "standard") paper_open_trade( sym, result["signal"], result["price"], result["score"], tier, result["atr"], now_ms, factors=result.get("factors"), strategy=strategy_name, tp_sl=strategy_cfg.get("tp_sl"), strategy_id=strategy_cfg.get("strategy_id"), strategy_name_snapshot=strategy_cfg.get("strategy_name_snapshot"), ) # 模拟盘持仓检查由paper_monitor.py通过WebSocket实时处理,这里不再检查 cycle += 1 if warmup_cycles > 0: warmup_cycles -= 1 if warmup_cycles == 0: logger.info("冷启动保护期结束,模拟盘开仓已启用") # 每10轮(约2-3分钟)热加载配置,不需要重启 if cycle % 10 == 0: old_strategies = list(PAPER_ENABLED_STRATEGIES) load_paper_config() # V5.4: 热重载优先读 DB,失败 fallback 到 JSON new_configs = load_strategy_configs_from_db() if new_configs: strategy_configs = new_configs else: strategy_configs = load_strategy_configs() # A1: 热重载权重/阈值/TP/SL strategy_names = [cfg.get("name", "unknown") for cfg in strategy_configs] primary_strategy_name = "v52_8signals" if any(cfg.get("name") == "v52_8signals" for cfg in strategy_configs) else strategy_names[0] if list(PAPER_ENABLED_STRATEGIES) != old_strategies: logger.info(f"📋 配置热加载: enabled_strategies={PAPER_ENABLED_STRATEGIES}") for sym, state in states.items(): logger.info( f"[{sym}] 状态: CVD_fast={state.win_fast.cvd:.1f} " f"CVD_mid={state.win_mid.cvd:.1f} ATR={state.atr_calc.atr:.2f} " f"({state.atr_calc.atr_percentile:.0f}%) VWAP={state.win_vwap.vwap:.1f}" ) except Exception as e: logger.error(f"循环异常: {e}", exc_info=True) time.sleep(LOOP_INTERVAL) if __name__ == "__main__": main()