arbitrage-engine/backend/market_data_collector.py
2026-03-31 08:56:11 +00:00

327 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import logging
import os
import time
from typing import Any
import aiohttp
import psycopg2
from psycopg2.extras import Json
# Instruments polled every round (Binance symbol notation).
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
# Target length of one collection round, in seconds.
INTERVAL_SECONDS = 300

# PostgreSQL connection settings; each can be overridden via the environment.
PG_HOST = os.environ.get("PG_HOST", "127.0.0.1")
PG_PORT = int(os.environ.get("PG_PORT", "5432"))
PG_DB = os.environ.get("PG_DB", "arb_engine")
PG_USER = os.environ.get("PG_USER", "arb")

# The password deliberately has no default. A missing or empty value aborts
# at import time instead of failing later at the first connection attempt.
PG_PASS = os.environ.get("PG_PASS")
if PG_PASS is None or PG_PASS == "":
    raise RuntimeError("PG_PASS 未设置,请在 .env 或环境变量中注入数据库密码")
# Schema DDL, executed by ensure_table() at the start of run_forever().
# Each indicator sample is one row keyed by (symbol, indicator_type,
# timestamp_ms). The indicator payload itself is stored as JSONB in `value`.
# NOTE(review): the UNIQUE constraint already implies an index on the same
# three columns, so idx_market_indicators_lookup is probably redundant.
# Confirm against real query plans before dropping it.
TABLE_SQL = """
CREATE TABLE IF NOT EXISTS market_indicators (
id SERIAL PRIMARY KEY,
symbol VARCHAR(20) NOT NULL,
indicator_type VARCHAR(50) NOT NULL,
timestamp_ms BIGINT NOT NULL,
value JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(symbol, indicator_type, timestamp_ms)
);
CREATE INDEX IF NOT EXISTS idx_market_indicators_lookup
ON market_indicators(symbol, indicator_type, timestamp_ms DESC);
"""
# Idempotent write. Storing the same (symbol, indicator_type, timestamp_ms)
# again replaces `value` instead of raising a unique violation.
# Parameters: (symbol, indicator_type, timestamp_ms, json_payload).
UPSERT_SQL = """
INSERT INTO market_indicators (symbol, indicator_type, timestamp_ms, value)
VALUES (%s, %s, %s, %s)
ON CONFLICT (symbol, indicator_type, timestamp_ms)
DO UPDATE SET value = EXCLUDED.value;
"""
# Root logging: INFO and above, formatted as "time [LEVEL] message".
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
# Named explicitly rather than via __name__, which would be "__main__" when
# this module is executed as a script.
logger = logging.getLogger("market_data_collector")
class MarketDataCollector:
    """Poll Binance/Coinbase market indicators and upsert them into PostgreSQL.

    Every indicator sample becomes one ``market_indicators`` row, keyed by
    ``(symbol, indicator_type, timestamp_ms)``, with the payload stored as JSONB.

    All database writes share one psycopg2 connection. They run synchronously
    inside the coroutines, with no ``await`` between execute and commit. That
    briefly blocks the event loop, but two writes can never interleave on the
    shared transaction.
    """

    # USD notional boundaries for the aggTrades CVD tiers.
    SMALL_TRADE_MAX_USD = 10_000
    WHALE_TRADE_MIN_USD = 100_000
    # Binance symbol -> Coinbase spot pair. Unmapped symbols skip the premium.
    COINBASE_PAIRS: dict[str, str] = {
        "BTCUSDT": "BTC-USD",
        "ETHUSDT": "ETH-USD",
    }
    # Total time budget for every HTTP request.
    HTTP_TIMEOUT = aiohttp.ClientTimeout(total=20)

    def __init__(self) -> None:
        self.conn = self._connect()

    @staticmethod
    def _connect() -> "psycopg2.extensions.connection":
        """Open a new connection that uses explicit (non-autocommit) transactions."""
        conn = psycopg2.connect(
            host=PG_HOST,
            port=PG_PORT,
            dbname=PG_DB,
            user=PG_USER,
            password=PG_PASS,
        )
        conn.autocommit = False
        return conn

    def close(self) -> None:
        """Close the DB connection. Best-effort: never raises."""
        try:
            self.conn.close()
        except Exception:
            # Shutdown path: a failing close must not mask whatever ended the run.
            logger.debug("error while closing DB connection", exc_info=True)

    def _rollback_quietly(self) -> None:
        """Roll back the current transaction, logging (not raising) on failure."""
        try:
            self.conn.rollback()
        except psycopg2.Error:
            # Typically a broken connection. _execute_and_commit reconnects
            # on its next call.
            logger.warning("DB rollback failed", exc_info=True)

    def _execute_and_commit(self, sql: str, params: tuple[Any, ...] | None = None) -> None:
        """Run one statement in its own transaction.

        If the previous connection has been closed or broken (for example, the
        server restarted), a new one is opened first.

        Raises:
            psycopg2.Error: the statement, commit, or reconnect failed. The
                transaction is rolled back before the error propagates.
        """
        if self.conn.closed:
            logger.warning("DB connection closed, reconnecting")
            self.conn = self._connect()
        try:
            with self.conn.cursor() as cur:
                cur.execute(sql, params)
            self.conn.commit()
        except Exception:
            # Without a rollback, psycopg2 keeps the connection in an aborted
            # transaction, and every later statement fails too.
            self._rollback_quietly()
            raise

    def ensure_table(self) -> None:
        """Create the market_indicators table and lookup index if missing."""
        self._execute_and_commit(TABLE_SQL)
        logger.info("market_indicators table ensured")

    def save_indicator(self, symbol: str, indicator_type: str, timestamp_ms: int, payload: Any) -> None:
        """Upsert one indicator sample; ``payload`` is stored as JSONB."""
        self._execute_and_commit(UPSERT_SQL, (symbol, indicator_type, timestamp_ms, Json(payload)))

    async def fetch_json(self, session: aiohttp.ClientSession, url: str, params: dict[str, Any] | None = None) -> Any:
        """GET ``url`` and return the decoded JSON body.

        Raises:
            RuntimeError: the response status is not 200. The message includes
                the status, the URL, and the first 200 chars of the body.
        """
        async with session.get(url, params=params, timeout=self.HTTP_TIMEOUT) as resp:
            if resp.status != 200:
                text = await resp.text()
                raise RuntimeError(f"HTTP {resp.status} {url} {text[:200]}")
            return await resp.json()

    async def _collect_latest_period_stat(
        self,
        session: aiohttp.ClientSession,
        symbol: str,
        endpoint: str,
        indicator_type: str,
    ) -> None:
        """Store the newest 5m bucket returned by a Binance futures/data endpoint.

        Raises:
            RuntimeError: the endpoint returned an empty list.
        """
        data = await self.fetch_json(session, endpoint, {"symbol": symbol, "period": "5m", "limit": 1})
        if not data:
            raise RuntimeError("empty response")
        item = data[0]
        # Prefer the exchange's bucket timestamp. Re-polling the same bucket
        # then upserts the existing row instead of adding a new one.
        ts = int(item.get("timestamp") or int(time.time() * 1000))
        self.save_indicator(symbol, indicator_type, ts, item)

    async def collect_long_short_ratio(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Global long/short account ratio (latest 5m bucket)."""
        await self._collect_latest_period_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/globalLongShortAccountRatio",
            "long_short_ratio",
        )

    async def collect_top_trader_position(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Top-trader long/short position ratio (latest 5m bucket)."""
        await self._collect_latest_period_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/topLongShortPositionRatio",
            "top_trader_position",
        )

    async def collect_open_interest_hist(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Open-interest history (latest 5m bucket)."""
        await self._collect_latest_period_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/openInterestHist",
            "open_interest_hist",
        )

    async def collect_coinbase_premium(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the Coinbase spot premium over Binance spot, in percent.

        premium_pct = (coinbase - binance) / binance * 100. The Binance quote
        is USDT-denominated and the Coinbase quote USD-denominated. Symbols
        without a Coinbase pair in COINBASE_PAIRS return without storing anything.
        """
        coinbase_pair = self.COINBASE_PAIRS.get(symbol)
        if not coinbase_pair:
            return  # no Coinbase pair mapped (e.g. XRP/SOL): skip
        binance_url = "https://api.binance.com/api/v3/ticker/price"
        coinbase_url = f"https://api.coinbase.com/v2/prices/{coinbase_pair}/spot"
        # Fetch both quotes concurrently to minimise the time skew between them.
        binance_data, coinbase_data = await asyncio.gather(
            self.fetch_json(session, binance_url, {"symbol": symbol}),
            self.fetch_json(session, coinbase_url),
        )
        binance_price = float(binance_data["price"])
        coinbase_price = float(coinbase_data["data"]["amount"])
        premium_pct = (coinbase_price - binance_price) / binance_price * 100.0
        ts = int(time.time() * 1000)
        payload = {
            "symbol": symbol,
            "coinbase_pair": coinbase_pair,
            "binance": binance_data,
            "coinbase": coinbase_data,
            "premium_pct": premium_pct,
        }
        self.save_indicator(symbol, "coinbase_premium", ts, payload)

    async def collect_funding_rate(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the perp's mark/index price and latest funding rate.

        Raises:
            RuntimeError: the endpoint returned an empty body.
        """
        endpoint = "https://fapi.binance.com/fapi/v1/premiumIndex"
        data = await self.fetch_json(session, endpoint, {"symbol": symbol})
        if not data:
            raise RuntimeError("empty response")
        # premiumIndex returns a single object (not an array) when a symbol is given.
        item = data if isinstance(data, dict) else data[0]
        # Use the poll time as the key so every 5-minute poll stores a new row.
        ts = int(time.time() * 1000)
        payload = {
            "symbol": item.get("symbol"),
            "markPrice": item.get("markPrice"),
            "indexPrice": item.get("indexPrice"),
            "lastFundingRate": item.get("lastFundingRate"),
            "nextFundingTime": item.get("nextFundingTime"),
            "interestRate": item.get("interestRate"),
            "fundingRate": item.get("lastFundingRate"),  # compat: signal_engine reads 'fundingRate'
            "time": ts,
        }
        self.save_indicator(symbol, "funding_rate", ts, payload)

    async def collect_obi_depth(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Order-book imbalance over the top 10 levels (Phase 2 BTC gate-control feature).

        obi = (bid_vol - ask_vol) / (bid_vol + ask_vol), in [-1, 1].
        Positive means more resting bid volume (buy pressure); negative means
        more ask volume (sell pressure). The obi is 0.0 when the book is empty.
        The spread_bps is 0.0 unless both sides are quoted.
        """
        endpoint = "https://fapi.binance.com/fapi/v1/depth"
        data = await self.fetch_json(session, endpoint, {"symbol": symbol, "limit": 10})
        ts = int(time.time() * 1000)
        bids = data.get("bids", [])
        asks = data.get("asks", [])
        # Each level is [price, qty]; volume sums the quantities.
        bid_vol = sum(float(level[1]) for level in bids)
        ask_vol = sum(float(level[1]) for level in asks)
        total_vol = bid_vol + ask_vol
        obi = (bid_vol - ask_vol) / total_vol if total_vol > 0 else 0.0
        best_bid = float(bids[0][0]) if bids else 0.0
        best_ask = float(asks[0][0]) if asks else 0.0
        # A spread needs both sides. Otherwise a missing ask (0.0) would
        # produce a bogus -10000 bps.
        if best_bid > 0 and best_ask > 0:
            spread_bps = (best_ask - best_bid) / best_bid * 10000
        else:
            spread_bps = 0.0
        payload = {
            "symbol": symbol,
            "obi": round(obi, 6),
            "bid_vol_10": round(bid_vol, 4),
            "ask_vol_10": round(ask_vol, 4),
            "best_bid": best_bid,
            "best_ask": best_ask,
            "spread_bps": round(spread_bps, 3),
        }
        self.save_indicator(symbol, "obi_depth_10", ts, payload)

    async def collect_spot_perp_divergence(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Spot vs. perp mark-price divergence (Phase 2 BTC gate-control feature).

        divergence = (spot - mark) / mark. Positive means spot trades at a
        premium to the perp mark; negative means a discount. The divergence
        is 0.0 if the mark price is not positive.
        """
        spot_url = "https://api.binance.com/api/v3/ticker/price"
        perp_url = "https://fapi.binance.com/fapi/v1/premiumIndex"
        spot_data, perp_data = await asyncio.gather(
            self.fetch_json(session, spot_url, {"symbol": symbol}),
            self.fetch_json(session, perp_url, {"symbol": symbol}),
        )
        ts = int(time.time() * 1000)
        spot_price = float(spot_data["price"])
        mark_price = float(perp_data["markPrice"])
        index_price = float(perp_data.get("indexPrice", mark_price))
        divergence = (spot_price - mark_price) / mark_price if mark_price > 0 else 0.0
        payload = {
            "symbol": symbol,
            "spot_price": spot_price,
            "mark_price": mark_price,
            "index_price": index_price,
            "divergence": round(divergence, 8),
            "divergence_bps": round(divergence * 10000, 3),
        }
        self.save_indicator(symbol, "spot_perp_divergence", ts, payload)

    async def collect_tiered_cvd_whale(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Cumulative volume delta over the last 500 aggTrades, split by trade size.

        Tiers by USD notional (price * qty): small (< SMALL_TRADE_MAX_USD),
        medium (up to WHALE_TRADE_MIN_USD), whale (>= WHALE_TRADE_MIN_USD).
        For each tier, net_cvd = buy_usd - sell_usd (positive = net taker buying),
        and cvd_ratio = net_cvd / (buy_usd + sell_usd), or 0.0 with no volume.
        """
        endpoint = "https://fapi.binance.com/fapi/v1/aggTrades"
        data = await self.fetch_json(session, endpoint, {"symbol": symbol, "limit": 500})
        ts = int(time.time() * 1000)
        tiers = {
            "small": {"buy": 0.0, "sell": 0.0},
            "medium": {"buy": 0.0, "sell": 0.0},
            "whale": {"buy": 0.0, "sell": 0.0},
        }
        for trade in data:
            usd_val = float(trade["p"]) * float(trade["q"])
            # m=True: the buyer was the maker, i.e. the taker sold.
            side = "sell" if trade["m"] else "buy"
            if usd_val < self.SMALL_TRADE_MAX_USD:
                tier = "small"
            elif usd_val < self.WHALE_TRADE_MIN_USD:
                tier = "medium"
            else:
                tier = "whale"
            tiers[tier][side] += usd_val
        result = {}
        for name, t in tiers.items():
            buy, sell = t["buy"], t["sell"]
            net = buy - sell
            total = buy + sell
            result[name] = {
                "buy_usd": round(buy, 2),
                "sell_usd": round(sell, 2),
                "net_cvd": round(net, 2),
                "cvd_ratio": round(net / total, 4) if total > 0 else 0.0,
            }
        payload = {
            "symbol": symbol,
            "tiers": result,
            "whale_net_cvd": result["whale"]["net_cvd"],
            "whale_cvd_ratio": result["whale"]["cvd_ratio"],
        }
        self.save_indicator(symbol, "tiered_cvd_whale", ts, payload)

    async def collect_symbol(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Run every collector for ``symbol`` concurrently and log each outcome.

        A failing collector is logged and does not affect the others. This
        coroutine itself does not raise for collector failures.
        """
        jobs = [
            ("long_short_ratio", self.collect_long_short_ratio(session, symbol)),
            ("top_trader_position", self.collect_top_trader_position(session, symbol)),
            ("open_interest_hist", self.collect_open_interest_hist(session, symbol)),
            ("coinbase_premium", self.collect_coinbase_premium(session, symbol)),
            ("funding_rate", self.collect_funding_rate(session, symbol)),
            ("obi_depth_10", self.collect_obi_depth(session, symbol)),
            ("spot_perp_divergence", self.collect_spot_perp_divergence(session, symbol)),
            ("tiered_cvd_whale", self.collect_tiered_cvd_whale(session, symbol)),
        ]
        results = await asyncio.gather(*(coro for _, coro in jobs), return_exceptions=True)
        for (name, _), result in zip(jobs, results):
            # BaseException also covers a cancelled child (asyncio.CancelledError).
            if isinstance(result, BaseException):
                # Include the type name: e.g. aiohttp timeouts stringify to "".
                logger.error("[%s] %s failed: %s: %s", symbol, name, type(result).__name__, result)
            else:
                logger.info("[%s] %s collected", symbol, name)

    async def run_forever(self) -> None:
        """Ensure the schema, then run one collection round every INTERVAL_SECONDS."""
        self.ensure_table()
        headers = {"User-Agent": "ArbEngine/market-data-collector"}
        async with aiohttp.ClientSession(headers=headers) as session:
            while True:
                # Monotonic clock: wall-clock adjustments must not distort the cadence.
                started = time.monotonic()
                logger.info("start collection round")
                await asyncio.gather(*(self.collect_symbol(session, s) for s in SYMBOLS))
                elapsed = time.monotonic() - started
                # Subtract the round's own duration to keep a steady cadence,
                # but always sleep at least one second.
                sleep_for = max(1, INTERVAL_SECONDS - int(elapsed))
                logger.info("round done in %.2fs, sleep %ss", elapsed, sleep_for)
                await asyncio.sleep(sleep_for)
async def main() -> None:
    """Entry coroutine: run a collector until it stops, then release its DB connection."""
    service = MarketDataCollector()
    try:
        await service.run_forever()
    finally:
        # Runs on cancellation (e.g. Ctrl-C under asyncio.run) as well as on errors.
        service.close()
if __name__ == "__main__":
    # Script entry point. Ctrl-C cancels main(), whose finally-block closes the
    # DB connection, so all that remains here is to log a clean exit.
    entrypoint = main()
    try:
        asyncio.run(entrypoint)
    except KeyboardInterrupt:
        logger.info("stopped by keyboard interrupt")