Use `aclosing()` around ws async gen

krakenwsbackup
Tyler Goodlet 2022-07-08 17:17:28 -04:00
parent 2240066a12
commit 48b8607078
1 changed files with 5 additions and 3 deletions

View File

@ -32,6 +32,7 @@ from typing import (
Union, Union,
) )
from async_generator import aclosing
from bidict import bidict from bidict import bidict
import pendulum import pendulum
# from pydantic import BaseModel # from pydantic import BaseModel
@ -317,6 +318,7 @@ async def trades_dialogue(
), ),
) as ws, ) as ws,
trio.open_nursery() as n, trio.open_nursery() as n,
aclosing(stream_messages(ws)) as stream,
): ):
# task local msg dialog tracking # task local msg dialog tracking
emsflow: dict[ emsflow: dict[
@ -340,7 +342,7 @@ async def trades_dialogue(
# enter relay loop # enter relay loop
await handle_order_updates( await handle_order_updates(
ws, stream,
ems_stream, ems_stream,
emsflow, emsflow,
ids, ids,
@ -352,7 +354,7 @@ async def trades_dialogue(
async def handle_order_updates( async def handle_order_updates(
ws: NoBsWs, ws_stream: NoBsWs,
ems_stream: tractor.MsgStream, ems_stream: tractor.MsgStream,
emsflow: dict[str, list[MsgUnion]], emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int], ids: bidict[str, int],
@ -373,7 +375,7 @@ async def handle_order_updates(
# on new trade clearing events (aka order "fills") # on new trade clearing events (aka order "fills")
trans: list[pp.Transaction] trans: list[pp.Transaction]
async for msg in stream_messages(ws): async for msg in ws_stream:
match msg: match msg:
# process and relay clearing trade events to ems # process and relay clearing trade events to ems
# https://docs.kraken.com/websockets/#message-ownTrades # https://docs.kraken.com/websockets/#message-ownTrades