Compare commits

..

No commits in common. "bce2de9e20c9b3063b9e474faf57586d98b65ee0" and "a4396e78ee202810f232c9768c2e02a953bef7f1" have entirely different histories.

7 changed files with 102 additions and 234 deletions

View File

@ -6,7 +6,6 @@ async def gen():
yield 'yo' yield 'yo'
await tractor.breakpoint() await tractor.breakpoint()
yield 'yo' yield 'yo'
await tractor.breakpoint()
@tractor.context @tractor.context
@ -14,35 +13,35 @@ async def just_bp(
ctx: tractor.Context, ctx: tractor.Context,
) -> None: ) -> None:
await ctx.started() await ctx.started('yo bpin here')
await tractor.breakpoint() await tractor.breakpoint()
# TODO: bps and errors in this call.. # async for val in gen():
async for val in gen(): # print(val)
print(val)
# await trio.sleep(0.5) await trio.sleep(0.5)
# prematurely destroy the connection # THIS CAUSES AN UNRECOVERABLE HANG!?
await ctx.chan.aclose()
# THIS CAUSES AN UNRECOVERABLE HANG
# without latest ``pdbpp``:
assert 0 assert 0
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
loglevel='transport',
debug_mode=True, debug_mode=True,
) as n: ) as n:
p = await n.start_actor( p = await n.start_actor(
'bp_boi', 'bp_boi',
enable_modules=[__name__], enable_modules=[__name__],
# debug_mode=True,
) )
async with p.open_context( async with p.open_context(
just_bp, just_bp,
) as (ctx, first): ) as (ctx, first):
# await tractor.breakpoint()
# breakpoint()
await trio.sleep_forever() await trio.sleep_forever()

View File

@ -1,49 +0,0 @@
import trio
import click
import tractor
import pydantic
# from multiprocessing import shared_memory
@tractor.context
async def just_sleep(
ctx: tractor.Context,
**kwargs,
) -> None:
'''
Test a small ping-pong 2-way streaming server.
'''
await ctx.started()
await trio.sleep_forever()
async def main() -> None:
proc = await trio.open_process( (
'python',
'-c',
'import trio; trio.run(trio.sleep_forever)',
))
await proc.wait()
# await trio.sleep_forever()
# async with tractor.open_nursery() as n:
# portal = await n.start_actor(
# 'rpc_server',
# enable_modules=[__name__],
# )
# async with portal.open_context(
# just_sleep, # taken from pytest parameterization
# ) as (ctx, sent):
# await trio.sleep_forever()
if __name__ == '__main__':
import time
# time.sleep(999)
trio.run(main)

View File

@ -26,11 +26,8 @@ import importlib
import importlib.util import importlib.util
import inspect import inspect
import uuid import uuid
from typing import ( import typing
Any, Optional, from typing import Any, Optional, Union
Union, TYPE_CHECKING,
Callable,
)
from types import ModuleType from types import ModuleType
import sys import sys
import os import os
@ -60,10 +57,6 @@ from . import _state
from . import _mp_fixup_main from . import _mp_fixup_main
if TYPE_CHECKING:
from ._supervise import ActorNursery
log = get_logger('tractor') log = get_logger('tractor')
@ -72,7 +65,7 @@ async def _invoke(
actor: 'Actor', actor: 'Actor',
cid: str, cid: str,
chan: Channel, chan: Channel,
func: Callable, func: typing.Callable,
kwargs: dict[str, Any], kwargs: dict[str, Any],
is_rpc: bool = True, is_rpc: bool = True,
@ -207,7 +200,7 @@ async def _invoke(
ctx = actor._contexts.pop((chan.uid, cid)) ctx = actor._contexts.pop((chan.uid, cid))
if ctx: if ctx:
log.runtime( log.runtime(
f'Context entrypoint {func} was terminated:\n{ctx}' f'Context entrypoint for {func} was terminated:\n{ctx}'
) )
assert cs assert cs
@ -323,9 +316,7 @@ async def try_ship_error_to_parent(
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
): ):
# in SC terms this is one of the worst things that can log.error(
# happen and creates the 2-general's dilemma.
log.critical(
f"Failed to ship error to parent " f"Failed to ship error to parent "
f"{channel.uid}, channel was closed" f"{channel.uid}, channel was closed"
) )
@ -433,7 +424,7 @@ class Actor:
# (chan, cid) -> (cancel_scope, func) # (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: dict[ self._rpc_tasks: dict[
tuple[Channel, str], tuple[Channel, str],
tuple[trio.CancelScope, Callable, trio.Event] tuple[trio.CancelScope, typing.Callable, trio.Event]
] = {} ] = {}
# map {actor uids -> Context} # map {actor uids -> Context}
@ -522,7 +513,6 @@ class Actor:
self._no_more_peers = trio.Event() # unset self._no_more_peers = trio.Event() # unset
chan = Channel.from_stream(stream) chan = Channel.from_stream(stream)
uid: Optional[tuple[str, str]] = chan.uid
log.runtime(f"New connection to us {chan}") log.runtime(f"New connection to us {chan}")
# send/receive initial handshake response # send/receive initial handshake response
@ -570,51 +560,33 @@ class Actor:
# append new channel # append new channel
self._peers[uid].append(chan) self._peers[uid].append(chan)
local_nursery: Optional[ActorNursery] = None # noqa
disconnected: bool = False
# Begin channel management - respond to remote requests and # Begin channel management - respond to remote requests and
# process received reponses. # process received reponses.
try: try:
disconnected = await self._process_messages(chan) await self._process_messages(chan)
except ( except trio.Cancelled:
trio.Cancelled,
):
log.cancel(f"Msg loop was cancelled for {chan}") log.cancel(f"Msg loop was cancelled for {chan}")
raise raise
finally: finally:
local_nursery = self._actoruid2nursery.get(uid, local_nursery)
# This is set in ``Portal.cancel_actor()``. So if # This is set in ``Portal.cancel_actor()``. So if
# the peer was cancelled we try to wait for them # the peer was cancelled we try to wait for them
# to tear down their side of the connection before # to tear down their side of the connection before
# moving on with closing our own side. # moving on with closing our own side.
local_nursery = self._actoruid2nursery.get(chan.uid)
if ( if (
local_nursery local_nursery
): ):
if disconnected:
# if the transport died and this actor is still
# registered within a local nursery, we report that the
# IPC layer may have failed unexpectedly since it may be
# the cause of other downstream errors.
entry = local_nursery._children.get(uid)
if entry:
_, proc, _ = entry
log.error(f'Actor {uid}@{proc} IPC connection broke!?')
# if proc.poll() is not None:
# log.error('Actor {uid} proc died and IPC broke?')
log.cancel(f"Waiting on cancel request to peer {chan.uid}") log.cancel(f"Waiting on cancel request to peer {chan.uid}")
# XXX: this is a soft wait on the channel (and its # XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the # underlying transport protocol) to close from the remote
# remote peer side since we presume that any channel # peer side since we presume that any channel which
# which is mapped to a sub-actor (i.e. it's managed by # is mapped to a sub-actor (i.e. it's managed by
# one of our local nurseries) has a message is sent to # one of our local nurseries)
# the peer likely by this actor (which is now in # message is sent to the peer likely by this actor which is
# a cancelled condition) when the local runtime here is # now in a cancelled condition) when the local runtime here
# now cancelled while (presumably) in the middle of msg # is now cancelled while (presumably) in the middle of msg
# loop processing. # loop processing.
with trio.move_on_after(0.5) as cs: with trio.move_on_after(0.5) as cs:
cs.shield = True cs.shield = True
@ -637,8 +609,6 @@ class Actor:
await local_nursery.exited.wait() await local_nursery.exited.wait()
# if local_nursery._children
# ``Channel`` teardown and closure sequence # ``Channel`` teardown and closure sequence
# Drop ref to channel so it can be gc-ed and disconnected # Drop ref to channel so it can be gc-ed and disconnected
@ -648,7 +618,7 @@ class Actor:
if not chans: if not chans:
log.runtime(f"No more channels for {chan.uid}") log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(uid, None) self._peers.pop(chan.uid, None)
# for (uid, cid) in self._contexts.copy(): # for (uid, cid) in self._contexts.copy():
# if chan.uid == uid: # if chan.uid == uid:
@ -656,13 +626,11 @@ class Actor:
log.runtime(f"Peers is {self._peers}") log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered if not self._peers: # no more channels connected
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channels") log.runtime("Signalling no more peer channels")
self._no_more_peers.set() self._no_more_peers.set()
# XXX: is this necessary (GC should do it)? # # XXX: is this necessary (GC should do it?)
if chan.connected(): if chan.connected():
# if the channel is still connected it may mean the far # if the channel is still connected it may mean the far
# end has not closed and we may have gotten here due to # end has not closed and we may have gotten here due to
@ -697,7 +665,7 @@ class Actor:
ctx = self._contexts[(uid, cid)] ctx = self._contexts[(uid, cid)]
except KeyError: except KeyError:
log.warning( log.warning(
f'Ignoring msg from [no-longer/un]known context {uid}:' f'Ignoring msg from [no-longer/un]known context with {uid}:'
f'\n{msg}') f'\n{msg}')
return return
@ -845,7 +813,7 @@ class Actor:
shield: bool = False, shield: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> bool: ) -> None:
''' '''
Process messages for the channel async-RPC style. Process messages for the channel async-RPC style.
@ -871,7 +839,7 @@ class Actor:
if msg is None: # loop terminate sentinel if msg is None: # loop terminate sentinel
log.cancel( log.cancel(
f"Channel to {chan.uid} terminated?\n" f"Channerl to {chan.uid} terminated?\n"
"Cancelling all associated tasks..") "Cancelling all associated tasks..")
for (channel, cid) in self._rpc_tasks.copy(): for (channel, cid) in self._rpc_tasks.copy():
@ -1018,9 +986,6 @@ class Actor:
# up. # up.
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
# transport **was** disconnected
return True
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
if nursery_cancelled_before_task: if nursery_cancelled_before_task:
sn = self._service_n sn = self._service_n
@ -1045,9 +1010,6 @@ class Actor:
f"Exiting msg loop for {chan} from {chan.uid} " f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}") f"with last msg:\n{msg}")
# transport **was not** disconnected
return False
async def _from_parent( async def _from_parent(
self, self,
parent_addr: Optional[tuple[str, int]], parent_addr: Optional[tuple[str, int]],

View File

@ -49,8 +49,7 @@ try:
except ImportError: except ImportError:
# pdbpp is installed in regular mode...it monkey patches stuff # pdbpp is installed in regular mode...it monkey patches stuff
import pdb import pdb
xpm = getattr(pdb, 'xpm', None) assert pdb.xpm, "pdbpp is not installed?" # type: ignore
assert xpm, "pdbpp is not installed?" # type: ignore
pdbpp = pdb pdbpp = pdb
log = get_logger(__name__) log = get_logger(__name__)
@ -260,11 +259,16 @@ async def _hijack_stdin_for_child(
orig_handler = signal.signal( orig_handler = signal.signal(
signal.SIGINT, signal.SIGINT,
shield_sigint, shield_sigint,
# partial(shield_sigint, pdb=pdb),
) )
# try:
# yield
try: try:
with ( with (
trio.CancelScope(shield=True), trio.CancelScope(shield=True),
# disable_sigint(),
): ):
try: try:
lock = None lock = None
async with _acquire_debug_lock(subactor_uid) as lock: async with _acquire_debug_lock(subactor_uid) as lock:
@ -580,6 +584,10 @@ def shield_sigint(
''' '''
__tracebackhide__ = True __tracebackhide__ = True
frame = sys._getframe()
last_f = frame.f_back
last_f.f_globals['__tracebackhide__'] = True
global _local_task_in_debug, _global_actor_in_debug global _local_task_in_debug, _global_actor_in_debug
in_debug = _global_actor_in_debug in_debug = _global_actor_in_debug
@ -596,7 +604,6 @@ def shield_sigint(
log.pdb( log.pdb(
f"Ignoring SIGINT while child in debug mode: `{in_debug}`" f"Ignoring SIGINT while child in debug mode: `{in_debug}`"
) )
else: else:
log.pdb( log.pdb(
"Ignoring SIGINT while in debug mode" "Ignoring SIGINT while in debug mode"

View File

@ -24,8 +24,7 @@ import importlib
import inspect import inspect
from typing import ( from typing import (
Any, Optional, Any, Optional,
Callable, AsyncGenerator, Callable, AsyncGenerator
Type,
) )
from functools import partial from functools import partial
from dataclasses import dataclass from dataclasses import dataclass
@ -443,10 +442,6 @@ class Portal:
_err: Optional[BaseException] = None _err: Optional[BaseException] = None
ctx._portal = self ctx._portal = self
uid = self.channel.uid
cid = ctx.cid
etype: Optional[Type[BaseException]] = None
# deliver context instance and .started() msg value in open tuple. # deliver context instance and .started() msg value in open tuple.
try: try:
async with trio.open_nursery() as scope_nursery: async with trio.open_nursery() as scope_nursery:
@ -482,24 +477,13 @@ class Portal:
# KeyboardInterrupt, # KeyboardInterrupt,
) as err: ) as err:
etype = type(err) _err = err
# the context cancels itself on any cancel # the context cancels itself on any cancel
# causing error. # causing error.
if ctx.chan.connected():
log.cancel( log.cancel(
'Context cancelled for task, sending cancel request..\n' f'Context to {self.channel.uid} sending cancel request..')
f'task:{cid}\n'
f'actor:{uid}'
)
await ctx.cancel()
else:
log.warning(
'IPC connection for context is broken?\n'
f'task:{cid}\n'
f'actor:{uid}'
)
await ctx.cancel()
raise raise
finally: finally:
@ -508,12 +492,6 @@ class Portal:
# sure we get the error the underlying feeder mem chan. # sure we get the error the underlying feeder mem chan.
# if it's not raised here it *should* be raised from the # if it's not raised here it *should* be raised from the
# msg loop nursery right? # msg loop nursery right?
if ctx.chan.connected():
log.info(
'Waiting on final context-task result for\n'
f'task:{cid}\n'
f'actor:{uid}'
)
result = await ctx.result() result = await ctx.result()
# though it should be impossible for any tasks # though it should be impossible for any tasks
@ -524,17 +502,14 @@ class Portal:
# should we encapsulate this in the context api? # should we encapsulate this in the context api?
await ctx._recv_chan.aclose() await ctx._recv_chan.aclose()
if etype: if _err:
if ctx._cancel_called: if ctx._cancel_called:
log.cancel( log.cancel(
f'Context {fn_name} cancelled by caller with\n{etype}' f'Context {fn_name} cancelled by caller with\n{_err}'
) )
elif _err is not None: elif _err is not None:
log.cancel( log.cancel(
f'Context for task cancelled by callee with {etype}\n' f'Context {fn_name} cancelled by callee with\n{_err}'
f'target: `{fn_name}`\n'
f'task:{cid}\n'
f'actor:{uid}'
) )
else: else:
log.runtime( log.runtime(

View File

@ -18,30 +18,17 @@
Machinery for actor process spawning using multiple backends. Machinery for actor process spawning using multiple backends.
""" """
from __future__ import annotations
import sys import sys
import multiprocessing as mp
import platform import platform
from typing import ( from typing import (
Any, Dict, Optional, Callable, Any, Optional, Callable, TypeVar, TYPE_CHECKING
TypeVar,
) )
from collections.abc import Awaitable from collections.abc import Awaitable
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
try:
from multiprocessing import semaphore_tracker # type: ignore
resource_tracker = semaphore_tracker
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker
except ImportError:
# 3.8 introduces a more general version that also tracks shared mems
from multiprocessing import resource_tracker # type: ignore
from multiprocessing import forkserver # type: ignore
from typing import Tuple
from . import _forkserver_override
from ._debug import ( from ._debug import (
maybe_wait_for_debugger, maybe_wait_for_debugger,
acquire_debug_lock, acquire_debug_lock,
@ -60,9 +47,12 @@ from ._entry import _mp_main
from ._exceptions import ActorFailure from ._exceptions import ActorFailure
log = get_logger('tractor') if TYPE_CHECKING:
import multiprocessing as mp
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process) ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
log = get_logger('tractor')
# placeholder for an mp start context if so using that backend # placeholder for an mp start context if so using that backend
_ctx: Optional[mp.context.BaseContext] = None _ctx: Optional[mp.context.BaseContext] = None
_spawn_method: str = "trio" _spawn_method: str = "trio"
@ -70,6 +60,7 @@ _spawn_method: str = "trio"
if platform.system() == 'Windows': if platform.system() == 'Windows':
import multiprocessing as mp
_ctx = mp.get_context("spawn") _ctx = mp.get_context("spawn")
async def proc_waiter(proc: mp.Process) -> None: async def proc_waiter(proc: mp.Process) -> None:
@ -92,6 +83,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
``subprocess.Popen``. ``subprocess.Popen``.
''' '''
import multiprocessing as mp
global _ctx global _ctx
global _spawn_method global _spawn_method
@ -108,6 +100,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
f"Spawn method `{name}` is invalid please choose one of {methods}" f"Spawn method `{name}` is invalid please choose one of {methods}"
) )
elif name == 'forkserver': elif name == 'forkserver':
from . import _forkserver_override
_forkserver_override.override_stdlib() _forkserver_override.override_stdlib()
_ctx = mp.get_context(name) _ctx = mp.get_context(name)
elif name == 'trio': elif name == 'trio':
@ -155,7 +148,7 @@ async def cancel_on_completion(
portal: Portal, portal: Portal,
actor: Actor, actor: Actor,
errors: Dict[Tuple[str, str], Exception], errors: dict[tuple[str, str], Exception],
) -> None: ) -> None:
''' '''
@ -258,12 +251,12 @@ async def new_proc(
name: str, name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa actor_nursery: 'ActorNursery', # type: ignore # noqa
subactor: Actor, subactor: Actor,
errors: Dict[Tuple[str, str], Exception], errors: dict[tuple[str, str], Exception],
# passed through to actor main # passed through to actor main
bind_addr: Tuple[str, int], bind_addr: tuple[str, int],
parent_addr: Tuple[str, int], parent_addr: tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child _runtime_vars: dict[str, Any], # serialized and sent to _child
*, *,
@ -295,7 +288,7 @@ async def new_proc(
# the OS; it otherwise can be passed via the parent channel if # the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy). # we prefer in the future (for privacy).
"--uid", "--uid",
str(uid), str(subactor.uid),
# Address the child must connect to on startup # Address the child must connect to on startup
"--parent_addr", "--parent_addr",
str(parent_addr) str(parent_addr)
@ -321,7 +314,8 @@ async def new_proc(
# wait for actor to spawn and connect back to us # wait for actor to spawn and connect back to us
# channel should have handshake completed by the # channel should have handshake completed by the
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer(uid) event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid)
except trio.Cancelled: except trio.Cancelled:
cancelled_during_spawn = True cancelled_during_spawn = True
@ -362,13 +356,18 @@ async def new_proc(
task_status.started(portal) task_status.started(portal)
# wait for ActorNursery.wait() to be called # wait for ActorNursery.wait() to be called
n_exited = actor_nursery._join_procs
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await n_exited.wait() await actor_nursery._join_procs.wait()
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
if portal in actor_nursery._cancel_after_result_on_exit:
nursery.start_soon(
cancel_on_completion,
portal,
subactor,
errors
)
async def soft_wait_and_maybe_cancel_ria_task():
# This is a "soft" (cancellable) join/reap which # This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled`` # will remote cancel the actor on a ``trio.Cancelled``
# condition. # condition.
@ -378,46 +377,13 @@ async def new_proc(
portal portal
) )
if n_exited.is_set():
# cancel result waiter that may have been spawned in # cancel result waiter that may have been spawned in
# tandem if not done already # tandem if not done already
log.warning( log.warning(
"Cancelling existing result waiter task for " "Cancelling existing result waiter task for "
f"{subactor.uid}" f"{subactor.uid}")
)
nursery.cancel_scope.cancel() nursery.cancel_scope.cancel()
else:
log.warning(
f'Process for actor {uid} terminated before'
'nursery exit. ' 'This may mean an IPC'
'connection failed!'
)
nursery.start_soon(soft_wait_and_maybe_cancel_ria_task)
# TODO: when we finally remove the `.run_in_actor()` api
# we should be able to entirely drop these 2 blocking calls:
# - we don't need to wait on nursery exit to capture
# process-spawn-machinery level errors (and propagate them).
# - we don't need to wait on final results from ria portals
# since this will be done in some higher level wrapper API.
# XXX: interestingly we can't put this here bc it causes
# the pub-sub tests to fail? wth.. should probably drop
# those XD
# wait for ActorNursery.wait() to be called
# with trio.CancelScope(shield=True):
# await n_exited.wait()
if portal in actor_nursery._cancel_after_result_on_exit:
nursery.start_soon(
cancel_on_completion,
portal,
subactor,
errors
)
finally: finally:
# The "hard" reap since no actor zombies are allowed! # The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid # XXX: do this **after** cancellation/tearfown to avoid
@ -434,10 +400,9 @@ async def new_proc(
await proc.wait() await proc.wait()
if is_root_process(): if is_root_process():
await maybe_wait_for_debugger( await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get( child_in_debug=_runtime_vars.get(
'_debug_mode', False) '_debug_mode', False),
) )
if proc.poll() is None: if proc.poll() is None:
@ -476,20 +441,30 @@ async def mp_new_proc(
name: str, name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa actor_nursery: 'ActorNursery', # type: ignore # noqa
subactor: Actor, subactor: Actor,
errors: Dict[Tuple[str, str], Exception], errors: dict[tuple[str, str], Exception],
# passed through to actor main # passed through to actor main
bind_addr: Tuple[str, int], bind_addr: tuple[str, int],
parent_addr: Tuple[str, int], parent_addr: tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child _runtime_vars: dict[str, Any], # serialized and sent to _child
*, *,
infect_asyncio: bool = False, infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
# uggh zone
try:
from multiprocessing import semaphore_tracker # type: ignore
resource_tracker = semaphore_tracker
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker # noqa
except ImportError:
# 3.8 introduces a more general version that also tracks shared mems
from multiprocessing import resource_tracker # type: ignore
assert _ctx assert _ctx
start_method = _ctx.get_start_method() start_method = _ctx.get_start_method()
if start_method == 'forkserver': if start_method == 'forkserver':
from multiprocessing import forkserver # type: ignore
# XXX do our hackery on the stdlib to avoid multiple # XXX do our hackery on the stdlib to avoid multiple
# forkservers (one at each subproc layer). # forkservers (one at each subproc layer).
fs = forkserver._forkserver fs = forkserver._forkserver

View File

@ -604,8 +604,7 @@ class Context:
self._portal._streams.remove(rchan) self._portal._streams.remove(rchan)
async def result(self) -> Any: async def result(self) -> Any:
''' '''From a caller side, wait for and return the final result from
From a caller side, wait for and return the final result from
the callee side task. the callee side task.
''' '''