Compare commits
6 Commits
tractor_st
...
main
| Author | SHA1 | Date |
|---|---|---|
|
|
6b70fea5d4 | |
|
|
4e24cb1bff | |
|
|
3d83b61f3f | |
|
|
6f390dc88c | |
|
|
e1f3d7c3f8 | |
|
|
600636784c |
|
|
@ -729,6 +729,7 @@ class Router(Struct):
|
|||
except (
|
||||
trio.ClosedResourceError,
|
||||
trio.BrokenResourceError,
|
||||
tractor.TransportClosed,
|
||||
):
|
||||
to_remove.add(client_stream)
|
||||
log.warning(
|
||||
|
|
@ -1699,5 +1700,5 @@ async def _emsd_main(
|
|||
if not client_streams:
|
||||
log.warning(
|
||||
f'Order dialog is not being monitored:\n'
|
||||
f'{oid} ->\n{client_stream._ctx.chan.uid}'
|
||||
f'{oid!r} <-> {client_stream.chan.aid.reprol()}\n'
|
||||
)
|
||||
|
|
|
|||
|
|
@ -99,6 +99,7 @@ class Sampler:
|
|||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError,
|
||||
trio.EndOfChannel,
|
||||
tractor.TransportClosed,
|
||||
)
|
||||
|
||||
# holds all the ``tractor.Context`` remote subscriptions for
|
||||
|
|
@ -291,9 +292,10 @@ class Sampler:
|
|||
|
||||
except self.bcast_errors as err:
|
||||
log.error(
|
||||
f'Connection dropped for IPC ctx\n'
|
||||
f'{stream._ctx}\n\n'
|
||||
f'Due to {type(err)}'
|
||||
f'Connection dropped for IPC ctx due to,\n'
|
||||
f'{type(err)!r}\n'
|
||||
f'\n'
|
||||
f'{stream._ctx}'
|
||||
)
|
||||
borked.add(stream)
|
||||
else:
|
||||
|
|
@ -741,7 +743,7 @@ async def sample_and_broadcast(
|
|||
log.warning(
|
||||
f'Feed OVERRUN {sub_key}'
|
||||
f'@{bus.brokername} -> \n'
|
||||
f'feed @ {chan.uid}\n'
|
||||
f'feed @ {chan.aid.reprol()}\n'
|
||||
f'throttle = {throttle} Hz'
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ from typing import (
|
|||
AsyncContextManager,
|
||||
AsyncGenerator,
|
||||
Iterable,
|
||||
Type,
|
||||
)
|
||||
import json
|
||||
|
||||
|
|
@ -67,7 +68,7 @@ class NoBsWs:
|
|||
|
||||
'''
|
||||
# apparently we can QoS for all sorts of reasons..so catch em.
|
||||
recon_errors = (
|
||||
recon_errors: tuple[Type[Exception]] = (
|
||||
ConnectionClosed,
|
||||
DisconnectionTimeout,
|
||||
ConnectionRejected,
|
||||
|
|
@ -105,7 +106,10 @@ class NoBsWs:
|
|||
def connected(self) -> bool:
|
||||
return self._connected.is_set()
|
||||
|
||||
async def reset(self) -> None:
|
||||
async def reset(
|
||||
self,
|
||||
timeout: float,
|
||||
) -> bool:
|
||||
'''
|
||||
Reset the underlying ws connection by cancelling
|
||||
the bg relay task and waiting for it to signal
|
||||
|
|
@ -114,18 +118,31 @@ class NoBsWs:
|
|||
'''
|
||||
self._connected = trio.Event()
|
||||
self._cs.cancel()
|
||||
await self._connected.wait()
|
||||
with trio.move_on_after(timeout) as cs:
|
||||
await self._connected.wait()
|
||||
return True
|
||||
|
||||
assert cs.cancelled_caught
|
||||
return False
|
||||
|
||||
async def send_msg(
|
||||
self,
|
||||
data: Any,
|
||||
timeout: float = 3,
|
||||
) -> None:
|
||||
while True:
|
||||
try:
|
||||
msg: Any = self._dumps(data)
|
||||
return await self._ws.send_message(msg)
|
||||
except self.recon_errors:
|
||||
await self.reset()
|
||||
with trio.CancelScope(shield=True):
|
||||
reconnected: bool = await self.reset(
|
||||
timeout=timeout,
|
||||
)
|
||||
if not reconnected:
|
||||
log.warning(
|
||||
'Failed to reconnect after {timeout!r}s ??'
|
||||
)
|
||||
|
||||
async def recv_msg(self) -> Any:
|
||||
msg: Any = await self._rx.receive()
|
||||
|
|
@ -191,7 +208,9 @@ async def _reconnect_forever(
|
|||
f'{src_mod}\n'
|
||||
f'{url} connection bail with:'
|
||||
)
|
||||
await trio.sleep(0.5)
|
||||
with trio.CancelScope(shield=True):
|
||||
await trio.sleep(0.5)
|
||||
|
||||
rent_cs.cancel()
|
||||
|
||||
# go back to reonnect loop in parent task
|
||||
|
|
@ -291,7 +310,8 @@ async def _reconnect_forever(
|
|||
log.exception(
|
||||
'Reconnect-attempt failed ??\n'
|
||||
)
|
||||
await trio.sleep(0.2) # throttle
|
||||
with trio.CancelScope(shield=True):
|
||||
await trio.sleep(0.2) # throttle
|
||||
raise berr
|
||||
|
||||
#|_ws & nursery block ends
|
||||
|
|
@ -351,32 +371,39 @@ async def open_autorecon_ws(
|
|||
rcv: trio.MemoryReceiveChannel
|
||||
snd, rcv = trio.open_memory_channel(616)
|
||||
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn
|
||||
):
|
||||
nobsws = NoBsWs(
|
||||
url,
|
||||
rcv,
|
||||
msg_recv_timeout=msg_recv_timeout,
|
||||
)
|
||||
await tn.start(
|
||||
partial(
|
||||
_reconnect_forever,
|
||||
try:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn
|
||||
):
|
||||
nobsws = NoBsWs(
|
||||
url,
|
||||
snd,
|
||||
nobsws,
|
||||
fixture=fixture,
|
||||
reset_after=reset_after,
|
||||
rcv,
|
||||
msg_recv_timeout=msg_recv_timeout,
|
||||
)
|
||||
await tn.start(
|
||||
partial(
|
||||
_reconnect_forever,
|
||||
url,
|
||||
snd,
|
||||
nobsws,
|
||||
fixture=fixture,
|
||||
reset_after=reset_after,
|
||||
)
|
||||
)
|
||||
await nobsws._connected.wait()
|
||||
assert nobsws._cs
|
||||
assert nobsws.connected()
|
||||
try:
|
||||
yield nobsws
|
||||
finally:
|
||||
tn.cancel_scope.cancel()
|
||||
|
||||
except NoBsWs.recon_errors as con_err:
|
||||
log.warning(
|
||||
f'Entire ws-channel disconnect due to,\n'
|
||||
f'con_err: {con_err!r}\n'
|
||||
)
|
||||
await nobsws._connected.wait()
|
||||
assert nobsws._cs
|
||||
assert nobsws.connected()
|
||||
try:
|
||||
yield nobsws
|
||||
finally:
|
||||
tn.cancel_scope.cancel()
|
||||
|
||||
|
||||
'''
|
||||
|
|
|
|||
Loading…
Reference in New Issue