forked from goodboy/tractor
1
0
Fork 0

Merge pull request #174 from goodboy/func_refs_always

Allow passing function refs to `Portal.run()`
py3.9
goodboy 2020-12-22 19:45:10 -05:00 committed by GitHub
commit f4f39c29f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 332 additions and 147 deletions

View File

@ -385,37 +385,61 @@ as ``multiprocessing`` calls it) which is running ``main()``.
.. _remote function execution: https://codespeak.net/execnet/example/test_info.html#remote-exec-a-function-avoiding-inlined-source-part-i .. _remote function execution: https://codespeak.net/execnet/example/test_info.html#remote-exec-a-function-avoiding-inlined-source-part-i
Actor local variables Actor local (aka *process global*) variables
********************* ********************************************
Although ``tractor`` uses a *shared-nothing* architecture between processes Although ``tractor`` uses a *shared-nothing* architecture between
you can of course share state between tasks running *within* an actor. processes you can of course share state between tasks running *within*
``trio`` tasks spawned via multiple RPC calls to an actor can access global an actor (since a `trio.run()` runtime is single threaded). ``trio``
state using the per actor ``statespace`` dictionary: tasks spawned via multiple RPC calls to an actor can modify
*process-global-state* defined using Python module attributes:
.. code:: python .. code:: python
statespace = {'doggy': 10} # a per process cache
_actor_cache: Dict[str, bool] = {}
def check_statespace(): def ping_endpoints(endpoints: List[str]):
# Remember this runs in a new process so no changes """Start a polling process which runs completely separate
# will propagate back to the parent actor from our root actor/process.
assert tractor.current_actor().statespace == statespace
"""
# This runs in a new process so no changes # will propagate
# back to the parent actor
while True:
for ep in endpoints:
status = await check_endpoint_is_up(ep)
_actor_cache[ep] = status
await trio.sleep(0.5)
async def get_alive_endpoints():
nonlocal _actor_cache
return {key for key, value in _actor_cache.items() if value}
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
await n.run_in_actor(
'checker', portal = await n.run_in_actor(ping_endpoints)
check_statespace,
statespace=statespace # print the alive endpoints after 3 seconds
) await trio.sleep(3)
# this is submitted to be run in our "ping_endpoints" actor
print(await portal.run(get_alive_endpoints))
Of course you don't have to use the ``statespace`` variable (it's mostly You can pass any kind of (`msgpack`) serializable data between actors using
a convenience for passing simple data to newly spawned actors); building function call semantics but building out a state sharing system per-actor
out a state sharing system per-actor is totally up to you. is totally up to you.
Service Discovery Service Discovery

View File

@ -13,7 +13,7 @@ async def hi():
async def say_hello(other_actor): async def say_hello(other_actor):
async with tractor.wait_for_actor(other_actor) as portal: async with tractor.wait_for_actor(other_actor) as portal:
return await portal.run(_this_module, 'hi') return await portal.run(hi)
async def main(): async def main():
@ -24,14 +24,14 @@ async def main():
print("Alright... Action!") print("Alright... Action!")
donny = await n.run_in_actor( donny = await n.run_in_actor(
'donny',
say_hello, say_hello,
name='donny',
# arguments are always named # arguments are always named
other_actor='gretchen', other_actor='gretchen',
) )
gretchen = await n.run_in_actor( gretchen = await n.run_in_actor(
'gretchen',
say_hello, say_hello,
name='gretchen',
other_actor='donny', other_actor='donny',
) )
print(await gretchen.result()) print(await gretchen.result())

View File

@ -2,7 +2,8 @@ import tractor
def cellar_door(): def cellar_door():
return "Dang that's beautiful" assert not tractor.is_root_process()
return "Dang that's beautiful"
async def main(): async def main():
@ -10,7 +11,10 @@ async def main():
""" """
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.run_in_actor('some_linguist', cellar_door) portal = await n.run_in_actor(
cellar_door,
name='some_linguist',
)
# The ``async with`` will unblock here since the 'some_linguist' # The ``async with`` will unblock here since the 'some_linguist'
# actor has completed its main task ``cellar_door``. # actor has completed its main task ``cellar_door``.

View File

@ -19,9 +19,9 @@ async def main():
rpc_module_paths=[__name__], rpc_module_paths=[__name__],
) )
print(await portal.run(__name__, 'movie_theatre_question')) print(await portal.run(movie_theatre_question))
# call the subactor a 2nd time # call the subactor a 2nd time
print(await portal.run(__name__, 'movie_theatre_question')) print(await portal.run(movie_theatre_question))
# the async with will block here indefinitely waiting # the async with will block here indefinitely waiting
# for our actor "frank" to complete, but since it's an # for our actor "frank" to complete, but since it's an

View File

@ -24,7 +24,7 @@ async def main():
# this async for loop streams values from the above # this async for loop streams values from the above
# async generator running in a separate process # async generator running in a separate process
async for letter in await portal.run(__name__, 'stream_forever'): async for letter in await portal.run(stream_forever):
print(letter) print(letter)
# we support trio's cancellation system # we support trio's cancellation system
@ -33,4 +33,4 @@ async def main():
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main, start_method='forkserver') tractor.run(main)

View File

@ -23,8 +23,8 @@ async def main():
p1 = await n.start_actor('name_error', rpc_module_paths=[__name__]) p1 = await n.start_actor('name_error', rpc_module_paths=[__name__])
# retreive results # retreive results
stream = await p0.run(__name__, 'breakpoint_forever') stream = await p0.run(breakpoint_forever)
await p1.run(__name__, 'name_error') await p1.run(name_error)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -18,10 +18,17 @@ async def spawn_until(depth=0):
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
if depth < 1: if depth < 1:
# await n.run_in_actor('breakpoint_forever', breakpoint_forever) # await n.run_in_actor('breakpoint_forever', breakpoint_forever)
await n.run_in_actor('name_error', name_error) await n.run_in_actor(
name_error,
name='name_error'
)
else: else:
depth -= 1 depth -= 1
await n.run_in_actor(f'spawn_until_{depth}', spawn_until, depth=depth) await n.run_in_actor(
spawn_until,
depth=depth,
name=f'spawn_until_{depth}',
)
async def main(): async def main():
@ -46,12 +53,20 @@ async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
# spawn both actors # spawn both actors
portal = await n.run_in_actor('spawner0', spawn_until, depth=3) portal = await n.run_in_actor(
portal1 = await n.run_in_actor('spawner1', spawn_until, depth=4) spawn_until,
depth=3,
name='spawner0',
)
portal1 = await n.run_in_actor(
spawn_until,
depth=4,
name='spawner1',
)
# gah still an issue here. # gah still an issue here.
# await portal.result() await portal.result()
# await portal1.result() await portal1.result()
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -10,7 +10,10 @@ async def spawn_error():
""""A nested nursery that triggers another ``NameError``. """"A nested nursery that triggers another ``NameError``.
""" """
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.run_in_actor('name_error_1', name_error) portal = await n.run_in_actor(
name_error,
name='name_error_1',
)
return await portal.result() return await portal.result()
@ -27,8 +30,14 @@ async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
# spawn both actors # spawn both actors
portal = await n.run_in_actor('name_error', name_error) portal = await n.run_in_actor(
portal1 = await n.run_in_actor('spawn_error', spawn_error) name_error,
name='name_error',
)
portal1 = await n.run_in_actor(
spawn_error,
name='spawn_error',
)
# trigger a root actor error # trigger a root actor error
assert 0 assert 0

View File

@ -18,7 +18,10 @@ async def spawn_error():
""""A nested nursery that triggers another ``NameError``. """"A nested nursery that triggers another ``NameError``.
""" """
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.run_in_actor('name_error_1', name_error) portal = await n.run_in_actor(
name_error,
name='name_error_1',
)
return await portal.result() return await portal.result()
@ -38,9 +41,9 @@ async def main():
# Spawn both actors, don't bother with collecting results # Spawn both actors, don't bother with collecting results
# (would result in a different debugger outcome due to parent's # (would result in a different debugger outcome due to parent's
# cancellation). # cancellation).
await n.run_in_actor('bp_forever', breakpoint_forever) await n.run_in_actor(breakpoint_forever)
await n.run_in_actor('name_error', name_error) await n.run_in_actor(name_error)
await n.run_in_actor('spawn_error', spawn_error) await n.run_in_actor(spawn_error)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -12,10 +12,14 @@ async def spawn_until(depth=0):
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
if depth < 1: if depth < 1:
# await n.run_in_actor('breakpoint_forever', breakpoint_forever) # await n.run_in_actor('breakpoint_forever', breakpoint_forever)
await n.run_in_actor('name_error', name_error) await n.run_in_actor(name_error)
else: else:
depth -= 1 depth -= 1
await n.run_in_actor(f'spawn_until_{depth}', spawn_until, depth=depth) await n.run_in_actor(
spawn_until,
depth=depth,
name=f'spawn_until_{depth}',
)
async def main(): async def main():
@ -36,8 +40,16 @@ async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
# spawn both actors # spawn both actors
portal = await n.run_in_actor('spawner0', spawn_until, depth=0) portal = await n.run_in_actor(
portal1 = await n.run_in_actor('spawner1', spawn_until, depth=1) spawn_until,
depth=0,
name='spawner0',
)
portal1 = await n.run_in_actor(
spawn_until,
depth=1,
name='spawner1',
)
# nursery cancellation should be triggered due to propagated # nursery cancellation should be triggered due to propagated
# error from child. # error from child.

View File

@ -15,7 +15,6 @@ async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.run_in_actor( portal = await n.run_in_actor(
'breakpoint_forever',
breakpoint_forever, breakpoint_forever,
) )
await portal.result() await portal.result()

View File

@ -8,7 +8,7 @@ async def name_error():
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.run_in_actor('name_error', name_error) portal = await n.run_in_actor(name_error)
await portal.result() await portal.result()

View File

@ -30,9 +30,7 @@ async def aggregate(seed):
async def push_to_chan(portal, send_chan): async def push_to_chan(portal, send_chan):
async with send_chan: async with send_chan:
async for value in await portal.run( async for value in await portal.run(stream_data, seed=seed):
__name__, 'stream_data', seed=seed
):
# leverage trio's built-in backpressure # leverage trio's built-in backpressure
await send_chan.send(value) await send_chan.send(value)
@ -74,8 +72,8 @@ async def main():
pre_start = time.time() pre_start = time.time()
portal = await nursery.run_in_actor( portal = await nursery.run_in_actor(
'aggregator',
aggregate, aggregate,
name='aggregator',
seed=seed, seed=seed,
) )

View File

@ -15,7 +15,7 @@ async def main():
)) ))
# start one actor that will fail immediately # start one actor that will fail immediately
await n.run_in_actor('extra', assert_err) await n.run_in_actor(assert_err)
# should error here with a ``RemoteActorError`` containing # should error here with a ``RemoteActorError`` containing
# an ``AssertionError`` and all the other actors have been cancelled # an ``AssertionError`` and all the other actors have been cancelled

View File

@ -49,7 +49,7 @@ def test_remote_error(arb_addr, args_err):
async def main(): async def main():
async with tractor.open_nursery() as nursery: async with tractor.open_nursery() as nursery:
portal = await nursery.run_in_actor('errorer', assert_err, **args) portal = await nursery.run_in_actor(assert_err, name='errorer', **args)
# get result(s) from main task # get result(s) from main task
try: try:
@ -73,8 +73,8 @@ def test_multierror(arb_addr):
async def main(): async def main():
async with tractor.open_nursery() as nursery: async with tractor.open_nursery() as nursery:
await nursery.run_in_actor('errorer1', assert_err) await nursery.run_in_actor(assert_err, name='errorer1')
portal2 = await nursery.run_in_actor('errorer2', assert_err) portal2 = await nursery.run_in_actor(assert_err, name='errorer2')
# get result(s) from main task # get result(s) from main task
try: try:
@ -104,7 +104,10 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
async with tractor.open_nursery() as nursery: async with tractor.open_nursery() as nursery:
for i in range(num_subactors): for i in range(num_subactors):
await nursery.run_in_actor( await nursery.run_in_actor(
f'errorer{i}', assert_err, delay=delay) assert_err,
name=f'errorer{i}',
delay=delay
)
with pytest.raises(trio.MultiError) as exc_info: with pytest.raises(trio.MultiError) as exc_info:
tractor.run(main, arbiter_addr=arb_addr) tractor.run(main, arbiter_addr=arb_addr)
@ -134,7 +137,7 @@ def test_cancel_single_subactor(arb_addr, mechanism):
portal = await nursery.start_actor( portal = await nursery.start_actor(
'nothin', rpc_module_paths=[__name__], 'nothin', rpc_module_paths=[__name__],
) )
assert (await portal.run(__name__, 'do_nothing')) is None assert (await portal.run(do_nothing)) is None
if mechanism == 'nursery_cancel': if mechanism == 'nursery_cancel':
# would hang otherwise # would hang otherwise
@ -170,7 +173,7 @@ async def test_cancel_infinite_streamer(start_method):
# this async for loop streams values from the above # this async for loop streams values from the above
# async generator running in a separate process # async generator running in a separate process
async for letter in await portal.run(__name__, 'stream_forever'): async for letter in await portal.run(stream_forever):
print(letter) print(letter)
# we support trio's cancellation system # we support trio's cancellation system
@ -231,7 +234,12 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
for i in range(num_actors): for i in range(num_actors):
# start actor(s) that will fail immediately # start actor(s) that will fail immediately
riactor_portals.append( riactor_portals.append(
await n.run_in_actor(f'actor_{i}', func, **kwargs)) await n.run_in_actor(
func,
name=f'actor_{i}',
**kwargs
)
)
if da_func: if da_func:
func, kwargs, expect_error = da_func func, kwargs, expect_error = da_func
@ -239,7 +247,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
# if this function fails then we should error here # if this function fails then we should error here
# and the nursery should teardown all other actors # and the nursery should teardown all other actors
try: try:
await portal.run(__name__, func.__name__, **kwargs) await portal.run(func, **kwargs)
except tractor.RemoteActorError as err: except tractor.RemoteActorError as err:
assert err.type == err_type assert err.type == err_type
# we only expect this first error to propogate # we only expect this first error to propogate
@ -276,21 +285,24 @@ async def spawn_and_error(breadth, depth) -> None:
name = tractor.current_actor().name name = tractor.current_actor().name
async with tractor.open_nursery() as nursery: async with tractor.open_nursery() as nursery:
for i in range(breadth): for i in range(breadth):
if depth > 0: if depth > 0:
args = ( args = (
f'spawner_{i}_depth_{depth}',
spawn_and_error, spawn_and_error,
) )
kwargs = { kwargs = {
'name': f'spawner_{i}_depth_{depth}',
'breadth': breadth, 'breadth': breadth,
'depth': depth - 1, 'depth': depth - 1,
} }
else: else:
args = ( args = (
f'{name}_errorer_{i}',
assert_err, assert_err,
) )
kwargs = {} kwargs = {
'name': f'{name}_errorer_{i}',
}
await nursery.run_in_actor(*args, **kwargs) await nursery.run_in_actor(*args, **kwargs)
@ -318,8 +330,8 @@ async def test_nested_multierrors(loglevel, start_method):
async with tractor.open_nursery() as nursery: async with tractor.open_nursery() as nursery:
for i in range(subactor_breadth): for i in range(subactor_breadth):
await nursery.run_in_actor( await nursery.run_in_actor(
f'spawner_{i}',
spawn_and_error, spawn_and_error,
name=f'spawner_{i}',
breadth=subactor_breadth, breadth=subactor_breadth,
depth=depth, depth=depth,
) )
@ -398,7 +410,10 @@ def test_cancel_via_SIGINT_other_task(
async def spawn_and_sleep_forever(task_status=trio.TASK_STATUS_IGNORED): async def spawn_and_sleep_forever(task_status=trio.TASK_STATUS_IGNORED):
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as tn:
for i in range(3): for i in range(3):
await tn.run_in_actor('sucka', sleep_forever) await tn.run_in_actor(
sleep_forever,
name='namesucka',
)
task_status.started() task_status.started()
await trio.sleep_forever() await trio.sleep_forever()
@ -423,7 +438,10 @@ async def spin_for(period=3):
async def spawn(): async def spawn():
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as tn:
portal = await tn.run_in_actor('sleeper', spin_for) portal = await tn.run_in_actor(
spin_for,
name='sleeper',
)
@no_windows @no_windows
@ -442,7 +460,10 @@ def test_cancel_while_childs_child_in_sync_sleep(
async def main(): async def main():
with trio.fail_after(2): with trio.fail_after(2):
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as tn:
portal = await tn.run_in_actor('spawn', spawn) portal = await tn.run_in_actor(
spawn,
name='spawn',
)
await trio.sleep(1) await trio.sleep(1)
assert 0 assert 0

View File

@ -237,7 +237,7 @@ def test_multi_subactors(spawn):
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('bp_forever'" in before assert "Attaching pdb to actor: ('breakpoint_forever'" in before
# do some "next" commands to demonstrate recurrent breakpoint # do some "next" commands to demonstrate recurrent breakpoint
# entries # entries
@ -265,7 +265,7 @@ def test_multi_subactors(spawn):
child.sendline('c') child.sendline('c')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('bp_forever'" in before assert "Attaching pdb to actor: ('breakpoint_forever'" in before
# now run some "continues" to show re-entries # now run some "continues" to show re-entries
for _ in range(5): for _ in range(5):
@ -277,7 +277,7 @@ def test_multi_subactors(spawn):
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('arbiter'" in before assert "Attaching to pdb in crashed actor: ('arbiter'" in before
assert "RemoteActorError: ('bp_forever'" in before assert "RemoteActorError: ('breakpoint_forever'" in before
assert 'bdb.BdbQuit' in before assert 'bdb.BdbQuit' in before
# process should exit # process should exit
@ -285,7 +285,7 @@ def test_multi_subactors(spawn):
child.expect(pexpect.EOF) child.expect(pexpect.EOF)
before = str(child.before.decode()) before = str(child.before.decode())
assert "RemoteActorError: ('bp_forever'" in before assert "RemoteActorError: ('breakpoint_forever'" in before
assert 'bdb.BdbQuit' in before assert 'bdb.BdbQuit' in before
@ -391,8 +391,9 @@ def test_multi_nested_subactors_error_through_nurseries(spawn):
child.expect(pexpect.EOF) child.expect(pexpect.EOF)
before = str(child.before.decode()) if not timed_out_early:
assert "NameError" in before before = str(child.before.decode())
assert "NameError" in before
def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method): def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method):

View File

@ -74,14 +74,14 @@ async def test_trynamic_trio(func, start_method):
print("Alright... Action!") print("Alright... Action!")
donny = await n.run_in_actor( donny = await n.run_in_actor(
'donny',
func, func,
other_actor='gretchen', other_actor='gretchen',
name='donny',
) )
gretchen = await n.run_in_actor( gretchen = await n.run_in_actor(
'gretchen',
func, func,
other_actor='donny', other_actor='donny',
name='gretchen',
) )
print(await gretchen.result()) print(await gretchen.result())
print(await donny.result()) print(await donny.result())
@ -124,12 +124,15 @@ async def spawn_and_check_registry(
assert not actor.is_arbiter assert not actor.is_arbiter
async with tractor.get_arbiter(*arb_addr) as portal: async with tractor.get_arbiter(*arb_addr) as portal:
if actor.is_arbiter: if actor.is_arbiter:
async def get_reg(): async def get_reg():
return actor._registry return actor._registry
extra = 1 # arbiter is local root actor extra = 1 # arbiter is local root actor
else: else:
get_reg = partial(portal.run, 'self', 'get_registry') get_reg = partial(portal.run_from_ns, 'self', 'get_registry')
extra = 2 # local root actor + remote arbiter extra = 2 # local root actor + remote arbiter
# ensure current actor is registered # ensure current actor is registered
@ -147,7 +150,7 @@ async def spawn_and_check_registry(
portals = {} portals = {}
for i in range(3): for i in range(3):
name = f'a{i}' name = f'a{i}'
portals[name] = await n.run_in_actor(name, to_run) portals[name] = await n.run_in_actor(to_run, name=name)
# wait on last actor to come up # wait on last actor to come up
async with tractor.wait_for_actor(name): async with tractor.wait_for_actor(name):
@ -254,14 +257,17 @@ async def close_chans_before_nursery(
async with tractor.get_arbiter(*arb_addr) as aportal: async with tractor.get_arbiter(*arb_addr) as aportal:
try: try:
get_reg = partial(aportal.run, 'self', 'get_registry') get_reg = partial(aportal.run_from_ns, 'self', 'get_registry')
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as tn:
portal1 = await tn.run_in_actor('consumer1', stream_forever) portal1 = await tn.run_in_actor(
stream_forever,
name='consumer1',
)
agen1 = await portal1.result() agen1 = await portal1.result()
portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__]) portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__])
agen2 = await portal2.run(__name__, 'stream_forever') agen2 = await portal2.run(stream_forever)
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
n.start_soon(streamer, agen1) n.start_soon(streamer, agen1)

View File

@ -46,7 +46,7 @@ async def test_self_is_registered_localportal(arb_addr):
assert actor.is_arbiter assert actor.is_arbiter
async with tractor.get_arbiter(*arb_addr) as portal: async with tractor.get_arbiter(*arb_addr) as portal:
assert isinstance(portal, tractor._portal.LocalPortal) assert isinstance(portal, tractor._portal.LocalPortal)
sockaddr = await portal.run('self', 'wait_for_actor', name='arbiter') sockaddr = await portal.run_from_ns('self', 'wait_for_actor', name='arbiter')
assert sockaddr[0] == arb_addr assert sockaddr[0] == arb_addr

View File

@ -28,15 +28,19 @@ def is_even(i):
return i % 2 == 0 return i % 2 == 0
# placeholder for topics getter
_get_topics = None
@tractor.msg.pub @tractor.msg.pub
async def pubber(get_topics, seed=10): async def pubber(get_topics, seed=10):
ss = tractor.current_actor().statespace
# ensure topic subscriptions are as expected
global _get_topics
_get_topics = get_topics
for i in cycle(range(seed)): for i in cycle(range(seed)):
# ensure topic subscriptions are as expected
ss['get_topics'] = get_topics
yield {'even' if is_even(i) else 'odd': i} yield {'even' if is_even(i) else 'odd': i}
await trio.sleep(0.1) await trio.sleep(0.1)
@ -58,7 +62,7 @@ async def subs(
async with tractor.find_actor(pub_actor_name) as portal: async with tractor.find_actor(pub_actor_name) as portal:
stream = await portal.run( stream = await portal.run(
__name__, 'pubber', pubber,
topics=which, topics=which,
seed=seed, seed=seed,
) )
@ -76,7 +80,7 @@ async def subs(
await stream.aclose() await stream.aclose()
stream = await portal.run( stream = await portal.run(
__name__, 'pubber', pubber,
topics=['odd'], topics=['odd'],
seed=seed, seed=seed,
) )
@ -126,7 +130,10 @@ async def test_required_args(callwith_expecterror):
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
# await func(**kwargs) # await func(**kwargs)
portal = await n.run_in_actor( portal = await n.run_in_actor(
'pubber', multilock_pubber, **kwargs) multilock_pubber,
name='pubber',
**kwargs
)
async with tractor.wait_for_actor('pubber'): async with tractor.wait_for_actor('pubber'):
pass pass
@ -148,8 +155,9 @@ def test_multi_actor_subs_arbiter_pub(
): ):
"""Try out the neato @pub decorator system. """Try out the neato @pub decorator system.
""" """
global _get_topics
async def main(): async def main():
ss = tractor.current_actor().statespace
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
@ -163,20 +171,29 @@ def test_multi_actor_subs_arbiter_pub(
) )
even_portal = await n.run_in_actor( even_portal = await n.run_in_actor(
'evens', subs, which=['even'], pub_actor_name=name) subs,
which=['even'],
name='evens',
pub_actor_name=name
)
odd_portal = await n.run_in_actor( odd_portal = await n.run_in_actor(
'odds', subs, which=['odd'], pub_actor_name=name) subs,
which=['odd'],
name='odds',
pub_actor_name=name
)
async with tractor.wait_for_actor('evens'): async with tractor.wait_for_actor('evens'):
# block until 2nd actor is initialized # block until 2nd actor is initialized
pass pass
if pub_actor == 'arbiter': if pub_actor == 'arbiter':
# wait for publisher task to be spawned in a local RPC task # wait for publisher task to be spawned in a local RPC task
while not ss.get('get_topics'): while _get_topics is None:
await trio.sleep(0.1) await trio.sleep(0.1)
get_topics = ss.get('get_topics') get_topics = _get_topics
assert 'even' in get_topics() assert 'even' in get_topics()

View File

@ -80,9 +80,11 @@ def test_rpc_errors(arb_addr, to_call, testdir):
# spawn a subactor which calls us back # spawn a subactor which calls us back
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
await n.run_in_actor( await n.run_in_actor(
'subactor',
sleep_back_actor, sleep_back_actor,
actor_name=subactor_requests_to, actor_name=subactor_requests_to,
name='subactor',
# function from the local exposed module space # function from the local exposed module space
# the subactor will invoke when it RPCs back to this actor # the subactor will invoke when it RPCs back to this actor
func_name=funcname, func_name=funcname,

View File

@ -1,31 +1,33 @@
""" """
Spawning basics Spawning basics
""" """
from functools import partial
import pytest import pytest
import trio import trio
import tractor import tractor
from conftest import tractor_test from conftest import tractor_test
statespace = {'doggy': 10, 'kitty': 4} data_to_pass_down = {'doggy': 10, 'kitty': 4}
async def spawn(is_arbiter): async def spawn(is_arbiter, data):
namespaces = [__name__] namespaces = [__name__]
await trio.sleep(0.1) await trio.sleep(0.1)
actor = tractor.current_actor() actor = tractor.current_actor()
assert actor.is_arbiter == is_arbiter assert actor.is_arbiter == is_arbiter
assert actor.statespace == statespace data == data_to_pass_down
if actor.is_arbiter: if actor.is_arbiter:
async with tractor.open_nursery() as nursery: async with tractor.open_nursery() as nursery:
# forks here # forks here
portal = await nursery.run_in_actor( portal = await nursery.run_in_actor(
'sub-actor',
spawn, spawn,
is_arbiter=False, is_arbiter=False,
statespace=statespace, name='sub-actor',
data=data,
rpc_module_paths=namespaces, rpc_module_paths=namespaces,
) )
@ -41,10 +43,9 @@ async def spawn(is_arbiter):
def test_local_arbiter_subactor_global_state(arb_addr): def test_local_arbiter_subactor_global_state(arb_addr):
result = tractor.run( result = tractor.run(
spawn, partial(spawn, data=data_to_pass_down),
True, True,
name='arbiter', name='arbiter',
statespace=statespace,
arbiter_addr=arb_addr, arbiter_addr=arb_addr,
) )
assert result == 10 assert result == 10
@ -69,9 +70,9 @@ async def test_movie_theatre_convo(start_method):
rpc_module_paths=[__name__], rpc_module_paths=[__name__],
) )
print(await portal.run(__name__, 'movie_theatre_question')) print(await portal.run(movie_theatre_question))
# call the subactor a 2nd time # call the subactor a 2nd time
print(await portal.run(__name__, 'movie_theatre_question')) print(await portal.run(movie_theatre_question))
# the async with will block here indefinitely waiting # the async with will block here indefinitely waiting
# for our actor "frank" to complete, we cancel 'frank' # for our actor "frank" to complete, we cancel 'frank'
@ -89,7 +90,10 @@ async def test_most_beautiful_word(start_method):
""" """
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.run_in_actor('some_linguist', cellar_door) portal = await n.run_in_actor(
cellar_door,
name='some_linguist',
)
# The ``async with`` will unblock here since the 'some_linguist' # The ``async with`` will unblock here since the 'some_linguist'
# actor has completed its main task ``cellar_door``. # actor has completed its main task ``cellar_door``.
@ -119,7 +123,6 @@ def test_loglevel_propagated_to_subactor(
async def main(): async def main():
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as tn:
await tn.run_in_actor( await tn.run_in_actor(
'log_checker',
check_loglevel, check_loglevel,
level=level, level=level,
) )

View File

@ -50,7 +50,7 @@ async def context_stream(ctx, sequence):
assert cs.cancelled_caught assert cs.cancelled_caught
async def stream_from_single_subactor(stream_func_name): async def stream_from_single_subactor(stream_func):
"""Verify we can spawn a daemon actor and retrieve streamed data. """Verify we can spawn a daemon actor and retrieve streamed data.
""" """
async with tractor.find_actor('streamerd') as portals: async with tractor.find_actor('streamerd') as portals:
@ -62,14 +62,12 @@ async def stream_from_single_subactor(stream_func_name):
portal = await nursery.start_actor( portal = await nursery.start_actor(
'streamerd', 'streamerd',
rpc_module_paths=[__name__], rpc_module_paths=[__name__],
statespace={'global_dict': {}},
) )
seq = range(10) seq = range(10)
stream = await portal.run( stream = await portal.run(
__name__, stream_func, # one of the funcs above
stream_func_name, # one of the funcs above
sequence=list(seq), # has to be msgpack serializable sequence=list(seq), # has to be msgpack serializable
) )
# it'd sure be nice to have an asyncitertools here... # it'd sure be nice to have an asyncitertools here...
@ -97,7 +95,7 @@ async def stream_from_single_subactor(stream_func_name):
@pytest.mark.parametrize( @pytest.mark.parametrize(
'stream_func', ['async_gen_stream', 'context_stream'] 'stream_func', [async_gen_stream, context_stream]
) )
def test_stream_from_single_subactor(arb_addr, start_method, stream_func): def test_stream_from_single_subactor(arb_addr, start_method, stream_func):
"""Verify streaming from a spawned async generator. """Verify streaming from a spawned async generator.
@ -105,7 +103,7 @@ def test_stream_from_single_subactor(arb_addr, start_method, stream_func):
tractor.run( tractor.run(
partial( partial(
stream_from_single_subactor, stream_from_single_subactor,
stream_func_name=stream_func, stream_func=stream_func,
), ),
arbiter_addr=arb_addr, arbiter_addr=arb_addr,
start_method=start_method, start_method=start_method,
@ -186,9 +184,9 @@ async def a_quadruple_example():
pre_start = time.time() pre_start = time.time()
portal = await nursery.run_in_actor( portal = await nursery.run_in_actor(
'aggregator',
aggregate, aggregate,
seed=seed, seed=seed,
name='aggregator',
) )
start = time.time() start = time.time()
@ -275,9 +273,9 @@ async def test_respawn_consumer_task(
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
stream = await(await n.run_in_actor( stream = await(await n.run_in_actor(
'streamer',
stream_data, stream_data,
seed=11, seed=11,
name='streamer',
)).result() )).result()
expect = set(range(11)) expect = set(range(11))

View File

@ -16,7 +16,7 @@ from ._streaming import Context, stream
from ._discovery import get_arbiter, find_actor, wait_for_actor from ._discovery import get_arbiter, find_actor, wait_for_actor
from ._actor import Actor, _start_actor, Arbiter from ._actor import Actor, _start_actor, Arbiter
from ._trionics import open_nursery from ._trionics import open_nursery
from ._state import current_actor from ._state import current_actor, is_root_process
from . import _state from . import _state
from ._exceptions import RemoteActorError, ModuleNotExposed from ._exceptions import RemoteActorError, ModuleNotExposed
from ._debug import breakpoint, post_mortem from ._debug import breakpoint, post_mortem

View File

@ -126,10 +126,17 @@ async def _invoke(
with cancel_scope as cs: with cancel_scope as cs:
task_status.started(cs) task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'return': await coro, 'cid': cid})
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
# NOTE: don't enter debug mode recursively after quitting pdb
log.exception("Actor crashed:") if not isinstance(err, trio.ClosedResourceError):
await _debug._maybe_enter_pm(err) log.exception("Actor crashed:")
# XXX: is there any case where we'll want to debug IPC
# disconnects? I can't think of a reason that inspecting
# this type of failure will be useful for respawns or
# recovery logic - the only case is some kind of strange bug
# in `trio` itself?
await _debug._maybe_enter_pm(err)
# always ship errors back to caller # always ship errors back to caller
err_msg = pack_error(err) err_msg = pack_error(err)
@ -187,7 +194,6 @@ class Actor:
name: str, name: str,
*, *,
rpc_module_paths: List[str] = [], rpc_module_paths: List[str] = [],
statespace: Optional[Dict[str, Any]] = None,
uid: str = None, uid: str = None,
loglevel: str = None, loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = None, arbiter_addr: Optional[Tuple[str, int]] = None,
@ -219,7 +225,6 @@ class Actor:
# TODO: consider making this a dynamically defined # TODO: consider making this a dynamically defined
# @dataclass once we get py3.7 # @dataclass once we get py3.7
self.statespace = statespace or {}
self.loglevel = loglevel self.loglevel = loglevel
self._arb_addr = arbiter_addr self._arb_addr = arbiter_addr
@ -705,7 +710,7 @@ class Actor:
assert isinstance(self._arb_addr, tuple) assert isinstance(self._arb_addr, tuple)
async with get_arbiter(*self._arb_addr) as arb_portal: async with get_arbiter(*self._arb_addr) as arb_portal:
await arb_portal.run( await arb_portal.run_from_ns(
'self', 'self',
'register_actor', 'register_actor',
uid=self.uid, uid=self.uid,
@ -781,8 +786,11 @@ class Actor:
cs.shield = True cs.shield = True
try: try:
async with get_arbiter(*self._arb_addr) as arb_portal: async with get_arbiter(*self._arb_addr) as arb_portal:
await arb_portal.run( await arb_portal.run_from_ns(
'self', 'unregister_actor', uid=self.uid) 'self',
'unregister_actor',
uid=self.uid
)
except OSError: except OSError:
failed = True failed = True
if cs.cancelled_caught: if cs.cancelled_caught:

View File

@ -180,15 +180,14 @@ def _breakpoint(debug_func) -> Awaitable[None]:
try: try:
async with get_root() as portal: async with get_root() as portal:
with trio.fail_after(.5): with trio.fail_after(.5):
agen = await portal.run( stream = await portal.run(
'tractor._debug', tractor._debug._hijack_stdin_relay_to_child,
'_hijack_stdin_relay_to_child',
subactor_uid=actor.uid, subactor_uid=actor.uid,
) )
async with aclosing(agen): async with aclosing(stream):
# block until first yield above # block until first yield above
async for val in agen: async for val in stream:
assert val == 'Locked' assert val == 'Locked'
task_status.started() task_status.started()
@ -305,6 +304,10 @@ post_mortem = partial(
async def _maybe_enter_pm(err): async def _maybe_enter_pm(err):
if ( if (
_state.debug_mode() _state.debug_mode()
# NOTE: don't enter debug mode recursively after quitting pdb
# Iow, don't re-enter the repl if the `quit` command was issued
# by the user.
and not isinstance(err, bdb.BdbQuit) and not isinstance(err, bdb.BdbQuit)
# XXX: if the error is the likely result of runtime-wide # XXX: if the error is the likely result of runtime-wide

View File

@ -60,7 +60,7 @@ async def find_actor(
""" """
actor = current_actor() actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddr = await arb_portal.run('self', 'find_actor', name=name) sockaddr = await arb_portal.run_from_ns('self', 'find_actor', name=name)
# TODO: return portals to all available actors - for now just # TODO: return portals to all available actors - for now just
# the last one that registered # the last one that registered
if name == 'arbiter' and actor.is_arbiter: if name == 'arbiter' and actor.is_arbiter:
@ -84,7 +84,7 @@ async def wait_for_actor(
""" """
actor = current_actor() actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name) sockaddrs = await arb_portal.run_from_ns('self', 'wait_for_actor', name=name)
sockaddr = sockaddrs[-1] sockaddr = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan: async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:

View File

@ -8,6 +8,7 @@ from typing import Tuple, Any, Dict, Optional, Set, Iterator
from functools import partial from functools import partial
from dataclasses import dataclass from dataclasses import dataclass
from contextlib import contextmanager from contextlib import contextmanager
import warnings
import trio import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
@ -119,7 +120,7 @@ class ReceiveStream(trio.abc.ReceiveChannel):
# NOTE: we're telling the far end actor to cancel a task # NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel # corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly. # instance is passed to `Actor._cancel_task()` implicitly.
await self._portal.run('self', '_cancel_task', cid=cid) await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
if cs.cancelled_caught: if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed # XXX: there's no way to know if the remote task was indeed
@ -197,15 +198,59 @@ class Portal:
"A pending main result has already been submitted" "A pending main result has already been submitted"
self._expect_result = await self._submit(ns, func, kwargs) self._expect_result = await self._submit(ns, func, kwargs)
async def run(self, ns: str, func: str, **kwargs) -> Any: async def run(
self,
func_or_ns: str,
fn_name: Optional[str] = None,
**kwargs
) -> Any:
"""Submit a remote function to be scheduled and run by actor, """Submit a remote function to be scheduled and run by actor,
wrap and return its (stream of) result(s). wrap and return its (stream of) result(s).
This is a blocking call and returns either a value from the This is a blocking call and returns either a value from the
remote rpc task or a local async generator instance. remote rpc task or a local async generator instance.
""" """
if isinstance(func_or_ns, str):
warnings.warn(
"`Portal.run(namespace: str, funcname: str)` is now"
"deprecated, pass a function reference directly instead\n"
"If you still want to run a remote function by name use"
"`Portal.run_from_ns()`",
DeprecationWarning,
stacklevel=2,
)
fn_mod_path = func_or_ns
assert isinstance(fn_name, str)
else: # function reference was passed directly
fn = func_or_ns
fn_mod_path = fn.__module__
fn_name = fn.__name__
return await self._return_from_resptype( return await self._return_from_resptype(
*(await self._submit(ns, func, kwargs)) *(await self._submit(fn_mod_path, fn_name, kwargs))
)
async def run_from_ns(
self,
namespace_path: str,
function_name: str,
**kwargs,
) -> Any:
"""Run a function from a (remote) namespace in a new task on the far-end actor.
This is a more explitcit way to run tasks in a remote-process
actor using explicit object-path syntax. Hint: this is how
`.run()` works underneath.
Note::
A special namespace `self` can be used to invoke `Actor`
instance methods in the remote runtime. Currently this should only
be used for `tractor` internals.
"""
return await self._return_from_resptype(
*(await self._submit(namespace_path, function_name, kwargs))
) )
async def _return_from_resptype( async def _return_from_resptype(
@ -274,7 +319,14 @@ class Portal:
log.warning( log.warning(
f"Cancelling all streams with {self.channel.uid}") f"Cancelling all streams with {self.channel.uid}")
for stream in self._streams.copy(): for stream in self._streams.copy():
await stream.aclose() try:
await stream.aclose()
except trio.ClosedResourceError:
# don't error the stream having already been closed
# (unless of course at some point down the road we
# won't expect this to always be the case or need to
# detect it for respawning purposes?)
log.debug(f"{stream} was already closed.")
async def aclose(self): async def aclose(self):
log.debug(f"Closing {self}") log.debug(f"Closing {self}")
@ -302,13 +354,16 @@ class Portal:
# with trio.CancelScope(shield=True) as cancel_scope: # with trio.CancelScope(shield=True) as cancel_scope:
with trio.move_on_after(0.5) as cancel_scope: with trio.move_on_after(0.5) as cancel_scope:
cancel_scope.shield = True cancel_scope.shield = True
await self.run('self', 'cancel')
await self.run_from_ns('self', 'cancel')
return True return True
if cancel_scope.cancelled_caught: if cancel_scope.cancelled_caught:
log.warning(f"May have failed to cancel {self.channel.uid}") log.warning(f"May have failed to cancel {self.channel.uid}")
# if we get here some weird cancellation case happened # if we get here some weird cancellation case happened
return False return False
except trio.ClosedResourceError: except trio.ClosedResourceError:
log.warning( log.warning(
f"{self.channel} for {self.channel.uid} was already closed?") f"{self.channel} for {self.channel.uid} was already closed?")
@ -325,8 +380,10 @@ class LocalPortal:
actor: 'Actor' # type: ignore # noqa actor: 'Actor' # type: ignore # noqa
channel: Channel channel: Channel
async def run(self, ns: str, func_name: str, **kwargs) -> Any: async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any:
"""Run a requested function locally and return it's result. """Run a requested local function from a namespace path and
return it's result.
""" """
obj = self.actor if ns == 'self' else importlib.import_module(ns) obj = self.actor if ns == 'self' else importlib.import_module(ns)
func = getattr(obj, func_name) func = getattr(obj, func_name)

View File

@ -248,7 +248,6 @@ async def new_proc(
await chan.send({ await chan.send({
"_parent_main_data": subactor._parent_main_data, "_parent_main_data": subactor._parent_main_data,
"rpc_module_paths": subactor.rpc_module_paths, "rpc_module_paths": subactor.rpc_module_paths,
"statespace": subactor.statespace,
"_arb_addr": subactor._arb_addr, "_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0], "bind_host": bind_addr[0],
"bind_port": bind_addr[1], "bind_port": bind_addr[1],

View File

@ -52,7 +52,6 @@ class ActorNursery:
name: str, name: str,
*, *,
bind_addr: Tuple[str, int] = _default_bind_addr, bind_addr: Tuple[str, int] = _default_bind_addr,
statespace: Optional[Dict[str, Any]] = None,
rpc_module_paths: List[str] = None, rpc_module_paths: List[str] = None,
loglevel: str = None, # set log level per subactor loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None, nursery: trio.Nursery = None,
@ -67,7 +66,6 @@ class ActorNursery:
name, name,
# modules allowed to invoked funcs from # modules allowed to invoked funcs from
rpc_module_paths=rpc_module_paths or [], rpc_module_paths=rpc_module_paths or [],
statespace=statespace, # global proc state vars
loglevel=loglevel, loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr, arbiter_addr=current_actor()._arb_addr,
) )
@ -94,12 +92,11 @@ class ActorNursery:
async def run_in_actor( async def run_in_actor(
self, self,
name: str,
fn: typing.Callable, fn: typing.Callable,
*, *,
name: Optional[str] = None,
bind_addr: Tuple[str, int] = _default_bind_addr, bind_addr: Tuple[str, int] = _default_bind_addr,
rpc_module_paths: Optional[List[str]] = None, rpc_module_paths: Optional[List[str]] = None,
statespace: Dict[str, Any] = None,
loglevel: str = None, # set log level per subactor loglevel: str = None, # set log level per subactor
**kwargs, # explicit args to ``fn`` **kwargs, # explicit args to ``fn``
) -> Portal: ) -> Portal:
@ -111,11 +108,15 @@ class ActorNursery:
the actor is terminated. the actor is terminated.
""" """
mod_path = fn.__module__ mod_path = fn.__module__
if name is None:
# use the explicit function name if not provided
name = fn.__name__
portal = await self.start_actor( portal = await self.start_actor(
name, name,
rpc_module_paths=[mod_path] + (rpc_module_paths or []), rpc_module_paths=[mod_path] + (rpc_module_paths or []),
bind_addr=bind_addr, bind_addr=bind_addr,
statespace=statespace,
loglevel=loglevel, loglevel=loglevel,
# use the run_in_actor nursery # use the run_in_actor nursery
nursery=self._ria_nursery, nursery=self._ria_nursery,

View File

@ -90,6 +90,9 @@ def modify_subs(topics2ctxs, topics, ctx):
topics2ctxs.pop(topic) topics2ctxs.pop(topic)
_pub_state: Dict[str, dict] = {}
def pub( def pub(
wrapped: typing.Callable = None, wrapped: typing.Callable = None,
*, *,
@ -175,12 +178,15 @@ def pub(
subscribers. If you are ok to have a new task running for every call subscribers. If you are ok to have a new task running for every call
to ``pub_service()`` then probably don't need this. to ``pub_service()`` then probably don't need this.
""" """
global _pub_state
# handle the decorator not called with () case # handle the decorator not called with () case
if wrapped is None: if wrapped is None:
return partial(pub, tasks=tasks) return partial(pub, tasks=tasks)
task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = { task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = {
None: trio.StrictFIFOLock()} None: trio.StrictFIFOLock()}
for name in tasks: for name in tasks:
task2lock[name] = trio.StrictFIFOLock() task2lock[name] = trio.StrictFIFOLock()
@ -203,11 +209,10 @@ def pub(
f"argument with a falue from {tasks}") f"argument with a falue from {tasks}")
topics = set(topics) topics = set(topics)
ss = current_actor().statespace lockmap = _pub_state.setdefault('_pubtask2lock', task2lock)
lockmap = ss.setdefault('_pubtask2lock', task2lock)
lock = lockmap[task_name] lock = lockmap[task_name]
all_subs = ss.setdefault('_subs', {}) all_subs = _pub_state.setdefault('_subs', {})
topics2ctxs = all_subs.setdefault(task_name, {}) topics2ctxs = all_subs.setdefault(task_name, {})
try: try: