# arbitrage-engine/backend/market_data_collector.py

import asyncio
import json
import logging
import os
import time
from typing import Any
import aiohttp
import psycopg2
from psycopg2.extras import Json
# Binance perpetual-futures symbols polled on every collection round.
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
# Seconds between rounds (5 min — matches the "5m" period requested from the API).
INTERVAL_SECONDS = 300
# PostgreSQL connection settings, overridable via environment variables.
PG_HOST = os.getenv("PG_HOST", "10.106.0.3")
PG_PORT = int(os.getenv("PG_PORT", "5432"))
PG_DB = os.getenv("PG_DB", "arb_engine")
PG_USER = os.getenv("PG_USER", "arb")
# NOTE(review): hard-coded credential fallback checked into source; prefer
# requiring PG_PASS from the environment and failing fast when it is absent.
PG_PASS = os.getenv("PG_PASS", "arb_engine_2026")
# Idempotent DDL: one row per (symbol, indicator_type, timestamp_ms), plus a
# covering index for "latest value of indicator X for symbol Y" lookups.
TABLE_SQL = """
CREATE TABLE IF NOT EXISTS market_indicators (
id SERIAL PRIMARY KEY,
symbol VARCHAR(20) NOT NULL,
indicator_type VARCHAR(50) NOT NULL,
timestamp_ms BIGINT NOT NULL,
value JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(symbol, indicator_type, timestamp_ms)
);
CREATE INDEX IF NOT EXISTS idx_market_indicators_lookup
ON market_indicators(symbol, indicator_type, timestamp_ms DESC);
"""
# Insert-or-replace: re-polling the same time bucket just refreshes the payload.
UPSERT_SQL = """
INSERT INTO market_indicators (symbol, indicator_type, timestamp_ms, value)
VALUES (%s, %s, %s, %s)
ON CONFLICT (symbol, indicator_type, timestamp_ms)
DO UPDATE SET value = EXCLUDED.value;
"""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("market_data_collector")
class MarketDataCollector:
    """Polls market indicators and upserts them into PostgreSQL.

    Per symbol and round it collects: Binance futures global long/short
    account ratio, top-trader position ratio, open-interest history, funding
    rate (premium index), and the Coinbase-vs-Binance spot premium (BTC/ETH
    only). Each datapoint is stored as a JSONB row in ``market_indicators``.

    NOTE(review): psycopg2 calls are synchronous and briefly block the event
    loop. That is acceptable at a 5-minute cadence, but an async driver or an
    executor would be needed for faster polling — confirm before reuse.
    """

    def __init__(self) -> None:
        # One long-lived connection shared by all saves; commits are explicit
        # (autocommit off) so each upsert is its own small transaction.
        self.conn = psycopg2.connect(
            host=PG_HOST,
            port=PG_PORT,
            dbname=PG_DB,
            user=PG_USER,
            password=PG_PASS,
        )
        self.conn.autocommit = False

    def close(self) -> None:
        """Close the DB connection, ignoring errors (safe to call repeatedly)."""
        try:
            self.conn.close()
        except Exception:
            pass

    def ensure_table(self) -> None:
        """Create the market_indicators table and its lookup index if absent."""
        with self.conn.cursor() as cur:
            cur.execute(TABLE_SQL)
        self.conn.commit()
        logger.info("market_indicators table ensured")

    def save_indicator(self, symbol: str, indicator_type: str, timestamp_ms: int, payload: Any) -> None:
        """Upsert one JSONB indicator row keyed by (symbol, type, timestamp_ms).

        Fix: roll back on failure. Without the rollback, a single failed
        statement left the shared non-autocommit connection inside an aborted
        transaction, so every subsequent save raised "current transaction is
        aborted" until the process was restarted.
        """
        try:
            with self.conn.cursor() as cur:
                cur.execute(UPSERT_SQL, (symbol, indicator_type, timestamp_ms, Json(payload)))
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    async def fetch_json(self, session: aiohttp.ClientSession, url: str, params: dict[str, Any] | None = None) -> Any:
        """GET *url* and return the decoded JSON body.

        Raises:
            RuntimeError: on any non-200 status (first 200 chars of the body
                are included in the message for diagnostics).
        """
        async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=20)) as resp:
            if resp.status != 200:
                text = await resp.text()
                raise RuntimeError(f"HTTP {resp.status} {url} {text[:200]}")
            return await resp.json()

    async def _collect_futures_stat(
        self,
        session: aiohttp.ClientSession,
        symbol: str,
        endpoint: str,
        indicator_type: str,
    ) -> None:
        """Shared helper: fetch the latest 5m bucket from a Binance futures
        statistics endpoint and persist it under *indicator_type*.

        Deduplicates the three previously copy-pasted collectors below.
        """
        data = await self.fetch_json(session, endpoint, {"symbol": symbol, "period": "5m", "limit": 1})
        if not data:
            raise RuntimeError("empty response")
        item = data[0]
        # Fall back to wall-clock time if the API omits the bucket timestamp.
        ts = int(item.get("timestamp") or int(time.time() * 1000))
        self.save_indicator(symbol, indicator_type, ts, item)

    async def collect_long_short_ratio(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the global (all-accounts) long/short account ratio."""
        await self._collect_futures_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/globalLongShortAccountRatio",
            "long_short_ratio",
        )

    async def collect_top_trader_position(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the top-trader long/short position ratio."""
        await self._collect_futures_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/topLongShortPositionRatio",
            "top_trader_position",
        )

    async def collect_open_interest_hist(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the latest open-interest history bucket."""
        await self._collect_futures_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/openInterestHist",
            "open_interest_hist",
        )

    async def collect_coinbase_premium(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the Coinbase-vs-Binance spot premium as a percentage.

        Symbols without a Coinbase listing are silently skipped (no row).
        """
        pair_map = {
            "BTCUSDT": "BTC-USD",
            "ETHUSDT": "ETH-USD",
        }
        coinbase_pair = pair_map.get(symbol)
        if not coinbase_pair:
            return  # XRP/SOL have no Coinbase data; skip
        binance_url = "https://api.binance.com/api/v3/ticker/price"
        coinbase_url = f"https://api.coinbase.com/v2/prices/{coinbase_pair}/spot"
        binance_data = await self.fetch_json(session, binance_url, {"symbol": symbol})
        coinbase_data = await self.fetch_json(session, coinbase_url)
        binance_price = float(binance_data["price"])
        coinbase_price = float(coinbase_data["data"]["amount"])
        # Positive premium => Coinbase trades above Binance.
        premium_pct = (coinbase_price - binance_price) / binance_price * 100.0
        ts = int(time.time() * 1000)
        payload = {
            "symbol": symbol,
            "coinbase_pair": coinbase_pair,
            "binance": binance_data,
            "coinbase": coinbase_data,
            "premium_pct": premium_pct,
        }
        self.save_indicator(symbol, "coinbase_premium", ts, payload)

    async def collect_funding_rate(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store a snapshot of the premium index (mark/index price, funding)."""
        endpoint = "https://fapi.binance.com/fapi/v1/premiumIndex"
        data = await self.fetch_json(session, endpoint, {"symbol": symbol})
        if not data:
            raise RuntimeError("empty response")
        # premiumIndex returns a single object (not array) for a single symbol
        item = data if isinstance(data, dict) else data[0]
        # Use current time as timestamp so every 5-min poll stores a new row
        ts = int(time.time() * 1000)
        payload = {
            "symbol": item.get("symbol"),
            "markPrice": item.get("markPrice"),
            "indexPrice": item.get("indexPrice"),
            "lastFundingRate": item.get("lastFundingRate"),
            "nextFundingTime": item.get("nextFundingTime"),
            "interestRate": item.get("interestRate"),
            "fundingRate": item.get("lastFundingRate"),  # compat: signal_engine reads 'fundingRate'
            "time": ts,
        }
        self.save_indicator(symbol, "funding_rate", ts, payload)

    async def collect_symbol(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Run all collectors for one symbol concurrently; log per-collector
        outcome. Individual failures are logged, never propagated."""
        tasks = [
            ("long_short_ratio", self.collect_long_short_ratio(session, symbol)),
            ("top_trader_position", self.collect_top_trader_position(session, symbol)),
            ("open_interest_hist", self.collect_open_interest_hist(session, symbol)),
            ("coinbase_premium", self.collect_coinbase_premium(session, symbol)),
            ("funding_rate", self.collect_funding_rate(session, symbol)),
        ]
        results = await asyncio.gather(*(t[1] for t in tasks), return_exceptions=True)
        for i, result in enumerate(results):
            name = tasks[i][0]
            if isinstance(result, Exception):
                logger.error("[%s] %s failed: %s", symbol, name, result)
            else:
                logger.info("[%s] %s collected", symbol, name)

    async def run_forever(self) -> None:
        """Main loop: ensure schema, then poll all symbols every INTERVAL_SECONDS.

        Sleep is shortened by the round's own duration (minimum 1s) so rounds
        stay roughly aligned to the interval.
        """
        self.ensure_table()
        headers = {"User-Agent": "ArbEngine/market-data-collector"}
        async with aiohttp.ClientSession(headers=headers) as session:
            while True:
                started = time.time()
                logger.info("start collection round")
                await asyncio.gather(*(self.collect_symbol(session, s) for s in SYMBOLS))
                elapsed = time.time() - started
                sleep_for = max(1, INTERVAL_SECONDS - int(elapsed))
                logger.info("round done in %.2fs, sleep %ss", elapsed, sleep_for)
                await asyncio.sleep(sleep_for)
async def main() -> None:
    """Entry point: run the collection loop until cancelled, always releasing
    the database connection on the way out."""
    worker = MarketDataCollector()
    try:
        await worker.run_forever()
    finally:
        worker.close()
if __name__ == "__main__":
    # Ctrl-C is the expected way to stop the endless polling loop.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("stopped by keyboard interrupt")