forked from goodboy/tractor
				
			Move maybe-raise-error-msg logic into context
A context method handling all this logic makes the most sense since it contains all the state related to whether the error should be raised in a nursery scope or is expected to be raised by a consumer task which reads and processes the msg directly (via a `Portal` API call). This also makes it easy to always process remote errors even when there is no (stream) overrun condition.stricter_context_starting
							parent
							
								
									1f8e1cccbb
								
							
						
					
					
						commit
						c9132de7dc
					
				| 
						 | 
				
			
			@ -650,81 +650,56 @@ class Actor:
 | 
			
		|||
            return
 | 
			
		||||
 | 
			
		||||
        send_chan = ctx._send_chan
 | 
			
		||||
        assert send_chan
 | 
			
		||||
 | 
			
		||||
        error = msg.get('error')
 | 
			
		||||
        if error:
 | 
			
		||||
            # If this is an error message from a context opened by
 | 
			
		||||
            # ``Portal.open_context()`` we want to interrupt any ongoing
 | 
			
		||||
            # (child) tasks within that context to be notified of the remote
 | 
			
		||||
            # error relayed here.
 | 
			
		||||
            #
 | 
			
		||||
            # The reason we may want to raise the remote error immediately
 | 
			
		||||
            # is that there is no guarantee the associated local task(s)
 | 
			
		||||
            # will attempt to read from any locally opened stream any time
 | 
			
		||||
            # soon.
 | 
			
		||||
            #
 | 
			
		||||
            # NOTE: this only applies when
 | 
			
		||||
            # ``Portal.open_context()`` has been called since it is assumed
 | 
			
		||||
            # (currently) that other portal APIs (``Portal.run()``,
 | 
			
		||||
            # ``.run_in_actor()``) do their own error checking at the point
 | 
			
		||||
            # of the call and result processing.
 | 
			
		||||
            log.warning(f'Remote context for {chan.uid}:{cid} errored {msg}')
 | 
			
		||||
            ctx._maybe_error_from_remote_msg(msg)
 | 
			
		||||
        log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}")
 | 
			
		||||
 | 
			
		||||
        # XXX: we do **not** maintain backpressure and instead
 | 
			
		||||
        # opt to relay stream overrun errors to the sender.
 | 
			
		||||
        try:
 | 
			
		||||
            log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}")
 | 
			
		||||
            send_chan.send_nowait(msg)
 | 
			
		||||
            # if an error is deteced we should always
 | 
			
		||||
            # expect it to be raised by any context (stream)
 | 
			
		||||
            # consumer task
 | 
			
		||||
            await ctx._maybe_raise_from_remote_msg(msg)
 | 
			
		||||
 | 
			
		||||
            # XXX: we do **not** maintain backpressure and instead
 | 
			
		||||
            # opt to relay stream overrun errors to the sender.
 | 
			
		||||
            try:
 | 
			
		||||
                send_chan.send_nowait(msg)
 | 
			
		||||
            except trio.WouldBlock:
 | 
			
		||||
        except trio.WouldBlock:
 | 
			
		||||
            # XXX: always push an error even if the local
 | 
			
		||||
            # receiver is in overrun state.
 | 
			
		||||
            await ctx._maybe_raise_from_remote_msg(msg)
 | 
			
		||||
 | 
			
		||||
                # XXX: do we need this?
 | 
			
		||||
                # if we're trying to push an error but we're in
 | 
			
		||||
                # an overrun state we'll just get stuck sending
 | 
			
		||||
                # the error that was sent to us back to it's sender
 | 
			
		||||
                # instead of it actually being raises in the target
 | 
			
		||||
                # task..
 | 
			
		||||
                # if error:
 | 
			
		||||
                #     raise unpack_error(msg, chan) from None
 | 
			
		||||
            uid = chan.uid
 | 
			
		||||
            lines = [
 | 
			
		||||
                'Task context stream was overrun',
 | 
			
		||||
                f'local task: {cid} @ {self.uid}',
 | 
			
		||||
                f'remote sender: {uid}',
 | 
			
		||||
            ]
 | 
			
		||||
            if not ctx._stream_opened:
 | 
			
		||||
                lines.insert(
 | 
			
		||||
                    1,
 | 
			
		||||
                    f'\n*** No stream open on `{self.uid[0]}` side! ***\n'
 | 
			
		||||
                )
 | 
			
		||||
            text = '\n'.join(lines)
 | 
			
		||||
 | 
			
		||||
                uid = chan.uid
 | 
			
		||||
 | 
			
		||||
                lines = [
 | 
			
		||||
                    'Task context stream was overrun',
 | 
			
		||||
                    f'local task: {cid} @ {self.uid}',
 | 
			
		||||
                    f'remote sender: {chan.uid}',
 | 
			
		||||
                ]
 | 
			
		||||
                if not ctx._stream_opened:
 | 
			
		||||
                    lines.insert(
 | 
			
		||||
                        1,
 | 
			
		||||
                        f'\n*** No stream open on `{self.uid[0]}` side! ***\n'
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                text = '\n'.join(lines)
 | 
			
		||||
            if ctx._backpressure:
 | 
			
		||||
                log.warning(text)
 | 
			
		||||
                if ctx._backpressure:
 | 
			
		||||
                    await send_chan.send(msg)
 | 
			
		||||
                else:
 | 
			
		||||
                await send_chan.send(msg)
 | 
			
		||||
            else:
 | 
			
		||||
                try:
 | 
			
		||||
                    raise StreamOverrun(text) from None
 | 
			
		||||
                except StreamOverrun as err:
 | 
			
		||||
                    err_msg = pack_error(err)
 | 
			
		||||
                    err_msg['cid'] = cid
 | 
			
		||||
                    try:
 | 
			
		||||
                        raise StreamOverrun(text) from None
 | 
			
		||||
                    except StreamOverrun as err:
 | 
			
		||||
                        err_msg = pack_error(err)
 | 
			
		||||
                        err_msg['cid'] = cid
 | 
			
		||||
                        # TODO: what is the right way to handle the case where the
 | 
			
		||||
                        # local task has already sent a 'stop' / StopAsyncInteration
 | 
			
		||||
                        # to the other side but and possibly has closed the local
 | 
			
		||||
                        # feeder mem chan? Do we wait for some kind of ack or just
 | 
			
		||||
                        # let this fail silently and bubble up (currently)?
 | 
			
		||||
                        await chan.send(err_msg)
 | 
			
		||||
 | 
			
		||||
        except trio.BrokenResourceError:
 | 
			
		||||
            # TODO: what is the right way to handle the case where the
 | 
			
		||||
            # local task has already sent a 'stop' / StopAsyncInteration
 | 
			
		||||
            # to the other side but and possibly has closed the local
 | 
			
		||||
            # feeder mem chan? Do we wait for some kind of ack or just
 | 
			
		||||
            # let this fail silently and bubble up (currently)?
 | 
			
		||||
 | 
			
		||||
            # XXX: local consumer has closed their side
 | 
			
		||||
            # so cancel the far end streaming task
 | 
			
		||||
            log.warning(f"{send_chan} consumer is already closed")
 | 
			
		||||
                    except trio.BrokenResourceError:
 | 
			
		||||
                        # XXX: local consumer has closed their side
 | 
			
		||||
                        # so cancel the far end streaming task
 | 
			
		||||
                        log.warning(f"{send_chan} consumer is already closed")
 | 
			
		||||
 | 
			
		||||
    def get_context(
 | 
			
		||||
        self,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -364,33 +364,55 @@ class Context:
 | 
			
		|||
    async def send_stop(self) -> None:
 | 
			
		||||
        await self.chan.send({'stop': True, 'cid': self.cid})
 | 
			
		||||
 | 
			
		||||
    def _maybe_error_from_remote_msg(
 | 
			
		||||
    async def _maybe_raise_from_remote_msg(
 | 
			
		||||
        self,
 | 
			
		||||
        msg: Dict[str, Any],
 | 
			
		||||
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        '''
 | 
			
		||||
        Unpack and raise a msg error into the local scope
 | 
			
		||||
        (Maybe) unpack and raise a msg error into the local scope
 | 
			
		||||
        nursery for this context.
 | 
			
		||||
 | 
			
		||||
        Acts as a form of "relay" for a remote error raised
 | 
			
		||||
        in the corresponding remote callee task.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        self._error = unpack_error(msg, self.chan)
 | 
			
		||||
        error = msg.get('error')
 | 
			
		||||
        if error:
 | 
			
		||||
            # If this is an error message from a context opened by
 | 
			
		||||
            # ``Portal.open_context()`` we want to interrupt any ongoing
 | 
			
		||||
            # (child) tasks within that context to be notified of the remote
 | 
			
		||||
            # error relayed here.
 | 
			
		||||
            #
 | 
			
		||||
            # The reason we may want to raise the remote error immediately
 | 
			
		||||
            # is that there is no guarantee the associated local task(s)
 | 
			
		||||
            # will attempt to read from any locally opened stream any time
 | 
			
		||||
            # soon.
 | 
			
		||||
            #
 | 
			
		||||
            # NOTE: this only applies when
 | 
			
		||||
            # ``Portal.open_context()`` has been called since it is assumed
 | 
			
		||||
            # (currently) that other portal APIs (``Portal.run()``,
 | 
			
		||||
            # ``.run_in_actor()``) do their own error checking at the point
 | 
			
		||||
            # of the call and result processing.
 | 
			
		||||
            log.error(
 | 
			
		||||
                f'Remote context error for {self.chan.uid}:{self.cid}:\n'
 | 
			
		||||
                f'{msg["error"]["tb_str"]}'
 | 
			
		||||
            )
 | 
			
		||||
            # await ctx._maybe_error_from_remote_msg(msg)
 | 
			
		||||
            self._error = unpack_error(msg, self.chan)
 | 
			
		||||
 | 
			
		||||
        # TODO: tempted to **not** do this by-reraising in a
 | 
			
		||||
        # nursery and instead cancel a surrounding scope, detect
 | 
			
		||||
        # the cancellation, then lookup the error that was set?
 | 
			
		||||
        if self._scope_nursery:
 | 
			
		||||
            # TODO: tempted to **not** do this by-reraising in a
 | 
			
		||||
            # nursery and instead cancel a surrounding scope, detect
 | 
			
		||||
            # the cancellation, then lookup the error that was set?
 | 
			
		||||
            if self._scope_nursery:
 | 
			
		||||
 | 
			
		||||
            async def raiser():
 | 
			
		||||
                raise self._error from None
 | 
			
		||||
                async def raiser():
 | 
			
		||||
                    raise self._error from None
 | 
			
		||||
 | 
			
		||||
            # from trio.testing import wait_all_tasks_blocked
 | 
			
		||||
            # await wait_all_tasks_blocked()
 | 
			
		||||
            if not self._scope_nursery._closed:  # type: ignore
 | 
			
		||||
                self._scope_nursery.start_soon(raiser)
 | 
			
		||||
                # from trio.testing import wait_all_tasks_blocked
 | 
			
		||||
                # await wait_all_tasks_blocked()
 | 
			
		||||
                if not self._scope_nursery._closed:  # type: ignore
 | 
			
		||||
                    self._scope_nursery.start_soon(raiser)
 | 
			
		||||
 | 
			
		||||
    async def cancel(self) -> None:
 | 
			
		||||
        '''
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue