Ignore `ContextCancelled`s from non-mngr requests
Since service daemon actors may be cancelled remotely by clients (who may also have requested said daemon-actor's spawn in the first place), we specifically ignore `tractor.ContextCancelled`s from the `ctx.result()` inside `Services.start_service_task()` to avoid crashing the service mngr, and thus for now `pikerd` (which **does** happen now due to the updated and more explicit remote cancellation semantics implemented in `tractor`), since the `.canceller` field is not going to match the `pikerd` uid in such cases! This explicit check makes sense since the `Services` mngr is built to allow remote requests to "spawn-n-supervise service actors" where the services can remain persistent but also be cancelled later as requested. We may want to consider only allowing cancellation by actors who requested the spawn in the future, though? Also change to more explicit imports of `tractor` types for annotations throughout the sub-pkg.
distribute_dis
parent
fc216d37de
commit
05f874001a
|
@ -15,8 +15,8 @@
|
|||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Supervisor for ``docker`` with included async and SC wrapping
|
||||
to ensure a cancellable container lifetime system.
|
||||
Supervisor for ``docker`` with included async and SC wrapping to
|
||||
ensure a cancellable container lifetime system.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
|
|
|
@ -27,6 +27,12 @@ from typing import (
|
|||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
from tractor import (
|
||||
current_actor,
|
||||
ContextCancelled,
|
||||
Context,
|
||||
Portal,
|
||||
)
|
||||
|
||||
from ._util import (
|
||||
log, # sub-sys logger
|
||||
|
@ -38,6 +44,8 @@ from ._util import (
|
|||
# library.
|
||||
# - wrap a "remote api" wherein you can get a method proxy
|
||||
# to the pikerd actor for starting services remotely!
|
||||
# - prolly rename this to ActorServicesNursery since it spawns
|
||||
# new actors and supervises them to completion?
|
||||
class Services:
|
||||
|
||||
actor_n: tractor._supervise.ActorNursery
|
||||
|
@ -47,7 +55,7 @@ class Services:
|
|||
str,
|
||||
tuple[
|
||||
trio.CancelScope,
|
||||
tractor.Portal,
|
||||
Portal,
|
||||
trio.Event,
|
||||
]
|
||||
] = {}
|
||||
|
@ -57,12 +65,12 @@ class Services:
|
|||
async def start_service_task(
|
||||
self,
|
||||
name: str,
|
||||
portal: tractor.Portal,
|
||||
portal: Portal,
|
||||
target: Callable,
|
||||
allow_overruns: bool = False,
|
||||
**ctx_kwargs,
|
||||
|
||||
) -> (trio.CancelScope, tractor.Context):
|
||||
) -> (trio.CancelScope, Context):
|
||||
'''
|
||||
Open a context in a service sub-actor, add to a stack
|
||||
that gets unwound at ``pikerd`` teardown.
|
||||
|
@ -101,13 +109,30 @@ class Services:
|
|||
# wait on any context's return value
|
||||
# and any final portal result from the
|
||||
# sub-actor.
|
||||
ctx_res = await ctx.result()
|
||||
ctx_res: Any = await ctx.result()
|
||||
|
||||
# NOTE: blocks indefinitely until cancelled
|
||||
# either by error from the target context
|
||||
# function or by being cancelled here by the
|
||||
# surrounding cancel scope.
|
||||
return (await portal.result(), ctx_res)
|
||||
except ContextCancelled as ctxe:
|
||||
canceller: tuple[str, str] = ctxe.canceller
|
||||
our_uid: tuple[str, str] = current_actor().uid
|
||||
if (
|
||||
canceller != portal.channel.uid
|
||||
and
|
||||
canceller != our_uid
|
||||
):
|
||||
log.cancel(
|
||||
f'Actor-service {name} was remotely cancelled?\n'
|
||||
f'remote canceller: {canceller}\n'
|
||||
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
|
||||
)
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
|
||||
finally:
|
||||
await portal.cancel_actor()
|
||||
|
|
|
@ -27,6 +27,7 @@ from typing import (
|
|||
)
|
||||
|
||||
import tractor
|
||||
from tractor import Portal
|
||||
|
||||
from ._util import (
|
||||
log, # sub-sys logger
|
||||
|
@ -140,7 +141,11 @@ async def find_service(
|
|||
|
||||
first_only: bool = True,
|
||||
|
||||
) -> tractor.Portal | None:
|
||||
) -> (
|
||||
Portal
|
||||
| list[Portal]
|
||||
| None
|
||||
):
|
||||
|
||||
reg_addrs: list[tuple[str, int]]
|
||||
async with open_registry(
|
||||
|
@ -153,6 +158,9 @@ async def find_service(
|
|||
),
|
||||
) as reg_addrs:
|
||||
log.info(f'Scanning for service `{service_name}`')
|
||||
|
||||
maybe_portals: list[Portal] | Portal | None
|
||||
|
||||
# attach to existing daemon by name if possible
|
||||
async with tractor.find_actor(
|
||||
service_name,
|
||||
|
|
Loading…
Reference in New Issue