arbitrage-engine/backend/market_data_collector.py
root 02a769f513 fix: FR collector use premiumIndex API for real-time 5min data
- Changed from /fapi/v1/fundingRate (8h settlement only) to /fapi/v1/premiumIndex (real-time)
- Each 5-min poll now stores a new row with current timestamp
- Payload includes both fundingRate and lastFundingRate for compat
- Fixes: FR data was only 6 rows per 4 days, now ~288/day per coin
2026-03-02 04:22:12 +00:00

205 lines
7.9 KiB
Python

import asyncio
import json
import logging
import os
import time
from typing import Any
import aiohttp
import psycopg2
from psycopg2.extras import Json
# Perpetual-futures symbols polled each collection round.
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
# Poll cadence: one full collection round every 5 minutes.
INTERVAL_SECONDS = 300
# Postgres connection settings, overridable via environment variables.
PG_HOST = os.getenv("PG_HOST", "127.0.0.1")
PG_PORT = int(os.getenv("PG_PORT", "5432"))
PG_DB = os.getenv("PG_DB", "arb_engine")
PG_USER = os.getenv("PG_USER", "arb")
# NOTE(review): default credential committed to source — prefer requiring
# PG_PASS to be set in the environment and failing fast when it is absent.
PG_PASS = os.getenv("PG_PASS", "arb_engine_2026")
# Idempotent schema: one JSONB row per (symbol, indicator, timestamp), with a
# composite index matching the "latest value for symbol+indicator" query shape.
TABLE_SQL = """
CREATE TABLE IF NOT EXISTS market_indicators (
id SERIAL PRIMARY KEY,
symbol VARCHAR(20) NOT NULL,
indicator_type VARCHAR(50) NOT NULL,
timestamp_ms BIGINT NOT NULL,
value JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(symbol, indicator_type, timestamp_ms)
);
CREATE INDEX IF NOT EXISTS idx_market_indicators_lookup
ON market_indicators(symbol, indicator_type, timestamp_ms DESC);
"""
# Upsert keyed on the UNIQUE constraint so re-polling the same bucket
# overwrites rather than duplicates.
UPSERT_SQL = """
INSERT INTO market_indicators (symbol, indicator_type, timestamp_ms, value)
VALUES (%s, %s, %s, %s)
ON CONFLICT (symbol, indicator_type, timestamp_ms)
DO UPDATE SET value = EXCLUDED.value;
"""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger("market_data_collector")
class MarketDataCollector:
    """Polls Binance futures sentiment endpoints (plus a Coinbase spot
    premium) every INTERVAL_SECONDS and upserts each reading into Postgres
    as a JSONB row keyed by (symbol, indicator_type, timestamp_ms).
    """

    def __init__(self) -> None:
        # Single long-lived connection; writes are committed explicitly.
        self.conn = psycopg2.connect(
            host=PG_HOST,
            port=PG_PORT,
            dbname=PG_DB,
            user=PG_USER,
            password=PG_PASS,
        )
        self.conn.autocommit = False

    def close(self) -> None:
        """Best-effort close of the DB connection (used on shutdown)."""
        try:
            self.conn.close()
        except Exception:
            pass

    def ensure_table(self) -> None:
        """Create the market_indicators table and lookup index if missing."""
        try:
            with self.conn.cursor() as cur:
                cur.execute(TABLE_SQL)
            self.conn.commit()
        except Exception:
            # Roll back so the connection does not remain in an aborted
            # transaction state that would fail every later statement.
            self.conn.rollback()
            raise
        logger.info("market_indicators table ensured")

    def save_indicator(self, symbol: str, indicator_type: str, timestamp_ms: int, payload: Any) -> None:
        """Upsert one reading.

        Rolls back on failure: with autocommit off, psycopg2 leaves a failed
        transaction aborted, and without the rollback every subsequent
        execute on this connection would raise until the process restarts.
        """
        try:
            with self.conn.cursor() as cur:
                cur.execute(UPSERT_SQL, (symbol, indicator_type, timestamp_ms, Json(payload)))
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    async def fetch_json(self, session: aiohttp.ClientSession, url: str, params: dict[str, Any] | None = None) -> Any:
        """GET *url* and return the decoded JSON body; raise on non-200."""
        async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=20)) as resp:
            if resp.status != 200:
                text = await resp.text()
                raise RuntimeError(f"HTTP {resp.status} {url} {text[:200]}")
            return await resp.json()

    async def _collect_futures_stat(
        self,
        session: aiohttp.ClientSession,
        symbol: str,
        endpoint: str,
        indicator_type: str,
    ) -> None:
        """Shared fetch/store path for the Binance futures/data statistics
        endpoints, which all return a list of 5-minute buckets."""
        data = await self.fetch_json(session, endpoint, {"symbol": symbol, "period": "5m", "limit": 1})
        if not data:
            raise RuntimeError("empty response")
        item = data[0]
        # Fall back to wall-clock time if the bucket timestamp is absent.
        ts = int(item.get("timestamp") or int(time.time() * 1000))
        self.save_indicator(symbol, indicator_type, ts, item)

    async def collect_long_short_ratio(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Global account long/short ratio (latest 5m bucket)."""
        await self._collect_futures_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/globalLongShortAccountRatio",
            "long_short_ratio",
        )

    async def collect_top_trader_position(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Top-trader long/short position ratio (latest 5m bucket)."""
        await self._collect_futures_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/topLongShortPositionRatio",
            "top_trader_position",
        )

    async def collect_open_interest_hist(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Open-interest history (latest 5m bucket)."""
        await self._collect_futures_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/openInterestHist",
            "open_interest_hist",
        )

    async def collect_coinbase_premium(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the Coinbase-vs-Binance spot premium in percent.

        Bug fix: the old hard-coded map only covered BTC/ETH, so XRPUSDT and
        SOLUSDT (both in SYMBOLS) raised KeyError every round. The Coinbase
        product id is now derived generically (XXXUSDT -> XXX-USD), with the
        explicit map kept for any future non-trivial overrides.
        """
        pair_map = {
            "BTCUSDT": "BTC-USD",
            "ETHUSDT": "ETH-USD",
        }
        coinbase_pair = pair_map.get(symbol) or f"{symbol.removesuffix('USDT')}-USD"
        binance_url = "https://api.binance.com/api/v3/ticker/price"
        coinbase_url = f"https://api.coinbase.com/v2/prices/{coinbase_pair}/spot"
        binance_data = await self.fetch_json(session, binance_url, {"symbol": symbol})
        coinbase_data = await self.fetch_json(session, coinbase_url)
        binance_price = float(binance_data["price"])
        coinbase_price = float(coinbase_data["data"]["amount"])
        premium_pct = (coinbase_price - binance_price) / binance_price * 100.0
        ts = int(time.time() * 1000)
        payload = {
            "symbol": symbol,
            "coinbase_pair": coinbase_pair,
            "binance": binance_data,
            "coinbase": coinbase_data,
            "premium_pct": premium_pct,
        }
        self.save_indicator(symbol, "coinbase_premium", ts, payload)

    async def collect_funding_rate(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the current funding rate from the real-time premiumIndex
        endpoint (the fundingRate endpoint only updates per 8h settlement).
        """
        endpoint = "https://fapi.binance.com/fapi/v1/premiumIndex"
        data = await self.fetch_json(session, endpoint, {"symbol": symbol})
        if not data:
            raise RuntimeError("empty response")
        # With a symbol param premiumIndex returns a single object; without,
        # an array — accept either shape defensively.
        item = data if isinstance(data, dict) else data[0]
        # Use poll time as the row key so every 5-min poll stores a new row.
        ts = int(time.time() * 1000)
        payload = {
            "symbol": item.get("symbol"),
            "markPrice": item.get("markPrice"),
            "indexPrice": item.get("indexPrice"),
            "lastFundingRate": item.get("lastFundingRate"),
            "nextFundingTime": item.get("nextFundingTime"),
            "interestRate": item.get("interestRate"),
            "fundingRate": item.get("lastFundingRate"),  # compat: signal_engine reads 'fundingRate'
            "time": ts,
        }
        self.save_indicator(symbol, "funding_rate", ts, payload)

    async def collect_symbol(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Run all collectors for one symbol concurrently; a failure in one
        indicator is logged and does not block the others."""
        tasks = [
            ("long_short_ratio", self.collect_long_short_ratio(session, symbol)),
            ("top_trader_position", self.collect_top_trader_position(session, symbol)),
            ("open_interest_hist", self.collect_open_interest_hist(session, symbol)),
            ("coinbase_premium", self.collect_coinbase_premium(session, symbol)),
            ("funding_rate", self.collect_funding_rate(session, symbol)),
        ]
        results = await asyncio.gather(*(t[1] for t in tasks), return_exceptions=True)
        for i, result in enumerate(results):
            name = tasks[i][0]
            if isinstance(result, Exception):
                logger.error("[%s] %s failed: %s", symbol, name, result)
            else:
                logger.info("[%s] %s collected", symbol, name)

    async def run_forever(self) -> None:
        """Main loop: ensure schema, then poll all symbols every
        INTERVAL_SECONDS (minus the time each round took, floored at 1s)."""
        self.ensure_table()
        headers = {"User-Agent": "ArbEngine/market-data-collector"}
        async with aiohttp.ClientSession(headers=headers) as session:
            while True:
                started = time.time()
                logger.info("start collection round")
                await asyncio.gather(*(self.collect_symbol(session, s) for s in SYMBOLS))
                elapsed = time.time() - started
                sleep_for = max(1, INTERVAL_SECONDS - int(elapsed))
                logger.info("round done in %.2fs, sleep %ss", elapsed, sleep_for)
                await asyncio.sleep(sleep_for)
async def main() -> None:
    """Entry point: run the collector loop indefinitely, guaranteeing the
    database connection is released on exit or error."""
    mdc = MarketDataCollector()
    try:
        await mdc.run_forever()
    finally:
        mdc.close()
if __name__ == "__main__":
    # asyncio.run owns the event loop; Ctrl-C surfaces here as
    # KeyboardInterrupt so shutdown is logged instead of printing a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("stopped by keyboard interrupt")