tractor/tractor/_forkserver_override.py

# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
This is near-copy of the 3.8 stdlib's ``multiprocessing.forkserver.py``
with some hackery to prevent any more then a single forkserver and
semaphore tracker per ``MainProcess``.
2018-08-26 17:12:29 +00:00
.. note:: There is no type hinting in this code base (yet) to remain as
a close as possible to upstream.
"""
# type: ignore
import os
import socket
import signal
import struct
import sys
import errno
import selectors
import warnings
try:
from multiprocessing import semaphore_tracker # type: ignore
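    # alias the 3.7 ``semaphore_tracker`` under the 3.8 ``resource_tracker``
    # names so the rest of this module can target a single API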
resource_tracker = semaphore_tracker
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker
_tracker_type = resource_tracker.SemaphoreTracker
except ImportError:
# 3.8 introduces a more general version that also tracks shared mems
from multiprocessing import resource_tracker # type: ignore
_tracker_type = resource_tracker.ResourceTracker
from multiprocessing import spawn, process # type: ignore
from multiprocessing import forkserver, util, connection # type: ignore
from multiprocessing.forkserver import (
ForkServer, MAXFDS_TO_SEND
)
from multiprocessing.context import reduction # type: ignore
# taken from 3.8
SIGNED_STRUCT = struct.Struct('q') # large enough for pid_t
class PatchedForkServer(ForkServer):
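    '''``ForkServer`` subclass which does *not* auto-spawn the server in
    ``connect_to_new_process()`` (callers must invoke ``ensure_running()``
    themselves) and which retries fd sends on a stale client socket.
    '''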
_forkserver_pid = None
def connect_to_new_process(self, fds):
'''Request forkserver to create a child process.
Returns a pair of fds (status_r, data_w). The calling process can read
the child process's pid and (eventually) its returncode from status_r.
The calling process should write to data_w the pickled preparation and
process data.
'''
# self.ensure_running() # treat our users like adults expecting
# them to spawn the server on their own
if len(fds) + 4 >= MAXFDS_TO_SEND:
raise ValueError('too many fds')
with socket.socket(socket.AF_UNIX) as client:
client.connect(self._forkserver_address)
parent_r, child_w = os.pipe()
child_r, parent_w = os.pipe()
allfds = [child_r, child_w, self._forkserver_alive_fd,
resource_tracker.getfd()]
allfds += fds
            # XXX start of the changed block: if ``sendfds()`` hits a
            # stale/bad client socket, re-connect and retry.
try:
while True:
try:
reduction.sendfds(client, allfds)
break
except OSError as err:
if err.errno == errno.EBADF:
print(f"Bad FD {err}")
client = socket.socket(socket.AF_UNIX)
client.connect(self._forkserver_address)
continue
raise
                # XXX end of the changed block
return parent_r, parent_w
except Exception:
os.close(parent_r)
os.close(parent_w)
raise
finally:
os.close(child_r)
os.close(child_w)
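
    # Caller-side sketch (assumed usage, mirroring the stdlib's
    # ``multiprocessing.popen_forkserver.Popen._launch()``; the
    # ``pickled_*`` names below are placeholders, not real variables):
    #
    #   status_r, data_w = _forkserver.connect_to_new_process(fds)
    #   with open(data_w, 'wb', closefd=False) as f:
    #       f.write(pickled_prep_data + pickled_process_obj)
    #   pid = forkserver.read_signed(status_r)
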
def ensure_running(self):
'''Make sure that a fork server is running.
This can be called from any process. Note that usually a child
process will just reuse the forkserver started by its parent, so
ensure_running() will do nothing.
'''
with self._lock:
resource_tracker.ensure_running()
if self._forkserver_pid is not None:
# forkserver was launched before, is it still running?
pid, status = os.waitpid(self._forkserver_pid, os.WNOHANG)
if not pid:
# still alive
return
# dead, launch it again
os.close(self._forkserver_alive_fd)
self._forkserver_address = None
self._forkserver_alive_fd = None
self._forkserver_pid = None
            # XXX the only thing changed from the stdlib: launch ``main()``
            # from *this* module instead of ``multiprocessing.forkserver``.
cmd = ('from tractor._forkserver_override import main; ' +
'main(%d, %d, %r, **%r)')
if self._preload_modules:
desired_keys = {'main_path', 'sys_path'}
data = spawn.get_preparation_data('ignore')
data = {x: y for x, y in data.items() if x in desired_keys}
else:
data = {}
with socket.socket(socket.AF_UNIX) as listener:
address = connection.arbitrary_address('AF_UNIX')
listener.bind(address)
if not util.is_abstract_socket_namespace(address):
os.chmod(address, 0o600)
listener.listen()
# all client processes own the write end of the "alive" pipe;
# when they all terminate the read end becomes ready.
alive_r, alive_w = os.pipe()
try:
fds_to_pass = [listener.fileno(), alive_r]
cmd %= (listener.fileno(), alive_r, self._preload_modules,
data)
exe = spawn.get_executable()
args = [exe] + util._args_from_interpreter_flags()
args += ['-c', cmd]
pid = util.spawnv_passfds(exe, args, fds_to_pass)
except:
os.close(alive_w)
raise
finally:
os.close(alive_r)
self._forkserver_address = address
self._forkserver_alive_fd = alive_w
self._forkserver_pid = pid
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
'''Run forkserver.'''
if preload:
if '__main__' in preload and main_path is not None:
process.current_process()._inheriting = True
try:
spawn.import_main_path(main_path)
finally:
del process.current_process()._inheriting
for modname in preload:
try:
__import__(modname)
except ImportError:
pass
util._close_stdin()
sig_r, sig_w = os.pipe()
os.set_blocking(sig_r, False)
os.set_blocking(sig_w, False)
def sigchld_handler(*_unused):
# Dummy signal handler, doesn't do anything
pass
handlers = {
# unblocking SIGCHLD allows the wakeup fd to notify our event loop
signal.SIGCHLD: sigchld_handler,
# protect the process from ^C
signal.SIGINT: signal.SIG_IGN,
}
old_handlers = {sig: signal.signal(sig, val)
for (sig, val) in handlers.items()}
# calling os.write() in the Python signal handler is racy
signal.set_wakeup_fd(sig_w)
# map child pids to client fds
pid_to_fd = {}
with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
selectors.DefaultSelector() as selector:
_forkserver._forkserver_address = listener.getsockname()
selector.register(listener, selectors.EVENT_READ)
selector.register(alive_r, selectors.EVENT_READ)
selector.register(sig_r, selectors.EVENT_READ)
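
        # Event loop: wait on (1) the listener socket for new fork
        # requests, (2) ``alive_r`` which hits EOF once all client
        # processes have exited, and (3) ``sig_r`` which wakes us up on
        # SIGCHLD so exited children can be reaped and their return
        # codes relayed back to clients.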
while True:
try:
while True:
rfds = [key.fileobj for (key, events) in selector.select()]
if rfds:
break
if alive_r in rfds:
# EOF because no more client processes left
assert os.read(alive_r, 1) == b'', "Not at EOF?"
raise SystemExit
if sig_r in rfds:
# Got SIGCHLD
os.read(sig_r, 65536) # exhaust
while True:
# Scan for child processes
try:
pid, sts = os.waitpid(-1, os.WNOHANG)
except ChildProcessError:
break
if pid == 0:
break
child_w = pid_to_fd.pop(pid, None)
if child_w is not None:
if os.WIFSIGNALED(sts):
returncode = -os.WTERMSIG(sts)
else:
if not os.WIFEXITED(sts):
raise AssertionError(
"Child {0:n} status is {1:n}".format(
pid, sts))
returncode = os.WEXITSTATUS(sts)
# Send exit code to client process
try:
write_signed(child_w, returncode)
except BrokenPipeError:
# client vanished
pass
os.close(child_w)
else:
# This shouldn't happen really
                            warnings.warn('forkserver: waitpid returned '
                                          'unexpected pid %d' % pid)
if listener in rfds:
# Incoming fork request
with listener.accept()[0] as s:
# XXX Thing that changed - be tolerant of socket disconnects
try:
# Receive fds from client
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
except EOFError:
# broken socket due to reconnection on client-side
continue
# XXX Thing that changed - be tolerant of socket disconnects
if len(fds) > MAXFDS_TO_SEND:
raise RuntimeError(
"Too many ({0:n}) fds to send".format(
len(fds)))
child_r, child_w, *fds = fds
s.close()
pid = os.fork()
if pid == 0:
# Child
code = 1
try:
listener.close()
selector.close()
unused_fds = [alive_r, child_w, sig_r, sig_w]
unused_fds.extend(pid_to_fd.values())
code = _serve_one(child_r, fds,
unused_fds,
old_handlers)
except Exception:
sys.excepthook(*sys.exc_info())
sys.stderr.flush()
finally:
os._exit(code)
else:
# Send pid to client process
try:
write_signed(child_w, pid)
except BrokenPipeError:
# client vanished
pass
pid_to_fd[pid] = child_w
os.close(child_r)
for fd in fds:
os.close(fd)
except OSError as e:
if e.errno != errno.ECONNABORTED:
raise
def _serve_one(child_r, fds, unused_fds, handlers):
# close unnecessary stuff and reset signal handlers
signal.set_wakeup_fd(-1)
for sig, val in handlers.items():
signal.signal(sig, val)
for fd in unused_fds:
os.close(fd)
    # XXX this assignment is why we override this func...
(_forkserver._forkserver_alive_fd,
resource_tracker._resource_tracker._fd,
*_forkserver._inherited_fds) = fds
# Run process object received over pipe
    # XXX changed from upstream: handle both the 3.8+ and pre-3.8
    # ``spawn._main()`` signatures.
try:
# spawn._main() signature changed in 3.8
parent_sentinel = os.dup(child_r)
code = spawn._main(child_r, parent_sentinel)
except TypeError:
# pre 3.8 signature
code = spawn._main(child_r)
return code
def write_signed(fd, n):
msg = SIGNED_STRUCT.pack(n)
while msg:
nbytes = os.write(fd, msg)
if nbytes == 0:
raise RuntimeError('should not get here')
msg = msg[nbytes:]
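
# For reference: the values written by ``write_signed()`` are read back
# by the stdlib's ``multiprocessing.forkserver.read_signed()``, which
# unpacks the same ``SIGNED_STRUCT`` from the status pipe, roughly:
#
#   def read_signed(fd):
#       data = b''
#       length = SIGNED_STRUCT.size
#       while len(data) < length:
#           s = os.read(fd, length - len(data))
#           if not s:
#               raise EOFError('unexpected EOF')
#           data += s
#       return SIGNED_STRUCT.unpack(data)[0]
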
class PatchedResourceTracker(_tracker_type): # type: ignore
"""Stop GD ensuring everything is running...
"""
def getfd(self):
        # XXX: why must we constantly spawn trackers..
# self.ensure_running()
return self._fd
_resource_tracker = PatchedResourceTracker()
_forkserver = PatchedForkServer()
def override_stdlib():
"""Override the stdlib's ``multiprocessing.forkserver`` behaviour
such that our local "manually managed" version from above can be
used instead.
    This avoids spawning superfluous additional forkservers and
    semaphore trackers for each actor nursery used to create new
    sub-actors from sub-actors.
"""
try:
resource_tracker._semaphore_tracker = _resource_tracker
resource_tracker._resource_tracker = _resource_tracker
except AttributeError:
resource_tracker._resource_tracker = _resource_tracker
resource_tracker.ensure_running = _resource_tracker.ensure_running
resource_tracker.register = _resource_tracker.register
resource_tracker.unregister = _resource_tracker.unregister
resource_tracker.getfd = _resource_tracker.getfd
forkserver._forkserver = _forkserver
forkserver.ensure_running = _forkserver.ensure_running
forkserver.main = main
forkserver._serve_one = _serve_one
forkserver.get_inherited_fds = _forkserver.get_inherited_fds
forkserver.connect_to_new_process = _forkserver.connect_to_new_process
forkserver.set_forkserver_preload = _forkserver.set_forkserver_preload
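

if __name__ == '__main__':
    # Minimal usage sketch (an assumption about intended use, not an
    # official ``tractor`` API): apply the overrides, explicitly start
    # the single shared forkserver (the patched
    # ``connect_to_new_process()`` skips ``ensure_running()``), then
    # spawn a child via the stdlib "forkserver" start method (Unix only).
    import multiprocessing as mp

    override_stdlib()
    _forkserver.ensure_running()

    ctx = mp.get_context('forkserver')
    proc = ctx.Process(target=print, args=('hello from a forkserver child',))
    proc.start()
    proc.join()
    print(f'child exited with code {proc.exitcode}')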