diff --git a/tractor/_forkserver_override.py b/tractor/_forkserver_override.py index 68d8333..07933dd 100644 --- a/tractor/_forkserver_override.py +++ b/tractor/_forkserver_override.py @@ -15,7 +15,17 @@ import errno import selectors import warnings -from multiprocessing import semaphore_tracker, spawn, process # type: ignore +try: + from multiprocessing import semaphore_tracker # type: ignore + resource_tracker = semaphore_tracker + resource_tracker._resource_tracker = resource_tracker._semaphore_tracker + _tracker_type = resource_tracker.SemaphoreTracker +except ImportError: + # 3.8 introduces a more general version that also tracks shared mems + from multiprocessing import resource_tracker # type: ignore + _tracker_type = resource_tracker.ResourceTracker + +from multiprocessing import spawn, process # type: ignore from multiprocessing import forkserver, util, connection # type: ignore from multiprocessing.forkserver import ( ForkServer, MAXFDS_TO_SEND @@ -48,7 +58,7 @@ class PatchedForkServer(ForkServer): parent_r, child_w = os.pipe() child_r, parent_w = os.pipe() allfds = [child_r, child_w, self._forkserver_alive_fd, - semaphore_tracker.getfd()] + resource_tracker.getfd()] allfds += fds # XXX This is the only part changed @@ -83,7 +93,7 @@ class PatchedForkServer(ForkServer): ensure_running() will do nothing. ''' with self._lock: - semaphore_tracker.ensure_running() + resource_tracker.ensure_running() if self._forkserver_pid is not None: # forkserver was launched before, is it still running? pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG) @@ -231,14 +241,14 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): # Incoming fork request with listener.accept()[0] as s: - # XXX Thing changed - be tolerant of socket disconnects + # XXX Thing that changed - be tolerant of socket disconnects try: # Receive fds from client fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1) except EOFError: # broken socket due to reconnection on client-side continue - # XXX Thing changed - be tolerant of socket disconnects + # XXX Thing that changed - be tolerant of socket disconnects if len(fds) > MAXFDS_TO_SEND: raise RuntimeError( @@ -288,12 +298,20 @@ def _serve_one(child_r, fds, unused_fds, handlers): for fd in unused_fds: os.close(fd) + # XXX this assigment is why we override this func... (_forkserver._forkserver_alive_fd, - semaphore_tracker._semaphore_tracker._fd, + resource_tracker._resource_tracker._fd, *_forkserver._inherited_fds) = fds # Run process object received over pipe - code = spawn._main(child_r) + # XXX this clearly changed + try: + # spawn._main() signature changed in 3.8 + parent_sentinel = os.dup(child_r) + code = spawn._main(child_r, parent_sentinel) + except TypeError: + # pre 3.8 signature + code = spawn._main(child_r) return code @@ -307,15 +325,16 @@ def write_signed(fd, n): msg = msg[nbytes:] -class PatchedSemaphoreTracker(semaphore_tracker.SemaphoreTracker): +class PatchedResourceTracker(_tracker_type): # type: ignore """Stop GD ensuring everything is running... """ def getfd(self): + # XXX: why must be constantly spawn trackers.. # self.ensure_running() return self._fd -_semaphore_tracker = PatchedSemaphoreTracker() +_resource_tracker = PatchedResourceTracker() _forkserver = PatchedForkServer() @@ -328,11 +347,16 @@ def override_stdlib(): and semaphore trackers for each actor nursery used to create new sub-actors from sub-actors. """ - semaphore_tracker._semaphore_tracker = _semaphore_tracker - semaphore_tracker.ensure_running = _semaphore_tracker.ensure_running - semaphore_tracker.register = _semaphore_tracker.register - semaphore_tracker.unregister = _semaphore_tracker.unregister - semaphore_tracker.getfd = _semaphore_tracker.getfd + try: + resource_tracker._semaphore_tracker = _resource_tracker + resource_tracker._resource_tracker = _resource_tracker + except AttributeError: + resource_tracker._resource_tracker = _resource_tracker + + resource_tracker.ensure_running = _resource_tracker.ensure_running + resource_tracker.register = _resource_tracker.register + resource_tracker.unregister = _resource_tracker.unregister + resource_tracker.getfd = _resource_tracker.getfd forkserver._forkserver = _forkserver forkserver.ensure_running = _forkserver.ensure_running diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 0b6916f..a759946 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -4,8 +4,17 @@ Process spawning. Mostly just wrapping around ``multiprocessing``. """ import multiprocessing as mp -from multiprocessing import forkserver, semaphore_tracker # type: ignore -from typing import Tuple, Optional + +try: + from multiprocessing import semaphore_tracker # type: ignore + resource_tracker = semaphore_tracker + resource_tracker._resource_tracker = resource_tracker._semaphore_tracker +except ImportError: + # 3.8 introduces a more general version that also tracks shared mems + from multiprocessing import resource_tracker # type: ignore + +from multiprocessing import forkserver # type: ignore +from typing import Tuple from . import _forkserver_override from ._state import current_actor @@ -71,8 +80,8 @@ def new_proc( fs._forkserver_address, fs._forkserver_alive_fd, getattr(fs, '_forkserver_pid', None), - getattr(semaphore_tracker._semaphore_tracker, '_pid', None), - semaphore_tracker._semaphore_tracker._fd, + getattr(resource_tracker._resource_tracker, '_pid', None), + resource_tracker._resource_tracker._fd, ) else: assert curr_actor._forkserver_info @@ -80,8 +89,8 @@ def new_proc( fs._forkserver_address, fs._forkserver_alive_fd, fs._forkserver_pid, - semaphore_tracker._semaphore_tracker._pid, - semaphore_tracker._semaphore_tracker._fd, + resource_tracker._resource_tracker._pid, + resource_tracker._resource_tracker._fd, ) = curr_actor._forkserver_info else: fs_info = (None, None, None, None, None)