Compare commits
10 Commits
a4396e78ee
...
bce2de9e20
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | bce2de9e20 | |
Tyler Goodlet | 16e4da6958 | |
Tyler Goodlet | c79363f4a1 | |
Tyler Goodlet | 013e766a16 | |
Tyler Goodlet | 61cc393a1a | |
Tyler Goodlet | ee3dc5123f | |
Tyler Goodlet | f8494de478 | |
Tyler Goodlet | fc90e1f171 | |
Tyler Goodlet | 02210d8f8e | |
Tyler Goodlet | 194c0e120d |
|
@ -6,6 +6,7 @@ async def gen():
|
||||||
yield 'yo'
|
yield 'yo'
|
||||||
await tractor.breakpoint()
|
await tractor.breakpoint()
|
||||||
yield 'yo'
|
yield 'yo'
|
||||||
|
await tractor.breakpoint()
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
@ -13,35 +14,35 @@ async def just_bp(
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
await ctx.started('yo bpin here')
|
await ctx.started()
|
||||||
await tractor.breakpoint()
|
await tractor.breakpoint()
|
||||||
|
|
||||||
# async for val in gen():
|
# TODO: bps and errors in this call..
|
||||||
# print(val)
|
async for val in gen():
|
||||||
|
print(val)
|
||||||
|
|
||||||
await trio.sleep(0.5)
|
# await trio.sleep(0.5)
|
||||||
|
|
||||||
# THIS CAUSES AN UNRECOVERABLE HANG!?
|
# prematurely destroy the connection
|
||||||
|
await ctx.chan.aclose()
|
||||||
|
|
||||||
|
# THIS CAUSES AN UNRECOVERABLE HANG
|
||||||
|
# without latest ``pdbpp``:
|
||||||
assert 0
|
assert 0
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
loglevel='transport',
|
|
||||||
debug_mode=True,
|
debug_mode=True,
|
||||||
) as n:
|
) as n:
|
||||||
p = await n.start_actor(
|
p = await n.start_actor(
|
||||||
'bp_boi',
|
'bp_boi',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
# debug_mode=True,
|
|
||||||
)
|
)
|
||||||
async with p.open_context(
|
async with p.open_context(
|
||||||
just_bp,
|
just_bp,
|
||||||
) as (ctx, first):
|
) as (ctx, first):
|
||||||
|
|
||||||
# await tractor.breakpoint()
|
|
||||||
# breakpoint()
|
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
import trio
|
||||||
|
import click
|
||||||
|
import tractor
|
||||||
|
import pydantic
|
||||||
|
# from multiprocessing import shared_memory
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def just_sleep(
|
||||||
|
|
||||||
|
ctx: tractor.Context,
|
||||||
|
**kwargs,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Test a small ping-pong 2-way streaming server.
|
||||||
|
|
||||||
|
'''
|
||||||
|
await ctx.started()
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
async def main() -> None:
|
||||||
|
|
||||||
|
proc = await trio.open_process( (
|
||||||
|
'python',
|
||||||
|
'-c',
|
||||||
|
'import trio; trio.run(trio.sleep_forever)',
|
||||||
|
))
|
||||||
|
await proc.wait()
|
||||||
|
# await trio.sleep_forever()
|
||||||
|
# async with tractor.open_nursery() as n:
|
||||||
|
|
||||||
|
# portal = await n.start_actor(
|
||||||
|
# 'rpc_server',
|
||||||
|
# enable_modules=[__name__],
|
||||||
|
# )
|
||||||
|
|
||||||
|
# async with portal.open_context(
|
||||||
|
# just_sleep, # taken from pytest parameterization
|
||||||
|
# ) as (ctx, sent):
|
||||||
|
# await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
import time
|
||||||
|
# time.sleep(999)
|
||||||
|
trio.run(main)
|
|
@ -26,8 +26,11 @@ import importlib
|
||||||
import importlib.util
|
import importlib.util
|
||||||
import inspect
|
import inspect
|
||||||
import uuid
|
import uuid
|
||||||
import typing
|
from typing import (
|
||||||
from typing import Any, Optional, Union
|
Any, Optional,
|
||||||
|
Union, TYPE_CHECKING,
|
||||||
|
Callable,
|
||||||
|
)
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
@ -57,6 +60,10 @@ from . import _state
|
||||||
from . import _mp_fixup_main
|
from . import _mp_fixup_main
|
||||||
|
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ._supervise import ActorNursery
|
||||||
|
|
||||||
|
|
||||||
log = get_logger('tractor')
|
log = get_logger('tractor')
|
||||||
|
|
||||||
|
|
||||||
|
@ -65,7 +72,7 @@ async def _invoke(
|
||||||
actor: 'Actor',
|
actor: 'Actor',
|
||||||
cid: str,
|
cid: str,
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
func: typing.Callable,
|
func: Callable,
|
||||||
kwargs: dict[str, Any],
|
kwargs: dict[str, Any],
|
||||||
|
|
||||||
is_rpc: bool = True,
|
is_rpc: bool = True,
|
||||||
|
@ -200,7 +207,7 @@ async def _invoke(
|
||||||
ctx = actor._contexts.pop((chan.uid, cid))
|
ctx = actor._contexts.pop((chan.uid, cid))
|
||||||
if ctx:
|
if ctx:
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Context entrypoint for {func} was terminated:\n{ctx}'
|
f'Context entrypoint {func} was terminated:\n{ctx}'
|
||||||
)
|
)
|
||||||
|
|
||||||
assert cs
|
assert cs
|
||||||
|
@ -316,7 +323,9 @@ async def try_ship_error_to_parent(
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
):
|
):
|
||||||
log.error(
|
# in SC terms this is one of the worst things that can
|
||||||
|
# happen and creates the 2-general's dilemma.
|
||||||
|
log.critical(
|
||||||
f"Failed to ship error to parent "
|
f"Failed to ship error to parent "
|
||||||
f"{channel.uid}, channel was closed"
|
f"{channel.uid}, channel was closed"
|
||||||
)
|
)
|
||||||
|
@ -424,7 +433,7 @@ class Actor:
|
||||||
# (chan, cid) -> (cancel_scope, func)
|
# (chan, cid) -> (cancel_scope, func)
|
||||||
self._rpc_tasks: dict[
|
self._rpc_tasks: dict[
|
||||||
tuple[Channel, str],
|
tuple[Channel, str],
|
||||||
tuple[trio.CancelScope, typing.Callable, trio.Event]
|
tuple[trio.CancelScope, Callable, trio.Event]
|
||||||
] = {}
|
] = {}
|
||||||
|
|
||||||
# map {actor uids -> Context}
|
# map {actor uids -> Context}
|
||||||
|
@ -513,6 +522,7 @@ class Actor:
|
||||||
self._no_more_peers = trio.Event() # unset
|
self._no_more_peers = trio.Event() # unset
|
||||||
|
|
||||||
chan = Channel.from_stream(stream)
|
chan = Channel.from_stream(stream)
|
||||||
|
uid: Optional[tuple[str, str]] = chan.uid
|
||||||
log.runtime(f"New connection to us {chan}")
|
log.runtime(f"New connection to us {chan}")
|
||||||
|
|
||||||
# send/receive initial handshake response
|
# send/receive initial handshake response
|
||||||
|
@ -560,33 +570,51 @@ class Actor:
|
||||||
# append new channel
|
# append new channel
|
||||||
self._peers[uid].append(chan)
|
self._peers[uid].append(chan)
|
||||||
|
|
||||||
|
local_nursery: Optional[ActorNursery] = None # noqa
|
||||||
|
disconnected: bool = False
|
||||||
|
|
||||||
# Begin channel management - respond to remote requests and
|
# Begin channel management - respond to remote requests and
|
||||||
# process received reponses.
|
# process received reponses.
|
||||||
try:
|
try:
|
||||||
await self._process_messages(chan)
|
disconnected = await self._process_messages(chan)
|
||||||
|
|
||||||
except trio.Cancelled:
|
except (
|
||||||
|
trio.Cancelled,
|
||||||
|
):
|
||||||
log.cancel(f"Msg loop was cancelled for {chan}")
|
log.cancel(f"Msg loop was cancelled for {chan}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
local_nursery = self._actoruid2nursery.get(uid, local_nursery)
|
||||||
|
|
||||||
# This is set in ``Portal.cancel_actor()``. So if
|
# This is set in ``Portal.cancel_actor()``. So if
|
||||||
# the peer was cancelled we try to wait for them
|
# the peer was cancelled we try to wait for them
|
||||||
# to tear down their side of the connection before
|
# to tear down their side of the connection before
|
||||||
# moving on with closing our own side.
|
# moving on with closing our own side.
|
||||||
local_nursery = self._actoruid2nursery.get(chan.uid)
|
|
||||||
if (
|
if (
|
||||||
local_nursery
|
local_nursery
|
||||||
):
|
):
|
||||||
|
if disconnected:
|
||||||
|
# if the transport died and this actor is still
|
||||||
|
# registered within a local nursery, we report that the
|
||||||
|
# IPC layer may have failed unexpectedly since it may be
|
||||||
|
# the cause of other downstream errors.
|
||||||
|
entry = local_nursery._children.get(uid)
|
||||||
|
if entry:
|
||||||
|
_, proc, _ = entry
|
||||||
|
log.error(f'Actor {uid}@{proc} IPC connection broke!?')
|
||||||
|
# if proc.poll() is not None:
|
||||||
|
# log.error('Actor {uid} proc died and IPC broke?')
|
||||||
|
|
||||||
log.cancel(f"Waiting on cancel request to peer {chan.uid}")
|
log.cancel(f"Waiting on cancel request to peer {chan.uid}")
|
||||||
# XXX: this is a soft wait on the channel (and its
|
# XXX: this is a soft wait on the channel (and its
|
||||||
# underlying transport protocol) to close from the remote
|
# underlying transport protocol) to close from the
|
||||||
# peer side since we presume that any channel which
|
# remote peer side since we presume that any channel
|
||||||
# is mapped to a sub-actor (i.e. it's managed by
|
# which is mapped to a sub-actor (i.e. it's managed by
|
||||||
# one of our local nurseries)
|
# one of our local nurseries) has a message is sent to
|
||||||
# message is sent to the peer likely by this actor which is
|
# the peer likely by this actor (which is now in
|
||||||
# now in a cancelled condition) when the local runtime here
|
# a cancelled condition) when the local runtime here is
|
||||||
# is now cancelled while (presumably) in the middle of msg
|
# now cancelled while (presumably) in the middle of msg
|
||||||
# loop processing.
|
# loop processing.
|
||||||
with trio.move_on_after(0.5) as cs:
|
with trio.move_on_after(0.5) as cs:
|
||||||
cs.shield = True
|
cs.shield = True
|
||||||
|
@ -609,6 +637,8 @@ class Actor:
|
||||||
|
|
||||||
await local_nursery.exited.wait()
|
await local_nursery.exited.wait()
|
||||||
|
|
||||||
|
# if local_nursery._children
|
||||||
|
|
||||||
# ``Channel`` teardown and closure sequence
|
# ``Channel`` teardown and closure sequence
|
||||||
|
|
||||||
# Drop ref to channel so it can be gc-ed and disconnected
|
# Drop ref to channel so it can be gc-ed and disconnected
|
||||||
|
@ -618,7 +648,7 @@ class Actor:
|
||||||
|
|
||||||
if not chans:
|
if not chans:
|
||||||
log.runtime(f"No more channels for {chan.uid}")
|
log.runtime(f"No more channels for {chan.uid}")
|
||||||
self._peers.pop(chan.uid, None)
|
self._peers.pop(uid, None)
|
||||||
|
|
||||||
# for (uid, cid) in self._contexts.copy():
|
# for (uid, cid) in self._contexts.copy():
|
||||||
# if chan.uid == uid:
|
# if chan.uid == uid:
|
||||||
|
@ -626,11 +656,13 @@ class Actor:
|
||||||
|
|
||||||
log.runtime(f"Peers is {self._peers}")
|
log.runtime(f"Peers is {self._peers}")
|
||||||
|
|
||||||
if not self._peers: # no more channels connected
|
# No more channels to other actors (at all) registered
|
||||||
|
# as connected.
|
||||||
|
if not self._peers:
|
||||||
log.runtime("Signalling no more peer channels")
|
log.runtime("Signalling no more peer channels")
|
||||||
self._no_more_peers.set()
|
self._no_more_peers.set()
|
||||||
|
|
||||||
# # XXX: is this necessary (GC should do it?)
|
# XXX: is this necessary (GC should do it)?
|
||||||
if chan.connected():
|
if chan.connected():
|
||||||
# if the channel is still connected it may mean the far
|
# if the channel is still connected it may mean the far
|
||||||
# end has not closed and we may have gotten here due to
|
# end has not closed and we may have gotten here due to
|
||||||
|
@ -665,8 +697,8 @@ class Actor:
|
||||||
ctx = self._contexts[(uid, cid)]
|
ctx = self._contexts[(uid, cid)]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Ignoring msg from [no-longer/un]known context with {uid}:'
|
f'Ignoring msg from [no-longer/un]known context {uid}:'
|
||||||
f'\n{msg}')
|
f'\n{msg}')
|
||||||
return
|
return
|
||||||
|
|
||||||
send_chan = ctx._send_chan
|
send_chan = ctx._send_chan
|
||||||
|
@ -813,7 +845,7 @@ class Actor:
|
||||||
shield: bool = False,
|
shield: bool = False,
|
||||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> bool:
|
||||||
'''
|
'''
|
||||||
Process messages for the channel async-RPC style.
|
Process messages for the channel async-RPC style.
|
||||||
|
|
||||||
|
@ -839,7 +871,7 @@ class Actor:
|
||||||
if msg is None: # loop terminate sentinel
|
if msg is None: # loop terminate sentinel
|
||||||
|
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f"Channerl to {chan.uid} terminated?\n"
|
f"Channel to {chan.uid} terminated?\n"
|
||||||
"Cancelling all associated tasks..")
|
"Cancelling all associated tasks..")
|
||||||
|
|
||||||
for (channel, cid) in self._rpc_tasks.copy():
|
for (channel, cid) in self._rpc_tasks.copy():
|
||||||
|
@ -986,6 +1018,9 @@ class Actor:
|
||||||
# up.
|
# up.
|
||||||
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
|
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
|
||||||
|
|
||||||
|
# transport **was** disconnected
|
||||||
|
return True
|
||||||
|
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
if nursery_cancelled_before_task:
|
if nursery_cancelled_before_task:
|
||||||
sn = self._service_n
|
sn = self._service_n
|
||||||
|
@ -1010,6 +1045,9 @@ class Actor:
|
||||||
f"Exiting msg loop for {chan} from {chan.uid} "
|
f"Exiting msg loop for {chan} from {chan.uid} "
|
||||||
f"with last msg:\n{msg}")
|
f"with last msg:\n{msg}")
|
||||||
|
|
||||||
|
# transport **was not** disconnected
|
||||||
|
return False
|
||||||
|
|
||||||
async def _from_parent(
|
async def _from_parent(
|
||||||
self,
|
self,
|
||||||
parent_addr: Optional[tuple[str, int]],
|
parent_addr: Optional[tuple[str, int]],
|
||||||
|
|
|
@ -49,7 +49,8 @@ try:
|
||||||
except ImportError:
|
except ImportError:
|
||||||
# pdbpp is installed in regular mode...it monkey patches stuff
|
# pdbpp is installed in regular mode...it monkey patches stuff
|
||||||
import pdb
|
import pdb
|
||||||
assert pdb.xpm, "pdbpp is not installed?" # type: ignore
|
xpm = getattr(pdb, 'xpm', None)
|
||||||
|
assert xpm, "pdbpp is not installed?" # type: ignore
|
||||||
pdbpp = pdb
|
pdbpp = pdb
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -259,16 +260,11 @@ async def _hijack_stdin_for_child(
|
||||||
orig_handler = signal.signal(
|
orig_handler = signal.signal(
|
||||||
signal.SIGINT,
|
signal.SIGINT,
|
||||||
shield_sigint,
|
shield_sigint,
|
||||||
# partial(shield_sigint, pdb=pdb),
|
|
||||||
)
|
)
|
||||||
# try:
|
|
||||||
# yield
|
|
||||||
try:
|
try:
|
||||||
with (
|
with (
|
||||||
trio.CancelScope(shield=True),
|
trio.CancelScope(shield=True),
|
||||||
# disable_sigint(),
|
|
||||||
):
|
):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
lock = None
|
lock = None
|
||||||
async with _acquire_debug_lock(subactor_uid) as lock:
|
async with _acquire_debug_lock(subactor_uid) as lock:
|
||||||
|
@ -584,10 +580,6 @@ def shield_sigint(
|
||||||
'''
|
'''
|
||||||
__tracebackhide__ = True
|
__tracebackhide__ = True
|
||||||
|
|
||||||
frame = sys._getframe()
|
|
||||||
last_f = frame.f_back
|
|
||||||
last_f.f_globals['__tracebackhide__'] = True
|
|
||||||
|
|
||||||
global _local_task_in_debug, _global_actor_in_debug
|
global _local_task_in_debug, _global_actor_in_debug
|
||||||
in_debug = _global_actor_in_debug
|
in_debug = _global_actor_in_debug
|
||||||
|
|
||||||
|
@ -604,6 +596,7 @@ def shield_sigint(
|
||||||
log.pdb(
|
log.pdb(
|
||||||
f"Ignoring SIGINT while child in debug mode: `{in_debug}`"
|
f"Ignoring SIGINT while child in debug mode: `{in_debug}`"
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log.pdb(
|
log.pdb(
|
||||||
"Ignoring SIGINT while in debug mode"
|
"Ignoring SIGINT while in debug mode"
|
||||||
|
|
|
@ -24,7 +24,8 @@ import importlib
|
||||||
import inspect
|
import inspect
|
||||||
from typing import (
|
from typing import (
|
||||||
Any, Optional,
|
Any, Optional,
|
||||||
Callable, AsyncGenerator
|
Callable, AsyncGenerator,
|
||||||
|
Type,
|
||||||
)
|
)
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
@ -442,6 +443,10 @@ class Portal:
|
||||||
_err: Optional[BaseException] = None
|
_err: Optional[BaseException] = None
|
||||||
ctx._portal = self
|
ctx._portal = self
|
||||||
|
|
||||||
|
uid = self.channel.uid
|
||||||
|
cid = ctx.cid
|
||||||
|
etype: Optional[Type[BaseException]] = None
|
||||||
|
|
||||||
# deliver context instance and .started() msg value in open tuple.
|
# deliver context instance and .started() msg value in open tuple.
|
||||||
try:
|
try:
|
||||||
async with trio.open_nursery() as scope_nursery:
|
async with trio.open_nursery() as scope_nursery:
|
||||||
|
@ -477,13 +482,24 @@ class Portal:
|
||||||
# KeyboardInterrupt,
|
# KeyboardInterrupt,
|
||||||
|
|
||||||
) as err:
|
) as err:
|
||||||
_err = err
|
etype = type(err)
|
||||||
# the context cancels itself on any cancel
|
# the context cancels itself on any cancel
|
||||||
# causing error.
|
# causing error.
|
||||||
log.cancel(
|
|
||||||
f'Context to {self.channel.uid} sending cancel request..')
|
|
||||||
|
|
||||||
await ctx.cancel()
|
if ctx.chan.connected():
|
||||||
|
log.cancel(
|
||||||
|
'Context cancelled for task, sending cancel request..\n'
|
||||||
|
f'task:{cid}\n'
|
||||||
|
f'actor:{uid}'
|
||||||
|
)
|
||||||
|
await ctx.cancel()
|
||||||
|
else:
|
||||||
|
log.warning(
|
||||||
|
'IPC connection for context is broken?\n'
|
||||||
|
f'task:{cid}\n'
|
||||||
|
f'actor:{uid}'
|
||||||
|
)
|
||||||
|
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
@ -492,7 +508,13 @@ class Portal:
|
||||||
# sure we get the error the underlying feeder mem chan.
|
# sure we get the error the underlying feeder mem chan.
|
||||||
# if it's not raised here it *should* be raised from the
|
# if it's not raised here it *should* be raised from the
|
||||||
# msg loop nursery right?
|
# msg loop nursery right?
|
||||||
result = await ctx.result()
|
if ctx.chan.connected():
|
||||||
|
log.info(
|
||||||
|
'Waiting on final context-task result for\n'
|
||||||
|
f'task:{cid}\n'
|
||||||
|
f'actor:{uid}'
|
||||||
|
)
|
||||||
|
result = await ctx.result()
|
||||||
|
|
||||||
# though it should be impossible for any tasks
|
# though it should be impossible for any tasks
|
||||||
# operating *in* this scope to have survived
|
# operating *in* this scope to have survived
|
||||||
|
@ -502,14 +524,17 @@ class Portal:
|
||||||
# should we encapsulate this in the context api?
|
# should we encapsulate this in the context api?
|
||||||
await ctx._recv_chan.aclose()
|
await ctx._recv_chan.aclose()
|
||||||
|
|
||||||
if _err:
|
if etype:
|
||||||
if ctx._cancel_called:
|
if ctx._cancel_called:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Context {fn_name} cancelled by caller with\n{_err}'
|
f'Context {fn_name} cancelled by caller with\n{etype}'
|
||||||
)
|
)
|
||||||
elif _err is not None:
|
elif _err is not None:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Context {fn_name} cancelled by callee with\n{_err}'
|
f'Context for task cancelled by callee with {etype}\n'
|
||||||
|
f'target: `{fn_name}`\n'
|
||||||
|
f'task:{cid}\n'
|
||||||
|
f'actor:{uid}'
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
log.runtime(
|
log.runtime(
|
||||||
|
|
|
@ -18,17 +18,30 @@
|
||||||
Machinery for actor process spawning using multiple backends.
|
Machinery for actor process spawning using multiple backends.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
|
||||||
import sys
|
import sys
|
||||||
|
import multiprocessing as mp
|
||||||
import platform
|
import platform
|
||||||
from typing import (
|
from typing import (
|
||||||
Any, Optional, Callable, TypeVar, TYPE_CHECKING
|
Any, Dict, Optional, Callable,
|
||||||
|
TypeVar,
|
||||||
)
|
)
|
||||||
from collections.abc import Awaitable
|
from collections.abc import Awaitable
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
|
|
||||||
|
try:
|
||||||
|
from multiprocessing import semaphore_tracker # type: ignore
|
||||||
|
resource_tracker = semaphore_tracker
|
||||||
|
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker
|
||||||
|
except ImportError:
|
||||||
|
# 3.8 introduces a more general version that also tracks shared mems
|
||||||
|
from multiprocessing import resource_tracker # type: ignore
|
||||||
|
|
||||||
|
from multiprocessing import forkserver # type: ignore
|
||||||
|
from typing import Tuple
|
||||||
|
|
||||||
|
from . import _forkserver_override
|
||||||
from ._debug import (
|
from ._debug import (
|
||||||
maybe_wait_for_debugger,
|
maybe_wait_for_debugger,
|
||||||
acquire_debug_lock,
|
acquire_debug_lock,
|
||||||
|
@ -47,11 +60,8 @@ from ._entry import _mp_main
|
||||||
from ._exceptions import ActorFailure
|
from ._exceptions import ActorFailure
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
import multiprocessing as mp
|
|
||||||
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
|
|
||||||
|
|
||||||
log = get_logger('tractor')
|
log = get_logger('tractor')
|
||||||
|
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
|
||||||
|
|
||||||
# placeholder for an mp start context if so using that backend
|
# placeholder for an mp start context if so using that backend
|
||||||
_ctx: Optional[mp.context.BaseContext] = None
|
_ctx: Optional[mp.context.BaseContext] = None
|
||||||
|
@ -60,7 +70,6 @@ _spawn_method: str = "trio"
|
||||||
|
|
||||||
if platform.system() == 'Windows':
|
if platform.system() == 'Windows':
|
||||||
|
|
||||||
import multiprocessing as mp
|
|
||||||
_ctx = mp.get_context("spawn")
|
_ctx = mp.get_context("spawn")
|
||||||
|
|
||||||
async def proc_waiter(proc: mp.Process) -> None:
|
async def proc_waiter(proc: mp.Process) -> None:
|
||||||
|
@ -83,7 +92,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
|
||||||
``subprocess.Popen``.
|
``subprocess.Popen``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
import multiprocessing as mp
|
|
||||||
global _ctx
|
global _ctx
|
||||||
global _spawn_method
|
global _spawn_method
|
||||||
|
|
||||||
|
@ -100,7 +108,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
|
||||||
f"Spawn method `{name}` is invalid please choose one of {methods}"
|
f"Spawn method `{name}` is invalid please choose one of {methods}"
|
||||||
)
|
)
|
||||||
elif name == 'forkserver':
|
elif name == 'forkserver':
|
||||||
from . import _forkserver_override
|
|
||||||
_forkserver_override.override_stdlib()
|
_forkserver_override.override_stdlib()
|
||||||
_ctx = mp.get_context(name)
|
_ctx = mp.get_context(name)
|
||||||
elif name == 'trio':
|
elif name == 'trio':
|
||||||
|
@ -148,7 +155,7 @@ async def cancel_on_completion(
|
||||||
|
|
||||||
portal: Portal,
|
portal: Portal,
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
errors: dict[tuple[str, str], Exception],
|
errors: Dict[Tuple[str, str], Exception],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -251,12 +258,12 @@ async def new_proc(
|
||||||
name: str,
|
name: str,
|
||||||
actor_nursery: 'ActorNursery', # type: ignore # noqa
|
actor_nursery: 'ActorNursery', # type: ignore # noqa
|
||||||
subactor: Actor,
|
subactor: Actor,
|
||||||
errors: dict[tuple[str, str], Exception],
|
errors: Dict[Tuple[str, str], Exception],
|
||||||
|
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addr: tuple[str, int],
|
bind_addr: Tuple[str, int],
|
||||||
parent_addr: tuple[str, int],
|
parent_addr: Tuple[str, int],
|
||||||
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
||||||
|
|
||||||
*,
|
*,
|
||||||
|
|
||||||
|
@ -288,7 +295,7 @@ async def new_proc(
|
||||||
# the OS; it otherwise can be passed via the parent channel if
|
# the OS; it otherwise can be passed via the parent channel if
|
||||||
# we prefer in the future (for privacy).
|
# we prefer in the future (for privacy).
|
||||||
"--uid",
|
"--uid",
|
||||||
str(subactor.uid),
|
str(uid),
|
||||||
# Address the child must connect to on startup
|
# Address the child must connect to on startup
|
||||||
"--parent_addr",
|
"--parent_addr",
|
||||||
str(parent_addr)
|
str(parent_addr)
|
||||||
|
@ -314,8 +321,7 @@ async def new_proc(
|
||||||
# wait for actor to spawn and connect back to us
|
# wait for actor to spawn and connect back to us
|
||||||
# channel should have handshake completed by the
|
# channel should have handshake completed by the
|
||||||
# local actor by the time we get a ref to it
|
# local actor by the time we get a ref to it
|
||||||
event, chan = await actor_nursery._actor.wait_for_peer(
|
event, chan = await actor_nursery._actor.wait_for_peer(uid)
|
||||||
subactor.uid)
|
|
||||||
|
|
||||||
except trio.Cancelled:
|
except trio.Cancelled:
|
||||||
cancelled_during_spawn = True
|
cancelled_during_spawn = True
|
||||||
|
@ -356,10 +362,54 @@ async def new_proc(
|
||||||
task_status.started(portal)
|
task_status.started(portal)
|
||||||
|
|
||||||
# wait for ActorNursery.wait() to be called
|
# wait for ActorNursery.wait() to be called
|
||||||
|
n_exited = actor_nursery._join_procs
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await actor_nursery._join_procs.wait()
|
await n_exited.wait()
|
||||||
|
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as nursery:
|
||||||
|
|
||||||
|
async def soft_wait_and_maybe_cancel_ria_task():
|
||||||
|
# This is a "soft" (cancellable) join/reap which
|
||||||
|
# will remote cancel the actor on a ``trio.Cancelled``
|
||||||
|
# condition.
|
||||||
|
await soft_wait(
|
||||||
|
proc,
|
||||||
|
trio.Process.wait,
|
||||||
|
portal
|
||||||
|
)
|
||||||
|
|
||||||
|
if n_exited.is_set():
|
||||||
|
# cancel result waiter that may have been spawned in
|
||||||
|
# tandem if not done already
|
||||||
|
log.warning(
|
||||||
|
"Cancelling existing result waiter task for "
|
||||||
|
f"{subactor.uid}"
|
||||||
|
)
|
||||||
|
nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
|
else:
|
||||||
|
log.warning(
|
||||||
|
f'Process for actor {uid} terminated before'
|
||||||
|
'nursery exit. ' 'This may mean an IPC'
|
||||||
|
'connection failed!'
|
||||||
|
)
|
||||||
|
|
||||||
|
nursery.start_soon(soft_wait_and_maybe_cancel_ria_task)
|
||||||
|
|
||||||
|
# TODO: when we finally remove the `.run_in_actor()` api
|
||||||
|
# we should be able to entirely drop these 2 blocking calls:
|
||||||
|
# - we don't need to wait on nursery exit to capture
|
||||||
|
# process-spawn-machinery level errors (and propagate them).
|
||||||
|
# - we don't need to wait on final results from ria portals
|
||||||
|
# since this will be done in some higher level wrapper API.
|
||||||
|
|
||||||
|
# XXX: interestingly we can't put this here bc it causes
|
||||||
|
# the pub-sub tests to fail? wth.. should probably drop
|
||||||
|
# those XD
|
||||||
|
# wait for ActorNursery.wait() to be called
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# await n_exited.wait()
|
||||||
|
|
||||||
if portal in actor_nursery._cancel_after_result_on_exit:
|
if portal in actor_nursery._cancel_after_result_on_exit:
|
||||||
nursery.start_soon(
|
nursery.start_soon(
|
||||||
cancel_on_completion,
|
cancel_on_completion,
|
||||||
|
@ -368,22 +418,6 @@ async def new_proc(
|
||||||
errors
|
errors
|
||||||
)
|
)
|
||||||
|
|
||||||
# This is a "soft" (cancellable) join/reap which
|
|
||||||
# will remote cancel the actor on a ``trio.Cancelled``
|
|
||||||
# condition.
|
|
||||||
await soft_wait(
|
|
||||||
proc,
|
|
||||||
trio.Process.wait,
|
|
||||||
portal
|
|
||||||
)
|
|
||||||
|
|
||||||
# cancel result waiter that may have been spawned in
|
|
||||||
# tandem if not done already
|
|
||||||
log.warning(
|
|
||||||
"Cancelling existing result waiter task for "
|
|
||||||
f"{subactor.uid}")
|
|
||||||
nursery.cancel_scope.cancel()
|
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# The "hard" reap since no actor zombies are allowed!
|
# The "hard" reap since no actor zombies are allowed!
|
||||||
# XXX: do this **after** cancellation/tearfown to avoid
|
# XXX: do this **after** cancellation/tearfown to avoid
|
||||||
|
@ -400,9 +434,10 @@ async def new_proc(
|
||||||
await proc.wait()
|
await proc.wait()
|
||||||
|
|
||||||
if is_root_process():
|
if is_root_process():
|
||||||
|
|
||||||
await maybe_wait_for_debugger(
|
await maybe_wait_for_debugger(
|
||||||
child_in_debug=_runtime_vars.get(
|
child_in_debug=_runtime_vars.get(
|
||||||
'_debug_mode', False),
|
'_debug_mode', False)
|
||||||
)
|
)
|
||||||
|
|
||||||
if proc.poll() is None:
|
if proc.poll() is None:
|
||||||
|
@ -441,30 +476,20 @@ async def mp_new_proc(
|
||||||
name: str,
|
name: str,
|
||||||
actor_nursery: 'ActorNursery', # type: ignore # noqa
|
actor_nursery: 'ActorNursery', # type: ignore # noqa
|
||||||
subactor: Actor,
|
subactor: Actor,
|
||||||
errors: dict[tuple[str, str], Exception],
|
errors: Dict[Tuple[str, str], Exception],
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addr: tuple[str, int],
|
bind_addr: Tuple[str, int],
|
||||||
parent_addr: tuple[str, int],
|
parent_addr: Tuple[str, int],
|
||||||
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
||||||
*,
|
*,
|
||||||
infect_asyncio: bool = False,
|
infect_asyncio: bool = False,
|
||||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
# uggh zone
|
|
||||||
try:
|
|
||||||
from multiprocessing import semaphore_tracker # type: ignore
|
|
||||||
resource_tracker = semaphore_tracker
|
|
||||||
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker # noqa
|
|
||||||
except ImportError:
|
|
||||||
# 3.8 introduces a more general version that also tracks shared mems
|
|
||||||
from multiprocessing import resource_tracker # type: ignore
|
|
||||||
|
|
||||||
assert _ctx
|
assert _ctx
|
||||||
start_method = _ctx.get_start_method()
|
start_method = _ctx.get_start_method()
|
||||||
if start_method == 'forkserver':
|
if start_method == 'forkserver':
|
||||||
from multiprocessing import forkserver # type: ignore
|
|
||||||
# XXX do our hackery on the stdlib to avoid multiple
|
# XXX do our hackery on the stdlib to avoid multiple
|
||||||
# forkservers (one at each subproc layer).
|
# forkservers (one at each subproc layer).
|
||||||
fs = forkserver._forkserver
|
fs = forkserver._forkserver
|
||||||
|
|
|
@ -604,7 +604,8 @@ class Context:
|
||||||
self._portal._streams.remove(rchan)
|
self._portal._streams.remove(rchan)
|
||||||
|
|
||||||
async def result(self) -> Any:
|
async def result(self) -> Any:
|
||||||
'''From a caller side, wait for and return the final result from
|
'''
|
||||||
|
From a caller side, wait for and return the final result from
|
||||||
the callee side task.
|
the callee side task.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
Loading…
Reference in New Issue