arbitrage-engine/backend/signal_engine.py

615 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
signal_engine.py — V5 短线交易信号引擎PostgreSQL版
架构:
- 独立PM2进程每5秒循环
- 内存滚动窗口计算指标CVD/ATR/VWAP/大单阈值)
- 启动时回灌历史数据冷启动warmup
- 信号评估核心3条件+加分3条件
- 输出signal_indicators表 + signal_trades表 + Discord推送
指标:
- CVD_fast (30m滚动) / CVD_mid (4h滚动) / CVD_day (UTC日内)
- ATR (5m, 14周期)
- VWAP_30m
- 大单阈值 P95/P99 (24h滚动)
"""
import logging
import os
import time
import threading
import asyncio
from datetime import datetime, timezone
from typing import Optional
import json
import websockets
from db import get_sync_conn, init_schema
from signal_state import SymbolState as BaseSymbolState
from strategy_scoring import evaluate_signal as score_strategy
from strategy_loader import (
load_strategy_configs,
load_strategy_configs_from_db,
)
from paper_trading import (
paper_open_trade,
paper_has_active_position,
paper_get_active_direction,
paper_close_by_signal,
paper_active_count,
)
# Log to a stream handler and to ../signal-engine.log (relative to this file's directory).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "signal-engine.log")),
    ],
)
logger = logging.getLogger("signal-engine")
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
LOOP_INTERVAL = 15 # seconds (raised from 5 to 15: about 60% less CPU, no impact on signal quality)
STRATEGY_DIR = os.path.join(os.path.dirname(__file__), "strategies")
# ─── Paper-trading configuration ──────────────────────────────────
# These are defaults; load_paper_config() overwrites most of them from paper_config.json.
PAPER_TRADING_ENABLED = False # master switch (kept for compatibility with the legacy logic)
PAPER_ENABLED_STRATEGIES = [] # per-strategy switch, e.g. ["v51_baseline", "v52_8signals"]
PAPER_INITIAL_BALANCE = 10000 # virtual starting balance in USDT
PAPER_RISK_PER_TRADE = 0.02 # risk per trade: 2% (i.e. 200 USDT on the default balance)
PAPER_MAX_POSITIONS = 4 # maximum concurrent positions per strategy
PAPER_TIER_MULTIPLIER = { # position-size multiplier per tier
    "light": 0.5, # light position: 1%
    "standard": 1.0, # standard position: 2%
    "heavy": 1.5, # heavy position: 3%
}
PAPER_FEE_RATE = 0.0005 # taker fee 0.05% (charged once on open and once on close)
def load_paper_config():
    """Reload the paper-trading switches and parameters from ``paper_config.json``.

    The file is looked up in the same directory as this module.  Recognised
    keys and the fallback used when a key is absent:

    * ``enabled`` -> ``PAPER_TRADING_ENABLED`` (False)
    * ``enabled_strategies`` -> ``PAPER_ENABLED_STRATEGIES`` ([])
    * ``initial_balance`` -> ``PAPER_INITIAL_BALANCE`` (10000)
    * ``risk_per_trade`` -> ``PAPER_RISK_PER_TRADE`` (0.02)
    * ``max_positions`` -> ``PAPER_MAX_POSITIONS`` (4)

    A missing file leaves the current settings untouched.  A file that cannot
    be decoded as JSON, or whose top level is not a JSON object, is ignored
    with a warning and the previous settings stay in effect, so a config file
    that is caught half-written does not abort the caller's cycle.
    """
    global PAPER_TRADING_ENABLED, PAPER_ENABLED_STRATEGIES, PAPER_INITIAL_BALANCE, PAPER_RISK_PER_TRADE, PAPER_MAX_POSITIONS
    config_path = os.path.join(os.path.dirname(__file__), "paper_config.json")
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            cfg = json.load(f)
    except FileNotFoundError:
        # No config file: keep whatever is currently set.
        return
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        logger.warning("paper_config.json could not be parsed, keeping previous settings: %s", exc)
        return
    if not isinstance(cfg, dict):
        logger.warning(
            "paper_config.json must contain a JSON object, got %s; keeping previous settings",
            type(cfg).__name__,
        )
        return
    PAPER_TRADING_ENABLED = cfg.get("enabled", False)
    PAPER_ENABLED_STRATEGIES = cfg.get("enabled_strategies", [])
    PAPER_INITIAL_BALANCE = cfg.get("initial_balance", 10000)
    PAPER_RISK_PER_TRADE = cfg.get("risk_per_trade", 0.02)
    PAPER_MAX_POSITIONS = cfg.get("max_positions", 4)
def is_strategy_enabled(strategy_name: str) -> bool:
    """Tell whether paper trading is switched on for ``strategy_name``.

    The master switch ``PAPER_TRADING_ENABLED`` must be on.  When it is, an
    empty ``PAPER_ENABLED_STRATEGIES`` list means "all strategies" (legacy
    behaviour); otherwise the strategy has to be listed explicitly.
    """
    if PAPER_TRADING_ENABLED:
        # Empty allow-list falls back to the old behaviour: everything is enabled.
        return not PAPER_ENABLED_STRATEGIES or strategy_name in PAPER_ENABLED_STRATEGIES
    return False
# ─────────────────────────────────────────────────────────────────
# Rolling-window sizes (milliseconds)
WINDOW_FAST = 30 * 60 * 1000 # 30 minutes
WINDOW_MID = 4 * 3600 * 1000 # 4 hours
WINDOW_DAY = 24 * 3600 * 1000 # 24 hours
WINDOW_VWAP = 30 * 60 * 1000 # 30 minutes
# ATR parameters
ATR_PERIOD_MS = 5 * 60 * 1000 # 5 minutes, in milliseconds
ATR_LENGTH = 14 # ATR length (the module docstring describes ATR as "5m, 14 periods")
# indicator_type -> (JSON keys tried in order, fallback when no key is present, divisor).
# The dict order is also the order in which the indicators are queried and returned.
_INDICATOR_FIELDS = {
    "long_short_ratio": (("longShortRatio",), 1.0, 1.0),
    "top_trader_position": (("longAccount",), 0.5, 1.0),
    "open_interest_hist": (("sumOpenInterestValue",), 0, 1.0),
    # premium_pct is divided by 100 before being returned.
    "coinbase_premium": (("premium_pct",), 0, 100.0),
    "funding_rate": (("fundingRate", "lastFundingRate"), 0, 1.0),
    # Phase 2 indicators.  Per the original notes: obi is in [-1, 1], positive = buy
    # pressure; divergence = (spot - mark) / mark; whale_cvd_ratio is in [-1, 1],
    # positive = net buying.
    "obi_depth_10": (("obi",), 0, 1.0),
    "spot_perp_divergence": (("divergence",), 0, 1.0),
    "tiered_cvd_whale": (("whale_cvd_ratio",), 0, 1.0),
}


def _extract_indicator_value(ind_type: str, raw) -> float:
    """Convert one stored indicator payload into a float.

    Raises ``TypeError`` or ``ValueError`` when the payload is not a JSON
    object (or a string holding one) or the selected field is not numeric.
    """
    if isinstance(raw, str):
        raw = json.loads(raw)  # json.JSONDecodeError is a ValueError
    if not isinstance(raw, dict):
        raise TypeError(f"expected a JSON object, got {type(raw).__name__}")
    keys, fallback, divisor = _INDICATOR_FIELDS[ind_type]
    # First key that is present wins, even when its value is null.
    picked = next((raw[key] for key in keys if key in raw), fallback)
    return float(picked) / divisor


def fetch_market_indicators(symbol: str) -> dict:
    """Read the latest ``market_indicators`` row per indicator type for ``symbol``.

    For each indicator type the newest row (by ``timestamp_ms``) is fetched and
    its JSON value is reduced to a single float.

    Returns:
        A dict keyed by indicator type.  The value is a float, or ``None`` when
        there is no row, the stored value is NULL, or the payload cannot be
        turned into a number.  In the last case a warning is logged; a single
        malformed row therefore no longer raises out of this function.
    """
    indicators = {}
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            for ind_type in _INDICATOR_FIELDS:
                cur.execute(
                    "SELECT value FROM market_indicators WHERE symbol=%s AND indicator_type=%s ORDER BY timestamp_ms DESC LIMIT 1",
                    (symbol, ind_type),
                )
                row = cur.fetchone()
                if not row or row[0] is None:
                    indicators[ind_type] = None
                    continue
                try:
                    indicators[ind_type] = _extract_indicator_value(ind_type, row[0])
                except (TypeError, ValueError) as exc:
                    logger.warning("[%s] unusable %s indicator payload: %s", symbol, ind_type, exc)
                    indicators[ind_type] = None
    return indicators
class SymbolState(BaseSymbolState):
    """Backward-compatible ``SymbolState`` that is built from a symbol alone.

    Older call sites pass only ``symbol``; this subclass supplies the window
    sizes, the ATR settings and the market-indicator fetcher configured in
    this module.  It is a plain state container: scoring was moved to
    backend/strategy_scoring.py (``evaluate_signal``) and is not implemented here.
    """

    def __init__(self, symbol: str):
        # Positional order forwarded to the base constructor:
        # symbol, fast/mid/day/VWAP windows, ATR period and length, indicator fetcher.
        window_spans = (WINDOW_FAST, WINDOW_MID, WINDOW_DAY, WINDOW_VWAP)
        atr_settings = (ATR_PERIOD_MS, ATR_LENGTH)
        super().__init__(symbol, *window_spans, *atr_settings, fetch_market_indicators)
# ─── PostgreSQL access helpers ───────────────────────────────────
def load_historical(state: SymbolState, window_ms: int):
    """Replay recent ``agg_trades`` rows into ``state`` (cold-start warmup).

    Selects the trades of ``state.symbol`` whose ``time_ms`` lies within the
    last ``window_ms`` milliseconds, ordered by ``agg_id``, feeds each one to
    ``state.process_trade`` and finally clears ``state.warmup``.
    """
    start_ms = int(time.time() * 1000) - window_ms
    count = 0
    query = (
        "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades "
        "WHERE symbol = %s AND time_ms >= %s ORDER BY agg_id ASC"
    )
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(query, (state.symbol, start_ms))
            # Consume the result set in batches of 5000 rows.
            batch = cur.fetchmany(5000)
            while batch:
                for row in batch:
                    # Columns: agg_id, price, qty, time_ms, is_buyer_maker;
                    # process_trade receives them as (agg_id, time_ms, price, qty, is_buyer_maker).
                    state.process_trade(row[0], row[3], row[1], row[2], row[4])
                    count += 1
                batch = cur.fetchmany(5000)
    logger.info(f"[{state.symbol}] 冷启动完成: 加载{count:,}条历史数据 (窗口={window_ms//3600000}h)")
    state.warmup = False
def fetch_new_trades(symbol: str, last_id: int) -> list:
    """Return up to 10000 ``agg_trades`` rows of ``symbol`` with ``agg_id > last_id``.

    Rows come back in ascending ``agg_id`` order, each as a dict with the keys
    ``agg_id``, ``price``, ``qty``, ``time_ms`` and ``is_buyer_maker``.
    """
    columns = ("agg_id", "price", "qty", "time_ms", "is_buyer_maker")
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades "
                "WHERE symbol = %s AND agg_id > %s ORDER BY agg_id ASC LIMIT 10000",
                (symbol, last_id)
            )
            fetched = cur.fetchall()
            return [dict(zip(columns, record)) for record in fetched]
def save_indicator(ts: int, symbol: str, result: dict, strategy: str = "v52_8signals",
                   strategy_id: Optional[str] = None, strategy_name_snapshot: Optional[str] = None):
    """Insert one evaluation result into ``signal_indicators`` and announce signals.

    Args:
        ts: Evaluation timestamp stored in the ``ts`` column.
        symbol: Trading symbol.
        result: Evaluation result.  ``cvd_fast``, ``cvd_mid``, ``cvd_day``,
            ``cvd_fast_slope``, ``atr``, ``atr_pct``, ``vwap``, ``price``,
            ``p95``, ``p99`` and ``score`` are required; ``atr_value``
            (defaults to ``atr``), ``signal``, ``factors`` and ``cvd_fast_5m``
            are optional.
        strategy: Strategy name stored with the row.
        strategy_id: Optional strategy identifier stored with the row.
        strategy_name_snapshot: Optional strategy display-name snapshot.

    When ``result["signal"]`` is truthy a notification is sent on the
    ``new_signal`` channel with the payload ``symbol:strategy:signal:score``.

    Raises:
        KeyError: if a required key is missing from ``result`` (raised before
            any database work is done).
    """
    factors = result.get("factors")
    factors_json = json.dumps(factors) if factors else None
    signal = result.get("signal")
    row = (
        ts, symbol, strategy, result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["cvd_fast_slope"],
        result["atr"], result["atr_pct"], result.get("atr_value", result["atr"]),
        result["vwap"], result["price"],
        result["p95"], result["p99"], result["score"], signal, factors_json,
        result.get("cvd_fast_5m"),  # 5m-window CVD, only set by v53_fast; None for other strategies
        strategy_id, strategy_name_snapshot,
    )
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO signal_indicators "
                "(ts,symbol,strategy,cvd_fast,cvd_mid,cvd_day,cvd_fast_slope,atr_5m,atr_percentile,atr_value,vwap_30m,price,p95_qty,p99_qty,score,signal,factors,cvd_fast_5m,strategy_id,strategy_name_snapshot) "
                "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                row,
            )
            if signal:
                # Notify the live executor.  pg_notify() takes the payload as an ordinary
                # bound parameter, whereas the NOTIFY statement only accepts a literal and
                # therefore depends on the driver interpolating parameters client-side.
                cur.execute(
                    "SELECT pg_notify(%s, %s)",
                    ("new_signal", f"{symbol}:{strategy}:{signal}:{result['score']}"),
                )
        conn.commit()
def save_feature_event(ts: int, symbol: str, result: dict, strategy: str):
    """Persist raw features and per-layer scores to ``signal_feature_events``.

    Only strategies whose name starts with ``"v53"`` are recorded; for any
    other strategy the function returns immediately without touching the
    database.

    Args:
        ts: Evaluation timestamp stored in the ``ts`` column.
        symbol: Trading symbol.
        result: Evaluation result; every field is read with ``.get`` so
            missing keys are stored as NULL (scores default to 0).
        strategy: Strategy name.
    """
    if not strategy.startswith("v53"):
        return
    f = result.get("factors") or {}

    def layer(name: str) -> dict:
        # A missing or null scoring layer is treated as an empty dict.
        return f.get(name) or {}

    track = f.get("track", "ALT")
    # Without an explicit direction the side is derived from the sign of the score.
    side = result.get("direction") or ("LONG" if result.get("score", 0) >= 0 else "SHORT")
    # The direction score is read the same way for every track.
    score_direction = layer("direction").get("score", 0)
    score_crowding = layer("crowding").get("score", 0)
    score_env = layer("environment").get("score", 0)
    score_aux = layer("auxiliary").get("score", 0)
    gate_passed = f.get("gate_passed", True)
    block_reason = f.get("gate_block") or f.get("block_reason")
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO signal_feature_events
                (ts, symbol, track, side, strategy, strategy_version,
                cvd_fast_raw, cvd_mid_raw, cvd_day_raw, cvd_fast_slope_raw,
                p95_qty_raw, p99_qty_raw,
                atr_value, atr_percentile,
                oi_delta_raw, ls_ratio_raw, top_pos_raw, coinbase_premium_raw,
                obi_raw, tiered_cvd_whale_raw,
                score_direction, score_crowding, score_environment, score_aux, score_total,
                gate_passed, block_reason, price)
                VALUES
                (%s,%s,%s,%s,%s,%s,
                %s,%s,%s,%s,
                %s,%s,
                %s,%s,
                %s,%s,%s,%s,
                %s,%s,
                %s,%s,%s,%s,%s,
                %s,%s,%s)
                """,
                (
                    ts, symbol, track, side, strategy, "v5.3",
                    result.get("cvd_fast"), result.get("cvd_mid"), result.get("cvd_day"), result.get("cvd_fast_slope"),
                    result.get("p95"), result.get("p99"),
                    result.get("atr_value", result.get("atr")), result.get("atr_pct"),
                    result.get("oi_delta"), result.get("ls_ratio"), result.get("top_trader_position"),
                    layer("auxiliary").get("coinbase_premium"),
                    f.get("obi_raw"), f.get("whale_cvd_ratio"),
                    score_direction, score_crowding, score_env, score_aux, result.get("score", 0),
                    gate_passed, block_reason, result.get("price"),
                )
            )
        conn.commit()
def save_indicator_1m(ts: int, symbol: str, result: dict):
    """Write the 1-minute bar row for ``symbol`` in ``signal_indicators_1m``.

    ``ts`` is floored to the start of its minute.  If a row already exists for
    that bar and symbol it is updated in place; otherwise a new row is
    inserted.
    """
    bar_ts = ts - ts % 60000
    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT id FROM signal_indicators_1m WHERE ts=%s AND symbol=%s", (bar_ts, symbol))
            already_stored = cur.fetchone()
            # Same eight values for both statements; only the key position differs.
            metrics = (
                result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["atr"],
                result["vwap"], result["price"], result["score"], result.get("signal"),
            )
            if not already_stored:
                cur.execute(
                    "INSERT INTO signal_indicators_1m (ts,symbol,cvd_fast,cvd_mid,cvd_day,atr_5m,vwap_30m,price,score,signal) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                    (bar_ts, symbol) + metrics,
                )
            else:
                cur.execute(
                    "UPDATE signal_indicators_1m SET cvd_fast=%s,cvd_mid=%s,cvd_day=%s,atr_5m=%s,vwap_30m=%s,price=%s,score=%s,signal=%s WHERE ts=%s AND symbol=%s",
                    metrics + (bar_ts, symbol),
                )
        conn.commit()
# ─── Paper trading ───────────────────────────────────────────────
def strategy_applies_to_symbol(strategy_cfg: dict, symbol: str) -> bool:
    """Decide whether a strategy config should be evaluated for ``symbol``.

    - V5.4 single-symbol strategies: a non-empty ``symbol`` entry must equal
      ``symbol``.
    - Legacy multi-symbol strategies: a non-empty ``symbols`` list must
      contain ``symbol``.

    A config with neither restriction applies to every symbol.
    """
    pinned_symbol = strategy_cfg.get("symbol")
    if not pinned_symbol or pinned_symbol == symbol:
        whitelist = strategy_cfg.get("symbols", [])
        return not whitelist or symbol in whitelist
    return False
# ─── Real-time WebSocket data (OBI + spot/perp divergence) ────────
# symbol -> SymbolState; meant to be injected from main().
# NOTE(review): nothing in the visible code reads or writes this dict — confirm it is still needed.
_REALTIME_STATES: dict = {}
async def _ws_obi_stream(symbol: str, state):
    """Subscribe to the futures depth stream and keep ``state.rt_obi`` updated.

    For every message the order-book imbalance is computed from the ``b``
    (bids) and ``a`` (asks) levels as ``(bid_vol - ask_vol) / (bid_vol + ask_vol)``,
    or ``0.0`` when both sides are empty.

    The coroutine never returns: after any disconnect it waits 3 seconds and
    reconnects.  The wait also applies when the server closes the connection
    cleanly (the message iterator then simply ends without raising), so a
    server that keeps closing the stream is not hit with back-to-back
    reconnects.

    NOTE(review): ``state.rt_obi`` keeps its last value while disconnected —
    confirm consumers tolerate a stale reading.
    """
    stream = f"{symbol.lower()}@depth10@100ms"
    url = f"wss://fstream.binance.com/stream?streams={stream}"
    while True:
        try:
            async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws:
                logger.info(f"[RT-OBI] {symbol} WebSocket connected")
                async for raw in ws:
                    data = json.loads(raw)
                    # Combined-stream messages wrap the payload in "data"; fall back to the message itself.
                    book = data.get("data", data)
                    bid_vol = sum(float(level[1]) for level in book.get("b", []))
                    ask_vol = sum(float(level[1]) for level in book.get("a", []))
                    total = bid_vol + ask_vol
                    state.rt_obi = (bid_vol - ask_vol) / total if total > 0 else 0.0
        except Exception as e:
            logger.warning(f"[RT-OBI] {symbol} 断线重连: {e}")
        # Back off before every reconnect, whether the stream failed or ended cleanly.
        await asyncio.sleep(3)
async def _ws_spot_perp_stream(symbol: str, state):
    """Track the spot/perp divergence of ``symbol`` in ``state.rt_spot_perp_div``.

    Two streams are read concurrently: the spot ``bookTicker`` (best bid/ask)
    and the futures ``markPrice`` stream.

        spot_mid   = (best_bid + best_ask) / 2
        divergence = (spot_mid - mark_price) / mark_price

    The divergence is republished whenever either side delivers a positive
    price and the other side has already been seen.  The coroutine never
    returns; each stream waits 3 seconds before reconnecting after any
    disconnect, including a clean close by the server.
    """
    spot_stream = f"{symbol.lower()}@bookTicker"
    perp_stream = f"{symbol.lower()}@markPrice@1s"
    spot_url = f"wss://stream.binance.com:9443/stream?streams={spot_stream}"
    perp_url = f"wss://fstream.binance.com/stream?streams={perp_stream}"
    # Latest prices shared by both readers; 0.0 means "not received yet".
    latest = {"spot_mid": 0.0, "mark": 0.0}

    def publish():
        if latest["spot_mid"] > 0 and latest["mark"] > 0:
            state.rt_spot_perp_div = (latest["spot_mid"] - latest["mark"]) / latest["mark"]

    def on_spot(d):
        bid = float(d.get("b", 0) or 0)
        ask = float(d.get("a", 0) or 0)
        if bid > 0 and ask > 0:
            latest["spot_mid"] = (bid + ask) / 2
            publish()

    def on_perp(d):
        mp = float(d.get("p", 0) or d.get("markPrice", 0) or 0)
        if mp > 0:
            latest["mark"] = mp
            publish()

    async def pump(url, label, handle):
        # One reconnecting reader; `label` is "spot" or "perp" and only affects log text.
        while True:
            try:
                async with websockets.connect(url, ping_interval=20, ping_timeout=10) as ws:
                    logger.info(f"[RT-SPD] {symbol} {label} WebSocket connected")
                    async for raw in ws:
                        # Parse each message once; combined streams wrap the payload in "data".
                        message = json.loads(raw)
                        handle(message.get("data", message))
            except Exception as e:
                logger.warning(f"[RT-SPD] {symbol} {label} 断线: {e}")
            # Back off before every reconnect, whether the stream failed or ended cleanly.
            await asyncio.sleep(3)

    await asyncio.gather(pump(spot_url, "spot", on_spot), pump(perp_url, "perp", on_perp))
async def _realtime_ws_runner(states: dict):
    """Run every real-time WebSocket coroutine for all symbols in ``states``.

    Each symbol gets two streams: the order-book imbalance stream and the
    spot/perp divergence stream.  Nothing is awaited when ``states`` is empty.
    """
    stream_factories = (_ws_obi_stream, _ws_spot_perp_stream)
    pending = [
        make_stream(sym, sym_state)
        for sym, sym_state in states.items()
        for make_stream in stream_factories
    ]
    if not pending:
        return
    await asyncio.gather(*pending)
def start_realtime_ws(states: dict):
    """Start the real-time WebSocket collectors on a background daemon thread.

    The thread (named ``realtime-ws``) owns a private asyncio event loop that
    runs ``_realtime_ws_runner(states)`` until it finishes or raises; the loop
    is closed in either case.
    """

    def _drive_event_loop():
        ws_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(ws_loop)
        try:
            ws_loop.run_until_complete(_realtime_ws_runner(states))
        except Exception as e:
            logger.error(f"[RT-WS] event loop 异常: {e}")
        finally:
            ws_loop.close()

    worker = threading.Thread(target=_drive_event_loop, daemon=True, name="realtime-ws")
    worker.start()
    logger.info("[RT-WS] 实时WebSocket后台线程已启动BTC OBI + 期现背离)")
# ─── Main loop ───────────────────────────────────────────────────
def _load_strategy_configs_with_fallback(warn_on_fallback: bool = False) -> list:
    """Load strategy configs from the DB, falling back to the JSON files when the DB yields none."""
    configs = load_strategy_configs_from_db()
    if configs:
        return configs
    if warn_on_fallback:
        logger.warning("[DB] 未能从 DB 加载策略配置,使用 JSON fallback")
    return load_strategy_configs()


def _select_primary_strategy(strategy_configs: list) -> str:
    """Pick the strategy whose result feeds the 1-minute table: v53 family first, else the first config."""
    names = [cfg.get("name", "unknown") for cfg in strategy_configs]
    for candidate in ("v53", "v53_middle", "v53_fast"):
        if candidate in names:
            return candidate
    return names[0] if names else "v53"


def _close_on_flip(sym: str, strategy_cfg: dict, result: dict, now_ms: int):
    """Close the strategy's paper position on ``sym`` when a strong enough opposite signal appears."""
    strategy_name = strategy_cfg.get("name") or "v53"
    if not is_strategy_enabled(strategy_name):
        return
    # Per-strategy direction constraint; unknown values are treated as "both".
    dir_cfg_raw = (strategy_cfg.get("direction") or "both").lower()
    if dir_cfg_raw not in ("long", "short", "both"):
        dir_cfg_raw = "both"
    eval_dir = result.get("direction")
    existing_dir = paper_get_active_direction(sym, strategy_name)
    # Score needed for a flip close: flip_threshold, else threshold, else 75.
    flip_thr = int(strategy_cfg.get("flip_threshold", strategy_cfg.get("threshold", 75)))
    # Long-only / short-only strategies ignore evaluations in the opposite direction.
    if dir_cfg_raw == "long" and eval_dir == "SHORT":
        return
    if dir_cfg_raw == "short" and eval_dir == "LONG":
        return
    if existing_dir and eval_dir and existing_dir != eval_dir and result["score"] >= flip_thr:
        paper_close_by_signal(sym, result["price"], now_ms, strategy_name)
        logger.info(
            f"[{sym}] 📝 反向信号平仓[{strategy_name}]: {existing_dir}->{eval_dir} "
            f"(score={result['score']}, flip_thr={flip_thr})"
        )


def _open_on_signal(sym: str, strategy_cfg: dict, result: dict, now_ms: int, trading_live: bool):
    """Log a fired signal and, when allowed, open the matching paper trade."""
    if not result.get("signal"):
        return
    strategy_name = strategy_cfg.get("name") or "v53"
    logger.info(
        f"[{sym}] 🚨 信号[{strategy_name}]: {result['signal']} "
        f"score={result['score']} price={result['price']:.1f}"
    )
    # Opening requires the strategy to be enabled and the cold-start guard to be over.
    if not (is_strategy_enabled(strategy_name) and trading_live):
        return
    if paper_has_active_position(sym, strategy_name):
        return
    if paper_active_count(strategy_name) >= PAPER_MAX_POSITIONS:
        return
    paper_open_trade(
        sym,
        result["signal"],
        result["price"],
        result["score"],
        result.get("tier", "standard"),
        result["atr"],
        now_ms,
        factors=result.get("factors"),
        strategy=strategy_name,
        tp_sl=strategy_cfg.get("tp_sl"),
        strategy_id=strategy_cfg.get("strategy_id"),
        strategy_name_snapshot=strategy_cfg.get("strategy_name_snapshot"),
    )


def _process_symbol(sym: str, state, now_ms: int, strategy_configs: list,
                    primary_strategy_name: str, last_1m_save: dict, trading_live: bool):
    """Run one engine cycle for a single symbol: ingest trades, score, persist, paper-trade."""
    for t in fetch_new_trades(sym, state.last_processed_id):
        state.process_trade(t["agg_id"], t["time_ms"], t["price"], t["qty"], t["is_buyer_maker"])
    state.market_indicators = fetch_market_indicators(sym)
    snapshot = state.build_evaluation_snapshot(now_ms)
    strategy_results = [
        (cfg, score_strategy(state, now_ms, strategy_cfg=cfg, snapshot=snapshot))
        for cfg in strategy_configs
        if strategy_applies_to_symbol(cfg, sym)
    ]
    if not strategy_results:
        logger.warning(f"[{sym}] 当前无可用策略配置,跳过本轮")
        return
    # Every strategy stores its own indicator row (and feature event, where applicable).
    for cfg, res in strategy_results:
        sname = cfg.get("name") or "v53"
        save_indicator(now_ms, sym, res, strategy=sname,
                       strategy_id=cfg.get("strategy_id"),
                       strategy_name_snapshot=cfg.get("strategy_name_snapshot"))
        save_feature_event(now_ms, sym, res, strategy=sname)
    # The 1-minute table (used for charts) takes the primary strategy's result,
    # or the first result when the primary strategy did not run for this symbol.
    primary_result = next(
        (res for cfg, res in strategy_results if cfg.get("name") == primary_strategy_name),
        strategy_results[0][1],
    )
    bar_1m = (now_ms // 60000) * 60000
    if last_1m_save.get(sym) != bar_1m:
        save_indicator_1m(now_ms, sym, primary_result)
        last_1m_save[sym] = bar_1m
    # Flip closes are judged per strategy and only after the cold-start guard.
    if trading_live:
        for cfg, res in strategy_results:
            _close_on_flip(sym, cfg, res, now_ms)
    for cfg, res in strategy_results:
        _open_on_signal(sym, cfg, res, now_ms, trading_live)
    # Open paper positions are monitored elsewhere (paper_monitor.py, per the
    # original note); nothing is checked here.


def main():
    """Run the signal engine until the process is stopped.

    Startup: initialise the schema, load strategy configs (DB first, JSON
    fallback), warm every symbol's state from ``WINDOW_MID`` of trade history
    and start the real-time WebSocket thread.

    Each cycle (every ``LOOP_INTERVAL`` seconds): reload the paper-trading
    config, then for every symbol ingest new trades, score all applicable
    strategies, persist the results and act on paper-trading signals.  The
    first 3 cycles skip all paper-trade opens and flip closes.  Every 10
    cycles the strategy configs are reloaded and a status line is logged per
    symbol.  Any exception inside a cycle is logged and the loop continues.

    A change of ``PAPER_ENABLED_STRATEGIES`` is logged as soon as the
    per-cycle reload picks it up.  (Previously the comparison was made around
    a second reload in the same iteration, after the value had already been
    refreshed, so the change could practically never be observed.)
    """
    init_schema()
    # V5.4: prefer the DB for strategy configs, fall back to JSON on failure.
    strategy_configs = _load_strategy_configs_with_fallback(warn_on_fallback=True)
    strategy_names = [cfg.get("name", "unknown") for cfg in strategy_configs]
    logger.info(f"已加载策略配置: {', '.join(strategy_names)}")
    primary_strategy_name = _select_primary_strategy(strategy_configs)
    states = {sym: SymbolState(sym) for sym in SYMBOLS}
    for state in states.values():
        load_historical(state, WINDOW_MID)
    logger.info("=== Signal Engine (PG) 启动完成 ===")
    # Real-time WebSocket collectors run on a background thread.
    start_realtime_ws(states)
    last_1m_save = {}
    cycle = 0
    warmup_cycles = 3  # skip trading for the first 3 cycles to avoid acting on cold-start signals
    # `or []` guards against a null enabled_strategies entry in the config file.
    enabled_snapshot = list(PAPER_ENABLED_STRATEGIES or [])
    while True:
        try:
            now_ms = int(time.time() * 1000)
            # Reload every cycle so switches changed through the API take effect without a restart.
            load_paper_config()
            enabled_now = list(PAPER_ENABLED_STRATEGIES or [])
            if enabled_now != enabled_snapshot:
                enabled_snapshot = enabled_now
                logger.info(f"📋 配置热加载: enabled_strategies={PAPER_ENABLED_STRATEGIES}")
            trading_live = warmup_cycles <= 0
            for sym, state in states.items():
                _process_symbol(sym, state, now_ms, strategy_configs,
                                primary_strategy_name, last_1m_save, trading_live)
            cycle += 1
            if warmup_cycles > 0:
                warmup_cycles -= 1
                if warmup_cycles == 0:
                    logger.info("冷启动保护期结束,模拟盘开仓已启用")
            # Every 10 cycles: hot-reload strategy weights/thresholds/TP/SL (DB first, JSON fallback).
            if cycle % 10 == 0:
                strategy_configs = _load_strategy_configs_with_fallback()
                primary_strategy_name = _select_primary_strategy(strategy_configs)
                # NOTE(review): the status log is assumed to belong to this periodic block;
                # the source's indentation was lost, so confirm against the original file.
                for sym, state in states.items():
                    logger.info(
                        f"[{sym}] 状态: CVD_fast={state.win_fast.cvd:.1f} "
                        f"CVD_mid={state.win_mid.cvd:.1f} ATR={state.atr_calc.atr:.2f} "
                        f"({state.atr_calc.atr_percentile:.0f}%) VWAP={state.win_vwap.vwap:.1f}"
                    )
        except Exception as e:
            logger.error(f"循环异常: {e}", exc_info=True)
        time.sleep(LOOP_INTERVAL)
# Script entry point: start the engine loop when the file is executed directly.
if __name__ == "__main__":
    main()