Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 051ea3f99d Proxy asyncio cancelleds as well 2021-07-02 13:20:00 -04:00
Tyler Goodlet 40984b9f0e Power of 2 cuz puters 2021-07-02 13:20:00 -04:00
Tyler Goodlet 93ffffc047 Don't kill root's immediate children when in debug
If the root calls `trio.Process.kill()` on immediate child proc teardown
when the child is using pdb, we can get stdstreams clobbering that
results in a pdb++ repl where the user can't see what's been typed. Not
killing such children on cancellation / error seems to resolve this
issue whilst still giving reliable termination. For now, code that
special path until a time it becomes a problem for ensuring zombie
reaps.
2021-07-02 13:20:00 -04:00
Tyler Goodlet 45b1a834b0 WIP redo asyncio async gen streaming 2021-07-02 13:20:00 -04:00
Tyler Goodlet ad98a5a2df Support asyncio actors with the trio spawner backend 2021-07-02 13:20:00 -04:00
Tyler Goodlet 625def2260 Support sync code breakpointing via built-in
Override `breakpoint()` for sync code making it work
properly with `trio` as per:

https://github.com/python-trio/trio/issues/1155#issuecomment-742964018

Relates to #193
2021-07-02 13:20:00 -04:00
Tyler Goodlet b08107ada5 Support asyncio actors with the trio spawner backend 2021-07-02 13:20:00 -04:00
Tyler Goodlet 3353e09978 Export portal type at top level 2021-07-02 13:20:00 -04:00
Tyler Goodlet 34b6360477 Link to SC on wikipedia 2021-07-02 13:20:00 -04:00
Tyler Goodlet 8d9c945487 Add per actor debug mode toggle 2021-07-02 13:20:00 -04:00
8 changed files with 166 additions and 38 deletions

View File

@ -322,6 +322,7 @@ channel`_!
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich .. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228 .. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s .. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
.. _trio gitter channel: https://gitter.im/python-trio/general .. _trio gitter channel: https://gitter.im/python-trio/general
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org .. _matrix channel: https://matrix.to/#/!tractor:matrix.org
@ -330,7 +331,7 @@ channel`_!
.. _messages: https://en.wikipedia.org/wiki/Message_passing .. _messages: https://en.wikipedia.org/wiki/Message_passing
.. _trio docs: https://trio.readthedocs.io/en/latest/ .. _trio docs: https://trio.readthedocs.io/en/latest/
.. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ .. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
.. _structured concurrency: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ .. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony .. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
.. _async generators: https://www.python.org/dev/peps/pep-0525/ .. _async generators: https://www.python.org/dev/peps/pep-0525/
.. _trio-parallel: https://github.com/richardsheridan/trio-parallel .. _trio-parallel: https://github.com/richardsheridan/trio-parallel

View File

@ -23,6 +23,7 @@ from ._exceptions import (
from ._debug import breakpoint, post_mortem from ._debug import breakpoint, post_mortem
from . import msg from . import msg
from ._root import run, run_daemon, open_root_actor from ._root import run, run_daemon, open_root_actor
from ._portal import Portal
__all__ = [ __all__ = [
@ -40,6 +41,7 @@ __all__ = [
'msg', 'msg',
'open_nursery', 'open_nursery',
'open_root_actor', 'open_root_actor',
'Portal',
'post_mortem', 'post_mortem',
'run', 'run',
'run_daemon', 'run_daemon',

View File

@ -574,7 +574,7 @@ class Actor:
try: try:
send_chan, recv_chan = self._cids2qs[(actorid, cid)] send_chan, recv_chan = self._cids2qs[(actorid, cid)]
except KeyError: except KeyError:
send_chan, recv_chan = trio.open_memory_channel(1000) send_chan, recv_chan = trio.open_memory_channel(2*10)
send_chan.cid = cid # type: ignore send_chan.cid = cid # type: ignore
recv_chan.cid = cid # type: ignore recv_chan.cid = cid # type: ignore
self._cids2qs[(actorid, cid)] = send_chan, recv_chan self._cids2qs[(actorid, cid)] = send_chan, recv_chan

View File

@ -69,6 +69,8 @@ def _trio_main(
""" """
log.info(f"Started new trio process for {actor.uid}") log.info(f"Started new trio process for {actor.uid}")
log.info(f"Started new trio process for {actor.uid}")
if actor.loglevel is not None: if actor.loglevel is not None:
log.info( log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}") f"Setting loglevel for {actor.uid} to {actor.loglevel}")

View File

@ -229,7 +229,7 @@ def run(
def run_daemon( def run_daemon(
rpc_module_paths: List[str], enable_modules: List[str],
**kwargs **kwargs
) -> None: ) -> None:
"""Spawn daemon actor which will respond to RPC. """Spawn daemon actor which will respond to RPC.
@ -238,9 +238,9 @@ def run_daemon(
``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned ``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned
is meant to run forever responding to RPC requests. is meant to run forever responding to RPC requests.
""" """
kwargs['rpc_module_paths'] = list(rpc_module_paths) kwargs['enable_modules'] = list(enable_modules)
for path in rpc_module_paths: for path in enable_modules:
importlib.import_module(path) importlib.import_module(path)
return run(partial(trio.sleep, float('inf')), **kwargs) return run(partial(trio.sleep, float('inf')), **kwargs)

View File

@ -366,6 +366,7 @@ async def new_proc(
bind_addr=bind_addr, bind_addr=bind_addr,
parent_addr=parent_addr, parent_addr=parent_addr,
_runtime_vars=_runtime_vars, _runtime_vars=_runtime_vars,
infect_asyncio=infect_asyncio,
task_status=task_status, task_status=task_status,
) )
@ -381,6 +382,7 @@ async def mp_new_proc(
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child _runtime_vars: Dict[str, Any], # serialized and sent to _child
*, *,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:

View File

@ -62,6 +62,7 @@ class ActorNursery:
loglevel: str = None, # set log level per subactor loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None, nursery: trio.Nursery = None,
infect_asyncio: bool = False, infect_asyncio: bool = False,
debug_mode: Optional[bool] = None,
) -> Portal: ) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel() loglevel = loglevel or self._actor.loglevel or get_loglevel()
@ -69,6 +70,10 @@ class ActorNursery:
_rtv = _state._runtime_vars.copy() _rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False _rtv['_is_root'] = False
# allow setting debug policy per actor
if debug_mode is not None:
_rtv['_debug_mode'] = debug_mode
enable_modules = enable_modules or [] enable_modules = enable_modules or []
if rpc_module_paths: if rpc_module_paths:

View File

@ -43,15 +43,16 @@ async def consume_asyncgen(
to_trio.send_nowait(item) to_trio.send_nowait(item)
async def run_task( def _run_asyncio_task(
func: Callable, func: Callable,
*, *,
qsize: int = 2**10, qsize: int = 1,
_treat_as_stream: bool = False, _treat_as_stream: bool = False,
**kwargs, **kwargs,
) -> Any: ) -> Any:
"""Run an ``asyncio`` async function or generator in a task, return """Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``. or stream the result back to ``trio``.
""" """
assert current_actor().is_infected_aio() assert current_actor().is_infected_aio()
@ -59,29 +60,38 @@ async def run_task(
from_trio = asyncio.Queue(qsize) # type: ignore from_trio = asyncio.Queue(qsize) # type: ignore
to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
from_aio._err = None
args = tuple(inspect.getfullargspec(func).args) args = tuple(inspect.getfullargspec(func).args)
if getattr(func, '_tractor_steam_function', None): if getattr(func, '_tractor_steam_function', None):
# the assumption is that the target async routine accepts the # the assumption is that the target async routine accepts the
# send channel then it intends to yield more then one return # send channel then it intends to yield more then one return
# value otherwise it would just return ;P # value otherwise it would just return ;P
_treat_as_stream = True # _treat_as_stream = True
assert qsize > 1
# allow target func to accept/stream results manually by name # allow target func to accept/stream results manually by name
if 'to_trio' in args: if 'to_trio' in args:
kwargs['to_trio'] = to_trio kwargs['to_trio'] = to_trio
if 'from_trio' in args: if 'from_trio' in args:
kwargs['from_trio'] = from_trio kwargs['from_trio'] = from_trio
# if 'from_aio' in args:
# kwargs['from_aio'] = from_aio
coro = func(**kwargs) coro = func(**kwargs)
cancel_scope = trio.CancelScope() # cancel_scope = trio.CancelScope()
# start the asyncio task we submitted from trio # start the asyncio task we submitted from trio
if inspect.isawaitable(coro): if inspect.isawaitable(coro):
task = asyncio.create_task(run_coro(to_trio, coro)) task = asyncio.create_task(run_coro(to_trio, coro))
elif inspect.isasyncgen(coro): elif inspect.isasyncgen(coro):
task = asyncio.create_task(consume_asyncgen(to_trio, coro)) task = asyncio.create_task(consume_asyncgen(to_trio, coro))
else: else:
raise TypeError(f"No support for invoking {coro}") raise TypeError(f"No support for invoking {coro}")
@ -91,58 +101,164 @@ async def run_task(
"""Cancel the calling ``trio`` task on error. """Cancel the calling ``trio`` task on error.
""" """
nonlocal aio_err nonlocal aio_err
aio_err = task.exception() try:
aio_err = task.exception()
except asyncio.CancelledError as cerr:
aio_err = cerr
if aio_err: if aio_err:
log.exception(f"asyncio task errorred:\n{aio_err}") log.exception(f"asyncio task errorred:\n{aio_err}")
cancel_scope.cancel() # cancel_scope.cancel()
from_aio._err = aio_err
to_trio.close()
task.add_done_callback(cancel_trio) task.add_done_callback(cancel_trio)
return task, from_aio, to_trio
async def run_task(
func: Callable,
*,
qsize: int = 2**10,
_treat_as_stream: bool = False,
**kwargs,
) -> Any:
"""Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``.
"""
# assert current_actor().is_infected_aio()
# # ITC (inter task comms)
# from_trio = asyncio.Queue(qsize) # type: ignore
# to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
# args = tuple(inspect.getfullargspec(func).args)
# if getattr(func, '_tractor_steam_function', None):
# # the assumption is that the target async routine accepts the
# # send channel then it intends to yield more then one return
# # value otherwise it would just return ;P
# _treat_as_stream = True
# # allow target func to accept/stream results manually by name
# if 'to_trio' in args:
# kwargs['to_trio'] = to_trio
# if 'from_trio' in args:
# kwargs['from_trio'] = from_trio
# coro = func(**kwargs)
# cancel_scope = trio.CancelScope()
# # start the asyncio task we submitted from trio
# if inspect.isawaitable(coro):
# task = asyncio.create_task(run_coro(to_trio, coro))
# elif inspect.isasyncgen(coro):
# task = asyncio.create_task(consume_asyncgen(to_trio, coro))
# else:
# raise TypeError(f"No support for invoking {coro}")
# aio_err = None
# def cancel_trio(task):
# """Cancel the calling ``trio`` task on error.
# """
# nonlocal aio_err
# aio_err = task.exception()
# if aio_err:
# log.exception(f"asyncio task errorred:\n{aio_err}")
# cancel_scope.cancel()
# task.add_done_callback(cancel_trio)
# async iterator # async iterator
if inspect.isasyncgen(coro) or _treat_as_stream: # if inspect.isasyncgen(coro) or _treat_as_stream:
async def stream_results(): # if inspect.isasyncgenfunction(meth) or :
try: if _treat_as_stream:
with cancel_scope:
# stream values upward
async with from_aio:
async for item in from_aio:
yield item
if cancel_scope.cancelled_caught: task, from_aio, to_trio = _run_asyncio_task(
# always raise from any captured asyncio error func,
if aio_err: qsize=2**8,
raise aio_err **kwargs,
)
except BaseException as err: return from_aio
if aio_err is not None:
# always raise from any captured asyncio error
raise err from aio_err
else:
raise
return stream_results() # async def stream_results():
# try:
# with cancel_scope:
# # stream values upward
# async with from_aio:
# async for item in from_aio:
# yield item
# if cancel_scope.cancelled_caught:
# # always raise from any captured asyncio error
# if aio_err:
# raise aio_err
# except BaseException as err:
# if aio_err is not None:
# # always raise from any captured asyncio error
# raise err from aio_err
# else:
# raise
# finally:
# # breakpoint()
# task.cancel()
# return stream_results()
# simple async func # simple async func
try: try:
with cancel_scope: task, from_aio, to_trio = _run_asyncio_task(
# return single value func,
return await from_aio.receive() qsize=1,
**kwargs,
)
if cancel_scope.cancelled_caught: # with cancel_scope:
# always raise from any captured asyncio error # async with from_aio:
if aio_err: # return single value
raise aio_err return await from_aio.receive()
# if cancel_scope.cancelled_caught:
# # always raise from any captured asyncio error
# if aio_err:
# raise aio_err
# Do we need this? # Do we need this?
except BaseException as err: except Exception as err:
# await tractor.breakpoint()
aio_err = from_aio._err
# try:
if aio_err is not None: if aio_err is not None:
# always raise from any captured asyncio error # always raise from any captured asyncio error
raise err from aio_err raise err from aio_err
else: else:
raise raise
# finally:
# if not task.done():
# task.cancel()
except trio.Cancelled:
if not task.done():
task.cancel()
raise
# async def stream_from_task
# pass
def run_as_asyncio_guest( def run_as_asyncio_guest(