arbitrage-engine/backend/signal_engine.py

361 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
signal_engine.py — V5 短线交易信号引擎PostgreSQL版
架构:
- 独立PM2进程每5秒循环
- 内存滚动窗口计算指标CVD/ATR/VWAP/大单阈值)
- 启动时回灌历史数据冷启动warmup
- 信号评估核心3条件+加分3条件
- 输出signal_indicators表 + signal_trades表 + Discord推送
指标:
- CVD_fast (30m滚动) / CVD_mid (4h滚动) / CVD_day (UTC日内)
- ATR (5m, 14周期)
- VWAP_30m
- 大单阈值 P95/P99 (24h滚动)
"""
import logging
import os
import time
from collections import deque
from datetime import datetime, timezone
from typing import Optional
from db import get_sync_conn, init_schema
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler(os.path.join(os.path.dirname(__file__), "..", "signal-engine.log")),
],
)
logger = logging.getLogger("signal-engine")
SYMBOLS = ["BTCUSDT", "ETHUSDT"]
LOOP_INTERVAL = 15 # 秒从5改15CPU降60%,信号质量无影响)
# 窗口大小(毫秒)
WINDOW_FAST = 30 * 60 * 1000 # 30分钟
WINDOW_MID = 4 * 3600 * 1000 # 4小时
WINDOW_DAY = 24 * 3600 * 1000 # 24小时
WINDOW_VWAP = 30 * 60 * 1000 # 30分钟
# ATR参数
ATR_PERIOD_MS = 5 * 60 * 1000
ATR_LENGTH = 14
# 信号冷却
COOLDOWN_MS = 10 * 60 * 1000
# ─── 滚动窗口 ───────────────────────────────────────────────────
class TradeWindow:
def __init__(self, window_ms: int):
self.window_ms = window_ms
self.trades: deque = deque()
self.buy_vol = 0.0
self.sell_vol = 0.0
self.pq_sum = 0.0
self.q_sum = 0.0
def add(self, time_ms: int, qty: float, price: float, is_buyer_maker: int):
self.trades.append((time_ms, qty, price, is_buyer_maker))
pq = price * qty
self.pq_sum += pq
self.q_sum += qty
if is_buyer_maker == 0:
self.buy_vol += qty
else:
self.sell_vol += qty
def trim(self, now_ms: int):
cutoff = now_ms - self.window_ms
while self.trades and self.trades[0][0] < cutoff:
t_ms, qty, price, ibm = self.trades.popleft()
self.pq_sum -= price * qty
self.q_sum -= qty
if ibm == 0:
self.buy_vol -= qty
else:
self.sell_vol -= qty
@property
def cvd(self) -> float:
return self.buy_vol - self.sell_vol
@property
def vwap(self) -> float:
return self.pq_sum / self.q_sum if self.q_sum > 0 else 0.0
class ATRCalculator:
def __init__(self, period_ms: int = ATR_PERIOD_MS, length: int = ATR_LENGTH):
self.period_ms = period_ms
self.length = length
self.candles: deque = deque(maxlen=length + 1)
self.current_candle: Optional[dict] = None
self.atr_history: deque = deque(maxlen=288)
def update(self, time_ms: int, price: float):
bar_ms = (time_ms // self.period_ms) * self.period_ms
if self.current_candle is None or self.current_candle["bar"] != bar_ms:
if self.current_candle is not None:
self.candles.append(self.current_candle)
self.current_candle = {"bar": bar_ms, "open": price, "high": price, "low": price, "close": price}
else:
c = self.current_candle
c["high"] = max(c["high"], price)
c["low"] = min(c["low"], price)
c["close"] = price
@property
def atr(self) -> float:
if len(self.candles) < 2:
return 0.0
trs = []
candles_list = list(self.candles)
for i in range(1, len(candles_list)):
prev_close = candles_list[i-1]["close"]
c = candles_list[i]
tr = max(c["high"] - c["low"], abs(c["high"] - prev_close), abs(c["low"] - prev_close))
trs.append(tr)
if not trs:
return 0.0
atr_val = trs[0]
for tr in trs[1:]:
atr_val = (atr_val * (self.length - 1) + tr) / self.length
return atr_val
@property
def atr_percentile(self) -> float:
current = self.atr
if current == 0:
return 50.0
self.atr_history.append(current)
if len(self.atr_history) < 10:
return 50.0
sorted_hist = sorted(self.atr_history)
rank = sum(1 for x in sorted_hist if x <= current)
return (rank / len(sorted_hist)) * 100
class SymbolState:
def __init__(self, symbol: str):
self.symbol = symbol
self.win_fast = TradeWindow(WINDOW_FAST)
self.win_mid = TradeWindow(WINDOW_MID)
self.win_day = TradeWindow(WINDOW_DAY)
self.win_vwap = TradeWindow(WINDOW_VWAP)
self.atr_calc = ATRCalculator()
self.last_processed_id = 0
self.warmup = True
self.prev_cvd_fast = 0.0
self.last_signal_ts = 0
self.last_signal_dir = ""
self.recent_large_trades: deque = deque()
def process_trade(self, agg_id: int, time_ms: int, price: float, qty: float, is_buyer_maker: int):
now_ms = time_ms
self.win_fast.add(time_ms, qty, price, is_buyer_maker)
self.win_mid.add(time_ms, qty, price, is_buyer_maker)
self.win_day.add(time_ms, qty, price, is_buyer_maker)
self.win_vwap.add(time_ms, qty, price, is_buyer_maker)
self.atr_calc.update(time_ms, price)
self.win_fast.trim(now_ms)
self.win_mid.trim(now_ms)
self.win_day.trim(now_ms)
self.win_vwap.trim(now_ms)
self.last_processed_id = agg_id
def compute_p95_p99(self) -> tuple:
if len(self.win_day.trades) < 100:
return 5.0, 10.0
qtys = sorted([t[1] for t in self.win_day.trades])
n = len(qtys)
p95 = qtys[int(n * 0.95)]
p99 = qtys[int(n * 0.99)]
if "BTC" in self.symbol:
p95 = max(p95, 5.0); p99 = max(p99, 10.0)
else:
p95 = max(p95, 50.0); p99 = max(p99, 100.0)
return p95, p99
def update_large_trades(self, now_ms: int, p99: float):
cutoff = now_ms - 15 * 60 * 1000
while self.recent_large_trades and self.recent_large_trades[0][0] < cutoff:
self.recent_large_trades.popleft()
for t in self.win_fast.trades:
if t[1] >= p99 and t[0] > cutoff:
self.recent_large_trades.append((t[0], t[1], t[3]))
def evaluate_signal(self, now_ms: int) -> dict:
cvd_fast = self.win_fast.cvd
cvd_mid = self.win_mid.cvd
vwap = self.win_vwap.vwap
atr = self.atr_calc.atr
atr_pct = self.atr_calc.atr_percentile
p95, p99 = self.compute_p95_p99()
self.update_large_trades(now_ms, p99)
price = vwap if vwap > 0 else 0
cvd_fast_slope = cvd_fast - self.prev_cvd_fast
self.prev_cvd_fast = cvd_fast
result = {
"cvd_fast": cvd_fast, "cvd_mid": cvd_mid, "cvd_day": self.win_day.cvd,
"cvd_fast_slope": cvd_fast_slope,
"atr": atr, "atr_pct": atr_pct, "vwap": vwap, "price": price,
"p95": p95, "p99": p99, "signal": None, "direction": None, "score": 0,
}
if self.warmup or price == 0 or atr == 0:
return result
if now_ms - self.last_signal_ts < COOLDOWN_MS:
return result
long_core = cvd_fast > 0 and cvd_fast_slope > 0 and cvd_mid > 0 and price > vwap
short_core = cvd_fast < 0 and cvd_fast_slope < 0 and cvd_mid < 0 and price < vwap
if not long_core and not short_core:
return result
direction = "LONG" if long_core else "SHORT"
score = 0
if atr_pct > 60:
score += 25
has_adverse = any(
(direction == "LONG" and lt[2] == 1) or (direction == "SHORT" and lt[2] == 0)
for lt in self.recent_large_trades
)
if not has_adverse:
score += 20
result["signal"] = direction
result["direction"] = direction
result["score"] = score
self.last_signal_ts = now_ms
self.last_signal_dir = direction
return result
# ─── PG DB操作 ───────────────────────────────────────────────────
def load_historical(state: SymbolState, window_ms: int):
now_ms = int(time.time() * 1000)
start_ms = now_ms - window_ms
count = 0
with get_sync_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades "
"WHERE symbol = %s AND time_ms >= %s ORDER BY agg_id ASC",
(state.symbol, start_ms)
)
while True:
rows = cur.fetchmany(5000)
if not rows:
break
for r in rows:
state.process_trade(r[0], r[3], r[1], r[2], r[4])
count += 1
logger.info(f"[{state.symbol}] 冷启动完成: 加载{count:,}条历史数据 (窗口={window_ms//3600000}h)")
state.warmup = False
def fetch_new_trades(symbol: str, last_id: int) -> list:
with get_sync_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT agg_id, price, qty, time_ms, is_buyer_maker FROM agg_trades "
"WHERE symbol = %s AND agg_id > %s ORDER BY agg_id ASC LIMIT 10000",
(symbol, last_id)
)
return [{"agg_id": r[0], "price": r[1], "qty": r[2], "time_ms": r[3], "is_buyer_maker": r[4]}
for r in cur.fetchall()]
def save_indicator(ts: int, symbol: str, result: dict):
with get_sync_conn() as conn:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO signal_indicators "
"(ts,symbol,cvd_fast,cvd_mid,cvd_day,cvd_fast_slope,atr_5m,atr_percentile,vwap_30m,price,p95_qty,p99_qty,score,signal) "
"VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
(ts, symbol, result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["cvd_fast_slope"],
result["atr"], result["atr_pct"], result["vwap"], result["price"],
result["p95"], result["p99"], result["score"], result.get("signal"))
)
conn.commit()
def save_indicator_1m(ts: int, symbol: str, result: dict):
bar_ts = (ts // 60000) * 60000
with get_sync_conn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT id FROM signal_indicators_1m WHERE ts=%s AND symbol=%s", (bar_ts, symbol))
if cur.fetchone():
cur.execute(
"UPDATE signal_indicators_1m SET cvd_fast=%s,cvd_mid=%s,cvd_day=%s,atr_5m=%s,vwap_30m=%s,price=%s,score=%s,signal=%s WHERE ts=%s AND symbol=%s",
(result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["atr"], result["vwap"],
result["price"], result["score"], result.get("signal"), bar_ts, symbol)
)
else:
cur.execute(
"INSERT INTO signal_indicators_1m (ts,symbol,cvd_fast,cvd_mid,cvd_day,atr_5m,vwap_30m,price,score,signal) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
(bar_ts, symbol, result["cvd_fast"], result["cvd_mid"], result["cvd_day"], result["atr"],
result["vwap"], result["price"], result["score"], result.get("signal"))
)
conn.commit()
# ─── 主循环 ──────────────────────────────────────────────────────
def main():
init_schema()
states = {sym: SymbolState(sym) for sym in SYMBOLS}
for sym, state in states.items():
load_historical(state, WINDOW_MID)
logger.info("=== Signal Engine (PG) 启动完成 ===")
last_1m_save = {}
cycle = 0
while True:
try:
now_ms = int(time.time() * 1000)
for sym, state in states.items():
new_trades = fetch_new_trades(sym, state.last_processed_id)
for t in new_trades:
state.process_trade(t["agg_id"], t["time_ms"], t["price"], t["qty"], t["is_buyer_maker"])
result = state.evaluate_signal(now_ms)
save_indicator(now_ms, sym, result)
bar_1m = (now_ms // 60000) * 60000
if last_1m_save.get(sym) != bar_1m:
save_indicator_1m(now_ms, sym, result)
last_1m_save[sym] = bar_1m
if result.get("signal"):
logger.info(f"[{sym}] 🚨 信号: {result['signal']} score={result['score']} price={result['price']:.1f}")
cycle += 1
if cycle % 60 == 0:
for sym, state in states.items():
r = state.evaluate_signal(now_ms)
logger.info(f"[{sym}] 状态: CVD_fast={r['cvd_fast']:.1f} CVD_mid={r['cvd_mid']:.1f} ATR={r['atr']:.2f}({r['atr_pct']:.0f}%) VWAP={r['vwap']:.1f}")
except Exception as e:
logger.error(f"循环异常: {e}", exc_info=True)
time.sleep(LOOP_INTERVAL)
if __name__ == "__main__":
main()