Merge pull request #295 from goodboy/nspaths

`NamespacePath`: a message compatible "object reference" type
sort_subs_results_infected_aio
goodboy 2022-01-30 12:40:05 -05:00 committed by GitHub
commit 26bebc42b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 74 additions and 14 deletions

View File

@ -0,0 +1,3 @@
Add an experimental `tractor.msg.NamespacePath` type for passing Python
objects by "reference" through a ``str``-subtype message and using the
new ``pkgutil.resolve_name()`` for reference loading.

View File

@ -14,11 +14,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
'''
Memory boundary "Portals": an API for structured
concurrency linked tasks running in disparate memory domains.
"""
'''
from __future__ import annotations
import importlib
import inspect
from typing import (
@ -36,6 +37,7 @@ from async_generator import asynccontextmanager
from ._state import current_actor
from ._ipc import Channel
from .log import get_logger
from .msg import NamespacePath
from ._exceptions import (
unpack_error,
NoResult,
@ -66,13 +68,6 @@ async def maybe_open_nursery(
yield nursery
def func_deats(func: Callable) -> tuple[str, str]:
return (
func.__module__,
func.__name__,
)
def _unwrap_msg(
msg: dict[str, Any],
@ -86,6 +81,7 @@ def _unwrap_msg(
assert msg.get('cid'), "Received internal error at portal?"
raise unpack_error(msg, channel)
class MessagingError(Exception):
'Some kind of unexpected SC messaging dialog issue'
@ -316,7 +312,7 @@ class Portal:
raise TypeError(
f'{func} must be a non-streaming async function!')
fn_mod_path, fn_name = func_deats(func)
fn_mod_path, fn_name = NamespacePath.from_ref(func).to_tuple()
ctx = await self.actor.start_remote_task(
self.channel,
@ -346,7 +342,8 @@ class Portal:
raise TypeError(
f'{async_gen_func} must be an async generator function!')
fn_mod_path, fn_name = func_deats(async_gen_func)
fn_mod_path, fn_name = NamespacePath.from_ref(
async_gen_func).to_tuple()
ctx = await self.actor.start_remote_task(
self.channel,
fn_mod_path,
@ -412,7 +409,7 @@ class Portal:
raise TypeError(
f'{func} must be an async generator function!')
fn_mod_path, fn_name = func_deats(func)
fn_mod_path, fn_name = NamespacePath.from_ref(func).to_tuple()
ctx = await self.actor.start_remote_task(
self.channel,
@ -430,7 +427,7 @@ class Portal:
first = msg['started']
ctx._started_called = True
except KeyError as kerr:
except KeyError:
assert msg.get('cid'), ("Received internal error at context?")
if msg.get('error'):

View File

@ -15,6 +15,66 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Coming soon!
Built-in messaging patterns, types, APIs and helpers.
'''
# TODO: integration with our ``enable_modules: list[str]`` caps sys.
# ``pkgutil.resolve_name()`` internally uses
# ``importlib.import_module()`` which can be filtered by inserting
# a ``MetaPathFinder`` into ``sys.meta_path`` (which we could do before
# entering the ``Actor._process_messages()`` loop).
# - https://github.com/python/cpython/blob/main/Lib/pkgutil.py#L645
# - https://stackoverflow.com/questions/1350466/preventing-python-code-from-importing-certain-modules
# - https://stackoverflow.com/a/63320902
# - https://docs.python.org/3/library/sys.html#sys.meta_path
# the new "Implicit Namespace Packages" might be relevant?
# - https://www.python.org/dev/peps/pep-0420/
# add implicit serialized message type support so that paths can be
# handed directly to IPC primitives such as streams and `Portal.run()`
# calls:
# - via ``msgspec``:
# - https://jcristharif.com/msgspec/api.html#struct
# - https://jcristharif.com/msgspec/extending.html
# via ``msgpack-python``:
# - https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type
from __future__ import annotations
from pkgutil import resolve_name
class NamespacePath(str):
'''
A serializeable description of a (function) Python object location
described by the target's module path and namespace key meant as
a message-native "packet" to allows actors to point-and-load objects
by absolute reference.
'''
_ref: object = None
def load_ref(self) -> object:
if self._ref is None:
self._ref = resolve_name(self)
return self._ref
def to_tuple(
self,
) -> tuple[str, str]:
ref = self.load_ref()
return ref.__module__, getattr(ref, '__name__', '')
@classmethod
def from_ref(
cls,
ref,
) -> NamespacePath:
return cls(':'.join(
(ref.__module__,
getattr(ref, '__name__', ''))
))