#!/usr/bin/env python3 """ Position Sync - 仓位同步与对账模块 每30秒对账本地DB vs 币安实际持仓/挂单,SL缺失自动补挂 架构: live_executor.py 开仓 → position_sync.py 持续对账 + 监控TP/SL状态 """ import os import sys import json import time import logging import asyncio import hashlib import hmac from urllib.parse import urlencode import psycopg2 import aiohttp # ============ 配置 ============ TRADE_ENV = os.getenv("TRADE_ENV", "testnet") BINANCE_ENDPOINTS = { "testnet": "https://testnet.binancefuture.com", "production": "https://fapi.binance.com", } BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV] DB_CONFIG = { "host": os.getenv("DB_HOST", "10.106.0.3"), "port": int(os.getenv("DB_PORT", "5432")), "dbname": os.getenv("DB_NAME", "arb_engine"), "user": os.getenv("DB_USER", "arb"), "password": os.getenv("DB_PASSWORD", "arb_engine_2026"), } CHECK_INTERVAL = 30 # 对账间隔(秒) SL_REHANG_DELAYS = [0, 3] # SL补挂重试延迟(秒) MAX_REHANG_RETRIES = 2 MISMATCH_ESCALATION_SEC = 60 # 差异持续超过此秒数升级告警 SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"] SYMBOL_PRECISION = { "BTCUSDT": {"qty": 3, "price": 1}, "ETHUSDT": {"qty": 3, "price": 2}, "XRPUSDT": {"qty": 0, "price": 4}, "SOLUSDT": {"qty": 2, "price": 2}, } logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) logger = logging.getLogger("position-sync") # ============ API Key ============ _api_key = None _secret_key = None def load_api_keys(): global _api_key, _secret_key _api_key = os.getenv("BINANCE_API_KEY") _secret_key = os.getenv("BINANCE_SECRET_KEY") if _api_key and _secret_key: logger.info(f"API Key loaded from env") return try: from google.cloud import secretmanager client = secretmanager.SecretManagerServiceClient() project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737") prefix = "binance-testnet" if TRADE_ENV == "testnet" else "binance-live" _api_key = client.access_secret_version( name=f"projects/{project}/secrets/{prefix}-api-key/versions/latest" ).payload.data.decode() _secret_key = client.access_secret_version( name=f"projects/{project}/secrets/{prefix}-secret-key/versions/latest" ).payload.data.decode() logger.info(f"API Key loaded from GCP Secret Manager") except Exception as e: logger.error(f"Failed to load API keys: {e}") sys.exit(1) def sign_params(params): params["timestamp"] = int(time.time() * 1000) query_string = urlencode(params) signature = hmac.new(_secret_key.encode(), query_string.encode(), hashlib.sha256).hexdigest() params["signature"] = signature return params async def binance_request(session, method, path, params=None): url = f"{BASE_URL}{path}" headers = {"X-MBX-APIKEY": _api_key} if params is None: params = {} params = sign_params(params) try: if method == "GET": async with session.get(url, params=params, headers=headers) as resp: return await resp.json(), resp.status elif method == "POST": async with session.post(url, params=params, headers=headers) as resp: return await resp.json(), resp.status elif method == "DELETE": async with session.delete(url, params=params, headers=headers) as resp: return await resp.json(), resp.status except Exception as e: logger.error(f"Request failed: {e}") return {"error": str(e)}, 500 # ============ 对账核心 ============ async def get_binance_positions(session): """获取币安所有持仓""" data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk") positions = {} if status == 200 and isinstance(data, list): for pos in data: amt = float(pos.get("positionAmt", 0)) if amt != 0: positions[pos["symbol"]] = { "amount": amt, "direction": "LONG" if amt > 0 else "SHORT", "entry_price": float(pos.get("entryPrice", 0)), "unrealized_pnl": float(pos.get("unRealizedProfit", 0)), "liquidation_price": float(pos.get("liquidationPrice", 0)), "mark_price": float(pos.get("markPrice", 0)), } return positions async def get_binance_open_orders(session, symbol=None): """获取币安挂单""" params = {} if symbol: params["symbol"] = symbol data, status = await binance_request(session, "GET", "/fapi/v1/openOrders", params) if status == 200 and isinstance(data, list): return data return [] def get_local_positions(conn): """获取本地DB活跃持仓""" cur = conn.cursor() cur.execute(""" SELECT id, symbol, strategy, direction, entry_price, sl_price, tp1_price, tp2_price, tp1_hit, status, risk_distance, binance_order_id FROM live_trades WHERE status IN ('active', 'tp1_hit') ORDER BY entry_ts DESC """) positions = [] for row in cur.fetchall(): positions.append({ "id": row[0], "symbol": row[1], "strategy": row[2], "direction": row[3], "entry_price": row[4], "sl_price": row[5], "tp1_price": row[6], "tp2_price": row[7], "tp1_hit": row[8], "status": row[9], "risk_distance": row[10], "binance_order_id": row[11], }) return positions async def check_sl_exists(session, symbol, direction): """检查是否有SL挂单""" orders = await get_binance_open_orders(session, symbol) close_side = "SELL" if direction == "LONG" else "BUY" for order in orders: if order.get("type") == "STOP_MARKET" and order.get("side") == close_side: return True, order return False, None async def rehang_sl(session, symbol, direction, sl_price, quantity): """补挂SL保护单""" prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2}) close_side = "SELL" if direction == "LONG" else "BUY" qty_str = f"{abs(quantity):.{prec['qty']}f}" price_str = f"{sl_price:.{prec['price']}f}" params = { "symbol": symbol, "side": close_side, "type": "STOP_MARKET", "stopPrice": price_str, "quantity": qty_str, "reduceOnly": "true", } data, status = await binance_request(session, "POST", "/fapi/v1/order", params) return status == 200, data async def reconcile(session, conn): """执行一次完整对账""" local_positions = get_local_positions(conn) binance_positions = await get_binance_positions(session) issues = [] cur = conn.cursor() # 1. 检查本地有仓但币安没有(漏单/已被清算) for lp in local_positions: symbol = lp["symbol"] bp = binance_positions.get(symbol) if not bp: issues.append({ "type": "local_only", "severity": "critical", "symbol": symbol, "detail": f"本地有{lp['direction']}仓位(id={lp['id']})但币安无持仓", }) continue # 方向不一致 if bp["direction"] != lp["direction"]: issues.append({ "type": "direction_mismatch", "severity": "critical", "symbol": symbol, "detail": f"本地={lp['direction']} vs 币安={bp['direction']}", }) continue # 2. 检查SL保护单是否存在 sl_exists, sl_order = await check_sl_exists(session, symbol, lp["direction"]) if not sl_exists: issues.append({ "type": "sl_missing", "severity": "critical", "symbol": symbol, "detail": f"SL保护单缺失! 仓位裸奔中!", }) # 自动补挂SL logger.warning(f"[{symbol}] ⚠️ SL缺失,开始自动补挂...") success = False for attempt, delay in enumerate(SL_REHANG_DELAYS): if delay > 0: await asyncio.sleep(delay) ok, data = await rehang_sl(session, symbol, lp["direction"], lp["sl_price"], bp["amount"]) if ok: logger.info(f"[{symbol}] ✅ SL补挂成功 (attempt={attempt+1})") success = True break else: logger.error(f"[{symbol}] ❌ SL补挂失败 (attempt={attempt+1}): {data}") if not success: issues.append({ "type": "sl_rehang_failed", "severity": "emergency", "symbol": symbol, "detail": f"SL补挂{MAX_REHANG_RETRIES}次全部失败! 建议进入只减仓模式!", }) # 3. 检查清算距离 if bp.get("liquidation_price") and bp.get("mark_price"): liq = bp["liquidation_price"] mark = bp["mark_price"] if liq > 0 and mark > 0: if lp["direction"] == "LONG": dist_pct = (mark - liq) / mark * 100 else: dist_pct = (liq - mark) / mark * 100 if dist_pct < 8: issues.append({ "type": "liquidation_near", "severity": "emergency", "symbol": symbol, "detail": f"距清算仅{dist_pct:.1f}%! 建议立即减仓!", }) elif dist_pct < 12: issues.append({ "type": "liquidation_warning", "severity": "high", "symbol": symbol, "detail": f"距清算{dist_pct:.1f}%", }) elif dist_pct < 20: issues.append({ "type": "liquidation_caution", "severity": "medium", "symbol": symbol, "detail": f"距清算{dist_pct:.1f}%", }) # 4. 检查币安有仓但本地没有(幽灵仓位) local_symbols = {lp["symbol"] for lp in local_positions} for symbol, bp in binance_positions.items(): if symbol not in local_symbols and symbol in [s for s in SYMBOLS]: issues.append({ "type": "exchange_only", "severity": "high", "symbol": symbol, "detail": f"币安有{bp['direction']}仓位但本地无记录", }) # 记录对账结果 now_ms = int(time.time() * 1000) result = { "timestamp_ms": now_ms, "local_count": len(local_positions), "exchange_count": len(binance_positions), "issues_count": len(issues), "issues": issues, "status": "ok" if len(issues) == 0 else "mismatch", } # 写对账结果到DB(可选:创建reconciliation_log表) if issues: for issue in issues: level = "🔴" if issue["severity"] in ("critical", "emergency") else "🟡" logger.warning(f"{level} [{issue['symbol']}] {issue['type']}: {issue['detail']}") else: logger.info(f"✅ 对账正常 | 本地={len(local_positions)}仓 | 币安={len(binance_positions)}仓") return result # ============ TP1触发监控 ============ async def check_tp1_triggers(session, conn): """检查TP1是否触发,触发后移SL到保本价""" local_positions = get_local_positions(conn) cur = conn.cursor() for lp in local_positions: if lp["tp1_hit"]: continue # 已处理 symbol = lp["symbol"] # 查币安挂单,看TP1是否已成交(不在挂单列表里了) orders = await get_binance_open_orders(session, symbol) close_side = "SELL" if lp["direction"] == "LONG" else "BUY" tp1_found = False for order in orders: if (order.get("type") == "TAKE_PROFIT_MARKET" and order.get("side") == close_side and abs(float(order.get("stopPrice", 0)) - lp["tp1_price"]) < lp["tp1_price"] * 0.001): tp1_found = True break if not tp1_found and lp["status"] == "active": # TP1可能已触发,验证仓位是否减半 bp = (await get_binance_positions(session)).get(symbol) if bp and abs(bp["amount"]) < abs(float(lp.get("binance_order_id", "0") or "0")): # 确认TP1触发 logger.info(f"[{symbol}] ✅ TP1触发! 移SL到保本价") # 取消旧SL await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol}) # 计算保本SL if lp["direction"] == "LONG": new_sl = lp["entry_price"] * 1.0005 else: new_sl = lp["entry_price"] * 0.9995 # 挂新SL(半仓) prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2}) ok, _ = await rehang_sl(session, symbol, lp["direction"], new_sl, bp["amount"]) # 重新挂TP2(半仓) tp2_price = lp["tp2_price"] qty_str = f"{abs(bp['amount']):.{prec['qty']}f}" price_str = f"{tp2_price:.{prec['price']}f}" await binance_request(session, "POST", "/fapi/v1/order", { "symbol": symbol, "side": close_side, "type": "TAKE_PROFIT_MARKET", "stopPrice": price_str, "quantity": qty_str, "reduceOnly": "true", }) # 更新DB cur.execute(""" UPDATE live_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit' WHERE id=%s """, (new_sl, lp["id"])) conn.commit() logger.info(f"[{symbol}] 🛡 SL移至保本 {new_sl:.4f}, TP2={tp2_price:.4f}") # ============ 平仓检测 ============ async def check_closed_positions(session, conn): """检测已平仓的交易,更新DB""" local_positions = get_local_positions(conn) binance_positions = await get_binance_positions(session) cur = conn.cursor() for lp in local_positions: symbol = lp["symbol"] bp = binance_positions.get(symbol) # 币安无持仓但本地还active → 已平仓 if not bp: logger.info(f"[{symbol}] 📝 检测到平仓,查询成交记录...") # 查最近成交确定平仓价 # 简化:用当前标记价做近似 now_ms = int(time.time() * 1000) rd = lp["risk_distance"] or 1 # 判断平仓类型(通过挂单是否还在来推断) # 如果SL/TP都不在了,说明触发了其中一个 status = "unknown" exit_price = lp["entry_price"] # fallback # 尝试从最近交易记录获取 trades_data, trades_status = await binance_request(session, "GET", "/fapi/v1/userTrades", { "symbol": symbol, "limit": 5 }) if trades_status == 200 and isinstance(trades_data, list) and trades_data: last_trade = trades_data[-1] exit_price = float(last_trade.get("price", exit_price)) # 计算pnl if lp["direction"] == "LONG": raw_pnl_r = (exit_price - lp["entry_price"]) / rd else: raw_pnl_r = (lp["entry_price"] - exit_price) / rd if lp["tp1_hit"]: tp1_r = abs(lp["tp1_price"] - lp["entry_price"]) / rd pnl_r = 0.5 * tp1_r + 0.5 * raw_pnl_r else: pnl_r = raw_pnl_r # 扣手续费 fee_r = 0.001 * lp["entry_price"] / rd pnl_r -= fee_r # 判断状态 if pnl_r > 0.5: status = "tp" elif pnl_r < -0.5: status = "sl" elif lp["tp1_hit"] and pnl_r >= -0.1: status = "sl_be" else: status = "closed" cur.execute(""" UPDATE live_trades SET status=%s, exit_price=%s, exit_ts=%s, pnl_r=%s WHERE id=%s """, (status, exit_price, now_ms, round(pnl_r, 4), lp["id"])) conn.commit() logger.info(f"[{symbol}] 📝 平仓记录: {status} | exit={exit_price:.4f} | pnl={pnl_r:+.2f}R") # ============ 主循环 ============ async def main(): logger.info("=" * 60) logger.info(f"🔄 Position Sync 启动 | 环境={TRADE_ENV} | 间隔={CHECK_INTERVAL}秒") logger.info("=" * 60) load_api_keys() conn = psycopg2.connect(**DB_CONFIG) async with aiohttp.ClientSession() as session: while True: try: # 1. 对账 result = await reconcile(session, conn) # 2. 检查TP1触发 await check_tp1_triggers(session, conn) # 3. 检查已平仓 await check_closed_positions(session, conn) await asyncio.sleep(CHECK_INTERVAL) except KeyboardInterrupt: break except Exception as e: logger.error(f"❌ 对账异常: {e}", exc_info=True) await asyncio.sleep(10) conn.close() logger.info("Position Sync 已停止") if __name__ == "__main__": asyncio.run(main())