180 lines
6.6 KiB
Python
180 lines
6.6 KiB
Python
"""
|
||
paper_monitor.py — 模拟盘实时监控(独立PM2进程)
|
||
|
||
用币安WebSocket实时推送价格,毫秒级触发止盈止损。
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
import sys
|
||
import time
|
||
import websockets
|
||
import psycopg2
|
||
|
||
sys.path.insert(0, os.path.dirname(__file__))
|
||
from db import get_sync_conn
|
||
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
|
||
handlers=[
|
||
logging.StreamHandler(),
|
||
logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "paper-monitor.log")),
|
||
],
|
||
)
|
||
logger = logging.getLogger("paper-monitor")
|
||
|
||
SYMBOLS = ["btcusdt", "ethusdt", "xrpusdt", "solusdt"]
|
||
FEE_RATE = 0.0005 # Taker 0.05%
|
||
|
||
|
||
def load_config() -> bool:
|
||
"""读取模拟盘开关"""
|
||
config_path = os.path.join(os.path.dirname(__file__), "paper_config.json")
|
||
try:
|
||
with open(config_path, "r") as f:
|
||
cfg = json.load(f)
|
||
return cfg.get("enabled", False)
|
||
except FileNotFoundError:
|
||
return False
|
||
|
||
|
||
def check_and_close(symbol_upper: str, price: float):
|
||
"""检查该币种的活跃持仓,价格到了就平仓"""
|
||
now_ms = int(time.time() * 1000)
|
||
|
||
with get_sync_conn() as conn:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"SELECT id, direction, entry_price, tp1_price, tp2_price, sl_price, "
|
||
"tp1_hit, entry_ts, atr_at_entry "
|
||
"FROM paper_trades WHERE symbol=%s AND status IN ('active','tp1_hit')",
|
||
(symbol_upper,)
|
||
)
|
||
positions = cur.fetchall()
|
||
|
||
for pos in positions:
|
||
pid, direction, entry_price, tp1, tp2, sl, tp1_hit, entry_ts, atr_entry = pos
|
||
closed = False
|
||
new_status = None
|
||
pnl_r = 0.0
|
||
|
||
if direction == "LONG":
|
||
if price <= sl:
|
||
closed = True
|
||
if tp1_hit:
|
||
new_status = "sl_be"
|
||
pnl_r = 0.5 * 1.5
|
||
else:
|
||
new_status = "sl"
|
||
pnl_r = -1.0
|
||
elif not tp1_hit and price >= tp1:
|
||
new_sl = entry_price * 1.0005
|
||
cur.execute("UPDATE paper_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit' WHERE id=%s",
|
||
(new_sl, pid))
|
||
logger.info(f"[{symbol_upper}] ✅ TP1触发 LONG @ {price:.4f}, SL→{new_sl:.4f}")
|
||
elif tp1_hit and price >= tp2:
|
||
closed = True
|
||
new_status = "tp"
|
||
pnl_r = 2.25
|
||
else: # SHORT
|
||
if price >= sl:
|
||
closed = True
|
||
if tp1_hit:
|
||
new_status = "sl_be"
|
||
pnl_r = 0.5 * 1.5
|
||
else:
|
||
new_status = "sl"
|
||
pnl_r = -1.0
|
||
elif not tp1_hit and price <= tp1:
|
||
new_sl = entry_price * 0.9995
|
||
cur.execute("UPDATE paper_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit' WHERE id=%s",
|
||
(new_sl, pid))
|
||
logger.info(f"[{symbol_upper}] ✅ TP1触发 SHORT @ {price:.4f}, SL→{new_sl:.4f}")
|
||
elif tp1_hit and price <= tp2:
|
||
closed = True
|
||
new_status = "tp"
|
||
pnl_r = 2.25
|
||
|
||
# 时间止损:60分钟
|
||
if not closed and (now_ms - entry_ts > 60 * 60 * 1000):
|
||
closed = True
|
||
new_status = "timeout"
|
||
risk_distance = 2.0 * 0.7 * atr_entry if atr_entry > 0 else 1
|
||
if direction == "LONG":
|
||
move = price - entry_price
|
||
else:
|
||
move = entry_price - price
|
||
pnl_r = move / risk_distance if risk_distance > 0 else 0
|
||
if tp1_hit:
|
||
pnl_r = max(pnl_r, 0.5 * 1.5)
|
||
|
||
if closed:
|
||
# 扣手续费
|
||
risk_distance = 2.0 * 0.7 * atr_entry if atr_entry > 0 else 1
|
||
fee_r = (2 * FEE_RATE * entry_price) / risk_distance if risk_distance > 0 else 0
|
||
pnl_r -= fee_r
|
||
|
||
cur.execute(
|
||
"UPDATE paper_trades SET status=%s, exit_price=%s, exit_ts=%s, pnl_r=%s WHERE id=%s",
|
||
(new_status, price, now_ms, round(pnl_r, 4), pid)
|
||
)
|
||
logger.info(
|
||
f"[{symbol_upper}] 📝 平仓: {direction} @ {price:.4f} "
|
||
f"status={new_status} pnl={pnl_r:+.2f}R (fee={fee_r:.3f}R)"
|
||
)
|
||
|
||
conn.commit()
|
||
|
||
|
||
async def ws_monitor():
|
||
"""连接币安WebSocket,实时监控价格"""
|
||
# 组合流:所有币种的markPrice@1s
|
||
streams = "/".join([f"{s}@markPrice@1s" for s in SYMBOLS])
|
||
url = f"wss://fstream.binance.com/stream?streams={streams}"
|
||
|
||
logger.info(f"=== Paper Monitor 启动 ===")
|
||
logger.info(f"监控币种: {[s.upper() for s in SYMBOLS]}")
|
||
|
||
config_check_counter = 0
|
||
|
||
while True:
|
||
enabled = load_config()
|
||
if not enabled:
|
||
logger.info("模拟盘未启用,等待30秒后重试...")
|
||
await asyncio.sleep(30)
|
||
continue
|
||
|
||
try:
|
||
async with websockets.connect(url, ping_interval=20) as ws:
|
||
logger.info(f"WebSocket连接成功: {url[:80]}...")
|
||
while True:
|
||
msg = await asyncio.wait_for(ws.recv(), timeout=30)
|
||
data = json.loads(msg)
|
||
|
||
if "data" in data:
|
||
d = data["data"]
|
||
symbol = d.get("s", "") # e.g. "BTCUSDT"
|
||
mark_price = float(d.get("p", 0))
|
||
|
||
if mark_price > 0:
|
||
check_and_close(symbol, mark_price)
|
||
|
||
# 每60秒检查一次开关
|
||
config_check_counter += 1
|
||
if config_check_counter >= 60:
|
||
config_check_counter = 0
|
||
if not load_config():
|
||
logger.info("模拟盘已关闭,断开WebSocket")
|
||
break
|
||
|
||
except Exception as e:
|
||
logger.error(f"WebSocket异常: {e}, 5秒后重连...")
|
||
await asyncio.sleep(5)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(ws_monitor())
|