"""Periodic collector of crypto market indicators into PostgreSQL.

Every ``INTERVAL_SECONDS`` the collector polls Binance (spot and futures) and
Coinbase HTTP endpoints for each symbol in ``SYMBOLS`` and upserts one JSONB
row per (symbol, indicator_type, timestamp_ms) into ``market_indicators``.
"""

import asyncio
import json
import logging
import os
import time
from typing import Any

import aiohttp
import psycopg2
from psycopg2.extras import Json

SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
INTERVAL_SECONDS = 300

PG_HOST = os.getenv("PG_HOST", "10.106.0.3")
PG_PORT = int(os.getenv("PG_PORT", "5432"))
PG_DB = os.getenv("PG_DB", "arb_engine")
PG_USER = os.getenv("PG_USER", "arb")
PG_PASS = os.getenv("PG_PASS")
if not PG_PASS:
    # Fail fast at import time: there is no default database password.
    raise RuntimeError("PG_PASS 未设置,请在 .env 或环境变量中注入数据库密码")

TABLE_SQL = """
CREATE TABLE IF NOT EXISTS market_indicators (
    id SERIAL PRIMARY KEY,
    symbol VARCHAR(20) NOT NULL,
    indicator_type VARCHAR(50) NOT NULL,
    timestamp_ms BIGINT NOT NULL,
    value JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    UNIQUE(symbol, indicator_type, timestamp_ms)
);
CREATE INDEX IF NOT EXISTS idx_market_indicators_lookup
    ON market_indicators(symbol, indicator_type, timestamp_ms DESC);
"""

UPSERT_SQL = """
INSERT INTO market_indicators (symbol, indicator_type, timestamp_ms, value)
VALUES (%s, %s, %s, %s)
ON CONFLICT (symbol, indicator_type, timestamp_ms)
DO UPDATE SET value = EXCLUDED.value;
"""

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("market_data_collector")


class MarketDataCollector:
    """Fetch market indicators over HTTP and persist them to PostgreSQL.

    A single psycopg2 connection (``self.conn``) is opened in ``__init__`` and
    shared by every collector method.

    NOTE(review): the psycopg2 calls are synchronous, so each save runs
    inline inside the calling coroutine rather than being awaited.
    """

    def __init__(self) -> None:
        """Open the PostgreSQL connection using the module-level PG_* settings."""
        self.conn = psycopg2.connect(
            host=PG_HOST,
            port=PG_PORT,
            dbname=PG_DB,
            user=PG_USER,
            password=PG_PASS,
        )
        # Explicit transactions: every write is committed (or rolled back)
        # by _execute_and_commit.
        self.conn.autocommit = False

    def close(self) -> None:
        """Close the database connection; failures are logged, never raised."""
        try:
            self.conn.close()
        except Exception:
            # Best-effort shutdown: report the problem instead of hiding it.
            logger.warning("failed to close PostgreSQL connection", exc_info=True)

    def _execute_and_commit(self, sql: str, params: tuple[Any, ...] | None = None) -> None:
        """Run one statement in its own transaction.

        On any failure the transaction is rolled back before the error is
        re-raised. Without the rollback the shared connection would remain in
        an aborted transaction and every subsequent statement would fail.
        """
        try:
            with self.conn.cursor() as cur:
                cur.execute(sql, params)
            self.conn.commit()
        except Exception:
            try:
                self.conn.rollback()
            except psycopg2.Error:
                # E.g. the connection itself is gone; keep the original error.
                logger.warning("rollback after failed statement also failed", exc_info=True)
            raise

    def ensure_table(self) -> None:
        """Create the ``market_indicators`` table and lookup index if missing."""
        self._execute_and_commit(TABLE_SQL)
        logger.info("market_indicators table ensured")

    def save_indicator(self, symbol: str, indicator_type: str, timestamp_ms: int, payload: Any) -> None:
        """Upsert one indicator row.

        Args:
            symbol: Trading pair, e.g. ``"BTCUSDT"``.
            indicator_type: Indicator name stored in ``indicator_type``.
            timestamp_ms: Millisecond timestamp forming part of the unique key.
            payload: JSON-serialisable value stored in the JSONB column.

        Raises:
            Exception: whatever psycopg2 raised; the transaction has been
                rolled back so the connection stays usable.
        """
        self._execute_and_commit(
            UPSERT_SQL,
            (symbol, indicator_type, timestamp_ms, Json(payload)),
        )

    async def fetch_json(
        self,
        session: aiohttp.ClientSession,
        url: str,
        params: dict[str, Any] | None = None,
    ) -> Any:
        """GET ``url`` and return the decoded JSON body.

        Raises:
            RuntimeError: if the response status is not 200; the message
                carries the status, the URL and the first 200 characters of
                the body.
        """
        async with session.get(
            url,
            params=params,
            timeout=aiohttp.ClientTimeout(total=20),
        ) as resp:
            if resp.status != 200:
                text = await resp.text()
                raise RuntimeError(f"HTTP {resp.status} {url} {text[:200]}")
            return await resp.json()

    async def _collect_latest_stat(
        self,
        session: aiohttp.ClientSession,
        symbol: str,
        endpoint: str,
        indicator_type: str,
    ) -> None:
        """Fetch the latest 5m record from ``endpoint`` and store it as-is.

        Shared by the long/short ratio, top-trader position and open-interest
        collectors, which differ only in URL and indicator name.

        Raises:
            RuntimeError: if the endpoint returns an empty response.
        """
        data = await self.fetch_json(
            session,
            endpoint,
            {"symbol": symbol, "period": "5m", "limit": 1},
        )
        if not data:
            raise RuntimeError("empty response")
        item = data[0]
        # Use the record's own timestamp when present, otherwise the current time.
        ts = int(item.get("timestamp") or int(time.time() * 1000))
        self.save_indicator(symbol, indicator_type, ts, item)

    async def collect_long_short_ratio(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the latest global long/short account ratio record."""
        await self._collect_latest_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/globalLongShortAccountRatio",
            "long_short_ratio",
        )

    async def collect_top_trader_position(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the latest top-trader long/short position ratio record."""
        await self._collect_latest_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/topLongShortPositionRatio",
            "top_trader_position",
        )

    async def collect_open_interest_hist(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the latest open-interest history record."""
        await self._collect_latest_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/openInterestHist",
            "open_interest_hist",
        )

    async def collect_coinbase_premium(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the Coinbase-vs-Binance spot premium, in percent.

        premium_pct = (coinbase_price - binance_price) / binance_price * 100.
        Symbols without a Coinbase mapping are skipped silently.
        """
        pair_map = {
            "BTCUSDT": "BTC-USD",
            "ETHUSDT": "ETH-USD",
        }
        coinbase_pair = pair_map.get(symbol)
        if not coinbase_pair:
            return  # XRP/SOL have no Coinbase data here; skip.

        binance_url = "https://api.binance.com/api/v3/ticker/price"
        coinbase_url = f"https://api.coinbase.com/v2/prices/{coinbase_pair}/spot"
        # Fetch both quotes concurrently (as collect_spot_perp_divergence does)
        # so the two prices being compared are sampled as close together as possible.
        binance_data, coinbase_data = await asyncio.gather(
            self.fetch_json(session, binance_url, {"symbol": symbol}),
            self.fetch_json(session, coinbase_url),
        )

        binance_price = float(binance_data["price"])
        coinbase_price = float(coinbase_data["data"]["amount"])
        premium_pct = (coinbase_price - binance_price) / binance_price * 100.0

        ts = int(time.time() * 1000)
        payload = {
            "symbol": symbol,
            "coinbase_pair": coinbase_pair,
            "binance": binance_data,
            "coinbase": coinbase_data,
            "premium_pct": premium_pct,
        }
        self.save_indicator(symbol, "coinbase_premium", ts, payload)

    async def collect_funding_rate(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the current funding-rate snapshot from ``premiumIndex``.

        Raises:
            RuntimeError: if the endpoint returns an empty response.
        """
        endpoint = "https://fapi.binance.com/fapi/v1/premiumIndex"
        data = await self.fetch_json(session, endpoint, {"symbol": symbol})
        if not data:
            raise RuntimeError("empty response")

        # premiumIndex returns a single object (not array)
        item = data if isinstance(data, dict) else data[0]
        # Use current time as timestamp so every 5-min poll stores a new row
        ts = int(time.time() * 1000)
        payload = {
            "symbol": item.get("symbol"),
            "markPrice": item.get("markPrice"),
            "indexPrice": item.get("indexPrice"),
            "lastFundingRate": item.get("lastFundingRate"),
            "nextFundingTime": item.get("nextFundingTime"),
            "interestRate": item.get("interestRate"),
            "fundingRate": item.get("lastFundingRate"),  # compat: signal_engine reads 'fundingRate'
            "time": ts,
        }
        self.save_indicator(symbol, "funding_rate", ts, payload)

    async def collect_obi_depth(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Collect order-book imbalance (OBI) — core Phase 2 BTC gate-control feature.

        OBI = (bid_vol - ask_vol) / (bid_vol + ask_vol), range [-1, 1].
        Positive means stronger buy pressure, negative stronger sell pressure.
        An empty side yields 0.0 for the values derived from it.
        """
        endpoint = "https://fapi.binance.com/fapi/v1/depth"
        data = await self.fetch_json(session, endpoint, {"symbol": symbol, "limit": 10})
        ts = int(time.time() * 1000)

        bids = data.get("bids", [])
        asks = data.get("asks", [])
        # Each level is indexed as [price, quantity].
        bid_vol = sum(float(b[1]) for b in bids)
        ask_vol = sum(float(a[1]) for a in asks)
        total_vol = bid_vol + ask_vol
        obi = (bid_vol - ask_vol) / total_vol if total_vol > 0 else 0.0

        best_bid = float(bids[0][0]) if bids else 0.0
        best_ask = float(asks[0][0]) if asks else 0.0
        spread_bps = ((best_ask - best_bid) / best_bid * 10000) if best_bid > 0 else 0.0

        payload = {
            "symbol": symbol,
            "obi": round(obi, 6),
            "bid_vol_10": round(bid_vol, 4),
            "ask_vol_10": round(ask_vol, 4),
            "best_bid": best_bid,
            "best_ask": best_ask,
            "spread_bps": round(spread_bps, 3),
        }
        self.save_indicator(symbol, "obi_depth_10", ts, payload)

    async def collect_spot_perp_divergence(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Collect spot/perp divergence — core Phase 2 BTC gate-control feature.

        divergence = (spot - mark) / mark. Positive means spot trades at a
        premium to the mark price, negative at a discount.
        """
        spot_url = "https://api.binance.com/api/v3/ticker/price"
        perp_url = "https://fapi.binance.com/fapi/v1/premiumIndex"
        spot_data, perp_data = await asyncio.gather(
            self.fetch_json(session, spot_url, {"symbol": symbol}),
            self.fetch_json(session, perp_url, {"symbol": symbol}),
        )
        ts = int(time.time() * 1000)

        spot_price = float(spot_data["price"])
        mark_price = float(perp_data["markPrice"])
        # Fall back to the mark price when no index price is reported.
        index_price = float(perp_data.get("indexPrice", mark_price))
        divergence = (spot_price - mark_price) / mark_price if mark_price > 0 else 0.0

        payload = {
            "symbol": symbol,
            "spot_price": spot_price,
            "mark_price": mark_price,
            "index_price": index_price,
            "divergence": round(divergence, 8),
            "divergence_bps": round(divergence * 10000, 3),
        }
        self.save_indicator(symbol, "spot_perp_divergence", ts, payload)

    async def collect_tiered_cvd_whale(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Collect tiered whale CVD — core Phase 2 BTC gate-control feature.

        The latest 500 aggregated trades are bucketed by USD notional:
        small (< $10k), medium ($10k-$100k), whale (>= $100k).
        net_cvd = buy_usd - sell_usd (positive = net buying).
        """
        endpoint = "https://fapi.binance.com/fapi/v1/aggTrades"
        data = await self.fetch_json(session, endpoint, {"symbol": symbol, "limit": 500})
        ts = int(time.time() * 1000)

        tiers = {
            "small": {"buy": 0.0, "sell": 0.0},
            "medium": {"buy": 0.0, "sell": 0.0},
            "whale": {"buy": 0.0, "sell": 0.0},
        }
        for trade in data:
            usd_val = float(trade["p"]) * float(trade["q"])
            if usd_val < 10_000:
                tier = "small"
            elif usd_val < 100_000:
                tier = "medium"
            else:
                tier = "whale"
            # m=True marks a sell (the taker sold).
            side = "sell" if trade["m"] else "buy"
            tiers[tier][side] += usd_val

        result = {}
        for name, t in tiers.items():
            buy, sell = t["buy"], t["sell"]
            net = buy - sell
            total = buy + sell
            result[name] = {
                "buy_usd": round(buy, 2),
                "sell_usd": round(sell, 2),
                "net_cvd": round(net, 2),
                "cvd_ratio": round(net / total, 4) if total > 0 else 0.0,
            }

        payload = {
            "symbol": symbol,
            "tiers": result,
            "whale_net_cvd": result["whale"]["net_cvd"],
            "whale_cvd_ratio": result["whale"]["cvd_ratio"],
        }
        self.save_indicator(symbol, "tiered_cvd_whale", ts, payload)

    async def collect_symbol(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Run every collector for ``symbol`` concurrently and log each outcome.

        A failing collector is logged and does not affect the others.
        """
        tasks = [
            ("long_short_ratio", self.collect_long_short_ratio(session, symbol)),
            ("top_trader_position", self.collect_top_trader_position(session, symbol)),
            ("open_interest_hist", self.collect_open_interest_hist(session, symbol)),
            ("coinbase_premium", self.collect_coinbase_premium(session, symbol)),
            ("funding_rate", self.collect_funding_rate(session, symbol)),
            ("obi_depth_10", self.collect_obi_depth(session, symbol)),
            ("spot_perp_divergence", self.collect_spot_perp_divergence(session, symbol)),
            ("tiered_cvd_whale", self.collect_tiered_cvd_whale(session, symbol)),
        ]
        results = await asyncio.gather(*(coro for _, coro in tasks), return_exceptions=True)
        for (name, _), result in zip(tasks, results):
            # BaseException, not Exception: a returned CancelledError must not
            # be reported as a successful collection.
            if isinstance(result, BaseException):
                logger.error("[%s] %s failed: %s", symbol, name, result)
            else:
                logger.info("[%s] %s collected", symbol, name)

    async def run_forever(self) -> None:
        """Ensure the table exists, then collect all symbols in an endless loop.

        Each round starts roughly ``INTERVAL_SECONDS`` after the previous one
        started; the pause between rounds is never shorter than one second.
        """
        self.ensure_table()
        headers = {"User-Agent": "ArbEngine/market-data-collector"}
        async with aiohttp.ClientSession(headers=headers) as session:
            while True:
                # Monotonic clock: elapsed time is immune to wall-clock jumps.
                started = time.monotonic()
                logger.info("start collection round")
                await asyncio.gather(*(self.collect_symbol(session, s) for s in SYMBOLS))
                elapsed = time.monotonic() - started
                sleep_for = max(1, INTERVAL_SECONDS - int(elapsed))
                logger.info("round done in %.2fs, sleep %ss", elapsed, sleep_for)
                await asyncio.sleep(sleep_for)


async def main() -> None:
    """Create the collector, run it until interrupted, and always close the DB."""
    collector = MarketDataCollector()
    try:
        await collector.run_forever()
    finally:
        collector.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("stopped by keyboard interrupt")