Compare commits

...

3 Commits

Author SHA1 Message Date
Gud Boi ef3309adf9 Add timeout + shielding to `NoBsWs` reconnect logic
Add timeout param to `.reset()` and `.send_msg()` to prevent
indefinite blocking on reconnect attempts. Shield reconnect
sleeps from cancellation to ensure we avoid any "finally footgun" type
scenarios where `trio.Cancelled` masks an underlying exc per,
- https://github.com/goodboy/tractor/pull/387
- https://github.com/goodboy/tractor/pull/391

Deats,
- add `timeout` param to `.reset()`, return `bool` for success
- add `timeout=3` default to `.send_msg()` for reconnect wait
- shield `.reset()` call in `.send_msg()` error handler
- log warning when reconnect timeout exceeded
- shield throttled sleeps in `_reconnect_forever()` error paths

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-04 20:09:35 -05:00
Gud Boi ac6ab3791e Handle `tractor.TransportClosed` as "stream-closed"
In both the ems and sampler since on new `tractor` this is the
"wrapping" exception raised when the transport layer terminates early
but in a pseudo-"graceful" way, expected when a peer actor disconnects.
Previously we were crashing in this case since old `tractor` just raised
the underlying `trio`-source-exceptions verbatim.

Also,
- use `Aid.reprol()` in log msgs vs old `.chan.uid` refs

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-04 20:02:28 -05:00
Gud Boi d0eb6b479d .data.feed: move `Flume` import to avoid cycle
Move `Flume` to `TYPE_CHECKING` and add runtime imports in
`allocate_persistent_feed()` + `open_feed()` to avoid cycle
with `.flows` mod.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-04 19:31:33 -05:00
4 changed files with 34 additions and 9 deletions

View File

@ -729,6 +729,7 @@ class Router(Struct):
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
tractor.TransportClosed,
):
to_remove.add(client_stream)
log.warning(
@ -1699,5 +1700,5 @@ async def _emsd_main(
if not client_streams:
log.warning(
f'Order dialog is not being monitored:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
f'{oid!r} <-> {client_stream.chan.aid.reprol()}\n'
)

View File

@ -99,6 +99,7 @@ class Sampler:
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
tractor.TransportClosed,
)
# holds all the ``tractor.Context`` remote subscriptions for
@ -753,7 +754,7 @@ async def sample_and_broadcast(
log.warning(
f'Feed OVERRUN {sub_key}'
f'@{bus.brokername} -> \n'
f'feed @ {chan.uid}\n'
f'feed @ {chan.aid.reprol()}\n'
f'throttle = {throttle} Hz'
)

View File

@ -105,7 +105,10 @@ class NoBsWs:
def connected(self) -> bool:
return self._connected.is_set()
async def reset(self) -> None:
async def reset(
self,
timeout: float,
) -> bool:
'''
Reset the underlying ws connection by cancelling
the bg relay task and waiting for it to signal
@ -114,18 +117,31 @@ class NoBsWs:
'''
self._connected = trio.Event()
self._cs.cancel()
await self._connected.wait()
with trio.move_on_after(timeout) as cs:
await self._connected.wait()
return True
assert cs.cancelled_caught
return False
async def send_msg(
self,
data: Any,
timeout: float = 3,
) -> None:
while True:
try:
msg: Any = self._dumps(data)
return await self._ws.send_message(msg)
except self.recon_errors:
await self.reset()
with trio.CancelScope(shield=True):
reconnected: bool = await self.reset(
timeout=timeout,
)
if not reconnected:
log.warning(
'Failed to reconnect after {timeout!r}s ??'
)
async def recv_msg(self) -> Any:
msg: Any = await self._rx.receive()
@ -191,7 +207,9 @@ async def _reconnect_forever(
f'{src_mod}\n'
f'{url} connection bail with:'
)
await trio.sleep(0.5)
with trio.CancelScope(shield=True):
await trio.sleep(0.5)
rent_cs.cancel()
# go back to reconnect loop in parent task
@ -291,7 +309,8 @@ async def _reconnect_forever(
log.exception(
'Reconnect-attempt failed ??\n'
)
await trio.sleep(0.2) # throttle
with trio.CancelScope(shield=True):
await trio.sleep(0.2) # throttle
raise berr
#|_ws & nursery block ends

View File

@ -62,7 +62,6 @@ from ._util import (
log,
get_console_log,
)
from .flows import Flume
from .validate import (
FeedInit,
validate_backend,
@ -77,6 +76,7 @@ from ._sampling import (
)
if TYPE_CHECKING:
from .flows import Flume
from tractor._addr import Address
from tractor.msg.types import Aid
@ -362,6 +362,8 @@ async def allocate_persistent_feed(
)
await some_data_ready.wait()
# XXX, avoid cycle; it imports this mod.
from .flows import Flume
flume = Flume(
# TODO: we have to use this for now since currently the
@ -500,7 +502,6 @@ async def open_feed_bus(
sub_registered = trio.Event()
flumes: dict[str, Flume] = {}
for symbol in symbols:
# if no cached feed for this symbol has been created for this
@ -684,6 +685,7 @@ class Feed(Struct):
'''
mods: dict[str, ModuleType] = {}
portals: dict[ModuleType, tractor.Portal] = {}
flumes: dict[
str, # FQME
Flume,
@ -951,6 +953,8 @@ async def open_feed(
assert len(feed.mods) == len(feed.portals)
# XXX, avoid cycle; it imports this mod.
from .flows import Flume
async with (
trionics.gather_contexts(bus_ctxs) as ctxs,
):