diff --git a/backend/position_sync.py b/backend/position_sync.py new file mode 100644 index 0000000..93cb653 --- /dev/null +++ b/backend/position_sync.py @@ -0,0 +1,494 @@ +#!/usr/bin/env python3 +""" +Position Sync - 仓位同步与对账模块 +每30秒对账本地DB vs 币安实际持仓/挂单,SL缺失自动补挂 + +架构: + live_executor.py 开仓 → position_sync.py 持续对账 + 监控TP/SL状态 +""" + +import os +import sys +import json +import time +import logging +import asyncio +import hashlib +import hmac +from urllib.parse import urlencode + +import psycopg2 +import aiohttp + +# ============ 配置 ============ + +TRADE_ENV = os.getenv("TRADE_ENV", "testnet") +BINANCE_ENDPOINTS = { + "testnet": "https://testnet.binancefuture.com", + "production": "https://fapi.binance.com", +} +BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV] + +DB_CONFIG = { + "host": os.getenv("DB_HOST", "10.106.0.3"), + "port": int(os.getenv("DB_PORT", "5432")), + "dbname": os.getenv("DB_NAME", "arb_engine"), + "user": os.getenv("DB_USER", "arb"), + "password": os.getenv("DB_PASSWORD", "arb_engine_2026"), +} + +CHECK_INTERVAL = 30 # 对账间隔(秒) +SL_REHANG_DELAYS = [0, 3] # SL补挂重试延迟(秒) +MAX_REHANG_RETRIES = 2 +MISMATCH_ESCALATION_SEC = 60 # 差异持续超过此秒数升级告警 + +SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"] + +SYMBOL_PRECISION = { + "BTCUSDT": {"qty": 3, "price": 1}, + "ETHUSDT": {"qty": 3, "price": 2}, + "XRPUSDT": {"qty": 0, "price": 4}, + "SOLUSDT": {"qty": 2, "price": 2}, +} + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) +logger = logging.getLogger("position-sync") + +# ============ API Key ============ +_api_key = None +_secret_key = None + + +def load_api_keys(): + global _api_key, _secret_key + _api_key = os.getenv("BINANCE_API_KEY") + _secret_key = os.getenv("BINANCE_SECRET_KEY") + if _api_key and _secret_key: + logger.info(f"API Key loaded from env") + return + try: + from google.cloud import secretmanager + client = secretmanager.SecretManagerServiceClient() + project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737") + prefix = "binance-testnet" if TRADE_ENV == "testnet" else "binance-live" + _api_key = client.access_secret_version( + name=f"projects/{project}/secrets/{prefix}-api-key/versions/latest" + ).payload.data.decode() + _secret_key = client.access_secret_version( + name=f"projects/{project}/secrets/{prefix}-secret-key/versions/latest" + ).payload.data.decode() + logger.info(f"API Key loaded from GCP Secret Manager") + except Exception as e: + logger.error(f"Failed to load API keys: {e}") + sys.exit(1) + + +def sign_params(params): + params["timestamp"] = int(time.time() * 1000) + query_string = urlencode(params) + signature = hmac.new(_secret_key.encode(), query_string.encode(), hashlib.sha256).hexdigest() + params["signature"] = signature + return params + + +async def binance_request(session, method, path, params=None): + url = f"{BASE_URL}{path}" + headers = {"X-MBX-APIKEY": _api_key} + if params is None: + params = {} + params = sign_params(params) + try: + if method == "GET": + async with session.get(url, params=params, headers=headers) as resp: + return await resp.json(), resp.status + elif method == "POST": + async with session.post(url, params=params, headers=headers) as resp: + return await resp.json(), resp.status + elif method == "DELETE": + async with session.delete(url, params=params, headers=headers) as resp: + return await resp.json(), resp.status + except Exception as e: + logger.error(f"Request failed: {e}") + return {"error": str(e)}, 500 + + +# ============ 对账核心 ============ + +async def get_binance_positions(session): + """获取币安所有持仓""" + data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk") + positions = {} + if status == 200 and isinstance(data, list): + for pos in data: + amt = float(pos.get("positionAmt", 0)) + if amt != 0: + positions[pos["symbol"]] = { + "amount": amt, + "direction": "LONG" if amt > 0 else "SHORT", + "entry_price": float(pos.get("entryPrice", 0)), + "unrealized_pnl": float(pos.get("unRealizedProfit", 0)), + "liquidation_price": float(pos.get("liquidationPrice", 0)), + "mark_price": float(pos.get("markPrice", 0)), + } + return positions + + +async def get_binance_open_orders(session, symbol=None): + """获取币安挂单""" + params = {} + if symbol: + params["symbol"] = symbol + data, status = await binance_request(session, "GET", "/fapi/v1/openOrders", params) + if status == 200 and isinstance(data, list): + return data + return [] + + +def get_local_positions(conn): + """获取本地DB活跃持仓""" + cur = conn.cursor() + cur.execute(""" + SELECT id, symbol, strategy, direction, entry_price, sl_price, tp1_price, tp2_price, + tp1_hit, status, risk_distance, binance_order_id + FROM live_trades + WHERE status IN ('active', 'tp1_hit') + ORDER BY entry_ts DESC + """) + positions = [] + for row in cur.fetchall(): + positions.append({ + "id": row[0], "symbol": row[1], "strategy": row[2], "direction": row[3], + "entry_price": row[4], "sl_price": row[5], "tp1_price": row[6], "tp2_price": row[7], + "tp1_hit": row[8], "status": row[9], "risk_distance": row[10], + "binance_order_id": row[11], + }) + return positions + + +async def check_sl_exists(session, symbol, direction): + """检查是否有SL挂单""" + orders = await get_binance_open_orders(session, symbol) + close_side = "SELL" if direction == "LONG" else "BUY" + for order in orders: + if order.get("type") == "STOP_MARKET" and order.get("side") == close_side: + return True, order + return False, None + + +async def rehang_sl(session, symbol, direction, sl_price, quantity): + """补挂SL保护单""" + prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2}) + close_side = "SELL" if direction == "LONG" else "BUY" + qty_str = f"{abs(quantity):.{prec['qty']}f}" + price_str = f"{sl_price:.{prec['price']}f}" + + params = { + "symbol": symbol, + "side": close_side, + "type": "STOP_MARKET", + "stopPrice": price_str, + "quantity": qty_str, + "reduceOnly": "true", + } + data, status = await binance_request(session, "POST", "/fapi/v1/order", params) + return status == 200, data + + +async def reconcile(session, conn): + """执行一次完整对账""" + local_positions = get_local_positions(conn) + binance_positions = await get_binance_positions(session) + + issues = [] + cur = conn.cursor() + + # 1. 检查本地有仓但币安没有(漏单/已被清算) + for lp in local_positions: + symbol = lp["symbol"] + bp = binance_positions.get(symbol) + + if not bp: + issues.append({ + "type": "local_only", + "severity": "critical", + "symbol": symbol, + "detail": f"本地有{lp['direction']}仓位(id={lp['id']})但币安无持仓", + }) + continue + + # 方向不一致 + if bp["direction"] != lp["direction"]: + issues.append({ + "type": "direction_mismatch", + "severity": "critical", + "symbol": symbol, + "detail": f"本地={lp['direction']} vs 币安={bp['direction']}", + }) + continue + + # 2. 检查SL保护单是否存在 + sl_exists, sl_order = await check_sl_exists(session, symbol, lp["direction"]) + if not sl_exists: + issues.append({ + "type": "sl_missing", + "severity": "critical", + "symbol": symbol, + "detail": f"SL保护单缺失! 仓位裸奔中!", + }) + # 自动补挂SL + logger.warning(f"[{symbol}] ⚠️ SL缺失,开始自动补挂...") + success = False + for attempt, delay in enumerate(SL_REHANG_DELAYS): + if delay > 0: + await asyncio.sleep(delay) + ok, data = await rehang_sl(session, symbol, lp["direction"], lp["sl_price"], bp["amount"]) + if ok: + logger.info(f"[{symbol}] ✅ SL补挂成功 (attempt={attempt+1})") + success = True + break + else: + logger.error(f"[{symbol}] ❌ SL补挂失败 (attempt={attempt+1}): {data}") + + if not success: + issues.append({ + "type": "sl_rehang_failed", + "severity": "emergency", + "symbol": symbol, + "detail": f"SL补挂{MAX_REHANG_RETRIES}次全部失败! 建议进入只减仓模式!", + }) + + # 3. 检查清算距离 + if bp.get("liquidation_price") and bp.get("mark_price"): + liq = bp["liquidation_price"] + mark = bp["mark_price"] + if liq > 0 and mark > 0: + if lp["direction"] == "LONG": + dist_pct = (mark - liq) / mark * 100 + else: + dist_pct = (liq - mark) / mark * 100 + + if dist_pct < 8: + issues.append({ + "type": "liquidation_near", + "severity": "emergency", + "symbol": symbol, + "detail": f"距清算仅{dist_pct:.1f}%! 建议立即减仓!", + }) + elif dist_pct < 12: + issues.append({ + "type": "liquidation_warning", + "severity": "high", + "symbol": symbol, + "detail": f"距清算{dist_pct:.1f}%", + }) + elif dist_pct < 20: + issues.append({ + "type": "liquidation_caution", + "severity": "medium", + "symbol": symbol, + "detail": f"距清算{dist_pct:.1f}%", + }) + + # 4. 检查币安有仓但本地没有(幽灵仓位) + local_symbols = {lp["symbol"] for lp in local_positions} + for symbol, bp in binance_positions.items(): + if symbol not in local_symbols and symbol in [s for s in SYMBOLS]: + issues.append({ + "type": "exchange_only", + "severity": "high", + "symbol": symbol, + "detail": f"币安有{bp['direction']}仓位但本地无记录", + }) + + # 记录对账结果 + now_ms = int(time.time() * 1000) + result = { + "timestamp_ms": now_ms, + "local_count": len(local_positions), + "exchange_count": len(binance_positions), + "issues_count": len(issues), + "issues": issues, + "status": "ok" if len(issues) == 0 else "mismatch", + } + + # 写对账结果到DB(可选:创建reconciliation_log表) + if issues: + for issue in issues: + level = "🔴" if issue["severity"] in ("critical", "emergency") else "🟡" + logger.warning(f"{level} [{issue['symbol']}] {issue['type']}: {issue['detail']}") + else: + logger.info(f"✅ 对账正常 | 本地={len(local_positions)}仓 | 币安={len(binance_positions)}仓") + + return result + + +# ============ TP1触发监控 ============ + +async def check_tp1_triggers(session, conn): + """检查TP1是否触发,触发后移SL到保本价""" + local_positions = get_local_positions(conn) + cur = conn.cursor() + + for lp in local_positions: + if lp["tp1_hit"]: + continue # 已处理 + + symbol = lp["symbol"] + # 查币安挂单,看TP1是否已成交(不在挂单列表里了) + orders = await get_binance_open_orders(session, symbol) + close_side = "SELL" if lp["direction"] == "LONG" else "BUY" + + tp1_found = False + for order in orders: + if (order.get("type") == "TAKE_PROFIT_MARKET" + and order.get("side") == close_side + and abs(float(order.get("stopPrice", 0)) - lp["tp1_price"]) < lp["tp1_price"] * 0.001): + tp1_found = True + break + + if not tp1_found and lp["status"] == "active": + # TP1可能已触发,验证仓位是否减半 + bp = (await get_binance_positions(session)).get(symbol) + if bp and abs(bp["amount"]) < abs(float(lp.get("binance_order_id", "0") or "0")): + # 确认TP1触发 + logger.info(f"[{symbol}] ✅ TP1触发! 移SL到保本价") + + # 取消旧SL + await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol}) + + # 计算保本SL + if lp["direction"] == "LONG": + new_sl = lp["entry_price"] * 1.0005 + else: + new_sl = lp["entry_price"] * 0.9995 + + # 挂新SL(半仓) + prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2}) + ok, _ = await rehang_sl(session, symbol, lp["direction"], new_sl, bp["amount"]) + + # 重新挂TP2(半仓) + tp2_price = lp["tp2_price"] + qty_str = f"{abs(bp['amount']):.{prec['qty']}f}" + price_str = f"{tp2_price:.{prec['price']}f}" + await binance_request(session, "POST", "/fapi/v1/order", { + "symbol": symbol, "side": close_side, "type": "TAKE_PROFIT_MARKET", + "stopPrice": price_str, "quantity": qty_str, "reduceOnly": "true", + }) + + # 更新DB + cur.execute(""" + UPDATE live_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit' + WHERE id=%s + """, (new_sl, lp["id"])) + conn.commit() + logger.info(f"[{symbol}] 🛡 SL移至保本 {new_sl:.4f}, TP2={tp2_price:.4f}") + + +# ============ 平仓检测 ============ + +async def check_closed_positions(session, conn): + """检测已平仓的交易,更新DB""" + local_positions = get_local_positions(conn) + binance_positions = await get_binance_positions(session) + cur = conn.cursor() + + for lp in local_positions: + symbol = lp["symbol"] + bp = binance_positions.get(symbol) + + # 币安无持仓但本地还active → 已平仓 + if not bp: + logger.info(f"[{symbol}] 📝 检测到平仓,查询成交记录...") + + # 查最近成交确定平仓价 + # 简化:用当前标记价做近似 + now_ms = int(time.time() * 1000) + rd = lp["risk_distance"] or 1 + + # 判断平仓类型(通过挂单是否还在来推断) + # 如果SL/TP都不在了,说明触发了其中一个 + status = "unknown" + exit_price = lp["entry_price"] # fallback + + # 尝试从最近交易记录获取 + trades_data, trades_status = await binance_request(session, "GET", "/fapi/v1/userTrades", { + "symbol": symbol, "limit": 5 + }) + if trades_status == 200 and isinstance(trades_data, list) and trades_data: + last_trade = trades_data[-1] + exit_price = float(last_trade.get("price", exit_price)) + + # 计算pnl + if lp["direction"] == "LONG": + raw_pnl_r = (exit_price - lp["entry_price"]) / rd + else: + raw_pnl_r = (lp["entry_price"] - exit_price) / rd + + if lp["tp1_hit"]: + tp1_r = abs(lp["tp1_price"] - lp["entry_price"]) / rd + pnl_r = 0.5 * tp1_r + 0.5 * raw_pnl_r + else: + pnl_r = raw_pnl_r + + # 扣手续费 + fee_r = 0.001 * lp["entry_price"] / rd + pnl_r -= fee_r + + # 判断状态 + if pnl_r > 0.5: + status = "tp" + elif pnl_r < -0.5: + status = "sl" + elif lp["tp1_hit"] and pnl_r >= -0.1: + status = "sl_be" + else: + status = "closed" + + cur.execute(""" + UPDATE live_trades SET status=%s, exit_price=%s, exit_ts=%s, pnl_r=%s + WHERE id=%s + """, (status, exit_price, now_ms, round(pnl_r, 4), lp["id"])) + conn.commit() + + logger.info(f"[{symbol}] 📝 平仓记录: {status} | exit={exit_price:.4f} | pnl={pnl_r:+.2f}R") + + +# ============ 主循环 ============ + +async def main(): + logger.info("=" * 60) + logger.info(f"🔄 Position Sync 启动 | 环境={TRADE_ENV} | 间隔={CHECK_INTERVAL}秒") + logger.info("=" * 60) + + load_api_keys() + conn = psycopg2.connect(**DB_CONFIG) + + async with aiohttp.ClientSession() as session: + while True: + try: + # 1. 对账 + result = await reconcile(session, conn) + + # 2. 检查TP1触发 + await check_tp1_triggers(session, conn) + + # 3. 检查已平仓 + await check_closed_positions(session, conn) + + await asyncio.sleep(CHECK_INTERVAL) + + except KeyboardInterrupt: + break + except Exception as e: + logger.error(f"❌ 对账异常: {e}", exc_info=True) + await asyncio.sleep(10) + + conn.close() + logger.info("Position Sync 已停止") + + +if __name__ == "__main__": + asyncio.run(main())