Merge pull request #25 from tgoodlet/drop_main_kwarg
Drop the "main" task via kwarg ideareg_with_uid
commit
140956aedb
|
@ -10,4 +10,4 @@ install:
|
||||||
- pip install . -r requirements-test.txt
|
- pip install . -r requirements-test.txt
|
||||||
|
|
||||||
script:
|
script:
|
||||||
- pytest tests/
|
- pytest tests/ --no-print-logs
|
||||||
|
|
|
@ -76,19 +76,19 @@ async def spawn(is_arbiter):
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
actor = tractor.current_actor()
|
actor = tractor.current_actor()
|
||||||
assert actor.is_arbiter == is_arbiter
|
assert actor.is_arbiter == is_arbiter
|
||||||
|
|
||||||
# arbiter should always have an empty statespace as it's redundant
|
|
||||||
assert actor.statespace == statespace
|
assert actor.statespace == statespace
|
||||||
|
|
||||||
if actor.is_arbiter:
|
if actor.is_arbiter:
|
||||||
async with tractor.open_nursery() as nursery:
|
async with tractor.open_nursery() as nursery:
|
||||||
# forks here
|
# forks here
|
||||||
portal = await nursery.start_actor(
|
portal = await nursery.run_in_actor(
|
||||||
'sub-actor',
|
'sub-actor',
|
||||||
main=partial(spawn, False),
|
spawn,
|
||||||
|
is_arbiter=False,
|
||||||
statespace=statespace,
|
statespace=statespace,
|
||||||
rpc_module_paths=namespaces,
|
rpc_module_paths=namespaces,
|
||||||
)
|
)
|
||||||
|
|
||||||
assert len(nursery._children) == 1
|
assert len(nursery._children) == 1
|
||||||
assert portal.channel.uid in tractor.current_actor()._peers
|
assert portal.channel.uid in tractor.current_actor()._peers
|
||||||
# be sure we can still get the result
|
# be sure we can still get the result
|
||||||
|
@ -128,9 +128,6 @@ async def stream_from_single_subactor():
|
||||||
'streamerd',
|
'streamerd',
|
||||||
rpc_module_paths=[__name__],
|
rpc_module_paths=[__name__],
|
||||||
statespace={'global_dict': {}},
|
statespace={'global_dict': {}},
|
||||||
# don't start a main func - use rpc
|
|
||||||
# currently the same as outlive_main=False
|
|
||||||
main=None,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
seq = range(10)
|
seq = range(10)
|
||||||
|
@ -172,7 +169,7 @@ def test_remote_error():
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as nursery:
|
async with tractor.open_nursery() as nursery:
|
||||||
|
|
||||||
portal = await nursery.start_actor('errorer', main=assert_err)
|
portal = await nursery.run_in_actor('errorer', assert_err)
|
||||||
|
|
||||||
# get result(s) from main task
|
# get result(s) from main task
|
||||||
try:
|
try:
|
||||||
|
@ -204,7 +201,6 @@ async def test_cancel_infinite_streamer():
|
||||||
portal = await n.start_actor(
|
portal = await n.start_actor(
|
||||||
f'donny',
|
f'donny',
|
||||||
rpc_module_paths=[__name__],
|
rpc_module_paths=[__name__],
|
||||||
outlive_main=True
|
|
||||||
)
|
)
|
||||||
async for letter in await portal.run(__name__, 'stream_forever'):
|
async for letter in await portal.run(__name__, 'stream_forever'):
|
||||||
print(letter)
|
print(letter)
|
||||||
|
@ -227,17 +223,17 @@ async def test_one_cancels_all():
|
||||||
real_actors.append(await n.start_actor(
|
real_actors.append(await n.start_actor(
|
||||||
f'actor_{i}',
|
f'actor_{i}',
|
||||||
rpc_module_paths=[__name__],
|
rpc_module_paths=[__name__],
|
||||||
outlive_main=True
|
|
||||||
))
|
))
|
||||||
|
|
||||||
# start one actor that will fail immediately
|
# start one actor that will fail immediately
|
||||||
await n.start_actor('extra', main=assert_err)
|
await n.run_in_actor('extra', assert_err)
|
||||||
|
|
||||||
# should error here with a ``RemoteActorError`` containing
|
# should error here with a ``RemoteActorError`` containing
|
||||||
# an ``AssertionError`
|
# an ``AssertionError`
|
||||||
|
|
||||||
except tractor.RemoteActorError:
|
except tractor.RemoteActorError:
|
||||||
assert n.cancelled is True
|
assert n.cancelled is True
|
||||||
|
assert not n._children
|
||||||
else:
|
else:
|
||||||
pytest.fail("Should have gotten a remote assertion error?")
|
pytest.fail("Should have gotten a remote assertion error?")
|
||||||
|
|
||||||
|
@ -263,16 +259,15 @@ async def test_trynamic_trio():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as n:
|
||||||
print("Alright... Action!")
|
print("Alright... Action!")
|
||||||
|
|
||||||
donny = await n.start_actor(
|
donny = await n.run_in_actor(
|
||||||
'donny',
|
'donny',
|
||||||
main=partial(say_hello, 'gretchen'),
|
say_hello,
|
||||||
rpc_module_paths=[__name__],
|
other_actor='gretchen',
|
||||||
outlive_main=True
|
|
||||||
)
|
)
|
||||||
gretchen = await n.start_actor(
|
gretchen = await n.run_in_actor(
|
||||||
'gretchen',
|
'gretchen',
|
||||||
main=partial(say_hello, 'donny'),
|
say_hello,
|
||||||
rpc_module_paths=[__name__],
|
other_actor='donny',
|
||||||
)
|
)
|
||||||
print(await gretchen.result())
|
print(await gretchen.result())
|
||||||
print(await donny.result())
|
print(await donny.result())
|
||||||
|
@ -296,7 +291,6 @@ async def test_movie_theatre_convo():
|
||||||
'frank',
|
'frank',
|
||||||
# enable the actor to run funcs from this current module
|
# enable the actor to run funcs from this current module
|
||||||
rpc_module_paths=[__name__],
|
rpc_module_paths=[__name__],
|
||||||
outlive_main=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
print(await portal.run(__name__, 'movie_theatre_question'))
|
print(await portal.run(__name__, 'movie_theatre_question'))
|
||||||
|
@ -304,18 +298,18 @@ async def test_movie_theatre_convo():
|
||||||
print(await portal.run(__name__, 'movie_theatre_question'))
|
print(await portal.run(__name__, 'movie_theatre_question'))
|
||||||
|
|
||||||
# the async with will block here indefinitely waiting
|
# the async with will block here indefinitely waiting
|
||||||
# for our actor "frank" to complete, but since it's an
|
# for our actor "frank" to complete, we cancel 'frank'
|
||||||
# "outlive_main" actor it will never end until cancelled
|
# to avoid blocking indefinitely
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_movie_theatre_convo_main_task():
|
async def test_movie_theatre_convo_main_task():
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as n:
|
||||||
portal = await n.start_actor('some_linguist', main=cellar_door)
|
portal = await n.run_in_actor('frank', movie_theatre_question)
|
||||||
|
|
||||||
# The ``async with`` will unblock here since the 'some_linguist'
|
# The ``async with`` will unblock here since the 'frank'
|
||||||
# actor has completed its main task ``cellar_door``.
|
# actor has completed its main task ``movie_theatre_question()``.
|
||||||
|
|
||||||
print(await portal.result())
|
print(await portal.result())
|
||||||
|
|
||||||
|
@ -329,7 +323,7 @@ async def test_most_beautiful_word():
|
||||||
"""The main ``tractor`` routine.
|
"""The main ``tractor`` routine.
|
||||||
"""
|
"""
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as n:
|
||||||
portal = await n.start_actor('some_linguist', main=cellar_door)
|
portal = await n.run_in_actor('some_linguist', cellar_door)
|
||||||
|
|
||||||
# The ``async with`` will unblock here since the 'some_linguist'
|
# The ``async with`` will unblock here since the 'some_linguist'
|
||||||
# actor has completed its main task ``cellar_door``.
|
# actor has completed its main task ``cellar_door``.
|
||||||
|
@ -375,7 +369,6 @@ async def aggregate(seed):
|
||||||
portal = await nursery.start_actor(
|
portal = await nursery.start_actor(
|
||||||
name=f'streamer_{i}',
|
name=f'streamer_{i}',
|
||||||
rpc_module_paths=[__name__],
|
rpc_module_paths=[__name__],
|
||||||
outlive_main=True, # daemonize these actors
|
|
||||||
)
|
)
|
||||||
|
|
||||||
portals.append(portal)
|
portals.append(portal)
|
||||||
|
@ -416,7 +409,6 @@ async def aggregate(seed):
|
||||||
print("AGGREGATOR COMPLETE!")
|
print("AGGREGATOR COMPLETE!")
|
||||||
|
|
||||||
|
|
||||||
# @tractor_test
|
|
||||||
async def a_quadruple_example():
|
async def a_quadruple_example():
|
||||||
# a nursery which spawns "actors"
|
# a nursery which spawns "actors"
|
||||||
async with tractor.open_nursery() as nursery:
|
async with tractor.open_nursery() as nursery:
|
||||||
|
@ -424,10 +416,10 @@ async def a_quadruple_example():
|
||||||
seed = int(1e3)
|
seed = int(1e3)
|
||||||
pre_start = time.time()
|
pre_start = time.time()
|
||||||
|
|
||||||
portal = await nursery.start_actor(
|
portal = await nursery.run_in_actor(
|
||||||
name='aggregator',
|
'aggregator',
|
||||||
# executed in the actor's "main task" immediately
|
aggregate,
|
||||||
main=partial(aggregate, seed),
|
seed=seed,
|
||||||
)
|
)
|
||||||
|
|
||||||
start = time.time()
|
start = time.time()
|
||||||
|
@ -449,17 +441,18 @@ async def cancel_after(wait):
|
||||||
|
|
||||||
|
|
||||||
def test_a_quadruple_example():
|
def test_a_quadruple_example():
|
||||||
"""Verify the *show me the code* readme example works.
|
"""This also serves as a kind of "we'd like to eventually be this
|
||||||
|
fast test".
|
||||||
"""
|
"""
|
||||||
results = tractor.run(cancel_after, 2, arbiter_addr=_arb_addr)
|
results = tractor.run(cancel_after, 2.1, arbiter_addr=_arb_addr)
|
||||||
assert results
|
assert results
|
||||||
|
|
||||||
|
|
||||||
def test_not_fast_enough_quad():
|
@pytest.mark.parametrize('cancel_delay', list(range(1, 7)))
|
||||||
|
def test_not_fast_enough_quad(cancel_delay):
|
||||||
"""Verify we can cancel midway through the quad example and all actors
|
"""Verify we can cancel midway through the quad example and all actors
|
||||||
cancel gracefully.
|
cancel gracefully.
|
||||||
|
|
||||||
This also serves as a kind of "we'd like to eventually be this fast test".
|
|
||||||
"""
|
"""
|
||||||
results = tractor.run(cancel_after, 1, arbiter_addr=_arb_addr)
|
delay = 1 + cancel_delay/10
|
||||||
|
results = tractor.run(cancel_after, delay, arbiter_addr=_arb_addr)
|
||||||
assert results is None
|
assert results is None
|
||||||
|
|
|
@ -49,23 +49,22 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
|
||||||
log.info(f"Arbiter seems to exist @ {host}:{port}")
|
log.info(f"Arbiter seems to exist @ {host}:{port}")
|
||||||
actor = Actor(
|
actor = Actor(
|
||||||
name or 'anonymous',
|
name or 'anonymous',
|
||||||
main=main,
|
|
||||||
arbiter_addr=arbiter_addr,
|
arbiter_addr=arbiter_addr,
|
||||||
**kwargs
|
**kwargs
|
||||||
)
|
)
|
||||||
host, port = (host, 0)
|
host, port = (host, 0)
|
||||||
else:
|
else:
|
||||||
# start this local actor as the arbiter
|
# start this local actor as the arbiter
|
||||||
# this should eventually get passed `outlive_main=True`?
|
|
||||||
actor = Arbiter(
|
actor = Arbiter(
|
||||||
name or 'arbiter', main=main, arbiter_addr=arbiter_addr, **kwargs)
|
name or 'arbiter', arbiter_addr=arbiter_addr, **kwargs)
|
||||||
|
|
||||||
# ``Actor._async_main()`` creates an internal nursery if one is not
|
# ``Actor._async_main()`` creates an internal nursery if one is not
|
||||||
# provided and thus blocks here until it's main task completes.
|
# provided and thus blocks here until it's main task completes.
|
||||||
# Note that if the current actor is the arbiter it is desirable
|
# Note that if the current actor is the arbiter it is desirable
|
||||||
# for it to stay up indefinitely until a re-election process has
|
# for it to stay up indefinitely until a re-election process has
|
||||||
# taken place - which is not implemented yet FYI).
|
# taken place - which is not implemented yet FYI).
|
||||||
return await _start_actor(actor, host, port, arbiter_addr=arbiter_addr)
|
return await _start_actor(
|
||||||
|
actor, main, host, port, arbiter_addr=arbiter_addr)
|
||||||
|
|
||||||
|
|
||||||
def run(
|
def run(
|
||||||
|
|
|
@ -5,9 +5,9 @@ import inspect
|
||||||
import importlib
|
import importlib
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from typing import Coroutine
|
|
||||||
import traceback
|
import traceback
|
||||||
import uuid
|
import uuid
|
||||||
|
from itertools import chain
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
from async_generator import asynccontextmanager, aclosing
|
from async_generator import asynccontextmanager, aclosing
|
||||||
|
@ -27,9 +27,13 @@ class ActorFailure(Exception):
|
||||||
"General actor failure"
|
"General actor failure"
|
||||||
|
|
||||||
|
|
||||||
|
class InternalActorError(RuntimeError):
|
||||||
|
"Actor primitive internals failure"
|
||||||
|
|
||||||
|
|
||||||
async def _invoke(
|
async def _invoke(
|
||||||
cid, chan, func, kwargs,
|
actor, cid, chan, func, kwargs,
|
||||||
treat_as_gen=False, raise_errs=False,
|
treat_as_gen=False,
|
||||||
task_status=trio.TASK_STATUS_IGNORED
|
task_status=trio.TASK_STATUS_IGNORED
|
||||||
):
|
):
|
||||||
"""Invoke local func and return results over provided channel.
|
"""Invoke local func and return results over provided channel.
|
||||||
|
@ -47,11 +51,15 @@ async def _invoke(
|
||||||
not is_async_partial and
|
not is_async_partial and
|
||||||
not is_async_gen_partial
|
not is_async_gen_partial
|
||||||
):
|
):
|
||||||
|
await chan.send({'functype': 'function', 'cid': cid})
|
||||||
|
with trio.open_cancel_scope() as cs:
|
||||||
|
task_status.started(cs)
|
||||||
await chan.send({'return': func(**kwargs), 'cid': cid})
|
await chan.send({'return': func(**kwargs), 'cid': cid})
|
||||||
else:
|
else:
|
||||||
coro = func(**kwargs)
|
coro = func(**kwargs)
|
||||||
|
|
||||||
if inspect.isasyncgen(coro):
|
if inspect.isasyncgen(coro):
|
||||||
|
await chan.send({'functype': 'asyncgen', 'cid': cid})
|
||||||
# XXX: massive gotcha! If the containing scope
|
# XXX: massive gotcha! If the containing scope
|
||||||
# is cancelled and we execute the below line,
|
# is cancelled and we execute the below line,
|
||||||
# any ``ActorNursery.__aexit__()`` WON'T be
|
# any ``ActorNursery.__aexit__()`` WON'T be
|
||||||
|
@ -59,6 +67,8 @@ async def _invoke(
|
||||||
# have to properly handle the closing (aclosing)
|
# have to properly handle the closing (aclosing)
|
||||||
# of the async gen in order to be sure the cancel
|
# of the async gen in order to be sure the cancel
|
||||||
# is propagated!
|
# is propagated!
|
||||||
|
with trio.open_cancel_scope() as cs:
|
||||||
|
task_status.started(cs)
|
||||||
async with aclosing(coro) as agen:
|
async with aclosing(coro) as agen:
|
||||||
async for item in agen:
|
async for item in agen:
|
||||||
# TODO: can we send values back in here?
|
# TODO: can we send values back in here?
|
||||||
|
@ -77,22 +87,35 @@ async def _invoke(
|
||||||
await chan.send({'stop': None, 'cid': cid})
|
await chan.send({'stop': None, 'cid': cid})
|
||||||
else:
|
else:
|
||||||
if treat_as_gen:
|
if treat_as_gen:
|
||||||
|
await chan.send({'functype': 'asyncgen', 'cid': cid})
|
||||||
# XXX: the async-func may spawn further tasks which push
|
# XXX: the async-func may spawn further tasks which push
|
||||||
# back values like an async-generator would but must
|
# back values like an async-generator would but must
|
||||||
# manualy construct the response dict-packet-responses as
|
# manualy construct the response dict-packet-responses as
|
||||||
# above
|
# above
|
||||||
|
with trio.open_cancel_scope() as cs:
|
||||||
|
task_status.started(cs)
|
||||||
await coro
|
await coro
|
||||||
else:
|
else:
|
||||||
|
await chan.send({'functype': 'asyncfunction', 'cid': cid})
|
||||||
|
with trio.open_cancel_scope() as cs:
|
||||||
|
task_status.started(cs)
|
||||||
await chan.send({'return': await coro, 'cid': cid})
|
await chan.send({'return': await coro, 'cid': cid})
|
||||||
|
|
||||||
except Exception:
|
except Exception:
|
||||||
|
# always ship errors back to caller
|
||||||
log.exception("Actor errored:")
|
log.exception("Actor errored:")
|
||||||
if not raise_errs:
|
|
||||||
await chan.send({'error': traceback.format_exc(), 'cid': cid})
|
await chan.send({'error': traceback.format_exc(), 'cid': cid})
|
||||||
else:
|
finally:
|
||||||
raise
|
# RPC task bookeeping
|
||||||
|
tasks = actor._rpc_tasks.get(chan, None)
|
||||||
|
if tasks:
|
||||||
|
tasks.remove((cs, func))
|
||||||
|
|
||||||
task_status.started()
|
if not tasks:
|
||||||
|
actor._rpc_tasks.pop(chan, None)
|
||||||
|
|
||||||
|
if not actor._rpc_tasks:
|
||||||
|
log.info(f"All RPC tasks have completed")
|
||||||
|
actor._no_more_rpc_tasks.set()
|
||||||
|
|
||||||
|
|
||||||
class Actor:
|
class Actor:
|
||||||
|
@ -108,12 +131,10 @@ class Actor:
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
main: Coroutine = None,
|
|
||||||
rpc_module_paths: [str] = [],
|
rpc_module_paths: [str] = [],
|
||||||
statespace: dict = {},
|
statespace: dict = {},
|
||||||
uid: str = None,
|
uid: str = None,
|
||||||
allow_rpc: bool = True,
|
allow_rpc: bool = True,
|
||||||
outlive_main: bool = False,
|
|
||||||
loglevel: str = None,
|
loglevel: str = None,
|
||||||
arbiter_addr: (str, int) = None,
|
arbiter_addr: (str, int) = None,
|
||||||
):
|
):
|
||||||
|
@ -121,22 +142,25 @@ class Actor:
|
||||||
self.uid = (name, uid or str(uuid.uuid1()))
|
self.uid = (name, uid or str(uuid.uuid1()))
|
||||||
self.rpc_module_paths = rpc_module_paths
|
self.rpc_module_paths = rpc_module_paths
|
||||||
self._mods = {}
|
self._mods = {}
|
||||||
self.main = main
|
|
||||||
# TODO: consider making this a dynamically defined
|
# TODO: consider making this a dynamically defined
|
||||||
# @dataclass once we get py3.7
|
# @dataclass once we get py3.7
|
||||||
self.statespace = statespace
|
self.statespace = statespace
|
||||||
self._allow_rpc = allow_rpc
|
self._allow_rpc = allow_rpc
|
||||||
self._outlive_main = outlive_main
|
|
||||||
self.loglevel = loglevel
|
self.loglevel = loglevel
|
||||||
self._arb_addr = arbiter_addr
|
self._arb_addr = arbiter_addr
|
||||||
|
|
||||||
# filled in by `_async_main` after fork
|
# filled in by `_async_main` after fork
|
||||||
|
self._root_nursery = None
|
||||||
|
self._server_nursery = None
|
||||||
self._peers = defaultdict(list)
|
self._peers = defaultdict(list)
|
||||||
self._peer_connected = {}
|
self._peer_connected = {}
|
||||||
self._no_more_peers = trio.Event()
|
self._no_more_peers = trio.Event()
|
||||||
self._main_complete = trio.Event()
|
|
||||||
self._main_scope = None
|
|
||||||
self._no_more_peers.set()
|
self._no_more_peers.set()
|
||||||
|
|
||||||
|
self._no_more_rpc_tasks = trio.Event()
|
||||||
|
self._no_more_rpc_tasks.set()
|
||||||
|
self._rpc_tasks = {}
|
||||||
|
|
||||||
self._actors2calls = {} # map {uids -> {callids -> waiter queues}}
|
self._actors2calls = {} # map {uids -> {callids -> waiter queues}}
|
||||||
self._listeners = []
|
self._listeners = []
|
||||||
self._parent_chan = None
|
self._parent_chan = None
|
||||||
|
@ -209,12 +233,6 @@ class Actor:
|
||||||
if not chans:
|
if not chans:
|
||||||
log.debug(f"No more channels for {chan.uid}")
|
log.debug(f"No more channels for {chan.uid}")
|
||||||
self._peers.pop(chan.uid, None)
|
self._peers.pop(chan.uid, None)
|
||||||
if not self._actors2calls.get(chan.uid, {}).get('main'):
|
|
||||||
# fake a "main task" result for any waiting
|
|
||||||
# nurseries/portals
|
|
||||||
log.debug(f"Faking result for {chan} from {chan.uid}")
|
|
||||||
q = self.get_waitq(chan.uid, 'main')
|
|
||||||
q.put_nowait({'return': None, 'cid': 'main'})
|
|
||||||
|
|
||||||
log.debug(f"Peers is {self._peers}")
|
log.debug(f"Peers is {self._peers}")
|
||||||
|
|
||||||
|
@ -222,7 +240,7 @@ class Actor:
|
||||||
self._no_more_peers.set()
|
self._no_more_peers.set()
|
||||||
log.debug(f"Signalling no more peer channels")
|
log.debug(f"Signalling no more peer channels")
|
||||||
|
|
||||||
# XXX: is this necessary?
|
# # XXX: is this necessary (GC should do it?)
|
||||||
if chan.connected():
|
if chan.connected():
|
||||||
log.debug(f"Disconnecting channel {chan}")
|
log.debug(f"Disconnecting channel {chan}")
|
||||||
await chan.send(None)
|
await chan.send(None)
|
||||||
|
@ -259,25 +277,33 @@ class Actor:
|
||||||
# TODO: once https://github.com/python-trio/trio/issues/467 gets
|
# TODO: once https://github.com/python-trio/trio/issues/467 gets
|
||||||
# worked out we'll likely want to use that!
|
# worked out we'll likely want to use that!
|
||||||
log.debug(f"Entering msg loop for {chan} from {chan.uid}")
|
log.debug(f"Entering msg loop for {chan} from {chan.uid}")
|
||||||
async with trio.open_nursery() as nursery:
|
|
||||||
try:
|
try:
|
||||||
async for msg in chan.aiter_recv():
|
async for msg in chan.aiter_recv():
|
||||||
if msg is None: # terminate sentinel
|
if msg is None: # terminate sentinel
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Cancelling all tasks for {chan} from {chan.uid}")
|
f"Cancelling all tasks for {chan} from {chan.uid}")
|
||||||
nursery.cancel_scope.cancel()
|
for scope, func in self._rpc_tasks.pop(chan, ()):
|
||||||
|
scope.cancel()
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Msg loop signalled to terminate for"
|
f"Msg loop signalled to terminate for"
|
||||||
f" {chan} from {chan.uid}")
|
f" {chan} from {chan.uid}")
|
||||||
break
|
break
|
||||||
log.debug(f"Received msg {msg} from {chan.uid}")
|
log.debug(f"Received msg {msg} from {chan.uid}")
|
||||||
cid = msg.get('cid')
|
cid = msg.get('cid')
|
||||||
if cid: # deliver response to local caller/waiter
|
if cid:
|
||||||
|
if cid == 'internal': # internal actor error
|
||||||
|
# import pdb; pdb.set_trace()
|
||||||
|
raise InternalActorError(
|
||||||
|
f"{chan.uid}\n" + msg['error'])
|
||||||
|
|
||||||
|
# deliver response to local caller/waiter
|
||||||
await self._push_result(chan.uid, cid, msg)
|
await self._push_result(chan.uid, cid, msg)
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Waiting on next msg for {chan} from {chan.uid}")
|
f"Waiting on next msg for {chan} from {chan.uid}")
|
||||||
continue
|
continue
|
||||||
else:
|
|
||||||
|
# process command request
|
||||||
ns, funcname, kwargs, actorid, cid = msg['cmd']
|
ns, funcname, kwargs, actorid, cid = msg['cmd']
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
|
@ -304,17 +330,35 @@ class Actor:
|
||||||
treat_as_gen = True
|
treat_as_gen = True
|
||||||
|
|
||||||
log.debug(f"Spawning task for {func}")
|
log.debug(f"Spawning task for {func}")
|
||||||
nursery.start_soon(
|
cs = await self._root_nursery.start(
|
||||||
_invoke, cid, chan, func, kwargs, treat_as_gen,
|
_invoke, self, cid, chan, func, kwargs, treat_as_gen,
|
||||||
name=funcname
|
name=funcname
|
||||||
)
|
)
|
||||||
|
# never allow cancelling cancel requests (results in
|
||||||
|
# deadlock and other weird behaviour)
|
||||||
|
if func != self.cancel:
|
||||||
|
self._no_more_rpc_tasks.clear()
|
||||||
|
log.info(f"RPC func is {func}")
|
||||||
|
self._rpc_tasks.setdefault(chan, []).append((cs, func))
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Waiting on next msg for {chan} from {chan.uid}")
|
f"Waiting on next msg for {chan} from {chan.uid}")
|
||||||
else: # channel disconnect
|
else: # channel disconnect
|
||||||
log.debug(f"{chan} from {chan.uid} disconnected")
|
log.debug(f"{chan} from {chan.uid} disconnected")
|
||||||
except trio.ClosedStreamError:
|
|
||||||
log.error(f"{chan} form {chan.uid} broke")
|
|
||||||
|
|
||||||
|
except InternalActorError:
|
||||||
|
# ship internal errors upwards
|
||||||
|
if self._parent_chan:
|
||||||
|
await self._parent_chan.send(
|
||||||
|
{'error': traceback.format_exc(), 'cid': 'internal'})
|
||||||
|
raise
|
||||||
|
except trio.ClosedResourceError:
|
||||||
|
log.error(f"{chan} form {chan.uid} broke")
|
||||||
|
except Exception:
|
||||||
|
# ship exception (from above code) to peer as an internal error
|
||||||
|
await chan.send(
|
||||||
|
{'error': traceback.format_exc(), 'cid': 'internal'})
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
log.debug(f"Exiting msg loop for {chan} from {chan.uid}")
|
log.debug(f"Exiting msg loop for {chan} from {chan.uid}")
|
||||||
|
|
||||||
def _fork_main(self, accept_addr, parent_addr=None):
|
def _fork_main(self, accept_addr, parent_addr=None):
|
||||||
|
@ -324,7 +368,7 @@ class Actor:
|
||||||
if self.loglevel is not None:
|
if self.loglevel is not None:
|
||||||
get_console_log(self.loglevel)
|
get_console_log(self.loglevel)
|
||||||
log.info(
|
log.info(
|
||||||
f"Started new {ctx.current_process()} for actor {self.uid}")
|
f"Started new {ctx.current_process()} for {self.uid}")
|
||||||
_state._current_actor = self
|
_state._current_actor = self
|
||||||
log.debug(f"parent_addr is {parent_addr}")
|
log.debug(f"parent_addr is {parent_addr}")
|
||||||
try:
|
try:
|
||||||
|
@ -332,14 +376,15 @@ class Actor:
|
||||||
self._async_main, accept_addr, parent_addr=parent_addr))
|
self._async_main, accept_addr, parent_addr=parent_addr))
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass # handle it the same way trio does?
|
pass # handle it the same way trio does?
|
||||||
log.debug(f"Actor {self.uid} terminated")
|
log.info(f"Actor {self.uid} terminated")
|
||||||
|
|
||||||
async def _async_main(
|
async def _async_main(
|
||||||
self,
|
self,
|
||||||
accept_addr,
|
accept_addr,
|
||||||
arbiter_addr=None,
|
arbiter_addr=None,
|
||||||
parent_addr=None,
|
parent_addr=None,
|
||||||
nursery=None
|
nursery=None,
|
||||||
|
task_status=trio.TASK_STATUS_IGNORED,
|
||||||
):
|
):
|
||||||
"""Start the channel server, maybe connect back to the parent, and
|
"""Start the channel server, maybe connect back to the parent, and
|
||||||
start the main task.
|
start the main task.
|
||||||
|
@ -347,7 +392,6 @@ class Actor:
|
||||||
A "root-most" (or "top-level") nursery for this actor is opened here
|
A "root-most" (or "top-level") nursery for this actor is opened here
|
||||||
and when cancelled effectively cancels the actor.
|
and when cancelled effectively cancels the actor.
|
||||||
"""
|
"""
|
||||||
result = None
|
|
||||||
arbiter_addr = arbiter_addr or self._arb_addr
|
arbiter_addr = arbiter_addr or self._arb_addr
|
||||||
registered_with_arbiter = False
|
registered_with_arbiter = False
|
||||||
try:
|
try:
|
||||||
|
@ -373,7 +417,6 @@ class Actor:
|
||||||
# exception back to the parent actor)
|
# exception back to the parent actor)
|
||||||
chan = self._parent_chan = Channel(
|
chan = self._parent_chan = Channel(
|
||||||
destaddr=parent_addr,
|
destaddr=parent_addr,
|
||||||
on_reconnect=self.main
|
|
||||||
)
|
)
|
||||||
await chan.connect()
|
await chan.connect()
|
||||||
# initial handshake, report who we are, who they are
|
# initial handshake, report who we are, who they are
|
||||||
|
@ -382,7 +425,7 @@ class Actor:
|
||||||
log.warn(
|
log.warn(
|
||||||
f"Failed to connect to parent @ {parent_addr},"
|
f"Failed to connect to parent @ {parent_addr},"
|
||||||
" closing server")
|
" closing server")
|
||||||
self.cancel_server()
|
await self.cancel()
|
||||||
self._parent_chan = None
|
self._parent_chan = None
|
||||||
|
|
||||||
# register with the arbiter if we're told its addr
|
# register with the arbiter if we're told its addr
|
||||||
|
@ -393,6 +436,7 @@ class Actor:
|
||||||
name=self.name, sockaddr=self.accept_addr)
|
name=self.name, sockaddr=self.accept_addr)
|
||||||
registered_with_arbiter = True
|
registered_with_arbiter = True
|
||||||
|
|
||||||
|
task_status.started()
|
||||||
# handle new connection back to parent optionally
|
# handle new connection back to parent optionally
|
||||||
# begin responding to RPC
|
# begin responding to RPC
|
||||||
if self._allow_rpc:
|
if self._allow_rpc:
|
||||||
|
@ -401,38 +445,6 @@ class Actor:
|
||||||
nursery.start_soon(
|
nursery.start_soon(
|
||||||
self._process_messages, self._parent_chan)
|
self._process_messages, self._parent_chan)
|
||||||
|
|
||||||
if self.main:
|
|
||||||
try:
|
|
||||||
if self._parent_chan:
|
|
||||||
async with trio.open_nursery() as n:
|
|
||||||
self._main_scope = n.cancel_scope
|
|
||||||
log.debug(f"Starting main task `{self.main}`")
|
|
||||||
# spawned subactor so deliver "main"
|
|
||||||
# task result(s) back to parent
|
|
||||||
await n.start(
|
|
||||||
_invoke, 'main',
|
|
||||||
self._parent_chan, self.main, {},
|
|
||||||
# treat_as_gen, raise_errs params
|
|
||||||
False, True
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
with trio.open_cancel_scope() as main_scope:
|
|
||||||
self._main_scope = main_scope
|
|
||||||
# run directly we are an "unspawned actor"
|
|
||||||
log.debug(f"Running `{self.main}` directly")
|
|
||||||
result = await self.main()
|
|
||||||
finally:
|
|
||||||
# tear down channel server in order to ensure
|
|
||||||
# we exit normally when the main task is done
|
|
||||||
if not self._outlive_main:
|
|
||||||
log.debug(f"Shutting down channel server")
|
|
||||||
self.cancel_server()
|
|
||||||
log.debug(f"Shutting down root nursery")
|
|
||||||
nursery.cancel_scope.cancel()
|
|
||||||
self._main_complete.set()
|
|
||||||
|
|
||||||
if self._main_scope.cancelled_caught:
|
|
||||||
log.debug("Main task was cancelled sucessfully")
|
|
||||||
log.debug("Waiting on root nursery to complete")
|
log.debug("Waiting on root nursery to complete")
|
||||||
# blocks here as expected if no nursery was provided until
|
# blocks here as expected if no nursery was provided until
|
||||||
# the channel server is killed (i.e. this actor is
|
# the channel server is killed (i.e. this actor is
|
||||||
|
@ -441,7 +453,7 @@ class Actor:
|
||||||
if self._parent_chan:
|
if self._parent_chan:
|
||||||
try:
|
try:
|
||||||
await self._parent_chan.send(
|
await self._parent_chan.send(
|
||||||
{'error': traceback.format_exc(), 'cid': 'main'})
|
{'error': traceback.format_exc(), 'cid': 'internal'})
|
||||||
except trio.ClosedStreamError:
|
except trio.ClosedStreamError:
|
||||||
log.error(
|
log.error(
|
||||||
f"Failed to ship error to parent "
|
f"Failed to ship error to parent "
|
||||||
|
@ -458,6 +470,9 @@ class Actor:
|
||||||
# terminate actor once all it's peers (actors that connected
|
# terminate actor once all it's peers (actors that connected
|
||||||
# to it as clients) have disappeared
|
# to it as clients) have disappeared
|
||||||
if not self._no_more_peers.is_set():
|
if not self._no_more_peers.is_set():
|
||||||
|
if any(
|
||||||
|
chan.connected() for chan in chain(*self._peers.values())
|
||||||
|
):
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Waiting for remaining peers {self._peers} to clear")
|
f"Waiting for remaining peers {self._peers} to clear")
|
||||||
await self._no_more_peers.wait()
|
await self._no_more_peers.wait()
|
||||||
|
@ -465,11 +480,8 @@ class Actor:
|
||||||
|
|
||||||
# tear down channel server no matter what since we errored
|
# tear down channel server no matter what since we errored
|
||||||
# or completed
|
# or completed
|
||||||
log.debug(f"Shutting down channel server")
|
|
||||||
self.cancel_server()
|
self.cancel_server()
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
async def _serve_forever(
|
async def _serve_forever(
|
||||||
self,
|
self,
|
||||||
*,
|
*,
|
||||||
|
@ -514,20 +526,39 @@ class Actor:
|
||||||
log.warn(f"Unable to unregister {self.name} from arbiter")
|
log.warn(f"Unable to unregister {self.name} from arbiter")
|
||||||
|
|
||||||
async def cancel(self):
|
async def cancel(self):
|
||||||
"""This cancels the internal root-most nursery thereby gracefully
|
"""Cancel this actor.
|
||||||
cancelling (for all intents and purposes) this actor.
|
|
||||||
|
The sequence in order is:
|
||||||
|
- cancelling all rpc tasks
|
||||||
|
- cancelling the channel server
|
||||||
|
- cancel the "root" nursery
|
||||||
"""
|
"""
|
||||||
|
# cancel all ongoing rpc tasks
|
||||||
|
await self.cancel_rpc_tasks()
|
||||||
self.cancel_server()
|
self.cancel_server()
|
||||||
if self._main_scope:
|
|
||||||
self._main_scope.cancel()
|
|
||||||
log.debug("Waiting on main task to complete")
|
|
||||||
await self._main_complete.wait()
|
|
||||||
self._root_nursery.cancel_scope.cancel()
|
self._root_nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
|
async def cancel_rpc_tasks(self):
|
||||||
|
"""Cancel all existing RPC responder tasks using the cancel scope
|
||||||
|
registered for each.
|
||||||
|
"""
|
||||||
|
scopes = self._rpc_tasks
|
||||||
|
log.info(f"Cancelling all {len(scopes)} rpc tasks:\n{scopes}")
|
||||||
|
for chan, scopes in scopes.items():
|
||||||
|
log.debug(f"Cancelling all tasks for {chan.uid}")
|
||||||
|
for scope, func in scopes:
|
||||||
|
log.debug(f"Cancelling task for {func}")
|
||||||
|
scope.cancel()
|
||||||
|
if scopes:
|
||||||
|
log.info(
|
||||||
|
f"Waiting for remaining rpc tasks to complete {scopes}")
|
||||||
|
await self._no_more_rpc_tasks.wait()
|
||||||
|
|
||||||
def cancel_server(self):
|
def cancel_server(self):
|
||||||
"""Cancel the internal channel server nursery thereby
|
"""Cancel the internal channel server nursery thereby
|
||||||
preventing any new inbound connections from being established.
|
preventing any new inbound connections from being established.
|
||||||
"""
|
"""
|
||||||
|
log.debug("Shutting down channel server")
|
||||||
self._server_nursery.cancel_scope.cancel()
|
self._server_nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -568,7 +599,7 @@ class Arbiter(Actor):
|
||||||
self._registry.pop(name, None)
|
self._registry.pop(name, None)
|
||||||
|
|
||||||
|
|
||||||
async def _start_actor(actor, host, port, arbiter_addr, nursery=None):
|
async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):
|
||||||
"""Spawn a local actor by starting a task to execute it's main
|
"""Spawn a local actor by starting a task to execute it's main
|
||||||
async function.
|
async function.
|
||||||
|
|
||||||
|
@ -582,18 +613,24 @@ async def _start_actor(actor, host, port, arbiter_addr, nursery=None):
|
||||||
# NOTE: this won't block since we provide the nursery
|
# NOTE: this won't block since we provide the nursery
|
||||||
log.info(f"Starting local {actor} @ {host}:{port}")
|
log.info(f"Starting local {actor} @ {host}:{port}")
|
||||||
|
|
||||||
result = await actor._async_main(
|
async with trio.open_nursery() as nursery:
|
||||||
|
await nursery.start(
|
||||||
|
partial(
|
||||||
|
actor._async_main,
|
||||||
accept_addr=(host, port),
|
accept_addr=(host, port),
|
||||||
parent_addr=None,
|
parent_addr=None,
|
||||||
arbiter_addr=arbiter_addr,
|
arbiter_addr=arbiter_addr,
|
||||||
nursery=nursery,
|
|
||||||
)
|
)
|
||||||
# XXX: If spawned locally, the actor is cancelled when this
|
)
|
||||||
# context is complete given that there are no more active
|
if main is not None:
|
||||||
# peer channels connected to it.
|
result = await main()
|
||||||
if not actor._outlive_main:
|
# XXX: If spawned with a dedicated "main function",
|
||||||
|
# the actor is cancelled when this context is complete
|
||||||
|
# given that there are no more active peer channels connected to it.
|
||||||
actor.cancel_server()
|
actor.cancel_server()
|
||||||
|
|
||||||
|
# block on actor to complete
|
||||||
|
|
||||||
# unset module state
|
# unset module state
|
||||||
_state._current_actor = None
|
_state._current_actor = None
|
||||||
log.info("Completed async main")
|
log.info("Completed async main")
|
||||||
|
|
|
@ -42,20 +42,6 @@ async def _do_handshake(actor, chan):
|
||||||
return uid
|
return uid
|
||||||
|
|
||||||
|
|
||||||
async def result_from_q(q, chan):
|
|
||||||
"""Process a msg from a remote actor.
|
|
||||||
"""
|
|
||||||
first_msg = await q.get()
|
|
||||||
if 'return' in first_msg:
|
|
||||||
return 'return', first_msg, q
|
|
||||||
elif 'yield' in first_msg:
|
|
||||||
return 'yield', first_msg, q
|
|
||||||
elif 'error' in first_msg:
|
|
||||||
raise RemoteActorError(f"{chan.uid}\n" + first_msg['error'])
|
|
||||||
else:
|
|
||||||
raise ValueError(f"{first_msg} is an invalid response packet?")
|
|
||||||
|
|
||||||
|
|
||||||
class Portal:
|
class Portal:
|
||||||
"""A 'portal' to a(n) (remote) ``Actor``.
|
"""A 'portal' to a(n) (remote) ``Actor``.
|
||||||
|
|
||||||
|
@ -67,7 +53,12 @@ class Portal:
|
||||||
"""
|
"""
|
||||||
def __init__(self, channel):
|
def __init__(self, channel):
|
||||||
self.channel = channel
|
self.channel = channel
|
||||||
|
# when this is set to a tuple returned from ``_submit()`` then
|
||||||
|
# it is expected that ``result()`` will be awaited at some point
|
||||||
|
# during the portal's lifetime
|
||||||
self._result = None
|
self._result = None
|
||||||
|
self._expect_result = None
|
||||||
|
self._errored = False
|
||||||
|
|
||||||
async def aclose(self):
|
async def aclose(self):
|
||||||
log.debug(f"Closing {self}")
|
log.debug(f"Closing {self}")
|
||||||
|
@ -75,26 +66,56 @@ class Portal:
|
||||||
# gets in!
|
# gets in!
|
||||||
await self.channel.aclose()
|
await self.channel.aclose()
|
||||||
|
|
||||||
async def run(self, ns, func, **kwargs):
|
async def _submit(self, ns, func, **kwargs):
|
||||||
"""Submit a function to be scheduled and run by actor, return its
|
"""Submit a function to be scheduled and run by actor, return the
|
||||||
(stream of) result(s).
|
associated caller id, response queue, response type str,
|
||||||
|
first message packet as a tuple.
|
||||||
|
|
||||||
|
This is an async call.
|
||||||
"""
|
"""
|
||||||
|
# ship a function call request to the remote actor
|
||||||
|
cid, q = await current_actor().send_cmd(self.channel, ns, func, kwargs)
|
||||||
|
|
||||||
|
# wait on first response msg and handle (this should be
|
||||||
|
# in an immediate response)
|
||||||
|
first_msg = await q.get()
|
||||||
|
functype = first_msg.get('functype')
|
||||||
|
|
||||||
|
if functype == 'function' or functype == 'asyncfunction':
|
||||||
|
resp_type = 'return'
|
||||||
|
elif functype == 'asyncgen':
|
||||||
|
resp_type = 'yield'
|
||||||
|
elif 'error' in first_msg:
|
||||||
|
raise RemoteActorError(
|
||||||
|
f"{self.channel.uid}\n" + first_msg['error'])
|
||||||
|
else:
|
||||||
|
raise ValueError(f"{first_msg} is an invalid response packet?")
|
||||||
|
|
||||||
|
return cid, q, resp_type, first_msg
|
||||||
|
|
||||||
|
async def _submit_for_result(self, ns, func, **kwargs):
|
||||||
|
assert self._expect_result is None, \
|
||||||
|
"A pending main result has already been submitted"
|
||||||
|
self._expect_result = await self._submit(ns, func, **kwargs)
|
||||||
|
|
||||||
|
async def run(self, ns, func, **kwargs):
|
||||||
|
"""Submit a function to be scheduled and run by actor, wrap and return
|
||||||
|
its (stream of) result(s).
|
||||||
|
|
||||||
|
This is a blocking call.
|
||||||
|
"""
|
||||||
|
return await self._return_from_resptype(
|
||||||
|
*(await self._submit(ns, func, **kwargs))
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _return_from_resptype(self, cid, q, resptype, first_msg):
|
||||||
# TODO: not this needs some serious work and thinking about how
|
# TODO: not this needs some serious work and thinking about how
|
||||||
# to make async-generators the fundamental IPC API over channels!
|
# to make async-generators the fundamental IPC API over channels!
|
||||||
# (think `yield from`, `gen.send()`, and functional reactive stuff)
|
# (think `yield from`, `gen.send()`, and functional reactive stuff)
|
||||||
actor = current_actor()
|
|
||||||
# ship a function call request to the remote actor
|
|
||||||
cid, q = await actor.send_cmd(self.channel, ns, func, kwargs)
|
|
||||||
# wait on first response msg and handle
|
|
||||||
return await self._return_from_resptype(
|
|
||||||
cid, *(await result_from_q(q, self.channel)))
|
|
||||||
|
|
||||||
async def _return_from_resptype(self, cid, resptype, first_msg, q):
|
|
||||||
|
|
||||||
if resptype == 'yield':
|
if resptype == 'yield':
|
||||||
|
|
||||||
async def yield_from_q():
|
async def yield_from_q():
|
||||||
yield first_msg['yield']
|
|
||||||
try:
|
try:
|
||||||
async for msg in q:
|
async for msg in q:
|
||||||
try:
|
try:
|
||||||
|
@ -103,8 +124,9 @@ class Portal:
|
||||||
if 'stop' in msg:
|
if 'stop' in msg:
|
||||||
break # far end async gen terminated
|
break # far end async gen terminated
|
||||||
else:
|
else:
|
||||||
raise RemoteActorError(msg['error'])
|
raise RemoteActorError(
|
||||||
except GeneratorExit:
|
f"{self.channel.uid}\n" + msg['error'])
|
||||||
|
except StopAsyncIteration:
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Cancelling async gen call {cid} to "
|
f"Cancelling async gen call {cid} to "
|
||||||
f"{self.channel.uid}")
|
f"{self.channel.uid}")
|
||||||
|
@ -113,22 +135,24 @@ class Portal:
|
||||||
return yield_from_q()
|
return yield_from_q()
|
||||||
|
|
||||||
elif resptype == 'return':
|
elif resptype == 'return':
|
||||||
return first_msg['return']
|
msg = await q.get()
|
||||||
|
try:
|
||||||
|
return msg['return']
|
||||||
|
except KeyError:
|
||||||
|
raise RemoteActorError(
|
||||||
|
f"{self.channel.uid}\n" + msg['error'])
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Unknown msg response type: {first_msg}")
|
raise ValueError(f"Unknown msg response type: {first_msg}")
|
||||||
|
|
||||||
async def result(self):
|
async def result(self):
|
||||||
"""Return the result(s) from the remote actor's "main" task.
|
"""Return the result(s) from the remote actor's "main" task.
|
||||||
"""
|
"""
|
||||||
if self._result is None:
|
if self._expect_result is None:
|
||||||
q = current_actor().get_waitq(self.channel.uid, 'main')
|
raise RuntimeError("This portal is not expecting a final result?")
|
||||||
resptype, first_msg, q = (await result_from_q(q, self.channel))
|
elif self._result is None:
|
||||||
self._result = await self._return_from_resptype(
|
self._result = await self._return_from_resptype(
|
||||||
'main', resptype, first_msg, q)
|
*self._expect_result
|
||||||
log.warn(
|
)
|
||||||
f"Retrieved first result `{self._result}` "
|
|
||||||
f"for {self.channel.uid}")
|
|
||||||
# await q.put(first_msg) # for next consumer (e.g. nursery)
|
|
||||||
return self._result
|
return self._result
|
||||||
|
|
||||||
async def close(self):
|
async def close(self):
|
||||||
|
@ -153,7 +177,9 @@ class Portal:
|
||||||
log.warn(
|
log.warn(
|
||||||
f"{self.channel} for {self.channel.uid} was already closed?")
|
f"{self.channel} for {self.channel.uid} was already closed?")
|
||||||
return False
|
return False
|
||||||
|
else:
|
||||||
|
log.warn(f"May have failed to cancel {self.channel.uid}")
|
||||||
|
return False
|
||||||
|
|
||||||
class LocalPortal:
|
class LocalPortal:
|
||||||
"""A 'portal' to a local ``Actor``.
|
"""A 'portal' to a local ``Actor``.
|
||||||
|
|
|
@ -2,9 +2,10 @@
|
||||||
``trio`` inspired apis and helpers
|
``trio`` inspired apis and helpers
|
||||||
"""
|
"""
|
||||||
import multiprocessing as mp
|
import multiprocessing as mp
|
||||||
|
import inspect
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
from async_generator import asynccontextmanager
|
from async_generator import asynccontextmanager, aclosing
|
||||||
|
|
||||||
from ._state import current_actor
|
from ._state import current_actor
|
||||||
from .log import get_logger, get_loglevel
|
from .log import get_logger, get_loglevel
|
||||||
|
@ -22,9 +23,9 @@ class ActorNursery:
|
||||||
def __init__(self, actor, supervisor=None):
|
def __init__(self, actor, supervisor=None):
|
||||||
self.supervisor = supervisor # TODO
|
self.supervisor = supervisor # TODO
|
||||||
self._actor = actor
|
self._actor = actor
|
||||||
# We'll likely want some way to cancel all sub-actors eventually
|
|
||||||
# self.cancel_scope = cancel_scope
|
|
||||||
self._children = {}
|
self._children = {}
|
||||||
|
# portals spawned with ``run_in_actor()``
|
||||||
|
self._cancel_after_result_on_exit = set()
|
||||||
self.cancelled = False
|
self.cancelled = False
|
||||||
|
|
||||||
async def __aenter__(self):
|
async def __aenter__(self):
|
||||||
|
@ -33,11 +34,9 @@ class ActorNursery:
|
||||||
async def start_actor(
|
async def start_actor(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
main=None,
|
|
||||||
bind_addr=('127.0.0.1', 0),
|
bind_addr=('127.0.0.1', 0),
|
||||||
statespace=None,
|
statespace=None,
|
||||||
rpc_module_paths=None,
|
rpc_module_paths=None,
|
||||||
outlive_main=False, # sub-actors die when their main task completes
|
|
||||||
loglevel=None, # set log level per subactor
|
loglevel=None, # set log level per subactor
|
||||||
):
|
):
|
||||||
loglevel = loglevel or self._actor.loglevel or get_loglevel()
|
loglevel = loglevel or self._actor.loglevel or get_loglevel()
|
||||||
|
@ -46,8 +45,6 @@ class ActorNursery:
|
||||||
# modules allowed to invoked funcs from
|
# modules allowed to invoked funcs from
|
||||||
rpc_module_paths=rpc_module_paths or [],
|
rpc_module_paths=rpc_module_paths or [],
|
||||||
statespace=statespace, # global proc state vars
|
statespace=statespace, # global proc state vars
|
||||||
main=main, # main coroutine to be invoked
|
|
||||||
outlive_main=outlive_main,
|
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
arbiter_addr=current_actor()._arb_addr,
|
arbiter_addr=current_actor()._arb_addr,
|
||||||
)
|
)
|
||||||
|
@ -59,6 +56,11 @@ class ActorNursery:
|
||||||
# daemon=True,
|
# daemon=True,
|
||||||
name=name,
|
name=name,
|
||||||
)
|
)
|
||||||
|
# register the process before start in case we get a cancel
|
||||||
|
# request before the actor has fully spawned - then we can wait
|
||||||
|
# for it to fully come up before sending a cancel request
|
||||||
|
self._children[actor.uid] = [actor, proc, None]
|
||||||
|
|
||||||
proc.start()
|
proc.start()
|
||||||
if not proc.is_alive():
|
if not proc.is_alive():
|
||||||
raise ActorFailure("Couldn't start sub-actor?")
|
raise ActorFailure("Couldn't start sub-actor?")
|
||||||
|
@ -69,7 +71,39 @@ class ActorNursery:
|
||||||
# local actor by the time we get a ref to it
|
# local actor by the time we get a ref to it
|
||||||
event, chan = await self._actor.wait_for_peer(actor.uid)
|
event, chan = await self._actor.wait_for_peer(actor.uid)
|
||||||
portal = Portal(chan)
|
portal = Portal(chan)
|
||||||
self._children[(name, proc.pid)] = (actor, proc, portal)
|
self._children[actor.uid][2] = portal
|
||||||
|
return portal
|
||||||
|
|
||||||
|
async def run_in_actor(
|
||||||
|
self,
|
||||||
|
name,
|
||||||
|
fn,
|
||||||
|
bind_addr=('127.0.0.1', 0),
|
||||||
|
rpc_module_paths=None,
|
||||||
|
statespace=None,
|
||||||
|
loglevel=None, # set log level per subactor
|
||||||
|
**kwargs, # explicit args to ``fn``
|
||||||
|
):
|
||||||
|
"""Spawn a new actor, run a lone task, then terminate the actor and
|
||||||
|
return its result.
|
||||||
|
|
||||||
|
Actors spawned using this method are kept alive at nursery teardown
|
||||||
|
until the task spawned by executing ``fn`` completes at which point
|
||||||
|
the actor is terminated.
|
||||||
|
"""
|
||||||
|
mod_path = fn.__module__
|
||||||
|
portal = await self.start_actor(
|
||||||
|
name,
|
||||||
|
rpc_module_paths=[mod_path],
|
||||||
|
bind_addr=bind_addr,
|
||||||
|
statespace=statespace,
|
||||||
|
)
|
||||||
|
await portal._submit_for_result(
|
||||||
|
mod_path,
|
||||||
|
fn.__name__,
|
||||||
|
**kwargs
|
||||||
|
)
|
||||||
|
self._cancel_after_result_on_exit.add(portal)
|
||||||
return portal
|
return portal
|
||||||
|
|
||||||
async def wait(self):
|
async def wait(self):
|
||||||
|
@ -82,27 +116,34 @@ class ActorNursery:
|
||||||
# please god don't hang
|
# please god don't hang
|
||||||
proc.join()
|
proc.join()
|
||||||
log.debug(f"Joined {proc}")
|
log.debug(f"Joined {proc}")
|
||||||
event = self._actor._peers.get(actor.uid)
|
self._children.pop(actor.uid)
|
||||||
if isinstance(event, trio.Event):
|
|
||||||
event.set()
|
async def wait_for_result(portal, actor):
|
||||||
log.warn(
|
# cancel the actor gracefully
|
||||||
f"Cancelled `wait_for_peer()` call since {actor.uid}"
|
log.info(f"Cancelling {portal.channel.uid} gracefully")
|
||||||
f" is already dead!")
|
await portal.cancel_actor()
|
||||||
if not portal._result:
|
|
||||||
log.debug(f"Faking result for {actor.uid}")
|
|
||||||
q = self._actor.get_waitq(actor.uid, 'main')
|
|
||||||
q.put_nowait({'return': None, 'cid': 'main'})
|
|
||||||
|
|
||||||
async def wait_for_result(portal):
|
|
||||||
if portal.channel.connected():
|
|
||||||
log.debug(f"Waiting on final result from {subactor.uid}")
|
log.debug(f"Waiting on final result from {subactor.uid}")
|
||||||
await portal.result()
|
res = await portal.result()
|
||||||
|
# if it's an async-gen then we should alert the user
|
||||||
|
# that we're cancelling it
|
||||||
|
if inspect.isasyncgen(res):
|
||||||
|
log.warn(
|
||||||
|
f"Blindly consuming asyncgen for {actor.uid}")
|
||||||
|
with trio.fail_after(1):
|
||||||
|
async with aclosing(res) as agen:
|
||||||
|
async for item in agen:
|
||||||
|
log.debug(f"Consuming item {item}")
|
||||||
|
|
||||||
# unblocks when all waiter tasks have completed
|
# unblocks when all waiter tasks have completed
|
||||||
|
children = self._children.copy()
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as nursery:
|
||||||
for subactor, proc, portal in self._children.values():
|
for subactor, proc, portal in children.values():
|
||||||
nursery.start_soon(wait_for_proc, proc, subactor, portal)
|
nursery.start_soon(wait_for_proc, proc, subactor, portal)
|
||||||
nursery.start_soon(wait_for_result, portal)
|
if proc.is_alive() and (
|
||||||
|
portal in self._cancel_after_result_on_exit
|
||||||
|
):
|
||||||
|
nursery.start_soon(wait_for_result, portal, subactor)
|
||||||
|
|
||||||
async def cancel(self, hard_kill=False):
|
async def cancel(self, hard_kill=False):
|
||||||
"""Cancel this nursery by instructing each subactor to cancel
|
"""Cancel this nursery by instructing each subactor to cancel
|
||||||
|
@ -111,20 +152,37 @@ class ActorNursery:
|
||||||
If ``hard_killl`` is set to ``True`` then kill the processes
|
If ``hard_killl`` is set to ``True`` then kill the processes
|
||||||
directly without any far end graceful ``trio`` cancellation.
|
directly without any far end graceful ``trio`` cancellation.
|
||||||
"""
|
"""
|
||||||
log.debug(f"Cancelling nursery")
|
def do_hard_kill(proc):
|
||||||
for subactor, proc, portal in self._children.values():
|
|
||||||
if proc is mp.current_process():
|
|
||||||
# XXX: does this even make sense?
|
|
||||||
await subactor.cancel()
|
|
||||||
else:
|
|
||||||
if hard_kill:
|
|
||||||
log.warn(f"Hard killing subactors {self._children}")
|
log.warn(f"Hard killing subactors {self._children}")
|
||||||
proc.terminate()
|
proc.terminate()
|
||||||
# XXX: doesn't seem to work?
|
# XXX: below doesn't seem to work?
|
||||||
# send KeyBoardInterrupt (trio abort signal) to sub-actors
|
# send KeyBoardInterrupt (trio abort signal) to sub-actors
|
||||||
# os.kill(proc.pid, signal.SIGINT)
|
# os.kill(proc.pid, signal.SIGINT)
|
||||||
|
|
||||||
|
log.debug(f"Cancelling nursery")
|
||||||
|
with trio.fail_after(3):
|
||||||
|
async with trio.open_nursery() as n:
|
||||||
|
for subactor, proc, portal in self._children.values():
|
||||||
|
if hard_kill:
|
||||||
|
do_hard_kill(proc)
|
||||||
else:
|
else:
|
||||||
await portal.cancel_actor()
|
if portal is None: # actor hasn't fully spawned yet
|
||||||
|
event = self._actor._peer_connected[subactor.uid]
|
||||||
|
log.warn(
|
||||||
|
f"{subactor.uid} wasn't finished spawning?")
|
||||||
|
await event.wait()
|
||||||
|
# channel/portal should now be up
|
||||||
|
_, _, portal = self._children[subactor.uid]
|
||||||
|
if portal is None:
|
||||||
|
# cancelled while waiting on the event?
|
||||||
|
chan = self._actor._peers[subactor.uid][-1]
|
||||||
|
if chan:
|
||||||
|
portal = Portal(chan)
|
||||||
|
else: # there's no other choice left
|
||||||
|
do_hard_kill(proc)
|
||||||
|
|
||||||
|
# spawn cancel tasks async
|
||||||
|
n.start_soon(portal.cancel_actor)
|
||||||
|
|
||||||
log.debug(f"Waiting on all subactors to complete")
|
log.debug(f"Waiting on all subactors to complete")
|
||||||
await self.wait()
|
await self.wait()
|
||||||
|
@ -134,12 +192,13 @@ class ActorNursery:
|
||||||
async def __aexit__(self, etype, value, tb):
|
async def __aexit__(self, etype, value, tb):
|
||||||
"""Wait on all subactor's main routines to complete.
|
"""Wait on all subactor's main routines to complete.
|
||||||
"""
|
"""
|
||||||
|
try:
|
||||||
if etype is not None:
|
if etype is not None:
|
||||||
# XXX: hypothetically an error could be raised and then
|
# XXX: hypothetically an error could be raised and then
|
||||||
# a cancel signal shows up slightly after in which case the
|
# a cancel signal shows up slightly after in which case the
|
||||||
# else block here might not complete? Should both be shielded?
|
# else block here might not complete? Should both be shielded?
|
||||||
if etype is trio.Cancelled:
|
|
||||||
with trio.open_cancel_scope(shield=True):
|
with trio.open_cancel_scope(shield=True):
|
||||||
|
if etype is trio.Cancelled:
|
||||||
log.warn(
|
log.warn(
|
||||||
f"{current_actor().uid} was cancelled with {etype}"
|
f"{current_actor().uid} was cancelled with {etype}"
|
||||||
", cancelling actor nursery")
|
", cancelling actor nursery")
|
||||||
|
@ -160,6 +219,10 @@ class ActorNursery:
|
||||||
await self.cancel()
|
await self.cancel()
|
||||||
raise
|
raise
|
||||||
log.debug(f"Nursery teardown complete")
|
log.debug(f"Nursery teardown complete")
|
||||||
|
except Exception:
|
||||||
|
log.exception("Error on nursery exit:")
|
||||||
|
await self.wait()
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
|
|
Loading…
Reference in New Issue