From af068c5c513af0853c8df457acc679e9d3eee311 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 21 Apr 2023 15:04:47 -0400 Subject: [PATCH] binance: port `stream_messages()` to use `match:` and a new `L1` struct --- piker/brokers/binance.py | 130 +++++++++++++++++++++++++++------------ 1 file changed, 91 insertions(+), 39 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index cde20d3f..06c3ed46 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -176,6 +176,18 @@ class OHLC(Struct): bar_wap: float = 0.0 +class L1(Struct): + # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams + + update_id: int + sym: str + + bid: float + bsize: float + ask: float + asize: float + + # convert datetime obj timestamp to unixtime in milliseconds def binance_timestamp( when: datetime @@ -363,48 +375,88 @@ async def stream_messages( ) -> AsyncGenerator[NoBsWs, dict]: # TODO: match syntax here! + msg: dict[str, Any] async for msg in ws: + match msg: + # for l1 streams binance doesn't add an event type field so + # identify those messages by matching keys + # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams + case { + # NOTE: this is never an old value it seems, so + # they are always sending real L1 spread updates. + 'u': upid, # update id + 's': sym, + 'b': bid, + 'B': bsize, + 'a': ask, + 'A': asize, + }: + # TODO: it would be super nice to have a `L1` piker type + # which "renders" incremental tick updates from a packed + # msg-struct: + # - backend msgs after packed into the type such that we + # can reduce IPC usage but without each backend having + # to do that incremental update logic manually B) + # - would it maybe be more efficient to use this instead? + # https://binance-docs.github.io/apidocs/spot/en/#diff-depth-stream + l1 = L1( + update_id=upid, + sym=sym, + bid=bid, + bsize=bsize, + ask=ask, + asize=asize, + ) + l1.typecast() - # for l1 streams binance doesn't add an event type field so - # identify those messages by matching keys - # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams - if msg.get('u'): - sym = msg['s'] - bid = float(msg['b']) - bsize = float(msg['B']) - ask = float(msg['a']) - asize = float(msg['A']) + # repack into piker's tick-quote format + yield 'l1', { + 'symbol': l1.sym, + 'ticks': [ + { + 'type': 'bid', + 'price': l1.bid, + 'size': l1.bsize, + }, + { + 'type': 'bsize', + 'price': l1.bid, + 'size': l1.bsize, + }, + { + 'type': 'ask', + 'price': l1.ask, + 'size': l1.asize, + }, + { + 'type': 'asize', + 'price': l1.ask, + 'size': l1.asize, + } + ] + } - yield 'l1', { - 'symbol': sym, - 'ticks': [ - {'type': 'bid', 'price': bid, 'size': bsize}, - {'type': 'bsize', 'price': bid, 'size': bsize}, - {'type': 'ask', 'price': ask, 'size': asize}, - {'type': 'asize', 'price': ask, 'size': asize} - ] - } - - elif msg.get('e') == 'aggTrade': - - # NOTE: this is purely for a definition, ``msgspec.Struct`` - # does not runtime-validate until you decode/encode. - # see: https://jcristharif.com/msgspec/structs.html#type-validation - msg = AggTrade(**msg) - - # TODO: type out and require this quote format - # from all backends! - yield 'trade', { - 'symbol': msg.s, - 'last': msg.p, - 'brokerd_ts': time.time(), - 'ticks': [{ - 'type': 'trade', - 'price': float(msg.p), - 'size': float(msg.q), - 'broker_ts': msg.T, - }], - } + # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams + case { + 'e': 'aggTrade', + }: + # NOTE: this is purely for a definition, + # ``msgspec.Struct`` does not runtime-validate until you + # decode/encode, see: + # https://jcristharif.com/msgspec/structs.html#type-validation + msg = AggTrade(**msg) + msg.typecast() + yield 'trade', { + 'symbol': msg.s, + 'last': msg.p, + 'brokerd_ts': time.time(), + 'ticks': [{ + 'type': 'trade', + 'price': msg.p, + 'size': msg.q, + 'broker_ts': msg.T, + }], + } def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]: