From 52efbfc2cdd9a2bf4ff277e6660853d540e542be Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 1 Dec 2019 23:26:25 -0500 Subject: [PATCH 1/7] Log task and actor names where possible Prepend the actor and task names in each log emission. This makes debugging much more sane since you can see from which process and running task the log message originates from! Resolves #13 --- tractor/_state.py | 25 +++++++++++++++++++++++-- tractor/log.py | 15 +++++++++++---- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/tractor/_state.py b/tractor/_state.py index 704fae7..f479d41 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -3,6 +3,7 @@ Per process state """ from typing import Optional +import trio _current_actor: Optional['Actor'] = None # type: ignore @@ -10,6 +11,26 @@ _current_actor: Optional['Actor'] = None # type: ignore def current_actor() -> 'Actor': # type: ignore """Get the process-local actor instance. """ - if not _current_actor: - raise RuntimeError("No actor instance has been defined yet?") + if _current_actor is None: + raise RuntimeError("No local actor has been initialized yet") return _current_actor + + +class ActorContextInfo: + "Dyanmic lookup for local actor and task names" + def __iter__(self): + return iter(('task', 'actor')) + + def __getitem__(self, key: str): + if key == 'task': + try: + return trio._core.current_task().name + except RuntimeError: + # not inside `trio.run()` yet + return 'no task context' + elif key == 'actor': + try: + return current_actor().name + except RuntimeError: + # no local actor initialize yet + return 'no actor context' diff --git a/tractor/log.py b/tractor/log.py index 9433e93..769fae4 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -7,6 +7,8 @@ import logging import colorlog # type: ignore from typing import Optional +from ._state import ActorContextInfo + _proj_name = 'tractor' _default_loglevel = None @@ -18,7 +20,7 @@ LOG_FORMAT = ( # "{bold_white}{log_color}{asctime}{reset}" "{log_color}{asctime}{reset}" " {bold_white}{thin_white}({reset}" - "{thin_white}{processName}: {threadName}{reset}{bold_white}{thin_white})" + "{thin_white}{actor}, {process}, {task}){reset}{bold_white}{thin_white})" " {reset}{log_color}[{reset}{bold_log_color}{levelname}{reset}{log_color}]" " {log_color}{name}" " {thin_white}{filename}{log_color}:{reset}{thin_white}{lineno}{log_color}" @@ -54,6 +56,10 @@ def get_logger(name: str = None) -> logging.Logger: log = rlog.getChild(name) log.level = rlog.level + # add our actor-task aware adapter which will dynamically look up + # the actor and task names at each log emit + log = logging.LoggerAdapter(log, ActorContextInfo()) + # additional levels for name, val in LEVELS.items(): logging.addLevelName(val, name) @@ -66,9 +72,10 @@ def get_logger(name: str = None) -> logging.Logger: def get_console_log(level: str = None, name: str = None) -> logging.Logger: '''Get the package logger and enable a handler which writes to stderr. - Yeah yeah, i know we can use ``DictConfig``. You do it... + Yeah yeah, i know we can use ``DictConfig``. You do it. ''' log = get_logger(name) # our root logger + logger = log.logger if not level: return log @@ -77,7 +84,7 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger: if not any( handler.stream == sys.stderr # type: ignore - for handler in log.handlers if getattr(handler, 'stream', None) + for handler in logger.handlers if getattr(handler, 'stream', None) ): handler = logging.StreamHandler() formatter = colorlog.ColoredFormatter( @@ -88,7 +95,7 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger: style='{', ) handler.setFormatter(formatter) - log.addHandler(handler) + logger.addHandler(handler) return log From cf732835869ad837bf16f8af72403456833f1e05 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 8 Dec 2019 19:40:08 -0500 Subject: [PATCH 2/7] Make info object a mapping type Make the info object a `Mapping` to play nicer with static type checking. Simplify the task or actor context method lookup using a dict. --- tractor/_state.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/tractor/_state.py b/tractor/_state.py index f479d41..ea0d547 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -2,6 +2,7 @@ Per process state """ from typing import Optional +from collections import Mapping import trio @@ -16,21 +17,22 @@ def current_actor() -> 'Actor': # type: ignore return _current_actor -class ActorContextInfo: +class ActorContextInfo(Mapping): "Dyanmic lookup for local actor and task names" + _context_keys = ('task', 'actor') + + def __len__(self): + return len(self._context_keys) + def __iter__(self): - return iter(('task', 'actor')) + return iter(self._context_keys) def __getitem__(self, key: str): - if key == 'task': - try: - return trio._core.current_task().name - except RuntimeError: - # not inside `trio.run()` yet - return 'no task context' - elif key == 'actor': - try: - return current_actor().name - except RuntimeError: - # no local actor initialize yet - return 'no actor context' + try: + return { + 'task': trio.hazmat.current_task, + 'actor': current_actor + }[key]().name + except RuntimeError: + # no local actor/task context initialized yet + return f'no {key} context' From 14bfef0df73cdc112ed9b35f8fdeff954b8287da Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 9 Dec 2019 22:10:15 -0500 Subject: [PATCH 3/7] Update types for log adapter --- tractor/log.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/tractor/log.py b/tractor/log.py index 769fae4..abe5a72 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -48,7 +48,9 @@ BOLD_PALETTE = { } -def get_logger(name: str = None) -> logging.Logger: +def get_logger( + name: str = None +) -> logging.LoggerAdapter: '''Return the package log or a sub-log for `name` if provided. ''' log = rlog = logging.getLogger(_proj_name) @@ -58,18 +60,21 @@ def get_logger(name: str = None) -> logging.Logger: # add our actor-task aware adapter which will dynamically look up # the actor and task names at each log emit - log = logging.LoggerAdapter(log, ActorContextInfo()) + logger = logging.LoggerAdapter(log, ActorContextInfo()) # additional levels for name, val in LEVELS.items(): logging.addLevelName(val, name) - # ex. create ``log.trace()`` - setattr(log, name.lower(), partial(log.log, val)) + # ex. create ``logger.trace()`` + setattr(logger, name.lower(), partial(logger.log, val)) - return log + return logger -def get_console_log(level: str = None, name: str = None) -> logging.Logger: +def get_console_log( + level: str = None, + name: str = None +) -> logging.LoggerAdapter: '''Get the package logger and enable a handler which writes to stderr. Yeah yeah, i know we can use ``DictConfig``. You do it. From 7947eeebff6182ac6c1409c00c2d7753196adeb5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 9 Dec 2019 22:56:13 -0500 Subject: [PATCH 4/7] Use trio_typing stubs --- setup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 190070e..4f56472 100755 --- a/setup.py +++ b/setup.py @@ -38,7 +38,9 @@ setup( 'tractor.testing', ], install_requires=[ - 'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt'], + 'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt', + 'trio_typing', + ], tests_require=['pytest'], python_requires=">=3.7", keywords=[ From 79c152fe381ce30c45e45688f49835d35eeca894 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Dec 2019 00:55:03 -0500 Subject: [PATCH 5/7] Make latest mpypy happy --- tractor/__init__.py | 17 ++++++------ tractor/_actor.py | 66 +++++++++++++++++++++++++------------------- tractor/_ipc.py | 28 +++++++++++++------ tractor/_portal.py | 15 ++++++---- tractor/_trionics.py | 12 ++++---- 5 files changed, 81 insertions(+), 57 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index fb9b095..10bb895 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -4,7 +4,7 @@ tractor: An actor model micro-framework built on """ import importlib from functools import partial -from typing import Tuple, Any +from typing import Tuple, Any, Optional import typing import trio # type: ignore @@ -47,8 +47,8 @@ async def _main( async_fn: typing.Callable[..., typing.Awaitable], args: Tuple, kwargs: typing.Dict[str, typing.Any], - name: str, - arbiter_addr: Tuple[str, int] + arbiter_addr: Tuple[str, int], + name: Optional[str] = None, ) -> typing.Any: """Async entry point for ``tractor``. """ @@ -89,26 +89,27 @@ async def _main( # for it to stay up indefinitely until a re-election process has # taken place - which is not implemented yet FYI). return await _start_actor( - actor, main, host, port, arbiter_addr=arbiter_addr) + actor, main, host, port, arbiter_addr=arbiter_addr + ) def run( async_fn: typing.Callable[..., typing.Awaitable], - *args: Tuple, - name: str = None, + *args, + name: Optional[str] = None, arbiter_addr: Tuple[str, int] = ( _default_arbiter_host, _default_arbiter_port), # the `multiprocessing` start method: # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods start_method: str = 'forkserver', - **kwargs: typing.Dict[str, typing.Any], + **kwargs, ) -> Any: """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. """ _spawn.try_set_start_method(start_method) - return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr) + return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name) def run_daemon( diff --git a/tractor/_actor.py b/tractor/_actor.py index 5b5e7ff..6755b0b 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -11,6 +11,7 @@ import typing from typing import Dict, List, Tuple, Any, Optional import trio # type: ignore +from trio_typing import TaskStatus from async_generator import aclosing from ._ipc import Channel @@ -155,7 +156,11 @@ class Actor: with other actors through "portals" which provide a native async API around "channels". """ - is_arbiter = False + is_arbiter: bool = False + + # placeholders filled in by `_async_main` after fork + _root_nursery: trio.Nursery + _server_nursery: trio.Nursery def __init__( self, @@ -170,15 +175,13 @@ class Actor: self.uid = (name, uid or str(uuid.uuid4())) self.rpc_module_paths = rpc_module_paths self._mods: dict = {} + # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 self.statespace = statespace or {} self.loglevel = loglevel self._arb_addr = arbiter_addr - # filled in by `_async_main` after fork - self._root_nursery: trio._core._run.Nursery = None - self._server_nursery: trio._core._run.Nursery = None self._peers: defaultdict = defaultdict(list) self._peer_connected: dict = {} self._no_more_peers = trio.Event() @@ -188,15 +191,20 @@ class Actor: # (chan, cid) -> (cancel_scope, func) self._rpc_tasks: Dict[ Tuple[Channel, str], - Tuple[trio._core._run.CancelScope, typing.Callable, trio.Event] + Tuple[trio.CancelScope, typing.Callable, trio.Event] ] = {} # map {uids -> {callids -> waiter queues}} self._cids2qs: Dict[ Tuple[Tuple[str, str], str], - trio.abc.SendChannel[Any]] = {} + Tuple[ + trio.abc.SendChannel[Any], + trio.abc.ReceiveChannel[Any] + ] + ] = {} self._listeners: List[trio.abc.Listener] = [] self._parent_chan: Optional[Channel] = None - self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None + self._forkserver_info: Optional[ + Tuple[Any, Any, Any, Any, Any]] = None async def wait_for_peer( self, uid: Tuple[str, str] @@ -303,8 +311,8 @@ class Actor: actorid = chan.uid assert actorid, f"`actorid` can't be {actorid}" cid = msg['cid'] - send_chan = self._cids2qs[(actorid, cid)] - assert send_chan.cid == cid + send_chan, recv_chan = self._cids2qs[(actorid, cid)] + assert send_chan.cid == cid # type: ignore if 'stop' in msg: log.debug(f"{send_chan} was terminated at remote end") return await send_chan.aclose() @@ -321,16 +329,17 @@ class Actor: self, actorid: Tuple[str, str], cid: str - ) -> trio.abc.ReceiveChannel: + ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]: log.debug(f"Getting result queue for {actorid} cid {cid}") try: - recv_chan = self._cids2qs[(actorid, cid)] + send_chan, recv_chan = self._cids2qs[(actorid, cid)] except KeyError: send_chan, recv_chan = trio.open_memory_channel(1000) - send_chan.cid = cid - self._cids2qs[(actorid, cid)] = send_chan + send_chan.cid = cid # type: ignore + recv_chan.cid = cid # type: ignore + self._cids2qs[(actorid, cid)] = send_chan, recv_chan - return recv_chan + return send_chan, recv_chan async def send_cmd( self, @@ -345,7 +354,7 @@ class Actor: """ cid = str(uuid.uuid4()) assert chan.uid - recv_chan = self.get_memchans(chan.uid, cid) + send_chan, recv_chan = self.get_memchans(chan.uid, cid) log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) return cid, recv_chan @@ -355,7 +364,7 @@ class Actor: chan: Channel, treat_as_gen: bool = False, shield: bool = False, - task_status=trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: """Process messages for the channel async-RPC style. @@ -511,7 +520,7 @@ class Actor: accept_addr: Tuple[str, int], arbiter_addr: Optional[Tuple[str, int]] = None, parent_addr: Optional[Tuple[str, int]] = None, - task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: """Start the channel server, maybe connect back to the parent, and start the main task. @@ -548,10 +557,12 @@ class Actor: " closing server") await self.cancel() self._parent_chan = None - - # handle new connection back to parent - nursery.start_soon( - self._process_messages, self._parent_chan) + raise + else: + # handle new connection back to parent + assert self._parent_chan + nursery.start_soon( + self._process_messages, self._parent_chan) # load exposed/allowed RPC modules # XXX: do this **after** establishing connection to parent @@ -560,6 +571,7 @@ class Actor: # register with the arbiter if we're told its addr log.debug(f"Registering {self} for role `{self.name}`") + assert isinstance(arbiter_addr, tuple) async with get_arbiter(*arbiter_addr) as arb_portal: await arb_portal.run( 'self', 'register_actor', @@ -615,7 +627,7 @@ class Actor: # (host, port) to bind for channel server accept_host: Tuple[str, int] = None, accept_port: int = 0, - task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: """Start the channel server, begin listening for new connections. @@ -720,13 +732,11 @@ class Actor: self._server_nursery.cancel_scope.cancel() @property - def accept_addr(self) -> Optional[Tuple[str, int]]: + def accept_addr(self) -> Tuple[str, int]: """Primary address to which the channel server is bound. """ - try: - return self._listeners[0].socket.getsockname() - except OSError: - return None + # throws OSError on failure + return self._listeners[0].socket.getsockname() # type: ignore def get_parent(self) -> Portal: """Return a portal to our parent actor.""" @@ -826,7 +836,7 @@ async def _start_actor( host: str, port: int, arbiter_addr: Tuple[str, int], - nursery: trio._core._run.Nursery = None + nursery: trio.Nursery = None ): """Spawn a local actor by starting a task to execute it's main async function. diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 94f978f..0d24307 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -17,9 +17,16 @@ class MsgpackStream: """ def __init__(self, stream: trio.SocketStream) -> None: self.stream = stream + assert self.stream.socket + # should both be IP sockets + lsockname = stream.socket.getsockname() + assert isinstance(lsockname, tuple) + self._laddr = lsockname[:2] + rsockname = stream.socket.getpeername() + assert isinstance(rsockname, tuple) + self._raddr = rsockname[:2] + self._agen = self._iter_packets() - self._laddr = self.stream.socket.getsockname()[:2] - self._raddr = self.stream.socket.getpeername()[:2] self._send_lock = trio.StrictFIFOLock() async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: @@ -43,14 +50,15 @@ class MsgpackStream: yield packet @property - def laddr(self) -> Tuple[str, int]: + def laddr(self) -> Tuple[Any, ...]: return self._laddr @property - def raddr(self) -> Tuple[str, int]: + def raddr(self) -> Tuple[Any, ...]: return self._raddr - async def send(self, data: Any) -> int: + # XXX: should this instead be called `.sendall()`? + async def send(self, data: Any) -> None: async with self._send_lock: return await self.stream.send_all( msgpack.dumps(data, use_bin_type=True)) @@ -95,24 +103,26 @@ class Channel: def __repr__(self) -> str: if self.msgstream: return repr( - self.msgstream.stream.socket._sock).replace( + self.msgstream.stream.socket._sock).replace( # type: ignore "socket.socket", "Channel") return object.__repr__(self) @property - def laddr(self) -> Optional[Tuple[str, int]]: + def laddr(self) -> Optional[Tuple[Any, ...]]: return self.msgstream.laddr if self.msgstream else None @property - def raddr(self) -> Optional[Tuple[str, int]]: + def raddr(self) -> Optional[Tuple[Any, ...]]: return self.msgstream.raddr if self.msgstream else None async def connect( - self, destaddr: Tuple[str, int] = None, **kwargs + self, destaddr: Tuple[Any, ...] = None, + **kwargs ) -> trio.SocketStream: if self.connected(): raise RuntimeError("channel is already connected?") destaddr = destaddr or self._destaddr + assert isinstance(destaddr, tuple) stream = await trio.open_tcp_stream(*destaddr, **kwargs) self.msgstream = MsgpackStream(stream) return stream diff --git a/tractor/_portal.py b/tractor/_portal.py index 925e3ad..bd49170 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -21,7 +21,9 @@ log = get_logger('tractor') @asynccontextmanager -async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): +async def maybe_open_nursery( + nursery: trio.Nursery = None +) -> typing.AsyncGenerator[trio.Nursery, Any]: """Create a new nursery if None provided. Blocks on exit as expected if no input nursery is provided. @@ -252,14 +254,14 @@ class Portal: for stream in self._streams.copy(): await stream.aclose() - async def aclose(self) -> None: + async def aclose(self): log.debug(f"Closing {self}") # TODO: once we move to implementing our own `ReceiveChannel` # (including remote task cancellation inside its `.aclose()`) # we'll need to .aclose all those channels here await self._cancel_streams() - async def cancel_actor(self) -> bool: + async def cancel_actor(self): """Cancel the actor on the other end of this portal. """ if not self.channel.connected(): @@ -279,7 +281,9 @@ class Portal: return True if cancel_scope.cancelled_caught: log.warning(f"May have failed to cancel {self.channel.uid}") - return False + + # if we get here some weird cancellation case happened + return False except trio.ClosedResourceError: log.warning( f"{self.channel} for {self.channel.uid} was already closed?") @@ -309,7 +313,7 @@ class LocalPortal: @asynccontextmanager async def open_portal( channel: Channel, - nursery: trio._core._run.Nursery = None + nursery: Optional[trio.Nursery] = None ) -> typing.AsyncGenerator[Portal, None]: """Open a ``Portal`` through the provided ``channel``. @@ -320,7 +324,6 @@ async def open_portal( was_connected = False async with maybe_open_nursery(nursery) as nursery: - if not channel.connected(): await channel.connect() was_connected = True diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 6954367..5250f81 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -64,7 +64,6 @@ class ActorNursery: arbiter_addr=current_actor()._arb_addr, ) parent_addr = self._actor.accept_addr - assert parent_addr proc = _spawn.new_proc( name, actor, @@ -192,8 +191,7 @@ class ActorNursery: async def wait_for_proc( proc: mp.Process, actor: Actor, - portal: Portal, - cancel_scope: Optional[trio._core._run.CancelScope] = None, + cancel_scope: Optional[trio.CancelScope] = None, ) -> None: # TODO: timeout block here? if proc.is_alive(): @@ -207,7 +205,7 @@ class ActorNursery: # proc terminated, cancel result waiter that may have # been spawned in tandem if not done already - if cancel_scope: # and not portal._cancelled: + if cancel_scope: log.warning( f"Cancelling existing result waiter task for {actor.uid}") cancel_scope.cancel() @@ -223,11 +221,12 @@ class ActorNursery: cs = None # portal from ``run_in_actor()`` if portal in self._cancel_after_result_on_exit: + assert portal cs = await nursery.start( cancel_on_completion, portal, subactor) # TODO: how do we handle remote host spawned actors? nursery.start_soon( - wait_for_proc, proc, subactor, portal, cs) + wait_for_proc, proc, subactor, cs) if errors: if not self.cancelled: @@ -247,7 +246,8 @@ class ActorNursery: async with trio.open_nursery() as nursery: for subactor, proc, portal in children.values(): # TODO: how do we handle remote host spawned actors? - nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) + assert portal + nursery.start_soon(wait_for_proc, proc, subactor, cs) log.debug(f"All subactors for {self} have terminated") if errors: From e2c9477122d78dc58e40f40829d9d493b3693360 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 20 Dec 2019 16:37:17 -0500 Subject: [PATCH 6/7] Allow overriding the root logger name Handy if other dependent projects want to use the logging system but also want to slap their own root "branding" onto the record prefix. --- tractor/log.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tractor/log.py b/tractor/log.py index abe5a72..2f04c7b 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -49,11 +49,12 @@ BOLD_PALETTE = { def get_logger( - name: str = None + name: str = None, + _root_name: str = _proj_name, ) -> logging.LoggerAdapter: '''Return the package log or a sub-log for `name` if provided. ''' - log = rlog = logging.getLogger(_proj_name) + log = rlog = logging.getLogger(_root_name) if name and name != _proj_name: log = rlog.getChild(name) log.level = rlog.level @@ -73,13 +74,13 @@ def get_logger( def get_console_log( level: str = None, - name: str = None + **kwargs, ) -> logging.LoggerAdapter: '''Get the package logger and enable a handler which writes to stderr. Yeah yeah, i know we can use ``DictConfig``. You do it. ''' - log = get_logger(name) # our root logger + log = get_logger(**kwargs) # our root logger logger = log.logger if not level: From 698951c515af5d794ca91da66da86e9381af7839 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Jan 2020 19:48:37 -0500 Subject: [PATCH 7/7] More mypy apeasement on 3.7 --- .travis.yml | 2 +- requirements-test.txt | 1 + setup.py | 1 - tractor/_actor.py | 8 ++++---- tractor/_portal.py | 2 +- tractor/_spawn.py | 4 ++-- 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.travis.yml b/.travis.yml index 956a5bf..d8f58be 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,5 +31,5 @@ install: - pip install -U . -r requirements-test.txt --upgrade-strategy eager script: - - mypy tractor/ --ignore-missing-imports + - mypy tractor/ --ignore-missing-imports - pytest tests/ --no-print-logs diff --git a/requirements-test.txt b/requirements-test.txt index 447b18c..e9dae32 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -2,3 +2,4 @@ pytest pytest-trio pdbpp mypy +trio_typing diff --git a/setup.py b/setup.py index 4f56472..cb44d1c 100755 --- a/setup.py +++ b/setup.py @@ -39,7 +39,6 @@ setup( ], install_requires=[ 'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt', - 'trio_typing', ], tests_require=['pytest'], python_requires=">=3.7", diff --git a/tractor/_actor.py b/tractor/_actor.py index 6755b0b..22e2253 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -445,8 +445,8 @@ class Actor: # spin up a task for the requested function log.debug(f"Spawning task for {func}") cs = await self._root_nursery.start( - _invoke, self, cid, chan, func, kwargs, - name=funcname + partial(_invoke, self, cid, chan, func, kwargs), + name=funcname, ) # never allow cancelling cancel requests (results in # deadlock and other weird behaviour) @@ -639,7 +639,7 @@ class Actor: # TODO: might want to consider having a separate nursery # for the stream handler such that the server can be cancelled # whilst leaving existing channels up - listeners = await nursery.start( + listeners: List[trio.abc.Listener] = await nursery.start( partial( trio.serve_tcp, self._stream_handler, @@ -650,7 +650,7 @@ class Actor: ) ) log.debug( - f"Started tcp server(s) on {[l.socket for l in listeners]}") + f"Started tcp server(s) on {[l.socket for l in listeners]}") # type: ignore self._listeners.extend(listeners) task_status.started() diff --git a/tractor/_portal.py b/tractor/_portal.py index bd49170..8f5899b 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -331,7 +331,7 @@ async def open_portal( if channel.uid is None: await actor._do_handshake(channel) - msg_loop_cs = await nursery.start( + msg_loop_cs: trio.CancelScope = await nursery.start( partial( actor._process_messages, channel, diff --git a/tractor/_spawn.py b/tractor/_spawn.py index a759946..f299b9d 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -21,7 +21,7 @@ from ._state import current_actor from ._actor import Actor -_ctx: mp.context.BaseContext = mp.get_context("spawn") +_ctx: mp.context.BaseContext = mp.get_context("spawn") # type: ignore def try_set_start_method(name: str) -> mp.context.BaseContext: @@ -95,7 +95,7 @@ def new_proc( else: fs_info = (None, None, None, None, None) - return _ctx.Process( + return _ctx.Process( # type: ignore target=actor._fork_main, args=( bind_addr,