Compare commits

..

No commits in common. "main" and "tractor_struct_and_godw_mod" have entirely different histories.

3 changed files with 36 additions and 66 deletions

View File

@ -729,7 +729,6 @@ class Router(Struct):
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
tractor.TransportClosed,
): ):
to_remove.add(client_stream) to_remove.add(client_stream)
log.warning( log.warning(
@ -1700,5 +1699,5 @@ async def _emsd_main(
if not client_streams: if not client_streams:
log.warning( log.warning(
f'Order dialog is not being monitored:\n' f'Order dialog is not being monitored:\n'
f'{oid!r} <-> {client_stream.chan.aid.reprol()}\n' f'{oid} ->\n{client_stream._ctx.chan.uid}'
) )

View File

@ -99,7 +99,6 @@ class Sampler:
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError, trio.ClosedResourceError,
trio.EndOfChannel, trio.EndOfChannel,
tractor.TransportClosed,
) )
# holds all the ``tractor.Context`` remote subscriptions for # holds all the ``tractor.Context`` remote subscriptions for
@ -292,10 +291,9 @@ class Sampler:
except self.bcast_errors as err: except self.bcast_errors as err:
log.error( log.error(
f'Connection dropped for IPC ctx due to,\n' f'Connection dropped for IPC ctx\n'
f'{type(err)!r}\n' f'{stream._ctx}\n\n'
f'\n' f'Due to {type(err)}'
f'{stream._ctx}'
) )
borked.add(stream) borked.add(stream)
else: else:
@ -743,7 +741,7 @@ async def sample_and_broadcast(
log.warning( log.warning(
f'Feed OVERRUN {sub_key}' f'Feed OVERRUN {sub_key}'
f'@{bus.brokername} -> \n' f'@{bus.brokername} -> \n'
f'feed @ {chan.aid.reprol()}\n' f'feed @ {chan.uid}\n'
f'throttle = {throttle} Hz' f'throttle = {throttle} Hz'
) )

View File

@ -31,7 +31,6 @@ from typing import (
AsyncContextManager, AsyncContextManager,
AsyncGenerator, AsyncGenerator,
Iterable, Iterable,
Type,
) )
import json import json
@ -68,7 +67,7 @@ class NoBsWs:
''' '''
# apparently we can QoS for all sorts of reasons..so catch em. # apparently we can QoS for all sorts of reasons..so catch em.
recon_errors: tuple[Type[Exception]] = ( recon_errors = (
ConnectionClosed, ConnectionClosed,
DisconnectionTimeout, DisconnectionTimeout,
ConnectionRejected, ConnectionRejected,
@ -106,10 +105,7 @@ class NoBsWs:
def connected(self) -> bool: def connected(self) -> bool:
return self._connected.is_set() return self._connected.is_set()
async def reset( async def reset(self) -> None:
self,
timeout: float,
) -> bool:
''' '''
Reset the underlying ws connection by cancelling Reset the underlying ws connection by cancelling
the bg relay task and waiting for it to signal the bg relay task and waiting for it to signal
@ -118,31 +114,18 @@ class NoBsWs:
''' '''
self._connected = trio.Event() self._connected = trio.Event()
self._cs.cancel() self._cs.cancel()
with trio.move_on_after(timeout) as cs:
await self._connected.wait() await self._connected.wait()
return True
assert cs.cancelled_caught
return False
async def send_msg( async def send_msg(
self, self,
data: Any, data: Any,
timeout: float = 3,
) -> None: ) -> None:
while True: while True:
try: try:
msg: Any = self._dumps(data) msg: Any = self._dumps(data)
return await self._ws.send_message(msg) return await self._ws.send_message(msg)
except self.recon_errors: except self.recon_errors:
with trio.CancelScope(shield=True): await self.reset()
reconnected: bool = await self.reset(
timeout=timeout,
)
if not reconnected:
log.warning(
'Failed to reconnect after {timeout!r}s ??'
)
async def recv_msg(self) -> Any: async def recv_msg(self) -> Any:
msg: Any = await self._rx.receive() msg: Any = await self._rx.receive()
@ -208,9 +191,7 @@ async def _reconnect_forever(
f'{src_mod}\n' f'{src_mod}\n'
f'{url} connection bail with:' f'{url} connection bail with:'
) )
with trio.CancelScope(shield=True):
await trio.sleep(0.5) await trio.sleep(0.5)
rent_cs.cancel() rent_cs.cancel()
# go back to reonnect loop in parent task # go back to reonnect loop in parent task
@ -310,7 +291,6 @@ async def _reconnect_forever(
log.exception( log.exception(
'Reconnect-attempt failed ??\n' 'Reconnect-attempt failed ??\n'
) )
with trio.CancelScope(shield=True):
await trio.sleep(0.2) # throttle await trio.sleep(0.2) # throttle
raise berr raise berr
@ -371,7 +351,6 @@ async def open_autorecon_ws(
rcv: trio.MemoryReceiveChannel rcv: trio.MemoryReceiveChannel
snd, rcv = trio.open_memory_channel(616) snd, rcv = trio.open_memory_channel(616)
try:
async with ( async with (
tractor.trionics.collapse_eg(), tractor.trionics.collapse_eg(),
trio.open_nursery() as tn trio.open_nursery() as tn
@ -399,12 +378,6 @@ async def open_autorecon_ws(
finally: finally:
tn.cancel_scope.cancel() tn.cancel_scope.cancel()
except NoBsWs.recon_errors as con_err:
log.warning(
f'Entire ws-channel disconnect due to,\n'
f'con_err: {con_err!r}\n'
)
''' '''
JSONRPC response-request style machinery for transparent multiplexing JSONRPC response-request style machinery for transparent multiplexing