Compare commits

4 Commits

29db08b370 ... be2f4f306e
| Author | SHA1 | Date |
|---|---|---|
|  | be2f4f306e |  |
|  | 65ae2dc67c |  |
|  | 4be499fb1f |  |
|  | 7317bb269c |  |
@@ -0,0 +1,360 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

'''
A shm-thread-as-generator-fn-ctx **prototype** in anticipation of
free-threading (aka GIL-less threads) in py 3.13+

Bo

Main rationale,
- binding a bg-thread to a "suspendable fn scope" means avoiding any
  locking around shm-data-structures and, except for a single
  `threading.Condition` (or better) for thread-context-switching,
  enables a "pure-ish" style semantic for inter-thread
  value-passing-IO between a "parent" and (bg) "child" shm-thread
  where the only (allowed) data-flow would be via immutable values in
  a "coroutine-style" using,

  - parent-side:
   |_ `callee_sent = gen.send(caller_sent)`

  - child-side:
   |_ `caller_sent = yield callee_sent`

Related (official) reading,
- https://docs.python.org/3/glossary.html#term-free-threading
- https://peps.python.org/pep-0703/
 |_https://peps.python.org/pep-0703/#optimistic-avoiding-locking-in-dict-and-list-accesses


'''
from contextlib import (
    contextmanager as cm,
)
import inspect
# from functools import partial
import time
import threading
from typing import (
    Any,
    Generator,
)

import tractor
import trio


log = tractor.log.get_console_log(
    'info',
    # ^ XXX causes latency with seed>=1e3
    # 'warning',
)
_seed: int = int(1e3)


def thread_gen(seed: int):
    thr = threading.current_thread()
    log.info(
        f'thr: {thr.name} @ {thr.ident}\n'
        f'    |_{thr!r}\n'
        f'\n'
        f'IN `thread_gen(seed={seed})`\n'
        f'\n'
        f'Starting range()-loop\n'
    )
    for i in range(seed):
        log.info(
            f'yielding i={i}\n'
        )
        from_main = yield i
        log.info(
            f'(from_main := {from_main}) = yield (i:={i})\n'
        )
        # time.sleep(0.0001)


# TODO, how would we get the equiv from a pub trio-API?
# -[ ] what about an inter-thread channel much like we have for
# `to_asyncio` & guest mode??
#
# async def spawn_bg_thread_running_gen(fn):
#     log.info('running trio.to_thread.run_sync()')
#     await trio.to_thread.run_sync(
#        partial(
#          run_gen_in_thread,
#          fn=fn,
#          seed=_seed,
#         )
#     )


# ?TODO? once correct, wrap this as a @deco-API?
# -[ ] @generator_thread or similar?
#
def run_gen_in_thread(
    cond: threading.Condition,
    gen: Generator,
    # ^NOTE, already closure-bound-in tgt generator-fn-instance which
    # will be yielded to in the bg-thread!
):
    thr: threading.Thread = threading.current_thread()
    log.info(
        f'thr: {thr.name} @ {thr.ident}\n'
        f'    |_{thr!r}\n'
        f'\n'
        f'IN `run_gen_in_thread(gen={gen})`\n'
        f'\n'
        f'entering gen blocking: {gen!r}\n'
    )
    try:
        log.runtime('locking cond..')
        with cond:
            log.runtime('LOCKED cond..')
            first_yielded = gen.send(None)
            assert cond.to_yield is None
            cond.to_yield = first_yielded
            log.runtime('notifying cond..')
            cond.notify()
            log.runtime('waiting cond..')
            cond.wait()

            while (to_send := cond.to_send) is not None:
                try:
                    yielded = gen.send(to_send)
                except StopIteration as siter:
                    # TODO, check for return value?
                    # if (ret := siter.value):
                    #     cond.to_return = ret
                    assert siter
                    log.exception(f'{gen} exited')
                    raise

                cond.to_yield = yielded
                log.runtime('LOOP notifying cond..')
                cond.notify()
                log.runtime('LOOP waiting cond..')
                cond.wait()

            # out = (yield from gen)
            log.runtime('RELEASE-ing cond..')

        # with cond block-end
        log.runtime('RELEASE-ed cond..')

    except BaseException:
        log.exception(f'exited gen: {gen!r}\n')
        raise

    finally:
        log.warning(
            'Exiting bg thread!\n'
        )
        # TODO! better than this null-setting naivety!
        # -[ ] maybe an Unresolved or similar like for our `Context`?
        #
        # apply sentinel
        cond.to_yield = None
        with cond:
            cond.notify_all()


@cm
def start_in_bg_thread(
    gen: Generator,

    # ?TODO?, is this useful to pass startup-ctx to the thread?
    name: str|None = None,
    **kwargs,

) -> tuple[
    threading.Thread,
    Generator,
    Any,
]:
    if not inspect.isgenerator(gen):
        raise ValueError(
            f'You must pass a `gen: Generator` instance\n'
            f'gen={gen!r}\n'
        )

    # ?TODO? wrap this stuff into some kinda
    # single-entry-inter-thread mem-chan?
    #
    cond = threading.Condition()
    cond.to_send = None
    cond.to_yield = None
    cond.to_return = None

    thr = threading.Thread(
        target=run_gen_in_thread,
        # args=(),  # ?TODO, useful?
        kwargs={
            'cond': cond,
            'gen': gen,
        } | kwargs,
        name=name or gen.__name__,
    )
    log.info(
        f'starting bg thread\n'
        f'>(\n'
        f'|_{thr!r}\n'
    )
    thr.start()

    # TODO, Event or cond.wait() here to sync!?
    time.sleep(0.01)

    try:
        log.info(f'locking cond {cond}..')
        with cond:
            log.runtime(f'LOCKED cond {cond}..')
            first_yielded = cond.to_yield
            log.runtime(f'cond.to_yield: {first_yielded}')

            # delegator shim generator which proxies values from
            # caller to callee-in-bg-thread
            def wrapper():

                # !?TODO, minimize # of yields during startup?
                # -[ ] we can do i in <=1 manual yield pre while-loop no?
                #
                first_sent = yield first_yielded
                cond.to_send = first_sent

                # !TODO, exactly why we need a conditional-emit-sys!
                log.runtime(
                    f'cond.notify()\n'
                    f'cond.to_send={cond.to_send!r}\n'
                    f'cond.to_yield={cond.to_yield!r}\n'
                )
                cond.notify()
                log.runtime(
                    f'cond.wait()\n'
                    f'cond.to_send={cond.to_send!r}\n'
                    f'cond.to_yield={cond.to_yield!r}\n'
                )
                cond.wait()

                to_yield = cond.to_yield
                log.runtime(
                    f'yielding to caller\n'
                    f'cond.to_send={cond.to_send!r}\n'
                    f'cond.to_yield={cond.to_yield!r}\n'
                )
                to_send = yield to_yield
                log.runtime(
                    f'post-yield to caller\n'
                    f'to_send={to_send!r}\n'
                    f'to_yield={to_yield!r}\n'
                )

                # !TODO, proper sentinel-to-break type-condition!
                while to_send is not None:
                    cond.to_send = to_send
                    log.runtime(
                        f'cond.notify()\n'
                        f'cond.to_send={cond.to_send!r}\n'
                        f'cond.to_yield={cond.to_yield!r}\n'
                    )
                    cond.notify()
                    if cond.to_yield is None:
                        log.runtime(
                            'BREAKING from wrapper-LOOP!\n'
                        )
                        break

                    log.runtime(
                        f'cond.wait()\n'
                        f'cond.to_send={cond.to_send!r}\n'
                        f'cond.to_yield={cond.to_yield!r}\n'
                    )
                    cond.wait()

                    log.runtime(
                        f'yielding to caller\n'
                        f'cond.to_send={cond.to_send!r}\n'
                        f'cond.to_yield={cond.to_yield!r}\n'
                    )
                    to_yield = cond.to_yield
                    to_send = yield to_yield
                    log.runtime(
                        f'post-yield to caller\n'
                        f'to_send={to_send!r}\n'
                        f'to_yield={to_yield!r}\n'
                    )

            log.info('creating wrapper..')
            wrapper_gen = wrapper()
            log.info(f'first .send(None): {wrapper_gen}\n')
            first_yielded = wrapper_gen.send(None)
            log.info(f'first yielded: {first_yielded}\n')

            yield (
                thr,
                wrapper_gen,
                first_yielded,
            )
    finally:
        thr.join()
        log.info(f'bg thread joined: {thr!r}')


async def main():
    async with trio.open_nursery() as tn:
        assert tn

        with (
            start_in_bg_thread(
                gen=(
                    _gen:=thread_gen(
                        seed=_seed,
                    )
                ),
            ) as (
                thr,
                wrapped_gen,
                first,
            ),
        ):
            assert (
                _gen is not wrapped_gen
                and
                wrapped_gen is not None
            )
            log.info(
                'Entering wrapped_gen loop\n'
            )

            # NOTE, like our `Context.started` value
            assert first == 0

            # !TODO, proper sentinel-to-break type-condition!
            yielded = first
            while yielded is not None:

                # XXX, compute callers new value to send to bg-thread
                to_send = yielded * yielded

                # send to bg-thread
                yielded = wrapped_gen.send(to_send)
                log.info(
                    f'(yielded:={yielded!r}) = wrapped_gen.send((to_send:={to_send!r}))'
                )


if __name__ == '__main__':
    trio.run(main)
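The `run_gen_in_thread()`/`start_in_bg_thread()` pair above drives the whole parent/child exchange through a single `threading.Condition` ping-pong: each side publishes a value into a shared slot, `notify()`s, then `wait()`s until the peer has consumed it. A minimal self-contained sketch of that same handshake, using only plain `threading` (the `box` mailbox and all names here are illustrative, not part of the patch):

```python
import threading

# One lock-protected "mailbox" per direction; the Condition serializes
# the two threads so exactly one runs between each notify()/wait() pair.
cond = threading.Condition()
box = {'to_child': None, 'to_parent': None}

def child():
    with cond:
        for i in range(3):
            box['to_parent'] = i * 10   # "yield" a value to the parent
            cond.notify()
            cond.wait()                 # suspend until the parent "sends"
            print(f'child got {box["to_child"]}')
        box['to_parent'] = None         # sentinel: we're done
        cond.notify()

with cond:
    t = threading.Thread(target=child)
    t.start()                           # child blocks until our first wait()
    cond.wait()                         # wait for the first "yielded" value
    while (val := box['to_parent']) is not None:
        print(f'parent got {val}')
        box['to_child'] = val + 1       # "send" a reply to the child
        cond.notify()
        cond.wait()
t.join()
```

Running it interleaves `parent got ...`/`child got ...` lines strictly in lock-step, which is exactly the "coroutine-style" value-passing the module docstring describes.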
@@ -116,9 +116,11 @@ def test_shield_pause(
         child.pid,
         signal.SIGINT,
     )
+    from tractor._supervise import _shutdown_msg
     expect(
         child,
-        'Shutting down actor runtime',
+        # 'Shutting down actor runtime',
+        _shutdown_msg,
         timeout=6,
     )
     assert_before(
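The test now keys its expectation to the shared `_shutdown_msg` constant instead of a hard-coded string, so the assertion can't drift from the runtime's actual log line. These `expect`/`assert_before` helpers ride on `pexpect`-style pattern matching against the child's console output; a loose standalone analogue (the spawned command and the constant's value are assumed for illustration, and the real helpers add extra failure reporting):

```python
import pexpect

# value assumed for illustration; the real constant lives in
# `tractor._supervise` per the import added above
_shutdown_msg: str = 'Shutting down actor runtime'

# spawn a child proc whose output we assert on, pexpect-style
child = pexpect.spawn(
    'python -c "print(\'Shutting down actor runtime\')"'
)
child.expect(_shutdown_msg, timeout=6)  # raises pexpect.TIMEOUT on no-match
print('matched:', child.after.decode())
```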
@@ -118,6 +118,10 @@ class Portal:
 
     @property
     def chan(self) -> Channel:
+        '''
+        Ref to this ctx's underlying `tractor.ipc.Channel`.
+
+        '''
         return self._chan
 
     @property

@@ -177,10 +181,17 @@ class Portal:
 
         # not expecting a "main" result
         if self._expect_result_ctx is None:
+            peer_id: str = f'{self.channel.aid.reprol()!r}'
             log.warning(
-                f"Portal for {self.channel.aid} not expecting a final"
-                " result?\nresult() should only be called if subactor"
-                " was spawned with `ActorNursery.run_in_actor()`")
+                f'Portal to peer {peer_id} will not deliver a final result?\n'
+                f'\n'
+                f'Context.result() can only be called by the parent of '
+                f'a sub-actor when it was spawned with '
+                f'`ActorNursery.run_in_actor()`'
+                f'\n'
+                f'Further, this `ActorNursery`-method-API will be deprecated in the '
+                f'near future!\n'
+            )
             return NoResult
 
         # expecting a "main" result

@@ -213,6 +224,7 @@ class Portal:
         typname: str = type(self).__name__
         log.warning(
             f'`{typname}.result()` is DEPRECATED!\n'
+            f'\n'
             f'Use `{typname}.wait_for_result()` instead!\n'
         )
         return await self.wait_for_result(
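Since the hunk above routes the deprecated `Portal.result()` through `Portal.wait_for_result()`, downstream callers can migrate ahead of the eventual removal. A hedged sketch of what such a caller update might look like (the `fetch_data` task and nursery setup are illustrative, not from this change-set):

```python
import tractor
import trio


async def fetch_data() -> int:
    # stand-in for real work done in the sub-actor
    return 42


async def main():
    async with tractor.open_nursery() as an:
        portal = await an.run_in_actor(fetch_data)

        # before: final = await portal.result()  # now emits a DEPRECATED warning
        final = await portal.wait_for_result()
        assert final == 42


if __name__ == '__main__':
    trio.run(main)
```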
@@ -224,8 +236,10 @@ class Portal:
         # terminate all locally running async generator
         # IPC calls
         if self._streams:
-            log.cancel(
-                f"Cancelling all streams with {self.channel.aid}")
+            peer_id: str = f'{self.channel.aid.reprol()!r}'
+            report: str = (
+                f'Cancelling all msg-streams with {peer_id}\n'
+            )
             for stream in self._streams.copy():
                 try:
                     await stream.aclose()

@@ -234,10 +248,18 @@ class Portal:
                     # (unless of course at some point down the road we
                     # won't expect this to always be the case or need to
                     # detect it for respawning purposes?)
-                    log.debug(f"{stream} was already closed.")
+                    report += (
+                        f'->) {stream!r} already closed\n'
+                    )
+
+            log.cancel(report)
 
     async def aclose(self):
-        log.debug(f"Closing {self}")
+        log.debug(
+            f'Closing portal\n'
+            f'>}}\n'
+            f'|_{self}\n'
+        )
         # TODO: once we move to implementing our own `ReceiveChannel`
         # (including remote task cancellation inside its `.aclose()`)
         # we'll need to .aclose all those channels here

@@ -263,19 +285,18 @@ class Portal:
         __runtimeframe__: int = 1  # noqa
 
         chan: Channel = self.channel
+        peer_id: str = f'{self.channel.aid.reprol()!r}'
         if not chan.connected():
             log.runtime(
-                'This channel is already closed, skipping cancel request..'
+                f'Peer {peer_id} is already disconnected\n'
+                '-> skipping cancel request..\n'
             )
             return False
 
-        reminfo: str = (
-            f'c)=> {self.channel.aid}\n'
-            f'  |_{chan}\n'
-        )
         log.cancel(
-            f'Requesting actor-runtime cancel for peer\n\n'
-            f'{reminfo}'
+            f'Sending actor-runtime-cancel-req to peer\n'
+            f'\n'
+            f'c)=> {peer_id}\n'
         )
 
         # XXX the one spot we set it?

@@ -300,8 +321,9 @@ class Portal:
                 # may timeout and we never get an ack (obvi racy)
                 # but that doesn't mean it wasn't cancelled.
                 log.debug(
-                    'May have failed to cancel peer?\n'
-                    f'{reminfo}'
+                    f'May have failed to cancel peer?\n'
+                    f'\n'
+                    f'c)=?> {peer_id}\n'
                 )
 
             # if we get here some weird cancellation case happened

@@ -319,22 +341,22 @@ class Portal:
 
             TransportClosed,
         ) as tpt_err:
-            report: str = (
-                f'IPC chan for actor already closed or broken?\n\n'
-                f'{self.channel.aid}\n'
-                f' |_{self.channel}\n'
+            ipc_borked_report: str = (
+                f'IPC for actor already closed/broken?\n\n'
+                f'\n'
+                f'c)=x> {peer_id}\n'
             )
             match tpt_err:
                 case TransportClosed():
-                    log.debug(report)
+                    log.debug(ipc_borked_report)
                 case _:
-                    report += (
+                    ipc_borked_report += (
                         f'\n'
                         f'Unhandled low-level transport-closed/error during\n'
                         f'`Portal.cancel_actor()` request?\n'
                         f'<{type(tpt_err).__name__}( {tpt_err} )>\n'
                     )
-                    log.warning(report)
+                    log.warning(ipc_borked_report)
 
             return False
 
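The `match tpt_err:` block above narrows the caught transport exception by class pattern so only the expected-teardown case stays at debug severity. A standalone sketch of that idiom (the `TransportClosed` stand-in here is a local class, not tractor's):

```python
# Structural pattern matching on a caught exception instance:
# `case TransportClosed()` matches by type, `case _` catches the rest.
class TransportClosed(ConnectionError):
    pass

def report_ipc_err(err: Exception) -> str:
    match err:
        case TransportClosed():
            # expected teardown case: low severity
            return f'transport closed: {err}'
        case _:
            # anything else is unexpected: escalate
            return f'UNEXPECTED {type(err).__name__}: {err}'

try:
    raise TransportClosed('peer went away')
except (ConnectionError,) as tpt_err:
    print(report_ipc_err(tpt_err))
```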
@@ -491,10 +513,13 @@ class Portal:
                 with trio.CancelScope(shield=True):
                     await ctx.cancel()
 
-            except trio.ClosedResourceError:
+            except trio.ClosedResourceError as cre:
                 # if the far end terminates before we send a cancel the
                 # underlying transport-channel may already be closed.
-                log.cancel(f'Context {ctx} was already closed?')
+                log.cancel(
+                    f'Context.cancel() -> {cre!r}\n'
+                    f'cid: {ctx.cid!r} already closed?\n'
+                )
 
             # XXX: should this always be done?
             # await recv_chan.aclose()
@@ -34,9 +34,9 @@ from typing import (
 import trio
 from trio import TaskStatus
 
-from .devx.debug import (
-    maybe_wait_for_debugger,
-    acquire_debug_lock,
+from .devx import (
+    debug,
+    pformat as _pformat
 )
 from tractor._state import (
     current_actor,

@@ -51,14 +51,17 @@ from tractor._portal import Portal
 from tractor._runtime import Actor
 from tractor._entry import _mp_main
 from tractor._exceptions import ActorFailure
-from tractor.msg.types import (
-    Aid,
-    SpawnSpec,
+from tractor.msg import (
+    types as msgtypes,
+    pretty_struct,
 )
 
 
 if TYPE_CHECKING:
-    from ipc import IPCServer
+    from ipc import (
+        _server,
+        Channel,
+    )
     from ._supervise import ActorNursery
     ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
 

@@ -328,20 +331,21 @@ async def soft_kill(
     see `.hard_kill()`).
 
     '''
-    peer_aid: Aid = portal.channel.aid
+    chan: Channel = portal.channel
+    peer_aid: msgtypes.Aid = chan.aid
     try:
         log.cancel(
             f'Soft killing sub-actor via portal request\n'
             f'\n'
-            f'(c=> {peer_aid}\n'
-            f'  |_{proc}\n'
+            f'c)=> {peer_aid.reprol()}@[{chan.maddr}]\n'
+            f'   |_{proc}\n'
         )
         # wait on sub-proc to signal termination
         await wait_func(proc)
 
     except trio.Cancelled:
         with trio.CancelScope(shield=True):
-            await maybe_wait_for_debugger(
+            await debug.maybe_wait_for_debugger(
                 child_in_debug=_runtime_vars.get(
                     '_debug_mode', False
                 ),

@@ -465,7 +469,7 @@ async def trio_proc(
         "--uid",
         # TODO, how to pass this over "wire" encodings like
         # cmdline args?
-        # -[ ] maybe we can add an `Aid.min_tuple()` ?
+        # -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ?
         str(subactor.uid),
         # Address the child must connect to on startup
         "--parent_addr",

@@ -483,13 +487,14 @@ async def trio_proc(
 
     cancelled_during_spawn: bool = False
     proc: trio.Process|None = None
-    ipc_server: IPCServer = actor_nursery._actor.ipc_server
+    ipc_server: _server.Server = actor_nursery._actor.ipc_server
     try:
         try:
             proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
             log.runtime(
-                'Started new child\n'
-                f'|_{proc}\n'
+                f'Started new child subproc\n'
+                f'(>\n'
+                f' |_{proc}\n'
             )
 
             # wait for actor to spawn and connect back to us

@@ -507,10 +512,10 @@ async def trio_proc(
                 with trio.CancelScope(shield=True):
                     # don't clobber an ongoing pdb
                     if is_root_process():
-                        await maybe_wait_for_debugger()
+                        await debug.maybe_wait_for_debugger()
 
                     elif proc is not None:
-                        async with acquire_debug_lock(subactor.uid):
+                        async with debug.acquire_debug_lock(subactor.uid):
                             # soft wait on the proc to terminate
                             with trio.move_on_after(0.5):
                                 await proc.wait()

@@ -528,14 +533,19 @@ async def trio_proc(
 
         # send a "spawning specification" which configures the
         # initial runtime state of the child.
-        sspec = SpawnSpec(
+        sspec = msgtypes.SpawnSpec(
             _parent_main_data=subactor._parent_main_data,
             enable_modules=subactor.enable_modules,
             reg_addrs=subactor.reg_addrs,
             bind_addrs=bind_addrs,
             _runtime_vars=_runtime_vars,
         )
-        log.runtime(f'Sending spawn spec: {str(sspec)}')
+        log.runtime(
+            f'Sending spawn spec to child\n'
+            f'{{}}=> {chan.aid.reprol()!r}\n'
+            f'\n'
+            f'{pretty_struct.pformat(sspec)}\n'
+        )
         await chan.send(sspec)
 
         # track subactor in current nursery
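The "spawning specification" above is a `msgspec`-style struct shipped over the parent->child channel to seed the child's runtime state. A rough, illustrative analogue of defining and round-tripping such a spec (field names made up; the real `SpawnSpec` lives in `tractor.msg.types` and is pretty-printed via `pretty_struct.pformat()`):

```python
import msgspec

class DemoSpawnSpec(msgspec.Struct):
    # made-up fields echoing the shape of the real spec
    enable_modules: list[str]
    bind_addrs: list[tuple[str, int]]

spec = DemoSpawnSpec(
    enable_modules=['my_service_mod'],
    bind_addrs=[('127.0.0.1', 1616)],
)

# encode/decode round-trip as it would cross the parent->child channel
wire: bytes = msgspec.msgpack.encode(spec)
back = DemoSpawnSpec and msgspec.msgpack.decode(wire, type=DemoSpawnSpec)
assert back == spec
```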
@@ -563,7 +573,7 @@ async def trio_proc(
             # condition.
             await soft_kill(
                 proc,
-                trio.Process.wait,
+                trio.Process.wait,  # XXX, uses `pidfd_open()` below.
                 portal
             )
 

@@ -571,8 +581,7 @@ async def trio_proc(
             # tandem if not done already
             log.cancel(
                 'Cancelling portal result reaper task\n'
-                f'>c)\n'
-                f' |_{subactor.uid}\n'
+                f'c)> {subactor.aid.reprol()!r}\n'
             )
             nursery.cancel_scope.cancel()
 

@@ -581,21 +590,24 @@ async def trio_proc(
         # allowed! Do this **after** cancellation/teardown to avoid
         # killing the process too early.
         if proc:
+            reap_repr: str = _pformat.nest_from_op(
+                input_op='>x)',
+                text=subactor.pformat(),
+            )
             log.cancel(
                 f'Hard reap sequence starting for subactor\n'
-                f'>x)\n'
-                f' |_{subactor}@{subactor.uid}\n'
+                f'{reap_repr}'
             )
 
             with trio.CancelScope(shield=True):
                 # don't clobber an ongoing pdb
                 if cancelled_during_spawn:
                     # Try again to avoid TTY clobbering.
-                    async with acquire_debug_lock(subactor.uid):
+                    async with debug.acquire_debug_lock(subactor.uid):
                         with trio.move_on_after(0.5):
                             await proc.wait()
 
-                await maybe_wait_for_debugger(
+                await debug.maybe_wait_for_debugger(
                     child_in_debug=_runtime_vars.get(
                         '_debug_mode', False
                     ),

@@ -624,7 +636,7 @@ async def trio_proc(
                 #     acquire the lock and get notified of who has it,
                 #     check that uid against our known children?
                 # this_uid: tuple[str, str] = current_actor().uid
-                # await acquire_debug_lock(this_uid)
+                # await debug.acquire_debug_lock(this_uid)
 
                 if proc.poll() is None:
                     log.cancel(f"Attempting to hard kill {proc}")

@@ -727,7 +739,7 @@ async def mp_proc(
 
     log.runtime(f"Started {proc}")
 
-    ipc_server: IPCServer = actor_nursery._actor.ipc_server
+    ipc_server: _server.Server = actor_nursery._actor.ipc_server
     try:
         # wait for actor to spawn and connect back to us
         # channel should have handshake completed by the