Merge pull request #34 from tgoodlet/reliable_cancel_tests

Reliable cancel tests
py3.7_tweaks
goodboy 2018-08-20 08:27:38 -04:00 committed by GitHub
commit 5e23ed20e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 83 additions and 10 deletions

View File

@ -383,19 +383,31 @@ async def cancel_after(wait):
return await a_quadruple_example() return await a_quadruple_example()
def test_a_quadruple_example(arb_addr): @pytest.fixture(scope='module')
"""This also serves as a kind of "we'd like to eventually be this def time_quad_ex(arb_addr):
fast test". start = time.time()
""" results = tractor.run(cancel_after, 3, arbiter_addr=arb_addr)
results = tractor.run(cancel_after, 2.2, arbiter_addr=arb_addr) diff = time.time() - start
assert results assert results
return results, diff
@pytest.mark.parametrize('cancel_delay', list(range(1, 7))) def test_a_quadruple_example(time_quad_ex):
def test_not_fast_enough_quad(arb_addr, cancel_delay): """This also serves as a kind of "we'd like to be this fast test"."""
results, diff = time_quad_ex
assert results
assert diff < 2.5
@pytest.mark.parametrize(
'cancel_delay',
list(map(lambda i: i/10, range(3, 10)))
)
def test_not_fast_enough_quad(arb_addr, time_quad_ex, cancel_delay):
"""Verify we can cancel midway through the quad example and all actors """Verify we can cancel midway through the quad example and all actors
cancel gracefully. cancel gracefully.
""" """
delay = 1 + cancel_delay/10 results, diff = time_quad_ex
delay = diff - cancel_delay
results = tractor.run(cancel_after, delay, arbiter_addr=arb_addr) results = tractor.run(cancel_after, delay, arbiter_addr=arb_addr)
assert results is None assert results is None

View File

@ -13,7 +13,8 @@ import selectors
import warnings import warnings
from multiprocessing import ( from multiprocessing import (
forkserver, semaphore_tracker, spawn, process, util forkserver, semaphore_tracker, spawn, process, util,
connection
) )
from multiprocessing.forkserver import ( from multiprocessing.forkserver import (
ForkServer, MAXFDS_TO_SEND ForkServer, MAXFDS_TO_SEND
@ -28,6 +29,8 @@ SIGNED_STRUCT = struct.Struct('q') # large enough for pid_t
class PatchedForkServer(ForkServer): class PatchedForkServer(ForkServer):
_forkserver_pid = None
def connect_to_new_process(self, fds): def connect_to_new_process(self, fds):
'''Request forkserver to create a child process. '''Request forkserver to create a child process.
@ -72,6 +75,64 @@ class PatchedForkServer(ForkServer):
os.close(child_r) os.close(child_r)
os.close(child_w) os.close(child_w)
def ensure_running(self):
'''Make sure that a fork server is running.
This can be called from any process. Note that usually a child
process will just reuse the forkserver started by its parent, so
ensure_running() will do nothing.
'''
with self._lock:
semaphore_tracker.ensure_running()
if self._forkserver_pid is not None:
# forkserver was launched before, is it still running?
pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
if not pid:
# still alive
return
# dead, launch it again
os.close(self._forkserver_alive_fd)
self._forkserver_address = None
self._forkserver_alive_fd = None
self._forkserver_pid = None
# XXX only thing that changed!
cmd = ('from tractor._forkserver_hackzorz import main; ' +
'main(%d, %d, %r, **%r)')
if self._preload_modules:
desired_keys = {'main_path', 'sys_path'}
data = spawn.get_preparation_data('ignore')
data = {x: y for x, y in data.items() if x in desired_keys}
else:
data = {}
with socket.socket(socket.AF_UNIX) as listener:
address = connection.arbitrary_address('AF_UNIX')
listener.bind(address)
os.chmod(address, 0o600)
listener.listen()
# all client processes own the write end of the "alive" pipe;
# when they all terminate the read end becomes ready.
alive_r, alive_w = os.pipe()
try:
fds_to_pass = [listener.fileno(), alive_r]
cmd %= (listener.fileno(), alive_r, self._preload_modules,
data)
exe = spawn.get_executable()
args = [exe] + util._args_from_interpreter_flags()
args += ['-c', cmd]
pid = util.spawnv_passfds(exe, args, fds_to_pass)
except:
os.close(alive_w)
raise
finally:
os.close(alive_r)
self._forkserver_address = address
self._forkserver_alive_fd = alive_w
self._forkserver_pid = pid
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
'''Run forkserver.''' '''Run forkserver.'''
@ -274,9 +335,9 @@ def override_stdlib():
semaphore_tracker.getfd = _semaphore_tracker.getfd semaphore_tracker.getfd = _semaphore_tracker.getfd
forkserver._forkserver = _forkserver forkserver._forkserver = _forkserver
forkserver.ensure_running = _forkserver.ensure_running
forkserver.main = main forkserver.main = main
forkserver._serve_one = _serve_one forkserver._serve_one = _serve_one
forkserver.ensure_running = _forkserver.ensure_running
forkserver.get_inherited_fds = _forkserver.get_inherited_fds forkserver.get_inherited_fds = _forkserver.get_inherited_fds
forkserver.connect_to_new_process = _forkserver.connect_to_new_process forkserver.connect_to_new_process = _forkserver.connect_to_new_process
forkserver.set_forkserver_preload = _forkserver.set_forkserver_preload forkserver.set_forkserver_preload = _forkserver.set_forkserver_preload