#!/usr/bin/env python3
"""
Live Executor - live trading execution module.

Listens for PostgreSQL NOTIFY events announcing new signals and executes the
corresponding trades through the Binance futures REST API.

Architecture:
    signal_engine.py -> NOTIFY new_signal -> live_executor.py -> Binance API

Does not affect any paper-trading (simulation) process.
"""

import os
import sys
import json
import time
import logging
import asyncio
import hashlib
import hmac
from urllib.parse import urlencode

import psycopg2
import psycopg2.extensions
import aiohttp

# ============ Configuration ============

# Environment: testnet / production
TRADE_ENV = os.getenv("TRADE_ENV", "testnet")

# Binance API endpoints
BINANCE_ENDPOINTS = {
    "testnet": "https://testnet.binancefuture.com",
    "production": "https://fapi.binance.com",
}
# An unknown TRADE_ENV raises KeyError here, at import time.
BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV]

# Database
# NOTE(review): the fallback password is a credential committed to source;
# consider requiring DB_PASSWORD from the environment instead.
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "10.106.0.3"),
    "port": int(os.getenv("DB_PORT", "5432")),
    "dbname": os.getenv("DB_NAME", "arb_engine"),
    "user": os.getenv("DB_USER", "arb"),
    "password": os.getenv("DB_PASSWORD", "arb_engine_2026"),
}

# Strategies that are allowed to trade live (JSON list in LIVE_STRATEGIES)
ENABLED_STRATEGIES = json.loads(os.getenv("LIVE_STRATEGIES", '["v52_8signals"]'))

# Risk parameters
RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2"))  # on testnet, $2 = 1R
MAX_POSITIONS = int(os.getenv("MAX_POSITIONS", "4"))
FEE_RATE = 0.0005  # Taker 0.05%

# Per-symbol precision (required by Binance): decimals for quantity and
# price, plus the minimum notional value of an order.
SYMBOL_PRECISION = {
    "BTCUSDT": {"qty": 3, "price": 1, "min_notional": 100},
    "ETHUSDT": {"qty": 3, "price": 2, "min_notional": 20},
    "XRPUSDT": {"qty": 0, "price": 4, "min_notional": 5},
    "SOLUSDT": {"qty": 2, "price": 2, "min_notional": 5},
}

# ATR multipliers per strategy, used to derive SL / TP1 / TP2 distances.
_STRATEGY_PARAMS = {
    "v51_baseline": {"sl_multiplier": 1.4, "tp1_multiplier": 1.05, "tp2_multiplier": 2.1},
    "v52_8signals": {"sl_multiplier": 2.1, "tp1_multiplier": 1.4, "tp2_multiplier": 3.15},
}

# Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("live-executor")

# ============ API key management ============

_api_key = None
_secret_key = None

# Signal ids for which a market order has already been accepted by the
# exchange in this process. Guards against re-ordering the same signal when
# the live_trades insert fails after the order went through (the pending
# signal query would otherwise keep returning it).
_submitted_signal_ids = set()


def load_api_keys():
    """Load the Binance API key pair into the module-level globals.

    Environment variables BINANCE_API_KEY / BINANCE_SECRET_KEY take priority
    (local development / testing). If either is missing, both keys are read
    from GCP Secret Manager using a prefix chosen by TRADE_ENV. Any failure
    while reading from Secret Manager is logged and terminates the process
    with exit code 1.
    """
    global _api_key, _secret_key

    # Prefer environment variables
    _api_key = os.getenv("BINANCE_API_KEY")
    _secret_key = os.getenv("BINANCE_SECRET_KEY")
    if _api_key and _secret_key:
        logger.info(f"API Key loaded from env (first 10: {_api_key[:10]}...)")
        return

    # Fall back to GCP Secret Manager
    try:
        from google.cloud import secretmanager

        client = secretmanager.SecretManagerServiceClient()
        project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737")
        prefix = "binance-testnet" if TRADE_ENV == "testnet" else "binance-live"

        api_key_name = f"projects/{project}/secrets/{prefix}-api-key/versions/latest"
        secret_key_name = f"projects/{project}/secrets/{prefix}-secret-key/versions/latest"

        _api_key = client.access_secret_version(name=api_key_name).payload.data.decode()
        _secret_key = client.access_secret_version(name=secret_key_name).payload.data.decode()
        logger.info(f"API Key loaded from GCP Secret Manager ({prefix}, first 10: {_api_key[:10]}...)")
    except Exception as e:
        logger.error(f"Failed to load API keys: {e}")
        sys.exit(1)


# ============ Binance API helpers ============

def sign_params(params: dict) -> dict:
    """Return a signed copy of *params* (HMAC-SHA256 over the query string).

    A fresh millisecond ``timestamp`` is set and the ``signature`` is appended
    last. The caller's dict is not modified, and a stale ``signature`` left
    over from a previous signing is discarded so a dict can be re-signed.
    """
    signed = dict(params)
    signed.pop("signature", None)
    signed["timestamp"] = int(time.time() * 1000)
    query_string = urlencode(signed)
    signed["signature"] = hmac.new(
        _secret_key.encode(), query_string.encode(), hashlib.sha256
    ).hexdigest()
    return signed


async def binance_request(session: aiohttp.ClientSession, method: str, path: str,
                          params: dict = None, signed: bool = True):
    """Send one request to the Binance API.

    Args:
        session: aiohttp session used for the call.
        method: HTTP verb; GET, POST and DELETE are supported.
        path: API path appended to BASE_URL.
        params: query parameters (not modified).
        signed: when true, the parameters are timestamped and signed.

    Returns:
        ``(data, status)`` where *data* is the decoded JSON body and *status*
        the HTTP status. Transport/decoding failures are reported as
        ``({"error": ...}, 500)`` and an unsupported verb as
        ``({"error": ...}, 405)``; this function does not raise for them.
    """
    verb = method.upper()
    if verb not in ("GET", "POST", "DELETE"):
        # Previously fell through and returned None, which broke callers
        # that unpack a (data, status) tuple.
        logger.error(f"Binance request failed: unsupported method {method}")
        return {"error": f"unsupported method {method}"}, 405

    url = f"{BASE_URL}{path}"
    headers = {"X-MBX-APIKEY": _api_key}
    query = dict(params) if params else {}
    if signed:
        query = sign_params(query)

    try:
        async with session.request(verb, url, params=query, headers=headers) as resp:
            data = await resp.json()
            if resp.status != 200:
                logger.error(f"Binance API error: {resp.status} {data}")
            return data, resp.status
    except Exception as e:
        # Deliberate best-effort boundary: callers branch on the status code.
        logger.error(f"Binance request failed: {e}")
        return {"error": str(e)}, 500


# ============ Trade execution ============

async def set_leverage_and_margin(session: aiohttp.ClientSession, symbol: str, leverage: int = 20):
    """Request ISOLATED margin and the given leverage for *symbol*.

    The responses are not inspected here; API errors are only logged by
    binance_request.
    """
    # Isolated margin
    await binance_request(session, "POST", "/fapi/v1/marginType", {
        "symbol": symbol, "marginType": "ISOLATED"
    })
    # Leverage
    await binance_request(session, "POST", "/fapi/v1/leverage", {
        "symbol": symbol, "leverage": leverage
    })
    logger.info(f"[{symbol}] 设置逐仓 + {leverage}x杠杆")


async def place_market_order(session: aiohttp.ClientSession, symbol: str, side: str, quantity: float):
    """Open a position with a MARKET order.

    Returns:
        ``(data, status, t_submit, t_ack)`` - the API response, HTTP status and
        the wall-clock times (ms) just before and just after the request.
    """
    prec = SYMBOL_PRECISION.get(symbol, {"qty": 3})
    qty_str = f"{quantity:.{prec['qty']}f}"

    params = {
        "symbol": symbol,
        "side": side,
        "type": "MARKET",
        "quantity": qty_str,
        # The default ACK response carries no average fill price; RESULT
        # returns the final order state so the caller can read avgPrice.
        "newOrderRespType": "RESULT",
    }

    t_submit = time.time() * 1000
    data, status = await binance_request(session, "POST", "/fapi/v1/order", params)
    t_ack = time.time() * 1000

    if status == 200:
        logger.info(f"[{symbol}] ✅ 市价{side} {qty_str}个 | orderId={data.get('orderId')} | 延迟={t_ack-t_submit:.0f}ms")

    return data, status, t_submit, t_ack


async def place_stop_order(session: aiohttp.ClientSession, symbol: str, side: str,
                           stop_price: float, quantity: float, order_type: str = "STOP_MARKET"):
    """Place a reduce-only stop-loss / take-profit order.

    Quantity and stop price are formatted with the symbol's configured
    precision. Returns ``(data, status)`` from the API call.
    """
    prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2})
    qty_str = f"{quantity:.{prec['qty']}f}"
    price_str = f"{stop_price:.{prec['price']}f}"

    params = {
        "symbol": symbol,
        "side": side,
        "type": order_type,
        "stopPrice": price_str,
        "quantity": qty_str,
        "reduceOnly": "true",
    }

    data, status = await binance_request(session, "POST", "/fapi/v1/order", params)
    if status == 200:
        logger.info(f"[{symbol}] 📌 挂{order_type} {side} @ {price_str} qty={qty_str} | orderId={data.get('orderId')}")
    return data, status


async def cancel_all_orders(session: aiohttp.ClientSession, symbol: str):
    """Cancel every open order for *symbol*; returns ``(data, status)``."""
    data, status = await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol})
    if status == 200:
        logger.info(f"[{symbol}] 🗑 已取消所有挂单")
    return data, status


async def get_position(session: aiohttp.ClientSession, symbol: str):
    """Return the position entry for *symbol* with a non-zero amount.

    Returns None when there is no such entry or the request failed.
    """
    data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk", {"symbol": symbol})
    if status == 200 and isinstance(data, list):
        for pos in data:
            if pos.get("symbol") == symbol and float(pos.get("positionAmt", 0)) != 0:
                return pos
    return None


async def get_account_balance(session: aiohttp.ClientSession):
    """Return the available USDT balance, or 0 if it cannot be determined."""
    data, status = await binance_request(session, "GET", "/fapi/v2/balance")
    if status == 200 and isinstance(data, list):
        for asset in data:
            if asset.get("asset") == "USDT":
                return float(asset.get("availableBalance", 0))
    return 0


# ============ Core: entry flow ============

async def execute_entry(session: aiohttp.ClientSession, signal: dict, db_conn):
    """Run the full entry flow for one enriched signal.

    Steps:
        1. Validate the signal and size the position.
        2. Check balance, number of active trades and duplicates.
        3. Set margin mode / leverage and open with a market order.
        4. Place protective orders: SL (full size), TP1 and TP2 (half each).
        5. Insert the trade, including latency metrics, into live_trades.

    Args:
        session: aiohttp session for the exchange calls.
        signal: signal dict carrying symbol, direction, score, strategy,
            risk_distance, sl_price, tp1_price, tp2_price and signal_ts (ms);
            tier, atr, signal_id, factors and signal_price are optional.
        db_conn: psycopg2 connection used for live_trades.

    Returns:
        The new live_trades id, or None when the entry was skipped or the
        market order was rejected.

    Raises:
        psycopg2.Error: if a database statement fails. When this happens
            after the order was accepted, the signal id has already been
            remembered, so a later call will not place a second order.
    """
    symbol = signal["symbol"]
    direction = signal["direction"]
    score = signal["score"]
    tier = signal.get("tier", "standard")
    strategy = signal["strategy"]
    risk_distance = signal["risk_distance"]
    sl_price = signal["sl_price"]
    tp1_price = signal["tp1_price"]
    tp2_price = signal["tp2_price"]
    atr = signal.get("atr", 0)
    signal_id = signal.get("signal_id")
    factors = signal.get("factors")
    # Signal timestamp (ms); float() so DB numeric types subtract cleanly.
    t_signal = float(signal["signal_ts"])

    # 0. Never order twice for a signal the exchange already accepted.
    if signal_id is not None and signal_id in _submitted_signal_ids:
        logger.warning(f"[{symbol}] ⏭ signal_id={signal_id} 已下单, 跳过重复下单")
        return None

    # 1. Position size (units) = risk_usd / risk_distance
    if not risk_distance or risk_distance <= 0:
        logger.warning(f"[{symbol}] ❌ 无效的risk_distance: {risk_distance}")
        return None
    prec = SYMBOL_PRECISION.get(symbol, {"qty": 3})
    qty = round(RISK_PER_TRADE_USD / risk_distance, prec["qty"])
    if qty <= 0:
        logger.warning(f"[{symbol}] ❌ 仓位数量为0 (risk_distance={risk_distance})")
        return None

    # Minimum notional, estimated from the signal price because the fill
    # price is not known before the order is sent.
    est_price = signal.get("signal_price") or 0
    min_notional = prec.get("min_notional", 0)
    if est_price > 0 and qty * est_price < min_notional:
        logger.warning(f"[{symbol}] ❌ 名义价值不足: ${qty * est_price:.2f} < ${min_notional}")
        return None

    # 2. Balance check
    balance = await get_account_balance(session)
    if balance < RISK_PER_TRADE_USD * 2:
        logger.warning(f"[{symbol}] ❌ 余额不足: ${balance:.2f} < ${RISK_PER_TRADE_USD * 2}")
        return None

    with db_conn.cursor() as cur:
        # 3. Active trade count for this strategy
        cur.execute("SELECT count(*) FROM live_trades WHERE strategy=%s AND status='active'", (strategy,))
        active_count = cur.fetchone()[0]
        if active_count >= MAX_POSITIONS:
            logger.warning(f"[{symbol}] ❌ 已达最大持仓数 {MAX_POSITIONS}")
            return None

        # 4. Existing active trade for the same symbol and strategy
        cur.execute("SELECT id FROM live_trades WHERE symbol=%s AND strategy=%s AND status='active'",
                    (symbol, strategy))
        if cur.fetchone():
            logger.info(f"[{symbol}] ⏭ 已有活跃持仓,跳过")
            return None

    # 5. Leverage and isolated margin
    await set_leverage_and_margin(session, symbol)

    # 6. Market entry
    entry_side = "BUY" if direction == "LONG" else "SELL"
    order_data, order_status, t_submit, _t_ack = await place_market_order(session, symbol, entry_side, qty)

    if order_status != 200:
        logger.error(f"[{symbol}] ❌ 开仓失败: {order_data}")
        return None

    if signal_id is not None:
        _submitted_signal_ids.add(signal_id)

    # Parse the fill
    order_id = str(order_data.get("orderId", ""))
    fill_price = float(order_data.get("avgPrice") or 0)
    if fill_price == 0:
        fill_price = float(order_data.get("price") or 0)

    t_fill = time.time() * 1000
    signal_to_order_ms = int(t_submit - t_signal)
    order_to_fill_ms = int(t_fill - t_submit)

    signal_price = signal.get("signal_price", fill_price) or 0
    if fill_price <= 0 and signal_price > 0:
        # Recording 0 as the entry price would corrupt the trade row and the
        # slippage metric; use the signal price as an explicit estimate.
        logger.warning(f"[{symbol}] ⚠️ 响应中无成交价, 以信号价 {signal_price} 估算")
        fill_price = signal_price

    # Slippage in basis points; positive means a worse fill than the signal.
    if signal_price > 0:
        if direction == "LONG":
            slippage_bps = (fill_price - signal_price) / signal_price * 10000
        else:
            slippage_bps = (signal_price - fill_price) / signal_price * 10000
    else:
        slippage_bps = 0

    logger.info(f"[{symbol}] 📊 成交价={fill_price} | 信号价={signal_price} | 滑点={slippage_bps:.1f}bps | 信号→下单={signal_to_order_ms}ms | 下单→成交={order_to_fill_ms}ms")

    # 7. Protective orders (SL full size + TP1 half + TP2 half)
    close_side = "SELL" if direction == "LONG" else "BUY"
    half_qty = round(qty / 2, prec["qty"])
    other_half = round(qty - half_qty, prec["qty"])

    # SL - full size
    sl_data, sl_status = await place_stop_order(session, symbol, close_side, sl_price, qty, "STOP_MARKET")
    # Time the position spent without a stop-loss
    protection_gap_ms = int(time.time() * 1000 - t_fill)

    if sl_status != 200:
        logger.error(f"[{symbol}] ⚠️ SL挂单失败! 裸奔中! data={sl_data}")
        # TODO: automatic re-placement of the stop-loss

    # TP1 / TP2 - half each. A leg that rounds to zero (single-lot position)
    # is skipped instead of sending an order the exchange must reject.
    tp_type = "TAKE_PROFIT_MARKET"
    for label, tp_price, tp_qty in (("TP1", tp1_price, half_qty), ("TP2", tp2_price, other_half)):
        if tp_qty <= 0:
            continue
        tp_data, tp_status = await place_stop_order(session, symbol, close_side, tp_price, tp_qty, tp_type)
        if tp_status != 200:
            logger.warning(f"[{symbol}] ⚠️ {label}挂单失败: {tp_data}")

    logger.info(f"[{symbol}] 🛡 保护单已挂 | SL={sl_price} TP1={tp1_price}(半仓) TP2={tp2_price}(半仓) | 裸奔={protection_gap_ms}ms")

    # 8. Persist the trade
    with db_conn.cursor() as cur:
        cur.execute("""
            INSERT INTO live_trades (
                symbol, strategy, direction, entry_price, entry_ts,
                sl_price, tp1_price, tp2_price, score, tier, status,
                risk_distance, atr_at_entry, score_factors, signal_id,
                binance_order_id, fill_price, slippage_bps,
                protection_gap_ms, signal_to_order_ms, order_to_fill_ms
            ) VALUES (
                %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s, 'active',
                %s, %s, %s, %s,
                %s, %s, %s,
                %s, %s, %s
            ) RETURNING id
        """, (
            symbol, strategy, direction, fill_price, int(t_fill),
            sl_price, tp1_price, tp2_price, score, tier,
            risk_distance, atr, json.dumps(factors) if factors else None, signal_id,
            order_id, fill_price, round(slippage_bps, 2),
            protection_gap_ms, signal_to_order_ms, order_to_fill_ms,
        ))
        trade_id = cur.fetchone()[0]
    db_conn.commit()

    logger.info(f"[{symbol}] ✅ 实盘开仓完成 | trade_id={trade_id} | {direction} @ {fill_price} | score={score} | 策略={strategy}")
    return trade_id


# ============ Signal listening ============

def get_db_connection():
    """Open a PostgreSQL connection in autocommit mode."""
    conn = psycopg2.connect(**DB_CONFIG)
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    return conn


def fetch_pending_signals(conn):
    """Return recent signals that have not been traded yet.

    Selects signal_indicators rows from the last 60 seconds with a non-empty
    signal, belonging to an enabled strategy, for which no live_trades row
    with the same signal_id and strategy exists. Newest first.

    The 60-second window is evaluated with ``now()``, which PostgreSQL fixes
    at transaction start - *conn* should therefore not sit in a long-running
    transaction (main() uses an autocommit connection).
    """
    with conn.cursor() as cur:
        # Strategy names come from the environment: pass them as a bound
        # array parameter instead of interpolating them into the SQL text.
        cur.execute("""
            SELECT si.id, si.symbol, si.signal, si.score, si.ts, si.factors,
                   si.strategy, si.price
            FROM signal_indicators si
            WHERE si.signal IS NOT NULL
              AND si.signal != ''
              AND si.strategy = ANY(%s)
              AND si.ts > extract(epoch from now()) * 1000 - 60000
              AND NOT EXISTS (
                  SELECT 1 FROM live_trades lt
                  WHERE lt.signal_id = si.id AND lt.strategy = si.strategy
              )
            ORDER BY si.ts DESC
        """, (list(ENABLED_STRATEGIES),))
        rows = cur.fetchall()

    signals = []
    for row in rows:
        factors = row[5]
        if isinstance(factors, str):
            try:
                factors = json.loads(factors)
            except ValueError:
                # Malformed JSON: trade without factor details.
                factors = None
        signals.append({
            "signal_id": row[0],
            "symbol": row[1],
            "direction": row[2],
            "score": row[3],
            "signal_ts": row[4],
            "factors": factors,
            "strategy": row[6],
            "signal_price": float(row[7]) if row[7] else 0,
        })
    return signals


def enrich_signal_with_trade_params(signal: dict, conn):
    """Add ATR-based SL/TP levels to *signal* in place.

    Reads atr_5m and price for the signal's row and sets ``atr``,
    ``signal_price``, ``risk_distance``, ``sl_price``, ``tp1_price`` and
    ``tp2_price`` using the strategy's multipliers.

    Returns:
        True when the signal was enriched; False for an unknown strategy or
        when the row, its ATR or its price is missing.
    """
    symbol = signal["symbol"]
    strategy = signal["strategy"]
    direction = signal["direction"]

    cfg = _STRATEGY_PARAMS.get(strategy)
    if not cfg:
        logger.warning(f"[{symbol}] 未知策略: {strategy}")
        return False

    # ATR and price come from signal_indicators
    with conn.cursor() as cur:
        cur.execute("""
            SELECT atr_5m, price FROM signal_indicators
            WHERE id = %s
        """, (signal["signal_id"],))
        row = cur.fetchone()

    if not row or not row[0]:
        logger.warning(f"[{symbol}] signal_id={signal['signal_id']} 无ATR数据")
        return False
    if row[1] is None:
        logger.warning(f"[{symbol}] signal_id={signal['signal_id']} 无价格数据")
        return False

    atr = float(row[0])
    price = float(row[1])
    signal["atr"] = atr
    signal["signal_price"] = price

    # SL/TP distances use the ATR directly (no 0.7 scaling factor)
    risk_distance = cfg["sl_multiplier"] * atr
    signal["risk_distance"] = risk_distance

    if direction == "LONG":
        signal["sl_price"] = price - cfg["sl_multiplier"] * atr
        signal["tp1_price"] = price + cfg["tp1_multiplier"] * atr
        signal["tp2_price"] = price + cfg["tp2_multiplier"] * atr
    else:  # SHORT
        signal["sl_price"] = price + cfg["sl_multiplier"] * atr
        signal["tp1_price"] = price - cfg["tp1_multiplier"] * atr
        signal["tp2_price"] = price - cfg["tp2_multiplier"] * atr

    return True


# ============ Main loop ============

async def main():
    """Main loop: drain PG NOTIFY events and poll for pending signals.

    Pending signals are queried on every cycle (once per second), so NOTIFY
    only serves as a log of incoming events while polling is the fallback
    that guarantees pickup. Both database connections are closed on exit,
    including cancellation.
    """
    logger.info("=" * 60)
    logger.info(f"🚀 Live Executor 启动 | 环境={TRADE_ENV} | 策略={ENABLED_STRATEGIES}")
    logger.info(f" 风险/笔=${RISK_PER_TRADE_USD} | 最大持仓={MAX_POSITIONS}")
    logger.info(f" API端点={BASE_URL}")
    logger.info("=" * 60)

    load_api_keys()

    # API connectivity check
    async with aiohttp.ClientSession() as http_session:
        balance = await get_account_balance(http_session)
        logger.info(f"💰 账户余额: ${balance:.2f} USDT")

        if balance <= 0:
            logger.error("❌ 无法获取余额或余额为0,请检查API Key")
            return

    # Connection dedicated to LISTEN
    listen_conn = get_db_connection()
    work_conn = None
    try:
        with listen_conn.cursor() as cur:
            cur.execute("LISTEN new_signal;")
        logger.info("👂 已注册PG LISTEN new_signal")

        # Working connection. Autocommit on purpose: inside an open
        # transaction now() stays frozen (so the 60 s signal window would
        # never advance) and a single failed statement would leave every
        # later query failing with "transaction is aborted".
        work_conn = get_db_connection()

        async with aiohttp.ClientSession() as http_session:
            while True:
                try:
                    # Drain pending notifications (poll() does not block)
                    if listen_conn.poll() == psycopg2.extensions.POLL_OK:
                        while listen_conn.notifies:
                            notify = listen_conn.notifies.pop(0)
                            logger.info(f"📡 收到NOTIFY: {notify.payload}")

                    # Pending signals (NOTIFY + polling as double safety)
                    signals = fetch_pending_signals(work_conn)

                    for sig in signals:
                        # Add TP/SL parameters
                        if not enrich_signal_with_trade_params(sig, work_conn):
                            continue

                        logger.info(f"[{sig['symbol']}] 🎯 新信号: {sig['direction']} score={sig['score']} strategy={sig['strategy']}")

                        # Execute the entry
                        trade_id = await execute_entry(http_session, sig, work_conn)
                        if trade_id:
                            logger.info(f"[{sig['symbol']}] ✅ trade_id={trade_id} 开仓成功")

                    await asyncio.sleep(1)  # 1-second polling as fallback

                except KeyboardInterrupt:
                    logger.info("🛑 收到退出信号")
                    break
                except Exception as e:
                    logger.error(f"❌ 主循环异常: {e}", exc_info=True)
                    await asyncio.sleep(5)
    finally:
        # Also reached when asyncio.run() cancels this task on Ctrl-C.
        listen_conn.close()
        if work_conn is not None:
            work_conn.close()
        logger.info("Live Executor 已停止")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() re-raises after cancelling main(); cleanup already ran.
        logger.info("🛑 收到退出信号")