Parameterize the `return_msg_type` in `._invoke()`
Since we also handle a runtime-specific `CancelAck`, allow the caller-scheduler to pass in the expected return-type msg per the RPC msg endpoint loop.runtime_to_msgspec
parent
cbd47d800e
commit
a520951928
|
@ -63,6 +63,7 @@ from .log import get_logger
|
||||||
from .msg import (
|
from .msg import (
|
||||||
current_codec,
|
current_codec,
|
||||||
MsgCodec,
|
MsgCodec,
|
||||||
|
PayloadT,
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
)
|
)
|
||||||
|
@ -96,7 +97,7 @@ async def _invoke_non_context(
|
||||||
|
|
||||||
treat_as_gen: bool,
|
treat_as_gen: bool,
|
||||||
is_rpc: bool,
|
is_rpc: bool,
|
||||||
return_msg: Return|CancelAck = Return,
|
return_msg_type: Return|CancelAck = Return,
|
||||||
|
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
Context | BaseException
|
Context | BaseException
|
||||||
|
@ -218,7 +219,7 @@ async def _invoke_non_context(
|
||||||
and chan.connected()
|
and chan.connected()
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
ret_msg = return_msg(
|
ret_msg = return_msg_type(
|
||||||
cid=cid,
|
cid=cid,
|
||||||
pld=result,
|
pld=result,
|
||||||
)
|
)
|
||||||
|
@ -417,7 +418,7 @@ async def _invoke(
|
||||||
|
|
||||||
is_rpc: bool = True,
|
is_rpc: bool = True,
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
return_msg: Return|CancelAck = Return,
|
return_msg_type: Return|CancelAck = Return,
|
||||||
|
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
Context | BaseException
|
Context | BaseException
|
||||||
|
@ -531,7 +532,7 @@ async def _invoke(
|
||||||
kwargs,
|
kwargs,
|
||||||
treat_as_gen,
|
treat_as_gen,
|
||||||
is_rpc,
|
is_rpc,
|
||||||
return_msg,
|
return_msg_type,
|
||||||
task_status,
|
task_status,
|
||||||
)
|
)
|
||||||
# XXX below fallthrough is ONLY for `@context` eps
|
# XXX below fallthrough is ONLY for `@context` eps
|
||||||
|
@ -591,18 +592,21 @@ async def _invoke(
|
||||||
ctx._scope = tn.cancel_scope
|
ctx._scope = tn.cancel_scope
|
||||||
task_status.started(ctx)
|
task_status.started(ctx)
|
||||||
|
|
||||||
# TODO: should would be nice to have our
|
# TODO: better `trionics` tooling:
|
||||||
# `TaskMngr` nursery here!
|
# -[ ] should would be nice to have our `TaskMngr`
|
||||||
res: Any = await coro
|
# nursery here!
|
||||||
ctx._result = res
|
# -[ ] payload value checking like we do with
|
||||||
|
# `.started()` such that the debbuger can engage
|
||||||
# deliver final result to caller side.
|
# here in the child task instead of waiting for the
|
||||||
await chan.send(
|
# parent to crash with it's own MTE..
|
||||||
return_msg(
|
res: Any|PayloadT = await coro
|
||||||
cid=cid,
|
return_msg: Return|CancelAck = return_msg_type(
|
||||||
pld=res,
|
cid=cid,
|
||||||
)
|
pld=res,
|
||||||
)
|
)
|
||||||
|
# set and shuttle final result to "parent"-side task.
|
||||||
|
ctx._result = res
|
||||||
|
await chan.send(return_msg)
|
||||||
|
|
||||||
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
||||||
# called by any of,
|
# called by any of,
|
||||||
|
@ -938,7 +942,7 @@ async def process_messages(
|
||||||
actor.cancel,
|
actor.cancel,
|
||||||
kwargs,
|
kwargs,
|
||||||
is_rpc=False,
|
is_rpc=False,
|
||||||
return_msg=CancelAck,
|
return_msg_type=CancelAck,
|
||||||
)
|
)
|
||||||
|
|
||||||
log.runtime(
|
log.runtime(
|
||||||
|
@ -972,7 +976,7 @@ async def process_messages(
|
||||||
actor._cancel_task,
|
actor._cancel_task,
|
||||||
kwargs,
|
kwargs,
|
||||||
is_rpc=False,
|
is_rpc=False,
|
||||||
return_msg=CancelAck,
|
return_msg_type=CancelAck,
|
||||||
)
|
)
|
||||||
except BaseException:
|
except BaseException:
|
||||||
log.exception(
|
log.exception(
|
||||||
|
|
Loading…
Reference in New Issue