Use `match:` syntax in data feed subs processing

kraken_ws_orders
Tyler Goodlet 2022-07-04 22:52:25 -04:00
parent 00378c330c
commit f79d9865a0
1 changed files with 63 additions and 32 deletions

View File

@ -117,9 +117,8 @@ async def stream_messages(
too_slow_count = 0 too_slow_count = 0
continue continue
if isinstance(msg, dict): match msg:
if msg.get('event') == 'heartbeat': case {'event': 'heartbeat'}:
now = time.time() now = time.time()
delay = now - last_hb delay = now - last_hb
last_hb = now last_hb = now
@ -130,11 +129,20 @@ async def stream_messages(
continue continue
err = msg.get('errorMessage') case {
if err: 'connectionID': _,
raise BrokerError(err) 'event': 'systemStatus',
else: 'status': 'online',
yield msg 'version': _,
} as msg:
log.info(
'WS connection is up:\n'
f'{msg}'
)
continue
case _:
yield msg
async def process_data_feed_msgs( async def process_data_feed_msgs(
@ -145,37 +153,60 @@ async def process_data_feed_msgs(
''' '''
async for msg in stream_messages(ws): async for msg in stream_messages(ws):
match msg:
case {
'errorMessage': errmsg
}:
raise BrokerError(errmsg)
chan_id, *payload_array, chan_name, pair = msg case {
'event': 'subscriptionStatus',
} as sub:
log.info(
'WS subscription is active:\n'
f'{sub}'
)
continue
if 'ohlc' in chan_name: case [
chan_id,
*payload_array,
chan_name,
pair
]:
if 'ohlc' in chan_name:
yield 'ohlc', OHLC(
chan_id,
chan_name,
pair,
*payload_array[0]
)
yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) elif 'spread' in chan_name:
elif 'spread' in chan_name: bid, ask, ts, bsize, asize = map(
float, payload_array[0])
bid, ask, ts, bsize, asize = map(float, payload_array[0]) # TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
# TODO: really makes you think IB has a horrible API... {'type': 'ask', 'price': ask, 'size': asize},
quote = { {'type': 'asize', 'price': ask, 'size': asize},
'symbol': pair.replace('/', ''), ],
'ticks': [ }
{'type': 'bid', 'price': bid, 'size': bsize}, yield 'l1', quote
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize}, # elif 'book' in msg[-2]:
{'type': 'asize', 'price': ask, 'size': asize}, # chan_id, *payload_array, chan_name, pair = msg
], # print(msg)
}
yield 'l1', quote
# elif 'book' in msg[-2]: case _:
# chan_id, *payload_array, chan_name, pair = msg print(f'UNHANDLED MSG: {msg}')
# print(msg) # yield msg
else:
print(f'UNHANDLED MSG: {msg}')
yield msg
def normalize( def normalize(
@ -385,7 +416,7 @@ async def stream_quotes(
msg_gen = process_data_feed_msgs(ws) msg_gen = process_data_feed_msgs(ws)
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__() typ, ohlc_last = await anext(msg_gen)
topic, quote = normalize(ohlc_last) topic, quote = normalize(ohlc_last)