#!/usr/bin/env python3 """ Position Sync - 仓位同步与对账模块 每30秒对账本地DB vs 币安实际持仓/挂单,SL缺失自动补挂 架构: live_executor.py 开仓 → position_sync.py 持续对账 + 监控TP/SL状态 """ import os import sys import json from pathlib import Path try: from dotenv import load_dotenv; load_dotenv(Path(__file__).parent / ".env") except ImportError: pass import time import logging import asyncio import hashlib import hmac from urllib.parse import urlencode import psycopg2 import aiohttp # ============ 配置 ============ TRADE_ENV = os.getenv("TRADE_ENV", "testnet") BINANCE_ENDPOINTS = { "testnet": "https://testnet.binancefuture.com", "production": "https://fapi.binance.com", } BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV] _DB_PASSWORD = os.getenv("DB_PASSWORD", "arb_engine_2026" if TRADE_ENV == "testnet" else "") if not _DB_PASSWORD: print("FATAL: DB_PASSWORD 未设置(生产环境必须配置)", file=sys.stderr) sys.exit(1) DB_CONFIG = { "host": os.getenv("DB_HOST", "10.106.0.3"), "port": int(os.getenv("DB_PORT", "5432")), "dbname": os.getenv("DB_NAME", "arb_engine"), "user": os.getenv("DB_USER", "arb"), "password": _DB_PASSWORD, } CHECK_INTERVAL = 30 # 对账间隔(秒) SL_REHANG_DELAYS = [0, 3] # SL补挂重试延迟(秒) MAX_REHANG_RETRIES = 2 MISMATCH_ESCALATION_SEC = 60 # 差异持续超过此秒数升级告警 RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2")) # $2=1R from trade_config import SYMBOLS, SYMBOL_PRECISION from logging.handlers import RotatingFileHandler _log_fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s" logging.basicConfig(level=logging.INFO, format=_log_fmt) logger = logging.getLogger("position-sync") _fh = RotatingFileHandler("logs/position_sync.log", maxBytes=10*1024*1024, backupCount=5) _fh.setFormatter(logging.Formatter(_log_fmt)) logger.addHandler(_fh) # ============ API Key ============ _api_key = None _secret_key = None def load_api_keys(): global _api_key, _secret_key _api_key = os.getenv("BINANCE_API_KEY") _secret_key = os.getenv("BINANCE_SECRET_KEY") if _api_key and _secret_key: logger.info(f"API Key loaded from env") return try: from google.cloud import secretmanager client = secretmanager.SecretManagerServiceClient() project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737") prefix = "binance-testnet" if TRADE_ENV == "testnet" else "binance-live" _api_key = client.access_secret_version( name=f"projects/{project}/secrets/{prefix}-api-key/versions/latest" ).payload.data.decode() _secret_key = client.access_secret_version( name=f"projects/{project}/secrets/{prefix}-secret-key/versions/latest" ).payload.data.decode() logger.info(f"API Key loaded from GCP Secret Manager") except Exception as e: logger.error(f"Failed to load API keys: {e}") sys.exit(1) def sign_params(params): params["timestamp"] = int(time.time() * 1000) query_string = urlencode(params) signature = hmac.new(_secret_key.encode(), query_string.encode(), hashlib.sha256).hexdigest() params["signature"] = signature return params async def binance_request(session, method, path, params=None): url = f"{BASE_URL}{path}" headers = {"X-MBX-APIKEY": _api_key} if params is None: params = {} params = sign_params(params) try: if method == "GET": async with session.get(url, params=params, headers=headers) as resp: return await resp.json(), resp.status elif method == "POST": async with session.post(url, params=params, headers=headers) as resp: return await resp.json(), resp.status elif method == "DELETE": async with session.delete(url, params=params, headers=headers) as resp: return await resp.json(), resp.status except Exception as e: logger.error(f"Request failed: {e}") return {"error": str(e)}, 500 # ============ 对账核心 ============ async def get_binance_positions(session): """获取币安所有持仓""" data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk") positions = {} if status == 200 and isinstance(data, list): for pos in data: amt = float(pos.get("positionAmt", 0)) if amt != 0: positions[pos["symbol"]] = { "amount": amt, "direction": "LONG" if amt > 0 else "SHORT", "entry_price": float(pos.get("entryPrice", 0)), "unrealized_pnl": float(pos.get("unRealizedProfit", 0)), "liquidation_price": float(pos.get("liquidationPrice", 0)), "mark_price": float(pos.get("markPrice", 0)), } return positions async def get_binance_open_orders(session, symbol=None): """获取币安挂单""" params = {} if symbol: params["symbol"] = symbol data, status = await binance_request(session, "GET", "/fapi/v1/openOrders", params) if status == 200 and isinstance(data, list): return data return [] def get_local_positions(conn): """获取本地DB活跃持仓""" cur = conn.cursor() cur.execute(""" SELECT id, symbol, strategy, direction, entry_price, sl_price, tp1_price, tp2_price, tp1_hit, status, risk_distance, binance_order_id, entry_ts, qty FROM live_trades WHERE status IN ('active', 'tp1_hit') ORDER BY entry_ts DESC """) positions = [] for row in cur.fetchall(): positions.append({ "id": row[0], "symbol": row[1], "strategy": row[2], "direction": row[3], "entry_price": row[4], "sl_price": row[5], "tp1_price": row[6], "tp2_price": row[7], "tp1_hit": row[8], "status": row[9], "risk_distance": row[10], "binance_order_id": row[11], "entry_ts": row[12], "qty": row[13], }) return positions async def check_sl_exists(session, symbol, direction): """检查是否有SL挂单""" orders = await get_binance_open_orders(session, symbol) close_side = "SELL" if direction == "LONG" else "BUY" for order in orders: if order.get("type") == "STOP_MARKET" and order.get("side") == close_side: return True, order return False, None async def rehang_sl(session, symbol, direction, sl_price, quantity): """补挂SL保护单""" prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2}) close_side = "SELL" if direction == "LONG" else "BUY" qty_str = f"{abs(quantity):.{prec['qty']}f}" price_str = f"{sl_price:.{prec['price']}f}" params = { "symbol": symbol, "side": close_side, "type": "STOP_MARKET", "stopPrice": price_str, "quantity": qty_str, "reduceOnly": "true", } data, status = await binance_request(session, "POST", "/fapi/v1/order", params) return status == 200, data async def reconcile(session, conn): """执行一次完整对账""" local_positions = get_local_positions(conn) binance_positions = await get_binance_positions(session) issues = [] cur = conn.cursor() # 1. 检查本地有仓但币安没有(漏单/已被清算) for lp in local_positions: symbol = lp["symbol"] bp = binance_positions.get(symbol) if not bp: issues.append({ "type": "local_only", "severity": "critical", "symbol": symbol, "detail": f"本地有{lp['direction']}仓位(id={lp['id']})但币安无持仓", }) continue # 方向不一致 if bp["direction"] != lp["direction"]: issues.append({ "type": "direction_mismatch", "severity": "critical", "symbol": symbol, "detail": f"本地={lp['direction']} vs 币安={bp['direction']}", }) continue # 2. 检查SL保护单是否存在 sl_exists, sl_order = await check_sl_exists(session, symbol, lp["direction"]) if not sl_exists: issues.append({ "type": "sl_missing", "severity": "critical", "symbol": symbol, "detail": f"SL保护单缺失! 仓位裸奔中!", }) # 自动补挂SL logger.warning(f"[{symbol}] ⚠️ SL缺失,开始自动补挂...") success = False for attempt, delay in enumerate(SL_REHANG_DELAYS): if delay > 0: await asyncio.sleep(delay) ok, data = await rehang_sl(session, symbol, lp["direction"], lp["sl_price"], bp["amount"]) if ok: logger.info(f"[{symbol}] ✅ SL补挂成功 (attempt={attempt+1})") success = True break else: logger.error(f"[{symbol}] ❌ SL补挂失败 (attempt={attempt+1}): {data}") if not success: issues.append({ "type": "sl_rehang_failed", "severity": "emergency", "symbol": symbol, "detail": f"SL补挂{MAX_REHANG_RETRIES}次全部失败! 建议进入只减仓模式!", }) # 3. 检查清算距离 if bp.get("liquidation_price") and bp.get("mark_price"): liq = bp["liquidation_price"] mark = bp["mark_price"] if liq > 0 and mark > 0: if lp["direction"] == "LONG": dist_pct = (mark - liq) / mark * 100 else: dist_pct = (liq - mark) / mark * 100 if dist_pct < 8: issues.append({ "type": "liquidation_near", "severity": "emergency", "symbol": symbol, "detail": f"距清算仅{dist_pct:.1f}%! 建议立即减仓!", }) elif dist_pct < 12: issues.append({ "type": "liquidation_warning", "severity": "high", "symbol": symbol, "detail": f"距清算{dist_pct:.1f}%", }) elif dist_pct < 20: issues.append({ "type": "liquidation_caution", "severity": "medium", "symbol": symbol, "detail": f"距清算{dist_pct:.1f}%", }) # 4. 检查币安有仓但本地没有(幽灵仓位) local_symbols = {lp["symbol"] for lp in local_positions} for symbol, bp in binance_positions.items(): if symbol not in local_symbols and symbol in [s for s in SYMBOLS]: issues.append({ "type": "exchange_only", "severity": "high", "symbol": symbol, "detail": f"币安有{bp['direction']}仓位但本地无记录", }) # 记录对账结果 now_ms = int(time.time() * 1000) result = { "timestamp_ms": now_ms, "local_count": len(local_positions), "exchange_count": len(binance_positions), "issues_count": len(issues), "issues": issues, "status": "ok" if len(issues) == 0 else "mismatch", } # 写对账结果到DB(可选:创建reconciliation_log表) if issues: for issue in issues: level = "🔴" if issue["severity"] in ("critical", "emergency") else "🟡" logger.warning(f"{level} [{issue['symbol']}] {issue['type']}: {issue['detail']}") else: logger.info(f"✅ 对账正常 | 本地={len(local_positions)}仓 | 币安={len(binance_positions)}仓") return result # ============ TP1触发监控 ============ async def check_tp1_triggers(session, conn): """检查TP1是否触发,触发后移SL到保本价""" local_positions = get_local_positions(conn) cur = conn.cursor() for lp in local_positions: if lp["tp1_hit"]: continue # 已处理 symbol = lp["symbol"] # 查币安挂单,看TP1是否已成交(不在挂单列表里了) orders = await get_binance_open_orders(session, symbol) close_side = "SELL" if lp["direction"] == "LONG" else "BUY" tp1_found = False for order in orders: if (order.get("type") == "TAKE_PROFIT_MARKET" and order.get("side") == close_side and abs(float(order.get("stopPrice", 0)) - lp["tp1_price"]) < lp["tp1_price"] * 0.001): tp1_found = True break if not tp1_found and lp["status"] == "active": # TP1可能已触发,验证仓位是否减半 bp = (await get_binance_positions(session)).get(symbol) entry_qty = lp.get("qty") or 0 if bp and entry_qty > 0 and abs(bp["amount"]) < entry_qty * 0.75: # 确认TP1触发 logger.info(f"[{symbol}] ✅ TP1触发! 移SL到保本价") # 取消旧SL await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol}) # 计算保本SL if lp["direction"] == "LONG": new_sl = lp["entry_price"] * 1.0005 else: new_sl = lp["entry_price"] * 0.9995 # 挂新SL(半仓)— 失败则不推进状态 prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2}) ok, sl_resp = await rehang_sl(session, symbol, lp["direction"], new_sl, bp["amount"]) if not ok: logger.error(f"[{symbol}] ❌ TP1后重挂SL失败: {sl_resp},不推进tp1_hit状态") _log_event(conn, "critical", "trade", "TP1后重挂SL失败,仓位可能裸奔,需人工确认", symbol, {"trade_id": lp["id"], "sl_resp": str(sl_resp)}) continue # 重新挂TP2(半仓) tp2_price = lp["tp2_price"] qty_str = f"{abs(bp['amount']):.{prec['qty']}f}" price_str = f"{tp2_price:.{prec['price']}f}" tp2_data, tp2_status = await binance_request(session, "POST", "/fapi/v1/order", { "symbol": symbol, "side": close_side, "type": "TAKE_PROFIT_MARKET", "stopPrice": price_str, "quantity": qty_str, "reduceOnly": "true", }) if tp2_status != 200: logger.error(f"[{symbol}] ❌ TP2重挂失败: {tp2_data},SL已挂但TP2缺失") # SL成功才更新DB cur.execute(""" UPDATE live_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit' WHERE id=%s """, (new_sl, lp["id"])) conn.commit() logger.info(f"[{symbol}] 🛡 SL移至保本 {new_sl:.4f}, TP2={tp2_price:.4f}") # ============ 平仓检测 ============ async def check_closed_positions(session, conn): """检测已平仓的交易,更新DB""" local_positions = get_local_positions(conn) binance_positions = await get_binance_positions(session) cur = conn.cursor() for lp in local_positions: symbol = lp["symbol"] bp = binance_positions.get(symbol) # 币安无持仓但本地还active → 已平仓 if not bp: logger.info(f"[{symbol}] 📝 检测到平仓,查询成交记录...") # 查最近成交确定平仓价 # 简化:用当前标记价做近似 now_ms = int(time.time() * 1000) rd = lp["risk_distance"] or 1 # 判断平仓类型(通过挂单是否还在来推断) # 如果SL/TP都不在了,说明触发了其中一个 status = "unknown" exit_price = lp["entry_price"] # fallback # 尝试从最近交易记录获取成交价和手续费 entry_ts = lp.get("entry_ts", 0) trades_data, trades_status = await binance_request(session, "GET", "/fapi/v1/userTrades", { "symbol": symbol, "startTime": entry_ts, "limit": 100 }) actual_fee_usdt = 0 if trades_status == 200 and isinstance(trades_data, list) and trades_data: # 过滤平仓成交:LONG平仓是SELL(buyer=false), SHORT平仓是BUY(buyer=true) is_close_buyer = lp["direction"] == "SHORT" close_trades = [t for t in trades_data if bool(t.get("buyer")) == is_close_buyer and int(t.get("time", 0)) > entry_ts + 1000] if close_trades: total_qty = sum(float(t["qty"]) for t in close_trades) if total_qty > 0: exit_price = sum(float(t["price"]) * float(t["qty"]) for t in close_trades) / total_qty elif trades_data: exit_price = float(trades_data[-1].get("price", exit_price)) # 汇总手续费(开仓后200ms起算,避免含其他策略成交) for t in trades_data: t_time = int(t.get("time", 0)) if t_time >= entry_ts + 200: # 开仓后200ms起算,避免纳入开仓前成交 actual_fee_usdt += abs(float(t.get("commission", 0))) # 计算pnl — gross(不含费) if lp["direction"] == "LONG": gross_pnl_r = (exit_price - lp["entry_price"]) / rd else: gross_pnl_r = (lp["entry_price"] - exit_price) / rd if lp["tp1_hit"]: tp1_r = abs(lp["tp1_price"] - lp["entry_price"]) / rd gross_pnl_r = 0.5 * tp1_r + 0.5 * gross_pnl_r # 手续费(R) — 用实际成交手续费 if actual_fee_usdt > 0: fee_r = actual_fee_usdt / (RISK_PER_TRADE_USD if RISK_PER_TRADE_USD > 0 else rd) else: # fallback: 按0.1%估算(开+平各0.05%) fee_r = 0.001 * lp["entry_price"] / rd # funding费(R) funding_usdt = 0 cur.execute("SELECT COALESCE(funding_fee_usdt, 0) FROM live_trades WHERE id = %s", (lp["id"],)) fr_row = cur.fetchone() if fr_row: funding_usdt = fr_row[0] funding_r = abs(funding_usdt) / (RISK_PER_TRADE_USD if RISK_PER_TRADE_USD > 0 else rd) if funding_usdt < 0 else 0 # 净PnL = gross - fee - funding_cost pnl_r = gross_pnl_r - fee_r - funding_r # 判断状态 if pnl_r > 0.5: status = "tp" elif pnl_r < -0.5: status = "sl" elif lp["tp1_hit"] and pnl_r >= -0.1: status = "sl_be" else: status = "closed" cur.execute(""" UPDATE live_trades SET status=%s, exit_price=%s, exit_ts=%s, pnl_r=%s, fee_usdt=%s WHERE id=%s """, (status, exit_price, now_ms, round(pnl_r, 4), round(actual_fee_usdt, 4), lp["id"])) conn.commit() logger.info( f"[{symbol}] 📝 平仓: {status} | exit={exit_price:.4f} | " f"gross={gross_pnl_r:+.3f}R fee={fee_r:.3f}R({actual_fee_usdt:.4f}$) " f"funding={funding_usdt:+.4f}$ | net={pnl_r:+.3f}R" ) # 写event evt_level = "info" if pnl_r >= 0 else "warn" _log_event(conn, evt_level, "trade", f"平仓 {lp['direction']} {symbol} | {status} | net={pnl_r:+.3f}R (gross={gross_pnl_r:+.3f} fee=-{fee_r:.3f} fr={funding_usdt:+.4f}$)", symbol, {"trade_id": lp["id"], "status": status, "pnl_r": round(pnl_r, 4), "exit_price": exit_price}) # ============ 资金费率结算追踪 ============ # 币安结算时间:UTC 00:00, 08:00, 16:00 FUNDING_SETTLEMENT_HOURS = [0, 8, 16] _last_funding_check_ts = 0 # 上次查funding的时间戳 async def track_funding_fees(session, conn): """ 查询币安资金费率收支,更新到live_trades的funding_fee_usdt字段。 只在结算时间点附近查询(每8小时一次,±5分钟窗口内查一次)。 """ global _last_funding_check_ts import datetime as _dt now = _dt.datetime.now(_dt.timezone.utc) now_ts = time.time() # 判断是否在结算窗口内(结算时间后0-5分钟) in_settlement_window = False for h in FUNDING_SETTLEMENT_HOURS: settlement_time = now.replace(hour=h, minute=0, second=0, microsecond=0) diff_sec = (now - settlement_time).total_seconds() if 0 <= diff_sec <= 300: # 结算后5分钟内 in_settlement_window = True break # 不在窗口内,或者5分钟内已经查过了 if not in_settlement_window: return if now_ts - _last_funding_check_ts < 300: return _last_funding_check_ts = now_ts logger.info("💰 资金费率结算窗口,查询funding收支...") # 获取当前活跃持仓 cur = conn.cursor() cur.execute(""" SELECT id, symbol, direction, entry_ts, COALESCE(funding_fee_usdt, 0) as current_funding FROM live_trades WHERE status IN ('active', 'tp1_hit') """) active = cur.fetchall() if not active: logger.info("💰 无活跃持仓,跳过funding查询") return # 查币安最近的funding收入记录 # 对齐到本次结算周期(00:00/08:00/16:00 UTC) from datetime import datetime, timezone now_utc = datetime.fromtimestamp(now_ts, tz=timezone.utc) hour = now_utc.hour # 找到最近的结算时间点(0/8/16) settlement_hour = (hour // 8) * 8 settlement_time = now_utc.replace(hour=settlement_hour, minute=0, second=0, microsecond=0) settlement_start_ms = int(settlement_time.timestamp() * 1000) data, status = await binance_request(session, "GET", "/fapi/v1/income", { "incomeType": "FUNDING_FEE", "startTime": settlement_start_ms, "limit": 100, }) if status != 200 or not isinstance(data, list): logger.warning(f"💰 查询funding income失败: {status}") return # 按symbol汇总本次结算的funding(只取本结算周期内的) funding_by_symbol = {} for item in data: sym = item.get("symbol", "") income = float(item.get("income", 0)) ts = int(item.get("time", 0)) if ts >= settlement_start_ms: funding_by_symbol[sym] = funding_by_symbol.get(sym, 0) + income if not funding_by_symbol: logger.info("💰 本次结算无funding记录") return # 更新到live_trades for trade_id, symbol, direction, entry_ts, current_funding in active: fr_income = funding_by_symbol.get(symbol, 0) if fr_income != 0: new_total = current_funding + fr_income cur.execute("UPDATE live_trades SET funding_fee_usdt = %s WHERE id = %s", (new_total, trade_id)) logger.info(f"[{symbol}] 💰 Funding: {fr_income:+.4f} USDT (累计: {new_total:+.4f})") conn.commit() logger.info(f"💰 Funding更新完成: {funding_by_symbol}") def _log_event(conn, level, category, message, symbol=None, detail=None): """写live_events表""" try: cur = conn.cursor() cur.execute( "INSERT INTO live_events (level, category, symbol, message, detail) VALUES (%s, %s, %s, %s, %s)", (level, category, symbol, message, json.dumps(detail) if detail else None) ) conn.commit() except Exception: pass # ============ 主循环 ============ def ensure_db_conn(conn): """检查DB连接,断线则重连""" try: conn.cursor().execute("SELECT 1") return conn except Exception: logger.warning("⚠️ DB连接断开,重连中...") try: conn.close() except Exception: pass return psycopg2.connect(**DB_CONFIG) async def main(): logger.info("=" * 60) logger.info(f"🔄 Position Sync 启动 | 环境={TRADE_ENV} | 间隔={CHECK_INTERVAL}秒") logger.info("=" * 60) load_api_keys() conn = psycopg2.connect(**DB_CONFIG) async with aiohttp.ClientSession() as session: while True: try: conn = ensure_db_conn(conn) # 1. 对账 result = await reconcile(session, conn) # 2. 检查TP1触发 await check_tp1_triggers(session, conn) # 3. 检查已平仓 await check_closed_positions(session, conn) # 4. 资金费率结算追踪 await track_funding_fees(session, conn) await asyncio.sleep(CHECK_INTERVAL) except KeyboardInterrupt: break except Exception as e: logger.error(f"❌ 对账异常: {e}", exc_info=True) await asyncio.sleep(10) conn.close() logger.info("Position Sync 已停止") if __name__ == "__main__": asyncio.run(main())