From 328e5bd597e582040add5b6319dfcbfc7b59d2b2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 19 Aug 2018 15:37:01 -0400 Subject: [PATCH 1/3] Import our `forkserver.main()` in server cmd Something changed in 3.7 (likely to do with changes to the core import system) that requires explicitly importing our version of `forkserver.main()` in order to guarantee the server runs our module code. Override `forkserver.ensure_running()`; specifically, modify the python launch command. --- tractor/_forkserver_hackzorz.py | 63 +++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/tractor/_forkserver_hackzorz.py b/tractor/_forkserver_hackzorz.py index d04f262..ec60b5c 100644 --- a/tractor/_forkserver_hackzorz.py +++ b/tractor/_forkserver_hackzorz.py @@ -13,7 +13,8 @@ import selectors import warnings from multiprocessing import ( - forkserver, semaphore_tracker, spawn, process, util + forkserver, semaphore_tracker, spawn, process, util, + connection ) from multiprocessing.forkserver import ( ForkServer, MAXFDS_TO_SEND @@ -72,6 +73,64 @@ class PatchedForkServer(ForkServer): os.close(child_r) os.close(child_w) + def ensure_running(self): + '''Make sure that a fork server is running. + + This can be called from any process. Note that usually a child + process will just reuse the forkserver started by its parent, so + ensure_running() will do nothing. + ''' + with self._lock: + semaphore_tracker.ensure_running() + if self._forkserver_pid is not None: + # forkserver was launched before, is it still running? + pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG) + if not pid: + # still alive + return + # dead, launch it again + os.close(self._forkserver_alive_fd) + self._forkserver_address = None + self._forkserver_alive_fd = None + self._forkserver_pid = None + + # XXX only thing that changed! + cmd = ('from tractor._forkserver_hackzorz import main; ' + + 'main(%d, %d, %r, **%r)') + + if self._preload_modules: + desired_keys = {'main_path', 'sys_path'} + data = spawn.get_preparation_data('ignore') + data = {x: y for x, y in data.items() if x in desired_keys} + else: + data = {} + + with socket.socket(socket.AF_UNIX) as listener: + address = connection.arbitrary_address('AF_UNIX') + listener.bind(address) + os.chmod(address, 0o600) + listener.listen() + + # all client processes own the write end of the "alive" pipe; + # when they all terminate the read end becomes ready. + alive_r, alive_w = os.pipe() + try: + fds_to_pass = [listener.fileno(), alive_r] + cmd %= (listener.fileno(), alive_r, self._preload_modules, + data) + exe = spawn.get_executable() + args = [exe] + util._args_from_interpreter_flags() + args += ['-c', cmd] + pid = util.spawnv_passfds(exe, args, fds_to_pass) + except: + os.close(alive_w) + raise + finally: + os.close(alive_r) + self._forkserver_address = address + self._forkserver_alive_fd = alive_w + self._forkserver_pid = pid + def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): '''Run forkserver.''' @@ -274,9 +333,9 @@ def override_stdlib(): semaphore_tracker.getfd = _semaphore_tracker.getfd forkserver._forkserver = _forkserver + forkserver.ensure_running = _forkserver.ensure_running forkserver.main = main forkserver._serve_one = _serve_one - forkserver.ensure_running = _forkserver.ensure_running forkserver.get_inherited_fds = _forkserver.get_inherited_fds forkserver.connect_to_new_process = _forkserver.connect_to_new_process forkserver.set_forkserver_preload = _forkserver.set_forkserver_preload From a64c2a70bd3204e6f298ecbf6b4eb6607220625c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 19 Aug 2018 16:02:12 -0400 Subject: [PATCH 2/3] Base cancel delays on speed of a non-cancelled run --- tests/test_tractor.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 61b1ea8..8f0b1ff 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -383,19 +383,31 @@ async def cancel_after(wait): return await a_quadruple_example() -def test_a_quadruple_example(arb_addr): - """This also serves as a kind of "we'd like to eventually be this - fast test". - """ - results = tractor.run(cancel_after, 2.2, arbiter_addr=arb_addr) +@pytest.fixture(scope='module') +def time_quad_ex(arb_addr): + start = time.time() + results = tractor.run(cancel_after, 3, arbiter_addr=arb_addr) + diff = time.time() - start assert results + return results, diff -@pytest.mark.parametrize('cancel_delay', list(range(1, 7))) -def test_not_fast_enough_quad(arb_addr, cancel_delay): +def test_a_quadruple_example(time_quad_ex): + """This also serves as a kind of "we'd like to be this fast test".""" + results, diff = time_quad_ex + assert results + assert diff < 2.5 + + +@pytest.mark.parametrize( + 'cancel_delay', + list(map(lambda i: i/10, range(3, 10))) +) +def test_not_fast_enough_quad(arb_addr, time_quad_ex, cancel_delay): """Verify we can cancel midway through the quad example and all actors cancel gracefully. """ - delay = 1 + cancel_delay/10 + results, diff = time_quad_ex + delay = diff - cancel_delay results = tractor.run(cancel_after, delay, arbiter_addr=arb_addr) assert results is None From 996ad891f462d84530102b0f042d78f6e6b9565f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 19 Aug 2018 16:11:57 -0400 Subject: [PATCH 3/3] py3.6 is missing this attr --- tractor/_forkserver_hackzorz.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tractor/_forkserver_hackzorz.py b/tractor/_forkserver_hackzorz.py index ec60b5c..edd282d 100644 --- a/tractor/_forkserver_hackzorz.py +++ b/tractor/_forkserver_hackzorz.py @@ -29,6 +29,8 @@ SIGNED_STRUCT = struct.Struct('q') # large enough for pid_t class PatchedForkServer(ForkServer): + _forkserver_pid = None + def connect_to_new_process(self, fds): '''Request forkserver to create a child process.