forked from goodboy/tractor
Always set `Context._portal` on the caller task side
parent
92b540d518
commit
2680a9473d
|
@ -249,14 +249,14 @@ class Portal:
|
||||||
internals.
|
internals.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
msg = await self._return_once(
|
ctx = await self.actor.start_remote_task(
|
||||||
await self.actor.start_remote_task(
|
self.channel,
|
||||||
self.channel,
|
namespace_path,
|
||||||
namespace_path,
|
function_name,
|
||||||
function_name,
|
kwargs,
|
||||||
kwargs,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
ctx._portal = self
|
||||||
|
msg = await self._return_once(ctx)
|
||||||
return _unwrap_msg(msg, self.channel)
|
return _unwrap_msg(msg, self.channel)
|
||||||
|
|
||||||
async def run(
|
async def run(
|
||||||
|
@ -298,15 +298,15 @@ class Portal:
|
||||||
|
|
||||||
fn_mod_path, fn_name = func_deats(func)
|
fn_mod_path, fn_name = func_deats(func)
|
||||||
|
|
||||||
|
ctx = await self.actor.start_remote_task(
|
||||||
|
self.channel,
|
||||||
|
fn_mod_path,
|
||||||
|
fn_name,
|
||||||
|
kwargs,
|
||||||
|
)
|
||||||
|
ctx._portal = self
|
||||||
return _unwrap_msg(
|
return _unwrap_msg(
|
||||||
await self._return_once(
|
await self._return_once(ctx),
|
||||||
await self.actor.start_remote_task(
|
|
||||||
self.channel,
|
|
||||||
fn_mod_path,
|
|
||||||
fn_name,
|
|
||||||
kwargs,
|
|
||||||
)
|
|
||||||
),
|
|
||||||
self.channel,
|
self.channel,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -385,7 +385,6 @@ class Portal:
|
||||||
and synchronized final result collection. See ``tractor.Context``.
|
and synchronized final result collection. See ``tractor.Context``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
# conduct target func method structural checks
|
# conduct target func method structural checks
|
||||||
if not inspect.iscoroutinefunction(func) and (
|
if not inspect.iscoroutinefunction(func) and (
|
||||||
getattr(func, '_tractor_contex_function', False)
|
getattr(func, '_tractor_contex_function', False)
|
||||||
|
@ -393,6 +392,8 @@ class Portal:
|
||||||
raise TypeError(
|
raise TypeError(
|
||||||
f'{func} must be an async generator function!')
|
f'{func} must be an async generator function!')
|
||||||
|
|
||||||
|
__tracebackhide__ = True
|
||||||
|
|
||||||
fn_mod_path, fn_name = func_deats(func)
|
fn_mod_path, fn_name = func_deats(func)
|
||||||
ctx = await self.actor.start_remote_task(
|
ctx = await self.actor.start_remote_task(
|
||||||
self.channel,
|
self.channel,
|
||||||
|
@ -407,6 +408,7 @@ class Portal:
|
||||||
# the "first" value here is delivered by the callee's
|
# the "first" value here is delivered by the callee's
|
||||||
# ``Context.started()`` call.
|
# ``Context.started()`` call.
|
||||||
first = msg['started']
|
first = msg['started']
|
||||||
|
ctx._started_called = True
|
||||||
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
assert msg.get('cid'), ("Received internal error at context?")
|
assert msg.get('cid'), ("Received internal error at context?")
|
||||||
|
@ -458,9 +460,10 @@ class Portal:
|
||||||
_err = err
|
_err = err
|
||||||
# the context cancels itself on any cancel
|
# the context cancels itself on any cancel
|
||||||
# causing error.
|
# causing error.
|
||||||
log.cancel(f'Context {ctx} sending cancel to far end')
|
log.cancel(
|
||||||
with trio.CancelScope(shield=True):
|
f'Context to {self.channel.uid} sending cancel request..')
|
||||||
await ctx.cancel()
|
|
||||||
|
await ctx.cancel()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
|
Loading…
Reference in New Issue