diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 61e5d5d8..6a10b917 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -32,6 +32,7 @@ from typing import ( Union, ) +from async_generator import aclosing from bidict import bidict import pendulum # from pydantic import BaseModel @@ -317,6 +318,7 @@ async def trades_dialogue( ), ) as ws, trio.open_nursery() as n, + aclosing(stream_messages(ws)) as stream, ): # task local msg dialog tracking emsflow: dict[ @@ -340,7 +342,7 @@ async def trades_dialogue( # enter relay loop await handle_order_updates( - ws, + stream, ems_stream, emsflow, ids, @@ -352,7 +354,7 @@ async def trades_dialogue( async def handle_order_updates( - ws: NoBsWs, + ws_stream: NoBsWs, ems_stream: tractor.MsgStream, emsflow: dict[str, list[MsgUnion]], ids: bidict[str, int], @@ -373,7 +375,7 @@ async def handle_order_updates( # on new trade clearing events (aka order "fills") trans: list[pp.Transaction] - async for msg in stream_messages(ws): + async for msg in ws_stream: match msg: # process and relay clearing trade events to ems # https://docs.kraken.com/websockets/#message-ownTrades