Compare commits
	
		
			8 Commits 
		
	
	
		
			5fc64107e5
			...
			3749720d74
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						3749720d74 | |
| 
							
							
								 | 
						7d15b3ea4b | |
| 
							
							
								 | 
						262008f784 | |
| 
							
							
								 | 
						059903bf81 | |
| 
							
							
								 | 
						900e0b4cd1 | |
| 
							
							
								 | 
						38a4e37d47 | |
| 
							
							
								 | 
						a06289c47d | |
| 
							
							
								 | 
						eeb748a206 | 
| 
						 | 
				
			
			@ -0,0 +1,145 @@
 | 
			
		|||
from contextlib import (
 | 
			
		||||
    contextmanager as cm,
 | 
			
		||||
    # TODO, any diff in async case(s)??
 | 
			
		||||
    # asynccontextmanager as acm,
 | 
			
		||||
)
 | 
			
		||||
from functools import partial
 | 
			
		||||
 | 
			
		||||
import tractor
 | 
			
		||||
import trio
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = tractor.log.get_logger(__name__)
 | 
			
		||||
tractor.log.get_console_log('info')
 | 
			
		||||
 | 
			
		||||
@cm
 | 
			
		||||
def teardown_on_exc(
 | 
			
		||||
    raise_from_handler: bool = False,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    You could also have a teardown handler which catches any exc and
 | 
			
		||||
    does some required teardown. In this case the problem is
 | 
			
		||||
    compounded UNLESS you ensure the handler's scope is OUTSIDE the
 | 
			
		||||
    `ux.aclose()`.. that is in the caller's enclosing scope.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    try:
 | 
			
		||||
        yield
 | 
			
		||||
    except BaseException as _berr:
 | 
			
		||||
        berr = _berr
 | 
			
		||||
        log.exception(
 | 
			
		||||
            f'Handling termination teardown in child due to,\n'
 | 
			
		||||
            f'{berr!r}\n'
 | 
			
		||||
        )
 | 
			
		||||
        if raise_from_handler:
 | 
			
		||||
            # XXX teardown ops XXX
 | 
			
		||||
            # on termination these steps say need to be run to
 | 
			
		||||
            # ensure wider system consistency (like the state of
 | 
			
		||||
            # remote connections/services).
 | 
			
		||||
            #
 | 
			
		||||
            # HOWEVER, any bug in this teardown code is also
 | 
			
		||||
            # masked by the `tx.aclose()`!
 | 
			
		||||
            # this is also true if `_tn.cancel_scope` is
 | 
			
		||||
            # `.cancel_called` by the parent in a graceful
 | 
			
		||||
            # request case..
 | 
			
		||||
 | 
			
		||||
            # simulate a bug in teardown handler.
 | 
			
		||||
            raise RuntimeError(
 | 
			
		||||
                'woopsie teardown bug!'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        raise  # no teardown bug.
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def finite_stream_to_rent(
 | 
			
		||||
    tx: trio.abc.SendChannel,
 | 
			
		||||
    child_errors_mid_stream: bool,
 | 
			
		||||
 | 
			
		||||
    task_status: trio.TaskStatus[
 | 
			
		||||
        trio.CancelScope,
 | 
			
		||||
    ] = trio.TASK_STATUS_IGNORED,
 | 
			
		||||
):
 | 
			
		||||
    async with (
 | 
			
		||||
        # XXX without this unmasker the mid-streaming RTE is never
 | 
			
		||||
        # reported since it is masked by the `tx.aclose()`
 | 
			
		||||
        # call which in turn raises `Cancelled`!
 | 
			
		||||
        #
 | 
			
		||||
        # NOTE, this is WITHOUT doing any exception handling
 | 
			
		||||
        # inside the child  task!
 | 
			
		||||
        #
 | 
			
		||||
        # TODO, uncomment next LoC to see the supprsessed beg[RTE]!
 | 
			
		||||
        # tractor.trionics.maybe_raise_from_masking_exc(),
 | 
			
		||||
 | 
			
		||||
        tx as tx,  # .aclose() is the guilty masker chkpt!
 | 
			
		||||
        trio.open_nursery() as _tn,
 | 
			
		||||
    ):
 | 
			
		||||
        # pass our scope back to parent for supervision\
 | 
			
		||||
        # control.
 | 
			
		||||
        task_status.started(_tn.cancel_scope)
 | 
			
		||||
 | 
			
		||||
        with teardown_on_exc(
 | 
			
		||||
            raise_from_handler=not child_errors_mid_stream,
 | 
			
		||||
        ):
 | 
			
		||||
            for i in range(100):
 | 
			
		||||
                log.info(
 | 
			
		||||
                    f'Child tx {i!r}\n'
 | 
			
		||||
                )
 | 
			
		||||
                if (
 | 
			
		||||
                    child_errors_mid_stream
 | 
			
		||||
                    and
 | 
			
		||||
                    i == 66
 | 
			
		||||
                ):
 | 
			
		||||
                    # oh wait but WOOPS there's a bug
 | 
			
		||||
                    # in that teardown code!?
 | 
			
		||||
                    raise RuntimeError(
 | 
			
		||||
                        'woopsie, a mid-streaming bug!?'
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                await tx.send(i)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def main(
 | 
			
		||||
    # TODO! toggle this for the 2 cases!
 | 
			
		||||
    # 1. child errors mid-stream while parent is also requesting
 | 
			
		||||
    #   (graceful) cancel of that child streamer.
 | 
			
		||||
    #
 | 
			
		||||
    # 2. child contains a teardown handler which contains a
 | 
			
		||||
    #   bug and raises.
 | 
			
		||||
    #
 | 
			
		||||
    child_errors_mid_stream: bool,
 | 
			
		||||
):
 | 
			
		||||
    tx, rx = trio.open_memory_channel(1)
 | 
			
		||||
 | 
			
		||||
    async with (
 | 
			
		||||
        trio.open_nursery() as tn,
 | 
			
		||||
        rx as rx,
 | 
			
		||||
    ):
 | 
			
		||||
 | 
			
		||||
        _child_cs = await tn.start(
 | 
			
		||||
            partial(
 | 
			
		||||
                finite_stream_to_rent,
 | 
			
		||||
                child_errors_mid_stream=child_errors_mid_stream,
 | 
			
		||||
                tx=tx,
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
        async for msg in rx:
 | 
			
		||||
            log.info(
 | 
			
		||||
                f'Rent rx {msg!r}\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # simulate some external cancellation
 | 
			
		||||
            # request **JUST BEFORE** the child errors.
 | 
			
		||||
            if msg == 65:
 | 
			
		||||
                log.cancel(
 | 
			
		||||
                    f'Cancelling parent on,\n'
 | 
			
		||||
                    f'msg={msg}\n'
 | 
			
		||||
                    f'\n'
 | 
			
		||||
                    f'Simulates OOB cancel request!\n'
 | 
			
		||||
                )
 | 
			
		||||
                tn.cancel_scope.cancel()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
 | 
			
		||||
    for case in [True, False]:
 | 
			
		||||
        trio.run(main, case)
 | 
			
		||||
| 
						 | 
				
			
			@ -235,10 +235,16 @@ async def cancel_after(wait, reg_addr):
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture(scope='module')
 | 
			
		||||
def time_quad_ex(reg_addr, ci_env, spawn_backend):
 | 
			
		||||
def time_quad_ex(
 | 
			
		||||
    reg_addr: tuple,
 | 
			
		||||
    ci_env: bool,
 | 
			
		||||
    spawn_backend: str,
 | 
			
		||||
):
 | 
			
		||||
    if spawn_backend == 'mp':
 | 
			
		||||
        """no idea but the  mp *nix runs are flaking out here often...
 | 
			
		||||
        """
 | 
			
		||||
        '''
 | 
			
		||||
        no idea but the  mp *nix runs are flaking out here often...
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        pytest.skip("Test is too flaky on mp in CI")
 | 
			
		||||
 | 
			
		||||
    timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
 | 
			
		||||
| 
						 | 
				
			
			@ -249,12 +255,24 @@ def time_quad_ex(reg_addr, ci_env, spawn_backend):
 | 
			
		|||
    return results, diff
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
 | 
			
		||||
    """This also serves as a kind of "we'd like to be this fast test"."""
 | 
			
		||||
def test_a_quadruple_example(
 | 
			
		||||
    time_quad_ex: tuple,
 | 
			
		||||
    ci_env: bool,
 | 
			
		||||
    spawn_backend: str,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    This also serves as a kind of "we'd like to be this fast test".
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    results, diff = time_quad_ex
 | 
			
		||||
    assert results
 | 
			
		||||
    this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
 | 
			
		||||
    this_fast = (
 | 
			
		||||
        6 if platform.system() in (
 | 
			
		||||
            'Windows',
 | 
			
		||||
            'Darwin',
 | 
			
		||||
        )
 | 
			
		||||
        else 3
 | 
			
		||||
    )
 | 
			
		||||
    assert diff < this_fast
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -478,7 +478,10 @@ async def open_root_actor(
 | 
			
		|||
 | 
			
		||||
            # start runtime in a bg sub-task, yield to caller.
 | 
			
		||||
            async with (
 | 
			
		||||
                collapse_eg(),
 | 
			
		||||
                collapse_eg(
 | 
			
		||||
                    # bp=True,
 | 
			
		||||
                    hide_tb=False,
 | 
			
		||||
                ),
 | 
			
		||||
                trio.open_nursery() as root_tn,
 | 
			
		||||
 | 
			
		||||
                # XXX, finally-footgun below?
 | 
			
		||||
| 
						 | 
				
			
			@ -523,6 +526,12 @@ async def open_root_actor(
 | 
			
		|||
                        err,
 | 
			
		||||
                        api_frame=inspect.currentframe(),
 | 
			
		||||
                        debug_filter=debug_filter,
 | 
			
		||||
 | 
			
		||||
                        # XXX NOTE, required to debug root-actor
 | 
			
		||||
                        # crashes under cancellation conditions; so
 | 
			
		||||
                        # most of them!
 | 
			
		||||
                        shield=root_tn.cancel_scope.cancel_called,
 | 
			
		||||
                        # ^TODO? write a (debugger) test for this ya?
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                    if (
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -324,8 +324,8 @@ async def _errors_relayed_via_ipc(
 | 
			
		|||
                    )
 | 
			
		||||
                )
 | 
			
		||||
                # TODO? better then `debug_filter` below?
 | 
			
		||||
                # and
 | 
			
		||||
                # not isinstance(err, TransportClosed)
 | 
			
		||||
                and
 | 
			
		||||
                not isinstance(err, TransportClosed)
 | 
			
		||||
            ):
 | 
			
		||||
                # XXX QUESTION XXX: is there any case where we'll
 | 
			
		||||
                # want to debug IPC disconnects as a default?
 | 
			
		||||
| 
						 | 
				
			
			@ -693,20 +693,23 @@ async def _invoke(
 | 
			
		|||
                    f'\n'
 | 
			
		||||
                    f'{pretty_struct.pformat(return_msg)}\n'
 | 
			
		||||
                )
 | 
			
		||||
                try:
 | 
			
		||||
                    await chan.send(return_msg)
 | 
			
		||||
                except TransportClosed:
 | 
			
		||||
                    log.exception(
 | 
			
		||||
                        f"Failed send final result to 'parent'-side of IPC-ctx!\n"
 | 
			
		||||
                        f'\n'
 | 
			
		||||
                        f'{chan}\n'
 | 
			
		||||
                        f'Channel already disconnected ??\n'
 | 
			
		||||
                        f'\n'
 | 
			
		||||
                        f'{pretty_struct.pformat(return_msg)}'
 | 
			
		||||
                    )
 | 
			
		||||
                    # ?TODO? will this ever be true though?
 | 
			
		||||
                    if chan.connected():
 | 
			
		||||
                        raise
 | 
			
		||||
                await chan.send(return_msg)
 | 
			
		||||
                # ?TODO, remove the below since .send() already
 | 
			
		||||
                # doesn't raise on tpt-closed?
 | 
			
		||||
                # try:
 | 
			
		||||
                #     await chan.send(return_msg)
 | 
			
		||||
                # except TransportClosed:
 | 
			
		||||
                #     log.exception(
 | 
			
		||||
                #         f"Failed send final result to 'parent'-side of IPC-ctx!\n"
 | 
			
		||||
                #         f'\n'
 | 
			
		||||
                #         f'{chan}\n'
 | 
			
		||||
                #         f'Channel already disconnected ??\n'
 | 
			
		||||
                #         f'\n'
 | 
			
		||||
                #         f'{pretty_struct.pformat(return_msg)}'
 | 
			
		||||
                #     )
 | 
			
		||||
                #     # ?TODO? will this ever be true though?
 | 
			
		||||
                #     if chan.connected():
 | 
			
		||||
                #         raise
 | 
			
		||||
 | 
			
		||||
            # NOTE: this happens IFF `ctx._scope.cancel()` is
 | 
			
		||||
            # called by any of,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -297,6 +297,23 @@ async def hard_kill(
 | 
			
		|||
    # zombies (as a feature) we ask the OS to do send in the
 | 
			
		||||
    # removal swad as the last resort.
 | 
			
		||||
    if cs.cancelled_caught:
 | 
			
		||||
 | 
			
		||||
        # TODO? attempt at intermediary-rent-sub
 | 
			
		||||
        # with child in debug lock?
 | 
			
		||||
        # |_https://github.com/goodboy/tractor/issues/320
 | 
			
		||||
        #
 | 
			
		||||
        # if not is_root_process():
 | 
			
		||||
        #     log.warning(
 | 
			
		||||
        #         'Attempting to acquire debug-REPL-lock before zombie reap!'
 | 
			
		||||
        #     )
 | 
			
		||||
        #     with trio.CancelScope(shield=True):
 | 
			
		||||
        #         async with debug.acquire_debug_lock(
 | 
			
		||||
        #             subactor_uid=current_actor().uid,
 | 
			
		||||
        #         ) as _ctx:
 | 
			
		||||
        #             log.warning(
 | 
			
		||||
        #                 'Acquired debug lock, child ready to be killed ??\n'
 | 
			
		||||
        #             )
 | 
			
		||||
 | 
			
		||||
        # TODO: toss in the skynet-logo face as ascii art?
 | 
			
		||||
        log.critical(
 | 
			
		||||
            # 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -430,6 +430,7 @@ class MsgpackTransport(MsgTransport):
 | 
			
		|||
                return await self.stream.send_all(size + bytes_data)
 | 
			
		||||
            except (
 | 
			
		||||
                trio.BrokenResourceError,
 | 
			
		||||
                trio.ClosedResourceError,
 | 
			
		||||
            ) as _re:
 | 
			
		||||
                trans_err = _re
 | 
			
		||||
                tpt_name: str = f'{type(self).__name__!r}'
 | 
			
		||||
| 
						 | 
				
			
			@ -458,6 +459,22 @@ class MsgpackTransport(MsgTransport):
 | 
			
		|||
                        )
 | 
			
		||||
                        raise tpt_closed from trans_err
 | 
			
		||||
 | 
			
		||||
                    # case trio.ClosedResourceError() if (
 | 
			
		||||
                    #     'this socket was already closed'
 | 
			
		||||
                    #     in
 | 
			
		||||
                    #     trans_err.args[0]
 | 
			
		||||
                    # ):
 | 
			
		||||
                    #     tpt_closed = TransportClosed.from_src_exc(
 | 
			
		||||
                    #         message=(
 | 
			
		||||
                    #             f'{tpt_name} already closed by peer\n'
 | 
			
		||||
                    #         ),
 | 
			
		||||
                    #         body=f'{self}\n',
 | 
			
		||||
                    #         src_exc=trans_err,
 | 
			
		||||
                    #         raise_on_report=True,
 | 
			
		||||
                    #         loglevel='transport',
 | 
			
		||||
                    #     )
 | 
			
		||||
                    #     raise tpt_closed from trans_err
 | 
			
		||||
 | 
			
		||||
                    # unless the disconnect condition falls under "a
 | 
			
		||||
                    # normal operation breakage" we usualy console warn
 | 
			
		||||
                    # about it.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue