arbitrage-engine/backend/position_sync.py
root 1bf880cebb chore: 三个实盘模块加dotenv加载.env
live_executor/position_sync/risk_guard启动时自动加载backend/.env
确保TRADE_ENV/BINANCE_API_KEY/SECRET_KEY/PG_HOST等环境变量生效
2026-03-02 10:28:39 +00:00

635 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Position Sync - 仓位同步与对账模块
每30秒对账本地DB vs 币安实际持仓/挂单SL缺失自动补挂
架构:
live_executor.py 开仓 → position_sync.py 持续对账 + 监控TP/SL状态
"""
import os
import sys
import json
from pathlib import Path
try:
from dotenv import load_dotenv; load_dotenv(Path(__file__).parent / ".env")
except ImportError:
pass
import time
import logging
import asyncio
import hashlib
import hmac
from urllib.parse import urlencode
import psycopg2
import aiohttp
# ============ 配置 ============
TRADE_ENV = os.getenv("TRADE_ENV", "testnet")
BINANCE_ENDPOINTS = {
"testnet": "https://testnet.binancefuture.com",
"production": "https://fapi.binance.com",
}
BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV]
DB_CONFIG = {
"host": os.getenv("DB_HOST", "10.106.0.3"),
"port": int(os.getenv("DB_PORT", "5432")),
"dbname": os.getenv("DB_NAME", "arb_engine"),
"user": os.getenv("DB_USER", "arb"),
"password": os.getenv("DB_PASSWORD", "arb_engine_2026"),
}
CHECK_INTERVAL = 30 # 对账间隔(秒)
SL_REHANG_DELAYS = [0, 3] # SL补挂重试延迟(秒)
MAX_REHANG_RETRIES = 2
MISMATCH_ESCALATION_SEC = 60 # 差异持续超过此秒数升级告警
RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2")) # $2=1R
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
SYMBOL_PRECISION = {
"BTCUSDT": {"qty": 3, "price": 1},
"ETHUSDT": {"qty": 3, "price": 2},
"XRPUSDT": {"qty": 0, "price": 4},
"SOLUSDT": {"qty": 2, "price": 2},
}
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("position-sync")
# ============ API Key ============
_api_key = None
_secret_key = None
def load_api_keys():
global _api_key, _secret_key
_api_key = os.getenv("BINANCE_API_KEY")
_secret_key = os.getenv("BINANCE_SECRET_KEY")
if _api_key and _secret_key:
logger.info(f"API Key loaded from env")
return
try:
from google.cloud import secretmanager
client = secretmanager.SecretManagerServiceClient()
project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737")
prefix = "binance-testnet" if TRADE_ENV == "testnet" else "binance-live"
_api_key = client.access_secret_version(
name=f"projects/{project}/secrets/{prefix}-api-key/versions/latest"
).payload.data.decode()
_secret_key = client.access_secret_version(
name=f"projects/{project}/secrets/{prefix}-secret-key/versions/latest"
).payload.data.decode()
logger.info(f"API Key loaded from GCP Secret Manager")
except Exception as e:
logger.error(f"Failed to load API keys: {e}")
sys.exit(1)
def sign_params(params):
params["timestamp"] = int(time.time() * 1000)
query_string = urlencode(params)
signature = hmac.new(_secret_key.encode(), query_string.encode(), hashlib.sha256).hexdigest()
params["signature"] = signature
return params
async def binance_request(session, method, path, params=None):
url = f"{BASE_URL}{path}"
headers = {"X-MBX-APIKEY": _api_key}
if params is None:
params = {}
params = sign_params(params)
try:
if method == "GET":
async with session.get(url, params=params, headers=headers) as resp:
return await resp.json(), resp.status
elif method == "POST":
async with session.post(url, params=params, headers=headers) as resp:
return await resp.json(), resp.status
elif method == "DELETE":
async with session.delete(url, params=params, headers=headers) as resp:
return await resp.json(), resp.status
except Exception as e:
logger.error(f"Request failed: {e}")
return {"error": str(e)}, 500
# ============ 对账核心 ============
async def get_binance_positions(session):
"""获取币安所有持仓"""
data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk")
positions = {}
if status == 200 and isinstance(data, list):
for pos in data:
amt = float(pos.get("positionAmt", 0))
if amt != 0:
positions[pos["symbol"]] = {
"amount": amt,
"direction": "LONG" if amt > 0 else "SHORT",
"entry_price": float(pos.get("entryPrice", 0)),
"unrealized_pnl": float(pos.get("unRealizedProfit", 0)),
"liquidation_price": float(pos.get("liquidationPrice", 0)),
"mark_price": float(pos.get("markPrice", 0)),
}
return positions
async def get_binance_open_orders(session, symbol=None):
"""获取币安挂单"""
params = {}
if symbol:
params["symbol"] = symbol
data, status = await binance_request(session, "GET", "/fapi/v1/openOrders", params)
if status == 200 and isinstance(data, list):
return data
return []
def get_local_positions(conn):
"""获取本地DB活跃持仓"""
cur = conn.cursor()
cur.execute("""
SELECT id, symbol, strategy, direction, entry_price, sl_price, tp1_price, tp2_price,
tp1_hit, status, risk_distance, binance_order_id, entry_ts
FROM live_trades
WHERE status IN ('active', 'tp1_hit')
ORDER BY entry_ts DESC
""")
positions = []
for row in cur.fetchall():
positions.append({
"id": row[0], "symbol": row[1], "strategy": row[2], "direction": row[3],
"entry_price": row[4], "sl_price": row[5], "tp1_price": row[6], "tp2_price": row[7],
"tp1_hit": row[8], "status": row[9], "risk_distance": row[10],
"binance_order_id": row[11], "entry_ts": row[12],
"binance_order_id": row[11],
})
return positions
async def check_sl_exists(session, symbol, direction):
"""检查是否有SL挂单"""
orders = await get_binance_open_orders(session, symbol)
close_side = "SELL" if direction == "LONG" else "BUY"
for order in orders:
if order.get("type") == "STOP_MARKET" and order.get("side") == close_side:
return True, order
return False, None
async def rehang_sl(session, symbol, direction, sl_price, quantity):
"""补挂SL保护单"""
prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2})
close_side = "SELL" if direction == "LONG" else "BUY"
qty_str = f"{abs(quantity):.{prec['qty']}f}"
price_str = f"{sl_price:.{prec['price']}f}"
params = {
"symbol": symbol,
"side": close_side,
"type": "STOP_MARKET",
"stopPrice": price_str,
"quantity": qty_str,
"reduceOnly": "true",
}
data, status = await binance_request(session, "POST", "/fapi/v1/order", params)
return status == 200, data
async def reconcile(session, conn):
"""执行一次完整对账"""
local_positions = get_local_positions(conn)
binance_positions = await get_binance_positions(session)
issues = []
cur = conn.cursor()
# 1. 检查本地有仓但币安没有(漏单/已被清算)
for lp in local_positions:
symbol = lp["symbol"]
bp = binance_positions.get(symbol)
if not bp:
issues.append({
"type": "local_only",
"severity": "critical",
"symbol": symbol,
"detail": f"本地有{lp['direction']}仓位(id={lp['id']})但币安无持仓",
})
continue
# 方向不一致
if bp["direction"] != lp["direction"]:
issues.append({
"type": "direction_mismatch",
"severity": "critical",
"symbol": symbol,
"detail": f"本地={lp['direction']} vs 币安={bp['direction']}",
})
continue
# 2. 检查SL保护单是否存在
sl_exists, sl_order = await check_sl_exists(session, symbol, lp["direction"])
if not sl_exists:
issues.append({
"type": "sl_missing",
"severity": "critical",
"symbol": symbol,
"detail": f"SL保护单缺失! 仓位裸奔中!",
})
# 自动补挂SL
logger.warning(f"[{symbol}] ⚠️ SL缺失开始自动补挂...")
success = False
for attempt, delay in enumerate(SL_REHANG_DELAYS):
if delay > 0:
await asyncio.sleep(delay)
ok, data = await rehang_sl(session, symbol, lp["direction"], lp["sl_price"], bp["amount"])
if ok:
logger.info(f"[{symbol}] ✅ SL补挂成功 (attempt={attempt+1})")
success = True
break
else:
logger.error(f"[{symbol}] ❌ SL补挂失败 (attempt={attempt+1}): {data}")
if not success:
issues.append({
"type": "sl_rehang_failed",
"severity": "emergency",
"symbol": symbol,
"detail": f"SL补挂{MAX_REHANG_RETRIES}次全部失败! 建议进入只减仓模式!",
})
# 3. 检查清算距离
if bp.get("liquidation_price") and bp.get("mark_price"):
liq = bp["liquidation_price"]
mark = bp["mark_price"]
if liq > 0 and mark > 0:
if lp["direction"] == "LONG":
dist_pct = (mark - liq) / mark * 100
else:
dist_pct = (liq - mark) / mark * 100
if dist_pct < 8:
issues.append({
"type": "liquidation_near",
"severity": "emergency",
"symbol": symbol,
"detail": f"距清算仅{dist_pct:.1f}%! 建议立即减仓!",
})
elif dist_pct < 12:
issues.append({
"type": "liquidation_warning",
"severity": "high",
"symbol": symbol,
"detail": f"距清算{dist_pct:.1f}%",
})
elif dist_pct < 20:
issues.append({
"type": "liquidation_caution",
"severity": "medium",
"symbol": symbol,
"detail": f"距清算{dist_pct:.1f}%",
})
# 4. 检查币安有仓但本地没有(幽灵仓位)
local_symbols = {lp["symbol"] for lp in local_positions}
for symbol, bp in binance_positions.items():
if symbol not in local_symbols and symbol in [s for s in SYMBOLS]:
issues.append({
"type": "exchange_only",
"severity": "high",
"symbol": symbol,
"detail": f"币安有{bp['direction']}仓位但本地无记录",
})
# 记录对账结果
now_ms = int(time.time() * 1000)
result = {
"timestamp_ms": now_ms,
"local_count": len(local_positions),
"exchange_count": len(binance_positions),
"issues_count": len(issues),
"issues": issues,
"status": "ok" if len(issues) == 0 else "mismatch",
}
# 写对账结果到DB可选创建reconciliation_log表
if issues:
for issue in issues:
level = "🔴" if issue["severity"] in ("critical", "emergency") else "🟡"
logger.warning(f"{level} [{issue['symbol']}] {issue['type']}: {issue['detail']}")
else:
logger.info(f"✅ 对账正常 | 本地={len(local_positions)}仓 | 币安={len(binance_positions)}")
return result
# ============ TP1触发监控 ============
async def check_tp1_triggers(session, conn):
"""检查TP1是否触发触发后移SL到保本价"""
local_positions = get_local_positions(conn)
cur = conn.cursor()
for lp in local_positions:
if lp["tp1_hit"]:
continue # 已处理
symbol = lp["symbol"]
# 查币安挂单看TP1是否已成交不在挂单列表里了
orders = await get_binance_open_orders(session, symbol)
close_side = "SELL" if lp["direction"] == "LONG" else "BUY"
tp1_found = False
for order in orders:
if (order.get("type") == "TAKE_PROFIT_MARKET"
and order.get("side") == close_side
and abs(float(order.get("stopPrice", 0)) - lp["tp1_price"]) < lp["tp1_price"] * 0.001):
tp1_found = True
break
if not tp1_found and lp["status"] == "active":
# TP1可能已触发验证仓位是否减半
bp = (await get_binance_positions(session)).get(symbol)
if bp and abs(bp["amount"]) < abs(float(lp.get("binance_order_id", "0") or "0")):
# 确认TP1触发
logger.info(f"[{symbol}] ✅ TP1触发! 移SL到保本价")
# 取消旧SL
await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol})
# 计算保本SL
if lp["direction"] == "LONG":
new_sl = lp["entry_price"] * 1.0005
else:
new_sl = lp["entry_price"] * 0.9995
# 挂新SL半仓
prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2})
ok, _ = await rehang_sl(session, symbol, lp["direction"], new_sl, bp["amount"])
# 重新挂TP2半仓
tp2_price = lp["tp2_price"]
qty_str = f"{abs(bp['amount']):.{prec['qty']}f}"
price_str = f"{tp2_price:.{prec['price']}f}"
await binance_request(session, "POST", "/fapi/v1/order", {
"symbol": symbol, "side": close_side, "type": "TAKE_PROFIT_MARKET",
"stopPrice": price_str, "quantity": qty_str, "reduceOnly": "true",
})
# 更新DB
cur.execute("""
UPDATE live_trades SET tp1_hit=TRUE, sl_price=%s, status='tp1_hit'
WHERE id=%s
""", (new_sl, lp["id"]))
conn.commit()
logger.info(f"[{symbol}] 🛡 SL移至保本 {new_sl:.4f}, TP2={tp2_price:.4f}")
# ============ 平仓检测 ============
async def check_closed_positions(session, conn):
"""检测已平仓的交易更新DB"""
local_positions = get_local_positions(conn)
binance_positions = await get_binance_positions(session)
cur = conn.cursor()
for lp in local_positions:
symbol = lp["symbol"]
bp = binance_positions.get(symbol)
# 币安无持仓但本地还active → 已平仓
if not bp:
logger.info(f"[{symbol}] 📝 检测到平仓,查询成交记录...")
# 查最近成交确定平仓价
# 简化:用当前标记价做近似
now_ms = int(time.time() * 1000)
rd = lp["risk_distance"] or 1
# 判断平仓类型(通过挂单是否还在来推断)
# 如果SL/TP都不在了说明触发了其中一个
status = "unknown"
exit_price = lp["entry_price"] # fallback
# 尝试从最近交易记录获取成交价和手续费
trades_data, trades_status = await binance_request(session, "GET", "/fapi/v1/userTrades", {
"symbol": symbol, "limit": 20
})
actual_fee_usdt = 0
if trades_status == 200 and isinstance(trades_data, list) and trades_data:
# 取最近的平仓成交reduceOnly或最后几笔
last_trade = trades_data[-1]
exit_price = float(last_trade.get("price", exit_price))
# 汇总最近相关成交的手续费(开仓+平仓)
entry_ts = lp.get("entry_ts", 0)
for t in trades_data:
t_time = int(t.get("time", 0))
if t_time >= entry_ts - 5000: # 开仓前5秒到现在的所有成交
actual_fee_usdt += abs(float(t.get("commission", 0)))
# 计算pnl — gross(不含费)
if lp["direction"] == "LONG":
gross_pnl_r = (exit_price - lp["entry_price"]) / rd
else:
gross_pnl_r = (lp["entry_price"] - exit_price) / rd
if lp["tp1_hit"]:
tp1_r = abs(lp["tp1_price"] - lp["entry_price"]) / rd
gross_pnl_r = 0.5 * tp1_r + 0.5 * gross_pnl_r
# 手续费(R) — 用实际成交手续费
if actual_fee_usdt > 0:
fee_r = actual_fee_usdt / (RISK_PER_TRADE_USD if RISK_PER_TRADE_USD > 0 else rd)
else:
# fallback: 按0.1%估算(开+平各0.05%)
fee_r = 0.001 * lp["entry_price"] / rd
# funding费(R)
funding_usdt = 0
cur.execute("SELECT COALESCE(funding_fee_usdt, 0) FROM live_trades WHERE id = %s", (lp["id"],))
fr_row = cur.fetchone()
if fr_row:
funding_usdt = fr_row[0]
funding_r = abs(funding_usdt) / (RISK_PER_TRADE_USD if RISK_PER_TRADE_USD > 0 else rd) if funding_usdt < 0 else 0
# 净PnL = gross - fee - funding_cost
pnl_r = gross_pnl_r - fee_r - funding_r
# 判断状态
if pnl_r > 0.5:
status = "tp"
elif pnl_r < -0.5:
status = "sl"
elif lp["tp1_hit"] and pnl_r >= -0.1:
status = "sl_be"
else:
status = "closed"
cur.execute("""
UPDATE live_trades SET status=%s, exit_price=%s, exit_ts=%s, pnl_r=%s, fee_usdt=%s
WHERE id=%s
""", (status, exit_price, now_ms, round(pnl_r, 4), round(actual_fee_usdt, 4), lp["id"]))
conn.commit()
logger.info(
f"[{symbol}] 📝 平仓: {status} | exit={exit_price:.4f} | "
f"gross={gross_pnl_r:+.3f}R fee={fee_r:.3f}R({actual_fee_usdt:.4f}$) "
f"funding={funding_usdt:+.4f}$ | net={pnl_r:+.3f}R"
)
# 写event
evt_level = "info" if pnl_r >= 0 else "warn"
_log_event(conn, evt_level, "trade",
f"平仓 {lp['direction']} {symbol} | {status} | net={pnl_r:+.3f}R (gross={gross_pnl_r:+.3f} fee=-{fee_r:.3f} fr={funding_usdt:+.4f}$)",
symbol, {"trade_id": lp["id"], "status": status, "pnl_r": round(pnl_r, 4), "exit_price": exit_price})
# ============ 资金费率结算追踪 ============
# 币安结算时间UTC 00:00, 08:00, 16:00
FUNDING_SETTLEMENT_HOURS = [0, 8, 16]
_last_funding_check_ts = 0 # 上次查funding的时间戳
async def track_funding_fees(session, conn):
"""
查询币安资金费率收支更新到live_trades的funding_fee_usdt字段。
只在结算时间点附近查询每8小时一次±5分钟窗口内查一次
"""
global _last_funding_check_ts
import datetime as _dt
now = _dt.datetime.now(_dt.timezone.utc)
now_ts = time.time()
# 判断是否在结算窗口内结算时间后0-5分钟
in_settlement_window = False
for h in FUNDING_SETTLEMENT_HOURS:
settlement_time = now.replace(hour=h, minute=0, second=0, microsecond=0)
diff_sec = (now - settlement_time).total_seconds()
if 0 <= diff_sec <= 300: # 结算后5分钟内
in_settlement_window = True
break
# 不在窗口内或者5分钟内已经查过了
if not in_settlement_window:
return
if now_ts - _last_funding_check_ts < 300:
return
_last_funding_check_ts = now_ts
logger.info("💰 资金费率结算窗口查询funding收支...")
# 获取当前活跃持仓
cur = conn.cursor()
cur.execute("""
SELECT id, symbol, direction, entry_ts, COALESCE(funding_fee_usdt, 0) as current_funding
FROM live_trades WHERE status IN ('active', 'tp1_hit')
""")
active = cur.fetchall()
if not active:
logger.info("💰 无活跃持仓跳过funding查询")
return
# 查币安最近的funding收入记录
# GET /fapi/v1/income?incomeType=FUNDING_FEE&limit=100
start_ts = int((now_ts - 3600) * 1000) # 最近1小时
data, status = await binance_request(session, "GET", "/fapi/v1/income", {
"incomeType": "FUNDING_FEE",
"startTime": start_ts,
"limit": 100,
})
if status != 200 or not isinstance(data, list):
logger.warning(f"💰 查询funding income失败: {status}")
return
# 按symbol汇总本次结算的funding
funding_by_symbol = {}
for item in data:
sym = item.get("symbol", "")
income = float(item.get("income", 0))
ts = int(item.get("time", 0))
# 只取最近30分钟内的本次结算的
if now_ts * 1000 - ts < 1800000:
funding_by_symbol[sym] = funding_by_symbol.get(sym, 0) + income
if not funding_by_symbol:
logger.info("💰 本次结算无funding记录")
return
# 更新到live_trades
for trade_id, symbol, direction, entry_ts, current_funding in active:
fr_income = funding_by_symbol.get(symbol, 0)
if fr_income != 0:
new_total = current_funding + fr_income
cur.execute("UPDATE live_trades SET funding_fee_usdt = %s WHERE id = %s", (new_total, trade_id))
logger.info(f"[{symbol}] 💰 Funding: {fr_income:+.4f} USDT (累计: {new_total:+.4f})")
conn.commit()
logger.info(f"💰 Funding更新完成: {funding_by_symbol}")
def _log_event(conn, level, category, message, symbol=None, detail=None):
"""写live_events表"""
try:
cur = conn.cursor()
cur.execute(
"INSERT INTO live_events (level, category, symbol, message, detail) VALUES (%s, %s, %s, %s, %s)",
(level, category, symbol, message, json.dumps(detail) if detail else None)
)
conn.commit()
except Exception:
pass
# ============ 主循环 ============
async def main():
logger.info("=" * 60)
logger.info(f"🔄 Position Sync 启动 | 环境={TRADE_ENV} | 间隔={CHECK_INTERVAL}")
logger.info("=" * 60)
load_api_keys()
conn = psycopg2.connect(**DB_CONFIG)
async with aiohttp.ClientSession() as session:
while True:
try:
# 1. 对账
result = await reconcile(session, conn)
# 2. 检查TP1触发
await check_tp1_triggers(session, conn)
# 3. 检查已平仓
await check_closed_positions(session, conn)
# 4. 资金费率结算追踪
await track_funding_fees(session, conn)
await asyncio.sleep(CHECK_INTERVAL)
except KeyboardInterrupt:
break
except Exception as e:
logger.error(f"❌ 对账异常: {e}", exc_info=True)
await asyncio.sleep(10)
conn.close()
logger.info("Position Sync 已停止")
if __name__ == "__main__":
asyncio.run(main())