# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Memory boundary "Portals": an API for structured
concurrency linked tasks running in disparate memory domains.
'''
from __future__ import annotations
import importlib
import inspect
from typing import (
Any, Optional,
Callable, AsyncGenerator,
Type,
)
from functools import partial
from dataclasses import dataclass
from pprint import pformat
import warnings
import trio
from async_generator import asynccontextmanager
from .trionics import maybe_open_nursery
from ._state import current_actor
from ._ipc import Channel
from .log import get_logger
from .msg import NamespacePath
from ._exceptions import (
unpack_error,
NoResult,
ContextCancelled,
)
from ._streaming import Context, ReceiveMsgStream
log = get_logger(__name__)
def _unwrap_msg(
msg: dict[str, Any],
channel: Channel
) -> Any:
try:
return msg['return']
except KeyError:
# internal error should never get here
assert msg.get('cid'), "Received internal error at portal?"
raise unpack_error(msg, channel)
class MessagingError(Exception):
'Some kind of unexpected SC messaging dialog issue'
class Portal:
'''
A 'portal' to a(n) (remote) ``Actor``.
A portal is "opened" (and eventually closed) by one side of an
inter-actor communication context. The side which opens the portal
is equivalent to a "caller" in function parlance and usually is
either the called actor's parent (in process tree hierarchy terms)
or a client interested in scheduling work to be done remotely in a
far process.
The portal api allows the "caller" actor to invoke remote routines
and receive results through an underlying ``tractor.Channel`` as
though the remote (async) function / generator was called locally.
It may be thought of loosely as an RPC api where native Python
function calling semantics are supported transparently; hence it is
    like having a "portal" between the separate actor memory spaces.
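
    A rough usage sketch (the ``mymod`` module and its ``fetch_data()``
    coroutine below are illustrative stand-ins, not part of ``tractor``)::

        import tractor
        import mymod  # hypothetical module enabled in the subactor

        async with tractor.open_nursery() as an:
            portal = await an.start_actor(
                'worker',
                enable_modules=['mymod'],
            )
            # call the remote async func as though it were local
            result = await portal.run(mymod.fetch_data, key='foo')
            await portal.cancel_actor()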
'''
# the timeout for a remote cancel request sent to
# a(n) (peer) actor.
cancel_timeout = 0.5
def __init__(self, channel: Channel) -> None:
self.channel = channel
# during the portal's lifetime
self._result_msg: Optional[dict] = None
# When set to a ``Context`` (when _submit_for_result is called)
# it is expected that ``result()`` will be awaited at some
# point.
self._expect_result: Optional[Context] = None
self._streams: set[ReceiveMsgStream] = set()
self.actor = current_actor()
async def _submit_for_result(
self,
ns: str,
func: str,
**kwargs
) -> None:
assert self._expect_result is None, \
"A pending main result has already been submitted"
self._expect_result = await self.actor.start_remote_task(
self.channel,
ns,
func,
kwargs
)
async def _return_once(
self,
ctx: Context,
) -> dict[str, Any]:
assert ctx._remote_func_type == 'asyncfunc' # single response
msg = await ctx._recv_chan.receive()
return msg
async def result(self) -> Any:
'''
Return the result(s) from the remote actor's "main" task.
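
        A short sketch, assuming a subactor spawned via
        ``ActorNursery.run_in_actor()`` with an illustrative target
        ``mymod.main``::

            async with tractor.open_nursery() as an:
                portal = await an.run_in_actor(mymod.main, arg=1)
                # block until ``mymod.main`` returns in the subactor
                value = await portal.result()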
'''
# Check for non-rpc errors slapped on the
# channel for which we always raise
exc = self.channel._exc
if exc:
raise exc
# not expecting a "main" result
if self._expect_result is None:
log.warning(
f"Portal for {self.channel.uid} not expecting a final"
" result?\nresult() should only be called if subactor"
" was spawned with `ActorNursery.run_in_actor()`")
return NoResult
# expecting a "main" result
assert self._expect_result
if self._result_msg is None:
self._result_msg = await self._return_once(
self._expect_result
)
return _unwrap_msg(self._result_msg, self.channel)
async def _cancel_streams(self):
# terminate all locally running async generator
# IPC calls
if self._streams:
log.cancel(
f"Cancelling all streams with {self.channel.uid}")
for stream in self._streams.copy():
try:
await stream.aclose()
except trio.ClosedResourceError:
                    # don't raise on a stream that's already been closed
# (unless of course at some point down the road we
# won't expect this to always be the case or need to
# detect it for respawning purposes?)
log.debug(f"{stream} was already closed.")
async def aclose(self):
log.debug(f"Closing {self}")
# TODO: once we move to implementing our own `ReceiveChannel`
# (including remote task cancellation inside its `.aclose()`)
# we'll need to .aclose all those channels here
await self._cancel_streams()
async def cancel_actor(
self,
        timeout: Optional[float] = None,
) -> bool:
'''
Cancel the actor on the other end of this portal.
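
        A minimal sketch, assuming ``portal`` references an already
        spawned peer actor::

            # returns ``False`` if the far end could not be reached
            # or the cancel request may not have completed
            was_cancelled = await portal.cancel_actor()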
'''
if not self.channel.connected():
            log.cancel("This channel is already closed, can't cancel")
return False
log.cancel(
f"Sending actor cancel request to {self.channel.uid} on "
f"{self.channel}")
self.channel._cancel_called = True
try:
# send cancel cmd - might not get response
# XXX: sure would be nice to make this work with a proper shield
with trio.move_on_after(timeout or self.cancel_timeout) as cs:
cs.shield = True
await self.run_from_ns('self', 'cancel')
return True
if cs.cancelled_caught:
log.cancel(f"May have failed to cancel {self.channel.uid}")
# if we get here some weird cancellation case happened
return False
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
):
log.cancel(
f"{self.channel} for {self.channel.uid} was already "
"closed or broken?")
return False
async def run_from_ns(
self,
namespace_path: str,
function_name: str,
**kwargs,
) -> Any:
'''
Run a function from a (remote) namespace in a new task on the
far-end actor.
        This is a more explicit way to run tasks in a remote-process
        actor using object-path syntax. Hint: this is how
`.run()` works underneath.
Note::
A special namespace `self` can be used to invoke `Actor`
instance methods in the remote runtime. Currently this
            should only be used for ``tractor`` runtime
internals.
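
        A minimal sketch, assuming ``portal`` points to an actor spawned
        with an (illustrative) module ``mymod`` enabled which exposes an
        async ``fetch_data()`` function::

            result = await portal.run_from_ns(
                'mymod',
                'fetch_data',
                key='foo',
            )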
'''
ctx = await self.actor.start_remote_task(
self.channel,
namespace_path,
function_name,
kwargs,
)
ctx._portal = self
msg = await self._return_once(ctx)
return _unwrap_msg(msg, self.channel)
async def run(
self,
func: str,
fn_name: Optional[str] = None,
**kwargs
) -> Any:
'''
        Submit a remote function to be scheduled and run in a new task
        on the far-end actor and return its final result.
        This is a blocking call which returns the remote rpc task's
        result; async generator (streaming) targets must instead use
        ``.open_stream_from()``.
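
        A minimal sketch, assuming ``portal`` is connected to an actor
        with the (illustrative) module ``mymod`` enabled::

            from mymod import fetch_data  # hypothetical async func
            result = await portal.run(fetch_data, key='foo')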
'''
if isinstance(func, str):
            warnings.warn(
                "`Portal.run(namespace: str, funcname: str)` is now "
                "deprecated, pass a function reference directly instead\n"
                "If you still want to run a remote function by name use "
                "`Portal.run_from_ns()`",
DeprecationWarning,
stacklevel=2,
)
fn_mod_path = func
assert isinstance(fn_name, str)
else: # function reference was passed directly
if (
not inspect.iscoroutinefunction(func) or
(
inspect.iscoroutinefunction(func) and
getattr(func, '_tractor_stream_function', False)
)
):
raise TypeError(
f'{func} must be a non-streaming async function!')
fn_mod_path, fn_name = NamespacePath.from_ref(func).to_tuple()
ctx = await self.actor.start_remote_task(
self.channel,
fn_mod_path,
fn_name,
kwargs,
)
ctx._portal = self
return _unwrap_msg(
await self._return_once(ctx),
self.channel,
)
@asynccontextmanager
async def open_stream_from(
self,
        async_gen_func: Callable,  # type: ignore
**kwargs,
) -> AsyncGenerator[ReceiveMsgStream, None]:
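        '''
        Open a receive-only msg stream by scheduling ``async_gen_func``
        as a task in the far-end actor and yielding a local
        ``ReceiveMsgStream`` wired to its output.

        A minimal sketch, where ``mymod.stream_ints`` names an
        illustrative async generator enabled in the remote actor::

            async with portal.open_stream_from(
                mymod.stream_ints,
                limit=100,
            ) as stream:
                async for value in stream:
                    print(value)

        '''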
if not inspect.isasyncgenfunction(async_gen_func):
if not (
inspect.iscoroutinefunction(async_gen_func) and
getattr(async_gen_func, '_tractor_stream_function', False)
):
raise TypeError(
f'{async_gen_func} must be an async generator function!')
fn_mod_path, fn_name = NamespacePath.from_ref(
async_gen_func).to_tuple()
ctx = await self.actor.start_remote_task(
self.channel,
fn_mod_path,
fn_name,
kwargs
)
ctx._portal = self
# ensure receive-only stream entrypoint
assert ctx._remote_func_type == 'asyncgen'
try:
# deliver receive only stream
async with ReceiveMsgStream(
ctx, ctx._recv_chan,
) as rchan:
self._streams.add(rchan)
yield rchan
finally:
# cancel the far end task on consumer close
# NOTE: this is a special case since we assume that if using
            # this ``.open_stream_from()`` api, the stream is a one
            # time use and we couple the far end task's lifetime to
# the consumer's scope; we don't ever send a `'stop'`
# message right now since there shouldn't be a reason to
# stop and restart the stream, right?
try:
with trio.CancelScope(shield=True):
await ctx.cancel()
except trio.ClosedResourceError:
# if the far end terminates before we send a cancel the
# underlying transport-channel may already be closed.
log.cancel(f'Context {ctx} was already closed?')
# XXX: should this always be done?
# await recv_chan.aclose()
self._streams.remove(rchan)
@asynccontextmanager
async def open_context(
self,
func: Callable,
**kwargs,
) -> AsyncGenerator[tuple[Context, Any], None]:
'''
Open an inter-actor task context.
This is a synchronous API which allows for deterministic
setup/teardown of a remote task. The yielded ``Context`` further
allows for opening bidirectional streams, explicit cancellation
and synchronized final result collection. See ``tractor.Context``.
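
        A rough two-sided sketch (the ``counter()`` callee shown here is
        illustrative and assumed to live in a module enabled in the
        remote actor)::

            # callee side
            @tractor.context
            async def counter(
                ctx: tractor.Context,
                start: int,
            ) -> None:
                await ctx.started(start)
                async with ctx.open_stream() as stream:
                    await stream.send(start + 1)

            # caller side, over an already connected portal
            async with portal.open_context(counter, start=10) as (ctx, first):
                assert first == 10
                async with ctx.open_stream() as stream:
                    async for msg in stream:
                        ...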
'''
# conduct target func method structural checks
        if not (
            inspect.iscoroutinefunction(func) and
            getattr(func, '_tractor_context_function', False)
        ):
            raise TypeError(
                f'{func} must be an async function decorated with '
                '`@tractor.context`!'
            )
fn_mod_path, fn_name = NamespacePath.from_ref(func).to_tuple()
ctx = await self.actor.start_remote_task(
self.channel,
fn_mod_path,
fn_name,
kwargs
)
assert ctx._remote_func_type == 'context'
msg = await ctx._recv_chan.receive()
try:
# the "first" value here is delivered by the callee's
# ``Context.started()`` call.
first = msg['started']
ctx._started_called = True
except KeyError:
assert msg.get('cid'), ("Received internal error at context?")
if msg.get('error'):
# raise kerr from unpack_error(msg, self.channel)
raise unpack_error(msg, self.channel) from None
else:
raise MessagingError(
f'Context for {ctx.cid} was expecting a `started` message'
f' but received a non-error msg:\n{pformat(msg)}'
)
_err: Optional[BaseException] = None
ctx._portal = self
uid = self.channel.uid
cid = ctx.cid
etype: Optional[Type[BaseException]] = None
# deliver context instance and .started() msg value in open tuple.
try:
async with trio.open_nursery() as scope_nursery:
ctx._scope_nursery = scope_nursery
# do we need this?
# await trio.lowlevel.checkpoint()
yield ctx, first
except ContextCancelled as err:
_err = err
if not ctx._cancel_called:
# context was cancelled at the far end but was
# not part of this end requesting that cancel
# so raise for the local task to respond and handle.
raise
# if the context was cancelled by client code
# then we don't need to raise since user code
# is expecting this and the block should exit.
else:
log.debug(f'Context {ctx} cancelled gracefully')
except (
BaseException,
# more specifically, we need to handle these but not
# sure it's worth being pedantic:
# Exception,
# trio.Cancelled,
# KeyboardInterrupt,
) as err:
etype = type(err)
# the context cancels itself on any cancel
# causing error.
if ctx.chan.connected():
log.cancel(
'Context cancelled for task, sending cancel request..\n'
f'task:{cid}\n'
f'actor:{uid}'
)
await ctx.cancel()
else:
log.warning(
'IPC connection for context is broken?\n'
f'task:{cid}\n'
f'actor:{uid}'
)
raise
finally:
# in the case where a runtime nursery (due to internal bug)
# or a remote actor transmits an error we want to be
            # sure we get the error from the underlying feeder mem chan.
# if it's not raised here it *should* be raised from the
# msg loop nursery right?
if ctx.chan.connected():
log.info(
'Waiting on final context-task result for\n'
f'task: {cid}\n'
f'actor: {uid}'
)
result = await ctx.result()
# though it should be impossible for any tasks
# operating *in* this scope to have survived
# we tear down the runtime feeder chan last
# to avoid premature stream clobbers.
if ctx._recv_chan is not None:
# should we encapsulate this in the context api?
await ctx._recv_chan.aclose()
if etype:
if ctx._cancel_called:
log.cancel(
f'Context {fn_name} cancelled by caller with\n{etype}'
)
elif _err is not None:
log.cancel(
f'Context for task cancelled by callee with {etype}\n'
f'target: `{fn_name}`\n'
f'task:{cid}\n'
f'actor:{uid}'
)
else:
log.runtime(
f'Context {fn_name} returned '
f'value from callee `{result}`'
)
# XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside
# ``Actor._push_result()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the
# child has already cleared it and clobbered IPC.
from ._debug import maybe_wait_for_debugger
await maybe_wait_for_debugger()
# remove the context from runtime tracking
self.actor._contexts.pop((self.channel.uid, ctx.cid))
@dataclass
class LocalPortal:
'''
A 'portal' to a local ``Actor``.
    A compatibility shim for normal portals which invokes functions
    using an in-process actor instance.
'''
actor: 'Actor' # type: ignore # noqa
channel: Channel
async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any:
'''
Run a requested local function from a namespace path and
        return its result.
'''
obj = self.actor if ns == 'self' else importlib.import_module(ns)
func = getattr(obj, func_name)
return await func(**kwargs)
@asynccontextmanager
async def open_portal(
channel: Channel,
nursery: Optional[trio.Nursery] = None,
start_msg_loop: bool = True,
shield: bool = False,
) -> AsyncGenerator[Portal, None]:
'''
Open a ``Portal`` through the provided ``channel``.
Spawns a background task to handle message processing (normally
done by the actor-runtime implicitly).
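
    A short sketch, assuming ``chan`` is an already constructed
    ``Channel`` connected (or connectable) to a peer actor::

        async with open_portal(chan) as portal:
            # schedule and await a remote task just like with a
            # nursery-provided portal
            await portal.run_from_ns('self', 'cancel')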
'''
actor = current_actor()
assert actor
was_connected = False
async with maybe_open_nursery(nursery, shield=shield) as nursery:
if not channel.connected():
await channel.connect()
was_connected = True
if channel.uid is None:
await actor._do_handshake(channel)
msg_loop_cs: Optional[trio.CancelScope] = None
if start_msg_loop:
from ._runtime import process_messages
msg_loop_cs = await nursery.start(
partial(
process_messages,
actor,
channel,
# if the local task is cancelled we want to keep
# the msg loop running until our block ends
shield=True,
)
)
portal = Portal(channel)
try:
yield portal
finally:
await portal.aclose()
if was_connected:
# gracefully signal remote channel-msg loop
await channel.send(None)
# await channel.aclose()
# cancel background msg loop task
if msg_loop_cs:
msg_loop_cs.cancel()
nursery.cancel_scope.cancel()