From e757e1f2779d0f6c04451b6ab3c1d825165ae6e9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Jul 2022 22:52:25 -0400 Subject: [PATCH] Use `match:` syntax in data feed subs processing --- piker/brokers/kraken/feed.py | 95 ++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 32 deletions(-) diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 71b75082..2d6ecbf1 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -117,9 +117,8 @@ async def stream_messages( too_slow_count = 0 continue - if isinstance(msg, dict): - if msg.get('event') == 'heartbeat': - + match msg: + case {'event': 'heartbeat'}: now = time.time() delay = now - last_hb last_hb = now @@ -130,11 +129,20 @@ async def stream_messages( continue - err = msg.get('errorMessage') - if err: - raise BrokerError(err) - else: - yield msg + case { + 'connectionID': _, + 'event': 'systemStatus', + 'status': 'online', + 'version': _, + } as msg: + log.info( + 'WS connection is up:\n' + f'{msg}' + ) + continue + + case _: + yield msg async def process_data_feed_msgs( @@ -145,37 +153,60 @@ async def process_data_feed_msgs( ''' async for msg in stream_messages(ws): + match msg: + case { + 'errorMessage': errmsg + }: + raise BrokerError(errmsg) - chan_id, *payload_array, chan_name, pair = msg + case { + 'event': 'subscriptionStatus', + } as sub: + log.info( + 'WS subscription is active:\n' + f'{sub}' + ) + continue - if 'ohlc' in chan_name: + case [ + chan_id, + *payload_array, + chan_name, + pair + ]: + if 'ohlc' in chan_name: + yield 'ohlc', OHLC( + chan_id, + chan_name, + pair, + *payload_array[0] + ) - yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) + elif 'spread' in chan_name: - elif 'spread' in chan_name: + bid, ask, ts, bsize, asize = map( + float, payload_array[0]) - bid, ask, ts, bsize, asize = map(float, payload_array[0]) + # TODO: really makes you think IB has a horrible API... + quote = { + 'symbol': pair.replace('/', ''), + 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, - # TODO: really makes you think IB has a horrible API... - quote = { - 'symbol': pair.replace('/', ''), - 'ticks': [ - {'type': 'bid', 'price': bid, 'size': bsize}, - {'type': 'bsize', 'price': bid, 'size': bsize}, + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize}, + ], + } + yield 'l1', quote - {'type': 'ask', 'price': ask, 'size': asize}, - {'type': 'asize', 'price': ask, 'size': asize}, - ], - } - yield 'l1', quote + # elif 'book' in msg[-2]: + # chan_id, *payload_array, chan_name, pair = msg + # print(msg) - # elif 'book' in msg[-2]: - # chan_id, *payload_array, chan_name, pair = msg - # print(msg) - - else: - print(f'UNHANDLED MSG: {msg}') - yield msg + case _: + print(f'UNHANDLED MSG: {msg}') + # yield msg def normalize( @@ -385,7 +416,7 @@ async def stream_quotes( msg_gen = process_data_feed_msgs(ws) # TODO: use ``anext()`` when it lands in 3.10! - typ, ohlc_last = await msg_gen.__anext__() + typ, ohlc_last = await anext(msg_gen) topic, quote = normalize(ohlc_last)