Merge pull request #197 from goodboy/drop_run

Drop run
new_docs_polish
goodboy 2021-05-07 12:02:23 -04:00 committed by GitHub
commit f48548ab94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 350 additions and 256 deletions

View File

@ -145,7 +145,7 @@ and use the ``run_in_actor()`` method:
What's going on? What's going on?
- an initial *actor* is started with ``tractor.run()`` and told to execute - an initial *actor* is started with ``trio.run()`` and told to execute
its main task_: ``main()`` its main task_: ``main()``
- inside ``main()`` an actor is *spawned* using an ``ActorNusery`` and is told - inside ``main()`` an actor is *spawned* using an ``ActorNusery`` and is told
@ -182,7 +182,7 @@ Here is a similar example using the latter method:
.. literalinclude:: ../examples/actor_spawning_and_causality_with_daemon.py .. literalinclude:: ../examples/actor_spawning_and_causality_with_daemon.py
The ``rpc_module_paths`` `kwarg` above is a list of module path The ``enable_modules`` `kwarg` above is a list of module path
strings that will be loaded and made accessible for execution in the strings that will be loaded and made accessible for execution in the
remote actor through a call to ``Portal.run()``. For now this is remote actor through a call to ``Portal.run()``. For now this is
a simple mechanism to restrict the functionality of the remote a simple mechanism to restrict the functionality of the remote
@ -458,7 +458,7 @@ find an actor's socket address by name use the ``find_actor()`` function:
.. literalinclude:: ../examples/service_discovery.py .. literalinclude:: ../examples/service_discovery.py
The ``name`` value you should pass to ``find_actor()`` is the one you passed as the The ``name`` value you should pass to ``find_actor()`` is the one you passed as the
*first* argument to either ``tractor.run()`` or ``ActorNursery.start_actor()``. *first* argument to either ``trio.run()`` or ``ActorNursery.start_actor()``.
Running actors standalone Running actors standalone
@ -472,7 +472,17 @@ need to hop into a debugger. You just need to pass the existing
.. code:: python .. code:: python
tractor.run(main, arbiter_addr=('192.168.0.10', 1616)) import trio
import tractor
async def main():
async with tractor.open_root_actor(
arbiter_addr=('192.168.0.10', 1616)
):
await trio.sleep_forever()
trio.run(main)
Choosing a process spawning backend Choosing a process spawning backend
@ -480,7 +490,7 @@ Choosing a process spawning backend
``tractor`` is architected to support multiple actor (sub-process) ``tractor`` is architected to support multiple actor (sub-process)
spawning backends. Specific defaults are chosen based on your system spawning backends. Specific defaults are chosen based on your system
but you can also explicitly select a backend of choice at startup but you can also explicitly select a backend of choice at startup
via a ``start_method`` kwarg to ``tractor.run()``. via a ``start_method`` kwarg to ``tractor.open_nursery()``.
Currently the options available are: Currently the options available are:
@ -536,13 +546,14 @@ main python module of the program:
.. code:: python .. code:: python
# application/__main__.py # application/__main__.py
import trio
import tractor import tractor
import multiprocessing import multiprocessing
from . import tractor_app from . import tractor_app
if __name__ == '__main__': if __name__ == '__main__':
multiprocessing.freeze_support() multiprocessing.freeze_support()
tractor.run(tractor_app.main) trio.run(tractor_app.main)
And execute as:: And execute as::

View File

@ -16,4 +16,4 @@ if __name__ == '__main__':
# temporary dir and name it test_example.py. We import that script # temporary dir and name it test_example.py. We import that script
# module here and invoke it's ``main()``. # module here and invoke it's ``main()``.
from . import test_example from . import test_example
test_example.tractor.run(test_example.main, start_method='spawn') test_example.trio.run(test_example.main)

View File

@ -1,3 +1,4 @@
import trio
import tractor import tractor
_this_module = __name__ _this_module = __name__
@ -40,4 +41,4 @@ async def main():
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main) trio.run(main)

View File

@ -1,3 +1,4 @@
import trio
import tractor import tractor
@ -23,4 +24,4 @@ async def main():
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main) trio.run(main)

View File

@ -1,3 +1,4 @@
import trio
import tractor import tractor
@ -16,7 +17,7 @@ async def main():
portal = await n.start_actor( portal = await n.start_actor(
'frank', 'frank',
# enable the actor to run funcs from this current module # enable the actor to run funcs from this current module
rpc_module_paths=[__name__], enable_modules=[__name__],
) )
print(await portal.run(movie_theatre_question)) print(await portal.run(movie_theatre_question))
@ -30,4 +31,4 @@ async def main():
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main) trio.run(main)

View File

@ -14,12 +14,15 @@ async def stream_forever():
async def main(): async def main():
# stream for at most 1 seconds # stream for at most 1 seconds
with trio.move_on_after(1) as cancel_scope: with trio.move_on_after(1) as cancel_scope:
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.start_actor( portal = await n.start_actor(
f'donny', 'donny',
rpc_module_paths=[__name__], enable_modules=[__name__],
) )
# this async for loop streams values from the above # this async for loop streams values from the above
@ -34,4 +37,4 @@ async def main():
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main) trio.run(main)

View File

@ -11,7 +11,7 @@ async def breakpoint_forever():
async def name_error(): async def name_error():
"Raise a ``NameError``" "Raise a ``NameError``"
getattr(doggypants) getattr(doggypants) # noqa
async def main(): async def main():

View File

@ -4,7 +4,7 @@ import tractor
async def name_error(): async def name_error():
"Raise a ``NameError``" "Raise a ``NameError``"
getattr(doggypants) getattr(doggypants) # noqa
async def breakpoint_forever(): async def breakpoint_forever():

View File

@ -1,9 +1,10 @@
import trio
import tractor import tractor
async def name_error(): async def name_error():
"Raise a ``NameError``" "Raise a ``NameError``"
getattr(doggypants) getattr(doggypants) # noqa
async def spawn_error(): async def spawn_error():
@ -32,7 +33,9 @@ async def main():
- root actor should then fail on assert - root actor should then fail on assert
- program termination - program termination
""" """
async with tractor.open_nursery() as n: async with tractor.open_nursery(
debug_mode=True,
) as n:
# spawn both actors # spawn both actors
portal = await n.run_in_actor( portal = await n.run_in_actor(
@ -54,4 +57,4 @@ async def main():
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main, debug_mode=True) trio.run(main)

View File

@ -11,7 +11,7 @@ async def breakpoint_forever():
async def name_error(): async def name_error():
"Raise a ``NameError``" "Raise a ``NameError``"
getattr(doggypants) getattr(doggypants) # noqa
async def spawn_error(): async def spawn_error():
@ -36,7 +36,9 @@ async def main():
`-python -m tractor._child --uid ('spawn_error', '52ee14a5 ...) `-python -m tractor._child --uid ('spawn_error', '52ee14a5 ...)
`-python -m tractor._child --uid ('name_error', '3391222c ...) `-python -m tractor._child --uid ('name_error', '3391222c ...)
""" """
async with tractor.open_nursery() as n: async with tractor.open_nursery(
debug_mode=True,
) as n:
# Spawn both actors, don't bother with collecting results # Spawn both actors, don't bother with collecting results
# (would result in a different debugger outcome due to parent's # (would result in a different debugger outcome due to parent's
@ -47,4 +49,4 @@ async def main():
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main, debug_mode=True) trio.run(main)

View File

@ -4,12 +4,16 @@ import tractor
async def main(): async def main():
await trio.sleep(0.1) async with tractor.open_root_actor(
debug_mode=True,
):
await tractor.breakpoint() await trio.sleep(0.1)
await trio.sleep(0.1) await tractor.breakpoint()
await trio.sleep(0.1)
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main, debug_mode=True) trio.run(main)

View File

@ -1,11 +1,15 @@
import trio
import tractor import tractor
async def main(): async def main():
while True: async with tractor.open_root_actor(
await tractor.breakpoint() debug_mode=True,
):
while True:
await tractor.breakpoint()
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main, debug_mode=True) trio.run(main)

View File

@ -1,9 +1,13 @@
import trio
import tractor import tractor
async def main(): async def main():
assert 0 async with tractor.open_root_actor(
debug_mode=True,
):
assert 0
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main, debug_mode=True) trio.run(main)

View File

@ -1,9 +1,10 @@
import trio
import tractor import tractor
async def name_error(): async def name_error():
"Raise a ``NameError``" "Raise a ``NameError``"
getattr(doggypants) getattr(doggypants) # noqa
async def spawn_until(depth=0): async def spawn_until(depth=0):
@ -37,7 +38,10 @@ async def main():
python -m tractor._child --uid ('name_error', '6c2733b8 ...) python -m tractor._child --uid ('name_error', '6c2733b8 ...)
""" """
async with tractor.open_nursery() as n: async with tractor.open_nursery(
debug_mode=True,
loglevel='warning'
) as n:
# spawn both actors # spawn both actors
portal = await n.run_in_actor( portal = await n.run_in_actor(
@ -58,4 +62,4 @@ async def main():
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main, debug_mode=True, loglevel='warning') trio.run(main)

View File

@ -12,7 +12,9 @@ async def breakpoint_forever():
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
debug_mode=True,
) as n:
portal = await n.run_in_actor( portal = await n.run_in_actor(
breakpoint_forever, breakpoint_forever,
@ -21,4 +23,4 @@ async def main():
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main, debug_mode=True, loglevel='debug') trio.run(main)

View File

@ -1,3 +1,4 @@
import trio
import tractor import tractor
@ -6,11 +7,13 @@ async def name_error():
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
debug_mode=True,
) as n:
portal = await n.run_in_actor(name_error) portal = await n.run_in_actor(name_error)
await portal.result() await portal.result()
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main, debug_mode=True) trio.run(main)

View File

@ -68,10 +68,11 @@ async def aggregate(seed):
# this is the main actor and *arbiter* # this is the main actor and *arbiter*
async def main(): async def main():
# a nursery which spawns "actors" # a nursery which spawns "actors"
async with tractor.open_nursery() as nursery: async with tractor.open_nursery(
arbiter_addr=('127.0.0.1', 1616)
) as nursery:
seed = int(1e3) seed = int(1e3)
import time
pre_start = time.time() pre_start = time.time()
portal = await nursery.start_actor( portal = await nursery.start_actor(
@ -100,4 +101,4 @@ async def main():
if __name__ == '__main__': if __name__ == '__main__':
final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616)) final_stream = trio.run(main)

View File

@ -10,6 +10,7 @@ PRIMES = [
115797848077099, 115797848077099,
1099726899285419] 1099726899285419]
def is_prime(n): def is_prime(n):
if n < 2: if n < 2:
return False return False
@ -24,6 +25,7 @@ def is_prime(n):
return False return False
return True return True
def main(): def main():
with concurrent.futures.ProcessPoolExecutor() as executor: with concurrent.futures.ProcessPoolExecutor() as executor:
start = time.time() start = time.time()
@ -33,6 +35,7 @@ def main():
print(f'processing took {time.time() - start} seconds') print(f'processing took {time.time() - start} seconds')
if __name__ == '__main__': if __name__ == '__main__':
start = time.time() start = time.time()

View File

@ -4,7 +4,7 @@ Demonstration of the prime number detector example from the
https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example
This uses no extra threads, fancy semaphores or futures; all we need This uses no extra threads, fancy semaphores or futures; all we need
is ``tractor``'s channels. is ``tractor``'s channels.
""" """

View File

@ -1,3 +1,4 @@
import trio
import tractor import tractor
@ -11,7 +12,7 @@ async def main():
for i in range(3): for i in range(3):
real_actors.append(await n.start_actor( real_actors.append(await n.start_actor(
f'actor_{i}', f'actor_{i}',
rpc_module_paths=[__name__], enable_modules=[__name__],
)) ))
# start one actor that will fail immediately # start one actor that will fail immediately
@ -24,6 +25,6 @@ async def main():
if __name__ == '__main__': if __name__ == '__main__':
try: try:
# also raises # also raises
tractor.run(main) trio.run(main)
except tractor.RemoteActorError: except tractor.RemoteActorError:
print("Look Maa that actor failed hard, hehhh!") print("Look Maa that actor failed hard, hehhh!")

View File

@ -1,7 +1,9 @@
import trio
import tractor import tractor
tractor.log.get_console_log("INFO") tractor.log.get_console_log("INFO")
async def main(service_name): async def main(service_name):
async with tractor.open_nursery() as an: async with tractor.open_nursery() as an:
@ -17,4 +19,4 @@ async def main(service_name):
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main, 'some_actor_name') trio.run(main, 'some_actor_name')

View File

@ -47,7 +47,9 @@ def test_remote_error(arb_addr, args_err):
args, errtype = args_err args, errtype = args_err
async def main(): async def main():
async with tractor.open_nursery() as nursery: async with tractor.open_nursery(
arbiter_addr=arb_addr,
) as nursery:
portal = await nursery.run_in_actor( portal = await nursery.run_in_actor(
assert_err, name='errorer', **args assert_err, name='errorer', **args
@ -62,7 +64,7 @@ def test_remote_error(arb_addr, args_err):
raise raise
with pytest.raises(tractor.RemoteActorError) as excinfo: with pytest.raises(tractor.RemoteActorError) as excinfo:
tractor.run(main, arbiter_addr=arb_addr) trio.run(main)
# ensure boxed error is correct # ensure boxed error is correct
assert excinfo.value.type == errtype assert excinfo.value.type == errtype
@ -73,7 +75,9 @@ def test_multierror(arb_addr):
more then one actor errors. more then one actor errors.
""" """
async def main(): async def main():
async with tractor.open_nursery() as nursery: async with tractor.open_nursery(
arbiter_addr=arb_addr,
) as nursery:
await nursery.run_in_actor(assert_err, name='errorer1') await nursery.run_in_actor(assert_err, name='errorer1')
portal2 = await nursery.run_in_actor(assert_err, name='errorer2') portal2 = await nursery.run_in_actor(assert_err, name='errorer2')
@ -90,7 +94,7 @@ def test_multierror(arb_addr):
# from both subactors # from both subactors
with pytest.raises(trio.MultiError): with pytest.raises(trio.MultiError):
tractor.run(main, arbiter_addr=arb_addr) trio.run(main)
@pytest.mark.parametrize('delay', (0, 0.5)) @pytest.mark.parametrize('delay', (0, 0.5))
@ -103,7 +107,10 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
to test failure during an ongoing spawning. to test failure during an ongoing spawning.
""" """
async def main(): async def main():
async with tractor.open_nursery() as nursery: async with tractor.open_nursery(
arbiter_addr=arb_addr,
) as nursery:
for i in range(num_subactors): for i in range(num_subactors):
await nursery.run_in_actor( await nursery.run_in_actor(
assert_err, assert_err,
@ -112,7 +119,7 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
) )
with pytest.raises(trio.MultiError) as exc_info: with pytest.raises(trio.MultiError) as exc_info:
tractor.run(main, arbiter_addr=arb_addr) trio.run(main)
assert exc_info.type == tractor.MultiError assert exc_info.type == tractor.MultiError
err = exc_info.value err = exc_info.value
@ -134,10 +141,12 @@ def test_cancel_single_subactor(arb_addr, mechanism):
async def spawn_actor(): async def spawn_actor():
"""Spawn an actor that blocks indefinitely. """Spawn an actor that blocks indefinitely.
""" """
async with tractor.open_nursery() as nursery: async with tractor.open_nursery(
arbiter_addr=arb_addr,
) as nursery:
portal = await nursery.start_actor( portal = await nursery.start_actor(
'nothin', rpc_module_paths=[__name__], 'nothin', enable_modules=[__name__],
) )
assert (await portal.run(do_nothing)) is None assert (await portal.run(do_nothing)) is None
@ -148,10 +157,10 @@ def test_cancel_single_subactor(arb_addr, mechanism):
raise mechanism raise mechanism
if mechanism == 'nursery_cancel': if mechanism == 'nursery_cancel':
tractor.run(spawn_actor, arbiter_addr=arb_addr) trio.run(spawn_actor)
else: else:
with pytest.raises(mechanism): with pytest.raises(mechanism):
tractor.run(spawn_actor, arbiter_addr=arb_addr) trio.run(spawn_actor)
async def stream_forever(): async def stream_forever():
@ -229,7 +238,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
for i in range(num_actors): for i in range(num_actors):
dactor_portals.append(await n.start_actor( dactor_portals.append(await n.start_actor(
f'deamon_{i}', f'deamon_{i}',
rpc_module_paths=[__name__], enable_modules=[__name__],
)) ))
func, kwargs = ria_func func, kwargs = ria_func
@ -395,7 +404,7 @@ def test_cancel_via_SIGINT(
await trio.sleep_forever() await trio.sleep_forever()
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
tractor.run(main) trio.run(main)
@no_windows @no_windows
@ -430,8 +439,7 @@ def test_cancel_via_SIGINT_other_task(
os.kill(pid, signal.SIGINT) os.kill(pid, signal.SIGINT)
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
tractor.run(main) trio.run(main)
async def spin_for(period=3): async def spin_for(period=3):
"Sync sleep." "Sync sleep."
@ -470,4 +478,4 @@ def test_cancel_while_childs_child_in_sync_sleep(
assert 0 assert 0
with pytest.raises(AssertionError): with pytest.raises(AssertionError):
tractor.run(main) trio.run(main)

View File

@ -20,8 +20,11 @@ async def test_reg_then_unreg(arb_addr):
assert actor.is_arbiter assert actor.is_arbiter
assert len(actor._registry) == 1 # only self is registered assert len(actor._registry) == 1 # only self is registered
async with tractor.open_nursery() as n: async with tractor.open_nursery(
portal = await n.start_actor('actor', rpc_module_paths=[__name__]) arbiter_addr=arb_addr,
) as n:
portal = await n.start_actor('actor', enable_modules=[__name__])
uid = portal.channel.uid uid = portal.channel.uid
async with tractor.get_arbiter(*arb_addr) as aportal: async with tractor.get_arbiter(*arb_addr) as aportal:
@ -66,7 +69,7 @@ async def say_hello_use_wait(other_actor):
@tractor_test @tractor_test
@pytest.mark.parametrize('func', [say_hello, say_hello_use_wait]) @pytest.mark.parametrize('func', [say_hello, say_hello_use_wait])
async def test_trynamic_trio(func, start_method): async def test_trynamic_trio(func, start_method, arb_addr):
"""Main tractor entry point, the "master" process (for now """Main tractor entry point, the "master" process (for now
acts as the "director"). acts as the "director").
""" """
@ -119,74 +122,78 @@ async def spawn_and_check_registry(
remote_arbiter: bool = False, remote_arbiter: bool = False,
with_streaming: bool = False, with_streaming: bool = False,
) -> None: ) -> None:
actor = tractor.current_actor()
if remote_arbiter: async with tractor.open_root_actor(
assert not actor.is_arbiter arbiter_addr=arb_addr,
):
async with tractor.get_arbiter(*arb_addr) as portal:
# runtime needs to be up to call this
actor = tractor.current_actor()
async with tractor.get_arbiter(*arb_addr) as portal: if remote_arbiter:
assert not actor.is_arbiter
if actor.is_arbiter: if actor.is_arbiter:
async def get_reg(): async def get_reg():
return actor._registry return actor._registry
extra = 1 # arbiter is local root actor extra = 1 # arbiter is local root actor
else: else:
get_reg = partial(portal.run_from_ns, 'self', 'get_registry') get_reg = partial(portal.run_from_ns, 'self', 'get_registry')
extra = 2 # local root actor + remote arbiter extra = 2 # local root actor + remote arbiter
# ensure current actor is registered # ensure current actor is registered
registry = await get_reg() registry = await get_reg()
assert actor.uid in registry assert actor.uid in registry
try: try:
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
async with trio.open_nursery() as trion: async with trio.open_nursery() as trion:
portals = {}
for i in range(3):
name = f'a{i}'
if with_streaming:
portals[name] = await n.start_actor(
name=name, enable_modules=[__name__])
else: # no streaming
portals[name] = await n.run_in_actor(
trio.sleep_forever, name=name)
# wait on last actor to come up
async with tractor.wait_for_actor(name):
registry = await get_reg()
for uid in n._children:
assert uid in registry
assert len(portals) + extra == len(registry)
portals = {}
for i in range(3):
name = f'a{i}'
if with_streaming: if with_streaming:
portals[name] = await n.start_actor( await trio.sleep(0.1)
name=name, enable_modules=[__name__])
else: # no streaming pts = list(portals.values())
portals[name] = await n.run_in_actor( for p in pts[:-1]:
trio.sleep_forever, name=name) trion.start_soon(stream_from, p)
# wait on last actor to come up # stream for 1 sec
async with tractor.wait_for_actor(name): trion.start_soon(cancel, use_signal, 1)
registry = await get_reg()
for uid in n._children:
assert uid in registry
assert len(portals) + extra == len(registry) last_p = pts[-1]
await stream_from(last_p)
if with_streaming: else:
await trio.sleep(0.1) await cancel(use_signal)
pts = list(portals.values()) finally:
for p in pts[:-1]: with trio.CancelScope(shield=True):
trion.start_soon(stream_from, p) await trio.sleep(0.5)
# stream for 1 sec # all subactors should have de-registered
trion.start_soon(cancel, use_signal, 1) registry = await get_reg()
assert len(registry) == extra
last_p = pts[-1] assert actor.uid in registry
await stream_from(last_p)
else:
await cancel(use_signal)
finally:
with trio.CancelScope(shield=True):
await trio.sleep(0.5)
# all subactors should have de-registered
registry = await get_reg()
assert len(registry) == extra
assert actor.uid in registry
@pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('use_signal', [False, True])
@ -201,7 +208,7 @@ def test_subactors_unregister_on_cancel(
deregistering themselves with the arbiter. deregistering themselves with the arbiter.
""" """
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
tractor.run( trio.run(
partial( partial(
spawn_and_check_registry, spawn_and_check_registry,
arb_addr, arb_addr,
@ -209,7 +216,6 @@ def test_subactors_unregister_on_cancel(
remote_arbiter=False, remote_arbiter=False,
with_streaming=with_streaming, with_streaming=with_streaming,
), ),
arbiter_addr=arb_addr
) )
@ -227,7 +233,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
tree) arbiter. tree) arbiter.
""" """
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
tractor.run( trio.run(
partial( partial(
spawn_and_check_registry, spawn_and_check_registry,
arb_addr, arb_addr,
@ -235,8 +241,6 @@ def test_subactors_unregister_on_cancel_remote_daemon(
remote_arbiter=True, remote_arbiter=True,
with_streaming=with_streaming, with_streaming=with_streaming,
), ),
# XXX: required to use remote daemon!
arbiter_addr=arb_addr
) )
@ -258,49 +262,52 @@ async def close_chans_before_nursery(
else: else:
entries_at_end = 1 entries_at_end = 1
async with tractor.get_arbiter(*arb_addr) as aportal: async with tractor.open_root_actor(
try: arbiter_addr=arb_addr,
get_reg = partial(aportal.run_from_ns, 'self', 'get_registry') ):
async with tractor.get_arbiter(*arb_addr) as aportal:
try:
get_reg = partial(aportal.run_from_ns, 'self', 'get_registry')
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as tn:
portal1 = await tn.start_actor( portal1 = await tn.start_actor(
name='consumer1', enable_modules=[__name__]) name='consumer1', enable_modules=[__name__])
portal2 = await tn.start_actor( portal2 = await tn.start_actor(
'consumer2', enable_modules=[__name__]) 'consumer2', enable_modules=[__name__])
# TODO: compact this back as was in last commit once # TODO: compact this back as was in last commit once
# 3.9+, see https://github.com/goodboy/tractor/issues/207 # 3.9+, see https://github.com/goodboy/tractor/issues/207
async with portal1.open_stream_from(stream_forever) as agen1: async with portal1.open_stream_from(stream_forever) as agen1:
async with portal2.open_stream_from( async with portal2.open_stream_from(
stream_forever stream_forever
) as agen2: ) as agen2:
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
n.start_soon(streamer, agen1) n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5) n.start_soon(cancel, use_signal, .5)
try: try:
await streamer(agen2) await streamer(agen2)
finally: finally:
# Kill the root nursery thus resulting in # Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during # normal arbiter channel ops to fail during
# teardown. It doesn't seem like this is # teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT. # reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel() # tractor.current_actor()._root_nursery.cancel_scope.cancel()
# XXX: THIS IS THE KEY THING that happens # XXX: THIS IS THE KEY THING that happens
# **before** exiting the actor nursery block # **before** exiting the actor nursery block
# also kill off channels cuz why not # also kill off channels cuz why not
await agen1.aclose() await agen1.aclose()
await agen2.aclose() await agen2.aclose()
finally: finally:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await trio.sleep(1) await trio.sleep(1)
# all subactors should have de-registered # all subactors should have de-registered
registry = await get_reg() registry = await get_reg()
assert portal1.channel.uid not in registry assert portal1.channel.uid not in registry
assert portal2.channel.uid not in registry assert portal2.channel.uid not in registry
assert len(registry) == entries_at_end assert len(registry) == entries_at_end
@pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('use_signal', [False, True])
@ -314,15 +321,13 @@ def test_close_channel_explicit(
results in subactor(s) deregistering from the arbiter. results in subactor(s) deregistering from the arbiter.
""" """
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
tractor.run( trio.run(
partial( partial(
close_chans_before_nursery, close_chans_before_nursery,
arb_addr, arb_addr,
use_signal, use_signal,
remote_arbiter=False, remote_arbiter=False,
), ),
# XXX: required to use remote daemon!
arbiter_addr=arb_addr
) )
@ -338,13 +343,11 @@ def test_close_channel_explicit_remote_arbiter(
results in subactor(s) deregistering from the arbiter. results in subactor(s) deregistering from the arbiter.
""" """
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
tractor.run( trio.run(
partial( partial(
close_chans_before_nursery, close_chans_before_nursery,
arb_addr, arb_addr,
use_signal, use_signal,
remote_arbiter=True, remote_arbiter=True,
), ),
# XXX: required to use remote daemon!
arbiter_addr=arb_addr
) )

View File

@ -15,8 +15,8 @@ async def test_no_arbitter():
"""An arbitter must be established before any nurseries """An arbitter must be established before any nurseries
can be created. can be created.
(In other words ``tractor.run`` must be used instead of ``trio.run`` as is (In other words ``tractor.open_root_actor()`` must be engaged at
done by the ``pytest-trio`` plugin.) some point?)
""" """
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
with tractor.open_nursery(): with tractor.open_nursery():
@ -49,7 +49,8 @@ async def test_self_is_registered_localportal(arb_addr):
assert isinstance(portal, tractor._portal.LocalPortal) assert isinstance(portal, tractor._portal.LocalPortal)
with trio.fail_after(0.2): with trio.fail_after(0.2):
sockaddr = await portal.run_from_ns('self', 'wait_for_actor', name='root') sockaddr = await portal.run_from_ns(
'self', 'wait_for_actor', name='root')
assert sockaddr[0] == arb_addr assert sockaddr[0] == arb_addr
@ -59,15 +60,19 @@ def test_local_actor_async_func(arb_addr):
nums = [] nums = []
async def print_loop(): async def print_loop():
# arbiter is started in-proc if dne
assert tractor.current_actor().is_arbiter
for i in range(10): async with tractor.open_root_actor(
nums.append(i) arbiter_addr=arb_addr,
await trio.sleep(0.1) ):
# arbiter is started in-proc if dne
assert tractor.current_actor().is_arbiter
for i in range(10):
nums.append(i)
await trio.sleep(0.1)
start = time.time() start = time.time()
tractor.run(print_loop, arbiter_addr=arb_addr) trio.run(print_loop)
# ensure the sleeps were actually awaited # ensure the sleeps were actually awaited
assert time.time() - start >= 1 assert time.time() - start >= 1

View File

@ -1,10 +1,11 @@
""" """
Multiple python programs invoking ``tractor.run()`` Multiple python programs invoking the runtime.
""" """
import platform import platform
import time import time
import pytest import pytest
import trio
import tractor import tractor
from conftest import ( from conftest import (
tractor_test, tractor_test,
@ -45,8 +46,13 @@ async def test_cancel_remote_arbiter(daemon, arb_addr):
def test_register_duplicate_name(daemon, arb_addr): def test_register_duplicate_name(daemon, arb_addr):
async def main(): async def main():
assert not tractor.current_actor().is_arbiter
async with tractor.open_nursery() as n: async with tractor.open_nursery(
arbiter_addr=arb_addr,
) as n:
assert not tractor.current_actor().is_arbiter
p1 = await n.start_actor('doggy') p1 = await n.start_actor('doggy')
p2 = await n.start_actor('doggy') p2 = await n.start_actor('doggy')
@ -57,4 +63,4 @@ def test_register_duplicate_name(daemon, arb_addr):
# run it manually since we want to start **after** # run it manually since we want to start **after**
# the other "daemon" program # the other "daemon" program
tractor.run(main, arbiter_addr=arb_addr) trio.run(main)

View File

@ -46,8 +46,9 @@ async def pubber(get_topics, seed=10):
async def subs( async def subs(
which, pub_actor_name, seed=10, which,
portal=None, pub_actor_name,
seed=10,
task_status=trio.TASK_STATUS_IGNORED, task_status=trio.TASK_STATUS_IGNORED,
): ):
if len(which) == 1: if len(which) == 1:
@ -61,7 +62,9 @@ async def subs(
return isinstance(i, int) return isinstance(i, int)
# TODO: https://github.com/goodboy/tractor/issues/207 # TODO: https://github.com/goodboy/tractor/issues/207
async with tractor.find_actor(pub_actor_name) as portal: async with tractor.wait_for_actor(pub_actor_name) as portal:
assert portal
async with portal.open_stream_from( async with portal.open_stream_from(
pubber, pubber,
topics=which, topics=which,
@ -164,7 +167,10 @@ def test_multi_actor_subs_arbiter_pub(
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
arbiter_addr=arb_addr,
enable_modules=[__name__],
) as n:
name = 'root' name = 'root'
@ -172,7 +178,7 @@ def test_multi_actor_subs_arbiter_pub(
# start the publisher as a daemon # start the publisher as a daemon
master_portal = await n.start_actor( master_portal = await n.start_actor(
'streamer', 'streamer',
rpc_module_paths=[__name__], enable_modules=[__name__],
) )
even_portal = await n.run_in_actor( even_portal = await n.run_in_actor(
@ -232,7 +238,6 @@ def test_multi_actor_subs_arbiter_pub(
assert 'even' not in get_topics() assert 'even' not in get_topics()
await odd_portal.cancel_actor() await odd_portal.cancel_actor()
await trio.sleep(2)
if pub_actor == 'arbiter': if pub_actor == 'arbiter':
while get_topics(): while get_topics():
@ -242,11 +247,7 @@ def test_multi_actor_subs_arbiter_pub(
else: else:
await master_portal.cancel_actor() await master_portal.cancel_actor()
tractor.run( trio.run(main)
main,
arbiter_addr=arb_addr,
rpc_module_paths=[__name__],
)
def test_single_subactor_pub_multitask_subs( def test_single_subactor_pub_multitask_subs(
@ -255,11 +256,14 @@ def test_single_subactor_pub_multitask_subs(
): ):
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
arbiter_addr=arb_addr,
enable_modules=[__name__],
) as n:
portal = await n.start_actor( portal = await n.start_actor(
'streamer', 'streamer',
rpc_module_paths=[__name__], enable_modules=[__name__],
) )
async with tractor.wait_for_actor('streamer'): async with tractor.wait_for_actor('streamer'):
# block until 2nd actor is initialized # block until 2nd actor is initialized
@ -283,8 +287,4 @@ def test_single_subactor_pub_multitask_subs(
await portal.cancel_actor() await portal.cancel_actor()
tractor.run( trio.run(main)
main,
arbiter_addr=arb_addr,
rpc_module_paths=[__name__],
)

View File

@ -74,11 +74,15 @@ def test_rpc_errors(arb_addr, to_call, testdir):
remote_err = inside_err remote_err = inside_err
async def main(): async def main():
actor = tractor.current_actor()
assert actor.is_arbiter
# spawn a subactor which calls us back # spawn a subactor which calls us back
async with tractor.open_nursery() as n: async with tractor.open_nursery(
arbiter_addr=arb_addr,
enable_modules=exposed_mods.copy(),
) as n:
actor = tractor.current_actor()
assert actor.is_arbiter
await n.run_in_actor( await n.run_in_actor(
sleep_back_actor, sleep_back_actor,
actor_name=subactor_requests_to, actor_name=subactor_requests_to,
@ -90,15 +94,11 @@ def test_rpc_errors(arb_addr, to_call, testdir):
func_name=funcname, func_name=funcname,
exposed_mods=exposed_mods, exposed_mods=exposed_mods,
func_defined=True if func_defined else False, func_defined=True if func_defined else False,
rpc_module_paths=subactor_exposed_mods, enable_modules=subactor_exposed_mods,
) )
def run(): def run():
tractor.run( trio.run(main)
main,
arbiter_addr=arb_addr,
rpc_module_paths=exposed_mods.copy(),
)
# handle both parameterized cases # handle both parameterized cases
if exposed_mods and func_defined: if exposed_mods and func_defined:

View File

@ -1,7 +1,6 @@
""" """
Spawning basics Spawning basics
""" """
from functools import partial
import pytest import pytest
import trio import trio
@ -12,41 +11,50 @@ from conftest import tractor_test
data_to_pass_down = {'doggy': 10, 'kitty': 4} data_to_pass_down = {'doggy': 10, 'kitty': 4}
async def spawn(is_arbiter, data): async def spawn(is_arbiter, data, arb_addr):
namespaces = [__name__] namespaces = [__name__]
await trio.sleep(0.1) await trio.sleep(0.1)
actor = tractor.current_actor()
assert actor.is_arbiter == is_arbiter
data == data_to_pass_down
if actor.is_arbiter: async with tractor.open_root_actor(
async with tractor.open_nursery() as nursery: arbiter_addr=arb_addr,
# forks here ):
portal = await nursery.run_in_actor(
spawn,
is_arbiter=False,
name='sub-actor',
data=data,
rpc_module_paths=namespaces,
)
assert len(nursery._children) == 1 actor = tractor.current_actor()
assert portal.channel.uid in tractor.current_actor()._peers assert actor.is_arbiter == is_arbiter
# be sure we can still get the result data = data_to_pass_down
result = await portal.result()
assert result == 10 if actor.is_arbiter:
return result
else: async with tractor.open_nursery(
return 10 ) as nursery:
# forks here
portal = await nursery.run_in_actor(
spawn,
is_arbiter=False,
name='sub-actor',
data=data,
arb_addr=arb_addr,
enable_modules=namespaces,
)
assert len(nursery._children) == 1
assert portal.channel.uid in tractor.current_actor()._peers
# be sure we can still get the result
result = await portal.result()
assert result == 10
return result
else:
return 10
def test_local_arbiter_subactor_global_state(arb_addr): def test_local_arbiter_subactor_global_state(arb_addr):
result = tractor.run( result = trio.run(
partial(spawn, data=data_to_pass_down), spawn,
True, True,
name='arbiter', data_to_pass_down,
arbiter_addr=arb_addr, arb_addr,
) )
assert result == 10 assert result == 10
@ -67,7 +75,7 @@ async def test_movie_theatre_convo(start_method):
portal = await n.start_actor( portal = await n.start_actor(
'frank', 'frank',
# enable the actor to run funcs from this current module # enable the actor to run funcs from this current module
rpc_module_paths=[__name__], enable_modules=[__name__],
) )
print(await portal.run(movie_theatre_question)) print(await portal.run(movie_theatre_question))
@ -121,19 +129,20 @@ def test_loglevel_propagated_to_subactor(
level = 'critical' level = 'critical'
async def main(): async def main():
async with tractor.open_nursery() as tn: async with tractor.open_nursery(
name='arbiter',
loglevel=level,
start_method=start_method,
arbiter_addr=arb_addr,
) as tn:
await tn.run_in_actor( await tn.run_in_actor(
check_loglevel, check_loglevel,
level=level, level=level,
) )
tractor.run( trio.run(main)
main,
name='arbiter',
loglevel=level,
start_method=start_method,
arbiter_addr=arb_addr,
)
# ensure subactor spits log message on stderr # ensure subactor spits log message on stderr
captured = capfd.readouterr() captured = capfd.readouterr()
assert 'yoyoyo' in captured.err assert 'yoyoyo' in captured.err

View File

@ -50,14 +50,24 @@ async def context_stream(ctx, sequence):
assert cs.cancelled_caught assert cs.cancelled_caught
async def stream_from_single_subactor(stream_func): async def stream_from_single_subactor(
arb_addr,
start_method,
stream_func,
):
"""Verify we can spawn a daemon actor and retrieve streamed data. """Verify we can spawn a daemon actor and retrieve streamed data.
""" """
async with tractor.find_actor('streamerd') as portals: # only one per host address, spawns an actor if None
async with tractor.open_nursery(
arbiter_addr=arb_addr,
start_method=start_method,
) as nursery:
async with tractor.find_actor('streamerd') as portals:
if not portals:
if not portals:
# only one per host address, spawns an actor if None
async with tractor.open_nursery() as nursery:
# no brokerd actor found # no brokerd actor found
portal = await nursery.start_actor( portal = await nursery.start_actor(
'streamerd', 'streamerd',
@ -101,13 +111,13 @@ async def stream_from_single_subactor(stream_func):
def test_stream_from_single_subactor(arb_addr, start_method, stream_func): def test_stream_from_single_subactor(arb_addr, start_method, stream_func):
"""Verify streaming from a spawned async generator. """Verify streaming from a spawned async generator.
""" """
tractor.run( trio.run(
partial( partial(
stream_from_single_subactor, stream_from_single_subactor,
arb_addr,
start_method,
stream_func=stream_func, stream_func=stream_func,
), ),
arbiter_addr=arb_addr,
start_method=start_method,
) )
@ -208,9 +218,10 @@ async def a_quadruple_example():
return result_stream return result_stream
async def cancel_after(wait): async def cancel_after(wait, arb_addr):
with trio.move_on_after(wait): async with tractor.open_root_actor(arbiter_addr=arb_addr):
return await a_quadruple_example() with trio.move_on_after(wait):
return await a_quadruple_example()
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
@ -222,7 +233,7 @@ def time_quad_ex(arb_addr, ci_env, spawn_backend):
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4 timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
start = time.time() start = time.time()
results = tractor.run(cancel_after, timeout, arbiter_addr=arb_addr) results = trio.run(cancel_after, timeout, arb_addr)
diff = time.time() - start diff = time.time() - start
assert results assert results
return results, diff return results, diff
@ -249,7 +260,7 @@ def test_not_fast_enough_quad(
""" """
results, diff = time_quad_ex results, diff = time_quad_ex
delay = max(diff - cancel_delay, 0) delay = max(diff - cancel_delay, 0)
results = tractor.run(cancel_after, delay, arbiter_addr=arb_addr) results = trio.run(cancel_after, delay, arb_addr)
system = platform.system() system = platform.system()
if system in ('Windows', 'Darwin') and results is not None: if system in ('Windows', 'Darwin') and results is not None:
# In CI envoirments it seems later runs are quicker then the first # In CI envoirments it seems later runs are quicker then the first

View File

@ -113,6 +113,7 @@ class ActorNursery:
name: Optional[str] = None, name: Optional[str] = None,
bind_addr: Tuple[str, int] = _default_bind_addr, bind_addr: Tuple[str, int] = _default_bind_addr,
rpc_module_paths: Optional[List[str]] = None, rpc_module_paths: Optional[List[str]] = None,
enable_modules: List[str] = None,
loglevel: str = None, # set log level per subactor loglevel: str = None, # set log level per subactor
**kwargs, # explicit args to ``fn`` **kwargs, # explicit args to ``fn``
) -> Portal: ) -> Portal:
@ -131,7 +132,9 @@ class ActorNursery:
portal = await self.start_actor( portal = await self.start_actor(
name, name,
rpc_module_paths=[mod_path] + (rpc_module_paths or []), enable_modules=[mod_path] + (
enable_modules or rpc_module_paths or []
),
bind_addr=bind_addr, bind_addr=bind_addr,
loglevel=loglevel, loglevel=loglevel,
# use the run_in_actor nursery # use the run_in_actor nursery
@ -366,7 +369,6 @@ async def open_nursery(
async with _open_and_supervise_one_cancels_all_nursery( async with _open_and_supervise_one_cancels_all_nursery(
actor actor
) as anursery: ) as anursery:
yield anursery yield anursery
else: # sub-nursery case else: # sub-nursery case