forked from goodboy/tractor
				
			First try: pack cancelled tracebacks and ship to caller
							parent
							
								
									3423ea4011
								
							
						
					
					
						commit
						c2484e88a1
					
				| 
						 | 
				
			
			@ -60,6 +60,9 @@ async def _invoke(
 | 
			
		|||
    """
 | 
			
		||||
    treat_as_gen = False
 | 
			
		||||
 | 
			
		||||
    # possible a traceback (not sure what typing is for this..)
 | 
			
		||||
    tb = None
 | 
			
		||||
 | 
			
		||||
    cancel_scope = trio.CancelScope()
 | 
			
		||||
    cs: trio.CancelScope = None
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -156,14 +159,26 @@ async def _invoke(
 | 
			
		|||
                ctx._scope_nursery = scope_nursery
 | 
			
		||||
                cs = scope_nursery.cancel_scope
 | 
			
		||||
                task_status.started(cs)
 | 
			
		||||
                await chan.send({'return': await coro, 'cid': cid})
 | 
			
		||||
                try:
 | 
			
		||||
                    await chan.send({'return': await coro, 'cid': cid})
 | 
			
		||||
                except trio.Cancelled as err:
 | 
			
		||||
                    tb = err.__traceback__
 | 
			
		||||
 | 
			
		||||
            if cs.cancelled_caught:
 | 
			
		||||
                if ctx._cancel_called:
 | 
			
		||||
                    msg = f'{func.__name__} cancelled itself',
 | 
			
		||||
 | 
			
		||||
                else:
 | 
			
		||||
                    msg = f'{func.__name__} was remotely cancelled',
 | 
			
		||||
                # TODO: pack in ``trio.Cancelled.__traceback__`` here
 | 
			
		||||
                # so they can be unwrapped and displayed on the caller
 | 
			
		||||
                # side!
 | 
			
		||||
 | 
			
		||||
                fname = func.__name__
 | 
			
		||||
                if ctx._cancel_called:
 | 
			
		||||
                    msg = f'{fname} cancelled itself'
 | 
			
		||||
 | 
			
		||||
                elif cs.cancel_called:
 | 
			
		||||
                    msg = (
 | 
			
		||||
                        f'{fname} was remotely cancelled by its caller '
 | 
			
		||||
                        f'{ctx.chan.uid}'
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                # task-contex was cancelled so relay to the cancel to caller
 | 
			
		||||
                raise ContextCancelled(
 | 
			
		||||
| 
						 | 
				
			
			@ -196,7 +211,7 @@ async def _invoke(
 | 
			
		|||
                log.exception("Actor crashed:")
 | 
			
		||||
 | 
			
		||||
        # always ship errors back to caller
 | 
			
		||||
        err_msg = pack_error(err)
 | 
			
		||||
        err_msg = pack_error(err, tb=tb)
 | 
			
		||||
        err_msg['cid'] = cid
 | 
			
		||||
        try:
 | 
			
		||||
            await chan.send(err_msg)
 | 
			
		||||
| 
						 | 
				
			
			@ -223,7 +238,7 @@ async def _invoke(
 | 
			
		|||
                f"Task {func} likely errored or cancelled before it started")
 | 
			
		||||
        finally:
 | 
			
		||||
            if not actor._rpc_tasks:
 | 
			
		||||
                log.info("All RPC tasks have completed")
 | 
			
		||||
                log.runtime("All RPC tasks have completed")
 | 
			
		||||
                actor._ongoing_rpc_tasks.set()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -238,10 +253,10 @@ _lifetime_stack: ExitStack = ExitStack()
 | 
			
		|||
class Actor:
 | 
			
		||||
    """The fundamental concurrency primitive.
 | 
			
		||||
 | 
			
		||||
    An *actor* is the combination of a regular Python or
 | 
			
		||||
    ``multiprocessing.Process`` executing a ``trio`` task tree, communicating
 | 
			
		||||
    An *actor* is the combination of a regular Python process
 | 
			
		||||
    executing a ``trio`` task tree, communicating
 | 
			
		||||
    with other actors through "portals" which provide a native async API
 | 
			
		||||
    around "channels".
 | 
			
		||||
    around various IPC transport "channels".
 | 
			
		||||
    """
 | 
			
		||||
    is_arbiter: bool = False
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -396,7 +411,7 @@ class Actor:
 | 
			
		|||
        self._no_more_peers = trio.Event()  # unset
 | 
			
		||||
 | 
			
		||||
        chan = Channel(stream=stream)
 | 
			
		||||
        log.info(f"New connection to us {chan}")
 | 
			
		||||
        log.runtime(f"New connection to us {chan}")
 | 
			
		||||
 | 
			
		||||
        # send/receive initial handshake response
 | 
			
		||||
        try:
 | 
			
		||||
| 
						 | 
				
			
			@ -427,8 +442,12 @@ class Actor:
 | 
			
		|||
            event.set()
 | 
			
		||||
 | 
			
		||||
        chans = self._peers[uid]
 | 
			
		||||
 | 
			
		||||
        # TODO: re-use channels for new connections instead 
 | 
			
		||||
        # of always new ones; will require changing all the
 | 
			
		||||
        # discovery funcs
 | 
			
		||||
        if chans:
 | 
			
		||||
            log.warning(
 | 
			
		||||
            log.runtime(
 | 
			
		||||
                f"already have channel(s) for {uid}:{chans}?"
 | 
			
		||||
            )
 | 
			
		||||
        log.trace(f"Registered {chan} for {uid}")  # type: ignore
 | 
			
		||||
| 
						 | 
				
			
			@ -664,7 +683,7 @@ class Actor:
 | 
			
		|||
                        else:
 | 
			
		||||
                            # mark that we have ongoing rpc tasks
 | 
			
		||||
                            self._ongoing_rpc_tasks = trio.Event()
 | 
			
		||||
                            log.info(f"RPC func is {func}")
 | 
			
		||||
                            log.runtime(f"RPC func is {func}")
 | 
			
		||||
                            # store cancel scope such that the rpc task can be
 | 
			
		||||
                            # cancelled gracefully if requested
 | 
			
		||||
                            self._rpc_tasks[(chan, cid)] = (
 | 
			
		||||
| 
						 | 
				
			
			@ -673,7 +692,7 @@ class Actor:
 | 
			
		|||
                        # self.cancel() was called so kill this msg loop
 | 
			
		||||
                        # and break out into ``_async_main()``
 | 
			
		||||
                        log.warning(
 | 
			
		||||
                            f"{self.uid} was remotely cancelled; "
 | 
			
		||||
                            f"Actor {self.uid} was remotely cancelled; "
 | 
			
		||||
                            "waiting on cancellation completion..")
 | 
			
		||||
                        await self._cancel_complete.wait()
 | 
			
		||||
                        loop_cs.cancel()
 | 
			
		||||
| 
						 | 
				
			
			@ -1141,7 +1160,7 @@ class Actor:
 | 
			
		|||
            raise ValueError(f"{uid} is not a valid uid?!")
 | 
			
		||||
 | 
			
		||||
        chan.uid = uid
 | 
			
		||||
        log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
 | 
			
		||||
        log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
 | 
			
		||||
        return uid
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -56,13 +56,22 @@ class NoRuntime(RuntimeError):
 | 
			
		|||
    "The root actor has not been initialized yet"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def pack_error(exc: BaseException) -> Dict[str, Any]:
 | 
			
		||||
def pack_error(
 | 
			
		||||
    exc: BaseException,
 | 
			
		||||
    tb = None,
 | 
			
		||||
 | 
			
		||||
) -> Dict[str, Any]:
 | 
			
		||||
    """Create an "error message" for tranmission over
 | 
			
		||||
    a channel (aka the wire).
 | 
			
		||||
    """
 | 
			
		||||
    if tb:
 | 
			
		||||
        tb_str = ''.join(traceback.format_tb(tb))
 | 
			
		||||
    else:
 | 
			
		||||
        tb_str = traceback.format_exc()
 | 
			
		||||
 | 
			
		||||
    return {
 | 
			
		||||
        'error': {
 | 
			
		||||
            'tb_str': traceback.format_exc(),
 | 
			
		||||
            'tb_str': tb_str,
 | 
			
		||||
            'type_str': type(exc).__name__,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue