Compare commits
No commits in common. "134a84e39e7abe8b64051daf48e8e4c72a99f80c" and "dc17e51bc52c9557200c0b67628174cdf8d4cdc3" have entirely different histories.
134a84e39e
...
dc17e51bc5
11
setup.py
11
setup.py
|
@ -38,20 +38,13 @@ setup(
|
||||||
'tractor.testing',
|
'tractor.testing',
|
||||||
],
|
],
|
||||||
install_requires=[
|
install_requires=[
|
||||||
|
|
||||||
# trio related
|
|
||||||
'trio>0.8',
|
'trio>0.8',
|
||||||
|
'msgpack',
|
||||||
'async_generator',
|
'async_generator',
|
||||||
'tricycle',
|
|
||||||
'trio_typing',
|
|
||||||
|
|
||||||
'colorlog',
|
'colorlog',
|
||||||
'wrapt',
|
'wrapt',
|
||||||
|
'trio_typing',
|
||||||
'pdbpp',
|
'pdbpp',
|
||||||
|
|
||||||
# serialization
|
|
||||||
'msgpack',
|
|
||||||
'msgspec',
|
|
||||||
],
|
],
|
||||||
tests_require=['pytest'],
|
tests_require=['pytest'],
|
||||||
python_requires=">=3.7",
|
python_requires=">=3.7",
|
||||||
|
|
|
@ -31,7 +31,6 @@ from ._exceptions import (
|
||||||
ModuleNotExposed,
|
ModuleNotExposed,
|
||||||
is_multi_cancelled,
|
is_multi_cancelled,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
TransportClosed,
|
|
||||||
)
|
)
|
||||||
from . import _debug
|
from . import _debug
|
||||||
from ._discovery import get_arbiter
|
from ._discovery import get_arbiter
|
||||||
|
@ -250,7 +249,7 @@ class Actor:
|
||||||
enable_modules: List[str] = [],
|
enable_modules: List[str] = [],
|
||||||
uid: str = None,
|
uid: str = None,
|
||||||
loglevel: str = None,
|
loglevel: str = None,
|
||||||
arbiter_addr: Optional[Tuple[str, int]] = (None, None),
|
arbiter_addr: Optional[Tuple[str, int]] = None,
|
||||||
spawn_method: Optional[str] = None
|
spawn_method: Optional[str] = None
|
||||||
) -> None:
|
) -> None:
|
||||||
"""This constructor is called in the parent actor **before** the spawning
|
"""This constructor is called in the parent actor **before** the spawning
|
||||||
|
@ -280,7 +279,7 @@ class Actor:
|
||||||
# TODO: consider making this a dynamically defined
|
# TODO: consider making this a dynamically defined
|
||||||
# @dataclass once we get py3.7
|
# @dataclass once we get py3.7
|
||||||
self.loglevel = loglevel
|
self.loglevel = loglevel
|
||||||
self._arb_addr = tuple(arbiter_addr)
|
self._arb_addr = arbiter_addr
|
||||||
|
|
||||||
# marked by the process spawning backend at startup
|
# marked by the process spawning backend at startup
|
||||||
# will be None for the parent most process started manually
|
# will be None for the parent most process started manually
|
||||||
|
@ -386,18 +385,7 @@ class Actor:
|
||||||
# send/receive initial handshake response
|
# send/receive initial handshake response
|
||||||
try:
|
try:
|
||||||
uid = await self._do_handshake(chan)
|
uid = await self._do_handshake(chan)
|
||||||
|
except StopAsyncIteration:
|
||||||
except (
|
|
||||||
trio.BrokenResourceError,
|
|
||||||
trio.ClosedResourceError,
|
|
||||||
TransportClosed,
|
|
||||||
):
|
|
||||||
# XXX: This may propagate up from ``Channel._aiter_recv()``
|
|
||||||
# and ``MsgpackStream._inter_packets()`` on a read from the
|
|
||||||
# stream particularly when the runtime is first starting up
|
|
||||||
# inside ``open_root_actor()`` where there is a check for
|
|
||||||
# a bound listener on the "arbiter" addr. the reset will be
|
|
||||||
# because the handshake was never meant took place.
|
|
||||||
log.warning(f"Channel {chan} failed to handshake")
|
log.warning(f"Channel {chan} failed to handshake")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -541,7 +529,6 @@ class Actor:
|
||||||
# ``scope = Nursery.start()``
|
# ``scope = Nursery.start()``
|
||||||
task_status.started(loop_cs)
|
task_status.started(loop_cs)
|
||||||
async for msg in chan:
|
async for msg in chan:
|
||||||
|
|
||||||
if msg is None: # loop terminate sentinel
|
if msg is None: # loop terminate sentinel
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
|
@ -648,39 +635,22 @@ class Actor:
|
||||||
)
|
)
|
||||||
await self.cancel_rpc_tasks(chan)
|
await self.cancel_rpc_tasks(chan)
|
||||||
|
|
||||||
except (
|
|
||||||
TransportClosed,
|
|
||||||
trio.BrokenResourceError,
|
|
||||||
trio.ClosedResourceError
|
|
||||||
):
|
|
||||||
# channels "breaking" is ok since we don't have a teardown
|
|
||||||
# handshake for them (yet) and instead we simply bail out
|
|
||||||
# of the message loop and expect the teardown sequence
|
|
||||||
# to clean up.
|
|
||||||
log.error(f"{chan} form {chan.uid} closed abruptly")
|
|
||||||
# raise
|
|
||||||
|
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
log.error(f"{chan} form {chan.uid} broke")
|
log.error(f"{chan} form {chan.uid} broke")
|
||||||
|
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
# ship any "internal" exception (i.e. one from internal machinery
|
# ship any "internal" exception (i.e. one from internal machinery
|
||||||
# not from an rpc task) to parent
|
# not from an rpc task) to parent
|
||||||
log.exception("Actor errored:")
|
log.exception("Actor errored:")
|
||||||
if self._parent_chan:
|
if self._parent_chan:
|
||||||
await self._parent_chan.send(pack_error(err))
|
await self._parent_chan.send(pack_error(err))
|
||||||
|
raise
|
||||||
# if this is the `MainProcess` we expect the error broadcasting
|
# if this is the `MainProcess` we expect the error broadcasting
|
||||||
# above to trigger an error at consuming portal "checkpoints"
|
# above to trigger an error at consuming portal "checkpoints"
|
||||||
raise
|
|
||||||
|
|
||||||
except trio.Cancelled:
|
except trio.Cancelled:
|
||||||
# debugging only
|
# debugging only
|
||||||
log.debug(f"Msg loop was cancelled for {chan}")
|
log.debug(f"Msg loop was cancelled for {chan}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# msg debugging for when he machinery is brokey
|
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Exiting msg loop for {chan} from {chan.uid} "
|
f"Exiting msg loop for {chan} from {chan.uid} "
|
||||||
f"with last msg:\n{msg}")
|
f"with last msg:\n{msg}")
|
||||||
|
@ -720,14 +690,6 @@ class Actor:
|
||||||
_state._runtime_vars.update(rvs)
|
_state._runtime_vars.update(rvs)
|
||||||
|
|
||||||
for attr, value in parent_data.items():
|
for attr, value in parent_data.items():
|
||||||
|
|
||||||
if attr == '_arb_addr':
|
|
||||||
# XXX: msgspec doesn't support serializing tuples
|
|
||||||
# so just cash manually here since it's what our
|
|
||||||
# internals expect.
|
|
||||||
self._arb_addr = tuple(value)
|
|
||||||
|
|
||||||
else:
|
|
||||||
setattr(self, attr, value)
|
setattr(self, attr, value)
|
||||||
|
|
||||||
# Disable sigint handling in children if NOT running in
|
# Disable sigint handling in children if NOT running in
|
||||||
|
@ -1113,10 +1075,10 @@ class Actor:
|
||||||
parlance.
|
parlance.
|
||||||
"""
|
"""
|
||||||
await chan.send(self.uid)
|
await chan.send(self.uid)
|
||||||
uid: Tuple[str, str] = tuple(await chan.recv())
|
uid: Tuple[str, str] = await chan.recv()
|
||||||
|
|
||||||
# if not isinstance(uid, tuple):
|
if not isinstance(uid, tuple):
|
||||||
# raise ValueError(f"{uid} is not a valid uid?!")
|
raise ValueError(f"{uid} is not a valid uid?!")
|
||||||
|
|
||||||
chan.uid = uid
|
chan.uid = uid
|
||||||
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
|
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
|
||||||
|
@ -1183,9 +1145,8 @@ class Arbiter(Actor):
|
||||||
async def register_actor(
|
async def register_actor(
|
||||||
self, uid: Tuple[str, str], sockaddr: Tuple[str, int]
|
self, uid: Tuple[str, str], sockaddr: Tuple[str, int]
|
||||||
) -> None:
|
) -> None:
|
||||||
uid = tuple(uid)
|
|
||||||
name, uuid = uid
|
name, uuid = uid
|
||||||
self._registry[uid] = tuple(sockaddr)
|
self._registry[uid] = sockaddr
|
||||||
|
|
||||||
# pop and signal all waiter events
|
# pop and signal all waiter events
|
||||||
events = self._waiters.pop(name, ())
|
events = self._waiters.pop(name, ())
|
||||||
|
@ -1195,4 +1156,4 @@ class Arbiter(Actor):
|
||||||
event.set()
|
event.set()
|
||||||
|
|
||||||
async def unregister_actor(self, uid: Tuple[str, str]) -> None:
|
async def unregister_actor(self, uid: Tuple[str, str]) -> None:
|
||||||
self._registry.pop(tuple(uid))
|
self._registry.pop(uid)
|
||||||
|
|
|
@ -41,10 +41,6 @@ class ContextCancelled(RemoteActorError):
|
||||||
"Inter-actor task context cancelled itself on the callee side."
|
"Inter-actor task context cancelled itself on the callee side."
|
||||||
|
|
||||||
|
|
||||||
class TransportClosed(trio.ClosedResourceError):
|
|
||||||
"Underlying channel transport was closed prior to use"
|
|
||||||
|
|
||||||
|
|
||||||
class NoResult(RuntimeError):
|
class NoResult(RuntimeError):
|
||||||
"No final result is expected for this actor"
|
"No final result is expected for this actor"
|
||||||
|
|
||||||
|
@ -70,15 +66,12 @@ def pack_error(exc: BaseException) -> Dict[str, Any]:
|
||||||
|
|
||||||
|
|
||||||
def unpack_error(
|
def unpack_error(
|
||||||
|
|
||||||
msg: Dict[str, Any],
|
msg: Dict[str, Any],
|
||||||
chan=None,
|
chan=None,
|
||||||
err_type=RemoteActorError
|
err_type=RemoteActorError
|
||||||
|
|
||||||
) -> Exception:
|
) -> Exception:
|
||||||
"""Unpack an 'error' message from the wire
|
"""Unpack an 'error' message from the wire
|
||||||
into a local ``RemoteActorError``.
|
into a local ``RemoteActorError``.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
error = msg['error']
|
error = msg['error']
|
||||||
|
|
||||||
|
|
131
tractor/_ipc.py
131
tractor/_ipc.py
|
@ -1,20 +1,16 @@
|
||||||
"""
|
"""
|
||||||
Inter-process comms abstractions
|
Inter-process comms abstractions
|
||||||
"""
|
"""
|
||||||
from functools import partial
|
|
||||||
import struct
|
|
||||||
import typing
|
import typing
|
||||||
from typing import Any, Tuple, Optional
|
from typing import Any, Tuple, Optional
|
||||||
|
from functools import partial
|
||||||
|
|
||||||
from tricycle import BufferedReceiveStream
|
|
||||||
import msgpack
|
import msgpack
|
||||||
import msgspec
|
|
||||||
import trio
|
import trio
|
||||||
from async_generator import asynccontextmanager
|
from async_generator import asynccontextmanager
|
||||||
|
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from ._exceptions import TransportClosed
|
log = get_logger('ipc')
|
||||||
log = get_logger(__name__)
|
|
||||||
|
|
||||||
# :eyeroll:
|
# :eyeroll:
|
||||||
try:
|
try:
|
||||||
|
@ -25,32 +21,21 @@ except ImportError:
|
||||||
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
|
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
|
||||||
|
|
||||||
|
|
||||||
class MsgpackTCPStream:
|
class MsgpackStream:
|
||||||
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
"""A ``trio.SocketStream`` delivering ``msgpack`` formatted data.
|
||||||
using ``msgpack-python``.
|
"""
|
||||||
|
def __init__(self, stream: trio.SocketStream) -> None:
|
||||||
'''
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
stream: trio.SocketStream,
|
|
||||||
|
|
||||||
) -> None:
|
|
||||||
|
|
||||||
self.stream = stream
|
self.stream = stream
|
||||||
assert self.stream.socket
|
assert self.stream.socket
|
||||||
|
|
||||||
# should both be IP sockets
|
# should both be IP sockets
|
||||||
lsockname = stream.socket.getsockname()
|
lsockname = stream.socket.getsockname()
|
||||||
assert isinstance(lsockname, tuple)
|
assert isinstance(lsockname, tuple)
|
||||||
self._laddr = lsockname[:2]
|
self._laddr = lsockname[:2]
|
||||||
|
|
||||||
rsockname = stream.socket.getpeername()
|
rsockname = stream.socket.getpeername()
|
||||||
assert isinstance(rsockname, tuple)
|
assert isinstance(rsockname, tuple)
|
||||||
self._raddr = rsockname[:2]
|
self._raddr = rsockname[:2]
|
||||||
|
|
||||||
# start first entry to read loop
|
|
||||||
self._agen = self._iter_packets()
|
self._agen = self._iter_packets()
|
||||||
|
|
||||||
self._send_lock = trio.StrictFIFOLock()
|
self._send_lock = trio.StrictFIFOLock()
|
||||||
|
|
||||||
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
||||||
|
@ -61,13 +46,16 @@ class MsgpackTCPStream:
|
||||||
use_list=False,
|
use_list=False,
|
||||||
)
|
)
|
||||||
while True:
|
while True:
|
||||||
|
try:
|
||||||
data = await self.stream.receive_some(2**10)
|
data = await self.stream.receive_some(2**10)
|
||||||
log.trace(f"received {data}") # type: ignore
|
log.trace(f"received {data}") # type: ignore
|
||||||
|
except trio.BrokenResourceError:
|
||||||
|
log.warning(f"Stream connection {self.raddr} broke")
|
||||||
|
return
|
||||||
|
|
||||||
if data == b'':
|
if data == b'':
|
||||||
raise TransportClosed(
|
log.debug(f"Stream connection {self.raddr} was closed")
|
||||||
f'transport {self} was already closed prior ro read'
|
return
|
||||||
)
|
|
||||||
|
|
||||||
unpacker.feed(data)
|
unpacker.feed(data)
|
||||||
for packet in unpacker:
|
for packet in unpacker:
|
||||||
|
@ -85,8 +73,7 @@ class MsgpackTCPStream:
|
||||||
async def send(self, data: Any) -> None:
|
async def send(self, data: Any) -> None:
|
||||||
async with self._send_lock:
|
async with self._send_lock:
|
||||||
return await self.stream.send_all(
|
return await self.stream.send_all(
|
||||||
msgpack.dumps(data, use_bin_type=True)
|
msgpack.dumps(data, use_bin_type=True))
|
||||||
)
|
|
||||||
|
|
||||||
async def recv(self) -> Any:
|
async def recv(self) -> Any:
|
||||||
return await self._agen.asend(None)
|
return await self._agen.asend(None)
|
||||||
|
@ -98,92 +85,26 @@ class MsgpackTCPStream:
|
||||||
return self.stream.socket.fileno() != -1
|
return self.stream.socket.fileno() != -1
|
||||||
|
|
||||||
|
|
||||||
class MsgspecTCPStream(MsgpackTCPStream):
|
|
||||||
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
|
||||||
using ``msgspec``.
|
|
||||||
|
|
||||||
'''
|
|
||||||
ms_encode = msgspec.Encoder().encode
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
stream: trio.SocketStream,
|
|
||||||
prefix_size: int = 4,
|
|
||||||
|
|
||||||
) -> None:
|
|
||||||
super().__init__(stream)
|
|
||||||
self.recv_stream = BufferedReceiveStream(transport_stream=stream)
|
|
||||||
self.prefix_size = prefix_size
|
|
||||||
|
|
||||||
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
|
||||||
"""Yield packets from the underlying stream.
|
|
||||||
"""
|
|
||||||
decoder = msgspec.Decoder() # dict[str, Any])
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
header = await self.recv_stream.receive_exactly(4)
|
|
||||||
|
|
||||||
except (ValueError):
|
|
||||||
raise TransportClosed(
|
|
||||||
f'transport {self} was already closed prior ro read'
|
|
||||||
)
|
|
||||||
|
|
||||||
if header == b'':
|
|
||||||
raise TransportClosed(
|
|
||||||
f'transport {self} was already closed prior ro read'
|
|
||||||
)
|
|
||||||
|
|
||||||
size, = struct.unpack("<I", header)
|
|
||||||
|
|
||||||
log.trace(f'received header {size}')
|
|
||||||
|
|
||||||
msg_bytes = await self.recv_stream.receive_exactly(size)
|
|
||||||
|
|
||||||
log.trace(f"received {msg_bytes}") # type: ignore
|
|
||||||
yield decoder.decode(msg_bytes)
|
|
||||||
|
|
||||||
async def send(self, data: Any) -> None:
|
|
||||||
async with self._send_lock:
|
|
||||||
|
|
||||||
bytes_data = self.ms_encode(data)
|
|
||||||
|
|
||||||
# supposedly the fastest says,
|
|
||||||
# https://stackoverflow.com/a/54027962
|
|
||||||
size: int = struct.pack("<I", len(bytes_data))
|
|
||||||
|
|
||||||
return await self.stream.send_all(size + bytes_data)
|
|
||||||
|
|
||||||
|
|
||||||
class Channel:
|
class Channel:
|
||||||
"""An inter-process channel for communication between (remote) actors.
|
"""An inter-process channel for communication between (remote) actors.
|
||||||
|
|
||||||
Currently the only supported transport is a ``trio.SocketStream``.
|
Currently the only supported transport is a ``trio.SocketStream``.
|
||||||
"""
|
"""
|
||||||
def __init__(
|
def __init__(
|
||||||
|
|
||||||
self,
|
self,
|
||||||
destaddr: Optional[Tuple[str, int]] = None,
|
destaddr: Optional[Tuple[str, int]] = None,
|
||||||
on_reconnect: typing.Callable[..., typing.Awaitable] = None,
|
on_reconnect: typing.Callable[..., typing.Awaitable] = None,
|
||||||
auto_reconnect: bool = False,
|
auto_reconnect: bool = False,
|
||||||
stream: trio.SocketStream = None, # expected to be active
|
stream: trio.SocketStream = None, # expected to be active
|
||||||
|
|
||||||
# stream_serializer_type: type = MsgspecTCPStream,
|
|
||||||
stream_serializer_type: type = MsgpackTCPStream,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
self._recon_seq = on_reconnect
|
self._recon_seq = on_reconnect
|
||||||
self._autorecon = auto_reconnect
|
self._autorecon = auto_reconnect
|
||||||
self.stream_serializer_type = stream_serializer_type
|
self.msgstream: Optional[MsgpackStream] = MsgpackStream(
|
||||||
self.msgstream: Optional[type] = stream_serializer_type(
|
|
||||||
stream) if stream else None
|
stream) if stream else None
|
||||||
|
|
||||||
if self.msgstream and destaddr:
|
if self.msgstream and destaddr:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"A stream was provided with local addr {self.laddr}"
|
f"A stream was provided with local addr {self.laddr}"
|
||||||
)
|
)
|
||||||
|
|
||||||
self._destaddr = self.msgstream.raddr if self.msgstream else destaddr
|
self._destaddr = self.msgstream.raddr if self.msgstream else destaddr
|
||||||
# set after handshake - always uid of far end
|
# set after handshake - always uid of far end
|
||||||
self.uid: Optional[Tuple[str, str]] = None
|
self.uid: Optional[Tuple[str, str]] = None
|
||||||
|
@ -191,8 +112,6 @@ class Channel:
|
||||||
self._exc: Optional[Exception] = None
|
self._exc: Optional[Exception] = None
|
||||||
self._agen = self._aiter_recv()
|
self._agen = self._aiter_recv()
|
||||||
|
|
||||||
self._closed: bool = False
|
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
if self.msgstream:
|
if self.msgstream:
|
||||||
return repr(
|
return repr(
|
||||||
|
@ -209,51 +128,35 @@ class Channel:
|
||||||
return self.msgstream.raddr if self.msgstream else None
|
return self.msgstream.raddr if self.msgstream else None
|
||||||
|
|
||||||
async def connect(
|
async def connect(
|
||||||
self,
|
self, destaddr: Tuple[Any, ...] = None,
|
||||||
destaddr: Tuple[Any, ...] = None,
|
|
||||||
**kwargs
|
**kwargs
|
||||||
|
|
||||||
) -> trio.SocketStream:
|
) -> trio.SocketStream:
|
||||||
|
|
||||||
if self.connected():
|
if self.connected():
|
||||||
raise RuntimeError("channel is already connected?")
|
raise RuntimeError("channel is already connected?")
|
||||||
|
|
||||||
destaddr = destaddr or self._destaddr
|
destaddr = destaddr or self._destaddr
|
||||||
assert isinstance(destaddr, tuple)
|
assert isinstance(destaddr, tuple)
|
||||||
|
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
|
||||||
stream = await trio.open_tcp_stream(
|
self.msgstream = MsgpackStream(stream)
|
||||||
*destaddr,
|
|
||||||
**kwargs
|
|
||||||
)
|
|
||||||
self.msgstream = self.stream_serializer_type(stream)
|
|
||||||
return stream
|
return stream
|
||||||
|
|
||||||
async def send(self, item: Any) -> None:
|
async def send(self, item: Any) -> None:
|
||||||
|
|
||||||
log.trace(f"send `{item}`") # type: ignore
|
log.trace(f"send `{item}`") # type: ignore
|
||||||
assert self.msgstream
|
assert self.msgstream
|
||||||
|
|
||||||
await self.msgstream.send(item)
|
await self.msgstream.send(item)
|
||||||
|
|
||||||
async def recv(self) -> Any:
|
async def recv(self) -> Any:
|
||||||
assert self.msgstream
|
assert self.msgstream
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return await self.msgstream.recv()
|
return await self.msgstream.recv()
|
||||||
|
|
||||||
except trio.BrokenResourceError:
|
except trio.BrokenResourceError:
|
||||||
if self._autorecon:
|
if self._autorecon:
|
||||||
await self._reconnect()
|
await self._reconnect()
|
||||||
return await self.recv()
|
return await self.recv()
|
||||||
|
|
||||||
raise
|
|
||||||
|
|
||||||
async def aclose(self) -> None:
|
async def aclose(self) -> None:
|
||||||
log.debug(f"Closing {self}")
|
log.debug(f"Closing {self}")
|
||||||
assert self.msgstream
|
assert self.msgstream
|
||||||
await self.msgstream.stream.aclose()
|
await self.msgstream.stream.aclose()
|
||||||
self._closed = True
|
|
||||||
log.error(f'CLOSING CHAN {self}')
|
|
||||||
|
|
||||||
async def __aenter__(self):
|
async def __aenter__(self):
|
||||||
await self.connect()
|
await self.connect()
|
||||||
|
|
|
@ -12,7 +12,7 @@ import trio
|
||||||
from async_generator import asynccontextmanager
|
from async_generator import asynccontextmanager
|
||||||
|
|
||||||
from . import _debug
|
from . import _debug
|
||||||
from ._state import current_actor, is_main_process, is_root_process
|
from ._state import current_actor, is_main_process
|
||||||
from .log import get_logger, get_loglevel
|
from .log import get_logger, get_loglevel
|
||||||
from ._actor import Actor
|
from ._actor import Actor
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
|
@ -254,26 +254,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
"to complete"
|
"to complete"
|
||||||
)
|
)
|
||||||
except BaseException as err:
|
except BaseException as err:
|
||||||
|
|
||||||
if is_root_process() and (
|
|
||||||
type(err) in {
|
|
||||||
Exception, trio.MultiError, trio.Cancelled
|
|
||||||
}
|
|
||||||
):
|
|
||||||
# if we error in the root but the debugger is
|
|
||||||
# engaged we don't want to prematurely kill (and
|
|
||||||
# thus clobber access to) the local tty streams.
|
|
||||||
# instead try to wait for pdb to be released before
|
|
||||||
# tearing down.
|
|
||||||
debug_complete = _debug._pdb_complete
|
|
||||||
if debug_complete and not debug_complete.is_set():
|
|
||||||
log.warning(
|
|
||||||
"Root has errored but pdb is active..waiting "
|
|
||||||
"on debug lock")
|
|
||||||
await _debug._pdb_complete.wait()
|
|
||||||
|
|
||||||
# raise
|
|
||||||
|
|
||||||
# if the caller's scope errored then we activate our
|
# if the caller's scope errored then we activate our
|
||||||
# one-cancels-all supervisor strategy (don't
|
# one-cancels-all supervisor strategy (don't
|
||||||
# worry more are coming).
|
# worry more are coming).
|
||||||
|
@ -388,12 +368,27 @@ async def open_nursery(
|
||||||
async with open_root_actor(**kwargs) as actor:
|
async with open_root_actor(**kwargs) as actor:
|
||||||
assert actor is current_actor()
|
assert actor is current_actor()
|
||||||
|
|
||||||
# try:
|
try:
|
||||||
async with _open_and_supervise_one_cancels_all_nursery(
|
async with _open_and_supervise_one_cancels_all_nursery(
|
||||||
actor
|
actor
|
||||||
) as anursery:
|
) as anursery:
|
||||||
yield anursery
|
yield anursery
|
||||||
|
|
||||||
|
except (Exception, trio.MultiError, trio.Cancelled):
|
||||||
|
# if we error in the root but the debugger is
|
||||||
|
# engaged we don't want to prematurely kill (and
|
||||||
|
# thus clobber access to) the local tty streams.
|
||||||
|
# instead try to wait for pdb to be released before
|
||||||
|
# tearing down.
|
||||||
|
if not _debug._pdb_complete.is_set():
|
||||||
|
log.warning(
|
||||||
|
"Root has errored but pdb is active..waiting "
|
||||||
|
"on debug lock")
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await _debug._pdb_complete.wait()
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
else: # sub-nursery case
|
else: # sub-nursery case
|
||||||
|
|
||||||
async with _open_and_supervise_one_cancels_all_nursery(
|
async with _open_and_supervise_one_cancels_all_nursery(
|
||||||
|
|
Loading…
Reference in New Issue