Implement V5.2 FR/liquidation scoring and strategy AB loop

This commit is contained in:
root 2026-03-01 11:54:58 +00:00
parent a7600e8db1
commit 732b01691b

View File

@ -18,6 +18,7 @@ signal_engine.py — V5 短线交易信号引擎PostgreSQL版
import logging
import os
import time
import json
from collections import deque
from datetime import datetime, timezone
from typing import Any, Optional
@ -36,6 +37,34 @@ logger = logging.getLogger("signal-engine")
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
LOOP_INTERVAL = 15 # 秒从5改15CPU降60%,信号质量无影响)
STRATEGY_DIR = os.path.join(os.path.dirname(__file__), "strategies")
DEFAULT_STRATEGY_FILES = ["v51_baseline.json", "v52_8signals.json"]
def load_strategy_configs() -> list[dict]:
configs = []
for filename in DEFAULT_STRATEGY_FILES:
path = os.path.join(STRATEGY_DIR, filename)
try:
with open(path, "r", encoding="utf-8") as f:
cfg = json.load(f)
if isinstance(cfg, dict) and cfg.get("name"):
configs.append(cfg)
except FileNotFoundError:
logger.warning(f"策略配置缺失: {path}")
except Exception as e:
logger.error(f"策略配置加载失败 {path}: {e}")
if not configs:
logger.warning("未加载到策略配置回退到v51_baseline默认配置")
configs.append(
{
"name": "v51_baseline",
"threshold": 75,
"signals": ["cvd", "p99", "accel", "ls_ratio", "oi", "coinbase_premium"],
"tp_sl": {"sl_multiplier": 2.0, "tp1_multiplier": 1.5, "tp2_multiplier": 3.0},
}
)
return configs
# ─── 模拟盘配置 ───────────────────────────────────────────────────
PAPER_TRADING_ENABLED = False # 开关范总确认后通过API开启
@ -85,7 +114,7 @@ def fetch_market_indicators(symbol: str) -> dict:
with get_sync_conn() as conn:
with conn.cursor() as cur:
indicators = {}
for ind_type in ["long_short_ratio", "top_trader_position", "open_interest_hist", "coinbase_premium"]:
for ind_type in ["long_short_ratio", "top_trader_position", "open_interest_hist", "coinbase_premium", "funding_rate"]:
cur.execute(
"SELECT value FROM market_indicators WHERE symbol=%s AND indicator_type=%s ORDER BY timestamp_ms DESC LIMIT 1",
(symbol, ind_type),
@ -111,6 +140,8 @@ def fetch_market_indicators(symbol: str) -> dict:
indicators[ind_type] = float(val.get("sumOpenInterestValue", 0))
elif ind_type == "coinbase_premium":
indicators[ind_type] = float(val.get("premium_pct", 0))
elif ind_type == "funding_rate":
indicators[ind_type] = float(val.get("lastFundingRate", 0))
return indicators
@ -227,8 +258,8 @@ class SymbolState:
self.prev_cvd_fast_slope = 0.0
self.prev_oi_value = 0.0
self.market_indicators = fetch_market_indicators(symbol)
self.last_signal_ts = 0
self.last_signal_dir = ""
self.last_signal_ts: dict[str, int] = {}
self.last_signal_dir: dict[str, str] = {}
self.recent_large_trades: deque = deque()
def process_trade(self, agg_id: int, time_ms: int, price: float, qty: float, is_buyer_maker: int):
@ -268,9 +299,10 @@ class SymbolState:
self.recent_large_trades.append((t[0], t[1], t[3]))
seen.add(t[0])
def evaluate_signal(self, now_ms: int) -> dict:
def build_evaluation_snapshot(self, now_ms: int) -> dict:
cvd_fast = self.win_fast.cvd
cvd_mid = self.win_mid.cvd
cvd_day = self.win_day.cvd
vwap = self.win_vwap.vwap
atr = self.atr_calc.atr
atr_pct = self.atr_calc.atr_percentile
@ -282,11 +314,94 @@ class SymbolState:
self.prev_cvd_fast = cvd_fast
self.prev_cvd_fast_slope = cvd_fast_slope
result = {
"cvd_fast": cvd_fast, "cvd_mid": cvd_mid, "cvd_day": self.win_day.cvd,
oi_value = to_float(self.market_indicators.get("open_interest_hist"))
if oi_value is None or self.prev_oi_value == 0:
oi_change = 0.0
environment_score = 10
else:
oi_change = (oi_value - self.prev_oi_value) / self.prev_oi_value if self.prev_oi_value > 0 else 0.0
if oi_change >= 0.03:
environment_score = 15
elif oi_change > 0:
environment_score = 10
else:
environment_score = 5
if oi_value is not None and oi_value > 0:
self.prev_oi_value = oi_value
return {
"cvd_fast": cvd_fast,
"cvd_mid": cvd_mid,
"cvd_day": cvd_day,
"vwap": vwap,
"atr": atr,
"atr_pct": atr_pct,
"p95": p95,
"p99": p99,
"price": price,
"cvd_fast_slope": cvd_fast_slope,
"atr": atr, "atr_pct": atr_pct, "vwap": vwap, "price": price,
"p95": p95, "p99": p99, "signal": None, "direction": None, "score": 0,
"cvd_fast_accel": cvd_fast_accel,
"oi_change": oi_change,
"environment_score": environment_score,
"oi_value": oi_value,
}
def fetch_recent_liquidations(self, window_ms: int = 300000):
"""Fetch last 5min liquidation totals from liquidations table"""
now_ms = int(time.time() * 1000)
cutoff = now_ms - window_ms
with get_sync_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT
COALESCE(SUM(CASE WHEN side='SELL' THEN usd_value ELSE 0 END), 0) as long_liq,
COALESCE(SUM(CASE WHEN side='BUY' THEN usd_value ELSE 0 END), 0) as short_liq
FROM liquidations
WHERE symbol=%s AND trade_time >= %s
""",
(self.symbol, cutoff),
)
row = cur.fetchone()
if row:
return {"long_usd": row[0], "short_usd": row[1]}
return None
def evaluate_signal(self, now_ms: int, strategy_cfg: Optional[dict] = None, snapshot: Optional[dict] = None) -> dict:
strategy_cfg = strategy_cfg or {}
strategy_name = strategy_cfg.get("name", "v51_baseline")
strategy_threshold = int(strategy_cfg.get("threshold", 75))
enabled_signals = set(strategy_cfg.get("signals", []))
snap = snapshot or self.build_evaluation_snapshot(now_ms)
cvd_fast = snap["cvd_fast"]
cvd_mid = snap["cvd_mid"]
vwap = snap["vwap"]
atr = snap["atr"]
atr_pct = snap["atr_pct"]
p95 = snap["p95"]
p99 = snap["p99"]
price = snap["price"]
cvd_fast_slope = snap["cvd_fast_slope"]
cvd_fast_accel = snap["cvd_fast_accel"]
oi_change = snap["oi_change"]
environment_score = snap["environment_score"]
result = {
"strategy": strategy_name,
"cvd_fast": cvd_fast,
"cvd_mid": cvd_mid,
"cvd_day": snap["cvd_day"],
"cvd_fast_slope": cvd_fast_slope,
"atr": atr,
"atr_pct": atr_pct,
"vwap": vwap,
"price": price,
"p95": p95,
"p99": p99,
"signal": None,
"direction": None,
"score": 0,
"tier": None,
"factors": {},
}
@ -296,7 +411,8 @@ class SymbolState:
# 判断倾向方向(用于评分展示,即使冷却或方向不一致也计算)
no_direction = False
in_cooldown = (now_ms - self.last_signal_ts < COOLDOWN_MS)
last_signal_ts = self.last_signal_ts.get(strategy_name, 0)
in_cooldown = now_ms - last_signal_ts < COOLDOWN_MS
if cvd_fast > 0 and cvd_mid > 0:
direction = "LONG"
@ -326,8 +442,10 @@ class SymbolState:
elif not has_adverse_p99:
direction_score += 10
accel_bonus = 0
if (direction == "LONG" and cvd_fast_accel > 0) or (direction == "SHORT" and cvd_fast_accel < 0):
accel_bonus = 5
if "accel" in enabled_signals and (
(direction == "LONG" and cvd_fast_accel > 0) or (direction == "SHORT" and cvd_fast_accel < 0)
):
accel_bonus = int(strategy_cfg.get("accel_bonus", 5))
# 2) 拥挤层20分- market_indicators缺失时给中间分
long_short_ratio = to_float(self.market_indicators.get("long_short_ratio"))
@ -358,24 +476,53 @@ class SymbolState:
top_trader_score = 5
crowding_score = ls_score + top_trader_score
# 3) 环境层15分— OI变化率
oi_value = to_float(self.market_indicators.get("open_interest_hist"))
if oi_value is None or self.prev_oi_value == 0:
environment_score = 10
oi_change = 0.0
# Funding Rate scoring (拥挤层加分)
# Read from market_indicators table
funding_rate = to_float(self.market_indicators.get("funding_rate"))
fr_score = 0
if "funding_rate" in enabled_signals and funding_rate is not None:
fr_abs = abs(funding_rate)
if fr_abs >= 0.001: # extreme ±0.1%
# Extreme: penalize if going WITH the crowd
if (direction == "LONG" and funding_rate > 0.001) or (direction == "SHORT" and funding_rate < -0.001):
fr_score = -5
else:
oi_change = (oi_value - self.prev_oi_value) / self.prev_oi_value if self.prev_oi_value > 0 else 0
if oi_change >= 0.03:
environment_score = 15
elif oi_change > 0:
environment_score = 10
fr_score = 5
elif fr_abs >= 0.0003: # moderate ±0.03%
# Moderate: reward going AGAINST the crowd
if (direction == "LONG" and funding_rate < -0.0003) or (direction == "SHORT" and funding_rate > 0.0003):
fr_score = 5
else:
environment_score = 5
if oi_value is not None and oi_value > 0:
self.prev_oi_value = oi_value
fr_score = 0
# 4) 确认层15分
confirmation_score = 15 if ((direction == "LONG" and cvd_fast > 0 and cvd_mid > 0) or (direction == "SHORT" and cvd_fast < 0 and cvd_mid < 0)) else 0
confirmation_score = 15 if (
(direction == "LONG" and cvd_fast > 0 and cvd_mid > 0)
or (direction == "SHORT" and cvd_fast < 0 and cvd_mid < 0)
) else 0
# Liquidation scoring (确认层加分)
liq_score = 0
liq_data = None
if "liquidation" in enabled_signals:
liq_data = self.fetch_recent_liquidations()
if liq_data:
liq_long_usd = liq_data.get("long_usd", 0)
liq_short_usd = liq_data.get("short_usd", 0)
thresholds = {"BTCUSDT": 500000, "ETHUSDT": 200000, "XRPUSDT": 100000, "SOLUSDT": 100000}
threshold = thresholds.get(self.symbol, 100000)
total = liq_long_usd + liq_short_usd
if total >= threshold:
if liq_short_usd > 0 and liq_long_usd > 0:
ratio = liq_short_usd / liq_long_usd
elif liq_short_usd > 0:
ratio = float("inf")
else:
ratio = 0
if ratio >= 2.0 and direction == "LONG":
liq_score = 5
elif ratio <= 0.5 and direction == "SHORT":
liq_score = 5
# 5) 辅助层5分
coinbase_premium = to_float(self.market_indicators.get("coinbase_premium"))
@ -388,7 +535,7 @@ class SymbolState:
else:
aux_score = 0
total_score = direction_score + accel_bonus + crowding_score + environment_score + confirmation_score + aux_score
total_score = direction_score + accel_bonus + crowding_score + fr_score + environment_score + confirmation_score + liq_score + aux_score
result["score"] = total_score
result["direction"] = direction
result["factors"] = {
@ -403,27 +550,31 @@ class SymbolState:
"environment": {"score": environment_score, "open_interest_hist": oi_change},
"confirmation": {"score": confirmation_score},
"auxiliary": {"score": aux_score, "coinbase_premium": coinbase_premium},
"funding_rate": {"score": fr_score, "value": funding_rate},
"liquidation": {
"score": liq_score,
"long_usd": liq_data.get("long_usd", 0) if liq_data else 0,
"short_usd": liq_data.get("short_usd", 0) if liq_data else 0,
},
}
# 始终输出direction供反向平仓判断不受冷却限制
result["direction"] = direction if not no_direction else None
if total_score >= 85 and not no_direction and not in_cooldown:
heavy_threshold = max(strategy_threshold + 10, 85)
if total_score >= heavy_threshold and not no_direction and not in_cooldown:
result["signal"] = direction
result["tier"] = "heavy"
elif total_score >= 75 and not no_direction and not in_cooldown:
elif total_score >= strategy_threshold and not no_direction and not in_cooldown:
result["signal"] = direction
result["tier"] = "standard"
elif total_score >= 60 and not no_direction and not in_cooldown:
result["signal"] = direction
result["tier"] = "light"
else:
result["signal"] = None
result["tier"] = None
if result["signal"]:
self.last_signal_ts = now_ms
self.last_signal_dir = direction
self.last_signal_ts[strategy_name] = now_ms
self.last_signal_dir[strategy_name] = direction
return result
@ -499,31 +650,60 @@ def save_indicator_1m(ts: int, symbol: str, result: dict):
# ─── 模拟盘 ──────────────────────────────────────────────────────
def paper_open_trade(symbol: str, direction: str, price: float, score: int, tier: str, atr: float, now_ms: int, factors: dict = None):
def paper_open_trade(
symbol: str,
direction: str,
price: float,
score: int,
tier: str,
atr: float,
now_ms: int,
factors: dict = None,
strategy: str = "v51_baseline",
tp_sl: Optional[dict] = None,
):
"""模拟开仓"""
import json as _json3
risk_atr = 0.7 * atr
if risk_atr <= 0:
return
sl_multiplier = float((tp_sl or {}).get("sl_multiplier", 2.0))
tp1_multiplier = float((tp_sl or {}).get("tp1_multiplier", 1.5))
tp2_multiplier = float((tp_sl or {}).get("tp2_multiplier", 3.0))
if direction == "LONG":
sl = price - 2.0 * risk_atr
tp1 = price + 1.5 * risk_atr
tp2 = price + 3.0 * risk_atr
sl = price - sl_multiplier * risk_atr
tp1 = price + tp1_multiplier * risk_atr
tp2 = price + tp2_multiplier * risk_atr
else:
sl = price + 2.0 * risk_atr
tp1 = price - 1.5 * risk_atr
tp2 = price - 3.0 * risk_atr
sl = price + sl_multiplier * risk_atr
tp1 = price - tp1_multiplier * risk_atr
tp2 = price - tp2_multiplier * risk_atr
with get_sync_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO paper_trades (symbol,direction,score,tier,entry_price,entry_ts,tp1_price,tp2_price,sl_price,atr_at_entry,score_factors) "
"VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
(symbol, direction, score, tier, price, now_ms, tp1, tp2, sl, atr,
_json3.dumps(factors) if factors else None)
"INSERT INTO paper_trades (symbol,direction,score,tier,entry_price,entry_ts,tp1_price,tp2_price,sl_price,atr_at_entry,score_factors,strategy) "
"VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
(
symbol,
direction,
score,
tier,
price,
now_ms,
tp1,
tp2,
sl,
atr,
_json3.dumps(factors) if factors else None,
strategy,
),
)
conn.commit()
logger.info(f"[{symbol}] 📝 模拟开仓: {direction} @ {price:.2f} score={score} tier={tier} TP1={tp1:.2f} TP2={tp2:.2f} SL={sl:.2f}")
logger.info(
f"[{symbol}] 📝 模拟开仓: {direction} @ {price:.2f} score={score} tier={tier} strategy={strategy} "
f"TP1={tp1:.2f} TP2={tp2:.2f} SL={sl:.2f}"
)
def paper_check_positions(symbol: str, current_price: float, now_ms: int):
@ -620,31 +800,53 @@ def paper_check_positions(symbol: str, current_price: float, now_ms: int):
conn.commit()
def paper_has_active_position(symbol: str) -> bool:
def paper_has_active_position(symbol: str, strategy: Optional[str] = None) -> bool:
"""检查该币种是否有活跃持仓"""
with get_sync_conn() as conn:
with conn.cursor() as cur:
if strategy:
cur.execute(
"SELECT COUNT(*) FROM paper_trades WHERE symbol=%s AND strategy=%s AND status IN ('active','tp1_hit')",
(symbol, strategy),
)
else:
cur.execute("SELECT COUNT(*) FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')", (symbol,))
return cur.fetchone()[0] > 0
def paper_get_active_direction(symbol: str) -> str | None:
def paper_get_active_direction(symbol: str, strategy: Optional[str] = None) -> str | None:
"""获取该币种活跃持仓的方向无持仓返回None"""
with get_sync_conn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT direction FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit') LIMIT 1", (symbol,))
if strategy:
cur.execute(
"SELECT direction FROM paper_trades WHERE symbol=%s AND strategy=%s AND status IN ('active','tp1_hit') LIMIT 1",
(symbol, strategy),
)
else:
cur.execute(
"SELECT direction FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit') LIMIT 1",
(symbol,),
)
row = cur.fetchone()
return row[0] if row else None
def paper_close_by_signal(symbol: str, current_price: float, now_ms: int):
def paper_close_by_signal(symbol: str, current_price: float, now_ms: int, strategy: Optional[str] = None):
"""反向信号平仓:按当前价平掉该币种所有活跃仓位"""
with get_sync_conn() as conn:
with conn.cursor() as cur:
if strategy:
cur.execute(
"SELECT id, direction, entry_price, tp1_hit, atr_at_entry "
"FROM paper_trades WHERE symbol=%s AND strategy=%s AND status IN ('active','tp1_hit')",
(symbol, strategy),
)
else:
cur.execute(
"SELECT id, direction, entry_price, tp1_hit, atr_at_entry "
"FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')",
(symbol,)
(symbol,),
)
positions = cur.fetchall()
for pos in positions:
@ -661,7 +863,10 @@ def paper_close_by_signal(symbol: str, current_price: float, now_ms: int):
"UPDATE paper_trades SET status='signal_flip', exit_price=%s, exit_ts=%s, pnl_r=%s WHERE id=%s",
(current_price, now_ms, round(pnl_r, 4), pid)
)
logger.info(f"[{symbol}] 📝 反向信号平仓: {direction} @ {current_price:.2f} pnl={pnl_r:+.2f}R")
logger.info(
f"[{symbol}] 📝 反向信号平仓: {direction} @ {current_price:.2f} pnl={pnl_r:+.2f}R"
f"{f' strategy={strategy}' if strategy else ''}"
)
conn.commit()
@ -677,6 +882,11 @@ def paper_active_count() -> int:
def main():
init_schema()
strategy_configs = load_strategy_configs()
strategy_names = [cfg.get("name", "unknown") for cfg in strategy_configs]
logger.info(f"已加载策略配置: {', '.join(strategy_names)}")
primary_strategy_name = "v52_8signals" if any(cfg.get("name") == "v52_8signals" for cfg in strategy_configs) else strategy_names[0]
states = {sym: SymbolState(sym) for sym in SYMBOLS}
for sym, state in states.items():
@ -699,35 +909,62 @@ def main():
state.process_trade(t["agg_id"], t["time_ms"], t["price"], t["qty"], t["is_buyer_maker"])
state.market_indicators = fetch_market_indicators(sym)
result = state.evaluate_signal(now_ms)
save_indicator(now_ms, sym, result)
snapshot = state.build_evaluation_snapshot(now_ms)
strategy_results: list[tuple[dict, dict]] = []
for strategy_cfg in strategy_configs:
strategy_result = state.evaluate_signal(now_ms, strategy_cfg=strategy_cfg, snapshot=snapshot)
strategy_results.append((strategy_cfg, strategy_result))
primary_result = strategy_results[0][1]
for strategy_cfg, strategy_result in strategy_results:
if strategy_cfg.get("name") == primary_strategy_name:
primary_result = strategy_result
break
save_indicator(now_ms, sym, primary_result)
bar_1m = (now_ms // 60000) * 60000
if last_1m_save.get(sym) != bar_1m:
save_indicator_1m(now_ms, sym, result)
save_indicator_1m(now_ms, sym, primary_result)
last_1m_save[sym] = bar_1m
# 反向信号平仓基于direction不受冷却限制score>=60才触发
# 反向信号平仓:按策略独立判断score>=75才触发
if PAPER_TRADING_ENABLED and warmup_cycles <= 0:
for strategy_cfg, result in strategy_results:
strategy_name = strategy_cfg.get("name", "v51_baseline")
eval_dir = result.get("direction")
existing_dir = paper_get_active_direction(sym)
if existing_dir and eval_dir and existing_dir != eval_dir and result["score"] >= 60:
paper_close_by_signal(sym, result["price"], now_ms)
logger.info(f"[{sym}] 📝 反向信号平仓: {existing_dir}{eval_dir} (score={result['score']})")
existing_dir = paper_get_active_direction(sym, strategy_name)
if existing_dir and eval_dir and existing_dir != eval_dir and result["score"] >= 75:
paper_close_by_signal(sym, result["price"], now_ms, strategy_name)
logger.info(
f"[{sym}] 📝 反向信号平仓[{strategy_name}]: {existing_dir}{eval_dir} "
f"(score={result['score']})"
)
for strategy_cfg, result in strategy_results:
strategy_name = strategy_cfg.get("name", "v51_baseline")
if result.get("signal"):
logger.info(f"[{sym}] 🚨 信号: {result['signal']} score={result['score']} price={result['price']:.1f}")
logger.info(
f"[{sym}] 🚨 信号[{strategy_name}]: {result['signal']} "
f"score={result['score']} price={result['price']:.1f}"
)
# 模拟盘开仓(需开关开启 + 跳过冷启动)
if PAPER_TRADING_ENABLED and warmup_cycles <= 0:
if not paper_has_active_position(sym):
if not paper_has_active_position(sym, strategy_name):
active_count = paper_active_count()
if active_count < PAPER_MAX_POSITIONS:
tier = result.get("tier", "standard")
paper_open_trade(
sym, result["signal"], result["price"],
result["score"], tier,
result["atr"], now_ms,
factors=result.get("factors")
sym,
result["signal"],
result["price"],
result["score"],
tier,
result["atr"],
now_ms,
factors=result.get("factors"),
strategy=strategy_name,
tp_sl=strategy_cfg.get("tp_sl"),
)
# 模拟盘持仓检查由paper_monitor.py通过WebSocket实时处理这里不再检查