arbitrage-engine/backend/paper_monitor.py

180 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
paper_monitor.py — 模拟盘实时监控独立PM2进程
用币安WebSocket实时推送价格毫秒级触发止盈止损。
"""
import asyncio
import json
import logging
import os
import sys
import time
import websockets
import psycopg2
sys.path.insert(0, os.path.dirname(__file__))
from db import get_sync_conn
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "paper-monitor.log")),
],
)
logger = logging.getLogger("paper-monitor")
SYMBOLS = ["btcusdt", "ethusdt", "xrpusdt", "solusdt"]
FEE_RATE = 0.0005 # Taker 0.05%
def load_config() -> bool:
"""读取模拟盘开关"""
config_path = os.path.join(os.path.dirname(__file__), "paper_config.json")
try:
with open(config_path, "r") as f:
cfg = json.load(f)
return cfg.get("enabled", False)
except FileNotFoundError:
return False
def check_and_close(symbol_upper: str, price: float):
"""检查该币种的活跃持仓,价格到了就平仓"""
now_ms = int(time.time() * 1000)
with get_sync_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT id, direction, entry_price, tp1_price, tp2_price, sl_price, "
"tp1_hit, entry_ts, atr_at_entry "
"FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')",
(symbol_upper,)
)
positions = cur.fetchall()
for pos in positions:
pid, direction, entry_price, tp1, tp2, sl, tp1_hit, entry_ts, atr_entry = pos
closed = False
new_status = None
pnl_r = 0.0
if direction == "LONG":
if price <= sl:
closed = True
if tp1_hit:
new_status = "sl_be"
pnl_r = 0.5 * 1.5
else:
new_status = "sl"
pnl_r = -1.0
elif not tp1_hit and price >= tp1:
new_sl = entry_price * 1.0005
cur.execute("UPDATE paper_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit' WHERE id=%s",
(new_sl, pid))
logger.info(f"[{symbol_upper}] ✅ TP1触发 LONG @ {price:.4f}, SL→{new_sl:.4f}")
elif tp1_hit and price >= tp2:
closed = True
new_status = "tp"
pnl_r = 2.25
else: # SHORT
if price >= sl:
closed = True
if tp1_hit:
new_status = "sl_be"
pnl_r = 0.5 * 1.5
else:
new_status = "sl"
pnl_r = -1.0
elif not tp1_hit and price <= tp1:
new_sl = entry_price * 0.9995
cur.execute("UPDATE paper_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit' WHERE id=%s",
(new_sl, pid))
logger.info(f"[{symbol_upper}] ✅ TP1触发 SHORT @ {price:.4f}, SL→{new_sl:.4f}")
elif tp1_hit and price <= tp2:
closed = True
new_status = "tp"
pnl_r = 2.25
# 时间止损60分钟
if not closed and (now_ms - entry_ts > 60 * 60 * 1000):
closed = True
new_status = "timeout"
risk_distance = 2.0 * 0.7 * atr_entry if atr_entry > 0 else 1
if direction == "LONG":
move = price - entry_price
else:
move = entry_price - price
pnl_r = move / risk_distance if risk_distance > 0 else 0
if tp1_hit:
pnl_r = max(pnl_r, 0.5 * 1.5)
if closed:
# 扣手续费
risk_distance = 2.0 * 0.7 * atr_entry if atr_entry > 0 else 1
fee_r = (2 * FEE_RATE * entry_price) / risk_distance if risk_distance > 0 else 0
pnl_r -= fee_r
cur.execute(
"UPDATE paper_trades SET status=%s, exit_price=%s, exit_ts=%s, pnl_r=%s WHERE id=%s",
(new_status, price, now_ms, round(pnl_r, 4), pid)
)
logger.info(
f"[{symbol_upper}] 📝 平仓: {direction} @ {price:.4f} "
f"status={new_status} pnl={pnl_r:+.2f}R (fee={fee_r:.3f}R)"
)
conn.commit()
async def ws_monitor():
"""连接币安WebSocket实时监控价格"""
# 组合流所有币种的markPrice@1s
streams = "/".join([f"{s}@markPrice@1s" for s in SYMBOLS])
url = f"wss://fstream.binance.com/stream?streams={streams}"
logger.info(f"=== Paper Monitor 启动 ===")
logger.info(f"监控币种: {[s.upper() for s in SYMBOLS]}")
config_check_counter = 0
while True:
enabled = load_config()
if not enabled:
logger.info("模拟盘未启用等待30秒后重试...")
await asyncio.sleep(30)
continue
try:
async with websockets.connect(url, ping_interval=20) as ws:
logger.info(f"WebSocket连接成功: {url[:80]}...")
while True:
msg = await asyncio.wait_for(ws.recv(), timeout=30)
data = json.loads(msg)
if "data" in data:
d = data["data"]
symbol = d.get("s", "") # e.g. "BTCUSDT"
mark_price = float(d.get("p", 0))
if mark_price > 0:
check_and_close(symbol, mark_price)
# 每60秒检查一次开关
config_check_counter += 1
if config_check_counter >= 60:
config_check_counter = 0
if not load_config():
logger.info("模拟盘已关闭断开WebSocket")
break
except Exception as e:
logger.error(f"WebSocket异常: {e}, 5秒后重连...")
await asyncio.sleep(5)
if __name__ == "__main__":
asyncio.run(ws_monitor())