diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 4d886fbc..7e5eb810 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -27,7 +27,6 @@ from functools import partial from types import ModuleType from typing import ( Any, - Optional, Callable, AsyncContextManager, AsyncGenerator, @@ -35,6 +34,7 @@ from typing import ( ) import json +import tractor import trio from trio_typing import TaskStatus from trio_websocket import ( @@ -167,7 +167,7 @@ async def _reconnect_forever( async def proxy_msgs( ws: WebSocketConnection, - pcs: trio.CancelScope, # parent cancel scope + rent_cs: trio.CancelScope, # parent cancel scope ): ''' Receive (under `timeout` deadline) all msgs from from underlying @@ -192,7 +192,7 @@ async def _reconnect_forever( f'{url} connection bail with:' ) await trio.sleep(0.5) - pcs.cancel() + rent_cs.cancel() # go back to reonnect loop in parent task return @@ -204,7 +204,7 @@ async def _reconnect_forever( f'{src_mod}\n' 'WS feed seems down and slow af.. reconnecting\n' ) - pcs.cancel() + rent_cs.cancel() # go back to reonnect loop in parent task return @@ -228,7 +228,12 @@ async def _reconnect_forever( nobsws._connected = trio.Event() task_status.started() - while not snd._closed: + mc_state: trio._channel.MemoryChannelState = snd._state + while ( + mc_state.open_receive_channels > 0 + and + mc_state.open_send_channels > 0 + ): log.info( f'{src_mod}\n' f'{url} trying (RE)CONNECT' @@ -237,10 +242,11 @@ async def _reconnect_forever( ws: WebSocketConnection try: async with ( - trio.open_nursery() as n, open_websocket_url(url) as ws, + tractor.trionics.collapse_eg(), + trio.open_nursery() as tn, ): - cs = nobsws._cs = n.cancel_scope + cs = nobsws._cs = tn.cancel_scope nobsws._ws = ws log.info( f'{src_mod}\n' @@ -248,7 +254,7 @@ async def _reconnect_forever( ) # begin relay loop to forward msgs - n.start_soon( + tn.start_soon( proxy_msgs, ws, cs, @@ -262,7 +268,7 @@ async def _reconnect_forever( # TODO: should we return an explicit sub-cs # from this fixture task? - await n.start( + await tn.start( open_fixture, fixture, nobsws, @@ -272,11 +278,23 @@ async def _reconnect_forever( # to let tasks run **inside** the ws open block above. nobsws._connected.set() await trio.sleep_forever() - except HandshakeError: + + except ( + HandshakeError, + ConnectionRejected, + ): log.exception('Retrying connection') + await trio.sleep(0.5) # throttle - # ws & nursery block ends + except BaseException as _berr: + berr = _berr + log.exception( + 'Reconnect-attempt failed ??\n' + ) + await trio.sleep(0.2) # throttle + raise berr + #|_ws & nursery block ends nobsws._connected = trio.Event() if cs.cancelled_caught: log.cancel( @@ -324,21 +342,25 @@ async def open_autorecon_ws( connetivity errors, or some user defined recv timeout. You can provide a ``fixture`` async-context-manager which will be - entered/exitted around each connection reset; eg. for (re)requesting - subscriptions without requiring streaming setup code to rerun. + entered/exitted around each connection reset; eg. for + (re)requesting subscriptions without requiring streaming setup + code to rerun. ''' snd: trio.MemorySendChannel rcv: trio.MemoryReceiveChannel snd, rcv = trio.open_memory_channel(616) - async with trio.open_nursery() as n: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as tn + ): nobsws = NoBsWs( url, rcv, msg_recv_timeout=msg_recv_timeout, ) - await n.start( + await tn.start( partial( _reconnect_forever, url, @@ -351,11 +373,10 @@ async def open_autorecon_ws( await nobsws._connected.wait() assert nobsws._cs assert nobsws.connected() - try: yield nobsws finally: - n.cancel_scope.cancel() + tn.cancel_scope.cancel() ''' @@ -368,8 +389,8 @@ of msgs over a `NoBsWs`. class JSONRPCResult(Struct): id: int jsonrpc: str = '2.0' - result: Optional[dict] = None - error: Optional[dict] = None + result: dict|None = None + error: dict|None = None @acm