binance: port `stream_messages()` to use `match:` and a new `L1` struct

rekt_pps
Tyler Goodlet 2023-04-21 15:04:47 -04:00
parent f6cd08c6fa
commit af068c5c51
1 changed files with 91 additions and 39 deletions

View File

@ -176,6 +176,18 @@ class OHLC(Struct):
bar_wap: float = 0.0 bar_wap: float = 0.0
class L1(Struct):
# https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
update_id: int
sym: str
bid: float
bsize: float
ask: float
asize: float
# convert datetime obj timestamp to unixtime in milliseconds # convert datetime obj timestamp to unixtime in milliseconds
def binance_timestamp( def binance_timestamp(
when: datetime when: datetime
@ -363,48 +375,88 @@ async def stream_messages(
) -> AsyncGenerator[NoBsWs, dict]: ) -> AsyncGenerator[NoBsWs, dict]:
# TODO: match syntax here! # TODO: match syntax here!
msg: dict[str, Any]
async for msg in ws: async for msg in ws:
match msg:
# for l1 streams binance doesn't add an event type field so
# identify those messages by matching keys
# https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
case {
# NOTE: this is never an old value it seems, so
# they are always sending real L1 spread updates.
'u': upid, # update id
's': sym,
'b': bid,
'B': bsize,
'a': ask,
'A': asize,
}:
# TODO: it would be super nice to have a `L1` piker type
# which "renders" incremental tick updates from a packed
# msg-struct:
# - backend msgs after packed into the type such that we
# can reduce IPC usage but without each backend having
# to do that incremental update logic manually B)
# - would it maybe be more efficient to use this instead?
# https://binance-docs.github.io/apidocs/spot/en/#diff-depth-stream
l1 = L1(
update_id=upid,
sym=sym,
bid=bid,
bsize=bsize,
ask=ask,
asize=asize,
)
l1.typecast()
# for l1 streams binance doesn't add an event type field so # repack into piker's tick-quote format
# identify those messages by matching keys yield 'l1', {
# https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams 'symbol': l1.sym,
if msg.get('u'): 'ticks': [
sym = msg['s'] {
bid = float(msg['b']) 'type': 'bid',
bsize = float(msg['B']) 'price': l1.bid,
ask = float(msg['a']) 'size': l1.bsize,
asize = float(msg['A']) },
{
'type': 'bsize',
'price': l1.bid,
'size': l1.bsize,
},
{
'type': 'ask',
'price': l1.ask,
'size': l1.asize,
},
{
'type': 'asize',
'price': l1.ask,
'size': l1.asize,
}
]
}
yield 'l1', { # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
'symbol': sym, case {
'ticks': [ 'e': 'aggTrade',
{'type': 'bid', 'price': bid, 'size': bsize}, }:
{'type': 'bsize', 'price': bid, 'size': bsize}, # NOTE: this is purely for a definition,
{'type': 'ask', 'price': ask, 'size': asize}, # ``msgspec.Struct`` does not runtime-validate until you
{'type': 'asize', 'price': ask, 'size': asize} # decode/encode, see:
] # https://jcristharif.com/msgspec/structs.html#type-validation
} msg = AggTrade(**msg)
msg.typecast()
elif msg.get('e') == 'aggTrade': yield 'trade', {
'symbol': msg.s,
# NOTE: this is purely for a definition, ``msgspec.Struct`` 'last': msg.p,
# does not runtime-validate until you decode/encode. 'brokerd_ts': time.time(),
# see: https://jcristharif.com/msgspec/structs.html#type-validation 'ticks': [{
msg = AggTrade(**msg) 'type': 'trade',
'price': msg.p,
# TODO: type out and require this quote format 'size': msg.q,
# from all backends! 'broker_ts': msg.T,
yield 'trade', { }],
'symbol': msg.s, }
'last': msg.p,
'brokerd_ts': time.time(),
'ticks': [{
'type': 'trade',
'price': float(msg.p),
'size': float(msg.q),
'broker_ts': msg.T,
}],
}
def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]: def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]: