"""
paper_monitor.py — real-time monitor for the paper-trading book (standalone PM2 process).

Subscribes to the Binance ``fstream`` combined ``markPrice@1s`` WebSocket
streams and, on every price update, checks the open paper positions of that
symbol for stop-loss, take-profit and time-stop conditions.
"""
import asyncio
import json
import logging
import os
import sys
import time

import websockets
import psycopg2

# Make the sibling ``db`` module importable regardless of the working directory.
sys.path.insert(0, os.path.dirname(__file__))
from db import get_sync_conn

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "paper-monitor.log")),
    ],
)
logger = logging.getLogger("paper-monitor")

SYMBOLS = ["btcusdt", "ethusdt", "xrpusdt", "solusdt"]
FEE_RATE = 0.0005  # Taker 0.05%
TIME_STOP_MS = 60 * 60 * 1000  # positions older than this are closed at the current price
CONFIG_RECHECK_SECONDS = 60  # how often the receive loop re-reads the on/off switch


def load_config() -> bool:
    """Read the paper-trading on/off switch from ``paper_config.json``.

    Returns:
        The truthiness of the ``enabled`` key. ``False`` when the file is
        missing, cannot be read or parsed, or does not contain a JSON object,
        so a broken config file disables the monitor instead of crashing it.
    """
    config_path = os.path.join(os.path.dirname(__file__), "paper_config.json")
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            cfg = json.load(f)
    except FileNotFoundError:
        return False
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        logger.warning("读取模拟盘配置失败: %s, 视为未启用", exc)
        return False

    if not isinstance(cfg, dict):
        return False
    return bool(cfg.get("enabled", False))


def _risk_distance(atr_entry) -> float:
    """Return the 1R reference distance for a position.

    Uses ``2.0 * 0.7 * atr_entry`` when the stored ATR is positive and falls
    back to 1 when it is missing (NULL) or non-positive, so the result is
    always positive and safe to divide by.
    """
    if atr_entry is not None and atr_entry > 0:
        return 2.0 * 0.7 * atr_entry
    return 1


def _crossed(price, level, upward: bool) -> bool:
    """Return True when *price* is at or beyond *level* in the given direction."""
    return price >= level if upward else price <= level


def _favourable_move(is_long: bool, start, end):
    """Return the move from *start* to *end*, positive when it profits the position."""
    return end - start if is_long else start - end


def check_and_close(symbol_upper: str, price: float):
    """Check the open paper positions of one symbol against the latest price.

    For every ``paper_trades`` row of ``symbol_upper`` whose status is
    ``'active'`` or ``'tp1_hit'``:

    * stop reached -> close at the stop price with status ``'sl'``, or
      ``'sl_be'`` when TP1 was already hit (the TP1 half is counted as locked);
    * TP1 reached for the first time -> mark ``tp1_hit`` and move the stop to
      just past the entry price; the position stays open;
    * TP2 reached after TP1 -> close at the TP2 price with status ``'tp'``;
    * otherwise, once the position is older than ``TIME_STOP_MS`` -> close at
      the current price with status ``'timeout'``.

    PnL is expressed in R (multiples of the risk distance) and a round-trip
    fee of ``2 * FEE_RATE * entry_price`` is deducted on close.

    Args:
        symbol_upper: Symbol as stored in ``paper_trades.symbol`` (e.g. "BTCUSDT").
        price: Latest price for that symbol.
    """
    now_ms = int(time.time() * 1000)

    with get_sync_conn() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT id, direction, entry_price, tp1_price, tp2_price, sl_price, "
                "tp1_hit, entry_ts, atr_at_entry "
                "FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')",
                (symbol_upper,)
            )
            positions = cur.fetchall()

            for pos in positions:
                pid, direction, entry_price, tp1, tp2, sl, tp1_hit, entry_ts, atr_entry = pos
                is_long = direction == "LONG"  # any other value is handled as SHORT
                risk_distance = _risk_distance(atr_entry)  # 1R reference distance

                closed = False
                new_status = None
                exit_price = None
                pnl_r = 0.0

                # TP/SL are simulated as resting limit orders: fills happen at
                # the order price, not at the current market price.
                if _crossed(price, sl, upward=not is_long):
                    closed = True
                    exit_price = sl
                    if tp1_hit:
                        new_status = "sl_be"
                        tp1_r = _favourable_move(is_long, entry_price, tp1) / risk_distance
                        pnl_r = 0.5 * tp1_r  # the TP1 half is already locked in
                    else:
                        new_status = "sl"
                        pnl_r = _favourable_move(is_long, entry_price, exit_price) / risk_distance
                elif not tp1_hit and _crossed(price, tp1, upward=is_long):
                    # First touch of TP1: keep the position open and move the
                    # stop slightly past the entry price.
                    new_sl = entry_price * (1.0005 if is_long else 0.9995)
                    cur.execute(
                        "UPDATE paper_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit' WHERE id=%s",
                        (new_sl, pid)
                    )
                    side_label = "LONG" if is_long else "SHORT"
                    logger.info(f"[{symbol_upper}] ✅ TP1触发 {side_label} @ {tp1:.4f}, SL→{new_sl:.4f}")
                elif tp1_hit and _crossed(price, tp2, upward=is_long):
                    closed = True
                    exit_price = tp2
                    new_status = "tp"
                    tp1_r = _favourable_move(is_long, entry_price, tp1) / risk_distance
                    tp2_r = _favourable_move(is_long, entry_price, tp2) / risk_distance
                    pnl_r = 0.5 * tp1_r + 0.5 * tp2_r

                # Time stop: closed at the current price rather than an order price.
                if not closed and now_ms - entry_ts > TIME_STOP_MS:
                    closed = True
                    exit_price = price
                    new_status = "timeout"
                    pnl_r = _favourable_move(is_long, entry_price, price) / risk_distance
                    if tp1_hit:
                        # NOTE(review): this takes the larger of the full-size
                        # move and the locked TP1 half instead of summing the
                        # two halves as the 'tp' path does — confirm intended.
                        tp1_r = abs(tp1 - entry_price) / risk_distance
                        pnl_r = max(pnl_r, 0.5 * tp1_r)

                if closed:
                    # Deduct the entry + exit fee, expressed in R.
                    fee_r = (2 * FEE_RATE * entry_price) / risk_distance
                    pnl_r -= fee_r
                    cur.execute(
                        "UPDATE paper_trades SET status=%s, exit_price=%s, exit_ts=%s, pnl_r=%s WHERE id=%s",
                        (new_status, exit_price, now_ms, round(pnl_r, 4), pid)
                    )
                    logger.info(
                        f"[{symbol_upper}] 📝 平仓: {direction} @ {exit_price:.4f} "
                        f"status={new_status} pnl={pnl_r:+.2f}R (fee={fee_r:.3f}R)"
                    )

        conn.commit()


async def ws_monitor():
    """Stream mark prices from the Binance WebSocket and evaluate positions.

    Runs forever. While the switch read by ``load_config`` is off it sleeps
    30 seconds between checks. While it is on it keeps one combined-stream
    connection open, calls ``check_and_close`` for every positive price update
    and re-reads the switch every ``CONFIG_RECHECK_SECONDS`` seconds. Any
    exception raised inside the connection (including a 30-second receive
    timeout) is logged and followed by a reconnect after 5 seconds.

    NOTE(review): ``check_and_close`` is a synchronous call made directly on
    the event loop, so its database work blocks the loop while it runs.
    """
    # Combined stream: markPrice@1s for every monitored symbol.
    streams = "/".join(f"{s}@markPrice@1s" for s in SYMBOLS)
    url = f"wss://fstream.binance.com/stream?streams={streams}"

    logger.info("=== Paper Monitor 启动 ===")
    logger.info(f"监控币种: {[s.upper() for s in SYMBOLS]}")

    while True:
        if not load_config():
            logger.info("模拟盘未启用,等待30秒后重试...")
            await asyncio.sleep(30)
            continue
        last_config_check = time.monotonic()

        try:
            async with websockets.connect(url, ping_interval=20) as ws:
                logger.info(f"WebSocket连接成功: {url[:80]}...")

                while True:
                    msg = await asyncio.wait_for(ws.recv(), timeout=30)
                    data = json.loads(msg)

                    if "data" in data:
                        d = data["data"]
                        symbol = d.get("s", "")  # e.g. "BTCUSDT"
                        mark_price = float(d.get("p", 0))

                        if mark_price > 0:
                            check_and_close(symbol, mark_price)

                    # Re-read the switch on elapsed time, not message count:
                    # several streams each push about one message per second.
                    now = time.monotonic()
                    if now - last_config_check >= CONFIG_RECHECK_SECONDS:
                        last_config_check = now
                        if not load_config():
                            logger.info("模拟盘已关闭,断开WebSocket")
                            break

        except Exception as e:
            logger.error(f"WebSocket异常: {e}, 5秒后重连...")
            await asyncio.sleep(5)


if __name__ == "__main__":
    asyncio.run(ws_monitor())