Use `match:` syntax in data feed subs processing

tractor_typed_msg_hackin
Tyler Goodlet 2022-07-04 22:52:25 -04:00
parent ea5004c8d3
commit 78a78f5edb
1 changed files with 63 additions and 32 deletions

View File

@ -117,9 +117,8 @@ async def stream_messages(
too_slow_count = 0 too_slow_count = 0
continue continue
if isinstance(msg, dict): match msg:
if msg.get('event') == 'heartbeat': case {'event': 'heartbeat'}:
now = time.time() now = time.time()
delay = now - last_hb delay = now - last_hb
last_hb = now last_hb = now
@ -130,10 +129,19 @@ async def stream_messages(
continue continue
err = msg.get('errorMessage') case {
if err: 'connectionID': _,
raise BrokerError(err) 'event': 'systemStatus',
else: 'status': 'online',
'version': _,
} as msg:
log.info(
'WS connection is up:\n'
f'{msg}'
)
continue
case _:
yield msg yield msg
@ -145,16 +153,39 @@ async def process_data_feed_msgs(
''' '''
async for msg in stream_messages(ws): async for msg in stream_messages(ws):
match msg:
case {
'errorMessage': errmsg
}:
raise BrokerError(errmsg)
chan_id, *payload_array, chan_name, pair = msg case {
'event': 'subscriptionStatus',
} as sub:
log.info(
'WS subscription is active:\n'
f'{sub}'
)
continue
case [
chan_id,
*payload_array,
chan_name,
pair
]:
if 'ohlc' in chan_name: if 'ohlc' in chan_name:
yield 'ohlc', OHLC(
yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) chan_id,
chan_name,
pair,
*payload_array[0]
)
elif 'spread' in chan_name: elif 'spread' in chan_name:
bid, ask, ts, bsize, asize = map(float, payload_array[0]) bid, ask, ts, bsize, asize = map(
float, payload_array[0])
# TODO: really makes you think IB has a horrible API... # TODO: really makes you think IB has a horrible API...
quote = { quote = {
@ -173,9 +204,9 @@ async def process_data_feed_msgs(
# chan_id, *payload_array, chan_name, pair = msg # chan_id, *payload_array, chan_name, pair = msg
# print(msg) # print(msg)
else: case _:
print(f'UNHANDLED MSG: {msg}') print(f'UNHANDLED MSG: {msg}')
yield msg # yield msg
def normalize( def normalize(
@ -385,7 +416,7 @@ async def stream_quotes(
msg_gen = process_data_feed_msgs(ws) msg_gen = process_data_feed_msgs(ws)
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__() typ, ohlc_last = await anext(msg_gen)
topic, quote = normalize(ohlc_last) topic, quote = normalize(ohlc_last)