Compare commits
10 Commits
86aaec696f
...
e2e572ef51
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | e2e572ef51 | |
Tyler Goodlet | 6cf6a0e70b | |
Tyler Goodlet | 0a5cf6bbd2 | |
Tyler Goodlet | e6c9232b45 | |
Tyler Goodlet | 62ece4327d | |
Tyler Goodlet | f7b7c3fb93 | |
Tyler Goodlet | 186968d3e7 | |
Tyler Goodlet | 7e2a054dd7 | |
Tyler Goodlet | b494f29d55 | |
Tyler Goodlet | 9ced2066b9 |
|
@ -12,7 +12,7 @@ import uuid
|
||||||
import typing
|
import typing
|
||||||
from typing import Dict, List, Tuple, Any, Optional, Union
|
from typing import Dict, List, Tuple, Any, Optional, Union
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
import signal
|
# import signal
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
from contextlib import ExitStack
|
from contextlib import ExitStack
|
||||||
|
@ -31,6 +31,7 @@ from ._exceptions import (
|
||||||
ModuleNotExposed,
|
ModuleNotExposed,
|
||||||
is_multi_cancelled,
|
is_multi_cancelled,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
|
TransportClosed,
|
||||||
)
|
)
|
||||||
from . import _debug
|
from . import _debug
|
||||||
from ._discovery import get_arbiter
|
from ._discovery import get_arbiter
|
||||||
|
@ -47,6 +48,7 @@ class ActorFailure(Exception):
|
||||||
|
|
||||||
|
|
||||||
async def _invoke(
|
async def _invoke(
|
||||||
|
|
||||||
actor: 'Actor',
|
actor: 'Actor',
|
||||||
cid: str,
|
cid: str,
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
|
@ -59,10 +61,12 @@ async def _invoke(
|
||||||
"""Invoke local func and deliver result(s) over provided channel.
|
"""Invoke local func and deliver result(s) over provided channel.
|
||||||
"""
|
"""
|
||||||
treat_as_gen = False
|
treat_as_gen = False
|
||||||
cs = None
|
|
||||||
cancel_scope = trio.CancelScope()
|
cancel_scope = trio.CancelScope()
|
||||||
ctx = Context(chan, cid, _cancel_scope=cancel_scope)
|
cs: trio.CancelScope = None
|
||||||
context = False
|
|
||||||
|
ctx = Context(chan, cid)
|
||||||
|
context: bool = False
|
||||||
|
|
||||||
if getattr(func, '_tractor_stream_function', False):
|
if getattr(func, '_tractor_stream_function', False):
|
||||||
# handle decorated ``@tractor.stream`` async functions
|
# handle decorated ``@tractor.stream`` async functions
|
||||||
|
@ -150,14 +154,22 @@ async def _invoke(
|
||||||
# context func with support for bi-dir streaming
|
# context func with support for bi-dir streaming
|
||||||
await chan.send({'functype': 'context', 'cid': cid})
|
await chan.send({'functype': 'context', 'cid': cid})
|
||||||
|
|
||||||
with cancel_scope as cs:
|
async with trio.open_nursery() as scope_nursery:
|
||||||
|
ctx._scope_nursery = scope_nursery
|
||||||
|
cs = scope_nursery.cancel_scope
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
await chan.send({'return': await coro, 'cid': cid})
|
await chan.send({'return': await coro, 'cid': cid})
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
|
if ctx._cancel_called:
|
||||||
|
msg = f'{func.__name__} cancelled itself',
|
||||||
|
|
||||||
|
else:
|
||||||
|
msg = f'{func.__name__} was remotely cancelled',
|
||||||
|
|
||||||
# task-contex was cancelled so relay to the cancel to caller
|
# task-contex was cancelled so relay to the cancel to caller
|
||||||
raise ContextCancelled(
|
raise ContextCancelled(
|
||||||
f'{func.__name__} cancelled itself',
|
msg,
|
||||||
suberror_type=trio.Cancelled,
|
suberror_type=trio.Cancelled,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -170,20 +182,29 @@ async def _invoke(
|
||||||
|
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
|
|
||||||
# TODO: maybe we'll want differnet "levels" of debugging
|
if not is_multi_cancelled(err):
|
||||||
# eventualy such as ('app', 'supervisory', 'runtime') ?
|
|
||||||
if not isinstance(err, trio.ClosedResourceError) and (
|
log.exception("Actor crashed:")
|
||||||
not is_multi_cancelled(err)) and (
|
|
||||||
not isinstance(err, ContextCancelled)
|
# TODO: maybe we'll want different "levels" of debugging
|
||||||
):
|
# eventualy such as ('app', 'supervisory', 'runtime') ?
|
||||||
# XXX: is there any case where we'll want to debug IPC
|
|
||||||
# disconnects? I can't think of a reason that inspecting
|
# if not isinstance(err, trio.ClosedResourceError) and (
|
||||||
# this type of failure will be useful for respawns or
|
# if not is_multi_cancelled(err) and (
|
||||||
# recovery logic - the only case is some kind of strange bug
|
|
||||||
# in `trio` itself?
|
if not isinstance(err, ContextCancelled) or (
|
||||||
entered = await _debug._maybe_enter_pm(err)
|
isinstance(err, ContextCancelled) and ctx._cancel_called
|
||||||
if not entered:
|
):
|
||||||
log.exception("Actor crashed:")
|
# XXX: is there any case where we'll want to debug IPC
|
||||||
|
# disconnects? I can't think of a reason that inspecting
|
||||||
|
# this type of failure will be useful for respawns or
|
||||||
|
# recovery logic - the only case is some kind of strange bug
|
||||||
|
# in `trio` itself?
|
||||||
|
await _debug._maybe_enter_pm(err)
|
||||||
|
|
||||||
|
# entered = await _debug._maybe_enter_pm(err)
|
||||||
|
# if not entered:
|
||||||
|
# log.exception("Actor crashed:")
|
||||||
|
|
||||||
# always ship errors back to caller
|
# always ship errors back to caller
|
||||||
err_msg = pack_error(err)
|
err_msg = pack_error(err)
|
||||||
|
@ -192,8 +213,10 @@ async def _invoke(
|
||||||
await chan.send(err_msg)
|
await chan.send(err_msg)
|
||||||
|
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
log.warning(
|
# if we can't propagate the error that's a big boo boo
|
||||||
f"Failed to ship error to caller @ {chan.uid}")
|
log.error(
|
||||||
|
f"Failed to ship error to caller @ {chan.uid} !?"
|
||||||
|
)
|
||||||
|
|
||||||
if cs is None:
|
if cs is None:
|
||||||
# error is from above code not from rpc invocation
|
# error is from above code not from rpc invocation
|
||||||
|
@ -378,19 +401,34 @@ class Actor:
|
||||||
raise mne
|
raise mne
|
||||||
|
|
||||||
async def _stream_handler(
|
async def _stream_handler(
|
||||||
|
|
||||||
self,
|
self,
|
||||||
stream: trio.SocketStream,
|
stream: trio.SocketStream,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Entry point for new inbound connections to the channel server.
|
"""Entry point for new inbound connections to the channel server.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
self._no_more_peers = trio.Event() # unset
|
self._no_more_peers = trio.Event() # unset
|
||||||
|
|
||||||
chan = Channel(stream=stream)
|
chan = Channel(stream=stream)
|
||||||
log.info(f"New connection to us {chan}")
|
log.info(f"New connection to us {chan}")
|
||||||
|
|
||||||
# send/receive initial handshake response
|
# send/receive initial handshake response
|
||||||
try:
|
try:
|
||||||
uid = await self._do_handshake(chan)
|
uid = await self._do_handshake(chan)
|
||||||
except StopAsyncIteration:
|
|
||||||
|
except (
|
||||||
|
trio.BrokenResourceError,
|
||||||
|
trio.ClosedResourceError,
|
||||||
|
TransportClosed,
|
||||||
|
):
|
||||||
|
# XXX: This may propagate up from ``Channel._aiter_recv()``
|
||||||
|
# and ``MsgpackStream._inter_packets()`` on a read from the
|
||||||
|
# stream particularly when the runtime is first starting up
|
||||||
|
# inside ``open_root_actor()`` where there is a check for
|
||||||
|
# a bound listener on the "arbiter" addr. the reset will be
|
||||||
|
# because the handshake was never meant took place.
|
||||||
log.warning(f"Channel {chan} failed to handshake")
|
log.warning(f"Channel {chan} failed to handshake")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -418,10 +456,24 @@ class Actor:
|
||||||
try:
|
try:
|
||||||
await self._process_messages(chan)
|
await self._process_messages(chan)
|
||||||
finally:
|
finally:
|
||||||
|
|
||||||
|
# channel cleanup sequence
|
||||||
|
|
||||||
|
# for (channel, cid) in self._rpc_tasks.copy():
|
||||||
|
# if channel is chan:
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# await self._cancel_task(cid, channel)
|
||||||
|
|
||||||
|
# # close all consumer side task mem chans
|
||||||
|
# send_chan, _ = self._cids2qs[(chan.uid, cid)]
|
||||||
|
# assert send_chan.cid == cid # type: ignore
|
||||||
|
# await send_chan.aclose()
|
||||||
|
|
||||||
# Drop ref to channel so it can be gc-ed and disconnected
|
# Drop ref to channel so it can be gc-ed and disconnected
|
||||||
log.debug(f"Releasing channel {chan} from {chan.uid}")
|
log.debug(f"Releasing channel {chan} from {chan.uid}")
|
||||||
chans = self._peers.get(chan.uid)
|
chans = self._peers.get(chan.uid)
|
||||||
chans.remove(chan)
|
chans.remove(chan)
|
||||||
|
|
||||||
if not chans:
|
if not chans:
|
||||||
log.debug(f"No more channels for {chan.uid}")
|
log.debug(f"No more channels for {chan.uid}")
|
||||||
self._peers.pop(chan.uid, None)
|
self._peers.pop(chan.uid, None)
|
||||||
|
@ -434,14 +486,22 @@ class Actor:
|
||||||
|
|
||||||
# # XXX: is this necessary (GC should do it?)
|
# # XXX: is this necessary (GC should do it?)
|
||||||
if chan.connected():
|
if chan.connected():
|
||||||
|
# if the channel is still connected it may mean the far
|
||||||
|
# end has not closed and we may have gotten here due to
|
||||||
|
# an error and so we should at least try to terminate
|
||||||
|
# the channel from this end gracefully.
|
||||||
|
|
||||||
log.debug(f"Disconnecting channel {chan}")
|
log.debug(f"Disconnecting channel {chan}")
|
||||||
try:
|
try:
|
||||||
# send our msg loop terminate sentinel
|
# send a msg loop terminate sentinel
|
||||||
await chan.send(None)
|
await chan.send(None)
|
||||||
|
|
||||||
|
# XXX: do we want this?
|
||||||
|
# causes "[104] connection reset by peer" on other end
|
||||||
# await chan.aclose()
|
# await chan.aclose()
|
||||||
|
|
||||||
except trio.BrokenResourceError:
|
except trio.BrokenResourceError:
|
||||||
log.exception(
|
log.warning(f"Channel for {chan.uid} was already closed")
|
||||||
f"Channel for {chan.uid} was already zonked..")
|
|
||||||
|
|
||||||
async def _push_result(
|
async def _push_result(
|
||||||
self,
|
self,
|
||||||
|
@ -451,18 +511,22 @@ class Actor:
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Push an RPC result to the local consumer's queue.
|
"""Push an RPC result to the local consumer's queue.
|
||||||
"""
|
"""
|
||||||
actorid = chan.uid
|
# actorid = chan.uid
|
||||||
assert actorid, f"`actorid` can't be {actorid}"
|
assert chan.uid, f"`chan.uid` can't be {chan.uid}"
|
||||||
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
|
send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
|
||||||
assert send_chan.cid == cid # type: ignore
|
assert send_chan.cid == cid # type: ignore
|
||||||
|
|
||||||
# if 'stop' in msg:
|
if 'error' in msg:
|
||||||
|
ctx = getattr(recv_chan, '_ctx', None)
|
||||||
|
# if ctx:
|
||||||
|
# ctx._error_from_remote_msg(msg)
|
||||||
|
|
||||||
# log.debug(f"{send_chan} was terminated at remote end")
|
# log.debug(f"{send_chan} was terminated at remote end")
|
||||||
# # indicate to consumer that far end has stopped
|
# # indicate to consumer that far end has stopped
|
||||||
# return await send_chan.aclose()
|
# return await send_chan.aclose()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
|
log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}")
|
||||||
# maintain backpressure
|
# maintain backpressure
|
||||||
await send_chan.send(msg)
|
await send_chan.send(msg)
|
||||||
|
|
||||||
|
@ -481,12 +545,14 @@ class Actor:
|
||||||
self,
|
self,
|
||||||
actorid: Tuple[str, str],
|
actorid: Tuple[str, str],
|
||||||
cid: str
|
cid: str
|
||||||
|
|
||||||
) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]:
|
) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]:
|
||||||
|
|
||||||
log.debug(f"Getting result queue for {actorid} cid {cid}")
|
log.debug(f"Getting result queue for {actorid} cid {cid}")
|
||||||
try:
|
try:
|
||||||
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
|
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
send_chan, recv_chan = trio.open_memory_channel(1000)
|
send_chan, recv_chan = trio.open_memory_channel(2*10)
|
||||||
send_chan.cid = cid # type: ignore
|
send_chan.cid = cid # type: ignore
|
||||||
recv_chan.cid = cid # type: ignore
|
recv_chan.cid = cid # type: ignore
|
||||||
self._cids2qs[(actorid, cid)] = send_chan, recv_chan
|
self._cids2qs[(actorid, cid)] = send_chan, recv_chan
|
||||||
|
@ -544,9 +610,15 @@ class Actor:
|
||||||
if channel is chan:
|
if channel is chan:
|
||||||
await self._cancel_task(cid, channel)
|
await self._cancel_task(cid, channel)
|
||||||
|
|
||||||
|
# close all consumer side task mem chans
|
||||||
|
# send_chan, _ = self._cids2qs[(chan.uid, cid)]
|
||||||
|
# assert send_chan.cid == cid # type: ignore
|
||||||
|
# await send_chan.aclose()
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Msg loop signalled to terminate for"
|
f"Msg loop signalled to terminate for"
|
||||||
f" {chan} from {chan.uid}")
|
f" {chan} from {chan.uid}")
|
||||||
|
|
||||||
break
|
break
|
||||||
|
|
||||||
log.trace( # type: ignore
|
log.trace( # type: ignore
|
||||||
|
@ -641,22 +713,36 @@ class Actor:
|
||||||
)
|
)
|
||||||
await self.cancel_rpc_tasks(chan)
|
await self.cancel_rpc_tasks(chan)
|
||||||
|
|
||||||
except trio.ClosedResourceError:
|
except (
|
||||||
log.error(f"{chan} form {chan.uid} broke")
|
TransportClosed,
|
||||||
|
trio.BrokenResourceError,
|
||||||
|
trio.ClosedResourceError
|
||||||
|
):
|
||||||
|
# channels "breaking" is ok since we don't have a teardown
|
||||||
|
# handshake for them (yet) and instead we simply bail out
|
||||||
|
# of the message loop and expect the teardown sequence
|
||||||
|
# to clean up.
|
||||||
|
log.error(f"{chan} form {chan.uid} closed abruptly")
|
||||||
|
# raise
|
||||||
|
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
# ship any "internal" exception (i.e. one from internal machinery
|
# ship any "internal" exception (i.e. one from internal machinery
|
||||||
# not from an rpc task) to parent
|
# not from an rpc task) to parent
|
||||||
log.exception("Actor errored:")
|
log.exception("Actor errored:")
|
||||||
if self._parent_chan:
|
if self._parent_chan:
|
||||||
await self._parent_chan.send(pack_error(err))
|
await self._parent_chan.send(pack_error(err))
|
||||||
raise
|
|
||||||
# if this is the `MainProcess` we expect the error broadcasting
|
# if this is the `MainProcess` we expect the error broadcasting
|
||||||
# above to trigger an error at consuming portal "checkpoints"
|
# above to trigger an error at consuming portal "checkpoints"
|
||||||
|
raise
|
||||||
|
|
||||||
except trio.Cancelled:
|
except trio.Cancelled:
|
||||||
# debugging only
|
# debugging only
|
||||||
log.debug(f"Msg loop was cancelled for {chan}")
|
log.debug(f"Msg loop was cancelled for {chan}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
# msg debugging for when he machinery is brokey
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Exiting msg loop for {chan} from {chan.uid} "
|
f"Exiting msg loop for {chan} from {chan.uid} "
|
||||||
f"with last msg:\n{msg}")
|
f"with last msg:\n{msg}")
|
||||||
|
@ -709,8 +795,8 @@ class Actor:
|
||||||
# Disable sigint handling in children if NOT running in
|
# Disable sigint handling in children if NOT running in
|
||||||
# debug mode; we shouldn't need it thanks to our
|
# debug mode; we shouldn't need it thanks to our
|
||||||
# cancellation machinery.
|
# cancellation machinery.
|
||||||
if 'debug_mode' not in rvs:
|
# if 'debug_mode' not in rvs:
|
||||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
# signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||||
|
|
||||||
return chan, accept_addr
|
return chan, accept_addr
|
||||||
|
|
||||||
|
@ -1089,6 +1175,7 @@ class Actor:
|
||||||
parlance.
|
parlance.
|
||||||
"""
|
"""
|
||||||
await chan.send(self.uid)
|
await chan.send(self.uid)
|
||||||
|
# breakpoint()
|
||||||
uid: Tuple[str, str] = tuple(await chan.recv())
|
uid: Tuple[str, str] = tuple(await chan.recv())
|
||||||
|
|
||||||
# if not isinstance(uid, tuple):
|
# if not isinstance(uid, tuple):
|
||||||
|
|
|
@ -41,6 +41,10 @@ class ContextCancelled(RemoteActorError):
|
||||||
"Inter-actor task context cancelled itself on the callee side."
|
"Inter-actor task context cancelled itself on the callee side."
|
||||||
|
|
||||||
|
|
||||||
|
class TransportClosed(trio.ClosedResourceError):
|
||||||
|
"Underlying channel transport was closed prior to use"
|
||||||
|
|
||||||
|
|
||||||
class NoResult(RuntimeError):
|
class NoResult(RuntimeError):
|
||||||
"No final result is expected for this actor"
|
"No final result is expected for this actor"
|
||||||
|
|
||||||
|
@ -66,12 +70,15 @@ def pack_error(exc: BaseException) -> Dict[str, Any]:
|
||||||
|
|
||||||
|
|
||||||
def unpack_error(
|
def unpack_error(
|
||||||
|
|
||||||
msg: Dict[str, Any],
|
msg: Dict[str, Any],
|
||||||
chan=None,
|
chan=None,
|
||||||
err_type=RemoteActorError
|
err_type=RemoteActorError
|
||||||
|
|
||||||
) -> Exception:
|
) -> Exception:
|
||||||
"""Unpack an 'error' message from the wire
|
"""Unpack an 'error' message from the wire
|
||||||
into a local ``RemoteActorError``.
|
into a local ``RemoteActorError``.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
error = msg['error']
|
error = msg['error']
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
Inter-process comms abstractions
|
Inter-process comms abstractions
|
||||||
"""
|
"""
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
import math
|
||||||
import struct
|
import struct
|
||||||
import typing
|
import typing
|
||||||
from typing import Any, Tuple, Optional
|
from typing import Any, Tuple, Optional
|
||||||
|
@ -13,6 +14,7 @@ import trio
|
||||||
from async_generator import asynccontextmanager
|
from async_generator import asynccontextmanager
|
||||||
|
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
|
from ._exceptions import TransportClosed
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
# :eyeroll:
|
# :eyeroll:
|
||||||
|
@ -24,7 +26,7 @@ except ImportError:
|
||||||
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
|
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
|
||||||
|
|
||||||
|
|
||||||
class MsgpackStream:
|
class MsgpackTCPStream:
|
||||||
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
||||||
using ``msgpack-python``.
|
using ``msgpack-python``.
|
||||||
|
|
||||||
|
@ -47,7 +49,10 @@ class MsgpackStream:
|
||||||
assert isinstance(rsockname, tuple)
|
assert isinstance(rsockname, tuple)
|
||||||
self._raddr = rsockname[:2]
|
self._raddr = rsockname[:2]
|
||||||
|
|
||||||
|
# start and seed first entry to read loop
|
||||||
self._agen = self._iter_packets()
|
self._agen = self._iter_packets()
|
||||||
|
# self._agen.asend(None) is None
|
||||||
|
|
||||||
self._send_lock = trio.StrictFIFOLock()
|
self._send_lock = trio.StrictFIFOLock()
|
||||||
|
|
||||||
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
||||||
|
@ -58,16 +63,13 @@ class MsgpackStream:
|
||||||
use_list=False,
|
use_list=False,
|
||||||
)
|
)
|
||||||
while True:
|
while True:
|
||||||
try:
|
data = await self.stream.receive_some(2**10)
|
||||||
data = await self.stream.receive_some(2**10)
|
log.trace(f"received {data}") # type: ignore
|
||||||
log.trace(f"received {data}") # type: ignore
|
|
||||||
except trio.BrokenResourceError:
|
|
||||||
log.warning(f"Stream connection {self.raddr} broke")
|
|
||||||
return
|
|
||||||
|
|
||||||
if data == b'':
|
if data == b'':
|
||||||
log.debug(f"Stream connection {self.raddr} was closed")
|
raise TransportClosed(
|
||||||
return
|
f'transport {self} was already closed prior ro read'
|
||||||
|
)
|
||||||
|
|
||||||
unpacker.feed(data)
|
unpacker.feed(data)
|
||||||
for packet in unpacker:
|
for packet in unpacker:
|
||||||
|
@ -98,7 +100,7 @@ class MsgpackStream:
|
||||||
return self.stream.socket.fileno() != -1
|
return self.stream.socket.fileno() != -1
|
||||||
|
|
||||||
|
|
||||||
class MsgspecStream(MsgpackStream):
|
class MsgspecTCPStream(MsgpackTCPStream):
|
||||||
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
||||||
using ``msgspec``.
|
using ``msgspec``.
|
||||||
|
|
||||||
|
@ -123,24 +125,22 @@ class MsgspecStream(MsgpackStream):
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
header = await self.recv_stream.receive_exactly(4)
|
header = await self.recv_stream.receive_exactly(4)
|
||||||
if header is None:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if header == b'':
|
except (ValueError):
|
||||||
log.debug(f"Stream connection {self.raddr} was closed")
|
raise TransportClosed(
|
||||||
return
|
f'transport {self} was already closed prior ro read'
|
||||||
|
)
|
||||||
|
|
||||||
size, = struct.unpack("<I", header)
|
if header == b'':
|
||||||
|
raise TransportClosed(
|
||||||
|
f'transport {self} was already closed prior ro read'
|
||||||
|
)
|
||||||
|
|
||||||
log.trace(f'received header {size}')
|
size, = struct.unpack("<I", header)
|
||||||
|
|
||||||
msg_bytes = await self.recv_stream.receive_exactly(size)
|
log.trace(f'received header {size}')
|
||||||
|
|
||||||
# the value error here is to catch a connect with immediate
|
msg_bytes = await self.recv_stream.receive_exactly(size)
|
||||||
# disconnect that will cause an EOF error inside `tricycle`.
|
|
||||||
except (ValueError, trio.BrokenResourceError):
|
|
||||||
log.warning(f"Stream connection {self.raddr} broke")
|
|
||||||
return
|
|
||||||
|
|
||||||
log.trace(f"received {msg_bytes}") # type: ignore
|
log.trace(f"received {msg_bytes}") # type: ignore
|
||||||
yield decoder.decode(msg_bytes)
|
yield decoder.decode(msg_bytes)
|
||||||
|
@ -169,8 +169,9 @@ class Channel:
|
||||||
on_reconnect: typing.Callable[..., typing.Awaitable] = None,
|
on_reconnect: typing.Callable[..., typing.Awaitable] = None,
|
||||||
auto_reconnect: bool = False,
|
auto_reconnect: bool = False,
|
||||||
stream: trio.SocketStream = None, # expected to be active
|
stream: trio.SocketStream = None, # expected to be active
|
||||||
# stream_serializer: type = MsgpackStream,
|
|
||||||
stream_serializer_type: type = MsgspecStream,
|
# stream_serializer_type: type = MsgspecTCPStream,
|
||||||
|
stream_serializer_type: type = MsgpackTCPStream,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
|
@ -192,6 +193,8 @@ class Channel:
|
||||||
self._exc: Optional[Exception] = None
|
self._exc: Optional[Exception] = None
|
||||||
self._agen = self._aiter_recv()
|
self._agen = self._aiter_recv()
|
||||||
|
|
||||||
|
self._closed: bool = False
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
if self.msgstream:
|
if self.msgstream:
|
||||||
return repr(
|
return repr(
|
||||||
|
@ -208,35 +211,52 @@ class Channel:
|
||||||
return self.msgstream.raddr if self.msgstream else None
|
return self.msgstream.raddr if self.msgstream else None
|
||||||
|
|
||||||
async def connect(
|
async def connect(
|
||||||
self, destaddr: Tuple[Any, ...] = None,
|
self,
|
||||||
|
destaddr: Tuple[Any, ...] = None,
|
||||||
**kwargs
|
**kwargs
|
||||||
|
|
||||||
) -> trio.SocketStream:
|
) -> trio.SocketStream:
|
||||||
|
|
||||||
if self.connected():
|
if self.connected():
|
||||||
raise RuntimeError("channel is already connected?")
|
raise RuntimeError("channel is already connected?")
|
||||||
|
|
||||||
destaddr = destaddr or self._destaddr
|
destaddr = destaddr or self._destaddr
|
||||||
assert isinstance(destaddr, tuple)
|
assert isinstance(destaddr, tuple)
|
||||||
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
|
|
||||||
|
stream = await trio.open_tcp_stream(
|
||||||
|
*destaddr,
|
||||||
|
happy_eyeballs_delay=math.inf,
|
||||||
|
**kwargs
|
||||||
|
)
|
||||||
self.msgstream = self.stream_serializer_type(stream)
|
self.msgstream = self.stream_serializer_type(stream)
|
||||||
return stream
|
return stream
|
||||||
|
|
||||||
async def send(self, item: Any) -> None:
|
async def send(self, item: Any) -> None:
|
||||||
|
|
||||||
log.trace(f"send `{item}`") # type: ignore
|
log.trace(f"send `{item}`") # type: ignore
|
||||||
assert self.msgstream
|
assert self.msgstream
|
||||||
|
|
||||||
await self.msgstream.send(item)
|
await self.msgstream.send(item)
|
||||||
|
|
||||||
async def recv(self) -> Any:
|
async def recv(self) -> Any:
|
||||||
assert self.msgstream
|
assert self.msgstream
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return await self.msgstream.recv()
|
return await self.msgstream.recv()
|
||||||
|
|
||||||
except trio.BrokenResourceError:
|
except trio.BrokenResourceError:
|
||||||
if self._autorecon:
|
if self._autorecon:
|
||||||
await self._reconnect()
|
await self._reconnect()
|
||||||
return await self.recv()
|
return await self.recv()
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
async def aclose(self) -> None:
|
async def aclose(self) -> None:
|
||||||
log.debug(f"Closing {self}")
|
log.debug(f"Closing {self}")
|
||||||
assert self.msgstream
|
assert self.msgstream
|
||||||
await self.msgstream.stream.aclose()
|
await self.msgstream.stream.aclose()
|
||||||
|
self._closed = True
|
||||||
|
log.error(f'CLOSING CHAN {self}')
|
||||||
|
|
||||||
async def __aenter__(self):
|
async def __aenter__(self):
|
||||||
await self.connect()
|
await self.connect()
|
||||||
|
|
|
@ -177,6 +177,7 @@ class Portal:
|
||||||
f"Cancelling all streams with {self.channel.uid}")
|
f"Cancelling all streams with {self.channel.uid}")
|
||||||
for stream in self._streams.copy():
|
for stream in self._streams.copy():
|
||||||
try:
|
try:
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
await stream.aclose()
|
await stream.aclose()
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
# don't error the stream having already been closed
|
# don't error the stream having already been closed
|
||||||
|
@ -332,12 +333,6 @@ class Portal:
|
||||||
# message right now since there shouldn't be a reason to
|
# message right now since there shouldn't be a reason to
|
||||||
# stop and restart the stream, right?
|
# stop and restart the stream, right?
|
||||||
try:
|
try:
|
||||||
|
|
||||||
# We are for sure done with this stream and no more
|
|
||||||
# messages are expected to be delivered from the
|
|
||||||
# runtime's msg loop.
|
|
||||||
await recv_chan.aclose()
|
|
||||||
|
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
|
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
|
@ -375,64 +370,78 @@ class Portal:
|
||||||
|
|
||||||
recv_chan: Optional[trio.MemoryReceiveChannel] = None
|
recv_chan: Optional[trio.MemoryReceiveChannel] = None
|
||||||
|
|
||||||
|
cid, recv_chan, functype, first_msg = await self._submit(
|
||||||
|
fn_mod_path, fn_name, kwargs)
|
||||||
|
|
||||||
|
assert functype == 'context'
|
||||||
|
msg = await recv_chan.receive()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
cid, recv_chan, functype, first_msg = await self._submit(
|
# the "first" value here is delivered by the callee's
|
||||||
fn_mod_path, fn_name, kwargs)
|
# ``Context.started()`` call.
|
||||||
|
first = msg['started']
|
||||||
|
|
||||||
assert functype == 'context'
|
except KeyError:
|
||||||
msg = await recv_chan.receive()
|
assert msg.get('cid'), ("Received internal error at context?")
|
||||||
|
|
||||||
try:
|
if msg.get('error'):
|
||||||
# the "first" value here is delivered by the callee's
|
# raise the error message
|
||||||
# ``Context.started()`` call.
|
raise unpack_error(msg, self.channel)
|
||||||
first = msg['started']
|
else:
|
||||||
|
|
||||||
except KeyError:
|
|
||||||
assert msg.get('cid'), ("Received internal error at context?")
|
|
||||||
|
|
||||||
if msg.get('error'):
|
|
||||||
# raise the error message
|
|
||||||
raise unpack_error(msg, self.channel)
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
|
|
||||||
# deliver context instance and .started() msg value in open
|
|
||||||
# tuple.
|
|
||||||
ctx = Context(
|
|
||||||
self.channel,
|
|
||||||
cid,
|
|
||||||
_portal=self,
|
|
||||||
_recv_chan=recv_chan,
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
yield ctx, first
|
|
||||||
|
|
||||||
if cancel_on_exit:
|
|
||||||
await ctx.cancel()
|
|
||||||
|
|
||||||
else:
|
|
||||||
if not ctx._cancel_called:
|
|
||||||
await ctx.result()
|
|
||||||
|
|
||||||
except ContextCancelled:
|
|
||||||
# if the context was cancelled by client code
|
|
||||||
# then we don't need to raise since user code
|
|
||||||
# is expecting this.
|
|
||||||
if not ctx._cancel_called:
|
|
||||||
raise
|
|
||||||
|
|
||||||
except BaseException:
|
|
||||||
# the context cancels itself on any deviation
|
|
||||||
await ctx.cancel()
|
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
# deliver context instance and .started() msg value in open
|
||||||
log.info(f'Context for {func.__name__} completed')
|
# tuple.
|
||||||
|
try:
|
||||||
|
async with trio.open_nursery() as scope_nursery:
|
||||||
|
ctx = Context(
|
||||||
|
self.channel,
|
||||||
|
cid,
|
||||||
|
_portal=self,
|
||||||
|
_recv_chan=recv_chan,
|
||||||
|
_scope_nursery=scope_nursery,
|
||||||
|
)
|
||||||
|
recv_chan._ctx = ctx
|
||||||
|
|
||||||
|
yield ctx, first
|
||||||
|
|
||||||
|
log.info(f'Context for {func.__name__} completed')
|
||||||
|
|
||||||
|
if cancel_on_exit:
|
||||||
|
await ctx.cancel()
|
||||||
|
|
||||||
|
else:
|
||||||
|
if not ctx._cancel_called:
|
||||||
|
await ctx.result()
|
||||||
|
|
||||||
|
await recv_chan.aclose()
|
||||||
|
|
||||||
|
# except TypeError:
|
||||||
|
# # if fn_name == '_emsd_main':
|
||||||
|
# import tractor
|
||||||
|
# await tractor.breakpoint()
|
||||||
|
|
||||||
|
except ContextCancelled:
|
||||||
|
if not ctx._cancel_called:
|
||||||
|
raise
|
||||||
|
|
||||||
|
# if the context was cancelled by client code
|
||||||
|
# then we don't need to raise since user code
|
||||||
|
# is expecting this and the block should exit.
|
||||||
|
else:
|
||||||
|
log.debug(f'Context {ctx} cancelled gracefully')
|
||||||
|
|
||||||
|
except trio.Cancelled:
|
||||||
|
# the context cancels itself on any deviation
|
||||||
|
await ctx.cancel()
|
||||||
|
raise
|
||||||
|
|
||||||
|
# finally:
|
||||||
|
# log.info(f'Context for {func.__name__} completed')
|
||||||
|
|
||||||
|
# finally:
|
||||||
|
# if recv_chan is not None:
|
||||||
|
|
||||||
finally:
|
|
||||||
if recv_chan is not None:
|
|
||||||
await recv_chan.aclose()
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class LocalPortal:
|
class LocalPortal:
|
||||||
|
@ -470,6 +479,7 @@ async def open_portal(
|
||||||
was_connected = False
|
was_connected = False
|
||||||
|
|
||||||
async with maybe_open_nursery(nursery, shield=shield) as nursery:
|
async with maybe_open_nursery(nursery, shield=shield) as nursery:
|
||||||
|
|
||||||
if not channel.connected():
|
if not channel.connected():
|
||||||
await channel.connect()
|
await channel.connect()
|
||||||
was_connected = True
|
was_connected = True
|
||||||
|
@ -491,11 +501,12 @@ async def open_portal(
|
||||||
portal = Portal(channel)
|
portal = Portal(channel)
|
||||||
try:
|
try:
|
||||||
yield portal
|
yield portal
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
await portal.aclose()
|
await portal.aclose()
|
||||||
|
|
||||||
if was_connected:
|
if was_connected:
|
||||||
# cancel remote channel-msg loop
|
# gracefully signal remote channel-msg loop
|
||||||
await channel.send(None)
|
await channel.send(None)
|
||||||
|
|
||||||
# cancel background msg loop task
|
# cancel background msg loop task
|
||||||
|
|
|
@ -147,7 +147,7 @@ async def cancel_on_completion(
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log.info(
|
log.runtime(
|
||||||
f"Cancelling {portal.channel.uid} gracefully "
|
f"Cancelling {portal.channel.uid} gracefully "
|
||||||
f"after result {result}")
|
f"after result {result}")
|
||||||
|
|
||||||
|
@ -263,7 +263,7 @@ async def new_proc(
|
||||||
parent_addr,
|
parent_addr,
|
||||||
infect_asyncio=infect_asyncio
|
infect_asyncio=infect_asyncio
|
||||||
) as proc:
|
) as proc:
|
||||||
log.info(f"Started {proc}")
|
log.runtime(f"Started {proc}")
|
||||||
|
|
||||||
# wait for actor to spawn and connect back to us
|
# wait for actor to spawn and connect back to us
|
||||||
# channel should have handshake completed by the
|
# channel should have handshake completed by the
|
||||||
|
@ -416,7 +416,7 @@ async def mp_new_proc(
|
||||||
if not proc.is_alive():
|
if not proc.is_alive():
|
||||||
raise ActorFailure("Couldn't start sub-actor?")
|
raise ActorFailure("Couldn't start sub-actor?")
|
||||||
|
|
||||||
log.info(f"Started {proc}")
|
log.runtime(f"Started {proc}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# wait for actor to spawn and connect back to us
|
# wait for actor to spawn and connect back to us
|
||||||
|
|
|
@ -7,7 +7,7 @@ from contextlib import contextmanager, asynccontextmanager
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import (
|
from typing import (
|
||||||
Any, Iterator, Optional, Callable,
|
Any, Iterator, Optional, Callable,
|
||||||
AsyncGenerator,
|
AsyncGenerator, Dict,
|
||||||
)
|
)
|
||||||
|
|
||||||
import warnings
|
import warnings
|
||||||
|
@ -61,11 +61,15 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
return msg['yield']
|
return msg['yield']
|
||||||
|
|
||||||
async def receive(self):
|
async def receive(self):
|
||||||
# see ``.aclose()`` for an alt to always checking this
|
# see ``.aclose()`` for notes on the old behaviour prior to
|
||||||
|
# introducing this
|
||||||
if self._eoc:
|
if self._eoc:
|
||||||
raise trio.EndOfChannel
|
raise trio.EndOfChannel
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# if self._ctx.chan.uid[0] == 'brokerd.ib':
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
msg = await self._rx_chan.receive()
|
msg = await self._rx_chan.receive()
|
||||||
return msg['yield']
|
return msg['yield']
|
||||||
|
|
||||||
|
@ -81,12 +85,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
if msg.get('stop'):
|
if msg.get('stop'):
|
||||||
log.debug(f"{self} was stopped at remote end")
|
log.debug(f"{self} was stopped at remote end")
|
||||||
|
|
||||||
# when the send is closed we assume the stream has
|
# # when the send is closed we assume the stream has
|
||||||
# terminated and signal this local iterator to stop
|
# # terminated and signal this local iterator to stop
|
||||||
await self.aclose()
|
# await self.aclose()
|
||||||
|
|
||||||
# XXX: this causes ``ReceiveChannel.__anext__()`` to
|
# XXX: this causes ``ReceiveChannel.__anext__()`` to
|
||||||
# raise a ``StopAsyncIteration``.
|
# raise a ``StopAsyncIteration`` **and** in our catch
|
||||||
|
# block below it will trigger ``.aclose()``.
|
||||||
raise trio.EndOfChannel
|
raise trio.EndOfChannel
|
||||||
|
|
||||||
# TODO: test that shows stream raising an expected error!!!
|
# TODO: test that shows stream raising an expected error!!!
|
||||||
|
@ -97,44 +102,50 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
except trio.ClosedResourceError:
|
except (
|
||||||
# XXX: this indicates that a `stop` message was
|
trio.ClosedResourceError, # by self._rx_chan
|
||||||
# sent by the far side of the underlying channel.
|
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
|
||||||
# Currently this is triggered by calling ``.aclose()`` on
|
trio.Cancelled, # by local cancellation
|
||||||
|
):
|
||||||
|
# XXX: we close the stream on any of these error conditions:
|
||||||
|
|
||||||
|
# a ``ClosedResourceError`` indicates that the internal
|
||||||
|
# feeder memory receive channel was closed likely by the
|
||||||
|
# runtime after the associated transport-channel
|
||||||
|
# disconnected or broke.
|
||||||
|
|
||||||
|
# an ``EndOfChannel`` indicates either the internal recv
|
||||||
|
# memchan exhausted **or** we raisesd it just above after
|
||||||
|
# receiving a `stop` message from the far end of the stream.
|
||||||
|
|
||||||
|
# Previously this was triggered by calling ``.aclose()`` on
|
||||||
# the send side of the channel inside
|
# the send side of the channel inside
|
||||||
# ``Actor._push_result()``, but maybe it should be put here?
|
# ``Actor._push_result()`` (should still be commented code
|
||||||
# to avoid exposing the internal mem chan closing mechanism?
|
# there - which should eventually get removed), but now the
|
||||||
# in theory we could instead do some flushing of the channel
|
# 'stop' message handling has been put just above.
|
||||||
# if needed to ensure all consumers are complete before
|
|
||||||
# triggering closure too early?
|
|
||||||
|
|
||||||
# Locally, we want to close this stream gracefully, by
|
# TODO: Locally, we want to close this stream gracefully, by
|
||||||
# terminating any local consumers tasks deterministically.
|
# terminating any local consumers tasks deterministically.
|
||||||
# We **don't** want to be closing this send channel and not
|
# One we have broadcast support, we **don't** want to be
|
||||||
# relaying a final value to remaining consumers who may not
|
# closing this stream and not flushing a final value to
|
||||||
# have been scheduled to receive it yet?
|
# remaining (clone) consumers who may not have been
|
||||||
|
# scheduled to receive it yet.
|
||||||
# lots of testing to do here
|
|
||||||
|
|
||||||
# when the send is closed we assume the stream has
|
# when the send is closed we assume the stream has
|
||||||
# terminated and signal this local iterator to stop
|
# terminated and signal this local iterator to stop
|
||||||
await self.aclose()
|
await self.aclose()
|
||||||
|
|
||||||
raise trio.EndOfChannel
|
raise # propagate
|
||||||
|
|
||||||
# if not isinstance(self, MsgStream):
|
# except trio.Cancelled:
|
||||||
# # XXX: this was how we handled this originally for the
|
# if not self._shielded:
|
||||||
# # single direction case?
|
# # if shielded we don't propagate a cancelled
|
||||||
# raise trio.EndOfChannel
|
# raise
|
||||||
|
|
||||||
# else:
|
# except trio.Cancelled:
|
||||||
# # in 2-way case raise the closed error
|
# # relay cancels to the remote task
|
||||||
# raise # propagate
|
# await self.aclose()
|
||||||
|
# raise
|
||||||
except trio.Cancelled:
|
|
||||||
# relay cancels to the remote task
|
|
||||||
await self.aclose()
|
|
||||||
raise
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def shield(
|
def shield(
|
||||||
|
@ -158,13 +169,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
on close.
|
on close.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
self._eoc = True
|
|
||||||
|
|
||||||
# XXX: keep proper adherance to trio's `.aclose()` semantics:
|
# XXX: keep proper adherance to trio's `.aclose()` semantics:
|
||||||
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
|
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
|
||||||
rx_chan = self._rx_chan
|
rx_chan = self._rx_chan
|
||||||
|
|
||||||
if rx_chan._closed:
|
if rx_chan._closed: # or self._eoc:
|
||||||
log.warning(f"{self} is already closed")
|
log.warning(f"{self} is already closed")
|
||||||
|
|
||||||
# this stream has already been closed so silently succeed as
|
# this stream has already been closed so silently succeed as
|
||||||
|
@ -184,6 +193,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
log.warning(f"{self} is shielded, portal channel being kept alive")
|
log.warning(f"{self} is shielded, portal channel being kept alive")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# XXX: This must be set **AFTER** the shielded test above!
|
||||||
|
self._eoc = True
|
||||||
|
|
||||||
# NOTE: this is super subtle IPC messaging stuff:
|
# NOTE: this is super subtle IPC messaging stuff:
|
||||||
# Relay stop iteration to far end **iff** we're
|
# Relay stop iteration to far end **iff** we're
|
||||||
# in bidirectional mode. If we're only streaming
|
# in bidirectional mode. If we're only streaming
|
||||||
|
@ -195,59 +207,63 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
# isn't expecting messages to be sent by the caller.
|
# isn't expecting messages to be sent by the caller.
|
||||||
# Thus, we must check that this context DOES NOT
|
# Thus, we must check that this context DOES NOT
|
||||||
# have a portal reference to ensure this is indeed the callee
|
# have a portal reference to ensure this is indeed the callee
|
||||||
# side and can relay a 'stop'. In the bidirectional case,
|
# side and can relay a 'stop'.
|
||||||
# `Context.open_stream()` will create the `Actor._cids2qs`
|
|
||||||
# entry from a call to `Actor.get_memchans()`.
|
# In the bidirectional case, `Context.open_stream()` will create
|
||||||
|
# the `Actor._cids2qs` entry from a call to
|
||||||
|
# `Actor.get_memchans()` and will send the stop message in
|
||||||
|
# ``__aexit__()`` on teardown so it **does not** need to be
|
||||||
|
# called here.
|
||||||
if not self._ctx._portal:
|
if not self._ctx._portal:
|
||||||
try:
|
try:
|
||||||
# only for 2 way streams can we can send
|
# only for 2 way streams can we can send
|
||||||
# stop from the caller side
|
# stop from the caller side
|
||||||
await self._ctx.send_stop()
|
await self._ctx.send_stop()
|
||||||
|
|
||||||
except trio.BrokenResourceError:
|
except (
|
||||||
|
trio.BrokenResourceError,
|
||||||
|
trio.ClosedResourceError
|
||||||
|
):
|
||||||
# the underlying channel may already have been pulled
|
# the underlying channel may already have been pulled
|
||||||
# in which case our stop message is meaningless since
|
# in which case our stop message is meaningless since
|
||||||
# it can't traverse the transport.
|
# it can't traverse the transport.
|
||||||
log.debug(f'Channel for {self} was already closed')
|
log.debug(f'Channel for {self} was already closed')
|
||||||
|
|
||||||
# close the local mem chan??!?
|
# close the local mem chan ``self._rx_chan`` ??!?
|
||||||
|
|
||||||
# NOT if we're a ``MsgStream``!
|
# DEFINITELY NOT if we're a bi-dir ``MsgStream``!
|
||||||
# BECAUSE this same core-msg-loop mem recv-chan is used to deliver
|
# BECAUSE this same core-msg-loop mem recv-chan is used to deliver
|
||||||
# the potential final result from the surrounding inter-actor
|
# the potential final result from the surrounding inter-actor
|
||||||
# `Context` so we don't want to close it until that context has
|
# `Context` so we don't want to close it until that context has
|
||||||
# run to completion.
|
# run to completion.
|
||||||
|
|
||||||
# XXX: Notes on old behaviour.
|
# XXX: Notes on old behaviour:
|
||||||
|
# await rx_chan.aclose()
|
||||||
|
|
||||||
# In the receive-only case, ``Portal.open_stream_from()`` should
|
# In the receive-only case, ``Portal.open_stream_from()`` used
|
||||||
# call this explicitly on teardown but additionally if for some
|
# to rely on this call explicitly on teardown such that a new
|
||||||
# reason stream consumer code tries to manually receive a new
|
# call to ``.receive()`` after ``rx_chan`` had been closed, would
|
||||||
|
# result in us raising a ``trio.EndOfChannel`` (since we
|
||||||
|
# remapped the ``trio.ClosedResourceError`). However, now if for some
|
||||||
|
# reason the stream's consumer code tries to manually receive a new
|
||||||
# value before ``.aclose()`` is called **but** the far end has
|
# value before ``.aclose()`` is called **but** the far end has
|
||||||
# stopped `.receive()` **must** raise ``trio.EndofChannel`` in
|
# stopped `.receive()` **must** raise ``trio.EndofChannel`` in
|
||||||
# order to avoid an infinite hang on ``.__anext__()``. So we can
|
# order to avoid an infinite hang on ``.__anext__()``; this is
|
||||||
# instead uncomment this check and close the underlying msg-loop
|
# why we added ``self._eoc`` to denote stream closure indepedent
|
||||||
# mem chan below and not then **not** check for ``self._eoc`` in
|
# of ``rx_chan``.
|
||||||
# ``.receive()`` (if for some reason we think that check is
|
|
||||||
# a bottle neck - not likely) such that the
|
|
||||||
# ``trio.ClosedResourceError`` would instead trigger the
|
|
||||||
# ``trio.EndOfChannel`` in ``.receive()`` (as it originally was
|
|
||||||
# before bi-dir streaming support).
|
|
||||||
|
|
||||||
# if not isinstance(self, MsgStream):
|
# In theory we could still use this old method and close the
|
||||||
# await rx_chan.aclose()
|
# underlying msg-loop mem chan as above and then **not** check
|
||||||
|
# for ``self._eoc`` in ``.receive()`` (if for some reason we
|
||||||
# TODO: but make it broadcasting to consumers
|
# think that check is a bottle neck - not likely) **but** then
|
||||||
# def clone(self):
|
# we would need to map the resulting
|
||||||
# """Clone this receive channel allowing for multi-task
|
# ``trio.ClosedResourceError`` to a ``trio.EndOfChannel`` in
|
||||||
# consumption from the same channel.
|
# ``.receive()`` (as it originally was before bi-dir streaming
|
||||||
|
# support) in order to trigger stream closure. The old behaviour
|
||||||
# """
|
# is arguably more confusing since we lose detection of the
|
||||||
# return ReceiveStream(
|
# runtime's closure of ``rx_chan`` in the case where we may
|
||||||
# self._cid,
|
# still need to consume msgs that are "in transit" from the far
|
||||||
# self._rx_chan.clone(),
|
# end (eg. for ``Context.result()``).
|
||||||
# self._portal,
|
|
||||||
# )
|
|
||||||
|
|
||||||
|
|
||||||
class MsgStream(ReceiveMsgStream, trio.abc.Channel):
|
class MsgStream(ReceiveMsgStream, trio.abc.Channel):
|
||||||
|
@ -260,8 +276,22 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
|
||||||
self,
|
self,
|
||||||
data: Any
|
data: Any
|
||||||
) -> None:
|
) -> None:
|
||||||
|
'''Send a message over this stream to the far end.
|
||||||
|
|
||||||
|
'''
|
||||||
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
|
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
|
||||||
|
|
||||||
|
# TODO: but make it broadcasting to consumers
|
||||||
|
def clone(self):
|
||||||
|
"""Clone this receive channel allowing for multi-task
|
||||||
|
consumption from the same channel.
|
||||||
|
|
||||||
|
"""
|
||||||
|
return MsgStream(
|
||||||
|
self._ctx,
|
||||||
|
self._rx_chan.clone(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Context:
|
class Context:
|
||||||
|
@ -288,7 +318,7 @@ class Context:
|
||||||
_cancel_called: bool = False
|
_cancel_called: bool = False
|
||||||
|
|
||||||
# only set on the callee side
|
# only set on the callee side
|
||||||
_cancel_scope: Optional[trio.CancelScope] = None
|
_scope_nursery: Optional[trio.Nursery] = None
|
||||||
|
|
||||||
async def send_yield(self, data: Any) -> None:
|
async def send_yield(self, data: Any) -> None:
|
||||||
|
|
||||||
|
@ -303,6 +333,16 @@ class Context:
|
||||||
async def send_stop(self) -> None:
|
async def send_stop(self) -> None:
|
||||||
await self.chan.send({'stop': True, 'cid': self.cid})
|
await self.chan.send({'stop': True, 'cid': self.cid})
|
||||||
|
|
||||||
|
def _error_from_remote_msg(
|
||||||
|
self,
|
||||||
|
msg: Dict[str, Any],
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
async def raiser():
|
||||||
|
raise unpack_error(msg, self.chan)
|
||||||
|
|
||||||
|
self._scope_nursery.start_soon(raiser)
|
||||||
|
|
||||||
async def cancel(self) -> None:
|
async def cancel(self) -> None:
|
||||||
'''Cancel this inter-actor-task context.
|
'''Cancel this inter-actor-task context.
|
||||||
|
|
||||||
|
@ -341,13 +381,16 @@ class Context:
|
||||||
f"{cid} for {self._portal.channel.uid}")
|
f"{cid} for {self._portal.channel.uid}")
|
||||||
else:
|
else:
|
||||||
# ensure callee side
|
# ensure callee side
|
||||||
assert self._cancel_scope
|
assert self._scope_nursery
|
||||||
# TODO: should we have an explicit cancel message
|
# TODO: should we have an explicit cancel message
|
||||||
# or is relaying the local `trio.Cancelled` as an
|
# or is relaying the local `trio.Cancelled` as an
|
||||||
# {'error': trio.Cancelled, cid: "blah"} enough?
|
# {'error': trio.Cancelled, cid: "blah"} enough?
|
||||||
# This probably gets into the discussion in
|
# This probably gets into the discussion in
|
||||||
# https://github.com/goodboy/tractor/issues/36
|
# https://github.com/goodboy/tractor/issues/36
|
||||||
self._cancel_scope.cancel()
|
self._scope_nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
|
if self._recv_chan:
|
||||||
|
await self._recv_chan.aclose()
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def open_stream(
|
async def open_stream(
|
||||||
|
@ -418,7 +461,8 @@ class Context:
|
||||||
yield rchan
|
yield rchan
|
||||||
|
|
||||||
except trio.EndOfChannel:
|
except trio.EndOfChannel:
|
||||||
# stream iteration stop signal
|
# likely the far end sent us a 'stop' message to
|
||||||
|
# terminate the stream.
|
||||||
raise
|
raise
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -30,16 +30,17 @@ DATE_FORMAT = '%b %d %H:%M:%S'
|
||||||
LEVELS = {
|
LEVELS = {
|
||||||
'GARBAGE': 1,
|
'GARBAGE': 1,
|
||||||
'TRACE': 5,
|
'TRACE': 5,
|
||||||
'PROFILE': 15,
|
'RUNTIME': 15,
|
||||||
'RUNTIME': 500,
|
'PDB': 500,
|
||||||
'QUIET': 1000,
|
'QUIET': 1000,
|
||||||
}
|
}
|
||||||
STD_PALETTE = {
|
STD_PALETTE = {
|
||||||
'CRITICAL': 'red',
|
'CRITICAL': 'red',
|
||||||
'ERROR': 'red',
|
'ERROR': 'red',
|
||||||
'RUNTIME': 'white',
|
'PDB': 'white',
|
||||||
'WARNING': 'yellow',
|
'WARNING': 'yellow',
|
||||||
'INFO': 'green',
|
'INFO': 'green',
|
||||||
|
'RUNTIME': 'white',
|
||||||
'DEBUG': 'white',
|
'DEBUG': 'white',
|
||||||
'TRACE': 'cyan',
|
'TRACE': 'cyan',
|
||||||
'GARBAGE': 'blue',
|
'GARBAGE': 'blue',
|
||||||
|
|
|
@ -101,14 +101,16 @@ def _run_asyncio_task(
|
||||||
"""Cancel the calling ``trio`` task on error.
|
"""Cancel the calling ``trio`` task on error.
|
||||||
"""
|
"""
|
||||||
nonlocal aio_err
|
nonlocal aio_err
|
||||||
aio_err = task.exception()
|
try:
|
||||||
|
aio_err = task.exception()
|
||||||
|
except asyncio.CancelledError as cerr:
|
||||||
|
aio_err = cerr
|
||||||
|
|
||||||
if aio_err:
|
if aio_err:
|
||||||
log.exception(f"asyncio task errorred:\n{aio_err}")
|
log.exception(f"asyncio task errorred:\n{aio_err}")
|
||||||
|
|
||||||
# cancel_scope.cancel()
|
# cancel_scope.cancel()
|
||||||
from_aio._err = aio_err
|
from_aio._err = aio_err
|
||||||
to_trio.close()
|
|
||||||
|
|
||||||
task.add_done_callback(cancel_trio)
|
task.add_done_callback(cancel_trio)
|
||||||
|
|
||||||
|
@ -233,17 +235,25 @@ async def run_task(
|
||||||
# raise aio_err
|
# raise aio_err
|
||||||
|
|
||||||
# Do we need this?
|
# Do we need this?
|
||||||
except BaseException as err:
|
except Exception as err:
|
||||||
# await tractor.breakpoint()
|
# await tractor.breakpoint()
|
||||||
aio_err = from_aio._err
|
aio_err = from_aio._err
|
||||||
|
|
||||||
|
# try:
|
||||||
if aio_err is not None:
|
if aio_err is not None:
|
||||||
# always raise from any captured asyncio error
|
# always raise from any captured asyncio error
|
||||||
raise err from aio_err
|
raise err from aio_err
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
# finally:
|
||||||
|
# if not task.done():
|
||||||
|
# task.cancel()
|
||||||
|
|
||||||
finally:
|
except trio.Cancelled:
|
||||||
task.cancel()
|
if not task.done():
|
||||||
|
task.cancel()
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
# async def stream_from_task
|
# async def stream_from_task
|
||||||
|
|
Loading…
Reference in New Issue