Port `.data._web_bs` stuff to strict-EGs
Using `tractor.trionics.collapse_eg()` as needed and doing some renames, in similar style as elsewhere: - `pcs` -> `rent_cs`, - `n` -> `tn` for nursery handles, Also, - tweak the `._reconnect_forever()` while loop to use the (also) `trio`-internal `mc_state: trio._channel.MemoryChannelState = snd._state` instead of `snd._close` to poll for open send/receive consumer task counts since, 1. it seems more reliable then using the `snd._closed`, 2. there's no other way to access the info.. afaik? - handle `ConnectionRejected` explicitly alongside handshake-errs as a retry case. - add a base-exc handler which `.exception()` reports the reconnect attempt failure explicitly. - drop some lingering `Optional` usage.testing_utils
parent
d4c10b2b0f
commit
d35e1e5c67
|
@ -27,7 +27,6 @@ from functools import partial
|
|||
from types import ModuleType
|
||||
from typing import (
|
||||
Any,
|
||||
Optional,
|
||||
Callable,
|
||||
AsyncContextManager,
|
||||
AsyncGenerator,
|
||||
|
@ -35,6 +34,7 @@ from typing import (
|
|||
)
|
||||
import json
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
from trio_websocket import (
|
||||
|
@ -167,7 +167,7 @@ async def _reconnect_forever(
|
|||
|
||||
async def proxy_msgs(
|
||||
ws: WebSocketConnection,
|
||||
pcs: trio.CancelScope, # parent cancel scope
|
||||
rent_cs: trio.CancelScope, # parent cancel scope
|
||||
):
|
||||
'''
|
||||
Receive (under `timeout` deadline) all msgs from from underlying
|
||||
|
@ -192,7 +192,7 @@ async def _reconnect_forever(
|
|||
f'{url} connection bail with:'
|
||||
)
|
||||
await trio.sleep(0.5)
|
||||
pcs.cancel()
|
||||
rent_cs.cancel()
|
||||
|
||||
# go back to reonnect loop in parent task
|
||||
return
|
||||
|
@ -204,7 +204,7 @@ async def _reconnect_forever(
|
|||
f'{src_mod}\n'
|
||||
'WS feed seems down and slow af.. reconnecting\n'
|
||||
)
|
||||
pcs.cancel()
|
||||
rent_cs.cancel()
|
||||
|
||||
# go back to reonnect loop in parent task
|
||||
return
|
||||
|
@ -228,7 +228,12 @@ async def _reconnect_forever(
|
|||
nobsws._connected = trio.Event()
|
||||
task_status.started()
|
||||
|
||||
while not snd._closed:
|
||||
mc_state: trio._channel.MemoryChannelState = snd._state
|
||||
while (
|
||||
mc_state.open_receive_channels > 0
|
||||
and
|
||||
mc_state.open_send_channels > 0
|
||||
):
|
||||
log.info(
|
||||
f'{src_mod}\n'
|
||||
f'{url} trying (RE)CONNECT'
|
||||
|
@ -237,10 +242,11 @@ async def _reconnect_forever(
|
|||
ws: WebSocketConnection
|
||||
try:
|
||||
async with (
|
||||
trio.open_nursery() as n,
|
||||
open_websocket_url(url) as ws,
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
cs = nobsws._cs = n.cancel_scope
|
||||
cs = nobsws._cs = tn.cancel_scope
|
||||
nobsws._ws = ws
|
||||
log.info(
|
||||
f'{src_mod}\n'
|
||||
|
@ -248,7 +254,7 @@ async def _reconnect_forever(
|
|||
)
|
||||
|
||||
# begin relay loop to forward msgs
|
||||
n.start_soon(
|
||||
tn.start_soon(
|
||||
proxy_msgs,
|
||||
ws,
|
||||
cs,
|
||||
|
@ -262,7 +268,7 @@ async def _reconnect_forever(
|
|||
|
||||
# TODO: should we return an explicit sub-cs
|
||||
# from this fixture task?
|
||||
await n.start(
|
||||
await tn.start(
|
||||
open_fixture,
|
||||
fixture,
|
||||
nobsws,
|
||||
|
@ -272,11 +278,23 @@ async def _reconnect_forever(
|
|||
# to let tasks run **inside** the ws open block above.
|
||||
nobsws._connected.set()
|
||||
await trio.sleep_forever()
|
||||
except HandshakeError:
|
||||
|
||||
except (
|
||||
HandshakeError,
|
||||
ConnectionRejected,
|
||||
):
|
||||
log.exception('Retrying connection')
|
||||
await trio.sleep(0.5) # throttle
|
||||
|
||||
# ws & nursery block ends
|
||||
except BaseException as _berr:
|
||||
berr = _berr
|
||||
log.exception(
|
||||
'Reconnect-attempt failed ??\n'
|
||||
)
|
||||
await trio.sleep(0.2) # throttle
|
||||
raise berr
|
||||
|
||||
#|_ws & nursery block ends
|
||||
nobsws._connected = trio.Event()
|
||||
if cs.cancelled_caught:
|
||||
log.cancel(
|
||||
|
@ -324,21 +342,25 @@ async def open_autorecon_ws(
|
|||
connetivity errors, or some user defined recv timeout.
|
||||
|
||||
You can provide a ``fixture`` async-context-manager which will be
|
||||
entered/exitted around each connection reset; eg. for (re)requesting
|
||||
subscriptions without requiring streaming setup code to rerun.
|
||||
entered/exitted around each connection reset; eg. for
|
||||
(re)requesting subscriptions without requiring streaming setup
|
||||
code to rerun.
|
||||
|
||||
'''
|
||||
snd: trio.MemorySendChannel
|
||||
rcv: trio.MemoryReceiveChannel
|
||||
snd, rcv = trio.open_memory_channel(616)
|
||||
|
||||
async with trio.open_nursery() as n:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn
|
||||
):
|
||||
nobsws = NoBsWs(
|
||||
url,
|
||||
rcv,
|
||||
msg_recv_timeout=msg_recv_timeout,
|
||||
)
|
||||
await n.start(
|
||||
await tn.start(
|
||||
partial(
|
||||
_reconnect_forever,
|
||||
url,
|
||||
|
@ -351,11 +373,10 @@ async def open_autorecon_ws(
|
|||
await nobsws._connected.wait()
|
||||
assert nobsws._cs
|
||||
assert nobsws.connected()
|
||||
|
||||
try:
|
||||
yield nobsws
|
||||
finally:
|
||||
n.cancel_scope.cancel()
|
||||
tn.cancel_scope.cancel()
|
||||
|
||||
|
||||
'''
|
||||
|
@ -368,8 +389,8 @@ of msgs over a `NoBsWs`.
|
|||
class JSONRPCResult(Struct):
|
||||
id: int
|
||||
jsonrpc: str = '2.0'
|
||||
result: Optional[dict] = None
|
||||
error: Optional[dict] = None
|
||||
result: dict|None = None
|
||||
error: dict|None = None
|
||||
|
||||
|
||||
@acm
|
||||
|
|
Loading…
Reference in New Issue