Compare commits

...

35 Commits

Author SHA1 Message Date
Tyler Goodlet 1c73c0c0ee Start a very basic ipc-server unit test suite
For now it just boots a server, parametrized over all tpt-protos, sin
any actor runtime bootup. Obvi the future todo is ensuring it all works
with a client connecting via the equivalent lowlevel
`.ipc._chan._connect_chan()` API(s).
2025-07-08 12:59:22 -04:00
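
(A rough sketch of that future client-side check, assuming the `_connect_chan()` async-cm usage seen elsewhere in this changeset; the asserted prop is made up:)

```python
from tractor.ipc import _connect_chan

async def check_raw_client_connect(addr) -> None:
    # hypothetical: connect a bare `Channel` to a served
    # listener sans any actor runtime bootup.
    async with _connect_chan(addr) as chan:
        assert chan  # got a live transport-backed channel
```
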
Tyler Goodlet 101cd94e89 Decouple actor-state from low-level ipc-server
As much as is possible given we currently do some graceful
cancellation join-waiting on any connected sub-actors whenever an active
`local_nursery: ActorNursery` in the post-rpc teardown sequence of
`handle_stream_from_peer()` is detected. In such cases we try to allow
the higher level inter-actor (task) context(s) to fully cancel-ack
before conducting IPC machinery shutdown.

The main immediate motivation for all this is to support unit testing
the `.ipc._server` APIs but in the future may be useful for anyone
wanting to use our modular IPC transport layer sin-"actors".

Impl deats,
- drop passing an `actor: Actor` ref from as many routines in
  `.ipc._server` as possible instead opting to use
  `._state.current_actor()` where absolutely needed; thus the fns dropping an
  `actor` input param are:
  - `open_ipc_server()`
  - `IPCServer.listen_on()`
  - `._serve_ipc_eps()`
  - `.handle_stream_from_peer()`
- factor the above mentioned graceful remote-cancel-ack waiting into
  a new `maybe_wait_on_canced_subs()` which is called from
  `handle_stream_from_peer()` and delivers a
  maybe-`local_nursery: ActorNursery` for downstream logic; it's this
  new fn which primarily still needs to call `current_actor()`.
- in `handle_stream_from_peer()` also use `current_actor()` to check if
  a handshake is needed (or if it was called as part of some
  actor-runtime-less operation like our unit test suite!).
- also don't pass an `actor` to `._rpc.process_messages()` see how-n-why
  below..

Surrounding ipc-server client/caller adjustments,
- `._rpc.process_messages()` no longer takes an `actor` input and
  now calls `current_actor()` instead.
- `._portal.open_portal()` is adjusted to ^.
- `._runtime.async_main()` is adjusted to the `.ipc._server`'s removal
  of `actor` ref passing.

Also,
- demote some server `log.info()`s to `.runtime()`
2025-07-08 12:59:22 -04:00
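
(A minimal sketch of the runtime-presence check this enables; the exact branch shape inside `handle_stream_from_peer()` is an assumption:)

```python
from tractor import _state

async def handle_stream_from_peer(stream, **kwargs) -> None:
    # sketch: only do the actor-id handshake when a runtime
    # actually exists, eg. NOT when the server was booted
    # sans-actor from the new `.ipc._server` unit tests.
    actor = _state.current_actor(err_on_no_runtime=False)
    if actor is not None:
        ...  # handshake + normal rpc msg processing
```
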
Tyler Goodlet 3f33ba1cc0 Log listener bind status for TCP as for UDS 2025-07-08 12:59:22 -04:00
Tyler Goodlet 70f5315506 Move peer-tracking attrs from `Actor` -> `IPCServer`
Namely transferring the `Actor` peer-`Channel` tracking attrs,
- `._peers` which maps the uids to client channels (with duplicates
  apparently..)
- the `._peer_connected: dict[tuple[str, str], trio.Event]` child-peer
  syncing table mostly used by parent actors to wait on sub's to connect
  back during spawn.
- the `._no_more_peers = trio.Event()` level triggered state signal.

Further we move over with some minor reworks,
- `.wait_for_peer()` verbatim (adjusting all dependants).
- factor the no-more-peers shielded wait branch-block out of
  the end of `async_main()` into 2 new server meths,
  * `.has_peers()` with optional chan-connected checking flag.
  * `.wait_for_no_more_peers()` which *just* does the
    maybe-shielded `._no_more_peers.wait()`
2025-07-08 12:59:19 -04:00
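
(Rough shape of those 2 new meths; the param names here are assumptions:)

```python
import trio

class IPCServer:
    _no_more_peers: trio.Event

    def has_peers(
        self,
        check_chans: bool = False,  # optional chan-connected check
    ) -> bool:
        ...  # scan the moved-over `._peers` table

    async def wait_for_no_more_peers(
        self,
        shield: bool = False,
    ) -> None:
        # *just* the maybe-shielded level-triggered wait
        with trio.CancelScope(shield=shield):
            await self._no_more_peers.wait()
```
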
Tyler Goodlet 496fac04bb Mv `Actor._stream_handler()` to `.ipc._server` func
Call it `handle_stream_from_peer()` and bind in the `actor: Actor` via
a `handler=partial()` to `trio.serve_listeners()`.

With this (minus the `Actor._peers/._peer_connected/._no_more_peers`
attrs ofc) we get nearly full separation of IPC-connection-processing
(concerns) from `Actor` state. Thus it's a first look at modularizing
the low-level runtime into isolated subsystems which will hopefully
improve the entire code base's grok-ability and ease any new feature
design discussions especially pertaining to introducing and/or
composing-together any new transport protocols.
2025-07-08 12:57:29 -04:00
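
(i.e. the binding looks roughly like the below, per `trio`'s real `serve_listeners()` API and assuming the `handle_stream_from_peer()` from this commit is importable:)

```python
from functools import partial
import trio

from tractor.ipc._server import handle_stream_from_peer

async def serve(actor, listeners, handler_nursery=None):
    # bind the actor ref into the per-conn stream handler
    # at server-start time.
    await trio.serve_listeners(
        handler=partial(
            handle_stream_from_peer,
            actor=actor,
        ),
        listeners=listeners,
        handler_nursery=handler_nursery,
    )
```
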
Tyler Goodlet 02baeb6a8b Passthrough `_pause()` kwargs from `_maybe_enter_pm()` 2025-07-08 12:57:29 -04:00
Tyler Goodlet d4ab802e14 Fix assert on `.devx.maybe_open_crash_handler()` delivered `bxerr` 2025-07-08 12:57:29 -04:00
Tyler Goodlet fdeaeef9f7 Improve bit of tooling for `test_resource_cache.py`
Namely, what I was actually trying to solve was why
`TransportClosed` was getting raised from `Portal.cancel_actor()`, but
this is still useful edge-case auditing either way. Also opts into the
`debug_mode` fixture with appropriate timeout adjustment B)
2025-07-08 12:57:29 -04:00
Tyler Goodlet 41609d1433 Never hide non-[msgtype/tpt-closed] error tbs in `Channel.send()` 2025-07-08 12:57:29 -04:00
Tyler Goodlet c9068522ed Set `_state._def_tpt_proto` in `tpt_proto` fixture
Such that the global test-session always (and only) runs against the CLI
specified `--tpt-proto=` transport protocol.
2025-07-08 12:57:29 -04:00
Tyler Goodlet f3285ea870 Use `current_ipc_protos()` as the `enable_transports`-default-when-`None`
Also ensure we assertion-error whenever the list is > 1 entry for now!
2025-07-08 12:57:29 -04:00
Tyler Goodlet a8caff9077 Add `_state.current_ipc_protos()`
For now just wrapping wtv the `._def_tpt_proto` per-actor setting is.
2025-07-08 12:57:29 -04:00
Tyler Goodlet 31a7e3b3c5 Another `tn` eg-loosify inside `ActorNursery.cancel()`.. 2025-07-08 12:57:29 -04:00
Tyler Goodlet 6163d42424 Absorb `TransportClosed` in `Portal.cancel_actor()`
Just like we *were* doing for the `trio`-resource-errors it normally wraps,
since we now also do the same wrapping in `MsgpackTransport.send()`
and we don't normally care to raise tpt-closure-errors on graceful actor
cancel requests.

Also, warn-report any non-tpt-closed low-level `trio` errors we haven't
yet re-wrapped (likely bc they haven't shown up).
2025-07-08 12:57:29 -04:00
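
(Roughly this shape inside `Portal.cancel_actor()`; the inner cancel-request call here is hypothetical:)

```python
from tractor import TransportClosed

class Portal:
    async def cancel_actor(self) -> bool:
        try:
            # hypothetical stand-in for the actual remote
            # cancel-request machinery..
            await self._request_remote_cancel()
            return True
        except TransportClosed:
            # absorbed: the peer's tpt closing out from under
            # us during a graceful cancel is expected, nbd.
            return False
```
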
Tyler Goodlet 4540309296 Add `TransportClosed.from_src_exc()`
Such that re-wrapping/raising from a low-level `trio` resource error is
simpler and includes the `.src_exc` in the `__repr__()` and
`.message/.args` rendered at higher layers (like from `Channel` and
`._rpc` machinery).

Impl deats,
- mainly leverages a new cls-method `.repr_src_exc() -> str:` which packs
  the repr of the underlying error before an optional `body: str`, all as
  handled by the previously augmented `.pformat()`'s delegation to
  `pformat_exc()`.
- change `.src_exc` to be a property around a renamed `._src_exc`.

But wait, why?
- use it inside `MsgpackTransport.send()` to rewrap any
  `trio.BrokenResourceError`s so we always see the underlying
  `trio`-src-exc just like in the `.recv()._iter_packets()` handlers;
  see the sketch just below.
2025-07-08 12:57:29 -04:00
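
(So the `.send()` rewrap mentioned above looks something like this; the `from_src_exc()` kwarg names and the tpt attrs are assumptions:)

```python
import trio
from tractor import TransportClosed

class MsgpackTransport:
    async def send(self, msg) -> None:
        try:
            # sketch: `.stream`/`.encode()` are stand-ins
            await self.stream.send_all(self.encode(msg))
        except trio.BrokenResourceError as bre:
            # kwarg names here are assumptions!
            raise TransportClosed.from_src_exc(
                message='IPC tpt already closed by peer?',
                src_exc=bre,
            ) from bre
```
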
Tyler Goodlet 4c8fadac85 Factor actor-embedded IPC-tpt-server to `ipc` subsys
Primarily moving the `Actor._serve_forever()`-task-as-method and
supporting actor-instance attributes to a new `.ipc._server` sub-mod
which now encapsulates,
- the coupling of various `trio.Nursery`s (and their independent lifetime mgmt)
  to different `trio.serve_listeners()` tasks and `SocketStream`
  handler scopes.
- `Address` and `SocketListener` mgmt and tracking through the idea of
  an "IPC endpoint": each "bound-and-active instance" of a served-listener
  for some (varied transport protocol's socket) address.
- start and shutdown of the entire server's lifetime via an `@acm`.
- delegation of starting/stopping tpt-protocol-specific `trio.abc.Listener`s
  to the corresponding `.ipc._<proto_key>` sub-module (newly defined
  mod-top-level instead of `Address` method) `start/close_listener()`
  funcs.

Impl details of the `.ipc._server` sub-sys,
- add new `IPCServer`, allocated with `open_ipc_server()`, and which
  encapsulates starting multiple-transport-proto-`trio.abc.Listener`s
  from an input set of `._addr.Address`s using,
  |_`IPCServer.listen_on()` which internally spawns tasks that delegate to a new
    `_serve_ipc_eps()`, a rework of what was (effectively)
    `Actor._serve_forever()` and which now,
    * allocates a new `IPCEndpoint`-struct (see below) for each
      address-listener pair alongside the specified
      listener-serving/stream-handling `trio.Nursery`s provided by the
      caller.
    * starts and stops each transport (socket's) listener by calling
      `IPCEndpoint.start/close_listener()` which in turn delegates to
      the underlying `inspect.getmodule(IPCEndpoint.addr)` backend tpt
      module's equivalent impl.
    * tracks all created endpoints in a `._endpoints: list[IPCEndpoint]`
      which is further exposed through public properties for
      introspection of served transport-protocols and their addresses.
  |_`IPCServer._[parent/stream_handler]_tn: Nursery`s which are either
     allocated (in which case, as the same instance) or provided by the
     caller of `open_ipc_server()` such that the same nursery-cancel-scope
     controls offered by `trio.serve_listeners(handler_nursery=)` are
     offered where the `._parent_tn` is used to spawn `_serve_ipc_eps()`
     tasks, and `._stream_handler_tn` is passed verbatim as `handler_nursery`.
- a new `IPCEndpoint`-struct (as mentioned) which wraps each
  transport-proto's address + listener + allocated-supervising-nursery
  to encapsulate the "lifetime of a server IPC endpoint" such that
  eventually we can track and manage per-protocol/address/`.listen_on()`-call
  scoped starts/stops/restarts for the purposes of filtering/banning
  peer traffic.
  |_ also included is an unused `.peer_tpts` table which we can
    hopefully use to replace `Actor._peers` in a `Channel`-tracking
    transport-proto-aware way!

Surrounding changes to `.ipc.*` primitives to match,
- make `[TCP|UDS]Address` types `msgspec.Struct(frozen=True)` and thus
  drop any-and-all `addr._host =` style mutation throughout.
  |_ as such also drop their `.__init__()` and `.__eq__()` meths.
  |_ UDS tweaks to field names and thus `.__repr__()`.
- move `[TCP|UDS]Address.[start/close]_listener()` meths to be mod-level
  equiv `start|close_listener()` funcs.
- just hard code the `.ipc._types._key_to_transport/._addr_to_transport`
  table entries instead of all the prior fancy dynamic class property
  reading stuff (remember, "explicit is better than implicit").

Modified in `._runtime.Actor` internals,
- drop the `._serve_forever()` and `.cancel_server()` methods and the
  `._server_down` waiting logic from `.cancel_soon()`
- add `.[_]ipc_server` which is opened just after the `._service_n` and
  delegate to it for any equivalent publicly exposed instance
  attributes/properties.
2025-07-08 12:57:29 -04:00
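
(Minimal usage, mirroring the new unit test included in the diffs further below:)

```python
import trio
from tractor import ipc
from tractor._testing.addr import get_rando_addr

async def main() -> None:
    addr = get_rando_addr(tpt_proto='tcp')
    async with ipc._server.open_ipc_server() as server:
        # bind an endpoint per input address; the tpt backend
        # is resolved from each `Address`-type under the hood.
        eps: list[ipc.IPCEndpoint] = await server.listen_on(
            accept_addrs=[addr],
            stream_handler_nursery=None,
        )
        assert eps[0]._listener

trio.run(main)
```
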
Tyler Goodlet 252a325bb6 Move concrete `Address`es to each tpt module
That is moving from `._addr`,
- `TCPAddress` to `.ipc._tcp`
- `UDSAddress` to `.ipc._uds`

Obviously this requires adjusting a buncha stuff in `._addr` to avoid
import cycles (the original reason the module was not also included in
the new `.ipc` subpkg) including,

- avoiding "unnecessary" imports of `[Unwrapped]Address` in various modules.
  * since `Address` is a protocol and the main point is that it **does
    not need to be inherited** per
    (https://typing.python.org/en/latest/spec/protocol.html#terminology)
    thus I removed the need for it in both transport submods.
  * and `UnwrappedAddress` is a type alias for tuples.. so we don't
    really always need to be importing it since it also kinda obfuscates
    what the underlying pairs are.
- not exporting everything in submods at the `.ipc` top level and
  importing from specific submods by default.
- only importing various types under a `if typing.TYPE_CHECKING:` guard
  as needed.
2025-07-08 12:57:29 -04:00
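
(For the protocol point, a tiny standalone illustration, not `tractor` code:)

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Address(Protocol):
    proto_key: str
    def unwrap(self) -> tuple: ...

class TCPAddress:  # note, NO inheritance required!
    proto_key: str = 'tcp'
    def unwrap(self) -> tuple:
        return ('127.0.0.1', 0)

addr: Address = TCPAddress()  # structurally type-checks
assert isinstance(addr, Address)
```
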
Tyler Goodlet 4621c528ac Add API-modernize-todo on `experimental._pubsub.fan_out_to_ctxs` 2025-07-08 12:57:29 -04:00
Tyler Goodlet ddb299488a Skip the ringbuf test mod for now since data-gen is a bit "heavy/laggy" atm 2025-07-08 12:57:29 -04:00
Tyler Goodlet 9f3702b64c Improve `TransportClosed.__repr__()`, add `src_exc`
By borrowing from the implementation of `RemoteActorError.pformat()`
which is now factored into a new `.devx.pformat_exc()` and re-used for
both error types while maintaining the same func-sig. Obviously delegate
`RemoteActorError.pformat()` to the new helper accordingly and keep
the prior `body` generation from `.devx.pformat_boxed_tb()` as before.

The new helper allows for,
- passing any of a `header|message|body: str` which are all combined in
  that order in the final output.
- getting the `exc.message` as the default `message` part.
- generating an objecty-looking "type-name" header to be rendered by
  default when `header` is not overridden.
- "first-line-of `message`" processing which we split-off and then
  re-inject as a `f'<{type(exc).__name__}( {first} )>'` top line header.
- an optional `tail: str = '>'` to "close the object"-look only added
  when `with_type_header: bool = True`.

Adjustments to `TransportClosed` around this include,
- replacing the init `cause` arg with a `src_exc` which is now always
  assigned to a same-named instance var.
- displaying that new `.src_exc` in the `body: str` arg to the
  `.devx.pformat.pformat_exc()` call so you can always see the
  underlying (normally `trio`) source error.
- just make it inherit from `Exception` not `trio.BrokenResourceError`
  to avoid handlers catching `TransportClosed` as the former,
  particularly in testing when we sometimes want to distinguish them.
2025-07-08 12:57:29 -04:00
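
(The helper's combine-order semantics, condensed; the real `.devx.pformat_exc()` additionally does the first-line splitting/indenting described above:)

```python
def pformat_exc(
    exc: Exception,
    header: str = '',
    message: str = '',
    body: str = '',
    with_type_header: bool = True,
) -> str:
    # default `message` to the exc's own `.message`
    message = message or getattr(exc, 'message', '') or ''
    if with_type_header and not header:
        # objecty-looking "type-name" header
        header = f'<{type(exc).__name__}('
    # optional `tail` to "close the object"-look
    tail: str = '>' if with_type_header else ''
    # combined in header|message|body order
    return header + message + body + tail
```
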
Tyler Goodlet 2361a5f47f Handle unconsidered fault-edge cases for UDS
In `tests/test_advanced_faults.py` that is.
Since instead of the zero-responses we'd expect from a network socket,
we can actually get a few differences from the OS when "everything IPC
is known".

XD

Namely it's about underlying `trio` exceptions versus how we wrap them
and how we expect to box them. A `TransportClosed` boxing improvement
is coming in follow up btw to make this all work!

B)
2025-07-08 12:57:29 -04:00
Tyler Goodlet f2595c45d0 Woops, ensure we use `global` before setting `daemon()` fixture spawn delay.. 2025-07-08 12:57:29 -04:00
Tyler Goodlet 8d72423bb4 Support multiple IPC transports in test harness!
Via a new accumulative `--tpt-proto` arg you can select which
`tpt_protos: list[str]`-fixture protocol keys will be delivered to
opting in tests!

B)

Also includes,
- CLI quote handling/stripping.
- default of 'tcp'.
- only support one selection per session at the moment (until we figure
  out how we want to support multiples, either simultaneously or
  sequentially).
- draft a (masked) dynamic-`metafunc` parametrization in the
  `pytest_generate_tests()` hook.
- first proven and working use in the `test_advanced_faults`-suite (and
  thus its underlying
  `examples/advanced_faults/ipc_failure_during_stream.py` script)!
 |_ actually needed this to prove that the suite only has 2 failures on
    'uds' seemingly due to low-level `trio` error semantics translation
    differences to do with calling `socket.close()`..

On a very nearly related topic,
- draft an (also commented out) `set_script_runtime_args()` fixture idea
  for a std way of `partial`-ling in runtime args to `examples/`
  scripts-as-modules defining a `main()` which would proxy to
  `tractor.open_nursery()`.
2025-07-08 12:57:29 -04:00
Tyler Goodlet e3232aed30 Unwrap `UDSAddress` as `tuple[str, str]`, i.e. sin pid
Since in hindsight the real analog of a net-proto's "bindspace"
(normally its routing layer's addresses-port-set) for a UDS socket file
is more akin to its "location in the file-system" (aka the file's
parent directory), which determines whether or not the "port" (aka its
file-name) collides with any other.

So the `._filedir: Path` is like the allocated "address" and,
the `._filename: Path|str` is basically the "port",

at least in my mind.. Bp

Thinking about fs dirs like a "host address" means you can get
essentially the same benefits/behaviour of say an (ip)
addresses-port-space but using the (current process-namespace's)
filesys-tree. Note that for UDS sockets in particular the
network-namespace is what would normally isolate so called "abstract
sockets" (i.e. UDS sockets that do NOT use file-paths by setting `struct
sockaddr_un.sun_path = 'abstract', see `man unix`); using directories is
even easier and definitely more explicit/readable/immediately-obvious as
a human-user.

As such this reworks all the necessary `UDSAddress` meths,
- `.unwrap()` now returns a `tuple(str(._filedir), str(._filename))`,
- `wrap_address()` now matches UDS on a 2nd tuple `str()` element,
- `.get_root()` no longer passes `maybe_pid`.

AND adjusts `MsgpackUDSStream` to,
- use the new `unwrap_sockpath()` on the `socket.get[sock/peer]name()`
  output before passing directly as `UDSAddress.__init__(filedir, filename)`
  instead of via `.from_addr()`.
- also pass `maybe_pid`s to init since no longer included in the
  unwrapped-type form.
2025-07-08 12:57:29 -04:00
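
(Presumably `unwrap_sockpath()` is just the abs-path split, something like:)

```python
from pathlib import Path

def unwrap_sockpath(
    sockpath: Path,
) -> tuple[Path, Path]:
    # split an abs socket-file path into its
    # (filedir, filename) "addr pair".
    return (
        sockpath.parent,
        Path(sockpath.name),
    )
```
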
Tyler Goodlet 1a591208f5 s/`._addr.preferred_transport`/`_state._def_tpt_proto`
Such that the "global-ish" setting (actor-local) is managed with the
others per actor-process and type it as a `Literal['tcp', 'uds']` of the
currently support protocol keys.

Here obvi `_tpt` is some kinda shorthand for "transport" and `_proto` is
for "protocol" Bp

Change imports and refs in all dependent modules.

Oh right, and disable UDS in `wrap_address()` for the moment while
i figure out how to avoid the unwrapped type collision..
2025-07-08 12:57:29 -04:00
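
(i.e. something along these lines in `._state`; only the `Literal` is specified by the msg, the alias name and the 'tcp' default are assumptions:)

```python
from typing import Literal

# the currently supported tpt-proto keys
TransportProtocolKey = Literal['tcp', 'uds']
_def_tpt_proto: TransportProtocolKey = 'tcp'
```
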
Tyler Goodlet 1efe5fa893 Add `Arbiter.is_registry()` in prep for proper `.discovery._registry` 2025-07-08 12:57:29 -04:00
Tyler Goodlet 9579fa35a5 Repair weird spawn test, start `test_root_runtime`
There was a very strange legacy test
`test_spawning.test_local_arbiter_subactor_global_state` which was
causing unforeseen hangs/errors on the UDS tpt and looking deeper this
test was already doing root-actor things that should never have been
valid XD

So rework that test to properly demonstrate something of value
(i guess..) and add a new suite which starts more rigorously auditing our
`open_root_actor()` permitted usage.

For the old test,
- since the main point of this test seemed to be the ability to invoke
  the same function in both the parent and child actor (using the very
  legacy `ActorNursery.run_in_actor()`.. due to be deprecated) rename it
  to `test_run_in_actor_same_func_in_child`,
- don't re-enter `.open_root_actor()` since that's invalid usage (tested
  in new suite see below),
- adjust some `spawn()` arg/var naming and ensure we only return in the
  child.

For the new suite add tests for,
- ensuring the implicit `open_root_actor()` call under `open_nursery()`.
- double open of `open_root_actor()` from within the same process tree
  both from a root and sub.

Intro some new `_exceptions` used in the new suite,
- a top level `RuntimeFailure` for generically expressing faults not of
  our own doing that prevent successful operation; this is what we now
  (changed in this commit) raise on attempts to open a 2nd root.
- mk `ActorFailure` derive from the former; it's already used from
  `._spawn` when subprocs fail to boot.
2025-07-08 12:57:29 -04:00
Tyler Goodlet e4f5aac290 Some more log message tweaks
- aggregate the `MsgStream.aclose()` "reader tasks" stats content into a
  common `message: str` before emit.
- tweak an `_rpc.process_messages()` emit per new `Channel.__repr__()`.
2025-07-08 12:57:29 -04:00
Tyler Goodlet f20168b7e8 Change some low-hanging `.uid`s to `.aid`
Throughout `_context` and `_spawn` where it causes no big disruption.
Still lots to work out for things like how to pass `--uid
<tuple-as-str>` to spawned subactors and whether we want a diff name for
the minimum `tuple` required to distinguish a subactor pre-process-ID
allocation by the OS.
2025-07-08 12:57:29 -04:00
Tyler Goodlet 993c745729 Mv to `Channel._do_handshake()` in `open_portal()`
As per the method migration in the last commit. Also adjust all `.uid`
usage to the new `.aid`.
2025-07-08 12:57:29 -04:00
Tyler Goodlet 844525ed34 Mv `Actor._do_handshake()` to `Channel`, add `.aid`
Finally.. i've been meaning to do this for ages since the
actor-id-swap-as-handshake is better layered as part of the IPC msg-ing
machinery and it lets us encapsulate the connection-time-assignment
of a remote peer's `Aid` as a new `Channel.aid: Aid`. For now we
continue to offer the `.uid: tuple[str, str]` attr (by delegating to the
`.uid` field) since there's still a few things relying on it in the
runtime and ctx layers.

Nice bonuses from this,
- it's very easy to get the peer's `Aid.pid: int` from anywhere in an
  IPC ctx by just reading it from the chan.
- we aren't saving more than the wire struct-msg received.

Also add deprecation warnings around usage to get us moving on porting
the rest of consuming runtime code to the new attr!
2025-07-08 12:57:29 -04:00
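
(The deprecation shim presumably reads something like this; the backing field access is an assumption:)

```python
import warnings

class Channel:
    @property
    def uid(self) -> tuple[str, str]:
        warnings.warn(
            '`Channel.uid` is deprecated, read `Channel.aid` instead!',
            DeprecationWarning,
            stacklevel=2,
        )
        # delegate to the new `Aid` struct's equiv field(s)
        return self.aid.uid  # hypothetical field access
```
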
Tyler Goodlet 9abd306860 UDS: translate file dne to connection-error
For the case where there's clearly no socket file created/bound
obviously the `trio.socket.connect()` call will raise
`FileNotFoundError`, so just translate this to
a builtin-`ConnectionError` at the transport layer so we can report the
guilty `UDSAddress`.
2025-07-08 12:57:29 -04:00
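
(i.e. at the tpt layer, roughly:)

```python
import trio

async def connect_uds(sockpath: str):
    sock = trio.socket.socket(
        trio.socket.AF_UNIX,
        trio.socket.SOCK_STREAM,
    )
    try:
        await sock.connect(sockpath)
    except FileNotFoundError as fdne:
        # no socket file was ever bound there..
        raise ConnectionError(
            f'No UDS socket bound @ {sockpath!r} ?'
        ) from fdne
    return sock
```
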
Tyler Goodlet 1d113e3ae5 More `._addr` boxing refinements
The more I think about it, it seems @guille's orig approach of
unwrapping UDS socket-file addresses to strings (or `Path`) is making
the most sense. I had originally thought that pairing it with the
listening side's pid would add clarity (and it definitely does for
introspection/debug/logging) but since we don't end up passing that pid
to the eventual `.connect()` call on the client side, it doesn't make
much sense to wrap it for the wire just to discard.. Further, the
`tuple[str, int]` makes `wrap_address()` break for TCP since it will
always match on uds first.

So, on that note this patch refines a few things in prep for going back
to that original `UnwrappedAddress` as `str` type though longer run
i think the more "builtin approach" would be to add `msgspec` codec
hooks for these types to avoid all the `.wrap()`/`.unwrap()` calls
throughout the runtime.

Down-low deats,
- add `wrap_address()` doc string, detailed (todo) comments and handle
  the `[None, None]` case that can come directly from
  `._state._runtime_vars['_root_mailbox']`.
- buncha adjustments to `UDSAddress`,
  - add a `filedir`, chng `filepath` -> `filename` and mk `maybe_pid` optional.
  - the intent is for `filedir` to act as the equivalent of the host part in
    a network proto's socket address; when it's null use the
    `.def_bindspace = get_rt_dir()`.
  - always ensure the `filedir / filename` is an absolute path and
    expose it as a new `.sockpath: Path` property.
  - mk `.is_valid` actually verify the `.sockpath` is in the valid
    `.bindspace`: namely just checking it's in the expected dir.
  - add pedantic `match:`ing to `.from_addr()` such that we error on
    unexpected `type(addr)` inputs and otherwise parse any `sockpath:
    Path` inputs using a new `unwrap_sockpath()` which simply splits an
    abs file path to dir, file-name parts.
  - `.unwrap()` now just `str`-ifies the `.sockpath: Path`
  - adjust `.open/close_listener()` to use `.sockpath`.
2025-07-08 12:57:29 -04:00
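
(For reference, the mentioned longer-run `msgspec` codec-hooks approach might look like the below; a sketch only, not what this patch does, and the `UDSAddress` init kwargs are per the commits above:)

```python
from pathlib import Path
import msgspec.msgpack
from tractor.ipc._uds import UDSAddress  # per the move above

def enc_hook(obj):
    # wire-encode a UDS addr as its plain sockpath str
    if isinstance(obj, UDSAddress):
        return str(obj.sockpath)
    raise NotImplementedError

def dec_hook(type_, obj):
    # rebuild from the str form on decode
    if type_ is UDSAddress:
        p = Path(obj)
        return UDSAddress(filedir=p.parent, filename=p.name)
    raise NotImplementedError

enc = msgspec.msgpack.Encoder(enc_hook=enc_hook)
dec = msgspec.msgpack.Decoder(UDSAddress, dec_hook=dec_hook)
```
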
Tyler Goodlet c911d224ca Move `DebugRequestError` to `._exceptions` 2025-07-08 12:57:29 -04:00
Tyler Goodlet 89b16f27de Start prototyping multi-transport testing
Such that we can run (opting-in) tests on both TCP and UDS backends and
ensure the `reg_addr` fixture and various timeouts are adjusted
accordingly.

Impl deats,
- add a new `tpt_proto` CLI option and fixture to allow choosing which
  "transport protocol" will be used in the test suites (either globally
  or contextually).
- rm `_reg_addr` instead opting for a `_rando_port` which will only be
  used for `reg_addr`s which are net-tpt-protos.
- rejig `reg_addr` fixture to set an ideally session-unique `testrun_reg_addr`
  based on the `tpt_proto` setting making appropriate calls to `._addr`
  APIs as needed.
- refine `daemon` fixture a bit with typing, `tpt_proto` timings, and
  stderr capture.
- in `test_discovery` do a ton of type-annots, add `debug_mode` fixture
  opt ins, augment `spawn_and_check_registry()` with `psutil.Process`
  passing for introspection (when things go wrong..).
2025-07-08 12:57:29 -04:00
35 changed files with 2842 additions and 1298 deletions

View File

@ -120,6 +120,7 @@ async def main(
break_parent_ipc_after: int|bool = False,
break_child_ipc_after: int|bool = False,
pre_close: bool = False,
tpt_proto: str = 'tcp',
) -> None:
@ -131,6 +132,7 @@ async def main(
# a hang since it never engages due to broken IPC
debug_mode=debug_mode,
loglevel=loglevel,
enable_transports=[tpt_proto],
) as an,
):
@ -145,7 +147,8 @@ async def main(
_testing.expect_ctxc(
yay=(
break_parent_ipc_after
or break_child_ipc_after
or
break_child_ipc_after
),
# TODO: we CAN'T remove this right?
# since we need the ctxc to bubble up from either

View File

@ -1,6 +1,8 @@
"""
``tractor`` testing!!
Top level of the testing suites!
"""
from __future__ import annotations
import sys
import subprocess
import os
@ -30,7 +32,11 @@ else:
_KILL_SIGNAL = signal.SIGKILL
_INT_SIGNAL = signal.SIGINT
_INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value
_PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4
_PROC_SPAWN_WAIT = (
0.6
if sys.version_info < (3, 7)
else 0.4
)
no_windows = pytest.mark.skipif(
@ -39,7 +45,9 @@ no_windows = pytest.mark.skipif(
)
def pytest_addoption(parser):
def pytest_addoption(
parser: pytest.Parser,
):
parser.addoption(
"--ll",
action="store",
@ -56,7 +64,8 @@ def pytest_addoption(parser):
)
parser.addoption(
"--tpdb", "--debug-mode",
"--tpdb",
"--debug-mode",
action="store_true",
dest='tractor_debug_mode',
# default=False,
@ -67,6 +76,17 @@ def pytest_addoption(parser):
),
)
# provide which IPC transport protocols opting-in test suites
# should accumulatively run against.
parser.addoption(
"--tpt-proto",
nargs='+', # accumulate-multiple-args
action="store",
dest='tpt_protos',
default=['tcp'],
help="Transport protocol to use under the `tractor.ipc.Channel`",
)
def pytest_configure(config):
backend = config.option.spawn_backend
@ -74,7 +94,7 @@ def pytest_configure(config):
@pytest.fixture(scope='session')
def debug_mode(request):
def debug_mode(request) -> bool:
debug_mode: bool = request.config.option.tractor_debug_mode
# if debug_mode:
# breakpoint()
@ -95,11 +115,43 @@ def spawn_backend(request) -> str:
return request.config.option.spawn_backend
# @pytest.fixture(scope='function', autouse=True)
# def debug_enabled(request) -> str:
# from tractor import _state
# if _state._runtime_vars['_debug_mode']:
# breakpoint()
@pytest.fixture(scope='session')
def tpt_protos(request) -> list[str]:
# allow quoting on CLI
proto_keys: list[str] = [
proto_key.replace('"', '').replace("'", "")
for proto_key in request.config.option.tpt_protos
]
# ?TODO, eventually support multiple protos per test-sesh?
if len(proto_keys) > 1:
pytest.fail(
'We only support one `--tpt-proto <key>` atm!\n'
)
# XXX ensure we support the protocol by name via lookup!
for proto_key in proto_keys:
addr_type = tractor._addr._address_types[proto_key]
assert addr_type.proto_key == proto_key
yield proto_keys
@pytest.fixture(
scope='session',
autouse=True,
)
def tpt_proto(
tpt_protos: list[str],
) -> str:
proto_key: str = tpt_protos[0]
from tractor import _state
if _state._def_tpt_proto != proto_key:
_state._def_tpt_proto = proto_key
# breakpoint()
yield proto_key
_ci_env: bool = os.environ.get('CI', False)
@ -107,7 +159,7 @@ _ci_env: bool = os.environ.get('CI', False)
@pytest.fixture(scope='session')
def ci_env() -> bool:
'''
Detect CI envoirment.
Detect CI environment.
'''
return _ci_env
@ -115,30 +167,45 @@ def ci_env() -> bool:
# TODO: also move this to `._testing` for now?
# -[ ] possibly generalize and re-use for multi-tree spawning
# along with the new stuff for multi-addrs in distribute_dis
# branch?
# along with the new stuff for multi-addrs?
#
# choose randomly at import time
_reg_addr: tuple[str, int] = (
'127.0.0.1',
random.randint(1000, 9999),
)
# choose random port at import time
_rando_port: int = random.randint(1000, 9999)
@pytest.fixture(scope='session')
def reg_addr() -> tuple[str, int]:
def reg_addr(
tpt_proto: str,
) -> tuple[str, int|str]:
# globally override the runtime to the per-test-session-dynamic
# addr so that all tests never conflict with any other actor
# tree using the default.
from tractor import _root
_root._default_lo_addrs = [_reg_addr]
from tractor import (
_addr,
)
addr_type = _addr._address_types[tpt_proto]
def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto]
return _reg_addr
testrun_reg_addr: tuple[str, int]
match tpt_proto:
case 'tcp':
testrun_reg_addr = (
addr_type.def_bindspace,
_rando_port,
)
# NOTE, file-name uniqueness (no-collisions) will be based on
# the runtime-directory and root (pytest-proc's) pid.
case 'uds':
testrun_reg_addr = addr_type.get_random().unwrap()
assert def_reg_addr != testrun_reg_addr
return testrun_reg_addr
def pytest_generate_tests(metafunc):
spawn_backend = metafunc.config.option.spawn_backend
spawn_backend: str = metafunc.config.option.spawn_backend
if not spawn_backend:
# XXX some weird windows bug with `pytest`?
@ -151,45 +218,53 @@ def pytest_generate_tests(metafunc):
'trio',
)
# NOTE: used to be used to dyanmically parametrize tests for when
# NOTE: used-to-be-used-to dynamically parametrize tests for when
# you just passed --spawn-backend=`mp` on the cli, but now we expect
# that cli input to be manually specified, BUT, maybe we'll do
# something like this again in the future?
if 'start_method' in metafunc.fixturenames:
metafunc.parametrize("start_method", [spawn_backend], scope='module')
metafunc.parametrize(
"start_method",
[spawn_backend],
scope='module',
)
# TODO, parametrize any `tpt_proto: str` declaring tests!
# proto_tpts: list[str] = metafunc.config.option.proto_tpts
# if 'tpt_proto' in metafunc.fixturenames:
# metafunc.parametrize(
# 'tpt_proto',
# proto_tpts, # TODO, double check this list usage!
# scope='module',
# )
# TODO: a way to let test scripts (like from `examples/`)
# guarantee they won't registry addr collide!
# @pytest.fixture
# def open_test_runtime(
# reg_addr: tuple,
# ) -> AsyncContextManager:
# return partial(
# tractor.open_nursery,
# registry_addrs=[reg_addr],
# )
def sig_prog(proc, sig):
def sig_prog(
proc: subprocess.Popen,
sig: int,
canc_timeout: float = 0.1,
) -> int:
"Kill the actor-process with ``sig``."
proc.send_signal(sig)
time.sleep(0.1)
time.sleep(canc_timeout)
if not proc.poll():
# TODO: why sometimes does SIGINT not work on teardown?
# seems to happen only when trace logging enabled?
proc.send_signal(_KILL_SIGNAL)
ret = proc.wait()
ret: int = proc.wait()
assert ret
# TODO: factor into @cm and move to `._testing`?
@pytest.fixture
def daemon(
debug_mode: bool,
loglevel: str,
testdir,
reg_addr: tuple[str, int],
):
tpt_proto: str,
) -> subprocess.Popen:
'''
Run a daemon root actor as a separate actor-process tree and
"remote registrar" for discovery-protocol related tests.
@ -200,28 +275,100 @@ def daemon(
loglevel: str = 'info'
code: str = (
"import tractor; "
"tractor.run_daemon([], registry_addrs={reg_addrs}, loglevel={ll})"
"import tractor; "
"tractor.run_daemon([], "
"registry_addrs={reg_addrs}, "
"debug_mode={debug_mode}, "
"loglevel={ll})"
).format(
reg_addrs=str([reg_addr]),
ll="'{}'".format(loglevel) if loglevel else None,
debug_mode=debug_mode,
)
cmd: list[str] = [
sys.executable,
'-c', code,
]
# breakpoint()
kwargs = {}
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
proc = testdir.popen(
proc: subprocess.Popen = testdir.popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs,
)
assert not proc.returncode
# UDS sockets are **really** fast to bind()/listen()/connect()
# so it's often required that we delay a bit more starting
# the first actor-tree..
if tpt_proto == 'uds':
global _PROC_SPAWN_WAIT
_PROC_SPAWN_WAIT = 0.6
time.sleep(_PROC_SPAWN_WAIT)
assert not proc.returncode
yield proc
sig_prog(proc, _INT_SIGNAL)
# XXX! yeah.. just be reaaal careful with this bc sometimes it
# can lock up on the `_io.BufferedReader` and hang..
stderr: str = proc.stderr.read().decode()
if stderr:
print(
f'Daemon actor tree produced STDERR:\n'
f'{proc.args}\n'
f'\n'
f'{stderr}\n'
)
if proc.returncode != -2:
raise RuntimeError(
'Daemon actor tree failed !?\n'
f'{proc.args}\n'
)
# @pytest.fixture(autouse=True)
# def shared_last_failed(pytestconfig):
# val = pytestconfig.cache.get("example/value", None)
# breakpoint()
# if val is None:
# pytestconfig.cache.set("example/value", val)
# return val
# TODO: a way to let test scripts (like from `examples/`)
# guarantee they won't `registry_addrs` collide!
# -[ ] maybe use some kinda standard `def main()` arg-spec that
# we can introspect from a fixture that is called from the test
# body?
# -[ ] test and figure out typing for below prototype! Bp
#
# @pytest.fixture
# def set_script_runtime_args(
# reg_addr: tuple,
# ) -> Callable[[...], None]:
# def import_n_partial_in_args_n_triorun(
# script: Path, # under examples?
# **runtime_args,
# ) -> Callable[[], Any]: # a `partial`-ed equiv of `trio.run()`
# # NOTE, below is taken from
# # `.test_advanced_faults.test_ipc_channel_break_during_stream`
# mod: ModuleType = import_path(
# examples_dir() / 'advanced_faults'
# / 'ipc_failure_during_stream.py',
# root=examples_dir(),
# consider_namespace_packages=False,
# )
# return partial(
# trio.run,
# partial(
# mod.main,
# **runtime_args,
# )
# )
# return import_n_partial_in_args_n_triorun

View File

@ -0,0 +1,4 @@
'''
`tractor.ipc` subsystem(s)/unit testing suites.
'''

View File

@ -0,0 +1,72 @@
'''
High-level `.ipc._server` unit tests.
'''
from __future__ import annotations
import pytest
import trio
from tractor import (
devx,
ipc,
log,
)
from tractor._testing.addr import (
get_rando_addr,
)
# TODO, use/check-roundtripping with some of these wrapper types?
#
# from .._addr import Address
# from ._chan import Channel
# from ._transport import MsgTransport
# from ._uds import UDSAddress
# from ._tcp import TCPAddress
@pytest.mark.parametrize(
'_tpt_proto',
['uds', 'tcp']
)
def test_basic_ipc_server(
_tpt_proto: str,
debug_mode: bool,
loglevel: str,
):
# so we see the socket-listener reporting on console
log.get_console_log("INFO")
rando_addr: tuple = get_rando_addr(
tpt_proto=_tpt_proto,
)
async def main():
async with ipc._server.open_ipc_server() as server:
assert (
server._parent_tn
and
server._parent_tn is server._stream_handler_tn
)
assert server._no_more_peers.is_set()
eps: list[ipc.IPCEndpoint] = await server.listen_on(
accept_addrs=[rando_addr],
stream_handler_nursery=None,
)
assert (
len(eps) == 1
and
(ep := eps[0])._listener
and
not ep.peer_tpts
)
server._parent_tn.cancel_scope.cancel()
# !TODO! actually make a bg-task connection from a client
# using `ipc._chan._connect_chan()`
with devx.maybe_open_crash_handler(
pdb=debug_mode,
):
trio.run(main)

View File

@ -10,6 +10,9 @@ import pytest
from _pytest.pathlib import import_path
import trio
import tractor
from tractor import (
TransportClosed,
)
from tractor._testing import (
examples_dir,
break_ipc,
@ -74,6 +77,7 @@ def test_ipc_channel_break_during_stream(
spawn_backend: str,
ipc_break: dict|None,
pre_aclose_msgstream: bool,
tpt_proto: str,
):
'''
Ensure we can have an IPC channel break its connection during
@ -91,7 +95,7 @@ def test_ipc_channel_break_during_stream(
# non-`trio` spawners should never hit the hang condition that
# requires the user to do ctl-c to cancel the actor tree.
# expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed
expect_final_exc = TransportClosed
mod: ModuleType = import_path(
examples_dir() / 'advanced_faults'
@ -104,6 +108,8 @@ def test_ipc_channel_break_during_stream(
# period" wherein the user eventually hits ctl-c to kill the
# root-actor tree.
expect_final_exc: BaseException = KeyboardInterrupt
expect_final_cause: BaseException|None = None
if (
# only expect EoC if trans is broken on the child side,
ipc_break['break_child_ipc_after'] is not False
@ -138,6 +144,9 @@ def test_ipc_channel_break_during_stream(
# a user sending ctl-c by raising a KBI.
if pre_aclose_msgstream:
expect_final_exc = KeyboardInterrupt
if tpt_proto == 'uds':
expect_final_exc = TransportClosed
expect_final_cause = trio.BrokenResourceError
# XXX OLD XXX
# if child calls `MsgStream.aclose()` then expect EoC.
@ -157,6 +166,10 @@ def test_ipc_channel_break_during_stream(
if pre_aclose_msgstream:
expect_final_exc = KeyboardInterrupt
if tpt_proto == 'uds':
expect_final_exc = TransportClosed
expect_final_cause = trio.BrokenResourceError
# NOTE when the parent IPC side dies (even if the child does as well
# but the child fails BEFORE the parent) we always expect the
# IPC layer to raise a closed-resource, NEVER do we expect
@ -169,8 +182,8 @@ def test_ipc_channel_break_during_stream(
and
ipc_break['break_child_ipc_after'] is False
):
# expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed
expect_final_cause = trio.ClosedResourceError
# BOTH but, PARENT breaks FIRST
elif (
@ -181,8 +194,8 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_parent_ipc_after']
)
):
# expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed
expect_final_cause = trio.ClosedResourceError
with pytest.raises(
expected_exception=(
@ -198,6 +211,7 @@ def test_ipc_channel_break_during_stream(
start_method=spawn_backend,
loglevel=loglevel,
pre_close=pre_aclose_msgstream,
tpt_proto=tpt_proto,
**ipc_break,
)
)
@ -220,10 +234,15 @@ def test_ipc_channel_break_during_stream(
)
cause: Exception = tc.__cause__
assert (
type(cause) is trio.ClosedResourceError
and
cause.args[0] == 'another task closed this fd'
# type(cause) is trio.ClosedResourceError
type(cause) is expect_final_cause
# TODO, should we expect a certain exc-message (per
# tpt) as well??
# and
# cause.args[0] == 'another task closed this fd'
)
raise
# get raw instance from pytest wrapper

View File

@ -7,7 +7,9 @@ import platform
from functools import partial
import itertools
import psutil
import pytest
import subprocess
import tractor
from tractor._testing import tractor_test
import trio
@ -152,13 +154,23 @@ async def unpack_reg(actor_or_portal):
async def spawn_and_check_registry(
reg_addr: tuple,
use_signal: bool,
debug_mode: bool = False,
remote_arbiter: bool = False,
with_streaming: bool = False,
maybe_daemon: tuple[
subprocess.Popen,
psutil.Process,
]|None = None,
) -> None:
if maybe_daemon:
popen, proc = maybe_daemon
# breakpoint()
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
):
async with tractor.get_registry(reg_addr) as portal:
# runtime needs to be up to call this
@ -176,11 +188,11 @@ async def spawn_and_check_registry(
extra = 2 # local root actor + remote arbiter
# ensure current actor is registered
registry = await get_reg()
registry: dict = await get_reg()
assert actor.uid in registry
try:
async with tractor.open_nursery() as n:
async with tractor.open_nursery() as an:
async with trio.open_nursery(
strict_exception_groups=False,
) as trion:
@ -189,17 +201,17 @@ async def spawn_and_check_registry(
for i in range(3):
name = f'a{i}'
if with_streaming:
portals[name] = await n.start_actor(
portals[name] = await an.start_actor(
name=name, enable_modules=[__name__])
else: # no streaming
portals[name] = await n.run_in_actor(
portals[name] = await an.run_in_actor(
trio.sleep_forever, name=name)
# wait on last actor to come up
async with tractor.wait_for_actor(name):
registry = await get_reg()
for uid in n._children:
for uid in an._children:
assert uid in registry
assert len(portals) + extra == len(registry)
@ -232,6 +244,7 @@ async def spawn_and_check_registry(
@pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel(
debug_mode: bool,
start_method,
use_signal,
reg_addr,
@ -248,6 +261,7 @@ def test_subactors_unregister_on_cancel(
spawn_and_check_registry,
reg_addr,
use_signal,
debug_mode=debug_mode,
remote_arbiter=False,
with_streaming=with_streaming,
),
@ -257,7 +271,8 @@ def test_subactors_unregister_on_cancel(
@pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel_remote_daemon(
daemon,
daemon: subprocess.Popen,
debug_mode: bool,
start_method,
use_signal,
reg_addr,
@ -273,8 +288,13 @@ def test_subactors_unregister_on_cancel_remote_daemon(
spawn_and_check_registry,
reg_addr,
use_signal,
debug_mode=debug_mode,
remote_arbiter=True,
with_streaming=with_streaming,
maybe_daemon=(
daemon,
psutil.Process(daemon.pid)
),
),
)
@ -373,7 +393,7 @@ def test_close_channel_explicit(
@pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter(
daemon,
daemon: subprocess.Popen,
start_method,
use_signal,
reg_addr,

View File

@ -100,16 +100,29 @@ async def streamer(
@acm
async def open_stream() -> Awaitable[tractor.MsgStream]:
async with tractor.open_nursery() as tn:
portal = await tn.start_actor('streamer', enable_modules=[__name__])
async with (
portal.open_context(streamer) as (ctx, first),
ctx.open_stream() as stream,
):
yield stream
try:
async with tractor.open_nursery() as an:
portal = await an.start_actor(
'streamer',
enable_modules=[__name__],
)
async with (
portal.open_context(streamer) as (ctx, first),
ctx.open_stream() as stream,
):
yield stream
await portal.cancel_actor()
print('CANCELLED STREAMER')
print('Cancelling streamer')
await portal.cancel_actor()
print('Cancelled streamer')
except Exception as err:
print(
f'`open_stream()` errored?\n'
f'{err!r}\n'
)
await tractor.pause(shield=True)
raise err
@acm
@ -132,19 +145,28 @@ async def maybe_open_stream(taskname: str):
yield stream
def test_open_local_sub_to_stream():
def test_open_local_sub_to_stream(
debug_mode: bool,
):
'''
Verify a single inter-actor stream can be fanned-out shared to
N local tasks using ``trionics.maybe_open_context():``.
N local tasks using `trionics.maybe_open_context()`.
'''
timeout: float = 3.6 if platform.system() != "Windows" else 10
timeout: float = 3.6
if platform.system() == "Windows":
timeout: float = 10
if debug_mode:
timeout = 999
async def main():
full = list(range(1000))
async def get_sub_and_pull(taskname: str):
stream: tractor.MsgStream
async with (
maybe_open_stream(taskname) as stream,
):
@ -165,17 +187,27 @@ def test_open_local_sub_to_stream():
assert set(seq).issubset(set(full))
print(f'{taskname} finished')
with trio.fail_after(timeout):
with trio.fail_after(timeout) as cs:
# TODO: turns out this isn't multi-task entrant XD
# We probably need an idempotent entry semantic?
async with tractor.open_root_actor():
async with tractor.open_root_actor(
debug_mode=debug_mode,
):
async with (
trio.open_nursery() as nurse,
trio.open_nursery() as tn,
):
for i in range(10):
nurse.start_soon(get_sub_and_pull, f'task_{i}')
tn.start_soon(
get_sub_and_pull,
f'task_{i}',
)
await trio.sleep(0.001)
print('all consumer tasks finished')
if cs.cancelled_caught:
pytest.fail(
'Should NOT time out in `open_root_actor()` ?'
)
trio.run(main)

View File

@ -2,14 +2,20 @@ import time
import trio
import pytest
import tractor
from tractor.ipc import (
from tractor.ipc._ringbuf import (
open_ringbuf,
RBToken,
RingBuffSender,
RingBuffReceiver
)
from tractor._testing.samples import generate_sample_messages
from tractor._testing.samples import (
generate_sample_messages,
)
# in case you don't want to melt your cores, uncomment dis!
pytestmark = pytest.mark.skip
@tractor.context

View File

@ -0,0 +1,85 @@
'''
Runtime boot/init sanity.
'''
import pytest
import trio
import tractor
from tractor._exceptions import RuntimeFailure
@tractor.context
async def open_new_root_in_sub(
ctx: tractor.Context,
) -> None:
async with tractor.open_root_actor():
pass
@pytest.mark.parametrize(
'open_root_in',
['root', 'sub'],
ids='open_2nd_root_in={}'.format,
)
def test_only_one_root_actor(
open_root_in: str,
reg_addr: tuple,
debug_mode: bool
):
'''
Verify we specially fail whenever more than one root actor
is attempted to be opened within an already opened tree.
'''
async def main():
async with tractor.open_nursery() as an:
if open_root_in == 'root':
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
):
pass
ptl: tractor.Portal = await an.start_actor(
name='bad_rooty_boi',
enable_modules=[__name__],
)
async with ptl.open_context(
open_new_root_in_sub,
) as (ctx, first):
pass
if open_root_in == 'root':
with pytest.raises(
RuntimeFailure
) as excinfo:
trio.run(main)
else:
with pytest.raises(
tractor.RemoteActorError,
) as excinfo:
trio.run(main)
assert excinfo.value.boxed_type is RuntimeFailure
def test_implicit_root_via_first_nursery(
reg_addr: tuple,
debug_mode: bool
):
'''
The first `ActorNursery` open should implicitly call
`_root.open_root_actor()`.
'''
async def main():
async with tractor.open_nursery() as an:
assert an._implicit_runtime_started
assert tractor.current_actor().aid.name == 'root'
trio.run(main)

View File

@ -2,6 +2,7 @@
Spawning basics
"""
from functools import partial
from typing import (
Any,
)
@ -12,74 +13,99 @@ import tractor
from tractor._testing import tractor_test
data_to_pass_down = {'doggy': 10, 'kitty': 4}
data_to_pass_down = {
'doggy': 10,
'kitty': 4,
}
async def spawn(
is_arbiter: bool,
should_be_root: bool,
data: dict,
reg_addr: tuple[str, int],
debug_mode: bool = False,
):
namespaces = [__name__]
await trio.sleep(0.1)
actor = tractor.current_actor(err_on_no_runtime=False)
async with tractor.open_root_actor(
arbiter_addr=reg_addr,
):
actor = tractor.current_actor()
assert actor.is_arbiter == is_arbiter
data = data_to_pass_down
if should_be_root:
assert actor is None # no runtime yet
async with (
tractor.open_root_actor(
arbiter_addr=reg_addr,
),
tractor.open_nursery() as an,
):
# now runtime exists
actor: tractor.Actor = tractor.current_actor()
assert actor.is_arbiter == should_be_root
if actor.is_arbiter:
async with tractor.open_nursery() as nursery:
# spawns subproc here
portal: tractor.Portal = await an.run_in_actor(
fn=spawn,
# forks here
portal = await nursery.run_in_actor(
spawn,
is_arbiter=False,
name='sub-actor',
data=data,
reg_addr=reg_addr,
enable_modules=namespaces,
)
# spawning args
name='sub-actor',
enable_modules=[__name__],
assert len(nursery._children) == 1
assert portal.channel.uid in tractor.current_actor()._peers
# be sure we can still get the result
result = await portal.result()
assert result == 10
return result
else:
return 10
# passed to a subactor-recursive RPC invoke
# of this same `spawn()` fn.
should_be_root=False,
data=data_to_pass_down,
reg_addr=reg_addr,
)
assert len(an._children) == 1
assert (
portal.channel.uid
in
tractor.current_actor().ipc_server._peers
)
# get result from child subactor
result = await portal.result()
assert result == 10
return result
else:
assert actor.is_arbiter == should_be_root
return 10
def test_local_arbiter_subactor_global_state(
reg_addr,
def test_run_in_actor_same_func_in_child(
reg_addr: tuple,
debug_mode: bool,
):
result = trio.run(
spawn,
True,
data_to_pass_down,
reg_addr,
partial(
spawn,
should_be_root=True,
data=data_to_pass_down,
reg_addr=reg_addr,
debug_mode=debug_mode,
)
)
assert result == 10
async def movie_theatre_question():
"""A question asked in a dark theatre, in a tangent
'''
A question asked in a dark theatre, in a tangent
(errr, I mean different) process.
"""
'''
return 'have you ever seen a portal?'
@tractor_test
async def test_movie_theatre_convo(start_method):
"""The main ``tractor`` routine.
"""
async with tractor.open_nursery(debug_mode=True) as n:
'''
The main ``tractor`` routine.
portal = await n.start_actor(
'''
async with tractor.open_nursery(debug_mode=True) as an:
portal = await an.start_actor(
'frank',
# enable the actor to run funcs from this current module
enable_modules=[__name__],
@ -118,8 +144,8 @@ async def test_most_beautiful_word(
with trio.fail_after(1):
async with tractor.open_nursery(
debug_mode=debug_mode,
) as n:
portal = await n.run_in_actor(
) as an:
portal = await an.run_in_actor(
cellar_door,
return_value=return_value,
name='some_linguist',

View File

@ -180,7 +180,8 @@ def test_acm_embedded_nursery_propagates_enter_err(
with tractor.devx.maybe_open_crash_handler(
pdb=debug_mode,
) as bxerr:
assert not bxerr.value
if bxerr:
assert not bxerr.value
async with (
wraps_tn_that_always_cancels() as tn,

View File

@ -14,33 +14,25 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from pathlib import Path
import os
# import tempfile
from uuid import uuid4
from typing import (
Protocol,
ClassVar,
# TypeVar,
# Union,
Type,
TYPE_CHECKING,
)
from bidict import bidict
# import trio
from trio import (
socket,
SocketListener,
open_tcp_listeners,
)
from .log import get_logger
from ._state import (
get_rt_dir,
current_actor,
is_root_process,
_def_tpt_proto,
)
from .ipc._tcp import TCPAddress
from .ipc._uds import UDSAddress
if TYPE_CHECKING:
from ._runtime import Actor
@ -178,245 +170,6 @@ class Address(Protocol):
...
class TCPAddress(Address):
proto_key: str = 'tcp'
unwrapped_type: type = tuple[str, int]
def_bindspace: str = '127.0.0.1'
def __init__(
self,
host: str,
port: int
):
if (
not isinstance(host, str)
or
not isinstance(port, int)
):
raise TypeError(
f'Expected host {host!r} to be str and port {port!r} to be int'
)
self._host: str = host
self._port: int = port
@property
def is_valid(self) -> bool:
return self._port != 0
@property
def bindspace(self) -> str:
return self._host
@property
def domain(self) -> str:
return self._host
@classmethod
def from_addr(
cls,
addr: tuple[str, int]
) -> TCPAddress:
return TCPAddress(addr[0], addr[1])
def unwrap(self) -> tuple[str, int]:
return (
self._host,
self._port,
)
@classmethod
def get_random(
cls,
current_actor: Actor,
bindspace: str = def_bindspace,
) -> TCPAddress:
return TCPAddress(bindspace, 0)
@classmethod
def get_root(cls) -> Address:
return TCPAddress(
'127.0.0.1',
1616,
)
def __repr__(self) -> str:
return (
f'{type(self).__name__}[{self.unwrap()}]'
)
def __eq__(self, other) -> bool:
if not isinstance(other, TCPAddress):
raise TypeError(
f'Can not compare {type(other)} with {type(self)}'
)
return (
self._host == other._host
and
self._port == other._port
)
async def open_listener(
self,
**kwargs,
) -> SocketListener:
listeners: list[SocketListener] = await open_tcp_listeners(
host=self._host,
port=self._port,
**kwargs
)
assert len(listeners) == 1
listener = listeners[0]
self._host, self._port = listener.socket.getsockname()[:2]
return listener
async def close_listener(self):
...
class UDSAddress(Address):
# TODO, maybe we should use better field and value
# -[x] really this is a `.protocol_key` not a "name" of anything.
# -[ ] consider a 'unix' proto-key instead?
# -[ ] need to check what other mult-transport frameworks do
# like zmq, nng, uri-spec et al!
proto_key: str = 'uds'
unwrapped_type: type = tuple[str, int]
def_bindspace: Path = get_rt_dir()
def __init__(
self,
filepath: str|Path,
maybe_pid: int,
# ^XXX, in the sense you can also pass
# a "non-real-world-process-id" such as is handy to represent
# our host-local default "port-like" key for the very first
# root actor to create a registry address.
):
self._filepath: Path = Path(filepath).absolute()
self._pid: int = maybe_pid
@property
def is_valid(self) -> bool:
'''
We block socket files not allocated under the runtime subdir.
'''
return self.bindspace in self._filepath.parents
@property
def bindspace(self) -> Path:
'''
We replicate the "ip-set-of-hosts" part of a UDS socket as
just the sub-directory in which we allocate socket files.
'''
return self.def_bindspace
@classmethod
def from_addr(
cls,
addr: tuple[Path, int]
) -> UDSAddress:
return UDSAddress(
filepath=addr[0],
maybe_pid=addr[1],
)
def unwrap(self) -> tuple[Path, int]:
return (
str(self._filepath),
# XXX NOTE, since this gets passed DIRECTLY to
# `open_unix_socket_w_passcred()` above!
self._pid,
)
@classmethod
def get_random(
cls,
bindspace: Path|None = None, # default netns
) -> UDSAddress:
bs: Path = bindspace or get_rt_dir()
pid: int = os.getpid()
actor: Actor|None = current_actor(
err_on_no_runtime=False,
)
if actor:
sockname: str = '::'.join(actor.uid) + f'@{pid}'
else:
prefix: str = '<unknown-actor>'
if is_root_process():
prefix: str = 'root'
sockname: str = f'{prefix}@{pid}'
sockpath: Path = Path(f'{bs}/{sockname}.sock')
return UDSAddress(
# filename=f'{tempfile.gettempdir()}/{uuid4()}.sock'
filepath=sockpath,
maybe_pid=pid,
)
@classmethod
def get_root(cls) -> Address:
def_uds_filepath: Path = (
get_rt_dir()
/
'registry@1616.sock'
)
return UDSAddress(
filepath=def_uds_filepath,
maybe_pid=1616
)
def __repr__(self) -> str:
return (
f'{type(self).__name__}'
f'['
f'({self._filepath}, {self._pid})'
f']'
)
def __eq__(self, other) -> bool:
if not isinstance(other, UDSAddress):
raise TypeError(
f'Can not compare {type(other)} with {type(self)}'
)
return self._filepath == other._filepath
# async def open_listener(self, **kwargs) -> SocketListener:
async def open_listener(
self,
**kwargs,
) -> SocketListener:
self._sock = socket.socket(
socket.AF_UNIX,
socket.SOCK_STREAM
)
log.info(
f'Attempting to bind UDS socket\n'
f'>[\n'
f'|_{self}\n'
)
await self._sock.bind(self._filepath)
self._sock.listen(1)
log.info(
f'Listening on UDS socket\n'
f'[>\n'
f' |_{self}\n'
)
return SocketListener(self._sock)
def close_listener(self):
self._sock.close()
os.unlink(self._filepath)
preferred_transport: str = 'uds'
_address_types: bidict[str, Type[Address]] = {
'tcp': TCPAddress,
'uds': UDSAddress
@ -455,29 +208,61 @@ def mk_uuid() -> str:
def wrap_address(
addr: UnwrappedAddress
) -> Address:
'''
Wrap an `UnwrappedAddress` as an `Address`-type based
on matching builtin python data-structures which we adhoc
use for each.
XXX NOTE, careful care must be placed to ensure
`UnwrappedAddress` cases are **definitely unique** otherwise the
wrong transport backend may be loaded and will break many
low-level things in our runtime in a not-fun-to-debug way!
XD
'''
if is_wrapped_addr(addr):
return addr
cls: Type|None = None
# if 'sock' in addr[0]:
# import pdbp; pdbp.set_trace()
match addr:
case (
str()|Path(),
int(),
):
cls = UDSAddress
case tuple() | list():
# classic network socket-address as tuple/list
case (
(str(), int())
|
[str(), int()]
):
cls = TCPAddress
case None:
cls: Type[Address] = get_address_cls(preferred_transport)
case (
# (str()|Path(), str()|Path()),
# ^TODO? uhh why doesn't this work!?
(_, filename)
) if type(filename) is str:
cls = UDSAddress
# likely an unset UDS or TCP reg address as defaulted in
# `_state._runtime_vars['_root_mailbox']`
#
# TODO? figure out when/if we even need this?
case (
None
|
[None, None]
):
cls: Type[Address] = get_address_cls(_def_tpt_proto)
addr: UnwrappedAddress = cls.get_root().unwrap()
case _:
# import pdbp; pdbp.set_trace()
raise TypeError(
f'Can not wrap address {type(addr)}\n'
f'{addr!r}\n'
f'Can not wrap unwrapped-address ??\n'
f'type(addr): {type(addr)!r}\n'
f'addr: {addr!r}\n'
)
return cls.from_addr(addr)

View File

@ -105,7 +105,7 @@ from ._state import (
if TYPE_CHECKING:
from ._portal import Portal
from ._runtime import Actor
from .ipc import MsgTransport
from .ipc._transport import MsgTransport
from .devx._frame_stack import (
CallerInfo,
)
@ -366,7 +366,7 @@ class Context:
# f' ---\n'
f' |_ipc: {self.dst_maddr}\n'
# f' dst_maddr{ds}{self.dst_maddr}\n'
f" uid{ds}'{self.chan.uid}'\n"
f" uid{ds}'{self.chan.aid}'\n"
f" cid{ds}'{self.cid}'\n"
# f' ---\n'
f'\n'
@ -945,10 +945,10 @@ class Context:
reminfo: str = (
# ' =>\n'
# f'Context.cancel() => {self.chan.uid}\n'
f'\n'
f'c)=> {self.chan.uid}\n'
# f'{self.chan.uid}\n'
f' |_ @{self.dst_maddr}\n'
f' >> {self.repr_rpc}\n'
f' |_[{self.dst_maddr}\n'
f' >>{self.repr_rpc}\n'
# f' >> {self._nsf}() -> {codec}[dict]:\n\n'
# TODO: pull msg-type from spec re #320
)

View File

@ -33,7 +33,6 @@ from .ipc import _connect_chan, Channel
from ._addr import (
UnwrappedAddress,
Address,
preferred_transport,
wrap_address
)
from ._portal import (
@ -44,10 +43,12 @@ from ._portal import (
from ._state import (
current_actor,
_runtime_vars,
_def_tpt_proto,
)
if TYPE_CHECKING:
from ._runtime import Actor
from .ipc._server import IPCServer
log = get_logger(__name__)
@ -79,7 +80,7 @@ async def get_registry(
)
else:
# TODO: try to look pre-existing connection from
# `Actor._peers` and use it instead?
# `IPCServer._peers` and use it instead?
async with (
_connect_chan(addr) as chan,
open_portal(chan) as regstr_ptl,
@ -111,7 +112,7 @@ def get_peer_by_name(
) -> list[Channel]|None: # at least 1
'''
Scan for an existing connection (set) to a named actor
and return any channels from `Actor._peers`.
and return any channels from `IPCServer._peers: dict`.
This is an optimization method over querying the registrar for
the same info.
@ -209,7 +210,7 @@ async def maybe_open_portal(
async def find_actor(
name: str,
registry_addrs: list[UnwrappedAddress]|None = None,
enable_transports: list[str] = [preferred_transport],
enable_transports: list[str] = [_def_tpt_proto],
only_first: bool = True,
raise_on_none: bool = False,

View File

@ -23,7 +23,6 @@ import builtins
import importlib
from pprint import pformat
from pdb import bdb
import sys
from types import (
TracebackType,
)
@ -72,8 +71,22 @@ log = get_logger('tractor')
_this_mod = importlib.import_module(__name__)
class ActorFailure(Exception):
"General actor failure"
class RuntimeFailure(RuntimeError):
'''
General `Actor`-runtime failure due to,
- a bad runtime-env,
- failed spawning (bad input to process),
- API usage.
'''
class ActorFailure(RuntimeFailure):
'''
`Actor` failed to boot before/after spawn
'''
class InternalError(RuntimeError):
@ -126,6 +139,12 @@ class TrioTaskExited(Exception):
'''
class DebugRequestError(RuntimeError):
'''
Failed to request stdio lock from root actor!
'''
# NOTE: more or less should be close to these:
# 'boxed_type',
# 'src_type',
@ -191,6 +210,8 @@ def get_err_type(type_name: str) -> BaseException|None:
):
return type_ref
return None
def pack_from_raise(
local_err: (
@ -521,7 +542,6 @@ class RemoteActorError(Exception):
if val:
_repr += f'{key}={val_str}{end_char}'
return _repr
def reprol(self) -> str:
@ -600,56 +620,9 @@ class RemoteActorError(Exception):
the type name is already implicitly shown by python).
'''
header: str = ''
body: str = ''
message: str = ''
# XXX when the currently raised exception is this instance,
# we do not ever use the "type header" style repr.
is_being_raised: bool = False
if (
(exc := sys.exception())
and
exc is self
):
is_being_raised: bool = True
with_type_header: bool = (
with_type_header
and
not is_being_raised
)
# <RemoteActorError( .. )> style
if with_type_header:
header: str = f'<{type(self).__name__}('
if message := self._message:
# split off the first line so, if needed, it isn't
# indented the same as the "boxed content" which,
# since there is no `.tb_str`, is just the `.message`.
lines: list[str] = message.splitlines()
first: str = lines[0]
message: str = message.removeprefix(first)
# with a type-style header we,
# - have no special message "first line" extraction/handling
# - place the message a space in from the header:
# `MsgTypeError( <message> ..`
# ^-here
# - indent the `.message` inside the type body.
if with_type_header:
first = f' {first} )>'
message: str = textwrap.indent(
message,
prefix=' '*2,
)
message: str = first + message
# IFF there is an embedded traceback-str we always
# draw the ascii-box around it.
body: str = ''
if tb_str := self.tb_str:
fields: str = self._mk_fields_str(
_body_fields
@ -670,21 +643,15 @@ class RemoteActorError(Exception):
boxer_header=self.relay_uid,
)
tail = ''
if (
with_type_header
and not message
):
tail: str = '>'
return (
header
+
message
+
f'{body}'
+
tail
# !TODO, it'd be nice to import these top level without
# cycles!
from tractor.devx.pformat import (
pformat_exc,
)
return pformat_exc(
exc=self,
with_type_header=with_type_header,
body=body,
)
__repr__ = pformat
@ -962,7 +929,7 @@ class StreamOverrun(
'''
class TransportClosed(trio.BrokenResourceError):
class TransportClosed(Exception):
'''
IPC transport (protocol) connection was closed or broke and
indicates that the wrapping communication `Channel` can no longer
@ -973,24 +940,39 @@ class TransportClosed(trio.BrokenResourceError):
self,
message: str,
loglevel: str = 'transport',
cause: BaseException|None = None,
src_exc: Exception|None = None,
raise_on_report: bool = False,
) -> None:
self.message: str = message
self._loglevel = loglevel
self._loglevel: str = loglevel
super().__init__(message)
if cause is not None:
self.__cause__ = cause
self._src_exc = src_exc
# set the cause manually if not already set by python
if (
src_exc is not None
and
not self.__cause__
):
self.__cause__ = src_exc
# flag to toggle whether the msg loop should raise
# the exc in its `TransportClosed` handler block.
self._raise_on_report = raise_on_report
@property
def src_exc(self) -> Exception:
return (
self.__cause__
or
self._src_exc
)
def report_n_maybe_raise(
self,
message: str|None = None,
hide_tb: bool = True,
) -> None:
'''
@ -998,9 +980,10 @@ class TransportClosed(trio.BrokenResourceError):
for this error.
'''
__tracebackhide__: bool = hide_tb
message: str = message or self.message
# when a cause is set, slap it onto the log emission.
if cause := self.__cause__:
if cause := self.src_exc:
cause_tb_str: str = ''.join(
traceback.format_tb(cause.__traceback__)
)
@ -1009,13 +992,86 @@ class TransportClosed(trio.BrokenResourceError):
f' {cause}\n' # exc repr
)
getattr(log, self._loglevel)(message)
getattr(
log,
self._loglevel
)(message)
# some errors we want to blow up from
# inside the RPC msg loop
if self._raise_on_report:
raise self from cause
@classmethod
def repr_src_exc(
self,
src_exc: Exception|None = None,
) -> str:
if src_exc is None:
return '<unknown>'
src_msg: tuple[str] = src_exc.args
src_exc_repr: str = (
f'{type(src_exc).__name__}[ {src_msg} ]'
)
return src_exc_repr
def pformat(self) -> str:
from tractor.devx.pformat import (
pformat_exc,
)
return pformat_exc(
exc=self,
)
# delegate to `str`-ified pformat
__repr__ = pformat
@classmethod
def from_src_exc(
cls,
src_exc: (
Exception|
trio.ClosedResourceError|
trio.BrokenResourceError
),
message: str,
body: str = '',
**init_kws,
) -> TransportClosed:
'''
Convenience constructor for creation from an underlying
`trio`-sourced async-resource/chan/stream error.
Embeds the original `src_exc`'s repr within the
`Exception.args` via a first-line-in-`.message`-put-in-header
pre-processing and allows inserting additional content beyond
the main message via a `body: str`.
'''
repr_src_exc: str = cls.repr_src_exc(
src_exc,
)
next_line: str = f' src_exc: {repr_src_exc}\n'
if body:
body: str = textwrap.indent(
body,
prefix=' '*2,
)
return TransportClosed(
message=(
message
+
next_line
+
body
),
src_exc=src_exc,
**init_kws,
)
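In use (a sketch assuming a raw `trio` stream as the error source), the transport layer wraps a low-level closure like so:

import trio

async def send_or_wrap(stream: trio.abc.Stream, data: bytes) -> None:
    try:
        await stream.send_all(data)
    except trio.BrokenResourceError as bre:
        # `loglevel` is forwarded via `**init_kws` to `.__init__()`
        raise TransportClosed.from_src_exc(
            src_exc=bre,
            message='IPC transport already closed by peer\n',
            loglevel='transport',
        ) from bre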
class NoResult(RuntimeError):
"No final result is expected for this actor"

View File

@ -52,8 +52,8 @@ from .msg import (
Return,
)
from ._exceptions import (
# unpack_error,
NoResult,
TransportClosed,
)
from ._context import (
Context,
@ -175,7 +175,7 @@ class Portal:
# not expecting a "main" result
if self._expect_result_ctx is None:
log.warning(
f"Portal for {self.channel.uid} not expecting a final"
f"Portal for {self.channel.aid} not expecting a final"
" result?\nresult() should only be called if subactor"
" was spawned with `ActorNursery.run_in_actor()`")
return NoResult
@ -222,7 +222,7 @@ class Portal:
# IPC calls
if self._streams:
log.cancel(
f"Cancelling all streams with {self.channel.uid}")
f"Cancelling all streams with {self.channel.aid}")
for stream in self._streams.copy():
try:
await stream.aclose()
@ -267,7 +267,7 @@ class Portal:
return False
reminfo: str = (
f'c)=> {self.channel.uid}\n'
f'c)=> {self.channel.aid}\n'
f' |_{chan}\n'
)
log.cancel(
@ -305,14 +305,34 @@ class Portal:
return False
except (
# XXX, should never really get raised unless we forgot
# to wrap them in the type below by mistake?
#
# Leaving the catch here for now until we're very sure
# all the cases (for various tpt protos) have indeed been
# re-wrapped ;p
trio.ClosedResourceError,
trio.BrokenResourceError,
):
log.debug(
'IPC chan for actor already closed or broken?\n\n'
f'{self.channel.uid}\n'
TransportClosed,
) as tpt_err:
report: str = (
f'IPC chan for actor already closed or broken?\n\n'
f'{self.channel.aid}\n'
f' |_{self.channel}\n'
)
match tpt_err:
case TransportClosed():
log.debug(report)
case _:
report += (
f'\n'
f'Unhandled low-level transport-closed/error during\n'
f'`Portal.cancel_actor()` request?\n'
f'<{type(tpt_err).__name__}( {tpt_err} )>\n'
)
log.warning(report)
return False
# TODO: do we still need this for low level `Actor`-runtime
@ -551,17 +571,18 @@ async def open_portal(
await channel.connect()
was_connected = True
if channel.uid is None:
await actor._do_handshake(channel)
if channel.aid is None:
await channel._do_handshake(
aid=actor.aid,
)
msg_loop_cs: trio.CancelScope|None = None
if start_msg_loop:
from ._runtime import process_messages
from . import _rpc
msg_loop_cs = await tn.start(
partial(
process_messages,
actor,
channel,
_rpc.process_messages,
chan=channel,
# if the local task is cancelled we want to keep
# the msg loop running until our block ends
shield=True,

View File

@ -56,11 +56,10 @@ from ._addr import (
UnwrappedAddress,
default_lo_addrs,
mk_uuid,
preferred_transport,
wrap_address,
)
from ._exceptions import (
ActorFailure,
RuntimeFailure,
is_multi_cancelled,
)
@ -139,6 +138,7 @@ async def maybe_block_bp(
os.environ.pop('PYTHONBREAKPOINT', None)
@acm
async def open_root_actor(
*,
@ -148,7 +148,11 @@ async def open_root_actor(
# defaults are above
arbiter_addr: tuple[UnwrappedAddress]|None = None,
enable_transports: list[str] = [preferred_transport],
enable_transports: list[
# TODO, this should eventually be the pairs as
# defined by (codec, proto) as on `MsgTransport`.
_state.TransportProtocolKey,
]|None = None,
name: str|None = 'root',
@ -195,7 +199,7 @@ async def open_root_actor(
rtvs: dict[str, Any] = _state._runtime_vars
root_mailbox: list[str, int] = rtvs['_root_mailbox']
registry_addrs: list[list[str, int]] = rtvs['_registry_addrs']
raise ActorFailure(
raise RuntimeFailure(
f'A current actor already exists !?\n'
f'({already_actor}\n'
f'\n'
@ -211,6 +215,14 @@ async def open_root_actor(
debug_mode=debug_mode,
maybe_enable_greenback=maybe_enable_greenback,
):
if enable_transports is None:
enable_transports: list[str] = _state.current_ipc_protos()
# TODO! support multi-tpts per actor! Bo
assert (
len(enable_transports) == 1
), 'No multi-tpt support yet!'
_debug.hide_runtime_frames()
__tracebackhide__: bool = hide_tb
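A hedged usage sketch of the new default resolution; when `enable_transports` is left unset it now falls back to `_state.current_ipc_protos()` (a single entry for now):

import trio
import tractor

async def main() -> None:
    async with tractor.open_root_actor(
        name='root',
        enable_transports=['uds'],    # or omit to use `_def_tpt_proto`
    ):
        ...    # actor runtime is up

trio.run(main)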

View File

@ -869,7 +869,6 @@ async def try_ship_error_to_remote(
async def process_messages(
actor: Actor,
chan: Channel,
shield: bool = False,
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
@ -907,6 +906,7 @@ async def process_messages(
(as utilized inside `Portal.cancel_actor()` ).
'''
actor: Actor = _state.current_actor()
assert actor._service_n # runtime state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we
@ -1219,8 +1219,10 @@ async def process_messages(
# -[ ] figure out how this will break with other transports?
tc.report_n_maybe_raise(
message=(
f'peer IPC channel closed abruptly?\n\n'
f'<=x {chan}\n'
f'peer IPC channel closed abruptly?\n'
f'\n'
f'<=x[\n'
f' {chan}\n'
f' |_{chan.raddr}\n\n'
)
+

View File

@ -40,9 +40,7 @@ from __future__ import annotations
from contextlib import (
ExitStack,
)
from collections import defaultdict
from functools import partial
from itertools import chain
import importlib
import importlib.util
import os
@ -74,13 +72,16 @@ from tractor.msg import (
pretty_struct,
types as msgtypes,
)
from .ipc import Channel
from .ipc import (
Channel,
# IPCServer, # causes cycles atm..
_server,
)
from ._addr import (
UnwrappedAddress,
Address,
default_lo_addrs,
# default_lo_addrs,
get_address_cls,
preferred_transport,
wrap_address,
)
from ._context import (
@ -94,18 +95,13 @@ from ._exceptions import (
ModuleNotExposed,
MsgTypeError,
unpack_error,
TransportClosed,
)
from .devx import _debug
from ._discovery import get_registry
from ._portal import Portal
from . import _state
from . import _mp_fixup_main
from ._rpc import (
process_messages,
try_ship_error_to_remote,
)
from . import _rpc
if TYPE_CHECKING:
from ._supervise import ActorNursery
@ -158,16 +154,23 @@ class Actor:
# nursery placeholders filled in by `async_main()` after fork
_root_n: Nursery|None = None
_service_n: Nursery|None = None
_server_n: Nursery|None = None
_ipc_server: _server.IPCServer|None = None
@property
def ipc_server(self) -> _server.IPCServer:
'''
The IPC transport-server for this actor; normally
a process-singleton.
'''
return self._ipc_server
# Information about `__main__` from parent
_parent_main_data: dict[str, str]
_parent_chan_cs: CancelScope|None = None
_spawn_spec: msgtypes.SpawnSpec|None = None
# syncs for setup/teardown sequences
_server_down: trio.Event|None = None
# if started on ``asyncio`` running ``trio`` in guest mode
_infected_aio: bool = False
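Typical runtime access now goes through the new `.ipc_server` property (a sketch, assuming a booted actor):

from tractor._state import current_actor

async def drain_peers() -> None:
    server = current_actor().ipc_server        # process-singleton IPC server
    if server and server.has_peers(check_chans=True):
        await server.wait_for_no_more_peers(shield=True)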
@ -241,14 +244,6 @@ class Actor:
# by the user (currently called the "arbiter")
self._spawn_method: str = spawn_method
self._peers: defaultdict[
str, # uaid
list[Channel], # IPC conns from peer
] = defaultdict(list)
self._peer_connected: dict[tuple[str, str], trio.Event] = {}
self._no_more_peers = trio.Event()
self._no_more_peers.set()
# RPC state
self._ongoing_rpc_tasks = trio.Event()
self._ongoing_rpc_tasks.set()
@ -267,8 +262,6 @@ class Actor:
Context
] = {}
self._listeners: list[trio.abc.Listener] = []
self._listen_addrs: list[Address] = []
self._parent_chan: Channel|None = None
self._forkserver_info: tuple|None = None
@ -289,7 +282,9 @@ class Actor:
@property
def aid(self) -> msgtypes.Aid:
'''
This process-singleton-actor's "unique ID" in struct form.
This process-singleton-actor's "unique actor ID" in struct form.
See the `tractor.msg.Aid` struct for details.
'''
return self._aid
@ -308,6 +303,17 @@ class Actor:
process plane.
'''
msg: str = (
f'`{type(self).__name__}.uid` is now deprecated.\n'
'Use the new `.aid: tractor.msg.Aid` (struct) instead '
'which also provides additional named (optional) fields '
'beyond just the `.name` and `.uuid`.'
)
warnings.warn(
msg,
DeprecationWarning,
stacklevel=2,
)
return (
self._aid.name,
self._aid.uuid,
@ -322,8 +328,12 @@ class Actor:
parent_uid: tuple|None = None
if rent_chan := self._parent_chan:
parent_uid = rent_chan.uid
peers: list[tuple] = list(self._peer_connected)
listen_addrs: str = pformat(self._listen_addrs)
peers: list = []
server: _server.IPCServer = self.ipc_server
if server:
peers: list[tuple] = list(server._peer_connected)
fmtstr: str = (
f' |_id: {self.aid!r}\n'
# f" aid{ds}{self.aid!r}\n"
@ -331,8 +341,7 @@ class Actor:
f'\n'
f' |_ipc: {len(peers)!r} connected peers\n'
f" peers{ds}{peers!r}\n"
f" _listen_addrs{ds}'{listen_addrs}'\n"
f" _listeners{ds}'{self._listeners}'\n"
f" ipc_server{ds}{self._ipc_server}\n"
f'\n'
f' |_rpc: {len(self._rpc_tasks)} tasks\n'
f" ctxs{ds}{len(self._contexts)}\n"
@ -380,25 +389,6 @@ class Actor:
self._reg_addrs = addrs
async def wait_for_peer(
self,
uid: tuple[str, str],
) -> tuple[trio.Event, Channel]:
'''
Wait for a connection back from a (spawned sub-)actor with
a `uid` using a `trio.Event` for sync.
'''
log.debug(f'Waiting for peer {uid!r} to connect')
event = self._peer_connected.setdefault(uid, trio.Event())
await event.wait()
log.debug(f'{uid!r} successfully connected back to us')
return (
event,
self._peers[uid][-1],
)
def load_modules(
self,
# debug_mode: bool = False,
@ -474,423 +464,6 @@ class Actor:
raise mne
# TODO: maybe change to mod-func and rename for implied
# multi-transport semantics?
async def _stream_handler(
self,
stream: trio.SocketStream,
) -> None:
'''
Entry point for new inbound IPC connections on a specific
transport server.
'''
self._no_more_peers = trio.Event() # unset by making new
chan = Channel.from_stream(stream)
con_status: str = (
'New inbound IPC connection <=\n'
f'|_{chan}\n'
)
# send/receive initial handshake response
try:
uid: tuple|None = await self._do_handshake(chan)
except (
TransportClosed,
# ^XXX NOTE, the above wraps `trio` exc types raised
# during various `SocketStream.send/receive_xx()` calls
# under different fault conditions such as,
#
# trio.BrokenResourceError,
# trio.ClosedResourceError,
#
# Inside our `.ipc._transport` layer we absorb and
# re-raise our own `TransportClosed` exc such that this
# higher level runtime code need only worry about one
# "kinda-error" that we expect to tolerate during
# discovery-sys related pings, queries, DoS etc.
):
# XXX: This may propagate up from `Channel._aiter_recv()`
# and `MsgpackStream._inter_packets()` on a read from the
# stream particularly when the runtime is first starting up
# inside `open_root_actor()` where there is a check for
# a bound listener on the "arbiter" addr. The reset will be
# because the handshake was never meant to take place.
log.runtime(
con_status
+
' -> But failed to handshake? Ignoring..\n'
)
return
familiar: str = 'new-peer'
if _pre_chan := self._peers.get(uid):
familiar: str = 'pre-existing-peer'
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
con_status += (
f' -> Handshake with {familiar} `{uid_short}` complete\n'
)
if _pre_chan:
# con_status += (
# ^TODO^ swap once we minimize conn duplication
# -[ ] last thing might be reg/unreg runtime reqs?
# log.warning(
log.debug(
f'?Wait?\n'
f'We already have IPC with peer {uid_short!r}\n'
f'|_{_pre_chan}\n'
)
# IPC connection tracking for both peers and new children:
# - if this is a new channel to a locally spawned
# sub-actor there will be a spawn wait event registered
# by a call to `.wait_for_peer()`.
# - if a peer is connecting no such event will exist.
event: trio.Event|None = self._peer_connected.pop(
uid,
None,
)
if event:
con_status += (
' -> Waking subactor spawn waiters: '
f'{event.statistics().tasks_waiting}\n'
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
# f' {event}\n'
# f' |{event.statistics()}\n'
)
# wake tasks waiting on this IPC-transport "connect-back"
event.set()
else:
con_status += (
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
) # type: ignore
chans: list[Channel] = self._peers[uid]
# if chans:
# # TODO: re-use channels for new connections instead
# # of always new ones?
# # => will require changing all the discovery funcs..
# append new channel
# TODO: can we just use list-ref directly?
chans.append(chan)
con_status += ' -> Entering RPC msg loop..\n'
log.runtime(con_status)
# Begin channel management - respond to remote requests and
# process received responses.
disconnected: bool = False
last_msg: MsgType
try:
(
disconnected,
last_msg,
) = await process_messages(
self,
chan,
)
except trio.Cancelled:
log.cancel(
'IPC transport msg loop was cancelled\n'
f'c)>\n'
f' |_{chan}\n'
)
raise
finally:
local_nursery: (
ActorNursery|None
) = self._actoruid2nursery.get(uid)
# This is set in ``Portal.cancel_actor()``. So if
# the peer was cancelled we try to wait for them
# to tear down their side of the connection before
# moving on with closing our own side.
if (
local_nursery
and (
self._cancel_called
or
chan._cancel_called
)
#
# ^-TODO-^ along with this is there another condition
# that we should filter with to avoid entering this
# waiting block needlessly?
# -[ ] maybe `and local_nursery.cancelled` and/or
# only if the `._children` table is empty or has
# only `Portal`s with .chan._cancel_called ==
# True` as per what we had below; the MAIN DIFF
# BEING that just bc one `Portal.cancel_actor()`
# was called, doesn't mean the whole actor-nurse
# is gonna exit any time soon right!?
#
# or
# all(chan._cancel_called for chan in chans)
):
log.cancel(
'Waiting on cancel request to peer..\n'
f'c)=>\n'
f' |_{chan.uid}\n'
)
# XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the
# remote peer side since we presume that any channel
# which is mapped to a sub-actor (i.e. it's managed
# by local actor-nursery) has a message that is sent
# to the peer likely by this actor (which may be in
# a shutdown sequence due to cancellation) when the
# local runtime here is now cancelled while
# (presumably) in the middle of msg loop processing.
chan_info: str = (
f'{chan.uid}\n'
f'|_{chan}\n'
f' |_{chan.transport}\n\n'
)
with trio.move_on_after(0.5) as drain_cs:
drain_cs.shield = True
# attempt to wait for the far end to close the
# channel and bail after timeout (a 2-generals
# problem on closure).
assert chan.transport
async for msg in chan.transport.drain():
# try to deliver any lingering msgs
# before we destroy the channel.
# This accomplishes deterministic
# ``Portal.cancel_actor()`` cancellation by
# making sure any RPC response to that call is
# delivered to the local calling task.
# TODO: factor this into a helper?
log.warning(
'Draining msg from disconnected peer\n'
f'{chan_info}'
f'{pformat(msg)}\n'
)
# cid: str|None = msg.get('cid')
cid: str|None = msg.cid
if cid:
# deliver response to local caller/waiter
await self._deliver_ctx_payload(
chan,
cid,
msg,
)
if drain_cs.cancelled_caught:
log.warning(
'Timed out waiting on IPC transport channel to drain?\n'
f'{chan_info}'
)
# XXX NOTE XXX when no explicit call to
# `open_root_actor()` was made by the application
# (normally we implicitly make that call inside
# the first `.open_nursery()` in root-actor
# user/app code), we can assume that either we
# are NOT the root actor or are root but the
# runtime was started manually. and thus DO have
# to wait for the nursery-enterer to exit before
# shutting down the local runtime to avoid
# clobbering any ongoing subactor
# teardown/debugging/graceful-cancel.
#
# see matching note inside `._supervise.open_nursery()`
#
# TODO: should we have a separate cs + timeout
# block here?
if (
# XXX SO either,
# - not root OR,
# - is root but `open_root_actor()` was
# entered manually (in which case we do
# the equiv wait there using the
# `devx._debug` sub-sys APIs).
not local_nursery._implicit_runtime_started
):
log.runtime(
'Waiting on local actor nursery to exit..\n'
f'|_{local_nursery}\n'
)
with trio.move_on_after(0.5) as an_exit_cs:
an_exit_cs.shield = True
await local_nursery.exited.wait()
# TODO: currently this is always triggering for every
# sub-daemon spawned from the `piker.services._mngr`?
# -[ ] how do we ensure that the IPC is supposed to
# be long lived and isn't just a register?
# |_ in the register case how can we signal that the
# ephemeral msg loop was intentional?
if (
# not local_nursery._implicit_runtime_started
# and
an_exit_cs.cancelled_caught
):
report: str = (
'Timed out waiting on local actor-nursery to exit?\n'
f'c)>\n'
f' |_{local_nursery}\n'
)
if children := local_nursery._children:
# indent from above local-nurse repr
report += (
f' |_{pformat(children)}\n'
)
log.warning(report)
if disconnected:
# if the transport died and this actor is still
# registered within a local nursery, we report
# that the IPC layer may have failed
# unexpectedly since it may be the cause of
# other downstream errors.
entry: tuple|None = local_nursery._children.get(uid)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and
poll() is None # proc still alive
):
# TODO: change log level based on
# detecting whether chan was created for
# ephemeral `.register_actor()` request!
# -[ ] also, that should be avoidable by
# re-using any existing chan from the
# `._discovery.get_registry()` call as
# well..
log.runtime(
f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.uid}@{chan.raddr}\n'
f' |_{proc}\n'
)
# ``Channel`` teardown and closure sequence
# drop ref to channel so it can be gc-ed and disconnected
con_teardown_status: str = (
f'IPC channel disconnected:\n'
f'<=x uid: {chan.uid}\n'
f' |_{pformat(chan)}\n\n'
)
chans.remove(chan)
# TODO: do we need to be this pedantic?
if not chans:
con_teardown_status += (
f'-> No more channels with {chan.uid}'
)
self._peers.pop(uid, None)
peers_str: str = ''
for uid, chans in self._peers.items():
peers_str += (
f'uid: {uid}\n'
)
for i, chan in enumerate(chans):
peers_str += (
f' |_[{i}] {pformat(chan)}\n'
)
con_teardown_status += (
f'-> Remaining IPC {len(self._peers)} peers: {peers_str}\n'
)
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
con_teardown_status += (
'Signalling no more peer channel connections'
)
self._no_more_peers.set()
# NOTE: block this actor from acquiring the
# debugger-TTY-lock since we have no way to know if we
# cancelled it and further there is no way to ensure the
# lock will be released if acquired due to having no
# more active IPC channels.
if _state.is_root_process():
pdb_lock = _debug.Lock
pdb_lock._blocked.add(uid)
# TODO: NEEEDS TO BE TESTED!
# actually, no idea if this ever even enters.. XD
#
# XXX => YES IT DOES, when i was testing ctl-c
# from broken debug TTY locking due to
# msg-spec races on application using RunVar...
if (
(ctx_in_debug := pdb_lock.ctx_in_debug)
and
(pdb_user_uid := ctx_in_debug.chan.uid)
and
local_nursery
):
entry: tuple|None = local_nursery._children.get(
tuple(pdb_user_uid)
)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and poll() is None
):
log.cancel(
'Root actor reports no-more-peers, BUT\n'
'a DISCONNECTED child still has the debug '
'lock!\n\n'
# f'root uid: {self.uid}\n'
f'last disconnected child uid: {uid}\n'
f'locking child uid: {pdb_user_uid}\n'
)
await _debug.maybe_wait_for_debugger(
child_in_debug=True
)
# TODO: just bc a child's transport dropped
# doesn't mean it's not still using the pdb
# REPL! so,
# -[ ] ideally we can check our child-proc
# tree to ensure that it's alive (and
# actually using the REPL) before we cancel
# it's lock acquire by doing the below!
# -[ ] create a way to read the tree of each actor's
# grandchildren such that when an
# intermediary parent is cancelled but their
# child has locked the tty, the grandparent
# will not allow the parent to cancel or
# zombie reap the child! see open issue:
# - https://github.com/goodboy/tractor/issues/320
# ------ - ------
# if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for
# the lock.
if (
(db_cs := pdb_lock.get_locking_task_cs())
and not db_cs.cancel_called
and uid == pdb_user_uid
):
log.critical(
f'STALE DEBUG LOCK DETECTED FOR {uid}'
)
# TODO: figure out why this breaks tests..
db_cs.cancel()
log.runtime(con_teardown_status)
# finally block closure
# TODO: rename to `._deliver_payload()` since this handles
# more than just `result` msgs now obvi XD
async def _deliver_ctx_payload(
@ -1127,9 +700,8 @@ class Actor:
)
assert isinstance(chan, Channel)
# TODO: move this into a `Channel.handshake()`?
# Initial handshake: swap names.
await self._do_handshake(chan)
# init handshake: swap actor-IDs.
await chan._do_handshake(aid=self.aid)
accept_addrs: list[UnwrappedAddress]|None = None
@ -1270,90 +842,20 @@ class Actor:
# -[ ] need to extend the `SpawnSpec` tho!
)
except OSError: # failed to connect
# failed to connect back?
except (
OSError,
ConnectionError,
):
log.warning(
f'Failed to connect to spawning parent actor!?\n'
f'\n'
f'x=> {parent_addr}\n'
f'|_{self}\n\n'
f' |_{self}\n\n'
)
await self.cancel(req_chan=None) # self cancel
raise
async def _serve_forever(
self,
handler_nursery: Nursery,
*,
listen_addrs: list[UnwrappedAddress]|None = None,
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Start the IPC transport server, begin listening/accepting new
`trio.SocketStream` connections.
This will cause an actor to continue living (and thus
blocking at the process/OS-thread level) until
`.cancel_server()` is called.
'''
if listen_addrs is None:
listen_addrs = default_lo_addrs([preferred_transport])
else:
listen_addrs: list[Address] = [
wrap_address(a) for a in listen_addrs
]
self._server_down = trio.Event()
try:
async with trio.open_nursery() as server_n:
listeners: list[trio.abc.Listener] = []
for addr in listen_addrs:
try:
listener: trio.abc.Listener = await addr.open_listener()
except OSError as oserr:
if (
'[Errno 98] Address already in use'
in
oserr.args[0]
):
log.exception(
f'Address already in use?\n'
f'{addr}\n'
)
raise
listeners.append(listener)
await server_n.start(
partial(
trio.serve_listeners,
handler=self._stream_handler,
listeners=listeners,
# NOTE: configured such that new
# connections will stay alive even if
# this server is cancelled!
handler_nursery=handler_nursery
)
)
log.runtime(
'Started server(s)\n'
+ '\n'.join([f'|_{addr}' for addr in listen_addrs])
)
self._listen_addrs.extend(listen_addrs)
self._listeners.extend(listeners)
task_status.started(server_n)
finally:
addr: Address
for addr in listen_addrs:
addr.close_listener()
# signal the server is down since nursery above terminated
self._server_down.set()
def cancel_soon(self) -> None:
'''
Cancel this actor asap; can be called from a sync context.
@ -1453,13 +955,9 @@ class Actor:
)
# stop channel server
self.cancel_server()
if self._server_down is not None:
await self._server_down.wait()
else:
log.warning(
'Transport[TCP] server was cancelled before start?'
)
if ipc_server := self.ipc_server:
ipc_server.cancel()
await ipc_server.wait_for_shutdown()
# cancel all rpc tasks permanently
if self._service_n:
@ -1690,24 +1188,6 @@ class Actor:
)
await self._ongoing_rpc_tasks.wait()
def cancel_server(self) -> bool:
'''
Cancel the internal IPC transport server nursery thereby
preventing any new inbound IPC connections establishing.
'''
if self._server_n:
# TODO: obvi a different server type when we eventually
# support some others XD
server_prot: str = 'TCP'
log.runtime(
f'Cancelling {server_prot} server'
)
self._server_n.cancel_scope.cancel()
return True
return False
@property
def accept_addrs(self) -> list[UnwrappedAddress]:
'''
@ -1715,7 +1195,7 @@ class Actor:
and listens for new connections.
'''
return [a.unwrap() for a in self._listen_addrs]
return self._ipc_server.accept_addrs
@property
def accept_addr(self) -> UnwrappedAddress:
@ -1745,41 +1225,6 @@ class Actor:
'''
return self._peers[uid]
# TODO: move to `Channel.handshake(uid)`
async def _do_handshake(
self,
chan: Channel
) -> msgtypes.Aid:
'''
Exchange `(name, UUIDs)` identifiers as the first
communication step with any (peer) remote `Actor`.
These are essentially the "mailbox addresses" found in
"actor model" parlance.
'''
name, uuid = self.uid
await chan.send(
msgtypes.Aid(
name=name,
uuid=uuid,
)
)
aid: msgtypes.Aid = await chan.recv()
chan.aid = aid
uid: tuple[str, str] = (
aid.name,
aid.uuid,
)
if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid
return uid
def is_infected_aio(self) -> bool:
'''
If `True`, this actor is running `trio` in guest mode on
@ -1817,6 +1262,10 @@ async def async_main(
the actor's "runtime" and all thus all ongoing RPC tasks.
'''
# XXX NOTE, `_state._current_actor` **must** be set prior to
# calling this core runtime entrypoint!
assert actor is _state.current_actor()
actor._task: trio.Task = trio.lowlevel.current_task()
# attempt to retrieve ``trio``'s sigint handler and stash it
@ -1849,7 +1298,7 @@ async def async_main(
enable_transports: list[str] = (
maybe_preferred_transports_says_rent
or
[preferred_transport]
[_state._def_tpt_proto]
)
for transport_key in enable_transports:
transport_cls: Type[Address] = get_address_cls(
@ -1858,6 +1307,7 @@ async def async_main(
addr: Address = transport_cls.get_random()
accept_addrs.append(addr.unwrap())
assert accept_addrs
# The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in
@ -1868,15 +1318,36 @@ async def async_main(
actor._root_n = root_nursery
assert actor._root_n
async with trio.open_nursery(
strict_exception_groups=False,
) as service_nursery:
ipc_server: _server.IPCServer
async with (
trio.open_nursery(
strict_exception_groups=False,
) as service_nursery,
_server.open_ipc_server(
parent_tn=service_nursery,
stream_handler_tn=service_nursery,
) as ipc_server,
# ) as actor._ipc_server,
# ^TODO? prettier?
):
# This nursery is used to handle all inbound
# connections to us such that if the TCP server
# is killed, connections can continue to process
# in the background until this nursery is cancelled.
actor._service_n = service_nursery
assert actor._service_n
actor._ipc_server = ipc_server
assert (
actor._service_n
and (
actor._service_n
is
actor._ipc_server._parent_tn
is
ipc_server._stream_handler_tn
)
)
# load exposed/allowed RPC modules
# XXX: do this **after** establishing a channel to the parent
@ -1900,30 +1371,41 @@ async def async_main(
# - subactor: the bind address is sent by our parent
# over our established channel
# - root actor: the ``accept_addr`` passed to this method
assert accept_addrs
# TODO: why is this not with the root nursery?
try:
# TODO: why is this not with the root nursery?
actor._server_n = await service_nursery.start(
partial(
actor._serve_forever,
service_nursery,
listen_addrs=accept_addrs,
)
log.runtime(
'Booting IPC server'
)
eps: list = await ipc_server.listen_on(
accept_addrs=accept_addrs,
stream_handler_nursery=service_nursery,
)
log.runtime(
f'Booted IPC server\n'
f'{ipc_server}\n'
)
assert (
(eps[0].listen_tn)
is not service_nursery
)
except OSError as oserr:
# NOTE: always allow runtime hackers to debug
# transport address bind errors - normally it's
# something silly like the wrong socket-address
# passed via a config or CLI Bo
entered_debug: bool = await _debug._maybe_enter_pm(oserr)
entered_debug: bool = await _debug._maybe_enter_pm(
oserr,
)
if not entered_debug:
log.exception('Failed to init IPC channel server !?\n')
log.exception('Failed to init IPC server !?\n')
else:
log.runtime('Exited debug REPL..')
raise
# TODO, just read direct from ipc_server?
accept_addrs: list[UnwrappedAddress] = actor.accept_addrs
# NOTE: only set the loopback addr for the
@ -1956,7 +1438,9 @@ async def async_main(
async with get_registry(addr) as reg_portal:
for accept_addr in accept_addrs:
accept_addr = wrap_address(accept_addr)
assert accept_addr.is_valid
if not accept_addr.is_valid:
breakpoint()
await reg_portal.run_from_ns(
'self',
@ -1977,9 +1461,8 @@ async def async_main(
if actor._parent_chan:
await root_nursery.start(
partial(
process_messages,
actor,
actor._parent_chan,
_rpc.process_messages,
chan=actor._parent_chan,
shield=True,
)
)
@ -2020,7 +1503,7 @@ async def async_main(
log.exception(err_report)
if actor._parent_chan:
await try_ship_error_to_remote(
await _rpc.try_ship_error_to_remote(
actor._parent_chan,
internal_err,
)
@ -2114,16 +1597,18 @@ async def async_main(
)
# Ensure all peers (actors connected to us as clients) are finished
if not actor._no_more_peers.is_set():
if any(
chan.connected() for chan in chain(*actor._peers.values())
):
teardown_report += (
f'-> Waiting for remaining peers {actor._peers} to clear..\n'
)
log.runtime(teardown_report)
with CancelScope(shield=True):
await actor._no_more_peers.wait()
if (
(ipc_server := actor.ipc_server)
and
ipc_server.has_peers(check_chans=True)
):
teardown_report += (
f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n'
)
log.runtime(teardown_report)
await ipc_server.wait_for_no_more_peers(
shield=True,
)
teardown_report += (
'-> All peer channels are complete\n'
@ -2137,15 +1622,15 @@ async def async_main(
log.info(teardown_report)
# TODO: rename to `Registry` and move to `._discovery`!
# TODO: rename to `Registry` and move to `.discovery._registry`!
class Arbiter(Actor):
'''
A special registrar actor who can contact all other actors
within its immediate process tree and possibly keeps a registry
of others meant to be discoverable in a distributed
application. Normally the registrar is also the "root actor"
and thus always has access to the top-most-level actor
(process) nursery.
A special registrar (and for now..) `Actor` who can contact all
other actors within its immediate process tree and possibly keeps
a registry of others meant to be discoverable in a distributed
application. Normally the registrar is also the "root actor" and
thus always has access to the top-most-level actor (process)
nursery.
By default, the registrar is always initialized when and if no
other registrar socket addrs have been specified to runtime
@ -2165,6 +1650,12 @@ class Arbiter(Actor):
'''
is_arbiter = True
# TODO, implement this as a read on whether there exists a `._state`
# of some sort, setup by whenever we impl this all as
# a `.discovery._registry.open_registry()` API
def is_registry(self) -> bool:
return self.is_arbiter
def __init__(
self,
*args,

View File

@ -52,14 +52,17 @@ from tractor._runtime import Actor
from tractor._entry import _mp_main
from tractor._exceptions import ActorFailure
from tractor.msg.types import (
Aid,
SpawnSpec,
)
if TYPE_CHECKING:
from .ipc import IPCServer
from ._supervise import ActorNursery
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
log = get_logger('tractor')
# placeholder for an mp start context if so using that backend
@ -164,7 +167,7 @@ async def exhaust_portal(
# TODO: merge with above?
log.warning(
'Cancelled portal result waiter task:\n'
f'uid: {portal.channel.uid}\n'
f'uid: {portal.channel.aid}\n'
f'error: {err}\n'
)
return err
@ -172,7 +175,7 @@ async def exhaust_portal(
else:
log.debug(
f'Returning final result from portal:\n'
f'uid: {portal.channel.uid}\n'
f'uid: {portal.channel.aid}\n'
f'result: {final}\n'
)
return final
@ -325,12 +328,12 @@ async def soft_kill(
see `.hard_kill()`).
'''
uid: tuple[str, str] = portal.channel.uid
peer_aid: Aid = portal.channel.aid
try:
log.cancel(
f'Soft killing sub-actor via portal request\n'
f'\n'
f'(c=> {portal.chan.uid}\n'
f'(c=> {peer_aid}\n'
f' |_{proc}\n'
)
# wait on sub-proc to signal termination
@ -379,7 +382,7 @@ async def soft_kill(
if proc.poll() is None: # type: ignore
log.warning(
'Subactor still alive after cancel request?\n\n'
f'uid: {uid}\n'
f'uid: {peer_aid}\n'
f'|_{proc}\n'
)
n.cancel_scope.cancel()
@ -460,6 +463,9 @@ async def trio_proc(
# the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy).
"--uid",
# TODO, how to pass this over "wire" encodings like
# cmdline args?
# -[ ] maybe we can add an `Aid.min_tuple()` ?
str(subactor.uid),
# Address the child must connect to on startup
"--parent_addr",
@ -477,6 +483,7 @@ async def trio_proc(
cancelled_during_spawn: bool = False
proc: trio.Process|None = None
ipc_server: IPCServer = actor_nursery._actor.ipc_server
try:
try:
proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
@ -488,7 +495,7 @@ async def trio_proc(
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer(
event, chan = await ipc_server.wait_for_peer(
subactor.uid
)
@ -720,12 +727,14 @@ async def mp_proc(
log.runtime(f"Started {proc}")
ipc_server: IPCServer = actor_nursery._actor.ipc_server
try:
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid)
event, chan = await ipc_server.wait_for_peer(
subactor.uid,
)
# XXX: monkey patch poll API to match the ``subprocess`` API..
# not sure why they don't expose this but kk.

View File

@ -26,6 +26,7 @@ import os
from pathlib import Path
from typing import (
Any,
Literal,
TYPE_CHECKING,
)
@ -101,7 +102,7 @@ def current_actor(
return _current_actor
def is_main_process() -> bool:
def is_root_process() -> bool:
'''
Bool determining if this actor is running in the top-most process.
@ -110,8 +111,10 @@ def is_main_process() -> bool:
return mp.current_process().name == 'MainProcess'
# TODO, more verby name?
def debug_mode() -> bool:
is_main_process = is_root_process
def is_debug_mode() -> bool:
'''
Bool determining if "debug mode" is on which enables
remote subactor pdb entry on crashes.
@ -120,6 +123,9 @@ def debug_mode() -> bool:
return bool(_runtime_vars['_debug_mode'])
debug_mode = is_debug_mode
def is_root_process() -> bool:
return _runtime_vars['_is_root']
@ -164,3 +170,23 @@ def get_rt_dir(
if not rtdir.is_dir():
rtdir.mkdir()
return rtdir
# default IPC transport protocol settings
TransportProtocolKey = Literal[
'tcp',
'uds',
]
_def_tpt_proto: TransportProtocolKey = 'tcp'
def current_ipc_protos() -> list[str]:
'''
Return the list of IPC transport protocol keys currently
in use by this actor.
The keys are as declared by `MsgTransport` and `Address`
concrete-backend sub-types defined throughout `tractor.ipc`.
'''
return [_def_tpt_proto]
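A quick sketch of reading the default:

from tractor import _state

protos: list[str] = _state.current_ipc_protos()
assert protos == [_state._def_tpt_proto]    # -> ['tcp'] unless overridden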

View File

@ -437,22 +437,23 @@ class MsgStream(trio.abc.Channel):
message: str = (
f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n'
# } bc a stream is a "scope"/msging-phase inside an IPC
f'x}}>\n'
f'c}}>\n'
f' |_{self}\n'
)
log.cancel(message)
self._eoc = trio.EndOfChannel(message)
if (
(rx_chan := self._rx_chan)
and
(stats := rx_chan.statistics()).tasks_waiting_receive
):
log.cancel(
f'Msg-stream is closing but there are still reader tasks,\n'
message += (
f'AND there are still reader tasks,\n'
f'\n'
f'{stats}\n'
)
log.cancel(message)
self._eoc = trio.EndOfChannel(message)
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
# => NO, DEFINITELY NOT! <=
# if we're a bi-dir `MsgStream` BECAUSE this same
@ -811,13 +812,12 @@ async def open_stream_from_ctx(
# sanity, can remove?
assert eoc is stream._eoc
log.warning(
log.runtime(
'Stream was terminated by EoC\n\n'
# NOTE: won't show the error <Type> but
# does show txt followed by IPC msg.
f'{str(eoc)}\n'
)
finally:
if ctx._portal:
try:

View File

@ -34,7 +34,6 @@ import trio
from .devx._debug import maybe_wait_for_debugger
from ._addr import (
UnwrappedAddress,
preferred_transport,
mk_uuid,
)
from ._state import current_actor, is_main_process
@ -45,13 +44,18 @@ from ._exceptions import (
is_multi_cancelled,
ContextCancelled,
)
from ._root import open_root_actor
from ._root import (
open_root_actor,
)
from . import _state
from . import _spawn
if TYPE_CHECKING:
import multiprocessing as mp
# from .ipc._server import IPCServer
from .ipc import IPCServer
log = get_logger(__name__)
@ -138,7 +142,7 @@ class ActorNursery:
bind_addrs: list[UnwrappedAddress]|None = None,
rpc_module_paths: list[str]|None = None,
enable_transports: list[str] = [preferred_transport],
enable_transports: list[str] = [_state._def_tpt_proto],
enable_modules: list[str]|None = None,
loglevel: str|None = None, # set log level per subactor
debug_mode: bool|None = None,
@ -314,8 +318,13 @@ class ActorNursery:
children: dict = self._children
child_count: int = len(children)
msg: str = f'Cancelling actor nursery with {child_count} children\n'
server: IPCServer = self._actor.ipc_server
with trio.move_on_after(3) as cs:
async with trio.open_nursery() as tn:
async with trio.open_nursery(
strict_exception_groups=False,
) as tn:
subactor: Actor
proc: trio.Process
@ -334,7 +343,7 @@ class ActorNursery:
else:
if portal is None: # actor hasn't fully spawned yet
event = self._actor._peer_connected[subactor.uid]
event: trio.Event = server._peer_connected[subactor.uid]
log.warning(
f"{subactor.uid} never 't finished spawning?"
)
@ -350,7 +359,7 @@ class ActorNursery:
if portal is None:
# cancelled while waiting on the event
# to arrive
chan = self._actor._peers[subactor.uid][-1]
chan = server._peers[subactor.uid][-1]
if chan:
portal = Portal(chan)
else: # there's no other choice left

View File

@ -73,6 +73,7 @@ from tractor.log import get_logger
from tractor._context import Context
from tractor import _state
from tractor._exceptions import (
DebugRequestError,
InternalError,
NoRuntime,
is_multi_cancelled,
@ -91,7 +92,11 @@ from tractor._state import (
if TYPE_CHECKING:
from trio.lowlevel import Task
from threading import Thread
from tractor.ipc import Channel
from tractor.ipc import (
Channel,
IPCServer,
# _server, # TODO? export at top level?
)
from tractor._runtime import (
Actor,
)
@ -1433,6 +1438,7 @@ def any_connected_locker_child() -> bool:
'''
actor: Actor = current_actor()
server: IPCServer = actor.ipc_server
if not is_root_process():
raise InternalError('This is a root-actor only API!')
@ -1442,7 +1448,7 @@ def any_connected_locker_child() -> bool:
and
(uid_in_debug := ctx.chan.uid)
):
chans: list[tractor.Channel] = actor._peers.get(
chans: list[tractor.Channel] = server._peers.get(
tuple(uid_in_debug)
)
if chans:
@ -1740,13 +1746,6 @@ def sigint_shield(
_pause_msg: str = 'Opening a pdb REPL in paused actor'
class DebugRequestError(RuntimeError):
'''
Failed to request stdio lock from root actor!
'''
_repl_fail_msg: str|None = (
'Failed to REPL via `_pause()` '
)
@ -3009,6 +3008,7 @@ async def _maybe_enter_pm(
[BaseException|BaseExceptionGroup],
bool,
] = lambda err: not is_multi_cancelled(err),
**_pause_kws,
):
if (
@ -3035,6 +3035,7 @@ async def _maybe_enter_pm(
await post_mortem(
api_frame=api_frame,
tb=tb,
**_pause_kws,
)
return True

View File

@ -19,6 +19,7 @@ Pretty formatters for use throughout the code base.
Mostly handy for logging and exception message content.
'''
import sys
import textwrap
import traceback
@ -115,6 +116,85 @@ def pformat_boxed_tb(
)
def pformat_exc(
exc: Exception,
header: str = '',
message: str = '',
body: str = '',
with_type_header: bool = True,
) -> str:
# XXX when the currently raised exception is this instance,
# we do not ever use the "type header" style repr.
is_being_raised: bool = False
if (
(curr_exc := sys.exception())
and
curr_exc is exc
):
is_being_raised: bool = True
with_type_header: bool = (
with_type_header
and
not is_being_raised
)
# <RemoteActorError( .. )> style
if (
with_type_header
and
not header
):
header: str = f'<{type(exc).__name__}('
message: str = (
message
or
exc.message
)
if message:
# split off the first line so, if needed, it isn't
# indented the same as the "boxed content" which,
# since there is no `.tb_str`, is just the `.message`.
lines: list[str] = message.splitlines()
first: str = lines[0]
message: str = message.removeprefix(first)
# with a type-style header we,
# - have no special message "first line" extraction/handling
# - place the message a space in from the header:
# `MsgTypeError( <message> ..`
# ^-here
# - indent the `.message` inside the type body.
if with_type_header:
first = f' {first} )>'
message: str = textwrap.indent(
message,
prefix=' '*2,
)
message: str = first + message
tail: str = ''
if (
with_type_header
and
not message
):
tail: str = '>'
return (
header
+
message
+
f'{body}'
+
tail
)
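A sketch of the type-header style when the exception is *not* currently being raised (so `sys.exception()` doesn't match it); `message` is passed explicitly since a bare `RuntimeError` has no `.message` attr:

exc = RuntimeError('boom')
print(pformat_exc(exc, message='boom'))
# -> '<RuntimeError( boom )>'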
def pformat_caller_frame(
stack_limit: int = 1,
box_tb: bool = True,

View File

@ -45,6 +45,8 @@ __all__ = ['pub']
log = get_logger('messaging')
# TODO! this needs to be reworked to use the modern
# `Context`/`MsgStream` APIs!!
async def fan_out_to_ctxs(
pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy
topics2ctxs: dict[str, list],

View File

@ -13,43 +13,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import platform
from ._transport import (
MsgTransportKey as MsgTransportKey,
MsgType as MsgType,
MsgTransport as MsgTransport,
MsgpackTransport as MsgpackTransport
)
from ._tcp import MsgpackTCPStream as MsgpackTCPStream
from ._uds import MsgpackUDSStream as MsgpackUDSStream
from ._types import (
transport_from_addr as transport_from_addr,
transport_from_stream as transport_from_stream,
)
'''
A modular IPC layer supporting the power of cross-process SC!
'''
from ._chan import (
_connect_chan as _connect_chan,
Channel as Channel
)
if platform.system() == 'Linux':
from ._linux import (
EFD_SEMAPHORE as EFD_SEMAPHORE,
EFD_CLOEXEC as EFD_CLOEXEC,
EFD_NONBLOCK as EFD_NONBLOCK,
open_eventfd as open_eventfd,
write_eventfd as write_eventfd,
read_eventfd as read_eventfd,
close_eventfd as close_eventfd,
EventFD as EventFD,
)
from ._ringbuf import (
RBToken as RBToken,
RingBuffSender as RingBuffSender,
RingBuffReceiver as RingBuffReceiver,
open_ringbuf as open_ringbuf
)

View File

@ -24,18 +24,18 @@ from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
import os
import platform
from pprint import pformat
import typing
from typing import (
Any,
TYPE_CHECKING,
)
import warnings
import trio
from tractor.ipc._transport import MsgTransport
from tractor.ipc._types import (
from ._types import (
transport_from_addr,
transport_from_stream,
)
@ -49,8 +49,15 @@ from tractor.log import get_logger
from tractor._exceptions import (
MsgTypeError,
pack_from_raise,
TransportClosed,
)
from tractor.msg import MsgCodec
from tractor.msg import (
Aid,
MsgCodec,
)
if TYPE_CHECKING:
from ._transport import MsgTransport
log = get_logger(__name__)
@ -86,8 +93,8 @@ class Channel:
# user in ``.from_stream()``.
self._transport: MsgTransport|None = transport
# set after handshake - always uid of far end
self.uid: tuple[str, str]|None = None
# set after handshake - always info from peer end
self.aid: Aid|None = None
self._aiter_msgs = self._iter_msgs()
self._exc: Exception|None = None
@ -99,6 +106,29 @@ class Channel:
# runtime.
self._cancel_called: bool = False
@property
def uid(self) -> tuple[str, str]:
'''
Peer actor's unique id.
'''
msg: str = (
f'`{type(self).__name__}.uid` is now deprecated.\n'
'Use the new `.aid: tractor.msg.Aid` (struct) instead '
'which also provides additional named (optional) fields '
'beyond just the `.name` and `.uuid`.'
)
warnings.warn(
msg,
DeprecationWarning,
stacklevel=2,
)
peer_aid: Aid = self.aid
return (
peer_aid.name,
peer_aid.uuid,
)
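Migrating off the tuple-style id is a one-liner (sketch):

def peer_id(chan: Channel) -> tuple[str, str]:
    # old (deprecated, warns): `return chan.uid`
    aid = chan.aid    # preferred: the peer's `tractor.msg.Aid` struct
    return (aid.name, aid.uuid)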
@property
def stream(self) -> trio.abc.Stream | None:
return self._transport.stream if self._transport else None
@ -182,9 +212,7 @@ class Channel:
f' _closed={self._closed}\n'
f' _cancel_called={self._cancel_called}\n'
f'\n'
f' |_runtime: Actor\n'
f' pid={os.getpid()}\n'
f' uid={self.uid}\n'
f' |_peer: {self.aid}\n'
f'\n'
f' |_msgstream: {tpt_name}\n'
f' proto={tpt.laddr.proto_key!r}\n'
@ -229,7 +257,7 @@ class Channel:
self,
payload: Any,
hide_tb: bool = False,
hide_tb: bool = True,
) -> None:
'''
@ -247,18 +275,27 @@ class Channel:
payload,
hide_tb=hide_tb,
)
except BaseException as _err:
except (
BaseException,
MsgTypeError,
TransportClosed,
) as _err:
err = _err # bind for introspection
if not isinstance(_err, MsgTypeError):
# assert err
__tracebackhide__: bool = False
else:
try:
assert err.cid
except KeyError:
raise err
match err:
case MsgTypeError():
try:
assert err.cid
except KeyError:
raise err
case TransportClosed():
log.transport(
f'Transport stream closed due to\n'
f'{err.repr_src_exc()}\n'
)
case _:
# never suppress non-tpt sources
__tracebackhide__: bool = False
raise
async def recv(self) -> Any:
@ -281,7 +318,7 @@ class Channel:
async def aclose(self) -> None:
log.transport(
f'Closing channel to {self.uid} '
f'Closing channel to {self.aid} '
f'{self.laddr} -> {self.raddr}'
)
assert self._transport
@ -381,6 +418,29 @@ class Channel:
def connected(self) -> bool:
return self._transport.connected() if self._transport else False
async def _do_handshake(
self,
aid: Aid,
) -> Aid:
'''
Exchange `(name, UUIDs)` identifiers as the first
communication step with any (peer) remote `Actor`.
These are essentially the "mailbox addresses" found in
"actor model" parlance.
'''
await self.send(aid)
peer_aid: Aid = await self.recv()
log.runtime(
f'Received handshake with peer actor,\n'
f'{peer_aid}\n'
)
# NOTE, we always are referencing the remote peer!
self.aid = peer_aid
return peer_aid
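In use (a sketch mirroring `open_portal()`'s updated first-contact flow; `addr` and `my_aid` are assumed inputs):

async def connect_and_greet(addr, my_aid: Aid) -> Aid:
    async with _connect_chan(addr) as chan:
        if chan.aid is None:    # no handshake completed yet
            return await chan._do_handshake(aid=my_aid)
        return chan.aid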
@acm
async def _connect_chan(

View File

@ -0,0 +1,163 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
File-descriptor-sharing on `linux` by "wilhelm_of_bohemia".
'''
from __future__ import annotations
import os
import array
import socket
import tempfile
from pathlib import Path
from contextlib import ExitStack
import trio
import tractor
from tractor.ipc import RBToken
actor_name = 'ringd'
_rings: dict[str, dict] = {}
async def _attach_to_ring(
ring_name: str
) -> tuple[int, int, int]:
actor = tractor.current_actor()
fd_amount = 3
sock_path = (
Path(tempfile.gettempdir())
/
f'{os.getpid()}-pass-ring-fds-{ring_name}-to-{actor.name}.sock'
)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(sock_path)
sock.listen(1)
async with (
tractor.find_actor(actor_name) as ringd,
ringd.open_context(
_pass_fds,
name=ring_name,
sock_path=sock_path
) as (ctx, _sent)
):
# prepare array to receive FD
fds = array.array("i", [0] * fd_amount)
conn, _ = sock.accept()
# receive FD
msg, ancdata, flags, addr = conn.recvmsg(
1024,
socket.CMSG_LEN(fds.itemsize * fd_amount)
)
for (
cmsg_level,
cmsg_type,
cmsg_data,
) in ancdata:
if (
cmsg_level == socket.SOL_SOCKET
and
cmsg_type == socket.SCM_RIGHTS
):
fds.frombytes(cmsg_data[:fds.itemsize * fd_amount])
break
else:
raise RuntimeError("Receiver: No FDs received")
conn.close()
sock.close()
sock_path.unlink()
return RBToken.from_msg(
await ctx.wait_for_result()
)
@tractor.context
async def _pass_fds(
ctx: tractor.Context,
name: str,
sock_path: str
) -> RBToken:
global _rings
token = _rings[name]
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(sock_path)
await ctx.started()
fds = array.array('i', token.fds)
client.sendmsg([b'FDs'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
client.close()
return token
@tractor.context
async def _open_ringbuf(
ctx: tractor.Context,
name: str,
buf_size: int
) -> RBToken:
global _rings
is_owner = False
if name not in _rings:
stack = ExitStack()
token = stack.enter_context(
tractor.open_ringbuf(
name,
buf_size=buf_size
)
)
_rings[name] = {
'token': token,
'stack': stack,
}
is_owner = True
ring = _rings[name]
await ctx.started()
try:
await trio.sleep_forever()
except tractor.ContextCancelled:
...
finally:
if is_owner:
ring['stack'].close()
async def open_ringbuf(
name: str,
buf_size: int
) -> RBToken:
async with (
tractor.find_actor(actor_name) as ringd,
ringd.open_context(
_open_ringbuf,
name=name,
buf_size=buf_size
) as (rd_ctx, _)
):
yield await _attach_to_ring(name)
await rd_ctx.cancel()

File diff suppressed because it is too large

View File

@ -18,18 +18,142 @@ TCP implementation of tractor.ipc._transport.MsgTransport protocol
'''
from __future__ import annotations
from typing import (
ClassVar,
)
# from contextlib import (
# asynccontextmanager as acm,
# )
import msgspec
import trio
from trio import (
SocketListener,
open_tcp_listeners,
)
from tractor.msg import MsgCodec
from tractor.log import get_logger
from tractor._addr import TCPAddress
from tractor.ipc._transport import MsgpackTransport
from tractor.ipc._transport import (
MsgTransport,
MsgpackTransport,
)
log = get_logger(__name__)
class TCPAddress(
msgspec.Struct,
frozen=True,
):
_host: str
_port: int
proto_key: ClassVar[str] = 'tcp'
unwrapped_type: ClassVar[type] = tuple[str, int]
def_bindspace: ClassVar[str] = '127.0.0.1'
@property
def is_valid(self) -> bool:
return self._port != 0
@property
def bindspace(self) -> str:
return self._host
@property
def domain(self) -> str:
return self._host
@classmethod
def from_addr(
cls,
addr: tuple[str, int]
) -> TCPAddress:
match addr:
case (str(), int()):
return TCPAddress(addr[0], addr[1])
case _:
raise ValueError(
f'Invalid unwrapped address for {cls}\n'
f'{addr}\n'
)
def unwrap(self) -> tuple[str, int]:
return (
self._host,
self._port,
)
@classmethod
def get_random(
cls,
bindspace: str = def_bindspace,
) -> TCPAddress:
return TCPAddress(bindspace, 0)
@classmethod
def get_root(cls) -> TCPAddress:
return TCPAddress(
'127.0.0.1',
1616,
)
def __repr__(self) -> str:
return (
f'{type(self).__name__}[{self.unwrap()}]'
)
@classmethod
def get_transport(
cls,
codec: str = 'msgpack',
) -> MsgTransport:
match codec:
case 'msgpack':
return MsgpackTCPStream
case _:
raise ValueError(
f'No IPC transport with {codec!r} supported !'
)
async def start_listener(
addr: TCPAddress,
**kwargs,
) -> SocketListener:
'''
Start a TCP socket listener on the given `TCPAddress`.
'''
log.info(
f'Attempting to bind TCP socket\n'
f'>[\n'
f'|_{addr}\n'
)
# ?TODO, maybe we should just change the lower-level call this is
# using internally per-listener?
listeners: list[SocketListener] = await open_tcp_listeners(
host=addr._host,
port=addr._port,
**kwargs
)
# NOTE, for now we don't expect non-singleton-resolving
# domain-addresses/multi-homed-hosts.
# (though it is supported by `open_tcp_listeners()`)
assert len(listeners) == 1
listener = listeners[0]
host, port = listener.socket.getsockname()[:2]
log.info(
f'Listening on TCP socket\n'
f'[>\n'
f' |_{addr}\n'
)
return listener
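Usage sketch, binding an ephemeral port by passing `0` (the OS assigns one):

async def bind_ephemeral() -> SocketListener:
    addr = TCPAddress.from_addr(('127.0.0.1', 0))
    return await start_listener(addr)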
# TODO: typing oddity.. not sure why we have to inherit here, but it
# seems to be an issue with `get_msg_transport()` returning
# a `Type[Protocol]`; probably should make a `mypy` issue?
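
A quick standalone usage sketch of the new listener API (not from this diff); requesting port 0 lets the kernel pick an ephemeral port which can then be read back off the bound socket:

    import trio
    from tractor.ipc._tcp import TCPAddress, start_listener

    async def main():
        addr = TCPAddress.from_addr(('127.0.0.1', 0))  # 0 => ephemeral port
        listener = await start_listener(addr)
        host, port = listener.socket.getsockname()[:2]
        print(f'bound {host}:{port}')
        await listener.aclose()

    trio.run(main)
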

View File

@ -14,8 +14,8 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
typing.Protocol based generic msg API, implement this class to add
backends for tractor.ipc.Channel
'''
from __future__ import annotations
@ -23,8 +23,9 @@ from typing import (
runtime_checkable,
Type,
Protocol,
# TypeVar,
ClassVar,
TYPE_CHECKING,
)
from collections.abc import (
AsyncGenerator,
@ -47,10 +48,13 @@ from tractor.msg import (
_ctxvar_MsgCodec,
# _codec, XXX see `self._codec` sanity/debug checks
MsgCodec,
MsgType,
types as msgtypes,
pretty_struct,
)
if TYPE_CHECKING:
from tractor._addr import Address
log = get_logger(__name__)
@ -63,12 +67,13 @@ MsgTransportKey = tuple[str, str]
# ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..?
# => BLEH, except can't bc prots must inherit typevar or param-spec
# vars..
# MsgType = TypeVar('MsgType')
@runtime_checkable
class MsgTransport(Protocol):
#
# class MsgTransport(Protocol[MsgType]):
# ^-TODO-^ consider using a generic def and indexing with our
# eventual msg definition/types?
# - https://docs.python.org/3/library/typing.html#typing.Protocol
@ -99,7 +104,10 @@ class MsgTransport(Protocol[MsgType]):
@classmethod
def key(cls) -> MsgTransportKey:
return (
cls.codec_key,
cls.address_type.proto_key,
)
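
Concretely, for the msgpack-over-TCP transport defined earlier this key resolves to the pair below (sketch):

    MsgpackTCPStream.key() == ('msgpack', 'tcp')  # (codec_key, proto_key)
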
@property
def laddr(self) -> Address:
@ -130,8 +138,8 @@ class MsgTransport(Protocol[MsgType]):
Address # remote
]:
'''
Return the transport protocol's address pair for the local
and remote-peer side.
'''
...
@ -208,6 +216,7 @@ class MsgpackTransport(MsgTransport):
'''
decodes_failed: int = 0
tpt_name: str = f'{type(self).__name__!r}'
while True:
try:
header: bytes = await self.recv_stream.receive_exactly(4)
@ -252,10 +261,9 @@ class MsgpackTransport(MsgTransport):
raise TransportClosed(
message=(
f'{tpt_name} already closed by peer\n'
),
src_exc=trans_err,
loglevel=loglevel,
) from trans_err
@ -267,18 +275,17 @@ class MsgpackTransport(MsgTransport):
#
# NOTE: as such we always re-raise this error from the
# RPC msg loop!
except trio.ClosedResourceError as cre:
closure_err = cre
raise TransportClosed(
message=(
f'{tpt_name} was already closed locally ?\n'
),
src_exc=closure_err,
loglevel='error',
raise_on_report=(
'another task closed this fd' in closure_err.args
),
) from closure_err
@ -286,12 +293,9 @@ class MsgpackTransport(MsgTransport):
if header == b'':
raise TransportClosed(
message=(
f'{tpt_name} already gracefully closed\n'
),
loglevel='transport',
# cause=??? # handy or no?
)
size: int
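
The recv loop above implements simple size-prefixed framing: a fixed 4-byte header carries the length of the msgpack payload that follows. A sender-side sketch, assuming a little-endian u32 header (the exact encoding lives in the unchanged parts of this file):

    import struct

    def frame(payload: bytes) -> bytes:
        # 4-byte length header + msgpack-encoded payload
        return struct.pack('<I', len(payload)) + payload
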
@ -363,7 +367,7 @@ class MsgpackTransport(MsgTransport):
msg: msgtypes.MsgType,
strict_types: bool = True,
hide_tb: bool = True,
) -> None:
'''
@ -426,8 +430,9 @@ class MsgpackTransport(MsgTransport):
return await self.stream.send_all(size + bytes_data)
except (
trio.BrokenResourceError,
) as bre:
trans_err = bre
tpt_name: str = f'{type(self).__name__!r}'
match trans_err:
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe' in trans_err.args[0]
@ -438,21 +443,22 @@ class MsgpackTransport(MsgTransport):
# as it pertains to rando pings from the
# `.discovery` subsys and protos.
):
raise TransportClosed.from_src_exc(
message=(
f'{tpt_name} already closed by peer\n'
),
body=f'{self}\n',
src_exc=trans_err,
raise_on_report=True,
loglevel='transport',
) from bre
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.
case _:
log.exception(
f'{tpt_name} layer failed pre-send ??\n'
)
raise trans_err
@ -497,11 +503,11 @@ class MsgpackTransport(MsgTransport):
def pformat(self) -> str:
return (
f'<{type(self).__name__}(\n'
f'\n'
f' |_peers: 2\n'
f' laddr: {self._laddr}\n'
f' raddr: {self._raddr}\n'
# f'\n'
f' |_task: {self._task}\n'
f')>\n'
)

View File

@ -13,19 +13,37 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
IPC subsys type-lookup helpers?
'''
from typing import (
Type,
# TYPE_CHECKING,
)
import trio
import socket
from tractor.ipc._transport import (
MsgTransportKey,
MsgTransport
)
from tractor.ipc._tcp import (
TCPAddress,
MsgpackTCPStream,
)
from tractor.ipc._uds import (
UDSAddress,
MsgpackUDSStream,
)
# if TYPE_CHECKING:
# from tractor._addr import Address
Address = TCPAddress|UDSAddress
# manually updated list of all supported msg transport types
_msg_transports = [
@ -35,15 +53,21 @@ _msg_transports = [
# convert a MsgTransportKey to the corresponding transport type
_key_to_transport: dict[
MsgTransportKey,
Type[MsgTransport],
] = {
('msgpack', 'tcp'): MsgpackTCPStream,
('msgpack', 'uds'): MsgpackUDSStream,
}
# convert an Address wrapper to its corresponding transport type
_addr_to_transport: dict[
Type[TCPAddress|UDSAddress],
Type[MsgTransport]
] = {
TCPAddress: MsgpackTCPStream,
UDSAddress: MsgpackUDSStream,
}
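
A sketch of how these tables drive dispatch (helper names are hypothetical, not part of this diff):

    def transport_for_addr(
        addr: TCPAddress|UDSAddress,
    ) -> Type[MsgTransport]:
        # e.g. TCPAddress -> MsgpackTCPStream
        return _addr_to_transport[type(addr)]

    def transport_for_key(
        key: MsgTransportKey,  # e.g. ('msgpack', 'uds')
    ) -> Type[MsgTransport]:
        return _key_to_transport[key]
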

View File

@ -21,7 +21,6 @@ from __future__ import annotations
from pathlib import Path
import os
from socket import (
# socket,
AF_UNIX,
SOCK_STREAM,
SO_PASSCRED,
@ -29,8 +28,17 @@ from socket import (
SOL_SOCKET,
)
import struct
from typing import (
TYPE_CHECKING,
ClassVar,
)
import msgspec
import trio
from trio import (
socket,
SocketListener,
)
from trio._highlevel_open_unix_stream import (
close_on_error,
has_unix,
@ -38,13 +46,214 @@ from trio._highlevel_open_unix_stream import (
from tractor.msg import MsgCodec
from tractor.log import get_logger
from tractor.ipc._transport import (
MsgpackTransport,
)
from .._state import (
get_rt_dir,
current_actor,
is_root_process,
)
if TYPE_CHECKING:
from ._runtime import Actor
log = get_logger(__name__)
def unwrap_sockpath(
sockpath: Path,
) -> tuple[Path, Path]:
return (
sockpath.parent,
sockpath.name,
)
class UDSAddress(
msgspec.Struct,
frozen=True,
):
filedir: str|Path|None
filename: str|Path
maybe_pid: int|None = None
# TODO, maybe we should use better field and value
# -[x] really this is a `.protocol_key` not a "name" of anything.
# -[ ] consider a 'unix' proto-key instead?
# -[ ] need to check what other mult-transport frameworks do
# like zmq, nng, uri-spec et al!
proto_key: ClassVar[str] = 'uds'
unwrapped_type: ClassVar[type] = tuple[str, str]
def_bindspace: ClassVar[Path] = get_rt_dir()
@property
def bindspace(self) -> Path:
'''
We replicate the "ip-set-of-hosts" part of a UDS socket as
just the sub-directory in which we allocate socket files.
'''
return (
self.filedir
or
self.def_bindspace
# or
# get_rt_dir()
)
@property
def sockpath(self) -> Path:
return self.bindspace / self.filename
@property
def is_valid(self) -> bool:
'''
We block socket files not allocated under the runtime subdir.
'''
return self.bindspace in self.sockpath.parents
@classmethod
def from_addr(
cls,
addr: (
tuple[Path|str, Path|str]|Path|str
),
) -> UDSAddress:
match addr:
case tuple()|list():
filedir = Path(addr[0])
filename = Path(addr[1])
return UDSAddress(
filedir=filedir,
filename=filename,
# maybe_pid=pid,
)
# NOTE, in case we ever decide to just `.unwrap()`
# to a `Path|str`?
case str()|Path():
sockpath: Path = Path(addr)
return UDSAddress(*unwrap_sockpath(sockpath))
case _:
# import pdbp; pdbp.set_trace()
raise TypeError(
f'Bad unwrapped-address for {cls} !\n'
f'{addr!r}\n'
)
def unwrap(self) -> tuple[str, str]:
# XXX NOTE, since this gets passed DIRECTLY to
# `.ipc._uds.open_unix_socket_w_passcred()`
return (
str(self.filedir),
str(self.filename),
)
@classmethod
def get_random(
cls,
bindspace: Path|None = None, # default netns
) -> UDSAddress:
filedir: Path = bindspace or cls.def_bindspace
pid: int = os.getpid()
actor: Actor|None = current_actor(
err_on_no_runtime=False,
)
if actor:
sockname: str = '::'.join(actor.uid) + f'@{pid}'
else:
prefix: str = '<unknown-actor>'
if is_root_process():
prefix: str = 'root'
sockname: str = f'{prefix}@{pid}'
sockpath: Path = Path(f'{sockname}.sock')
return UDSAddress(
filedir=filedir,
filename=sockpath,
maybe_pid=pid,
)
@classmethod
def get_root(cls) -> UDSAddress:
def_uds_filename: Path = Path('registry@1616.sock')
return UDSAddress(
filedir=cls.def_bindspace,
filename=def_uds_filename,
# maybe_pid=1616,
)
# ?TODO, maybe we should just use our `.msg.pretty_struct.Struct`
# for this instead?
# -[ ] is it too "multi-line"y tho?
# the compact tuple/.unwrapped() form is simple enough?
#
def __repr__(self) -> str:
if not (pid := self.maybe_pid):
pid: str = '<unknown-peer-pid>'
body: str = (
f'({self.filedir}, {self.filename}, {pid})'
)
return (
f'{type(self).__name__}'
f'['
f'{body}'
f']'
)
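
A round-trip sketch of the wrap/unwrap cycle (paths illustrative):

    from pathlib import Path

    addr = UDSAddress.from_addr(
        ('/tmp/tractor-rt', 'root@42.sock'),  # (filedir, filename)
    )
    assert addr.unwrap() == ('/tmp/tractor-rt', 'root@42.sock')
    assert addr.sockpath == Path('/tmp/tractor-rt/root@42.sock')

    # a bare path is also accepted and split into (parent, name)
    same = UDSAddress.from_addr('/tmp/tractor-rt/root@42.sock')
    assert same.sockpath == addr.sockpath
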
async def start_listener(
addr: UDSAddress,
**kwargs,
) -> SocketListener:
# sock = addr._sock = socket.socket(
sock = socket.socket(
socket.AF_UNIX,
socket.SOCK_STREAM
)
log.info(
f'Attempting to bind UDS socket\n'
f'>[\n'
f'|_{addr}\n'
)
bindpath: Path = addr.sockpath
try:
await sock.bind(str(bindpath))
except (
FileNotFoundError,
) as fdne:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {addr.sockpath}\n'
) from fdne
sock.listen(1)
log.info(
f'Listening on UDS socket\n'
f'[>\n'
f' |_{addr}\n'
)
return SocketListener(sock)
def close_listener(
addr: UDSAddress,
lstnr: SocketListener,
) -> None:
'''
Close and remove the listening unix socket's path.
'''
lstnr.socket.close()
os.unlink(addr.sockpath)
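
The bind/teardown pairing implied by the two fns above, as a caller might use them (sketch):

    async def serve_one_peer():
        addr = UDSAddress.get_random()
        lstnr = await start_listener(addr)
        try:
            conn = await lstnr.accept()  # wait for a single peer
            ...
        finally:
            # closes the socket AND unlinks the socket file
            close_listener(addr, lstnr)
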
async def open_unix_socket_w_passcred(
filename: str|bytes|os.PathLike[str]|os.PathLike[bytes],
) -> trio.SocketStream:
@ -139,20 +348,28 @@ class MsgpackUDSStream(MsgpackTransport):
**kwargs
) -> MsgpackUDSStream:
sockpath: Path = addr.sockpath
#
# ^XXX NOTE, we don't provide any out-of-band `.pid` info
# (like, over the socket as extra msgs) since the (augmented)
# `.setsockopt()` call tells the OS to provide it; the client
# pid can then be read on the server/listen() side via
# `get_peer_info()` above.
try:
stream = await open_unix_socket_w_passcred(
str(sockpath),
**kwargs
)
except (
FileNotFoundError,
) as fdne:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {sockpath}\n'
) from fdne
stream = MsgpackUDSStream(
stream,
prefix_size=prefix_size,
@ -186,16 +403,20 @@ class MsgpackUDSStream(MsgpackTransport):
case (bytes(), str()):
sock_path: Path = Path(sockname)
(
peer_pid,
_,
_,
) = get_peer_info(sock)
filedir, filename = unwrap_sockpath(sock_path)
laddr = UDSAddress(
filedir=filedir,
filename=filename,
maybe_pid=os.getpid(),
)
raddr = UDSAddress(
filedir=filedir,
filename=filename,
maybe_pid=peer_pid
)
return (laddr, raddr)
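
`get_peer_info()` is referenced here but defined outside the shown hunks; presumably it reads the linux `SO_PEERCRED` sockopt off the connected socket, something like this sketch:

    import socket
    import struct

    def get_peer_info(sock: socket.socket) -> tuple[int, int, int]:
        # `SO_PEERCRED` returns a `struct ucred` of 3 C-ints:
        # (pid, uid, gid) of the connected UDS peer
        creds: bytes = sock.getsockopt(
            socket.SOL_SOCKET,
            socket.SO_PEERCRED,
            struct.calcsize('3i'),
        )
        pid, uid, gid = struct.unpack('3i', creds)
        return pid, uid, gid
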

View File

@ -48,7 +48,7 @@ from tractor.msg import (
pretty_struct,
)
from tractor.log import get_logger
# from tractor._addr import UnwrappedAddress
log = get_logger('tractor.msgspec')
@ -176,8 +176,8 @@ class SpawnSpec(
# TODO: not just sockaddr pairs?
# -[ ] abstract into a `TransportAddr` type?
reg_addrs: list[tuple[str, str|int]]
bind_addrs: list[tuple[str, str|int]]|None
# TODO: caps based RPC support in the payload?
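
For reference, the concrete shapes these unwrapped pairs take per transport (values illustrative):

    # tcp: (host, port)
    reg_addrs: list[tuple[str, str|int]] = [('127.0.0.1', 1616)]
    # uds: (filedir, filename)
    bind_addrs: list[tuple[str, str|int]] = [('/tmp/tractor-rt', 'root@42.sock')]
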