Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet e2e572ef51 Move some infos to runtime level 2021-06-26 23:40:21 -04:00
Tyler Goodlet 6cf6a0e70b Add PDB level and make runtime below info but above debug 2021-06-26 23:40:21 -04:00
Tyler Goodlet 0a5cf6bbd2 WIP attempt to relay context error via raised-in-scope-nursery task 2021-06-24 19:56:05 -04:00
Tyler Goodlet e6c9232b45 Add our own "transport closed" signal
This changes some super old (and bad) code from the project's very early
days. For some ridiculous reason I must have thought that masking `trio`'s
internal stream / transport errors and a TCP EOF as `StopAsyncIteration`
was somehow a good idea. The reality is you probably
want to know the difference between an unexpected transport error
and a simple EOF lol (see the sketch just after this commit list). This
begins to resolve that by adding our own special `TransportClosed` error
to signal the "graceful" termination of a channel's underlying transport.
Oh, and this builds on the `msgspec` integration which helped shed light
on the core issues here B)
2021-06-24 18:49:51 -04:00
Tyler Goodlet 62ece4327d Just drop SIGINT masking; it seems to fix piker crash-hangs 2021-06-15 17:54:42 -04:00
Tyler Goodlet f7b7c3fb93 Set stream "end of channel" after shielded check!
Another face-palm that was causing serious issues for code using
the `.shielded` feature..

Add a bunch more detailed comments for all this subtlety and hopefully
get it right once and for all. Also aggregated the `trio` errors that
should trigger closure inside `.aclose()`, hopefully that's right too.
2021-06-14 16:34:44 -04:00
Tyler Goodlet 186968d3e7 Don't clobber msg loop mem chan... 2021-06-14 13:27:52 -04:00
Tyler Goodlet 7e2a054dd7 Drop bad .close() call 2021-06-14 09:34:00 -04:00
Tyler Goodlet b494f29d55 Proxy asyncio cancelleds as well 2021-06-14 09:34:00 -04:00
Tyler Goodlet 9ced2066b9 Power of 2 cuz puters 2021-06-14 09:34:00 -04:00
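
As context for the `TransportClosed` commit above: in `trio` a graceful shutdown and a broken connection surface differently — `receive_some()` returns `b''` on EOF but raises `trio.BrokenResourceError` on an abrupt failure such as a connection reset. A minimal sketch of telling the two apart (a hypothetical helper, not code from this changeset):

import trio


async def read_chunks(stream: trio.abc.ReceiveStream):
    '''Yield raw chunks, distinguishing graceful EOF from a broken pipe.'''
    while True:
        try:
            data = await stream.receive_some(2**10)
        except trio.BrokenResourceError:
            # abrupt transport failure, eg. a connection reset by peer
            raise
        if data == b'':
            # graceful EOF: the far end closed its send side
            return
        yield data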
8 changed files with 387 additions and 207 deletions

View File

@@ -12,7 +12,7 @@ import uuid
 import typing
 from typing import Dict, List, Tuple, Any, Optional, Union
 from types import ModuleType
-import signal
+# import signal
 import sys
 import os
 from contextlib import ExitStack
@@ -31,6 +31,7 @@ from ._exceptions import (
     ModuleNotExposed,
     is_multi_cancelled,
     ContextCancelled,
+    TransportClosed,
 )
 from . import _debug
 from ._discovery import get_arbiter
@@ -47,6 +48,7 @@ class ActorFailure(Exception):
 async def _invoke(
+
     actor: 'Actor',
     cid: str,
     chan: Channel,
@@ -59,10 +61,12 @@ async def _invoke(
     """Invoke local func and deliver result(s) over provided channel.
     """
     treat_as_gen = False
-    cs = None
     cancel_scope = trio.CancelScope()
-    ctx = Context(chan, cid, _cancel_scope=cancel_scope)
-    context = False
+
+    cs: trio.CancelScope = None
+    ctx = Context(chan, cid)
+    context: bool = False

     if getattr(func, '_tractor_stream_function', False):
         # handle decorated ``@tractor.stream`` async functions
@@ -150,14 +154,22 @@ async def _invoke(
             # context func with support for bi-dir streaming
             await chan.send({'functype': 'context', 'cid': cid})

-            with cancel_scope as cs:
+            async with trio.open_nursery() as scope_nursery:
+                ctx._scope_nursery = scope_nursery
+                cs = scope_nursery.cancel_scope
                 task_status.started(cs)
                 await chan.send({'return': await coro, 'cid': cid})

             if cs.cancelled_caught:
+                if ctx._cancel_called:
+                    msg = f'{func.__name__} cancelled itself',
+                else:
+                    msg = f'{func.__name__} was remotely cancelled',
+
                 # task-context was cancelled so relay the cancel to the caller
                 raise ContextCancelled(
-                    f'{func.__name__} cancelled itself',
+                    msg,
                     suberror_type=trio.Cancelled,
                 )
@@ -170,20 +182,29 @@ async def _invoke(
     except (Exception, trio.MultiError) as err:

-        # TODO: maybe we'll want differnet "levels" of debugging
-        # eventualy such as ('app', 'supervisory', 'runtime') ?
-        if not isinstance(err, trio.ClosedResourceError) and (
-            not is_multi_cancelled(err)) and (
-            not isinstance(err, ContextCancelled)
-        ):
-            # XXX: is there any case where we'll want to debug IPC
-            # disconnects? I can't think of a reason that inspecting
-            # this type of failure will be useful for respawns or
-            # recovery logic - the only case is some kind of strange bug
-            # in `trio` itself?
-            entered = await _debug._maybe_enter_pm(err)
-            if not entered:
-                log.exception("Actor crashed:")
+        if not is_multi_cancelled(err):
+
+            log.exception("Actor crashed:")
+
+            # TODO: maybe we'll want different "levels" of debugging
+            # eventually such as ('app', 'supervisory', 'runtime') ?
+
+            # if not isinstance(err, trio.ClosedResourceError) and (
+            # if not is_multi_cancelled(err) and (
+
+            if not isinstance(err, ContextCancelled) or (
+                isinstance(err, ContextCancelled) and ctx._cancel_called
+            ):
+                # XXX: is there any case where we'll want to debug IPC
+                # disconnects? I can't think of a reason that inspecting
+                # this type of failure will be useful for respawns or
+                # recovery logic - the only case is some kind of strange bug
+                # in `trio` itself?
+                await _debug._maybe_enter_pm(err)
+                # entered = await _debug._maybe_enter_pm(err)
+                # if not entered:
+                #     log.exception("Actor crashed:")

         # always ship errors back to caller
         err_msg = pack_error(err)
@@ -192,8 +213,10 @@ async def _invoke(
         await chan.send(err_msg)
     except trio.ClosedResourceError:
-        log.warning(
-            f"Failed to ship error to caller @ {chan.uid}")
+        # if we can't propagate the error that's a big boo boo
+        log.error(
+            f"Failed to ship error to caller @ {chan.uid} !?"
+        )

     if cs is None:
         # error is from above code not from rpc invocation
@@ -378,19 +401,34 @@ class Actor:
             raise mne

     async def _stream_handler(
+
         self,
         stream: trio.SocketStream,
+
     ) -> None:
         """Entry point for new inbound connections to the channel server.
         """
         self._no_more_peers = trio.Event()  # unset
+
         chan = Channel(stream=stream)
         log.info(f"New connection to us {chan}")

         # send/receive initial handshake response
         try:
             uid = await self._do_handshake(chan)
-        except StopAsyncIteration:
+
+        except (
+            trio.BrokenResourceError,
+            trio.ClosedResourceError,
+            TransportClosed,
+        ):
+            # XXX: This may propagate up from ``Channel._aiter_recv()``
+            # and ``MsgpackStream._inter_packets()`` on a read from the
+            # stream particularly when the runtime is first starting up
+            # inside ``open_root_actor()`` where there is a check for
+            # a bound listener on the "arbiter" addr. the reset will be
+            # because the handshake was never meant to take place.
             log.warning(f"Channel {chan} failed to handshake")
             return
@@ -418,10 +456,24 @@ class Actor:
             try:
                 await self._process_messages(chan)
             finally:
+                # channel cleanup sequence

+                # for (channel, cid) in self._rpc_tasks.copy():
+                #     if channel is chan:
+                #         with trio.CancelScope(shield=True):
+                #             await self._cancel_task(cid, channel)

+                #             # close all consumer side task mem chans
+                #             send_chan, _ = self._cids2qs[(chan.uid, cid)]
+                #             assert send_chan.cid == cid  # type: ignore
+                #             await send_chan.aclose()

                 # Drop ref to channel so it can be gc-ed and disconnected
                 log.debug(f"Releasing channel {chan} from {chan.uid}")
                 chans = self._peers.get(chan.uid)
                 chans.remove(chan)
+
                 if not chans:
                     log.debug(f"No more channels for {chan.uid}")
                     self._peers.pop(chan.uid, None)
@@ -434,14 +486,22 @@ class Actor:
         # # XXX: is this necessary (GC should do it?)
         if chan.connected():
+            # if the channel is still connected it may mean the far
+            # end has not closed and we may have gotten here due to
+            # an error and so we should at least try to terminate
+            # the channel from this end gracefully.
+
             log.debug(f"Disconnecting channel {chan}")
             try:
-                # send our msg loop terminate sentinel
+                # send a msg loop terminate sentinel
                 await chan.send(None)
+
+                # XXX: do we want this?
+                # causes "[104] connection reset by peer" on other end
                 # await chan.aclose()
+
             except trio.BrokenResourceError:
-                log.exception(
-                    f"Channel for {chan.uid} was already zonked..")
+                log.warning(f"Channel for {chan.uid} was already closed")

     async def _push_result(
         self,
@@ -451,18 +511,22 @@ class Actor:
     ) -> None:
         """Push an RPC result to the local consumer's queue.
         """
-        actorid = chan.uid
-        assert actorid, f"`actorid` can't be {actorid}"
-        send_chan, recv_chan = self._cids2qs[(actorid, cid)]
+        # actorid = chan.uid
+        assert chan.uid, f"`chan.uid` can't be {chan.uid}"
+        send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
         assert send_chan.cid == cid  # type: ignore

-        # if 'stop' in msg:
+        if 'error' in msg:
+            ctx = getattr(recv_chan, '_ctx', None)
+            # if ctx:
+            #     ctx._error_from_remote_msg(msg)
+
             # log.debug(f"{send_chan} was terminated at remote end")
             # # indicate to consumer that far end has stopped
             # return await send_chan.aclose()

         try:
-            log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
+            log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}")
             # maintain backpressure
             await send_chan.send(msg)
@@ -481,12 +545,14 @@
         self,
         actorid: Tuple[str, str],
         cid: str
+
     ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]:
+
         log.debug(f"Getting result queue for {actorid} cid {cid}")
         try:
             send_chan, recv_chan = self._cids2qs[(actorid, cid)]
         except KeyError:
-            send_chan, recv_chan = trio.open_memory_channel(1000)
+            send_chan, recv_chan = trio.open_memory_channel(2**10)
             send_chan.cid = cid  # type: ignore
             recv_chan.cid = cid  # type: ignore
             self._cids2qs[(actorid, cid)] = send_chan, recv_chan
@@ -544,9 +610,15 @@ class Actor:
                     if channel is chan:
                         await self._cancel_task(cid, channel)

+                        # close all consumer side task mem chans
+                        # send_chan, _ = self._cids2qs[(chan.uid, cid)]
+                        # assert send_chan.cid == cid  # type: ignore
+                        # await send_chan.aclose()
+
                 log.debug(
                     f"Msg loop signalled to terminate for"
                     f" {chan} from {chan.uid}")
+
                 break

             log.trace(  # type: ignore
@@ -641,22 +713,36 @@ class Actor:
                     )
                     await self.cancel_rpc_tasks(chan)

-        except trio.ClosedResourceError:
-            log.error(f"{chan} form {chan.uid} broke")
+        except (
+            TransportClosed,
+            trio.BrokenResourceError,
+            trio.ClosedResourceError
+        ):
+            # channels "breaking" is ok since we don't have a teardown
+            # handshake for them (yet) and instead we simply bail out
+            # of the message loop and expect the teardown sequence
+            # to clean up.
+            log.error(f"{chan} from {chan.uid} closed abruptly")
+            # raise

         except (Exception, trio.MultiError) as err:
             # ship any "internal" exception (i.e. one from internal machinery
             # not from an rpc task) to parent
             log.exception("Actor errored:")
             if self._parent_chan:
                 await self._parent_chan.send(pack_error(err))
-            raise

             # if this is the `MainProcess` we expect the error broadcasting
             # above to trigger an error at consuming portal "checkpoints"
+            raise

         except trio.Cancelled:
             # debugging only
             log.debug(f"Msg loop was cancelled for {chan}")
             raise

         finally:
+            # msg debugging for when the machinery is brokey
             log.debug(
                 f"Exiting msg loop for {chan} from {chan.uid} "
                 f"with last msg:\n{msg}")
@@ -709,8 +795,8 @@ class Actor:
             # Disable sigint handling in children if NOT running in
             # debug mode; we shouldn't need it thanks to our
             # cancellation machinery.
-            if 'debug_mode' not in rvs:
-                signal.signal(signal.SIGINT, signal.SIG_IGN)
+            # if 'debug_mode' not in rvs:
+            #     signal.signal(signal.SIGINT, signal.SIG_IGN)

         return chan, accept_addr
@@ -1089,6 +1175,7 @@ class Actor:
         parlance.
         """
         await chan.send(self.uid)
+        # breakpoint()
         uid: Tuple[str, str] = tuple(await chan.recv())

         # if not isinstance(uid, tuple):
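
The `_invoke` hunks above swap a bare `trio.CancelScope` for a nursery whose `cancel_scope` is handed back via `task_status.started()`. The underlying `trio` mechanics — a nursery exposes its scope, and `cancelled_caught` records whether that scope absorbed a cancellation — can be sketched standalone like this:

import trio


async def main():
    async with trio.open_nursery() as nursery:
        # hand the scope around just like ``_invoke`` does via
        # ``task_status.started(cs)``
        cs = nursery.cancel_scope
        nursery.start_soon(trio.sleep_forever)

        # simulate a remote cancel request arriving
        cs.cancel()

    if cs.cancelled_caught:
        # the scope absorbed the cancellation; ``_invoke`` uses this
        # flag to decide whether to raise ``ContextCancelled``
        print('scope was cancelled')


trio.run(main)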

View File

@@ -41,6 +41,10 @@ class ContextCancelled(RemoteActorError):
     "Inter-actor task context cancelled itself on the callee side."


+class TransportClosed(trio.ClosedResourceError):
+    "Underlying channel transport was closed prior to use"
+
+
 class NoResult(RuntimeError):
     "No final result is expected for this actor"

@@ -66,12 +70,15 @@ def pack_error(exc: BaseException) -> Dict[str, Any]:
 def unpack_error(
+
     msg: Dict[str, Any],
     chan=None,
     err_type=RemoteActorError
+
 ) -> Exception:
+
     """Unpack an 'error' message from the wire
     into a local ``RemoteActorError``.
     """
     error = msg['error']
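
One design note on the hunk above: because `TransportClosed` subclasses `trio.ClosedResourceError`, every pre-existing `except trio.ClosedResourceError` handler keeps catching it while new code can match the more specific type. A tiny sketch of that property:

import trio


class TransportClosed(trio.ClosedResourceError):
    "Underlying channel transport was closed prior to use"


try:
    raise TransportClosed('transport was already closed prior to read')
except trio.ClosedResourceError as err:
    # handlers written against trio's exception hierarchy keep working,
    # while new code can catch the more specific ``TransportClosed``
    print(f'caught: {err!r}')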

View File

@@ -2,6 +2,7 @@
 Inter-process comms abstractions
 """
 from functools import partial
+import math
 import struct
 import typing
 from typing import Any, Tuple, Optional
@@ -13,6 +14,7 @@ import trio
 from async_generator import asynccontextmanager

 from .log import get_logger
+from ._exceptions import TransportClosed
 log = get_logger(__name__)

 # :eyeroll:
@@ -24,7 +26,7 @@ except ImportError:
     Unpacker = partial(msgpack.Unpacker, strict_map_key=False)


-class MsgpackStream:
+class MsgpackTCPStream:
     '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
     using ``msgpack-python``.
@@ -47,7 +49,10 @@ class MsgpackTCPStream:
         assert isinstance(rsockname, tuple)
         self._raddr = rsockname[:2]

+        # start and seed first entry to read loop
         self._agen = self._iter_packets()
+        # self._agen.asend(None) is None
+
         self._send_lock = trio.StrictFIFOLock()

     async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
@@ -58,16 +63,13 @@ class MsgpackTCPStream:
             use_list=False,
         )
         while True:
-            try:
-                data = await self.stream.receive_some(2**10)
-                log.trace(f"received {data}")  # type: ignore
-            except trio.BrokenResourceError:
-                log.warning(f"Stream connection {self.raddr} broke")
-                return
+            data = await self.stream.receive_some(2**10)
+            log.trace(f"received {data}")  # type: ignore

             if data == b'':
-                log.debug(f"Stream connection {self.raddr} was closed")
-                return
+                raise TransportClosed(
+                    f'transport {self} was already closed prior to read'
+                )

             unpacker.feed(data)
             for packet in unpacker:
@@ -98,7 +100,7 @@ class MsgpackTCPStream:
         return self.stream.socket.fileno() != -1


-class MsgspecStream(MsgpackStream):
+class MsgspecTCPStream(MsgpackTCPStream):
     '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
     using ``msgspec``.
@@ -123,24 +125,22 @@ class MsgspecTCPStream(MsgpackTCPStream):
         while True:
             try:
                 header = await self.recv_stream.receive_exactly(4)

-                if header is None:
-                    continue
+            except (ValueError):
+                raise TransportClosed(
+                    f'transport {self} was already closed prior to read'
+                )

-                if header == b'':
-                    log.debug(f"Stream connection {self.raddr} was closed")
-                    return
+            if header == b'':
+                raise TransportClosed(
+                    f'transport {self} was already closed prior to read'
+                )

-                size, = struct.unpack("<I", header)
+            size, = struct.unpack("<I", header)
+            log.trace(f'received header {size}')

-                log.trace(f'received header {size}')
+            msg_bytes = await self.recv_stream.receive_exactly(size)

-                msg_bytes = await self.recv_stream.receive_exactly(size)
-
-            # the value error here is to catch a connect with immediate
-            # disconnect that will cause an EOF error inside `tricycle`.
-            except (ValueError, trio.BrokenResourceError):
-                log.warning(f"Stream connection {self.raddr} broke")
-                return

             log.trace(f"received {msg_bytes}")  # type: ignore
             yield decoder.decode(msg_bytes)
@@ -169,8 +169,9 @@ class Channel:
         on_reconnect: typing.Callable[..., typing.Awaitable] = None,
         auto_reconnect: bool = False,
         stream: trio.SocketStream = None,  # expected to be active

-        # stream_serializer: type = MsgpackStream,
-        stream_serializer_type: type = MsgspecStream,
+        # stream_serializer_type: type = MsgspecTCPStream,
+        stream_serializer_type: type = MsgpackTCPStream,

     ) -> None:
@@ -192,6 +193,8 @@ class Channel:
         self._exc: Optional[Exception] = None
         self._agen = self._aiter_recv()

+        self._closed: bool = False
+
     def __repr__(self) -> str:
         if self.msgstream:
             return repr(
@@ -208,35 +211,52 @@ class Channel:
         return self.msgstream.raddr if self.msgstream else None

     async def connect(
-        self, destaddr: Tuple[Any, ...] = None,
+        self,
+        destaddr: Tuple[Any, ...] = None,
         **kwargs
+
     ) -> trio.SocketStream:
+
         if self.connected():
             raise RuntimeError("channel is already connected?")
+
         destaddr = destaddr or self._destaddr
         assert isinstance(destaddr, tuple)
-        stream = await trio.open_tcp_stream(*destaddr, **kwargs)
+
+        stream = await trio.open_tcp_stream(
+            *destaddr,
+            happy_eyeballs_delay=math.inf,
+            **kwargs
+        )
         self.msgstream = self.stream_serializer_type(stream)
         return stream

     async def send(self, item: Any) -> None:
         log.trace(f"send `{item}`")  # type: ignore
         assert self.msgstream
+
         await self.msgstream.send(item)

     async def recv(self) -> Any:
         assert self.msgstream
+
         try:
             return await self.msgstream.recv()
+
         except trio.BrokenResourceError:
             if self._autorecon:
                 await self._reconnect()
                 return await self.recv()

+            raise
+
     async def aclose(self) -> None:
         log.debug(f"Closing {self}")
         assert self.msgstream
         await self.msgstream.stream.aclose()
+        self._closed = True
+        log.error(f'CLOSING CHAN {self}')

     async def __aenter__(self):
         await self.connect()
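
Two details from the hunks above are worth a standalone sketch: the `MsgspecTCPStream` wire format is a 4-byte little-endian length header followed by the payload, and `Channel.connect()` now passes `happy_eyeballs_delay=math.inf` to disable `trio`'s parallel-connect racing. A rough sketch of the send side under those assumptions (hypothetical helper names, not this file's API):

import math
import struct

import trio


def frame(payload: bytes) -> bytes:
    # 4-byte little-endian length header then the body, mirroring the
    # read loop in ``MsgspecTCPStream._iter_packets()`` above
    return struct.pack("<I", len(payload)) + payload


async def connect_and_send(destaddr: tuple, payload: bytes) -> None:
    # disable happy-eyeballs racing just as ``Channel.connect()`` does
    stream = await trio.open_tcp_stream(
        *destaddr,
        happy_eyeballs_delay=math.inf,
    )
    await stream.send_all(frame(payload))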

View File

@@ -177,6 +177,7 @@ class Portal:
                 f"Cancelling all streams with {self.channel.uid}")
             for stream in self._streams.copy():
                 try:
+                    # with trio.CancelScope(shield=True):
                     await stream.aclose()
                 except trio.ClosedResourceError:
                     # don't error the stream having already been closed
@@ -332,12 +333,6 @@ class Portal:
             # message right now since there shouldn't be a reason to
             # stop and restart the stream, right?
             try:
-
-                # We are for sure done with this stream and no more
-                # messages are expected to be delivered from the
-                # runtime's msg loop.
-                await recv_chan.aclose()
-
                 await ctx.cancel()

             except trio.ClosedResourceError:
@@ -375,64 +370,78 @@ class Portal:
         recv_chan: Optional[trio.MemoryReceiveChannel] = None

-        try:
-            cid, recv_chan, functype, first_msg = await self._submit(
-                fn_mod_path, fn_name, kwargs)
+        cid, recv_chan, functype, first_msg = await self._submit(
+            fn_mod_path, fn_name, kwargs)

-            assert functype == 'context'
-            msg = await recv_chan.receive()
+        assert functype == 'context'
+        msg = await recv_chan.receive()

-            try:
-                # the "first" value here is delivered by the callee's
-                # ``Context.started()`` call.
-                first = msg['started']
+        try:
+            # the "first" value here is delivered by the callee's
+            # ``Context.started()`` call.
+            first = msg['started']

-            except KeyError:
-                assert msg.get('cid'), ("Received internal error at context?")
+        except KeyError:
+            assert msg.get('cid'), ("Received internal error at context?")

-                if msg.get('error'):
-                    # raise the error message
-                    raise unpack_error(msg, self.channel)
-                else:
-                    raise
+            if msg.get('error'):
+                # raise the error message
+                raise unpack_error(msg, self.channel)
+            else:
+                raise

-            # deliver context instance and .started() msg value in open
-            # tuple.
-            ctx = Context(
-                self.channel,
-                cid,
-                _portal=self,
-                _recv_chan=recv_chan,
-            )
+        # deliver context instance and .started() msg value in open
+        # tuple.
+        try:
+            async with trio.open_nursery() as scope_nursery:
+                ctx = Context(
+                    self.channel,
+                    cid,
+                    _portal=self,
+                    _recv_chan=recv_chan,
+                    _scope_nursery=scope_nursery,
+                )
+                recv_chan._ctx = ctx

-            try:
                 yield ctx, first

+                log.info(f'Context for {func.__name__} completed')
+
                 if cancel_on_exit:
                     await ctx.cancel()

                 else:
                     if not ctx._cancel_called:
                         await ctx.result()

-            except ContextCancelled:
-                # if the context was cancelled by client code
-                # then we don't need to raise since user code
-                # is expecting this.
-                if not ctx._cancel_called:
-                    raise
+            await recv_chan.aclose()

-            except BaseException:
-                # the context cancels itself on any deviation
-                await ctx.cancel()
-                raise
+            # except TypeError:
+            #     # if fn_name == '_emsd_main':
+            #     import tractor
+            #     await tractor.breakpoint()

-            finally:
-                log.info(f'Context for {func.__name__} completed')
+        except ContextCancelled:
+            if not ctx._cancel_called:
+                raise

-        finally:
-            if recv_chan is not None:
-                await recv_chan.aclose()
+            # if the context was cancelled by client code
+            # then we don't need to raise since user code
+            # is expecting this and the block should exit.
+            else:
+                log.debug(f'Context {ctx} cancelled gracefully')
+
+        except trio.Cancelled:
+            # the context cancels itself on any deviation
+            await ctx.cancel()
+            raise
+
+        # finally:
+        #     log.info(f'Context for {func.__name__} completed')
+
+        # finally:
+        #     if recv_chan is not None:
+        #         await recv_chan.aclose()


 @dataclass
 class LocalPortal:
@@ -470,6 +479,7 @@ async def open_portal(
     was_connected = False

     async with maybe_open_nursery(nursery, shield=shield) as nursery:
+
         if not channel.connected():
             await channel.connect()
             was_connected = True
@@ -491,11 +501,12 @@ async def open_portal(
         portal = Portal(channel)
         try:
             yield portal
+
         finally:
             await portal.aclose()

             if was_connected:
-                # cancel remote channel-msg loop
+                # gracefully signal remote channel-msg loop
                 await channel.send(None)

                 # cancel background msg loop task
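
The `open_context` rewrite above follows the usual async-context-manager shape: enter, yield into user code, then either collect the far end's result or relay a cancel on any deviation. A stripped-down skeleton of that shape (all names hypothetical stand-ins, not the actual `Portal` API):

from contextlib import asynccontextmanager

import trio


@asynccontextmanager
async def open_task_context(portal):
    '''Hypothetical skeleton mirroring ``Portal.open_context()``;
    ``start_remote_task()``, ``.result()`` and ``.cancel()`` are assumed
    stand-ins, not real ``tractor`` methods.
    '''
    ctx = await portal.start_remote_task()
    try:
        yield ctx
        # user block exited cleanly: collect the far end's final result
        await ctx.result()
    except trio.Cancelled:
        # on any local cancellation request remote teardown as well
        await ctx.cancel()
        raise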

View File

@@ -147,7 +147,7 @@ async def cancel_on_completion(
             )
         else:
-            log.info(
+            log.runtime(
                 f"Cancelling {portal.channel.uid} gracefully "
                 f"after result {result}")

@@ -263,7 +263,7 @@ async def new_proc(
             parent_addr,
             infect_asyncio=infect_asyncio
         ) as proc:
-            log.info(f"Started {proc}")
+            log.runtime(f"Started {proc}")

             # wait for actor to spawn and connect back to us
             # channel should have handshake completed by the

@@ -416,7 +416,7 @@ async def mp_new_proc(
         if not proc.is_alive():
             raise ActorFailure("Couldn't start sub-actor?")

-        log.info(f"Started {proc}")
+        log.runtime(f"Started {proc}")

         try:
             # wait for actor to spawn and connect back to us

View File

@@ -7,7 +7,7 @@ from contextlib import contextmanager, asynccontextmanager
 from dataclasses import dataclass
 from typing import (
     Any, Iterator, Optional, Callable,
-    AsyncGenerator,
+    AsyncGenerator, Dict,
 )

 import warnings
@@ -61,11 +61,15 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
         return msg['yield']

     async def receive(self):
-        # see ``.aclose()`` for an alt to always checking this
+        # see ``.aclose()`` for notes on the old behaviour prior to
+        # introducing this
         if self._eoc:
             raise trio.EndOfChannel

         try:
+            # if self._ctx.chan.uid[0] == 'brokerd.ib':
+            #     breakpoint()
             msg = await self._rx_chan.receive()
             return msg['yield']
@@ -81,12 +85,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
             if msg.get('stop'):
                 log.debug(f"{self} was stopped at remote end")

-                # when the send is closed we assume the stream has
-                # terminated and signal this local iterator to stop
-                await self.aclose()
+                # # when the send is closed we assume the stream has
+                # # terminated and signal this local iterator to stop
+                # await self.aclose()

                 # XXX: this causes ``ReceiveChannel.__anext__()`` to
-                # raise a ``StopAsyncIteration``.
+                # raise a ``StopAsyncIteration`` **and** in our catch
+                # block below it will trigger ``.aclose()``.
                 raise trio.EndOfChannel

             # TODO: test that shows stream raising an expected error!!!
@@ -97,44 +102,50 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
             else:
                 raise

-        except trio.ClosedResourceError:
-            # XXX: this indicates that a `stop` message was
-            # sent by the far side of the underlying channel.
-            # Currently this is triggered by calling ``.aclose()`` on
+        except (
+            trio.ClosedResourceError,  # by self._rx_chan
+            trio.EndOfChannel,  # by self._rx_chan or `stop` msg from far end
+            trio.Cancelled,  # by local cancellation
+        ):
+            # XXX: we close the stream on any of these error conditions:

+            # a ``ClosedResourceError`` indicates that the internal
+            # feeder memory receive channel was closed likely by the
+            # runtime after the associated transport-channel
+            # disconnected or broke.

+            # an ``EndOfChannel`` indicates either the internal recv
+            # memchan exhausted **or** we raised it just above after
+            # receiving a `stop` message from the far end of the stream.

+            # Previously this was triggered by calling ``.aclose()`` on
             # the send side of the channel inside
-            # ``Actor._push_result()``, but maybe it should be put here?
-            # to avoid exposing the internal mem chan closing mechanism?
-            # in theory we could instead do some flushing of the channel
-            # if needed to ensure all consumers are complete before
-            # triggering closure too early?
+            # ``Actor._push_result()`` (should still be commented code
+            # there - which should eventually get removed), but now the
+            # 'stop' message handling has been put just above.

-            # Locally, we want to close this stream gracefully, by
+            # TODO: Locally, we want to close this stream gracefully, by
             # terminating any local consumers tasks deterministically.
-            # We **don't** want to be closing this send channel and not
-            # relaying a final value to remaining consumers who may not
-            # have been scheduled to receive it yet?
-
-            # lots of testing to do here
+            # Once we have broadcast support, we **don't** want to be
+            # closing this stream and not flushing a final value to
+            # remaining (clone) consumers who may not have been
+            # scheduled to receive it yet.

             # when the send is closed we assume the stream has
             # terminated and signal this local iterator to stop
             await self.aclose()
-            raise trio.EndOfChannel

-            # if not isinstance(self, MsgStream):
-            #     # XXX: this was how we handled this originally for the
-            #     # single direction case?
-            #     raise trio.EndOfChannel
-            # else:
-            #     # in 2-way case raise the closed error
-            #     raise  # propagate
+            raise  # propagate

-        except trio.Cancelled:
-            # relay cancels to the remote task
-            await self.aclose()
-            raise
+            # except trio.Cancelled:
+            #     if not self._shielded:
+            #         # if shielded we don't propagate a cancelled
+            #         raise

+            # except trio.Cancelled:
+            #     # relay cancels to the remote task
+            #     await self.aclose()
+            #     raise

     @contextmanager
     def shield(
@@ -158,13 +169,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
         on close.
         """
-        self._eoc = True
-
         # XXX: keep proper adherence to trio's `.aclose()` semantics:
         # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
         rx_chan = self._rx_chan

-        if rx_chan._closed:
+        if rx_chan._closed:  # or self._eoc:
             log.warning(f"{self} is already closed")

             # this stream has already been closed so silently succeed as
@@ -184,6 +193,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
             log.warning(f"{self} is shielded, portal channel being kept alive")
             return

+        # XXX: This must be set **AFTER** the shielded test above!
+        self._eoc = True
+
         # NOTE: this is super subtle IPC messaging stuff:
         # Relay stop iteration to far end **iff** we're
         # in bidirectional mode. If we're only streaming
@@ -195,59 +207,63 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
         # isn't expecting messages to be sent by the caller.
         # Thus, we must check that this context DOES NOT
         # have a portal reference to ensure this is indeed the callee
-        # side and can relay a 'stop'. In the bidirectional case,
-        # `Context.open_stream()` will create the `Actor._cids2qs`
-        # entry from a call to `Actor.get_memchans()`.
+        # side and can relay a 'stop'.

+        # In the bidirectional case, `Context.open_stream()` will create
+        # the `Actor._cids2qs` entry from a call to
+        # `Actor.get_memchans()` and will send the stop message in
+        # ``__aexit__()`` on teardown so it **does not** need to be
+        # called here.
         if not self._ctx._portal:
             try:
                 # only for 2 way streams can we send
                 # stop from the caller side
                 await self._ctx.send_stop()
-            except trio.BrokenResourceError:
+
+            except (
+                trio.BrokenResourceError,
+                trio.ClosedResourceError
+            ):
                 # the underlying channel may already have been pulled
                 # in which case our stop message is meaningless since
                 # it can't traverse the transport.
                 log.debug(f'Channel for {self} was already closed')

-        # close the local mem chan??!?
-        # NOT if we're a ``MsgStream``!
+        # close the local mem chan ``self._rx_chan`` ??!?
+        # DEFINITELY NOT if we're a bi-dir ``MsgStream``!
         # BECAUSE this same core-msg-loop mem recv-chan is used to deliver
         # the potential final result from the surrounding inter-actor
         # `Context` so we don't want to close it until that context has
         # run to completion.

-        # XXX: Notes on old behaviour.
-        # await rx_chan.aclose()
+        # XXX: Notes on old behaviour:

-        # In the receive-only case, ``Portal.open_stream_from()`` should
-        # call this explicitly on teardown but additionally if for some
-        # reason stream consumer code tries to manually receive a new
+        # In the receive-only case, ``Portal.open_stream_from()`` used
+        # to rely on this call explicitly on teardown such that a new
+        # call to ``.receive()`` after ``rx_chan`` had been closed, would
+        # result in us raising a ``trio.EndOfChannel`` (since we
+        # remapped the ``trio.ClosedResourceError`). However, now if for some
+        # reason the stream's consumer code tries to manually receive a new
         # value before ``.aclose()`` is called **but** the far end has
         # stopped `.receive()` **must** raise ``trio.EndofChannel`` in
-        # order to avoid an infinite hang on ``.__anext__()``. So we can
-        # instead uncomment this check and close the underlying msg-loop
-        # mem chan below and then **not** check for ``self._eoc`` in
-        # ``.receive()`` (if for some reason we think that check is
-        # a bottle neck - not likely) such that the
-        # ``trio.ClosedResourceError`` would instead trigger the
-        # ``trio.EndOfChannel`` in ``.receive()`` (as it originally was
-        # before bi-dir streaming support).
+        # order to avoid an infinite hang on ``.__anext__()``; this is
+        # why we added ``self._eoc`` to denote stream closure independent
+        # of ``rx_chan``.

-        # if not isinstance(self, MsgStream):
-        #     await rx_chan.aclose()
-
-        # TODO: but make it broadcasting to consumers
-        # def clone(self):
-        #     """Clone this receive channel allowing for multi-task
-        #     consumption from the same channel.
-        #
-        #     """
-        #     return ReceiveStream(
-        #         self._cid,
-        #         self._rx_chan.clone(),
-        #         self._portal,
-        #     )
+        # In theory we could still use this old method and close the
+        # underlying msg-loop mem chan as above and then **not** check
+        # for ``self._eoc`` in ``.receive()`` (if for some reason we
+        # think that check is a bottle neck - not likely) **but** then
+        # we would need to map the resulting
+        # ``trio.ClosedResourceError`` to a ``trio.EndOfChannel`` in
+        # ``.receive()`` (as it originally was before bi-dir streaming
+        # support) in order to trigger stream closure. The old behaviour
+        # is arguably more confusing since we lose detection of the
+        # runtime's closure of ``rx_chan`` in the case where we may
+        # still need to consume msgs that are "in transit" from the far
+        # end (eg. for ``Context.result()``).


 class MsgStream(ReceiveMsgStream, trio.abc.Channel):
@@ -260,8 +276,22 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
         self,
         data: Any
     ) -> None:
+        '''Send a message over this stream to the far end.
+
+        '''
         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})

+    # TODO: but make it broadcasting to consumers
+    def clone(self):
+        """Clone this receive channel allowing for multi-task
+        consumption from the same channel.
+
+        """
+        return MsgStream(
+            self._ctx,
+            self._rx_chan.clone(),
+        )


 @dataclass
 class Context:
@@ -288,7 +318,7 @@ class Context:
     _cancel_called: bool = False

     # only set on the callee side
-    _cancel_scope: Optional[trio.CancelScope] = None
+    _scope_nursery: Optional[trio.Nursery] = None

     async def send_yield(self, data: Any) -> None:
@@ -303,6 +333,16 @@ class Context:
     async def send_stop(self) -> None:
         await self.chan.send({'stop': True, 'cid': self.cid})

+    def _error_from_remote_msg(
+        self,
+        msg: Dict[str, Any],
+    ) -> None:
+
+        async def raiser():
+            raise unpack_error(msg, self.chan)
+
+        self._scope_nursery.start_soon(raiser)
+
     async def cancel(self) -> None:
         '''Cancel this inter-actor-task context.
@@ -341,13 +381,16 @@ class Context:
                     f"{cid} for {self._portal.channel.uid}")
         else:
             # ensure callee side
-            assert self._cancel_scope
+            assert self._scope_nursery

             # TODO: should we have an explicit cancel message
             # or is relaying the local `trio.Cancelled` as an
             # {'error': trio.Cancelled, cid: "blah"} enough?
             # This probably gets into the discussion in
             # https://github.com/goodboy/tractor/issues/36
-            self._cancel_scope.cancel()
+            self._scope_nursery.cancel_scope.cancel()
+
+            if self._recv_chan:
+                await self._recv_chan.aclose()

     @asynccontextmanager
     async def open_stream(
@@ -418,7 +461,8 @@ class Context:
             yield rchan

         except trio.EndOfChannel:
-            # stream iteration stop signal
+            # likely the far end sent us a 'stop' message to
+            # terminate the stream.
             raise

         else:
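
The new `Context._error_from_remote_msg()` above leans on a small `trio` trick: you cannot raise an exception *into* a running task from outside it, but you can `start_soon()` a one-shot task whose only job is to raise, which crashes the nursery and thereby cancels every task in the same scope. A self-contained sketch of the trick:

import trio


async def main():
    try:
        async with trio.open_nursery() as nursery:

            async def raiser():
                # one-shot task: its exception crashes the whole
                # nursery, cancelling siblings in the same scope
                raise RuntimeError('relayed remote error')

            nursery.start_soon(raiser)
            await trio.sleep_forever()  # cancelled when ``raiser`` crashes

    except RuntimeError as err:
        print(f'nursery torn down by: {err}')


trio.run(main)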

View File

@@ -30,16 +30,17 @@ DATE_FORMAT = '%b %d %H:%M:%S'
 LEVELS = {
     'GARBAGE': 1,
     'TRACE': 5,
-    'PROFILE': 15,
-    'RUNTIME': 500,
+    'RUNTIME': 15,
+    'PDB': 500,
     'QUIET': 1000,
 }

 STD_PALETTE = {
     'CRITICAL': 'red',
     'ERROR': 'red',
-    'RUNTIME': 'white',
+    'PDB': 'white',
     'WARNING': 'yellow',
     'INFO': 'green',
+    'RUNTIME': 'white',
     'DEBUG': 'white',
     'TRACE': 'cyan',
     'GARBAGE': 'blue',
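
For reference, the reshuffled levels above slot into the stdlib hierarchy as `DEBUG (10) < RUNTIME (15) < INFO (20)`, with `PDB (500)` sitting above `CRITICAL (50)`. A minimal sketch of registering such a custom level with vanilla `logging` (an approximation of the idea, not necessarily how `tractor.log` wires it up):

import logging

RUNTIME = 15  # above DEBUG (10), below INFO (20)
logging.addLevelName(RUNTIME, 'RUNTIME')


def runtime(self, msg, *args, **kwargs):
    # bound as ``log.runtime(...)`` like the _spawn.py call sites above
    if self.isEnabledFor(RUNTIME):
        self._log(RUNTIME, msg, args, **kwargs)


logging.Logger.runtime = runtime

logging.basicConfig(level=RUNTIME)
log = logging.getLogger('demo')
log.runtime('actor runtime details')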

View File

@@ -101,14 +101,16 @@ def _run_asyncio_task(
         """Cancel the calling ``trio`` task on error.
         """
         nonlocal aio_err
-        aio_err = task.exception()
+        try:
+            aio_err = task.exception()
+        except asyncio.CancelledError as cerr:
+            aio_err = cerr

         if aio_err:
             log.exception(f"asyncio task errored:\n{aio_err}")
             # cancel_scope.cancel()
             from_aio._err = aio_err
-            to_trio.close()

     task.add_done_callback(cancel_trio)
@@ -233,17 +235,25 @@ async def run_task(
         # raise aio_err

     # Do we need this?
-    except BaseException as err:
+    except Exception as err:
         # await tractor.breakpoint()
         aio_err = from_aio._err

+        # try:
         if aio_err is not None:
             # always raise from any captured asyncio error
             raise err from aio_err
         else:
             raise
+        # finally:
+        #     if not task.done():
+        #         task.cancel()

-    finally:
-        task.cancel()
+    except trio.Cancelled:
+        if not task.done():
+            task.cancel()
+        raise

 # async def stream_from_task
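
The `cancel_trio` hunk above guards a genuine `asyncio` footgun: for a cancelled task, `Task.exception()` does not *return* the cancellation, it *raises* `CancelledError`. A standalone sketch of the guarded pattern:

import asyncio


async def main():
    task = asyncio.create_task(asyncio.sleep(999))
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    try:
        err = task.exception()
    except asyncio.CancelledError as cerr:
        # a cancelled task has no stored exception: ``.exception()``
        # re-raises the cancellation instead of returning it
        err = cerr

    print(f'captured: {err!r}')


asyncio.run(main())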