diff --git a/backend/market_data_collector.py b/backend/market_data_collector.py new file mode 100644 index 0000000..b9a4eb1 --- /dev/null +++ b/backend/market_data_collector.py @@ -0,0 +1,182 @@ +import asyncio +import json +import logging +import os +import time +from typing import Any + +import aiohttp +import psycopg2 +from psycopg2.extras import Json + +SYMBOLS = ["BTCUSDT", "ETHUSDT"] +INTERVAL_SECONDS = 300 + +PG_HOST = os.getenv("PG_HOST", "127.0.0.1") +PG_PORT = int(os.getenv("PG_PORT", "5432")) +PG_DB = os.getenv("PG_DB", "arb_engine") +PG_USER = os.getenv("PG_USER", "arb") +PG_PASS = os.getenv("PG_PASS", "arb_engine_2026") + +TABLE_SQL = """ +CREATE TABLE IF NOT EXISTS market_indicators ( + id SERIAL PRIMARY KEY, + symbol VARCHAR(20) NOT NULL, + indicator_type VARCHAR(50) NOT NULL, + timestamp_ms BIGINT NOT NULL, + value JSONB NOT NULL, + created_at TIMESTAMP DEFAULT NOW(), + UNIQUE(symbol, indicator_type, timestamp_ms) +); +CREATE INDEX IF NOT EXISTS idx_market_indicators_lookup +ON market_indicators(symbol, indicator_type, timestamp_ms DESC); +""" + +UPSERT_SQL = """ +INSERT INTO market_indicators (symbol, indicator_type, timestamp_ms, value) +VALUES (%s, %s, %s, %s) +ON CONFLICT (symbol, indicator_type, timestamp_ms) +DO UPDATE SET value = EXCLUDED.value; +""" + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", +) +logger = logging.getLogger("market_data_collector") + + +class MarketDataCollector: + def __init__(self) -> None: + self.conn = psycopg2.connect( + host=PG_HOST, + port=PG_PORT, + dbname=PG_DB, + user=PG_USER, + password=PG_PASS, + ) + self.conn.autocommit = False + + def close(self) -> None: + try: + self.conn.close() + except Exception: + pass + + def ensure_table(self) -> None: + with self.conn.cursor() as cur: + cur.execute(TABLE_SQL) + self.conn.commit() + logger.info("market_indicators table ensured") + + def save_indicator(self, symbol: str, indicator_type: str, timestamp_ms: int, payload: Any) -> None: + with self.conn.cursor() as cur: + cur.execute(UPSERT_SQL, (symbol, indicator_type, timestamp_ms, Json(payload))) + self.conn.commit() + + async def fetch_json(self, session: aiohttp.ClientSession, url: str, params: dict[str, Any] | None = None) -> Any: + async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=20)) as resp: + if resp.status != 200: + text = await resp.text() + raise RuntimeError(f"HTTP {resp.status} {url} {text[:200]}") + return await resp.json() + + async def collect_long_short_ratio(self, session: aiohttp.ClientSession, symbol: str) -> None: + endpoint = "https://fapi.binance.com/futures/data/globalLongShortAccountRatio" + data = await self.fetch_json(session, endpoint, {"symbol": symbol, "period": "5m", "limit": 1}) + if not data: + raise RuntimeError("empty response") + item = data[0] + ts = int(item.get("timestamp") or int(time.time() * 1000)) + self.save_indicator(symbol, "long_short_ratio", ts, item) + + async def collect_top_trader_position(self, session: aiohttp.ClientSession, symbol: str) -> None: + endpoint = "https://fapi.binance.com/futures/data/topLongShortPositionRatio" + data = await self.fetch_json(session, endpoint, {"symbol": symbol, "period": "5m", "limit": 1}) + if not data: + raise RuntimeError("empty response") + item = data[0] + ts = int(item.get("timestamp") or int(time.time() * 1000)) + self.save_indicator(symbol, "top_trader_position", ts, item) + + async def collect_open_interest_hist(self, session: aiohttp.ClientSession, symbol: str) -> None: + endpoint = "https://fapi.binance.com/futures/data/openInterestHist" + data = await self.fetch_json(session, endpoint, {"symbol": symbol, "period": "5m", "limit": 1}) + if not data: + raise RuntimeError("empty response") + item = data[0] + ts = int(item.get("timestamp") or int(time.time() * 1000)) + self.save_indicator(symbol, "open_interest_hist", ts, item) + + async def collect_coinbase_premium(self, session: aiohttp.ClientSession, symbol: str) -> None: + pair_map = { + "BTCUSDT": "BTC-USD", + "ETHUSDT": "ETH-USD", + } + coinbase_pair = pair_map[symbol] + + binance_url = "https://api.binance.com/api/v3/ticker/price" + coinbase_url = f"https://api.coinbase.com/v2/prices/{coinbase_pair}/spot" + + binance_data = await self.fetch_json(session, binance_url, {"symbol": symbol}) + coinbase_data = await self.fetch_json(session, coinbase_url) + + binance_price = float(binance_data["price"]) + coinbase_price = float(coinbase_data["data"]["amount"]) + premium_pct = (coinbase_price - binance_price) / binance_price * 100.0 + ts = int(time.time() * 1000) + + payload = { + "symbol": symbol, + "coinbase_pair": coinbase_pair, + "binance": binance_data, + "coinbase": coinbase_data, + "premium_pct": premium_pct, + } + self.save_indicator(symbol, "coinbase_premium", ts, payload) + + async def collect_symbol(self, session: aiohttp.ClientSession, symbol: str) -> None: + tasks = [ + ("long_short_ratio", self.collect_long_short_ratio(session, symbol)), + ("top_trader_position", self.collect_top_trader_position(session, symbol)), + ("open_interest_hist", self.collect_open_interest_hist(session, symbol)), + ("coinbase_premium", self.collect_coinbase_premium(session, symbol)), + ] + + results = await asyncio.gather(*(t[1] for t in tasks), return_exceptions=True) + + for i, result in enumerate(results): + name = tasks[i][0] + if isinstance(result, Exception): + logger.error("[%s] %s failed: %s", symbol, name, result) + else: + logger.info("[%s] %s collected", symbol, name) + + async def run_forever(self) -> None: + self.ensure_table() + headers = {"User-Agent": "ArbEngine/market-data-collector"} + + async with aiohttp.ClientSession(headers=headers) as session: + while True: + started = time.time() + logger.info("start collection round") + await asyncio.gather(*(self.collect_symbol(session, s) for s in SYMBOLS)) + elapsed = time.time() - started + sleep_for = max(1, INTERVAL_SECONDS - int(elapsed)) + logger.info("round done in %.2fs, sleep %ss", elapsed, sleep_for) + await asyncio.sleep(sleep_for) + + +async def main() -> None: + collector = MarketDataCollector() + try: + await collector.run_forever() + finally: + collector.close() + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + logger.info("stopped by keyboard interrupt")