- drop `shield` input to `MsgStream`
- check for cancel called prior to loading the feeder mem chan
in `Context.open_stream()`
- warn on a timeout when trying to cancel a remote task from
`Context.cancel()`
- drop noop endofchannel handler block
The `collections.deque` takes care of array length truncation of values
for us implicitly but in the future we'll likely want this value exposed
to alternate array implementations. This patch is to provide for that as
well as make `mypy` happy since the `dequeu.maxlen` can also be `None`.
Get rid of all the (requirements for) clones of the underlying
receivable. We can just use a uuid generated key for each instance
(thinking now this can probably just be `id(self)`). I'm fully convinced
now that channel cloning is only a source of confusion and anti-patterns
when we already have nurseries to define resource lifetimes. There is no
benefit in particular when you allocate subscriptions using a context
manager (not sure why `trio.open_memory_channel()` doesn't enforce
this).
Further refinements:
- add a `._closed` state that will error the receiver on reuse
- drop module script section; it's been moved to a real test
- call the "receiver" duck-type stub a new name
This allows for wrapping an existing stream by re-assigning its receive
method to the allocated broadcaster's `.receive()` so as to avoid
expecting any original consumer(s) of the stream to now know about the
broadcaster; this instead mutates the stream to delegate to the new
receive call behind the scenes any time `.subscribe()` is called.
Add a `typing.Protocol` for so called "cloneable channels" until we
decide/figure out a better keying system for each subscription and
mask all undesired typing failures.
Add `ReceiveMsgStream.subscribe()` which allows allocating a broadcast
receiver around the stream for use by multiple actor-local consumer
tasks. Entering this context manager idempotently mutates the stream's
receive machinery which for now can not be undone. Move `.clone()` to
the receive stream type.
Resolves#204
For every set of broadcast receivers which pull from the same producer,
we need a singleton state for all of,
- subscriptions
- the sender ready event
- the queue
Add a `BroadcastState` dataclass for this and pass it to all
subscriptions. This makes the design much more like the built-in memory
channels which do something very similar with `MemoryChannelState`.
Use a `filter()` on the subs list in the sequence update step, plus some
other commented approaches we can try for speed.
Using the current task as a subscription key fails horribly as soon as
you hand off new subscription receiver to another task you've spawned..
Instead use the underlying ``trio.abc.ReceiveChannel.clone()`` as a key
(so i guess we're assuming cloning is supported by the underlying?)
which makes this all work just like default mem chans. As a bonus, now
we can just close the underlying rx (which may be a clone) on
`.aclose()` and everything should just work in terms of the underlying
channels lifetime (i think?).
Change `.subscribe()` to be async since the receive channel type
interface only expects `.aclose()` and it actually ends up being
nicer for 3.9+ style `async with` parentheses style anyway.
Buncha improvements:
- pass in the queue via constructor
- tracking over all underlying memory channel closure using cloning
- do it like `tokio` and set lagged consumers to the last sequence
before raising
- copy the subs on first receiver wakeup for iteration instead of
iterating the table directly (and being forced to skip the current
tasks sequence increment)
- implement `.aclose()` to close the underlying clone for this task
- make `broadcast_receiver()` just take the recv chan since it doesn't
need anything on the send side.
We're not actually using this but it's for reference if we do end up
needing it.
The std lib's `pdb` internals override SIGINT handling whenever one
enters the debugger repl. Force a handler that kills the tree if SIGINT
is triggered from the root actor, otherwise ignore it since supervised
children should be managed already. This resolves an issue with guest
mode where `pdb` causes SIGINTs to be swallowed resulting in the host
loop never terminating the process tree.
The whole origin was not having an explicit open/close semantic for
streams. We have that now so this internal mechanic isn't needed and
further our streams become more correct by having `.aclose()` be
independent of cancellation.
Finally this makes a cancelled root actor nursery not clobber child
tasks which request and lock the root's tty for the debugger repl.
Using an edge triggered event which is set after all fifo-lock-queued
tasks are complete, we can be sure that no lingering child tasks are
going to get interrupted during pdb use and tty lock acquisition.
Further, even if new tasks do queue up to get the lock, the root will
incrementally send cancel msgs to each sub-actor only once the tty is
not locked by a (set of) child request task(s). Add shielding around all
the critical sections where the child attempts to allocate the lock from
the root such that it won't be disrupted from cancel messages from the
root after the acquire lock transaction has started.
If the root calls `trio.Process.kill()` on immediate child proc teardown
when the child is using pdb, we can get stdstreams clobbering that
results in a pdb++ repl where the user can't see what's been typed. Not
killing such children on cancellation / error seems to resolve this
issue whilst still giving reliable termination. For now, code that
special path until a time it becomes a problem for ensuring zombie
reaps.
A context is the natural fit (vs. a receive stream) for locking the root
proc's tty usage via it's `.started()` sync point. Simplify the
`_breakpoin()` routine to be a simple async func instead of all this
"returning a coroutine" stuff from before we decided that
`tractor.breakpoint()` must be async. Use `runtime` level for locking
logging making it easier to trace.
Another face palm that was causing serious issues for code that is using
the `.shielded` feature..
Add a bunch more detailed comments for all this subtlety and hopefully
get it right once and for all. Also aggregated the `trio` errors that
should trigger closure inside `.aclose()`, hopefully that's right too.
Revert this change since it really is poking at internals and doesn't
make a lot of sense. If the context is going to be cancelled then the
msg loop will tear down the feed memory channel when ready, we don't
need to be clobbering it and confusing the runtime machinery lol.
Add clear teardown semantics for `Context` such that the remote side
cancellation propagation happens only on error or if client code
explicitly requests it (either by exit flag to `Portal.open_context()`
or by manually calling `Context.cancel()`). Add `Context.result()`
to wait on and capture the final result from a remote context function;
any lingering msg sequence will be consumed/discarded.
Changes in order to make this possible:
- pass the runtime msg loop's feeder receive channel in to the context
on the calling (portal opening) side such that a final 'return' msg
can be waited upon using `Context.result()` which delivers the final
return value from the callee side `@tractor.context` async function.
- always await a final result from the target context function in
`Portal.open_context()`'s `__aexit__()` if the context has not
been (requested to be) cancelled by client code on block exit.
- add an internal `Context._cancel_called` for context "cancel
requested" tracking (much like `trio`'s cancel scope).
- allow flagging a stream as terminated using an internal
`._eoc` flag which will mark the stream as stopped for iteration.
- drop `StopAsyncIteration` catching in `.receive()`; it does
nothing.
This mostly adds the api described in
https://github.com/goodboy/tractor/issues/53#issuecomment-806258798
The first draft summary:
- formalize bidir steaming using the `trio.Channel` style interface
which we derive as a `MsgStream` type.
- add `Portal.open_context()` which provides a `trio.Nursery.start()`
remote task invocation style for setting up and tearing down tasks
contexts in remote actors.
- add a distinct `'started'` message to the ipc protocol to facilitate
`Context.start()` with a first return value.
- for our `ReceiveMsgStream` type, don't cancel the remote task in
`.aclose()`; this is now done explicitly by the surrounding `Context`
usage: `Context.cancel()`.
- streams in either direction still use a `'yield'` message keeping the
proto mostly symmetric without having to worry about which side is the
caller / portal opener.
- subtlety: only allow sending a `'stop'` message during a 2-way
streaming context from `ReceiveStream.aclose()`, detailed comment
with explanation is included.
Relates to #53
Since we currently have no real "discovery protocol" between process
trees, the current naive approach is to check via a connect and drop to
see if a TCP server is bound to a particular address during root actor
startup. This was a historical decision and had no real grounding beyond
taking a simple approach to get something working when the project
was first started.
This is obviously problematic from an error handling perspective since
we need to be able to avoid such quick connect-and-drops from cancelling
an "arbiter"'s (registry actor's) channel-msg loop machinery (which
would propagate and cancel the actor).
For now we map this particular TCP error, which gets remapped by `trio`
as a `trio.BrokenResourceError` to our own internal `TransportClosed`
which is swallowed by channel message loop processing and indicates
a graceful teardown of the far end actor.
This change some super old (and bad) code from the project's very early
days. For some redic reason i must have thought masking `trio`'s
internal stream / transport errors and a TCP EOF as `StopAsyncIteration`
somehow a good idea. The reality is you probably
want to know the difference between an unexpected transport error
and a simple EOF lol. This begins to resolve that by adding our own
special `TransportClosed` error to signal the "graceful" termination of
a channel's underlying transport. Oh, and this builds on the `msgspec`
integration which helped shed light on the core issues here B)
It's clear now that special attention is needed to handle the case where
a spawned `multiprocessing` proc is started but then the parent is
cancelled before the child can connect back; in this case we need to be
sure to kill the near-zombie child asap. This may end up being the
solution to other resiliency issues seen around mp with nested process
trees too. More testing is needed to be sure.
Relates to #84#89#134#146
NB: this is a breaking change removing support for `Portal.run()` being
able to invoke remote streaming functions and instead replacing the
method call with an async context manager api `Portal.open_stream_from()`
This style explicitly defines stream teardown at the call site instead
of expecting the user to handle tricky things correctly themselves: eg.
`async_geneartor.aclosing()`. Going forward `Portal.run()` can be used
only for invoking async functions.
Move receive stream into streaming modules and rebrand as a "message
stream". Factor out cancellation mechanics in `.aclose()` into the
`Context` type which will soon provide the api for for cancelling portal
invocations. Comment-stage a few methods on both types in anticipation
of a new bi-directional streaming api. Add a `MsgStream` bidirectional
channel type which will be the eventual type yielded from
`Context.open_stream()`. Adjust the response/dialog types to be the set
`{'asyncfun', 'asyncgen', 'context'}`. OH, and add async func checking
in `Portal.run()` to catch and error on sync funcs early.
You can always wrap a sync function in an async one and there seems to
be no good reason to support invoking them directly especially since
cancellation won't work without some thread hackery. If it's requested
we'll point users to `trio-parallel`.
Resolves#77
Add a sync method that can be used to cancel the current actor from
a synchronous context. This is useful in debugging situations where
sync debugger code may need to kill the process tree.
Also, make the internal "lifetime stack" a global var; easier to manage
from client code that may was to add callbacks prior to the actor
runtime being fully setup.
Using `None` as the default key for a `@msg.pub` can cause conflicts if
there is more then one "taskless" (no tasks={,} passed) pub offered on
an actor... So instead use the first trio "task name" (usually just the
function name) instead thus avoiding this very hard to debug and
understand problem.
Probably should throw in a test but I'm super lazy today.
This begins the move to dropping support for `tractor.run()` which we
don't really need since the runtime is started (as it always has been)
from a new sub-task / nursery. Instead this introduces starting the
actor tree through a `open_root_actor()` async context manager which
we'll likely implicitly call (from the root) on the first use of an
actor nursery.
Drop `_actor._start_actor()` and factor its contents into this new api.
Make `run()` and `run_daemon()` use `open_root_actor()` until we decide
to remove them.
Relates to #168 and #177
It turns out in order to maintain our sneaky little "call an `Actor`
method in this remote process" we still need the ability to invoke
functions from a namespace. We're currently using a "self" namespace as
a way to do this for internal inter-process method calling. Either way,
I see no reason not to keep a public method for this invoke style (we
just won't market it) since it is still how the machinery works
underneath.