arbitrage-engine/backend/signal_engine.py

1608 lines
73 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
signal_engine.py — V5 短线交易信号引擎PostgreSQL版
架构:
- 独立PM2进程每5秒循环
- 内存滚动窗口计算指标CVD/ATR/VWAP/大单阈值)
- 启动时回灌历史数据冷启动warmup
- 信号评估核心3条件+加分3条件
- 输出signal_indicators表 + signal_trades表 + Discord推送
指标:
- CVD_fast (30m滚动) / CVD_mid (4h滚动) / CVD_day (UTC日内)
- ATR (5m, 14周期)
- VWAP_30m
- 大单阈值 P95/P99 (24h滚动)
"""
import logging
import os
import time
import json
import threading
import asyncio
from collections import deque
from datetime import datetime, timezone
from typing import Any, Optional
import websockets
from db import get_sync_conn, init_schema
# Root logging: one stream handler plus a file handler writing
# "signal-engine.log" in the parent directory of this module.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "signal-engine.log")),
    ],
)
logger = logging.getLogger("signal-engine")
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
LOOP_INTERVAL = 15 # seconds (raised from 5 to 15: ~60% less CPU, no impact on signal quality)
# Directory with the JSON strategy definitions and the files read from it
# by load_strategy_configs().
STRATEGY_DIR = os.path.join(os.path.dirname(__file__), "strategies")
DEFAULT_STRATEGY_FILES = ["v51_baseline.json", "v52_8signals.json", "v53.json", "v53_fast.json", "v53_middle.json"]
def load_strategy_configs() -> list[dict]:
    """Load strategy definitions from the JSON files in ``STRATEGY_DIR``.

    Each file listed in ``DEFAULT_STRATEGY_FILES`` is read in order. A file is
    kept only if it parses to a dict with a truthy ``"name"``. Missing files
    are logged as warnings, any other load failure as an error; both are
    skipped. When nothing could be loaded, a single built-in ``v51_baseline``
    configuration is returned instead.

    Returns:
        A non-empty list of strategy configuration dicts.
    """
    loaded: list[dict] = []
    for filename in DEFAULT_STRATEGY_FILES:
        path = os.path.join(STRATEGY_DIR, filename)
        try:
            with open(path, "r", encoding="utf-8") as f:
                cfg = json.load(f)
        except FileNotFoundError:
            logger.warning(f"策略配置缺失: {path}")
            continue
        except Exception as e:
            logger.error(f"策略配置加载失败 {path}: {e}")
            continue
        # Silently ignore payloads that are not a named strategy dict.
        if isinstance(cfg, dict) and cfg.get("name"):
            loaded.append(cfg)

    if loaded:
        return loaded

    # Nothing usable on disk: fall back to the hard-coded baseline strategy.
    logger.warning("未加载到策略配置回退到v51_baseline默认配置")
    loaded.append(
        {
            "name": "v51_baseline",
            "threshold": 75,
            "signals": ["cvd", "p99", "accel", "ls_ratio", "oi", "coinbase_premium"],
            "tp_sl": {"sl_multiplier": 2.0, "tp1_multiplier": 1.5, "tp2_multiplier": 3.0},
        }
    )
    return loaded
def load_strategy_configs_from_db() -> list[dict]:
    """
    V5.4: read the configuration of every strategy whose status is 'running'
    from the ``strategies`` table.

    Each DB row is mapped onto the same dict layout the JSON strategy files
    use. Nothing is cached: every call queries the DB and returns a fresh list.

    Returns:
        One config dict per running strategy, ordered by ``created_at``.
        On any failure a warning is logged and an empty list is returned; the
        caller is expected to fall back to the JSON files.
    """
    # Legacy strategies are recognised by fixed UUIDs and keep their legacy
    # names so that name-based scoring logic keeps working. Built once per
    # call instead of once per row.
    legacy_uuid_map = {
        "00000000-0000-0000-0000-000000000053": "v53",
        "00000000-0000-0000-0000-000000000054": "v53_middle",
        "00000000-0000-0000-0000-000000000055": "v53_fast",
    }
    try:
        with get_sync_conn() as conn:
            with conn.cursor() as cur:
                cur.execute("""
SELECT
strategy_id::text, display_name, symbol,
cvd_fast_window, cvd_slow_window,
weight_direction, weight_env, weight_aux, weight_momentum,
entry_score,
gate_obi_enabled, obi_threshold,
gate_whale_enabled, whale_cvd_threshold,
gate_vol_enabled, atr_percentile_min,
gate_spot_perp_enabled, spot_perp_threshold,
sl_atr_multiplier, tp1_ratio, tp2_ratio,
timeout_minutes, flip_threshold, direction
FROM strategies
WHERE status = 'running'
ORDER BY created_at ASC
""")
                rows = cur.fetchall()

        configs = []
        for row in rows:
            # Column order must match the SELECT list above.
            (sid, display_name, symbol,
             cvd_fast, cvd_slow,
             w_dir, w_env, w_aux, w_mom,
             entry_score,
             gate_obi, obi_thr,
             gate_whale, whale_thr,
             gate_vol, atr_pct_min,
             gate_spot, spot_thr,
             sl_mult, tp1_r, tp2_r,
             timeout_min, flip_thr, direction) = row
            # Non-legacy strategies get a synthetic name from the UUID prefix.
            strategy_name = legacy_uuid_map.get(sid, f"custom_{sid[:8]}")
            # Build a config dict compatible with the JSON file format.
            configs.append({
                "name": strategy_name,
                "strategy_id": sid,  # V5.4: written back to the DB as strategy_id
                "strategy_name_snapshot": display_name,  # V5.4: name snapshot at write time
                "symbol": symbol,
                "direction": direction,
                "cvd_fast_window": cvd_fast,
                "cvd_slow_window": cvd_slow,
                "threshold": entry_score,
                "weights": {
                    "direction": w_dir,
                    "env": w_env,
                    "aux": w_aux,
                    "momentum": w_mom,
                },
                "gates": {
                    "obi": {"enabled": gate_obi, "threshold": obi_thr},
                    "whale": {"enabled": gate_whale, "threshold": whale_thr},
                    "vol": {"enabled": gate_vol, "atr_percentile_min": atr_pct_min},
                    "spot_perp": {"enabled": gate_spot, "threshold": spot_thr},
                },
                "tp_sl": {
                    "sl_multiplier": sl_mult,
                    "tp1_multiplier": tp1_r,
                    "tp2_multiplier": tp2_r,
                },
                "timeout_minutes": timeout_min,
                "flip_threshold": flip_thr,
            })
        logger.info(f"[DB] 已加载 {len(configs)} 个策略配置: {[c['name'] for c in configs]}")
        return configs
    except Exception as e:
        logger.warning(f"[DB] load_strategy_configs_from_db 失败,将 fallback 到 JSON: {e}")
        return []
# ─── Paper-trading configuration ─────────────────────────────────
# These are defaults; load_paper_config() overwrites the first five from
# paper_config.json when that file exists.
PAPER_TRADING_ENABLED = False # master switch (kept for compatibility with the old logic)
PAPER_ENABLED_STRATEGIES = [] # per-strategy switch, e.g. ["v51_baseline", "v52_8signals"]
PAPER_INITIAL_BALANCE = 10000 # virtual starting balance, USDT
PAPER_RISK_PER_TRADE = 0.02 # risk per trade: 2% (i.e. 200 U)
PAPER_MAX_POSITIONS = 4 # max simultaneous positions per strategy
PAPER_TIER_MULTIPLIER = { # position-size multiplier per tier
    "light": 0.5, # light position: 1%
    "standard": 1.0, # standard position: 2%
    "heavy": 1.5, # heavy position: 3%
}
PAPER_FEE_RATE = 0.0005 # taker fee 0.05% (charged once on entry and once on exit)
def load_paper_config():
    """Load the paper-trading switches and parameters from ``paper_config.json``.

    The file is looked up next to this module. Keys that are absent fall back
    to the same defaults the module-level constants start with. If the file
    does not exist, the current values are left untouched.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    global PAPER_TRADING_ENABLED, PAPER_ENABLED_STRATEGIES, PAPER_INITIAL_BALANCE, PAPER_RISK_PER_TRADE, PAPER_MAX_POSITIONS
    config_path = os.path.join(os.path.dirname(__file__), "paper_config.json")
    try:
        # Explicit encoding, consistent with how the strategy JSON files are read.
        with open(config_path, "r", encoding="utf-8") as f:
            cfg = json.load(f)
    except FileNotFoundError:
        # No config file: keep whatever is currently configured.
        return
    PAPER_TRADING_ENABLED = cfg.get("enabled", False)
    PAPER_ENABLED_STRATEGIES = cfg.get("enabled_strategies", [])
    PAPER_INITIAL_BALANCE = cfg.get("initial_balance", 10000)
    PAPER_RISK_PER_TRADE = cfg.get("risk_per_trade", 0.02)
    PAPER_MAX_POSITIONS = cfg.get("max_positions", 4)
def is_strategy_enabled(strategy_name: str) -> bool:
    """Return whether paper trading is switched on for ``strategy_name``.

    The master switch ``PAPER_TRADING_ENABLED`` must be on. An empty
    ``PAPER_ENABLED_STRATEGIES`` list means "every strategy" (legacy
    behaviour); otherwise the strategy has to be listed explicitly.
    """
    if PAPER_TRADING_ENABLED:
        return (not PAPER_ENABLED_STRATEGIES) or strategy_name in PAPER_ENABLED_STRATEGIES
    return False
# ─────────────────────────────────────────────────────────────────
# Rolling-window sizes (milliseconds)
WINDOW_FAST = 30 * 60 * 1000 # 30 minutes
WINDOW_MID = 4 * 3600 * 1000 # 4 hours
WINDOW_DAY = 24 * 3600 * 1000 # 24 hours
WINDOW_VWAP = 30 * 60 * 1000 # 30 minutes
# ATR parameters: candle period in milliseconds (5 minutes) and smoothing length
ATR_PERIOD_MS = 5 * 60 * 1000
ATR_LENGTH = 14
# Signal cooldown in milliseconds (10 minutes), applied per strategy name
COOLDOWN_MS = 10 * 60 * 1000
def fetch_market_indicators(symbol: str) -> dict:
    """Read the latest ``market_indicators`` rows for ``symbol`` from PostgreSQL.

    For each known indicator type the newest row (by ``timestamp_ms``) is
    fetched, its JSON payload decoded if it arrives as a string, and a single
    headline number extracted.

    Returns:
        Dict mapping indicator type to a float, or to ``None`` when there is
        no row, the value is NULL, or a string payload is not valid JSON.
    """
    import json as _json

    # indicator_type -> how to pull the headline number out of its payload.
    extractors = {
        "long_short_ratio": lambda v: float(v.get("longShortRatio", 1.0)),
        "top_trader_position": lambda v: float(v.get("longAccount", 0.5)),
        "open_interest_hist": lambda v: float(v.get("sumOpenInterestValue", 0)),
        # premium_pct is stored as a percentage; convert to a fraction
        "coinbase_premium": lambda v: float(v.get("premium_pct", 0)) / 100.0,
        "funding_rate": lambda v: float(v.get("fundingRate", v.get("lastFundingRate", 0))),
        # Phase 2 indicators
        # obi in [-1, 1]: positive = buy pressure, negative = sell pressure
        "obi_depth_10": lambda v: float(v.get("obi", 0)),
        # divergence = (spot - mark) / mark
        "spot_perp_divergence": lambda v: float(v.get("divergence", 0)),
        # whale net CVD ratio in [-1, 1]: positive = net buying
        "tiered_cvd_whale": lambda v: float(v.get("whale_cvd_ratio", 0)),
    }

    indicators = {}
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            for ind_type, extract in extractors.items():
                cur.execute(
                    "SELECT value FROM market_indicators WHERE symbol=%s AND indicator_type=%s ORDER BY timestamp_ms DESC LIMIT 1",
                    (symbol, ind_type),
                )
                row = cur.fetchone()
                payload = row[0] if row else None
                if payload is None:
                    indicators[ind_type] = None
                    continue
                if isinstance(payload, str):
                    try:
                        payload = _json.loads(payload)
                    except Exception:
                        indicators[ind_type] = None
                        continue
                indicators[ind_type] = extract(payload)
    return indicators
def to_float(value: Any) -> Optional[float]:
    """Convert ``value`` to ``float``; return ``None`` for ``None`` or unconvertible input."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None
# ─── Rolling windows ─────────────────────────────────────────────
class TradeWindow:
    """Time-bounded rolling window of trades with incrementally kept totals.

    Trades are stored oldest-first as ``(time_ms, qty, price, is_buyer_maker)``
    tuples. Buy/sell volume and the VWAP numerator/denominator are updated on
    every ``add`` and ``trim`` so that ``cvd`` and ``vwap`` are O(1) to read.
    """

    def __init__(self, window_ms: int):
        self.window_ms = window_ms  # window length in milliseconds
        self.trades: deque = deque()
        self.buy_vol = 0.0  # total qty of trades with is_buyer_maker == 0
        self.sell_vol = 0.0  # total qty of all other trades
        self.pq_sum = 0.0  # sum(price * qty), VWAP numerator
        self.q_sum = 0.0  # sum(qty), VWAP denominator

    def add(self, time_ms: int, qty: float, price: float, is_buyer_maker: int):
        """Append one trade and fold it into the running totals."""
        self.trades.append((time_ms, qty, price, is_buyer_maker))
        self.pq_sum += price * qty
        self.q_sum += qty
        if is_buyer_maker == 0:
            self.buy_vol += qty
        else:
            self.sell_vol += qty

    def trim(self, now_ms: int):
        """Drop every trade older than ``now_ms - window_ms``."""
        cutoff = now_ms - self.window_ms
        trades = self.trades
        while trades and trades[0][0] < cutoff:
            _t_ms, qty, price, ibm = trades.popleft()
            self.pq_sum -= price * qty
            self.q_sum -= qty
            if ibm == 0:
                self.buy_vol -= qty
            else:
                self.sell_vol -= qty
        if not trades:
            # Repeated += / -= on floats leaves rounding residue. With no
            # trades left the true totals are exactly zero, so snap back to
            # zero instead of letting the drift accumulate.
            self.buy_vol = 0.0
            self.sell_vol = 0.0
            self.pq_sum = 0.0
            self.q_sum = 0.0

    @property
    def cvd(self) -> float:
        """Cumulative volume delta of the window: buy volume minus sell volume."""
        return self.buy_vol - self.sell_vol

    @property
    def vwap(self) -> float:
        """Volume-weighted average price of the window, or 0.0 when it holds no volume."""
        return self.pq_sum / self.q_sum if self.q_sum > 0 else 0.0
class ATRCalculator:
    """Aggregates a trade stream into fixed-period candles and derives ATR.

    Only closed candles take part in the ATR; the candle currently being
    built is kept separately in ``current_candle``.
    """

    def __init__(self, period_ms: int = ATR_PERIOD_MS, length: int = ATR_LENGTH):
        self.period_ms = period_ms  # candle length in milliseconds
        self.length = length  # smoothing length
        # length + 1 closed candles yield `length` true ranges.
        self.candles: deque = deque(maxlen=length + 1)
        self.current_candle: Optional[dict] = None
        # Past ATR readings, appended by atr_percentile (see there).
        self.atr_history: deque = deque(maxlen=288)

    def update(self, time_ms: int, price: float):
        """Fold one trade price into the candle that contains ``time_ms``."""
        bar_ms = (time_ms // self.period_ms) * self.period_ms
        candle = self.current_candle
        if candle is not None and candle["bar"] == bar_ms:
            candle["high"] = max(candle["high"], price)
            candle["low"] = min(candle["low"], price)
            candle["close"] = price
            return
        # A different bar: close the candle in progress (if any), start a new one.
        if candle is not None:
            self.candles.append(candle)
        self.current_candle = {"bar": bar_ms, "open": price, "high": price, "low": price, "close": price}

    @property
    def atr(self) -> float:
        """ATR over the closed candles; 0.0 until at least two candles have closed.

        The first true range seeds the value; every later one is blended in
        as ``(prev * (length - 1) + tr) / length``.
        """
        bars = list(self.candles)
        if len(bars) < 2:
            return 0.0
        atr_val = None
        for prev, cur in zip(bars, bars[1:]):
            prev_close = prev["close"]
            tr = max(cur["high"] - cur["low"], abs(cur["high"] - prev_close), abs(cur["low"] - prev_close))
            if atr_val is None:
                atr_val = tr
            else:
                atr_val = (atr_val * (self.length - 1) + tr) / self.length
        return atr_val

    @property
    def atr_percentile(self) -> float:
        """Percentile (0-100) of the current ATR within ``atr_history``.

        Side effect: every access with a non-zero ATR appends the current
        value to ``atr_history`` before ranking, so the history grows per
        read. Returns 50.0 while the ATR is 0 or fewer than 10 readings exist.
        """
        current = self.atr
        if current == 0:
            return 50.0
        history = self.atr_history
        history.append(current)
        if len(history) < 10:
            return 50.0
        # Counting does not need the history to be sorted.
        rank = sum(1 for x in history if x <= current)
        return (rank / len(history)) * 100
# ─── Cache of the historical max |funding rate| (refreshed hourly) ───
_max_fr_cache: dict = {} # {symbol: max_abs_fr}
_max_fr_updated: float = 0 # time.time() of the last successful refresh; 0 = never
def get_max_fr(symbol: str) -> float:
    """Return the largest historical ``|fundingRate|`` recorded for ``symbol``.

    Values for all symbols are loaded with one aggregate query and cached at
    module level. The cache is refreshed when it is older than one hour or
    when ``symbol`` is not in it yet. A failing query is logged and the
    previous cache contents (or the default) are used.

    Returns:
        The cached maximum, or 0.0001 (0.01%) when nothing is known, so
        callers can divide by the result safely.
    """
    global _max_fr_cache, _max_fr_updated
    now = time.time()
    if now - _max_fr_updated > 3600 or symbol not in _max_fr_cache:
        try:
            with get_sync_conn() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        "SELECT symbol, MAX(ABS((value->>'fundingRate')::float)) as max_fr "
                        "FROM market_indicators WHERE indicator_type='funding_rate' "
                        "GROUP BY symbol"
                    )
                    for sym, max_fr in cur.fetchall():
                        _max_fr_cache[sym] = max_fr if max_fr else 0.0001
            _max_fr_updated = now
            # A symbol without funding rows would otherwise miss the cache on
            # every call and re-run the aggregate each time. Remember the
            # default until the next hourly refresh.
            _max_fr_cache.setdefault(symbol, 0.0001)
        except Exception as e:
            logger.warning(f"get_max_fr error: {e}")
    return _max_fr_cache.get(symbol, 0.0001)  # default 0.01% guards against division by zero
class SymbolState:
    """In-memory state for one symbol: rolling trade windows, ATR, and signal bookkeeping."""

    def __init__(self, symbol: str):
        """Create empty windows for ``symbol`` and load its latest market indicators from the DB."""
        self.symbol = symbol
        self.win_fast = TradeWindow(WINDOW_FAST)
        self.win_mid = TradeWindow(WINDOW_MID)
        self.win_day = TradeWindow(WINDOW_DAY)
        self.win_vwap = TradeWindow(WINDOW_VWAP)
        self.atr_calc = ATRCalculator()
        self.last_processed_id = 0  # agg_id of the most recently processed trade
        self.last_trade_price = 0.0
        # While True the evaluate_* methods return an empty result;
        # load_historical() clears it once the replay is finished.
        self.warmup = True
        # Previous-cycle values used to derive the CVD slope/acceleration and OI change.
        self.prev_cvd_fast = 0.0
        self.prev_cvd_fast_slope = 0.0
        self.prev_oi_value = 0.0
        self.market_indicators = fetch_market_indicators(symbol)  # DB read
        # Per-strategy-name time and direction of the last emitted signal (cooldown).
        self.last_signal_ts: dict[str, int] = {}
        self.last_signal_dir: dict[str, str] = {}
        self.recent_large_trades: deque = deque()  # (time_ms, qty, is_buyer_maker)
        # ── Phase 2 real-time in-memory fields (updated by a background WebSocket coroutine) ──
        self.rt_obi: float = 0.0 # order-book imbalance in [-1, 1]; real-time WebSocket, all symbols
        self.rt_spot_perp_div: float = 0.0 # spot-perp divergence (spot - mark) / mark; real-time WebSocket, all symbols
        # tiered_cvd_whale: tiered by notional, accumulated in real time over the last 15 minutes
        self._whale_trades: deque = deque() # (time_ms, usd_val, is_sell)
        self.WHALE_WINDOW_MS: int = 15 * 60 * 1000 # 15 minutes
def process_trade(self, agg_id: int, time_ms: int, price: float, qty: float, is_buyer_maker: int):
    """Feed one aggregated trade into every rolling structure of this symbol.

    The trade is added to all four trade windows and the ATR calculator, the
    windows are trimmed relative to the trade's own timestamp, and trades with
    a notional of at least $100k are tracked in the 15-minute whale window.

    Args:
        agg_id: Aggregate trade id; stored as ``last_processed_id``.
        time_ms: Trade time in epoch milliseconds.
        price: Trade price; stored as ``last_trade_price``.
        qty: Trade quantity.
        is_buyer_maker: 0 is counted as buy volume, anything else as sell volume.
    """
    windows = (self.win_fast, self.win_mid, self.win_day, self.win_vwap)
    for window in windows:
        window.add(time_ms, qty, price, is_buyer_maker)
    self.atr_calc.update(time_ms, price)
    # The trade's own timestamp is the reference "now" for trimming.
    for window in windows:
        window.trim(time_ms)

    self.last_processed_id = agg_id
    self.last_trade_price = price  # latest trade price, used as entry_price

    # tiered_cvd_whale real-time accumulation (notional >= $100k counts as a whale)
    notional = price * qty
    whales = self._whale_trades
    if notional >= 100_000:
        whales.append((time_ms, notional, bool(is_buyer_maker)))
    # Trim the whale window.
    oldest_allowed = time_ms - self.WHALE_WINDOW_MS
    while whales and whales[0][0] < oldest_allowed:
        whales.popleft()
@property
def whale_cvd_ratio(self) -> float:
    """Net whale CVD ratio in [-1, 1] over the tracked >=$100k trades of the last 15 minutes.

    Computed as ``(buy_usd - sell_usd) / (buy_usd + sell_usd)``; 0.0 when no
    whale trade is currently tracked.
    """
    buy_usd = 0
    sell_usd = 0
    for _time_ms, usd_val, is_sell in self._whale_trades:
        if is_sell:
            sell_usd += usd_val
        else:
            buy_usd += usd_val
    total = buy_usd + sell_usd
    if total > 0:
        return (buy_usd - sell_usd) / total
    return 0.0
def compute_p95_p99(self) -> tuple:
    """Return the (P95, P99) trade-quantity thresholds over the day window.

    With fewer than 100 trades the fixed pair ``(5.0, 10.0)`` is returned.
    Otherwise the percentiles are floored at ``(5.0, 10.0)`` for symbols
    containing "BTC" and at ``(50.0, 100.0)`` for every other symbol.
    """
    day_trades = self.win_day.trades
    n = len(day_trades)
    if n < 100:
        return 5.0, 10.0

    qtys = [trade[1] for trade in day_trades]
    qtys.sort()
    if "BTC" in self.symbol:
        floor_p95, floor_p99 = 5.0, 10.0
    else:
        floor_p95, floor_p99 = 50.0, 100.0
    p95 = max(qtys[int(n * 0.95)], floor_p95)
    p99 = max(qtys[int(n * 0.99)], floor_p99)
    return p95, p99
def update_large_trades(self, now_ms: int, p99: float):
    """Refresh ``recent_large_trades`` with fast-window trades of at least ``p99`` quantity.

    Entries older than 15 minutes are dropped from the left, then every
    fast-window trade newer than the cutoff whose quantity reaches ``p99`` is
    appended as ``(time_ms, qty, is_buyer_maker)`` unless already present.

    De-duplication uses the whole entry rather than the timestamp alone, so
    distinct large trades sharing a millisecond (for example an opposing-side
    trade) are all kept. Trades identical in time, quantity and side still
    collapse into one entry because the window stores no trade id.

    Args:
        now_ms: Current time in epoch milliseconds.
        p99: Quantity threshold a trade must reach to count as large.
    """
    cutoff = now_ms - 15 * 60 * 1000
    recent = self.recent_large_trades
    while recent and recent[0][0] < cutoff:
        recent.popleft()

    # Only add trades not seen before, so repeated calls do not duplicate entries.
    seen = set(recent)
    for t_ms, qty, _price, ibm in self.win_fast.trades:
        if qty >= p99 and t_ms > cutoff:
            entry = (t_ms, qty, ibm)
            if entry not in seen:
                recent.append(entry)
                seen.add(entry)
def build_evaluation_snapshot(self, now_ms: int) -> dict:
    """Collect every indicator one evaluation cycle needs into a single dict.

    Side effects: reading ``atr_percentile`` appends to the ATR history,
    ``recent_large_trades`` is refreshed, and the previous-cycle values
    (``prev_cvd_fast``, ``prev_cvd_fast_slope``, ``prev_oi_value``) are
    advanced. Calling this more than once per cycle therefore changes the
    slope/acceleration and OI-change readings.

    Args:
        now_ms: Current time in epoch milliseconds.

    Returns:
        Dict with CVDs, VWAP, ATR, percentile thresholds, price, CVD
        slope/acceleration, OI change and the derived environment score.
    """
    cvd_fast = self.win_fast.cvd
    cvd_mid = self.win_mid.cvd
    cvd_day = self.win_day.cvd
    vwap = self.win_vwap.vwap
    atr = self.atr_calc.atr
    atr_pct = self.atr_calc.atr_percentile

    p95, p99 = self.compute_p95_p99()
    self.update_large_trades(now_ms, p99)

    # Prefer the latest trade price; fall back to VWAP before any trade was seen.
    price = self.last_trade_price if self.last_trade_price > 0 else vwap

    # First and second differences of the fast CVD between consecutive snapshots.
    cvd_fast_slope = cvd_fast - self.prev_cvd_fast
    cvd_fast_accel = cvd_fast_slope - self.prev_cvd_fast_slope
    self.prev_cvd_fast = cvd_fast
    self.prev_cvd_fast_slope = cvd_fast_slope

    # Open-interest change versus the previous snapshot. Without a current or
    # previous reading the change is 0 and the environment score is the
    # neutral 10.
    oi_value = to_float(self.market_indicators.get("open_interest_hist"))
    prev_oi = self.prev_oi_value
    oi_change = 0.0
    environment_score = 10
    if oi_value is not None and prev_oi != 0:
        oi_change = (oi_value - prev_oi) / prev_oi if prev_oi > 0 else 0.0
        if oi_change >= 0.03:
            environment_score = 15
        elif oi_change > 0:
            environment_score = 10
        else:
            environment_score = 5
    if oi_value is not None and oi_value > 0:
        self.prev_oi_value = oi_value

    return {
        "cvd_fast": cvd_fast,
        "cvd_mid": cvd_mid,
        "cvd_day": cvd_day,
        "vwap": vwap,
        "atr": atr,
        "atr_value": atr,  # V5.3: absolute ATR snapshot, persisted with feature_events
        "atr_pct": atr_pct,
        "p95": p95,
        "p99": p99,
        "price": price,
        "cvd_fast_slope": cvd_fast_slope,
        "cvd_fast_accel": cvd_fast_accel,
        "oi_change": oi_change,
        "environment_score": environment_score,
        "oi_value": oi_value,
    }
def fetch_recent_liquidations(self, window_ms: int = 300000):
    """Sum this symbol's liquidations from the ``liquidations`` table over a recent window.

    Args:
        window_ms: Look-back window in milliseconds (default: 5 minutes).

    Returns:
        ``{"long_usd": ..., "short_usd": ...}`` with the USD totals per side,
        or ``None`` if the query returns no row.
    """
    since_ms = int(time.time() * 1000) - window_ms
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
SELECT
COALESCE(SUM(CASE WHEN side='LONG' THEN usd_value ELSE 0 END), 0) as long_liq,
COALESCE(SUM(CASE WHEN side='SHORT' THEN usd_value ELSE 0 END), 0) as short_liq
FROM liquidations
WHERE symbol=%s AND trade_time >= %s
""",
                (self.symbol, since_ms),
            )
            totals = cur.fetchone()
    return {"long_usd": totals[0], "short_usd": totals[1]} if totals else None
def evaluate_signal(self, now_ms: int, strategy_cfg: Optional[dict] = None, snapshot: Optional[dict] = None) -> dict:
    """Score the current state for one strategy and decide whether to signal.

    Strategies whose name starts with ``"v53"`` are routed to
    ``_evaluate_v53`` (or receive an empty result when
    ``strategy_cfg["symbols"]`` is non-empty and excludes this symbol). All
    other strategies use the legacy V5.1/V5.2 layered score implemented here:
    direction, crowding, funding rate, environment, confirmation, liquidation
    and auxiliary layers, each scaled by ``strategy_cfg["weights"]``.

    Args:
        now_ms: Evaluation time in epoch milliseconds (used for the cooldown).
        strategy_cfg: Strategy configuration. Keys read here: ``name``,
            ``symbols``, ``threshold``, ``signals``, ``accel_bonus``,
            ``weights``. ``None`` means the ``v51_baseline`` defaults.
        snapshot: Result of ``build_evaluation_snapshot`` to reuse; built on
            demand when omitted.

    Returns:
        Dict with the indicator values plus ``signal`` ("LONG"/"SHORT"/None),
        ``direction``, ``score`` (0-100), ``tier`` ("standard"/"heavy"/None)
        and a per-layer ``factors`` breakdown. During warm-up, or while price
        or ATR is 0, the score is 0 and no signal is produced.

    Side effects:
        Records ``last_signal_ts`` / ``last_signal_dir`` for the strategy when
        a signal fires. May query the DB for funding-rate history and, if the
        "liquidation" signal is enabled, recent liquidations.
    """
    strategy_cfg = strategy_cfg or {}
    strategy_name = strategy_cfg.get("name", "v51_baseline")

    # ─── Track router ───────────────────────────────────────────
    # v53*    → unified V5.3 scoring (BTC/ETH/XRP/SOL), handled by _evaluate_v53()
    # v51/v52 → the legacy code path below
    if strategy_name.startswith("v53"):
        allowed_symbols = strategy_cfg.get("symbols", [])
        if allowed_symbols and self.symbol not in allowed_symbols:
            snap = snapshot or self.build_evaluation_snapshot(now_ms)
            return self._empty_result(strategy_name, snap)
        return self._evaluate_v53(now_ms, strategy_cfg, snapshot)

    # ─── Legacy V5.1 / V5.2 scoring ─────────────────────────────
    strategy_threshold = int(strategy_cfg.get("threshold", 75))
    enabled_signals = set(strategy_cfg.get("signals", []))
    snap = snapshot or self.build_evaluation_snapshot(now_ms)
    cvd_fast = snap["cvd_fast"]
    cvd_mid = snap["cvd_mid"]
    vwap = snap["vwap"]
    atr = snap["atr"]
    atr_pct = snap["atr_pct"]
    p95 = snap["p95"]
    p99 = snap["p99"]
    price = snap["price"]
    cvd_fast_slope = snap["cvd_fast_slope"]
    cvd_fast_accel = snap["cvd_fast_accel"]
    oi_change = snap["oi_change"]
    environment_score = snap["environment_score"]
    result = {
        "strategy": strategy_name,
        "cvd_fast": cvd_fast,
        "cvd_mid": cvd_mid,
        "cvd_day": snap["cvd_day"],
        "cvd_fast_slope": cvd_fast_slope,
        "atr": atr,
        "atr_pct": atr_pct,
        "vwap": vwap,
        "price": price,
        "p95": p95,
        "p99": p99,
        "signal": None,
        "direction": None,
        "score": 0,
        "tier": None,
        "factors": {},
    }
    if self.warmup or price == 0 or atr == 0:
        return result

    # Determine the directional bias. It is computed even during cooldown or
    # without fast/mid consensus so that the score can still be displayed.
    no_direction = False
    last_signal_ts = self.last_signal_ts.get(strategy_name, 0)
    in_cooldown = now_ms - last_signal_ts < COOLDOWN_MS
    if cvd_fast > 0 and cvd_mid > 0:
        direction = "LONG"
    elif cvd_fast < 0 and cvd_mid < 0:
        direction = "SHORT"
    else:
        # Fast and mid CVD disagree: follow the fast CVD for scoring only.
        direction = "LONG" if cvd_fast > 0 else "SHORT"
        no_direction = True

    # V5.1 layered score, 100 points in total.
    # 1) Direction layer (45 points; the acceleration bonus is added after
    #    scaling, and the sum is capped at the layer weight).
    cvd_fast_points = 15 if (
        (direction == "LONG" and cvd_fast > 0) or (direction == "SHORT" and cvd_fast < 0)
    ) else 0
    cvd_mid_points = 15 if (
        (direction == "LONG" and cvd_mid > 0) or (direction == "SHORT" and cvd_mid < 0)
    ) else 0
    has_adverse_p99 = any(
        (direction == "LONG" and lt[2] == 1) or (direction == "SHORT" and lt[2] == 0)
        for lt in self.recent_large_trades
    )
    has_aligned_p99 = any(
        (direction == "LONG" and lt[2] == 0) or (direction == "SHORT" and lt[2] == 1)
        for lt in self.recent_large_trades
    )
    # Aligned large flow: 15; no large flow against us: 10; only adverse flow: 0.
    p99_flow_points = 15 if has_aligned_p99 else (10 if not has_adverse_p99 else 0)
    direction_score = cvd_fast_points + cvd_mid_points + p99_flow_points
    accel_bonus = 0
    if "accel" in enabled_signals and (
        (direction == "LONG" and cvd_fast_accel > 0) or (direction == "SHORT" and cvd_fast_accel < 0)
    ):
        accel_bonus = int(strategy_cfg.get("accel_bonus", 5))

    # 2) Crowding layer (20 points); missing market indicators score the midpoint.
    long_short_ratio = to_float(self.market_indicators.get("long_short_ratio"))
    if long_short_ratio is None:
        ls_score = 5
    elif (direction == "SHORT" and long_short_ratio > 2.0) or (direction == "LONG" and long_short_ratio < 0.5):
        ls_score = 10
    else:
        ls_score = 5
    top_trader_position = to_float(self.market_indicators.get("top_trader_position"))
    if top_trader_position is None:
        top_trader_score = 5
    else:
        if direction == "LONG":
            if top_trader_position >= 0.55:
                top_trader_score = 10
            elif top_trader_position <= 0.45:
                top_trader_score = 0
            else:
                top_trader_score = 5
        else:
            if top_trader_position <= 0.45:
                top_trader_score = 10
            elif top_trader_position >= 0.55:
                top_trader_score = 0
            else:
                top_trader_score = 5
    crowding_score = ls_score + top_trader_score

    # Funding-rate scoring (crowding-layer bonus), read from market_indicators.
    funding_rate = to_float(self.market_indicators.get("funding_rate"))
    # Looked up once; used for the score and reported in the factors.
    max_fr_hist = get_max_fr(self.symbol)
    fr_score = 0.0
    if "funding_rate" in enabled_signals and funding_rate is not None and funding_rate != 0:
        fr_abs = abs(funding_rate)
        # Linear map: raw = |current FR| / historical max |FR| * 5, clamped to 5.
        raw_score = min(fr_abs / max_fr_hist * 5.0, 5.0)
        # Measures how favourable funding is for the direction (0-5, never negative):
        #   LONG  with negative FR (shorts pay longs) → favourable
        #   SHORT with positive FR (longs pay shorts) → favourable
        #   anything else → 0
        if (direction == "LONG" and funding_rate < 0) or (direction == "SHORT" and funding_rate > 0):
            fr_score = round(raw_score, 2)
        else:
            fr_score = 0.0

    # 4) Confirmation layer (15 points): fast and mid CVD both agree with the direction.
    confirmation_score = 15 if (
        (direction == "LONG" and cvd_fast > 0 and cvd_mid > 0)
        or (direction == "SHORT" and cvd_fast < 0 and cvd_mid < 0)
    ) else 0

    # Liquidation scoring (confirmation-layer bonus).
    def liq_ratio_to_score(r: float) -> int:
        # Stepped score: the more lopsided the ratio, the higher the score.
        if r >= 2.0: return 5
        if r >= 1.5: return 3
        if r >= 1.2: return 1
        return 0

    liq_score = 0
    liq_data = None
    if "liquidation" in enabled_signals:
        liq_data = self.fetch_recent_liquidations()
    if liq_data:
        liq_long_usd = liq_data.get("long_usd", 0)
        liq_short_usd = liq_data.get("short_usd", 0)
        total = liq_long_usd + liq_short_usd
        if total > 0:
            # ratio = short liquidations / long liquidations;
            # a high ratio means shorts are being wiped out, which favours longs.
            if liq_long_usd > 0:
                ratio = liq_short_usd / liq_long_usd
            else:
                ratio = float("inf") if liq_short_usd > 0 else 1.0
            if direction == "LONG":
                liq_score = liq_ratio_to_score(ratio)
            else:
                # SHORT benefits from long liquidations, i.e. a high 1 / ratio.
                inv_ratio = (1.0 / ratio) if ratio > 0 else float("inf")
                liq_score = liq_ratio_to_score(inv_ratio)

    # 5) Auxiliary layer (5 points): Coinbase premium.
    coinbase_premium = to_float(self.market_indicators.get("coinbase_premium"))
    if coinbase_premium is None:
        aux_score = 2
    elif (direction == "LONG" and coinbase_premium > 0.0005) or (direction == "SHORT" and coinbase_premium < -0.0005):
        aux_score = 5
    elif abs(coinbase_premium) <= 0.0005:
        aux_score = 2
    else:
        aux_score = 0

    # Scale every layer to the weights configured for the strategy.
    weights = strategy_cfg.get("weights", {})
    w_direction = weights.get("direction", 45)
    w_crowding = weights.get("crowding", 20)
    w_fr = weights.get("funding_rate", 0)
    w_environment = weights.get("environment", 15)
    w_confirmation = weights.get("confirmation", 15)
    w_liq = weights.get("liquidation", 0)
    w_auxiliary = weights.get("auxiliary", 5)
    # Raw ranges: direction 0-45, crowding 0-20, environment 0-15,
    # confirmation 0-15, auxiliary 0-5, FR 0-5, liquidation 0-5.
    scaled_direction = min(round(direction_score / 45 * w_direction) + accel_bonus, w_direction)
    scaled_crowding = round(crowding_score / 20 * w_crowding)
    scaled_fr = fr_score if w_fr > 0 else 0  # already 0-5; only switched on/off by the weight
    scaled_environment = round(environment_score / 15 * w_environment)
    scaled_confirmation = round(confirmation_score / 15 * w_confirmation)
    scaled_liq = liq_score if w_liq > 0 else 0  # already 0-5; only switched on/off by the weight
    scaled_auxiliary = round(aux_score / 5 * w_auxiliary)
    total_score = scaled_direction + scaled_crowding + scaled_fr + scaled_environment + scaled_confirmation + scaled_liq + scaled_auxiliary
    total_score = max(0, min(round(total_score, 1), 100))  # clamp to 0-100, one decimal

    result["score"] = total_score
    result["factors"] = {
        "direction": {
            "score": scaled_direction,
            "max": w_direction,
            "cvd_fast": cvd_fast_points,
            "cvd_mid": cvd_mid_points,
            "p99_flow": p99_flow_points,
            "accel_bonus": accel_bonus,
        },
        "crowding": {"score": scaled_crowding, "max": w_crowding, "long_short_ratio": ls_score, "top_trader_position": top_trader_score},
        "environment": {"score": scaled_environment, "max": w_environment, "open_interest_hist": oi_change},
        "confirmation": {"score": scaled_confirmation, "max": w_confirmation},
        "auxiliary": {"score": scaled_auxiliary, "max": w_auxiliary, "coinbase_premium": coinbase_premium},
        "funding_rate": {"score": round(scaled_fr, 2), "max": w_fr, "value": funding_rate, "max_fr_hist": max_fr_hist},
        "liquidation": {
            "score": scaled_liq,
            "max": w_liq,
            "long_usd": liq_data.get("long_usd", 0) if liq_data else 0,
            "short_usd": liq_data.get("short_usd", 0) if liq_data else 0,
        },
    }
    # The direction is reported regardless of the cooldown (it drives
    # reverse-close decisions), but only when fast and mid CVD agree.
    result["direction"] = direction if not no_direction else None

    heavy_threshold = max(strategy_threshold + 10, 85)
    if total_score >= heavy_threshold and not no_direction and not in_cooldown:
        result["signal"] = direction
        result["tier"] = "heavy"
    elif total_score >= strategy_threshold and not no_direction and not in_cooldown:
        result["signal"] = direction
        result["tier"] = "standard"
    else:
        result["signal"] = None
        result["tier"] = None

    if result["signal"]:
        # Start the per-strategy cooldown.
        self.last_signal_ts[strategy_name] = now_ms
        self.last_signal_dir[strategy_name] = direction
    return result
def _empty_result(self, strategy_name: str, snap: dict) -> dict:
"""返回空评分结果symbol不匹配track时使用"""
return {
"strategy": strategy_name,
"cvd_fast": snap["cvd_fast"], "cvd_mid": snap["cvd_mid"],
"cvd_day": snap["cvd_day"], "cvd_fast_slope": snap["cvd_fast_slope"],
"atr": snap["atr"], "atr_value": snap.get("atr_value", snap["atr"]),
"atr_pct": snap["atr_pct"], "vwap": snap["vwap"], "price": snap["price"],
"p95": snap["p95"], "p99": snap["p99"],
"signal": None, "direction": None, "score": 0, "tier": None, "factors": {},
}
def _evaluate_v53(self, now_ms: int, strategy_cfg: dict, snapshot: Optional[dict] = None) -> dict:
    """
    V5.3 unified scoring (BTC/ETH/XRP/SOL).

    Architecture: four scoring layers (direction 55 / crowding 25 /
    environment 15 / auxiliary 5) guarded by per-symbol gates. A blocked gate
    forces the score to 0 and suppresses the signal:
    - volatility floor (atr / price)
    - CVD resonance (fast and mid CVD share the same sign)
    - whale veto (BTCUSDT: whale_cvd_ratio; other symbols: opposing large trades)
    - OBI veto (real-time WebSocket value, falling back to the DB value)
    - spot-perp divergence veto (real-time WebSocket value, falling back to the DB value)

    Args:
        now_ms: Evaluation time in epoch milliseconds (used for the cooldown
            and, for "*_fast" strategies, the custom CVD windows).
        strategy_cfg: Strategy configuration. Keys read here: name, threshold,
            flip_threshold, cvd_window_fast_ms, cvd_window_mid_ms,
            symbol_gates, accel_independent, obi_scoring.
        snapshot: Result of build_evaluation_snapshot() to reuse; built on
            demand when omitted.

    Returns:
        The _empty_result() dict, updated with score, direction, factors and,
        when a signal fires, signal and tier.

    Side effects:
        Records last_signal_ts / last_signal_dir for the strategy when a
        signal fires.
    """
    # NOTE(review): the "gates", "cvd_fast_window" and "cvd_slow_window" keys
    # produced by load_strategy_configs_from_db() are not read here — confirm
    # whether DB-defined gate settings are meant to take effect.
    strategy_name = strategy_cfg.get("name", "v53")
    strategy_threshold = int(strategy_cfg.get("threshold", 75))
    flip_threshold = int(strategy_cfg.get("flip_threshold", 85))
    is_fast = strategy_name.endswith("_fast")
    snap = snapshot or self.build_evaluation_snapshot(now_ms)
    # v53_fast: recompute cvd_fast / cvd_mid over custom, shorter windows
    if is_fast:
        fast_ms = int(strategy_cfg.get("cvd_window_fast_ms", 5 * 60 * 1000))
        mid_ms = int(strategy_cfg.get("cvd_window_mid_ms", 30 * 60 * 1000))
        cutoff_fast = now_ms - fast_ms
        cutoff_mid = now_ms - mid_ms
        buy_f = sell_f = buy_m = sell_m = 0.0
        for t_ms, qty, _price, ibm in self.win_fast.trades:
            if t_ms >= cutoff_fast:
                if ibm == 0: buy_f += qty
                else: sell_f += qty
        # mid is read from win_mid (its 4 h window contains the last 30 minutes)
        for t_ms, qty, _price, ibm in self.win_mid.trades:
            if t_ms >= cutoff_mid:
                if ibm == 0: buy_m += qty
                else: sell_m += qty
        cvd_fast = buy_f - sell_f
        cvd_mid = buy_m - sell_m
    else:
        cvd_fast = snap["cvd_fast"]
        cvd_mid = snap["cvd_mid"]
    price = snap["price"]
    atr = snap["atr"]
    atr_value = snap.get("atr_value", atr)
    cvd_fast_accel = snap["cvd_fast_accel"]
    environment_score_raw = snap["environment_score"]
    result = self._empty_result(strategy_name, snap)
    # No scoring during warm-up or before price/ATR are available.
    if self.warmup or price == 0 or atr == 0:
        return result
    last_signal_ts = self.last_signal_ts.get(strategy_name, 0)
    in_cooldown = now_ms - last_signal_ts < COOLDOWN_MS
    # ── Per-symbol gate parameters ─────────────────────────────
    symbol_gates = (strategy_cfg.get("symbol_gates") or {}).get(self.symbol, {})
    min_vol = float(symbol_gates.get("min_vol_threshold", 0.002))
    whale_usd = float(symbol_gates.get("whale_threshold_usd", 50000))
    obi_veto = float(symbol_gates.get("obi_veto_threshold", 0.35))
    spd_veto = float(symbol_gates.get("spot_perp_divergence_veto", 0.005))
    # Only the first gate that blocks is recorded.
    gate_block = None
    # Gate 1: volatility floor
    atr_pct_price = atr / price if price > 0 else 0
    if atr_pct_price < min_vol:
        gate_block = f"low_vol({atr_pct_price:.4f}<{min_vol})"
    # Gate 2: CVD resonance (direction gate)
    if cvd_fast > 0 and cvd_mid > 0:
        direction = "LONG"
        cvd_resonance = 30
        no_direction = False
    elif cvd_fast < 0 and cvd_mid < 0:
        direction = "SHORT"
        cvd_resonance = 30
        no_direction = False
    else:
        # No consensus: follow the fast CVD for the score breakdown only.
        direction = "LONG" if cvd_fast > 0 else "SHORT"
        cvd_resonance = 0
        no_direction = True
        if not gate_block:
            gate_block = "no_direction_consensus"
    # Gate 3: whale veto (BTC uses whale_cvd_ratio, other symbols use opposing large trades)
    if not gate_block and not no_direction:
        if self.symbol == "BTCUSDT":
            # BTC: veto when the whale net CVD direction conflicts with the signal direction.
            # Uses the in-memory ratio when whale trades are tracked, otherwise the DB value (or 0.0).
            whale_cvd = self.whale_cvd_ratio if self._whale_trades else to_float(self.market_indicators.get("tiered_cvd_whale")) or 0.0
            whale_threshold_pct = float(symbol_gates.get("whale_flow_threshold_pct", 0.5)) / 100
            if (direction == "LONG" and whale_cvd < -whale_threshold_pct) or \
                    (direction == "SHORT" and whale_cvd > whale_threshold_pct):
                gate_block = f"whale_cvd_veto({whale_cvd:.3f})"
        else:
            # ALT: veto when recent_large_trades holds an opposing large trade
            # (notional >= whale_usd) and no aligned one.
            whale_adverse = any(
                (direction == "LONG" and lt[2] == 1 and lt[1] * price >= whale_usd) or
                (direction == "SHORT" and lt[2] == 0 and lt[1] * price >= whale_usd)
                for lt in self.recent_large_trades
            )
            whale_aligned = any(
                (direction == "LONG" and lt[2] == 0 and lt[1] * price >= whale_usd) or
                (direction == "SHORT" and lt[2] == 1 and lt[1] * price >= whale_usd)
                for lt in self.recent_large_trades
            )
            if whale_adverse and not whale_aligned:
                gate_block = f"whale_adverse(>${whale_usd/1000:.0f}k)"
    # Gate 4: OBI veto (real-time WebSocket value preferred, DB value as fallback).
    # A real-time value of exactly 0.0 is treated as "not available".
    obi_raw = self.rt_obi if self.rt_obi != 0.0 else to_float(self.market_indicators.get("obi_depth_10"))
    if not gate_block and not no_direction and obi_raw is not None:
        if direction == "LONG" and obi_raw < -obi_veto:
            gate_block = f"obi_veto({obi_raw:.3f}<-{obi_veto})"
        elif direction == "SHORT" and obi_raw > obi_veto:
            gate_block = f"obi_veto({obi_raw:.3f}>{obi_veto})"
    # Gate 5: spot-perp divergence veto (real-time WebSocket value preferred, DB value as fallback)
    spot_perp_div = self.rt_spot_perp_div if self.rt_spot_perp_div != 0.0 else to_float(self.market_indicators.get("spot_perp_divergence"))
    if not gate_block and not no_direction and spot_perp_div is not None:
        if (direction == "LONG" and spot_perp_div < -spd_veto) or \
                (direction == "SHORT" and spot_perp_div > spd_veto):
            gate_block = f"spd_veto({spot_perp_div:.4f})"
    gate_passed = gate_block is None
    # ── Direction layer (55 points) ─────────────────────────────
    has_adverse_p99 = any(
        (direction == "LONG" and lt[2] == 1) or (direction == "SHORT" and lt[2] == 0)
        for lt in self.recent_large_trades
    )
    has_aligned_p99 = any(
        (direction == "LONG" and lt[2] == 0) or (direction == "SHORT" and lt[2] == 1)
        for lt in self.recent_large_trades
    )
    # Aligned large flow: 20; no large flow against us: 10; only adverse flow: 0.
    p99_flow = 20 if has_aligned_p99 else (10 if not has_adverse_p99 else 0)
    accel_bonus = 5 if (
        (direction == "LONG" and cvd_fast_accel > 0) or
        (direction == "SHORT" and cvd_fast_accel < 0)
    ) else 0
    # v53_fast: independent acceleration path that sets a floor for the direction score.
    # NOTE(review): the original comment says this path does not require fast/mid
    # CVD agreement, but the `not no_direction` guard below does require it — confirm intent.
    accel_independent_score = 0
    if is_fast and not no_direction:
        accel_cfg = strategy_cfg.get("accel_independent", {})
        if accel_cfg.get("enabled", False):
            # acceleration in the signal direction + aligned p99 large trade → independent score
            accel_strong = (
                (direction == "LONG" and cvd_fast_accel > 0 and has_aligned_p99) or
                (direction == "SHORT" and cvd_fast_accel < 0 and has_aligned_p99)
            )
            if accel_strong:
                accel_independent_score = int(accel_cfg.get("min_direction_score", 35))
    direction_score = max(min(cvd_resonance + p99_flow + accel_bonus, 55), accel_independent_score)
    # ── Crowding layer (25 points) ──────────────────────────────
    # The long/short ratio is scored contrarian: the more one-sided the crowd
    # is against the signal direction, the higher the score.
    long_short_ratio = to_float(self.market_indicators.get("long_short_ratio"))
    if long_short_ratio is None:
        ls_score = 7
    elif (direction == "SHORT" and long_short_ratio > 2.0) or (direction == "LONG" and long_short_ratio < 0.5):
        ls_score = 15
    elif (direction == "SHORT" and long_short_ratio > 1.5) or (direction == "LONG" and long_short_ratio < 0.7):
        ls_score = 10
    elif (direction == "SHORT" and long_short_ratio > 1.0) or (direction == "LONG" and long_short_ratio < 1.0):
        ls_score = 7
    else:
        ls_score = 0
    top_trader_position = to_float(self.market_indicators.get("top_trader_position"))
    if top_trader_position is None:
        top_trader_score = 5
    else:
        if direction == "LONG":
            top_trader_score = 10 if top_trader_position >= 0.55 else (0 if top_trader_position <= 0.45 else 5)
        else:
            top_trader_score = 10 if top_trader_position <= 0.45 else (0 if top_trader_position >= 0.55 else 5)
    crowding_score = min(ls_score + top_trader_score, 25)
    # ── Environment layer (15 points) ───────────────────────────
    # OI-change base score (the snapshot value is on a 0-15 scale)
    oi_base_score = round(environment_score_raw / 15 * 10) # rescaled to 10 points
    # v53_fast: OBI bonus for an aligned order book (up to 5 points, counted in the environment layer)
    obi_bonus = 0
    if is_fast and obi_raw is not None:
        obi_cfg = strategy_cfg.get("obi_scoring", {})
        strong_thr = float(obi_cfg.get("strong_threshold", 0.30))
        weak_thr = float(obi_cfg.get("weak_threshold", 0.15))
        strong_sc = int(obi_cfg.get("strong_score", 5))
        weak_sc = int(obi_cfg.get("weak_score", 3))
        obi_aligned = (direction == "LONG" and obi_raw > 0) or (direction == "SHORT" and obi_raw < 0)
        obi_abs = abs(obi_raw)
        if obi_aligned:
            if obi_abs >= strong_thr:
                obi_bonus = strong_sc
            elif obi_abs >= weak_thr:
                obi_bonus = weak_sc
    # Non-fast strategies use the snapshot score as is (rounded); fast ones use base + OBI bonus.
    environment_score = min(oi_base_score + obi_bonus, 15) if is_fast else round(environment_score_raw / 15 * 15)
    # ── Auxiliary layer (5 points) ──────────────────────────────
    coinbase_premium = to_float(self.market_indicators.get("coinbase_premium"))
    if coinbase_premium is None:
        aux_score = 2
    elif (direction == "LONG" and coinbase_premium > 0.0005) or (direction == "SHORT" and coinbase_premium < -0.0005):
        aux_score = 5
    elif abs(coinbase_premium) <= 0.0005:
        aux_score = 2
    else:
        aux_score = 0
    total_score = min(direction_score + crowding_score + environment_score + aux_score, 100)
    total_score = max(0, round(total_score, 1))
    # Any blocked gate zeroes the score; the layer breakdown is still reported in factors.
    if not gate_passed:
        total_score = 0
    # whale_cvd for BTC display
    whale_cvd_display = (self.whale_cvd_ratio if self._whale_trades else to_float(self.market_indicators.get("tiered_cvd_whale"))) if self.symbol == "BTCUSDT" else None
    result.update({
        "score": total_score,
        "direction": direction if (not no_direction and gate_passed) else None,
        "atr_value": atr_value,
        "cvd_fast_5m": cvd_fast if is_fast else None, # v53_fast: CVD recomputed over the custom fast window
        "factors": {
            "track": "BTC" if self.symbol == "BTCUSDT" else "ALT",
            "gate_passed": gate_passed,
            "gate_block": gate_block,
            "atr_pct_price": round(atr_pct_price, 5),
            "obi_raw": obi_raw,
            "spot_perp_div": spot_perp_div,
            "whale_cvd_ratio": whale_cvd_display,
            "direction": {
                "score": direction_score, "max": 55,
                "cvd_resonance": cvd_resonance, "p99_flow": p99_flow, "accel_bonus": accel_bonus,
            },
            "crowding": {
                "score": crowding_score, "max": 25,
                "lsr_contrarian": ls_score, "top_trader_position": top_trader_score,
            },
            "environment": {"score": environment_score, "max": 15, "oi_base": oi_base_score if is_fast else environment_score, "obi_bonus": obi_bonus},
            "auxiliary": {"score": aux_score, "max": 5, "coinbase_premium": coinbase_premium},
        },
    })
    # A signal needs direction consensus, all gates passed and no active cooldown.
    if not no_direction and gate_passed and not in_cooldown:
        if total_score >= flip_threshold:
            result["signal"] = direction
            result["tier"] = "heavy"
        elif total_score >= strategy_threshold:
            result["signal"] = direction
            result["tier"] = "standard"
    if result["signal"]:
        # Start the per-strategy cooldown.
        self.last_signal_ts[strategy_name] = now_ms
        self.last_signal_dir[strategy_name] = direction
    return result
def _evaluate_v53_alt(self, now_ms: int, strategy_cfg: dict, snapshot: Optional[dict] = None) -> dict:
    """Deprecated shim kept for backward compatibility.

    Evaluation is handled uniformly by ``_evaluate_v53()``; this method simply
    forwards its arguments there and returns whatever it returns.
    """
    evaluation = self._evaluate_v53(now_ms, strategy_cfg, snapshot)
    return evaluation
def _evaluate_v53_btc(self, now_ms: int, strategy_cfg: dict, snapshot: Optional[dict] = None) -> dict:
    """Deprecated shim kept for backward compatibility.

    Evaluation is handled uniformly by ``_evaluate_v53()``; this method simply
    forwards its arguments there and returns whatever it returns.
    """
    evaluation = self._evaluate_v53(now_ms, strategy_cfg, snapshot)
    return evaluation
# ─── PG DB operations ────────────────────────────────────────────
def load_historical(state: SymbolState, window_ms: int):
    """Replay recent rows of ``agg_trades`` into ``state`` (cold-start warm-up).

    Selects every trade for ``state.symbol`` whose ``time_ms`` lies within the
    last ``window_ms`` milliseconds, ordered by ``agg_id``, and feeds each one
    to ``state.process_trade``. After the replay a summary line is logged and
    ``state.warmup`` is set to False.
    """
    start_ms = int(time.time() * 1000) - window_ms
    query = (
        "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades "
        "WHERE symbol = %s AND time_ms >= %s ORDER BY agg_id ASC"
    )
    count = 0
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute(query, (state.symbol, start_ms))
        # Drain the result set in batches of 5000 rows until fetchmany() returns nothing.
        while batch := cur.fetchmany(5000):
            for agg_id, price, qty, time_ms, is_buyer_maker in batch:
                # process_trade takes (agg_id, time_ms, price, qty, is_buyer_maker),
                # which differs from the SELECT column order.
                state.process_trade(agg_id, time_ms, price, qty, is_buyer_maker)
            count += len(batch)
    logger.info(f"[{state.symbol}] 冷启动完成: 加载{count:,}条历史数据 (窗口={window_ms//3600000}h)")
    state.warmup = False
def fetch_new_trades(symbol: str, last_id: int) -> list:
    """Return up to 10000 trades of ``symbol`` with ``agg_id`` greater than ``last_id``.

    Rows are ordered by ascending ``agg_id`` and returned as dicts keyed by
    ``agg_id``, ``price``, ``qty``, ``time_ms`` and ``is_buyer_maker``.
    """
    columns = ("agg_id", "price", "qty", "time_ms", "is_buyer_maker")
    query = (
        "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades "
        "WHERE symbol = %s AND agg_id > %s ORDER BY agg_id ASC LIMIT 10000"
    )
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute(query, (symbol, last_id))
        # The SELECT returns exactly the five columns named above, in that order.
        return [dict(zip(columns, row)) for row in cur.fetchall()]
def save_indicator(ts: int, symbol: str, result: dict, strategy: str = "v52_8signals",
                   strategy_id: Optional[str] = None, strategy_name_snapshot: Optional[str] = None):
    """Insert one evaluation result into ``signal_indicators``.

    ``result`` must contain the keys ``cvd_fast``, ``cvd_mid``, ``cvd_day``,
    ``cvd_fast_slope``, ``atr``, ``atr_pct``, ``vwap``, ``price``, ``p95``,
    ``p99`` and ``score``. ``atr_value`` (defaults to ``atr``), ``signal``,
    ``factors`` and ``cvd_fast_5m`` are optional. When ``result`` carries a
    truthy ``signal``, a ``new_signal`` NOTIFY with the payload
    ``symbol:strategy:signal:score`` is issued in the same transaction.
    """
    factors = result.get("factors")
    # An empty/missing factors dict is stored as NULL rather than as "{}".
    factors_json = json.dumps(factors) if factors else None
    insert_sql = (
        "INSERT INTO signal_indicators "
        "(ts,symbol,strategy,cvd_fast,cvd_mid,cvd_day,cvd_fast_slope,atr_5m,atr_percentile,atr_value,vwap_30m,price,p95_qty,p99_qty,score,signal,factors,cvd_fast_5m,strategy_id,strategy_name_snapshot) "
        "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
    )
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            row = (
                ts, symbol, strategy,
                result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["cvd_fast_slope"],
                result["atr"], result["atr_pct"], result.get("atr_value", result["atr"]),
                result["vwap"], result["price"],
                result["p95"], result["p99"], result["score"], result.get("signal"), factors_json,
                result.get("cvd_fast_5m"),  # only populated for v53_fast; None otherwise
                strategy_id, strategy_name_snapshot,
            )
            cur.execute(insert_sql, row)
            # Notify listeners (live_executor, per the original comment) when a signal fired.
            if result.get("signal"):
                cur.execute("NOTIFY new_signal, %s", (f"{symbol}:{strategy}:{result['signal']}:{result['score']}",))
        conn.commit()
def save_feature_event(ts: int, symbol: str, result: dict, strategy: str):
    """Record raw features plus per-layer scores in ``signal_feature_events``.

    Does nothing unless ``strategy`` starts with ``"v53"``. The row combines
    top-level values of ``result`` with the nested ``result["factors"]`` dict;
    missing entries are written as NULL, and missing layer scores as 0.
    """
    if not strategy.startswith("v53"):
        return

    f = result.get("factors") or {}

    def layer_score(layer: str):
        # A layer that is absent or None counts as score 0.
        return (f.get(layer) or {}).get("score", 0)

    track = f.get("track", "ALT")
    # Without an explicit direction, the side falls back to LONG for score >= 0 and SHORT otherwise.
    side = result.get("direction") or ("LONG" if result.get("score", 0) >= 0 else "SHORT")
    gate_passed = f.get("gate_passed", True)
    block_reason = f.get("gate_block") or f.get("block_reason")

    params = (
        ts, symbol, track, side, strategy, "v5.3",
        result.get("cvd_fast"), result.get("cvd_mid"), result.get("cvd_day"), result.get("cvd_fast_slope"),
        result.get("p95"), result.get("p99"),
        result.get("atr_value", result.get("atr")), result.get("atr_pct"),
        result.get("oi_delta"), result.get("ls_ratio"), result.get("top_trader_position"),
        (f.get("auxiliary") or {}).get("coinbase_premium"),
        f.get("obi_raw"), f.get("whale_cvd_ratio"),
        layer_score("direction"), layer_score("crowding"), layer_score("environment"),
        layer_score("auxiliary"), result.get("score", 0),
        gate_passed, block_reason, result.get("price"),
    )
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO signal_feature_events
                (ts, symbol, track, side, strategy, strategy_version,
                cvd_fast_raw, cvd_mid_raw, cvd_day_raw, cvd_fast_slope_raw,
                p95_qty_raw, p99_qty_raw,
                atr_value, atr_percentile,
                oi_delta_raw, ls_ratio_raw, top_pos_raw, coinbase_premium_raw,
                obi_raw, tiered_cvd_whale_raw,
                score_direction, score_crowding, score_environment, score_aux, score_total,
                gate_passed, block_reason, price)
                VALUES
                (%s,%s,%s,%s,%s,%s,
                %s,%s,%s,%s,
                %s,%s,
                %s,%s,
                %s,%s,%s,%s,
                %s,%s,
                %s,%s,%s,%s,%s,
                %s,%s,%s)
                """,
                params,
            )
        conn.commit()
def save_indicator_1m(ts: int, symbol: str, result: dict):
    """Write ``result`` into ``signal_indicators_1m`` for the minute bar containing ``ts``.

    ``ts`` is floored to a multiple of 60000 ms. If a row for that bar and
    symbol already exists it is updated, otherwise a new row is inserted.
    """
    bar_ts = ts - ts % 60000
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT id FROM signal_indicators_1m WHERE ts=%s AND symbol=%s", (bar_ts, symbol))
            row_exists = bool(cur.fetchone())
            # Values shared by both statements, in column order.
            metrics = (
                result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["atr"],
                result["vwap"], result["price"], result["score"], result.get("signal"),
            )
            if row_exists:
                cur.execute(
                    "UPDATE signal_indicators_1m SET cvd_fast=%s,cvd_mid=%s,cvd_day=%s,atr_5m=%s,vwap_30m=%s,price=%s,score=%s,signal=%s WHERE ts=%s AND symbol=%s",
                    metrics + (bar_ts, symbol),
                )
            else:
                cur.execute(
                    "INSERT INTO signal_indicators_1m (ts,symbol,cvd_fast,cvd_mid,cvd_day,atr_5m,vwap_30m,price,score,signal) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                    (bar_ts, symbol) + metrics,
                )
        conn.commit()
# ─── Paper trading ───────────────────────────────────────────────
def paper_open_trade(
    symbol: str,
    direction: str,
    price: float,
    score: int,
    tier: str,
    atr: float,
    now_ms: int,
    factors: Optional[dict] = None,
    strategy: str = "v51_baseline",
    tp_sl: Optional[dict] = None,
    strategy_id: Optional[str] = None,
    strategy_name_snapshot: Optional[str] = None,
):
    """Open a simulated (paper) position and persist it to ``paper_trades``.

    Stop-loss and take-profit levels are placed at ATR multiples around
    ``price``; the multipliers come from ``tp_sl`` (keys ``sl_multiplier``,
    ``tp1_multiplier``, ``tp2_multiplier``; defaults 2.0 / 1.5 / 3.0). Any
    ``direction`` other than ``"LONG"`` is treated as a short.

    Nothing is written, and None is returned, when:
      * ``atr`` is None, NaN, infinite or not positive (silent), or
      * the resulting stop distance is not a finite positive number within
        80%-120% of ``sl_multiplier * atr`` (logged as an error). This covers
        a zero or negative ``sl_multiplier`` and a non-finite ``price``.
    """
    import json as _json3
    import math

    # NaN compares False against everything, so a plain `atr <= 0` test would let it through.
    if atr is None or not math.isfinite(atr) or atr <= 0:
        return

    levels = tp_sl or {}
    sl_multiplier = float(levels.get("sl_multiplier", 2.0))
    tp1_multiplier = float(levels.get("tp1_multiplier", 1.5))
    tp2_multiplier = float(levels.get("tp2_multiplier", 3.0))
    risk_distance = sl_multiplier * atr  # 1R = SL distance = sl_multiplier x ATR

    if direction == "LONG":
        sl = price - sl_multiplier * atr
        tp1 = price + tp1_multiplier * atr
        tp2 = price + tp2_multiplier * atr
    else:
        sl = price + sl_multiplier * atr
        tp1 = price - tp1_multiplier * atr
        tp2 = price - tp2_multiplier * atr

    # SL sanity check: the stop must sit a finite, strictly positive distance from
    # entry, within 80%-120% of the intended risk distance. The explicit
    # `risk_distance <= 0` test is needed because 0 vs 0 passes the range check.
    actual_sl_dist = abs(sl - price)
    sl_invalid = (
        not math.isfinite(actual_sl_dist)
        or risk_distance <= 0
        or actual_sl_dist < risk_distance * 0.8
        or actual_sl_dist > risk_distance * 1.2
    )
    if sl_invalid:
        logger.error(
            f"[{symbol}] ⚠️ SL校验失败拒绝开仓: direction={direction} price={price:.4f} "
            f"sl={sl:.4f} actual_dist={actual_sl_dist:.4f} expected={risk_distance:.4f} atr={atr:.4f}"
        )
        return

    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO paper_trades (symbol,direction,score,tier,entry_price,entry_ts,tp1_price,tp2_price,sl_price,atr_at_entry,score_factors,strategy,risk_distance,strategy_id,strategy_name_snapshot) "
                "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                (
                    symbol,
                    direction,
                    score,
                    tier,
                    price,
                    now_ms,
                    tp1,
                    tp2,
                    sl,
                    atr,
                    _json3.dumps(factors) if factors else None,
                    strategy,
                    risk_distance,
                    strategy_id,
                    strategy_name_snapshot,
                ),
            )
        conn.commit()
    logger.info(
        f"[{symbol}] 📝 模拟开仓: {direction} @ {price:.2f} score={score} tier={tier} strategy={strategy} "
        f"TP1={tp1:.2f} TP2={tp2:.2f} SL={sl:.2f}"
    )
def paper_has_active_position(symbol: str, strategy: Optional[str] = None) -> bool:
    """Return True when ``symbol`` has at least one open paper trade.

    "Open" means status ``active`` or ``tp1_hit``. A truthy ``strategy``
    restricts the check to that strategy's trades.
    """
    if strategy:
        query = "SELECT COUNT(*) FROM paper_trades WHERE symbol=%s AND strategy=%s AND status IN ('active','tp1_hit')"
        params = (symbol, strategy)
    else:
        query = "SELECT COUNT(*) FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')"
        params = (symbol,)
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute(query, params)
        open_count = cur.fetchone()[0]
        return open_count > 0
def paper_get_active_direction(symbol: str, strategy: Optional[str] = None) -> str | None:
    """Return the direction of one open paper trade for ``symbol``, or None.

    "Open" means status ``active`` or ``tp1_hit``. A truthy ``strategy``
    restricts the lookup to that strategy. The query uses ``LIMIT 1`` without
    an ORDER BY, so with several open trades an arbitrary one is reported.
    """
    if strategy:
        query = "SELECT direction FROM paper_trades WHERE symbol=%s AND strategy=%s AND status IN ('active','tp1_hit') LIMIT 1"
        params = (symbol, strategy)
    else:
        query = "SELECT direction FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit') LIMIT 1"
        params = (symbol,)
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute(query, params)
        row = cur.fetchone()
        if not row:
            return None
        return row[0]
def paper_close_by_signal(symbol: str, current_price: float, now_ms: int, strategy: Optional[str] = None):
    """Close every open paper trade of ``symbol`` at ``current_price`` (reverse-signal exit).

    Open trades (status ``active`` or ``tp1_hit``), optionally limited to
    ``strategy``, are marked ``signal_flip`` with the exit price and timestamp.
    PnL is stored in R units: price move divided by the trade's risk distance,
    minus a fee of ``2 * PAPER_FEE_RATE * entry_price`` expressed in R.
    """
    if strategy:
        select_sql = (
            "SELECT id, direction, entry_price, tp1_hit, atr_at_entry, risk_distance "
            "FROM paper_trades WHERE symbol=%s AND strategy=%s AND status IN ('active','tp1_hit')"
        )
        select_params = (symbol, strategy)
    else:
        select_sql = (
            "SELECT id, direction, entry_price, tp1_hit, atr_at_entry, risk_distance "
            "FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')"
        )
        select_params = (symbol,)

    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(select_sql, select_params)
            # tp1_hit and atr_at_entry are selected but not used in the PnL below.
            for pid, direction, entry_price, _tp1_hit, _atr_entry, rd_db in cur.fetchall():
                # Fall back to 1% of the entry price when no positive risk distance was stored.
                risk_distance = rd_db if rd_db and rd_db > 0 else abs(entry_price * 0.01)
                if risk_distance > 0:
                    if direction == "LONG":
                        price_move = current_price - entry_price
                    else:
                        price_move = entry_price - current_price
                    # Deduct fees, expressed in R.
                    pnl_r = price_move / risk_distance - (2 * PAPER_FEE_RATE * entry_price) / risk_distance
                else:
                    # Only reachable when the fallback itself is 0 (entry_price == 0).
                    pnl_r = 0
                cur.execute(
                    "UPDATE paper_trades SET status='signal_flip', exit_price=%s, exit_ts=%s, pnl_r=%s WHERE id=%s",
                    (current_price, now_ms, round(pnl_r, 4), pid)
                )
                logger.info(
                    f"[{symbol}] 📝 反向信号平仓: {direction} @ {current_price:.2f} pnl={pnl_r:+.2f}R"
                    f"{f' strategy={strategy}' if strategy else ''}"
                )
        conn.commit()
def paper_active_count(strategy: Optional[str] = None) -> int:
    """Count open paper trades (status ``active`` or ``tp1_hit``).

    A truthy ``strategy`` limits the count to that strategy; otherwise all
    strategies are counted together.
    """
    if strategy:
        query = "SELECT COUNT(*) FROM paper_trades WHERE strategy=%s AND status IN ('active','tp1_hit')"
        params = (strategy,)
    else:
        query = "SELECT COUNT(*) FROM paper_trades WHERE status IN ('active','tp1_hit')"
        params = None
    with get_sync_conn() as conn, conn.cursor() as cur:
        if params is None:
            cur.execute(query)
        else:
            cur.execute(query, params)
        return cur.fetchone()[0]
# ─── Real-time WebSocket data (OBI + spot/perp divergence) ───────
# symbol -> SymbolState mapping, described by the original comment as injected in main().
# NOTE(review): the visible main() never touches this dict (it passes `states`
# straight to start_realtime_ws) — confirm whether it is still used anywhere.
_REALTIME_STATES: dict = {}
async def _ws_obi_stream(symbol: str, state):
    """Keep ``state.rt_obi`` updated from the futures ``depth10@100ms`` stream.

    For every message the order-book imbalance is computed as
    ``(bid_vol - ask_vol) / (bid_vol + ask_vol)`` over the quantities in the
    ``b`` and ``a`` level lists (0.0 when the book is empty). The coroutine
    never returns: whenever the connection ends, by an error or a clean
    close, it waits 3 seconds and reconnects.
    """
    stream = f"{symbol.lower()}@depth10@100ms"
    url = f"wss://fstream.binance.com/stream?streams={stream}"
    while True:
        try:
            async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws:
                logger.info(f"[RT-OBI] {symbol} WebSocket connected")
                async for raw in ws:
                    payload = json.loads(raw)
                    # Combined-stream messages wrap the book under "data"; accept a bare payload too.
                    book = payload.get("data", payload)
                    bid_vol = sum(float(level[1]) for level in book.get("b", []))
                    ask_vol = sum(float(level[1]) for level in book.get("a", []))
                    total = bid_vol + ask_vol
                    state.rt_obi = (bid_vol - ask_vol) / total if total > 0 else 0.0
        except Exception as e:
            logger.warning(f"[RT-OBI] {symbol} 断线重连: {e}")
        # Back off after every connection end. A clean close finishes the
        # `async for` without raising, so sleeping only in the handler would
        # let this loop reconnect immediately and repeatedly.
        await asyncio.sleep(3)
async def _ws_spot_perp_stream(symbol: str, state):
    """Keep ``state.rt_spot_perp_div`` updated from spot and perp price streams.

    Two readers run concurrently:
      * spot ``bookTicker``: ``spot_mid = (best_bid + best_ask) / 2``
      * futures ``markPrice@1s``: ``mark_price``

    After each valid update, once both values are positive, the divergence
    ``(spot_mid - mark_price) / mark_price`` is written to the state. Neither
    reader returns: when its connection ends, by an error or a clean close,
    it waits 3 seconds and reconnects.
    """
    spot_stream = f"{symbol.lower()}@bookTicker"
    perp_stream = f"{symbol.lower()}@markPrice@1s"
    spot_url = f"wss://stream.binance.com:9443/stream?streams={spot_stream}"
    perp_url = f"wss://fstream.binance.com/stream?streams={perp_stream}"
    # Latest prices shared by the two readers; 0.0 means "not received yet".
    spot_mid = 0.0
    mark_price = 0.0

    def publish_divergence():
        # Needs a price from both sides; mark_price > 0 also guards the division.
        if spot_mid > 0 and mark_price > 0:
            state.rt_spot_perp_div = (spot_mid - mark_price) / mark_price

    async def read_spot():
        nonlocal spot_mid
        while True:
            try:
                async with websockets.connect(spot_url, ping_interval=20, ping_timeout=10) as ws:
                    logger.info(f"[RT-SPD] {symbol} spot WebSocket connected")
                    async for raw in ws:
                        # Decode once; combined-stream messages wrap the payload under "data".
                        msg = json.loads(raw)
                        d = msg.get("data", msg)
                        bid = float(d.get("b", 0) or 0)
                        ask = float(d.get("a", 0) or 0)
                        if bid > 0 and ask > 0:
                            spot_mid = (bid + ask) / 2
                            publish_divergence()
            except Exception as e:
                logger.warning(f"[RT-SPD] {symbol} spot 断线: {e}")
            # Back off after clean closes as well as errors to avoid a tight reconnect loop.
            await asyncio.sleep(3)

    async def read_perp():
        nonlocal mark_price
        while True:
            try:
                async with websockets.connect(perp_url, ping_interval=20, ping_timeout=10) as ws:
                    logger.info(f"[RT-SPD] {symbol} perp WebSocket connected")
                    async for raw in ws:
                        msg = json.loads(raw)
                        d = msg.get("data", msg)
                        # Prefer the "p" field, falling back to "markPrice".
                        mp = float(d.get("p", 0) or d.get("markPrice", 0) or 0)
                        if mp > 0:
                            mark_price = mp
                            publish_divergence()
            except Exception as e:
                logger.warning(f"[RT-SPD] {symbol} perp 断线: {e}")
            await asyncio.sleep(3)

    await asyncio.gather(read_spot(), read_perp())
async def _realtime_ws_runner(states: dict):
    """Run every real-time WebSocket coroutine for the given symbol states.

    For each ``(symbol, state)`` pair an OBI stream and a spot/perp divergence
    stream are created, in that order, and all of them are awaited together.
    Returns immediately when ``states`` is empty.
    """
    coros = [
        make_stream(sym, state)
        for sym, state in states.items()
        for make_stream in (_ws_obi_stream, _ws_spot_perp_stream)
    ]
    if not coros:
        return
    await asyncio.gather(*coros)
def start_realtime_ws(states: dict):
    """Start the real-time WebSocket collectors on a daemon thread.

    The thread (named ``realtime-ws``) creates its own asyncio event loop,
    runs ``_realtime_ws_runner(states)`` on it, logs any exception that
    escapes the runner, and closes the loop on exit.
    """
    def _thread_main():
        ws_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(ws_loop)
        try:
            ws_loop.run_until_complete(_realtime_ws_runner(states))
        except Exception as e:
            logger.error(f"[RT-WS] event loop 异常: {e}")
        finally:
            ws_loop.close()

    worker = threading.Thread(target=_thread_main, daemon=True, name="realtime-ws")
    worker.start()
    logger.info("[RT-WS] 实时WebSocket后台线程已启动BTC OBI + 期现背离)")
# ─── Main loop ───────────────────────────────────────────────────
def main() -> None:
    """Run the signal engine: warm up, then evaluate all strategies in an endless loop.

    Startup: initialise the schema, load strategy configs (DB first, JSON
    fallback), replay WINDOW_MID of historical trades into one SymbolState per
    symbol, and start the real-time WebSocket thread.

    Each cycle (every LOOP_INTERVAL seconds), for every symbol: ingest new
    trades, refresh market indicators, evaluate every strategy on a shared
    snapshot, persist the results, and, once the warm-up cycles have elapsed,
    close paper positions on strong opposite signals and open new ones on
    fresh signals. Any exception inside a cycle is logged and the loop goes on.
    """
    init_schema()
    # V5.4: prefer strategy configs from the DB; fall back to the JSON files when none come back
    strategy_configs = load_strategy_configs_from_db()
    if not strategy_configs:
        logger.warning("[DB] 未能从 DB 加载策略配置,使用 JSON fallback")
        strategy_configs = load_strategy_configs()
    strategy_names = [cfg.get("name", "unknown") for cfg in strategy_configs]
    logger.info(f"已加载策略配置: {', '.join(strategy_names)}")
    # The primary strategy feeds the 1m table: v52_8signals when present, else the first config.
    primary_strategy_name = "v52_8signals" if any(cfg.get("name") == "v52_8signals" for cfg in strategy_configs) else strategy_names[0]
    states = {sym: SymbolState(sym) for sym in SYMBOLS}
    for sym, state in states.items():
        load_historical(state, WINDOW_MID)
    logger.info("=== Signal Engine (PG) 启动完成 ===")
    # Start the real-time WebSocket background thread (OBI + spot/perp divergence)
    start_realtime_ws(states)
    last_1m_save = {}  # symbol -> timestamp of the last minute bar written to the 1m table
    cycle = 0
    warmup_cycles = 3  # skip the first 3 cycles (45 s) after startup so cold-start signals don't open positions
    while True:
        try:
            now_ms = int(time.time() * 1000)
            # Reload the paper config every cycle (supports hot toggling via the API)
            load_paper_config()
            for sym, state in states.items():
                new_trades = fetch_new_trades(sym, state.last_processed_id)
                for t in new_trades:
                    state.process_trade(t["agg_id"], t["time_ms"], t["price"], t["qty"], t["is_buyer_maker"])
                state.market_indicators = fetch_market_indicators(sym)
                # One snapshot per symbol per cycle, shared by all strategy evaluations.
                snapshot = state.build_evaluation_snapshot(now_ms)
                strategy_results: list[tuple[dict, dict]] = []
                for strategy_cfg in strategy_configs:
                    strategy_result = state.evaluate_signal(now_ms, strategy_cfg=strategy_cfg, snapshot=snapshot)
                    strategy_results.append((strategy_cfg, strategy_result))
                # Persist indicators separately for each strategy
                for strategy_cfg, strategy_result in strategy_results:
                    sname = strategy_cfg.get("name", "v51_baseline")
                    sid = strategy_cfg.get("strategy_id")
                    ssnap = strategy_cfg.get("strategy_name_snapshot")
                    save_indicator(now_ms, sym, strategy_result, strategy=sname,
                                   strategy_id=sid, strategy_name_snapshot=ssnap)
                    save_feature_event(now_ms, sym, strategy_result, strategy=sname)
                # The 1m table still uses the primary strategy's result (for charts);
                # defaults to the first result when the primary name is not found.
                primary_result = strategy_results[0][1]
                for strategy_cfg, strategy_result in strategy_results:
                    if strategy_cfg.get("name") == primary_strategy_name:
                        primary_result = strategy_result
                        break
                bar_1m = (now_ms // 60000) * 60000
                # Write the 1m row only on the first cycle that falls into a new minute bar.
                if last_1m_save.get(sym) != bar_1m:
                    save_indicator_1m(now_ms, sym, primary_result)
                    last_1m_save[sym] = bar_1m
                # Reverse-signal close: judged independently per strategy; only triggers when score >= 75
                if warmup_cycles <= 0:
                    for strategy_cfg, result in strategy_results:
                        strategy_name = strategy_cfg.get("name", "v51_baseline")
                        if not is_strategy_enabled(strategy_name):
                            continue
                        eval_dir = result.get("direction")
                        existing_dir = paper_get_active_direction(sym, strategy_name)
                        if existing_dir and eval_dir and existing_dir != eval_dir and result["score"] >= 75:
                            paper_close_by_signal(sym, result["price"], now_ms, strategy_name)
                            logger.info(
                                f"[{sym}] 📝 反向信号平仓[{strategy_name}]: {existing_dir}{eval_dir} "
                                f"(score={result['score']})"
                            )
                for strategy_cfg, result in strategy_results:
                    strategy_name = strategy_cfg.get("name", "v51_baseline")
                    if result.get("signal"):
                        logger.info(
                            f"[{sym}] 🚨 信号[{strategy_name}]: {result['signal']} "
                            f"score={result['score']} price={result['price']:.1f}"
                        )
                        # Paper-trade open (requires the strategy to be enabled and the warm-up to be over)
                        if is_strategy_enabled(strategy_name) and warmup_cycles <= 0:
                            # At most one open position per symbol per strategy, and at most
                            # PAPER_MAX_POSITIONS open positions per strategy overall.
                            if not paper_has_active_position(sym, strategy_name):
                                active_count = paper_active_count(strategy_name)
                                if active_count < PAPER_MAX_POSITIONS:
                                    tier = result.get("tier", "standard")
                                    paper_open_trade(
                                        sym,
                                        result["signal"],
                                        result["price"],
                                        result["score"],
                                        tier,
                                        result["atr"],
                                        now_ms,
                                        factors=result.get("factors"),
                                        strategy=strategy_name,
                                        tp_sl=strategy_cfg.get("tp_sl"),
                                        strategy_id=strategy_cfg.get("strategy_id"),
                                        strategy_name_snapshot=strategy_cfg.get("strategy_name_snapshot"),
                                    )
            # Open paper positions are monitored by paper_monitor.py in real time via WebSocket; no check is done here
            cycle += 1
            if warmup_cycles > 0:
                warmup_cycles -= 1
                if warmup_cycles == 0:
                    logger.info("冷启动保护期结束,模拟盘开仓已启用")
            # Every 10 cycles (roughly 2-3 minutes) hot-reload the configuration without a restart
            if cycle % 10 == 0:
                # NOTE(review): load_paper_config() already ran at the top of this cycle, so this
                # before/after comparison presumably rarely detects a change — confirm.
                old_strategies = list(PAPER_ENABLED_STRATEGIES)
                load_paper_config()
                # V5.4: hot reload reads the DB first and falls back to JSON on failure
                new_configs = load_strategy_configs_from_db()
                if new_configs:
                    strategy_configs = new_configs
                else:
                    strategy_configs = load_strategy_configs()  # A1: hot-reload weights/thresholds/TP/SL
                strategy_names = [cfg.get("name", "unknown") for cfg in strategy_configs]
                primary_strategy_name = "v52_8signals" if any(cfg.get("name") == "v52_8signals" for cfg in strategy_configs) else strategy_names[0]
                if list(PAPER_ENABLED_STRATEGIES) != old_strategies:
                    logger.info(f"📋 配置热加载: enabled_strategies={PAPER_ENABLED_STRATEGIES}")
                # Periodic per-symbol status line.
                for sym, state in states.items():
                    logger.info(
                        f"[{sym}] 状态: CVD_fast={state.win_fast.cvd:.1f} "
                        f"CVD_mid={state.win_mid.cvd:.1f} ATR={state.atr_calc.atr:.2f} "
                        f"({state.atr_calc.atr_percentile:.0f}%) VWAP={state.win_vwap.vwap:.1f}"
                    )
        except Exception as e:
            # A failure anywhere in the cycle aborts the rest of that cycle only.
            logger.error(f"循环异常: {e}", exc_info=True)
        time.sleep(LOOP_INTERVAL)
# Script entry point: start the engine only when this file is executed directly.
if __name__ == "__main__":
    main()