""" signal_engine.py — V5 短线交易信号引擎(PostgreSQL版) 架构: - 独立PM2进程,每5秒循环 - 内存滚动窗口计算指标(CVD/ATR/VWAP/大单阈值) - 启动时回灌历史数据(冷启动warmup) - 信号评估:核心3条件+加分3条件 - 输出:signal_indicators表 + signal_trades表 + Discord推送 指标: - CVD_fast (30m滚动) / CVD_mid (4h滚动) / CVD_day (UTC日内) - ATR (5m, 14周期) - VWAP_30m - 大单阈值 P95/P99 (24h滚动) """ import logging import os import time import threading import asyncio from datetime import datetime, timezone from typing import Optional import json import websockets from db import get_sync_conn, init_schema from signal_state import SymbolState as BaseSymbolState from strategy_scoring import evaluate_signal as score_strategy from strategy_loader import ( load_strategy_configs, load_strategy_configs_from_db, ) from paper_trading import ( paper_open_trade, paper_has_active_position, paper_get_active_direction, paper_close_by_signal, paper_active_count, ) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ logging.StreamHandler(), logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "signal-engine.log")), ], ) logger = logging.getLogger("signal-engine") SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"] LOOP_INTERVAL = 15 # 秒(从5改15,CPU降60%,信号质量无影响) STRATEGY_DIR = os.path.join(os.path.dirname(__file__), "strategies") # ─── 模拟盘配置 ─────────────────────────────────────────────────── PAPER_TRADING_ENABLED = False # 总开关(兼容旧逻辑) PAPER_ENABLED_STRATEGIES = [] # 分策略开关: ["v51_baseline", "v52_8signals"] PAPER_INITIAL_BALANCE = 10000 # 虚拟初始资金 USDT PAPER_RISK_PER_TRADE = 0.02 # 单笔风险 2%(即200U) PAPER_MAX_POSITIONS = 4 # 每套策略最大同时持仓数 PAPER_TIER_MULTIPLIER = { # 档位仓位倍数 "light": 0.5, # 轻仓: 1% "standard": 1.0, # 标准: 2% "heavy": 1.5, # 加仓: 3% } PAPER_FEE_RATE = 0.0005 # Taker手续费 0.05%(开仓+平仓各一次) def load_paper_config(): """从配置文件加载模拟盘开关和参数""" global PAPER_TRADING_ENABLED, PAPER_ENABLED_STRATEGIES, PAPER_INITIAL_BALANCE, PAPER_RISK_PER_TRADE, PAPER_MAX_POSITIONS config_path = os.path.join(os.path.dirname(__file__), "paper_config.json") try: with open(config_path, "r") as f: import json as _json2 cfg = _json2.load(f) PAPER_TRADING_ENABLED = cfg.get("enabled", False) PAPER_ENABLED_STRATEGIES = cfg.get("enabled_strategies", []) PAPER_INITIAL_BALANCE = cfg.get("initial_balance", 10000) PAPER_RISK_PER_TRADE = cfg.get("risk_per_trade", 0.02) PAPER_MAX_POSITIONS = cfg.get("max_positions", 4) except FileNotFoundError: pass def is_strategy_enabled(strategy_name: str) -> bool: """检查某策略是否启用模拟盘""" if not PAPER_TRADING_ENABLED: return False # 如果enabled_strategies为空,走旧逻辑(全部启用) if not PAPER_ENABLED_STRATEGIES: return True return strategy_name in PAPER_ENABLED_STRATEGIES # ───────────────────────────────────────────────────────────────── # 窗口大小(毫秒) WINDOW_FAST = 30 * 60 * 1000 # 30分钟 WINDOW_MID = 4 * 3600 * 1000 # 4小时 WINDOW_DAY = 24 * 3600 * 1000 # 24小时 WINDOW_VWAP = 30 * 60 * 1000 # 30分钟 # ATR参数 ATR_PERIOD_MS = 5 * 60 * 1000 ATR_LENGTH = 14 def fetch_market_indicators(symbol: str) -> dict: """从PG读取最新的market_indicators数据,解析JSONB提取关键数值""" import json as _json with get_sync_conn() as conn: with conn.cursor() as cur: indicators = {} ind_types = [ "long_short_ratio", "top_trader_position", "open_interest_hist", "coinbase_premium", "funding_rate", "obi_depth_10", "spot_perp_divergence", "tiered_cvd_whale", # Phase 2 ] for ind_type in ind_types: cur.execute( "SELECT value FROM market_indicators WHERE symbol=%s AND indicator_type=%s ORDER BY timestamp_ms DESC LIMIT 1", (symbol, ind_type), ) row = cur.fetchone() if not row or row[0] is None: indicators[ind_type] = None continue val = row[0] if isinstance(val, str): try: val = _json.loads(val) except Exception: indicators[ind_type] = None continue # 提取关键数值 if ind_type == "long_short_ratio": indicators[ind_type] = float(val.get("longShortRatio", 1.0)) elif ind_type == "top_trader_position": indicators[ind_type] = float(val.get("longAccount", 0.5)) elif ind_type == "open_interest_hist": indicators[ind_type] = float(val.get("sumOpenInterestValue", 0)) elif ind_type == "coinbase_premium": indicators[ind_type] = float(val.get("premium_pct", 0)) / 100.0 elif ind_type == "funding_rate": indicators[ind_type] = float(val.get("fundingRate", val.get("lastFundingRate", 0))) elif ind_type == "obi_depth_10": # obi范围[-1,1],正值=买压,负值=卖压 indicators[ind_type] = float(val.get("obi", 0)) elif ind_type == "spot_perp_divergence": # divergence = (spot - mark) / mark indicators[ind_type] = float(val.get("divergence", 0)) elif ind_type == "tiered_cvd_whale": # 巨鲸净CVD比率[-1,1],正值=净买入 indicators[ind_type] = float(val.get("whale_cvd_ratio", 0)) return indicators class SymbolState(BaseSymbolState): """ 兼容旧接口的 SymbolState 包装: - 旧代码只传入 symbol,这里补全窗口与 ATR 配置。 - 评分逻辑统一在 strategy_scoring.evaluate_signal 中实现。 """ def __init__(self, symbol: str): super().__init__( symbol, WINDOW_FAST, WINDOW_MID, WINDOW_DAY, WINDOW_VWAP, ATR_PERIOD_MS, ATR_LENGTH, fetch_market_indicators, ) # 评分相关方法已抽离到 backend/strategy_scoring.py, # 这里保留一个纯状态容器封装,不再实现打分逻辑。 # ─── PG DB操作 ─────────────────────────────────────────────────── # ─── PG DB操作 ─────────────────────────────────────────────────── def load_historical(state: SymbolState, window_ms: int): now_ms = int(time.time() * 1000) start_ms = now_ms - window_ms count = 0 with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades " "WHERE symbol = %s AND time_ms >= %s ORDER BY agg_id ASC", (state.symbol, start_ms) ) while True: rows = cur.fetchmany(5000) if not rows: break for r in rows: state.process_trade(r[0], r[3], r[1], r[2], r[4]) count += 1 logger.info(f"[{state.symbol}] 冷启动完成: 加载{count:,}条历史数据 (窗口={window_ms//3600000}h)") state.warmup = False def fetch_new_trades(symbol: str, last_id: int) -> list: with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades " "WHERE symbol = %s AND agg_id > %s ORDER BY agg_id ASC LIMIT 10000", (symbol, last_id) ) return [{"agg_id": r[0], "price": r[1], "qty": r[2], "time_ms": r[3], "is_buyer_maker": r[4]} for r in cur.fetchall()] def save_indicator(ts: int, symbol: str, result: dict, strategy: str = "v52_8signals", strategy_id: Optional[str] = None, strategy_name_snapshot: Optional[str] = None): with get_sync_conn() as conn: with conn.cursor() as cur: import json as _json3 factors_json = _json3.dumps(result.get("factors")) if result.get("factors") else None cvd_fast_5m = result.get("cvd_fast_5m") # v53_fast 专用:5m窗口CVD,其他策略为None cur.execute( "INSERT INTO signal_indicators " "(ts,symbol,strategy,cvd_fast,cvd_mid,cvd_day,cvd_fast_slope,atr_5m,atr_percentile,atr_value,vwap_30m,price,p95_qty,p99_qty,score,signal,factors,cvd_fast_5m,strategy_id,strategy_name_snapshot) " "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", (ts, symbol, strategy, result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["cvd_fast_slope"], result["atr"], result["atr_pct"], result.get("atr_value", result["atr"]), result["vwap"], result["price"], result["p95"], result["p99"], result["score"], result.get("signal"), factors_json, cvd_fast_5m, strategy_id, strategy_name_snapshot) ) # 有信号时通知live_executor if result.get("signal"): cur.execute("NOTIFY new_signal, %s", (f"{symbol}:{strategy}:{result['signal']}:{result['score']}",)) conn.commit() def save_feature_event(ts: int, symbol: str, result: dict, strategy: str): """ V5.3 专用:每次评分后把 raw features + score 层写入 signal_feature_events。 只对 v53_alt / v53_btc 调用,其他策略跳过。 """ if not strategy.startswith("v53"): return f = result.get("factors") or {} track = f.get("track", "ALT") side = result.get("direction") or ("LONG" if result.get("score", 0) >= 0 else "SHORT") score_direction = (f.get("direction") or {}).get("score", 0) if track == "ALT" else (f.get("direction") or {}).get("score", 0) score_crowding = (f.get("crowding") or {}).get("score", 0) score_env = (f.get("environment") or {}).get("score", 0) score_aux = (f.get("auxiliary") or {}).get("score", 0) gate_passed = f.get("gate_passed", True) block_reason = f.get("gate_block") or f.get("block_reason") with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO signal_feature_events (ts, symbol, track, side, strategy, strategy_version, cvd_fast_raw, cvd_mid_raw, cvd_day_raw, cvd_fast_slope_raw, p95_qty_raw, p99_qty_raw, atr_value, atr_percentile, oi_delta_raw, ls_ratio_raw, top_pos_raw, coinbase_premium_raw, obi_raw, tiered_cvd_whale_raw, score_direction, score_crowding, score_environment, score_aux, score_total, gate_passed, block_reason, price) VALUES (%s,%s,%s,%s,%s,%s, %s,%s,%s,%s, %s,%s, %s,%s, %s,%s,%s,%s, %s,%s, %s,%s,%s,%s,%s, %s,%s,%s) """, ( ts, symbol, track, side, strategy, "v5.3", result.get("cvd_fast"), result.get("cvd_mid"), result.get("cvd_day"), result.get("cvd_fast_slope"), result.get("p95"), result.get("p99"), result.get("atr_value", result.get("atr")), result.get("atr_pct"), result.get("oi_delta"), result.get("ls_ratio"), result.get("top_trader_position"), (f.get("auxiliary") or {}).get("coinbase_premium"), f.get("obi_raw"), f.get("whale_cvd_ratio"), score_direction, score_crowding, score_env, score_aux, result.get("score", 0), gate_passed, block_reason, result.get("price"), ) ) conn.commit() def save_indicator_1m(ts: int, symbol: str, result: dict): bar_ts = (ts // 60000) * 60000 with get_sync_conn() as conn: with conn.cursor() as cur: cur.execute("SELECT id FROM signal_indicators_1m WHERE ts=%s AND symbol=%s", (bar_ts, symbol)) if cur.fetchone(): cur.execute( "UPDATE signal_indicators_1m SET cvd_fast=%s,cvd_mid=%s,cvd_day=%s,atr_5m=%s,vwap_30m=%s,price=%s,score=%s,signal=%s WHERE ts=%s AND symbol=%s", (result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["atr"], result["vwap"], result["price"], result["score"], result.get("signal"), bar_ts, symbol) ) else: cur.execute( "INSERT INTO signal_indicators_1m (ts,symbol,cvd_fast,cvd_mid,cvd_day,atr_5m,vwap_30m,price,score,signal) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", (bar_ts, symbol, result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["atr"], result["vwap"], result["price"], result["score"], result.get("signal")) ) conn.commit() # ─── 模拟盘 ────────────────────────────────────────────────────── def strategy_applies_to_symbol(strategy_cfg: dict, symbol: str) -> bool: """ 判断策略是否适用于当前 symbol。 - V5.4 单币策略:strategy_cfg.symbol 必须匹配; - 兼容旧版多币策略:若配置了 symbols,则需在列表内。 """ strategy_symbol = strategy_cfg.get("symbol") if strategy_symbol and strategy_symbol != symbol: return False allowed_symbols = strategy_cfg.get("symbols", []) if allowed_symbols and symbol not in allowed_symbols: return False return True # ─── 实时 WebSocket 数据(OBI + 期现背离)──────────────────────── _REALTIME_STATES: dict = {} # symbol -> SymbolState,在main()里注入 async def _ws_obi_stream(symbol: str, state): """订阅期货盘口深度流,实时更新 state.rt_obi""" stream = f"{symbol.lower()}@depth10@100ms" url = f"wss://fstream.binance.com/stream?streams={stream}" while True: try: async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws: logger.info(f"[RT-OBI] {symbol} WebSocket connected") async for raw in ws: data = json.loads(raw) book = data.get("data", data) bids = book.get("b", []) asks = book.get("a", []) bid_vol = sum(float(b[1]) for b in bids) ask_vol = sum(float(a[1]) for a in asks) total = bid_vol + ask_vol state.rt_obi = (bid_vol - ask_vol) / total if total > 0 else 0.0 except Exception as e: logger.warning(f"[RT-OBI] {symbol} 断线重连: {e}") await asyncio.sleep(3) async def _ws_spot_perp_stream(symbol: str, state): """ 订阅现货 bookTicker(最优买卖价)+ 期货 markPrice,计算期现背离 spot_mid = (best_bid + best_ask) / 2 divergence = (spot_mid - mark_price) / mark_price """ spot_stream = f"{symbol.lower()}@bookTicker" perp_stream = f"{symbol.lower()}@markPrice@1s" spot_url = f"wss://stream.binance.com:9443/stream?streams={spot_stream}" perp_url = f"wss://fstream.binance.com/stream?streams={perp_stream}" spot_mid = [0.0] # mutable container for closure mark_price = [0.0] async def read_spot(): while True: try: async with websockets.connect(spot_url, ping_interval=20, ping_timeout=10) as ws: logger.info(f"[RT-SPD] {symbol} spot WebSocket connected") async for raw in ws: d = json.loads(raw).get("data", json.loads(raw)) bid = float(d.get("b", 0) or 0) ask = float(d.get("a", 0) or 0) if bid > 0 and ask > 0: spot_mid[0] = (bid + ask) / 2 if mark_price[0] > 0: state.rt_spot_perp_div = (spot_mid[0] - mark_price[0]) / mark_price[0] except Exception as e: logger.warning(f"[RT-SPD] {symbol} spot 断线: {e}") await asyncio.sleep(3) async def read_perp(): while True: try: async with websockets.connect(perp_url, ping_interval=20, ping_timeout=10) as ws: logger.info(f"[RT-SPD] {symbol} perp WebSocket connected") async for raw in ws: d = json.loads(raw).get("data", json.loads(raw)) mp = float(d.get("p", 0) or d.get("markPrice", 0) or 0) if mp > 0: mark_price[0] = mp if spot_mid[0] > 0: state.rt_spot_perp_div = (spot_mid[0] - mark_price[0]) / mark_price[0] except Exception as e: logger.warning(f"[RT-SPD] {symbol} perp 断线: {e}") await asyncio.sleep(3) await asyncio.gather(read_spot(), read_perp()) async def _realtime_ws_runner(states: dict): """统一启动所有实时WebSocket协程,BTC + ALT (ETH/XRP/SOL)""" coros = [] for sym, state in states.items(): # OBI流:所有symbol都接(perp depth10) coros.append(_ws_obi_stream(sym, state)) # 期现背离流:只有有现货+合约的币种(BTC/ETH/XRP/SOL都有) coros.append(_ws_spot_perp_stream(sym, state)) if coros: await asyncio.gather(*coros) def start_realtime_ws(states: dict): """在独立线程里跑asyncio event loop,驱动实时WebSocket采集""" def _run(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(_realtime_ws_runner(states)) except Exception as e: logger.error(f"[RT-WS] event loop 异常: {e}") finally: loop.close() t = threading.Thread(target=_run, daemon=True, name="realtime-ws") t.start() logger.info("[RT-WS] 实时WebSocket后台线程已启动(BTC OBI + 期现背离)") # ─── 主循环 ────────────────────────────────────────────────────── def main(): init_schema() # V5.4: 优先从 DB 加载策略配置,失败 fallback 到 JSON strategy_configs = load_strategy_configs_from_db() if not strategy_configs: logger.warning("[DB] 未能从 DB 加载策略配置,使用 JSON fallback") strategy_configs = load_strategy_configs() strategy_names = [cfg.get("name", "unknown") for cfg in strategy_configs] logger.info(f"已加载策略配置: {', '.join(strategy_names)}") # 选取 primary 策略用于 1m 指标表:优先 v53 系列,其次第一个配置 primary_candidates = ["v53", "v53_middle", "v53_fast"] primary_strategy_name = None for cand in primary_candidates: if any(cfg.get("name") == cand for cfg in strategy_configs): primary_strategy_name = cand break if not primary_strategy_name: primary_strategy_name = strategy_names[0] if strategy_names else "v53" states = {sym: SymbolState(sym) for sym in SYMBOLS} for sym, state in states.items(): load_historical(state, WINDOW_MID) logger.info("=== Signal Engine (PG) 启动完成 ===") # 启动实时WebSocket后台线程(BTC OBI + 期现背离) start_realtime_ws(states) last_1m_save = {} cycle = 0 warmup_cycles = 3 # 启动后跳过前3轮(45秒),避免冷启动信号开仓 while True: try: now_ms = int(time.time() * 1000) # 每轮重新加载配置(支持API热更新开关) load_paper_config() for sym, state in states.items(): new_trades = fetch_new_trades(sym, state.last_processed_id) for t in new_trades: state.process_trade(t["agg_id"], t["time_ms"], t["price"], t["qty"], t["is_buyer_maker"]) state.market_indicators = fetch_market_indicators(sym) snapshot = state.build_evaluation_snapshot(now_ms) strategy_results: list[tuple[dict, dict]] = [] for strategy_cfg in strategy_configs: if not strategy_applies_to_symbol(strategy_cfg, sym): continue strategy_result = score_strategy( state, now_ms, strategy_cfg=strategy_cfg, snapshot=snapshot ) strategy_results.append((strategy_cfg, strategy_result)) if not strategy_results: logger.warning(f"[{sym}] 当前无可用策略配置,跳过本轮") continue # 每个策略独立存储indicator for strategy_cfg, strategy_result in strategy_results: sname = strategy_cfg.get("name") or "v53" sid = strategy_cfg.get("strategy_id") ssnap = strategy_cfg.get("strategy_name_snapshot") save_indicator(now_ms, sym, strategy_result, strategy=sname, strategy_id=sid, strategy_name_snapshot=ssnap) save_feature_event(now_ms, sym, strategy_result, strategy=sname) # 1m表仍用primary(图表用) primary_result = strategy_results[0][1] for strategy_cfg, strategy_result in strategy_results: if strategy_cfg.get("name") == primary_strategy_name: primary_result = strategy_result break bar_1m = (now_ms // 60000) * 60000 if last_1m_save.get(sym) != bar_1m: save_indicator_1m(now_ms, sym, primary_result) last_1m_save[sym] = bar_1m # 反向信号平仓:按策略独立判断,score>=每策略配置的 flip_threshold 才触发 if warmup_cycles <= 0: for strategy_cfg, result in strategy_results: strategy_name = strategy_cfg.get("name") or "v53" if not is_strategy_enabled(strategy_name): continue # V5.4: custom策略只处理自己配置的symbol # per-strategy 方向约束:只接受与策略方向一致的反向信号 dir_cfg_raw = (strategy_cfg.get("direction") or "both").lower() if dir_cfg_raw not in ("long", "short", "both"): dir_cfg_raw = "both" eval_dir = result.get("direction") existing_dir = paper_get_active_direction(sym, strategy_name) # 反向平仓分数门槛:优先使用每个策略自己的 flip_threshold,fallback 到 entry_threshold/75 flip_thr = int(strategy_cfg.get("flip_threshold", strategy_cfg.get("threshold", 75))) # 如果策略配置为只多/只空,则忽略与配置相反方向的评估结果 if dir_cfg_raw == "long" and eval_dir == "SHORT": continue if dir_cfg_raw == "short" and eval_dir == "LONG": continue if existing_dir and eval_dir and existing_dir != eval_dir and result["score"] >= flip_thr: paper_close_by_signal(sym, result["price"], now_ms, strategy_name) logger.info( f"[{sym}] 📝 反向信号平仓[{strategy_name}]: {existing_dir} → {eval_dir} " f"(score={result['score']}, flip_thr={flip_thr})" ) for strategy_cfg, result in strategy_results: strategy_name = strategy_cfg.get("name") or "v53" if result.get("signal"): logger.info( f"[{sym}] 🚨 信号[{strategy_name}]: {result['signal']} " f"score={result['score']} price={result['price']:.1f}" ) # 模拟盘开仓(需该策略启用 + 跳过冷启动) if is_strategy_enabled(strategy_name) and warmup_cycles <= 0: if not paper_has_active_position(sym, strategy_name): active_count = paper_active_count(strategy_name) if active_count < PAPER_MAX_POSITIONS: tier = result.get("tier", "standard") paper_open_trade( sym, result["signal"], result["price"], result["score"], tier, result["atr"], now_ms, factors=result.get("factors"), strategy=strategy_name, tp_sl=strategy_cfg.get("tp_sl"), strategy_id=strategy_cfg.get("strategy_id"), strategy_name_snapshot=strategy_cfg.get("strategy_name_snapshot"), ) # 模拟盘持仓检查由paper_monitor.py通过WebSocket实时处理,这里不再检查 cycle += 1 if warmup_cycles > 0: warmup_cycles -= 1 if warmup_cycles == 0: logger.info("冷启动保护期结束,模拟盘开仓已启用") # 每10轮(约2-3分钟)热加载配置,不需要重启 if cycle % 10 == 0: old_strategies = list(PAPER_ENABLED_STRATEGIES) load_paper_config() # V5.4: 热重载优先读 DB,失败 fallback 到 JSON new_configs = load_strategy_configs_from_db() if new_configs: strategy_configs = new_configs else: strategy_configs = load_strategy_configs() # A1: 热重载权重/阈值/TP/SL strategy_names = [cfg.get("name", "unknown") for cfg in strategy_configs] primary_candidates = ["v53", "v53_middle", "v53_fast"] primary_strategy_name = None for cand in primary_candidates: if any(cfg.get("name") == cand for cfg in strategy_configs): primary_strategy_name = cand break if not primary_strategy_name: primary_strategy_name = strategy_names[0] if strategy_names else "v53" if list(PAPER_ENABLED_STRATEGIES) != old_strategies: logger.info(f"📋 配置热加载: enabled_strategies={PAPER_ENABLED_STRATEGIES}") for sym, state in states.items(): logger.info( f"[{sym}] 状态: CVD_fast={state.win_fast.cvd:.1f} " f"CVD_mid={state.win_mid.cvd:.1f} ATR={state.atr_calc.atr:.2f} " f"({state.atr_calc.atr_percentile:.0f}%) VWAP={state.win_vwap.vwap:.1f}" ) except Exception as e: logger.error(f"循环异常: {e}", exc_info=True) time.sleep(LOOP_INTERVAL) if __name__ == "__main__": main()