Compare commits


No commits in common. "e2e572ef51078340ce840c7b1c7b107a308617b2" and "86aaec696fd497d9ab821bd534d8cbba1a43dcc4" have entirely different histories.

8 changed files with 199 additions and 379 deletions

tractor/_actor.py View File

@@ -12,7 +12,7 @@ import uuid
 import typing
 from typing import Dict, List, Tuple, Any, Optional, Union
 from types import ModuleType
-# import signal
+import signal
 import sys
 import os
 from contextlib import ExitStack
@@ -31,7 +31,6 @@ from ._exceptions import (
     ModuleNotExposed,
     is_multi_cancelled,
     ContextCancelled,
-    TransportClosed,
 )
 from . import _debug
 from ._discovery import get_arbiter
@@ -48,7 +47,6 @@ class ActorFailure(Exception):
 async def _invoke(
-
     actor: 'Actor',
     cid: str,
     chan: Channel,
@@ -61,12 +59,10 @@ async def _invoke(
     """Invoke local func and deliver result(s) over provided channel.
     """
     treat_as_gen = False
-    cs = None
     cancel_scope = trio.CancelScope()
-    cs: trio.CancelScope = None
-    ctx = Context(chan, cid)
-    context: bool = False
+    ctx = Context(chan, cid, _cancel_scope=cancel_scope)
+    context = False

     if getattr(func, '_tractor_stream_function', False):
         # handle decorated ``@tractor.stream`` async functions
@@ -154,22 +150,14 @@ async def _invoke(
             # context func with support for bi-dir streaming
             await chan.send({'functype': 'context', 'cid': cid})

-            async with trio.open_nursery() as scope_nursery:
-                ctx._scope_nursery = scope_nursery
-                cs = scope_nursery.cancel_scope
+            with cancel_scope as cs:
                 task_status.started(cs)
                 await chan.send({'return': await coro, 'cid': cid})

             if cs.cancelled_caught:
-                if ctx._cancel_called:
-                    msg = f'{func.__name__} cancelled itself',
-                else:
-                    msg = f'{func.__name__} was remotely cancelled',
-
                 # task-contex was cancelled so relay to the cancel to caller
                 raise ContextCancelled(
-                    msg,
+                    f'{func.__name__} cancelled itself',
                     suberror_type=trio.Cancelled,
                 )
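Note: both sides gate the ``ContextCancelled`` relay on ``cs.cancelled_caught``. For readers unfamiliar with the pattern, here is a minimal standalone sketch of a ``trio.CancelScope`` being cancelled from a sibling task (plain trio; the names are illustrative, not tractor API):

    import trio

    async def main():
        cancel_scope = trio.CancelScope()

        async def canceller():
            # stands in for a remote cancel request
            await trio.sleep(0.1)
            cancel_scope.cancel()

        async with trio.open_nursery() as nursery:
            nursery.start_soon(canceller)
            with cancel_scope as cs:
                await trio.sleep(10)  # interrupted well before 10s

        # True when the scope absorbed the cancellation; the same
        # flag the hunk above converts into a ``ContextCancelled``
        assert cs.cancelled_caught

    trio.run(main)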
@@ -182,29 +170,20 @@ async def _invoke(
     except (Exception, trio.MultiError) as err:
-
-        if not is_multi_cancelled(err):
-            log.exception("Actor crashed:")
-
-        # TODO: maybe we'll want different "levels" of debugging
+        # TODO: maybe we'll want differnet "levels" of debugging
         # eventualy such as ('app', 'supervisory', 'runtime') ?
-
-        # if not isinstance(err, trio.ClosedResourceError) and (
-        # if not is_multi_cancelled(err) and (
-
-        if not isinstance(err, ContextCancelled) or (
-            isinstance(err, ContextCancelled) and ctx._cancel_called
+        if not isinstance(err, trio.ClosedResourceError) and (
+            not is_multi_cancelled(err)) and (
+            not isinstance(err, ContextCancelled)
         ):
             # XXX: is there any case where we'll want to debug IPC
             # disconnects? I can't think of a reason that inspecting
             # this type of failure will be useful for respawns or
             # recovery logic - the only case is some kind of strange bug
             # in `trio` itself?
-            await _debug._maybe_enter_pm(err)
-
-            # entered = await _debug._maybe_enter_pm(err)
-            # if not entered:
-            #     log.exception("Actor crashed:")
+            entered = await _debug._maybe_enter_pm(err)
+            if not entered:
+                log.exception("Actor crashed:")

         # always ship errors back to caller
         err_msg = pack_error(err)
@@ -213,10 +192,8 @@ async def _invoke(
             await chan.send(err_msg)
         except trio.ClosedResourceError:
-            # if we can't propagate the error that's a big boo boo
-            log.error(
-                f"Failed to ship error to caller @ {chan.uid} !?"
-            )
+            log.warning(
+                f"Failed to ship error to caller @ {chan.uid}")

     if cs is None:
         # error is from above code not from rpc invocation
@@ -401,34 +378,19 @@ class Actor:
             raise mne

     async def _stream_handler(
         self,
         stream: trio.SocketStream,
     ) -> None:
         """Entry point for new inbound connections to the channel server.
         """
         self._no_more_peers = trio.Event()  # unset

         chan = Channel(stream=stream)
         log.info(f"New connection to us {chan}")

         # send/receive initial handshake response
         try:
             uid = await self._do_handshake(chan)
-
-        except (
-            trio.BrokenResourceError,
-            trio.ClosedResourceError,
-            TransportClosed,
-        ):
-            # XXX: This may propagate up from ``Channel._aiter_recv()``
-            # and ``MsgpackStream._inter_packets()`` on a read from the
-            # stream particularly when the runtime is first starting up
-            # inside ``open_root_actor()`` where there is a check for
-            # a bound listener on the "arbiter" addr.  the reset will be
-            # because the handshake was never meant took place.
+        except StopAsyncIteration:
             log.warning(f"Channel {chan} failed to handshake")
             return
@@ -456,24 +418,10 @@ class Actor:
             try:
                 await self._process_messages(chan)
             finally:
-                # channel cleanup sequence
-
-                # for (channel, cid) in self._rpc_tasks.copy():
-                #     if channel is chan:
-                #         with trio.CancelScope(shield=True):
-                #             await self._cancel_task(cid, channel)
-
-                #     # close all consumer side task mem chans
-                #     send_chan, _ = self._cids2qs[(chan.uid, cid)]
-                #     assert send_chan.cid == cid  # type: ignore
-                #     await send_chan.aclose()
-
                 # Drop ref to channel so it can be gc-ed and disconnected
                 log.debug(f"Releasing channel {chan} from {chan.uid}")
                 chans = self._peers.get(chan.uid)
                 chans.remove(chan)
                 if not chans:
                     log.debug(f"No more channels for {chan.uid}")
                     self._peers.pop(chan.uid, None)
@@ -486,22 +434,14 @@ class Actor:
         # # XXX: is this necessary (GC should do it?)
         if chan.connected():
-            # if the channel is still connected it may mean the far
-            # end has not closed and we may have gotten here due to
-            # an error and so we should at least try to terminate
-            # the channel from this end gracefully.
-
             log.debug(f"Disconnecting channel {chan}")
             try:
-                # send a msg loop terminate sentinel
+                # send our msg loop terminate sentinel
                 await chan.send(None)

-                # XXX: do we want this?
-                # causes "[104] connection reset by peer" on other end
                 # await chan.aclose()

             except trio.BrokenResourceError:
-                log.warning(f"Channel for {chan.uid} was already closed")
+                log.exception(
+                    f"Channel for {chan.uid} was already zonked..")

     async def _push_result(
         self,
@@ -511,22 +451,18 @@ class Actor:
     ) -> None:
         """Push an RPC result to the local consumer's queue.
         """
-        # actorid = chan.uid
-        assert chan.uid, f"`chan.uid` can't be {chan.uid}"
-        send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
+        actorid = chan.uid
+        assert actorid, f"`actorid` can't be {actorid}"
+        send_chan, recv_chan = self._cids2qs[(actorid, cid)]
         assert send_chan.cid == cid  # type: ignore

-        if 'error' in msg:
-            ctx = getattr(recv_chan, '_ctx', None)
-            # if ctx:
-            #     ctx._error_from_remote_msg(msg)
-
+        # if 'stop' in msg:
         #     log.debug(f"{send_chan} was terminated at remote end")
         #     # indicate to consumer that far end has stopped
         #     return await send_chan.aclose()

         try:
-            log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}")
+            log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
             # maintain backpressure
             await send_chan.send(msg)
@@ -545,14 +481,12 @@ class Actor:
         self,
         actorid: Tuple[str, str],
         cid: str
     ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]:
         log.debug(f"Getting result queue for {actorid} cid {cid}")
         try:
             send_chan, recv_chan = self._cids2qs[(actorid, cid)]
         except KeyError:
-            send_chan, recv_chan = trio.open_memory_channel(2*10)
+            send_chan, recv_chan = trio.open_memory_channel(1000)
             send_chan.cid = cid  # type: ignore
             recv_chan.cid = cid  # type: ignore
             self._cids2qs[(actorid, cid)] = send_chan, recv_chan
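The buffer size passed to ``trio.open_memory_channel()`` above is what backs the "maintain backpressure" comment in ``_push_result()``: once the buffer is full, ``send()`` blocks until the consumer catches up. A standalone sketch (plain trio):

    import trio

    async def main():
        # bounded buffer: ``send()`` blocks once 2 items are queued
        send_chan, recv_chan = trio.open_memory_channel(2)

        async def producer():
            for i in range(5):
                await send_chan.send(i)  # blocks while the buffer is full
            await send_chan.aclose()

        async with trio.open_nursery() as nursery:
            nursery.start_soon(producer)
            async for item in recv_chan:
                print(item)

    trio.run(main)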
@@ -610,15 +544,9 @@ class Actor:
                         if channel is chan:
                             await self._cancel_task(cid, channel)

-                    # close all consumer side task mem chans
-                    # send_chan, _ = self._cids2qs[(chan.uid, cid)]
-                    # assert send_chan.cid == cid  # type: ignore
-                    # await send_chan.aclose()
-
                     log.debug(
                         f"Msg loop signalled to terminate for"
                         f" {chan} from {chan.uid}")

                     break

                 log.trace(  # type: ignore
@@ -713,36 +641,22 @@ class Actor:
                 )
                 await self.cancel_rpc_tasks(chan)

-        except (
-            TransportClosed,
-            trio.BrokenResourceError,
-            trio.ClosedResourceError
-        ):
-            # channels "breaking" is ok since we don't have a teardown
-            # handshake for them (yet) and instead we simply bail out
-            # of the message loop and expect the teardown sequence
-            # to clean up.
-            log.error(f"{chan} form {chan.uid} closed abruptly")
-            # raise
+        except trio.ClosedResourceError:
+            log.error(f"{chan} form {chan.uid} broke")

         except (Exception, trio.MultiError) as err:
             # ship any "internal" exception (i.e. one from internal machinery
             # not from an rpc task) to parent
             log.exception("Actor errored:")
             if self._parent_chan:
                 await self._parent_chan.send(pack_error(err))
+            raise

             # if this is the `MainProcess` we expect the error broadcasting
             # above to trigger an error at consuming portal "checkpoints"
-            raise

         except trio.Cancelled:
             # debugging only
             log.debug(f"Msg loop was cancelled for {chan}")
             raise

         finally:
-            # msg debugging for when he machinery is brokey
             log.debug(
                 f"Exiting msg loop for {chan} from {chan.uid} "
                 f"with last msg:\n{msg}")
@@ -795,8 +709,8 @@ class Actor:
         # Disable sigint handling in children if NOT running in
         # debug mode; we shouldn't need it thanks to our
         # cancellation machinery.
-        # if 'debug_mode' not in rvs:
-        #     signal.signal(signal.SIGINT, signal.SIG_IGN)
+        if 'debug_mode' not in rvs:
+            signal.signal(signal.SIGINT, signal.SIG_IGN)

         return chan, accept_addr
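For reference, re-enabling this block restores the stdlib call that makes a child process drop ctrl-c delivery: terminals signal the whole foreground process group, so children that are torn down through the parent's cancellation machinery can opt out of the raw signal. In isolation:

    import signal

    # ignore interactive interrupts in this process only
    signal.signal(signal.SIGINT, signal.SIG_IGN)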
@@ -1175,7 +1089,6 @@ class Actor:
         parlance.
         """
         await chan.send(self.uid)
-        # breakpoint()
         uid: Tuple[str, str] = tuple(await chan.recv())

         # if not isinstance(uid, tuple):

tractor/_exceptions.py View File

@@ -41,10 +41,6 @@ class ContextCancelled(RemoteActorError):
     "Inter-actor task context cancelled itself on the callee side."


-class TransportClosed(trio.ClosedResourceError):
-    "Underlying channel transport was closed prior to use"
-
-
 class NoResult(RuntimeError):
     "No final result is expected for this actor"
@@ -70,15 +66,12 @@ def pack_error(exc: BaseException) -> Dict[str, Any]:
 def unpack_error(
     msg: Dict[str, Any],
     chan=None,
     err_type=RemoteActorError
 ) -> Exception:
     """Unpack an 'error' message from the wire
     into a local ``RemoteActorError``.
     """
     error = msg['error']

tractor/_ipc.py View File

@@ -2,7 +2,6 @@
 Inter-process comms abstractions
 """
 from functools import partial
-import math
 import struct
 import typing
 from typing import Any, Tuple, Optional
@@ -14,7 +13,6 @@ import trio
 from async_generator import asynccontextmanager

 from .log import get_logger
-from ._exceptions import TransportClosed

 log = get_logger(__name__)

 # :eyeroll:
@ -26,7 +24,7 @@ except ImportError:
Unpacker = partial(msgpack.Unpacker, strict_map_key=False) Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
class MsgpackTCPStream: class MsgpackStream:
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgpack-python``. using ``msgpack-python``.
@@ -49,10 +47,7 @@ class MsgpackTCPStream:
         assert isinstance(rsockname, tuple)
         self._raddr = rsockname[:2]

-        # start and seed first entry to read loop
         self._agen = self._iter_packets()
-        # self._agen.asend(None) is None
-
         self._send_lock = trio.StrictFIFOLock()

     async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
@@ -63,13 +58,16 @@ class MsgpackTCPStream:
             use_list=False,
         )
         while True:
-            data = await self.stream.receive_some(2**10)
-            log.trace(f"received {data}")  # type: ignore
+            try:
+                data = await self.stream.receive_some(2**10)
+                log.trace(f"received {data}")  # type: ignore
+            except trio.BrokenResourceError:
+                log.warning(f"Stream connection {self.raddr} broke")
+                return

             if data == b'':
-                raise TransportClosed(
-                    f'transport {self} was already closed prior ro read'
-                )
+                log.debug(f"Stream connection {self.raddr} was closed")
+                return

             unpacker.feed(data)
             for packet in unpacker:
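The feed-then-iterate loop above is the standard streaming-decode idiom from ``msgpack-python``: an ``Unpacker`` buffers arbitrary byte chunks and yields each complete message as it becomes decodable. A standalone sketch:

    import msgpack

    unpacker = msgpack.Unpacker(raw=False)
    wire = msgpack.packb({'cid': '1', 'yield': 42}) + msgpack.packb({'stop': True})

    # deliver the bytes in arbitrary splits, as a socket read would
    for chunk in (wire[:5], wire[5:]):
        unpacker.feed(chunk)
        for packet in unpacker:
            print(packet)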
@ -100,7 +98,7 @@ class MsgpackTCPStream:
return self.stream.socket.fileno() != -1 return self.stream.socket.fileno() != -1
class MsgspecTCPStream(MsgpackTCPStream): class MsgspecStream(MsgpackStream):
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgspec``. using ``msgspec``.
@@ -125,16 +123,12 @@ class MsgspecTCPStream(MsgpackTCPStream):
         while True:
             try:
                 header = await self.recv_stream.receive_exactly(4)
-
-            except (ValueError):
-                raise TransportClosed(
-                    f'transport {self} was already closed prior ro read'
-                )
+                if header is None:
+                    continue

             if header == b'':
-                raise TransportClosed(
-                    f'transport {self} was already closed prior ro read'
-                )
+                log.debug(f"Stream connection {self.raddr} was closed")
+                return

             size, = struct.unpack("<I", header)
@@ -142,6 +136,12 @@ class MsgspecTCPStream(MsgpackTCPStream):
                 msg_bytes = await self.recv_stream.receive_exactly(size)

+            # the value error here is to catch a connect with immediate
+            # disconnect that will cause an EOF error inside `tricycle`.
+            except (ValueError, trio.BrokenResourceError):
+                log.warning(f"Stream connection {self.raddr} broke")
+                return
+
             log.trace(f"received {msg_bytes}")  # type: ignore
             yield decoder.decode(msg_bytes)
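Both hunks above rely on the same wire framing: each message is prefixed with a 4-byte little-endian length header, matching the ``struct.unpack("<I", header)`` call. A minimal sketch of that format (illustrative helpers, not tractor API):

    import struct

    def frame(payload: bytes) -> bytes:
        # 4-byte little-endian unsigned length, then the payload
        return struct.pack("<I", len(payload)) + payload

    def unframe(buf: bytes) -> bytes:
        size, = struct.unpack("<I", buf[:4])
        return buf[4:4 + size]

    msg = b'{"cid": "1"}'
    assert unframe(frame(msg)) == msg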
@@ -169,9 +169,8 @@ class Channel:
         on_reconnect: typing.Callable[..., typing.Awaitable] = None,
         auto_reconnect: bool = False,
         stream: trio.SocketStream = None,  # expected to be active

-        # stream_serializer_type: type = MsgspecTCPStream,
-        stream_serializer_type: type = MsgpackTCPStream,
+        # stream_serializer: type = MsgpackStream,
+        stream_serializer_type: type = MsgspecStream,

     ) -> None:
@@ -193,8 +192,6 @@ class Channel:
         self._exc: Optional[Exception] = None
         self._agen = self._aiter_recv()
-        self._closed: bool = False

     def __repr__(self) -> str:
         if self.msgstream:
             return repr(
@@ -211,52 +208,35 @@ class Channel:
         return self.msgstream.raddr if self.msgstream else None

     async def connect(
-        self,
-        destaddr: Tuple[Any, ...] = None,
+        self, destaddr: Tuple[Any, ...] = None,
         **kwargs
     ) -> trio.SocketStream:

         if self.connected():
             raise RuntimeError("channel is already connected?")

         destaddr = destaddr or self._destaddr
         assert isinstance(destaddr, tuple)

-        stream = await trio.open_tcp_stream(
-            *destaddr,
-            happy_eyeballs_delay=math.inf,
-            **kwargs
-        )
+        stream = await trio.open_tcp_stream(*destaddr, **kwargs)
         self.msgstream = self.stream_serializer_type(stream)
         return stream

     async def send(self, item: Any) -> None:
         log.trace(f"send `{item}`")  # type: ignore
         assert self.msgstream
         await self.msgstream.send(item)

     async def recv(self) -> Any:
         assert self.msgstream
         try:
             return await self.msgstream.recv()
         except trio.BrokenResourceError:
             if self._autorecon:
                 await self._reconnect()
                 return await self.recv()
-            raise

     async def aclose(self) -> None:
         log.debug(f"Closing {self}")
         assert self.msgstream
         await self.msgstream.stream.aclose()
-        self._closed = True
-        log.error(f'CLOSING CHAN {self}')

     async def __aenter__(self):
         await self.connect()

tractor/_portal.py View File

@@ -177,7 +177,6 @@ class Portal:
             f"Cancelling all streams with {self.channel.uid}")
         for stream in self._streams.copy():
             try:
-                # with trio.CancelScope(shield=True):
                 await stream.aclose()
             except trio.ClosedResourceError:
                 # don't error the stream having already been closed
@ -333,6 +332,12 @@ class Portal:
# message right now since there shouldn't be a reason to # message right now since there shouldn't be a reason to
# stop and restart the stream, right? # stop and restart the stream, right?
try: try:
# We are for sure done with this stream and no more
# messages are expected to be delivered from the
# runtime's msg loop.
await recv_chan.aclose()
await ctx.cancel() await ctx.cancel()
except trio.ClosedResourceError: except trio.ClosedResourceError:
@ -370,6 +375,7 @@ class Portal:
recv_chan: Optional[trio.MemoryReceiveChannel] = None recv_chan: Optional[trio.MemoryReceiveChannel] = None
try:
cid, recv_chan, functype, first_msg = await self._submit( cid, recv_chan, functype, first_msg = await self._submit(
fn_mod_path, fn_name, kwargs) fn_mod_path, fn_name, kwargs)
@@ -392,21 +398,16 @@ class Portal:
             # deliver context instance and .started() msg value in open
             # tuple.
-            try:
-                async with trio.open_nursery() as scope_nursery:
-                    ctx = Context(
-                        self.channel,
-                        cid,
-                        _portal=self,
-                        _recv_chan=recv_chan,
-                        _scope_nursery=scope_nursery,
-                    )
-
-                    recv_chan._ctx = ctx
-
-                    yield ctx, first
-                    log.info(f'Context for {func.__name__} completed')
-
-                    if cancel_on_exit:
-                        await ctx.cancel()
+            ctx = Context(
+                self.channel,
+                cid,
+                _portal=self,
+                _recv_chan=recv_chan,
+            )
+            try:
+                yield ctx, first
+
+                if cancel_on_exit:
+                    await ctx.cancel()
@@ -414,34 +415,24 @@ class Portal:
                 if not ctx._cancel_called:
                     await ctx.result()

-                    await recv_chan.aclose()
-
-            # except TypeError:
-            #     # if fn_name == '_emsd_main':
-            #     import tractor
-            #     await tractor.breakpoint()
-
             except ContextCancelled:
+                # if the context was cancelled by client code
+                # then we don't need to raise since user code
+                # is expecting this.
                 if not ctx._cancel_called:
                     raise

-                # if the context was cancelled by client code
-                # then we don't need to raise since user code
-                # is expecting this and the block should exit.
-                else:
-                    log.debug(f'Context {ctx} cancelled gracefully')
-
-            except trio.Cancelled:
+            except BaseException:
                 # the context cancels itself on any deviation
                 await ctx.cancel()
                 raise

-            # finally:
-            #     log.info(f'Context for {func.__name__} completed')
-
-        # finally:
-        #     if recv_chan is not None:
-        finally:
-            if recv_chan is not None:
-                await recv_chan.aclose()
+            finally:
+                log.info(f'Context for {func.__name__} completed')


 @dataclass
 class LocalPortal:
@@ -479,7 +470,6 @@ async def open_portal(
     was_connected = False

     async with maybe_open_nursery(nursery, shield=shield) as nursery:
-
         if not channel.connected():
             await channel.connect()
             was_connected = True
@@ -501,12 +491,11 @@ async def open_portal(
     portal = Portal(channel)
     try:
         yield portal

     finally:
         await portal.aclose()

         if was_connected:
-            # gracefully signal remote channel-msg loop
+            # cancel remote channel-msg loop
             await channel.send(None)

             # cancel background msg loop task

tractor/_spawn.py View File

@@ -147,7 +147,7 @@ async def cancel_on_completion(
         )
     else:
-        log.runtime(
+        log.info(
             f"Cancelling {portal.channel.uid} gracefully "
             f"after result {result}")
@@ -263,7 +263,7 @@ async def new_proc(
             parent_addr,
             infect_asyncio=infect_asyncio
         ) as proc:
-            log.runtime(f"Started {proc}")
+            log.info(f"Started {proc}")

             # wait for actor to spawn and connect back to us
             # channel should have handshake completed by the
@@ -416,7 +416,7 @@ async def mp_new_proc(
         if not proc.is_alive():
             raise ActorFailure("Couldn't start sub-actor?")

-        log.runtime(f"Started {proc}")
+        log.info(f"Started {proc}")

         try:
             # wait for actor to spawn and connect back to us

tractor/_streaming.py View File

@@ -7,7 +7,7 @@ from contextlib import contextmanager, asynccontextmanager
 from dataclasses import dataclass
 from typing import (
     Any, Iterator, Optional, Callable,
-    AsyncGenerator, Dict,
+    AsyncGenerator,
 )

 import warnings
@@ -61,15 +61,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
             return msg['yield']

     async def receive(self):
-        # see ``.aclose()`` for notes on the old behaviour prior to
-        # introducing this
+        # see ``.aclose()`` for an alt to always checking this
         if self._eoc:
             raise trio.EndOfChannel

         try:
-            # if self._ctx.chan.uid[0] == 'brokerd.ib':
-            #     breakpoint()
             msg = await self._rx_chan.receive()
             return msg['yield']
@@ -85,13 +81,12 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
             if msg.get('stop'):
                 log.debug(f"{self} was stopped at remote end")

-                # # when the send is closed we assume the stream has
-                # # terminated and signal this local iterator to stop
-                # await self.aclose()
+                # when the send is closed we assume the stream has
+                # terminated and signal this local iterator to stop
+                await self.aclose()

                 # XXX: this causes ``ReceiveChannel.__anext__()`` to
-                # raise a ``StopAsyncIteration`` **and** in our catch
-                # block below it will trigger ``.aclose()``.
+                # raise a ``StopAsyncIteration``.
                 raise trio.EndOfChannel

         # TODO: test that shows stream raising an expected error!!!
@@ -102,50 +97,44 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
             else:
                 raise

-        except (
-            trio.ClosedResourceError,  # by self._rx_chan
-            trio.EndOfChannel,  # by self._rx_chan or `stop` msg from far end
-            trio.Cancelled,  # by local cancellation
-        ):
-            # XXX: we close the stream on any of these error conditions:
-
-            # a ``ClosedResourceError`` indicates that the internal
-            # feeder memory receive channel was closed likely by the
-            # runtime after the associated transport-channel
-            # disconnected or broke.
-
-            # an ``EndOfChannel`` indicates either the internal recv
-            # memchan exhausted **or** we raisesd it just above after
-            # receiving a `stop` message from the far end of the stream.
-
-            # Previously this was triggered by calling ``.aclose()`` on
+        except trio.ClosedResourceError:
+            # XXX: this indicates that a `stop` message was
+            # sent by the far side of the underlying channel.
+            # Currently this is triggered by calling ``.aclose()`` on
             # the send side of the channel inside
-            # ``Actor._push_result()`` (should still be commented code
-            # there - which should eventually get removed), but now the
-            # 'stop' message handling has been put just above.
+            # ``Actor._push_result()``, but maybe it should be put here?
+            # to avoid exposing the internal mem chan closing mechanism?
+            # in theory we could instead do some flushing of the channel
+            # if needed to ensure all consumers are complete before
+            # triggering closure too early?

-            # TODO: Locally, we want to close this stream gracefully, by
+            # Locally, we want to close this stream gracefully, by
             # terminating any local consumers tasks deterministically.
-            # One we have broadcast support, we **don't** want to be
-            # closing this stream and not flushing a final value to
-            # remaining (clone) consumers who may not have been
-            # scheduled to receive it yet.
-
-            # lots of testing to do here
+            # We **don't** want to be closing this send channel and not
+            # relaying a final value to remaining consumers who may not
+            # have been scheduled to receive it yet?

             # when the send is closed we assume the stream has
             # terminated and signal this local iterator to stop
             await self.aclose()
-            raise  # propagate
-
-            # except trio.Cancelled:
-            #     if not self._shielded:
-            #         # if shielded we don't propagate a cancelled
-            #         raise
-
-            # except trio.Cancelled:
-            #     # relay cancels to the remote task
-            #     await self.aclose()
-            #     raise
+            raise trio.EndOfChannel
+
+            # if not isinstance(self, MsgStream):
+            #     # XXX: this was how we handled this originally for the
+            #     # single direction case?
+            #     raise trio.EndOfChannel
+            # else:
+            #     # in 2-way case raise the closed error
+            #     raise  # propagate
+
+        except trio.Cancelled:
+            # relay cancels to the remote task
+            await self.aclose()
+            raise

     @contextmanager
     def shield(
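Context for the ``raise trio.EndOfChannel`` lines above: trio's receive channels surface ``EndOfChannel`` as a clean ``StopAsyncIteration`` in ``__anext__()``, so ``async for`` loops over a stream terminate instead of erroring. A standalone sketch:

    import trio

    async def main():
        send_chan, recv_chan = trio.open_memory_channel(0)

        async def producer():
            async with send_chan:
                await send_chan.send('yield-1')
            # closing the send side ends the async-for below cleanly

        async with trio.open_nursery() as nursery:
            nursery.start_soon(producer)
            async for msg in recv_chan:
                print(msg)

    trio.run(main)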
@@ -169,11 +158,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
         on close.
         """
+        self._eoc = True
+
         # XXX: keep proper adherance to trio's `.aclose()` semantics:
         # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
         rx_chan = self._rx_chan

-        if rx_chan._closed:  # or self._eoc:
+        if rx_chan._closed:
             log.warning(f"{self} is already closed")

             # this stream has already been closed so silently succeed as
@@ -193,9 +184,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
             log.warning(f"{self} is shielded, portal channel being kept alive")
             return

-        # XXX: This must be set **AFTER** the shielded test above!
-        self._eoc = True
-
         # NOTE: this is super subtle IPC messaging stuff:
         # Relay stop iteration to far end **iff** we're
         # in bidirectional mode. If we're only streaming
@@ -207,63 +195,59 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
         # isn't expecting messages to be sent by the caller.
         # Thus, we must check that this context DOES NOT
         # have a portal reference to ensure this is indeed the callee
-        # side and can relay a 'stop'.
-
-        # In the bidirectional case, `Context.open_stream()` will create
-        # the `Actor._cids2qs` entry from a call to
-        # `Actor.get_memchans()` and will send the stop message in
-        # ``__aexit__()`` on teardown so it **does not** need to be
-        # called here.
+        # side and can relay a 'stop'. In the bidirectional case,
+        # `Context.open_stream()` will create the `Actor._cids2qs`
+        # entry from a call to `Actor.get_memchans()`.
         if not self._ctx._portal:
             try:
                 # only for 2 way streams can we can send
                 # stop from the caller side
                 await self._ctx.send_stop()
-            except (
-                trio.BrokenResourceError,
-                trio.ClosedResourceError
-            ):
+            except trio.BrokenResourceError:
                 # the underlying channel may already have been pulled
                 # in which case our stop message is meaningless since
                 # it can't traverse the transport.
                 log.debug(f'Channel for {self} was already closed')

-        # close the local mem chan ``self._rx_chan`` ??!?
-        # DEFINITELY NOT if we're a bi-dir ``MsgStream``!
+        # close the local mem chan??!?
+        # NOT if we're a ``MsgStream``!
         # BECAUSE this same core-msg-loop mem recv-chan is used to deliver
         # the potential final result from the surrounding inter-actor
         # `Context` so we don't want to close it until that context has
         # run to completion.

-        # XXX: Notes on old behaviour:
-
-        # In the receive-only case, ``Portal.open_stream_from()`` used
-        # to rely on this call explicitly on teardown such that a new
-        # call to ``.receive()`` after ``rx_chan`` had been closed, would
-        # result in us raising a ``trio.EndOfChannel`` (since we
-        # remapped the ``trio.ClosedResourceError`). However, now if for some
-        # reason the stream's consumer code tries to manually receive a new
+        # XXX: Notes on old behaviour.
+        # await rx_chan.aclose()
+
+        # In the receive-only case, ``Portal.open_stream_from()`` should
+        # call this explicitly on teardown but additionally if for some
+        # reason stream consumer code tries to manually receive a new
         # value before ``.aclose()`` is called **but** the far end has
         # stopped `.receive()` **must** raise ``trio.EndofChannel`` in
-        # order to avoid an infinite hang on ``.__anext__()``; this is
-        # why we added ``self._eoc`` to denote stream closure indepedent
-        # of ``rx_chan``.
-
-        # In theory we could still use this old method and close the
-        # underlying msg-loop mem chan as above and then **not** check
-        # for ``self._eoc`` in ``.receive()`` (if for some reason we
-        # think that check is a bottle neck - not likely) **but** then
-        # we would need to map the resulting
-        # ``trio.ClosedResourceError`` to a ``trio.EndOfChannel`` in
-        # ``.receive()`` (as it originally was before bi-dir streaming
-        # support) in order to trigger stream closure. The old behaviour
-        # is arguably more confusing since we lose detection of the
-        # runtime's closure of ``rx_chan`` in the case where we may
-        # still need to consume msgs that are "in transit" from the far
-        # end (eg. for ``Context.result()``).
+        # order to avoid an infinite hang on ``.__anext__()``. So we can
+        # instead uncomment this check and close the underlying msg-loop
+        # mem chan below and not then **not** check for ``self._eoc`` in
+        # ``.receive()`` (if for some reason we think that check is
+        # a bottle neck - not likely) such that the
+        # ``trio.ClosedResourceError`` would instead trigger the
+        # ``trio.EndOfChannel`` in ``.receive()`` (as it originally was
+        # before bi-dir streaming support).
+
+        # if not isinstance(self, MsgStream):
+        #     await rx_chan.aclose()
+
+    # TODO: but make it broadcasting to consumers
+    # def clone(self):
+    #     """Clone this receive channel allowing for multi-task
+    #     consumption from the same channel.
+
+    #     """
+    #     return ReceiveStream(
+    #         self._cid,
+    #         self._rx_chan.clone(),
+    #         self._portal,
+    #     )


 class MsgStream(ReceiveMsgStream, trio.abc.Channel):
@@ -276,22 +260,8 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
         self,
         data: Any
     ) -> None:
-        '''Send a message over this stream to the far end.
-
-        '''
         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})

-    # TODO: but make it broadcasting to consumers
-    def clone(self):
-        """Clone this receive channel allowing for multi-task
-        consumption from the same channel.
-
-        """
-        return MsgStream(
-            self._ctx,
-            self._rx_chan.clone(),
-        )
-

 @dataclass
 class Context:
@@ -318,7 +288,7 @@ class Context:
     _cancel_called: bool = False

     # only set on the callee side
-    _scope_nursery: Optional[trio.Nursery] = None
+    _cancel_scope: Optional[trio.CancelScope] = None

     async def send_yield(self, data: Any) -> None:
@@ -333,16 +303,6 @@ class Context:
     async def send_stop(self) -> None:
         await self.chan.send({'stop': True, 'cid': self.cid})

-    def _error_from_remote_msg(
-        self,
-        msg: Dict[str, Any],
-    ) -> None:
-
-        async def raiser():
-            raise unpack_error(msg, self.chan)
-
-        self._scope_nursery.start_soon(raiser)
-
     async def cancel(self) -> None:
         '''Cancel this inter-actor-task context.
@@ -381,16 +341,13 @@ class Context:
                 f"{cid} for {self._portal.channel.uid}")
         else:
             # ensure callee side
-            assert self._scope_nursery
+            assert self._cancel_scope

             # TODO: should we have an explicit cancel message
             # or is relaying the local `trio.Cancelled` as an
             # {'error': trio.Cancelled, cid: "blah"} enough?
             # This probably gets into the discussion in
             # https://github.com/goodboy/tractor/issues/36
-            self._scope_nursery.cancel_scope.cancel()
-
-            if self._recv_chan:
-                await self._recv_chan.aclose()
+            self._cancel_scope.cancel()

     @asynccontextmanager
     async def open_stream(
@@ -461,8 +418,7 @@ class Context:
                 yield rchan

             except trio.EndOfChannel:
-                # likely the far end sent us a 'stop' message to
-                # terminate the stream.
+                # stream iteration stop signal
                 raise

             else:

tractor/log.py View File

@@ -30,17 +30,16 @@ DATE_FORMAT = '%b %d %H:%M:%S'
 LEVELS = {
     'GARBAGE': 1,
     'TRACE': 5,
-    'RUNTIME': 15,
-    'PDB': 500,
+    'PROFILE': 15,
+    'RUNTIME': 500,
     'QUIET': 1000,
 }
 STD_PALETTE = {
     'CRITICAL': 'red',
     'ERROR': 'red',
-    'PDB': 'white',
+    'RUNTIME': 'white',
     'WARNING': 'yellow',
     'INFO': 'green',
-    'RUNTIME': 'white',
     'DEBUG': 'white',
     'TRACE': 'cyan',
     'GARBAGE': 'blue',
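The numeric values in ``LEVELS`` plug into the stdlib level registry; tractor's log module also attaches matching helper methods (e.g. the ``log.runtime()`` calls touched in the _spawn.py hunks above), which this diff doesn't show. A stdlib-only sketch of the registration step:

    import logging

    logging.addLevelName(15, 'PROFILE')
    logging.addLevelName(500, 'RUNTIME')

    logging.basicConfig(level=1)
    log = logging.getLogger('demo')
    log.log(15, "a PROFILE level message")
    log.log(500, "a RUNTIME level message")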

tractor/to_asyncio.py View File

@@ -101,16 +101,14 @@ def _run_asyncio_task(
         """Cancel the calling ``trio`` task on error.
         """
         nonlocal aio_err
-        try:
-            aio_err = task.exception()
-        except asyncio.CancelledError as cerr:
-            aio_err = cerr
+        aio_err = task.exception()

         if aio_err:
             log.exception(f"asyncio task errorred:\n{aio_err}")
             # cancel_scope.cancel()
             from_aio._err = aio_err
-            to_trio.close()

     task.add_done_callback(cancel_trio)
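Why the deleted ``try/except`` guard existed: ``asyncio.Task.exception()`` re-raises ``CancelledError`` (rather than returning it) when the task was cancelled, so an unguarded call inside a done-callback can blow up on cancellation. A standalone sketch:

    import asyncio

    def on_done(task: asyncio.Task) -> None:
        try:
            err = task.exception()
        except asyncio.CancelledError as cerr:
            err = cerr  # cancelled, not crashed
        print("task finished with:", err)

    async def main():
        task = asyncio.ensure_future(asyncio.sleep(1))
        task.add_done_callback(on_done)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    asyncio.run(main())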
@@ -235,26 +233,18 @@ async def run_task(
             # raise aio_err

     # Do we need this?
-    except Exception as err:
+    except BaseException as err:
         # await tractor.breakpoint()
         aio_err = from_aio._err

-        # try:
         if aio_err is not None:
             # always raise from any captured asyncio error
             raise err from aio_err
         else:
             raise

-        # finally:
-        #     if not task.done():
-        #         task.cancel()
-
-    except trio.Cancelled:
+    finally:
         if not task.done():
             task.cancel()
-        raise

 # async def stream_from_task
 #     pass