diff --git a/backend/paper_monitor.py b/backend/paper_monitor.py new file mode 100644 index 0000000..1bb2672 --- /dev/null +++ b/backend/paper_monitor.py @@ -0,0 +1,179 @@ +""" +paper_monitor.py — 模拟盘实时监控(独立PM2进程) + +用币安WebSocket实时推送价格,毫秒级触发止盈止损。 +""" + +import asyncio +import json +import logging +import os +import sys +import time +import websockets +import psycopg2 + +sys.path.insert(0, os.path.dirname(__file__)) +from db import get_sync_conn + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + handlers=[ + logging.StreamHandler(), + logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "paper-monitor.log")), + ], +) +logger = logging.getLogger("paper-monitor") + +SYMBOLS = ["btcusdt", "ethusdt", "xrpusdt", "solusdt"] +FEE_RATE = 0.0005 # Taker 0.05% + + +def load_config() -> bool: + """读取模拟盘开关""" + config_path = os.path.join(os.path.dirname(__file__), "paper_config.json") + try: + with open(config_path, "r") as f: + cfg = json.load(f) + return cfg.get("enabled", False) + except FileNotFoundError: + return False + + +def check_and_close(symbol_upper: str, price: float): + """检查该币种的活跃持仓,价格到了就平仓""" + now_ms = int(time.time() * 1000) + + with get_sync_conn() as conn: + with conn.cursor() as cur: + cur.execute( + "SELECT id, direction, entry_price, tp1_price, tp2_price, sl_price, " + "tp1_hit, entry_ts, atr_at_entry " + "FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')", + (symbol_upper,) + ) + positions = cur.fetchall() + + for pos in positions: + pid, direction, entry_price, tp1, tp2, sl, tp1_hit, entry_ts, atr_entry = pos + closed = False + new_status = None + pnl_r = 0.0 + + if direction == "LONG": + if price <= sl: + closed = True + if tp1_hit: + new_status = "sl_be" + pnl_r = 0.5 * 1.5 + else: + new_status = "sl" + pnl_r = -1.0 + elif not tp1_hit and price >= tp1: + new_sl = entry_price * 1.0005 + cur.execute("UPDATE paper_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit' WHERE id=%s", + (new_sl, pid)) + logger.info(f"[{symbol_upper}] ✅ TP1触发 LONG @ {price:.4f}, SL→{new_sl:.4f}") + elif tp1_hit and price >= tp2: + closed = True + new_status = "tp" + pnl_r = 2.25 + else: # SHORT + if price >= sl: + closed = True + if tp1_hit: + new_status = "sl_be" + pnl_r = 0.5 * 1.5 + else: + new_status = "sl" + pnl_r = -1.0 + elif not tp1_hit and price <= tp1: + new_sl = entry_price * 0.9995 + cur.execute("UPDATE paper_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit' WHERE id=%s", + (new_sl, pid)) + logger.info(f"[{symbol_upper}] ✅ TP1触发 SHORT @ {price:.4f}, SL→{new_sl:.4f}") + elif tp1_hit and price <= tp2: + closed = True + new_status = "tp" + pnl_r = 2.25 + + # 时间止损:60分钟 + if not closed and (now_ms - entry_ts > 60 * 60 * 1000): + closed = True + new_status = "timeout" + risk_distance = 2.0 * 0.7 * atr_entry if atr_entry > 0 else 1 + if direction == "LONG": + move = price - entry_price + else: + move = entry_price - price + pnl_r = move / risk_distance if risk_distance > 0 else 0 + if tp1_hit: + pnl_r = max(pnl_r, 0.5 * 1.5) + + if closed: + # 扣手续费 + risk_distance = 2.0 * 0.7 * atr_entry if atr_entry > 0 else 1 + fee_r = (2 * FEE_RATE * entry_price) / risk_distance if risk_distance > 0 else 0 + pnl_r -= fee_r + + cur.execute( + "UPDATE paper_trades SET status=%s, exit_price=%s, exit_ts=%s, pnl_r=%s WHERE id=%s", + (new_status, price, now_ms, round(pnl_r, 4), pid) + ) + logger.info( + f"[{symbol_upper}] 📝 平仓: {direction} @ {price:.4f} " + f"status={new_status} pnl={pnl_r:+.2f}R (fee={fee_r:.3f}R)" + ) + + conn.commit() + + +async def ws_monitor(): + """连接币安WebSocket,实时监控价格""" + # 组合流:所有币种的markPrice@1s + streams = "/".join([f"{s}@markPrice@1s" for s in SYMBOLS]) + url = f"wss://fstream.binance.com/stream?streams={streams}" + + logger.info(f"=== Paper Monitor 启动 ===") + logger.info(f"监控币种: {[s.upper() for s in SYMBOLS]}") + + config_check_counter = 0 + + while True: + enabled = load_config() + if not enabled: + logger.info("模拟盘未启用,等待30秒后重试...") + await asyncio.sleep(30) + continue + + try: + async with websockets.connect(url, ping_interval=20) as ws: + logger.info(f"WebSocket连接成功: {url[:80]}...") + while True: + msg = await asyncio.wait_for(ws.recv(), timeout=30) + data = json.loads(msg) + + if "data" in data: + d = data["data"] + symbol = d.get("s", "") # e.g. "BTCUSDT" + mark_price = float(d.get("p", 0)) + + if mark_price > 0: + check_and_close(symbol, mark_price) + + # 每60秒检查一次开关 + config_check_counter += 1 + if config_check_counter >= 60: + config_check_counter = 0 + if not load_config(): + logger.info("模拟盘已关闭,断开WebSocket") + break + + except Exception as e: + logger.error(f"WebSocket异常: {e}, 5秒后重连...") + await asyncio.sleep(5) + + +if __name__ == "__main__": + asyncio.run(ws_monitor()) diff --git a/backend/signal_engine.py b/backend/signal_engine.py index da7aa0b..e795b65 100644 --- a/backend/signal_engine.py +++ b/backend/signal_engine.py @@ -707,9 +707,7 @@ def main(): result["atr"], now_ms ) - # 模拟盘持仓检查(开关开着才检查) - if PAPER_TRADING_ENABLED and result.get("price") and result["price"] > 0: - paper_check_positions(sym, result["price"], now_ms) + # 模拟盘持仓检查由paper_monitor.py通过WebSocket实时处理,这里不再检查 cycle += 1 if cycle % 60 == 0: diff --git a/frontend/app/paper/page.tsx b/frontend/app/paper/page.tsx index 48fce07..709dce8 100644 --- a/frontend/app/paper/page.tsx +++ b/frontend/app/paper/page.tsx @@ -107,9 +107,29 @@ function SummaryCards() { function ActivePositions() { const [positions, setPositions] = useState([]); + const [wsPrices, setWsPrices] = useState>({}); + + // 从API获取持仓列表(10秒刷新) useEffect(() => { const f = async () => { try { const r = await authFetch("/api/paper/positions"); if (r.ok) { const j = await r.json(); setPositions(j.data || []); } } catch {} }; - f(); const iv = setInterval(f, 5000); return () => clearInterval(iv); + f(); const iv = setInterval(f, 10000); return () => clearInterval(iv); + }, []); + + // WebSocket实时价格 + useEffect(() => { + const streams = ["btcusdt", "ethusdt", "xrpusdt", "solusdt"].map(s => `${s}@markPrice@1s`).join("/"); + const ws = new WebSocket(`wss://fstream.binance.com/stream?streams=${streams}`); + ws.onmessage = (e) => { + try { + const msg = JSON.parse(e.data); + if (msg.data) { + const sym = msg.data.s; // e.g. "BTCUSDT" + const price = parseFloat(msg.data.p); + if (sym && price > 0) setWsPrices(prev => ({ ...prev, [sym]: price })); + } + } catch {} + }; + return () => ws.close(); }, []); if (positions.length === 0) return ( @@ -121,12 +141,18 @@ function ActivePositions() { return (
-

当前持仓

+

当前持仓 ● 实时

{positions.map((p: any) => { const sym = p.symbol?.replace("USDT", "") || ""; const holdMin = Math.round((Date.now() - p.entry_ts) / 60000); + const currentPrice = wsPrices[p.symbol] || p.current_price || 0; + const entry = p.entry_price || 0; + const atr = p.atr_at_entry || 1; + const riskDist = 2.0 * 0.7 * atr; + const unrealR = riskDist > 0 ? (p.direction === "LONG" ? (currentPrice - entry) / riskDist : (entry - currentPrice) / riskDist) : 0; + const unrealUsdt = unrealR * 200; return (
@@ -137,18 +163,18 @@ function ActivePositions() { 评分{p.score} · {p.tier === "heavy" ? "加仓" : p.tier === "standard" ? "标准" : "轻仓"}
- = 0 ? "text-emerald-600" : "text-red-500"}`}> - {(p.unrealized_pnl_r ?? 0) >= 0 ? "+" : ""}{(p.unrealized_pnl_r ?? 0).toFixed(2)}R + = 0 ? "text-emerald-600" : "text-red-500"}`}> + {unrealR >= 0 ? "+" : ""}{unrealR.toFixed(2)}R - = 0 ? "text-emerald-500" : "text-red-400"}`}> - ({(p.unrealized_pnl_usdt ?? 0) >= 0 ? "+" : ""}${(p.unrealized_pnl_usdt ?? 0).toFixed(0)}) + = 0 ? "text-emerald-500" : "text-red-400"}`}> + ({unrealUsdt >= 0 ? "+" : ""}${unrealUsdt.toFixed(0)}) {holdMin}m
入场: ${fmtPrice(p.entry_price)} - 现价: ${p.current_price ? fmtPrice(p.current_price) : "-"} + 现价: ${currentPrice ? fmtPrice(currentPrice) : "-"} TP1: ${fmtPrice(p.tp1_price)}{p.tp1_hit ? " ✅" : ""} TP2: ${fmtPrice(p.tp2_price)} SL: ${fmtPrice(p.sl_price)}