arbitrage-engine/backend/signal_engine.py
fanziqi ad60a53262 review: add code audit annotations and REVIEW.md for v5.1
P0 issues annotated (critical, must fix before live trading):
- signal_engine.py: cooldown blocks reverse-signal position close
- paper_monitor.py + signal_engine.py: pnl_r 2x inflated for TP scenarios
- signal_engine.py: entry price uses 30min VWAP instead of real-time price
- paper_monitor.py + signal_engine.py: concurrent write race on paper_trades

P1 issues annotated (long-term stability):
- db.py: ensure_partitions uses timedelta(30d) causing missed monthly partitions
- signal_engine.py: float precision drift in buy_vol/sell_vol accumulation
- market_data_collector.py: single bare connection with no reconnect logic
- db.py: get_sync_pool initialization not thread-safe
- signal_engine.py: recent_large_trades deque has no maxlen

P2/P3 issues annotated across backend and frontend:
- coinbase_premium KeyError for XRP/SOL symbols
- liquidation_collector: redundant elif condition in aggregation logic
- auth.py: JWT secret hardcoded default, login rate-limit absent
- Frontend: concurrent refresh token race, AuthContext not synced on failure
- Frontend: universal catch{} swallows all API errors silently
- Frontend: serial API requests in LatestSignals, market-indicators over-polling

docs/REVIEW.md: comprehensive audit report with all 34 issues (P0×4, P1×5,
P2×6, P3×4 backend + FE-P1×4, FE-P2×8, FE-P3×3 frontend), fix suggestions
and prioritized remediation roadmap.
2026-03-01 17:14:52 +08:00

778 lines
35 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
signal_engine.py — V5 短线交易信号引擎PostgreSQL版
架构:
- 独立PM2进程每5秒循环
- 内存滚动窗口计算指标CVD/ATR/VWAP/大单阈值)
- 启动时回灌历史数据冷启动warmup
- 信号评估核心3条件+加分3条件
- 输出signal_indicators表 + signal_trades表 + Discord推送
指标:
- CVD_fast (30m滚动) / CVD_mid (4h滚动) / CVD_day (UTC日内)
- ATR (5m, 14周期)
- VWAP_30m
- 大单阈值 P95/P99 (24h滚动)
"""
import logging
import os
import time
from collections import deque
from datetime import datetime, timezone
from typing import Any, Optional
from db import get_sync_conn, init_schema
# Log to the console and to signal-engine.log one directory above this module.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "signal-engine.log")),
],
)
logger = logging.getLogger("signal-engine")
# Symbols processed on every engine cycle.
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
LOOP_INTERVAL = 15 # seconds (raised from 5 to 15: ~60% less CPU, no impact on signal quality)
# ─── Paper-trading configuration ──────────────────────────────────
# These defaults are overwritten at runtime by load_paper_config().
PAPER_TRADING_ENABLED = False # master switch; enabled through the API after sign-off
PAPER_INITIAL_BALANCE = 10000 # virtual starting balance, USDT
PAPER_RISK_PER_TRADE = 0.02 # risk per trade: 2% (i.e. 200 USDT)
PAPER_MAX_POSITIONS = 4 # maximum number of simultaneously open positions
PAPER_TIER_MULTIPLIER = { # position-size multiplier per signal tier
"light": 0.5, # light: 1%
"standard": 1.0, # standard: 2%
"heavy": 1.5, # heavy: 3%
}
PAPER_FEE_RATE = 0.0005 # taker fee 0.05% (charged once on entry and once on exit)
def load_paper_config():
    """Reload the paper-trading switch and parameters from ``paper_config.json``.

    The file is looked up next to this module and is re-read on every engine
    cycle so the settings can be changed while the process is running.

    The module-level ``PAPER_*`` settings are only overwritten when the file
    exists and contains a JSON object. A missing file is silently ignored.
    An unreadable, malformed (for example half-written) or non-object file
    is logged and ignored, so the previous settings stay in effect instead
    of the exception aborting the caller's cycle.
    """
    global PAPER_TRADING_ENABLED, PAPER_INITIAL_BALANCE, PAPER_RISK_PER_TRADE, PAPER_MAX_POSITIONS
    import json

    config_path = os.path.join(os.path.dirname(__file__), "paper_config.json")
    log = logging.getLogger("signal-engine")
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            cfg = json.load(f)
    except FileNotFoundError:
        return
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        log.warning("paper_config.json unreadable, keeping previous settings: %s", exc)
        return

    if not isinstance(cfg, dict):
        log.warning("paper_config.json is not a JSON object, keeping previous settings")
        return

    PAPER_TRADING_ENABLED = cfg.get("enabled", False)
    PAPER_INITIAL_BALANCE = cfg.get("initial_balance", 10000)
    PAPER_RISK_PER_TRADE = cfg.get("risk_per_trade", 0.02)
    PAPER_MAX_POSITIONS = cfg.get("max_positions", 4)
# ─────────────────────────────────────────────────────────────────
# Rolling-window sizes (milliseconds)
WINDOW_FAST = 30 * 60 * 1000 # 30 minutes
WINDOW_MID = 4 * 3600 * 1000 # 4 hours
WINDOW_DAY = 24 * 3600 * 1000 # 24 hours
WINDOW_VWAP = 30 * 60 * 1000 # 30 minutes
# ATR parameters
ATR_PERIOD_MS = 5 * 60 * 1000 # candle width: 5 minutes
ATR_LENGTH = 14 # number of periods used for smoothing
# Signal cooldown
# [REVIEW] P3 | The docs state a 300-second (5-minute) cooldown but the code uses
# 10 minutes, which is inconsistent with PROJECT.md; please confirm which is intended.
COOLDOWN_MS = 10 * 60 * 1000
def fetch_market_indicators(symbol: str) -> dict:
    """Read the latest ``market_indicators`` rows for *symbol* from PostgreSQL.

    For each indicator type the newest row (by ``timestamp_ms``) is fetched
    and one numeric field is extracted from its JSON ``value``.

    Args:
        symbol: Trading pair used to filter the table, e.g. ``"BTCUSDT"``.

    Returns:
        A dict keyed by indicator type (``long_short_ratio``,
        ``top_trader_position``, ``open_interest_hist``, ``coinbase_premium``).
        Each value is a float, or ``None`` when there is no row, the payload
        is not a JSON object, or the field cannot be converted to a number.
    """
    import json as _json

    # JSON field to read per indicator type, plus the fallback when it is absent.
    fields = {
        "long_short_ratio": ("longShortRatio", 1.0),
        "top_trader_position": ("longAccount", 0.5),
        "open_interest_hist": ("sumOpenInterestValue", 0),
        "coinbase_premium": ("premium_pct", 0),
    }
    indicators = {}
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            for ind_type, (field, default) in fields.items():
                cur.execute(
                    "SELECT value FROM market_indicators WHERE symbol=%s AND indicator_type=%s ORDER BY timestamp_ms DESC LIMIT 1",
                    (symbol, ind_type),
                )
                row = cur.fetchone()
                payload = row[0] if row else None
                # The column may arrive as a JSON string or as an already-parsed dict.
                if isinstance(payload, str):
                    try:
                        payload = _json.loads(payload)
                    except ValueError:
                        payload = None
                # Anything that is not a JSON object (NULL, number, list) is
                # treated as missing rather than raising on ``.get``.
                if not isinstance(payload, dict):
                    indicators[ind_type] = None
                    continue
                try:
                    indicators[ind_type] = float(payload.get(field, default))
                except (TypeError, ValueError):
                    # Field present but null or non-numeric.
                    indicators[ind_type] = None
    return indicators
def to_float(value: Any) -> Optional[float]:
    """Convert *value* to ``float``.

    Returns ``None`` when *value* is ``None`` or cannot be converted.
    """
    if value is None:
        return None
    try:
        converted = float(value)
    except (TypeError, ValueError):
        return None
    return converted
# ─── 滚动窗口 ───────────────────────────────────────────────────
class TradeWindow:
    """Rolling time window over trades with running volume and VWAP totals.

    Trades are stored as ``(time_ms, qty, price, is_buyer_maker)`` tuples.
    ``is_buyer_maker == 0`` is counted as buy volume, anything else as sell
    volume. Totals are maintained incrementally on add/trim. Because
    repeated float add/subtract accumulates rounding error, they are
    recomputed exactly from the stored trades whenever the window empties
    and after every ``RESYNC_EVERY`` evictions.
    """

    # Number of evicted trades between exact recomputations of the totals.
    RESYNC_EVERY = 100_000

    def __init__(self, window_ms: int):
        self.window_ms = window_ms
        self.trades: deque = deque()
        self.buy_vol = 0.0
        self.sell_vol = 0.0
        self.pq_sum = 0.0  # sum of price * qty over the window
        self.q_sum = 0.0   # sum of qty over the window
        self._evicted_since_resync = 0

    def add(self, time_ms: int, qty: float, price: float, is_buyer_maker: int):
        """Append one trade and fold it into the running totals."""
        self.trades.append((time_ms, qty, price, is_buyer_maker))
        self.pq_sum += price * qty
        self.q_sum += qty
        if is_buyer_maker == 0:
            self.buy_vol += qty
        else:
            self.sell_vol += qty

    def trim(self, now_ms: int):
        """Evict trades older than ``now_ms - window_ms`` from the left end."""
        cutoff = now_ms - self.window_ms
        trades = self.trades
        evicted = 0
        while trades and trades[0][0] < cutoff:
            _t_ms, qty, price, ibm = trades.popleft()
            self.pq_sum -= price * qty
            self.q_sum -= qty
            if ibm == 0:
                self.buy_vol -= qty
            else:
                self.sell_vol -= qty
            evicted += 1
        if not evicted:
            return
        self._evicted_since_resync += evicted
        # An empty window must read exactly zero (otherwise float residue in
        # q_sum makes ``vwap`` return garbage); long-running windows are
        # resynchronised periodically to bound the accumulated drift.
        if not trades or self._evicted_since_resync >= self.RESYNC_EVERY:
            self._resync()

    def _resync(self) -> None:
        """Recompute all running totals exactly from the stored trades."""
        from math import fsum

        trades = self.trades
        self.pq_sum = fsum(price * qty for _, qty, price, _ in trades)
        self.q_sum = fsum(qty for _, qty, _, _ in trades)
        self.buy_vol = fsum(qty for _, qty, _, ibm in trades if ibm == 0)
        self.sell_vol = fsum(qty for _, qty, _, ibm in trades if ibm != 0)
        self._evicted_since_resync = 0

    @property
    def cvd(self) -> float:
        """Cumulative volume delta: buy volume minus sell volume."""
        return self.buy_vol - self.sell_vol

    @property
    def vwap(self) -> float:
        """Volume-weighted average price, or 0.0 when the window has no volume."""
        return self.pq_sum / self.q_sum if self.q_sum > 0 else 0.0
class ATRCalculator:
    """Builds fixed-width candles from trade prices and derives ATR from them.

    ``atr`` is computed from closed candles only, so its value changes only
    when a candle closes. Each non-zero value is therefore recorded into
    ``atr_history`` exactly once, at candle close inside ``update()``.
    ``atr_percentile`` is a pure read: reading it repeatedly no longer
    appends duplicate samples to the history.
    """

    def __init__(self, period_ms: int = ATR_PERIOD_MS, length: int = ATR_LENGTH):
        self.period_ms = period_ms
        self.length = length
        # length + 1 closed candles yield `length` true-range values.
        self.candles: deque = deque(maxlen=length + 1)
        self.current_candle: Optional[dict] = None
        # One sample per closed candle; 288 samples span 24 h at a 5-minute period.
        self.atr_history: deque = deque(maxlen=288)

    def update(self, time_ms: int, price: float):
        """Fold one trade price into the candle that contains *time_ms*."""
        bar_ms = (time_ms // self.period_ms) * self.period_ms
        candle = self.current_candle
        if candle is not None and candle["bar"] == bar_ms:
            candle["high"] = max(candle["high"], price)
            candle["low"] = min(candle["low"], price)
            candle["close"] = price
            return
        if candle is not None:
            # The previous candle is complete: store it and sample the new ATR.
            self.candles.append(candle)
            closed_atr = self.atr
            if closed_atr != 0:
                self.atr_history.append(closed_atr)
        self.current_candle = {"bar": bar_ms, "open": price, "high": price, "low": price, "close": price}

    @property
    def atr(self) -> float:
        """Smoothed average true range over the closed candles (0.0 if fewer than 2)."""
        candles = list(self.candles)
        if len(candles) < 2:
            return 0.0
        true_ranges = [
            max(
                cur["high"] - cur["low"],
                abs(cur["high"] - prev["close"]),
                abs(cur["low"] - prev["close"]),
            )
            for prev, cur in zip(candles, candles[1:])
        ]
        # Seeded with the first true range, then smoothed recursively.
        atr_val = true_ranges[0]
        for tr in true_ranges[1:]:
            atr_val = (atr_val * (self.length - 1) + tr) / self.length
        return atr_val

    @property
    def atr_percentile(self) -> float:
        """Percentile (0-100) of the current ATR within ``atr_history``.

        Returns the neutral value 50.0 while the ATR is zero or fewer than
        10 samples have been recorded.
        """
        current = self.atr
        if current == 0:
            return 50.0
        history = self.atr_history
        if len(history) < 10:
            return 50.0
        rank = sum(1 for sample in history if sample <= current)
        return (rank / len(history)) * 100
class SymbolState:
    """Per-symbol rolling state plus the V5.1 five-layer signal scoring.

    Holds four trade windows (fast / mid / day / VWAP), an ATR calculator,
    the latest market indicators and the bookkeeping needed for signal
    cooldown and large-trade flow detection.
    """

    # Look-back used when collecting trades at or above the P99 size threshold.
    LARGE_TRADE_WINDOW_MS = 15 * 60 * 1000

    def __init__(self, symbol: str):
        self.symbol = symbol
        self.win_fast = TradeWindow(WINDOW_FAST)
        self.win_mid = TradeWindow(WINDOW_MID)
        self.win_day = TradeWindow(WINDOW_DAY)
        self.win_vwap = TradeWindow(WINDOW_VWAP)
        self.atr_calc = ATRCalculator()
        self.last_processed_id = 0
        self.last_price = 0.0  # price of the most recently processed trade
        self.warmup = True
        self.prev_cvd_fast = 0.0
        self.prev_cvd_fast_slope = 0.0
        self.prev_oi_value = 0.0
        self.market_indicators = fetch_market_indicators(symbol)
        self.last_signal_ts = 0
        self.last_signal_dir = ""
        # (time_ms, qty, is_buyer_maker) of recent trades with qty >= P99.
        # Entries only ever come from win_fast and are dropped after
        # LARGE_TRADE_WINDOW_MS, so the size is bounded by that window's content.
        self.recent_large_trades: deque = deque()

    def process_trade(self, agg_id: int, time_ms: int, price: float, qty: float, is_buyer_maker: int):
        """Feed one aggregated trade into every window and the ATR calculator."""
        windows = (self.win_fast, self.win_mid, self.win_day, self.win_vwap)
        for window in windows:
            window.add(time_ms, qty, price, is_buyer_maker)
        self.atr_calc.update(time_ms, price)
        # Windows are trimmed relative to the trade's own timestamp.
        for window in windows:
            window.trim(time_ms)
        self.last_price = price
        self.last_processed_id = agg_id

    def compute_p95_p99(self) -> tuple:
        """Return the (P95, P99) trade-size thresholds from the day window.

        With fewer than 100 trades the fixed pair ``(5.0, 10.0)`` is returned.
        Otherwise the percentiles are floored at 5/10 for symbols containing
        "BTC" and at 50/100 for all others.
        """
        day_trades = self.win_day.trades
        if len(day_trades) < 100:
            return 5.0, 10.0
        qtys = sorted(t[1] for t in day_trades)
        n = len(qtys)
        p95 = qtys[int(n * 0.95)]
        p99 = qtys[int(n * 0.99)]
        floor95, floor99 = (5.0, 10.0) if "BTC" in self.symbol else (50.0, 100.0)
        return max(p95, floor95), max(p99, floor99)

    def update_large_trades(self, now_ms: int, p99: float):
        """Refresh ``recent_large_trades`` with trades of size >= *p99*.

        Expired entries are filtered out (not just popped from the left),
        because a trade can newly qualify when P99 drops and then be
        appended out of time order. Deduplication uses the whole
        ``(time_ms, qty, side)`` tuple, so two large trades sharing the same
        millisecond but differing in size or side are both kept.
        """
        cutoff = now_ms - self.LARGE_TRADE_WINDOW_MS
        kept = deque(lt for lt in self.recent_large_trades if lt[0] >= cutoff)
        seen = set(kept)
        for t_ms, qty, _price, ibm in self.win_fast.trades:
            if qty >= p99 and t_ms > cutoff:
                entry = (t_ms, qty, ibm)
                if entry not in seen:
                    kept.append(entry)
                    seen.add(entry)
        self.recent_large_trades = kept

    def evaluate_signal(self, now_ms: int) -> dict:
        """Compute the indicators and the V5.1 score, and decide on a signal.

        Returns a dict with the raw indicators (``cvd_fast``, ``cvd_mid``,
        ``cvd_day``, ``cvd_fast_slope``, ``atr``, ``atr_pct``, ``vwap``,
        ``price``, ``p95``, ``p99``), the ``score``, the leaning
        ``direction``, the per-layer ``factors``, and ``signal`` / ``tier``,
        which are ``None`` when no signal is emitted.

        ``price`` is the last processed trade price, falling back to the
        30-minute VWAP only when no trade has been seen yet. A signal in the
        same direction as the previous one is suppressed during
        ``COOLDOWN_MS``. A signal in the opposite direction bypasses the
        cooldown so callers can react to a reversal.
        """
        cvd_fast = self.win_fast.cvd
        cvd_mid = self.win_mid.cvd
        vwap = self.win_vwap.vwap
        atr = self.atr_calc.atr
        atr_pct = self.atr_calc.atr_percentile
        p95, p99 = self.compute_p95_p99()
        self.update_large_trades(now_ms, p99)

        # Entry/TP/SL levels must be anchored to the market, not to a
        # 30-minute average that can lag the price in a strong trend.
        if self.last_price and self.last_price > 0:
            price = self.last_price
        else:
            price = vwap if vwap > 0 else 0

        cvd_fast_slope = cvd_fast - self.prev_cvd_fast
        cvd_fast_accel = cvd_fast_slope - self.prev_cvd_fast_slope
        self.prev_cvd_fast = cvd_fast
        self.prev_cvd_fast_slope = cvd_fast_slope

        result = {
            "cvd_fast": cvd_fast, "cvd_mid": cvd_mid, "cvd_day": self.win_day.cvd,
            "cvd_fast_slope": cvd_fast_slope,
            "atr": atr, "atr_pct": atr_pct, "vwap": vwap, "price": price,
            "p95": p95, "p99": p99, "signal": None, "direction": None, "score": 0,
            "tier": None,
            "factors": {},
        }
        if self.warmup or price == 0 or atr == 0:
            return result

        # Leaning direction. It is computed for score display even when the
        # two CVDs disagree (no_direction) or the signal is in cooldown.
        no_direction = False
        if cvd_fast > 0 and cvd_mid > 0:
            direction = "LONG"
        elif cvd_fast < 0 and cvd_mid < 0:
            direction = "SHORT"
        else:
            direction = "LONG" if cvd_fast > 0 else "SHORT"
            no_direction = True
        is_long = direction == "LONG"
        is_short = direction == "SHORT"

        # Only a repeat of the previous signal direction is subject to cooldown.
        in_cooldown = (
            now_ms - self.last_signal_ts < COOLDOWN_MS
            and direction == self.last_signal_dir
        )

        # V5.1 five-layer score (100 total; the direction layer can add +5).
        # 1) Direction layer (45 points + 5 acceleration bonus)
        fast_aligned = (is_long and cvd_fast > 0) or (is_short and cvd_fast < 0)
        mid_aligned = (is_long and cvd_mid > 0) or (is_short and cvd_mid < 0)
        direction_score = 0
        if fast_aligned:
            direction_score += 15
        if mid_aligned:
            direction_score += 15
        # Large-trade side: is_buyer_maker == 1 counts against LONG, == 0 against SHORT.
        has_adverse_p99 = any(
            (is_long and lt[2] == 1) or (is_short and lt[2] == 0)
            for lt in self.recent_large_trades
        )
        has_aligned_p99 = any(
            (is_long and lt[2] == 0) or (is_short and lt[2] == 1)
            for lt in self.recent_large_trades
        )
        if has_aligned_p99:
            direction_score += 15
        elif not has_adverse_p99:
            direction_score += 10
        accel_bonus = 0
        if (is_long and cvd_fast_accel > 0) or (is_short and cvd_fast_accel < 0):
            accel_bonus = 5

        # 2) Crowding layer (20 points); missing indicators get the middle score.
        long_short_ratio = to_float(self.market_indicators.get("long_short_ratio"))
        if long_short_ratio is None:
            ls_score = 5
        elif (is_short and long_short_ratio > 2.0) or (is_long and long_short_ratio < 0.5):
            ls_score = 10
        else:
            ls_score = 5
        top_trader_position = to_float(self.market_indicators.get("top_trader_position"))
        if top_trader_position is None:
            top_trader_score = 5
        elif is_long:
            if top_trader_position >= 0.55:
                top_trader_score = 10
            elif top_trader_position <= 0.45:
                top_trader_score = 0
            else:
                top_trader_score = 5
        else:
            if top_trader_position <= 0.45:
                top_trader_score = 10
            elif top_trader_position >= 0.55:
                top_trader_score = 0
            else:
                top_trader_score = 5
        crowding_score = ls_score + top_trader_score

        # 3) Environment layer (15 points): open-interest rate of change.
        # NOTE(review): prev_oi_value is refreshed on every evaluation, so the
        # change is measured against the previous cycle — confirm this is the
        # intended baseline.
        oi_value = to_float(self.market_indicators.get("open_interest_hist"))
        if oi_value is None or self.prev_oi_value == 0:
            environment_score = 10
            oi_change = 0.0
        else:
            oi_change = (oi_value - self.prev_oi_value) / self.prev_oi_value if self.prev_oi_value > 0 else 0
            if oi_change >= 0.03:
                environment_score = 15
            elif oi_change > 0:
                environment_score = 10
            else:
                environment_score = 5
        if oi_value is not None and oi_value > 0:
            self.prev_oi_value = oi_value

        # 4) Confirmation layer (15 points): both CVDs agree with the direction.
        confirmation_score = 15 if (fast_aligned and mid_aligned) else 0

        # 5) Auxiliary layer (5 points): Coinbase premium.
        coinbase_premium = to_float(self.market_indicators.get("coinbase_premium"))
        if coinbase_premium is None:
            aux_score = 2
        elif (is_long and coinbase_premium > 0.0005) or (is_short and coinbase_premium < -0.0005):
            aux_score = 5
        elif abs(coinbase_premium) <= 0.0005:
            aux_score = 2
        else:
            aux_score = 0

        total_score = direction_score + accel_bonus + crowding_score + environment_score + confirmation_score + aux_score
        result["score"] = total_score
        result["direction"] = direction
        result["factors"] = {
            "direction": {
                "score": direction_score,
                "cvd_fast": 15 if fast_aligned else 0,
                "cvd_mid": 15 if mid_aligned else 0,
                "p99_flow": 15 if has_aligned_p99 else (10 if not has_adverse_p99 else 0),
                "accel_bonus": accel_bonus,
            },
            "crowding": {"score": crowding_score, "long_short_ratio": ls_score, "top_trader_position": top_trader_score},
            "environment": {"score": environment_score, "open_interest_hist": oi_change},
            "confirmation": {"score": confirmation_score},
            "auxiliary": {"score": aux_score, "coinbase_premium": coinbase_premium},
        }

        # Tier thresholds: >= 85 heavy, >= 75 standard, >= 60 light.
        if total_score >= 60 and not no_direction and not in_cooldown:
            result["signal"] = direction
            if total_score >= 85:
                result["tier"] = "heavy"
            elif total_score >= 75:
                result["tier"] = "standard"
            else:
                result["tier"] = "light"
            self.last_signal_ts = now_ms
            self.last_signal_dir = direction
        else:
            result["signal"] = None
            result["tier"] = None
        return result
# ─── PG DB操作 ───────────────────────────────────────────────────
def load_historical(state: SymbolState, window_ms: int):
    """Warm up *state* by replaying stored trades from the last *window_ms*.

    Rows are read from ``agg_trades`` in ascending ``agg_id`` order, in
    batches of 5000, and pushed through ``state.process_trade``. Afterwards
    the count is logged and the state's ``warmup`` flag is cleared.
    """
    since_ms = int(time.time() * 1000) - window_ms
    loaded = 0
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades "
            "WHERE symbol = %s AND time_ms >= %s ORDER BY agg_id ASC",
            (state.symbol, since_ms)
        )
        while batch := cur.fetchmany(5000):
            for agg_id, price, qty, time_ms, is_buyer_maker in batch:
                state.process_trade(agg_id, time_ms, price, qty, is_buyer_maker)
            loaded += len(batch)
    count = loaded
    logger.info(f"[{state.symbol}] 冷启动完成: 加载{count:,}条历史数据 (窗口={window_ms//3600000}h)")
    state.warmup = False
def fetch_new_trades(symbol: str, last_id: int) -> list:
    """Return up to 10000 trades for *symbol* with ``agg_id`` greater than *last_id*.

    Rows come back in ascending ``agg_id`` order as dicts with the keys
    ``agg_id``, ``price``, ``qty``, ``time_ms`` and ``is_buyer_maker``.
    """
    trades = []
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades "
            "WHERE symbol = %s AND agg_id > %s ORDER BY agg_id ASC LIMIT 10000",
            (symbol, last_id)
        )
        for row in cur.fetchall():
            trades.append({
                "agg_id": row[0],
                "price": row[1],
                "qty": row[2],
                "time_ms": row[3],
                "is_buyer_maker": row[4],
            })
    return trades
def save_indicator(ts: int, symbol: str, result: dict):
    """Insert one evaluation snapshot into ``signal_indicators`` and commit.

    *result* is the dict produced by ``SymbolState.evaluate_signal``;
    ``signal`` may be absent or ``None``.
    """
    params = (
        ts,
        symbol,
        result["cvd_fast"],
        result["cvd_mid"],
        result["cvd_day"],
        result["cvd_fast_slope"],
        result["atr"],
        result["atr_pct"],
        result["vwap"],
        result["price"],
        result["p95"],
        result["p99"],
        result["score"],
        result.get("signal"),
    )
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "INSERT INTO signal_indicators "
            "(ts,symbol,cvd_fast,cvd_mid,cvd_day,cvd_fast_slope,atr_5m,atr_percentile,vwap_30m,price,p95_qty,p99_qty,score,signal) "
            "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
            params
        )
        conn.commit()
def save_indicator_1m(ts: int, symbol: str, result: dict):
    """Write the snapshot for the one-minute bar containing *ts*.

    *ts* is floored to the minute. If ``signal_indicators_1m`` already has a
    row for that bar and symbol it is updated, otherwise a row is inserted.
    The transaction is then committed.
    """
    bar_ts = ts - ts % 60000
    values = (
        result["cvd_fast"],
        result["cvd_mid"],
        result["cvd_day"],
        result["atr"],
        result["vwap"],
        result["price"],
        result["score"],
        result.get("signal"),
    )
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute("SELECT id FROM signal_indicators_1m WHERE ts=%s AND symbol=%s", (bar_ts, symbol))
        row_exists = bool(cur.fetchone())
        if not row_exists:
            cur.execute(
                "INSERT INTO signal_indicators_1m (ts,symbol,cvd_fast,cvd_mid,cvd_day,atr_5m,vwap_30m,price,score,signal) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                (bar_ts, symbol) + values
            )
        else:
            cur.execute(
                "UPDATE signal_indicators_1m SET cvd_fast=%s,cvd_mid=%s,cvd_day=%s,atr_5m=%s,vwap_30m=%s,price=%s,score=%s,signal=%s WHERE ts=%s AND symbol=%s",
                values + (bar_ts, symbol)
            )
        conn.commit()
# ─── 模拟盘 ──────────────────────────────────────────────────────
def paper_open_trade(symbol: str, direction: str, price: float, score: int, tier: str, atr: float, now_ms: int, factors: dict = None):
    """Open a paper (simulated) position and record it in ``paper_trades``.

    Price levels are derived from ``risk_atr = 0.7 * atr``: the stop-loss sits
    2.0 x risk_atr against the position, TP1 1.5 x and TP2 3.0 x in its
    favour. Nothing is written when ``risk_atr`` is not positive.

    [REVIEW] P0 | One R (the stop distance) is 2.0 x risk_atr, so TP1 and TP2
    are 0.75R and 1.5R from entry, not 1.5R and 3.0R. Any ``pnl_r`` booked
    for these trades should be derived from
    ``(exit_price - entry_price) / risk_distance`` rather than from fixed
    1.5 / 3.0 multiples, otherwise take-profit results are overstated about 2x.
    """
    import json as _json3

    risk_atr = 0.7 * atr
    if not risk_atr > 0:
        return

    # +1 moves levels upward for LONG; -1 mirrors them for every other direction.
    side = 1 if direction == "LONG" else -1
    sl = price - side * 2.0 * risk_atr
    tp1 = price + side * 1.5 * risk_atr
    tp2 = price + side * 3.0 * risk_atr

    factors_json = _json3.dumps(factors) if factors else None
    row = (symbol, direction, score, tier, price, now_ms, tp1, tp2, sl, atr, factors_json)
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "INSERT INTO paper_trades (symbol,direction,score,tier,entry_price,entry_ts,tp1_price,tp2_price,sl_price,atr_at_entry,score_factors) "
            "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
            row
        )
        conn.commit()
    logger.info(f"[{symbol}] 📝 模拟开仓: {direction} @ {price:.2f} score={score} tier={tier} TP1={tp1:.2f} TP2={tp2:.2f} SL={sl:.2f}")
# [REVIEW] P1 | The main loop in this module does not call paper_check_positions;
# per the review notes, take-profit / stop-loss handling lives in paper_monitor.py.
# The function is kept as a possible fallback and should either be deleted or
# explicitly marked deprecated.
def paper_check_positions(symbol: str, current_price: float, now_ms: int):
    """Check open paper positions of *symbol* against TP / SL / timeout.

    For every ``active`` or ``tp1_hit`` position:

    * stop-loss hit: close with ``sl`` (-1R), or ``sl_be`` if TP1 was
      already taken (keeps the half-position TP1 profit);
    * TP1 hit: mark ``tp1_hit`` and move the stop to entry +/- 0.05%;
    * TP2 hit after TP1: close with ``tp``;
    * older than 60 minutes: close with ``timeout``.

    R-multiples are derived from the stored price levels relative to the
    original stop distance (``2.0 * 0.7 * atr_at_entry``) rather than from
    hard-coded 1.5 / 3.0 constants, which overstated take-profit results
    about 2x. The timeout result uses the same stop distance, not the
    current ``sl_price``, which is nearly equal to entry after TP1. Round-trip
    taker fees are subtracted. Rows are selected ``FOR UPDATE`` so a
    concurrent writer cannot change them between the read and the update.
    """
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, direction, entry_price, tp1_price, tp2_price, sl_price, tp1_hit, entry_ts, atr_at_entry "
                "FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit') ORDER BY id FOR UPDATE",
                (symbol,)
            )
            for pos in cur.fetchall():
                pid, direction, entry_price, tp1, tp2, sl, tp1_hit, entry_ts, atr_entry = pos
                is_long = direction == "LONG"

                # 1R = the original stop distance; fall back to 1 price unit
                # when the ATR at entry is missing or non-positive.
                risk_distance = 2.0 * 0.7 * atr_entry if atr_entry and atr_entry > 0 else 1
                tp1_r = abs(tp1 - entry_price) / risk_distance
                tp2_r = abs(tp2 - entry_price) / risk_distance
                half_tp1_r = 0.5 * tp1_r

                if is_long:
                    hit_sl = current_price <= sl
                    hit_tp1 = current_price >= tp1
                    hit_tp2 = current_price >= tp2
                else:
                    hit_sl = current_price >= sl
                    hit_tp1 = current_price <= tp1
                    hit_tp2 = current_price <= tp2

                closed = False
                new_status = None
                pnl_r = 0.0
                if hit_sl:
                    closed = True
                    if tp1_hit:
                        new_status = "sl_be"
                        pnl_r = half_tp1_r
                    else:
                        new_status = "sl"
                        pnl_r = -1.0
                elif not tp1_hit and hit_tp1:
                    # TP1 reached: move the stop to (roughly) break-even.
                    new_sl = entry_price * (1.0005 if is_long else 0.9995)
                    cur.execute("UPDATE paper_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit' WHERE id=%s", (new_sl, pid))
                    logger.info(f"[{symbol}] 📝 TP1触发 {direction} @ {current_price:.2f}, SL移至成本{new_sl:.2f}")
                    # Keep the local flag in sync for the timeout check below.
                    tp1_hit = True
                elif tp1_hit and hit_tp2:
                    closed = True
                    new_status = "tp"
                    # Half the position exits at TP1, the other half at TP2.
                    pnl_r = half_tp1_r + 0.5 * tp2_r

                # Time stop: 60 minutes after entry.
                if not closed and (now_ms - entry_ts > 60 * 60 * 1000):
                    closed = True
                    new_status = "timeout"
                    move = (current_price - entry_price) if is_long else (entry_price - current_price)
                    pnl_r = move / risk_distance
                    if tp1_hit:
                        pnl_r = max(pnl_r, half_tp1_r)

                if closed:
                    # Deduct fees: one taker fee on entry and one on exit.
                    fee_r = (2 * PAPER_FEE_RATE * entry_price) / risk_distance
                    pnl_r -= fee_r
                    cur.execute(
                        "UPDATE paper_trades SET status=%s, exit_price=%s, exit_ts=%s, pnl_r=%s WHERE id=%s",
                        (new_status, current_price, now_ms, round(pnl_r, 4), pid)
                    )
                    logger.info(f"[{symbol}] 📝 模拟平仓: {direction} @ {current_price:.2f} status={new_status} pnl={pnl_r:+.2f}R (fee={fee_r:.3f}R)")
            conn.commit()
def paper_has_active_position(symbol: str) -> bool:
    """Return True when *symbol* has at least one ``active`` or ``tp1_hit`` paper trade."""
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')", (symbol,))
        (open_count,) = cur.fetchone()[:1]
        return open_count > 0
def paper_get_active_direction(symbol: str) -> str | None:
    """Return the direction of one open paper trade of *symbol*, or None if there is none."""
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute("SELECT direction FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit') LIMIT 1", (symbol,))
        row = cur.fetchone()
        if not row:
            return None
        return row[0]
def paper_close_by_signal(symbol: str, current_price: float, now_ms: int):
    """Close every open paper position of *symbol* at *current_price* (reverse signal).

    ``pnl_r`` is the price move in the position's favour divided by the
    original stop distance (``2.0 * 0.7 * atr_at_entry``), minus round-trip
    taker fees. Closed rows get the status ``signal_flip``.

    The rows are selected ``FOR UPDATE``. A position that another
    transaction is closing is waited for and then skipped once its status no
    longer matches, instead of being overwritten with a second exit. Full
    protection also requires the other writer to re-check the status in its
    own UPDATE.

    NOTE(review): ``tp1_hit`` is read but not used here, so a position that
    already took half off at TP1 is booked as if the full size exits now —
    confirm whether that is intended.
    """
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, direction, entry_price, tp1_hit, atr_at_entry "
                "FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit') "
                "FOR UPDATE",
                (symbol,)
            )
            for pid, direction, entry_price, _tp1_hit, atr_entry in cur.fetchall():
                # Fall back to 1 price unit when the ATR at entry is missing
                # or non-positive, so the division below is always defined.
                risk_distance = 2.0 * 0.7 * atr_entry if atr_entry and atr_entry > 0 else 1
                if direction == "LONG":
                    move = current_price - entry_price
                else:
                    move = entry_price - current_price
                # Deduct one taker fee on entry and one on exit.
                fee_r = (2 * PAPER_FEE_RATE * entry_price) / risk_distance
                pnl_r = move / risk_distance - fee_r
                cur.execute(
                    "UPDATE paper_trades SET status='signal_flip', exit_price=%s, exit_ts=%s, pnl_r=%s WHERE id=%s",
                    (current_price, now_ms, round(pnl_r, 4), pid)
                )
                logger.info(f"[{symbol}] 📝 反向信号平仓: {direction} @ {current_price:.2f} pnl={pnl_r:+.2f}R")
            conn.commit()
def paper_active_count() -> int:
    """Return the number of ``active`` or ``tp1_hit`` paper trades across all symbols."""
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM paper_trades WHERE status IN ('active','tp1_hit')")
        first_row = cur.fetchone()
        return first_row[0]
# ─── 主循环 ──────────────────────────────────────────────────────
def _execute_paper_signal(sym: str, result: dict, now_ms: int) -> None:
    """Apply a fresh signal to the paper account of *sym*.

    An open position in the opposite direction is closed first. A new
    position is opened only if the symbol then has no open position and the
    global position limit has not been reached.
    """
    new_dir = result["signal"]
    existing_dir = paper_get_active_direction(sym)
    if existing_dir and existing_dir != new_dir:
        # NOTE(review): this branch runs only when evaluate_signal actually
        # emits a signal. If signals are suppressed during the cooldown,
        # a reversal inside that window will not close the position here.
        paper_close_by_signal(sym, result["price"], now_ms)
        logger.info(f"[{sym}] 📝 反向信号平仓: {existing_dir} -> {new_dir}")
    if paper_has_active_position(sym):
        return
    if paper_active_count() >= PAPER_MAX_POSITIONS:
        return
    paper_open_trade(
        sym, new_dir, result["price"],
        result["score"], result.get("tier", "standard"),
        result["atr"], now_ms,
        factors=result.get("factors")
    )


def _process_symbol(sym: str, state: SymbolState, now_ms: int, last_1m_save: dict, paper_ready: bool) -> None:
    """Run one engine cycle for a single symbol: ingest, evaluate, persist, trade."""
    for t in fetch_new_trades(sym, state.last_processed_id):
        state.process_trade(t["agg_id"], t["time_ms"], t["price"], t["qty"], t["is_buyer_maker"])
    state.market_indicators = fetch_market_indicators(sym)
    result = state.evaluate_signal(now_ms)
    save_indicator(now_ms, sym, result)

    # The one-minute table is written once per bar and symbol.
    bar_1m = (now_ms // 60000) * 60000
    if last_1m_save.get(sym) != bar_1m:
        save_indicator_1m(now_ms, sym, result)
        last_1m_save[sym] = bar_1m

    if not result.get("signal"):
        return
    logger.info(f"[{sym}] 🚨 信号: {result['signal']} score={result['score']} price={result['price']:.1f}")
    # Paper trading needs the switch on and the cold-start guard expired.
    if PAPER_TRADING_ENABLED and paper_ready:
        _execute_paper_signal(sym, result, now_ms)


def main():
    """Entry point: warm up every symbol, then run the evaluation loop forever.

    Each cycle reloads the paper-trading config and processes every symbol.
    A failure in one symbol is logged and does not stop the remaining
    symbols from being processed in the same cycle. The cycle counter, the
    cold-start guard and the periodic status log advance only on cycles
    where every symbol succeeded. Open paper positions are not checked here:
    that is handled by paper_monitor.py.
    """
    init_schema()
    states = {sym: SymbolState(sym) for sym in SYMBOLS}
    for state in states.values():
        load_historical(state, WINDOW_MID)
    logger.info("=== Signal Engine (PG) 启动完成 ===")

    last_1m_save = {}
    cycle = 0
    warmup_cycles = 3  # skip paper entries for the first 3 cycles after startup

    while True:
        try:
            now_ms = int(time.time() * 1000)
            # Reloaded every cycle so the switch can be toggled at runtime.
            load_paper_config()

            all_ok = True
            for sym, state in states.items():
                try:
                    _process_symbol(sym, state, now_ms, last_1m_save, warmup_cycles <= 0)
                except Exception as e:
                    all_ok = False
                    logger.error(f"[{sym}] 循环异常: {e}", exc_info=True)

            if all_ok:
                cycle += 1
                if warmup_cycles > 0:
                    warmup_cycles -= 1
                    if warmup_cycles == 0:
                        logger.info("冷启动保护期结束,模拟盘开仓已启用")
                if cycle % 60 == 0:
                    for sym, state in states.items():
                        logger.info(
                            f"[{sym}] 状态: CVD_fast={state.win_fast.cvd:.1f} "
                            f"CVD_mid={state.win_mid.cvd:.1f} ATR={state.atr_calc.atr:.2f} "
                            f"({state.atr_calc.atr_percentile:.0f}%) VWAP={state.win_vwap.vwap:.1f}"
                        )
        except Exception as e:
            logger.error(f"循环异常: {e}", exc_info=True)
        time.sleep(LOOP_INTERVAL)


if __name__ == "__main__":
    main()