diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index d2f6237..e6c19ac 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -6,15 +6,19 @@ jobs:
   mypy:
     name: 'MyPy'
     runs-on: ubuntu-latest
+
     steps:
       - name: Checkout
         uses: actions/checkout@v2
+
       - name: Setup python
        uses: actions/setup-python@v2
        with:
-          python-version: '3.8'
+          python-version: '3.9'
+
      - name: Install dependencies
-        run: pip install -U . --upgrade-strategy eager
+        run: pip install -U . --upgrade-strategy eager -r requirements-test.txt
+
      - name: Run MyPy check
        run: mypy tractor/ --ignore-missing-imports
diff --git a/docs/README.rst b/docs/README.rst
index bd7b6af..18afd26 100644
--- a/docs/README.rst
+++ b/docs/README.rst
@@ -127,7 +127,8 @@ Zombie safe: self-destruct a process tree
         print('This process tree will self-destruct in 1 sec...')
         await trio.sleep(1)

-        # you could have done this yourself
+        # raise an error in root actor/process and trigger
+        # reaping of all minions
         raise Exception('Self Destructed')


@@ -197,6 +198,98 @@ And, yes, there's a built-in crash handling mode B)
 We're hoping to add a respawn-from-repl system soon!


+SC compatible bi-directional streaming
+--------------------------------------
+Yes, you saw it here first; we provide 2-way streams
+with reliable, transitive setup/teardown semantics.
+
+Our nascent API is reminiscent of ``trio.Nursery.start()``
+style invocation:
+
+.. code:: python
+
+    import trio
+    import tractor
+
+
+    @tractor.context
+    async def simple_rpc(
+
+        ctx: tractor.Context,
+        data: int,
+
+    ) -> None:
+        '''Test a small ping-pong 2-way streaming server.
+
+        '''
+        # signal to parent that we're up much like
+        # ``trio_typing.TaskStatus.started()``
+        await ctx.started(data + 1)
+
+        async with ctx.open_stream() as stream:
+
+            count = 0
+            async for msg in stream:
+
+                assert msg == 'ping'
+                await stream.send('pong')
+                count += 1
+
+            else:
+                assert count == 10
+
+
+    async def main() -> None:
+
+        async with tractor.open_nursery() as n:
+
+            portal = await n.start_actor(
+                'rpc_server',
+                enable_modules=[__name__],
+            )
+
+            # XXX: this syntax requires py3.9
+            async with (
+
+                portal.open_context(
+                    simple_rpc,
+                    data=10,
+                ) as (ctx, sent),
+
+                ctx.open_stream() as stream,
+            ):
+
+                assert sent == 11
+
+                count = 0
+                # receive msgs using async for style
+                await stream.send('ping')
+
+                async for msg in stream:
+                    assert msg == 'pong'
+                    await stream.send('ping')
+                    count += 1
+
+                    if count >= 9:
+                        break
+
+
+            # explicitly teardown the daemon-actor
+            await portal.cancel_actor()
+
+
+    if __name__ == '__main__':
+        trio.run(main)
+
+
+See original proposal and discussion in `#53`_ as well
+as follow up improvements in `#223`_ that we'd love to
+hear your thoughts on!
+
+.. _#53: https://github.com/goodboy/tractor/issues/53
+.. _#223: https://github.com/goodboy/tractor/issues/223
+
+
 Worker poolz are easy peasy
 ---------------------------
 The initial ask from most new users is *"how do I make a worker
diff --git a/examples/rpc_bidir_streaming.py b/examples/rpc_bidir_streaming.py
new file mode 100644
index 0000000..7320081
--- /dev/null
+++ b/examples/rpc_bidir_streaming.py
@@ -0,0 +1,72 @@
+import trio
+import tractor
+
+
+@tractor.context
+async def simple_rpc(
+
+    ctx: tractor.Context,
+    data: int,
+
+) -> None:
+    '''Test a small ping-pong 2-way streaming server.
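+
+    The caller sends 10 'ping' msgs over the stream opened below; this
+    callee answers each with a 'pong' and, once the stream closes,
+    asserts that exactly 10 round trips completed.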
+ + ''' + # signal to parent that we're up much like + # ``trio_typing.TaskStatus.started()`` + await ctx.started(data + 1) + + async with ctx.open_stream() as stream: + + count = 0 + async for msg in stream: + + assert msg == 'ping' + await stream.send('pong') + count += 1 + + else: + assert count == 10 + + +async def main() -> None: + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'rpc_server', + enable_modules=[__name__], + ) + + # XXX: syntax requires py3.9 + async with ( + + portal.open_context( + simple_rpc, # taken from pytest parameterization + data=10, + + ) as (ctx, sent), + + ctx.open_stream() as stream, + ): + + assert sent == 11 + + count = 0 + # receive msgs using async for style + await stream.send('ping') + + async for msg in stream: + assert msg == 'pong' + await stream.send('ping') + count += 1 + + if count >= 9: + break + + # explicitly teardown the daemon-actor + await portal.cancel_actor() + + +if __name__ == '__main__': + trio.run(main) diff --git a/tests/test_2way.py b/tests/test_2way.py new file mode 100644 index 0000000..1ef05d2 --- /dev/null +++ b/tests/test_2way.py @@ -0,0 +1,498 @@ +""" +Bidirectional streaming and context API. + +""" +import pytest +import trio +import tractor + +from conftest import tractor_test + +# the general stream semantics are +# - normal termination: far end relays a stop message which +# terminates an ongoing ``MsgStream`` iteration +# - cancel termination: context is cancelled on either side cancelling +# the "linked" inter-actor task context + + +_state: bool = False + + +@tractor.context +async def simple_setup_teardown( + + ctx: tractor.Context, + data: int, + block_forever: bool = False, + +) -> None: + + # startup phase + global _state + _state = True + + # signal to parent that we're up + await ctx.started(data + 1) + + try: + if block_forever: + # block until cancelled + await trio.sleep_forever() + else: + return 'yo' + finally: + _state = False + + +async def assert_state(value: bool): + global _state + assert _state == value + + +@pytest.mark.parametrize( + 'error_parent', + [False, True], +) +@pytest.mark.parametrize( + 'callee_blocks_forever', + [False, True], +) +def test_simple_context( + error_parent, + callee_blocks_forever, +): + + async def main(): + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'simple_context', + enable_modules=[__name__], + ) + + async with portal.open_context( + simple_setup_teardown, + data=10, + block_forever=callee_blocks_forever, + ) as (ctx, sent): + + assert sent == 11 + + if callee_blocks_forever: + await portal.run(assert_state, value=True) + await ctx.cancel() + else: + assert await ctx.result() == 'yo' + + # after cancellation + await portal.run(assert_state, value=False) + + if error_parent: + raise ValueError + + # shut down daemon + await portal.cancel_actor() + + if error_parent: + try: + trio.run(main) + except ValueError: + pass + else: + trio.run(main) + + +# basic stream terminations: +# - callee context closes without using stream +# - caller context closes without using stream +# - caller context calls `Context.cancel()` while streaming +# is ongoing resulting in callee being cancelled +# - callee calls `Context.cancel()` while streaming and caller +# sees stream terminated in `RemoteActorError` + +# TODO: future possible features +# - restart request: far end raises `ContextRestart` + + +@tractor.context +async def close_ctx_immediately( + + ctx: tractor.Context, + +) -> None: + + await ctx.started() + global 
_state + + async with ctx.open_stream(): + pass + + +@tractor_test +async def test_callee_closes_ctx_after_stream_open(): + 'callee context closes without using stream' + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'fast_stream_closer', + enable_modules=[__name__], + ) + + async with portal.open_context( + close_ctx_immediately, + + # flag to avoid waiting the final result + # cancel_on_exit=True, + + ) as (ctx, sent): + + assert sent is None + + with trio.fail_after(0.5): + async with ctx.open_stream() as stream: + + # should fall through since ``StopAsyncIteration`` + # should be raised through translation of + # a ``trio.EndOfChannel`` by + # ``trio.abc.ReceiveChannel.__anext__()`` + async for _ in stream: + assert 0 + else: + + # verify stream is now closed + try: + await stream.receive() + except trio.EndOfChannel: + pass + + # TODO: should be just raise the closed resource err + # directly here to enforce not allowing a re-open + # of a stream to the context (at least until a time of + # if/when we decide that's a good idea?) + try: + async with ctx.open_stream() as stream: + pass + except trio.ClosedResourceError: + pass + + await portal.cancel_actor() + + +@tractor.context +async def expect_cancelled( + + ctx: tractor.Context, + +) -> None: + global _state + _state = True + + await ctx.started() + + try: + async with ctx.open_stream() as stream: + async for msg in stream: + await stream.send(msg) # echo server + + except trio.Cancelled: + # expected case + _state = False + raise + + else: + assert 0, "Wasn't cancelled!?" + + +@pytest.mark.parametrize( + 'use_ctx_cancel_method', + [False, True], +) +@tractor_test +async def test_caller_closes_ctx_after_callee_opens_stream( + use_ctx_cancel_method: bool, +): + 'caller context closes without using stream' + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'ctx_cancelled', + enable_modules=[__name__], + ) + + async with portal.open_context( + expect_cancelled, + ) as (ctx, sent): + await portal.run(assert_state, value=True) + + assert sent is None + + # call cancel explicitly + if use_ctx_cancel_method: + + await ctx.cancel() + + try: + async with ctx.open_stream() as stream: + async for msg in stream: + pass + + except tractor.ContextCancelled: + raise # XXX: must be propagated to __aexit__ + + else: + assert 0, "Should have context cancelled?" + + # channel should still be up + assert portal.channel.connected() + + # ctx is closed here + await portal.run(assert_state, value=False) + + else: + try: + with trio.fail_after(0.2): + await ctx.result() + assert 0, "Callee should have blocked!?" + except trio.TooSlowError: + await ctx.cancel() + try: + async with ctx.open_stream() as stream: + async for msg in stream: + pass + except tractor.ContextCancelled: + pass + else: + assert 0, "Should have received closed resource error?" 
+ + # ctx is closed here + await portal.run(assert_state, value=False) + + # channel should not have been destroyed yet, only the + # inter-actor-task context + assert portal.channel.connected() + + # teardown the actor + await portal.cancel_actor() + + +@tractor_test +async def test_multitask_caller_cancels_from_nonroot_task(): + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'ctx_cancelled', + enable_modules=[__name__], + ) + + async with portal.open_context( + expect_cancelled, + ) as (ctx, sent): + + await portal.run(assert_state, value=True) + assert sent is None + + async with ctx.open_stream() as stream: + + async def send_msg_then_cancel(): + await stream.send('yo') + await portal.run(assert_state, value=True) + await ctx.cancel() + await portal.run(assert_state, value=False) + + async with trio.open_nursery() as n: + n.start_soon(send_msg_then_cancel) + + try: + async for msg in stream: + assert msg == 'yo' + + except tractor.ContextCancelled: + raise # XXX: must be propagated to __aexit__ + + # channel should still be up + assert portal.channel.connected() + + # ctx is closed here + await portal.run(assert_state, value=False) + + # channel should not have been destroyed yet, only the + # inter-actor-task context + assert portal.channel.connected() + + # teardown the actor + await portal.cancel_actor() + + +@tractor.context +async def cancel_self( + + ctx: tractor.Context, + +) -> None: + global _state + _state = True + + await ctx.cancel() + try: + with trio.fail_after(0.1): + await trio.sleep_forever() + + except trio.Cancelled: + raise + + except trio.TooSlowError: + # should never get here + assert 0 + + +@tractor_test +async def test_callee_cancels_before_started(): + '''callee calls `Context.cancel()` while streaming and caller + sees stream terminated in `ContextCancelled`. + + ''' + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'cancels_self', + enable_modules=[__name__], + ) + try: + + async with portal.open_context( + cancel_self, + ) as (ctx, sent): + async with ctx.open_stream(): + + await trio.sleep_forever() + + # raises a special cancel signal + except tractor.ContextCancelled as ce: + ce.type == trio.Cancelled + + # teardown the actor + await portal.cancel_actor() + + +@tractor.context +async def simple_rpc( + + ctx: tractor.Context, + data: int, + +) -> None: + """Test a small ping-pong server. + + """ + # signal to parent that we're up + await ctx.started(data + 1) + + print('opening stream in callee') + async with ctx.open_stream() as stream: + + count = 0 + while True: + try: + await stream.receive() == 'ping' + except trio.EndOfChannel: + assert count == 10 + break + else: + print('pong') + await stream.send('pong') + count += 1 + + +@tractor.context +async def simple_rpc_with_forloop( + + ctx: tractor.Context, + data: int, + +) -> None: + """Same as previous test but using ``async for`` syntax/api. + + """ + + # signal to parent that we're up + await ctx.started(data + 1) + + print('opening stream in callee') + async with ctx.open_stream() as stream: + + count = 0 + async for msg in stream: + + assert msg == 'ping' + print('pong') + await stream.send('pong') + count += 1 + + else: + assert count == 10 + + +@pytest.mark.parametrize( + 'use_async_for', + [True, False], +) +@pytest.mark.parametrize( + 'server_func', + [simple_rpc, simple_rpc_with_forloop], +) +def test_simple_rpc(server_func, use_async_for): + """The simplest request response pattern. 
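+
+    The caller opens a context plus stream and trades 10 ping/pong
+    round trips with the callee; both the explicit send/receive style
+    and the ``async for`` style servers are exercised via the
+    parametrization above.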
+ + """ + async def main(): + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'rpc_server', + enable_modules=[__name__], + ) + + async with portal.open_context( + server_func, # taken from pytest parameterization + data=10, + ) as (ctx, sent): + + assert sent == 11 + + async with ctx.open_stream() as stream: + + if use_async_for: + + count = 0 + # receive msgs using async for style + print('ping') + await stream.send('ping') + + async for msg in stream: + assert msg == 'pong' + print('ping') + await stream.send('ping') + count += 1 + + if count >= 9: + break + + else: + # classic send/receive style + for _ in range(10): + + print('ping') + await stream.send('ping') + assert await stream.receive() == 'pong' + + # stream should terminate here + + # final context result(s) should be consumed here in __aexit__() + + await portal.cancel_actor() + + trio.run(main) diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py new file mode 100644 index 0000000..4429d25 --- /dev/null +++ b/tests/test_advanced_streaming.py @@ -0,0 +1,220 @@ +""" +Advanced streaming patterns using bidirectional streams and contexts. + +""" +import itertools +from typing import Set, Dict, List + +import trio +import tractor + + +_registry: Dict[str, Set[tractor.ReceiveMsgStream]] = { + 'even': set(), + 'odd': set(), +} + + +async def publisher( + + seed: int = 0, + +) -> None: + + global _registry + + def is_even(i): + return i % 2 == 0 + + for val in itertools.count(seed): + + sub = 'even' if is_even(val) else 'odd' + + for sub_stream in _registry[sub].copy(): + await sub_stream.send(val) + + # throttle send rate to ~1kHz + # making it readable to a human user + await trio.sleep(1/1000) + + +@tractor.context +async def subscribe( + + ctx: tractor.Context, + +) -> None: + + global _registry + + # syn caller + await ctx.started(None) + + async with ctx.open_stream() as stream: + + # update subs list as consumer requests + async for new_subs in stream: + + new_subs = set(new_subs) + remove = new_subs - _registry.keys() + + print(f'setting sub to {new_subs} for {ctx.chan.uid}') + + # remove old subs + for sub in remove: + _registry[sub].remove(stream) + + # add new subs for consumer + for sub in new_subs: + _registry[sub].add(stream) + + +async def consumer( + + subs: List[str], + +) -> None: + + uid = tractor.current_actor().uid + + async with tractor.wait_for_actor('publisher') as portal: + async with portal.open_context(subscribe) as (ctx, first): + async with ctx.open_stream() as stream: + + # flip between the provided subs dynamically + if len(subs) > 1: + + for sub in itertools.cycle(subs): + print(f'setting dynamic sub to {sub}') + await stream.send([sub]) + + count = 0 + async for value in stream: + print(f'{uid} got: {value}') + if count > 5: + break + count += 1 + + else: # static sub + + await stream.send(subs) + async for value in stream: + print(f'{uid} got: {value}') + + +def test_dynamic_pub_sub(): + + global _registry + + from multiprocessing import cpu_count + cpus = cpu_count() + + async def main(): + async with tractor.open_nursery() as n: + + # name of this actor will be same as target func + await n.run_in_actor(publisher) + + for i, sub in zip( + range(cpus - 2), + itertools.cycle(_registry.keys()) + ): + await n.run_in_actor( + consumer, + name=f'consumer_{sub}', + subs=[sub], + ) + + # make one dynamic subscriber + await n.run_in_actor( + consumer, + name='consumer_dynamic', + subs=list(_registry.keys()), + ) + + # block until cancelled by user + 
with trio.fail_after(3): + await trio.sleep_forever() + + try: + trio.run(main) + except trio.TooSlowError: + pass + + +@tractor.context +async def one_task_streams_and_one_handles_reqresp( + + ctx: tractor.Context, + +) -> None: + + await ctx.started() + + async with ctx.open_stream() as stream: + + async def pingpong(): + '''Run a simple req/response service. + + ''' + async for msg in stream: + print('rpc server ping') + assert msg == 'ping' + print('rpc server pong') + await stream.send('pong') + + async with trio.open_nursery() as n: + n.start_soon(pingpong) + + for _ in itertools.count(): + await stream.send('yo') + await trio.sleep(0.01) + + +def test_reqresp_ontopof_streaming(): + '''Test a subactor that both streams with one task and + spawns another which handles a small requests-response + dialogue over the same bidir-stream. + + ''' + async def main(): + + with trio.move_on_after(2): + async with tractor.open_nursery() as n: + + # name of this actor will be same as target func + portal = await n.start_actor( + 'dual_tasks', + enable_modules=[__name__] + ) + + # flat to make sure we get at least one pong + got_pong: bool = False + + async with portal.open_context( + one_task_streams_and_one_handles_reqresp, + + ) as (ctx, first): + + assert first is None + + async with ctx.open_stream() as stream: + + await stream.send('ping') + + async for msg in stream: + print(f'client received: {msg}') + + assert msg in {'pong', 'yo'} + + if msg == 'pong': + got_pong = True + await stream.send('ping') + print('client sent ping') + + assert got_pong + + try: + trio.run(main) + except trio.TooSlowError: + pass diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 910e37a..8b9220b 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -8,6 +8,7 @@ TODO: None of these tests have been run successfully on windows yet. """ import time from os import path +import platform import pytest import pexpect @@ -25,6 +26,13 @@ from conftest import repodir # - recurrent root errors +if platform.system() == 'Windows': + pytest.skip( + 'Debugger tests have no windows support (yet)', + allow_module_level=True, + ) + + def examples_dir(): """Return the abspath to the examples directory. """ diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index 632d85c..5f47419 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -84,8 +84,8 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): if '__' not in f and f[0] != '_' - and 'debugging' not in p[0] - ], + and 'debugging' not in p[0]], + ids=lambda t: t[1], ) def test_example(run_example_in_subproc, example_script): @@ -98,6 +98,10 @@ def test_example(run_example_in_subproc, example_script): test_example``. 
""" ex_file = os.path.join(*example_script) + + if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9): + pytest.skip("2-way streaming example requires py3.9 async with syntax") + with open(ex_file, 'r') as ex: code = ex.read() diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 8d8169e..8c8d07e 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -32,13 +32,16 @@ async def async_gen_stream(sequence): # block indefinitely waiting to be cancelled by ``aclose()`` call with trio.CancelScope() as cs: - await trio.sleep(float('inf')) + await trio.sleep_forever() assert 0 assert cs.cancelled_caught @tractor.stream -async def context_stream(ctx, sequence): +async def context_stream( + ctx: tractor.Context, + sequence +): for i in sequence: await ctx.send_yield(i) await trio.sleep(0.1) @@ -338,6 +341,8 @@ async def test_respawn_consumer_task( print("all values streamed, BREAKING") break + cs.cancel() + # TODO: this is justification for a # ``ActorNursery.stream_from_actor()`` helper? await portal.cancel_actor() diff --git a/tractor/__init__.py b/tractor/__init__.py index 7e6f800..a7cadb9 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -5,11 +5,21 @@ tractor: An actor model micro-framework built on from trio import MultiError from ._ipc import Channel -from ._streaming import Context, stream +from ._streaming import ( + Context, + ReceiveMsgStream, + MsgStream, + stream, + context, +) from ._discovery import get_arbiter, find_actor, wait_for_actor from ._trionics import open_nursery from ._state import current_actor, is_root_process -from ._exceptions import RemoteActorError, ModuleNotExposed +from ._exceptions import ( + RemoteActorError, + ModuleNotExposed, + ContextCancelled, +) from ._debug import breakpoint, post_mortem from . import msg from ._root import run, run_daemon, open_root_actor @@ -21,6 +31,7 @@ __all__ = [ 'ModuleNotExposed', 'MultiError', 'RemoteActorError', + 'ContextCancelled', 'breakpoint', 'current_actor', 'find_actor', @@ -33,7 +44,9 @@ __all__ = [ 'run', 'run_daemon', 'stream', - 'wait_for_actor', + 'context', + 'ReceiveMsgStream', + 'MsgStream', 'to_asyncio', 'wait_for_actor', ] diff --git a/tractor/_actor.py b/tractor/_actor.py index 95e8592..c45449d 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -14,6 +14,7 @@ from types import ModuleType import sys import os from contextlib import ExitStack +import warnings import trio # type: ignore from trio_typing import TaskStatus @@ -27,6 +28,7 @@ from ._exceptions import ( unpack_error, ModuleNotExposed, is_multi_cancelled, + ContextCancelled, TransportClosed, ) from . import _debug @@ -44,6 +46,7 @@ class ActorFailure(Exception): async def _invoke( + actor: 'Actor', cid: str, chan: Channel, @@ -56,15 +59,44 @@ async def _invoke( """Invoke local func and deliver result(s) over provided channel. """ treat_as_gen = False - cs = None + + # possible a traceback (not sure what typing is for this..) 
+ tb = None + cancel_scope = trio.CancelScope() - ctx = Context(chan, cid, cancel_scope) + cs: Optional[trio.CancelScope] = None + + ctx = Context(chan, cid) + context: bool = False if getattr(func, '_tractor_stream_function', False): # handle decorated ``@tractor.stream`` async functions + sig = inspect.signature(func) + params = sig.parameters + + # compat with old api kwargs['ctx'] = ctx + + if 'ctx' in params: + warnings.warn( + "`@tractor.stream decorated funcs should now declare " + "a `stream` arg, `ctx` is now designated for use with " + "@tractor.context", + DeprecationWarning, + stacklevel=2, + ) + + elif 'stream' in params: + assert 'stream' in params + kwargs['stream'] = ctx + treat_as_gen = True + elif getattr(func, '_tractor_context_function', False): + # handle decorated ``@tractor.context`` async function + kwargs['ctx'] = ctx + context = True + # errors raised inside this block are propgated back to caller try: if not ( @@ -102,52 +134,106 @@ async def _invoke( # `StopAsyncIteration` system here for returning a final # value if desired await chan.send({'stop': True, 'cid': cid}) - else: - if treat_as_gen: - await chan.send({'functype': 'asyncgen', 'cid': cid}) - # XXX: the async-func may spawn further tasks which push - # back values like an async-generator would but must - # manualy construct the response dict-packet-responses as - # above - with cancel_scope as cs: - task_status.started(cs) - await coro - if not cs.cancelled_caught: - # task was not cancelled so we can instruct the - # far end async gen to tear down - await chan.send({'stop': True, 'cid': cid}) - else: - # regular async function - await chan.send({'functype': 'asyncfunc', 'cid': cid}) - with cancel_scope as cs: - task_status.started(cs) + + # one way @stream func that gets treated like an async gen + elif treat_as_gen: + await chan.send({'functype': 'asyncgen', 'cid': cid}) + # XXX: the async-func may spawn further tasks which push + # back values like an async-generator would but must + # manualy construct the response dict-packet-responses as + # above + with cancel_scope as cs: + task_status.started(cs) + await coro + + if not cs.cancelled_caught: + # task was not cancelled so we can instruct the + # far end async gen to tear down + await chan.send({'stop': True, 'cid': cid}) + + elif context: + # context func with support for bi-dir streaming + await chan.send({'functype': 'context', 'cid': cid}) + + async with trio.open_nursery() as scope_nursery: + ctx._scope_nursery = scope_nursery + cs = scope_nursery.cancel_scope + task_status.started(cs) + try: await chan.send({'return': await coro, 'cid': cid}) + except trio.Cancelled as err: + tb = err.__traceback__ + + if cs.cancelled_caught: + + # TODO: pack in ``trio.Cancelled.__traceback__`` here + # so they can be unwrapped and displayed on the caller + # side! + + fname = func.__name__ + if ctx._cancel_called: + msg = f'{fname} cancelled itself' + + elif cs.cancel_called: + msg = ( + f'{fname} was remotely cancelled by its caller ' + f'{ctx.chan.uid}' + ) + + # task-contex was cancelled so relay to the cancel to caller + raise ContextCancelled( + msg, + suberror_type=trio.Cancelled, + ) + + else: + # regular async function + await chan.send({'functype': 'asyncfunc', 'cid': cid}) + with cancel_scope as cs: + task_status.started(cs) + await chan.send({'return': await coro, 'cid': cid}) except (Exception, trio.MultiError) as err: - # TODO: maybe we'll want differnet "levels" of debugging - # eventualy such as ('app', 'supervisory', 'runtime') ? 
- if not isinstance(err, trio.ClosedResourceError) and ( - not is_multi_cancelled(err) - ): - # XXX: is there any case where we'll want to debug IPC - # disconnects? I can't think of a reason that inspecting - # this type of failure will be useful for respawns or - # recovery logic - the only case is some kind of strange bug - # in `trio` itself? - entered = await _debug._maybe_enter_pm(err) - if not entered: + if not is_multi_cancelled(err): + + log.exception("Actor crashed:") + + # TODO: maybe we'll want different "levels" of debugging + # eventualy such as ('app', 'supervisory', 'runtime') ? + + # if not isinstance(err, trio.ClosedResourceError) and ( + # if not is_multi_cancelled(err) and ( + + entered_debug: bool = False + if not isinstance(err, ContextCancelled) or ( + isinstance(err, ContextCancelled) and ctx._cancel_called + ): + # XXX: is there any case where we'll want to debug IPC + # disconnects as a default? + # + # I can't think of a reason that inspecting + # this type of failure will be useful for respawns or + # recovery logic - the only case is some kind of strange bug + # in our transport layer itself? Going to keep this + # open ended for now. + + entered_debug = await _debug._maybe_enter_pm(err) + + if not entered_debug: log.exception("Actor crashed:") # always ship errors back to caller - err_msg = pack_error(err) + err_msg = pack_error(err, tb=tb) err_msg['cid'] = cid try: await chan.send(err_msg) except trio.ClosedResourceError: - log.warning( - f"Failed to ship error to caller @ {chan.uid}") + # if we can't propagate the error that's a big boo boo + log.error( + f"Failed to ship error to caller @ {chan.uid} !?" + ) if cs is None: # error is from above code not from rpc invocation @@ -165,7 +251,7 @@ async def _invoke( f"Task {func} likely errored or cancelled before it started") finally: if not actor._rpc_tasks: - log.info("All RPC tasks have completed") + log.runtime("All RPC tasks have completed") actor._ongoing_rpc_tasks.set() @@ -180,10 +266,10 @@ _lifetime_stack: ExitStack = ExitStack() class Actor: """The fundamental concurrency primitive. - An *actor* is the combination of a regular Python or - ``multiprocessing.Process`` executing a ``trio`` task tree, communicating + An *actor* is the combination of a regular Python process + executing a ``trio`` task tree, communicating with other actors through "portals" which provide a native async API - around "channels". + around various IPC transport "channels". """ is_arbiter: bool = False @@ -327,14 +413,18 @@ class Actor: raise mne async def _stream_handler( + self, stream: trio.SocketStream, + ) -> None: """Entry point for new inbound connections to the channel server. + """ self._no_more_peers = trio.Event() # unset + chan = Channel(stream=stream) - log.info(f"New connection to us {chan}") + log.runtime(f"New connection to us {chan}") # send/receive initial handshake response try: @@ -365,11 +455,16 @@ class Actor: event.set() chans = self._peers[uid] + + # TODO: re-use channels for new connections instead + # of always new ones; will require changing all the + # discovery funcs if chans: - log.warning( + log.runtime( f"already have channel(s) for {uid}:{chans}?" 
) - log.trace(f"Registered {chan} for {uid}") # type: ignore + + log.runtime(f"Registered {chan} for {uid}") # type: ignore # append new channel self._peers[uid].append(chan) @@ -378,10 +473,24 @@ class Actor: try: await self._process_messages(chan) finally: + + # channel cleanup sequence + + # for (channel, cid) in self._rpc_tasks.copy(): + # if channel is chan: + # with trio.CancelScope(shield=True): + # await self._cancel_task(cid, channel) + + # # close all consumer side task mem chans + # send_chan, _ = self._cids2qs[(chan.uid, cid)] + # assert send_chan.cid == cid # type: ignore + # await send_chan.aclose() + # Drop ref to channel so it can be gc-ed and disconnected log.debug(f"Releasing channel {chan} from {chan.uid}") chans = self._peers.get(chan.uid) chans.remove(chan) + if not chans: log.debug(f"No more channels for {chan.uid}") self._peers.pop(chan.uid, None) @@ -394,14 +503,22 @@ class Actor: # # XXX: is this necessary (GC should do it?) if chan.connected(): + # if the channel is still connected it may mean the far + # end has not closed and we may have gotten here due to + # an error and so we should at least try to terminate + # the channel from this end gracefully. + log.debug(f"Disconnecting channel {chan}") try: - # send our msg loop terminate sentinel + # send a msg loop terminate sentinel await chan.send(None) + + # XXX: do we want this? + # causes "[104] connection reset by peer" on other end # await chan.aclose() + except trio.BrokenResourceError: - log.exception( - f"Channel for {chan.uid} was already zonked..") + log.warning(f"Channel for {chan.uid} was already closed") async def _push_result( self, @@ -411,22 +528,32 @@ class Actor: ) -> None: """Push an RPC result to the local consumer's queue. """ - actorid = chan.uid - assert actorid, f"`actorid` can't be {actorid}" - send_chan, recv_chan = self._cids2qs[(actorid, cid)] + # actorid = chan.uid + assert chan.uid, f"`chan.uid` can't be {chan.uid}" + send_chan, recv_chan = self._cids2qs[(chan.uid, cid)] assert send_chan.cid == cid # type: ignore - if 'stop' in msg: - log.debug(f"{send_chan} was terminated at remote end") - # indicate to consumer that far end has stopped - return await send_chan.aclose() + # if 'error' in msg: + # ctx = getattr(recv_chan, '_ctx', None) + # if ctx: + # ctx._error_from_remote_msg(msg) + + # log.debug(f"{send_chan} was terminated at remote end") + # # indicate to consumer that far end has stopped + # return await send_chan.aclose() try: - log.debug(f"Delivering {msg} from {actorid} to caller {cid}") + log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}") # maintain backpressure await send_chan.send(msg) except trio.BrokenResourceError: + # TODO: what is the right way to handle the case where the + # local task has already sent a 'stop' / StopAsyncInteration + # to the other side but and possibly has closed the local + # feeder mem chan? Do we wait for some kind of ack or just + # let this fail silently and bubble up (currently)? 
+ # XXX: local consumer has closed their side # so cancel the far end streaming task log.warning(f"{send_chan} consumer is already closed") @@ -435,7 +562,9 @@ class Actor: self, actorid: Tuple[str, str], cid: str + ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]: + log.debug(f"Getting result queue for {actorid} cid {cid}") try: send_chan, recv_chan = self._cids2qs[(actorid, cid)] @@ -489,23 +618,28 @@ class Actor: task_status.started(loop_cs) async for msg in chan: if msg is None: # loop terminate sentinel + log.debug( f"Cancelling all tasks for {chan} from {chan.uid}") - for (channel, cid) in self._rpc_tasks: + + for (channel, cid) in self._rpc_tasks.copy(): if channel is chan: await self._cancel_task(cid, channel) + log.debug( f"Msg loop signalled to terminate for" f" {chan} from {chan.uid}") + break - log.trace( # type: ignore + log.transport( # type: ignore f"Received msg {msg} from {chan.uid}") cid = msg.get('cid') if cid: # deliver response to local caller/waiter await self._push_result(chan, cid, msg) + log.debug( f"Waiting on next msg for {chan} from {chan.uid}") continue @@ -566,7 +700,7 @@ class Actor: else: # mark that we have ongoing rpc tasks self._ongoing_rpc_tasks = trio.Event() - log.info(f"RPC func is {func}") + log.runtime(f"RPC func is {func}") # store cancel scope such that the rpc task can be # cancelled gracefully if requested self._rpc_tasks[(chan, cid)] = ( @@ -575,7 +709,7 @@ class Actor: # self.cancel() was called so kill this msg loop # and break out into ``_async_main()`` log.warning( - f"{self.uid} was remotely cancelled; " + f"Actor {self.uid} was remotely cancelled; " "waiting on cancellation completion..") await self._cancel_complete.wait() loop_cs.cancel() @@ -1043,7 +1177,7 @@ class Actor: raise ValueError(f"{uid} is not a valid uid?!") chan.uid = uid - log.info(f"Handshake with actor {uid}@{chan.raddr} complete") + log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") return uid diff --git a/tractor/_debug.py b/tractor/_debug.py index 75e502a..c76422a 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -102,7 +102,7 @@ class PdbwTeardown(pdbpp.Pdb): # async with aclosing(async_stdin): # async for msg in async_stdin: -# log.trace(f"Stdin input:\n{msg}") +# log.runtime(f"Stdin input:\n{msg}") # # encode to bytes # bmsg = str.encode(msg) @@ -276,7 +276,7 @@ def _set_trace(actor=None): pdb = _mk_pdb() if actor is not None: - log.runtime(f"\nAttaching pdb to actor: {actor.uid}\n") + log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") pdb.set_trace( # start 2 levels up in user code @@ -306,7 +306,7 @@ breakpoint = partial( def _post_mortem(actor): - log.runtime(f"\nAttaching to pdb in crashed actor: {actor.uid}\n") + log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n") pdb = _mk_pdb() # custom Pdb post-mortem entry diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 9e520b3..bcfcc84 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -16,12 +16,14 @@ from ._state import current_actor, _runtime_vars @asynccontextmanager async def get_arbiter( + host: str, port: int, + ) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: - """Return a portal instance connected to a local or remote + '''Return a portal instance connected to a local or remote arbiter. 
- """ + ''' actor = current_actor() if not actor: @@ -33,16 +35,20 @@ async def get_arbiter( yield LocalPortal(actor, Channel((host, port))) else: async with _connect_chan(host, port) as chan: + async with open_portal(chan) as arb_portal: + yield arb_portal @asynccontextmanager async def get_root( -**kwargs, + **kwargs, ) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: + host, port = _runtime_vars['_root_mailbox'] assert host is not None + async with _connect_chan(host, port) as chan: async with open_portal(chan, **kwargs) as portal: yield portal @@ -60,12 +66,16 @@ async def find_actor( """ actor = current_actor() async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: + sockaddr = await arb_portal.run_from_ns('self', 'find_actor', name=name) + # TODO: return portals to all available actors - for now just # the last one that registered if name == 'arbiter' and actor.is_arbiter: raise RuntimeError("The current actor is the arbiter") + elif sockaddr: + async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: yield portal @@ -83,9 +93,12 @@ async def wait_for_actor( A portal to the first registered actor is returned. """ actor = current_actor() + async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: + sockaddrs = await arb_portal.run_from_ns('self', 'wait_for_actor', name=name) sockaddr = sockaddrs[-1] + async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: yield portal diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index f6d9f47..30c872b 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -1,7 +1,7 @@ """ Our classy exception set. """ -from typing import Dict, Any +from typing import Dict, Any, Optional, Type import importlib import builtins import traceback @@ -15,17 +15,16 @@ _this_mod = importlib.import_module(__name__) class RemoteActorError(Exception): # TODO: local recontruction of remote exception deats "Remote actor exception bundled locally" - def __init__(self, message, type_str, **msgdata) -> None: - super().__init__(message) - for ns in [builtins, _this_mod, trio]: - try: - self.type = getattr(ns, type_str) - break - except AttributeError: - continue - else: - self.type = Exception + def __init__( + self, + message: str, + suberror_type: Optional[Type[BaseException]] = None, + **msgdata + ) -> None: + super().__init__(message) + + self.type = suberror_type self.msgdata = msgdata # TODO: a trio.MultiError.catch like context manager @@ -41,6 +40,9 @@ class InternalActorError(RemoteActorError): class TransportClosed(trio.ClosedResourceError): "Underlying channel transport was closed prior to use" +class ContextCancelled(RemoteActorError): + "Inter-actor task context cancelled itself on the callee side." + class NoResult(RuntimeError): "No final result is expected for this actor" @@ -54,13 +56,22 @@ class NoRuntime(RuntimeError): "The root actor has not been initialized yet" -def pack_error(exc: BaseException) -> Dict[str, Any]: +def pack_error( + exc: BaseException, + tb = None, + +) -> Dict[str, Any]: """Create an "error message" for tranmission over a channel (aka the wire). """ + if tb: + tb_str = ''.join(traceback.format_tb(tb)) + else: + tb_str = traceback.format_exc() + return { 'error': { - 'tb_str': traceback.format_exc(), + 'tb_str': tb_str, 'type_str': type(exc).__name__, } } @@ -77,12 +88,35 @@ def unpack_error( into a local ``RemoteActorError``. 
""" - tb_str = msg['error'].get('tb_str', '') - return err_type( - f"{chan.uid}\n" + tb_str, + error = msg['error'] + + tb_str = error.get('tb_str', '') + message = f"{chan.uid}\n" + tb_str + type_name = error['type_str'] + suberror_type: Type[BaseException] = Exception + + if type_name == 'ContextCancelled': + err_type = ContextCancelled + suberror_type = trio.Cancelled + + else: # try to lookup a suitable local error type + for ns in [builtins, _this_mod, trio]: + try: + suberror_type = getattr(ns, type_name) + break + except AttributeError: + continue + + exc = err_type( + message, + suberror_type=suberror_type, + + # unpack other fields into error type init **msg['error'], ) + return exc + def is_multi_cancelled(exc: BaseException) -> bool: """Predicate to determine if a ``trio.MultiError`` contains only diff --git a/tractor/_ipc.py b/tractor/_ipc.py index efe388e..08057e9 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -1,5 +1,6 @@ """ Inter-process comms abstractions + """ import platform import typing @@ -61,7 +62,6 @@ class MsgpackTCPStream: use_list=False, ) while True: - try: data = await self.stream.receive_some(2**10) @@ -88,7 +88,7 @@ class MsgpackTCPStream: else: raise - log.trace(f"received {data}") # type: ignore + log.transport(f"received {data}") # type: ignore if data == b'': raise TransportClosed( @@ -169,6 +169,7 @@ class Channel: return self.msgstream.raddr if self.msgstream else None async def connect( + self, destaddr: Tuple[Any, ...] = None, **kwargs @@ -180,13 +181,21 @@ class Channel: destaddr = destaddr or self._destaddr assert isinstance(destaddr, tuple) - stream = await trio.open_tcp_stream(*destaddr, **kwargs) + + stream = await trio.open_tcp_stream( + *destaddr, + **kwargs + ) self.msgstream = MsgpackTCPStream(stream) + + log.transport( + f'Opened channel to peer {self.laddr} -> {self.raddr}' + ) return stream async def send(self, item: Any) -> None: - log.trace(f"send `{item}`") # type: ignore + log.transport(f"send `{item}`") # type: ignore assert self.msgstream await self.msgstream.send(item) @@ -205,7 +214,8 @@ class Channel: raise async def aclose(self) -> None: - log.debug( + + log.transport( f'Closing channel to {self.uid} ' f'{self.laddr} -> {self.raddr}' ) @@ -234,11 +244,11 @@ class Channel: await self.connect() cancelled = cancel_scope.cancelled_caught if cancelled: - log.warning( + log.transport( "Reconnect timed out after 3 seconds, retrying...") continue else: - log.warning("Stream connection re-established!") + log.transport("Stream connection re-established!") # run any reconnection sequence on_recon = self._recon_seq if on_recon: @@ -247,7 +257,7 @@ class Channel: except (OSError, ConnectionRefusedError): if not down: down = True - log.warning( + log.transport( f"Connection to {self.raddr} went down, waiting" " for re-establishment") await trio.sleep(1) diff --git a/tractor/_portal.py b/tractor/_portal.py index d82e040..44e8630 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -17,7 +17,12 @@ from async_generator import asynccontextmanager from ._state import current_actor from ._ipc import Channel from .log import get_logger -from ._exceptions import unpack_error, NoResult, RemoteActorError +from ._exceptions import ( + unpack_error, + NoResult, + RemoteActorError, + ContextCancelled, +) from ._streaming import Context, ReceiveMsgStream @@ -84,7 +89,7 @@ class Portal: ns: str, func: str, kwargs, - ) -> Tuple[str, trio.abc.ReceiveChannel, str, Dict[str, Any]]: + ) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]: 
"""Submit a function to be scheduled and run by actor, return the associated caller id, response queue, response type str, first message packet as a tuple. @@ -172,6 +177,7 @@ class Portal: f"Cancelling all streams with {self.channel.uid}") for stream in self._streams.copy(): try: + # with trio.CancelScope(shield=True): await stream.aclose() except trio.ClosedResourceError: # don't error the stream having already been closed @@ -289,6 +295,7 @@ class Portal: self, async_gen_func: Callable, # typing: ignore **kwargs, + ) -> AsyncGenerator[ReceiveMsgStream, None]: if not inspect.isasyncgenfunction(async_gen_func): @@ -312,13 +319,23 @@ class Portal: ctx = Context(self.channel, cid, _portal=self) try: - async with ReceiveMsgStream(ctx, recv_chan, self) as rchan: + # deliver receive only stream + async with ReceiveMsgStream(ctx, recv_chan) as rchan: self._streams.add(rchan) yield rchan + finally: + # cancel the far end task on consumer close + # NOTE: this is a special case since we assume that if using + # this ``.open_fream_from()`` api, the stream is one a one + # time use and we couple the far end tasks's lifetime to + # the consumer's scope; we don't ever send a `'stop'` + # message right now since there shouldn't be a reason to + # stop and restart the stream, right? try: await ctx.cancel() + except trio.ClosedResourceError: # if the far end terminates before we send a cancel the # underlying transport-channel may already be closed. @@ -326,16 +343,123 @@ class Portal: self._streams.remove(rchan) - # @asynccontextmanager - # async def open_context( - # self, - # func: Callable, - # **kwargs, - # ) -> Context: - # # TODO - # elif resptype == 'context': # context manager style setup/teardown - # # TODO likely not here though - # raise NotImplementedError + @asynccontextmanager + async def open_context( + + self, + func: Callable, + **kwargs, + + ) -> AsyncGenerator[Tuple[Context, Any], None]: + '''Open an inter-actor task context. + + This is a synchronous API which allows for deterministic + setup/teardown of a remote task. The yielded ``Context`` further + allows for opening bidirectional streams, explicit cancellation + and synchronized final result collection. See ``tractor.Context``. + + ''' + + # conduct target func method structural checks + if not inspect.iscoroutinefunction(func) and ( + getattr(func, '_tractor_contex_function', False) + ): + raise TypeError( + f'{func} must be an async generator function!') + + fn_mod_path, fn_name = func_deats(func) + + recv_chan: Optional[trio.MemoryReceiveChannel] = None + + cid, recv_chan, functype, first_msg = await self._submit( + fn_mod_path, fn_name, kwargs) + + assert functype == 'context' + msg = await recv_chan.receive() + + try: + # the "first" value here is delivered by the callee's + # ``Context.started()`` call. + first = msg['started'] + + except KeyError: + assert msg.get('cid'), ("Received internal error at context?") + + if msg.get('error'): + # raise the error message + raise unpack_error(msg, self.channel) + else: + raise + + _err: Optional[BaseException] = None + # deliver context instance and .started() msg value in open tuple. 
+ try: + async with trio.open_nursery() as scope_nursery: + ctx = Context( + self.channel, + cid, + _portal=self, + _recv_chan=recv_chan, + _scope_nursery=scope_nursery, + ) + + # pairs with handling in ``Actor._push_result()`` + # recv_chan._ctx = ctx + + # await trio.lowlevel.checkpoint() + yield ctx, first + + except ContextCancelled as err: + _err = err + if not ctx._cancel_called: + # context was cancelled at the far end but was + # not part of this end requesting that cancel + # so raise for the local task to respond and handle. + raise + + # if the context was cancelled by client code + # then we don't need to raise since user code + # is expecting this and the block should exit. + else: + log.debug(f'Context {ctx} cancelled gracefully') + + except ( + trio.Cancelled, + trio.MultiError, + Exception, + ) as err: + _err = err + # the context cancels itself on any cancel + # causing error. + log.error(f'Context {ctx} sending cancel to far end') + with trio.CancelScope(shield=True): + await ctx.cancel() + raise + + finally: + result = await ctx.result() + + # though it should be impossible for any tasks + # operating *in* this scope to have survived + # we tear down the runtime feeder chan last + # to avoid premature stream clobbers. + if recv_chan is not None: + await recv_chan.aclose() + + if _err: + if ctx._cancel_called: + log.warning( + f'Context {fn_name} cancelled by caller with\n{_err}' + ) + elif _err is not None: + log.warning( + f'Context {fn_name} cancelled by callee with\n{_err}' + ) + else: + log.info( + f'Context {fn_name} returned ' + f'value from callee `{result}`' + ) @dataclass @@ -360,10 +484,12 @@ class LocalPortal: @asynccontextmanager async def open_portal( + channel: Channel, nursery: Optional[trio.Nursery] = None, start_msg_loop: bool = True, shield: bool = False, + ) -> AsyncGenerator[Portal, None]: """Open a ``Portal`` through the provided ``channel``. @@ -374,6 +500,7 @@ async def open_portal( was_connected = False async with maybe_open_nursery(nursery, shield=shield) as nursery: + if not channel.connected(): await channel.connect() was_connected = True @@ -395,12 +522,14 @@ async def open_portal( portal = Portal(channel) try: yield portal + finally: await portal.aclose() if was_connected: - # cancel remote channel-msg loop + # gracefully signal remote channel-msg loop await channel.send(None) + # await channel.aclose() # cancel background msg loop task if msg_loop_cs: diff --git a/tractor/_root.py b/tractor/_root.py index f5bd778..8391f4c 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -86,6 +86,9 @@ async def open_root_actor( # for use of ``await tractor.breakpoint()`` enable_modules.append('tractor._debug') + if loglevel is None: + loglevel = 'pdb' + elif debug_mode: raise RuntimeError( "Debug mode is only supported for the `trio` backend!" @@ -179,8 +182,7 @@ async def open_root_actor( finally: logger.info("Shutting down root actor") - with trio.CancelScope(shield=True): - await actor.cancel() + await actor.cancel() finally: _state._current_actor = None logger.info("Root actor terminated") diff --git a/tractor/_spawn.py b/tractor/_spawn.py index d893479..678250b 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -22,7 +22,10 @@ from multiprocessing import forkserver # type: ignore from typing import Tuple from . 
import _forkserver_override -from ._state import current_actor, is_main_process +from ._state import ( + current_actor, + is_main_process, +) from .log import get_logger from ._portal import Portal from ._actor import Actor, ActorFailure @@ -149,6 +152,27 @@ async def cancel_on_completion( await portal.cancel_actor() +async def do_hard_kill( + proc: trio.Process, +) -> None: + # NOTE: this timeout used to do nothing since we were shielding + # the ``.wait()`` inside ``new_proc()`` which will pretty much + # never release until the process exits, now it acts as + # a hard-kill time ultimatum. + with trio.move_on_after(3) as cs: + + # NOTE: This ``__aexit__()`` shields internally. + async with proc: # calls ``trio.Process.aclose()`` + log.debug(f"Terminating {proc}") + + if cs.cancelled_caught: + # XXX: should pretty much never get here unless we have + # to move the bits from ``proc.__aexit__()`` out and + # into here. + log.critical(f"HARD KILLING {proc}") + proc.kill() + + @asynccontextmanager async def spawn_subactor( subactor: 'Actor', @@ -180,26 +204,15 @@ async def spawn_subactor( proc = await trio.open_process(spawn_cmd) try: yield proc + finally: + # XXX: do this **after** cancellation/tearfown # to avoid killing the process too early # since trio does this internally on ``__aexit__()`` - # NOTE: we always "shield" join sub procs in - # the outer scope since no actor zombies are - # ever allowed. This ``__aexit__()`` also shields - # internally. log.debug(f"Attempting to kill {proc}") - - # NOTE: this timeout effectively does nothing right now since - # we are shielding the ``.wait()`` inside ``new_proc()`` which - # will pretty much never release until the process exits. - with trio.move_on_after(3) as cs: - async with proc: - log.debug(f"Terminating {proc}") - if cs.cancelled_caught: - log.critical(f"HARD KILLING {proc}") - proc.kill() + await do_hard_kill(proc) async def new_proc( @@ -212,7 +225,6 @@ async def new_proc( parent_addr: Tuple[str, int], _runtime_vars: Dict[str, Any], # serialized and sent to _child *, - use_trio_run_in_process: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: """Create a new ``multiprocessing.Process`` using the @@ -223,7 +235,7 @@ async def new_proc( # mark the new actor with the global spawn method subactor._spawn_method = _spawn_method - if use_trio_run_in_process or _spawn_method == 'trio': + if _spawn_method == 'trio': async with trio.open_nursery() as nursery: async with spawn_subactor( subactor, @@ -277,9 +289,14 @@ async def new_proc( # reaping more stringently without the shield # we used to have below... - # always "hard" join sub procs: - # no actor zombies allowed # with trio.CancelScope(shield=True): + # async with proc: + + # Always "hard" join sub procs since no actor zombies + # are allowed! + + # this is a "light" (cancellable) join, the hard join is + # in the enclosing scope (see above). await proc.wait() log.debug(f"Joined {proc}") @@ -320,7 +337,6 @@ async def mp_new_proc( parent_addr: Tuple[str, int], _runtime_vars: Dict[str, Any], # serialized and sent to _child *, - use_trio_run_in_process: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 0836f4e..eead6f6 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -1,41 +1,312 @@ +""" +Message stream types and APIs. 
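+
+Defines the ``ReceiveMsgStream`` / ``MsgStream`` wrappers and the
+inter-actor task ``Context`` used for bi-directional streaming.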
+ +""" import inspect -from contextlib import contextmanager # , asynccontextmanager +from contextlib import contextmanager, asynccontextmanager from dataclasses import dataclass -from typing import Any, Iterator, Optional +from typing import ( + Any, Iterator, Optional, Callable, + AsyncGenerator, Dict, +) + import warnings import trio from ._ipc import Channel -from ._exceptions import unpack_error +from ._exceptions import unpack_error, ContextCancelled +from ._state import current_actor from .log import get_logger log = get_logger(__name__) -@dataclass(frozen=True) -class Context: - """An IAC (inter-actor communication) context. +# TODO: generic typing like trio's receive channel +# but with msgspec messages? +# class ReceiveChannel(AsyncResource, Generic[ReceiveType]): - Allows maintaining task or protocol specific state between communicating - actors. A unique context is created on the receiving end for every request - to a remote actor. - A context can be cancelled and (eventually) restarted from - either side of the underlying IPC channel. +class ReceiveMsgStream(trio.abc.ReceiveChannel): + """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with + special behaviour for signalling stream termination across an + inter-actor ``Channel``. This is the type returned to a local task + which invoked a remote streaming function using `Portal.run()`. - A context can be used to open task oriented message streams. + Termination rules: + + - if the local task signals stop iteration a cancel signal is + relayed to the remote task indicating to stop streaming + - if the remote task signals the end of a stream, raise + a ``StopAsyncIteration`` to terminate the local ``async for`` """ + def __init__( + self, + ctx: 'Context', # typing: ignore # noqa + rx_chan: trio.abc.ReceiveChannel, + shield: bool = False, + ) -> None: + self._ctx = ctx + self._rx_chan = rx_chan + self._shielded = shield + + # flag to denote end of stream + self._eoc: bool = False + + # delegate directly to underlying mem channel + def receive_nowait(self): + msg = self._rx_chan.receive_nowait() + return msg['yield'] + + async def receive(self): + # see ``.aclose()`` for notes on the old behaviour prior to + # introducing this + if self._eoc: + raise trio.EndOfChannel + + try: + + msg = await self._rx_chan.receive() + return msg['yield'] + + except KeyError: + # internal error should never get here + assert msg.get('cid'), ("Received internal error at portal?") + + # TODO: handle 2 cases with 3.10 match syntax + # - 'stop' + # - 'error' + # possibly just handle msg['stop'] here! + + if msg.get('stop'): + log.debug(f"{self} was stopped at remote end") + + # # when the send is closed we assume the stream has + # # terminated and signal this local iterator to stop + # await self.aclose() + + # XXX: this causes ``ReceiveChannel.__anext__()`` to + # raise a ``StopAsyncIteration`` **and** in our catch + # block below it will trigger ``.aclose()``. + raise trio.EndOfChannel + + # TODO: test that shows stream raising an expected error!!! 
+ elif msg.get('error'): + # raise the error message + raise unpack_error(msg, self._ctx.chan) + + else: + raise + + except ( + trio.ClosedResourceError, # by self._rx_chan + trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end + trio.Cancelled, # by local cancellation + ): + # XXX: we close the stream on any of these error conditions: + + # a ``ClosedResourceError`` indicates that the internal + # feeder memory receive channel was closed likely by the + # runtime after the associated transport-channel + # disconnected or broke. + + # an ``EndOfChannel`` indicates either the internal recv + # memchan exhausted **or** we raisesd it just above after + # receiving a `stop` message from the far end of the stream. + + # Previously this was triggered by calling ``.aclose()`` on + # the send side of the channel inside + # ``Actor._push_result()`` (should still be commented code + # there - which should eventually get removed), but now the + # 'stop' message handling has been put just above. + + # TODO: Locally, we want to close this stream gracefully, by + # terminating any local consumers tasks deterministically. + # One we have broadcast support, we **don't** want to be + # closing this stream and not flushing a final value to + # remaining (clone) consumers who may not have been + # scheduled to receive it yet. + + # when the send is closed we assume the stream has + # terminated and signal this local iterator to stop + await self.aclose() + + raise # propagate + + @contextmanager + def shield( + self + ) -> Iterator['ReceiveMsgStream']: # noqa + """Shield this stream's underlying channel such that a local consumer task + can be cancelled (and possibly restarted) using ``trio.Cancelled``. + + Note that here, "shielding" here guards against relaying + a ``'stop'`` message to the far end of the stream thus keeping + the stream machinery active and ready for further use, it does + not have anything to do with an internal ``trio.CancelScope``. + + """ + self._shielded = True + yield self + self._shielded = False + + async def aclose(self): + """Cancel associated remote actor task and local memory channel + on close. + + """ + # XXX: keep proper adherance to trio's `.aclose()` semantics: + # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose + rx_chan = self._rx_chan + + if rx_chan._closed: + log.warning(f"{self} is already closed") + + # this stream has already been closed so silently succeed as + # per ``trio.AsyncResource`` semantics. + # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose + return + + # TODO: broadcasting to multiple consumers + # stats = rx_chan.statistics() + # if stats.open_receive_channels > 1: + # # if we've been cloned don't kill the stream + # log.debug( + # "there are still consumers running keeping stream alive") + # return + + if self._shielded: + log.warning(f"{self} is shielded, portal channel being kept alive") + return + + # XXX: This must be set **AFTER** the shielded test above! + self._eoc = True + + # NOTE: this is super subtle IPC messaging stuff: + # Relay stop iteration to far end **iff** we're + # in bidirectional mode. If we're only streaming + # *from* one side then that side **won't** have an + # entry in `Actor._cids2qs` (maybe it should though?). 
+        # So any `yield` or `stop` msgs sent from the caller side
+        # will cause key errors on the callee side since there is
+        # no entry for a local feeder mem chan since the callee task
+        # isn't expecting messages to be sent by the caller.
+        # Thus, we must check that this context DOES NOT
+        # have a portal reference to ensure this is indeed the callee
+        # side and can relay a 'stop'.
+
+        # In the bidirectional case, `Context.open_stream()` will create
+        # the `Actor._cids2qs` entry from a call to
+        # `Actor.get_memchans()` and will send the stop message in
+        # ``__aexit__()`` on teardown so it **does not** need to be
+        # called here.
+        if not self._ctx._portal:
+            try:
+                # only for 2-way streams can we send
+                # stop from the caller side
+                await self._ctx.send_stop()
+
+            except (
+                trio.BrokenResourceError,
+                trio.ClosedResourceError
+            ):
+                # the underlying channel may already have been pulled
+                # in which case our stop message is meaningless since
+                # it can't traverse the transport.
+                log.debug(f'Channel for {self} was already closed')
+
+        # close the local mem chan ``self._rx_chan`` ??!?
+
+        # DEFINITELY NOT if we're a bi-dir ``MsgStream``!
+        # BECAUSE this same core-msg-loop mem recv-chan is used to deliver
+        # the potential final result from the surrounding inter-actor
+        # `Context` so we don't want to close it until that context has
+        # run to completion.
+
+        # XXX: Notes on old behaviour:
+        # await rx_chan.aclose()
+
+        # In the receive-only case, ``Portal.open_stream_from()`` used
+        # to rely on this call explicitly on teardown such that a new
+        # call to ``.receive()`` after ``rx_chan`` had been closed, would
+        # result in us raising a ``trio.EndOfChannel`` (since we
+        # remapped the ``trio.ClosedResourceError``). However, now if for some
+        # reason the stream's consumer code tries to manually receive a new
+        # value before ``.aclose()`` is called **but** the far end has
+        # stopped, `.receive()` **must** raise ``trio.EndOfChannel`` in
+        # order to avoid an infinite hang on ``.__anext__()``; this is
+        # why we added ``self._eoc`` to denote stream closure independent
+        # of ``rx_chan``.
+
+        # In theory we could still use this old method and close the
+        # underlying msg-loop mem chan as above and then **not** check
+        # for ``self._eoc`` in ``.receive()`` (if for some reason we
+        # think that check is a bottleneck - not likely) **but** then
+        # we would need to map the resulting
+        # ``trio.ClosedResourceError`` to a ``trio.EndOfChannel`` in
+        # ``.receive()`` (as it originally was before bi-dir streaming
+        # support) in order to trigger stream closure. The old behaviour
+        # is arguably more confusing since we lose detection of the
+        # runtime's closure of ``rx_chan`` in the case where we may
+        # still need to consume msgs that are "in transit" from the far
+        # end (eg. for ``Context.result()``).
+
+
+class MsgStream(ReceiveMsgStream, trio.abc.Channel):
+    """
+    Bidirectional message stream for use within an inter-actor
+    ``Context``.
+
+    """
+    async def send(
+        self,
+        data: Any
+    ) -> None:
+        '''Send a message over this stream to the far end.
+
+        '''
+        await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
+
+    # TODO: but make it broadcasting to consumers
+    def clone(self):
+        """Clone this receive channel allowing for multi-task
+        consumption from the same channel.
+
+        """
+        return MsgStream(
+            self._ctx,
+            self._rx_chan.clone(),
+        )
+
+
+@dataclass
+class Context:
+    '''An inter-actor task communication context.
+
+    Allows maintaining task or protocol specific state between
+    2 communicating actor tasks. A unique context is created on the
+    callee side/end for every request to a remote actor from a portal.
+
+    A context can be cancelled and (possibly eventually restarted) from
+    either side of the underlying IPC channel.
+
+    A context can be used to open task oriented message streams and can
+    be thought of as an IPC aware inter-actor cancel scope.
+
+    '''
     chan: Channel
     cid: str

     # only set on the caller side
     _portal: Optional['Portal'] = None  # type: ignore # noqa
+    _recv_chan: Optional[trio.MemoryReceiveChannel] = None
+    _result: Optional[Any] = False
+    _cancel_called: bool = False

     # only set on the callee side
-    _cancel_scope: Optional[trio.CancelScope] = None
+    _scope_nursery: Optional[trio.Nursery] = None

     async def send_yield(self, data: Any) -> None:
@@ -50,53 +321,229 @@ class Context:
     async def send_stop(self) -> None:
         await self.chan.send({'stop': True, 'cid': self.cid})

+    def _error_from_remote_msg(
+        self,
+        msg: Dict[str, Any],
+
+    ) -> None:
+        '''Unpack and raise a msg error into the local scope
+        nursery for this context.
+
+        Acts as a form of "relay" for a remote error raised
+        in the corresponding remote callee task.
+        '''
+        assert self._scope_nursery
+
+        async def raiser():
+            raise unpack_error(msg, self.chan)
+
+        self._scope_nursery.start_soon(raiser)
+
     async def cancel(self) -> None:
-        """Cancel this inter-actor-task context.
+        '''Cancel this inter-actor-task context.

         Request that the far side cancel it's current linked context,
-        timeout quickly to sidestep 2-generals...
+        timeout quickly in an attempt to sidestep 2-generals...

-        """
-        assert self._portal, (
-            "No portal found, this is likely a callee side context")
+        '''
+        side = 'caller' if self._portal else 'callee'

-        cid = self.cid
-        with trio.move_on_after(0.5) as cs:
-            cs.shield = True
-            log.warning(
-                f"Cancelling stream {cid} to "
-                f"{self._portal.channel.uid}")
+        log.warning(f'Cancelling {side} side of context to {self.chan}')

-            # NOTE: we're telling the far end actor to cancel a task
-            # corresponding to *this actor*. The far end local channel
-            # instance is passed to `Actor._cancel_task()` implicitly.
-            await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
+        self._cancel_called = True

-        if cs.cancelled_caught:
-            # XXX: there's no way to know if the remote task was indeed
-            # cancelled in the case where the connection is broken or
-            # some other network error occurred.
+        if side == 'caller':
+            if not self._portal:
+                raise RuntimeError(
+                    "No portal found, this is likely a callee side context"
+                )
+
+            cid = self.cid
+            with trio.move_on_after(0.5) as cs:
+                cs.shield = True
+                log.warning(
+                    f"Cancelling stream {cid} to "
+                    f"{self._portal.channel.uid}")
+                # NOTE: we're telling the far end actor to cancel a task
+                # corresponding to *this actor*. The far end local channel
+                # instance is passed to `Actor._cancel_task()` implicitly.
+                await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
+
+            if cs.cancelled_caught:
+                # XXX: there's no way to know if the remote task was indeed
+                # cancelled in the case where the connection is broken or
+                # some other network error occurred.
+                # if not self._portal.channel.connected():
+                if not self.chan.connected():
+                    log.warning(
+                        "May have failed to cancel remote task "
+                        f"{cid} for {self._portal.channel.uid}")
+        else:
+            # callee side remote task
+
+            # TODO: should we have an explicit cancel message
+            # or is relaying the local `trio.Cancelled` as an
+            # {'error': trio.Cancelled, cid: "blah"} enough?
+            # This probably gets into the discussion in
+            # https://github.com/goodboy/tractor/issues/36
+            assert self._scope_nursery
+            self._scope_nursery.cancel_scope.cancel()
+
+            if self._recv_chan:
+                await self._recv_chan.aclose()
+
+    @asynccontextmanager
+    async def open_stream(
+
+        self,
+        shield: bool = False,
+
+    ) -> AsyncGenerator[MsgStream, None]:
+        '''Open a ``MsgStream``, a bi-directional stream connected to the
+        cross-actor (far end) task for this ``Context``.
+
+        This context manager must be entered on both the caller and
+        callee for the stream to logically be considered "connected".
+
+        A ``MsgStream`` is currently "one-shot" use, meaning if you
+        close it you cannot "re-open" it for streaming and instead you
+        must re-establish a new surrounding ``Context`` using
+        ``Portal.open_context()``. In the future this may change but
+        currently there seems to be no obvious reason to support
+        "re-opening":
+        - pausing a stream can be done with a message.
+        - task errors will normally require a restart of the entire
+          scope of the inter-actor task context due to the nature of
+          ``trio``'s cancellation system.
+
+        '''
+        actor = current_actor()
+
+        # here we create a mem chan that corresponds to the
+        # far end caller / callee.
+
+        # NOTE: in one way streaming this only happens on the
+        # caller side inside `Actor.send_cmd()` so if you try
+        # to send a stop from the caller to the callee in the
+        # single-direction-stream case you'll get a lookup error
+        # currently.
+        _, recv_chan = actor.get_memchans(
+            self.chan.uid,
+            self.cid
+        )
+
+        # Likewise if the surrounding context has been cancelled we error here
+        # since it likely means the surrounding block was exited or
+        # killed.
+
+        if self._cancel_called:
+            task = trio.lowlevel.current_task().name
+            raise ContextCancelled(
+                f'Context around {actor.uid[0]}:{task} was already cancelled!'
+            )
+
+        # XXX: If the underlying channel feeder receive mem chan has
+        # been closed then likely client code has already exited
+        # a ``.open_stream()`` block prior or there was some other
+        # unanticipated error or cancellation from ``trio``.
+
+        if recv_chan._closed:
+            raise trio.ClosedResourceError(
+                'The underlying channel for this stream was already closed!?')
+
+        async with MsgStream(
+            ctx=self,
+            rx_chan=recv_chan,
+            shield=shield,
+        ) as rchan:
+
+            if self._portal:
+                self._portal._streams.add(rchan)
+
+            try:
+                # ensure we aren't cancelled before delivering
+                # the stream
+                # await trio.lowlevel.checkpoint()
+                yield rchan
+
+            except trio.EndOfChannel:
+                # likely the far end sent us a 'stop' message to
+                # terminate the stream.
+                raise
+
+            else:
+                # XXX: Make the stream "one-shot use". On exit, signal
+                # ``trio.EndOfChannel``/``StopAsyncIteration`` to the
+                # far end.
+                await self.send_stop()
+
+            finally:
+                if self._portal:
+                    self._portal._streams.remove(rchan)
+
+    async def result(self) -> Any:
+        '''From the caller side, wait for and return the final result from
+        the callee side task.
+
+        '''
+        assert self._portal, "Context.result() cannot be called from callee!"
+        assert self._recv_chan
+
+        if self._result is False:
+
+            if not self._recv_chan._closed:  # type: ignore
+
+                # wait for a final context result consuming
+                # and discarding any bi-dir stream msgs still
+                # in transit from the far end.
+                while True:
+
+                    msg = await self._recv_chan.receive()
+                    try:
+                        self._result = msg['return']
+                        break
+                    except KeyError:
+
+                        if 'yield' in msg:
+                            # far end task is still streaming to us..
+                            log.warning(f'Remote stream delivered {msg}')
+                            # discard it and keep waiting
+                            continue
+
+                        elif 'stop' in msg:
+                            log.debug('Remote stream terminated')
+                            continue
+
+                        # internal error should never get here
+                        assert msg.get('cid'), (
+                            "Received internal error at portal?")
+                        raise unpack_error(msg, self._portal.channel)
+
+        return self._result
+
+    async def started(self, value: Optional[Any] = None) -> None:
+
+        if self._portal:
+            raise RuntimeError(
+                f"Caller side context {self} cannot call started!")
+
+        await self.chan.send({'started': value, 'cid': self.cid})
+
     # TODO: do we need a restart api?
     # async def restart(self) -> None:
-    #     # TODO
-    #     pass
-
-    # @asynccontextmanager
-    # async def open_stream(
-    #     self,
-    # ) -> AsyncContextManager:
-    #     # TODO
    #     pass


-def stream(func):
+def stream(func: Callable) -> Callable:
     """Mark an async function as a streaming routine with ``@stream``.
+
     """
-    func._tractor_stream_function = True
+    # annotate
+    # TODO: apply whatever solution ``mypy`` ends up picking for this:
+    # https://github.com/python/mypy/issues/2087#issuecomment-769266912
+    func._tractor_stream_function = True  # type: ignore
+
     sig = inspect.signature(func)
     params = sig.parameters
     if 'stream' not in params and 'ctx' in params:
@@ -114,147 +561,26 @@ def stream(func):
     ):
         raise TypeError(
             "The first argument to the stream function "
-            f"{func.__name__} must be `ctx: tractor.Context`"
+            f"{func.__name__} must be `ctx: tractor.Context` "
+            "(or ``to_trio`` if using ``asyncio`` in guest mode)."
         )
     return func


-class ReceiveMsgStream(trio.abc.ReceiveChannel):
-    """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
-    special behaviour for signalling stream termination across an
-    inter-actor ``Channel``. This is the type returned to a local task
-    which invoked a remote streaming function using `Portal.run()`.
-
-    Termination rules:
-    - if the local task signals stop iteration a cancel signal is
-      relayed to the remote task indicating to stop streaming
-    - if the remote task signals the end of a stream, raise a
-      ``StopAsyncIteration`` to terminate the local ``async for``
+def context(func: Callable) -> Callable:
+    """Mark an async function as an inter-actor context handler with ``@context``.

     """
-    def __init__(
-        self,
-        ctx: Context,
-        rx_chan: trio.abc.ReceiveChannel,
-        portal: 'Portal',  # type: ignore # noqa
-    ) -> None:
-        self._ctx = ctx
-        self._rx_chan = rx_chan
-        self._portal = portal
-        self._shielded = False
+    # annotate
+    # TODO: apply whatever solution ``mypy`` ends up picking for this:
+    # https://github.com/python/mypy/issues/2087#issuecomment-769266912
+    func._tractor_context_function = True  # type: ignore

-    # delegate directly to underlying mem channel
-    def receive_nowait(self):
-        return self._rx_chan.receive_nowait()
-
-    async def receive(self):
-        try:
-            msg = await self._rx_chan.receive()
-            return msg['yield']
-
-        except KeyError:
-            # internal error should never get here
-            assert msg.get('cid'), ("Received internal error at portal?")
-
-            # TODO: handle 2 cases with 3.10 match syntax
-            # - 'stop'
-            # - 'error'
-            # possibly just handle msg['stop'] here!
-
-            # TODO: test that shows stream raising an expected error!!!
-            if msg.get('error'):
-                # raise the error message
-                raise unpack_error(msg, self._portal.channel)
-
-        except (trio.ClosedResourceError, StopAsyncIteration):
-            # XXX: this indicates that a `stop` message was
-            # sent by the far side of the underlying channel.
-            # Currently this is triggered by calling ``.aclose()`` on
-            # the send side of the channel inside
-            # ``Actor._push_result()``, but maybe it should be put here?
-            # to avoid exposing the internal mem chan closing mechanism?
-            # in theory we could instead do some flushing of the channel
-            # if needed to ensure all consumers are complete before
-            # triggering closure too early?
-
-            # Locally, we want to close this stream gracefully, by
-            # terminating any local consumers tasks deterministically.
-            # We **don't** want to be closing this send channel and not
-            # relaying a final value to remaining consumers who may not
-            # have been scheduled to receive it yet?
-
-            # lots of testing to do here
-
-            # when the send is closed we assume the stream has
-            # terminated and signal this local iterator to stop
-            await self.aclose()
-            raise StopAsyncIteration
-
-        except trio.Cancelled:
-            # relay cancels to the remote task
-            await self.aclose()
-            raise
-
-    @contextmanager
-    def shield(
-        self
-    ) -> Iterator['ReceiveMsgStream']:  # noqa
-        """Shield this stream's underlying channel such that a local consumer task
-        can be cancelled (and possibly restarted) using ``trio.Cancelled``.
-
-        """
-        self._shielded = True
-        yield self
-        self._shielded = False
-
-    async def aclose(self):
-        """Cancel associated remote actor task and local memory channel
-        on close.
-        """
-        rx_chan = self._rx_chan
-
-        if rx_chan._closed:
-            log.warning(f"{self} is already closed")
-            return
-
-        # stats = rx_chan.statistics()
-        # if stats.open_receive_channels > 1:
-        #     # if we've been cloned don't kill the stream
-        #     log.debug(
-        #         "there are still consumers running keeping stream alive")
-        #     return
-
-        if self._shielded:
-            log.warning(f"{self} is shielded, portal channel being kept alive")
-            return
-
-        # close the local mem chan
-        rx_chan.close()
-
-        # cancel surrounding IPC context
-        await self._ctx.cancel()
-
-    # TODO: but make it broadcasting to consumers
-    # def clone(self):
-    #     """Clone this receive channel allowing for multi-task
-    #     consumption from the same channel.
-
-    #     """
-    #     return ReceiveStream(
-    #         self._cid,
-    #         self._rx_chan.clone(),
-    #         self._portal,
-    #     )
-
-
-# class MsgStream(ReceiveMsgStream, trio.abc.Channel):
-#     """
-#     Bidirectional message stream for use within an inter-actor actor
-#     ``Context```.
- -# """ -# async def send( -# self, -# data: Any -# ) -> None: -# await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) + sig = inspect.signature(func) + params = sig.parameters + if 'ctx' not in params: + raise TypeError( + "The first argument to the context function " + f"{func.__name__} must be `ctx: tractor.Context`" + ) + return func diff --git a/tractor/_trionics.py b/tractor/_trionics.py index dcf7aa5..eea3aae 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -252,6 +252,12 @@ async def _open_and_supervise_one_cancels_all_nursery( f"Waiting on subactors {anursery._children} " "to complete" ) + + # Last bit before first nursery block ends in the case + # where we didn't error in the caller's scope + log.debug("Waiting on all subactors to complete") + anursery._join_procs.set() + except BaseException as err: # if the caller's scope errored then we activate our # one-cancels-all supervisor strategy (don't @@ -292,11 +298,6 @@ async def _open_and_supervise_one_cancels_all_nursery( else: raise - # Last bit before first nursery block ends in the case - # where we didn't error in the caller's scope - log.debug("Waiting on all subactors to complete") - anursery._join_procs.set() - # ria_nursery scope end # XXX: do we need a `trio.Cancelled` catch here as well? @@ -357,7 +358,8 @@ async def open_nursery( try: if actor is None and is_main_process(): - # if we are the parent process start the actor runtime implicitly + # if we are the parent process start the + # actor runtime implicitly log.info("Starting actor runtime!") # mark us for teardown on exit @@ -376,7 +378,6 @@ async def open_nursery( async with _open_and_supervise_one_cancels_all_nursery( actor ) as anursery: - yield anursery finally: diff --git a/tractor/log.py b/tractor/log.py index e8327a3..667c7c6 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -26,35 +26,62 @@ LOG_FORMAT = ( " {thin_white}{filename}{log_color}:{reset}{thin_white}{lineno}{log_color}" " {reset}{bold_white}{thin_white}{message}" ) + DATE_FORMAT = '%b %d %H:%M:%S' + LEVELS = { - 'GARBAGE': 1, - 'TRACE': 5, - 'PROFILE': 15, - 'RUNTIME': 500, - 'QUIET': 1000, + 'TRANSPORT': 5, + 'RUNTIME': 15, + 'PDB': 500, } + STD_PALETTE = { 'CRITICAL': 'red', 'ERROR': 'red', - 'RUNTIME': 'white', + 'PDB': 'white', 'WARNING': 'yellow', 'INFO': 'green', + 'RUNTIME': 'white', 'DEBUG': 'white', - 'TRACE': 'cyan', - 'GARBAGE': 'blue', + 'TRANSPORT': 'cyan', } + BOLD_PALETTE = { 'bold': { level: f"bold_{color}" for level, color in STD_PALETTE.items()} } +class StackLevelAdapter(logging.LoggerAdapter): + + def transport( + self, + msg: str, + + ) -> None: + return self.log(5, msg) + + def runtime( + self, + msg: str, + ) -> None: + return self.log(15, msg) + + def pdb( + self, + msg: str, + ) -> None: + return self.log(500, msg) + + def get_logger( + name: str = None, _root_name: str = _proj_name, -) -> logging.LoggerAdapter: - '''Return the package log or a sub-log for `name` if provided. + +) -> StackLevelAdapter: + '''Return the package log or a sub-logger for ``name`` if provided. + ''' log = rlog = logging.getLogger(_root_name) @@ -71,13 +98,14 @@ def get_logger( # add our actor-task aware adapter which will dynamically look up # the actor and task names at each log emit - logger = logging.LoggerAdapter(log, ActorContextInfo()) + logger = StackLevelAdapter(log, ActorContextInfo()) # additional levels for name, val in LEVELS.items(): logging.addLevelName(val, name) - # ex. 
-        setattr(logger, name.lower(), partial(logger.log, val))
+
+        # ensure custom levels exist as methods
+        assert getattr(logger, name.lower()), f'Logger does not define {name}'

     return logger
diff --git a/tractor/msg.py b/tractor/msg.py
index 560e644..28e3405 100644
--- a/tractor/msg.py
+++ b/tractor/msg.py
@@ -1,9 +1,13 @@
 """
 Messaging pattern APIs and helpers.
+
+NOTE: this module is likely deprecated by the new bi-directional streaming
+support provided by ``tractor.Context.open_stream()`` and friends.
+
 """
 import inspect
 import typing
-from typing import Dict, Any, Set, Callable
+from typing import Dict, Any, Set, Callable, List, Tuple
 from functools import partial
 from async_generator import aclosing
@@ -20,7 +24,7 @@ log = get_logger('messaging')

 async def fan_out_to_ctxs(
     pub_async_gen_func: typing.Callable,  # it's an async gen ... gd mypy
-    topics2ctxs: Dict[str, set],
+    topics2ctxs: Dict[str, list],
     packetizer: typing.Callable = None,
 ) -> None:
     """Request and fan out quotes to each subscribed actor channel.
@@ -34,24 +38,27 @@ async def fan_out_to_ctxs(

         async for published in pub_gen:

-            ctx_payloads: Dict[str, Any] = {}
+            ctx_payloads: List[Tuple[Context, Any]] = []

             for topic, data in published.items():
                 log.debug(f"publishing {topic, data}")
+
                 # build a new dict packet or invoke provided packetizer
                 if packetizer is None:
                     packet = {topic: data}
+
                 else:
                     packet = packetizer(topic, data)
-                for ctx in topics2ctxs.get(topic, set()):
-                    ctx_payloads.setdefault(ctx, {}).update(packet),
+
+                for ctx in topics2ctxs.get(topic, list()):
+                    ctx_payloads.append((ctx, packet))

             if not ctx_payloads:
                 log.debug(f"Unconsumed values:\n{published}")

             # deliver to each subscriber (fan out)
             if ctx_payloads:
-                for ctx, payload in ctx_payloads.items():
+                for ctx, payload in ctx_payloads:
                     try:
                         await ctx.send_yield(payload)
                     except (
@@ -60,15 +67,24 @@ async def fan_out_to_ctxs(
                         ConnectionRefusedError,
                     ):
                         log.warning(f"{ctx.chan} went down?")
-                        for ctx_set in topics2ctxs.values():
-                            ctx_set.discard(ctx)
+                        for ctx_list in topics2ctxs.values():
+                            try:
+                                ctx_list.remove(ctx)
+                            except ValueError:
+                                continue

             if not get_topics():
                 log.warning(f"No subscribers left for {pub_gen}")
                 break


-def modify_subs(topics2ctxs, topics, ctx):
+def modify_subs(
+
+    topics2ctxs: Dict[str, List[Context]],
+    topics: Set[str],
+    ctx: Context,
+
+) -> None:
     """Absolute symbol subscription list for each quote stream.

     Effectively a symbol subscription api.
@@ -77,7 +93,7 @@ def modify_subs(topics2ctxs, topics, ctx):

     # update map from each symbol to requesting client's chan
     for topic in topics:
-        topics2ctxs.setdefault(topic, set()).add(ctx)
+        topics2ctxs.setdefault(topic, list()).append(ctx)

     # remove any existing symbol subscriptions if symbol is not
     # found in ``symbols``
@@ -85,10 +101,14 @@ def modify_subs(topics2ctxs, topics, ctx):
     for topic in filter(
         lambda topic: topic not in topics, topics2ctxs.copy()
     ):
-        ctx_set = topics2ctxs.get(topic)
-        ctx_set.discard(ctx)
+        ctx_list = topics2ctxs.get(topic)
+        if ctx_list:
+            try:
+                ctx_list.remove(ctx)
+            except ValueError:
+                pass

-        if not ctx_set:
+        if not ctx_list:
             # pop empty sets which will trigger bg quoter task termination
             topics2ctxs.pop(topic)

@@ -256,7 +276,7 @@ def pub(
                 respawn = True
         finally:
             # remove all subs for this context
-            modify_subs(topics2ctxs, (), ctx)
+            modify_subs(topics2ctxs, set(), ctx)

             # if there are truly no more subscriptions with this broker
             # drop from broker subs dict
diff --git a/tractor/testing/_tractor_test.py b/tractor/testing/_tractor_test.py
index 734e367..ebb6f32 100644
--- a/tractor/testing/_tractor_test.py
+++ b/tractor/testing/_tractor_test.py
@@ -78,7 +78,7 @@ def tractor_test(fn):

         else:
             # use implicit root actor start
-            main = partial(fn, *args, **kwargs),
+            main = partial(fn, *args, **kwargs)

             return trio.run(main)
             # arbiter_addr=arb_addr,
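
A few usage sketches follow for review convenience; none of the code below is part of
the patch itself. First, the ``ReceiveMsgStream.shield()`` helper added in
``tractor/_streaming.py``: shielding only stops ``aclose()`` from relaying a ``'stop'``
to the far end when a local consumer task is cancelled, it is not a ``trio.CancelScope``.
The ``stream_forever()`` target and the actor name here are made-up placeholders:

.. code:: python

    import trio
    import tractor


    async def stream_forever():
        # hypothetical remote async-gen target (illustration only)
        while True:
            yield 'tick'
            await trio.sleep(0.1)


    async def main() -> None:

        async with tractor.open_nursery() as n:

            portal = await n.start_actor(
                'streamer',
                enable_modules=[__name__],
            )

            async with portal.open_stream_from(stream_forever) as stream:

                # this timeout cancels the consuming code below..
                with trio.move_on_after(0.5):

                    # ..but shielding keeps ``aclose()`` from relaying
                    # a 'stop' to the far end so the stream stays usable
                    with stream.shield():
                        async for msg in stream:
                            print(msg)

                # the stream was not torn down; later consumer code
                # could keep iterating it here.

            await portal.cancel_actor()


    if __name__ == '__main__':
        trio.run(main)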
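
Next, the caller-side ``Context.result()`` path: a ``@tractor.context`` callee signals
readiness via ``ctx.started()`` and its plain return value is what ``Context.result()``
hands back on the caller side, no stream required. Another rough sketch with invented
names:

.. code:: python

    import trio
    import tractor


    @tractor.context
    async def compute(
        ctx: tractor.Context,
        data: int,
    ) -> str:
        # unblock the caller's ``open_context()`` entry
        await ctx.started(data + 1)

        # this return value is relayed as the final 'return' msg
        # which ``Context.result()`` consumes on the caller side
        return 'done'


    async def main() -> None:

        async with tractor.open_nursery() as n:

            portal = await n.start_actor(
                'worker',
                enable_modules=[__name__],
            )

            async with portal.open_context(
                compute,
                data=10,
            ) as (ctx, first):

                assert first == 11

                # discards any in-transit stream msgs and waits for
                # the callee's final return value
                assert await ctx.result() == 'done'

            await portal.cancel_actor()


    if __name__ == '__main__':
        trio.run(main)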
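
Finally, the ``StackLevelAdapter`` change in ``tractor/log.py`` replaces the old
``setattr``-generated partials with explicit methods for the custom levels. A tiny
sketch of how call sites read (logger name and messages invented; emitting output
still requires the usual handler and level setup):

.. code:: python

    from tractor.log import get_logger

    # returns a ``StackLevelAdapter`` wrapping the package logger
    log = get_logger('my_subsys')

    # custom levels registered in ``LEVELS`` map to dedicated methods
    log.transport('raw IPC chatter')     # level 5,   'TRANSPORT'
    log.runtime('actor runtime detail')  # level 15,  'RUNTIME'
    log.pdb('entering debugger REPL')    # level 500, 'PDB'

    # stdlib levels still work through the adapter as before
    log.info('hello from a sub-logger')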