Try out `@trio_util.async_generator` for streaming

Apparently it will likely fix our `trio`-cancel-scopes-corrupted crash
when we try to let our `._web_bs.NoBsWs` do reconnect logic around
the asyn-generator implemented data-feed streaming routines in `binance`
and `kraken`.  See the project docs for deatz; obvs we add the lib as
a dep.
binance_ws_ep_update
Tyler Goodlet 2023-03-17 20:21:19 -04:00
parent 78eb784091
commit 609b91e848
3 changed files with 15 additions and 5 deletions

View File

@ -26,7 +26,7 @@ from typing import (
) )
import time import time
from async_generator import aclosing from trio_util import trio_async_generator
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import pendulum import pendulum
@ -318,7 +318,10 @@ class AggTrade(Struct):
M: bool # Ignore M: bool # Ignore
async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: @trio_async_generator
async def stream_messages(
ws: NoBsWs,
) -> AsyncGenerator[NoBsWs, dict]:
timeouts = 0 timeouts = 0
while True: while True:
@ -540,7 +543,7 @@ async def stream_quotes(
) as ws, ) as ws,
# avoid stream-gen closure from breaking trio.. # avoid stream-gen closure from breaking trio..
aclosing(stream_messages(ws)) as msg_gen, stream_messages(ws) as msg_gen,
): ):
typ, quote = await anext(msg_gen) typ, quote = await anext(msg_gen)

View File

@ -27,11 +27,11 @@ from typing import (
) )
import time import time
from async_generator import aclosing
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import pendulum import pendulum
from trio_typing import TaskStatus from trio_typing import TaskStatus
from trio_util import trio_async_generator
import tractor import tractor
import trio import trio
@ -122,6 +122,7 @@ async def stream_messages(
yield msg yield msg
@trio_async_generator
async def process_data_feed_msgs( async def process_data_feed_msgs(
ws: NoBsWs, ws: NoBsWs,
): ):
@ -378,7 +379,12 @@ async def stream_quotes(
'wss://ws.kraken.com/', 'wss://ws.kraken.com/',
fixture=subscribe, fixture=subscribe,
) as ws, ) as ws,
aclosing(process_data_feed_msgs(ws)) as msg_gen,
# avoid stream-gen closure from breaking trio..
# NOTE: not sure this actually works XD particularly
# if we call `ws._connect()` manally in the streaming
# async gen..
process_data_feed_msgs(ws) as msg_gen,
): ):
# pull a first quote and deliver # pull a first quote and deliver
typ, ohlc_last = await anext(msg_gen) typ, ohlc_last = await anext(msg_gen)

View File

@ -53,6 +53,7 @@ setup(
# async # async
'trio', 'trio',
'trio-websocket', 'trio-websocket',
'trio-util',
'async_generator', 'async_generator',
# from github currently (see requirements.txt) # from github currently (see requirements.txt)