forked from goodboy/tractor
1
0
Fork 0

Merge pull request #82 from goodboy/windows_support

Windows and Python 3.8 support
pip_ci_fix
goodboy 2019-10-17 09:11:40 -04:00 committed by GitHub
commit 07d54110c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 116 additions and 39 deletions

View File

@ -1,18 +1,29 @@
language: python language: python
dist: xenial
sudo: required
matrix: matrix:
include: include:
- os: windows - name: "Windows, Python Latest"
os: windows
language: sh language: sh
python: 3.7 python: 3.x # only works on linux
before_install: before_install:
- choco install python3 - choco install python3 --params "/InstallDir:C:\\Python"
- export PATH="/c/Python37:/c/Python37/Scripts:$PATH" - export PATH="/c/Python:/c/Python/Scripts:$PATH"
- python -m pip install --upgrade pip wheel - python -m pip install --upgrade pip wheel
- python: 3.7 - name: "Windows, Python 3.7"
dist: xenial os: windows
sudo: required python: 3.7 # only works on linux
language: sh
before_install:
- choco install python3 --version 3.7.4 --params "/InstallDir:C:\\Python"
- export PATH="/c/Python:/c/Python/Scripts:$PATH"
- python -m pip install --upgrade pip wheel
- python: 3.7 # this works for Linux but is ignored on macOS or Windows
- python: 3.8
install: install:
- cd $TRAVIS_BUILD_DIR - cd $TRAVIS_BUILD_DIR

View File

@ -82,18 +82,10 @@ No PyPi release yet!
pip install git+git://github.com/goodboy/tractor.git pip install git+git://github.com/goodboy/tractor.git
Windows "gotchas"
*****************
`tractor` internally uses the stdlib's `multiprocessing` package which
*can* have some gotchas on Windows. Namely, the need for calling
`freeze_support()`_ inside the ``__main__`` context. See `#61`_ for the
deats.
.. _freeze_support(): https://docs.python.org/3/library/multiprocessing.html#multiprocessing.freeze_support
.. _#61: https://github.com/goodboy/tractor/pull/61#issuecomment-470053512
Examples Examples
-------- --------
Note, if you are on Windows please be sure to see the gotchas section
before trying these.
A trynamic first scene A trynamic first scene
@ -707,6 +699,47 @@ selected by default for speed.
.. _multiprocessing start method: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods .. _multiprocessing start method: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
Windows "gotchas"
*****************
`tractor` internally uses the stdlib's `multiprocessing` package which
*can* have some gotchas on Windows. Namely, the need for calling
`freeze_support()`_ inside the ``__main__`` context. Additionally you
may need place you `tractor` program entry point in a seperate
`__main__.py` module in your package in order to avoid an error like the
following ::
Traceback (most recent call last):
File "C:\ProgramData\Miniconda3\envs\tractor19030601\lib\site-packages\tractor\_actor.py", line 234, in _get_rpc_func
return getattr(self._mods[ns], funcname)
KeyError: '__mp_main__'
To avoid this, the following is the **only code** that should be in your
main python module of the program:
.. code:: python
# application/__main__.py
import tractor
import multiprocessing
from . import tractor_app
if __name__ == '__main__':
multiprocessing.freeze_support()
tractor.run(tractor_app.main)
And execute as::
python -m application
See `#61`_ and `#79`_ for further details.
.. _freeze_support(): https://docs.python.org/3/library/multiprocessing.html#multiprocessing.freeze_support
.. _#61: https://github.com/goodboy/tractor/pull/61#issuecomment-470053512
.. _#79: https://github.com/goodboy/tractor/pull/79
Enabling logging Enabling logging
**************** ****************
Considering how complicated distributed software can become it helps to know Considering how complicated distributed software can become it helps to know

View File

@ -20,7 +20,7 @@ if platform.system() == 'Windows':
else: else:
_KILL_SIGNAL = signal.SIGKILL _KILL_SIGNAL = signal.SIGKILL
_INT_SIGNAL = signal.SIGINT _INT_SIGNAL = signal.SIGINT
_INT_RETURN_CODE = 1 _INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value
_PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4 _PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4

View File

@ -215,7 +215,7 @@ def test_a_quadruple_example(time_quad_ex):
"""This also serves as a kind of "we'd like to be this fast test".""" """This also serves as a kind of "we'd like to be this fast test"."""
results, diff = time_quad_ex results, diff = time_quad_ex
assert results assert results
this_fast = 5 if platform.system() == 'Windows' else 2.5 this_fast = 6 if platform.system() == 'Windows' else 2.5
assert diff < this_fast assert diff < this_fast

View File

@ -15,7 +15,17 @@ import errno
import selectors import selectors
import warnings import warnings
from multiprocessing import semaphore_tracker, spawn, process # type: ignore try:
from multiprocessing import semaphore_tracker # type: ignore
resource_tracker = semaphore_tracker
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker
_tracker_type = resource_tracker.SemaphoreTracker
except ImportError:
# 3.8 introduces a more general version that also tracks shared mems
from multiprocessing import resource_tracker # type: ignore
_tracker_type = resource_tracker.ResourceTracker
from multiprocessing import spawn, process # type: ignore
from multiprocessing import forkserver, util, connection # type: ignore from multiprocessing import forkserver, util, connection # type: ignore
from multiprocessing.forkserver import ( from multiprocessing.forkserver import (
ForkServer, MAXFDS_TO_SEND ForkServer, MAXFDS_TO_SEND
@ -48,7 +58,7 @@ class PatchedForkServer(ForkServer):
parent_r, child_w = os.pipe() parent_r, child_w = os.pipe()
child_r, parent_w = os.pipe() child_r, parent_w = os.pipe()
allfds = [child_r, child_w, self._forkserver_alive_fd, allfds = [child_r, child_w, self._forkserver_alive_fd,
semaphore_tracker.getfd()] resource_tracker.getfd()]
allfds += fds allfds += fds
# XXX This is the only part changed # XXX This is the only part changed
@ -83,7 +93,7 @@ class PatchedForkServer(ForkServer):
ensure_running() will do nothing. ensure_running() will do nothing.
''' '''
with self._lock: with self._lock:
semaphore_tracker.ensure_running() resource_tracker.ensure_running()
if self._forkserver_pid is not None: if self._forkserver_pid is not None:
# forkserver was launched before, is it still running? # forkserver was launched before, is it still running?
pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG) pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
@ -231,14 +241,14 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
# Incoming fork request # Incoming fork request
with listener.accept()[0] as s: with listener.accept()[0] as s:
# XXX Thing changed - be tolerant of socket disconnects # XXX Thing that changed - be tolerant of socket disconnects
try: try:
# Receive fds from client # Receive fds from client
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1) fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
except EOFError: except EOFError:
# broken socket due to reconnection on client-side # broken socket due to reconnection on client-side
continue continue
# XXX Thing changed - be tolerant of socket disconnects # XXX Thing that changed - be tolerant of socket disconnects
if len(fds) > MAXFDS_TO_SEND: if len(fds) > MAXFDS_TO_SEND:
raise RuntimeError( raise RuntimeError(
@ -288,11 +298,19 @@ def _serve_one(child_r, fds, unused_fds, handlers):
for fd in unused_fds: for fd in unused_fds:
os.close(fd) os.close(fd)
# XXX this assigment is why we override this func...
(_forkserver._forkserver_alive_fd, (_forkserver._forkserver_alive_fd,
semaphore_tracker._semaphore_tracker._fd, resource_tracker._resource_tracker._fd,
*_forkserver._inherited_fds) = fds *_forkserver._inherited_fds) = fds
# Run process object received over pipe # Run process object received over pipe
# XXX this clearly changed
try:
# spawn._main() signature changed in 3.8
parent_sentinel = os.dup(child_r)
code = spawn._main(child_r, parent_sentinel)
except TypeError:
# pre 3.8 signature
code = spawn._main(child_r) code = spawn._main(child_r)
return code return code
@ -307,15 +325,16 @@ def write_signed(fd, n):
msg = msg[nbytes:] msg = msg[nbytes:]
class PatchedSemaphoreTracker(semaphore_tracker.SemaphoreTracker): class PatchedResourceTracker(_tracker_type): # type: ignore
"""Stop GD ensuring everything is running... """Stop GD ensuring everything is running...
""" """
def getfd(self): def getfd(self):
# XXX: why must be constantly spawn trackers..
# self.ensure_running() # self.ensure_running()
return self._fd return self._fd
_semaphore_tracker = PatchedSemaphoreTracker() _resource_tracker = PatchedResourceTracker()
_forkserver = PatchedForkServer() _forkserver = PatchedForkServer()
@ -328,11 +347,16 @@ def override_stdlib():
and semaphore trackers for each actor nursery used to create new and semaphore trackers for each actor nursery used to create new
sub-actors from sub-actors. sub-actors from sub-actors.
""" """
semaphore_tracker._semaphore_tracker = _semaphore_tracker try:
semaphore_tracker.ensure_running = _semaphore_tracker.ensure_running resource_tracker._semaphore_tracker = _resource_tracker
semaphore_tracker.register = _semaphore_tracker.register resource_tracker._resource_tracker = _resource_tracker
semaphore_tracker.unregister = _semaphore_tracker.unregister except AttributeError:
semaphore_tracker.getfd = _semaphore_tracker.getfd resource_tracker._resource_tracker = _resource_tracker
resource_tracker.ensure_running = _resource_tracker.ensure_running
resource_tracker.register = _resource_tracker.register
resource_tracker.unregister = _resource_tracker.unregister
resource_tracker.getfd = _resource_tracker.getfd
forkserver._forkserver = _forkserver forkserver._forkserver = _forkserver
forkserver.ensure_running = _forkserver.ensure_running forkserver.ensure_running = _forkserver.ensure_running

View File

@ -4,8 +4,17 @@ Process spawning.
Mostly just wrapping around ``multiprocessing``. Mostly just wrapping around ``multiprocessing``.
""" """
import multiprocessing as mp import multiprocessing as mp
from multiprocessing import forkserver, semaphore_tracker # type: ignore
from typing import Tuple, Optional try:
from multiprocessing import semaphore_tracker # type: ignore
resource_tracker = semaphore_tracker
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker
except ImportError:
# 3.8 introduces a more general version that also tracks shared mems
from multiprocessing import resource_tracker # type: ignore
from multiprocessing import forkserver # type: ignore
from typing import Tuple
from . import _forkserver_override from . import _forkserver_override
from ._state import current_actor from ._state import current_actor
@ -71,8 +80,8 @@ def new_proc(
fs._forkserver_address, fs._forkserver_address,
fs._forkserver_alive_fd, fs._forkserver_alive_fd,
getattr(fs, '_forkserver_pid', None), getattr(fs, '_forkserver_pid', None),
getattr(semaphore_tracker._semaphore_tracker, '_pid', None), getattr(resource_tracker._resource_tracker, '_pid', None),
semaphore_tracker._semaphore_tracker._fd, resource_tracker._resource_tracker._fd,
) )
else: else:
assert curr_actor._forkserver_info assert curr_actor._forkserver_info
@ -80,8 +89,8 @@ def new_proc(
fs._forkserver_address, fs._forkserver_address,
fs._forkserver_alive_fd, fs._forkserver_alive_fd,
fs._forkserver_pid, fs._forkserver_pid,
semaphore_tracker._semaphore_tracker._pid, resource_tracker._resource_tracker._pid,
semaphore_tracker._semaphore_tracker._fd, resource_tracker._resource_tracker._fd,
) = curr_actor._forkserver_info ) = curr_actor._forkserver_info
else: else:
fs_info = (None, None, None, None, None) fs_info = (None, None, None, None, None)