From 17a387b6f41e902621c066a291f823b1fc46947d Mon Sep 17 00:00:00 2001 From: root Date: Tue, 3 Mar 2026 13:42:21 +0000 Subject: [PATCH] feat: Phase 2 - collect obi_depth_10, spot_perp_divergence, tiered_cvd_whale for BTC gate-control --- backend/market_data_collector.py | 122 ++++++++++++++++++++++++++++++- backend/signal_engine.py | 18 ++++- 2 files changed, 136 insertions(+), 4 deletions(-) diff --git a/backend/market_data_collector.py b/backend/market_data_collector.py index 14e7851..ccd5940 100644 --- a/backend/market_data_collector.py +++ b/backend/market_data_collector.py @@ -16,7 +16,9 @@ PG_HOST = os.getenv("PG_HOST", "10.106.0.3") PG_PORT = int(os.getenv("PG_PORT", "5432")) PG_DB = os.getenv("PG_DB", "arb_engine") PG_USER = os.getenv("PG_USER", "arb") -PG_PASS = os.getenv("PG_PASS", "arb_engine_2026") +PG_PASS = os.getenv("PG_PASS") +if not PG_PASS: + raise RuntimeError("PG_PASS 未设置,请在 .env 或环境变量中注入数据库密码") TABLE_SQL = """ CREATE TABLE IF NOT EXISTS market_indicators ( @@ -158,6 +160,121 @@ class MarketDataCollector: } self.save_indicator(symbol, "funding_rate", ts, payload) + async def collect_obi_depth(self, session: aiohttp.ClientSession, symbol: str) -> None: + """ + OBI(订单簿失衡)采集 — Phase 2 BTC gate-control 核心特征 + 计算:(bid_vol - ask_vol) / (bid_vol + ask_vol),范围[-1,1] + 正值=买压大,负值=卖压大 + """ + endpoint = "https://fapi.binance.com/fapi/v1/depth" + data = await self.fetch_json(session, endpoint, {"symbol": symbol, "limit": 10}) + ts = int(time.time() * 1000) + + bids = data.get("bids", []) + asks = data.get("asks", []) + bid_vol = sum(float(b[1]) for b in bids) + ask_vol = sum(float(a[1]) for a in asks) + total_vol = bid_vol + ask_vol + obi = (bid_vol - ask_vol) / total_vol if total_vol > 0 else 0.0 + + best_bid = float(bids[0][0]) if bids else 0.0 + best_ask = float(asks[0][0]) if asks else 0.0 + spread_bps = ((best_ask - best_bid) / best_bid * 10000) if best_bid > 0 else 0.0 + + payload = { + "symbol": symbol, + "obi": round(obi, 6), + "bid_vol_10": round(bid_vol, 4), + "ask_vol_10": round(ask_vol, 4), + "best_bid": best_bid, + "best_ask": best_ask, + "spread_bps": round(spread_bps, 3), + } + self.save_indicator(symbol, "obi_depth_10", ts, payload) + + async def collect_spot_perp_divergence(self, session: aiohttp.ClientSession, symbol: str) -> None: + """ + 期现背离采集 — Phase 2 BTC gate-control 核心特征 + divergence = (spot - mark) / mark,正值=现货溢价,负值=现货折价 + """ + spot_url = "https://api.binance.com/api/v3/ticker/price" + perp_url = "https://fapi.binance.com/fapi/v1/premiumIndex" + + spot_data, perp_data = await asyncio.gather( + self.fetch_json(session, spot_url, {"symbol": symbol}), + self.fetch_json(session, perp_url, {"symbol": symbol}), + ) + ts = int(time.time() * 1000) + + spot_price = float(spot_data["price"]) + mark_price = float(perp_data["markPrice"]) + index_price = float(perp_data.get("indexPrice", mark_price)) + divergence = (spot_price - mark_price) / mark_price if mark_price > 0 else 0.0 + + payload = { + "symbol": symbol, + "spot_price": spot_price, + "mark_price": mark_price, + "index_price": index_price, + "divergence": round(divergence, 8), + "divergence_bps": round(divergence * 10000, 3), + } + self.save_indicator(symbol, "spot_perp_divergence", ts, payload) + + async def collect_tiered_cvd_whale(self, session: aiohttp.ClientSession, symbol: str) -> None: + """ + 巨鲸CVD分层采集 — Phase 2 BTC gate-control 核心特征 + 分层:small(<$10k), medium($10k-$100k), whale(>$100k) + net_cvd = buy_usd - sell_usd(正=净买入) + """ + endpoint = "https://fapi.binance.com/fapi/v1/aggTrades" + data = await self.fetch_json(session, endpoint, {"symbol": symbol, "limit": 500}) + ts = int(time.time() * 1000) + + tiers = { + "small": {"buy": 0.0, "sell": 0.0}, + "medium": {"buy": 0.0, "sell": 0.0}, + "whale": {"buy": 0.0, "sell": 0.0}, + } + + for trade in data: + price = float(trade["p"]) + qty = float(trade["q"]) + usd_val = price * qty + is_sell = trade["m"] # m=True 表示卖单(taker卖) + + if usd_val < 10_000: + tier = "small" + elif usd_val < 100_000: + tier = "medium" + else: + tier = "whale" + + if is_sell: + tiers[tier]["sell"] += usd_val + else: + tiers[tier]["buy"] += usd_val + + result = {} + for name, t in tiers.items(): + buy, sell = t["buy"], t["sell"] + net = buy - sell + total = buy + sell + result[name] = { + "buy_usd": round(buy, 2), + "sell_usd": round(sell, 2), + "net_cvd": round(net, 2), + "cvd_ratio": round(net / total, 4) if total > 0 else 0.0, + } + + payload = { + "symbol": symbol, + "tiers": result, + "whale_net_cvd": result["whale"]["net_cvd"], + "whale_cvd_ratio": result["whale"]["cvd_ratio"], + } + self.save_indicator(symbol, "tiered_cvd_whale", ts, payload) + async def collect_symbol(self, session: aiohttp.ClientSession, symbol: str) -> None: tasks = [ ("long_short_ratio", self.collect_long_short_ratio(session, symbol)), @@ -165,6 +282,9 @@ class MarketDataCollector: ("open_interest_hist", self.collect_open_interest_hist(session, symbol)), ("coinbase_premium", self.collect_coinbase_premium(session, symbol)), ("funding_rate", self.collect_funding_rate(session, symbol)), + ("obi_depth_10", self.collect_obi_depth(session, symbol)), + ("spot_perp_divergence", self.collect_spot_perp_divergence(session, symbol)), + ("tiered_cvd_whale", self.collect_tiered_cvd_whale(session, symbol)), ] results = await asyncio.gather(*(t[1] for t in tasks), return_exceptions=True) diff --git a/backend/signal_engine.py b/backend/signal_engine.py index ce13c6f..17b7c5c 100644 --- a/backend/signal_engine.py +++ b/backend/signal_engine.py @@ -126,7 +126,12 @@ def fetch_market_indicators(symbol: str) -> dict: with get_sync_conn() as conn: with conn.cursor() as cur: indicators = {} - for ind_type in ["long_short_ratio", "top_trader_position", "open_interest_hist", "coinbase_premium", "funding_rate"]: + ind_types = [ + "long_short_ratio", "top_trader_position", "open_interest_hist", + "coinbase_premium", "funding_rate", + "obi_depth_10", "spot_perp_divergence", "tiered_cvd_whale", # Phase 2 + ] + for ind_type in ind_types: cur.execute( "SELECT value FROM market_indicators WHERE symbol=%s AND indicator_type=%s ORDER BY timestamp_ms DESC LIMIT 1", (symbol, ind_type), @@ -135,7 +140,6 @@ def fetch_market_indicators(symbol: str) -> dict: if not row or row[0] is None: indicators[ind_type] = None continue - # value可能是JSON字符串或已解析的dict val = row[0] if isinstance(val, str): try: @@ -151,10 +155,18 @@ def fetch_market_indicators(symbol: str) -> dict: elif ind_type == "open_interest_hist": indicators[ind_type] = float(val.get("sumOpenInterestValue", 0)) elif ind_type == "coinbase_premium": - # premium_pct存的是百分比(如0.05=0.05%),转成比例(0.0005) indicators[ind_type] = float(val.get("premium_pct", 0)) / 100.0 elif ind_type == "funding_rate": indicators[ind_type] = float(val.get("fundingRate", val.get("lastFundingRate", 0))) + elif ind_type == "obi_depth_10": + # obi范围[-1,1],正值=买压,负值=卖压 + indicators[ind_type] = float(val.get("obi", 0)) + elif ind_type == "spot_perp_divergence": + # divergence = (spot - mark) / mark + indicators[ind_type] = float(val.get("divergence", 0)) + elif ind_type == "tiered_cvd_whale": + # 巨鲸净CVD比率[-1,1],正值=净买入 + indicators[ind_type] = float(val.get("whale_cvd_ratio", 0)) return indicators