diff --git a/backend/live_executor.py b/backend/live_executor.py new file mode 100644 index 0000000..557f8c0 --- /dev/null +++ b/backend/live_executor.py @@ -0,0 +1,550 @@ +#!/usr/bin/env python3 +""" +Live Executor - 实盘交易执行模块 +监听PG NOTIFY接收新信号,调币安API执行交易 + +架构: + signal_engine.py → NOTIFY new_signal → live_executor.py → Binance API + +不影响模拟盘任何进程。 +""" + +import os +import sys +import json +import time +import logging +import asyncio +import hashlib +import hmac +from urllib.parse import urlencode + +import psycopg2 +import psycopg2.extensions +import aiohttp + +# ============ 配置 ============ + +# 环境:testnet / production +TRADE_ENV = os.getenv("TRADE_ENV", "testnet") + +# 币安API端点 +BINANCE_ENDPOINTS = { + "testnet": "https://testnet.binancefuture.com", + "production": "https://fapi.binance.com", +} +BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV] + +# 数据库 +DB_CONFIG = { + "host": os.getenv("DB_HOST", "10.106.0.3"), + "port": int(os.getenv("DB_PORT", "5432")), + "dbname": os.getenv("DB_NAME", "arb_engine"), + "user": os.getenv("DB_USER", "arb"), + "password": os.getenv("DB_PASSWORD", "arb_engine_2026"), +} + +# 策略 +ENABLED_STRATEGIES = json.loads(os.getenv("LIVE_STRATEGIES", '["v52_8signals"]')) + +# 风险参数 +RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2")) # 测试网$2=1R +MAX_POSITIONS = int(os.getenv("MAX_POSITIONS", "4")) +FEE_RATE = 0.0005 # Taker 0.05% + +# 币种精度(币安要求) +SYMBOL_PRECISION = { + "BTCUSDT": {"qty": 3, "price": 1, "min_notional": 100}, + "ETHUSDT": {"qty": 3, "price": 2, "min_notional": 20}, + "XRPUSDT": {"qty": 0, "price": 4, "min_notional": 5}, + "SOLUSDT": {"qty": 2, "price": 2, "min_notional": 5}, +} + +# 日志 +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) +logger = logging.getLogger("live-executor") + +# ============ API Key管理 ============ + +_api_key = None +_secret_key = None + + +def load_api_keys(): + """从GCP Secret Manager或环境变量加载API Key""" + global _api_key, _secret_key + + # 优先环境变量(本地开发/测试用) + _api_key = os.getenv("BINANCE_API_KEY") + _secret_key = os.getenv("BINANCE_SECRET_KEY") + + if _api_key and _secret_key: + logger.info(f"API Key loaded from env (first 10: {_api_key[:10]}...)") + return + + # 从GCP Secret Manager读取 + try: + from google.cloud import secretmanager + client = secretmanager.SecretManagerServiceClient() + project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737") + + prefix = "binance-testnet" if TRADE_ENV == "testnet" else "binance-live" + api_key_name = f"projects/{project}/secrets/{prefix}-api-key/versions/latest" + secret_key_name = f"projects/{project}/secrets/{prefix}-secret-key/versions/latest" + + _api_key = client.access_secret_version(name=api_key_name).payload.data.decode() + _secret_key = client.access_secret_version(name=secret_key_name).payload.data.decode() + logger.info(f"API Key loaded from GCP Secret Manager ({prefix}, first 10: {_api_key[:10]}...)") + except Exception as e: + logger.error(f"Failed to load API keys: {e}") + sys.exit(1) + + +# ============ 币安API工具 ============ + +def sign_params(params: dict) -> dict: + """HMAC SHA256签名""" + params["timestamp"] = int(time.time() * 1000) + query_string = urlencode(params) + signature = hmac.new( + _secret_key.encode(), query_string.encode(), hashlib.sha256 + ).hexdigest() + params["signature"] = signature + return params + + +async def binance_request(session: aiohttp.ClientSession, method: str, path: str, params: dict = None, signed: bool = True): + """发送币安API请求""" + url = f"{BASE_URL}{path}" + headers = {"X-MBX-APIKEY": _api_key} + + if params is None: + params = {} + if signed: + params = sign_params(params) + + try: + if method == "GET": + async with session.get(url, params=params, headers=headers) as resp: + data = await resp.json() + if resp.status != 200: + logger.error(f"Binance API error: {resp.status} {data}") + return data, resp.status + elif method == "POST": + async with session.post(url, params=params, headers=headers) as resp: + data = await resp.json() + if resp.status != 200: + logger.error(f"Binance API error: {resp.status} {data}") + return data, resp.status + elif method == "DELETE": + async with session.delete(url, params=params, headers=headers) as resp: + data = await resp.json() + if resp.status != 200: + logger.error(f"Binance API error: {resp.status} {data}") + return data, resp.status + except Exception as e: + logger.error(f"Binance request failed: {e}") + return {"error": str(e)}, 500 + + +# ============ 交易执行 ============ + +async def set_leverage_and_margin(session: aiohttp.ClientSession, symbol: str, leverage: int = 20): + """设置杠杆和逐仓模式""" + # 设置逐仓 + await binance_request(session, "POST", "/fapi/v1/marginType", { + "symbol": symbol, "marginType": "ISOLATED" + }) + # 设置杠杆 + await binance_request(session, "POST", "/fapi/v1/leverage", { + "symbol": symbol, "leverage": leverage + }) + logger.info(f"[{symbol}] 设置逐仓 + {leverage}x杠杆") + + +async def place_market_order(session: aiohttp.ClientSession, symbol: str, side: str, quantity: float): + """市价开仓""" + prec = SYMBOL_PRECISION.get(symbol, {"qty": 3}) + qty_str = f"{quantity:.{prec['qty']}f}" + + params = { + "symbol": symbol, + "side": side, + "type": "MARKET", + "quantity": qty_str, + } + + t_submit = time.time() * 1000 + data, status = await binance_request(session, "POST", "/fapi/v1/order", params) + t_ack = time.time() * 1000 + + if status == 200: + logger.info(f"[{symbol}] ✅ 市价{side} {qty_str}个 | orderId={data.get('orderId')} | 延迟={t_ack-t_submit:.0f}ms") + return data, status, t_submit, t_ack + + +async def place_stop_order(session: aiohttp.ClientSession, symbol: str, side: str, stop_price: float, quantity: float, order_type: str = "STOP_MARKET"): + """挂止损/止盈单""" + prec = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2}) + qty_str = f"{quantity:.{prec['qty']}f}" + price_str = f"{stop_price:.{prec['price']}f}" + + params = { + "symbol": symbol, + "side": side, + "type": order_type, + "stopPrice": price_str, + "quantity": qty_str, + "reduceOnly": "true", + } + + data, status = await binance_request(session, "POST", "/fapi/v1/order", params) + if status == 200: + logger.info(f"[{symbol}] 📌 挂{order_type} {side} @ {price_str} qty={qty_str} | orderId={data.get('orderId')}") + return data, status + + +async def cancel_all_orders(session: aiohttp.ClientSession, symbol: str): + """取消某币种所有挂单""" + data, status = await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol}) + if status == 200: + logger.info(f"[{symbol}] 🗑 已取消所有挂单") + return data, status + + +async def get_position(session: aiohttp.ClientSession, symbol: str): + """查询当前持仓""" + data, status = await binance_request(session, "GET", "/fapi/v2/positionRisk", {"symbol": symbol}) + if status == 200 and isinstance(data, list): + for pos in data: + if pos.get("symbol") == symbol and float(pos.get("positionAmt", 0)) != 0: + return pos + return None + + +async def get_account_balance(session: aiohttp.ClientSession): + """查询账户余额""" + data, status = await binance_request(session, "GET", "/fapi/v2/balance") + if status == 200 and isinstance(data, list): + for asset in data: + if asset.get("asset") == "USDT": + return float(asset.get("availableBalance", 0)) + return 0 + + +# ============ 核心:开仓流程 ============ + +async def execute_entry(session: aiohttp.ClientSession, signal: dict, db_conn): + """ + 完整开仓流程: + 1. 检查余额和持仓数 + 2. 计算仓位大小 + 3. 市价开仓 + 4. 挂SL + TP1 + TP2保护单 + 5. 写live_trades表 + 6. 记录延迟指标 + """ + symbol = signal["symbol"] + direction = signal["direction"] + score = signal["score"] + tier = signal.get("tier", "standard") + strategy = signal["strategy"] + risk_distance = signal["risk_distance"] + sl_price = signal["sl_price"] + tp1_price = signal["tp1_price"] + tp2_price = signal["tp2_price"] + atr = signal.get("atr", 0) + signal_ts = signal["signal_ts"] + signal_id = signal.get("signal_id") + factors = signal.get("factors") + + t_signal = signal_ts # 信号时间戳(ms) + + # 1. 检查余额 + balance = await get_account_balance(session) + if balance < RISK_PER_TRADE_USD * 2: + logger.warning(f"[{symbol}] ❌ 余额不足: ${balance:.2f} < ${RISK_PER_TRADE_USD * 2}") + return None + + # 2. 检查持仓数 + cur = db_conn.cursor() + cur.execute("SELECT count(*) FROM live_trades WHERE strategy=%s AND status='active'", (strategy,)) + active_count = cur.fetchone()[0] + if active_count >= MAX_POSITIONS: + logger.warning(f"[{symbol}] ❌ 已达最大持仓数 {MAX_POSITIONS}") + return None + + # 3. 检查是否已有同币种同方向持仓 + cur.execute("SELECT id FROM live_trades WHERE symbol=%s AND strategy=%s AND status='active'", (symbol, strategy)) + if cur.fetchone(): + logger.info(f"[{symbol}] ⏭ 已有活跃持仓,跳过") + return None + + # 4. 设置杠杆和逐仓 + await set_leverage_and_margin(session, symbol) + + # 5. 计算仓位 + # position_size(个) = risk_usd / risk_distance + qty = RISK_PER_TRADE_USD / risk_distance + prec = SYMBOL_PRECISION.get(symbol, {"qty": 3}) + qty = round(qty, prec["qty"]) + + # 检查最小名义值 + # 需要当前价来估算 + entry_side = "BUY" if direction == "LONG" else "SELL" + + # 6. 市价开仓 + t_before_order = time.time() * 1000 + order_data, order_status, t_submit, t_ack = await place_market_order(session, symbol, entry_side, qty) + + if order_status != 200: + logger.error(f"[{symbol}] ❌ 开仓失败: {order_data}") + return None + + # 解析成交信息 + order_id = str(order_data.get("orderId", "")) + fill_price = float(order_data.get("avgPrice", 0)) + if fill_price == 0: + fill_price = float(order_data.get("price", 0)) + t_fill = time.time() * 1000 + + signal_to_order_ms = int(t_submit - t_signal) + order_to_fill_ms = int(t_fill - t_submit) + + # 计算滑点 + signal_price = signal.get("signal_price", fill_price) + if signal_price > 0: + if direction == "LONG": + slippage_bps = (fill_price - signal_price) / signal_price * 10000 + else: + slippage_bps = (signal_price - fill_price) / signal_price * 10000 + else: + slippage_bps = 0 + + logger.info(f"[{symbol}] 📊 成交价={fill_price} | 信号价={signal_price} | 滑点={slippage_bps:.1f}bps | 信号→下单={signal_to_order_ms}ms | 下单→成交={order_to_fill_ms}ms") + + # 7. 挂保护单(SL + TP1半仓 + TP2半仓) + close_side = "SELL" if direction == "LONG" else "BUY" + half_qty = round(qty / 2, prec["qty"]) + other_half = round(qty - half_qty, prec["qty"]) + + # SL - 全仓 + t_before_sl = time.time() * 1000 + sl_data, sl_status = await place_stop_order(session, symbol, close_side, sl_price, qty, "STOP_MARKET") + t_after_sl = time.time() * 1000 + protection_gap_ms = int(t_after_sl - t_fill) + + if sl_status != 200: + logger.error(f"[{symbol}] ⚠️ SL挂单失败! 裸奔中! data={sl_data}") + # TODO: 自动补挂逻辑 + + # TP1 - 半仓 + tp1_type = "TAKE_PROFIT_MARKET" + await place_stop_order(session, symbol, close_side, tp1_price, half_qty, tp1_type) + + # TP2 - 半仓 + await place_stop_order(session, symbol, close_side, tp2_price, other_half, tp1_type) + + logger.info(f"[{symbol}] 🛡 保护单已挂 | SL={sl_price} TP1={tp1_price}(半仓) TP2={tp2_price}(半仓) | 裸奔={protection_gap_ms}ms") + + # 8. 写DB + cur.execute(""" + INSERT INTO live_trades ( + symbol, strategy, direction, entry_price, entry_ts, + sl_price, tp1_price, tp2_price, score, tier, status, + risk_distance, atr_at_entry, score_factors, signal_id, + binance_order_id, fill_price, slippage_bps, + protection_gap_ms, signal_to_order_ms, order_to_fill_ms + ) VALUES ( + %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, 'active', + %s, %s, %s, %s, + %s, %s, %s, + %s, %s, %s + ) RETURNING id + """, ( + symbol, strategy, direction, fill_price, int(t_fill), + sl_price, tp1_price, tp2_price, score, tier, + risk_distance, atr, json.dumps(factors) if factors else None, signal_id, + order_id, fill_price, round(slippage_bps, 2), + protection_gap_ms, signal_to_order_ms, order_to_fill_ms, + )) + trade_id = cur.fetchone()[0] + db_conn.commit() + + logger.info(f"[{symbol}] ✅ 实盘开仓完成 | trade_id={trade_id} | {direction} @ {fill_price} | score={score} | 策略={strategy}") + return trade_id + + +# ============ 信号监听 ============ + +def get_db_connection(): + """获取DB连接""" + conn = psycopg2.connect(**DB_CONFIG) + conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + return conn + + +def fetch_pending_signals(conn): + """查询未处理的新信号""" + cur = conn.cursor() + # 查最近60秒内的新信号(score>=阈值、策略匹配、未被live_executor处理过) + strategies_str = ",".join(f"'{s}'" for s in ENABLED_STRATEGIES) + cur.execute(f""" + SELECT si.id, si.symbol, si.signal, si.score, si.ts, si.factors, si.strategy, + si.price + FROM signal_indicators si + WHERE si.signal IS NOT NULL + AND si.signal != '' + AND si.strategy IN ({strategies_str}) + AND si.ts > extract(epoch from now()) * 1000 - 60000 + AND NOT EXISTS ( + SELECT 1 FROM live_trades lt + WHERE lt.signal_id = si.id AND lt.strategy = si.strategy + ) + ORDER BY si.ts DESC + """) + rows = cur.fetchall() + signals = [] + for row in rows: + factors = row[5] + if isinstance(factors, str): + try: + factors = json.loads(factors) + except: + factors = None + + signals.append({ + "signal_id": row[0], + "symbol": row[1], + "direction": row[2], + "score": row[3], + "signal_ts": row[4], + "factors": factors, + "strategy": row[6], + "signal_price": float(row[7]) if row[7] else 0, + }) + return signals + + +def enrich_signal_with_trade_params(signal: dict, conn): + """从策略配置计算TP/SL参数""" + symbol = signal["symbol"] + strategy = signal["strategy"] + direction = signal["direction"] + + # 读策略配置JSON + config_map = { + "v51_baseline": {"sl_multiplier": 1.4, "tp1_multiplier": 1.05, "tp2_multiplier": 2.1}, + "v52_8signals": {"sl_multiplier": 2.1, "tp1_multiplier": 1.4, "tp2_multiplier": 3.15}, + } + cfg = config_map.get(strategy) + if not cfg: + logger.warning(f"[{symbol}] 未知策略: {strategy}") + return False + + # 从signal_indicators读ATR和价格 + cur = conn.cursor() + cur.execute(""" + SELECT atr_5m, price FROM signal_indicators + WHERE id = %s + """, (signal["signal_id"],)) + row = cur.fetchone() + if not row or not row[0]: + logger.warning(f"[{symbol}] signal_id={signal['signal_id']} 无ATR数据") + return False + + atr = float(row[0]) + price = float(row[1]) + signal["atr"] = atr + signal["signal_price"] = price + + # 计算SL/TP(直接用ATR,不乘0.7) + risk_distance = cfg["sl_multiplier"] * atr + signal["risk_distance"] = risk_distance + + if direction == "LONG": + signal["sl_price"] = price - cfg["sl_multiplier"] * atr + signal["tp1_price"] = price + cfg["tp1_multiplier"] * atr + signal["tp2_price"] = price + cfg["tp2_multiplier"] * atr + else: # SHORT + signal["sl_price"] = price + cfg["sl_multiplier"] * atr + signal["tp1_price"] = price - cfg["tp1_multiplier"] * atr + signal["tp2_price"] = price - cfg["tp2_multiplier"] * atr + + return True + + +# ============ 主循环 ============ + +async def main(): + """主循环:监听PG NOTIFY + 定时轮询fallback""" + logger.info("=" * 60) + logger.info(f"🚀 Live Executor 启动 | 环境={TRADE_ENV} | 策略={ENABLED_STRATEGIES}") + logger.info(f" 风险/笔=${RISK_PER_TRADE_USD} | 最大持仓={MAX_POSITIONS}") + logger.info(f" API端点={BASE_URL}") + logger.info("=" * 60) + + load_api_keys() + + # 测试API连通性 + async with aiohttp.ClientSession() as http_session: + balance = await get_account_balance(http_session) + logger.info(f"💰 账户余额: ${balance:.2f} USDT") + + if balance <= 0: + logger.error("❌ 无法获取余额或余额为0,请检查API Key") + return + + # DB连接(用于LISTEN) + listen_conn = get_db_connection() + cur = listen_conn.cursor() + cur.execute("LISTEN new_signal;") + logger.info("👂 已注册PG LISTEN new_signal") + + # 工作DB连接 + work_conn = psycopg2.connect(**DB_CONFIG) + + async with aiohttp.ClientSession() as http_session: + while True: + try: + # 检查PG NOTIFY(非阻塞,超时1秒) + if listen_conn.poll() == psycopg2.extensions.POLL_OK: + while listen_conn.notifies: + notify = listen_conn.notifies.pop(0) + logger.info(f"📡 收到NOTIFY: {notify.payload}") + + # 获取待处理信号(NOTIFY + 轮询双保险) + signals = fetch_pending_signals(work_conn) + + for sig in signals: + # 补充TP/SL参数 + if not enrich_signal_with_trade_params(sig, work_conn): + continue + + logger.info(f"[{sig['symbol']}] 🎯 新信号: {sig['direction']} score={sig['score']} strategy={sig['strategy']}") + + # 执行开仓 + trade_id = await execute_entry(http_session, sig, work_conn) + if trade_id: + logger.info(f"[{sig['symbol']}] ✅ trade_id={trade_id} 开仓成功") + + await asyncio.sleep(1) # 1秒轮询作为fallback + + except KeyboardInterrupt: + logger.info("🛑 收到退出信号") + break + except Exception as e: + logger.error(f"❌ 主循环异常: {e}", exc_info=True) + await asyncio.sleep(5) + + listen_conn.close() + work_conn.close() + logger.info("Live Executor 已停止") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/backend/signal_engine.py b/backend/signal_engine.py index b974381..7605339 100644 --- a/backend/signal_engine.py +++ b/backend/signal_engine.py @@ -697,6 +697,9 @@ def save_indicator(ts: int, symbol: str, result: dict, strategy: str = "v52_8sig result["atr"], result["atr_pct"], result["vwap"], result["price"], result["p95"], result["p99"], result["score"], result.get("signal"), factors_json) ) + # 有信号时通知live_executor + if result.get("signal"): + cur.execute("NOTIFY new_signal, %s", (f"{symbol}:{strategy}:{result['signal']}:{result['score']}",)) conn.commit()