Use `aclosing()` around ws async gen
parent
ba93f96c71
commit
a20a8d95d5
|
@ -32,6 +32,7 @@ from typing import (
|
|||
Union,
|
||||
)
|
||||
|
||||
from async_generator import aclosing
|
||||
from bidict import bidict
|
||||
import pendulum
|
||||
import trio
|
||||
|
@ -316,6 +317,7 @@ async def trades_dialogue(
|
|||
),
|
||||
) as ws,
|
||||
trio.open_nursery() as n,
|
||||
aclosing(stream_messages(ws)) as stream,
|
||||
):
|
||||
# task local msg dialog tracking
|
||||
emsflow: dict[
|
||||
|
@ -339,7 +341,7 @@ async def trades_dialogue(
|
|||
|
||||
# enter relay loop
|
||||
await handle_order_updates(
|
||||
ws,
|
||||
stream,
|
||||
ems_stream,
|
||||
emsflow,
|
||||
ids,
|
||||
|
@ -351,7 +353,7 @@ async def trades_dialogue(
|
|||
|
||||
|
||||
async def handle_order_updates(
|
||||
ws: NoBsWs,
|
||||
ws_stream: NoBsWs,
|
||||
ems_stream: tractor.MsgStream,
|
||||
emsflow: dict[str, list[MsgUnion]],
|
||||
ids: bidict[str, int],
|
||||
|
@ -372,7 +374,7 @@ async def handle_order_updates(
|
|||
# on new trade clearing events (aka order "fills")
|
||||
trans: list[pp.Transaction]
|
||||
|
||||
async for msg in stream_messages(ws):
|
||||
async for msg in ws_stream:
|
||||
match msg:
|
||||
# process and relay clearing trade events to ems
|
||||
# https://docs.kraken.com/websockets/#message-ownTrades
|
||||
|
|
Loading…
Reference in New Issue