Compare commits

..

6 Commits

Author SHA1 Message Date
Gud Boi 6b70fea5d4 Merge pull request 'Tpt-tolerance adjustments for latest `tractor`' (#73)
Reviewed-on: #73
2026-02-23 03:08:18 +00:00
Gud Boi 4e24cb1bff Adjust sampler's "IPC-dropped" log msg styling
Refmt the "connection-dropped" error-log in `Sampler`'s broadcast loop
to show error type first, then the IPC context details; mks it all
easier to grok/less-noisy on console imo.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-22 20:08:20 -05:00
Gud Boi 3d83b61f3f Wrap `open_autorecon_ws()` body for comms failures
Add outer `try/except` around the nursery block in
`open_autorecon_ws()` to catch any `NoBsWs.recon_errors` that
escape the inner reconnect loop, logging a warning instead of
propagating.

Also,
- correct `NoBsWs.recon_errors` typing to `tuple[Type[Exception]]`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-22 20:08:20 -05:00
Gud Boi 6f390dc88c Add timeout + shielding to `NoBsWs` reconnect logic
Add timeout param to `.reset()` and `.send_msg()` to prevent
indefinite blocking on reconnect attempts. Shield reconnect
sleeps from cancellation to ensure we avoid any "finally footgun" type
scenarios where `trio.Cancelled` masks an underlying exc per,
- https://github.com/goodboy/tractor/pull/387
- https://github.com/goodboy/tractor/pull/391

Deats,
- add `timeout` param to `.reset()`, return `bool` for success
- add `timeout=3` default to `.send_msg()` for reconnect wait
- shield `.reset()` call in `.send_msg()` error handler
- log warning when reconnect timeout exceeded
- shield throttled sleeps in `_reconnect_forever()` error paths

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-22 20:08:20 -05:00
Gud Boi e1f3d7c3f8 Handle `tractor.TransportClosed` as "stream-closed"
In both the ems and sampler since on new `tractor` this is the
"wrapping" exception raised when the transport layer terminates early
but in a psuedo-"graceful" way, expected when a peer actors disconnect.
Previously we were crashing in this case since old `tractor` just raised
the underlying `trio`-source-exceptions verbatim.

Also,
- use `Aid.reprol()` in log msgs vs old `.chan.uid` refs

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-22 20:08:20 -05:00
Gud Boi 600636784c Merge pull request 'tractor_struct_and_godw_mod' (#72)
Reviewed-on: #72
2026-02-22 23:39:10 +00:00
3 changed files with 64 additions and 34 deletions

View File

@ -729,6 +729,7 @@ class Router(Struct):
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
tractor.TransportClosed,
): ):
to_remove.add(client_stream) to_remove.add(client_stream)
log.warning( log.warning(
@ -1699,5 +1700,5 @@ async def _emsd_main(
if not client_streams: if not client_streams:
log.warning( log.warning(
f'Order dialog is not being monitored:\n' f'Order dialog is not being monitored:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}' f'{oid!r} <-> {client_stream.chan.aid.reprol()}\n'
) )

View File

@ -99,6 +99,7 @@ class Sampler:
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError, trio.ClosedResourceError,
trio.EndOfChannel, trio.EndOfChannel,
tractor.TransportClosed,
) )
# holds all the ``tractor.Context`` remote subscriptions for # holds all the ``tractor.Context`` remote subscriptions for
@ -291,9 +292,10 @@ class Sampler:
except self.bcast_errors as err: except self.bcast_errors as err:
log.error( log.error(
f'Connection dropped for IPC ctx\n' f'Connection dropped for IPC ctx due to,\n'
f'{stream._ctx}\n\n' f'{type(err)!r}\n'
f'Due to {type(err)}' f'\n'
f'{stream._ctx}'
) )
borked.add(stream) borked.add(stream)
else: else:
@ -741,7 +743,7 @@ async def sample_and_broadcast(
log.warning( log.warning(
f'Feed OVERRUN {sub_key}' f'Feed OVERRUN {sub_key}'
f'@{bus.brokername} -> \n' f'@{bus.brokername} -> \n'
f'feed @ {chan.uid}\n' f'feed @ {chan.aid.reprol()}\n'
f'throttle = {throttle} Hz' f'throttle = {throttle} Hz'
) )

View File

@ -31,6 +31,7 @@ from typing import (
AsyncContextManager, AsyncContextManager,
AsyncGenerator, AsyncGenerator,
Iterable, Iterable,
Type,
) )
import json import json
@ -67,7 +68,7 @@ class NoBsWs:
''' '''
# apparently we can QoS for all sorts of reasons..so catch em. # apparently we can QoS for all sorts of reasons..so catch em.
recon_errors = ( recon_errors: tuple[Type[Exception]] = (
ConnectionClosed, ConnectionClosed,
DisconnectionTimeout, DisconnectionTimeout,
ConnectionRejected, ConnectionRejected,
@ -105,7 +106,10 @@ class NoBsWs:
def connected(self) -> bool: def connected(self) -> bool:
return self._connected.is_set() return self._connected.is_set()
async def reset(self) -> None: async def reset(
self,
timeout: float,
) -> bool:
''' '''
Reset the underlying ws connection by cancelling Reset the underlying ws connection by cancelling
the bg relay task and waiting for it to signal the bg relay task and waiting for it to signal
@ -114,18 +118,31 @@ class NoBsWs:
''' '''
self._connected = trio.Event() self._connected = trio.Event()
self._cs.cancel() self._cs.cancel()
with trio.move_on_after(timeout) as cs:
await self._connected.wait() await self._connected.wait()
return True
assert cs.cancelled_caught
return False
async def send_msg( async def send_msg(
self, self,
data: Any, data: Any,
timeout: float = 3,
) -> None: ) -> None:
while True: while True:
try: try:
msg: Any = self._dumps(data) msg: Any = self._dumps(data)
return await self._ws.send_message(msg) return await self._ws.send_message(msg)
except self.recon_errors: except self.recon_errors:
await self.reset() with trio.CancelScope(shield=True):
reconnected: bool = await self.reset(
timeout=timeout,
)
if not reconnected:
log.warning(
'Failed to reconnect after {timeout!r}s ??'
)
async def recv_msg(self) -> Any: async def recv_msg(self) -> Any:
msg: Any = await self._rx.receive() msg: Any = await self._rx.receive()
@ -191,7 +208,9 @@ async def _reconnect_forever(
f'{src_mod}\n' f'{src_mod}\n'
f'{url} connection bail with:' f'{url} connection bail with:'
) )
with trio.CancelScope(shield=True):
await trio.sleep(0.5) await trio.sleep(0.5)
rent_cs.cancel() rent_cs.cancel()
# go back to reonnect loop in parent task # go back to reonnect loop in parent task
@ -291,6 +310,7 @@ async def _reconnect_forever(
log.exception( log.exception(
'Reconnect-attempt failed ??\n' 'Reconnect-attempt failed ??\n'
) )
with trio.CancelScope(shield=True):
await trio.sleep(0.2) # throttle await trio.sleep(0.2) # throttle
raise berr raise berr
@ -351,6 +371,7 @@ async def open_autorecon_ws(
rcv: trio.MemoryReceiveChannel rcv: trio.MemoryReceiveChannel
snd, rcv = trio.open_memory_channel(616) snd, rcv = trio.open_memory_channel(616)
try:
async with ( async with (
tractor.trionics.collapse_eg(), tractor.trionics.collapse_eg(),
trio.open_nursery() as tn trio.open_nursery() as tn
@ -378,6 +399,12 @@ async def open_autorecon_ws(
finally: finally:
tn.cancel_scope.cancel() tn.cancel_scope.cancel()
except NoBsWs.recon_errors as con_err:
log.warning(
f'Entire ws-channel disconnect due to,\n'
f'con_err: {con_err!r}\n'
)
''' '''
JSONRPC response-request style machinery for transparent multiplexing JSONRPC response-request style machinery for transparent multiplexing