Add `Portal.run_from_ns()`
It turns out in order to maintain our sneaky little "call an `Actor` method in this remote process" we still need the ability to invoke functions from a namespace. We're currently using a "self" namespace as a way to do this for internal inter-process method calling. Either way, I see no reason not to keep a public method for this invoke style (we just won't market it) since it is still how the machinery works underneath.func_refs_always
parent
a668f714d5
commit
7134f35d6e
|
@ -120,7 +120,7 @@ class ReceiveStream(trio.abc.ReceiveChannel):
|
||||||
# NOTE: we're telling the far end actor to cancel a task
|
# NOTE: we're telling the far end actor to cancel a task
|
||||||
# corresponding to *this actor*. The far end local channel
|
# corresponding to *this actor*. The far end local channel
|
||||||
# instance is passed to `Actor._cancel_task()` implicitly.
|
# instance is passed to `Actor._cancel_task()` implicitly.
|
||||||
await self._portal.run('self', '_cancel_task', cid=cid)
|
await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
# XXX: there's no way to know if the remote task was indeed
|
# XXX: there's no way to know if the remote task was indeed
|
||||||
|
@ -212,9 +212,12 @@ class Portal:
|
||||||
"""
|
"""
|
||||||
if isinstance(func_or_ns, str):
|
if isinstance(func_or_ns, str):
|
||||||
warnings.warn(
|
warnings.warn(
|
||||||
"`Portal.run(namespace: str, funcname: str)` is now deprecated, "
|
"`Portal.run(namespace: str, funcname: str)` is now"
|
||||||
"pass a function reference directly instead",
|
"deprecated, pass a function reference directly instead\n"
|
||||||
DeprecationWarning
|
"If you still want to run a remote function by name use"
|
||||||
|
"`Portal.run_from_ns()`",
|
||||||
|
DeprecationWarning,
|
||||||
|
stacklevel=2,
|
||||||
)
|
)
|
||||||
fn_mod_path = func_or_ns
|
fn_mod_path = func_or_ns
|
||||||
assert isinstance(fn_name, str)
|
assert isinstance(fn_name, str)
|
||||||
|
@ -228,6 +231,28 @@ class Portal:
|
||||||
*(await self._submit(fn_mod_path, fn_name, kwargs))
|
*(await self._submit(fn_mod_path, fn_name, kwargs))
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def run_from_ns(
|
||||||
|
self,
|
||||||
|
namespace_path: str,
|
||||||
|
function_name: str,
|
||||||
|
**kwargs,
|
||||||
|
) -> Any:
|
||||||
|
"""Run a function from a (remote) namespace in a new task on the far-end actor.
|
||||||
|
|
||||||
|
This is a more explitcit way to run tasks in a remote-process
|
||||||
|
actor using explicit object-path syntax. Hint: this is how
|
||||||
|
`.run()` works underneath.
|
||||||
|
|
||||||
|
Note::
|
||||||
|
|
||||||
|
A special namespace `self` can be used to invoke `Actor`
|
||||||
|
instance methods in the remote runtime. Currently this should only
|
||||||
|
be used for `tractor` internals.
|
||||||
|
"""
|
||||||
|
return await self._return_from_resptype(
|
||||||
|
*(await self._submit(namespace_path, function_name, kwargs))
|
||||||
|
)
|
||||||
|
|
||||||
async def _return_from_resptype(
|
async def _return_from_resptype(
|
||||||
self,
|
self,
|
||||||
cid: str,
|
cid: str,
|
||||||
|
@ -329,13 +354,16 @@ class Portal:
|
||||||
# with trio.CancelScope(shield=True) as cancel_scope:
|
# with trio.CancelScope(shield=True) as cancel_scope:
|
||||||
with trio.move_on_after(0.5) as cancel_scope:
|
with trio.move_on_after(0.5) as cancel_scope:
|
||||||
cancel_scope.shield = True
|
cancel_scope.shield = True
|
||||||
await self.run('self', 'cancel')
|
|
||||||
|
await self.run_from_ns('self', 'cancel')
|
||||||
return True
|
return True
|
||||||
|
|
||||||
if cancel_scope.cancelled_caught:
|
if cancel_scope.cancelled_caught:
|
||||||
log.warning(f"May have failed to cancel {self.channel.uid}")
|
log.warning(f"May have failed to cancel {self.channel.uid}")
|
||||||
|
|
||||||
# if we get here some weird cancellation case happened
|
# if we get here some weird cancellation case happened
|
||||||
return False
|
return False
|
||||||
|
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
log.warning(
|
log.warning(
|
||||||
f"{self.channel} for {self.channel.uid} was already closed?")
|
f"{self.channel} for {self.channel.uid} was already closed?")
|
||||||
|
@ -352,8 +380,10 @@ class LocalPortal:
|
||||||
actor: 'Actor' # type: ignore # noqa
|
actor: 'Actor' # type: ignore # noqa
|
||||||
channel: Channel
|
channel: Channel
|
||||||
|
|
||||||
async def run(self, ns: str, func_name: str, **kwargs) -> Any:
|
async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any:
|
||||||
"""Run a requested function locally and return it's result.
|
"""Run a requested local function from a namespace path and
|
||||||
|
return it's result.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
obj = self.actor if ns == 'self' else importlib.import_module(ns)
|
obj = self.actor if ns == 'self' else importlib.import_module(ns)
|
||||||
func = getattr(obj, func_name)
|
func = getattr(obj, func_name)
|
||||||
|
|
Loading…
Reference in New Issue