Compare commits

...

6 Commits

Author SHA1 Message Date
Gud Boi 6b70fea5d4 Merge pull request 'Tpt-tolerance adjustments for latest `tractor`' (#73)
Reviewed-on: #73
2026-02-23 03:08:18 +00:00
Gud Boi 4e24cb1bff Adjust sampler's "IPC-dropped" log msg styling
Refmt the "connection-dropped" error-log in `Sampler`'s broadcast loop
to show error type first, then the IPC context details; mks it all
easier to grok/less-noisy on console imo.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-22 20:08:20 -05:00
Gud Boi 3d83b61f3f Wrap `open_autorecon_ws()` body for comms failures
Add outer `try/except` around the nursery block in
`open_autorecon_ws()` to catch any `NoBsWs.recon_errors` that
escape the inner reconnect loop, logging a warning instead of
propagating.

Also,
- correct `NoBsWs.recon_errors` typing to `tuple[Type[Exception]]`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-22 20:08:20 -05:00
Gud Boi 6f390dc88c Add timeout + shielding to `NoBsWs` reconnect logic
Add timeout param to `.reset()` and `.send_msg()` to prevent
indefinite blocking on reconnect attempts. Shield reconnect
sleeps from cancellation to ensure we avoid any "finally footgun" type
scenarios where `trio.Cancelled` masks an underlying exc per,
- https://github.com/goodboy/tractor/pull/387
- https://github.com/goodboy/tractor/pull/391

Deats,
- add `timeout` param to `.reset()`, return `bool` for success
- add `timeout=3` default to `.send_msg()` for reconnect wait
- shield `.reset()` call in `.send_msg()` error handler
- log warning when reconnect timeout exceeded
- shield throttled sleeps in `_reconnect_forever()` error paths

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-22 20:08:20 -05:00
Gud Boi e1f3d7c3f8 Handle `tractor.TransportClosed` as "stream-closed"
In both the ems and sampler since on new `tractor` this is the
"wrapping" exception raised when the transport layer terminates early
but in a pseudo-"graceful" way, expected when a peer actor disconnects.
Previously we were crashing in this case since old `tractor` just raised
the underlying `trio`-source-exceptions verbatim.

Also,
- use `Aid.reprol()` in log msgs vs old `.chan.uid` refs

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-22 20:08:20 -05:00
Gud Boi 600636784c Merge pull request 'tractor_struct_and_godw_mod' (#72)
Reviewed-on: #72
2026-02-22 23:39:10 +00:00
3 changed files with 64 additions and 34 deletions

View File

@ -729,6 +729,7 @@ class Router(Struct):
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
tractor.TransportClosed,
):
to_remove.add(client_stream)
log.warning(
@ -1699,5 +1700,5 @@ async def _emsd_main(
if not client_streams:
log.warning(
f'Order dialog is not being monitored:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
f'{oid!r} <-> {client_stream.chan.aid.reprol()}\n'
)

View File

@ -99,6 +99,7 @@ class Sampler:
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
tractor.TransportClosed,
)
# holds all the ``tractor.Context`` remote subscriptions for
@ -291,9 +292,10 @@ class Sampler:
except self.bcast_errors as err:
log.error(
f'Connection dropped for IPC ctx\n'
f'{stream._ctx}\n\n'
f'Due to {type(err)}'
f'Connection dropped for IPC ctx due to,\n'
f'{type(err)!r}\n'
f'\n'
f'{stream._ctx}'
)
borked.add(stream)
else:
@ -741,7 +743,7 @@ async def sample_and_broadcast(
log.warning(
f'Feed OVERRUN {sub_key}'
f'@{bus.brokername} -> \n'
f'feed @ {chan.uid}\n'
f'feed @ {chan.aid.reprol()}\n'
f'throttle = {throttle} Hz'
)

View File

@ -31,6 +31,7 @@ from typing import (
AsyncContextManager,
AsyncGenerator,
Iterable,
Type,
)
import json
@ -67,7 +68,7 @@ class NoBsWs:
'''
# apparently we can QoS for all sorts of reasons..so catch em.
recon_errors = (
recon_errors: tuple[Type[Exception]] = (
ConnectionClosed,
DisconnectionTimeout,
ConnectionRejected,
@ -105,7 +106,10 @@ class NoBsWs:
def connected(self) -> bool:
return self._connected.is_set()
async def reset(self) -> None:
async def reset(
self,
timeout: float,
) -> bool:
'''
Reset the underlying ws connection by cancelling
the bg relay task and waiting for it to signal
@ -114,18 +118,31 @@ class NoBsWs:
'''
self._connected = trio.Event()
self._cs.cancel()
with trio.move_on_after(timeout) as cs:
await self._connected.wait()
return True
assert cs.cancelled_caught
return False
async def send_msg(
    self,
    data: Any,
    timeout: float = 3,
) -> None:
    '''
    Serialize `data` with `self._dumps()` and send it over the
    underlying ws, retrying until a send succeeds.

    On any of `self.recon_errors` the connection is reset via
    `.reset(timeout=timeout)` and the send is attempted again on
    the next loop iteration. If the reset does not report success
    within `timeout` seconds a warning is logged before retrying.

    '''
    while True:
        try:
            msg: Any = self._dumps(data)
            return await self._ws.send_message(msg)

        except self.recon_errors:
            # shield the (time-bounded) reconnect wait so that
            # a cancellation delivered while handling the send
            # error can not interrupt the reset part way.
            with trio.CancelScope(shield=True):
                reconnected: bool = await self.reset(
                    timeout=timeout,
                )

            if not reconnected:
                # NOTE: must be an f-string, otherwise the
                # `{timeout!r}` placeholder is emitted verbatim.
                log.warning(
                    f'Failed to reconnect after {timeout!r}s ??'
                )
async def recv_msg(self) -> Any:
msg: Any = await self._rx.receive()
@ -191,7 +208,9 @@ async def _reconnect_forever(
f'{src_mod}\n'
f'{url} connection bail with:'
)
with trio.CancelScope(shield=True):
await trio.sleep(0.5)
rent_cs.cancel()
# go back to reconnect loop in parent task
@ -291,6 +310,7 @@ async def _reconnect_forever(
log.exception(
'Reconnect-attempt failed ??\n'
)
with trio.CancelScope(shield=True):
await trio.sleep(0.2) # throttle
raise berr
@ -351,6 +371,7 @@ async def open_autorecon_ws(
rcv: trio.MemoryReceiveChannel
snd, rcv = trio.open_memory_channel(616)
try:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
@ -378,6 +399,12 @@ async def open_autorecon_ws(
finally:
tn.cancel_scope.cancel()
except NoBsWs.recon_errors as con_err:
log.warning(
f'Entire ws-channel disconnect due to,\n'
f'con_err: {con_err!r}\n'
)
'''
JSONRPC response-request style machinery for transparent multiplexing