forked from goodboy/tractor
1
0
Fork 0

Annotate nursery and portal methods for `CallerInfo` scanning

runtime_to_msgspec
Tyler Goodlet 2024-04-18 15:17:50 -04:00
parent 5439060cd3
commit d2f6428e46
2 changed files with 29 additions and 7 deletions

View File

@ -161,17 +161,18 @@ class Portal:
self._expect_result = await self.actor.start_remote_task( self._expect_result = await self.actor.start_remote_task(
self.channel, self.channel,
nsf=NamespacePath(f'{ns}:{func}'), nsf=NamespacePath(f'{ns}:{func}'),
kwargs=kwargs kwargs=kwargs,
portal=self,
) )
async def _return_once( async def _return_once(
self, self,
ctx: Context, ctx: Context,
) -> dict[str, Any]: ) -> Return:
assert ctx._remote_func_type == 'asyncfunc' # single response assert ctx._remote_func_type == 'asyncfunc' # single response
msg: dict = await ctx._recv_chan.receive() msg: Return = await ctx._recv_chan.receive()
return msg return msg
async def result(self) -> Any: async def result(self) -> Any:
@ -247,6 +248,8 @@ class Portal:
purpose. purpose.
''' '''
__runtimeframe__: int = 1 # noqa
chan: Channel = self.channel chan: Channel = self.channel
if not chan.connected(): if not chan.connected():
log.runtime( log.runtime(
@ -324,16 +327,18 @@ class Portal:
internals! internals!
''' '''
__runtimeframe__: int = 1 # noqa
nsf = NamespacePath( nsf = NamespacePath(
f'{namespace_path}:{function_name}' f'{namespace_path}:{function_name}'
) )
ctx = await self.actor.start_remote_task( ctx: Context = await self.actor.start_remote_task(
chan=self.channel, chan=self.channel,
nsf=nsf, nsf=nsf,
kwargs=kwargs, kwargs=kwargs,
portal=self,
) )
ctx._portal = self ctx._portal: Portal = self
msg = await self._return_once(ctx) msg: Return = await self._return_once(ctx)
return _unwrap_msg( return _unwrap_msg(
msg, msg,
self.channel, self.channel,
@ -384,6 +389,7 @@ class Portal:
self.channel, self.channel,
nsf=nsf, nsf=nsf,
kwargs=kwargs, kwargs=kwargs,
portal=self,
) )
ctx._portal = self ctx._portal = self
return _unwrap_msg( return _unwrap_msg(
@ -398,6 +404,14 @@ class Portal:
**kwargs, **kwargs,
) -> AsyncGenerator[MsgStream, None]: ) -> AsyncGenerator[MsgStream, None]:
'''
Legacy one-way streaming API.
TODO: re-impl on top `Portal.open_context()` + an async gen
around `Context.open_stream()`.
'''
__runtimeframe__: int = 1 # noqa
if not inspect.isasyncgenfunction(async_gen_func): if not inspect.isasyncgenfunction(async_gen_func):
if not ( if not (
@ -411,6 +425,7 @@ class Portal:
self.channel, self.channel,
nsf=NamespacePath.from_ref(async_gen_func), nsf=NamespacePath.from_ref(async_gen_func),
kwargs=kwargs, kwargs=kwargs,
portal=self,
) )
ctx._portal = self ctx._portal = self

View File

@ -131,7 +131,12 @@ class ActorNursery:
"main task" besides the runtime. "main task" besides the runtime.
''' '''
loglevel = loglevel or self._actor.loglevel or get_loglevel() __runtimeframe__: int = 1 # noqa
loglevel: str = (
loglevel
or self._actor.loglevel
or get_loglevel()
)
# configure and pass runtime state # configure and pass runtime state
_rtv = _state._runtime_vars.copy() _rtv = _state._runtime_vars.copy()
@ -209,6 +214,7 @@ class ActorNursery:
the actor is terminated. the actor is terminated.
''' '''
__runtimeframe__: int = 1 # noqa
mod_path: str = fn.__module__ mod_path: str = fn.__module__
if name is None: if name is None:
@ -257,6 +263,7 @@ class ActorNursery:
directly without any far end graceful ``trio`` cancellation. directly without any far end graceful ``trio`` cancellation.
''' '''
__runtimeframe__: int = 1 # noqa
self.cancelled = True self.cancelled = True
# TODO: impl a repr for spawn more compact # TODO: impl a repr for spawn more compact