arbitrage-engine/backend/signal_engine.py
root 2f9dce483c fix: simulate limit orders for TP/SL (match real trading)
- TP/SL now exit at order price (limit order), not market price
- SL exits at sl_price, TP1 at tp1_price, TP2 at tp2_price
- Only timeout and signal_flip use market price (current price)
- Updated fix_historical_pnl.py to also correct exit_price
- This eliminates fake slippage in paper trading stats
2026-03-01 09:40:00 +00:00

756 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
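signal_engine.py — V5 short-term trading signal engine (PostgreSQL edition)

Architecture:
    - Runs as a standalone PM2 process on a fixed polling loop
      (originally every 5 seconds; LOOP_INTERVAL holds the current period)
    - Computes indicators over in-memory rolling windows
      (CVD / ATR / VWAP / large-trade thresholds)
    - Replays historical data at startup to warm the windows up (cold start)
    - Signal evaluation: 3 core conditions + 3 bonus conditions
    - Output: signal_indicators table + signal_trades table + Discord push

Indicators:
    - CVD_fast (30m rolling) / CVD_mid (4h rolling) / CVD_day (UTC intraday)
    - ATR (5m bars, 14 periods)
    - VWAP_30m
    - Large-trade thresholds P95/P99 (24h rolling)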
"""
import logging
import os
import time
from collections import deque
from datetime import datetime, timezone
from typing import Any, Optional
from db import get_sync_conn, init_schema
# Root logging setup: INFO level, written both to the default stream handler
# and to "signal-engine.log" one directory above this module.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "signal-engine.log")),
    ],
)
# Module-wide logger used by every function in this file.
logger = logging.getLogger("signal-engine")
# Trading pairs evaluated on every engine cycle.
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
LOOP_INTERVAL = 15  # seconds (raised from 5 to 15: ~60% less CPU, no impact on signal quality)
# ─── Paper-trading configuration ─────────────────────────────────
# These are defaults; load_paper_config() overwrites the first four from paper_config.json.
PAPER_TRADING_ENABLED = False  # master switch; turned on through the API once approved
PAPER_INITIAL_BALANCE = 10000  # virtual starting balance, in USDT
PAPER_RISK_PER_TRADE = 0.02  # risk per trade: 2%, i.e. 200 USDT
PAPER_MAX_POSITIONS = 4  # maximum number of simultaneously open positions
PAPER_TIER_MULTIPLIER = {  # position-size multiplier per signal tier
    "light": 0.5,  # light position: 1%
    "standard": 1.0,  # standard position: 2%
    "heavy": 1.5,  # heavy position: 3%
}
PAPER_FEE_RATE = 0.0005  # taker fee 0.05%, charged once on entry and once on exit
def load_paper_config():
    """Reload the paper-trading switch and parameters from ``paper_config.json``.

    The file is looked up next to this module and is expected to hold a JSON
    object with the optional keys ``enabled``, ``initial_balance``,
    ``risk_per_trade`` and ``max_positions``.  A key that is absent falls back
    to its built-in default (``False``, ``10000``, ``0.02``, ``4``).

    The engine calls this on every cycle, so a bad file must not raise:

    * file missing               -> current settings are kept, silently;
    * invalid JSON / not an object -> current settings are kept and a warning
      is logged (this happens, for instance, when the file is read while
      another process is still rewriting it).
    """
    global PAPER_TRADING_ENABLED, PAPER_INITIAL_BALANCE, PAPER_RISK_PER_TRADE, PAPER_MAX_POSITIONS
    import json  # local import: json is not imported at module level

    config_path = os.path.join(os.path.dirname(__file__), "paper_config.json")
    try:
        with open(config_path, "r") as f:
            cfg = json.load(f)
    except FileNotFoundError:
        # No config file yet: keep whatever is currently in effect.
        return
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        logger.warning("paper_config.json is not valid JSON, keeping current settings: %s", exc)
        return

    if not isinstance(cfg, dict):
        logger.warning(
            "paper_config.json must contain a JSON object, got %s; keeping current settings",
            type(cfg).__name__,
        )
        return

    PAPER_TRADING_ENABLED = cfg.get("enabled", False)
    PAPER_INITIAL_BALANCE = cfg.get("initial_balance", 10000)
    PAPER_RISK_PER_TRADE = cfg.get("risk_per_trade", 0.02)
    PAPER_MAX_POSITIONS = cfg.get("max_positions", 4)
# ─────────────────────────────────────────────────────────────────
# Rolling-window sizes (milliseconds)
WINDOW_FAST = 30 * 60 * 1000  # 30 minutes
WINDOW_MID = 4 * 3600 * 1000  # 4 hours
WINDOW_DAY = 24 * 3600 * 1000  # 24 hours
WINDOW_VWAP = 30 * 60 * 1000  # 30 minutes
# ATR parameters: candle width in milliseconds (5 minutes) and smoothing length in candles
ATR_PERIOD_MS = 5 * 60 * 1000
ATR_LENGTH = 14
# Signal cooldown: minimum time between two emitted signals for one symbol (10 minutes)
COOLDOWN_MS = 10 * 60 * 1000
def fetch_market_indicators(symbol: str) -> dict:
    """Read the newest ``market_indicators`` row of each type and reduce it to one float.

    For every indicator type the most recent row (by ``timestamp_ms``) for
    *symbol* is fetched.  Its ``value`` column may arrive either as a JSON
    string or as an already-decoded dict; one numeric field is extracted:

    ====================  ======================  =======================
    indicator_type        field read              default if field absent
    ====================  ======================  =======================
    long_short_ratio      longShortRatio          1.0
    top_trader_position   longAccount             0.5
    open_interest_hist    sumOpenInterestValue    0
    coinbase_premium      premium_pct             0
    ====================  ======================  =======================

    Returns:
        A dict keyed by indicator type.  The value is a ``float``, or ``None``
        when there is no row, the row's value is NULL, the payload is not a
        JSON object, or the field cannot be converted to a number.  Malformed
        payloads never raise; callers already treat ``None`` as "unknown".
    """
    import json as _json

    # indicator_type -> (field inside the JSON payload, default when the field is absent)
    field_by_type = {
        "long_short_ratio": ("longShortRatio", 1.0),
        "top_trader_position": ("longAccount", 0.5),
        "open_interest_hist": ("sumOpenInterestValue", 0),
        "coinbase_premium": ("premium_pct", 0),
    }
    indicators = {}
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            for ind_type, (field, default) in field_by_type.items():
                cur.execute(
                    "SELECT value FROM market_indicators WHERE symbol=%s AND indicator_type=%s ORDER BY timestamp_ms DESC LIMIT 1",
                    (symbol, ind_type),
                )
                row = cur.fetchone()
                val = row[0] if row else None
                # The value may be a JSON string or an already-parsed dict.
                if isinstance(val, str):
                    try:
                        val = _json.loads(val)
                    except ValueError:
                        val = None
                if not isinstance(val, dict):
                    # Missing row, NULL, undecodable text, or a non-object payload.
                    indicators[ind_type] = None
                    continue
                try:
                    indicators[ind_type] = float(val.get(field, default))
                except (TypeError, ValueError):
                    # Field present but null / non-numeric.
                    indicators[ind_type] = None
    return indicators
def to_float(value: Any) -> Optional[float]:
    """Coerce *value* to ``float``.

    Returns ``None`` when *value* is ``None`` or when ``float()`` rejects it
    with ``TypeError`` / ``ValueError``.
    """
    if value is None:
        return None
    try:
        converted = float(value)
    except (TypeError, ValueError):
        return None
    return converted
# ─── 滚动窗口 ───────────────────────────────────────────────────
class TradeWindow:
    """Time-bounded rolling window of trades with constant-time CVD and VWAP.

    Trades are kept in arrival order as ``(time_ms, qty, price, is_buyer_maker)``
    tuples.  Four running sums are maintained incrementally so that ``cvd`` and
    ``vwap`` never have to rescan the deque:

    * ``buy_vol``  – quantity of trades with ``is_buyer_maker == 0``
    * ``sell_vol`` – quantity of all other trades
    * ``pq_sum``   – sum of ``price * qty``
    * ``q_sum``    – sum of ``qty``
    """

    def __init__(self, window_ms: int):
        """Create an empty window that retains trades for *window_ms* milliseconds."""
        self.window_ms = window_ms
        self.trades: deque = deque()
        self.buy_vol = 0.0
        self.sell_vol = 0.0
        self.pq_sum = 0.0
        self.q_sum = 0.0

    def add(self, time_ms: int, qty: float, price: float, is_buyer_maker: int):
        """Append one trade and fold it into the running sums."""
        self.trades.append((time_ms, qty, price, is_buyer_maker))
        self.pq_sum += price * qty
        self.q_sum += qty
        if is_buyer_maker == 0:
            self.buy_vol += qty
        else:
            self.sell_vol += qty

    def trim(self, now_ms: int):
        """Evict every trade older than ``now_ms - window_ms``.

        A trade whose timestamp equals the cutoff is kept.
        """
        cutoff = now_ms - self.window_ms
        trades = self.trades
        while trades and trades[0][0] < cutoff:
            _, qty, price, ibm = trades.popleft()
            self.pq_sum -= price * qty
            self.q_sum -= qty
            if ibm == 0:
                self.buy_vol -= qty
            else:
                self.sell_vol -= qty
        if not trades:
            # Adding and later subtracting floats does not cancel exactly.  With
            # nothing left in the window the true sums are zero, so discard the
            # accumulated rounding residue; otherwise a residual q_sum such as
            # 1e-17 would make vwap return pq_sum / q_sum garbage.
            self.buy_vol = 0.0
            self.sell_vol = 0.0
            self.pq_sum = 0.0
            self.q_sum = 0.0

    @property
    def cvd(self) -> float:
        """Cumulative volume delta of the window: ``buy_vol - sell_vol``."""
        return self.buy_vol - self.sell_vol

    @property
    def vwap(self) -> float:
        """Volume-weighted average price, or ``0.0`` when the window holds no volume."""
        return self.pq_sum / self.q_sum if self.q_sum > 0 else 0.0
class ATRCalculator:
    """Builds fixed-width candles from a trade stream and derives ATR from them.

    * ``candles`` holds the most recent ``length + 1`` *closed* candles; the
      candle still being built lives in ``current_candle`` and is not part of
      the ATR until it closes.
    * ``atr_history`` holds up to 288 ATR samples, one per closed candle, and
      is what ``atr_percentile`` ranks the current ATR against.  Sampling
      happens in ``update()``, so reading either property has no side effects.
    """

    def __init__(self, period_ms: Optional[int] = None, length: Optional[int] = None):
        """Create an empty calculator.

        Args:
            period_ms: Candle width in milliseconds; ``None`` means ``ATR_PERIOD_MS``.
            length: Smoothing length in candles; ``None`` means ``ATR_LENGTH``.
        """
        self.period_ms = ATR_PERIOD_MS if period_ms is None else period_ms
        self.length = ATR_LENGTH if length is None else length
        self.candles: deque = deque(maxlen=self.length + 1)
        self.current_candle: Optional[dict] = None
        self.atr_history: deque = deque(maxlen=288)

    def update(self, time_ms: int, price: float):
        """Feed one trade price into the candle that covers *time_ms*.

        When the trade belongs to a different bar than the one in progress,
        the in-progress candle is closed, one ATR sample is recorded (if the
        ATR is non-zero), and a fresh candle is opened at *price*.
        """
        bar_ms = (time_ms // self.period_ms) * self.period_ms
        candle = self.current_candle
        if candle is not None and candle["bar"] == bar_ms:
            candle["high"] = max(candle["high"], price)
            candle["low"] = min(candle["low"], price)
            candle["close"] = price
            return
        if candle is not None:
            self.candles.append(candle)
            # ATR only changes when a candle closes, so this is the one place
            # where a new history sample carries new information.
            closed_atr = self.atr
            if closed_atr > 0:
                self.atr_history.append(closed_atr)
        self.current_candle = {"bar": bar_ms, "open": price, "high": price, "low": price, "close": price}

    @property
    def atr(self) -> float:
        """Smoothed average true range over the closed candles.

        Returns ``0.0`` until at least two candles have closed.  The first
        true range seeds the average; each later one is blended in as
        ``(prev * (length - 1) + tr) / length``.
        """
        closed = list(self.candles)
        if len(closed) < 2:
            return 0.0
        true_ranges = [
            max(
                cur["high"] - cur["low"],
                abs(cur["high"] - prev["close"]),
                abs(cur["low"] - prev["close"]),
            )
            for prev, cur in zip(closed, closed[1:])
        ]
        atr_val = true_ranges[0]
        for tr in true_ranges[1:]:
            atr_val = (atr_val * (self.length - 1) + tr) / self.length
        return atr_val

    @property
    def atr_percentile(self) -> float:
        """Percentage (0-100) of recorded ATR samples that are <= the current ATR.

        Returns the neutral value ``50.0`` while the ATR is zero or fewer than
        ten samples have been recorded.
        """
        current = self.atr
        if current == 0 or len(self.atr_history) < 10:
            return 50.0
        rank = sum(1 for sample in self.atr_history if sample <= current)
        return (rank / len(self.atr_history)) * 100
class SymbolState:
    """All in-memory state the engine keeps for one trading pair.

    Holds the rolling trade windows, the ATR calculator, the last traded
    price, the most recently fetched market indicators, recent large trades
    and the cooldown bookkeeping used by ``evaluate_signal``.
    """

    def __init__(self, symbol: str):
        """Create empty windows for *symbol* and fetch its market indicators once."""
        self.symbol = symbol
        self.win_fast = TradeWindow(WINDOW_FAST)
        self.win_mid = TradeWindow(WINDOW_MID)
        self.win_day = TradeWindow(WINDOW_DAY)
        self.win_vwap = TradeWindow(WINDOW_VWAP)
        self.atr_calc = ATRCalculator()
        self.last_processed_id = 0
        self.last_price = 0.0  # price of the newest processed trade; 0.0 until one arrives
        self.warmup = True  # while True, evaluate_signal reports indicators but never scores
        self.prev_cvd_fast = 0.0
        self.prev_cvd_fast_slope = 0.0
        self.prev_oi_value = 0.0
        self.market_indicators = fetch_market_indicators(symbol)
        self.last_signal_ts = 0
        self.last_signal_dir = ""
        # Entries are (time_ms, qty, is_buyer_maker) for trades at or above the P99 size.
        self.recent_large_trades: deque = deque()

    def process_trade(self, agg_id: int, time_ms: int, price: float, qty: float, is_buyer_maker: int):
        """Fold one aggregated trade into every window and the ATR candles.

        Windows are trimmed against the trade's own timestamp, and
        ``last_processed_id`` / ``last_price`` are advanced to this trade.
        """
        windows = (self.win_fast, self.win_mid, self.win_day, self.win_vwap)
        for window in windows:
            window.add(time_ms, qty, price, is_buyer_maker)
        self.atr_calc.update(time_ms, price)
        for window in windows:
            window.trim(time_ms)
        self.last_processed_id = agg_id
        self.last_price = price

    def compute_p95_p99(self) -> tuple:
        """Return the (P95, P99) trade-size thresholds from the day window.

        With fewer than 100 trades the fixed pair ``(5.0, 10.0)`` is returned.
        Otherwise the percentiles are floored at ``(5.0, 10.0)`` for symbols
        containing "BTC" and at ``(50.0, 100.0)`` for every other symbol.
        """
        if len(self.win_day.trades) < 100:
            return 5.0, 10.0
        qtys = sorted(t[1] for t in self.win_day.trades)
        n = len(qtys)
        p95 = qtys[int(n * 0.95)]
        p99 = qtys[int(n * 0.99)]
        floor_p95, floor_p99 = (5.0, 10.0) if "BTC" in self.symbol else (50.0, 100.0)
        return max(p95, floor_p95), max(p99, floor_p99)

    def update_large_trades(self, now_ms: int, p99: float):
        """Refresh ``recent_large_trades`` to the >= *p99* trades of the last 15 minutes.

        Entries older than the cutoff are dropped, then the fast window is
        scanned for qualifying trades that are not recorded yet.
        """
        cutoff = now_ms - 15 * 60 * 1000
        recent = self.recent_large_trades
        while recent and recent[0][0] < cutoff:
            recent.popleft()
        # De-duplicate on the whole record rather than on time_ms alone: two
        # different large trades can share a millisecond, and they may even be
        # on opposite sides, which matters for the aligned/adverse flow check.
        seen = set(recent)
        for t_ms, qty, _price, ibm in self.win_fast.trades:
            record = (t_ms, qty, ibm)
            if qty >= p99 and t_ms > cutoff and record not in seen:
                recent.append(record)
                seen.add(record)

    def evaluate_signal(self, now_ms: int) -> dict:
        """Compute the indicator snapshot and, once warmed up, the scored signal.

        Returns:
            A dict with the keys ``cvd_fast``, ``cvd_mid``, ``cvd_day``,
            ``cvd_fast_slope``, ``atr``, ``atr_pct``, ``vwap``, ``price``,
            ``p95``, ``p99``, ``signal``, ``direction``, ``score``, ``tier``
            and ``factors``.

            * ``price`` is the last traded price; the 30-minute VWAP is used
              only while no trade has been processed yet.
            * ``direction`` is "LONG"/"SHORT" when fast and mid CVD agree in
              sign, otherwise ``None``.  It is reported even during cooldown.
            * ``signal``/``tier`` are set only when the score reaches 60
              ("light"), 75 ("standard") or 85 ("heavy"), a direction exists
              and the cooldown has elapsed.

            During warm-up, or when price or ATR is zero, the snapshot is
            returned unscored (score 0, empty ``factors``).
        """
        cvd_fast = self.win_fast.cvd
        cvd_mid = self.win_mid.cvd
        vwap = self.win_vwap.vwap
        atr = self.atr_calc.atr
        atr_pct = self.atr_calc.atr_percentile
        p95, p99 = self.compute_p95_p99()
        self.update_large_trades(now_ms, p99)

        # "price" feeds paper-trade entries and signal-flip exits, so it has to
        # be the market price; VWAP already has its own "vwap" key.
        if self.last_price > 0:
            price = self.last_price
        else:
            price = vwap if vwap > 0 else 0

        cvd_fast_slope = cvd_fast - self.prev_cvd_fast
        cvd_fast_accel = cvd_fast_slope - self.prev_cvd_fast_slope
        self.prev_cvd_fast = cvd_fast
        self.prev_cvd_fast_slope = cvd_fast_slope

        result = {
            "cvd_fast": cvd_fast, "cvd_mid": cvd_mid, "cvd_day": self.win_day.cvd,
            "cvd_fast_slope": cvd_fast_slope,
            "atr": atr, "atr_pct": atr_pct, "vwap": vwap, "price": price,
            "p95": p95, "p99": p99, "signal": None, "direction": None, "score": 0,
            "tier": None,
            "factors": {},
        }
        if self.warmup or price == 0 or atr == 0:
            return result

        # Pick the leaning direction.  It is scored for display even when the
        # two CVD horizons disagree or the symbol is in cooldown.
        in_cooldown = (now_ms - self.last_signal_ts < COOLDOWN_MS)
        if cvd_fast > 0 and cvd_mid > 0:
            direction, no_direction = "LONG", False
        elif cvd_fast < 0 and cvd_mid < 0:
            direction, no_direction = "SHORT", False
        else:
            direction, no_direction = ("LONG" if cvd_fast > 0 else "SHORT"), True
        is_long = direction == "LONG"

        # V5.1 five-layer score, 100 points in total; the direction layer can
        # earn 5 extra points for acceleration.
        # 1) Direction layer: 45 points (+5 acceleration bonus).
        fast_aligned = cvd_fast > 0 if is_long else cvd_fast < 0
        mid_aligned = cvd_mid > 0 if is_long else cvd_mid < 0
        cvd_fast_score = 15 if fast_aligned else 0
        cvd_mid_score = 15 if mid_aligned else 0
        # is_buyer_maker == 0 is counted as buy volume, so it is the aligned side for LONG.
        aligned_side, adverse_side = (0, 1) if is_long else (1, 0)
        has_adverse_p99 = any(lt[2] == adverse_side for lt in self.recent_large_trades)
        has_aligned_p99 = any(lt[2] == aligned_side for lt in self.recent_large_trades)
        if has_aligned_p99:
            p99_flow_score = 15
        elif not has_adverse_p99:
            p99_flow_score = 10
        else:
            p99_flow_score = 0
        direction_score = cvd_fast_score + cvd_mid_score + p99_flow_score
        accel_aligned = cvd_fast_accel > 0 if is_long else cvd_fast_accel < 0
        accel_bonus = 5 if accel_aligned else 0

        # 2) Crowding layer: 20 points; a missing indicator gets the middle score.
        long_short_ratio = to_float(self.market_indicators.get("long_short_ratio"))
        if long_short_ratio is None:
            ls_score = 5
        elif (not is_long and long_short_ratio > 2.0) or (is_long and long_short_ratio < 0.5):
            ls_score = 10
        else:
            ls_score = 5
        top_trader_position = to_float(self.market_indicators.get("top_trader_position"))
        if top_trader_position is None:
            top_trader_score = 5
        elif is_long:
            if top_trader_position >= 0.55:
                top_trader_score = 10
            elif top_trader_position <= 0.45:
                top_trader_score = 0
            else:
                top_trader_score = 5
        else:
            if top_trader_position <= 0.45:
                top_trader_score = 10
            elif top_trader_position >= 0.55:
                top_trader_score = 0
            else:
                top_trader_score = 5
        crowding_score = ls_score + top_trader_score

        # 3) Environment layer: 15 points, based on the open-interest change
        #    since the previous evaluation.
        oi_value = to_float(self.market_indicators.get("open_interest_hist"))
        if oi_value is None or self.prev_oi_value == 0:
            environment_score = 10
            oi_change = 0.0
        else:
            oi_change = (oi_value - self.prev_oi_value) / self.prev_oi_value if self.prev_oi_value > 0 else 0
            if oi_change >= 0.03:
                environment_score = 15
            elif oi_change > 0:
                environment_score = 10
            else:
                environment_score = 5
        if oi_value is not None and oi_value > 0:
            self.prev_oi_value = oi_value

        # 4) Confirmation layer: 15 points when both CVD horizons agree with the direction.
        confirmation_score = 15 if (fast_aligned and mid_aligned) else 0

        # 5) Auxiliary layer: 5 points, from the Coinbase premium.
        coinbase_premium = to_float(self.market_indicators.get("coinbase_premium"))
        if coinbase_premium is None:
            aux_score = 2
        elif (is_long and coinbase_premium > 0.0005) or (not is_long and coinbase_premium < -0.0005):
            aux_score = 5
        elif abs(coinbase_premium) <= 0.0005:
            aux_score = 2
        else:
            aux_score = 0

        total_score = direction_score + accel_bonus + crowding_score + environment_score + confirmation_score + aux_score
        result["score"] = total_score
        result["factors"] = {
            "direction": {
                "score": direction_score,
                "cvd_fast": cvd_fast_score,
                "cvd_mid": cvd_mid_score,
                "p99_flow": p99_flow_score,
                "accel_bonus": accel_bonus,
            },
            "crowding": {"score": crowding_score, "long_short_ratio": ls_score, "top_trader_position": top_trader_score},
            "environment": {"score": environment_score, "open_interest_hist": oi_change},
            "confirmation": {"score": confirmation_score},
            "auxiliary": {"score": aux_score, "coinbase_premium": coinbase_premium},
        }
        # Direction is always reported (cooldown does not hide it) so that the
        # caller can close positions on an opposite reading.
        result["direction"] = None if no_direction else direction

        tier = None
        if not no_direction and not in_cooldown:
            if total_score >= 85:
                tier = "heavy"
            elif total_score >= 75:
                tier = "standard"
            elif total_score >= 60:
                tier = "light"
        result["tier"] = tier
        result["signal"] = direction if tier is not None else None

        if result["signal"]:
            # Emitting a signal starts the cooldown for this symbol.
            self.last_signal_ts = now_ms
            self.last_signal_dir = direction
        return result
# ─── PG DB操作 ───────────────────────────────────────────────────
def load_historical(state: SymbolState, window_ms: int):
    """Warm *state* up by replaying the stored trades of the last *window_ms* ms.

    Rows are read from ``agg_trades`` in ``agg_id`` order, 5000 at a time, and
    pushed through ``state.process_trade``.  Afterwards the number of replayed
    trades is logged and ``state.warmup`` is cleared.
    """
    since_ms = int(time.time() * 1000) - window_ms
    count = 0
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades "
            "WHERE symbol = %s AND time_ms >= %s ORDER BY agg_id ASC",
            (state.symbol, since_ms),
        )
        while batch := cur.fetchmany(5000):
            for agg_id, price, qty, time_ms, is_buyer_maker in batch:
                state.process_trade(agg_id, time_ms, price, qty, is_buyer_maker)
            count += len(batch)
    logger.info(f"[{state.symbol}] 冷启动完成: 加载{count:,}条历史数据 (窗口={window_ms//3600000}h)")
    state.warmup = False
def fetch_new_trades(symbol: str, last_id: int) -> list:
    """Return up to 10000 ``agg_trades`` rows for *symbol* with ``agg_id > last_id``.

    Rows come back in ascending ``agg_id`` order, each as a dict with the keys
    ``agg_id``, ``price``, ``qty``, ``time_ms`` and ``is_buyer_maker``.
    """
    columns = ("agg_id", "price", "qty", "time_ms", "is_buyer_maker")
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades "
            "WHERE symbol = %s AND agg_id > %s ORDER BY agg_id ASC LIMIT 10000",
            (symbol, last_id),
        )
        return [dict(zip(columns, row)) for row in cur.fetchall()]
def save_indicator(ts: int, symbol: str, result: dict):
    """Insert one indicator snapshot into ``signal_indicators`` and commit.

    *result* is the dict produced by ``SymbolState.evaluate_signal``; *ts* is
    stored as the row timestamp.
    """
    # Keys of `result`, in the column order of the INSERT below (after ts, symbol).
    result_keys = (
        "cvd_fast", "cvd_mid", "cvd_day", "cvd_fast_slope",
        "atr", "atr_pct", "vwap", "price",
        "p95", "p99", "score",
    )
    params = (ts, symbol, *(result[key] for key in result_keys), result.get("signal"))
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO signal_indicators "
                "(ts,symbol,cvd_fast,cvd_mid,cvd_day,cvd_fast_slope,atr_5m,atr_percentile,vwap_30m,price,p95_qty,p99_qty,score,signal) "
                "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                params,
            )
        conn.commit()
def save_indicator_1m(ts: int, symbol: str, result: dict):
    """Write the snapshot for the one-minute bar containing *ts* and commit.

    The bar timestamp is *ts* rounded down to a whole minute.  If a row for
    that bar and *symbol* already exists in ``signal_indicators_1m`` it is
    updated, otherwise a new row is inserted.
    """
    bar_ts = ts - ts % 60000
    snapshot = (
        result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["atr"],
        result["vwap"], result["price"], result["score"], result.get("signal"),
    )
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT id FROM signal_indicators_1m WHERE ts=%s AND symbol=%s", (bar_ts, symbol))
            already_stored = cur.fetchone()
            if already_stored:
                cur.execute(
                    "UPDATE signal_indicators_1m SET cvd_fast=%s,cvd_mid=%s,cvd_day=%s,atr_5m=%s,vwap_30m=%s,price=%s,score=%s,signal=%s WHERE ts=%s AND symbol=%s",
                    snapshot + (bar_ts, symbol),
                )
            else:
                cur.execute(
                    "INSERT INTO signal_indicators_1m (ts,symbol,cvd_fast,cvd_mid,cvd_day,atr_5m,vwap_30m,price,score,signal) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                    (bar_ts, symbol) + snapshot,
                )
        conn.commit()
# ─── 模拟盘 ──────────────────────────────────────────────────────
def paper_open_trade(symbol: str, direction: str, price: float, score: int, tier: str, atr: float, now_ms: int, factors: dict = None):
    """Open a simulated position and persist it in ``paper_trades``.

    Price levels are offsets from *price* in units of ``0.7 * atr``:
    stop-loss at 2.0 units against the trade, TP1 at 1.5 and TP2 at 3.0 units
    in its favour.  Nothing is written when ``0.7 * atr`` is not positive.
    *factors*, when truthy, is stored as a JSON string in ``score_factors``.
    """
    import json as _json3

    risk_atr = 0.7 * atr
    if risk_atr <= 0:
        return

    # Signed multipliers of risk_atr for (SL, TP1, TP2); anything but "LONG" is handled as a short.
    if direction == "LONG":
        sl_mult, tp1_mult, tp2_mult = -2.0, 1.5, 3.0
    else:
        sl_mult, tp1_mult, tp2_mult = 2.0, -1.5, -3.0
    sl = price + sl_mult * risk_atr
    tp1 = price + tp1_mult * risk_atr
    tp2 = price + tp2_mult * risk_atr

    factors_json = _json3.dumps(factors) if factors else None
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO paper_trades (symbol,direction,score,tier,entry_price,entry_ts,tp1_price,tp2_price,sl_price,atr_at_entry,score_factors) "
                "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                (symbol, direction, score, tier, price, now_ms, tp1, tp2, sl, atr, factors_json),
            )
        conn.commit()
    logger.info(f"[{symbol}] 📝 模拟开仓: {direction} @ {price:.2f} score={score} tier={tier} TP1={tp1:.2f} TP2={tp2:.2f} SL={sl:.2f}")
def paper_check_positions(symbol: str, current_price: float, now_ms: int):
    """Apply stop-loss, take-profit and time-stop rules to open paper positions.

    Every ``paper_trades`` row of *symbol* with status 'active' or 'tp1_hit'
    is checked against *current_price*, in this order:

    1. stop-loss   -> closed at the stop price ('sl', or 'sl_be' after TP1);
    2. TP1         -> not closed; the row is flagged and the stop is moved next
       to the entry price;
    3. TP2 (only after TP1) -> closed at the TP2 price ('tp');
    4. time stop   -> still open 60 minutes after entry: closed at
       *current_price* ('timeout').

    P&L is stored in R, where 1R = 2.0 * 0.7 * atr_at_entry (the stop distance
    used by paper_open_trade), minus a fee of 2 * PAPER_FEE_RATE * entry_price
    expressed in R.  All changes are committed once at the end.
    """
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, direction, entry_price, tp1_price, tp2_price, sl_price, tp1_hit, entry_ts, atr_at_entry "
                "FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit') ORDER BY id",
                (symbol,)
            )
            positions = cur.fetchall()
            for pos in positions:
                pid, direction, entry_price, tp1, tp2, sl, tp1_hit, entry_ts, atr_entry = pos
                closed = False
                new_status = None
                pnl_r = 0.0
                # Size of 1R in price units; falls back to 1 when the stored ATR is not positive.
                # NOTE(review): a NULL atr_at_entry would raise on this comparison — confirm the column is NOT NULL.
                risk_distance = 2.0 * 0.7 * atr_entry if atr_entry > 0 else 1
                # === Live-trading simulation: TP/SL are treated as limit orders and fill at the order price ===
                if direction == "LONG":
                    if current_price <= sl:
                        closed = True
                        exit_price = sl
                        if tp1_hit:
                            # Stop hit after TP1: only the half credited at TP1 counts.
                            new_status = "sl_be"
                            tp1_r = (tp1 - entry_price) / risk_distance if risk_distance > 0 else 0
                            pnl_r = 0.5 * tp1_r
                        else:
                            new_status = "sl"
                            pnl_r = (exit_price - entry_price) / risk_distance if risk_distance > 0 else -1.0
                    elif not tp1_hit and current_price >= tp1:
                        # TP1 hit: move the stop to just above the entry price (entry * 1.0005).
                        # The local tp1_hit is not updated, so the checks below still see the old value in this pass.
                        new_sl = entry_price * 1.0005
                        cur.execute("UPDATE paper_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit' WHERE id=%s", (new_sl, pid))
                        logger.info(f"[{symbol}] 📝 TP1触发 LONG @ {current_price:.2f}, SL移至成本{new_sl:.2f}")
                    elif tp1_hit and current_price >= tp2:
                        closed = True
                        exit_price = tp2
                        new_status = "tp"
                        # Half of the result is credited at TP1, half at TP2.
                        tp1_r = (tp1 - entry_price) / risk_distance if risk_distance > 0 else 0
                        tp2_r = (tp2 - entry_price) / risk_distance if risk_distance > 0 else 0
                        pnl_r = 0.5 * tp1_r + 0.5 * tp2_r
                else:  # SHORT (mirror image of the LONG branch)
                    if current_price >= sl:
                        closed = True
                        exit_price = sl
                        if tp1_hit:
                            new_status = "sl_be"
                            tp1_r = (entry_price - tp1) / risk_distance if risk_distance > 0 else 0
                            pnl_r = 0.5 * tp1_r
                        else:
                            new_status = "sl"
                            pnl_r = (entry_price - exit_price) / risk_distance if risk_distance > 0 else -1.0
                    elif not tp1_hit and current_price <= tp1:
                        # TP1 hit: move the stop to just below the entry price (entry * 0.9995).
                        new_sl = entry_price * 0.9995
                        cur.execute("UPDATE paper_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit' WHERE id=%s", (new_sl, pid))
                        logger.info(f"[{symbol}] 📝 TP1触发 SHORT @ {current_price:.2f}, SL移至成本{new_sl:.2f}")
                    elif tp1_hit and current_price <= tp2:
                        closed = True
                        exit_price = tp2
                        new_status = "tp"
                        tp1_r = (entry_price - tp1) / risk_distance if risk_distance > 0 else 0
                        tp2_r = (entry_price - tp2) / risk_distance if risk_distance > 0 else 0
                        pnl_r = 0.5 * tp1_r + 0.5 * tp2_r
                # Time stop: after 60 minutes, close at the market (current) price.
                if not closed and (now_ms - entry_ts > 60 * 60 * 1000):
                    closed = True
                    exit_price = current_price
                    new_status = "timeout"
                    if direction == "LONG":
                        move = current_price - entry_price
                    else:
                        move = entry_price - current_price
                    pnl_r = move / risk_distance if risk_distance > 0 else 0
                    if tp1_hit:
                        # After TP1 the result is floored at the half credited at TP1.
                        # NOTE(review): this takes max(full move, 0.5 * TP1) rather than
                        # 0.5 * TP1 + 0.5 * move — confirm that is the intended accounting.
                        tp1_r = abs(tp1 - entry_price) / risk_distance if risk_distance > 0 else 0
                        pnl_r = max(pnl_r, 0.5 * tp1_r)
                if closed:
                    # Deduct fees: taker 0.05% on entry and on exit, both approximated with the entry price.
                    risk_distance = 2.0 * 0.7 * atr_entry if atr_entry > 0 else 1
                    fee_r = (2 * PAPER_FEE_RATE * entry_price) / risk_distance if risk_distance > 0 else 0
                    pnl_r -= fee_r
                    cur.execute(
                        "UPDATE paper_trades SET status=%s, exit_price=%s, exit_ts=%s, pnl_r=%s WHERE id=%s",
                        (new_status, exit_price, now_ms, round(pnl_r, 4), pid)
                    )
                    logger.info(f"[{symbol}] 📝 模拟平仓: {direction} @ {exit_price:.2f} status={new_status} pnl={pnl_r:+.2f}R (fee={fee_r:.3f}R)")
        conn.commit()
def paper_has_active_position(symbol: str) -> bool:
    """Return True when *symbol* has at least one paper trade in status 'active' or 'tp1_hit'."""
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')", (symbol,))
        open_count = cur.fetchone()[0]
    return open_count > 0
def paper_get_active_direction(symbol: str) -> str | None:
    """Return the direction of one open paper trade of *symbol*, or None if there is none.

    "Open" means status 'active' or 'tp1_hit'.  The query has no ORDER BY, so
    with several open trades which one is reported is up to the database.
    """
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute("SELECT direction FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit') LIMIT 1", (symbol,))
        row = cur.fetchone()
    if not row:
        return None
    return row[0]
def paper_close_by_signal(symbol: str, current_price: float, now_ms: int):
    """Close every open paper trade of *symbol* at *current_price* (status 'signal_flip').

    P&L is stored in R, where 1R = 2.0 * 0.7 * atr_at_entry (1 when the stored
    ATR is not positive), minus a fee of 2 * PAPER_FEE_RATE * entry_price
    expressed in R.  All updates are committed together.

    NOTE(review): tp1_hit is selected but never used, so a position that has
    already reached TP1 is settled on the full price move — confirm whether
    the TP1 half should be accounted for here.
    """
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, direction, entry_price, tp1_hit, atr_at_entry "
                "FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')",
                (symbol,),
            )
            for pid, direction, entry_price, _tp1_hit, atr_entry in cur.fetchall():
                risk_distance = 2.0 * 0.7 * atr_entry if atr_entry > 0 else 1
                if direction == "LONG":
                    favourable_move = current_price - entry_price
                else:
                    favourable_move = entry_price - current_price
                if risk_distance > 0:
                    pnl_r = favourable_move / risk_distance
                    # Fees for entry and exit, converted to R.
                    fee_r = (2 * PAPER_FEE_RATE * entry_price) / risk_distance
                else:
                    pnl_r = 0
                    fee_r = 0
                pnl_r -= fee_r
                cur.execute(
                    "UPDATE paper_trades SET status='signal_flip', exit_price=%s, exit_ts=%s, pnl_r=%s WHERE id=%s",
                    (current_price, now_ms, round(pnl_r, 4), pid),
                )
                logger.info(f"[{symbol}] 📝 反向信号平仓: {direction} @ {current_price:.2f} pnl={pnl_r:+.2f}R")
        conn.commit()
def paper_active_count() -> int:
    """Return how many paper trades, across all symbols, are in status 'active' or 'tp1_hit'."""
    query = "SELECT COUNT(*) FROM paper_trades WHERE status IN ('active','tp1_hit')"
    with get_sync_conn() as conn, conn.cursor() as cur:
        cur.execute(query)
        row = cur.fetchone()
        return row[0]
# ─── 主循环 ──────────────────────────────────────────────────────
def _run_symbol_cycle(sym, state, now_ms: int, last_1m_save: dict, paper_live: bool):
    """Run one engine cycle for a single symbol.

    Ingests the new trades, refreshes the market indicators, evaluates the
    signal, stores the snapshot (plus one row per 1-minute bar) and, when
    *paper_live* is true, closes opposite paper positions and opens new ones.
    """
    for t in fetch_new_trades(sym, state.last_processed_id):
        state.process_trade(t["agg_id"], t["time_ms"], t["price"], t["qty"], t["is_buyer_maker"])
    state.market_indicators = fetch_market_indicators(sym)
    result = state.evaluate_signal(now_ms)
    save_indicator(now_ms, sym, result)

    # Store at most one row per symbol and 1-minute bar.
    bar_1m = (now_ms // 60000) * 60000
    if last_1m_save.get(sym) != bar_1m:
        save_indicator_1m(now_ms, sym, result)
        last_1m_save[sym] = bar_1m

    # Opposite-direction close: driven by `direction`, which is not subject
    # to the cooldown; only triggers when the score is at least 60.
    if paper_live:
        eval_dir = result.get("direction")
        existing_dir = paper_get_active_direction(sym)
        if existing_dir and eval_dir and existing_dir != eval_dir and result["score"] >= 60:
            paper_close_by_signal(sym, result["price"], now_ms)
            logger.info(f"[{sym}] 📝 反向信号平仓: {existing_dir} -> {eval_dir} (score={result['score']})")

    if not result.get("signal"):
        return
    logger.info(f"[{sym}] 🚨 信号: {result['signal']} score={result['score']} price={result['price']:.1f}")

    # Paper entry: needs the switch on, the start-up guard over, no open
    # position on this symbol and a free slot overall.
    if not paper_live or paper_has_active_position(sym):
        return
    if paper_active_count() >= PAPER_MAX_POSITIONS:
        return
    paper_open_trade(
        sym, result["signal"], result["price"],
        result["score"], result.get("tier", "standard"),
        result["atr"], now_ms,
        factors=result.get("factors"),
    )


def main():
    """Initialise the schema, warm every symbol up, then run the engine loop forever.

    Each cycle reloads the paper-trading config (so the switch can be flipped
    while running), processes every symbol, and sleeps LOOP_INTERVAL seconds.
    A failure while processing one symbol is logged and does not prevent the
    remaining symbols from being processed in the same cycle.
    """
    init_schema()
    states = {sym: SymbolState(sym) for sym in SYMBOLS}
    for state in states.values():
        load_historical(state, WINDOW_MID)
    logger.info("=== Signal Engine (PG) 启动完成 ===")

    last_1m_save = {}
    cycle = 0
    warmup_cycles = 3  # skip paper trading for the first 3 cycles (45 s) after start-up

    while True:
        try:
            now_ms = int(time.time() * 1000)
            # Reload the config every cycle so the API can toggle it live.
            load_paper_config()
            paper_live = PAPER_TRADING_ENABLED and warmup_cycles <= 0

            for sym, state in states.items():
                try:
                    _run_symbol_cycle(sym, state, now_ms, last_1m_save, paper_live)
                except Exception as e:
                    # Isolate the failure: the other symbols must still be evaluated.
                    logger.error(f"[{sym}] 循环异常: {e}", exc_info=True)
            # Open paper positions are monitored by paper_monitor.py over
            # WebSocket in real time, so they are not checked here.

            cycle += 1
            if warmup_cycles > 0:
                warmup_cycles -= 1
                if warmup_cycles == 0:
                    logger.info("冷启动保护期结束,模拟盘开仓已启用")
            if cycle % 60 == 0:
                for sym, state in states.items():
                    logger.info(
                        f"[{sym}] 状态: CVD_fast={state.win_fast.cvd:.1f} "
                        f"CVD_mid={state.win_mid.cvd:.1f} ATR={state.atr_calc.atr:.2f} "
                        f"({state.atr_calc.atr_percentile:.0f}%) VWAP={state.win_vwap.vwap:.1f}"
                    )
        except Exception as e:
            logger.error(f"循环异常: {e}", exc_info=True)
        time.sleep(LOOP_INTERVAL)


if __name__ == "__main__":
    main()