327 lines
13 KiB
Python
327 lines
13 KiB
Python
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
import time
|
||
from typing import Any
|
||
|
||
import aiohttp
|
||
import psycopg2
|
||
from psycopg2.extras import Json
|
||
|
||
# Symbols polled every round (Binance symbol format).
SYMBOLS = ["BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT"]
# Target length of one collection round, in seconds.
INTERVAL_SECONDS = 300

# PostgreSQL connection settings; each one may be overridden via the environment.
PG_HOST = os.environ.get("PG_HOST", "10.106.0.3")
PG_PORT = int(os.environ.get("PG_PORT", "5432"))
PG_DB = os.environ.get("PG_DB", "arb_engine")
PG_USER = os.environ.get("PG_USER", "arb")

# The password deliberately has no default: refuse to start (at import time)
# rather than attempt a connection without credentials.
PG_PASS = os.environ.get("PG_PASS")
if not PG_PASS:
    raise RuntimeError("PG_PASS 未设置,请在 .env 或环境变量中注入数据库密码")
|
||
|
||
# Schema DDL executed by MarketDataCollector.ensure_table. Each row holds one
# indicator sample, unique per (symbol, indicator_type, timestamp_ms), with
# the indicator payload stored as JSONB.
# NOTE(review): the UNIQUE constraint already creates a btree index on
# (symbol, indicator_type, timestamp_ms), and PostgreSQL can scan a btree
# backwards for ORDER BY ... DESC, so idx_market_indicators_lookup looks
# redundant (extra write cost per insert) — confirm against real query plans
# before dropping it.
# NOTE(review): created_at is TIMESTAMP (without time zone), so its value
# depends on the server's TimeZone setting — confirm that is intended.
TABLE_SQL = """
CREATE TABLE IF NOT EXISTS market_indicators (
    id SERIAL PRIMARY KEY,
    symbol VARCHAR(20) NOT NULL,
    indicator_type VARCHAR(50) NOT NULL,
    timestamp_ms BIGINT NOT NULL,
    value JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    UNIQUE(symbol, indicator_type, timestamp_ms)
);
CREATE INDEX IF NOT EXISTS idx_market_indicators_lookup
    ON market_indicators(symbol, indicator_type, timestamp_ms DESC);
"""

# Insert-or-update keyed on the table's UNIQUE columns: re-collecting the same
# (symbol, indicator_type, timestamp_ms) overwrites `value` instead of failing.
# created_at is not touched on update, so it keeps the first-insert time.
UPSERT_SQL = """
INSERT INTO market_indicators (symbol, indicator_type, timestamp_ms, value)
VALUES (%s, %s, %s, %s)
ON CONFLICT (symbol, indicator_type, timestamp_ms)
DO UPDATE SET value = EXCLUDED.value;
"""
|
||
|
||
# Process-wide logging: timestamped, level-tagged lines at INFO and above.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
# Named logger shared by the collector and the script entry point.
logger = logging.getLogger("market_data_collector")
|
||
|
||
|
||
class MarketDataCollector:
    """Poll Binance (and Coinbase) market indicators and upsert them into PostgreSQL.

    Every indicator becomes one ``market_indicators`` row keyed by
    ``(symbol, indicator_type, timestamp_ms)`` with a JSONB payload. A single
    psycopg2 connection is shared by all collectors. DB writes are synchronous
    and run on the event-loop thread, so transactions never interleave.
    """

    # Binance symbol -> Coinbase spot pair. Symbols absent here (e.g. XRP, SOL)
    # get no coinbase_premium indicator.
    _COINBASE_PAIRS = {
        "BTCUSDT": "BTC-USD",
        "ETHUSDT": "ETH-USD",
    }

    def __init__(self) -> None:
        self._connect()

    # ------------------------------------------------------------------
    # Database helpers
    # ------------------------------------------------------------------

    def _connect(self) -> None:
        """Open a fresh PostgreSQL connection (explicit transactions) into ``self.conn``."""
        self.conn = psycopg2.connect(
            host=PG_HOST,
            port=PG_PORT,
            dbname=PG_DB,
            user=PG_USER,
            password=PG_PASS,
        )
        self.conn.autocommit = False

    def _ensure_connection(self) -> None:
        """Reconnect if the current connection is closed or broken.

        psycopg2 sets ``connection.closed`` to a nonzero value once the
        connection is closed or lost; after that every call on it fails.
        """
        if self.conn.closed:
            logger.warning("database connection is closed, reconnecting")
            self._connect()

    def _rollback_quietly(self) -> None:
        """Roll back the open transaction, logging (not raising) on failure."""
        if self.conn.closed:
            return  # nothing to roll back; _ensure_connection reconnects next time
        try:
            self.conn.rollback()
        except psycopg2.Error:
            logger.warning("rollback failed", exc_info=True)

    def _execute_and_commit(self, sql: str, params: tuple[Any, ...] | None = None) -> None:
        """Run one statement in its own transaction.

        On any error the transaction is rolled back before re-raising. Without
        the rollback psycopg2 leaves the transaction aborted, and every later
        statement on the shared connection fails with
        "current transaction is aborted".
        """
        self._ensure_connection()
        try:
            with self.conn.cursor() as cur:
                cur.execute(sql, params)
            self.conn.commit()
        except Exception:
            self._rollback_quietly()
            raise

    def close(self) -> None:
        """Close the database connection; failures are logged, never raised."""
        try:
            self.conn.close()
        except Exception:
            # Best-effort shutdown: must not mask an exception already
            # propagating out of main().
            logger.warning("error while closing database connection", exc_info=True)

    def ensure_table(self) -> None:
        """Create ``market_indicators`` and its lookup index if missing."""
        self._execute_and_commit(TABLE_SQL)
        logger.info("market_indicators table ensured")

    def save_indicator(self, symbol: str, indicator_type: str, timestamp_ms: int, payload: Any) -> None:
        """Upsert one indicator row; *payload* is wrapped in psycopg2 ``Json`` (JSONB).

        Raises the underlying psycopg2 error after rolling back, so the shared
        connection stays usable for the next insert.
        """
        self._execute_and_commit(UPSERT_SQL, (symbol, indicator_type, timestamp_ms, Json(payload)))

    # ------------------------------------------------------------------
    # HTTP
    # ------------------------------------------------------------------

    async def fetch_json(self, session: aiohttp.ClientSession, url: str, params: dict[str, Any] | None = None) -> Any:
        """GET *url* and return the decoded JSON body.

        Raises:
            RuntimeError: on any non-200 status (message includes the status,
                the URL and the first 200 characters of the body).
        """
        async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=20)) as resp:
            if resp.status != 200:
                text = await resp.text()
                raise RuntimeError(f"HTTP {resp.status} {url} {text[:200]}")
            return await resp.json()

    # ------------------------------------------------------------------
    # Collectors
    # ------------------------------------------------------------------

    async def _collect_latest_futures_stat(
        self,
        session: aiohttp.ClientSession,
        symbol: str,
        endpoint: str,
        indicator_type: str,
    ) -> None:
        """Fetch the newest 5m bucket from a Binance ``/futures/data`` endpoint and store it."""
        data = await self.fetch_json(session, endpoint, {"symbol": symbol, "period": "5m", "limit": 1})
        if not data:
            raise RuntimeError("empty response")
        item = data[0]
        # Prefer the exchange's bucket timestamp so re-polling the same bucket
        # upserts rather than duplicating; fall back to wall-clock time.
        ts = int(item.get("timestamp") or int(time.time() * 1000))
        self.save_indicator(symbol, indicator_type, ts, item)

    async def collect_long_short_ratio(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the latest global long/short account ratio."""
        await self._collect_latest_futures_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/globalLongShortAccountRatio",
            "long_short_ratio",
        )

    async def collect_top_trader_position(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the latest top-trader long/short position ratio."""
        await self._collect_latest_futures_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/topLongShortPositionRatio",
            "top_trader_position",
        )

    async def collect_open_interest_hist(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the latest open-interest history bucket."""
        await self._collect_latest_futures_stat(
            session,
            symbol,
            "https://fapi.binance.com/futures/data/openInterestHist",
            "open_interest_hist",
        )

    async def collect_coinbase_premium(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store the Coinbase-vs-Binance spot premium for *symbol*.

        premium_pct = (coinbase - binance) / binance * 100. Symbols with no
        entry in ``_COINBASE_PAIRS`` are skipped without storing anything.
        NOTE: compares a USDT-quoted Binance price with a USD-quoted Coinbase
        price, so the USDT/USD basis is part of the premium.
        """
        coinbase_pair = self._COINBASE_PAIRS.get(symbol)
        if not coinbase_pair:
            return  # no Coinbase market for this symbol (e.g. XRP/SOL)

        binance_url = "https://api.binance.com/api/v3/ticker/price"
        coinbase_url = f"https://api.coinbase.com/v2/prices/{coinbase_pair}/spot"

        # Fetch both quotes concurrently so they are as close in time as possible.
        binance_data, coinbase_data = await asyncio.gather(
            self.fetch_json(session, binance_url, {"symbol": symbol}),
            self.fetch_json(session, coinbase_url),
        )

        binance_price = float(binance_data["price"])
        coinbase_price = float(coinbase_data["data"]["amount"])
        premium_pct = (coinbase_price - binance_price) / binance_price * 100.0
        ts = int(time.time() * 1000)

        payload = {
            "symbol": symbol,
            "coinbase_pair": coinbase_pair,
            "binance": binance_data,
            "coinbase": coinbase_data,
            "premium_pct": premium_pct,
        }
        self.save_indicator(symbol, "coinbase_premium", ts, payload)

    async def collect_funding_rate(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store mark/index price and funding fields from Binance ``premiumIndex``."""
        endpoint = "https://fapi.binance.com/fapi/v1/premiumIndex"
        data = await self.fetch_json(session, endpoint, {"symbol": symbol})
        if not data:
            raise RuntimeError("empty response")
        # With a symbol parameter premiumIndex returns a single object; accept
        # a list defensively.
        item = data if isinstance(data, dict) else data[0]
        # Wall-clock timestamp so every poll stores a new row.
        ts = int(time.time() * 1000)
        payload = {
            "symbol": item.get("symbol"),
            "markPrice": item.get("markPrice"),
            "indexPrice": item.get("indexPrice"),
            "lastFundingRate": item.get("lastFundingRate"),
            "nextFundingTime": item.get("nextFundingTime"),
            "interestRate": item.get("interestRate"),
            "fundingRate": item.get("lastFundingRate"),  # compat: signal_engine reads 'fundingRate'
            "time": ts,
        }
        self.save_indicator(symbol, "funding_rate", ts, payload)

    async def collect_obi_depth(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store order-book imbalance (OBI) over the top 10 levels per side.

        Phase 2 BTC gate-control core feature.
        obi = (bid_vol - ask_vol) / (bid_vol + ask_vol), in [-1, 1]:
        positive means more resting bid volume (buy pressure), negative means
        sell pressure. Also stores best bid/ask and the spread in bps.
        """
        endpoint = "https://fapi.binance.com/fapi/v1/depth"
        data = await self.fetch_json(session, endpoint, {"symbol": symbol, "limit": 10})
        ts = int(time.time() * 1000)

        # Each level is [price, qty] as strings.
        bids = data.get("bids", [])
        asks = data.get("asks", [])
        bid_vol = sum(float(b[1]) for b in bids)
        ask_vol = sum(float(a[1]) for a in asks)
        total_vol = bid_vol + ask_vol
        obi = (bid_vol - ask_vol) / total_vol if total_vol > 0 else 0.0

        best_bid = float(bids[0][0]) if bids else 0.0
        best_ask = float(asks[0][0]) if asks else 0.0
        spread_bps = ((best_ask - best_bid) / best_bid * 10000) if best_bid > 0 else 0.0

        payload = {
            "symbol": symbol,
            "obi": round(obi, 6),
            "bid_vol_10": round(bid_vol, 4),
            "ask_vol_10": round(ask_vol, 4),
            "best_bid": best_bid,
            "best_ask": best_ask,
            "spread_bps": round(spread_bps, 3),
        }
        self.save_indicator(symbol, "obi_depth_10", ts, payload)

    async def collect_spot_perp_divergence(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store spot-vs-perpetual mark price divergence.

        Phase 2 BTC gate-control core feature.
        divergence = (spot - mark) / mark: positive means spot trades at a
        premium to the perp mark price, negative means at a discount.
        """
        spot_url = "https://api.binance.com/api/v3/ticker/price"
        perp_url = "https://fapi.binance.com/fapi/v1/premiumIndex"

        spot_data, perp_data = await asyncio.gather(
            self.fetch_json(session, spot_url, {"symbol": symbol}),
            self.fetch_json(session, perp_url, {"symbol": symbol}),
        )
        ts = int(time.time() * 1000)

        spot_price = float(spot_data["price"])
        mark_price = float(perp_data["markPrice"])
        index_price = float(perp_data.get("indexPrice", mark_price))
        divergence = (spot_price - mark_price) / mark_price if mark_price > 0 else 0.0

        payload = {
            "symbol": symbol,
            "spot_price": spot_price,
            "mark_price": mark_price,
            "index_price": index_price,
            "divergence": round(divergence, 8),
            "divergence_bps": round(divergence * 10000, 3),
        }
        self.save_indicator(symbol, "spot_perp_divergence", ts, payload)

    async def collect_tiered_cvd_whale(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Store cumulative volume delta (CVD) split by trade-size tier.

        Phase 2 BTC gate-control core feature.
        Tiers by notional (price * qty): small < $10k, medium $10k-$100k,
        whale >= $100k. Per tier, net_cvd = buy_usd - sell_usd (positive means
        net taker buying). Only the latest 500 aggregated trades are used, so
        the window is not necessarily the full collection interval.
        """
        endpoint = "https://fapi.binance.com/fapi/v1/aggTrades"
        data = await self.fetch_json(session, endpoint, {"symbol": symbol, "limit": 500})
        ts = int(time.time() * 1000)

        tiers = {
            "small": {"buy": 0.0, "sell": 0.0},
            "medium": {"buy": 0.0, "sell": 0.0},
            "whale": {"buy": 0.0, "sell": 0.0},
        }

        for trade in data:
            usd_val = float(trade["p"]) * float(trade["q"])
            # "m" = "was the buyer the maker?"; True means the taker sold.
            side = "sell" if trade["m"] else "buy"
            if usd_val < 10_000:
                tier = "small"
            elif usd_val < 100_000:
                tier = "medium"
            else:
                tier = "whale"
            tiers[tier][side] += usd_val

        result = {}
        for name, t in tiers.items():
            buy, sell = t["buy"], t["sell"]
            net = buy - sell
            total = buy + sell
            result[name] = {
                "buy_usd": round(buy, 2),
                "sell_usd": round(sell, 2),
                "net_cvd": round(net, 2),
                "cvd_ratio": round(net / total, 4) if total > 0 else 0.0,
            }

        payload = {
            "symbol": symbol,
            "tiers": result,
            "whale_net_cvd": result["whale"]["net_cvd"],
            "whale_cvd_ratio": result["whale"]["cvd_ratio"],
        }
        self.save_indicator(symbol, "tiered_cvd_whale", ts, payload)

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------

    async def collect_symbol(self, session: aiohttp.ClientSession, symbol: str) -> None:
        """Run every applicable collector for *symbol* concurrently and log each outcome.

        Failures are logged per indicator and never propagate, so one failing
        endpoint does not abort the rest of the round.
        """
        tasks = [
            ("long_short_ratio", self.collect_long_short_ratio(session, symbol)),
            ("top_trader_position", self.collect_top_trader_position(session, symbol)),
            ("open_interest_hist", self.collect_open_interest_hist(session, symbol)),
        ]
        if symbol in self._COINBASE_PAIRS:
            # Only schedule where a Coinbase pair exists; otherwise the no-op
            # would be logged as "collected" every round.
            tasks.append(("coinbase_premium", self.collect_coinbase_premium(session, symbol)))
        tasks += [
            ("funding_rate", self.collect_funding_rate(session, symbol)),
            ("obi_depth_10", self.collect_obi_depth(session, symbol)),
            ("spot_perp_divergence", self.collect_spot_perp_divergence(session, symbol)),
            ("tiered_cvd_whale", self.collect_tiered_cvd_whale(session, symbol)),
        ]

        names = [name for name, _ in tasks]
        results = await asyncio.gather(*(coro for _, coro in tasks), return_exceptions=True)

        for name, result in zip(names, results):
            # BaseException so a cancelled child is reported as failed, not collected.
            if isinstance(result, BaseException):
                # %r: aiohttp timeouts raise asyncio.TimeoutError() with an
                # empty message, which %s would render as nothing.
                logger.error("[%s] %s failed: %r", symbol, name, result)
            else:
                logger.info("[%s] %s collected", symbol, name)

    async def run_forever(self) -> None:
        """Ensure the table exists, then collect all SYMBOLS once per INTERVAL_SECONDS."""
        self.ensure_table()
        headers = {"User-Agent": "ArbEngine/market-data-collector"}

        async with aiohttp.ClientSession(headers=headers) as session:
            while True:
                # Monotonic clock: elapsed time is immune to wall-clock jumps.
                started = time.monotonic()
                logger.info("start collection round")
                await asyncio.gather(*(self.collect_symbol(session, s) for s in SYMBOLS))
                elapsed = time.monotonic() - started
                # Sleep for the remainder of the interval, but at least 1s so an
                # overlong round cannot become a busy loop.
                sleep_for = max(1, INTERVAL_SECONDS - int(elapsed))
                logger.info("round done in %.2fs, sleep %ss", elapsed, sleep_for)
                await asyncio.sleep(sleep_for)
|
||
|
||
|
||
async def main() -> None:
    """Run the collector until it stops, always releasing its DB connection."""
    service = MarketDataCollector()
    try:
        await service.run_forever()
    finally:
        # run_forever only returns by raising (including cancellation on
        # Ctrl-C), so the connection is closed on the way out.
        service.close()
|
||
|
||
|
||
if __name__ == "__main__":
    # Ctrl-C is the normal way to stop this long-running script: log a short
    # message instead of letting the KeyboardInterrupt traceback print.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("stopped by keyboard interrupt")
|