diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index dcc1dbe1..46fa9d78 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -32,6 +32,7 @@ from typing import ( Union, ) +from async_generator import aclosing from bidict import bidict import pendulum import trio @@ -316,6 +317,7 @@ async def trades_dialogue( ), ) as ws, trio.open_nursery() as n, + aclosing(stream_messages(ws)) as stream, ): # task local msg dialog tracking emsflow: dict[ @@ -339,7 +341,7 @@ async def trades_dialogue( # enter relay loop await handle_order_updates( - ws, + stream, ems_stream, emsflow, ids, @@ -351,7 +353,7 @@ async def trades_dialogue( async def handle_order_updates( - ws: NoBsWs, + ws_stream: NoBsWs, ems_stream: tractor.MsgStream, emsflow: dict[str, list[MsgUnion]], ids: bidict[str, int], @@ -372,7 +374,7 @@ async def handle_order_updates( # on new trade clearing events (aka order "fills") trans: list[pp.Transaction] - async for msg in stream_messages(ws): + async for msg in ws_stream: match msg: # process and relay clearing trade events to ems # https://docs.kraken.com/websockets/#message-ownTrades