From 086df43b59decff2007b652c8cb1458840256969 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 31 Aug 2018 17:16:24 -0400 Subject: [PATCH] Woot! mypy run is clean! --- tractor/_actor.py | 113 ++++++++++++++++++-------------- tractor/_forkserver_hackzorz.py | 9 +-- tractor/_ipc.py | 50 ++++++++------ tractor/_portal.py | 41 +++++++----- tractor/_state.py | 7 +- tractor/_trionics.py | 48 +++++++++----- tractor/log.py | 5 +- 7 files changed, 157 insertions(+), 116 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index febd07e..2e00e99 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -7,8 +7,9 @@ from itertools import chain import importlib import inspect import traceback -import typing import uuid +import typing +from typing import Dict, List, Tuple, Any, Optional, Union import trio # type: ignore from async_generator import asynccontextmanager, aclosing @@ -41,7 +42,7 @@ async def _invoke( cid: str, chan: Channel, func: typing.Callable, - kwargs: typing.Dict[str, typing.Any], + kwargs: Dict[str, Any], task_status=trio.TASK_STATUS_IGNORED ): """Invoke local func and return results over provided channel. @@ -152,43 +153,45 @@ class Actor: def __init__( self, name: str, - rpc_module_paths: typing.List[str] = [], - statespace: typing.Dict[str, typing.Any] = {}, + rpc_module_paths: List[str] = [], + statespace: Optional[Dict[str, Any]] = None, uid: str = None, loglevel: str = None, - arbiter_addr: typing.Tuple[str, int] = None, - ): + arbiter_addr: Optional[Tuple[str, int]] = None, + ) -> None: self.name = name self.uid = (name, uid or str(uuid.uuid1())) self.rpc_module_paths = rpc_module_paths - self._mods = {} + self._mods: dict = {} # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 - self.statespace = statespace + self.statespace = statespace or {} self.loglevel = loglevel self._arb_addr = arbiter_addr # filled in by `_async_main` after fork - self._root_nursery = None - self._server_nursery = None - self._peers = defaultdict(list) - self._peer_connected = {} + self._root_nursery: trio._core._run.Nursery = None + self._server_nursery: trio._core._run.Nursery = None + self._peers: defaultdict = defaultdict(list) + self._peer_connected: dict = {} self._no_more_peers = trio.Event() self._no_more_peers.set() self._no_more_rpc_tasks = trio.Event() self._no_more_rpc_tasks.set() - self._rpc_tasks = {} - - self._actors2calls = {} # map {uids -> {callids -> waiter queues}} - self._listeners = [] - self._parent_chan = None - self._accept_host = None - self._forkserver_info = None + self._rpc_tasks: Dict[ + Channel, + List[Tuple[trio._core._run.CancelScope, typing.Callable]] + ] = {} + # map {uids -> {callids -> waiter queues}} + self._actors2calls: Dict[Tuple[str, str], Dict[str, trio.Queue]] = {} + self._listeners: List[trio.abc.Listener] = [] + self._parent_chan: Optional[Channel] = None + self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None async def wait_for_peer( - self, uid: typing.Tuple[str, str] - ) -> (trio.Event, Channel): + self, uid: Tuple[str, str] + ) -> Tuple[trio.Event, Channel]: """Wait for a connection back from a spawned actor with a given ``uid``. """ @@ -287,7 +290,9 @@ class Actor: await q.put(msg) def get_waitq( - self, actorid: typing.Tuple[str, str], cid: str + self, + actorid: Tuple[str, str], + cid: str ) -> trio.Queue: log.debug(f"Getting result queue for {actorid} cid {cid}") cids2qs = self._actors2calls.setdefault(actorid, {}) @@ -295,12 +300,13 @@ class Actor: async def send_cmd( self, chan: Channel, ns: str, func: str, kwargs: dict - ) -> typing.Tuple[str, trio.Queue]: + ) -> Tuple[str, trio.Queue]: """Send a ``'cmd'`` message to a remote actor and return a caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. """ cid = str(uuid.uuid1()) + assert chan.uid q = self.get_waitq(chan.uid, cid) log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) @@ -344,6 +350,7 @@ class Actor: # push any non-rpc-response error to all local consumers # and mark the channel as errored chan._exc = err = msg['error'] + assert chan.uid for cid in self._actors2calls[chan.uid]: await self._push_result(chan.uid, cid, msg) raise InternalActorError(f"{chan.uid}\n" + err) @@ -387,9 +394,10 @@ class Actor: log.debug(f"Exiting msg loop for {chan} from {chan.uid}") def _fork_main( - self, accept_addr: typing.Tuple[str, int], - forkserver_info: tuple, - parent_addr: typing.Tuple[str, int] = None + self, + accept_addr: Tuple[str, int], + forkserver_info: Tuple[Any, Any, Any, Any, Any], + parent_addr: Tuple[str, int] = None ) -> None: # after fork routine which invokes a fresh ``trio.run`` # log.warn("Log level after fork is {self.loglevel}") @@ -410,9 +418,9 @@ class Actor: async def _async_main( self, - accept_addr: typing.Tuple[str, int], - arbiter_addr: typing.Tuple[str, int] = None, - parent_addr: typing.Tuple[str, int] = None, + accept_addr: Tuple[str, int], + arbiter_addr: Optional[Tuple[str, int]] = None, + parent_addr: Optional[Tuple[str, int]] = None, task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, ) -> None: """Start the channel server, maybe connect back to the parent, and @@ -510,7 +518,7 @@ class Actor: self, *, # (host, port) to bind for channel server - accept_host: typing.Tuple[str, int] = None, + accept_host: Tuple[str, int] = None, accept_port: int = 0, task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, ) -> None: @@ -539,7 +547,7 @@ class Actor: self._listeners.extend(listeners) task_status.started() - async def _do_unreg(self, arbiter_addr: typing.Tuple[str, int]) -> None: + async def _do_unreg(self, arbiter_addr: Optional[Tuple[str, int]]) -> None: # UNregister actor from the arbiter try: if arbiter_addr is not None: @@ -566,16 +574,16 @@ class Actor: """Cancel all existing RPC responder tasks using the cancel scope registered for each. """ - scopes = self._rpc_tasks - log.info(f"Cancelling all {len(scopes)} rpc tasks:\n{scopes}") - for chan, scopes in scopes.items(): + tasks = self._rpc_tasks + log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks}") + for chan, scopes in tasks.items(): log.debug(f"Cancelling all tasks for {chan.uid}") for scope, func in scopes: log.debug(f"Cancelling task for {func}") scope.cancel() - if scopes: + if tasks: log.info( - f"Waiting for remaining rpc tasks to complete {scopes}") + f"Waiting for remaining rpc tasks to complete {tasks}") await self._no_more_rpc_tasks.wait() def cancel_server(self) -> None: @@ -586,19 +594,20 @@ class Actor: self._server_nursery.cancel_scope.cancel() @property - def accept_addr(self) -> typing.Tuple[str, int]: + def accept_addr(self) -> Optional[Tuple[str, int]]: """Primary address to which the channel server is bound. """ try: return self._listeners[0].socket.getsockname() except OSError: - return + return None def get_parent(self) -> Portal: """Return a portal to our parent actor.""" + assert self._parent_chan, "No parent channel for this actor?" return Portal(self._parent_chan) - def get_chans(self, uid: typing.Tuple[str, str]) -> typing.List[Channel]: + def get_chans(self, uid: Tuple[str, str]) -> List[Channel]: """Return all channels to the actor with provided uid.""" return self._peers[uid] @@ -619,14 +628,16 @@ class Arbiter(Actor): self._waiters = {} super().__init__(*args, **kwargs) - def find_actor(self, name: str) -> typing.Tuple[str, int]: + def find_actor(self, name: str) -> Optional[Tuple[str, int]]: for uid, sockaddr in self._registry.items(): if name in uid: return sockaddr + return None + async def wait_for_actor( self, name: str - ) -> typing.List[typing.Tuple[str, int]]: + ) -> List[Tuple[str, int]]: """Wait for a particular actor to register. This is a blocking call if no actor by the provided name is currently @@ -648,7 +659,7 @@ class Arbiter(Actor): return sockaddrs def register_actor( - self, uid: typing.Tuple[str, str], sockaddr: typing.Tuple[str, int] + self, uid: Tuple[str, str], sockaddr: Tuple[str, int] ) -> None: name, uuid = uid self._registry[uid] = sockaddr @@ -659,16 +670,16 @@ class Arbiter(Actor): for event in events: event.set() - def unregister_actor(self, uid: typing.Tuple[str, str]) -> None: + def unregister_actor(self, uid: Tuple[str, str]) -> None: self._registry.pop(uid, None) async def _start_actor( actor: Actor, - main: typing.Coroutine, + main: typing.Callable[..., typing.Awaitable], host: str, port: int, - arbiter_addr: typing.Tuple[str, int], + arbiter_addr: Tuple[str, int], nursery: trio._core._run.Nursery = None ): """Spawn a local actor by starting a task to execute it's main async @@ -709,7 +720,9 @@ async def _start_actor( @asynccontextmanager -async def get_arbiter(host: str, port: int) -> Portal: +async def get_arbiter( + host: str, port: int +) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: """Return a portal instance connected to a local or remote arbiter. """ @@ -729,8 +742,8 @@ async def get_arbiter(host: str, port: int) -> Portal: @asynccontextmanager async def find_actor( - name: str, arbiter_sockaddr: typing.Tuple[str, int] = None -) -> Portal: + name: str, arbiter_sockaddr: Tuple[str, int] = None +) -> typing.AsyncGenerator[Optional[Portal], None]: """Ask the arbiter to find actor(s) by name. Returns a connected portal to the last registered matching actor @@ -752,8 +765,8 @@ async def find_actor( @asynccontextmanager async def wait_for_actor( name: str, - arbiter_sockaddr: typing.Tuple[str, int] = None -) -> Portal: + arbiter_sockaddr: Tuple[str, int] = None +) -> typing.AsyncGenerator[Portal, None]: """Wait on an actor to register with the arbiter. A portal to the first actor which registered is be returned. diff --git a/tractor/_forkserver_hackzorz.py b/tractor/_forkserver_hackzorz.py index 827f744..411d190 100644 --- a/tractor/_forkserver_hackzorz.py +++ b/tractor/_forkserver_hackzorz.py @@ -15,15 +15,12 @@ import errno import selectors import warnings -from multiprocessing import ( - forkserver, semaphore_tracker, spawn, process, util, - connection -) +from multiprocessing import semaphore_tracker, spawn, process # type: ignore +from multiprocessing import forkserver, util, connection # type: ignore from multiprocessing.forkserver import ( ForkServer, MAXFDS_TO_SEND - # _serve_one, ) -from multiprocessing.context import reduction +from multiprocessing.context import reduction # type: ignore # taken from 3.8 diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 24b84a8..6162150 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -2,6 +2,7 @@ Inter-process comms abstractions """ import typing +from typing import Any, Tuple, Optional import msgpack import trio @@ -14,7 +15,7 @@ log = get_logger('ipc') class StreamQueue: """Stream wrapped as a queue that delivers ``msgpack`` serialized objects. """ - def __init__(self, stream: trio.SocketStream): + def __init__(self, stream: trio.SocketStream) -> None: self.stream = stream self._agen = self._iter_packets() self._laddr = self.stream.socket.getsockname()[:2] @@ -28,7 +29,7 @@ class StreamQueue: while True: try: data = await self.stream.receive_some(2**10) - log.trace(f"received {data}") + log.trace(f"received {data}") # type: ignore except trio.BrokenStreamError: log.error(f"Stream connection {self.raddr} broke") return @@ -42,19 +43,19 @@ class StreamQueue: yield packet @property - def laddr(self) -> typing.Tuple[str, int]: + def laddr(self) -> Tuple[str, int]: return self._laddr @property - def raddr(self) -> typing.Tuple[str, int]: + def raddr(self) -> Tuple[str, int]: return self._raddr - async def put(self, data: typing.Any) -> int: + async def put(self, data: Any) -> int: async with self._send_lock: return await self.stream.send_all( msgpack.dumps(data, use_bin_type=True)) - async def get(self) -> typing.Any: + async def get(self) -> Any: return await self._agen.asend(None) def __aiter__(self): @@ -72,21 +73,24 @@ class Channel: """ def __init__( self, - destaddr: tuple = None, - on_reconnect: typing.Coroutine = None, + destaddr: Optional[Tuple[str, int]] = None, + on_reconnect: typing.Callable[..., typing.Awaitable] = None, auto_reconnect: bool = False, stream: trio.SocketStream = None, # expected to be active ) -> None: self._recon_seq = on_reconnect self._autorecon = auto_reconnect - self.squeue = StreamQueue(stream) if stream else None + self.squeue: Optional[StreamQueue] = StreamQueue( + stream) if stream else None if self.squeue and destaddr: raise ValueError( f"A stream was provided with local addr {self.laddr}" ) - self._destaddr = destaddr or self.squeue.raddr + self._destaddr = self.squeue.raddr if self.squeue else destaddr # set after handshake - always uid of far end - self.uid = None + self.uid: Optional[Tuple[str, str]] = None + # set if far end actor errors internally + self._exc: Optional[Exception] = None self._agen = self._aiter_recv() def __repr__(self) -> str: @@ -97,15 +101,15 @@ class Channel: return object.__repr__(self) @property - def laddr(self) -> typing.Tuple[str, int]: - return self.squeue.laddr if self.squeue else (None, None) + def laddr(self) -> Optional[Tuple[str, int]]: + return self.squeue.laddr if self.squeue else None @property - def raddr(self) -> typing.Tuple[str, int]: - return self.squeue.raddr if self.squeue else (None, None) + def raddr(self) -> Optional[Tuple[str, int]]: + return self.squeue.raddr if self.squeue else None async def connect( - self, destaddr: typing.Tuple[str, int] = None, **kwargs + self, destaddr: Tuple[str, int] = None, **kwargs ) -> trio.SocketStream: if self.connected(): raise RuntimeError("channel is already connected?") @@ -114,11 +118,13 @@ class Channel: self.squeue = StreamQueue(stream) return stream - async def send(self, item: typing.Any) -> None: - log.trace(f"send `{item}`") + async def send(self, item: Any) -> None: + log.trace(f"send `{item}`") # type: ignore + assert self.squeue await self.squeue.put(item) - async def recv(self) -> typing.Any: + async def recv(self) -> Any: + assert self.squeue try: return await self.squeue.get() except trio.BrokenStreamError: @@ -128,6 +134,7 @@ class Channel: async def aclose(self) -> None: log.debug(f"Closing {self}") + assert self.squeue await self.squeue.stream.aclose() async def __aenter__(self): @@ -171,9 +178,10 @@ class Channel: async def _aiter_recv( self - ) -> typing.AsyncGenerator[typing.Any, None]: + ) -> typing.AsyncGenerator[Any, None]: """Async iterate items from underlying stream. """ + assert self.squeue while True: try: async for item in self.squeue: @@ -200,7 +208,7 @@ class Channel: @asynccontextmanager async def _connect_chan( host: str, port: int -) -> typing.AsyncContextManager[Channel]: +) -> typing.AsyncGenerator[Channel, None]: """Create and connect a channel with disconnect on context manager teardown. """ diff --git a/tractor/_portal.py b/tractor/_portal.py index a42d5f8..ddbea76 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -3,11 +3,13 @@ Portal api """ import importlib import typing +from typing import Tuple, Any, Dict, Optional import trio from async_generator import asynccontextmanager from ._state import current_actor +from ._ipc import Channel from .log import get_logger @@ -32,10 +34,11 @@ async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): async def _do_handshake( - actor: 'Actor', chan: 'Channel' -)-> typing.Tuple[str, str]: + actor: 'Actor', # type: ignore + chan: Channel +)-> Any: await chan.send(actor.uid) - uid = await chan.recv() + uid: Tuple[str, str] = await chan.recv() if not isinstance(uid, tuple): raise ValueError(f"{uid} is not a valid uid?!") @@ -54,14 +57,16 @@ class Portal: Think of this like an native async IPC API. """ - def __init__(self, channel: 'Channel'): + def __init__(self, channel: Channel) -> None: self.channel = channel # when this is set to a tuple returned from ``_submit()`` then # it is expected that ``result()`` will be awaited at some point # during the portal's lifetime self._result = None - self._exc = None - self._expect_result = None + self._exc: Optional[RemoteActorError] = None + self._expect_result: Optional[ + Tuple[str, Any, str, Dict[str, Any]] + ] = None async def aclose(self) -> None: log.debug(f"Closing {self}") @@ -71,7 +76,7 @@ class Portal: async def _submit( self, ns: str, func: str, **kwargs - ) -> typing.Tuple[str, trio.Queue, str, typing.Dict[str, typing.Any]]: + ) -> Tuple[str, trio.Queue, str, Dict[str, Any]]: """Submit a function to be scheduled and run by actor, return the associated caller id, response queue, response type str, first message packet as a tuple. @@ -103,7 +108,7 @@ class Portal: "A pending main result has already been submitted" self._expect_result = await self._submit(ns, func, **kwargs) - async def run(self, ns: str, func: str, **kwargs) -> typing.Any: + async def run(self, ns: str, func: str, **kwargs) -> Any: """Submit a function to be scheduled and run by actor, wrap and return its (stream of) result(s). @@ -115,7 +120,7 @@ class Portal: async def _return_from_resptype( self, cid: str, q: trio.Queue, resptype: str, first_msg: dict - ) -> typing.Any: + ) -> Any: # TODO: not this needs some serious work and thinking about how # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) @@ -152,7 +157,7 @@ class Portal: else: raise ValueError(f"Unknown msg response type: {first_msg}") - async def result(self) -> typing.Any: + async def result(self) -> Any: """Return the result(s) from the remote actor's "main" task. """ if self._expect_result is None: @@ -160,7 +165,7 @@ class Portal: # teardown can reraise them exc = self.channel._exc if exc: - raise RemoteActorError(f"{self.channel.uid}\n" + exc) + raise RemoteActorError(f"{self.channel.uid}\n{exc}") else: raise RuntimeError( f"Portal for {self.channel.uid} is not expecting a final" @@ -205,22 +210,24 @@ class LocalPortal: A compatibility shim for normal portals but for invoking functions using an in process actor instance. """ - def __init__(self, actor: 'Actor'): + def __init__( + self, + actor: 'Actor' # type: ignore + ) -> None: self.actor = actor - async def run(self, ns: str, func: str, **kwargs) -> typing.Any: + async def run(self, ns: str, func: str, **kwargs) -> Any: """Run a requested function locally and return it's result. """ obj = self.actor if ns == 'self' else importlib.import_module(ns) - func = getattr(obj, func) - return func(**kwargs) + return getattr(obj, func)(**kwargs) @asynccontextmanager async def open_portal( - channel: 'Channel', + channel: Channel, nursery: trio._core._run.Nursery = None -) -> typing.AsyncContextManager[Portal]: +) -> typing.AsyncGenerator[Portal, None]: """Open a ``Portal`` through the provided ``channel``. Spawns a background task to handle message processing. diff --git a/tractor/_state.py b/tractor/_state.py index 767bb27..704fae7 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -1,10 +1,13 @@ """ Per process state """ -_current_actor = None +from typing import Optional -def current_actor() -> 'Actor': +_current_actor: Optional['Actor'] = None # type: ignore + + +def current_actor() -> 'Actor': # type: ignore """Get the process-local actor instance. """ if not _current_actor: diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 310fea9..c25137e 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -3,7 +3,8 @@ """ import multiprocessing as mp import inspect -from multiprocessing import forkserver, semaphore_tracker +from multiprocessing import forkserver, semaphore_tracker # type: ignore +from typing import Tuple, List, Dict, Optional, Any import typing import trio @@ -24,14 +25,17 @@ log = get_logger('tractor') class ActorNursery: """Spawn scoped subprocess actors. """ - def __init__(self, actor: Actor): + def __init__(self, actor: Actor) -> None: # self.supervisor = supervisor # TODO - self._actor = actor - self._children = {} + self._actor: Actor = actor + self._children: Dict[ + Tuple[str, str], + Tuple[Actor, mp.Process, Optional[Portal]] + ] = {} # portals spawned with ``run_in_actor()`` - self._cancel_after_result_on_exit = set() - self.cancelled = False - self._forkserver = None + self._cancel_after_result_on_exit: set = set() + self.cancelled: bool = False + self._forkserver: forkserver.ForkServer = None async def __aenter__(self): return self @@ -39,9 +43,9 @@ class ActorNursery: async def start_actor( self, name: str, - bind_addr: (str, int) = ('127.0.0.1', 0), - statespace: dict = None, - rpc_module_paths: [str] = None, + bind_addr: Tuple[str, int] = ('127.0.0.1', 0), + statespace: Optional[Dict[str, Any]] = None, + rpc_module_paths: List[str] = None, loglevel: str = None, # set log level per subactor ) -> Portal: loglevel = loglevel or self._actor.loglevel or get_loglevel() @@ -71,6 +75,7 @@ class ActorNursery: semaphore_tracker._semaphore_tracker._fd, ) else: + assert self._actor._forkserver_info fs_info = ( fs._forkserver_address, fs._forkserver_alive_fd, @@ -88,7 +93,7 @@ class ActorNursery: # register the process before start in case we get a cancel # request before the actor has fully spawned - then we can wait # for it to fully come up before sending a cancel request - self._children[actor.uid] = [actor, proc, None] + self._children[actor.uid] = (actor, proc, None) proc.start() if not proc.is_alive(): @@ -100,15 +105,15 @@ class ActorNursery: # local actor by the time we get a ref to it event, chan = await self._actor.wait_for_peer(actor.uid) portal = Portal(chan) - self._children[actor.uid][2] = portal + self._children[actor.uid] = (actor, proc, portal) return portal async def run_in_actor( self, name: str, fn: typing.Callable, - bind_addr: (str, int) = ('127.0.0.1', 0), - rpc_module_paths: [str] = None, + bind_addr: Tuple[str, int] = ('127.0.0.1', 0), + rpc_module_paths: List[str] = None, statespace: dict = None, loglevel: str = None, # set log level per subactor **kwargs, # explicit args to ``fn`` @@ -155,7 +160,12 @@ class ActorNursery: async for item in agen: log.debug(f"Consuming item {item}") - async def wait_for_proc(proc, actor, portal, cancel_scope): + async def wait_for_proc( + proc: mp.Process, + actor: Actor, + portal: Portal, + cancel_scope: trio._core._run.CancelScope, + ) -> None: # TODO: timeout block here? if proc.is_alive(): await trio.hazmat.wait_readable(proc.sentinel) @@ -172,9 +182,10 @@ class ActorNursery: cancel_scope.cancel() async def wait_for_actor( - portal, actor, + portal: Portal, + actor: Actor, task_status=trio.TASK_STATUS_IGNORED, - ): + ) -> None: # cancel the actor gracefully with trio.open_cancel_scope() as cs: task_status.started(cs) @@ -231,6 +242,7 @@ class ActorNursery: do_hard_kill(proc) # spawn cancel tasks async + assert portal n.start_soon(portal.cancel_actor) log.debug(f"Waiting on all subactors to complete") @@ -275,7 +287,7 @@ class ActorNursery: @asynccontextmanager -async def open_nursery() -> typing.AsyncContextManager[ActorNursery]: +async def open_nursery() -> typing.AsyncGenerator[None, ActorNursery]: """Create and yield a new ``ActorNursery``. """ actor = current_actor() diff --git a/tractor/log.py b/tractor/log.py index 8d52d1b..5a65603 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -2,9 +2,10 @@ Log like a forester! """ from functools import partial -import sys import logging import colorlog # type: ignore +from typing import Optional + _proj_name = 'tractor' _default_loglevel = None @@ -86,5 +87,5 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger: return log -def get_loglevel() -> str: +def get_loglevel() -> Optional[str]: return _default_loglevel