Compare commits
2 Commits
35a9d8ec9d
...
de6189da4d
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | de6189da4d | |
Tyler Goodlet | cc5b21a7e6 |
|
@ -23,6 +23,7 @@ from __future__ import annotations
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
|
from functools import partial
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
|
@ -190,14 +191,17 @@ def broker_init(
|
||||||
|
|
||||||
|
|
||||||
async def spawn_brokerd(
|
async def spawn_brokerd(
|
||||||
|
|
||||||
brokername: str,
|
brokername: str,
|
||||||
loglevel: str | None = None,
|
loglevel: str | None = None,
|
||||||
|
|
||||||
**tractor_kwargs,
|
**tractor_kwargs,
|
||||||
|
|
||||||
) -> bool:
|
) -> bool:
|
||||||
|
'''
|
||||||
|
Spawn a `brokerd.<backendname>` subactor service daemon
|
||||||
|
using `pikerd`'s service mngr.
|
||||||
|
|
||||||
|
'''
|
||||||
from piker.service._util import log # use service mngr log
|
from piker.service._util import log # use service mngr log
|
||||||
log.info(f'Spawning {brokername} broker daemon')
|
log.info(f'Spawning {brokername} broker daemon')
|
||||||
|
|
||||||
|
@ -217,27 +221,35 @@ async def spawn_brokerd(
|
||||||
|
|
||||||
# ask `pikerd` to spawn a new sub-actor and manage it under its
|
# ask `pikerd` to spawn a new sub-actor and manage it under its
|
||||||
# actor nursery
|
# actor nursery
|
||||||
from piker.service import Services
|
from piker.service import (
|
||||||
|
get_service_mngr,
|
||||||
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
|
ServiceMngr,
|
||||||
portal = await Services.actor_n.start_actor(
|
|
||||||
dname,
|
|
||||||
enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
|
|
||||||
debug_mode=Services.debug_mode,
|
|
||||||
**tractor_kwargs
|
|
||||||
)
|
)
|
||||||
|
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
|
||||||
# NOTE: the service mngr expects an already spawned actor + its
|
mngr: ServiceMngr = get_service_mngr()
|
||||||
# portal ref in order to do non-blocking setup of brokerd
|
ctx: tractor.Context = await mngr.start_service(
|
||||||
# service nursery.
|
daemon_name=dname,
|
||||||
await Services.start_service_task(
|
ctx_ep=partial(
|
||||||
dname,
|
|
||||||
portal,
|
|
||||||
|
|
||||||
# signature of target root-task endpoint
|
# signature of target root-task endpoint
|
||||||
daemon_fixture_ep,
|
daemon_fixture_ep,
|
||||||
|
|
||||||
|
# passed to daemon_fixture_ep(**kwargs)
|
||||||
brokername=brokername,
|
brokername=brokername,
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
|
),
|
||||||
|
debug_mode=mngr.debug_mode,
|
||||||
|
loglevel=loglevel,
|
||||||
|
enable_modules=(
|
||||||
|
_data_mods
|
||||||
|
+
|
||||||
|
tractor_kwargs.pop('enable_modules')
|
||||||
|
),
|
||||||
|
**tractor_kwargs
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
not ctx.cancel_called
|
||||||
|
and ctx.portal # parent side
|
||||||
|
and dname in ctx.chan.uid # subactor is named as desired
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -262,8 +274,7 @@ async def maybe_spawn_brokerd(
|
||||||
from piker.service import maybe_spawn_daemon
|
from piker.service import maybe_spawn_daemon
|
||||||
|
|
||||||
async with maybe_spawn_daemon(
|
async with maybe_spawn_daemon(
|
||||||
|
service_name=f'brokerd.{brokername}',
|
||||||
f'brokerd.{brokername}',
|
|
||||||
service_task_target=spawn_brokerd,
|
service_task_target=spawn_brokerd,
|
||||||
spawn_args={
|
spawn_args={
|
||||||
'brokername': brokername,
|
'brokername': brokername,
|
||||||
|
|
|
@ -25,6 +25,7 @@ from collections import (
|
||||||
defaultdict,
|
defaultdict,
|
||||||
)
|
)
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
|
from functools import partial
|
||||||
import time
|
import time
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
|
@ -42,7 +43,7 @@ from tractor.trionics import (
|
||||||
maybe_open_nursery,
|
maybe_open_nursery,
|
||||||
)
|
)
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio import TaskStatus
|
||||||
|
|
||||||
from .ticktools import (
|
from .ticktools import (
|
||||||
frame_ticks,
|
frame_ticks,
|
||||||
|
@ -70,6 +71,7 @@ if TYPE_CHECKING:
|
||||||
_default_delay_s: float = 1.0
|
_default_delay_s: float = 1.0
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: use new `tractor.singleton_acm` API for this!
|
||||||
class Sampler:
|
class Sampler:
|
||||||
'''
|
'''
|
||||||
Global sampling engine registry.
|
Global sampling engine registry.
|
||||||
|
@ -79,9 +81,9 @@ class Sampler:
|
||||||
|
|
||||||
This non-instantiated type is meant to be a singleton within
|
This non-instantiated type is meant to be a singleton within
|
||||||
a `samplerd` actor-service spawned once by the user wishing to
|
a `samplerd` actor-service spawned once by the user wishing to
|
||||||
time-step-sample (real-time) quote feeds, see
|
time-step-sample a (real-time) quote feeds, see
|
||||||
``.service.maybe_open_samplerd()`` and the below
|
`.service.maybe_open_samplerd()` and the below
|
||||||
``register_with_sampler()``.
|
`register_with_sampler()`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
service_nursery: None | trio.Nursery = None
|
service_nursery: None | trio.Nursery = None
|
||||||
|
@ -375,7 +377,10 @@ async def register_with_sampler(
|
||||||
assert Sampler.ohlcv_shms
|
assert Sampler.ohlcv_shms
|
||||||
|
|
||||||
# unblock caller
|
# unblock caller
|
||||||
await ctx.started(set(Sampler.ohlcv_shms.keys()))
|
await ctx.started(
|
||||||
|
# XXX bc msgpack only allows one array type!
|
||||||
|
list(Sampler.ohlcv_shms.keys())
|
||||||
|
)
|
||||||
|
|
||||||
if open_index_stream:
|
if open_index_stream:
|
||||||
try:
|
try:
|
||||||
|
@ -419,7 +424,6 @@ async def register_with_sampler(
|
||||||
|
|
||||||
|
|
||||||
async def spawn_samplerd(
|
async def spawn_samplerd(
|
||||||
|
|
||||||
loglevel: str | None = None,
|
loglevel: str | None = None,
|
||||||
**extra_tractor_kwargs
|
**extra_tractor_kwargs
|
||||||
|
|
||||||
|
@ -429,7 +433,10 @@ async def spawn_samplerd(
|
||||||
update and increment count write and stream broadcasting.
|
update and increment count write and stream broadcasting.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from piker.service import Services
|
from piker.service import (
|
||||||
|
get_service_mngr,
|
||||||
|
ServiceMngr,
|
||||||
|
)
|
||||||
|
|
||||||
dname = 'samplerd'
|
dname = 'samplerd'
|
||||||
log.info(f'Spawning `{dname}`')
|
log.info(f'Spawning `{dname}`')
|
||||||
|
@ -437,26 +444,33 @@ async def spawn_samplerd(
|
||||||
# singleton lock creation of ``samplerd`` since we only ever want
|
# singleton lock creation of ``samplerd`` since we only ever want
|
||||||
# one daemon per ``pikerd`` proc tree.
|
# one daemon per ``pikerd`` proc tree.
|
||||||
# TODO: make this built-into the service api?
|
# TODO: make this built-into the service api?
|
||||||
async with Services.locks[dname + '_singleton']:
|
mngr: ServiceMngr = get_service_mngr()
|
||||||
|
already_started: bool = dname in mngr.service_tasks
|
||||||
|
|
||||||
if dname not in Services.service_tasks:
|
async with mngr._locks[dname + '_singleton']:
|
||||||
|
ctx: Context = await mngr.start_service(
|
||||||
|
daemon_name=dname,
|
||||||
|
ctx_ep=partial(
|
||||||
|
register_with_sampler,
|
||||||
|
period_s=1,
|
||||||
|
sub_for_broadcasts=False,
|
||||||
|
),
|
||||||
|
debug_mode=mngr.debug_mode, # set by pikerd flag
|
||||||
|
|
||||||
portal = await Services.actor_n.start_actor(
|
# proxy-through to tractor
|
||||||
dname,
|
|
||||||
enable_modules=[
|
enable_modules=[
|
||||||
'piker.data._sampling',
|
'piker.data._sampling',
|
||||||
],
|
],
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
debug_mode=Services.debug_mode, # set by pikerd flag
|
|
||||||
**extra_tractor_kwargs
|
**extra_tractor_kwargs
|
||||||
)
|
)
|
||||||
|
if not already_started:
|
||||||
await Services.start_service_task(
|
assert (
|
||||||
dname,
|
ctx
|
||||||
portal,
|
and
|
||||||
register_with_sampler,
|
ctx.portal
|
||||||
period_s=1,
|
and
|
||||||
sub_for_broadcasts=False,
|
not ctx.cancel_called
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -889,6 +903,7 @@ async def uniform_rate_send(
|
||||||
# to consumers which crash or lose network connection.
|
# to consumers which crash or lose network connection.
|
||||||
# I.e. we **DO NOT** want to crash and propagate up to
|
# I.e. we **DO NOT** want to crash and propagate up to
|
||||||
# ``pikerd`` these kinds of errors!
|
# ``pikerd`` these kinds of errors!
|
||||||
|
trio.EndOfChannel,
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
ConnectionResetError,
|
ConnectionResetError,
|
||||||
|
|
|
@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for,
|
||||||
=> TODO: maybe to (re)move elsewhere?
|
=> TODO: maybe to (re)move elsewhere?
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from ._mngr import Services as Services
|
from ._mngr import (
|
||||||
|
get_service_mngr as get_service_mngr,
|
||||||
|
open_service_mngr as open_service_mngr,
|
||||||
|
ServiceMngr as ServiceMngr,
|
||||||
|
)
|
||||||
from ._registry import (
|
from ._registry import (
|
||||||
_tractor_kwargs as _tractor_kwargs,
|
_tractor_kwargs as _tractor_kwargs,
|
||||||
_default_reg_addr as _default_reg_addr,
|
_default_reg_addr as _default_reg_addr,
|
||||||
|
|
|
@ -21,7 +21,6 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
import os
|
import os
|
||||||
from typing import (
|
from typing import (
|
||||||
Optional,
|
|
||||||
Any,
|
Any,
|
||||||
ClassVar,
|
ClassVar,
|
||||||
)
|
)
|
||||||
|
@ -30,13 +29,13 @@ from contextlib import (
|
||||||
)
|
)
|
||||||
|
|
||||||
import tractor
|
import tractor
|
||||||
import trio
|
|
||||||
|
|
||||||
from ._util import (
|
from ._util import (
|
||||||
get_console_log,
|
get_console_log,
|
||||||
)
|
)
|
||||||
from ._mngr import (
|
from ._mngr import (
|
||||||
Services,
|
open_service_mngr,
|
||||||
|
ServiceMngr,
|
||||||
)
|
)
|
||||||
from ._registry import ( # noqa
|
from ._registry import ( # noqa
|
||||||
_tractor_kwargs,
|
_tractor_kwargs,
|
||||||
|
@ -59,7 +58,7 @@ async def open_piker_runtime(
|
||||||
registry_addrs: list[tuple[str, int]] = [],
|
registry_addrs: list[tuple[str, int]] = [],
|
||||||
|
|
||||||
enable_modules: list[str] = [],
|
enable_modules: list[str] = [],
|
||||||
loglevel: Optional[str] = None,
|
loglevel: str|None = None,
|
||||||
|
|
||||||
# XXX NOTE XXX: you should pretty much never want debug mode
|
# XXX NOTE XXX: you should pretty much never want debug mode
|
||||||
# for data daemons when running in production.
|
# for data daemons when running in production.
|
||||||
|
@ -119,6 +118,10 @@ async def open_piker_runtime(
|
||||||
# spawn other specialized daemons I think?
|
# spawn other specialized daemons I think?
|
||||||
enable_modules=enable_modules,
|
enable_modules=enable_modules,
|
||||||
|
|
||||||
|
# TODO: how to configure this?
|
||||||
|
# keep it on by default if debug mode is set?
|
||||||
|
# maybe_enable_greenback=debug_mode,
|
||||||
|
|
||||||
**tractor_kwargs,
|
**tractor_kwargs,
|
||||||
) as actor,
|
) as actor,
|
||||||
|
|
||||||
|
@ -167,12 +170,13 @@ async def open_pikerd(
|
||||||
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> Services:
|
) -> ServiceMngr:
|
||||||
'''
|
'''
|
||||||
Start a root piker daemon with an indefinite lifetime.
|
Start a root piker daemon actor (aka `pikerd`) with an indefinite
|
||||||
|
lifetime.
|
||||||
|
|
||||||
A root actor nursery is created which can be used to create and keep
|
A root actor-nursery is created which can be used to spawn and
|
||||||
alive underling services (see below).
|
supervise underling service sub-actors (see below).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# NOTE: for the root daemon we always enable the root
|
# NOTE: for the root daemon we always enable the root
|
||||||
|
@ -199,8 +203,6 @@ async def open_pikerd(
|
||||||
root_actor,
|
root_actor,
|
||||||
reg_addrs,
|
reg_addrs,
|
||||||
),
|
),
|
||||||
tractor.open_nursery() as actor_nursery,
|
|
||||||
trio.open_nursery() as service_nursery,
|
|
||||||
):
|
):
|
||||||
for addr in reg_addrs:
|
for addr in reg_addrs:
|
||||||
if addr not in root_actor.accept_addrs:
|
if addr not in root_actor.accept_addrs:
|
||||||
|
@ -209,25 +211,17 @@ async def open_pikerd(
|
||||||
'Maybe you have another daemon already running?'
|
'Maybe you have another daemon already running?'
|
||||||
)
|
)
|
||||||
|
|
||||||
# assign globally for future daemon/task creation
|
mngr: ServiceMngr
|
||||||
Services.actor_n = actor_nursery
|
async with open_service_mngr(
|
||||||
Services.service_n = service_nursery
|
debug_mode=debug_mode,
|
||||||
Services.debug_mode = debug_mode
|
) as mngr:
|
||||||
|
yield mngr
|
||||||
try:
|
|
||||||
yield Services
|
|
||||||
|
|
||||||
finally:
|
|
||||||
# TODO: is this more clever/efficient?
|
|
||||||
# if 'samplerd' in Services.service_tasks:
|
|
||||||
# await Services.cancel_service('samplerd')
|
|
||||||
service_nursery.cancel_scope.cancel()
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: do we even need this?
|
# TODO: do we even need this?
|
||||||
# @acm
|
# @acm
|
||||||
# async def maybe_open_runtime(
|
# async def maybe_open_runtime(
|
||||||
# loglevel: Optional[str] = None,
|
# loglevel: str|None = None,
|
||||||
# **kwargs,
|
# **kwargs,
|
||||||
|
|
||||||
# ) -> None:
|
# ) -> None:
|
||||||
|
@ -256,7 +250,7 @@ async def maybe_open_pikerd(
|
||||||
loglevel: str | None = None,
|
loglevel: str | None = None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> tractor._portal.Portal | ClassVar[Services]:
|
) -> tractor._portal.Portal | ClassVar[ServiceMngr]:
|
||||||
'''
|
'''
|
||||||
If no ``pikerd`` daemon-root-actor can be found start it and
|
If no ``pikerd`` daemon-root-actor can be found start it and
|
||||||
yield up (we should probably figure out returning a portal to self
|
yield up (we should probably figure out returning a portal to self
|
||||||
|
|
|
@ -49,7 +49,7 @@ from requests.exceptions import (
|
||||||
ReadTimeout,
|
ReadTimeout,
|
||||||
)
|
)
|
||||||
|
|
||||||
from ._mngr import Services
|
from ._mngr import ServiceMngr
|
||||||
from ._util import (
|
from ._util import (
|
||||||
log, # sub-sys logger
|
log, # sub-sys logger
|
||||||
get_console_log,
|
get_console_log,
|
||||||
|
@ -453,7 +453,7 @@ async def open_ahabd(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def start_ahab_service(
|
async def start_ahab_service(
|
||||||
services: Services,
|
services: ServiceMngr,
|
||||||
service_name: str,
|
service_name: str,
|
||||||
|
|
||||||
# endpoint config passed as **kwargs
|
# endpoint config passed as **kwargs
|
||||||
|
@ -549,7 +549,8 @@ async def start_ahab_service(
|
||||||
log.warning('Failed to cancel root permsed container')
|
log.warning('Failed to cancel root permsed container')
|
||||||
|
|
||||||
except (
|
except (
|
||||||
trio.MultiError,
|
# trio.MultiError,
|
||||||
|
ExceptionGroup,
|
||||||
) as err:
|
) as err:
|
||||||
for subexc in err.exceptions:
|
for subexc in err.exceptions:
|
||||||
if isinstance(subexc, PermissionError):
|
if isinstance(subexc, PermissionError):
|
||||||
|
|
|
@ -26,14 +26,17 @@ from typing import (
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
import tractor
|
import tractor
|
||||||
|
import trio
|
||||||
|
|
||||||
from ._util import (
|
from ._util import (
|
||||||
log, # sub-sys logger
|
log, # sub-sys logger
|
||||||
)
|
)
|
||||||
from ._mngr import (
|
from ._mngr import (
|
||||||
Services,
|
get_service_mngr,
|
||||||
|
ServiceMngr,
|
||||||
)
|
)
|
||||||
from ._actor_runtime import maybe_open_pikerd
|
from ._actor_runtime import maybe_open_pikerd
|
||||||
from ._registry import find_service
|
from ._registry import find_service
|
||||||
|
@ -41,15 +44,14 @@ from ._registry import find_service
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def maybe_spawn_daemon(
|
async def maybe_spawn_daemon(
|
||||||
|
|
||||||
service_name: str,
|
service_name: str,
|
||||||
service_task_target: Callable,
|
service_task_target: Callable,
|
||||||
|
|
||||||
spawn_args: dict[str, Any],
|
spawn_args: dict[str, Any],
|
||||||
|
|
||||||
loglevel: str | None = None,
|
loglevel: str | None = None,
|
||||||
singleton: bool = False,
|
singleton: bool = False,
|
||||||
|
|
||||||
|
_locks = defaultdict(trio.Lock),
|
||||||
**pikerd_kwargs,
|
**pikerd_kwargs,
|
||||||
|
|
||||||
) -> tractor.Portal:
|
) -> tractor.Portal:
|
||||||
|
@ -67,7 +69,7 @@ async def maybe_spawn_daemon(
|
||||||
'''
|
'''
|
||||||
# serialize access to this section to avoid
|
# serialize access to this section to avoid
|
||||||
# 2 or more tasks racing to create a daemon
|
# 2 or more tasks racing to create a daemon
|
||||||
lock = Services.locks[service_name]
|
lock = _locks[service_name]
|
||||||
await lock.acquire()
|
await lock.acquire()
|
||||||
|
|
||||||
async with find_service(
|
async with find_service(
|
||||||
|
@ -132,7 +134,65 @@ async def maybe_spawn_daemon(
|
||||||
async with tractor.wait_for_actor(service_name) as portal:
|
async with tractor.wait_for_actor(service_name) as portal:
|
||||||
lock.release()
|
lock.release()
|
||||||
yield portal
|
yield portal
|
||||||
await portal.cancel_actor()
|
# --- ---- ---
|
||||||
|
# XXX NOTE XXX
|
||||||
|
# --- ---- ---
|
||||||
|
# DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
|
||||||
|
#
|
||||||
|
# Doing so will cause an "out-of-band" ctxc
|
||||||
|
# (`tractor.ContextCancelled`) to be raised inside the
|
||||||
|
# `ServiceMngr.open_context_in_task()`'s call to
|
||||||
|
# `ctx.wait_for_result()` AND the internal self-ctxc
|
||||||
|
# "graceful capture" WILL NOT CATCH IT!
|
||||||
|
#
|
||||||
|
# This can cause certain types of operations to raise
|
||||||
|
# that ctxc BEFORE THEY `return`, resulting in
|
||||||
|
# a "false-negative" ctxc being raised when really
|
||||||
|
# nothing actually failed, other then our semantic
|
||||||
|
# "failure" to suppress an expected, graceful,
|
||||||
|
# self-cancel scenario..
|
||||||
|
#
|
||||||
|
# bUt wHy duZ It WorK lIKe dis..
|
||||||
|
# ------------------------------
|
||||||
|
# from the perspective of the `tractor.Context` this
|
||||||
|
# cancel request was conducted "out of band" since
|
||||||
|
# `Context.cancel()` was never called and thus the
|
||||||
|
# `._cancel_called: bool` was never set. Despite the
|
||||||
|
# remote `.canceller` being set to `pikerd` (i.e. the
|
||||||
|
# same `Actor.uid` of the raising service-mngr task) the
|
||||||
|
# service-task's ctx itself was never marked as having
|
||||||
|
# requested cancellation and thus still raises the ctxc
|
||||||
|
# bc it was unaware of any such request.
|
||||||
|
#
|
||||||
|
# How to make grokin these cases easier tho?
|
||||||
|
# ------------------------------------------
|
||||||
|
# Because `Portal.cancel_actor()` was called it requests
|
||||||
|
# "full-`Actor`-runtime-cancellation" of it's peer
|
||||||
|
# process which IS NOT THE SAME as a single inter-actor
|
||||||
|
# RPC task cancelling its local context with a remote
|
||||||
|
# peer `Task` in that same peer process.
|
||||||
|
#
|
||||||
|
# ?TODO? It might be better if we do one (or all) of the
|
||||||
|
# following:
|
||||||
|
#
|
||||||
|
# -[ ] at least set a special message for the
|
||||||
|
# `ContextCancelled` when raised locally by the
|
||||||
|
# unaware ctx task such that we check for the
|
||||||
|
# `.canceller` being *our `Actor`* and in the case
|
||||||
|
# where `Context._cancel_called == False` we specially
|
||||||
|
# note that this is likely an "out-of-band"
|
||||||
|
# runtime-cancel request triggered by some call to
|
||||||
|
# `Portal.cancel_actor()`, possibly even reporting the
|
||||||
|
# exact LOC of that caller by tracking it inside our
|
||||||
|
# portal-type?
|
||||||
|
# -[ ] possibly add another field `ContextCancelled` like
|
||||||
|
# maybe a,
|
||||||
|
# `.request_type: Literal['os', 'proc', 'actor',
|
||||||
|
# 'ctx']` type thing which would allow immediately
|
||||||
|
# being able to tell what kind of cancellation caused
|
||||||
|
# the unexpected ctxc?
|
||||||
|
# -[ ] REMOVE THIS COMMENT, once we've settled on how to
|
||||||
|
# better augment `tractor` to be more explicit on this!
|
||||||
|
|
||||||
|
|
||||||
async def spawn_emsd(
|
async def spawn_emsd(
|
||||||
|
@ -147,21 +207,22 @@ async def spawn_emsd(
|
||||||
"""
|
"""
|
||||||
log.info('Spawning emsd')
|
log.info('Spawning emsd')
|
||||||
|
|
||||||
portal = await Services.actor_n.start_actor(
|
smngr: ServiceMngr = get_service_mngr()
|
||||||
|
portal = await smngr.actor_n.start_actor(
|
||||||
'emsd',
|
'emsd',
|
||||||
enable_modules=[
|
enable_modules=[
|
||||||
'piker.clearing._ems',
|
'piker.clearing._ems',
|
||||||
'piker.clearing._client',
|
'piker.clearing._client',
|
||||||
],
|
],
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
debug_mode=Services.debug_mode, # set by pikerd flag
|
debug_mode=smngr.debug_mode, # set by pikerd flag
|
||||||
**extra_tractor_kwargs
|
**extra_tractor_kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
# non-blocking setup of clearing service
|
# non-blocking setup of clearing service
|
||||||
from ..clearing._ems import _setup_persistent_emsd
|
from ..clearing._ems import _setup_persistent_emsd
|
||||||
|
|
||||||
await Services.start_service_task(
|
await smngr.start_service_task(
|
||||||
'emsd',
|
'emsd',
|
||||||
portal,
|
portal,
|
||||||
|
|
||||||
|
|
|
@ -18,16 +18,29 @@
|
||||||
daemon-service management API.
|
daemon-service management API.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
from contextlib import (
|
||||||
|
asynccontextmanager as acm,
|
||||||
|
# contextmanager as cm,
|
||||||
|
)
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
from dataclasses import (
|
||||||
|
dataclass,
|
||||||
|
field,
|
||||||
|
)
|
||||||
|
import functools
|
||||||
|
import inspect
|
||||||
from typing import (
|
from typing import (
|
||||||
Callable,
|
Callable,
|
||||||
Any,
|
Any,
|
||||||
)
|
)
|
||||||
|
|
||||||
import trio
|
import msgspec
|
||||||
from trio_typing import TaskStatus
|
|
||||||
import tractor
|
import tractor
|
||||||
|
import trio
|
||||||
|
from trio import TaskStatus
|
||||||
from tractor import (
|
from tractor import (
|
||||||
|
ActorNursery,
|
||||||
current_actor,
|
current_actor,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
Context,
|
Context,
|
||||||
|
@ -39,6 +52,130 @@ from ._util import (
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: implement a singleton deco-API for wrapping the below
|
||||||
|
# factory's impl for general actor-singleton use?
|
||||||
|
#
|
||||||
|
# @singleton
|
||||||
|
# async def open_service_mngr(
|
||||||
|
# **init_kwargs,
|
||||||
|
# ) -> ServiceMngr:
|
||||||
|
# '''
|
||||||
|
# Note this function body is invoke IFF no existing singleton instance already
|
||||||
|
# exists in this proc's memory.
|
||||||
|
|
||||||
|
# '''
|
||||||
|
# # setup
|
||||||
|
# yield ServiceMngr(**init_kwargs)
|
||||||
|
# # teardown
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: singleton factory API instead of a class API
|
||||||
|
@acm
|
||||||
|
async def open_service_mngr(
|
||||||
|
*,
|
||||||
|
debug_mode: bool = False,
|
||||||
|
|
||||||
|
# impl deat which ensures a single global instance
|
||||||
|
_singleton: list[ServiceMngr|None] = [None],
|
||||||
|
**init_kwargs,
|
||||||
|
|
||||||
|
) -> ServiceMngr:
|
||||||
|
'''
|
||||||
|
Open a multi-subactor-as-service-daemon tree supervisor.
|
||||||
|
|
||||||
|
The delivered `ServiceMngr` is a singleton instance for each
|
||||||
|
actor-process and is allocated on first open and never
|
||||||
|
de-allocated unless explicitly deleted by al call to
|
||||||
|
`del_service_mngr()`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# TODO: factor this an allocation into
|
||||||
|
# a `._mngr.open_service_mngr()` and put in the
|
||||||
|
# once-n-only-once setup/`.__aenter__()` part!
|
||||||
|
# -[ ] how to make this only happen on the `mngr == None` case?
|
||||||
|
# |_ use `.trionics.maybe_open_context()` (for generic
|
||||||
|
# async-with-style-only-once of the factory impl, though
|
||||||
|
# what do we do for the allocation case?
|
||||||
|
# / `.maybe_open_nursery()` (since for this specific case
|
||||||
|
# it's simpler?) to activate
|
||||||
|
async with (
|
||||||
|
tractor.open_nursery() as an,
|
||||||
|
trio.open_nursery() as tn,
|
||||||
|
):
|
||||||
|
# impl specific obvi..
|
||||||
|
init_kwargs.update({
|
||||||
|
'actor_n': an,
|
||||||
|
'service_n': tn,
|
||||||
|
})
|
||||||
|
|
||||||
|
mngr: ServiceMngr|None
|
||||||
|
if (mngr := _singleton[0]) is None:
|
||||||
|
|
||||||
|
log.info('Allocating a new service mngr!')
|
||||||
|
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
|
||||||
|
|
||||||
|
# TODO: put into `.__aenter__()` section of
|
||||||
|
# eventual `@singleton_acm` API wrapper.
|
||||||
|
#
|
||||||
|
# assign globally for future daemon/task creation
|
||||||
|
mngr.actor_n = an
|
||||||
|
mngr.service_n = tn
|
||||||
|
|
||||||
|
else:
|
||||||
|
assert (
|
||||||
|
mngr.actor_n
|
||||||
|
and
|
||||||
|
mngr.service_tn
|
||||||
|
)
|
||||||
|
log.info(
|
||||||
|
'Using extant service mngr!\n\n'
|
||||||
|
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# NOTE: this is a singleton factory impl specific detail
|
||||||
|
# which should be supported in the condensed
|
||||||
|
# `@singleton_acm` API?
|
||||||
|
mngr.debug_mode = debug_mode
|
||||||
|
|
||||||
|
yield mngr
|
||||||
|
finally:
|
||||||
|
# TODO: is this more clever/efficient?
|
||||||
|
# if 'samplerd' in mngr.service_tasks:
|
||||||
|
# await mngr.cancel_service('samplerd')
|
||||||
|
tn.cancel_scope.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def get_service_mngr() -> ServiceMngr:
|
||||||
|
'''
|
||||||
|
Try to get the singleton service-mngr for this actor presuming it
|
||||||
|
has already been allocated using,
|
||||||
|
|
||||||
|
.. code:: python
|
||||||
|
|
||||||
|
async with open_<@singleton_acm(func)>() as mngr`
|
||||||
|
... this block kept open ...
|
||||||
|
|
||||||
|
If not yet allocated raise a `ServiceError`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# https://stackoverflow.com/a/12627202
|
||||||
|
# https://docs.python.org/3/library/inspect.html#inspect.Signature
|
||||||
|
maybe_mngr: ServiceMngr|None = inspect.signature(
|
||||||
|
open_service_mngr
|
||||||
|
).parameters['_singleton'].default[0]
|
||||||
|
|
||||||
|
if maybe_mngr is None:
|
||||||
|
raise RuntimeError(
|
||||||
|
'Someone must allocate a `ServiceMngr` using\n\n'
|
||||||
|
'`async with open_service_mngr()` beforehand!!\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
return maybe_mngr
|
||||||
|
|
||||||
|
|
||||||
# TODO: we need remote wrapping and a general soln:
|
# TODO: we need remote wrapping and a general soln:
|
||||||
# - factor this into a ``tractor.highlevel`` extension # pack for the
|
# - factor this into a ``tractor.highlevel`` extension # pack for the
|
||||||
# library.
|
# library.
|
||||||
|
@ -46,31 +183,46 @@ from ._util import (
|
||||||
# to the pikerd actor for starting services remotely!
|
# to the pikerd actor for starting services remotely!
|
||||||
# - prolly rename this to ActorServicesNursery since it spawns
|
# - prolly rename this to ActorServicesNursery since it spawns
|
||||||
# new actors and supervises them to completion?
|
# new actors and supervises them to completion?
|
||||||
class Services:
|
@dataclass
|
||||||
|
class ServiceMngr:
|
||||||
|
# class ServiceMngr(msgspec.Struct):
|
||||||
|
'''
|
||||||
|
A multi-subactor-as-service manager.
|
||||||
|
|
||||||
actor_n: tractor._supervise.ActorNursery
|
Spawn, supervise and monitor service/daemon subactors in a SC
|
||||||
|
process tree.
|
||||||
|
|
||||||
|
'''
|
||||||
|
actor_n: ActorNursery
|
||||||
service_n: trio.Nursery
|
service_n: trio.Nursery
|
||||||
debug_mode: bool # tractor sub-actor debug mode flag
|
debug_mode: bool = False # tractor sub-actor debug mode flag
|
||||||
|
|
||||||
service_tasks: dict[
|
service_tasks: dict[
|
||||||
str,
|
str,
|
||||||
tuple[
|
tuple[
|
||||||
trio.CancelScope,
|
trio.CancelScope,
|
||||||
|
Context,
|
||||||
Portal,
|
Portal,
|
||||||
trio.Event,
|
trio.Event,
|
||||||
]
|
]
|
||||||
] = {}
|
] = field(default_factory=dict)
|
||||||
locks = defaultdict(trio.Lock)
|
|
||||||
|
# internal per-service task mutexs
|
||||||
|
_locks = defaultdict(trio.Lock)
|
||||||
|
|
||||||
@classmethod
|
|
||||||
async def start_service_task(
|
async def start_service_task(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
portal: Portal,
|
portal: Portal,
|
||||||
|
|
||||||
|
# TODO: typevar for the return type of the target and then
|
||||||
|
# use it below for `ctx_res`?
|
||||||
target: Callable,
|
target: Callable,
|
||||||
|
|
||||||
allow_overruns: bool = False,
|
allow_overruns: bool = False,
|
||||||
**ctx_kwargs,
|
**ctx_kwargs,
|
||||||
|
|
||||||
) -> (trio.CancelScope, Context):
|
) -> (trio.CancelScope, Context, Any):
|
||||||
'''
|
'''
|
||||||
Open a context in a service sub-actor, add to a stack
|
Open a context in a service sub-actor, add to a stack
|
||||||
that gets unwound at ``pikerd`` teardown.
|
that gets unwound at ``pikerd`` teardown.
|
||||||
|
@ -83,6 +235,7 @@ class Services:
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
tuple[
|
tuple[
|
||||||
trio.CancelScope,
|
trio.CancelScope,
|
||||||
|
Context,
|
||||||
trio.Event,
|
trio.Event,
|
||||||
Any,
|
Any,
|
||||||
]
|
]
|
||||||
|
@ -90,64 +243,87 @@ class Services:
|
||||||
|
|
||||||
) -> Any:
|
) -> Any:
|
||||||
|
|
||||||
|
# TODO: use the ctx._scope directly here instead?
|
||||||
|
# -[ ] actually what semantics do we expect for this
|
||||||
|
# usage!?
|
||||||
with trio.CancelScope() as cs:
|
with trio.CancelScope() as cs:
|
||||||
|
try:
|
||||||
async with portal.open_context(
|
async with portal.open_context(
|
||||||
target,
|
target,
|
||||||
allow_overruns=allow_overruns,
|
allow_overruns=allow_overruns,
|
||||||
**ctx_kwargs,
|
**ctx_kwargs,
|
||||||
|
|
||||||
) as (ctx, first):
|
) as (ctx, started):
|
||||||
|
|
||||||
# unblock once the remote context has started
|
# unblock once the remote context has started
|
||||||
complete = trio.Event()
|
complete = trio.Event()
|
||||||
task_status.started((cs, complete, first))
|
task_status.started((
|
||||||
|
cs,
|
||||||
|
ctx,
|
||||||
|
complete,
|
||||||
|
started,
|
||||||
|
))
|
||||||
log.info(
|
log.info(
|
||||||
f'`pikerd` service {name} started with value {first}'
|
f'`pikerd` service {name} started with value {started}'
|
||||||
)
|
)
|
||||||
try:
|
|
||||||
# wait on any context's return value
|
# wait on any context's return value
|
||||||
# and any final portal result from the
|
# and any final portal result from the
|
||||||
# sub-actor.
|
# sub-actor.
|
||||||
ctx_res: Any = await ctx.result()
|
ctx_res: Any = await ctx.wait_for_result()
|
||||||
|
|
||||||
# NOTE: blocks indefinitely until cancelled
|
# NOTE: blocks indefinitely until cancelled
|
||||||
# either by error from the target context
|
# either by error from the target context
|
||||||
# function or by being cancelled here by the
|
# function or by being cancelled here by the
|
||||||
# surrounding cancel scope.
|
# surrounding cancel scope.
|
||||||
return (await portal.result(), ctx_res)
|
return (
|
||||||
|
await portal.wait_for_result(),
|
||||||
|
ctx_res,
|
||||||
|
)
|
||||||
|
|
||||||
except ContextCancelled as ctxe:
|
except ContextCancelled as ctxe:
|
||||||
canceller: tuple[str, str] = ctxe.canceller
|
canceller: tuple[str, str] = ctxe.canceller
|
||||||
our_uid: tuple[str, str] = current_actor().uid
|
our_uid: tuple[str, str] = current_actor().uid
|
||||||
if (
|
if (
|
||||||
canceller != portal.channel.uid
|
canceller != portal.chan.uid
|
||||||
and
|
and
|
||||||
canceller != our_uid
|
canceller != our_uid
|
||||||
):
|
):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Actor-service {name} was remotely cancelled?\n'
|
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
|
||||||
f'remote canceller: {canceller}\n'
|
|
||||||
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
|
# TODO: this would be a good spot to use
|
||||||
|
# a respawn feature Bo
|
||||||
|
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
|
||||||
|
|
||||||
|
f'cancellee: {portal.chan.uid}\n'
|
||||||
|
f'canceller: {canceller}\n'
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
# NOTE: the ctx MUST be cancelled first if we
|
||||||
|
# don't want the above `ctx.wait_for_result()` to
|
||||||
|
# raise a self-ctxc. WHY, well since from the ctx's
|
||||||
|
# perspective the cancel request will have
|
||||||
|
# arrived out-out-of-band at the `Actor.cancel()`
|
||||||
|
# level, thus `Context.cancel_called == False`,
|
||||||
|
# meaning `ctx._is_self_cancelled() == False`.
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# await ctx.cancel()
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
complete.set()
|
complete.set()
|
||||||
self.service_tasks.pop(name)
|
self.service_tasks.pop(name)
|
||||||
|
|
||||||
cs, complete, first = await self.service_n.start(open_context_in_task)
|
cs, sub_ctx, complete, started = await self.service_n.start(
|
||||||
|
open_context_in_task
|
||||||
|
)
|
||||||
|
|
||||||
# store the cancel scope and portal for later cancellation or
|
# store the cancel scope and portal for later cancellation or
|
||||||
# retstart if needed.
|
# retstart if needed.
|
||||||
self.service_tasks[name] = (cs, portal, complete)
|
self.service_tasks[name] = (cs, sub_ctx, portal, complete)
|
||||||
|
return cs, sub_ctx, started
|
||||||
|
|
||||||
return cs, first
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
async def cancel_service(
|
async def cancel_service(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
@ -158,8 +334,80 @@ class Services:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
log.info(f'Cancelling `pikerd` service {name}')
|
log.info(f'Cancelling `pikerd` service {name}')
|
||||||
cs, portal, complete = self.service_tasks[name]
|
cs, sub_ctx, portal, complete = self.service_tasks[name]
|
||||||
cs.cancel()
|
|
||||||
|
# cs.cancel()
|
||||||
|
await sub_ctx.cancel()
|
||||||
await complete.wait()
|
await complete.wait()
|
||||||
assert name not in self.service_tasks, \
|
|
||||||
|
if name in self.service_tasks:
|
||||||
|
# TODO: custom err?
|
||||||
|
# raise ServiceError(
|
||||||
|
raise RuntimeError(
|
||||||
f'Serice task for {name} not terminated?'
|
f'Serice task for {name} not terminated?'
|
||||||
|
)
|
||||||
|
|
||||||
|
# assert name not in self.service_tasks, \
|
||||||
|
# f'Serice task for {name} not terminated?'
|
||||||
|
|
||||||
|
async def start_service(
|
||||||
|
self,
|
||||||
|
daemon_name: str,
|
||||||
|
ctx_ep: Callable, # kwargs must `partial`-ed in!
|
||||||
|
|
||||||
|
debug_mode: bool = False,
|
||||||
|
**tractor_actor_kwargs,
|
||||||
|
|
||||||
|
) -> Context:
|
||||||
|
'''
|
||||||
|
Start a "service" task in a new sub-actor (daemon) and manage it's lifetime
|
||||||
|
indefinitely.
|
||||||
|
|
||||||
|
Services can be cancelled/shutdown using `.cancel_service()`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
entry: tuple|None = self.service_tasks.get(daemon_name)
|
||||||
|
if entry:
|
||||||
|
(cs, sub_ctx, portal, complete) = entry
|
||||||
|
return sub_ctx
|
||||||
|
|
||||||
|
if daemon_name not in self.service_tasks:
|
||||||
|
portal = await self.actor_n.start_actor(
|
||||||
|
daemon_name,
|
||||||
|
debug_mode=( # maybe set globally during allocate
|
||||||
|
debug_mode
|
||||||
|
or
|
||||||
|
self.debug_mode
|
||||||
|
),
|
||||||
|
**tractor_actor_kwargs,
|
||||||
|
)
|
||||||
|
ctx_kwargs: dict[str, Any] = {}
|
||||||
|
if isinstance(ctx_ep, functools.partial):
|
||||||
|
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
|
||||||
|
ctx_ep: Callable = ctx_ep.func
|
||||||
|
|
||||||
|
(cs, sub_ctx, started) = await self.start_service_task(
|
||||||
|
daemon_name,
|
||||||
|
portal,
|
||||||
|
ctx_ep,
|
||||||
|
**ctx_kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
|
return sub_ctx
|
||||||
|
|
||||||
|
|
||||||
|
# TODO:
|
||||||
|
# -[ ] factor all the common shit from `.data._sampling`
|
||||||
|
# and `.brokers._daemon` into here / `ServiceMngr`
|
||||||
|
# in terms of allocating the `Portal` as part of the
|
||||||
|
# "service-in-subactor" starting!
|
||||||
|
# -[ ] move to `tractor.hilevel._service`, import and use here!
|
||||||
|
# NOTE: purposely leaks the ref to the mod-scope Bo
|
||||||
|
# import tractor
|
||||||
|
# from tractor.hilevel import (
|
||||||
|
# open_service_mngr,
|
||||||
|
# ServiceMngr,
|
||||||
|
# )
|
||||||
|
# mngr: ServiceMngr|None = None
|
||||||
|
# with tractor.hilevel.open_service_mngr() as mngr:
|
||||||
|
# Services = proxy(mngr)
|
||||||
|
|
|
@ -21,11 +21,13 @@ from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# TODO: oof, needs to be changed to `httpx`!
|
||||||
import asks
|
import asks
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
import docker
|
import docker
|
||||||
from ._ahab import DockerContainer
|
from ._ahab import DockerContainer
|
||||||
|
from . import ServiceMngr
|
||||||
|
|
||||||
from ._util import log # sub-sys logger
|
from ._util import log # sub-sys logger
|
||||||
from ._util import (
|
from ._util import (
|
||||||
|
@ -127,7 +129,7 @@ def start_elasticsearch(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def start_ahab_daemon(
|
async def start_ahab_daemon(
|
||||||
service_mngr: Services,
|
service_mngr: ServiceMngr,
|
||||||
user_config: dict | None = None,
|
user_config: dict | None = None,
|
||||||
loglevel: str | None = None,
|
loglevel: str | None = None,
|
||||||
|
|
||||||
|
|
|
@ -53,7 +53,7 @@ import pendulum
|
||||||
# import purerpc
|
# import purerpc
|
||||||
|
|
||||||
from ..data.feed import maybe_open_feed
|
from ..data.feed import maybe_open_feed
|
||||||
from . import Services
|
from . import ServiceMngr
|
||||||
from ._util import (
|
from ._util import (
|
||||||
log, # sub-sys logger
|
log, # sub-sys logger
|
||||||
get_console_log,
|
get_console_log,
|
||||||
|
@ -233,7 +233,7 @@ def start_marketstore(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def start_ahab_daemon(
|
async def start_ahab_daemon(
|
||||||
service_mngr: Services,
|
service_mngr: ServiceMngr,
|
||||||
user_config: dict | None = None,
|
user_config: dict | None = None,
|
||||||
loglevel: str | None = None,
|
loglevel: str | None = None,
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,7 @@ from piker import (
|
||||||
config,
|
config,
|
||||||
)
|
)
|
||||||
from piker.service import (
|
from piker.service import (
|
||||||
Services,
|
get_service_mngr,
|
||||||
)
|
)
|
||||||
from piker.log import get_console_log
|
from piker.log import get_console_log
|
||||||
|
|
||||||
|
@ -129,7 +129,7 @@ async def _open_test_pikerd(
|
||||||
) as service_manager,
|
) as service_manager,
|
||||||
):
|
):
|
||||||
# this proc/actor is the pikerd
|
# this proc/actor is the pikerd
|
||||||
assert service_manager is Services
|
assert service_manager is get_service_mngr()
|
||||||
|
|
||||||
async with tractor.wait_for_actor(
|
async with tractor.wait_for_actor(
|
||||||
'pikerd',
|
'pikerd',
|
||||||
|
|
|
@ -26,7 +26,7 @@ import pytest
|
||||||
import tractor
|
import tractor
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from piker.service import Services
|
from piker.service import ServiceMngr
|
||||||
from piker.log import get_logger
|
from piker.log import get_logger
|
||||||
from piker.clearing._messages import (
|
from piker.clearing._messages import (
|
||||||
Order,
|
Order,
|
||||||
|
@ -158,7 +158,7 @@ def load_and_check_pos(
|
||||||
|
|
||||||
|
|
||||||
def test_ems_err_on_bad_broker(
|
def test_ems_err_on_bad_broker(
|
||||||
open_test_pikerd: Services,
|
open_test_pikerd: ServiceMngr,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
):
|
):
|
||||||
async def load_bad_fqme():
|
async def load_bad_fqme():
|
||||||
|
|
|
@ -15,7 +15,7 @@ import tractor
|
||||||
|
|
||||||
from piker.service import (
|
from piker.service import (
|
||||||
find_service,
|
find_service,
|
||||||
Services,
|
ServiceMngr,
|
||||||
)
|
)
|
||||||
from piker.data import (
|
from piker.data import (
|
||||||
open_feed,
|
open_feed,
|
||||||
|
@ -44,7 +44,7 @@ def test_runtime_boot(
|
||||||
async def main():
|
async def main():
|
||||||
port = 6666
|
port = 6666
|
||||||
daemon_addr = ('127.0.0.1', port)
|
daemon_addr = ('127.0.0.1', port)
|
||||||
services: Services
|
services: ServiceMngr
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
open_test_pikerd(
|
open_test_pikerd(
|
||||||
|
|
Loading…
Reference in New Issue