Nixos default.nix file for Qt6 #7

Open
ntorres wants to merge 7 commits from nix-qt6-fix into go_httpx
18 changed files with 1531 additions and 574 deletions

82
default.nix 100644
View File

@ -0,0 +1,82 @@
with (import <nixpkgs> {});
with python312Packages;
let
glibStorePath = lib.getLib glib;
qtpyStorePath = lib.getLib qtpy;
pyqt6StorePath = lib.getLib pyqt6;
pyqt6SipStorePath = lib.getLib pyqt6-sip;
qt6baseStorePath = lib.getLib qt6.qtbase;
rapidfuzzStorePath = lib.getLib rapidfuzz;
qdarkstyleStorePath = lib.getLib qdarkstyle;
in
stdenv.mkDerivation {
name = "piker-qt6-poetry-shell";
buildInputs = [
# System requirements.
glib
qt6.qtbase
libgcc.lib
# Python requirements.
python312Full
poetry-core
qdarkstyle
rapidfuzz
pyqt6
qtpy
];
src = null;
shellHook = ''
set -e
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${libgcc.lib}/lib:${glibStorePath}/lib
# Set the Qt plugin path
# export QT_DEBUG_PLUGINS=1
QTBASE_PATH="${qt6baseStorePath}"
echo "qtbase path: $QTBASE_PATH"
echo ""
export QT_PLUGIN_PATH="$QTBASE_PATH/lib/qt-6/plugins"
export QT_QPA_PLATFORM_PLUGIN_PATH="$QT_PLUGIN_PATH/platforms"
echo "qt plugin path: $QT_PLUGIN_PATH"
echo ""
# Maybe create venv & install deps
poetry install --with uis
# Use pyqt6 from System, patch activate script
ACTIVATE_SCRIPT_PATH="$(poetry env info --path)/bin/activate"
export RPDFUZZ_PATH="${rapidfuzzStorePath}/lib/python3.12/site-packages"
export QDRKSTYLE_PATH="${qdarkstyleStorePath}/lib/python3.12/site-packages"
export QTPY_PATH="${qtpyStorePath}/lib/python3.12/site-packages"
export PYQT6_PATH="${pyqt6StorePath}/lib/python3.12/site-packages"
export PYQT6_SIP_PATH="${pyqt6SipStorePath}/lib/python3.12/site-packages"
echo "rapidfuzz at: $RPDFUZZ_PATH"
echo "qdarkstyle at: $QDRKSTYLE_PATH"
echo "qtpy at: $QTPY_PATH"
echo "pyqt6 at: $PYQT6_PATH"
echo "pyqt6-sip at: $PYQT6_SIP_PATH"
echo ""
PATCH="export PYTHONPATH=\""
PATCH="$PATCH\$RPDFUZZ_PATH"
PATCH="$PATCH:\$QDRKSTYLE_PATH"
PATCH="$PATCH:\$QTPY_PATH"
PATCH="$PATCH:\$PYQT6_PATH"
PATCH="$PATCH:\$PYQT6_SIP_PATH"
PATCH="$PATCH\""
if grep -q "$PATCH" "$ACTIVATE_SCRIPT_PATH"; then
echo "venv is already patched."
else
echo "patching $ACTIVATE_SCRIPT_PATH to use pyqt6 from nixos..."
sed -i "\$i$PATCH" $ACTIVATE_SCRIPT_PATH
fi
poetry shell
'';
}

View File

@ -23,6 +23,7 @@ from __future__ import annotations
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from functools import partial
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
@ -190,14 +191,17 @@ def broker_init(
async def spawn_brokerd( async def spawn_brokerd(
brokername: str, brokername: str,
loglevel: str | None = None, loglevel: str | None = None,
**tractor_kwargs, **tractor_kwargs,
) -> bool: ) -> bool:
'''
Spawn a `brokerd.<backendname>` subactor service daemon
using `pikerd`'s service mngr.
'''
from piker.service._util import log # use service mngr log from piker.service._util import log # use service mngr log
log.info(f'Spawning {brokername} broker daemon') log.info(f'Spawning {brokername} broker daemon')
@ -217,27 +221,35 @@ async def spawn_brokerd(
# ask `pikerd` to spawn a new sub-actor and manage it under its # ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery # actor nursery
from piker.service import Services from piker.service import (
get_service_mngr,
ServiceMngr,
)
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}' dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
portal = await Services.actor_n.start_actor( mngr: ServiceMngr = get_service_mngr()
dname, ctx: tractor.Context = await mngr.start_service(
enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'), daemon_name=dname,
debug_mode=Services.debug_mode, ctx_ep=partial(
# signature of target root-task endpoint
daemon_fixture_ep,
# passed to daemon_fixture_ep(**kwargs)
brokername=brokername,
loglevel=loglevel,
),
debug_mode=mngr.debug_mode,
loglevel=loglevel,
enable_modules=(
_data_mods
+
tractor_kwargs.pop('enable_modules')
),
**tractor_kwargs **tractor_kwargs
) )
assert (
# NOTE: the service mngr expects an already spawned actor + its not ctx.cancel_called
# portal ref in order to do non-blocking setup of brokerd and ctx.portal # parent side
# service nursery. and dname in ctx.chan.uid # subactor is named as desired
await Services.start_service_task(
dname,
portal,
# signature of target root-task endpoint
daemon_fixture_ep,
brokername=brokername,
loglevel=loglevel,
) )
return True return True
@ -262,8 +274,7 @@ async def maybe_spawn_brokerd(
from piker.service import maybe_spawn_daemon from piker.service import maybe_spawn_daemon
async with maybe_spawn_daemon( async with maybe_spawn_daemon(
service_name=f'brokerd.{brokername}',
f'brokerd.{brokername}',
service_task_target=spawn_brokerd, service_task_target=spawn_brokerd,
spawn_args={ spawn_args={
'brokername': brokername, 'brokername': brokername,

View File

@ -567,6 +567,7 @@ class Client:
) -> str: ) -> str:
return { return {
'USDTM': 'usdtm_futes', 'USDTM': 'usdtm_futes',
'SPOT': 'spot',
# 'COINM': 'coin_futes', # 'COINM': 'coin_futes',
# ^-TODO-^ bc someone might want it..? # ^-TODO-^ bc someone might want it..?
}[pair.venue] }[pair.venue]

View File

@ -335,7 +335,7 @@ def services(config, tl, ports):
name='service_query', name='service_query',
loglevel=config['loglevel'] if tl else None, loglevel=config['loglevel'] if tl else None,
), ),
tractor.get_arbiter( tractor.get_registry(
host=host, host=host,
port=ports[0] port=ports[0]
) as portal ) as portal

View File

@ -25,6 +25,7 @@ from collections import (
defaultdict, defaultdict,
) )
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial
import time import time
from typing import ( from typing import (
Any, Any,
@ -42,7 +43,7 @@ from tractor.trionics import (
maybe_open_nursery, maybe_open_nursery,
) )
import trio import trio
from trio_typing import TaskStatus from trio import TaskStatus
from .ticktools import ( from .ticktools import (
frame_ticks, frame_ticks,
@ -70,6 +71,7 @@ if TYPE_CHECKING:
_default_delay_s: float = 1.0 _default_delay_s: float = 1.0
# TODO: use new `tractor.singleton_acm` API for this!
class Sampler: class Sampler:
''' '''
Global sampling engine registry. Global sampling engine registry.
@ -79,9 +81,9 @@ class Sampler:
This non-instantiated type is meant to be a singleton within This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see time-step-sample a (real-time) quote feeds, see
``.service.maybe_open_samplerd()`` and the below `.service.maybe_open_samplerd()` and the below
``register_with_sampler()``. `register_with_sampler()`.
''' '''
service_nursery: None | trio.Nursery = None service_nursery: None | trio.Nursery = None
@ -375,7 +377,10 @@ async def register_with_sampler(
assert Sampler.ohlcv_shms assert Sampler.ohlcv_shms
# unblock caller # unblock caller
await ctx.started(set(Sampler.ohlcv_shms.keys())) await ctx.started(
# XXX bc msgpack only allows one array type!
list(Sampler.ohlcv_shms.keys())
)
if open_index_stream: if open_index_stream:
try: try:
@ -419,7 +424,6 @@ async def register_with_sampler(
async def spawn_samplerd( async def spawn_samplerd(
loglevel: str | None = None, loglevel: str | None = None,
**extra_tractor_kwargs **extra_tractor_kwargs
@ -429,7 +433,10 @@ async def spawn_samplerd(
update and increment count write and stream broadcasting. update and increment count write and stream broadcasting.
''' '''
from piker.service import Services from piker.service import (
get_service_mngr,
ServiceMngr,
)
dname = 'samplerd' dname = 'samplerd'
log.info(f'Spawning `{dname}`') log.info(f'Spawning `{dname}`')
@ -437,26 +444,33 @@ async def spawn_samplerd(
# singleton lock creation of ``samplerd`` since we only ever want # singleton lock creation of ``samplerd`` since we only ever want
# one daemon per ``pikerd`` proc tree. # one daemon per ``pikerd`` proc tree.
# TODO: make this built-into the service api? # TODO: make this built-into the service api?
async with Services.locks[dname + '_singleton']: mngr: ServiceMngr = get_service_mngr()
already_started: bool = dname in mngr.service_tasks
if dname not in Services.service_tasks: async with mngr._locks[dname + '_singleton']:
ctx: Context = await mngr.start_service(
portal = await Services.actor_n.start_actor( daemon_name=dname,
dname, ctx_ep=partial(
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
await Services.start_service_task(
dname,
portal,
register_with_sampler, register_with_sampler,
period_s=1, period_s=1,
sub_for_broadcasts=False, sub_for_broadcasts=False,
),
debug_mode=mngr.debug_mode, # set by pikerd flag
# proxy-through to tractor
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
**extra_tractor_kwargs
)
if not already_started:
assert (
ctx
and
ctx.portal
and
not ctx.cancel_called
) )
return True return True
@ -889,6 +903,7 @@ async def uniform_rate_send(
# to consumers which crash or lose network connection. # to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to # I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors! # ``pikerd`` these kinds of errors!
trio.EndOfChannel,
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
ConnectionResetError, ConnectionResetError,

View File

@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for,
=> TODO: maybe to (re)move elsewhere? => TODO: maybe to (re)move elsewhere?
''' '''
from ._mngr import Services as Services from ._mngr import (
get_service_mngr as get_service_mngr,
open_service_mngr as open_service_mngr,
ServiceMngr as ServiceMngr,
)
from ._registry import ( from ._registry import (
_tractor_kwargs as _tractor_kwargs, _tractor_kwargs as _tractor_kwargs,
_default_reg_addr as _default_reg_addr, _default_reg_addr as _default_reg_addr,

View File

@ -21,7 +21,6 @@
from __future__ import annotations from __future__ import annotations
import os import os
from typing import ( from typing import (
Optional,
Any, Any,
ClassVar, ClassVar,
) )
@ -30,13 +29,13 @@ from contextlib import (
) )
import tractor import tractor
import trio
from ._util import ( from ._util import (
get_console_log, get_console_log,
) )
from ._mngr import ( from ._mngr import (
Services, open_service_mngr,
ServiceMngr,
) )
from ._registry import ( # noqa from ._registry import ( # noqa
_tractor_kwargs, _tractor_kwargs,
@ -59,7 +58,7 @@ async def open_piker_runtime(
registry_addrs: list[tuple[str, int]] = [], registry_addrs: list[tuple[str, int]] = [],
enable_modules: list[str] = [], enable_modules: list[str] = [],
loglevel: Optional[str] = None, loglevel: str|None = None,
# XXX NOTE XXX: you should pretty much never want debug mode # XXX NOTE XXX: you should pretty much never want debug mode
# for data daemons when running in production. # for data daemons when running in production.
@ -69,7 +68,7 @@ async def open_piker_runtime(
# and spawn the service tree distributed per that. # and spawn the service tree distributed per that.
start_method: str = 'trio', start_method: str = 'trio',
tractor_runtime_overrides: dict | None = None, tractor_runtime_overrides: dict|None = None,
**tractor_kwargs, **tractor_kwargs,
) -> tuple[ ) -> tuple[
@ -119,6 +118,10 @@ async def open_piker_runtime(
# spawn other specialized daemons I think? # spawn other specialized daemons I think?
enable_modules=enable_modules, enable_modules=enable_modules,
# TODO: how to configure this?
# keep it on by default if debug mode is set?
# maybe_enable_greenback=debug_mode,
**tractor_kwargs, **tractor_kwargs,
) as actor, ) as actor,
@ -167,12 +170,13 @@ async def open_pikerd(
**kwargs, **kwargs,
) -> Services: ) -> ServiceMngr:
''' '''
Start a root piker daemon with an indefinite lifetime. Start a root piker daemon actor (aka `pikerd`) with an indefinite
lifetime.
A root actor nursery is created which can be used to create and keep A root actor-nursery is created which can be used to spawn and
alive underling services (see below). supervise underling service sub-actors (see below).
''' '''
# NOTE: for the root daemon we always enable the root # NOTE: for the root daemon we always enable the root
@ -199,8 +203,6 @@ async def open_pikerd(
root_actor, root_actor,
reg_addrs, reg_addrs,
), ),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
): ):
for addr in reg_addrs: for addr in reg_addrs:
if addr not in root_actor.accept_addrs: if addr not in root_actor.accept_addrs:
@ -209,25 +211,17 @@ async def open_pikerd(
'Maybe you have another daemon already running?' 'Maybe you have another daemon already running?'
) )
# assign globally for future daemon/task creation mngr: ServiceMngr
Services.actor_n = actor_nursery async with open_service_mngr(
Services.service_n = service_nursery debug_mode=debug_mode,
Services.debug_mode = debug_mode ) as mngr:
yield mngr
try:
yield Services
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel()
# TODO: do we even need this? # TODO: do we even need this?
# @acm # @acm
# async def maybe_open_runtime( # async def maybe_open_runtime(
# loglevel: Optional[str] = None, # loglevel: str|None = None,
# **kwargs, # **kwargs,
# ) -> None: # ) -> None:
@ -256,7 +250,7 @@ async def maybe_open_pikerd(
loglevel: str | None = None, loglevel: str | None = None,
**kwargs, **kwargs,
) -> tractor._portal.Portal | ClassVar[Services]: ) -> tractor._portal.Portal | ClassVar[ServiceMngr]:
''' '''
If no ``pikerd`` daemon-root-actor can be found start it and If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self yield up (we should probably figure out returning a portal to self

View File

@ -49,7 +49,7 @@ from requests.exceptions import (
ReadTimeout, ReadTimeout,
) )
from ._mngr import Services from ._mngr import ServiceMngr
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
get_console_log, get_console_log,
@ -453,7 +453,7 @@ async def open_ahabd(
@acm @acm
async def start_ahab_service( async def start_ahab_service(
services: Services, services: ServiceMngr,
service_name: str, service_name: str,
# endpoint config passed as **kwargs # endpoint config passed as **kwargs
@ -549,7 +549,8 @@ async def start_ahab_service(
log.warning('Failed to cancel root permsed container') log.warning('Failed to cancel root permsed container')
except ( except (
trio.MultiError, # trio.MultiError,
ExceptionGroup,
) as err: ) as err:
for subexc in err.exceptions: for subexc in err.exceptions:
if isinstance(subexc, PermissionError): if isinstance(subexc, PermissionError):

View File

@ -26,14 +26,17 @@ from typing import (
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from collections import defaultdict
import tractor import tractor
import trio
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
) )
from ._mngr import ( from ._mngr import (
Services, get_service_mngr,
ServiceMngr,
) )
from ._actor_runtime import maybe_open_pikerd from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service from ._registry import find_service
@ -41,15 +44,14 @@ from ._registry import find_service
@acm @acm
async def maybe_spawn_daemon( async def maybe_spawn_daemon(
service_name: str, service_name: str,
service_task_target: Callable, service_task_target: Callable,
spawn_args: dict[str, Any], spawn_args: dict[str, Any],
loglevel: str | None = None, loglevel: str | None = None,
singleton: bool = False, singleton: bool = False,
_locks = defaultdict(trio.Lock),
**pikerd_kwargs, **pikerd_kwargs,
) -> tractor.Portal: ) -> tractor.Portal:
@ -67,7 +69,7 @@ async def maybe_spawn_daemon(
''' '''
# serialize access to this section to avoid # serialize access to this section to avoid
# 2 or more tasks racing to create a daemon # 2 or more tasks racing to create a daemon
lock = Services.locks[service_name] lock = _locks[service_name]
await lock.acquire() await lock.acquire()
async with find_service( async with find_service(
@ -132,7 +134,65 @@ async def maybe_spawn_daemon(
async with tractor.wait_for_actor(service_name) as portal: async with tractor.wait_for_actor(service_name) as portal:
lock.release() lock.release()
yield portal yield portal
await portal.cancel_actor() # --- ---- ---
# XXX NOTE XXX
# --- ---- ---
# DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
#
# Doing so will cause an "out-of-band" ctxc
# (`tractor.ContextCancelled`) to be raised inside the
# `ServiceMngr.open_context_in_task()`'s call to
# `ctx.wait_for_result()` AND the internal self-ctxc
# "graceful capture" WILL NOT CATCH IT!
#
# This can cause certain types of operations to raise
# that ctxc BEFORE THEY `return`, resulting in
# a "false-negative" ctxc being raised when really
# nothing actually failed, other then our semantic
# "failure" to suppress an expected, graceful,
# self-cancel scenario..
#
# bUt wHy duZ It WorK lIKe dis..
# ------------------------------
# from the perspective of the `tractor.Context` this
# cancel request was conducted "out of band" since
# `Context.cancel()` was never called and thus the
# `._cancel_called: bool` was never set. Despite the
# remote `.canceller` being set to `pikerd` (i.e. the
# same `Actor.uid` of the raising service-mngr task) the
# service-task's ctx itself was never marked as having
# requested cancellation and thus still raises the ctxc
# bc it was unaware of any such request.
#
# How to make grokin these cases easier tho?
# ------------------------------------------
# Because `Portal.cancel_actor()` was called it requests
# "full-`Actor`-runtime-cancellation" of it's peer
# process which IS NOT THE SAME as a single inter-actor
# RPC task cancelling its local context with a remote
# peer `Task` in that same peer process.
#
# ?TODO? It might be better if we do one (or all) of the
# following:
#
# -[ ] at least set a special message for the
# `ContextCancelled` when raised locally by the
# unaware ctx task such that we check for the
# `.canceller` being *our `Actor`* and in the case
# where `Context._cancel_called == False` we specially
# note that this is likely an "out-of-band"
# runtime-cancel request triggered by some call to
# `Portal.cancel_actor()`, possibly even reporting the
# exact LOC of that caller by tracking it inside our
# portal-type?
# -[ ] possibly add another field `ContextCancelled` like
# maybe a,
# `.request_type: Literal['os', 'proc', 'actor',
# 'ctx']` type thing which would allow immediately
# being able to tell what kind of cancellation caused
# the unexpected ctxc?
# -[ ] REMOVE THIS COMMENT, once we've settled on how to
# better augment `tractor` to be more explicit on this!
async def spawn_emsd( async def spawn_emsd(
@ -147,21 +207,22 @@ async def spawn_emsd(
""" """
log.info('Spawning emsd') log.info('Spawning emsd')
portal = await Services.actor_n.start_actor( smngr: ServiceMngr = get_service_mngr()
portal = await smngr.actor_n.start_actor(
'emsd', 'emsd',
enable_modules=[ enable_modules=[
'piker.clearing._ems', 'piker.clearing._ems',
'piker.clearing._client', 'piker.clearing._client',
], ],
loglevel=loglevel, loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag debug_mode=smngr.debug_mode, # set by pikerd flag
**extra_tractor_kwargs **extra_tractor_kwargs
) )
# non-blocking setup of clearing service # non-blocking setup of clearing service
from ..clearing._ems import _setup_persistent_emsd from ..clearing._ems import _setup_persistent_emsd
await Services.start_service_task( await smngr.start_service_task(
'emsd', 'emsd',
portal, portal,

View File

@ -18,16 +18,29 @@
daemon-service management API. daemon-service management API.
""" """
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import ( from typing import (
Callable, Callable,
Any, Any,
) )
import trio import msgspec
from trio_typing import TaskStatus
import tractor import tractor
import trio
from trio import TaskStatus
from tractor import ( from tractor import (
ActorNursery,
current_actor, current_actor,
ContextCancelled, ContextCancelled,
Context, Context,
@ -39,6 +52,130 @@ from ._util import (
) )
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# impl deat which ensures a single global instance
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open a multi-subactor-as-service-daemon tree supervisor.
The delivered `ServiceMngr` is a singleton instance for each
actor-process and is allocated on first open and never
de-allocated unless explicitly deleted by al call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'actor_n': an,
'service_n': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.actor_n = an
mngr.service_n = tn
else:
assert (
mngr.actor_n
and
mngr.service_tn
)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_tasks:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
# TODO: we need remote wrapping and a general soln: # TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the # - factor this into a ``tractor.highlevel`` extension # pack for the
# library. # library.
@ -46,31 +183,46 @@ from ._util import (
# to the pikerd actor for starting services remotely! # to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns # - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion? # new actors and supervises them to completion?
class Services: @dataclass
class ServiceMngr:
# class ServiceMngr(msgspec.Struct):
'''
A multi-subactor-as-service manager.
actor_n: tractor._supervise.ActorNursery Spawn, supervise and monitor service/daemon subactors in a SC
process tree.
'''
actor_n: ActorNursery
service_n: trio.Nursery service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[ service_tasks: dict[
str, str,
tuple[ tuple[
trio.CancelScope, trio.CancelScope,
Context,
Portal, Portal,
trio.Event, trio.Event,
] ]
] = {} ] = field(default_factory=dict)
locks = defaultdict(trio.Lock)
# internal per-service task mutexs
_locks = defaultdict(trio.Lock)
@classmethod
async def start_service_task( async def start_service_task(
self, self,
name: str, name: str,
portal: Portal, portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
target: Callable, target: Callable,
allow_overruns: bool = False, allow_overruns: bool = False,
**ctx_kwargs, **ctx_kwargs,
) -> (trio.CancelScope, Context): ) -> (trio.CancelScope, Context, Any):
''' '''
Open a context in a service sub-actor, add to a stack Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown. that gets unwound at ``pikerd`` teardown.
@ -83,6 +235,7 @@ class Services:
task_status: TaskStatus[ task_status: TaskStatus[
tuple[ tuple[
trio.CancelScope, trio.CancelScope,
Context,
trio.Event, trio.Event,
Any, Any,
] ]
@ -90,64 +243,87 @@ class Services:
) -> Any: ) -> Any:
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
try:
async with portal.open_context(
target,
allow_overruns=allow_overruns,
**ctx_kwargs,
async with portal.open_context( ) as (ctx, started):
target,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, first): # unblock once the remote context has started
complete = trio.Event()
# unblock once the remote context has started task_status.started((
complete = trio.Event() cs,
task_status.started((cs, complete, first)) ctx,
log.info( complete,
f'`pikerd` service {name} started with value {first}' started,
) ))
try: log.info(
f'`pikerd` service {name} started with value {started}'
)
# wait on any context's return value # wait on any context's return value
# and any final portal result from the # and any final portal result from the
# sub-actor. # sub-actor.
ctx_res: Any = await ctx.result() ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled # NOTE: blocks indefinitely until cancelled
# either by error from the target context # either by error from the target context
# function or by being cancelled here by the # function or by being cancelled here by the
# surrounding cancel scope. # surrounding cancel scope.
return (await portal.result(), ctx_res) return (
except ContextCancelled as ctxe: await portal.wait_for_result(),
canceller: tuple[str, str] = ctxe.canceller ctx_res,
our_uid: tuple[str, str] = current_actor().uid )
if (
canceller != portal.channel.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service {name} was remotely cancelled?\n'
f'remote canceller: {canceller}\n'
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
)
else:
raise
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.chan.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
finally: f'cancellee: {portal.chan.uid}\n'
await portal.cancel_actor() f'canceller: {canceller}\n'
complete.set() )
self.service_tasks.pop(name) else:
raise
cs, complete, first = await self.service_n.start(open_context_in_task) finally:
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor()
complete.set()
self.service_tasks.pop(name)
cs, sub_ctx, complete, started = await self.service_n.start(
open_context_in_task
)
# store the cancel scope and portal for later cancellation or # store the cancel scope and portal for later cancellation or
# retstart if needed. # retstart if needed.
self.service_tasks[name] = (cs, portal, complete) self.service_tasks[name] = (cs, sub_ctx, portal, complete)
return cs, sub_ctx, started
return cs, first
@classmethod
async def cancel_service( async def cancel_service(
self, self,
name: str, name: str,
@ -158,8 +334,80 @@ class Services:
''' '''
log.info(f'Cancelling `pikerd` service {name}') log.info(f'Cancelling `pikerd` service {name}')
cs, portal, complete = self.service_tasks[name] cs, sub_ctx, portal, complete = self.service_tasks[name]
cs.cancel()
# cs.cancel()
await sub_ctx.cancel()
await complete.wait() await complete.wait()
assert name not in self.service_tasks, \
f'Serice task for {name} not terminated?' if name in self.service_tasks:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Serice task for {name} not terminated?'
)
# assert name not in self.service_tasks, \
# f'Serice task for {name} not terminated?'
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
debug_mode: bool = False,
**tractor_actor_kwargs,
) -> Context:
'''
Start a "service" task in a new sub-actor (daemon) and manage it's lifetime
indefinitely.
Services can be cancelled/shutdown using `.cancel_service()`.
'''
entry: tuple|None = self.service_tasks.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_tasks:
portal = await self.actor_n.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**tractor_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(cs, sub_ctx, started) = await self.start_service_task(
daemon_name,
portal,
ctx_ep,
**ctx_kwargs,
)
return sub_ctx
# TODO:
# -[ ] factor all the common shit from `.data._sampling`
# and `.brokers._daemon` into here / `ServiceMngr`
# in terms of allocating the `Portal` as part of the
# "service-in-subactor" starting!
# -[ ] move to `tractor.hilevel._service`, import and use here!
# NOTE: purposely leaks the ref to the mod-scope Bo
# import tractor
# from tractor.hilevel import (
# open_service_mngr,
# ServiceMngr,
# )
# mngr: ServiceMngr|None = None
# with tractor.hilevel.open_service_mngr() as mngr:
# Services = proxy(mngr)

View File

@ -21,11 +21,13 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
# TODO: oof, needs to be changed to `httpx`!
import asks import asks
if TYPE_CHECKING: if TYPE_CHECKING:
import docker import docker
from ._ahab import DockerContainer from ._ahab import DockerContainer
from . import ServiceMngr
from ._util import log # sub-sys logger from ._util import log # sub-sys logger
from ._util import ( from ._util import (
@ -127,7 +129,7 @@ def start_elasticsearch(
@acm @acm
async def start_ahab_daemon( async def start_ahab_daemon(
service_mngr: Services, service_mngr: ServiceMngr,
user_config: dict | None = None, user_config: dict | None = None,
loglevel: str | None = None, loglevel: str | None = None,

View File

@ -53,7 +53,7 @@ import pendulum
# import purerpc # import purerpc
from ..data.feed import maybe_open_feed from ..data.feed import maybe_open_feed
from . import Services from . import ServiceMngr
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
get_console_log, get_console_log,
@ -233,7 +233,7 @@ def start_marketstore(
@acm @acm
async def start_ahab_daemon( async def start_ahab_daemon(
service_mngr: Services, service_mngr: ServiceMngr,
user_config: dict | None = None, user_config: dict | None = None,
loglevel: str | None = None, loglevel: str | None = None,

View File

@ -458,13 +458,15 @@ async def start_backfill(
'bf_until <- last_start_dt:\n' 'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n' f'{backfill_until_dt} <- {last_start_dt}\n'
) )
# UGH: what's a better way?
# ugh, what's a better way? # TODO: backends are responsible for being correct on
# TODO: fwiw, we probably want a way to signal a throttle # this right!?
# condition (eg. with ib) so that we can halt the # -[ ] in the `ib` case we could maybe offer some way
# request loop until the condition is resolved? # to halt the request loop until the condition is
if timeframe > 1: # resolved or should the backend be entirely in
await tractor.pause() # charge of solving such faults? yes, right?
# if timeframe > 1:
# await tractor.pause()
return return
assert ( assert (
@ -578,6 +580,7 @@ async def start_backfill(
'crypto', 'crypto',
'crypto_currency', 'crypto_currency',
'fiat', # a "forex pair" 'fiat', # a "forex pair"
'perpetual_future', # stupid "perps" from cex land
}: }:
# for now, our table key schema is not including # for now, our table key schema is not including
# the dst[/src] source asset token. # the dst[/src] source asset token.

1371
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -50,10 +50,8 @@ attrs = "^23.1.0"
bidict = "^0.22.1" bidict = "^0.22.1"
colorama = "^0.4.6" colorama = "^0.4.6"
colorlog = "^6.7.0" colorlog = "^6.7.0"
cython = "^3.0.0"
greenback = "^1.1.1"
ib-insync = "^0.9.86" ib-insync = "^0.9.86"
msgspec = "^0.18.0" msgspec = "^0.18.6"
numba = "^0.59.0" numba = "^0.59.0"
numpy = "^1.25" numpy = "^1.25"
polars = "^0.18.13" polars = "^0.18.13"
@ -74,8 +72,8 @@ httpx = "^0.27.0"
[tool.poetry.dependencies.tractor] [tool.poetry.dependencies.tractor]
develop = true develop = true
git = 'https://github.com/goodboy/tractor.git' git = 'https://pikers.dev/goodboy/tractor.git'
branch = 'asyncio_debugger_support' branch = 'aio_abandons'
# path = "../tractor" # path = "../tractor"
[tool.poetry.dependencies.asyncvnc] [tool.poetry.dependencies.asyncvnc]
@ -109,6 +107,8 @@ pytest = "^6.0.0"
elasticsearch = "^8.9.0" elasticsearch = "^8.9.0"
xonsh = "^0.14.2" xonsh = "^0.14.2"
prompt-toolkit = "3.0.40" prompt-toolkit = "3.0.40"
cython = "^3.0.0"
greenback = "^1.1.1"
# console ehancements and eventually remote debugging # console ehancements and eventually remote debugging
# extras/helpers. # extras/helpers.

View File

@ -10,7 +10,7 @@ from piker import (
config, config,
) )
from piker.service import ( from piker.service import (
Services, get_service_mngr,
) )
from piker.log import get_console_log from piker.log import get_console_log
@ -129,7 +129,7 @@ async def _open_test_pikerd(
) as service_manager, ) as service_manager,
): ):
# this proc/actor is the pikerd # this proc/actor is the pikerd
assert service_manager is Services assert service_manager is get_service_mngr()
async with tractor.wait_for_actor( async with tractor.wait_for_actor(
'pikerd', 'pikerd',

View File

@ -26,7 +26,7 @@ import pytest
import tractor import tractor
from uuid import uuid4 from uuid import uuid4
from piker.service import Services from piker.service import ServiceMngr
from piker.log import get_logger from piker.log import get_logger
from piker.clearing._messages import ( from piker.clearing._messages import (
Order, Order,
@ -158,7 +158,7 @@ def load_and_check_pos(
def test_ems_err_on_bad_broker( def test_ems_err_on_bad_broker(
open_test_pikerd: Services, open_test_pikerd: ServiceMngr,
loglevel: str, loglevel: str,
): ):
async def load_bad_fqme(): async def load_bad_fqme():

View File

@ -15,7 +15,7 @@ import tractor
from piker.service import ( from piker.service import (
find_service, find_service,
Services, ServiceMngr,
) )
from piker.data import ( from piker.data import (
open_feed, open_feed,
@ -44,7 +44,7 @@ def test_runtime_boot(
async def main(): async def main():
port = 6666 port = 6666
daemon_addr = ('127.0.0.1', port) daemon_addr = ('127.0.0.1', port)
services: Services services: ServiceMngr
async with ( async with (
open_test_pikerd( open_test_pikerd(