commit
6b70fea5d4
|
|
@ -729,6 +729,7 @@ class Router(Struct):
|
||||||
except (
|
except (
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
|
tractor.TransportClosed,
|
||||||
):
|
):
|
||||||
to_remove.add(client_stream)
|
to_remove.add(client_stream)
|
||||||
log.warning(
|
log.warning(
|
||||||
|
|
@ -1699,5 +1700,5 @@ async def _emsd_main(
|
||||||
if not client_streams:
|
if not client_streams:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Order dialog is not being monitored:\n'
|
f'Order dialog is not being monitored:\n'
|
||||||
f'{oid} ->\n{client_stream._ctx.chan.uid}'
|
f'{oid!r} <-> {client_stream.chan.aid.reprol()}\n'
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -99,6 +99,7 @@ class Sampler:
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.EndOfChannel,
|
trio.EndOfChannel,
|
||||||
|
tractor.TransportClosed,
|
||||||
)
|
)
|
||||||
|
|
||||||
# holds all the ``tractor.Context`` remote subscriptions for
|
# holds all the ``tractor.Context`` remote subscriptions for
|
||||||
|
|
@ -291,9 +292,10 @@ class Sampler:
|
||||||
|
|
||||||
except self.bcast_errors as err:
|
except self.bcast_errors as err:
|
||||||
log.error(
|
log.error(
|
||||||
f'Connection dropped for IPC ctx\n'
|
f'Connection dropped for IPC ctx due to,\n'
|
||||||
f'{stream._ctx}\n\n'
|
f'{type(err)!r}\n'
|
||||||
f'Due to {type(err)}'
|
f'\n'
|
||||||
|
f'{stream._ctx}'
|
||||||
)
|
)
|
||||||
borked.add(stream)
|
borked.add(stream)
|
||||||
else:
|
else:
|
||||||
|
|
@ -741,7 +743,7 @@ async def sample_and_broadcast(
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Feed OVERRUN {sub_key}'
|
f'Feed OVERRUN {sub_key}'
|
||||||
f'@{bus.brokername} -> \n'
|
f'@{bus.brokername} -> \n'
|
||||||
f'feed @ {chan.uid}\n'
|
f'feed @ {chan.aid.reprol()}\n'
|
||||||
f'throttle = {throttle} Hz'
|
f'throttle = {throttle} Hz'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -31,6 +31,7 @@ from typing import (
|
||||||
AsyncContextManager,
|
AsyncContextManager,
|
||||||
AsyncGenerator,
|
AsyncGenerator,
|
||||||
Iterable,
|
Iterable,
|
||||||
|
Type,
|
||||||
)
|
)
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
|
@ -67,7 +68,7 @@ class NoBsWs:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# apparently we can QoS for all sorts of reasons..so catch em.
|
# apparently we can QoS for all sorts of reasons..so catch em.
|
||||||
recon_errors = (
|
recon_errors: tuple[Type[Exception]] = (
|
||||||
ConnectionClosed,
|
ConnectionClosed,
|
||||||
DisconnectionTimeout,
|
DisconnectionTimeout,
|
||||||
ConnectionRejected,
|
ConnectionRejected,
|
||||||
|
|
@ -105,7 +106,10 @@ class NoBsWs:
|
||||||
def connected(self) -> bool:
|
def connected(self) -> bool:
|
||||||
return self._connected.is_set()
|
return self._connected.is_set()
|
||||||
|
|
||||||
async def reset(self) -> None:
|
async def reset(
|
||||||
|
self,
|
||||||
|
timeout: float,
|
||||||
|
) -> bool:
|
||||||
'''
|
'''
|
||||||
Reset the underlying ws connection by cancelling
|
Reset the underlying ws connection by cancelling
|
||||||
the bg relay task and waiting for it to signal
|
the bg relay task and waiting for it to signal
|
||||||
|
|
@ -114,18 +118,31 @@ class NoBsWs:
|
||||||
'''
|
'''
|
||||||
self._connected = trio.Event()
|
self._connected = trio.Event()
|
||||||
self._cs.cancel()
|
self._cs.cancel()
|
||||||
await self._connected.wait()
|
with trio.move_on_after(timeout) as cs:
|
||||||
|
await self._connected.wait()
|
||||||
|
return True
|
||||||
|
|
||||||
|
assert cs.cancelled_caught
|
||||||
|
return False
|
||||||
|
|
||||||
async def send_msg(
|
async def send_msg(
|
||||||
self,
|
self,
|
||||||
data: Any,
|
data: Any,
|
||||||
|
timeout: float = 3,
|
||||||
) -> None:
|
) -> None:
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
msg: Any = self._dumps(data)
|
msg: Any = self._dumps(data)
|
||||||
return await self._ws.send_message(msg)
|
return await self._ws.send_message(msg)
|
||||||
except self.recon_errors:
|
except self.recon_errors:
|
||||||
await self.reset()
|
with trio.CancelScope(shield=True):
|
||||||
|
reconnected: bool = await self.reset(
|
||||||
|
timeout=timeout,
|
||||||
|
)
|
||||||
|
if not reconnected:
|
||||||
|
log.warning(
|
||||||
|
'Failed to reconnect after {timeout!r}s ??'
|
||||||
|
)
|
||||||
|
|
||||||
async def recv_msg(self) -> Any:
|
async def recv_msg(self) -> Any:
|
||||||
msg: Any = await self._rx.receive()
|
msg: Any = await self._rx.receive()
|
||||||
|
|
@ -191,7 +208,9 @@ async def _reconnect_forever(
|
||||||
f'{src_mod}\n'
|
f'{src_mod}\n'
|
||||||
f'{url} connection bail with:'
|
f'{url} connection bail with:'
|
||||||
)
|
)
|
||||||
await trio.sleep(0.5)
|
with trio.CancelScope(shield=True):
|
||||||
|
await trio.sleep(0.5)
|
||||||
|
|
||||||
rent_cs.cancel()
|
rent_cs.cancel()
|
||||||
|
|
||||||
# go back to reonnect loop in parent task
|
# go back to reonnect loop in parent task
|
||||||
|
|
@ -291,7 +310,8 @@ async def _reconnect_forever(
|
||||||
log.exception(
|
log.exception(
|
||||||
'Reconnect-attempt failed ??\n'
|
'Reconnect-attempt failed ??\n'
|
||||||
)
|
)
|
||||||
await trio.sleep(0.2) # throttle
|
with trio.CancelScope(shield=True):
|
||||||
|
await trio.sleep(0.2) # throttle
|
||||||
raise berr
|
raise berr
|
||||||
|
|
||||||
#|_ws & nursery block ends
|
#|_ws & nursery block ends
|
||||||
|
|
@ -351,32 +371,39 @@ async def open_autorecon_ws(
|
||||||
rcv: trio.MemoryReceiveChannel
|
rcv: trio.MemoryReceiveChannel
|
||||||
snd, rcv = trio.open_memory_channel(616)
|
snd, rcv = trio.open_memory_channel(616)
|
||||||
|
|
||||||
async with (
|
try:
|
||||||
tractor.trionics.collapse_eg(),
|
async with (
|
||||||
trio.open_nursery() as tn
|
tractor.trionics.collapse_eg(),
|
||||||
):
|
trio.open_nursery() as tn
|
||||||
nobsws = NoBsWs(
|
):
|
||||||
url,
|
nobsws = NoBsWs(
|
||||||
rcv,
|
|
||||||
msg_recv_timeout=msg_recv_timeout,
|
|
||||||
)
|
|
||||||
await tn.start(
|
|
||||||
partial(
|
|
||||||
_reconnect_forever,
|
|
||||||
url,
|
url,
|
||||||
snd,
|
rcv,
|
||||||
nobsws,
|
msg_recv_timeout=msg_recv_timeout,
|
||||||
fixture=fixture,
|
|
||||||
reset_after=reset_after,
|
|
||||||
)
|
)
|
||||||
|
await tn.start(
|
||||||
|
partial(
|
||||||
|
_reconnect_forever,
|
||||||
|
url,
|
||||||
|
snd,
|
||||||
|
nobsws,
|
||||||
|
fixture=fixture,
|
||||||
|
reset_after=reset_after,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
await nobsws._connected.wait()
|
||||||
|
assert nobsws._cs
|
||||||
|
assert nobsws.connected()
|
||||||
|
try:
|
||||||
|
yield nobsws
|
||||||
|
finally:
|
||||||
|
tn.cancel_scope.cancel()
|
||||||
|
|
||||||
|
except NoBsWs.recon_errors as con_err:
|
||||||
|
log.warning(
|
||||||
|
f'Entire ws-channel disconnect due to,\n'
|
||||||
|
f'con_err: {con_err!r}\n'
|
||||||
)
|
)
|
||||||
await nobsws._connected.wait()
|
|
||||||
assert nobsws._cs
|
|
||||||
assert nobsws.connected()
|
|
||||||
try:
|
|
||||||
yield nobsws
|
|
||||||
finally:
|
|
||||||
tn.cancel_scope.cancel()
|
|
||||||
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue