forked from goodboy/tractor
1
0
Fork 0

Merge pull request #102 from goodboy/example_tests

Test docs examples as scripts
fix_win_ci_again
goodboy 2020-02-10 11:13:32 -06:00 committed by GitHub
commit 3b3d563ac9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 361 additions and 19 deletions

View File

@ -108,7 +108,6 @@ the hip new film we're shooting:
.. code:: python
import tractor
from functools import partial
_this_module = __name__
the_line = 'Hi my name is {}'

View File

@ -0,0 +1,19 @@
"""
Needed on Windows.
This module is needed as the program entry point for invocation
with ``python -m <modulename>``. See the solution from @chrizzFTD
here:
https://github.com/goodboy/tractor/pull/61#issuecomment-470053512
"""
if __name__ == '__main__':
import multiprocessing
multiprocessing.freeze_support()
# ``tests/test_docs_examples.py::test_example`` will copy each
# script from this examples directory into a module in a new
# temporary dir and name it test_example.py. We import that script
# module here and invoke it's ``main()``.
from . import test_example
test_example.tractor.run(test_example.main, start_method='spawn')

View File

@ -0,0 +1,44 @@
import platform
import tractor
_this_module = __name__
the_line = 'Hi my name is {}'
tractor.log.get_console_log("INFO")
async def hi():
return the_line.format(tractor.current_actor().name)
async def say_hello(other_actor):
async with tractor.wait_for_actor(other_actor) as portal:
return await portal.run(_this_module, 'hi')
async def main():
"""Main tractor entry point, the "master" process (for now
acts as the "director").
"""
async with tractor.open_nursery() as n:
print("Alright... Action!")
donny = await n.run_in_actor(
'donny',
say_hello,
# arguments are always named
other_actor='gretchen',
)
gretchen = await n.run_in_actor(
'gretchen',
say_hello,
other_actor='donny',
)
print(await gretchen.result())
print(await donny.result())
print("CUTTTT CUUTT CUT!!! Donny!! You're supposed to say...")
if __name__ == '__main__':
tractor.run(main)

View File

@ -0,0 +1,22 @@
import tractor
def cellar_door():
return "Dang that's beautiful"
async def main():
"""The main ``tractor`` routine.
"""
async with tractor.open_nursery() as n:
portal = await n.run_in_actor('some_linguist', cellar_door)
# The ``async with`` will unblock here since the 'some_linguist'
# actor has completed its main task ``cellar_door``.
print(await portal.result())
if __name__ == '__main__':
tractor.run(main)

View File

@ -0,0 +1,36 @@
from itertools import repeat
import trio
import tractor
tractor.log.get_console_log("INFO")
async def stream_forever():
for i in repeat("I can see these little future bubble things"):
# each yielded value is sent over the ``Channel`` to the
# parent actor
yield i
await trio.sleep(0.01)
async def main():
# stream for at most 1 seconds
with trio.move_on_after(1) as cancel_scope:
async with tractor.open_nursery() as n:
portal = await n.start_actor(
f'donny',
rpc_module_paths=[__name__],
)
# this async for loop streams values from the above
# async generator running in a separate process
async for letter in await portal.run(__name__, 'stream_forever'):
print(letter)
# we support trio's cancellation system
assert cancel_scope.cancelled_caught
assert n.cancelled
if __name__ == '__main__':
tractor.run(main, start_method='forkserver')

View File

@ -0,0 +1,96 @@
import time
import trio
import tractor
# this is the first 2 actors, streamer_1 and streamer_2
async def stream_data(seed):
for i in range(seed):
yield i
await trio.sleep(0) # trigger scheduler
# this is the third actor; the aggregator
async def aggregate(seed):
"""Ensure that the two streams we receive match but only stream
a single set of values to the parent.
"""
async with tractor.open_nursery() as nursery:
portals = []
for i in range(1, 3):
# fork point
portal = await nursery.start_actor(
name=f'streamer_{i}',
rpc_module_paths=[__name__],
)
portals.append(portal)
send_chan, recv_chan = trio.open_memory_channel(500)
async def push_to_chan(portal, send_chan):
async with send_chan:
async for value in await portal.run(
__name__, 'stream_data', seed=seed
):
# leverage trio's built-in backpressure
await send_chan.send(value)
print(f"FINISHED ITERATING {portal.channel.uid}")
# spawn 2 trio tasks to collect streams and push to a local queue
async with trio.open_nursery() as n:
for portal in portals:
n.start_soon(push_to_chan, portal, send_chan.clone())
# close this local task's reference to send side
await send_chan.aclose()
unique_vals = set()
async with recv_chan:
async for value in recv_chan:
if value not in unique_vals:
unique_vals.add(value)
# yield upwards to the spawning parent actor
yield value
assert value in unique_vals
print("FINISHED ITERATING in aggregator")
await nursery.cancel()
print("WAITING on `ActorNursery` to finish")
print("AGGREGATOR COMPLETE!")
# this is the main actor and *arbiter*
async def main():
# a nursery which spawns "actors"
async with tractor.open_nursery() as nursery:
seed = int(1e3)
import time
pre_start = time.time()
portal = await nursery.run_in_actor(
'aggregator',
aggregate,
seed=seed,
)
start = time.time()
# the portal call returns exactly what you'd expect
# as if the remote "aggregate" function was called locally
result_stream = []
async for value in await portal.result():
result_stream.append(value)
print(f"STREAM TIME = {time.time() - start}")
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
assert result_stream == list(range(seed))
return result_stream
if __name__ == '__main__':
final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616))

View File

@ -0,0 +1,20 @@
import tractor
tractor.log.get_console_log("INFO")
async def main(service_name):
async with tractor.open_nursery() as an:
await an.start_actor(service_name)
async with tractor.get_arbiter('127.0.0.1', 1616) as portal:
print(f"Arbiter is listening on {portal.channel}")
async with tractor.wait_for_actor(service_name) as sockaddr:
print(f"my_service is found at {sockaddr}")
await an.cancel()
if __name__ == '__main__':
tractor.run(main, 'some_actor_name')

View File

@ -0,0 +1,105 @@
"""
Let's make sure them docs work yah?
"""
from contextlib import contextmanager
import os
import sys
import subprocess
import platform
import shutil
import pytest
def repodir():
"""Return the abspath to the repo directory.
"""
dirname = os.path.dirname
dirpath = os.path.abspath(
dirname(dirname(os.path.realpath(__file__)))
)
return dirpath
def examples_dir():
"""Return the abspath to the examples directory.
"""
return os.path.join(repodir(), 'examples')
@pytest.fixture
def run_example_in_subproc(loglevel, testdir, arb_addr):
@contextmanager
def run(script_code):
kwargs = dict()
if platform.system() == 'Windows':
# on windows we need to create a special __main__.py which will
# be executed with ``python -m <modulename>`` on windows..
shutil.copyfile(
os.path.join(examples_dir(), '__main__.py'),
os.path.join(str(testdir), '__main__.py')
)
# drop the ``if __name__ == '__main__'`` guard from the *NIX
# version of each script
script_code = '\n'.join(script_code.splitlines()[:-4])
script_file = testdir.makefile('.py', script_code)
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
# run the testdir "libary module" as a script
cmdargs = [
sys.executable,
'-m',
# use the "module name" of this "package"
'test_example'
]
else:
script_file = testdir.makefile('.py', script_code)
cmdargs = [
sys.executable,
str(script_file),
]
proc = testdir.popen(
cmdargs,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs,
)
assert not proc.returncode
yield proc
proc.wait()
assert proc.returncode == 0
yield run
@pytest.mark.parametrize(
'example_script',
[f for f in os.listdir(examples_dir()) if '__' not in f],
)
def test_example(run_example_in_subproc, example_script):
"""Load and run scripts from this repo's ``examples/`` dir as a user
would copy and pasing them into their editor.
On windows a little more "finessing" is done to make ``multiprocessing`` play nice:
we copy the ``__main__.py`` into the test directory and invoke the script as a module
with ``python -m test_example``.
"""
ex_file = os.path.join(examples_dir(), example_script)
with open(ex_file, 'r') as ex:
code = ex.read()
with run_example_in_subproc(code) as proc:
proc.wait()
err, _ = proc.stderr.read(), proc.stdout.read()
# if we get some gnarly output let's aggregate and raise
if err and b'Error' in err:
raise Exception(err.decode())
assert proc.returncode == 0

View File

@ -270,7 +270,10 @@ class Actor:
# should allow for relative (at least downward) imports.
sys.path.append(os.path.dirname(filepath))
log.debug(f"Attempting to import {modpath}@{filepath}")
self._mods[modpath] = importlib.import_module(modpath)
mod = importlib.import_module(modpath)
self._mods[modpath] = mod
if modpath == '__main__':
self._mods['__mp_main__'] = mod
except ModuleNotFoundError:
# it is expected the corresponding `ModuleNotExposed` error
# will be raised later
@ -278,11 +281,6 @@ class Actor:
raise
def _get_rpc_func(self, ns, funcname):
if ns == "__mp_main__":
# In subprocesses, `__main__` will actually map to
# `__mp_main__` which should be the same entry-point-module
# as the parent.
ns = "__main__"
try:
return getattr(self._mods[ns], funcname)
except KeyError as err:

View File

@ -5,7 +5,7 @@ import typing
from typing import Tuple, Optional, Union
from async_generator import asynccontextmanager
from ._ipc import _connect_chan
from ._ipc import _connect_chan, Channel
from ._portal import (
Portal,
open_portal,
@ -16,7 +16,8 @@ from ._state import current_actor
@asynccontextmanager
async def get_arbiter(
host: str, port: int
host: str,
port: int,
) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
"""Return a portal instance connected to a local or remote
arbiter.
@ -28,7 +29,7 @@ async def get_arbiter(
if actor.is_arbiter:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor)
yield LocalPortal(actor, Channel((host, port)))
else:
async with _connect_chan(host, port) as chan:
async with open_portal(chan) as arb_portal:

View File

@ -298,6 +298,7 @@ class LocalPortal:
using an in process actor instance.
"""
actor: 'Actor' # type: ignore
channel: Channel
async def run(self, ns: str, func_name: str, **kwargs) -> Any:
"""Run a requested function locally and return it's result.

View File

@ -30,20 +30,21 @@ from ._actor import Actor, ActorFailure
log = get_logger('tractor')
# use trip as our default on *nix systems for now
if platform.system() != 'Windows':
_spawn_method: str = "trio_run_in_process"
else:
_spawn_method = "spawn"
# placeholder for an mp start context if so using that backend
_ctx: Optional[mp.context.BaseContext] = None
_spawn_method: str = "spawn"
if platform.system() == 'Windows':
_spawn_method = "spawn"
_ctx = mp.get_context("spawn")
async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.WaitForSingleObject(proc.sentinel)
else:
# *NIX systems use ``trio_run_in_process` as our default (for now)
import trio_run_in_process
_spawn_method = "trio_run_in_process"
async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.wait_readable(proc.sentinel)
@ -53,9 +54,9 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
"""Attempt to set the start method for process starting, aka the "actor
spawning backend".
If the desired method is not supported this function will error. On Windows
the only supported option is the ``multiprocessing`` "spawn" method. The default
on *nix systems is ``trio_run_in_process``.
If the desired method is not supported this function will error. On
Windows the only supported option is the ``multiprocessing`` "spawn"
method. The default on *nix systems is ``trio_run_in_process``.
"""
global _ctx
global _spawn_method