Make latest mpypy happy

log_task_context
Tyler Goodlet 2019-12-10 00:55:03 -05:00
parent 7947eeebff
commit 79c152fe38
5 changed files with 81 additions and 57 deletions

View File

@ -4,7 +4,7 @@ tractor: An actor model micro-framework built on
"""
import importlib
from functools import partial
from typing import Tuple, Any
from typing import Tuple, Any, Optional
import typing
import trio # type: ignore
@ -47,8 +47,8 @@ async def _main(
async_fn: typing.Callable[..., typing.Awaitable],
args: Tuple,
kwargs: typing.Dict[str, typing.Any],
name: str,
arbiter_addr: Tuple[str, int]
arbiter_addr: Tuple[str, int],
name: Optional[str] = None,
) -> typing.Any:
"""Async entry point for ``tractor``.
"""
@ -89,26 +89,27 @@ async def _main(
# for it to stay up indefinitely until a re-election process has
# taken place - which is not implemented yet FYI).
return await _start_actor(
actor, main, host, port, arbiter_addr=arbiter_addr)
actor, main, host, port, arbiter_addr=arbiter_addr
)
def run(
async_fn: typing.Callable[..., typing.Awaitable],
*args: Tuple,
name: str = None,
*args,
name: Optional[str] = None,
arbiter_addr: Tuple[str, int] = (
_default_arbiter_host, _default_arbiter_port),
# the `multiprocessing` start method:
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
start_method: str = 'forkserver',
**kwargs: typing.Dict[str, typing.Any],
**kwargs,
) -> Any:
"""Run a trio-actor async function in process.
This is tractor's main entry and the start point for any async actor.
"""
_spawn.try_set_start_method(start_method)
return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr)
return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name)
def run_daemon(

View File

@ -11,6 +11,7 @@ import typing
from typing import Dict, List, Tuple, Any, Optional
import trio # type: ignore
from trio_typing import TaskStatus
from async_generator import aclosing
from ._ipc import Channel
@ -155,7 +156,11 @@ class Actor:
with other actors through "portals" which provide a native async API
around "channels".
"""
is_arbiter = False
is_arbiter: bool = False
# placeholders filled in by `_async_main` after fork
_root_nursery: trio.Nursery
_server_nursery: trio.Nursery
def __init__(
self,
@ -170,15 +175,13 @@ class Actor:
self.uid = (name, uid or str(uuid.uuid4()))
self.rpc_module_paths = rpc_module_paths
self._mods: dict = {}
# TODO: consider making this a dynamically defined
# @dataclass once we get py3.7
self.statespace = statespace or {}
self.loglevel = loglevel
self._arb_addr = arbiter_addr
# filled in by `_async_main` after fork
self._root_nursery: trio._core._run.Nursery = None
self._server_nursery: trio._core._run.Nursery = None
self._peers: defaultdict = defaultdict(list)
self._peer_connected: dict = {}
self._no_more_peers = trio.Event()
@ -188,15 +191,20 @@ class Actor:
# (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: Dict[
Tuple[Channel, str],
Tuple[trio._core._run.CancelScope, typing.Callable, trio.Event]
Tuple[trio.CancelScope, typing.Callable, trio.Event]
] = {}
# map {uids -> {callids -> waiter queues}}
self._cids2qs: Dict[
Tuple[Tuple[str, str], str],
trio.abc.SendChannel[Any]] = {}
Tuple[
trio.abc.SendChannel[Any],
trio.abc.ReceiveChannel[Any]
]
] = {}
self._listeners: List[trio.abc.Listener] = []
self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None
self._forkserver_info: Optional[
Tuple[Any, Any, Any, Any, Any]] = None
async def wait_for_peer(
self, uid: Tuple[str, str]
@ -303,8 +311,8 @@ class Actor:
actorid = chan.uid
assert actorid, f"`actorid` can't be {actorid}"
cid = msg['cid']
send_chan = self._cids2qs[(actorid, cid)]
assert send_chan.cid == cid
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
assert send_chan.cid == cid # type: ignore
if 'stop' in msg:
log.debug(f"{send_chan} was terminated at remote end")
return await send_chan.aclose()
@ -321,16 +329,17 @@ class Actor:
self,
actorid: Tuple[str, str],
cid: str
) -> trio.abc.ReceiveChannel:
) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]:
log.debug(f"Getting result queue for {actorid} cid {cid}")
try:
recv_chan = self._cids2qs[(actorid, cid)]
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
except KeyError:
send_chan, recv_chan = trio.open_memory_channel(1000)
send_chan.cid = cid
self._cids2qs[(actorid, cid)] = send_chan
send_chan.cid = cid # type: ignore
recv_chan.cid = cid # type: ignore
self._cids2qs[(actorid, cid)] = send_chan, recv_chan
return recv_chan
return send_chan, recv_chan
async def send_cmd(
self,
@ -345,7 +354,7 @@ class Actor:
"""
cid = str(uuid.uuid4())
assert chan.uid
recv_chan = self.get_memchans(chan.uid, cid)
send_chan, recv_chan = self.get_memchans(chan.uid, cid)
log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
return cid, recv_chan
@ -355,7 +364,7 @@ class Actor:
chan: Channel,
treat_as_gen: bool = False,
shield: bool = False,
task_status=trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Process messages for the channel async-RPC style.
@ -511,7 +520,7 @@ class Actor:
accept_addr: Tuple[str, int],
arbiter_addr: Optional[Tuple[str, int]] = None,
parent_addr: Optional[Tuple[str, int]] = None,
task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Start the channel server, maybe connect back to the parent, and
start the main task.
@ -548,8 +557,10 @@ class Actor:
" closing server")
await self.cancel()
self._parent_chan = None
raise
else:
# handle new connection back to parent
assert self._parent_chan
nursery.start_soon(
self._process_messages, self._parent_chan)
@ -560,6 +571,7 @@ class Actor:
# register with the arbiter if we're told its addr
log.debug(f"Registering {self} for role `{self.name}`")
assert isinstance(arbiter_addr, tuple)
async with get_arbiter(*arbiter_addr) as arb_portal:
await arb_portal.run(
'self', 'register_actor',
@ -615,7 +627,7 @@ class Actor:
# (host, port) to bind for channel server
accept_host: Tuple[str, int] = None,
accept_port: int = 0,
task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Start the channel server, begin listening for new connections.
@ -720,13 +732,11 @@ class Actor:
self._server_nursery.cancel_scope.cancel()
@property
def accept_addr(self) -> Optional[Tuple[str, int]]:
def accept_addr(self) -> Tuple[str, int]:
"""Primary address to which the channel server is bound.
"""
try:
return self._listeners[0].socket.getsockname()
except OSError:
return None
# throws OSError on failure
return self._listeners[0].socket.getsockname() # type: ignore
def get_parent(self) -> Portal:
"""Return a portal to our parent actor."""
@ -826,7 +836,7 @@ async def _start_actor(
host: str,
port: int,
arbiter_addr: Tuple[str, int],
nursery: trio._core._run.Nursery = None
nursery: trio.Nursery = None
):
"""Spawn a local actor by starting a task to execute it's main async
function.

View File

@ -17,9 +17,16 @@ class MsgpackStream:
"""
def __init__(self, stream: trio.SocketStream) -> None:
self.stream = stream
assert self.stream.socket
# should both be IP sockets
lsockname = stream.socket.getsockname()
assert isinstance(lsockname, tuple)
self._laddr = lsockname[:2]
rsockname = stream.socket.getpeername()
assert isinstance(rsockname, tuple)
self._raddr = rsockname[:2]
self._agen = self._iter_packets()
self._laddr = self.stream.socket.getsockname()[:2]
self._raddr = self.stream.socket.getpeername()[:2]
self._send_lock = trio.StrictFIFOLock()
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
@ -43,14 +50,15 @@ class MsgpackStream:
yield packet
@property
def laddr(self) -> Tuple[str, int]:
def laddr(self) -> Tuple[Any, ...]:
return self._laddr
@property
def raddr(self) -> Tuple[str, int]:
def raddr(self) -> Tuple[Any, ...]:
return self._raddr
async def send(self, data: Any) -> int:
# XXX: should this instead be called `.sendall()`?
async def send(self, data: Any) -> None:
async with self._send_lock:
return await self.stream.send_all(
msgpack.dumps(data, use_bin_type=True))
@ -95,24 +103,26 @@ class Channel:
def __repr__(self) -> str:
if self.msgstream:
return repr(
self.msgstream.stream.socket._sock).replace(
self.msgstream.stream.socket._sock).replace( # type: ignore
"socket.socket", "Channel")
return object.__repr__(self)
@property
def laddr(self) -> Optional[Tuple[str, int]]:
def laddr(self) -> Optional[Tuple[Any, ...]]:
return self.msgstream.laddr if self.msgstream else None
@property
def raddr(self) -> Optional[Tuple[str, int]]:
def raddr(self) -> Optional[Tuple[Any, ...]]:
return self.msgstream.raddr if self.msgstream else None
async def connect(
self, destaddr: Tuple[str, int] = None, **kwargs
self, destaddr: Tuple[Any, ...] = None,
**kwargs
) -> trio.SocketStream:
if self.connected():
raise RuntimeError("channel is already connected?")
destaddr = destaddr or self._destaddr
assert isinstance(destaddr, tuple)
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
self.msgstream = MsgpackStream(stream)
return stream

View File

@ -21,7 +21,9 @@ log = get_logger('tractor')
@asynccontextmanager
async def maybe_open_nursery(nursery: trio._core._run.Nursery = None):
async def maybe_open_nursery(
nursery: trio.Nursery = None
) -> typing.AsyncGenerator[trio.Nursery, Any]:
"""Create a new nursery if None provided.
Blocks on exit as expected if no input nursery is provided.
@ -252,14 +254,14 @@ class Portal:
for stream in self._streams.copy():
await stream.aclose()
async def aclose(self) -> None:
async def aclose(self):
log.debug(f"Closing {self}")
# TODO: once we move to implementing our own `ReceiveChannel`
# (including remote task cancellation inside its `.aclose()`)
# we'll need to .aclose all those channels here
await self._cancel_streams()
async def cancel_actor(self) -> bool:
async def cancel_actor(self):
"""Cancel the actor on the other end of this portal.
"""
if not self.channel.connected():
@ -279,6 +281,8 @@ class Portal:
return True
if cancel_scope.cancelled_caught:
log.warning(f"May have failed to cancel {self.channel.uid}")
# if we get here some weird cancellation case happened
return False
except trio.ClosedResourceError:
log.warning(
@ -309,7 +313,7 @@ class LocalPortal:
@asynccontextmanager
async def open_portal(
channel: Channel,
nursery: trio._core._run.Nursery = None
nursery: Optional[trio.Nursery] = None
) -> typing.AsyncGenerator[Portal, None]:
"""Open a ``Portal`` through the provided ``channel``.
@ -320,7 +324,6 @@ async def open_portal(
was_connected = False
async with maybe_open_nursery(nursery) as nursery:
if not channel.connected():
await channel.connect()
was_connected = True

View File

@ -64,7 +64,6 @@ class ActorNursery:
arbiter_addr=current_actor()._arb_addr,
)
parent_addr = self._actor.accept_addr
assert parent_addr
proc = _spawn.new_proc(
name,
actor,
@ -192,8 +191,7 @@ class ActorNursery:
async def wait_for_proc(
proc: mp.Process,
actor: Actor,
portal: Portal,
cancel_scope: Optional[trio._core._run.CancelScope] = None,
cancel_scope: Optional[trio.CancelScope] = None,
) -> None:
# TODO: timeout block here?
if proc.is_alive():
@ -207,7 +205,7 @@ class ActorNursery:
# proc terminated, cancel result waiter that may have
# been spawned in tandem if not done already
if cancel_scope: # and not portal._cancelled:
if cancel_scope:
log.warning(
f"Cancelling existing result waiter task for {actor.uid}")
cancel_scope.cancel()
@ -223,11 +221,12 @@ class ActorNursery:
cs = None
# portal from ``run_in_actor()``
if portal in self._cancel_after_result_on_exit:
assert portal
cs = await nursery.start(
cancel_on_completion, portal, subactor)
# TODO: how do we handle remote host spawned actors?
nursery.start_soon(
wait_for_proc, proc, subactor, portal, cs)
wait_for_proc, proc, subactor, cs)
if errors:
if not self.cancelled:
@ -247,7 +246,8 @@ class ActorNursery:
async with trio.open_nursery() as nursery:
for subactor, proc, portal in children.values():
# TODO: how do we handle remote host spawned actors?
nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
assert portal
nursery.start_soon(wait_for_proc, proc, subactor, cs)
log.debug(f"All subactors for {self} have terminated")
if errors: