arbitrage-engine/backend/market_data_collector.py
dev-worker 31e6e19ea6 fix: V3全面审阅修复 — 12项问题
P0-1: 风控Fail-Closed(状态文件缺失/过期/异常→拒绝开仓)
P0-2: 1R基准跨模块统一(position_sync+risk_guard从live_config动态读)
P0-3: close_all紧急全平校验返回值+二次验仓
P0-4: Coinbase Premium单位修复(premium_pct/100→比例值)
P1-3: 正向funding计入净PnL(不再只扣负值)
P1-4: 数据新鲜度检查落地(查signal_indicators最新ts)
P1-6: live表DDL补全到SCHEMA_SQL(live_config/live_events/live_trades)
P2-1: _get_risk_usd()加60秒缓存
P2-3: 模拟盘前端*200→从config动态算paper1R
P2-4: XRP/SOL跳过Coinbase Premium采集(无数据源)
P3-2: SQL参数化(fetch_pending_signals用ANY替代f-string)
额外: pnl_r公式修正(gross-fee+funding,funding正负都正确计入)
2026-03-02 17:28:23 +00:00

207 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import logging
import os
import time
from typing import Any
import aiohttp
import psycopg2
from psycopg2.extras import Json
# Symbols polled on every collection round.
SYMBOLS = [
    "BTCUSDT",
    "ETHUSDT",
    "XRPUSDT",
    "SOLUSDT",
]

# Target duration of one collection round, in seconds.
INTERVAL_SECONDS = 300

# PostgreSQL connection settings; every value can be overridden through the
# environment.
PG_HOST = os.environ.get("PG_HOST", "127.0.0.1")
PG_PORT = int(os.environ.get("PG_PORT", "5432"))
PG_DB = os.environ.get("PG_DB", "arb_engine")
PG_USER = os.environ.get("PG_USER", "arb")
# NOTE(review): a default password is committed in source here. Prefer
# requiring PG_PASS from the environment in production deployments.
PG_PASS = os.environ.get("PG_PASS", "arb_engine_2026")

# DDL executed at startup: one row per (symbol, indicator_type, timestamp_ms),
# with the raw payload stored as JSONB.
TABLE_SQL = """
CREATE TABLE IF NOT EXISTS market_indicators (
id SERIAL PRIMARY KEY,
symbol VARCHAR(20) NOT NULL,
indicator_type VARCHAR(50) NOT NULL,
timestamp_ms BIGINT NOT NULL,
value JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(symbol, indicator_type, timestamp_ms)
);
CREATE INDEX IF NOT EXISTS idx_market_indicators_lookup
ON market_indicators(symbol, indicator_type, timestamp_ms DESC);
"""

# Insert a sample, or overwrite the stored payload when the same
# (symbol, indicator_type, timestamp_ms) key already exists.
UPSERT_SQL = """
INSERT INTO market_indicators (symbol, indicator_type, timestamp_ms, value)
VALUES (%s, %s, %s, %s)
ON CONFLICT (symbol, indicator_type, timestamp_ms)
DO UPDATE SET value = EXCLUDED.value;
"""

# Root logging is configured at import time; this module is run as a script.
logging.basicConfig(format="%(asctime)s [%(levelname)s] %(message)s", level=logging.INFO)
logger = logging.getLogger("market_data_collector")
class MarketDataCollector:
    """Poll market indicators for every symbol and persist them in PostgreSQL.

    Each collection round fetches, per symbol, the Binance futures long/short
    ratio, top-trader position ratio, open-interest history and funding rate,
    plus the Coinbase premium for the symbols that have a Coinbase pair. Every
    sample is upserted into the ``market_indicators`` table.

    NOTE: psycopg2 calls are synchronous, so each write blocks the event loop
    for the duration of the database round trip.
    """

    # Binance symbol -> Coinbase spot pair. Symbols missing from this map have
    # no Coinbase data source and are skipped for the premium indicator.
    _COINBASE_PAIRS = {
        "BTCUSDT": "BTC-USD",
        "ETHUSDT": "ETH-USD",
    }

    def __init__(self) -> None:
        """Open the PostgreSQL connection described by the PG_* settings."""
        self.conn = self._connect()

    @staticmethod
    def _connect() -> Any:
        """Return a new psycopg2 connection with autocommit disabled."""
        conn = psycopg2.connect(
            host=PG_HOST,
            port=PG_PORT,
            dbname=PG_DB,
            user=PG_USER,
            password=PG_PASS,
        )
        conn.autocommit = False
        return conn

    def close(self) -> None:
        """Close the database connection; never raises."""
        try:
            self.conn.close()
        except Exception as exc:
            # Shutdown is best-effort, but leave a trace instead of hiding it.
            logger.warning("failed to close database connection: %s", exc)

    def _rollback_quietly(self) -> None:
        """Roll back the current transaction, logging (not raising) on failure."""
        try:
            self.conn.rollback()
        except Exception as exc:
            # Typically the connection itself is gone; the next write reconnects.
            logger.warning("rollback failed: %s", exc)

    def _execute_and_commit(self, sql: str, params: Any = None) -> None:
        """Run one statement in its own transaction.

        Reconnects first when the client knows the connection is closed or
        broken. On any failure the transaction is rolled back before the
        exception is re-raised; without that, a single failed statement would
        leave the connection in an aborted transaction and make every later
        write fail too.
        """
        if self.conn.closed:
            logger.warning("database connection is closed, reconnecting")
            self.conn = self._connect()
        try:
            with self.conn.cursor() as cur:
                cur.execute(sql, params)
            self.conn.commit()
        except Exception:
            self._rollback_quietly()
            raise

    def ensure_table(self) -> None:
        """Create the ``market_indicators`` table and its index if missing."""
        self._execute_and_commit(TABLE_SQL)
        logger.info("market_indicators table ensured")

    def save_indicator(self, symbol: str, indicator_type: str, timestamp_ms: int, payload: Any) -> None:
        """Upsert one indicator sample.

        Args:
            symbol: Trading symbol the sample belongs to.
            indicator_type: Indicator name stored in ``indicator_type``.
            timestamp_ms: Sample timestamp in milliseconds; part of the
                unique key together with ``symbol`` and ``indicator_type``.
            payload: JSON-serialisable value stored in the ``value`` column.

        Raises:
            Exception: Whatever the database driver raises; the transaction
                is rolled back first.
        """
        self._execute_and_commit(UPSERT_SQL, (symbol, indicator_type, timestamp_ms, Json(payload)))

    async def fetch_json(self, session: aiohttp.ClientSession, url: str, params: dict[str, Any] | None = None) -> Any:
        """GET ``url`` and return the decoded JSON body.

        Raises:
            RuntimeError: If the response status is not 200. The message
                carries the status, the URL and the first 200 characters of
                the response body.
        """
        async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=20)) as resp:
            if resp.status != 200:
                text = await resp.text()
                raise RuntimeError(f"HTTP {resp.status} {url} {text[:200]}")
            return await resp.json()

    async def _collect_latest_futures_stat(
        self,
        session: aiohttp.ClientSession,
        symbol: str,
        endpoint: str,
        indicator_type: str,
    ) -> None:
        """Fetch the latest 5-minute record from ``endpoint`` and store it.

        Raises:
            RuntimeError: If the endpoint returns an empty response.
        """
        data = await self.fetch_json(session, endpoint, {"symbol": symbol, "period": "5m", "limit": 1})
        if not data:
            raise RuntimeError("empty response")
        item = data[0]
        # Fall back to the local clock when the record carries no timestamp.
        ts = int(item.get("timestamp") or int(time.time() * 1000))
        self.save_indicator(symbol, indicator_type, ts, item)

    async def collect_long_short_ratio(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the latest global long/short account ratio for ``symbol``."""
        await self._collect_latest_futures_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/globalLongShortAccountRatio",
            "long_short_ratio",
        )

    async def collect_top_trader_position(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the latest top-trader long/short position ratio for ``symbol``."""
        await self._collect_latest_futures_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/topLongShortPositionRatio",
            "top_trader_position",
        )

    async def collect_open_interest_hist(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the latest open-interest history record for ``symbol``."""
        await self._collect_latest_futures_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/openInterestHist",
            "open_interest_hist",
        )

    async def collect_coinbase_premium(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the Coinbase-vs-Binance spot premium for ``symbol``.

        ``premium_pct`` is ``(coinbase - binance) / binance * 100``. Symbols
        without a Coinbase pair are skipped silently.
        """
        coinbase_pair = self._COINBASE_PAIRS.get(symbol)
        if not coinbase_pair:
            return  # XRP/SOL have no Coinbase data source; skip.
        binance_url = "https://api.binance.com/api/v3/ticker/price"
        coinbase_url = f"https://api.coinbase.com/v2/prices/{coinbase_pair}/spot"
        # Fetch both quotes concurrently so the two prices are sampled as
        # close together in time as possible.
        binance_data, coinbase_data = await asyncio.gather(
            self.fetch_json(session, binance_url, {"symbol": symbol}),
            self.fetch_json(session, coinbase_url),
        )
        binance_price = float(binance_data["price"])
        coinbase_price = float(coinbase_data["data"]["amount"])
        premium_pct = (coinbase_price - binance_price) / binance_price * 100.0
        ts = int(time.time() * 1000)
        payload = {
            "symbol": symbol,
            "coinbase_pair": coinbase_pair,
            "binance": binance_data,
            "coinbase": coinbase_data,
            "premium_pct": premium_pct,
        }
        self.save_indicator(symbol, "coinbase_premium", ts, payload)

    async def collect_funding_rate(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the current funding-rate snapshot for ``symbol``.

        Raises:
            RuntimeError: If the endpoint returns an empty response.
        """
        endpoint = "https://fapi.binance.com/fapi/v1/premiumIndex"
        data = await self.fetch_json(session, endpoint, {"symbol": symbol})
        if not data:
            raise RuntimeError("empty response")
        # premiumIndex returns a single object (not array)
        item = data if isinstance(data, dict) else data[0]
        # Use current time as timestamp so every 5-min poll stores a new row
        ts = int(time.time() * 1000)
        payload = {
            "symbol": item.get("symbol"),
            "markPrice": item.get("markPrice"),
            "indexPrice": item.get("indexPrice"),
            "lastFundingRate": item.get("lastFundingRate"),
            "nextFundingTime": item.get("nextFundingTime"),
            "interestRate": item.get("interestRate"),
            "fundingRate": item.get("lastFundingRate"),  # compat: signal_engine reads 'fundingRate'
            "time": ts,
        }
        self.save_indicator(symbol, "funding_rate", ts, payload)

    async def collect_symbol(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Run every applicable collector for ``symbol`` concurrently.

        A failing collector is logged and does not affect the others. The
        Coinbase premium collector is only scheduled for symbols that have a
        Coinbase pair, so skipped symbols are not reported as "collected".
        """
        collectors = [
            ("long_short_ratio", self.collect_long_short_ratio),
            ("top_trader_position", self.collect_top_trader_position),
            ("open_interest_hist", self.collect_open_interest_hist),
            ("coinbase_premium", self.collect_coinbase_premium),
            ("funding_rate", self.collect_funding_rate),
        ]
        if symbol not in self._COINBASE_PAIRS:
            collectors = [entry for entry in collectors if entry[0] != "coinbase_premium"]
        results = await asyncio.gather(
            *(collect(session, symbol) for _, collect in collectors),
            return_exceptions=True,
        )
        for (name, _), result in zip(collectors, results):
            # BaseException also covers CancelledError, which gather() can
            # hand back as a result and which is not an Exception subclass.
            if isinstance(result, BaseException):
                logger.error("[%s] %s failed: %s", symbol, name, result)
            else:
                logger.info("[%s] %s collected", symbol, name)

    async def run_forever(self) -> None:
        """Ensure the table exists, then collect all symbols in an endless loop.

        Each round is followed by a sleep that tops the round up to
        ``INTERVAL_SECONDS``, with a minimum sleep of one second.
        """
        self.ensure_table()
        headers = {"User-Agent": "ArbEngine/market-data-collector"}
        async with aiohttp.ClientSession(headers=headers) as session:
            while True:
                # Monotonic clock: wall-clock adjustments must not distort
                # the measured round duration.
                started = time.monotonic()
                logger.info("start collection round")
                await asyncio.gather(*(self.collect_symbol(session, s) for s in SYMBOLS))
                elapsed = time.monotonic() - started
                sleep_for = max(1, INTERVAL_SECONDS - int(elapsed))
                logger.info("round done in %.2fs, sleep %ss", elapsed, sleep_for)
                await asyncio.sleep(sleep_for)
async def main() -> None:
    """Build a collector, run it until interrupted, and always close it."""
    engine = MarketDataCollector()
    try:
        await engine.run_forever()
    finally:
        # Release the database connection even when the loop is cancelled.
        engine.close()
if __name__ == "__main__":
    # Script entry point. Ctrl-C is the expected way to stop the collector,
    # so log it instead of printing a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("stopped by keyboard interrupt")