forked from goodboy/tractor
Merge pull request #94 from goodboy/log_task_context
Well...after enough `# type: ignore`s `mypy` is happy, and after enough clicking of *rerun build* the windows CI passed so I think this is prolly gtg peeps!try_trip
commit
7c0efce84b
|
@ -2,3 +2,4 @@ pytest
|
||||||
pytest-trio
|
pytest-trio
|
||||||
pdbpp
|
pdbpp
|
||||||
mypy
|
mypy
|
||||||
|
trio_typing
|
||||||
|
|
3
setup.py
3
setup.py
|
@ -38,7 +38,8 @@ setup(
|
||||||
'tractor.testing',
|
'tractor.testing',
|
||||||
],
|
],
|
||||||
install_requires=[
|
install_requires=[
|
||||||
'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt'],
|
'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt',
|
||||||
|
],
|
||||||
tests_require=['pytest'],
|
tests_require=['pytest'],
|
||||||
python_requires=">=3.7",
|
python_requires=">=3.7",
|
||||||
keywords=[
|
keywords=[
|
||||||
|
|
|
@ -4,7 +4,7 @@ tractor: An actor model micro-framework built on
|
||||||
"""
|
"""
|
||||||
import importlib
|
import importlib
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from typing import Tuple, Any
|
from typing import Tuple, Any, Optional
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
import trio # type: ignore
|
import trio # type: ignore
|
||||||
|
@ -47,8 +47,8 @@ async def _main(
|
||||||
async_fn: typing.Callable[..., typing.Awaitable],
|
async_fn: typing.Callable[..., typing.Awaitable],
|
||||||
args: Tuple,
|
args: Tuple,
|
||||||
kwargs: typing.Dict[str, typing.Any],
|
kwargs: typing.Dict[str, typing.Any],
|
||||||
name: str,
|
arbiter_addr: Tuple[str, int],
|
||||||
arbiter_addr: Tuple[str, int]
|
name: Optional[str] = None,
|
||||||
) -> typing.Any:
|
) -> typing.Any:
|
||||||
"""Async entry point for ``tractor``.
|
"""Async entry point for ``tractor``.
|
||||||
"""
|
"""
|
||||||
|
@ -89,26 +89,27 @@ async def _main(
|
||||||
# for it to stay up indefinitely until a re-election process has
|
# for it to stay up indefinitely until a re-election process has
|
||||||
# taken place - which is not implemented yet FYI).
|
# taken place - which is not implemented yet FYI).
|
||||||
return await _start_actor(
|
return await _start_actor(
|
||||||
actor, main, host, port, arbiter_addr=arbiter_addr)
|
actor, main, host, port, arbiter_addr=arbiter_addr
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def run(
|
def run(
|
||||||
async_fn: typing.Callable[..., typing.Awaitable],
|
async_fn: typing.Callable[..., typing.Awaitable],
|
||||||
*args: Tuple,
|
*args,
|
||||||
name: str = None,
|
name: Optional[str] = None,
|
||||||
arbiter_addr: Tuple[str, int] = (
|
arbiter_addr: Tuple[str, int] = (
|
||||||
_default_arbiter_host, _default_arbiter_port),
|
_default_arbiter_host, _default_arbiter_port),
|
||||||
# the `multiprocessing` start method:
|
# the `multiprocessing` start method:
|
||||||
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
|
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
|
||||||
start_method: str = 'forkserver',
|
start_method: str = 'forkserver',
|
||||||
**kwargs: typing.Dict[str, typing.Any],
|
**kwargs,
|
||||||
) -> Any:
|
) -> Any:
|
||||||
"""Run a trio-actor async function in process.
|
"""Run a trio-actor async function in process.
|
||||||
|
|
||||||
This is tractor's main entry and the start point for any async actor.
|
This is tractor's main entry and the start point for any async actor.
|
||||||
"""
|
"""
|
||||||
_spawn.try_set_start_method(start_method)
|
_spawn.try_set_start_method(start_method)
|
||||||
return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr)
|
return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name)
|
||||||
|
|
||||||
|
|
||||||
def run_daemon(
|
def run_daemon(
|
||||||
|
|
|
@ -11,6 +11,7 @@ import typing
|
||||||
from typing import Dict, List, Tuple, Any, Optional
|
from typing import Dict, List, Tuple, Any, Optional
|
||||||
|
|
||||||
import trio # type: ignore
|
import trio # type: ignore
|
||||||
|
from trio_typing import TaskStatus
|
||||||
from async_generator import aclosing
|
from async_generator import aclosing
|
||||||
|
|
||||||
from ._ipc import Channel
|
from ._ipc import Channel
|
||||||
|
@ -155,7 +156,11 @@ class Actor:
|
||||||
with other actors through "portals" which provide a native async API
|
with other actors through "portals" which provide a native async API
|
||||||
around "channels".
|
around "channels".
|
||||||
"""
|
"""
|
||||||
is_arbiter = False
|
is_arbiter: bool = False
|
||||||
|
|
||||||
|
# placeholders filled in by `_async_main` after fork
|
||||||
|
_root_nursery: trio.Nursery
|
||||||
|
_server_nursery: trio.Nursery
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
@ -170,15 +175,13 @@ class Actor:
|
||||||
self.uid = (name, uid or str(uuid.uuid4()))
|
self.uid = (name, uid or str(uuid.uuid4()))
|
||||||
self.rpc_module_paths = rpc_module_paths
|
self.rpc_module_paths = rpc_module_paths
|
||||||
self._mods: dict = {}
|
self._mods: dict = {}
|
||||||
|
|
||||||
# TODO: consider making this a dynamically defined
|
# TODO: consider making this a dynamically defined
|
||||||
# @dataclass once we get py3.7
|
# @dataclass once we get py3.7
|
||||||
self.statespace = statespace or {}
|
self.statespace = statespace or {}
|
||||||
self.loglevel = loglevel
|
self.loglevel = loglevel
|
||||||
self._arb_addr = arbiter_addr
|
self._arb_addr = arbiter_addr
|
||||||
|
|
||||||
# filled in by `_async_main` after fork
|
|
||||||
self._root_nursery: trio._core._run.Nursery = None
|
|
||||||
self._server_nursery: trio._core._run.Nursery = None
|
|
||||||
self._peers: defaultdict = defaultdict(list)
|
self._peers: defaultdict = defaultdict(list)
|
||||||
self._peer_connected: dict = {}
|
self._peer_connected: dict = {}
|
||||||
self._no_more_peers = trio.Event()
|
self._no_more_peers = trio.Event()
|
||||||
|
@ -188,15 +191,20 @@ class Actor:
|
||||||
# (chan, cid) -> (cancel_scope, func)
|
# (chan, cid) -> (cancel_scope, func)
|
||||||
self._rpc_tasks: Dict[
|
self._rpc_tasks: Dict[
|
||||||
Tuple[Channel, str],
|
Tuple[Channel, str],
|
||||||
Tuple[trio._core._run.CancelScope, typing.Callable, trio.Event]
|
Tuple[trio.CancelScope, typing.Callable, trio.Event]
|
||||||
] = {}
|
] = {}
|
||||||
# map {uids -> {callids -> waiter queues}}
|
# map {uids -> {callids -> waiter queues}}
|
||||||
self._cids2qs: Dict[
|
self._cids2qs: Dict[
|
||||||
Tuple[Tuple[str, str], str],
|
Tuple[Tuple[str, str], str],
|
||||||
trio.abc.SendChannel[Any]] = {}
|
Tuple[
|
||||||
|
trio.abc.SendChannel[Any],
|
||||||
|
trio.abc.ReceiveChannel[Any]
|
||||||
|
]
|
||||||
|
] = {}
|
||||||
self._listeners: List[trio.abc.Listener] = []
|
self._listeners: List[trio.abc.Listener] = []
|
||||||
self._parent_chan: Optional[Channel] = None
|
self._parent_chan: Optional[Channel] = None
|
||||||
self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None
|
self._forkserver_info: Optional[
|
||||||
|
Tuple[Any, Any, Any, Any, Any]] = None
|
||||||
|
|
||||||
async def wait_for_peer(
|
async def wait_for_peer(
|
||||||
self, uid: Tuple[str, str]
|
self, uid: Tuple[str, str]
|
||||||
|
@ -303,8 +311,8 @@ class Actor:
|
||||||
actorid = chan.uid
|
actorid = chan.uid
|
||||||
assert actorid, f"`actorid` can't be {actorid}"
|
assert actorid, f"`actorid` can't be {actorid}"
|
||||||
cid = msg['cid']
|
cid = msg['cid']
|
||||||
send_chan = self._cids2qs[(actorid, cid)]
|
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
|
||||||
assert send_chan.cid == cid
|
assert send_chan.cid == cid # type: ignore
|
||||||
if 'stop' in msg:
|
if 'stop' in msg:
|
||||||
log.debug(f"{send_chan} was terminated at remote end")
|
log.debug(f"{send_chan} was terminated at remote end")
|
||||||
return await send_chan.aclose()
|
return await send_chan.aclose()
|
||||||
|
@ -321,16 +329,17 @@ class Actor:
|
||||||
self,
|
self,
|
||||||
actorid: Tuple[str, str],
|
actorid: Tuple[str, str],
|
||||||
cid: str
|
cid: str
|
||||||
) -> trio.abc.ReceiveChannel:
|
) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]:
|
||||||
log.debug(f"Getting result queue for {actorid} cid {cid}")
|
log.debug(f"Getting result queue for {actorid} cid {cid}")
|
||||||
try:
|
try:
|
||||||
recv_chan = self._cids2qs[(actorid, cid)]
|
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
send_chan, recv_chan = trio.open_memory_channel(1000)
|
send_chan, recv_chan = trio.open_memory_channel(1000)
|
||||||
send_chan.cid = cid
|
send_chan.cid = cid # type: ignore
|
||||||
self._cids2qs[(actorid, cid)] = send_chan
|
recv_chan.cid = cid # type: ignore
|
||||||
|
self._cids2qs[(actorid, cid)] = send_chan, recv_chan
|
||||||
|
|
||||||
return recv_chan
|
return send_chan, recv_chan
|
||||||
|
|
||||||
async def send_cmd(
|
async def send_cmd(
|
||||||
self,
|
self,
|
||||||
|
@ -345,7 +354,7 @@ class Actor:
|
||||||
"""
|
"""
|
||||||
cid = str(uuid.uuid4())
|
cid = str(uuid.uuid4())
|
||||||
assert chan.uid
|
assert chan.uid
|
||||||
recv_chan = self.get_memchans(chan.uid, cid)
|
send_chan, recv_chan = self.get_memchans(chan.uid, cid)
|
||||||
log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
|
log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
|
||||||
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
|
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
|
||||||
return cid, recv_chan
|
return cid, recv_chan
|
||||||
|
@ -355,7 +364,7 @@ class Actor:
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
treat_as_gen: bool = False,
|
treat_as_gen: bool = False,
|
||||||
shield: bool = False,
|
shield: bool = False,
|
||||||
task_status=trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Process messages for the channel async-RPC style.
|
"""Process messages for the channel async-RPC style.
|
||||||
|
|
||||||
|
@ -436,8 +445,8 @@ class Actor:
|
||||||
# spin up a task for the requested function
|
# spin up a task for the requested function
|
||||||
log.debug(f"Spawning task for {func}")
|
log.debug(f"Spawning task for {func}")
|
||||||
cs = await self._root_nursery.start(
|
cs = await self._root_nursery.start(
|
||||||
_invoke, self, cid, chan, func, kwargs,
|
partial(_invoke, self, cid, chan, func, kwargs),
|
||||||
name=funcname
|
name=funcname,
|
||||||
)
|
)
|
||||||
# never allow cancelling cancel requests (results in
|
# never allow cancelling cancel requests (results in
|
||||||
# deadlock and other weird behaviour)
|
# deadlock and other weird behaviour)
|
||||||
|
@ -511,7 +520,7 @@ class Actor:
|
||||||
accept_addr: Tuple[str, int],
|
accept_addr: Tuple[str, int],
|
||||||
arbiter_addr: Optional[Tuple[str, int]] = None,
|
arbiter_addr: Optional[Tuple[str, int]] = None,
|
||||||
parent_addr: Optional[Tuple[str, int]] = None,
|
parent_addr: Optional[Tuple[str, int]] = None,
|
||||||
task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Start the channel server, maybe connect back to the parent, and
|
"""Start the channel server, maybe connect back to the parent, and
|
||||||
start the main task.
|
start the main task.
|
||||||
|
@ -548,8 +557,10 @@ class Actor:
|
||||||
" closing server")
|
" closing server")
|
||||||
await self.cancel()
|
await self.cancel()
|
||||||
self._parent_chan = None
|
self._parent_chan = None
|
||||||
|
raise
|
||||||
|
else:
|
||||||
# handle new connection back to parent
|
# handle new connection back to parent
|
||||||
|
assert self._parent_chan
|
||||||
nursery.start_soon(
|
nursery.start_soon(
|
||||||
self._process_messages, self._parent_chan)
|
self._process_messages, self._parent_chan)
|
||||||
|
|
||||||
|
@ -560,6 +571,7 @@ class Actor:
|
||||||
|
|
||||||
# register with the arbiter if we're told its addr
|
# register with the arbiter if we're told its addr
|
||||||
log.debug(f"Registering {self} for role `{self.name}`")
|
log.debug(f"Registering {self} for role `{self.name}`")
|
||||||
|
assert isinstance(arbiter_addr, tuple)
|
||||||
async with get_arbiter(*arbiter_addr) as arb_portal:
|
async with get_arbiter(*arbiter_addr) as arb_portal:
|
||||||
await arb_portal.run(
|
await arb_portal.run(
|
||||||
'self', 'register_actor',
|
'self', 'register_actor',
|
||||||
|
@ -615,7 +627,7 @@ class Actor:
|
||||||
# (host, port) to bind for channel server
|
# (host, port) to bind for channel server
|
||||||
accept_host: Tuple[str, int] = None,
|
accept_host: Tuple[str, int] = None,
|
||||||
accept_port: int = 0,
|
accept_port: int = 0,
|
||||||
task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Start the channel server, begin listening for new connections.
|
"""Start the channel server, begin listening for new connections.
|
||||||
|
|
||||||
|
@ -627,7 +639,7 @@ class Actor:
|
||||||
# TODO: might want to consider having a separate nursery
|
# TODO: might want to consider having a separate nursery
|
||||||
# for the stream handler such that the server can be cancelled
|
# for the stream handler such that the server can be cancelled
|
||||||
# whilst leaving existing channels up
|
# whilst leaving existing channels up
|
||||||
listeners = await nursery.start(
|
listeners: List[trio.abc.Listener] = await nursery.start(
|
||||||
partial(
|
partial(
|
||||||
trio.serve_tcp,
|
trio.serve_tcp,
|
||||||
self._stream_handler,
|
self._stream_handler,
|
||||||
|
@ -638,7 +650,7 @@ class Actor:
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Started tcp server(s) on {[l.socket for l in listeners]}")
|
f"Started tcp server(s) on {[l.socket for l in listeners]}") # type: ignore
|
||||||
self._listeners.extend(listeners)
|
self._listeners.extend(listeners)
|
||||||
task_status.started()
|
task_status.started()
|
||||||
|
|
||||||
|
@ -720,13 +732,11 @@ class Actor:
|
||||||
self._server_nursery.cancel_scope.cancel()
|
self._server_nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def accept_addr(self) -> Optional[Tuple[str, int]]:
|
def accept_addr(self) -> Tuple[str, int]:
|
||||||
"""Primary address to which the channel server is bound.
|
"""Primary address to which the channel server is bound.
|
||||||
"""
|
"""
|
||||||
try:
|
# throws OSError on failure
|
||||||
return self._listeners[0].socket.getsockname()
|
return self._listeners[0].socket.getsockname() # type: ignore
|
||||||
except OSError:
|
|
||||||
return None
|
|
||||||
|
|
||||||
def get_parent(self) -> Portal:
|
def get_parent(self) -> Portal:
|
||||||
"""Return a portal to our parent actor."""
|
"""Return a portal to our parent actor."""
|
||||||
|
@ -826,7 +836,7 @@ async def _start_actor(
|
||||||
host: str,
|
host: str,
|
||||||
port: int,
|
port: int,
|
||||||
arbiter_addr: Tuple[str, int],
|
arbiter_addr: Tuple[str, int],
|
||||||
nursery: trio._core._run.Nursery = None
|
nursery: trio.Nursery = None
|
||||||
):
|
):
|
||||||
"""Spawn a local actor by starting a task to execute it's main async
|
"""Spawn a local actor by starting a task to execute it's main async
|
||||||
function.
|
function.
|
||||||
|
|
|
@ -17,9 +17,16 @@ class MsgpackStream:
|
||||||
"""
|
"""
|
||||||
def __init__(self, stream: trio.SocketStream) -> None:
|
def __init__(self, stream: trio.SocketStream) -> None:
|
||||||
self.stream = stream
|
self.stream = stream
|
||||||
|
assert self.stream.socket
|
||||||
|
# should both be IP sockets
|
||||||
|
lsockname = stream.socket.getsockname()
|
||||||
|
assert isinstance(lsockname, tuple)
|
||||||
|
self._laddr = lsockname[:2]
|
||||||
|
rsockname = stream.socket.getpeername()
|
||||||
|
assert isinstance(rsockname, tuple)
|
||||||
|
self._raddr = rsockname[:2]
|
||||||
|
|
||||||
self._agen = self._iter_packets()
|
self._agen = self._iter_packets()
|
||||||
self._laddr = self.stream.socket.getsockname()[:2]
|
|
||||||
self._raddr = self.stream.socket.getpeername()[:2]
|
|
||||||
self._send_lock = trio.StrictFIFOLock()
|
self._send_lock = trio.StrictFIFOLock()
|
||||||
|
|
||||||
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
||||||
|
@ -43,14 +50,15 @@ class MsgpackStream:
|
||||||
yield packet
|
yield packet
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def laddr(self) -> Tuple[str, int]:
|
def laddr(self) -> Tuple[Any, ...]:
|
||||||
return self._laddr
|
return self._laddr
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def raddr(self) -> Tuple[str, int]:
|
def raddr(self) -> Tuple[Any, ...]:
|
||||||
return self._raddr
|
return self._raddr
|
||||||
|
|
||||||
async def send(self, data: Any) -> int:
|
# XXX: should this instead be called `.sendall()`?
|
||||||
|
async def send(self, data: Any) -> None:
|
||||||
async with self._send_lock:
|
async with self._send_lock:
|
||||||
return await self.stream.send_all(
|
return await self.stream.send_all(
|
||||||
msgpack.dumps(data, use_bin_type=True))
|
msgpack.dumps(data, use_bin_type=True))
|
||||||
|
@ -95,24 +103,26 @@ class Channel:
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
if self.msgstream:
|
if self.msgstream:
|
||||||
return repr(
|
return repr(
|
||||||
self.msgstream.stream.socket._sock).replace(
|
self.msgstream.stream.socket._sock).replace( # type: ignore
|
||||||
"socket.socket", "Channel")
|
"socket.socket", "Channel")
|
||||||
return object.__repr__(self)
|
return object.__repr__(self)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def laddr(self) -> Optional[Tuple[str, int]]:
|
def laddr(self) -> Optional[Tuple[Any, ...]]:
|
||||||
return self.msgstream.laddr if self.msgstream else None
|
return self.msgstream.laddr if self.msgstream else None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def raddr(self) -> Optional[Tuple[str, int]]:
|
def raddr(self) -> Optional[Tuple[Any, ...]]:
|
||||||
return self.msgstream.raddr if self.msgstream else None
|
return self.msgstream.raddr if self.msgstream else None
|
||||||
|
|
||||||
async def connect(
|
async def connect(
|
||||||
self, destaddr: Tuple[str, int] = None, **kwargs
|
self, destaddr: Tuple[Any, ...] = None,
|
||||||
|
**kwargs
|
||||||
) -> trio.SocketStream:
|
) -> trio.SocketStream:
|
||||||
if self.connected():
|
if self.connected():
|
||||||
raise RuntimeError("channel is already connected?")
|
raise RuntimeError("channel is already connected?")
|
||||||
destaddr = destaddr or self._destaddr
|
destaddr = destaddr or self._destaddr
|
||||||
|
assert isinstance(destaddr, tuple)
|
||||||
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
|
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
|
||||||
self.msgstream = MsgpackStream(stream)
|
self.msgstream = MsgpackStream(stream)
|
||||||
return stream
|
return stream
|
||||||
|
|
|
@ -21,7 +21,9 @@ log = get_logger('tractor')
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def maybe_open_nursery(nursery: trio._core._run.Nursery = None):
|
async def maybe_open_nursery(
|
||||||
|
nursery: trio.Nursery = None
|
||||||
|
) -> typing.AsyncGenerator[trio.Nursery, Any]:
|
||||||
"""Create a new nursery if None provided.
|
"""Create a new nursery if None provided.
|
||||||
|
|
||||||
Blocks on exit as expected if no input nursery is provided.
|
Blocks on exit as expected if no input nursery is provided.
|
||||||
|
@ -252,14 +254,14 @@ class Portal:
|
||||||
for stream in self._streams.copy():
|
for stream in self._streams.copy():
|
||||||
await stream.aclose()
|
await stream.aclose()
|
||||||
|
|
||||||
async def aclose(self) -> None:
|
async def aclose(self):
|
||||||
log.debug(f"Closing {self}")
|
log.debug(f"Closing {self}")
|
||||||
# TODO: once we move to implementing our own `ReceiveChannel`
|
# TODO: once we move to implementing our own `ReceiveChannel`
|
||||||
# (including remote task cancellation inside its `.aclose()`)
|
# (including remote task cancellation inside its `.aclose()`)
|
||||||
# we'll need to .aclose all those channels here
|
# we'll need to .aclose all those channels here
|
||||||
await self._cancel_streams()
|
await self._cancel_streams()
|
||||||
|
|
||||||
async def cancel_actor(self) -> bool:
|
async def cancel_actor(self):
|
||||||
"""Cancel the actor on the other end of this portal.
|
"""Cancel the actor on the other end of this portal.
|
||||||
"""
|
"""
|
||||||
if not self.channel.connected():
|
if not self.channel.connected():
|
||||||
|
@ -279,6 +281,8 @@ class Portal:
|
||||||
return True
|
return True
|
||||||
if cancel_scope.cancelled_caught:
|
if cancel_scope.cancelled_caught:
|
||||||
log.warning(f"May have failed to cancel {self.channel.uid}")
|
log.warning(f"May have failed to cancel {self.channel.uid}")
|
||||||
|
|
||||||
|
# if we get here some weird cancellation case happened
|
||||||
return False
|
return False
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
log.warning(
|
log.warning(
|
||||||
|
@ -309,7 +313,7 @@ class LocalPortal:
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def open_portal(
|
async def open_portal(
|
||||||
channel: Channel,
|
channel: Channel,
|
||||||
nursery: trio._core._run.Nursery = None
|
nursery: Optional[trio.Nursery] = None
|
||||||
) -> typing.AsyncGenerator[Portal, None]:
|
) -> typing.AsyncGenerator[Portal, None]:
|
||||||
"""Open a ``Portal`` through the provided ``channel``.
|
"""Open a ``Portal`` through the provided ``channel``.
|
||||||
|
|
||||||
|
@ -320,7 +324,6 @@ async def open_portal(
|
||||||
was_connected = False
|
was_connected = False
|
||||||
|
|
||||||
async with maybe_open_nursery(nursery) as nursery:
|
async with maybe_open_nursery(nursery) as nursery:
|
||||||
|
|
||||||
if not channel.connected():
|
if not channel.connected():
|
||||||
await channel.connect()
|
await channel.connect()
|
||||||
was_connected = True
|
was_connected = True
|
||||||
|
@ -328,7 +331,7 @@ async def open_portal(
|
||||||
if channel.uid is None:
|
if channel.uid is None:
|
||||||
await actor._do_handshake(channel)
|
await actor._do_handshake(channel)
|
||||||
|
|
||||||
msg_loop_cs = await nursery.start(
|
msg_loop_cs: trio.CancelScope = await nursery.start(
|
||||||
partial(
|
partial(
|
||||||
actor._process_messages,
|
actor._process_messages,
|
||||||
channel,
|
channel,
|
||||||
|
|
|
@ -21,7 +21,7 @@ from ._state import current_actor
|
||||||
from ._actor import Actor
|
from ._actor import Actor
|
||||||
|
|
||||||
|
|
||||||
_ctx: mp.context.BaseContext = mp.get_context("spawn")
|
_ctx: mp.context.BaseContext = mp.get_context("spawn") # type: ignore
|
||||||
|
|
||||||
|
|
||||||
def try_set_start_method(name: str) -> mp.context.BaseContext:
|
def try_set_start_method(name: str) -> mp.context.BaseContext:
|
||||||
|
@ -95,7 +95,7 @@ def new_proc(
|
||||||
else:
|
else:
|
||||||
fs_info = (None, None, None, None, None)
|
fs_info = (None, None, None, None, None)
|
||||||
|
|
||||||
return _ctx.Process(
|
return _ctx.Process( # type: ignore
|
||||||
target=actor._fork_main,
|
target=actor._fork_main,
|
||||||
args=(
|
args=(
|
||||||
bind_addr,
|
bind_addr,
|
||||||
|
|
|
@ -2,7 +2,9 @@
|
||||||
Per process state
|
Per process state
|
||||||
"""
|
"""
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
from collections import Mapping
|
||||||
|
|
||||||
|
import trio
|
||||||
|
|
||||||
_current_actor: Optional['Actor'] = None # type: ignore
|
_current_actor: Optional['Actor'] = None # type: ignore
|
||||||
|
|
||||||
|
@ -10,6 +12,27 @@ _current_actor: Optional['Actor'] = None # type: ignore
|
||||||
def current_actor() -> 'Actor': # type: ignore
|
def current_actor() -> 'Actor': # type: ignore
|
||||||
"""Get the process-local actor instance.
|
"""Get the process-local actor instance.
|
||||||
"""
|
"""
|
||||||
if not _current_actor:
|
if _current_actor is None:
|
||||||
raise RuntimeError("No actor instance has been defined yet?")
|
raise RuntimeError("No local actor has been initialized yet")
|
||||||
return _current_actor
|
return _current_actor
|
||||||
|
|
||||||
|
|
||||||
|
class ActorContextInfo(Mapping):
|
||||||
|
"Dyanmic lookup for local actor and task names"
|
||||||
|
_context_keys = ('task', 'actor')
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return len(self._context_keys)
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return iter(self._context_keys)
|
||||||
|
|
||||||
|
def __getitem__(self, key: str):
|
||||||
|
try:
|
||||||
|
return {
|
||||||
|
'task': trio.hazmat.current_task,
|
||||||
|
'actor': current_actor
|
||||||
|
}[key]().name
|
||||||
|
except RuntimeError:
|
||||||
|
# no local actor/task context initialized yet
|
||||||
|
return f'no {key} context'
|
||||||
|
|
|
@ -64,7 +64,6 @@ class ActorNursery:
|
||||||
arbiter_addr=current_actor()._arb_addr,
|
arbiter_addr=current_actor()._arb_addr,
|
||||||
)
|
)
|
||||||
parent_addr = self._actor.accept_addr
|
parent_addr = self._actor.accept_addr
|
||||||
assert parent_addr
|
|
||||||
proc = _spawn.new_proc(
|
proc = _spawn.new_proc(
|
||||||
name,
|
name,
|
||||||
actor,
|
actor,
|
||||||
|
@ -192,8 +191,7 @@ class ActorNursery:
|
||||||
async def wait_for_proc(
|
async def wait_for_proc(
|
||||||
proc: mp.Process,
|
proc: mp.Process,
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
portal: Portal,
|
cancel_scope: Optional[trio.CancelScope] = None,
|
||||||
cancel_scope: Optional[trio._core._run.CancelScope] = None,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
# TODO: timeout block here?
|
# TODO: timeout block here?
|
||||||
if proc.is_alive():
|
if proc.is_alive():
|
||||||
|
@ -207,7 +205,7 @@ class ActorNursery:
|
||||||
|
|
||||||
# proc terminated, cancel result waiter that may have
|
# proc terminated, cancel result waiter that may have
|
||||||
# been spawned in tandem if not done already
|
# been spawned in tandem if not done already
|
||||||
if cancel_scope: # and not portal._cancelled:
|
if cancel_scope:
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Cancelling existing result waiter task for {actor.uid}")
|
f"Cancelling existing result waiter task for {actor.uid}")
|
||||||
cancel_scope.cancel()
|
cancel_scope.cancel()
|
||||||
|
@ -223,11 +221,12 @@ class ActorNursery:
|
||||||
cs = None
|
cs = None
|
||||||
# portal from ``run_in_actor()``
|
# portal from ``run_in_actor()``
|
||||||
if portal in self._cancel_after_result_on_exit:
|
if portal in self._cancel_after_result_on_exit:
|
||||||
|
assert portal
|
||||||
cs = await nursery.start(
|
cs = await nursery.start(
|
||||||
cancel_on_completion, portal, subactor)
|
cancel_on_completion, portal, subactor)
|
||||||
# TODO: how do we handle remote host spawned actors?
|
# TODO: how do we handle remote host spawned actors?
|
||||||
nursery.start_soon(
|
nursery.start_soon(
|
||||||
wait_for_proc, proc, subactor, portal, cs)
|
wait_for_proc, proc, subactor, cs)
|
||||||
|
|
||||||
if errors:
|
if errors:
|
||||||
if not self.cancelled:
|
if not self.cancelled:
|
||||||
|
@ -247,7 +246,8 @@ class ActorNursery:
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as nursery:
|
||||||
for subactor, proc, portal in children.values():
|
for subactor, proc, portal in children.values():
|
||||||
# TODO: how do we handle remote host spawned actors?
|
# TODO: how do we handle remote host spawned actors?
|
||||||
nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
|
assert portal
|
||||||
|
nursery.start_soon(wait_for_proc, proc, subactor, cs)
|
||||||
|
|
||||||
log.debug(f"All subactors for {self} have terminated")
|
log.debug(f"All subactors for {self} have terminated")
|
||||||
if errors:
|
if errors:
|
||||||
|
|
|
@ -7,6 +7,8 @@ import logging
|
||||||
import colorlog # type: ignore
|
import colorlog # type: ignore
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
|
from ._state import ActorContextInfo
|
||||||
|
|
||||||
|
|
||||||
_proj_name = 'tractor'
|
_proj_name = 'tractor'
|
||||||
_default_loglevel = None
|
_default_loglevel = None
|
||||||
|
@ -18,7 +20,7 @@ LOG_FORMAT = (
|
||||||
# "{bold_white}{log_color}{asctime}{reset}"
|
# "{bold_white}{log_color}{asctime}{reset}"
|
||||||
"{log_color}{asctime}{reset}"
|
"{log_color}{asctime}{reset}"
|
||||||
" {bold_white}{thin_white}({reset}"
|
" {bold_white}{thin_white}({reset}"
|
||||||
"{thin_white}{processName}: {threadName}{reset}{bold_white}{thin_white})"
|
"{thin_white}{actor}, {process}, {task}){reset}{bold_white}{thin_white})"
|
||||||
" {reset}{log_color}[{reset}{bold_log_color}{levelname}{reset}{log_color}]"
|
" {reset}{log_color}[{reset}{bold_log_color}{levelname}{reset}{log_color}]"
|
||||||
" {log_color}{name}"
|
" {log_color}{name}"
|
||||||
" {thin_white}{filename}{log_color}:{reset}{thin_white}{lineno}{log_color}"
|
" {thin_white}{filename}{log_color}:{reset}{thin_white}{lineno}{log_color}"
|
||||||
|
@ -46,29 +48,40 @@ BOLD_PALETTE = {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def get_logger(name: str = None) -> logging.Logger:
|
def get_logger(
|
||||||
|
name: str = None,
|
||||||
|
_root_name: str = _proj_name,
|
||||||
|
) -> logging.LoggerAdapter:
|
||||||
'''Return the package log or a sub-log for `name` if provided.
|
'''Return the package log or a sub-log for `name` if provided.
|
||||||
'''
|
'''
|
||||||
log = rlog = logging.getLogger(_proj_name)
|
log = rlog = logging.getLogger(_root_name)
|
||||||
if name and name != _proj_name:
|
if name and name != _proj_name:
|
||||||
log = rlog.getChild(name)
|
log = rlog.getChild(name)
|
||||||
log.level = rlog.level
|
log.level = rlog.level
|
||||||
|
|
||||||
|
# add our actor-task aware adapter which will dynamically look up
|
||||||
|
# the actor and task names at each log emit
|
||||||
|
logger = logging.LoggerAdapter(log, ActorContextInfo())
|
||||||
|
|
||||||
# additional levels
|
# additional levels
|
||||||
for name, val in LEVELS.items():
|
for name, val in LEVELS.items():
|
||||||
logging.addLevelName(val, name)
|
logging.addLevelName(val, name)
|
||||||
# ex. create ``log.trace()``
|
# ex. create ``logger.trace()``
|
||||||
setattr(log, name.lower(), partial(log.log, val))
|
setattr(logger, name.lower(), partial(logger.log, val))
|
||||||
|
|
||||||
return log
|
return logger
|
||||||
|
|
||||||
|
|
||||||
def get_console_log(level: str = None, name: str = None) -> logging.Logger:
|
def get_console_log(
|
||||||
|
level: str = None,
|
||||||
|
**kwargs,
|
||||||
|
) -> logging.LoggerAdapter:
|
||||||
'''Get the package logger and enable a handler which writes to stderr.
|
'''Get the package logger and enable a handler which writes to stderr.
|
||||||
|
|
||||||
Yeah yeah, i know we can use ``DictConfig``. You do it...
|
Yeah yeah, i know we can use ``DictConfig``. You do it.
|
||||||
'''
|
'''
|
||||||
log = get_logger(name) # our root logger
|
log = get_logger(**kwargs) # our root logger
|
||||||
|
logger = log.logger
|
||||||
|
|
||||||
if not level:
|
if not level:
|
||||||
return log
|
return log
|
||||||
|
@ -77,7 +90,7 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger:
|
||||||
|
|
||||||
if not any(
|
if not any(
|
||||||
handler.stream == sys.stderr # type: ignore
|
handler.stream == sys.stderr # type: ignore
|
||||||
for handler in log.handlers if getattr(handler, 'stream', None)
|
for handler in logger.handlers if getattr(handler, 'stream', None)
|
||||||
):
|
):
|
||||||
handler = logging.StreamHandler()
|
handler = logging.StreamHandler()
|
||||||
formatter = colorlog.ColoredFormatter(
|
formatter = colorlog.ColoredFormatter(
|
||||||
|
@ -88,7 +101,7 @@ def get_console_log(level: str = None, name: str = None) -> logging.Logger:
|
||||||
style='{',
|
style='{',
|
||||||
)
|
)
|
||||||
handler.setFormatter(formatter)
|
handler.setFormatter(formatter)
|
||||||
log.addHandler(handler)
|
logger.addHandler(handler)
|
||||||
|
|
||||||
return log
|
return log
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue