Continue hacking the forkserver in Python 3.8
They got all fancy and added shared memory segment tracking and then had to "generalize" the tracker name...hooray Fixes #81windows_support
parent
6ff32347bf
commit
da4796749f
|
@ -15,7 +15,17 @@ import errno
|
|||
import selectors
|
||||
import warnings
|
||||
|
||||
from multiprocessing import semaphore_tracker, spawn, process # type: ignore
|
||||
try:
|
||||
from multiprocessing import semaphore_tracker # type: ignore
|
||||
resource_tracker = semaphore_tracker
|
||||
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker
|
||||
_tracker_type = resource_tracker.SemaphoreTracker
|
||||
except ImportError:
|
||||
# 3.8 introduces a more general version that also tracks shared mems
|
||||
from multiprocessing import resource_tracker # type: ignore
|
||||
_tracker_type = resource_tracker.ResourceTracker
|
||||
|
||||
from multiprocessing import spawn, process # type: ignore
|
||||
from multiprocessing import forkserver, util, connection # type: ignore
|
||||
from multiprocessing.forkserver import (
|
||||
ForkServer, MAXFDS_TO_SEND
|
||||
|
@ -48,7 +58,7 @@ class PatchedForkServer(ForkServer):
|
|||
parent_r, child_w = os.pipe()
|
||||
child_r, parent_w = os.pipe()
|
||||
allfds = [child_r, child_w, self._forkserver_alive_fd,
|
||||
semaphore_tracker.getfd()]
|
||||
resource_tracker.getfd()]
|
||||
allfds += fds
|
||||
|
||||
# XXX This is the only part changed
|
||||
|
@ -83,7 +93,7 @@ class PatchedForkServer(ForkServer):
|
|||
ensure_running() will do nothing.
|
||||
'''
|
||||
with self._lock:
|
||||
semaphore_tracker.ensure_running()
|
||||
resource_tracker.ensure_running()
|
||||
if self._forkserver_pid is not None:
|
||||
# forkserver was launched before, is it still running?
|
||||
pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
|
||||
|
@ -231,14 +241,14 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
|
|||
# Incoming fork request
|
||||
with listener.accept()[0] as s:
|
||||
|
||||
# XXX Thing changed - be tolerant of socket disconnects
|
||||
# XXX Thing that changed - be tolerant of socket disconnects
|
||||
try:
|
||||
# Receive fds from client
|
||||
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
|
||||
except EOFError:
|
||||
# broken socket due to reconnection on client-side
|
||||
continue
|
||||
# XXX Thing changed - be tolerant of socket disconnects
|
||||
# XXX Thing that changed - be tolerant of socket disconnects
|
||||
|
||||
if len(fds) > MAXFDS_TO_SEND:
|
||||
raise RuntimeError(
|
||||
|
@ -288,12 +298,20 @@ def _serve_one(child_r, fds, unused_fds, handlers):
|
|||
for fd in unused_fds:
|
||||
os.close(fd)
|
||||
|
||||
# XXX this assigment is why we override this func...
|
||||
(_forkserver._forkserver_alive_fd,
|
||||
semaphore_tracker._semaphore_tracker._fd,
|
||||
resource_tracker._resource_tracker._fd,
|
||||
*_forkserver._inherited_fds) = fds
|
||||
|
||||
# Run process object received over pipe
|
||||
code = spawn._main(child_r)
|
||||
# XXX this clearly changed
|
||||
try:
|
||||
# spawn._main() signature changed in 3.8
|
||||
parent_sentinel = os.dup(child_r)
|
||||
code = spawn._main(child_r, parent_sentinel)
|
||||
except TypeError:
|
||||
# pre 3.8 signature
|
||||
code = spawn._main(child_r)
|
||||
|
||||
return code
|
||||
|
||||
|
@ -307,15 +325,16 @@ def write_signed(fd, n):
|
|||
msg = msg[nbytes:]
|
||||
|
||||
|
||||
class PatchedSemaphoreTracker(semaphore_tracker.SemaphoreTracker):
|
||||
class PatchedResourceTracker(_tracker_type): # type: ignore
|
||||
"""Stop GD ensuring everything is running...
|
||||
"""
|
||||
def getfd(self):
|
||||
# XXX: why must be constantly spawn trackers..
|
||||
# self.ensure_running()
|
||||
return self._fd
|
||||
|
||||
|
||||
_semaphore_tracker = PatchedSemaphoreTracker()
|
||||
_resource_tracker = PatchedResourceTracker()
|
||||
_forkserver = PatchedForkServer()
|
||||
|
||||
|
||||
|
@ -328,11 +347,16 @@ def override_stdlib():
|
|||
and semaphore trackers for each actor nursery used to create new
|
||||
sub-actors from sub-actors.
|
||||
"""
|
||||
semaphore_tracker._semaphore_tracker = _semaphore_tracker
|
||||
semaphore_tracker.ensure_running = _semaphore_tracker.ensure_running
|
||||
semaphore_tracker.register = _semaphore_tracker.register
|
||||
semaphore_tracker.unregister = _semaphore_tracker.unregister
|
||||
semaphore_tracker.getfd = _semaphore_tracker.getfd
|
||||
try:
|
||||
resource_tracker._semaphore_tracker = _resource_tracker
|
||||
resource_tracker._resource_tracker = _resource_tracker
|
||||
except AttributeError:
|
||||
resource_tracker._resource_tracker = _resource_tracker
|
||||
|
||||
resource_tracker.ensure_running = _resource_tracker.ensure_running
|
||||
resource_tracker.register = _resource_tracker.register
|
||||
resource_tracker.unregister = _resource_tracker.unregister
|
||||
resource_tracker.getfd = _resource_tracker.getfd
|
||||
|
||||
forkserver._forkserver = _forkserver
|
||||
forkserver.ensure_running = _forkserver.ensure_running
|
||||
|
|
|
@ -4,8 +4,17 @@ Process spawning.
|
|||
Mostly just wrapping around ``multiprocessing``.
|
||||
"""
|
||||
import multiprocessing as mp
|
||||
from multiprocessing import forkserver, semaphore_tracker # type: ignore
|
||||
from typing import Tuple, Optional
|
||||
|
||||
try:
|
||||
from multiprocessing import semaphore_tracker # type: ignore
|
||||
resource_tracker = semaphore_tracker
|
||||
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker
|
||||
except ImportError:
|
||||
# 3.8 introduces a more general version that also tracks shared mems
|
||||
from multiprocessing import resource_tracker # type: ignore
|
||||
|
||||
from multiprocessing import forkserver # type: ignore
|
||||
from typing import Tuple
|
||||
|
||||
from . import _forkserver_override
|
||||
from ._state import current_actor
|
||||
|
@ -71,8 +80,8 @@ def new_proc(
|
|||
fs._forkserver_address,
|
||||
fs._forkserver_alive_fd,
|
||||
getattr(fs, '_forkserver_pid', None),
|
||||
getattr(semaphore_tracker._semaphore_tracker, '_pid', None),
|
||||
semaphore_tracker._semaphore_tracker._fd,
|
||||
getattr(resource_tracker._resource_tracker, '_pid', None),
|
||||
resource_tracker._resource_tracker._fd,
|
||||
)
|
||||
else:
|
||||
assert curr_actor._forkserver_info
|
||||
|
@ -80,8 +89,8 @@ def new_proc(
|
|||
fs._forkserver_address,
|
||||
fs._forkserver_alive_fd,
|
||||
fs._forkserver_pid,
|
||||
semaphore_tracker._semaphore_tracker._pid,
|
||||
semaphore_tracker._semaphore_tracker._fd,
|
||||
resource_tracker._resource_tracker._pid,
|
||||
resource_tracker._resource_tracker._fd,
|
||||
) = curr_actor._forkserver_info
|
||||
else:
|
||||
fs_info = (None, None, None, None, None)
|
||||
|
|
Loading…
Reference in New Issue