tractor/tractor/_portal.py

572 lines
18 KiB
Python
Raw Normal View History

Re-license code base for distribution under AGPL This commit obviously denotes a re-license of all applicable parts of the code base. Acknowledgement of this change was completed in #274 by the majority of the current set of contributors. From here henceforth all changes will be AGPL licensed and distributed. This is purely an effort to maintain the same copy-left policy whilst closing the (perceived) SaaS loophole the GPL allows for. It is merely for this loophole: to avoid code hiding by any potential "network providers" who are attempting to use the project to make a profit without either compensating the authors or re-distributing their changes. I thought quite a bit about this change and can't see a reason not to close the SaaS loophole in our current license. We still are (hard) copy-left and I plan to keep the code base this way for a couple reasons: - The code base produces income/profit through parent projects and is demonstrably of high value. - I believe firms should not get free lunch for the sake of "contributions from their employees" or "usage as a service" which I have found to be a dubious argument at best. - If a firm who intends to profit from the code base wants to use it they can propose a secondary commercial license to purchase with the proceeds going to the project's authors under some form of well defined contract. - Many successful projects like Qt use this model; I see no reason it can't work in this case until such a time as the authors feel it should be loosened. There has been detailed discussion in #103 on licensing alternatives. The main point of this AGPL change is to protect the code base for the time being from exploitation while it grows and as we move into the next phase of development which will include extension into the multi-host distributed software space.
2021-12-13 18:08:32 +00:00
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Memory "portal" contruct.
"Memory portals" are both an API and set of IPC wrapping primitives
for managing structured concurrency "cancel-scope linked" tasks
running in disparate virtual memory domains - at least in different
OS processes, possibly on different (hardware) hosts.
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
2018-07-14 20:09:05 +00:00
import importlib
import inspect
from typing import (
Any,
Callable,
AsyncGenerator,
First draft "payload receiver in a new `.msg._ops` As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
2024-04-23 21:43:45 +00:00
TYPE_CHECKING,
)
from functools import partial
from dataclasses import dataclass
import warnings
2018-07-14 20:09:05 +00:00
import trio
from .trionics import maybe_open_nursery
2024-02-19 22:00:46 +00:00
from ._state import (
current_actor,
)
2018-08-31 21:16:24 +00:00
from ._ipc import Channel
2018-07-14 20:09:05 +00:00
from .log import get_logger
2024-04-02 17:41:52 +00:00
from .msg import (
First draft "payload receiver in a new `.msg._ops` As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
2024-04-23 21:43:45 +00:00
# Error,
PayloadMsg,
2024-04-02 17:41:52 +00:00
NamespacePath,
Return,
)
from ._exceptions import (
First draft "payload receiver in a new `.msg._ops` As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
2024-04-23 21:43:45 +00:00
# unpack_error,
NoResult,
)
from ._context import (
Context,
open_context_from_portal,
)
from ._streaming import (
MsgStream,
)
2018-07-14 20:09:05 +00:00
First draft "payload receiver in a new `.msg._ops` As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
2024-04-23 21:43:45 +00:00
if TYPE_CHECKING:
from ._runtime import Actor
2018-07-14 20:09:05 +00:00
log = get_logger(__name__)
2018-07-14 20:09:05 +00:00
class Portal:
2021-11-29 13:40:59 +00:00
'''
A 'portal' to a memory-domain-separated `Actor`.
2018-07-14 20:09:05 +00:00
A portal is "opened" (and eventually closed) by one side of an
inter-actor communication context. The side which opens the portal
is equivalent to a "caller" in function parlance and usually is
either the called actor's parent (in process tree hierarchy terms)
or a client interested in scheduling work to be done remotely in a
process which has a separate (virtual) memory domain.
The portal api allows the "caller" actor to invoke remote routines
and receive results through an underlying ``tractor.Channel`` as
though the remote (async) function / generator was called locally.
It may be thought of loosely as an RPC api where native Python
function calling semantics are supported transparently; hence it is
like having a "portal" between the seperate actor memory spaces.
2018-07-14 20:09:05 +00:00
2021-11-29 13:40:59 +00:00
'''
# global timeout for remote cancel requests sent to
# connected (peer) actors.
cancel_timeout: float = 0.5
First draft "payload receiver in a new `.msg._ops` As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
2024-04-23 21:43:45 +00:00
def __init__(
self,
channel: Channel,
) -> None:
self._chan: Channel = channel
2018-08-01 19:15:18 +00:00
# during the portal's lifetime
self._final_result_pld: Any|None = None
self._final_result_msg: PayloadMsg|None = None
# When set to a ``Context`` (when _submit_for_result is called)
# it is expected that ``result()`` will be awaited at some
# point.
First draft "payload receiver in a new `.msg._ops` As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
2024-04-23 21:43:45 +00:00
self._expect_result_ctx: Context|None = None
self._streams: set[MsgStream] = set()
First draft "payload receiver in a new `.msg._ops` As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
2024-04-23 21:43:45 +00:00
self.actor: Actor = current_actor()
2018-07-14 20:09:05 +00:00
@property
def chan(self) -> Channel:
return self._chan
@property
def channel(self) -> Channel:
'''
Proxy to legacy attr name..
Consider the shorter `Portal.chan` instead of `.channel` ;)
'''
log.debug(
'Consider the shorter `Portal.chan` instead of `.channel` ;)'
)
return self.chan
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
async def _submit_for_result(
self,
ns: str,
func: str,
**kwargs
) -> None:
2021-04-28 15:38:31 +00:00
First draft "payload receiver in a new `.msg._ops` As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
2024-04-23 21:43:45 +00:00
if self._expect_result_ctx is not None:
raise RuntimeError(
'A pending main result has already been submitted'
)
2021-04-28 15:38:31 +00:00
self._expect_result_ctx: Context = await self.actor.start_remote_task(
self.channel,
nsf=NamespacePath(f'{ns}:{func}'),
kwargs=kwargs,
portal=self,
)
2018-08-01 19:15:18 +00:00
# TODO: we should deprecate this API right? since if we remove
# `.run_in_actor()` (and instead move it to a `.highlevel`
# wrapper api (around a single `.open_context()` call) we don't
# really have any notion of a "main" remote task any more?
#
# @api_frame
async def wait_for_result(
self,
hide_tb: bool = True,
) -> Any:
'''
Return the final result delivered by a `Return`-msg from the
remote peer actor's "main" task's `return` statement.
'''
__tracebackhide__: bool = hide_tb
# Check for non-rpc errors slapped on the
# channel for which we always raise
exc = self.channel._exc
if exc:
raise exc
# not expecting a "main" result
First draft "payload receiver in a new `.msg._ops` As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
2024-04-23 21:43:45 +00:00
if self._expect_result_ctx is None:
log.warning(
f"Portal for {self.channel.uid} not expecting a final"
" result?\nresult() should only be called if subactor"
" was spawned with `ActorNursery.run_in_actor()`")
return NoResult
# expecting a "main" result
First draft "payload receiver in a new `.msg._ops` As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
2024-04-23 21:43:45 +00:00
assert self._expect_result_ctx
if self._final_result_msg is None:
try:
(
self._final_result_msg,
self._final_result_pld,
) = await self._expect_result_ctx._pld_rx.recv_msg_w_pld(
ipc=self._expect_result_ctx,
expect_msg=Return,
)
except BaseException as err:
# TODO: wrap this into `@api_frame` optionally with
# some kinda filtering mechanism like log levels?
__tracebackhide__: bool = False
raise err
return self._final_result_pld
2018-07-14 20:09:05 +00:00
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
async def result(
self,
*args,
**kwargs,
) -> Any|Exception:
typname: str = type(self).__name__
log.warning(
f'`{typname}.result()` is DEPRECATED!\n'
f'Use `{typname}.wait_for_result()` instead!\n'
)
return await self.wait_for_result(
*args,
**kwargs,
)
async def _cancel_streams(self):
# terminate all locally running async generator
# IPC calls
if self._streams:
log.cancel(
f"Cancelling all streams with {self.channel.uid}")
for stream in self._streams.copy():
try:
await stream.aclose()
except trio.ClosedResourceError:
# don't error the stream having already been closed
# (unless of course at some point down the road we
# won't expect this to always be the case or need to
# detect it for respawning purposes?)
log.debug(f"{stream} was already closed.")
2019-12-10 05:55:03 +00:00
async def aclose(self):
log.debug(f"Closing {self}")
# TODO: once we move to implementing our own `ReceiveChannel`
# (including remote task cancellation inside its `.aclose()`)
# we'll need to .aclose all those channels here
await self._cancel_streams()
2018-07-14 20:09:05 +00:00
async def cancel_actor(
self,
timeout: float | None = None,
) -> bool:
'''
Cancel the actor runtime (and thus process) on the far
end of this portal.
**NOTE** THIS CANCELS THE ENTIRE RUNTIME AND THE
SUBPROCESS, it DOES NOT just cancel the remote task. If you
want to have a handle to cancel a remote ``tri.Task`` look
at `.open_context()` and the definition of
`._context.Context.cancel()` which CAN be used for this
purpose.
'''
__runtimeframe__: int = 1 # noqa
chan: Channel = self.channel
if not chan.connected():
log.runtime(
'This channel is already closed, skipping cancel request..'
)
return False
reminfo: str = (
2024-07-04 19:06:15 +00:00
f'c)=> {self.channel.uid}\n'
f' |_{chan}\n'
)
log.cancel(
2024-07-04 19:06:15 +00:00
f'Requesting actor-runtime cancel for peer\n\n'
f'{reminfo}'
)
# XXX the one spot we set it?
self.channel._cancel_called: bool = True
2018-07-14 20:09:05 +00:00
try:
# send cancel cmd - might not get response
# XXX: sure would be nice to make this work with
# a proper shield
Remote `Context` cancellation semantics rework B) This adds remote cancellation semantics to our `tractor.Context` machinery to more closely match that of `trio.CancelScope` but with operational differences to handle the nature of parallel tasks interoperating across multiple memory boundaries: - if an actor task cancels some context it has opened via `Context.cancel()`, the remote (scope linked) task will be cancelled using the normal `CancelScope` semantics of `trio` meaning the remote cancel scope surrounding the far side task is cancelled and `trio.Cancelled`s are expected to be raised in that scope as per normal `trio` operation, and in the case where no error is raised in that remote scope, a `ContextCancelled` error is raised inside the runtime machinery and relayed back to the opener/caller side of the context. - if any actor task cancels a full remote actor runtime using `Portal.cancel_actor()` the same semantics as above apply except every other remote actor task which also has an open context with the actor which was cancelled will also be sent a `ContextCancelled` **but** with the `.canceller` field set to the uid of the original cancel requesting actor. This changeset also includes a more "proper" solution to the issue of "allowing overruns" during streaming without attempting to implement any form of IPC streaming backpressure. Implementing task-granularity backpressure cross-process turns out to be more or less impossible without augmenting out streaming protocol (likely at the cost of performance). Further allowing overruns requires special care since any blocking of the runtime RPC msg loop task effectively can block control msgs such as cancels and stream terminations. The implementation details per abstraction layer are as follows. ._streaming.Context: - add a new contructor factor func `mk_context()` which provides a strictly private init-er whilst allowing us to not have to define an `.__init__()` on the type def. - add public `.cancel_called` and `.cancel_called_remote` properties. - general rename of what was the internal `._backpressure` var to `._allow_overruns: bool`. - move the old contents of `Actor._push_result()` into a new `._deliver_msg()` allowing for better encapsulation of per-ctx msg handling. - always check for received 'error' msgs and process them with the new `_maybe_cancel_and_set_remote_error()` **before** any msg delivery to the local task, thus guaranteeing error and cancellation handling despite any overflow handling. - add a new `._drain_overflows()` task-method for use with new `._allow_overruns: bool = True` mode. - add back a `._scope_nursery: trio.Nursery` (allocated in `Portal.open_context()`) who's sole purpose is to spawn a single task which runs the above method; anything else is an error. - augment `._deliver_msg()` to start a task and run the above method when operating in no overrun mode; the task queues overflow msgs and attempts to send them to the underlying mem chan using a blocking `.send()` call. - on context exit, any existing "drainer task" will be cancelled and remaining overflow queued msgs are discarded with a warning. - rename `._error` -> `_remote_error` and set it in a new method `_maybe_cancel_and_set_remote_error()` which is called before processing - adjust `.result()` to always call `._maybe_raise_remote_err()` at its start such that whenever a `ContextCancelled` arrives we do logic for whether or not to immediately raise that error or ignore it due to the current actor being the one who requested the cancel, by checking the error's `.canceller` field. - set the default value of `._result` to be `id(Context()` thus avoiding conflict with any `.result()` actually being `False`.. ._runtime.Actor: - augment `.cancel()` and `._cancel_task()` and `.cancel_rpc_tasks()` to take a `requesting_uid: tuple` indicating the source actor of every cancellation request. - pass through the new `Context._allow_overruns` through `.get_context()` - call the new `Context._deliver_msg()` from `._push_result()` (since the factoring out that method's contents). ._runtime._invoke: - `TastStatus.started()` back a `Context` (unless an error is raised) instead of the cancel scope to make it easy to set/get state on that context for the purposes of cancellation and remote error relay. - always raise any remote error via `Context._maybe_raise_remote_err()` before doing any `ContextCancelled` logic. - assign any `Context._cancel_called_remote` set by the `requesting_uid` cancel methods (mentioned above) to the `ContextCancelled.canceller`. ._runtime.process_messages: - always pass a `requesting_uid: tuple` to `Actor.cancel()` and `._cancel_task` to that any corresponding `ContextCancelled.canceller` can be set inside `._invoke()`.
2023-04-13 20:03:35 +00:00
with trio.move_on_after(
timeout
or
self.cancel_timeout
Remote `Context` cancellation semantics rework B) This adds remote cancellation semantics to our `tractor.Context` machinery to more closely match that of `trio.CancelScope` but with operational differences to handle the nature of parallel tasks interoperating across multiple memory boundaries: - if an actor task cancels some context it has opened via `Context.cancel()`, the remote (scope linked) task will be cancelled using the normal `CancelScope` semantics of `trio` meaning the remote cancel scope surrounding the far side task is cancelled and `trio.Cancelled`s are expected to be raised in that scope as per normal `trio` operation, and in the case where no error is raised in that remote scope, a `ContextCancelled` error is raised inside the runtime machinery and relayed back to the opener/caller side of the context. - if any actor task cancels a full remote actor runtime using `Portal.cancel_actor()` the same semantics as above apply except every other remote actor task which also has an open context with the actor which was cancelled will also be sent a `ContextCancelled` **but** with the `.canceller` field set to the uid of the original cancel requesting actor. This changeset also includes a more "proper" solution to the issue of "allowing overruns" during streaming without attempting to implement any form of IPC streaming backpressure. Implementing task-granularity backpressure cross-process turns out to be more or less impossible without augmenting out streaming protocol (likely at the cost of performance). Further allowing overruns requires special care since any blocking of the runtime RPC msg loop task effectively can block control msgs such as cancels and stream terminations. The implementation details per abstraction layer are as follows. ._streaming.Context: - add a new contructor factor func `mk_context()` which provides a strictly private init-er whilst allowing us to not have to define an `.__init__()` on the type def. - add public `.cancel_called` and `.cancel_called_remote` properties. - general rename of what was the internal `._backpressure` var to `._allow_overruns: bool`. - move the old contents of `Actor._push_result()` into a new `._deliver_msg()` allowing for better encapsulation of per-ctx msg handling. - always check for received 'error' msgs and process them with the new `_maybe_cancel_and_set_remote_error()` **before** any msg delivery to the local task, thus guaranteeing error and cancellation handling despite any overflow handling. - add a new `._drain_overflows()` task-method for use with new `._allow_overruns: bool = True` mode. - add back a `._scope_nursery: trio.Nursery` (allocated in `Portal.open_context()`) who's sole purpose is to spawn a single task which runs the above method; anything else is an error. - augment `._deliver_msg()` to start a task and run the above method when operating in no overrun mode; the task queues overflow msgs and attempts to send them to the underlying mem chan using a blocking `.send()` call. - on context exit, any existing "drainer task" will be cancelled and remaining overflow queued msgs are discarded with a warning. - rename `._error` -> `_remote_error` and set it in a new method `_maybe_cancel_and_set_remote_error()` which is called before processing - adjust `.result()` to always call `._maybe_raise_remote_err()` at its start such that whenever a `ContextCancelled` arrives we do logic for whether or not to immediately raise that error or ignore it due to the current actor being the one who requested the cancel, by checking the error's `.canceller` field. - set the default value of `._result` to be `id(Context()` thus avoiding conflict with any `.result()` actually being `False`.. ._runtime.Actor: - augment `.cancel()` and `._cancel_task()` and `.cancel_rpc_tasks()` to take a `requesting_uid: tuple` indicating the source actor of every cancellation request. - pass through the new `Context._allow_overruns` through `.get_context()` - call the new `Context._deliver_msg()` from `._push_result()` (since the factoring out that method's contents). ._runtime._invoke: - `TastStatus.started()` back a `Context` (unless an error is raised) instead of the cancel scope to make it easy to set/get state on that context for the purposes of cancellation and remote error relay. - always raise any remote error via `Context._maybe_raise_remote_err()` before doing any `ContextCancelled` logic. - assign any `Context._cancel_called_remote` set by the `requesting_uid` cancel methods (mentioned above) to the `ContextCancelled.canceller`. ._runtime.process_messages: - always pass a `requesting_uid: tuple` to `Actor.cancel()` and `._cancel_task` to that any corresponding `ContextCancelled.canceller` can be set inside `._invoke()`.
2023-04-13 20:03:35 +00:00
) as cs:
cs.shield: bool = True
await self.run_from_ns(
'self',
'cancel',
)
2018-07-14 20:09:05 +00:00
return True
if cs.cancelled_caught:
# may timeout and we never get an ack (obvi racy)
# but that doesn't mean it wasn't cancelled.
log.debug(
'May have failed to cancel peer?\n'
f'{reminfo}'
)
2019-12-10 05:55:03 +00:00
# if we get here some weird cancellation case happened
return False
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
):
log.debug(
'IPC chan for actor already closed or broken?\n\n'
f'{self.channel.uid}\n'
f' |_{self.channel}\n'
)
2018-07-14 20:09:05 +00:00
return False
# TODO: do we still need this for low level `Actor`-runtime
# method calls or can we also remove it?
async def run_from_ns(
self,
namespace_path: str,
function_name: str,
**kwargs,
) -> Any:
'''
Run a function from a (remote) namespace in a new task on the
far-end actor.
This is a more explitcit way to run tasks in a remote-process
actor using explicit object-path syntax. Hint: this is how
`.run()` works underneath.
Note::
A special namespace `self` can be used to invoke `Actor`
instance methods in the remote runtime. Currently this
should only ever be used for `Actor` (method) runtime
internals!
'''
__runtimeframe__: int = 1 # noqa
nsf = NamespacePath(
f'{namespace_path}:{function_name}'
)
ctx: Context = await self.actor.start_remote_task(
chan=self.channel,
nsf=nsf,
kwargs=kwargs,
portal=self,
)
First draft "payload receiver in a new `.msg._ops` As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
2024-04-23 21:43:45 +00:00
return await ctx._pld_rx.recv_pld(
ipc=ctx,
First draft "payload receiver in a new `.msg._ops` As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
2024-04-23 21:43:45 +00:00
expect_msg=Return,
)
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
async def run(
self,
func: str,
fn_name: str|None = None,
**kwargs
) -> Any:
'''
Submit a remote function to be scheduled and run by actor, in
a new task, wrap and return its (stream of) result(s).
This is a blocking call and returns either a value from the
remote rpc task or a local async generator instance.
'''
__runtimeframe__: int = 1 # noqa
if isinstance(func, str):
warnings.warn(
"`Portal.run(namespace: str, funcname: str)` is now"
"deprecated, pass a function reference directly instead\n"
"If you still want to run a remote function by name use"
"`Portal.run_from_ns()`",
DeprecationWarning,
stacklevel=2,
)
fn_mod_path: str = func
assert isinstance(fn_name, str)
nsf = NamespacePath(f'{fn_mod_path}:{fn_name}')
else: # function reference was passed directly
2021-04-28 15:38:31 +00:00
if (
not inspect.iscoroutinefunction(func) or
(
inspect.iscoroutinefunction(func) and
getattr(func, '_tractor_stream_function', False)
)
):
2021-04-28 15:38:31 +00:00
raise TypeError(
f'{func} must be a non-streaming async function!')
nsf = NamespacePath.from_ref(func)
ctx = await self.actor.start_remote_task(
self.channel,
nsf=nsf,
kwargs=kwargs,
portal=self,
)
First draft "payload receiver in a new `.msg._ops` As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
2024-04-23 21:43:45 +00:00
return await ctx._pld_rx.recv_pld(
ipc=ctx,
First draft "payload receiver in a new `.msg._ops` As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
2024-04-23 21:43:45 +00:00
expect_msg=Return,
2021-11-29 13:40:59 +00:00
)
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
@acm
async def open_stream_from(
self,
async_gen_func: Callable, # typing: ignore
**kwargs,
) -> AsyncGenerator[MsgStream, None]:
'''
Legacy one-way streaming API.
TODO: re-impl on top `Portal.open_context()` + an async gen
around `Context.open_stream()`.
'''
__runtimeframe__: int = 1 # noqa
if not inspect.isasyncgenfunction(async_gen_func):
2021-04-28 15:38:31 +00:00
if not (
inspect.iscoroutinefunction(async_gen_func) and
getattr(async_gen_func, '_tractor_stream_function', False)
):
raise TypeError(
f'{async_gen_func} must be an async generator function!')
ctx: Context = await self.actor.start_remote_task(
self.channel,
nsf=NamespacePath.from_ref(async_gen_func),
kwargs=kwargs,
portal=self,
)
# ensure receive-only stream entrypoint
assert ctx._remote_func_type == 'asyncgen'
try:
# deliver receive only stream
async with MsgStream(
ctx=ctx,
First draft "payload receiver in a new `.msg._ops` As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
2024-04-23 21:43:45 +00:00
rx_chan=ctx._rx_chan,
) as stream:
self._streams.add(stream)
ctx._stream = stream
yield stream
finally:
# cancel the far end task on consumer close
# NOTE: this is a special case since we assume that if using
# this ``.open_fream_from()`` api, the stream is one a one
# time use and we couple the far end tasks's lifetime to
# the consumer's scope; we don't ever send a `'stop'`
# message right now since there shouldn't be a reason to
# stop and restart the stream, right?
try:
with trio.CancelScope(shield=True):
await ctx.cancel()
except trio.ClosedResourceError:
# if the far end terminates before we send a cancel the
# underlying transport-channel may already be closed.
log.cancel(f'Context {ctx} was already closed?')
# XXX: should this always be done?
# await recv_chan.aclose()
self._streams.remove(stream)
# NOTE: impl is found in `._context`` mod to make
# reading/groking the details simpler code-org-wise. This
# method does not have to be used over that `@acm` module func
# directly, it is for conventience and from the original API
# design.
open_context = open_context_from_portal
2021-12-06 15:52:18 +00:00
@dataclass
2018-07-14 20:09:05 +00:00
class LocalPortal:
'''
A 'portal' to a local ``Actor``.
2018-07-14 20:09:05 +00:00
A compatibility shim for normal portals but for invoking functions
using an in process actor instance.
'''
actor: 'Actor' # type: ignore # noqa
channel: Channel
2018-07-14 20:09:05 +00:00
Init-support for "multi homed" transports Since we'd like to eventually allow a diverse set of transport (protocol) methods and stacks, and a multi-peer discovery system for distributed actor-tree applications, this reworks all runtime internals to support multi-homing for any given tree on a logical host. In other words any actor can now bind its transport server (currently only unsecured TCP + `msgspec`) to more then one address available in its (linux) network namespace. Further, registry actors (now dubbed "registars" instead of "arbiters") can also similarly bind to multiple network addresses and provide discovery services to remote actors via multiple addresses which can now be provided at runtime startup. Deats: - adjust `._runtime` internals to use a `list[tuple[str, int]]` (and thus pluralized) socket address sequence where applicable for transport server socket binds, now exposed via `Actor.accept_addrs`: - `Actor.__init__()` now takes a `registry_addrs: list`. - `Actor.is_arbiter` -> `.is_registrar`. - `._arb_addr` -> `._reg_addrs: list[tuple]`. - always reg and de-reg from all registrars in `async_main()`. - only set the global runtime var `'_root_mailbox'` to the loopback address since normally all in-tree processes should have access to it, right? - `._serve_forever()` task now takes `listen_sockaddrs: list[tuple]` - make `open_root_actor()` take a `registry_addrs: list[tuple[str, int]]` and defaults when not passed. - change `ActorNursery.start_..()` methods take `bind_addrs: list` and pass down through the spawning layer(s) via the parent-seed-msg. - generalize all `._discovery()` APIs to accept `registry_addrs`-like inputs and move all relevant subsystems to adopt the "registry" style naming instead of "arbiter": - make `find_actor()` support batched concurrent portal queries over all provided input addresses using `.trionics.gather_contexts()` Bo - syntax: move to using `async with <tuples>` 3.9+ style chained @acms. - a general modernization of the code to a python 3.9+ style. - start deprecation and change to "registry" naming / semantics: - `._discovery.get_arbiter()` -> `.get_registry()`
2023-09-27 19:19:30 +00:00
async def run_from_ns(
self,
ns: str,
func_name: str,
**kwargs,
) -> Any:
'''
Run a requested local function from a namespace path and
return it's result.
'''
2018-07-14 20:09:05 +00:00
obj = self.actor if ns == 'self' else importlib.import_module(ns)
2018-09-21 13:46:01 +00:00
func = getattr(obj, func_name)
return await func(**kwargs)
2018-07-14 20:09:05 +00:00
@acm
async def open_portal(
2018-08-31 21:16:24 +00:00
channel: Channel,
tn: trio.Nursery|None = None,
start_msg_loop: bool = True,
2020-08-08 18:47:52 +00:00
shield: bool = False,
) -> AsyncGenerator[Portal, None]:
'''
Open a ``Portal`` through the provided ``channel``.
2018-07-14 20:09:05 +00:00
Spawns a background task to handle RPC processing, normally
done by the actor-runtime implicitly via a call to
`._rpc.process_messages()`. just after connection establishment.
'''
2018-07-14 20:09:05 +00:00
actor = current_actor()
assert actor
Drop `None`-sentinel cancels RPC loop mechanism Pretty sure we haven't *needed it* for a while, it was always generally hazardous in terms of IPC msg types, AND it's definitely incompatible with a dynamically applied typed msg spec: you can't just expect a `None` to be willy nilly handled all the time XD For now I'm masking out all the code and leaving very detailed surrounding notes but am not removing it quite yet in case for strange reason it is needed by some edge case (though I haven't found according to the test suite). Backstory: ------ - ------ Originally (i'm pretty sure anyway) it was added as a super naive "remote cancellation" mechanism (back before there were specific `Actor` methods for such things) that was mostly (only?) used before IPC `Channel` closures to "more gracefully cancel" the connection's parented RPC tasks. Since we now have explicit runtime-RPC endpoints for conducting remote cancellation of both tasks and full actors, it should really be removed anyway, because: - a `None`-msg setinel is inconsistent with other RPC endpoint handling input patterns which (even prior to typed msging) had specific msg-value triggers. - the IPC endpoint's (block) implementation should use `Actor.cancel_rpc_tasks(parent_chan=chan)` instead of a manual loop through a `Actor._rpc_tasks.copy()`.. Deats: - mask the `Channel.send(None)` calls from both the `Actor._stream_handler()` tail as well as from the `._portal.open_portal()` was connected block. - mask the msg loop endpoint block and toss in lotsa notes. Unrelated tweaks: - drop `Actor._debug_mode`; unused. - make `Actor.cancel_server()` return a `bool`. - use `.msg.pretty_struct.Struct.pformat()` to show any msg that is ignored (bc invalid) in `._push_result()`.
2024-04-05 23:07:12 +00:00
was_connected: bool = False
2018-07-14 20:09:05 +00:00
async with maybe_open_nursery(
tn,
shield=shield,
) as tn:
2018-07-14 20:09:05 +00:00
if not channel.connected():
await channel.connect()
was_connected = True
if channel.uid is None:
await actor._do_handshake(channel)
2018-07-14 20:09:05 +00:00
msg_loop_cs: trio.CancelScope|None = None
if start_msg_loop:
2022-08-03 19:14:36 +00:00
from ._runtime import process_messages
msg_loop_cs = await tn.start(
partial(
2022-08-03 19:14:36 +00:00
process_messages,
actor,
channel,
# if the local task is cancelled we want to keep
# the msg loop running until our block ends
shield=True,
)
)
2018-07-14 20:09:05 +00:00
portal = Portal(channel)
try:
yield portal
finally:
await portal.aclose()
if was_connected:
Drop `None`-sentinel cancels RPC loop mechanism Pretty sure we haven't *needed it* for a while, it was always generally hazardous in terms of IPC msg types, AND it's definitely incompatible with a dynamically applied typed msg spec: you can't just expect a `None` to be willy nilly handled all the time XD For now I'm masking out all the code and leaving very detailed surrounding notes but am not removing it quite yet in case for strange reason it is needed by some edge case (though I haven't found according to the test suite). Backstory: ------ - ------ Originally (i'm pretty sure anyway) it was added as a super naive "remote cancellation" mechanism (back before there were specific `Actor` methods for such things) that was mostly (only?) used before IPC `Channel` closures to "more gracefully cancel" the connection's parented RPC tasks. Since we now have explicit runtime-RPC endpoints for conducting remote cancellation of both tasks and full actors, it should really be removed anyway, because: - a `None`-msg setinel is inconsistent with other RPC endpoint handling input patterns which (even prior to typed msging) had specific msg-value triggers. - the IPC endpoint's (block) implementation should use `Actor.cancel_rpc_tasks(parent_chan=chan)` instead of a manual loop through a `Actor._rpc_tasks.copy()`.. Deats: - mask the `Channel.send(None)` calls from both the `Actor._stream_handler()` tail as well as from the `._portal.open_portal()` was connected block. - mask the msg loop endpoint block and toss in lotsa notes. Unrelated tweaks: - drop `Actor._debug_mode`; unused. - make `Actor.cancel_server()` return a `bool`. - use `.msg.pretty_struct.Struct.pformat()` to show any msg that is ignored (bc invalid) in `._push_result()`.
2024-04-05 23:07:12 +00:00
await channel.aclose()
# cancel background msg loop task
if msg_loop_cs is not None:
msg_loop_cs.cancel()
tn.cancel_scope.cancel()