P0-1: JWT_SECRET生产环境强制配置,测试环境保留默认 P0-2: DB密码生产环境强制从env读,测试环境保留fallback P0-3: SL三次失败→查真实持仓→reduceOnly平仓→校验结果→写event P0-4: TP1后SL重挂失败则不推进tp1_hit状态,continue等下轮重试 P0-5: 超时自动平仓用SYMBOL_QTY_PRECISION格式化+校验结果 P0-6: 同币种去重改为不区分策略(币安单向模式共享净仓位) P1-1: 手续费窗口entry_ts-200→+200(避免纳入开仓前成交) 额外: 模拟盘*200和实盘*2硬编码→从配置动态读取
701 lines
26 KiB
Python
701 lines
26 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Live Executor - 实盘交易执行模块
|
||
监听PG NOTIFY接收新信号,调币安API执行交易
|
||
|
||
架构:
|
||
signal_engine.py → NOTIFY new_signal → live_executor.py → Binance API
|
||
|
||
不影响模拟盘任何进程。
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
from pathlib import Path
|
||
try:
|
||
from dotenv import load_dotenv; load_dotenv(Path(__file__).parent / ".env")
|
||
except ImportError:
|
||
pass
|
||
import json
|
||
import time
|
||
import logging
|
||
import asyncio
|
||
import hashlib
|
||
import hmac
|
||
from urllib.parse import urlencode
|
||
|
||
import psycopg2
|
||
import psycopg2.extensions
|
||
import aiohttp
|
||
|
||
# ============ Configuration ============

# Trading environment: "testnet" or "production".
TRADE_ENV = os.getenv("TRADE_ENV", "testnet")

# Binance USD-M futures REST endpoints, keyed by environment name.
BINANCE_ENDPOINTS = {
    "testnet": "https://testnet.binancefuture.com",
    "production": "https://fapi.binance.com",
}
if TRADE_ENV not in BINANCE_ENDPOINTS:
    # Fail with an explicit message instead of a bare KeyError traceback,
    # mirroring the DB_PASSWORD check below.
    print(
        f"FATAL: TRADE_ENV={TRADE_ENV!r} is invalid, expected one of {sorted(BINANCE_ENDPOINTS)}",
        file=sys.stderr,
    )
    sys.exit(1)
BASE_URL = BINANCE_ENDPOINTS[TRADE_ENV]

# Database. The built-in fallback password applies to testnet only; any other
# environment must provide DB_PASSWORD explicitly or the process exits.
_DB_PASSWORD = os.getenv("DB_PASSWORD", "arb_engine_2026" if TRADE_ENV == "testnet" else "")
if not _DB_PASSWORD:
    print("FATAL: DB_PASSWORD 未设置(生产环境必须配置)", file=sys.stderr)
    sys.exit(1)

DB_CONFIG = {
    "host": os.getenv("DB_HOST", "10.106.0.3"),
    "port": int(os.getenv("DB_PORT", "5432")),
    "dbname": os.getenv("DB_NAME", "arb_engine"),
    "user": os.getenv("DB_USER", "arb"),
    "password": _DB_PASSWORD,
}

# Strategies this executor trades (JSON list in LIVE_STRATEGIES).
ENABLED_STRATEGIES = json.loads(os.getenv("LIVE_STRATEGIES", '["v52_8signals"]'))

# Risk parameters. These are start-up defaults; reload_live_config() overrides
# them from the live_config table.
RISK_PER_TRADE_USD = float(os.getenv("RISK_PER_TRADE_USD", "2"))
MAX_POSITIONS = int(os.getenv("MAX_POSITIONS", "4"))
FEE_RATE = 0.0005  # taker fee, 0.05%

# Cache of live_config rows and the time.time() of the last successful refresh.
_config_cache = {}
_config_cache_ts = 0
|
||
|
||
def reload_live_config(conn):
    """Refresh risk settings from the ``live_config`` table, at most once per 60 s.

    Reads every ``(key, value)`` row, merges it into ``_config_cache`` and
    updates the module globals ``RISK_PER_TRADE_USD`` and ``MAX_POSITIONS``.
    Keys that are absent from the table keep their current value. The update
    is all-or-nothing: if the query or value parsing fails, nothing changes,
    the connection is rolled back so it stays usable, and a warning is logged.

    Args:
        conn: an open DB-API connection (psycopg2 in this file).
    """
    global RISK_PER_TRADE_USD, MAX_POSITIONS, _config_cache, _config_cache_ts
    now = time.time()
    if now - _config_cache_ts < 60:
        return
    try:
        cur = conn.cursor()
        try:
            cur.execute("SELECT key, value FROM live_config")
            rows = cur.fetchall()
        finally:
            cur.close()

        # Parse into temporaries first so a bad value cannot leave the
        # globals half-updated.
        fresh = dict(_config_cache)
        for key, value in rows:
            fresh[key] = value
        risk_per_trade = float(fresh.get("risk_per_trade_usd", RISK_PER_TRADE_USD))
        max_positions = int(fresh.get("max_positions", MAX_POSITIONS))
    except Exception as e:
        # A failed statement leaves a non-autocommit connection in an aborted
        # transaction; roll back so later queries on it do not fail too.
        try:
            conn.rollback()
        except Exception:
            pass  # connection is already unusable; the caller will reconnect
        logger.warning(f"读取live_config失败: {e}")
        return

    _config_cache.update(fresh)
    RISK_PER_TRADE_USD = risk_per_trade
    MAX_POSITIONS = max_positions
    _config_cache_ts = now
    logger.info(f"📋 配置刷新: 1R=${RISK_PER_TRADE_USD} | 最大持仓={MAX_POSITIONS}")
|
||
|
||
# 币种精度(从共用配置导入)
|
||
from trade_config import SYMBOL_PRECISION
|
||
|
||
# Logging: console output via basicConfig plus a size-rotated log file.
from logging.handlers import RotatingFileHandler

_log_fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(level=logging.INFO, format=_log_fmt)
logger = logging.getLogger("live-executor")

# RotatingFileHandler opens its file immediately, so the directory must exist
# or the import of this module fails with FileNotFoundError.
os.makedirs("logs", exist_ok=True)
_fh = RotatingFileHandler("logs/live_executor.log", maxBytes=10 * 1024 * 1024, backupCount=5)
_fh.setFormatter(logging.Formatter(_log_fmt))
logger.addHandler(_fh)
|
||
|
||
# ============ API key management ============

# Populated by load_api_keys(); read by sign_params() and binance_request().
_api_key = None
_secret_key = None


def load_api_keys():
    """Load the Binance API key pair into the module globals.

    Environment variables ``BINANCE_API_KEY`` / ``BINANCE_SECRET_KEY`` take
    priority (local development and tests). If either is missing, both values
    are read from GCP Secret Manager, using the ``binance-testnet`` or
    ``binance-live`` secret prefix depending on ``TRADE_ENV``.

    Surrounding whitespace is stripped from both values: a trailing newline
    (easy to introduce when storing a secret) would otherwise corrupt the
    request header and the HMAC signature.

    Exits the process with status 1 if the keys cannot be loaded or are empty.
    """
    global _api_key, _secret_key

    # Environment variables first (local development / tests).
    _api_key = (os.getenv("BINANCE_API_KEY") or "").strip()
    _secret_key = (os.getenv("BINANCE_SECRET_KEY") or "").strip()

    if _api_key and _secret_key:
        logger.info(f"API Key loaded from env (first 10: {_api_key[:10]}...)")
        return

    # Otherwise read both values from GCP Secret Manager.
    try:
        from google.cloud import secretmanager

        client = secretmanager.SecretManagerServiceClient()
        project = os.getenv("GCP_PROJECT", "gen-lang-client-0835616737")

        prefix = "binance-testnet" if TRADE_ENV == "testnet" else "binance-live"
        api_key_name = f"projects/{project}/secrets/{prefix}-api-key/versions/latest"
        secret_key_name = f"projects/{project}/secrets/{prefix}-secret-key/versions/latest"

        _api_key = client.access_secret_version(name=api_key_name).payload.data.decode().strip()
        _secret_key = client.access_secret_version(name=secret_key_name).payload.data.decode().strip()
        if not (_api_key and _secret_key):
            raise ValueError(f"empty credential in Secret Manager ({prefix})")
        logger.info(f"API Key loaded from GCP Secret Manager ({prefix}, first 10: {_api_key[:10]}...)")
    except Exception as e:
        logger.error(f"Failed to load API keys: {e}")
        sys.exit(1)
|
||
|
||
|
||
# ============ 币安API工具 ============
|
||
|
||
def sign_params(params: dict) -> dict:
    """Return a signed copy of ``params`` (HMAC-SHA256).

    A fresh ``timestamp`` (ms since epoch) is set, the URL-encoded parameters
    are signed with the module's secret key, and the hex digest is stored
    under ``signature`` as the last entry.

    The input dict is not modified. Any ``signature`` already present is
    dropped before signing, so re-signing a previously signed dict (e.g. on a
    retry) yields a valid signature instead of one computed over the stale
    signature.

    Args:
        params: request parameters to sign.

    Returns:
        A new dict: ``params`` plus ``timestamp`` and ``signature``.
    """
    signed = {key: value for key, value in params.items() if key != "signature"}
    signed["timestamp"] = int(time.time() * 1000)
    query_string = urlencode(signed)
    signed["signature"] = hmac.new(
        _secret_key.encode(), query_string.encode(), hashlib.sha256
    ).hexdigest()
    return signed
|
||
|
||
|
||
async def binance_request(session: aiohttp.ClientSession, method: str, path: str, params: dict = None, signed: bool = True):
    """Send one request to the Binance REST API.

    Args:
        session: aiohttp client session used to send the request.
        method: "GET", "POST" or "DELETE".
        path: endpoint path appended to ``BASE_URL``.
        params: query parameters; the caller's dict is never modified.
        signed: when true, the parameters are signed with ``sign_params``.

    Returns:
        ``(data, status)``. ``data`` is the decoded JSON body; if the body is
        not valid JSON it is ``{"error": <raw text>}`` with the real HTTP
        status. Transport failures and unsupported methods are reported as
        ``({"error": ...}, 500)`` rather than raised, so callers can always
        unpack the result.
    """
    if method not in ("GET", "POST", "DELETE"):
        logger.error(f"Binance request failed: unsupported method {method}")
        return {"error": f"unsupported method {method}"}, 500

    url = f"{BASE_URL}{path}"
    headers = {"X-MBX-APIKEY": _api_key}

    # Work on a copy so signing never leaks timestamp/signature into the
    # caller's dict.
    query = dict(params) if params else {}
    if signed:
        query = sign_params(query)

    try:
        async with session.request(method, url, params=query, headers=headers) as resp:
            try:
                # content_type=None: decode JSON even when an error response
                # carries a non-JSON Content-Type header.
                data = await resp.json(content_type=None)
            except ValueError:
                # Body is not JSON (e.g. an HTML gateway error page); keep the
                # real status code and pass the raw text through.
                data = {"error": await resp.text()}
            if resp.status != 200:
                logger.error(f"Binance API error: {resp.status} {data}")
            return data, resp.status
    except Exception as e:
        logger.error(f"Binance request failed: {e}")
        return {"error": str(e)}, 500
|
||
|
||
|
||
# ============ 交易执行 ============
|
||
|
||
async def set_leverage_and_margin(session: aiohttp.ClientSession, symbol: str, leverage: int = 20):
    """Switch ``symbol`` to isolated margin and set its leverage.

    Both requests are always sent. Binance answers the margin-type request
    with error code -4046 ("No need to change margin type") when the symbol is
    already isolated; that is treated as success.

    Args:
        session: aiohttp client session.
        symbol: trading pair, e.g. "BTCUSDT".
        leverage: leverage multiplier to apply.

    Returns:
        True if both settings are in effect, False otherwise. Failures are
        logged, never raised.
    """
    margin_data, margin_status = await binance_request(session, "POST", "/fapi/v1/marginType", {
        "symbol": symbol, "marginType": "ISOLATED"
    })
    already_isolated = isinstance(margin_data, dict) and margin_data.get("code") == -4046
    margin_ok = margin_status == 200 or already_isolated

    leverage_data, leverage_status = await binance_request(session, "POST", "/fapi/v1/leverage", {
        "symbol": symbol, "leverage": leverage
    })
    leverage_ok = leverage_status == 200

    if margin_ok and leverage_ok:
        logger.info(f"[{symbol}] 设置逐仓 + {leverage}x杠杆")
    else:
        # Do not claim success when either request was rejected.
        logger.warning(
            f"[{symbol}] ⚠ margin/leverage setup incomplete: "
            f"marginType={margin_data} leverage={leverage_data}"
        )
    return margin_ok and leverage_ok
|
||
|
||
|
||
async def place_market_order(session: aiohttp.ClientSession, symbol: str, side: str, quantity: float):
    """Place a market order and measure the submit-to-ack latency.

    The order is sent with ``newOrderRespType=RESULT`` so the response carries
    the final fill (``avgPrice``, ``executedQty``). With the default ACK
    response type those fields are zero for a MARKET order, and the caller
    would record a fill price of 0.

    Args:
        session: aiohttp client session.
        symbol: trading pair, e.g. "BTCUSDT".
        side: "BUY" or "SELL".
        quantity: order size; formatted to the symbol's quantity precision
            from ``SYMBOL_PRECISION`` (3 decimals when the symbol is unknown).

    Returns:
        ``(data, status, t_submit, t_ack)`` where the timestamps are in ms.
    """
    precision = SYMBOL_PRECISION.get(symbol, {"qty": 3})
    qty_str = f"{quantity:.{precision['qty']}f}"

    params = {
        "symbol": symbol,
        "side": side,
        "type": "MARKET",
        "quantity": qty_str,
        "newOrderRespType": "RESULT",
    }

    t_submit = time.time() * 1000
    data, status = await binance_request(session, "POST", "/fapi/v1/order", params)
    t_ack = time.time() * 1000

    if status == 200:
        logger.info(f"[{symbol}] ✅ 市价{side} {qty_str}个 | orderId={data.get('orderId')} | 延迟={t_ack-t_submit:.0f}ms")
    return data, status, t_submit, t_ack
|
||
|
||
|
||
async def place_stop_order(session: aiohttp.ClientSession, symbol: str, side: str, stop_price: float, quantity: float, order_type: str = "STOP_MARKET"):
    """Place a reduce-only stop-loss / take-profit order.

    The order is first sent to the regular order endpoint. If Binance rejects
    it with HTTP 400 and error code -4120, the same payload is sent to the
    Algo Order endpoint instead.

    Args:
        session: aiohttp client session.
        symbol: trading pair.
        side: closing side, "BUY" or "SELL".
        stop_price: trigger price, formatted to the symbol's price precision.
        quantity: order size, formatted to the symbol's quantity precision.
        order_type: e.g. "STOP_MARKET" or "TAKE_PROFIT_MARKET".

    Returns:
        ``(data, status)`` of the last request that was sent.
    """
    precision = SYMBOL_PRECISION.get(symbol, {"qty": 3, "price": 2})
    qty_str = f"{quantity:.{precision['qty']}f}"
    price_str = f"{stop_price:.{precision['price']}f}"

    def build_payload():
        # A fresh dict per request: the payload handed to binance_request may
        # be modified during signing.
        return {
            "symbol": symbol,
            "side": side,
            "type": order_type,
            "stopPrice": price_str,
            "quantity": qty_str,
            "reduceOnly": "true",
        }

    # Regular endpoint first.
    data, status = await binance_request(session, "POST", "/fapi/v1/order", build_payload())

    # Error -4120: this order type is not accepted here; fall back to the
    # Algo Order API.
    # NOTE(review): confirm the algo endpoint path and parameter names against
    # the current Binance documentation.
    needs_algo = status == 400 and isinstance(data, dict) and data.get("code") == -4120
    if needs_algo:
        logger.info(f"[{symbol}] 传统endpoint不支持{order_type},切换Algo Order API")
        data, status = await binance_request(session, "POST", "/fapi/v1/algo/order", build_payload())

    if status == 200:
        logger.info(f"[{symbol}] 📌 挂{order_type} {side} @ {price_str} qty={qty_str} | orderId={data.get('orderId')}")
    return data, status
|
||
|
||
|
||
async def cancel_all_orders(session: aiohttp.ClientSession, symbol: str):
    """Cancel every open order on ``symbol``.

    Returns:
        ``(data, status)`` from the cancel request; success is logged.
    """
    result = await binance_request(session, "DELETE", "/fapi/v1/allOpenOrders", {"symbol": symbol})
    _, http_status = result
    if http_status == 200:
        logger.info(f"[{symbol}] 🗑 已取消所有挂单")
    return result
|
||
|
||
|
||
async def get_position(session: aiohttp.ClientSession, symbol: str):
    """Look up the current open position on ``symbol``.

    Returns:
        The first positionRisk entry for ``symbol`` whose ``positionAmt`` is
        non-zero, or None. None is also returned when the request fails or
        the response is not a list, so callers cannot tell "flat" apart from
        "query failed".
    """
    payload, http_status = await binance_request(
        session, "GET", "/fapi/v2/positionRisk", {"symbol": symbol}
    )
    if http_status != 200 or not isinstance(payload, list):
        return None

    open_entries = (
        entry
        for entry in payload
        if entry.get("symbol") == symbol and float(entry.get("positionAmt", 0)) != 0
    )
    return next(open_entries, None)
|
||
|
||
|
||
async def get_account_balance(session: aiohttp.ClientSession):
    """Return the available USDT balance of the futures account.

    Returns:
        ``availableBalance`` of the USDT asset as a float, or 0 when the
        request fails, the response is not a list, or no USDT entry exists.
    """
    payload, http_status = await binance_request(session, "GET", "/fapi/v2/balance")
    if http_status != 200 or not isinstance(payload, list):
        return 0

    usdt_entries = [entry for entry in payload if entry.get("asset") == "USDT"]
    if not usdt_entries:
        return 0
    return float(usdt_entries[0].get("availableBalance", 0))
|
||
|
||
|
||
# ============ 核心:开仓流程 ============
|
||
|
||
async def execute_entry(session: aiohttp.ClientSession, signal: dict, db_conn):
    """
    Run the complete entry flow for one enriched signal.

    Steps:
      0. Drop stale signals; honour the risk_guard state and emergency files.
      1. Check the available balance.
      2. Check the per-strategy position cap and the per-symbol dedup.
      3. Compute the position size, then set isolated margin + leverage.
      4. Market entry.
      5. Place SL for the full size (3 attempts in total). If SL cannot be
         placed, close the position with a reduce-only market order and abort.
      6. Place TP1 and TP2, half the size each.
      7. Insert the ``live_trades`` row (including latency metrics) and write
         an info event.

    Args:
        session: aiohttp client session for Binance requests.
        signal: signal dict; must contain symbol, direction, score, strategy,
            risk_distance, sl_price, tp1_price, tp2_price and signal_ts (ms).
        db_conn: open psycopg2 connection used for reads, the insert and events.

    Returns:
        The new ``live_trades.id`` on success, otherwise None.
    """
    symbol = signal["symbol"]
    direction = signal["direction"]
    score = signal["score"]
    tier = signal.get("tier", "standard")
    strategy = signal["strategy"]
    risk_distance = signal["risk_distance"]
    sl_price = signal["sl_price"]
    tp1_price = signal["tp1_price"]
    tp2_price = signal["tp2_price"]
    atr = signal.get("atr", 0)
    signal_ts = signal["signal_ts"]
    signal_id = signal.get("signal_id")
    factors = signal.get("factors")

    t_signal = signal_ts  # signal timestamp (ms)

    # Refresh risk settings (internally limited to one DB read per 60 s).
    reload_live_config(db_conn)

    # 0. Freshness check: abandon signals older than 2 seconds.
    SIGNAL_MAX_AGE_MS = 2000
    signal_age_ms = time.time() * 1000 - t_signal
    if signal_age_ms > SIGNAL_MAX_AGE_MS:
        logger.warning(f"[{symbol}] ❌ 信号过期: {signal_age_ms:.0f}ms > {SIGNAL_MAX_AGE_MS}ms,弃仓")
        _log_event(db_conn, "warn", "trade",
                   f"信号过期弃仓 {direction} {symbol} | age={signal_age_ms:.0f}ms | score={score}",
                   symbol, {"signal_age_ms": round(signal_age_ms), "score": score})
        return None

    # 0.5 Risk-control state, read from the file written by risk_guard.
    try:
        with open("/tmp/risk_guard_state.json") as f:
            risk_state = json.load(f)
        if risk_state.get("block_new_entries"):
            logger.warning(f"[{symbol}] ❌ 风控禁止新开仓: {risk_state.get('circuit_break_reason', '未知原因')}")
            return None
        if risk_state.get("reduce_only"):
            logger.warning(f"[{symbol}] ❌ 只减仓模式,拒绝开仓")
            return None
    except FileNotFoundError:
        pass  # risk_guard has not started yet; trading is allowed
    except Exception as e:
        logger.warning(f"[{symbol}] ⚠ 读取风控状态失败: {e},继续交易")

    # Emergency instruction file (written by the front end).
    try:
        with open("/tmp/risk_guard_emergency.json") as f:
            emergency = json.load(f)
        action = emergency.get("action")
        if action in ("close_all", "block_new"):
            logger.warning(f"[{symbol}] ❌ 紧急指令生效: {action},拒绝开仓")
            return None
    except FileNotFoundError:
        pass
    except Exception as e:
        # Still fail-open as before, but an unreadable emergency file must be
        # visible in the logs instead of being swallowed silently.
        logger.warning(f"[{symbol}] ⚠ failed to read emergency instruction file: {e}; continuing")

    # 1. Balance check.
    balance = await get_account_balance(session)
    if balance < RISK_PER_TRADE_USD * 2:
        logger.warning(f"[{symbol}] ❌ 余额不足: ${balance:.2f} < ${RISK_PER_TRADE_USD * 2}")
        return None

    # 2. Position-count cap for this strategy.
    cur = db_conn.cursor()
    cur.execute("SELECT count(*) FROM live_trades WHERE strategy=%s AND status='active'", (strategy,))
    active_count = cur.fetchone()[0]
    if active_count >= MAX_POSITIONS:
        logger.warning(f"[{symbol}] ❌ 已达最大持仓数 {MAX_POSITIONS}")
        return None

    # 3. Per-symbol dedup across ALL strategies: in Binance one-way mode every
    #    strategy shares the same net position on a symbol.
    cur.execute("SELECT id, strategy FROM live_trades WHERE symbol=%s AND status IN ('active', 'tp1_hit')", (symbol,))
    existing = cur.fetchone()
    if existing:
        logger.info(f"[{symbol}] ⏭ 已有活跃持仓(id={existing[0]}, strategy={existing[1]}),跳过")
        return None

    # 4. Position size: quantity (units) = risk_usd / risk_distance.
    #    Validated before any order-related API call is made.
    if risk_distance <= 0:
        logger.warning(f"[{symbol}] ❌ invalid risk_distance={risk_distance}, entry skipped")
        return None
    qty = RISK_PER_TRADE_USD / risk_distance
    prec = SYMBOL_PRECISION.get(symbol, {"qty": 3})
    qty = round(qty, prec["qty"])
    if qty <= 0:
        logger.warning(f"[{symbol}] ❌ quantity rounds to 0 at precision {prec['qty']}, entry skipped")
        return None

    # TODO: a minimum-notional check is not implemented (needs the current price).
    entry_side = "BUY" if direction == "LONG" else "SELL"

    # 5. Isolated margin + leverage.
    await set_leverage_and_margin(session, symbol)

    # 6. Market entry.
    order_data, order_status, t_submit, t_ack = await place_market_order(session, symbol, entry_side, qty)

    if order_status != 200:
        logger.error(f"[{symbol}] ❌ 开仓失败: {order_data}")
        return None

    # Parse the fill information.
    order_id = str(order_data.get("orderId", ""))
    fill_price = float(order_data.get("avgPrice", 0))
    if fill_price == 0:
        fill_price = float(order_data.get("price", 0))
    t_fill = time.time() * 1000

    signal_to_order_ms = int(t_submit - t_signal)
    order_to_fill_ms = int(t_fill - t_submit)

    # Slippage in basis points; positive means worse than the signal price.
    signal_price = signal.get("signal_price", fill_price)
    if signal_price > 0:
        if direction == "LONG":
            slippage_bps = (fill_price - signal_price) / signal_price * 10000
        else:
            slippage_bps = (signal_price - fill_price) / signal_price * 10000
    else:
        slippage_bps = 0

    logger.info(f"[{symbol}] 📊 成交价={fill_price} | 信号价={signal_price} | 滑点={slippage_bps:.1f}bps | 信号→下单={signal_to_order_ms}ms | 下单→成交={order_to_fill_ms}ms")

    # 7. Protective orders: SL for the full size, TP1 and TP2 for half each.
    close_side = "SELL" if direction == "LONG" else "BUY"
    half_qty = round(qty / 2, prec["qty"])
    other_half = round(qty - half_qty, prec["qty"])

    # SL, full size: one attempt plus two retries; if all three fail the
    # position is closed immediately.
    sl_data, sl_status = await place_stop_order(session, symbol, close_side, sl_price, qty, "STOP_MARKET")
    if sl_status != 200:
        for retry in range(2):
            logger.warning(f"[{symbol}] SL挂单重试 {retry+1}/2...")
            await asyncio.sleep(0.3)
            sl_data, sl_status = await place_stop_order(session, symbol, close_side, sl_price, qty, "STOP_MARKET")
            if sl_status == 200:
                break
    if sl_status != 200:
        logger.error(f"[{symbol}] ❌ SL 3次全部失败,紧急reduceOnly平仓! data={sl_data}")
        # Close using the real position size when it is available.
        # get_position() returns None both when the account is flat and when
        # the query itself failed, so in that case fall back to the quantity
        # just ordered. reduceOnly guarantees the close can never open a
        # reverse position; on a flat account Binance simply rejects it.
        emergency_pos = await get_position(session, symbol)
        if emergency_pos:
            emergency_amt = abs(float(emergency_pos.get("positionAmt", 0)))
        else:
            emergency_amt = qty
        emergency_prec = SYMBOL_PRECISION.get(symbol, {"qty": 3})
        emergency_qty_str = f"{emergency_amt:.{emergency_prec['qty']}f}"
        close_data, close_status = await binance_request(session, "POST", "/fapi/v1/order", {
            "symbol": symbol, "side": close_side, "type": "MARKET",
            "quantity": emergency_qty_str, "reduceOnly": "true",
        })
        if close_status == 200:
            logger.info(f"[{symbol}] ✅ 紧急reduceOnly平仓成功 qty={emergency_qty_str}")
            _log_event(db_conn, "critical", "trade", "SL挂单3次失败,已紧急reduceOnly平仓", symbol, {"sl_data": str(sl_data)})
        elif emergency_pos:
            logger.error(f"[{symbol}] ❌ 紧急平仓也失败! close_data={close_data}")
            _log_event(db_conn, "critical", "trade",
                       "SL失败后紧急平仓也失败,需人工介入", symbol,
                       {"sl_data": str(sl_data), "close_data": str(close_data)})
        else:
            # No position was reported and the reduce-only close was rejected:
            # consistent with the account being flat.
            logger.warning(f"[{symbol}] SL失败但币安已无持仓,无需平仓")
            _log_event(db_conn, "critical", "trade", "SL挂单3次失败,但币安无持仓", symbol,
                       {"sl_data": str(sl_data), "close_data": str(close_data)})
        return None
    t_after_sl = time.time() * 1000
    # Time the position spent without a stop-loss.
    protection_gap_ms = int(t_after_sl - t_fill)

    # TP1 and TP2, half the size each.
    tp_type = "TAKE_PROFIT_MARKET"
    tp1_data, tp1_status = await place_stop_order(session, symbol, close_side, tp1_price, half_qty, tp_type)
    tp2_data, tp2_status = await place_stop_order(session, symbol, close_side, tp2_price, other_half, tp_type)

    if tp1_status == 200 and tp2_status == 200:
        logger.info(f"[{symbol}] 🛡 保护单已挂 | SL={sl_price} TP1={tp1_price}(半仓) TP2={tp2_price}(半仓) | 裸奔={protection_gap_ms}ms")
    else:
        # The SL is in place, so the trade is kept, but a missing TP must not
        # be reported as placed.
        logger.warning(
            f"[{symbol}] ⚠ SL placed but TP placement incomplete | "
            f"TP1 status={tp1_status} TP2 status={tp2_status} | 裸奔={protection_gap_ms}ms"
        )
        _log_event(db_conn, "warn", "trade", "TP order placement failed after entry", symbol,
                   {"tp1_status": tp1_status, "tp1_data": str(tp1_data),
                    "tp2_status": tp2_status, "tp2_data": str(tp2_data)})

    # 8. Persist the trade.
    cur.execute("""
        INSERT INTO live_trades (
            symbol, strategy, direction, entry_price, entry_ts,
            sl_price, tp1_price, tp2_price, score, tier, status,
            risk_distance, atr_at_entry, score_factors, signal_id,
            binance_order_id, fill_price, slippage_bps,
            protection_gap_ms, signal_to_order_ms, order_to_fill_ms,
            qty
        ) VALUES (
            %s, %s, %s, %s, %s,
            %s, %s, %s, %s, %s, 'active',
            %s, %s, %s, %s,
            %s, %s, %s,
            %s, %s, %s,
            %s
        ) RETURNING id
    """, (
        symbol, strategy, direction, fill_price, int(t_fill),
        sl_price, tp1_price, tp2_price, score, tier,
        risk_distance, atr, json.dumps(factors) if factors else None, signal_id,
        order_id, fill_price, round(slippage_bps, 2),
        protection_gap_ms, signal_to_order_ms, order_to_fill_ms,
        qty,
    ))
    trade_id = cur.fetchone()[0]
    db_conn.commit()

    logger.info(f"[{symbol}] ✅ 实盘开仓完成 | trade_id={trade_id} | {direction} @ {fill_price} | score={score} | 策略={strategy}")

    # Event record for the new trade.
    _log_event(db_conn, "info", "trade", f"{direction} {symbol} @ {fill_price} | score={score} | 滑点={slippage_bps:.1f}bps", symbol,
               {"trade_id": trade_id, "score": score, "fill_price": fill_price, "slippage_bps": round(slippage_bps, 2)})

    return trade_id
|
||
|
||
|
||
# ============ 事件日志 ============
|
||
|
||
def _log_event(conn, level, category, message, symbol=None, detail=None):
    """Insert one row into ``live_events`` (best effort).

    Never raises: event logging must not break the trading flow. On failure
    a warning is logged and the connection is rolled back, so a failed insert
    does not leave a non-autocommit connection in an aborted transaction.

    Args:
        conn: open DB-API connection.
        level: severity string stored with the event.
        category: event category string.
        message: human-readable message.
        symbol: optional trading pair.
        detail: optional JSON-serialisable payload; stored as NULL when falsy.
    """
    try:
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO live_events (level, category, symbol, message, detail) VALUES (%s, %s, %s, %s, %s)",
            (level, category, symbol, message, json.dumps(detail) if detail else None)
        )
        conn.commit()
    except Exception as e:
        logger.warning(f"failed to write live_events row: {e}")
        try:
            conn.rollback()
        except Exception:
            pass  # connection is already unusable; nothing more to do here
|
||
|
||
|
||
# ============ 信号监听 ============
|
||
|
||
def get_db_connection():
    """Open a new PostgreSQL connection from ``DB_CONFIG`` in autocommit mode."""
    autocommit_level = psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
    connection = psycopg2.connect(**DB_CONFIG)
    connection.set_isolation_level(autocommit_level)
    return connection
|
||
|
||
|
||
def fetch_pending_signals(conn):
    """Return signals from the last 60 seconds that have no live trade yet.

    A signal qualifies when it has a non-empty ``signal`` value, belongs to
    one of ``ENABLED_STRATEGIES`` and no ``live_trades`` row references it for
    the same strategy. Newest signals come first.

    The time window uses ``statement_timestamp()`` rather than ``now()``:
    ``now()`` is fixed at transaction start, so on a long-lived non-autocommit
    connection the 60-second window would stop moving.

    The strategy list is passed as a bound parameter, not formatted into the
    SQL text.

    Args:
        conn: open psycopg2 connection.

    Returns:
        A list of dicts with keys signal_id, symbol, direction, score,
        signal_ts, factors, strategy and signal_price.
    """
    query = """
        SELECT si.id, si.symbol, si.signal, si.score, si.ts, si.factors, si.strategy,
               si.price
        FROM signal_indicators si
        WHERE si.signal IS NOT NULL
          AND si.signal != ''
          AND si.strategy = ANY(%s)
          AND si.ts > extract(epoch from statement_timestamp()) * 1000 - 60000
          AND NOT EXISTS (
              SELECT 1 FROM live_trades lt
              WHERE lt.signal_id = si.id AND lt.strategy = si.strategy
          )
        ORDER BY si.ts DESC
    """
    cur = conn.cursor()
    try:
        # psycopg2 adapts a Python list to a SQL array for ANY(...).
        cur.execute(query, (list(ENABLED_STRATEGIES),))
        rows = cur.fetchall()
    finally:
        cur.close()

    signals = []
    for signal_id, symbol, direction, score, ts, factors, strategy, price in rows:
        if isinstance(factors, str):
            try:
                factors = json.loads(factors)
            except ValueError:
                # Malformed factor JSON is not fatal; trade without factors.
                factors = None

        signals.append({
            "signal_id": signal_id,
            "symbol": symbol,
            "direction": direction,
            "score": score,
            "signal_ts": ts,
            "factors": factors,
            "strategy": strategy,
            "signal_price": float(price) if price else 0,
        })
    return signals
|
||
|
||
|
||
def enrich_signal_with_trade_params(signal: dict, conn):
    """Add ATR-based SL/TP prices and the risk distance to ``signal`` in place.

    Reads ``atr_5m`` and ``price`` of the signal's ``signal_indicators`` row
    and applies the per-strategy ATR multipliers:

        risk_distance = sl_multiplier * atr
        LONG : sl = price - risk_distance, tp = price + tp_multiplier * atr
        other: sl = price + risk_distance, tp = price - tp_multiplier * atr

    Sets the keys atr, signal_price, risk_distance, sl_price, tp1_price and
    tp2_price on ``signal``.

    Args:
        signal: dict with at least symbol, strategy, direction and signal_id.
        conn: open psycopg2 connection.

    Returns:
        True when the signal was enriched. False (with a warning) when the
        strategy is unknown, or the row, its ATR or its price is missing or
        not positive; ``signal`` is left untouched in that case.
    """
    symbol = signal["symbol"]
    strategy = signal["strategy"]
    direction = signal["direction"]

    # ATR multipliers per strategy.
    config_map = {
        "v51_baseline": {"sl_multiplier": 1.4, "tp1_multiplier": 1.05, "tp2_multiplier": 2.1},
        "v52_8signals": {"sl_multiplier": 2.1, "tp1_multiplier": 1.4, "tp2_multiplier": 3.15},
    }
    cfg = config_map.get(strategy)
    if not cfg:
        logger.warning(f"[{symbol}] 未知策略: {strategy}")
        return False

    # ATR and price recorded with the signal.
    cur = conn.cursor()
    try:
        cur.execute("""
            SELECT atr_5m, price FROM signal_indicators
            WHERE id = %s
        """, (signal["signal_id"],))
        row = cur.fetchone()
    finally:
        cur.close()
    if not row or not row[0]:
        logger.warning(f"[{symbol}] signal_id={signal['signal_id']} 无ATR数据")
        return False
    if row[1] is None:
        # float(None) would raise; a signal without a price cannot be traded.
        logger.warning(f"[{symbol}] signal_id={signal['signal_id']} has no price")
        return False

    atr = float(row[0])
    price = float(row[1])
    if atr <= 0 or price <= 0:
        logger.warning(f"[{symbol}] signal_id={signal['signal_id']} invalid atr={atr} price={price}")
        return False

    signal["atr"] = atr
    signal["signal_price"] = price

    # SL/TP distances use the raw ATR (no 0.7 scaling).
    risk_distance = cfg["sl_multiplier"] * atr
    signal["risk_distance"] = risk_distance

    if direction == "LONG":
        signal["sl_price"] = price - cfg["sl_multiplier"] * atr
        signal["tp1_price"] = price + cfg["tp1_multiplier"] * atr
        signal["tp2_price"] = price + cfg["tp2_multiplier"] * atr
    else:  # SHORT
        signal["sl_price"] = price + cfg["sl_multiplier"] * atr
        signal["tp1_price"] = price - cfg["tp1_multiplier"] * atr
        signal["tp2_price"] = price - cfg["tp2_multiplier"] * atr

    return True
|
||
|
||
|
||
# ============ 主循环 ============
|
||
|
||
async def main():
    """Main loop: react to PG NOTIFY, with a 1-second polling fallback.

    Start-up: load the API keys and verify connectivity by reading the
    account balance (returns immediately if it is not positive). Then open
    one autocommit connection for ``LISTEN new_signal`` and one working
    connection for queries and inserts.

    Each iteration drains pending notifications, fetches pending signals,
    enriches each with SL/TP parameters and runs ``execute_entry`` on it.
    When no notification arrived the loop sleeps 1 s. Errors inside an
    iteration are logged and followed by a 5 s pause.

    Both DB connections are re-established when they break, and both are
    closed when the loop ends for any reason.
    """
    logger.info("=" * 60)
    logger.info(f"🚀 Live Executor 启动 | 环境={TRADE_ENV} | 策略={ENABLED_STRATEGIES}")
    logger.info(f" 风险/笔=${RISK_PER_TRADE_USD} | 最大持仓={MAX_POSITIONS}")
    logger.info(f" API端点={BASE_URL}")
    logger.info("=" * 60)

    load_api_keys()

    # Connectivity check against the API.
    async with aiohttp.ClientSession() as http_session:
        balance = await get_account_balance(http_session)
        logger.info(f"💰 账户余额: ${balance:.2f} USDT")

        if balance <= 0:
            logger.error("❌ 无法获取余额或余额为0,请检查API Key")
            return

    def open_listen_conn():
        """Open an autocommit connection and subscribe it to new_signal."""
        conn = get_db_connection()
        conn.cursor().execute("LISTEN new_signal;")
        logger.info("👂 已注册PG LISTEN new_signal")
        return conn

    def ensure_db_conn(conn):
        """Return ``conn`` if it still answers a query, else a fresh connection."""
        try:
            conn.cursor().execute("SELECT 1")
            return conn
        except Exception:
            logger.warning("⚠️ DB连接断开,重连中...")
            try:
                conn.close()
            except Exception:
                pass
            return psycopg2.connect(**DB_CONFIG)

    # Dedicated connection for LISTEN, plus the working connection.
    listen_conn = open_listen_conn()
    work_conn = psycopg2.connect(**DB_CONFIG)

    try:
        async with aiohttp.ClientSession() as http_session:
            while True:
                try:
                    # Check for PG NOTIFY without blocking.
                    got_notify = False
                    try:
                        poll_state = listen_conn.poll()
                    except psycopg2.Error as e:
                        # A broken LISTEN connection would otherwise fail on
                        # every iteration forever; replace it. If reconnecting
                        # fails too, the outer handler pauses and the next
                        # iteration retries.
                        logger.warning(f"⚠️ LISTEN connection lost ({e}); reconnecting...")
                        try:
                            listen_conn.close()
                        except Exception:
                            pass
                        listen_conn = open_listen_conn()
                        poll_state = None
                    if poll_state == psycopg2.extensions.POLL_OK:
                        while listen_conn.notifies:
                            notify = listen_conn.notifies.pop(0)
                            logger.info(f"📡 收到NOTIFY: {notify.payload}")
                            got_notify = True

                    work_conn = ensure_db_conn(work_conn)

                    # Pending signals: NOTIFY and polling act as double cover.
                    signals = fetch_pending_signals(work_conn)

                    for sig in signals:
                        # Add SL/TP parameters; skip signals that cannot be enriched.
                        if not enrich_signal_with_trade_params(sig, work_conn):
                            continue

                        logger.info(f"[{sig['symbol']}] 🎯 新信号: {sig['direction']} score={sig['score']} strategy={sig['strategy']}")

                        # Execute the entry.
                        trade_id = await execute_entry(http_session, sig, work_conn)
                        if trade_id:
                            logger.info(f"[{sig['symbol']}] ✅ trade_id={trade_id} 开仓成功")

                    if not got_notify:
                        # Sleep only when idle; after a NOTIFY go straight to
                        # the next round.
                        await asyncio.sleep(1)

                except KeyboardInterrupt:
                    logger.info("🛑 收到退出信号")
                    break
                except Exception as e:
                    logger.error(f"❌ 主循环异常: {e}", exc_info=True)
                    await asyncio.sleep(5)
    finally:
        # Runs on a normal break and also when the task is cancelled or an
        # unexpected error escapes, so the connections are always released.
        for conn in (listen_conn, work_conn):
            try:
                conn.close()
            except Exception:
                pass
        logger.info("Live Executor 已停止")


if __name__ == "__main__":
    asyncio.run(main())
|