diff --git a/.travis.yml b/.travis.yml index 956a5bf..d8f58be 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,5 +31,5 @@ install: - pip install -U . -r requirements-test.txt --upgrade-strategy eager script: - - mypy tractor/ --ignore-missing-imports + - mypy tractor/ --ignore-missing-imports - pytest tests/ --no-print-logs diff --git a/requirements-test.txt b/requirements-test.txt index 447b18c..e9dae32 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -2,3 +2,4 @@ pytest pytest-trio pdbpp mypy +trio_typing diff --git a/setup.py b/setup.py index 190070e..cb44d1c 100755 --- a/setup.py +++ b/setup.py @@ -38,7 +38,8 @@ setup( 'tractor.testing', ], install_requires=[ - 'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt'], + 'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt', + ], tests_require=['pytest'], python_requires=">=3.7", keywords=[ diff --git a/tractor/__init__.py b/tractor/__init__.py index fb9b095..10bb895 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -4,7 +4,7 @@ tractor: An actor model micro-framework built on """ import importlib from functools import partial -from typing import Tuple, Any +from typing import Tuple, Any, Optional import typing import trio # type: ignore @@ -47,8 +47,8 @@ async def _main( async_fn: typing.Callable[..., typing.Awaitable], args: Tuple, kwargs: typing.Dict[str, typing.Any], - name: str, - arbiter_addr: Tuple[str, int] + arbiter_addr: Tuple[str, int], + name: Optional[str] = None, ) -> typing.Any: """Async entry point for ``tractor``. """ @@ -89,26 +89,27 @@ async def _main( # for it to stay up indefinitely until a re-election process has # taken place - which is not implemented yet FYI). return await _start_actor( - actor, main, host, port, arbiter_addr=arbiter_addr) + actor, main, host, port, arbiter_addr=arbiter_addr + ) def run( async_fn: typing.Callable[..., typing.Awaitable], - *args: Tuple, - name: str = None, + *args, + name: Optional[str] = None, arbiter_addr: Tuple[str, int] = ( _default_arbiter_host, _default_arbiter_port), # the `multiprocessing` start method: # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods start_method: str = 'forkserver', - **kwargs: typing.Dict[str, typing.Any], + **kwargs, ) -> Any: """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. """ _spawn.try_set_start_method(start_method) - return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr) + return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name) def run_daemon( diff --git a/tractor/_actor.py b/tractor/_actor.py index 5b5e7ff..22e2253 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -11,6 +11,7 @@ import typing from typing import Dict, List, Tuple, Any, Optional import trio # type: ignore +from trio_typing import TaskStatus from async_generator import aclosing from ._ipc import Channel @@ -155,7 +156,11 @@ class Actor: with other actors through "portals" which provide a native async API around "channels". """ - is_arbiter = False + is_arbiter: bool = False + + # placeholders filled in by `_async_main` after fork + _root_nursery: trio.Nursery + _server_nursery: trio.Nursery def __init__( self, @@ -170,15 +175,13 @@ class Actor: self.uid = (name, uid or str(uuid.uuid4())) self.rpc_module_paths = rpc_module_paths self._mods: dict = {} + # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 self.statespace = statespace or {} self.loglevel = loglevel self._arb_addr = arbiter_addr - # filled in by `_async_main` after fork - self._root_nursery: trio._core._run.Nursery = None - self._server_nursery: trio._core._run.Nursery = None self._peers: defaultdict = defaultdict(list) self._peer_connected: dict = {} self._no_more_peers = trio.Event() @@ -188,15 +191,20 @@ class Actor: # (chan, cid) -> (cancel_scope, func) self._rpc_tasks: Dict[ Tuple[Channel, str], - Tuple[trio._core._run.CancelScope, typing.Callable, trio.Event] + Tuple[trio.CancelScope, typing.Callable, trio.Event] ] = {} # map {uids -> {callids -> waiter queues}} self._cids2qs: Dict[ Tuple[Tuple[str, str], str], - trio.abc.SendChannel[Any]] = {} + Tuple[ + trio.abc.SendChannel[Any], + trio.abc.ReceiveChannel[Any] + ] + ] = {} self._listeners: List[trio.abc.Listener] = [] self._parent_chan: Optional[Channel] = None - self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None + self._forkserver_info: Optional[ + Tuple[Any, Any, Any, Any, Any]] = None async def wait_for_peer( self, uid: Tuple[str, str] @@ -303,8 +311,8 @@ class Actor: actorid = chan.uid assert actorid, f"`actorid` can't be {actorid}" cid = msg['cid'] - send_chan = self._cids2qs[(actorid, cid)] - assert send_chan.cid == cid + send_chan, recv_chan = self._cids2qs[(actorid, cid)] + assert send_chan.cid == cid # type: ignore if 'stop' in msg: log.debug(f"{send_chan} was terminated at remote end") return await send_chan.aclose() @@ -321,16 +329,17 @@ class Actor: self, actorid: Tuple[str, str], cid: str - ) -> trio.abc.ReceiveChannel: + ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]: log.debug(f"Getting result queue for {actorid} cid {cid}") try: - recv_chan = self._cids2qs[(actorid, cid)] + send_chan, recv_chan = self._cids2qs[(actorid, cid)] except KeyError: send_chan, recv_chan = trio.open_memory_channel(1000) - send_chan.cid = cid - self._cids2qs[(actorid, cid)] = send_chan + send_chan.cid = cid # type: ignore + recv_chan.cid = cid # type: ignore + self._cids2qs[(actorid, cid)] = send_chan, recv_chan - return recv_chan + return send_chan, recv_chan async def send_cmd( self, @@ -345,7 +354,7 @@ class Actor: """ cid = str(uuid.uuid4()) assert chan.uid - recv_chan = self.get_memchans(chan.uid, cid) + send_chan, recv_chan = self.get_memchans(chan.uid, cid) log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) return cid, recv_chan @@ -355,7 +364,7 @@ class Actor: chan: Channel, treat_as_gen: bool = False, shield: bool = False, - task_status=trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: """Process messages for the channel async-RPC style. @@ -436,8 +445,8 @@ class Actor: # spin up a task for the requested function log.debug(f"Spawning task for {func}") cs = await self._root_nursery.start( - _invoke, self, cid, chan, func, kwargs, - name=funcname + partial(_invoke, self, cid, chan, func, kwargs), + name=funcname, ) # never allow cancelling cancel requests (results in # deadlock and other weird behaviour) @@ -511,7 +520,7 @@ class Actor: accept_addr: Tuple[str, int], arbiter_addr: Optional[Tuple[str, int]] = None, parent_addr: Optional[Tuple[str, int]] = None, - task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: """Start the channel server, maybe connect back to the parent, and start the main task. @@ -548,10 +557,12 @@ class Actor: " closing server") await self.cancel() self._parent_chan = None - - # handle new connection back to parent - nursery.start_soon( - self._process_messages, self._parent_chan) + raise + else: + # handle new connection back to parent + assert self._parent_chan + nursery.start_soon( + self._process_messages, self._parent_chan) # load exposed/allowed RPC modules # XXX: do this **after** establishing connection to parent @@ -560,6 +571,7 @@ class Actor: # register with the arbiter if we're told its addr log.debug(f"Registering {self} for role `{self.name}`") + assert isinstance(arbiter_addr, tuple) async with get_arbiter(*arbiter_addr) as arb_portal: await arb_portal.run( 'self', 'register_actor', @@ -615,7 +627,7 @@ class Actor: # (host, port) to bind for channel server accept_host: Tuple[str, int] = None, accept_port: int = 0, - task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: """Start the channel server, begin listening for new connections. @@ -627,7 +639,7 @@ class Actor: # TODO: might want to consider having a separate nursery # for the stream handler such that the server can be cancelled # whilst leaving existing channels up - listeners = await nursery.start( + listeners: List[trio.abc.Listener] = await nursery.start( partial( trio.serve_tcp, self._stream_handler, @@ -638,7 +650,7 @@ class Actor: ) ) log.debug( - f"Started tcp server(s) on {[l.socket for l in listeners]}") + f"Started tcp server(s) on {[l.socket for l in listeners]}") # type: ignore self._listeners.extend(listeners) task_status.started() @@ -720,13 +732,11 @@ class Actor: self._server_nursery.cancel_scope.cancel() @property - def accept_addr(self) -> Optional[Tuple[str, int]]: + def accept_addr(self) -> Tuple[str, int]: """Primary address to which the channel server is bound. """ - try: - return self._listeners[0].socket.getsockname() - except OSError: - return None + # throws OSError on failure + return self._listeners[0].socket.getsockname() # type: ignore def get_parent(self) -> Portal: """Return a portal to our parent actor.""" @@ -826,7 +836,7 @@ async def _start_actor( host: str, port: int, arbiter_addr: Tuple[str, int], - nursery: trio._core._run.Nursery = None + nursery: trio.Nursery = None ): """Spawn a local actor by starting a task to execute it's main async function. diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 94f978f..0d24307 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -17,9 +17,16 @@ class MsgpackStream: """ def __init__(self, stream: trio.SocketStream) -> None: self.stream = stream + assert self.stream.socket + # should both be IP sockets + lsockname = stream.socket.getsockname() + assert isinstance(lsockname, tuple) + self._laddr = lsockname[:2] + rsockname = stream.socket.getpeername() + assert isinstance(rsockname, tuple) + self._raddr = rsockname[:2] + self._agen = self._iter_packets() - self._laddr = self.stream.socket.getsockname()[:2] - self._raddr = self.stream.socket.getpeername()[:2] self._send_lock = trio.StrictFIFOLock() async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: @@ -43,14 +50,15 @@ class MsgpackStream: yield packet @property - def laddr(self) -> Tuple[str, int]: + def laddr(self) -> Tuple[Any, ...]: return self._laddr @property - def raddr(self) -> Tuple[str, int]: + def raddr(self) -> Tuple[Any, ...]: return self._raddr - async def send(self, data: Any) -> int: + # XXX: should this instead be called `.sendall()`? + async def send(self, data: Any) -> None: async with self._send_lock: return await self.stream.send_all( msgpack.dumps(data, use_bin_type=True)) @@ -95,24 +103,26 @@ class Channel: def __repr__(self) -> str: if self.msgstream: return repr( - self.msgstream.stream.socket._sock).replace( + self.msgstream.stream.socket._sock).replace( # type: ignore "socket.socket", "Channel") return object.__repr__(self) @property - def laddr(self) -> Optional[Tuple[str, int]]: + def laddr(self) -> Optional[Tuple[Any, ...]]: return self.msgstream.laddr if self.msgstream else None @property - def raddr(self) -> Optional[Tuple[str, int]]: + def raddr(self) -> Optional[Tuple[Any, ...]]: return self.msgstream.raddr if self.msgstream else None async def connect( - self, destaddr: Tuple[str, int] = None, **kwargs + self, destaddr: Tuple[Any, ...] = None, + **kwargs ) -> trio.SocketStream: if self.connected(): raise RuntimeError("channel is already connected?") destaddr = destaddr or self._destaddr + assert isinstance(destaddr, tuple) stream = await trio.open_tcp_stream(*destaddr, **kwargs) self.msgstream = MsgpackStream(stream) return stream diff --git a/tractor/_portal.py b/tractor/_portal.py index 925e3ad..8f5899b 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -21,7 +21,9 @@ log = get_logger('tractor') @asynccontextmanager -async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): +async def maybe_open_nursery( + nursery: trio.Nursery = None +) -> typing.AsyncGenerator[trio.Nursery, Any]: """Create a new nursery if None provided. Blocks on exit as expected if no input nursery is provided. @@ -252,14 +254,14 @@ class Portal: for stream in self._streams.copy(): await stream.aclose() - async def aclose(self) -> None: + async def aclose(self): log.debug(f"Closing {self}") # TODO: once we move to implementing our own `ReceiveChannel` # (including remote task cancellation inside its `.aclose()`) # we'll need to .aclose all those channels here await self._cancel_streams() - async def cancel_actor(self) -> bool: + async def cancel_actor(self): """Cancel the actor on the other end of this portal. """ if not self.channel.connected(): @@ -279,7 +281,9 @@ class Portal: return True if cancel_scope.cancelled_caught: log.warning(f"May have failed to cancel {self.channel.uid}") - return False + + # if we get here some weird cancellation case happened + return False except trio.ClosedResourceError: log.warning( f"{self.channel} for {self.channel.uid} was already closed?") @@ -309,7 +313,7 @@ class LocalPortal: @asynccontextmanager async def open_portal( channel: Channel, - nursery: trio._core._run.Nursery = None + nursery: Optional[trio.Nursery] = None ) -> typing.AsyncGenerator[Portal, None]: """Open a ``Portal`` through the provided ``channel``. @@ -320,7 +324,6 @@ async def open_portal( was_connected = False async with maybe_open_nursery(nursery) as nursery: - if not channel.connected(): await channel.connect() was_connected = True @@ -328,7 +331,7 @@ async def open_portal( if channel.uid is None: await actor._do_handshake(channel) - msg_loop_cs = await nursery.start( + msg_loop_cs: trio.CancelScope = await nursery.start( partial( actor._process_messages, channel, diff --git a/tractor/_spawn.py b/tractor/_spawn.py index a759946..f299b9d 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -21,7 +21,7 @@ from ._state import current_actor from ._actor import Actor -_ctx: mp.context.BaseContext = mp.get_context("spawn") +_ctx: mp.context.BaseContext = mp.get_context("spawn") # type: ignore def try_set_start_method(name: str) -> mp.context.BaseContext: @@ -95,7 +95,7 @@ def new_proc( else: fs_info = (None, None, None, None, None) - return _ctx.Process( + return _ctx.Process( # type: ignore target=actor._fork_main, args=( bind_addr, diff --git a/tractor/_state.py b/tractor/_state.py index 704fae7..ea0d547 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -2,7 +2,9 @@ Per process state """ from typing import Optional +from collections import Mapping +import trio _current_actor: Optional['Actor'] = None # type: ignore @@ -10,6 +12,27 @@ _current_actor: Optional['Actor'] = None # type: ignore def current_actor() -> 'Actor': # type: ignore """Get the process-local actor instance. """ - if not _current_actor: - raise RuntimeError("No actor instance has been defined yet?") + if _current_actor is None: + raise RuntimeError("No local actor has been initialized yet") return _current_actor + + +class ActorContextInfo(Mapping): + "Dyanmic lookup for local actor and task names" + _context_keys = ('task', 'actor') + + def __len__(self): + return len(self._context_keys) + + def __iter__(self): + return iter(self._context_keys) + + def __getitem__(self, key: str): + try: + return { + 'task': trio.hazmat.current_task, + 'actor': current_actor + }[key]().name + except RuntimeError: + # no local actor/task context initialized yet + return f'no {key} context' diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 6954367..5250f81 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -64,7 +64,6 @@ class ActorNursery: arbiter_addr=current_actor()._arb_addr, ) parent_addr = self._actor.accept_addr - assert parent_addr proc = _spawn.new_proc( name, actor, @@ -192,8 +191,7 @@ class ActorNursery: async def wait_for_proc( proc: mp.Process, actor: Actor, - portal: Portal, - cancel_scope: Optional[trio._core._run.CancelScope] = None, + cancel_scope: Optional[trio.CancelScope] = None, ) -> None: # TODO: timeout block here? if proc.is_alive(): @@ -207,7 +205,7 @@ class ActorNursery: # proc terminated, cancel result waiter that may have # been spawned in tandem if not done already - if cancel_scope: # and not portal._cancelled: + if cancel_scope: log.warning( f"Cancelling existing result waiter task for {actor.uid}") cancel_scope.cancel() @@ -223,11 +221,12 @@ class ActorNursery: cs = None # portal from ``run_in_actor()`` if portal in self._cancel_after_result_on_exit: + assert portal cs = await nursery.start( cancel_on_completion, portal, subactor) # TODO: how do we handle remote host spawned actors? nursery.start_soon( - wait_for_proc, proc, subactor, portal, cs) + wait_for_proc, proc, subactor, cs) if errors: if not self.cancelled: @@ -247,7 +246,8 @@ class ActorNursery: async with trio.open_nursery() as nursery: for subactor, proc, portal in children.values(): # TODO: how do we handle remote host spawned actors? - nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) + assert portal + nursery.start_soon(wait_for_proc, proc, subactor, cs) log.debug(f"All subactors for {self} have terminated") if errors: diff --git a/tractor/log.py b/tractor/log.py index 9433e93..2f04c7b 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -7,6 +7,8 @@ import logging import colorlog # type: ignore from typing import Optional +from ._state import ActorContextInfo + _proj_name = 'tractor' _default_loglevel = None @@ -18,7 +20,7 @@ LOG_FORMAT = ( # "{bold_white}{log_color}{asctime}{reset}" "{log_color}{asctime}{reset}" " {bold_white}{thin_white}({reset}" - "{thin_white}{processName}: {threadName}{reset}{bold_white}{thin_white})" + "{thin_white}{actor}, {process}, {task}){reset}{bold_white}{thin_white})" " {reset}{log_color}[{reset}{bold_log_color}{levelname}{reset}{log_color}]" " {log_color}{name}" " {thin_white}{filename}{log_color}:{reset}{thin_white}{lineno}{log_color}" @@ -46,29 +48,40 @@ BOLD_PALETTE = { } -def get_logger(name: str = None) -> logging.Logger: +def get_logger( + name: str = None, + _root_name: str = _proj_name, +) -> logging.LoggerAdapter: '''Return the package log or a sub-log for `name` if provided. ''' - log = rlog = logging.getLogger(_proj_name) + log = rlog = logging.getLogger(_root_name) if name and name != _proj_name: log = rlog.getChild(name) log.level = rlog.level + # add our actor-task aware adapter which will dynamically look up + # the actor and task names at each log emit + logger = logging.LoggerAdapter(log, ActorContextInfo()) + # additional levels for name, val in LEVELS.items(): logging.addLevelName(val, name) - # ex. create ``log.trace()`` - setattr(log, name.lower(), partial(log.log, val)) + # ex. create ``logger.trace()`` + setattr(logger, name.lower(), partial(logger.log, val)) - return log + return logger -def get_console_log(level: str = None, name: str = None) -> logging.Logger: +def get_console_log( + level: str = None, + **kwargs, +) -> logging.LoggerAdapter: '''Get the package logger and enable a handler which writes to stderr. - Yeah yeah, i know we can use ``DictConfig``. You do it... + Yeah yeah, i know we can use ``DictConfig``. You do it. ''' - log = get_logger(name) # our root logger + log = get_logger(**kwargs) # our root logger + logger = log.logger if not level: return log @@ -77,7 +90,7 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger: if not any( handler.stream == sys.stderr # type: ignore - for handler in log.handlers if getattr(handler, 'stream', None) + for handler in logger.handlers if getattr(handler, 'stream', None) ): handler = logging.StreamHandler() formatter = colorlog.ColoredFormatter( @@ -88,7 +101,7 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger: style='{', ) handler.setFormatter(formatter) - log.addHandler(handler) + logger.addHandler(handler) return log