From a668f714d5102741de8175787ec868dad6192981 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 21 Dec 2020 09:09:55 -0500 Subject: [PATCH 1/6] Allow passing function refs to `Portal.run()` This resolves and completes #69 allowing all RPC invocation APIs to pass function references directly instead of explicit `str` names for the target namespace and function (this is still done implicitly underneath). This brings us closer to `trio`'s task running API as well as acknowledges that any inter-host RPC system (and API) will likely need to be implemented on top of local RPC primitives anyway. Even if this ends up **not** being true we can always go to "function stubs" as part of our IAC protocol or, add a new method to do explicit namespace calls: `.run_from_module()` or whatever everyone votes on. Resolves #69 Further, this commit drops `Actor.statespace` from the entire system since a user can easily get this same functionality using module level variables. Fix docs to match all these changes (luckily mostly already done due to example scripts referencing). --- docs/index.rst | 62 +++++++++++++------ examples/a_trynamic_first_scene.py | 6 +- examples/actor_spawning_and_causality.py | 8 ++- ...ctor_spawning_and_causality_with_daemon.py | 4 +- examples/asynchronous_generators.py | 4 +- examples/debugging/multi_daemon_subactors.py | 4 +- ...ed_subactors_error_up_through_nurseries.py | 27 ++++++-- .../debugging/multi_subactor_root_errors.py | 15 ++++- examples/debugging/multi_subactors.py | 11 ++-- ...root_cancelled_but_child_is_in_tty_lock.py | 20 ++++-- examples/debugging/subactor_breakpoint.py | 1 - examples/debugging/subactor_error.py | 2 +- examples/full_fledged_streaming_service.py | 6 +- examples/remote_error_propagation.py | 2 +- tests/test_cancellation.py | 44 +++++++++---- tests/test_debugger.py | 13 ++-- tests/test_discovery.py | 11 ++-- tests/test_pubsub.py | 17 ++++- tests/test_rpc.py | 4 +- tests/test_spawning.py | 21 ++++--- tests/test_streaming.py | 5 +- tractor/__init__.py | 2 +- tractor/_actor.py | 13 +++- tractor/_debug.py | 4 ++ tractor/_portal.py | 33 +++++++++- tractor/_trionics.py | 15 +++-- 26 files changed, 250 insertions(+), 104 deletions(-) diff --git a/docs/index.rst b/docs/index.rst index f254eb0..20b03e5 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -385,37 +385,61 @@ as ``multiprocessing`` calls it) which is running ``main()``. .. _remote function execution: https://codespeak.net/execnet/example/test_info.html#remote-exec-a-function-avoiding-inlined-source-part-i -Actor local variables -********************* -Although ``tractor`` uses a *shared-nothing* architecture between processes -you can of course share state between tasks running *within* an actor. -``trio`` tasks spawned via multiple RPC calls to an actor can access global -state using the per actor ``statespace`` dictionary: +Actor local (aka *process global*) variables +******************************************** +Although ``tractor`` uses a *shared-nothing* architecture between +processes you can of course share state between tasks running *within* +an actor (since a `trio.run()` runtime is single threaded). ``trio`` +tasks spawned via multiple RPC calls to an actor can modify +*process-global-state* defined using Python module attributes: .. code:: python - statespace = {'doggy': 10} + # a per process cache + _actor_cache: Dict[str, bool] = {} - def check_statespace(): - # Remember this runs in a new process so no changes - # will propagate back to the parent actor - assert tractor.current_actor().statespace == statespace + def ping_endpoints(endpoints: List[str]): + """Start a polling process which runs completely separate + from our root actor/process. + + """ + + # This runs in a new process so no changes # will propagate + # back to the parent actor + while True: + + for ep in endpoints: + status = await check_endpoint_is_up(ep) + _actor_cache[ep] = status + + await trio.sleep(0.5) + + + async def get_alive_endpoints(): + + nonlocal _actor_cache + + return {key for key, value in _actor_cache.items() if value} async def main(): + async with tractor.open_nursery() as n: - await n.run_in_actor( - 'checker', - check_statespace, - statespace=statespace - ) + + portal = await n.run_in_actor(ping_endpoints) + + # print the alive endpoints after 3 seconds + await trio.sleep(3) + + # this is submitted to be run in our "ping_endpoints" actor + print(await portal.run(get_alive_endpoints)) -Of course you don't have to use the ``statespace`` variable (it's mostly -a convenience for passing simple data to newly spawned actors); building -out a state sharing system per-actor is totally up to you. +You can pass any kind of (`msgpack`) serializable data between actors using +function call semantics but building out a state sharing system per-actor +is totally up to you. Service Discovery diff --git a/examples/a_trynamic_first_scene.py b/examples/a_trynamic_first_scene.py index f64e631..dca59c2 100644 --- a/examples/a_trynamic_first_scene.py +++ b/examples/a_trynamic_first_scene.py @@ -13,7 +13,7 @@ async def hi(): async def say_hello(other_actor): async with tractor.wait_for_actor(other_actor) as portal: - return await portal.run(_this_module, 'hi') + return await portal.run(hi) async def main(): @@ -24,14 +24,14 @@ async def main(): print("Alright... Action!") donny = await n.run_in_actor( - 'donny', say_hello, + name='donny', # arguments are always named other_actor='gretchen', ) gretchen = await n.run_in_actor( - 'gretchen', say_hello, + name='gretchen', other_actor='donny', ) print(await gretchen.result()) diff --git a/examples/actor_spawning_and_causality.py b/examples/actor_spawning_and_causality.py index fc0d7c7..0d577c0 100644 --- a/examples/actor_spawning_and_causality.py +++ b/examples/actor_spawning_and_causality.py @@ -2,7 +2,8 @@ import tractor def cellar_door(): - return "Dang that's beautiful" + assert not tractor.is_root_process() + return "Dang that's beautiful" async def main(): @@ -10,7 +11,10 @@ async def main(): """ async with tractor.open_nursery() as n: - portal = await n.run_in_actor('some_linguist', cellar_door) + portal = await n.run_in_actor( + cellar_door, + name='some_linguist', + ) # The ``async with`` will unblock here since the 'some_linguist' # actor has completed its main task ``cellar_door``. diff --git a/examples/actor_spawning_and_causality_with_daemon.py b/examples/actor_spawning_and_causality_with_daemon.py index bb6f4ee..1216af0 100644 --- a/examples/actor_spawning_and_causality_with_daemon.py +++ b/examples/actor_spawning_and_causality_with_daemon.py @@ -19,9 +19,9 @@ async def main(): rpc_module_paths=[__name__], ) - print(await portal.run(__name__, 'movie_theatre_question')) + print(await portal.run(movie_theatre_question)) # call the subactor a 2nd time - print(await portal.run(__name__, 'movie_theatre_question')) + print(await portal.run(movie_theatre_question)) # the async with will block here indefinitely waiting # for our actor "frank" to complete, but since it's an diff --git a/examples/asynchronous_generators.py b/examples/asynchronous_generators.py index 7ca0beb..102b29a 100644 --- a/examples/asynchronous_generators.py +++ b/examples/asynchronous_generators.py @@ -24,7 +24,7 @@ async def main(): # this async for loop streams values from the above # async generator running in a separate process - async for letter in await portal.run(__name__, 'stream_forever'): + async for letter in await portal.run(stream_forever): print(letter) # we support trio's cancellation system @@ -33,4 +33,4 @@ async def main(): if __name__ == '__main__': - tractor.run(main, start_method='forkserver') + tractor.run(main) diff --git a/examples/debugging/multi_daemon_subactors.py b/examples/debugging/multi_daemon_subactors.py index b37f47a..b834a02 100644 --- a/examples/debugging/multi_daemon_subactors.py +++ b/examples/debugging/multi_daemon_subactors.py @@ -23,8 +23,8 @@ async def main(): p1 = await n.start_actor('name_error', rpc_module_paths=[__name__]) # retreive results - stream = await p0.run(__name__, 'breakpoint_forever') - await p1.run(__name__, 'name_error') + stream = await p0.run(breakpoint_forever) + await p1.run(name_error) if __name__ == '__main__': diff --git a/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py b/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py index 488fffa..82b4def 100644 --- a/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py +++ b/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py @@ -18,10 +18,17 @@ async def spawn_until(depth=0): async with tractor.open_nursery() as n: if depth < 1: # await n.run_in_actor('breakpoint_forever', breakpoint_forever) - await n.run_in_actor('name_error', name_error) + await n.run_in_actor( + name_error, + name='name_error' + ) else: depth -= 1 - await n.run_in_actor(f'spawn_until_{depth}', spawn_until, depth=depth) + await n.run_in_actor( + spawn_until, + depth=depth, + name=f'spawn_until_{depth}', + ) async def main(): @@ -46,12 +53,20 @@ async def main(): async with tractor.open_nursery() as n: # spawn both actors - portal = await n.run_in_actor('spawner0', spawn_until, depth=3) - portal1 = await n.run_in_actor('spawner1', spawn_until, depth=4) + portal = await n.run_in_actor( + spawn_until, + depth=3, + name='spawner0', + ) + portal1 = await n.run_in_actor( + spawn_until, + depth=4, + name='spawner1', + ) # gah still an issue here. - # await portal.result() - # await portal1.result() + await portal.result() + await portal1.result() if __name__ == '__main__': diff --git a/examples/debugging/multi_subactor_root_errors.py b/examples/debugging/multi_subactor_root_errors.py index 05f0fa7..62af999 100644 --- a/examples/debugging/multi_subactor_root_errors.py +++ b/examples/debugging/multi_subactor_root_errors.py @@ -10,7 +10,10 @@ async def spawn_error(): """"A nested nursery that triggers another ``NameError``. """ async with tractor.open_nursery() as n: - portal = await n.run_in_actor('name_error_1', name_error) + portal = await n.run_in_actor( + name_error, + name='name_error_1', + ) return await portal.result() @@ -27,8 +30,14 @@ async def main(): async with tractor.open_nursery() as n: # spawn both actors - portal = await n.run_in_actor('name_error', name_error) - portal1 = await n.run_in_actor('spawn_error', spawn_error) + portal = await n.run_in_actor( + name_error, + name='name_error', + ) + portal1 = await n.run_in_actor( + spawn_error, + name='spawn_error', + ) # trigger a root actor error assert 0 diff --git a/examples/debugging/multi_subactors.py b/examples/debugging/multi_subactors.py index 16ff22d..0d5ee83 100644 --- a/examples/debugging/multi_subactors.py +++ b/examples/debugging/multi_subactors.py @@ -18,7 +18,10 @@ async def spawn_error(): """"A nested nursery that triggers another ``NameError``. """ async with tractor.open_nursery() as n: - portal = await n.run_in_actor('name_error_1', name_error) + portal = await n.run_in_actor( + name_error, + name='name_error_1', + ) return await portal.result() @@ -38,9 +41,9 @@ async def main(): # Spawn both actors, don't bother with collecting results # (would result in a different debugger outcome due to parent's # cancellation). - await n.run_in_actor('bp_forever', breakpoint_forever) - await n.run_in_actor('name_error', name_error) - await n.run_in_actor('spawn_error', spawn_error) + await n.run_in_actor(breakpoint_forever) + await n.run_in_actor(name_error) + await n.run_in_actor(spawn_error) if __name__ == '__main__': diff --git a/examples/debugging/root_cancelled_but_child_is_in_tty_lock.py b/examples/debugging/root_cancelled_but_child_is_in_tty_lock.py index 797de9c..d0a1649 100644 --- a/examples/debugging/root_cancelled_but_child_is_in_tty_lock.py +++ b/examples/debugging/root_cancelled_but_child_is_in_tty_lock.py @@ -12,10 +12,14 @@ async def spawn_until(depth=0): async with tractor.open_nursery() as n: if depth < 1: # await n.run_in_actor('breakpoint_forever', breakpoint_forever) - await n.run_in_actor('name_error', name_error) + await n.run_in_actor(name_error) else: depth -= 1 - await n.run_in_actor(f'spawn_until_{depth}', spawn_until, depth=depth) + await n.run_in_actor( + spawn_until, + depth=depth, + name=f'spawn_until_{depth}', + ) async def main(): @@ -36,8 +40,16 @@ async def main(): async with tractor.open_nursery() as n: # spawn both actors - portal = await n.run_in_actor('spawner0', spawn_until, depth=0) - portal1 = await n.run_in_actor('spawner1', spawn_until, depth=1) + portal = await n.run_in_actor( + spawn_until, + depth=0, + name='spawner0', + ) + portal1 = await n.run_in_actor( + spawn_until, + depth=1, + name='spawner1', + ) # nursery cancellation should be triggered due to propagated # error from child. diff --git a/examples/debugging/subactor_breakpoint.py b/examples/debugging/subactor_breakpoint.py index 35db6cf..cb16004 100644 --- a/examples/debugging/subactor_breakpoint.py +++ b/examples/debugging/subactor_breakpoint.py @@ -15,7 +15,6 @@ async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( - 'breakpoint_forever', breakpoint_forever, ) await portal.result() diff --git a/examples/debugging/subactor_error.py b/examples/debugging/subactor_error.py index 67ec1b6..86bb7ca 100644 --- a/examples/debugging/subactor_error.py +++ b/examples/debugging/subactor_error.py @@ -8,7 +8,7 @@ async def name_error(): async def main(): async with tractor.open_nursery() as n: - portal = await n.run_in_actor('name_error', name_error) + portal = await n.run_in_actor(name_error) await portal.result() diff --git a/examples/full_fledged_streaming_service.py b/examples/full_fledged_streaming_service.py index e86c4ed..0ed5f66 100644 --- a/examples/full_fledged_streaming_service.py +++ b/examples/full_fledged_streaming_service.py @@ -30,9 +30,7 @@ async def aggregate(seed): async def push_to_chan(portal, send_chan): async with send_chan: - async for value in await portal.run( - __name__, 'stream_data', seed=seed - ): + async for value in await portal.run(stream_data, seed=seed): # leverage trio's built-in backpressure await send_chan.send(value) @@ -74,8 +72,8 @@ async def main(): pre_start = time.time() portal = await nursery.run_in_actor( - 'aggregator', aggregate, + name='aggregator', seed=seed, ) diff --git a/examples/remote_error_propagation.py b/examples/remote_error_propagation.py index 29528f5..0999d39 100644 --- a/examples/remote_error_propagation.py +++ b/examples/remote_error_propagation.py @@ -15,7 +15,7 @@ async def main(): )) # start one actor that will fail immediately - await n.run_in_actor('extra', assert_err) + await n.run_in_actor(assert_err) # should error here with a ``RemoteActorError`` containing # an ``AssertionError`` and all the other actors have been cancelled diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 9e3c9f9..1ff7f23 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -49,7 +49,7 @@ def test_remote_error(arb_addr, args_err): async def main(): async with tractor.open_nursery() as nursery: - portal = await nursery.run_in_actor('errorer', assert_err, **args) + portal = await nursery.run_in_actor(assert_err, name='errorer', **args) # get result(s) from main task try: @@ -73,8 +73,8 @@ def test_multierror(arb_addr): async def main(): async with tractor.open_nursery() as nursery: - await nursery.run_in_actor('errorer1', assert_err) - portal2 = await nursery.run_in_actor('errorer2', assert_err) + await nursery.run_in_actor(assert_err, name='errorer1') + portal2 = await nursery.run_in_actor(assert_err, name='errorer2') # get result(s) from main task try: @@ -104,7 +104,10 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay): async with tractor.open_nursery() as nursery: for i in range(num_subactors): await nursery.run_in_actor( - f'errorer{i}', assert_err, delay=delay) + assert_err, + name=f'errorer{i}', + delay=delay + ) with pytest.raises(trio.MultiError) as exc_info: tractor.run(main, arbiter_addr=arb_addr) @@ -231,7 +234,12 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel): for i in range(num_actors): # start actor(s) that will fail immediately riactor_portals.append( - await n.run_in_actor(f'actor_{i}', func, **kwargs)) + await n.run_in_actor( + func, + name=f'actor_{i}', + **kwargs + ) + ) if da_func: func, kwargs, expect_error = da_func @@ -276,21 +284,24 @@ async def spawn_and_error(breadth, depth) -> None: name = tractor.current_actor().name async with tractor.open_nursery() as nursery: for i in range(breadth): + if depth > 0: + args = ( - f'spawner_{i}_depth_{depth}', spawn_and_error, ) kwargs = { + 'name': f'spawner_{i}_depth_{depth}', 'breadth': breadth, 'depth': depth - 1, } else: args = ( - f'{name}_errorer_{i}', assert_err, ) - kwargs = {} + kwargs = { + 'name': f'{name}_errorer_{i}', + } await nursery.run_in_actor(*args, **kwargs) @@ -318,8 +329,8 @@ async def test_nested_multierrors(loglevel, start_method): async with tractor.open_nursery() as nursery: for i in range(subactor_breadth): await nursery.run_in_actor( - f'spawner_{i}', spawn_and_error, + name=f'spawner_{i}', breadth=subactor_breadth, depth=depth, ) @@ -398,7 +409,10 @@ def test_cancel_via_SIGINT_other_task( async def spawn_and_sleep_forever(task_status=trio.TASK_STATUS_IGNORED): async with tractor.open_nursery() as tn: for i in range(3): - await tn.run_in_actor('sucka', sleep_forever) + await tn.run_in_actor( + sleep_forever, + name='namesucka', + ) task_status.started() await trio.sleep_forever() @@ -423,7 +437,10 @@ async def spin_for(period=3): async def spawn(): async with tractor.open_nursery() as tn: - portal = await tn.run_in_actor('sleeper', spin_for) + portal = await tn.run_in_actor( + spin_for, + name='sleeper', + ) @no_windows @@ -442,7 +459,10 @@ def test_cancel_while_childs_child_in_sync_sleep( async def main(): with trio.fail_after(2): async with tractor.open_nursery() as tn: - portal = await tn.run_in_actor('spawn', spawn) + portal = await tn.run_in_actor( + spawn, + name='spawn', + ) await trio.sleep(1) assert 0 diff --git a/tests/test_debugger.py b/tests/test_debugger.py index f35ff89..5360b4b 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -237,7 +237,7 @@ def test_multi_subactors(spawn): child.expect(r"\(Pdb\+\+\)") before = str(child.before.decode()) - assert "Attaching pdb to actor: ('bp_forever'" in before + assert "Attaching pdb to actor: ('breakpoint_forever'" in before # do some "next" commands to demonstrate recurrent breakpoint # entries @@ -265,7 +265,7 @@ def test_multi_subactors(spawn): child.sendline('c') child.expect(r"\(Pdb\+\+\)") before = str(child.before.decode()) - assert "Attaching pdb to actor: ('bp_forever'" in before + assert "Attaching pdb to actor: ('breakpoint_forever'" in before # now run some "continues" to show re-entries for _ in range(5): @@ -277,7 +277,7 @@ def test_multi_subactors(spawn): child.expect(r"\(Pdb\+\+\)") before = str(child.before.decode()) assert "Attaching to pdb in crashed actor: ('arbiter'" in before - assert "RemoteActorError: ('bp_forever'" in before + assert "RemoteActorError: ('breakpoint_forever'" in before assert 'bdb.BdbQuit' in before # process should exit @@ -285,7 +285,7 @@ def test_multi_subactors(spawn): child.expect(pexpect.EOF) before = str(child.before.decode()) - assert "RemoteActorError: ('bp_forever'" in before + assert "RemoteActorError: ('breakpoint_forever'" in before assert 'bdb.BdbQuit' in before @@ -391,8 +391,9 @@ def test_multi_nested_subactors_error_through_nurseries(spawn): child.expect(pexpect.EOF) - before = str(child.before.decode()) - assert "NameError" in before + if not timed_out_early: + before = str(child.before.decode()) + assert "NameError" in before def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method): diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 52a1dc8..b0aa279 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -74,14 +74,14 @@ async def test_trynamic_trio(func, start_method): print("Alright... Action!") donny = await n.run_in_actor( - 'donny', func, other_actor='gretchen', + name='donny', ) gretchen = await n.run_in_actor( - 'gretchen', func, other_actor='donny', + name='gretchen', ) print(await gretchen.result()) print(await donny.result()) @@ -147,7 +147,7 @@ async def spawn_and_check_registry( portals = {} for i in range(3): name = f'a{i}' - portals[name] = await n.run_in_actor(name, to_run) + portals[name] = await n.run_in_actor(to_run, name=name) # wait on last actor to come up async with tractor.wait_for_actor(name): @@ -257,7 +257,10 @@ async def close_chans_before_nursery( get_reg = partial(aportal.run, 'self', 'get_registry') async with tractor.open_nursery() as tn: - portal1 = await tn.run_in_actor('consumer1', stream_forever) + portal1 = await tn.run_in_actor( + stream_forever, + name='consumer1', + ) agen1 = await portal1.result() portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__]) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index aaee831..b963cdb 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -126,7 +126,10 @@ async def test_required_args(callwith_expecterror): async with tractor.open_nursery() as n: # await func(**kwargs) portal = await n.run_in_actor( - 'pubber', multilock_pubber, **kwargs) + multilock_pubber, + name='pubber', + **kwargs + ) async with tractor.wait_for_actor('pubber'): pass @@ -163,9 +166,17 @@ def test_multi_actor_subs_arbiter_pub( ) even_portal = await n.run_in_actor( - 'evens', subs, which=['even'], pub_actor_name=name) + subs, + which=['even'], + name='evens', + pub_actor_name=name + ) odd_portal = await n.run_in_actor( - 'odds', subs, which=['odd'], pub_actor_name=name) + subs, + which=['odd'], + name='odds', + pub_actor_name=name + ) async with tractor.wait_for_actor('evens'): # block until 2nd actor is initialized diff --git a/tests/test_rpc.py b/tests/test_rpc.py index 8ac9b09..95a46ec 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -80,9 +80,11 @@ def test_rpc_errors(arb_addr, to_call, testdir): # spawn a subactor which calls us back async with tractor.open_nursery() as n: await n.run_in_actor( - 'subactor', sleep_back_actor, actor_name=subactor_requests_to, + + name='subactor', + # function from the local exposed module space # the subactor will invoke when it RPCs back to this actor func_name=funcname, diff --git a/tests/test_spawning.py b/tests/test_spawning.py index 220af3e..0f9db30 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -1,31 +1,33 @@ """ Spawning basics """ +from functools import partial + import pytest import trio import tractor from conftest import tractor_test -statespace = {'doggy': 10, 'kitty': 4} +data_to_pass_down = {'doggy': 10, 'kitty': 4} -async def spawn(is_arbiter): +async def spawn(is_arbiter, data): namespaces = [__name__] await trio.sleep(0.1) actor = tractor.current_actor() assert actor.is_arbiter == is_arbiter - assert actor.statespace == statespace + data == data_to_pass_down if actor.is_arbiter: async with tractor.open_nursery() as nursery: # forks here portal = await nursery.run_in_actor( - 'sub-actor', spawn, is_arbiter=False, - statespace=statespace, + name='sub-actor', + data=data, rpc_module_paths=namespaces, ) @@ -41,10 +43,9 @@ async def spawn(is_arbiter): def test_local_arbiter_subactor_global_state(arb_addr): result = tractor.run( - spawn, + partial(spawn, data=data_to_pass_down), True, name='arbiter', - statespace=statespace, arbiter_addr=arb_addr, ) assert result == 10 @@ -89,7 +90,10 @@ async def test_most_beautiful_word(start_method): """ async with tractor.open_nursery() as n: - portal = await n.run_in_actor('some_linguist', cellar_door) + portal = await n.run_in_actor( + cellar_door, + name='some_linguist', + ) # The ``async with`` will unblock here since the 'some_linguist' # actor has completed its main task ``cellar_door``. @@ -119,7 +123,6 @@ def test_loglevel_propagated_to_subactor( async def main(): async with tractor.open_nursery() as tn: await tn.run_in_actor( - 'log_checker', check_loglevel, level=level, ) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index a64515f..1a48479 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -62,7 +62,6 @@ async def stream_from_single_subactor(stream_func_name): portal = await nursery.start_actor( 'streamerd', rpc_module_paths=[__name__], - statespace={'global_dict': {}}, ) seq = range(10) @@ -186,9 +185,9 @@ async def a_quadruple_example(): pre_start = time.time() portal = await nursery.run_in_actor( - 'aggregator', aggregate, seed=seed, + name='aggregator', ) start = time.time() @@ -275,9 +274,9 @@ async def test_respawn_consumer_task( async with tractor.open_nursery() as n: stream = await(await n.run_in_actor( - 'streamer', stream_data, seed=11, + name='streamer', )).result() expect = set(range(11)) diff --git a/tractor/__init__.py b/tractor/__init__.py index 070bf72..f9a1730 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -16,7 +16,7 @@ from ._streaming import Context, stream from ._discovery import get_arbiter, find_actor, wait_for_actor from ._actor import Actor, _start_actor, Arbiter from ._trionics import open_nursery -from ._state import current_actor +from ._state import current_actor, is_root_process from . import _state from ._exceptions import RemoteActorError, ModuleNotExposed from ._debug import breakpoint, post_mortem diff --git a/tractor/_actor.py b/tractor/_actor.py index 02a010c..dba47ab 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -126,10 +126,17 @@ async def _invoke( with cancel_scope as cs: task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) + except (Exception, trio.MultiError) as err: - # NOTE: don't enter debug mode recursively after quitting pdb - log.exception("Actor crashed:") - await _debug._maybe_enter_pm(err) + + if not isinstance(err, trio.ClosedResourceError): + log.exception("Actor crashed:") + # XXX: is there any case where we'll want to debug IPC + # disconnects? I can't think of a reason that inspecting + # this type of failure will be useful for respawns or + # recovery logic - the only case is some kind of strange bug + # in `trio` itself? + await _debug._maybe_enter_pm(err) # always ship errors back to caller err_msg = pack_error(err) diff --git a/tractor/_debug.py b/tractor/_debug.py index bb73857..5bd1a2b 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -305,6 +305,10 @@ post_mortem = partial( async def _maybe_enter_pm(err): if ( _state.debug_mode() + + # NOTE: don't enter debug mode recursively after quitting pdb + # Iow, don't re-enter the repl if the `quit` command was issued + # by the user. and not isinstance(err, bdb.BdbQuit) # XXX: if the error is the likely result of runtime-wide diff --git a/tractor/_portal.py b/tractor/_portal.py index 15d86e3..c336e89 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -8,6 +8,7 @@ from typing import Tuple, Any, Dict, Optional, Set, Iterator from functools import partial from dataclasses import dataclass from contextlib import contextmanager +import warnings import trio from async_generator import asynccontextmanager @@ -197,15 +198,34 @@ class Portal: "A pending main result has already been submitted" self._expect_result = await self._submit(ns, func, kwargs) - async def run(self, ns: str, func: str, **kwargs) -> Any: + async def run( + self, + func_or_ns: str, + fn_name: Optional[str] = None, + **kwargs + ) -> Any: """Submit a remote function to be scheduled and run by actor, wrap and return its (stream of) result(s). This is a blocking call and returns either a value from the remote rpc task or a local async generator instance. """ + if isinstance(func_or_ns, str): + warnings.warn( + "`Portal.run(namespace: str, funcname: str)` is now deprecated, " + "pass a function reference directly instead", + DeprecationWarning + ) + fn_mod_path = func_or_ns + assert isinstance(fn_name, str) + + else: # function reference was passed directly + fn = func_or_ns + fn_mod_path = fn.__module__ + fn_name = fn.__name__ + return await self._return_from_resptype( - *(await self._submit(ns, func, kwargs)) + *(await self._submit(fn_mod_path, fn_name, kwargs)) ) async def _return_from_resptype( @@ -274,7 +294,14 @@ class Portal: log.warning( f"Cancelling all streams with {self.channel.uid}") for stream in self._streams.copy(): - await stream.aclose() + try: + await stream.aclose() + except trio.ClosedResourceError: + # don't error the stream having already been closed + # (unless of course at some point down the road we + # won't expect this to always be the case or need to + # detect it for respawning purposes?) + log.debug(f"{stream} was already closed.") async def aclose(self): log.debug(f"Closing {self}") diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 203a9dc..3ee9526 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -52,7 +52,7 @@ class ActorNursery: name: str, *, bind_addr: Tuple[str, int] = _default_bind_addr, - statespace: Optional[Dict[str, Any]] = None, + # statespace: Optional[Dict[str, Any]] = None, rpc_module_paths: List[str] = None, loglevel: str = None, # set log level per subactor nursery: trio.Nursery = None, @@ -67,7 +67,7 @@ class ActorNursery: name, # modules allowed to invoked funcs from rpc_module_paths=rpc_module_paths or [], - statespace=statespace, # global proc state vars + # statespace=statespace, # global proc state vars loglevel=loglevel, arbiter_addr=current_actor()._arb_addr, ) @@ -94,12 +94,12 @@ class ActorNursery: async def run_in_actor( self, - name: str, fn: typing.Callable, *, + name: Optional[str] = None, bind_addr: Tuple[str, int] = _default_bind_addr, rpc_module_paths: Optional[List[str]] = None, - statespace: Dict[str, Any] = None, + # statespace: Dict[str, Any] = None, loglevel: str = None, # set log level per subactor **kwargs, # explicit args to ``fn`` ) -> Portal: @@ -111,11 +111,16 @@ class ActorNursery: the actor is terminated. """ mod_path = fn.__module__ + + if name is None: + # use the explicit function name if not provided + name = fn.__name__ + portal = await self.start_actor( name, rpc_module_paths=[mod_path] + (rpc_module_paths or []), bind_addr=bind_addr, - statespace=statespace, + # statespace=statespace, loglevel=loglevel, # use the run_in_actor nursery nursery=self._ria_nursery, From 7134f35d6ee711c40f86c5f43f4e93803fd29d54 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Dec 2020 10:19:52 -0500 Subject: [PATCH 2/6] Add `Portal.run_from_ns()` It turns out in order to maintain our sneaky little "call an `Actor` method in this remote process" we still need the ability to invoke functions from a namespace. We're currently using a "self" namespace as a way to do this for internal inter-process method calling. Either way, I see no reason not to keep a public method for this invoke style (we just won't market it) since it is still how the machinery works underneath. --- tractor/_portal.py | 44 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index c336e89..6c026f8 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -120,7 +120,7 @@ class ReceiveStream(trio.abc.ReceiveChannel): # NOTE: we're telling the far end actor to cancel a task # corresponding to *this actor*. The far end local channel # instance is passed to `Actor._cancel_task()` implicitly. - await self._portal.run('self', '_cancel_task', cid=cid) + await self._portal.run_from_ns('self', '_cancel_task', cid=cid) if cs.cancelled_caught: # XXX: there's no way to know if the remote task was indeed @@ -212,9 +212,12 @@ class Portal: """ if isinstance(func_or_ns, str): warnings.warn( - "`Portal.run(namespace: str, funcname: str)` is now deprecated, " - "pass a function reference directly instead", - DeprecationWarning + "`Portal.run(namespace: str, funcname: str)` is now" + "deprecated, pass a function reference directly instead\n" + "If you still want to run a remote function by name use" + "`Portal.run_from_ns()`", + DeprecationWarning, + stacklevel=2, ) fn_mod_path = func_or_ns assert isinstance(fn_name, str) @@ -228,6 +231,28 @@ class Portal: *(await self._submit(fn_mod_path, fn_name, kwargs)) ) + async def run_from_ns( + self, + namespace_path: str, + function_name: str, + **kwargs, + ) -> Any: + """Run a function from a (remote) namespace in a new task on the far-end actor. + + This is a more explitcit way to run tasks in a remote-process + actor using explicit object-path syntax. Hint: this is how + `.run()` works underneath. + + Note:: + + A special namespace `self` can be used to invoke `Actor` + instance methods in the remote runtime. Currently this should only + be used for `tractor` internals. + """ + return await self._return_from_resptype( + *(await self._submit(namespace_path, function_name, kwargs)) + ) + async def _return_from_resptype( self, cid: str, @@ -329,13 +354,16 @@ class Portal: # with trio.CancelScope(shield=True) as cancel_scope: with trio.move_on_after(0.5) as cancel_scope: cancel_scope.shield = True - await self.run('self', 'cancel') + + await self.run_from_ns('self', 'cancel') return True + if cancel_scope.cancelled_caught: log.warning(f"May have failed to cancel {self.channel.uid}") # if we get here some weird cancellation case happened return False + except trio.ClosedResourceError: log.warning( f"{self.channel} for {self.channel.uid} was already closed?") @@ -352,8 +380,10 @@ class LocalPortal: actor: 'Actor' # type: ignore # noqa channel: Channel - async def run(self, ns: str, func_name: str, **kwargs) -> Any: - """Run a requested function locally and return it's result. + async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any: + """Run a requested local function from a namespace path and + return it's result. + """ obj = self.actor if ns == 'self' else importlib.import_module(ns) func = getattr(obj, func_name) From 9fd3c42eb1a66a41c82c2b3e18e4ad4c5340085d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Dec 2020 10:26:07 -0500 Subject: [PATCH 3/6] Port inter-process method calls to `Portal.run_from_ns()` --- tractor/_actor.py | 9 ++++++--- tractor/_debug.py | 9 ++++----- tractor/_discovery.py | 4 ++-- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index dba47ab..3dbdc5e 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -712,7 +712,7 @@ class Actor: assert isinstance(self._arb_addr, tuple) async with get_arbiter(*self._arb_addr) as arb_portal: - await arb_portal.run( + await arb_portal.run_from_ns( 'self', 'register_actor', uid=self.uid, @@ -788,8 +788,11 @@ class Actor: cs.shield = True try: async with get_arbiter(*self._arb_addr) as arb_portal: - await arb_portal.run( - 'self', 'unregister_actor', uid=self.uid) + await arb_portal.run_from_ns( + 'self', + 'unregister_actor', + uid=self.uid + ) except OSError: failed = True if cs.cancelled_caught: diff --git a/tractor/_debug.py b/tractor/_debug.py index 5bd1a2b..f9af7a6 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -180,15 +180,14 @@ def _breakpoint(debug_func) -> Awaitable[None]: try: async with get_root() as portal: with trio.fail_after(.5): - agen = await portal.run( - 'tractor._debug', - '_hijack_stdin_relay_to_child', + stream = await portal.run( + tractor._debug._hijack_stdin_relay_to_child, subactor_uid=actor.uid, ) - async with aclosing(agen): + async with aclosing(stream): # block until first yield above - async for val in agen: + async for val in stream: assert val == 'Locked' task_status.started() diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 360b16d..9e520b3 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -60,7 +60,7 @@ async def find_actor( """ actor = current_actor() async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: - sockaddr = await arb_portal.run('self', 'find_actor', name=name) + sockaddr = await arb_portal.run_from_ns('self', 'find_actor', name=name) # TODO: return portals to all available actors - for now just # the last one that registered if name == 'arbiter' and actor.is_arbiter: @@ -84,7 +84,7 @@ async def wait_for_actor( """ actor = current_actor() async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: - sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name) + sockaddrs = await arb_portal.run_from_ns('self', 'wait_for_actor', name=name) sockaddr = sockaddrs[-1] async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: From 493f2efb508ec5e8754dab3177fcf2dfba61415f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Dec 2020 10:26:58 -0500 Subject: [PATCH 4/6] Port tests to `Portal.run_from_ns()` --- tests/test_discovery.py | 9 ++++++--- tests/test_local.py | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index b0aa279..eff2897 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -124,12 +124,15 @@ async def spawn_and_check_registry( assert not actor.is_arbiter async with tractor.get_arbiter(*arb_addr) as portal: + if actor.is_arbiter: + async def get_reg(): return actor._registry + extra = 1 # arbiter is local root actor else: - get_reg = partial(portal.run, 'self', 'get_registry') + get_reg = partial(portal.run_from_ns, 'self', 'get_registry') extra = 2 # local root actor + remote arbiter # ensure current actor is registered @@ -254,7 +257,7 @@ async def close_chans_before_nursery( async with tractor.get_arbiter(*arb_addr) as aportal: try: - get_reg = partial(aportal.run, 'self', 'get_registry') + get_reg = partial(aportal.run_from_ns, 'self', 'get_registry') async with tractor.open_nursery() as tn: portal1 = await tn.run_in_actor( @@ -264,7 +267,7 @@ async def close_chans_before_nursery( agen1 = await portal1.result() portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__]) - agen2 = await portal2.run(__name__, 'stream_forever') + agen2 = await portal2.run(stream_forever) async with trio.open_nursery() as n: n.start_soon(streamer, agen1) diff --git a/tests/test_local.py b/tests/test_local.py index 0a594d0..ef64795 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -46,7 +46,7 @@ async def test_self_is_registered_localportal(arb_addr): assert actor.is_arbiter async with tractor.get_arbiter(*arb_addr) as portal: assert isinstance(portal, tractor._portal.LocalPortal) - sockaddr = await portal.run('self', 'wait_for_actor', name='arbiter') + sockaddr = await portal.run_from_ns('self', 'wait_for_actor', name='arbiter') assert sockaddr[0] == arb_addr From 0eba5f470839a91c0e5eccd98f5ac63b77c91b5d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Dec 2020 10:35:05 -0500 Subject: [PATCH 5/6] Port remaining tests to pass func refs --- tests/test_cancellation.py | 7 ++++--- tests/test_pubsub.py | 4 ++-- tests/test_spawning.py | 4 ++-- tests/test_streaming.py | 9 ++++----- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 1ff7f23..458607d 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -137,7 +137,7 @@ def test_cancel_single_subactor(arb_addr, mechanism): portal = await nursery.start_actor( 'nothin', rpc_module_paths=[__name__], ) - assert (await portal.run(__name__, 'do_nothing')) is None + assert (await portal.run(do_nothing)) is None if mechanism == 'nursery_cancel': # would hang otherwise @@ -173,7 +173,7 @@ async def test_cancel_infinite_streamer(start_method): # this async for loop streams values from the above # async generator running in a separate process - async for letter in await portal.run(__name__, 'stream_forever'): + async for letter in await portal.run(stream_forever): print(letter) # we support trio's cancellation system @@ -247,7 +247,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel): # if this function fails then we should error here # and the nursery should teardown all other actors try: - await portal.run(__name__, func.__name__, **kwargs) + await portal.run(func, **kwargs) + except tractor.RemoteActorError as err: assert err.type == err_type # we only expect this first error to propogate diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index b963cdb..e55b1d5 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -58,7 +58,7 @@ async def subs( async with tractor.find_actor(pub_actor_name) as portal: stream = await portal.run( - __name__, 'pubber', + pubber, topics=which, seed=seed, ) @@ -76,7 +76,7 @@ async def subs( await stream.aclose() stream = await portal.run( - __name__, 'pubber', + pubber, topics=['odd'], seed=seed, ) diff --git a/tests/test_spawning.py b/tests/test_spawning.py index 0f9db30..a8da2c1 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -70,9 +70,9 @@ async def test_movie_theatre_convo(start_method): rpc_module_paths=[__name__], ) - print(await portal.run(__name__, 'movie_theatre_question')) + print(await portal.run(movie_theatre_question)) # call the subactor a 2nd time - print(await portal.run(__name__, 'movie_theatre_question')) + print(await portal.run(movie_theatre_question)) # the async with will block here indefinitely waiting # for our actor "frank" to complete, we cancel 'frank' diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 1a48479..45fbd5b 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -50,7 +50,7 @@ async def context_stream(ctx, sequence): assert cs.cancelled_caught -async def stream_from_single_subactor(stream_func_name): +async def stream_from_single_subactor(stream_func): """Verify we can spawn a daemon actor and retrieve streamed data. """ async with tractor.find_actor('streamerd') as portals: @@ -67,8 +67,7 @@ async def stream_from_single_subactor(stream_func_name): seq = range(10) stream = await portal.run( - __name__, - stream_func_name, # one of the funcs above + stream_func, # one of the funcs above sequence=list(seq), # has to be msgpack serializable ) # it'd sure be nice to have an asyncitertools here... @@ -96,7 +95,7 @@ async def stream_from_single_subactor(stream_func_name): @pytest.mark.parametrize( - 'stream_func', ['async_gen_stream', 'context_stream'] + 'stream_func', [async_gen_stream, context_stream] ) def test_stream_from_single_subactor(arb_addr, start_method, stream_func): """Verify streaming from a spawned async generator. @@ -104,7 +103,7 @@ def test_stream_from_single_subactor(arb_addr, start_method, stream_func): tractor.run( partial( stream_from_single_subactor, - stream_func_name=stream_func, + stream_func=stream_func, ), arbiter_addr=arb_addr, start_method=start_method, From 4bf9b27f57deeaa430c96984514515ba2448b7b0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Dec 2020 19:30:51 -0500 Subject: [PATCH 6/6] Drop all .statespace refs; it was a silly idea --- tests/test_pubsub.py | 20 +++++++++++++------- tractor/_actor.py | 2 -- tractor/_spawn.py | 1 - tractor/_trionics.py | 4 ---- tractor/msg.py | 11 ++++++++--- 5 files changed, 21 insertions(+), 17 deletions(-) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index e55b1d5..365dbb9 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -28,15 +28,19 @@ def is_even(i): return i % 2 == 0 +# placeholder for topics getter +_get_topics = None + + @tractor.msg.pub async def pubber(get_topics, seed=10): - ss = tractor.current_actor().statespace + + # ensure topic subscriptions are as expected + global _get_topics + _get_topics = get_topics for i in cycle(range(seed)): - # ensure topic subscriptions are as expected - ss['get_topics'] = get_topics - yield {'even' if is_even(i) else 'odd': i} await trio.sleep(0.1) @@ -151,8 +155,9 @@ def test_multi_actor_subs_arbiter_pub( ): """Try out the neato @pub decorator system. """ + global _get_topics + async def main(): - ss = tractor.current_actor().statespace async with tractor.open_nursery() as n: @@ -183,11 +188,12 @@ def test_multi_actor_subs_arbiter_pub( pass if pub_actor == 'arbiter': + # wait for publisher task to be spawned in a local RPC task - while not ss.get('get_topics'): + while _get_topics is None: await trio.sleep(0.1) - get_topics = ss.get('get_topics') + get_topics = _get_topics assert 'even' in get_topics() diff --git a/tractor/_actor.py b/tractor/_actor.py index 3dbdc5e..87e6139 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -194,7 +194,6 @@ class Actor: name: str, *, rpc_module_paths: List[str] = [], - statespace: Optional[Dict[str, Any]] = None, uid: str = None, loglevel: str = None, arbiter_addr: Optional[Tuple[str, int]] = None, @@ -226,7 +225,6 @@ class Actor: # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 - self.statespace = statespace or {} self.loglevel = loglevel self._arb_addr = arbiter_addr diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 2065967..78158bb 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -248,7 +248,6 @@ async def new_proc( await chan.send({ "_parent_main_data": subactor._parent_main_data, "rpc_module_paths": subactor.rpc_module_paths, - "statespace": subactor.statespace, "_arb_addr": subactor._arb_addr, "bind_host": bind_addr[0], "bind_port": bind_addr[1], diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 3ee9526..4d9e15f 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -52,7 +52,6 @@ class ActorNursery: name: str, *, bind_addr: Tuple[str, int] = _default_bind_addr, - # statespace: Optional[Dict[str, Any]] = None, rpc_module_paths: List[str] = None, loglevel: str = None, # set log level per subactor nursery: trio.Nursery = None, @@ -67,7 +66,6 @@ class ActorNursery: name, # modules allowed to invoked funcs from rpc_module_paths=rpc_module_paths or [], - # statespace=statespace, # global proc state vars loglevel=loglevel, arbiter_addr=current_actor()._arb_addr, ) @@ -99,7 +97,6 @@ class ActorNursery: name: Optional[str] = None, bind_addr: Tuple[str, int] = _default_bind_addr, rpc_module_paths: Optional[List[str]] = None, - # statespace: Dict[str, Any] = None, loglevel: str = None, # set log level per subactor **kwargs, # explicit args to ``fn`` ) -> Portal: @@ -120,7 +117,6 @@ class ActorNursery: name, rpc_module_paths=[mod_path] + (rpc_module_paths or []), bind_addr=bind_addr, - # statespace=statespace, loglevel=loglevel, # use the run_in_actor nursery nursery=self._ria_nursery, diff --git a/tractor/msg.py b/tractor/msg.py index 0462fd0..f778ac7 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -90,6 +90,9 @@ def modify_subs(topics2ctxs, topics, ctx): topics2ctxs.pop(topic) +_pub_state: Dict[str, dict] = {} + + def pub( wrapped: typing.Callable = None, *, @@ -175,12 +178,15 @@ def pub( subscribers. If you are ok to have a new task running for every call to ``pub_service()`` then probably don't need this. """ + global _pub_state + # handle the decorator not called with () case if wrapped is None: return partial(pub, tasks=tasks) task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = { None: trio.StrictFIFOLock()} + for name in tasks: task2lock[name] = trio.StrictFIFOLock() @@ -203,11 +209,10 @@ def pub( f"argument with a falue from {tasks}") topics = set(topics) - ss = current_actor().statespace - lockmap = ss.setdefault('_pubtask2lock', task2lock) + lockmap = _pub_state.setdefault('_pubtask2lock', task2lock) lock = lockmap[task_name] - all_subs = ss.setdefault('_subs', {}) + all_subs = _pub_state.setdefault('_subs', {}) topics2ctxs = all_subs.setdefault(task_name, {}) try: