diff --git a/.travis.yml b/.travis.yml index d170042..dc63616 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,18 +1,29 @@ language: python +dist: xenial +sudo: required matrix: include: - - os: windows + - name: "Windows, Python Latest" + os: windows language: sh - python: 3.7 + python: 3.x # only works on linux before_install: - - choco install python3 - - export PATH="/c/Python37:/c/Python37/Scripts:$PATH" + - choco install python3 --params "/InstallDir:C:\\Python" + - export PATH="/c/Python:/c/Python/Scripts:$PATH" - python -m pip install --upgrade pip wheel - - python: 3.7 - dist: xenial - sudo: required + - name: "Windows, Python 3.7" + os: windows + python: 3.7 # only works on linux + language: sh + before_install: + - choco install python3 --version 3.7.4 --params "/InstallDir:C:\\Python" + - export PATH="/c/Python:/c/Python/Scripts:$PATH" + - python -m pip install --upgrade pip wheel + + - python: 3.7 # this works for Linux but is ignored on macOS or Windows + - python: 3.8 install: - cd $TRAVIS_BUILD_DIR diff --git a/README.rst b/README.rst index 2cfddeb..5819f20 100644 --- a/README.rst +++ b/README.rst @@ -82,18 +82,10 @@ No PyPi release yet! pip install git+git://github.com/goodboy/tractor.git -Windows "gotchas" -***************** -`tractor` internally uses the stdlib's `multiprocessing` package which -*can* have some gotchas on Windows. Namely, the need for calling -`freeze_support()`_ inside the ``__main__`` context. See `#61`_ for the -deats. - -.. _freeze_support(): https://docs.python.org/3/library/multiprocessing.html#multiprocessing.freeze_support -.. _#61: https://github.com/goodboy/tractor/pull/61#issuecomment-470053512 - Examples -------- +Note, if you are on Windows please be sure to see the gotchas section +before trying these. A trynamic first scene @@ -707,6 +699,47 @@ selected by default for speed. .. _multiprocessing start method: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods +Windows "gotchas" +***************** +`tractor` internally uses the stdlib's `multiprocessing` package which +*can* have some gotchas on Windows. Namely, the need for calling +`freeze_support()`_ inside the ``__main__`` context. Additionally you +may need place you `tractor` program entry point in a seperate +`__main__.py` module in your package in order to avoid an error like the +following :: + + Traceback (most recent call last): + File "C:\ProgramData\Miniconda3\envs\tractor19030601\lib\site-packages\tractor\_actor.py", line 234, in _get_rpc_func + return getattr(self._mods[ns], funcname) + KeyError: '__mp_main__' + + +To avoid this, the following is the **only code** that should be in your +main python module of the program: + +.. code:: python + + # application/__main__.py + import tractor + import multiprocessing + from . import tractor_app + + if __name__ == '__main__': + multiprocessing.freeze_support() + tractor.run(tractor_app.main) + +And execute as:: + + python -m application + + +See `#61`_ and `#79`_ for further details. + +.. _freeze_support(): https://docs.python.org/3/library/multiprocessing.html#multiprocessing.freeze_support +.. _#61: https://github.com/goodboy/tractor/pull/61#issuecomment-470053512 +.. _#79: https://github.com/goodboy/tractor/pull/79 + + Enabling logging **************** Considering how complicated distributed software can become it helps to know diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index 8e1545f..20f4226 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -20,7 +20,7 @@ if platform.system() == 'Windows': else: _KILL_SIGNAL = signal.SIGKILL _INT_SIGNAL = signal.SIGINT - _INT_RETURN_CODE = 1 + _INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value _PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4 diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 7be0950..60877cf 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -215,7 +215,7 @@ def test_a_quadruple_example(time_quad_ex): """This also serves as a kind of "we'd like to be this fast test".""" results, diff = time_quad_ex assert results - this_fast = 5 if platform.system() == 'Windows' else 2.5 + this_fast = 6 if platform.system() == 'Windows' else 2.5 assert diff < this_fast diff --git a/tractor/_forkserver_override.py b/tractor/_forkserver_override.py index 68d8333..07933dd 100644 --- a/tractor/_forkserver_override.py +++ b/tractor/_forkserver_override.py @@ -15,7 +15,17 @@ import errno import selectors import warnings -from multiprocessing import semaphore_tracker, spawn, process # type: ignore +try: + from multiprocessing import semaphore_tracker # type: ignore + resource_tracker = semaphore_tracker + resource_tracker._resource_tracker = resource_tracker._semaphore_tracker + _tracker_type = resource_tracker.SemaphoreTracker +except ImportError: + # 3.8 introduces a more general version that also tracks shared mems + from multiprocessing import resource_tracker # type: ignore + _tracker_type = resource_tracker.ResourceTracker + +from multiprocessing import spawn, process # type: ignore from multiprocessing import forkserver, util, connection # type: ignore from multiprocessing.forkserver import ( ForkServer, MAXFDS_TO_SEND @@ -48,7 +58,7 @@ class PatchedForkServer(ForkServer): parent_r, child_w = os.pipe() child_r, parent_w = os.pipe() allfds = [child_r, child_w, self._forkserver_alive_fd, - semaphore_tracker.getfd()] + resource_tracker.getfd()] allfds += fds # XXX This is the only part changed @@ -83,7 +93,7 @@ class PatchedForkServer(ForkServer): ensure_running() will do nothing. ''' with self._lock: - semaphore_tracker.ensure_running() + resource_tracker.ensure_running() if self._forkserver_pid is not None: # forkserver was launched before, is it still running? pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG) @@ -231,14 +241,14 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): # Incoming fork request with listener.accept()[0] as s: - # XXX Thing changed - be tolerant of socket disconnects + # XXX Thing that changed - be tolerant of socket disconnects try: # Receive fds from client fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1) except EOFError: # broken socket due to reconnection on client-side continue - # XXX Thing changed - be tolerant of socket disconnects + # XXX Thing that changed - be tolerant of socket disconnects if len(fds) > MAXFDS_TO_SEND: raise RuntimeError( @@ -288,12 +298,20 @@ def _serve_one(child_r, fds, unused_fds, handlers): for fd in unused_fds: os.close(fd) + # XXX this assigment is why we override this func... (_forkserver._forkserver_alive_fd, - semaphore_tracker._semaphore_tracker._fd, + resource_tracker._resource_tracker._fd, *_forkserver._inherited_fds) = fds # Run process object received over pipe - code = spawn._main(child_r) + # XXX this clearly changed + try: + # spawn._main() signature changed in 3.8 + parent_sentinel = os.dup(child_r) + code = spawn._main(child_r, parent_sentinel) + except TypeError: + # pre 3.8 signature + code = spawn._main(child_r) return code @@ -307,15 +325,16 @@ def write_signed(fd, n): msg = msg[nbytes:] -class PatchedSemaphoreTracker(semaphore_tracker.SemaphoreTracker): +class PatchedResourceTracker(_tracker_type): # type: ignore """Stop GD ensuring everything is running... """ def getfd(self): + # XXX: why must be constantly spawn trackers.. # self.ensure_running() return self._fd -_semaphore_tracker = PatchedSemaphoreTracker() +_resource_tracker = PatchedResourceTracker() _forkserver = PatchedForkServer() @@ -328,11 +347,16 @@ def override_stdlib(): and semaphore trackers for each actor nursery used to create new sub-actors from sub-actors. """ - semaphore_tracker._semaphore_tracker = _semaphore_tracker - semaphore_tracker.ensure_running = _semaphore_tracker.ensure_running - semaphore_tracker.register = _semaphore_tracker.register - semaphore_tracker.unregister = _semaphore_tracker.unregister - semaphore_tracker.getfd = _semaphore_tracker.getfd + try: + resource_tracker._semaphore_tracker = _resource_tracker + resource_tracker._resource_tracker = _resource_tracker + except AttributeError: + resource_tracker._resource_tracker = _resource_tracker + + resource_tracker.ensure_running = _resource_tracker.ensure_running + resource_tracker.register = _resource_tracker.register + resource_tracker.unregister = _resource_tracker.unregister + resource_tracker.getfd = _resource_tracker.getfd forkserver._forkserver = _forkserver forkserver.ensure_running = _forkserver.ensure_running diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 0b6916f..a759946 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -4,8 +4,17 @@ Process spawning. Mostly just wrapping around ``multiprocessing``. """ import multiprocessing as mp -from multiprocessing import forkserver, semaphore_tracker # type: ignore -from typing import Tuple, Optional + +try: + from multiprocessing import semaphore_tracker # type: ignore + resource_tracker = semaphore_tracker + resource_tracker._resource_tracker = resource_tracker._semaphore_tracker +except ImportError: + # 3.8 introduces a more general version that also tracks shared mems + from multiprocessing import resource_tracker # type: ignore + +from multiprocessing import forkserver # type: ignore +from typing import Tuple from . import _forkserver_override from ._state import current_actor @@ -71,8 +80,8 @@ def new_proc( fs._forkserver_address, fs._forkserver_alive_fd, getattr(fs, '_forkserver_pid', None), - getattr(semaphore_tracker._semaphore_tracker, '_pid', None), - semaphore_tracker._semaphore_tracker._fd, + getattr(resource_tracker._resource_tracker, '_pid', None), + resource_tracker._resource_tracker._fd, ) else: assert curr_actor._forkserver_info @@ -80,8 +89,8 @@ def new_proc( fs._forkserver_address, fs._forkserver_alive_fd, fs._forkserver_pid, - semaphore_tracker._semaphore_tracker._pid, - semaphore_tracker._semaphore_tracker._fd, + resource_tracker._resource_tracker._pid, + resource_tracker._resource_tracker._fd, ) = curr_actor._forkserver_info else: fs_info = (None, None, None, None, None)