More service-mngr clarity notes
Nothing changing functionally here just adding more `tractor` operational notes, tips for debug tooling and typing fixes B) Of particular note is adding further details about the reason we do not need to call `Context.cancel()` inside the `finally:` block of `.open_context_in_task()` thanks to `tractor`'s new and improved inter-actor cancellation semantics Boservice_mng_to_tractor
parent
4f34797780
commit
4ce161e443
|
@ -144,6 +144,9 @@ async def open_service_mngr(
|
||||||
# TODO: is this more clever/efficient?
|
# TODO: is this more clever/efficient?
|
||||||
# if 'samplerd' in mngr.service_tasks:
|
# if 'samplerd' in mngr.service_tasks:
|
||||||
# await mngr.cancel_service('samplerd')
|
# await mngr.cancel_service('samplerd')
|
||||||
|
|
||||||
|
# await tractor.pause(shield=True)
|
||||||
|
# ^XXX, if needed mk sure to shield it ;)
|
||||||
tn.cancel_scope.cancel()
|
tn.cancel_scope.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
@ -241,7 +244,11 @@ class ServiceMngr:
|
||||||
]
|
]
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> Any:
|
) -> tuple[
|
||||||
|
trio.CancelScope,
|
||||||
|
Context,
|
||||||
|
Any, # started value from ctx
|
||||||
|
]:
|
||||||
|
|
||||||
# TODO: use the ctx._scope directly here instead?
|
# TODO: use the ctx._scope directly here instead?
|
||||||
# -[ ] actually what semantics do we expect for this
|
# -[ ] actually what semantics do we expect for this
|
||||||
|
@ -251,6 +258,10 @@ class ServiceMngr:
|
||||||
async with portal.open_context(
|
async with portal.open_context(
|
||||||
target,
|
target,
|
||||||
allow_overruns=allow_overruns,
|
allow_overruns=allow_overruns,
|
||||||
|
|
||||||
|
# hide_tb=False,
|
||||||
|
# ^XXX^ HAWT TIPZ
|
||||||
|
|
||||||
**ctx_kwargs,
|
**ctx_kwargs,
|
||||||
|
|
||||||
) as (ctx, started):
|
) as (ctx, started):
|
||||||
|
@ -269,7 +280,9 @@ class ServiceMngr:
|
||||||
# wait on any context's return value
|
# wait on any context's return value
|
||||||
# and any final portal result from the
|
# and any final portal result from the
|
||||||
# sub-actor.
|
# sub-actor.
|
||||||
ctx_res: Any = await ctx.wait_for_result()
|
ctx_res: Any = await ctx.wait_for_result(
|
||||||
|
# hide_tb=False,
|
||||||
|
)
|
||||||
|
|
||||||
# NOTE: blocks indefinitely until cancelled
|
# NOTE: blocks indefinitely until cancelled
|
||||||
# either by error from the target context
|
# either by error from the target context
|
||||||
|
@ -304,25 +317,53 @@ class ServiceMngr:
|
||||||
finally:
|
finally:
|
||||||
# NOTE: the ctx MUST be cancelled first if we
|
# NOTE: the ctx MUST be cancelled first if we
|
||||||
# don't want the above `ctx.wait_for_result()` to
|
# don't want the above `ctx.wait_for_result()` to
|
||||||
# raise a self-ctxc. WHY, well since from the ctx's
|
# raise a self-ctxc.
|
||||||
|
#
|
||||||
|
# WHY, well since from the ctx's
|
||||||
# perspective the cancel request will have
|
# perspective the cancel request will have
|
||||||
# arrived out-out-of-band at the `Actor.cancel()`
|
# arrived out-out-of-band at the `Actor.cancel()`
|
||||||
# level, thus `Context.cancel_called == False`,
|
# level (since pikerd will have called
|
||||||
|
# `Portal.cancel_actor()`), and thus
|
||||||
|
# `Context.cancel_called == False`,
|
||||||
# meaning `ctx._is_self_cancelled() == False`.
|
# meaning `ctx._is_self_cancelled() == False`.
|
||||||
# with trio.CancelScope(shield=True):
|
#
|
||||||
# await ctx.cancel()
|
# HOWEVER, this should happen implicitly WITHOUT
|
||||||
|
# a manual `ctx.cancel()` call HERE since,
|
||||||
|
#
|
||||||
|
# - in the mngr shutdown case the surrounding
|
||||||
|
# `.service_n.cancel_scope` should be
|
||||||
|
# `.cancel_called == True` and the
|
||||||
|
# `Portal.open_context()` internals should take
|
||||||
|
# care of it.
|
||||||
|
#
|
||||||
|
# - in the specific-service cancellation case,
|
||||||
|
# `.cancel_service()` makes the manual
|
||||||
|
# `ctx.cancel()` call for us which SHOULD mean
|
||||||
|
# the ctxc is never raised above (since, again,
|
||||||
|
# it will be gracefully suppressed by
|
||||||
|
# `.open_context()` internals) and thus we only
|
||||||
|
# need to shut down the service actor.
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
complete.set()
|
|
||||||
self.service_tasks.pop(name)
|
self.service_tasks.pop(name)
|
||||||
|
complete.set()
|
||||||
|
|
||||||
cs, sub_ctx, complete, started = await self.service_n.start(
|
(
|
||||||
|
cs, # internally allocated
|
||||||
|
sub_ctx, # RPC peer-actor ctx
|
||||||
|
complete, # termination syncing
|
||||||
|
started, # proxyed from internal `.open_context()` entry.
|
||||||
|
) = await self.service_n.start(
|
||||||
open_context_in_task
|
open_context_in_task
|
||||||
)
|
)
|
||||||
|
|
||||||
# store the cancel scope and portal for later cancellation or
|
# store the cancel scope and portal for later cancellation or
|
||||||
# retstart if needed.
|
# retstart if needed.
|
||||||
self.service_tasks[name] = (cs, sub_ctx, portal, complete)
|
self.service_tasks[name] = (cs, sub_ctx, portal, complete)
|
||||||
return cs, sub_ctx, started
|
return (
|
||||||
|
cs,
|
||||||
|
sub_ctx,
|
||||||
|
started,
|
||||||
|
)
|
||||||
|
|
||||||
async def cancel_service(
|
async def cancel_service(
|
||||||
self,
|
self,
|
||||||
|
@ -341,11 +382,11 @@ class ServiceMngr:
|
||||||
await complete.wait()
|
await complete.wait()
|
||||||
|
|
||||||
if name in self.service_tasks:
|
if name in self.service_tasks:
|
||||||
# TODO: custom err?
|
|
||||||
# raise ServiceError(
|
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f'Serice task for {name} not terminated?'
|
f'Serice task for {name} not terminated?'
|
||||||
)
|
)
|
||||||
|
# raise ServiceError(
|
||||||
|
# ^TODO? custom err type?
|
||||||
|
|
||||||
# assert name not in self.service_tasks, \
|
# assert name not in self.service_tasks, \
|
||||||
# f'Serice task for {name} not terminated?'
|
# f'Serice task for {name} not terminated?'
|
||||||
|
|
Loading…
Reference in New Issue