Tpt-tolerance adjustments for latest tractor #73

Merged
goodboy merged 4 commits from tpt_closed_and_finally_footguns into main 2026-02-23 03:08:19 +00:00
3 changed files with 64 additions and 34 deletions

View File

@ -729,6 +729,7 @@ class Router(Struct):
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
tractor.TransportClosed,
):
to_remove.add(client_stream)
log.warning(
@ -1699,5 +1700,5 @@ async def _emsd_main(
if not client_streams:
log.warning(
f'Order dialog is not being monitored:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
f'{oid!r} <-> {client_stream.chan.aid.reprol()}\n'
)

View File

@ -99,6 +99,7 @@ class Sampler:
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
tractor.TransportClosed,
)
# holds all the ``tractor.Context`` remote subscriptions for
@ -291,9 +292,10 @@ class Sampler:
except self.bcast_errors as err:
log.error(
f'Connection dropped for IPC ctx\n'
f'{stream._ctx}\n\n'
f'Due to {type(err)}'
f'Connection dropped for IPC ctx due to,\n'
f'{type(err)!r}\n'
f'\n'
f'{stream._ctx}'
)
borked.add(stream)
else:
@ -741,7 +743,7 @@ async def sample_and_broadcast(
log.warning(
f'Feed OVERRUN {sub_key}'
f'@{bus.brokername} -> \n'
f'feed @ {chan.uid}\n'
f'feed @ {chan.aid.reprol()}\n'
f'throttle = {throttle} Hz'
)

View File

@ -31,6 +31,7 @@ from typing import (
AsyncContextManager,
AsyncGenerator,
Iterable,
Type,
)
import json
@ -67,7 +68,7 @@ class NoBsWs:
'''
# apparently we can QoS for all sorts of reasons..so catch em.
recon_errors = (
recon_errors: tuple[Type[Exception]] = (
ConnectionClosed,
DisconnectionTimeout,
ConnectionRejected,
@ -105,7 +106,10 @@ class NoBsWs:
def connected(self) -> bool:
return self._connected.is_set()
async def reset(self) -> None:
async def reset(
self,
timeout: float,
) -> bool:
'''
Reset the underlying ws connection by cancelling
the bg relay task and waiting for it to signal
@ -114,18 +118,31 @@ class NoBsWs:
'''
self._connected = trio.Event()
self._cs.cancel()
with trio.move_on_after(timeout) as cs:
await self._connected.wait()
return True
assert cs.cancelled_caught
return False
async def send_msg(
self,
data: Any,
timeout: float = 3,
) -> None:
while True:
try:
msg: Any = self._dumps(data)
return await self._ws.send_message(msg)
except self.recon_errors:
await self.reset()
with trio.CancelScope(shield=True):
reconnected: bool = await self.reset(
timeout=timeout,
)
if not reconnected:
log.warning(
'Failed to reconnect after {timeout!r}s ??'
)
async def recv_msg(self) -> Any:
msg: Any = await self._rx.receive()
@ -191,7 +208,9 @@ async def _reconnect_forever(
f'{src_mod}\n'
f'{url} connection bail with:'
)
with trio.CancelScope(shield=True):
await trio.sleep(0.5)
rent_cs.cancel()
# go back to reonnect loop in parent task
@ -291,6 +310,7 @@ async def _reconnect_forever(
log.exception(
'Reconnect-attempt failed ??\n'
)
with trio.CancelScope(shield=True):
await trio.sleep(0.2) # throttle
raise berr
@ -351,6 +371,7 @@ async def open_autorecon_ws(
rcv: trio.MemoryReceiveChannel
snd, rcv = trio.open_memory_channel(616)
try:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
@ -378,6 +399,12 @@ async def open_autorecon_ws(
finally:
tn.cancel_scope.cancel()
except NoBsWs.recon_errors as con_err:
log.warning(
f'Entire ws-channel disconnect due to,\n'
f'con_err: {con_err!r}\n'
)
'''
JSONRPC response-request style machinery for transparent multiplexing