diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index d32b6321..1c551343 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -34,7 +34,6 @@ from typing import ( Union, ) -from async_generator import aclosing from bidict import bidict import pendulum import trio @@ -672,11 +671,9 @@ async def trades_dialogue( token=token, ), ) as ws, - aclosing(stream_messages(ws)) as stream, + stream_messages(ws) as stream, trio.open_nursery() as nurse, ): - stream = stream_messages(ws) - # task for processing inbound requests from ems nurse.start_soon( handle_order_requests, diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index e37fdb49..5ea96e28 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -78,6 +78,7 @@ class OHLC(Struct): ticks: list[Any] = [] +@trio_async_generator async def stream_messages( ws: NoBsWs, ): @@ -133,63 +134,64 @@ async def process_data_feed_msgs( Parse and pack data feed messages. ''' - async for msg in stream_messages(ws): - match msg: - case { - 'errorMessage': errmsg - }: - raise BrokerError(errmsg) + async with stream_messages(ws) as ws_stream: + async for msg in ws_stream: + match msg: + case { + 'errorMessage': errmsg + }: + raise BrokerError(errmsg) - case { - 'event': 'subscriptionStatus', - } as sub: - log.info( - 'WS subscription is active:\n' - f'{sub}' - ) - continue - - case [ - chan_id, - *payload_array, - chan_name, - pair - ]: - if 'ohlc' in chan_name: - ohlc = OHLC( - chan_id, - chan_name, - pair, - *payload_array[0] + case { + 'event': 'subscriptionStatus', + } as sub: + log.info( + 'WS subscription is active:\n' + f'{sub}' ) - ohlc.typecast() - yield 'ohlc', ohlc + continue - elif 'spread' in chan_name: + case [ + chan_id, + *payload_array, + chan_name, + pair + ]: + if 'ohlc' in chan_name: + ohlc = OHLC( + chan_id, + chan_name, + pair, + *payload_array[0] + ) + ohlc.typecast() + yield 'ohlc', ohlc - bid, ask, ts, bsize, asize = map( - float, payload_array[0]) + elif 'spread' in chan_name: - # TODO: really makes you think IB has a horrible API... - quote = { - 'symbol': pair.replace('/', ''), - 'ticks': [ - {'type': 'bid', 'price': bid, 'size': bsize}, - {'type': 'bsize', 'price': bid, 'size': bsize}, + bid, ask, ts, bsize, asize = map( + float, payload_array[0]) - {'type': 'ask', 'price': ask, 'size': asize}, - {'type': 'asize', 'price': ask, 'size': asize}, - ], - } - yield 'l1', quote + # TODO: really makes you think IB has a horrible API... + quote = { + 'symbol': pair.replace('/', ''), + 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, - # elif 'book' in msg[-2]: - # chan_id, *payload_array, chan_name, pair = msg - # print(msg) + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize}, + ], + } + yield 'l1', quote - case _: - print(f'UNHANDLED MSG: {msg}') - # yield msg + # elif 'book' in msg[-2]: + # chan_id, *payload_array, chan_name, pair = msg + # print(msg) + + case _: + print(f'UNHANDLED MSG: {msg}') + # yield msg def normalize(