Compare commits

..

45 Commits

Author SHA1 Message Date
Nelson Torres c8bb8d7893 config refactor
only one `get_config()` method for the api class and the cryptofeed feed handler
2025-01-29 01:03:27 -03:00
Nelson Torres b4141d4208 move constants to venue 2025-01-29 01:03:27 -03:00
Nelson Torres 468b33cf60 refactor redundant code 2025-01-29 01:03:27 -03:00
Nelson Torres 2c2fa990ed name formatting fixes 2025-01-29 01:03:27 -03:00
Nelson Torres 7bc98f5055 get_mkt_info cleanup 2025-01-29 01:03:27 -03:00
Nelson Torres 8b4bcf134c cache_symbols refactor 2025-01-29 01:03:27 -03:00
Nelson Torres 3388bc89cc json_rpc_auth_wrapper 2025-01-29 01:03:27 -03:00
Nelson Torres b84880cce5 move object classes to venue 2025-01-29 01:03:27 -03:00
Nelson Torres 7f5e655720 Added options symbols to get_assets 2025-01-29 01:03:27 -03:00
Tyler Goodlet b8ad77e861 .clearing._ems: Don't require `first_quote['last']`
Instead just check for the field (which I'm not huge on the key-name for
anyway) and if not found get the "last price" from the real-time shm
buffer's latest 'close' sample.

Unrelatedly, use a `subs.copy()` in the `Router.client_broadcast()` loop
such that if a `client_stream` is popped on connection failure, we don't
RTE for the "size changed on iteration".
2025-01-29 01:03:27 -03:00
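A minimal, generic sketch of the `subs.copy()` fix described above (names are illustrative, not the actual `Router` internals): iterating over a copy of the subscriber set means a dead client stream can be dropped from the original set mid-loop without tripping a "size changed during iteration" RuntimeError.

async def client_broadcast(subs: set, msg: dict) -> None:
    # iterate a *copy* so mutating `subs` inside the loop is safe
    for client_stream in subs.copy():
        try:
            await client_stream.send(msg)
        except Exception:
            # connection failure: drop the subscriber from the original set
            subs.discard(client_stream)
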
Tyler Goodlet dbafb64bc3 `kucoin`: repair live quotes streaming..
This must have broken at some point during the new `MktPair` and thus
`.fqme: str` updates; more-or-less the symbol key in the quote-msg-`dict`
was NOT set to the `MktPair.bs_fqme: str` value and thus wasn't being
processed by the downstream sampling and feed subsys.

So fix that as well as a few other refinements,
- set the `topic: mkt.bs_fqme` in quote msgs obvi.
- drop the "wait for first clearing vlm" quote poll loop; going to fix
  the sampler to handle a `first_quote` without a `'last'` key.
- add some typing around calls to `get_mkt_info()`.
- rename `stream_messages()` -> `iter_normed_quotes()` (a sketch follows below).
2025-01-29 01:03:27 -03:00
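A hedged sketch of what the renamed `iter_normed_quotes()` is expected to yield per the notes above; the websocket-msg shape and field names here are assumptions, only the requirement that the quote's topic/symbol key be the `MktPair.bs_fqme` value comes from the commit message.

from typing import Any, AsyncIterator

async def iter_normed_quotes(
    ws_msgs: AsyncIterator[dict[str, Any]],
    bs_fqme: str,  # e.g. 'btcusdt.kucoin' (illustrative only)
) -> AsyncIterator[tuple[str, dict[str, Any]]]:
    async for msg in ws_msgs:
        price: float = float(msg['price'])
        quote: dict[str, Any] = {
            'symbol': bs_fqme,
            # a first quote may arrive without clearing volume; the
            # sampler is being fixed to handle a missing 'last' key by
            # falling back to the rt shm buffer's latest 'close'.
            'last': price,
            'ticks': [{
                'type': 'trade',
                'price': price,
                'size': float(msg['size']),
            }],
        }
        # the topic key MUST be the fqme-style symbol key so the
        # downstream sampling and feed subsys picks the quote up
        yield bs_fqme, quote
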
Nelson Torres 00fb5ceddd Deleted settlePlan field from binance FutesPair. 2025-01-29 01:03:27 -03:00
Nelson Torres e8d26f8b30 Added missing fields for kucoin.
feeCategory, makerFeeCoefficient, takerFeeCoefficient and st.
2025-01-29 01:03:27 -03:00
Nelson Torres fb76d1c9ed get_assets now uses public endpoint
It's better if the data is available through a public endpoint.
2025-01-29 01:03:27 -03:00
Nelson Torres 9688f4899a now using exch_info in search_symbols 2025-01-29 01:03:27 -03:00
Nelson Torres 7298374fe4 Fix bs_fqme using venue and expiry 2025-01-29 01:03:27 -03:00
Nelson Torres a7d8b5e1f4 Added expiry property for OptionPair 2025-01-29 01:03:27 -03:00
Nelson Torres 65e4bd80b2 No longer needed 2025-01-29 01:03:27 -03:00
Nelson Torres 660c580786 bs_mktid instead of bs_fqme for deribit options 2025-01-29 01:03:27 -03:00
Nelson Torres a2651fd1de Fixed pair instrument name in the search_symbols endpoint.
Fixed instrument in the bars endpoint: for deribit options use bs_mktid instead of bs_fqme.
Fixed: the id is in msg.
2025-01-29 01:03:27 -03:00
Tyler Goodlet 12d557f12b data._web_bs: try to raise jsonrpc errors in parent task 2025-01-29 01:03:27 -03:00
Nelson Torres 8365b6d373 Add necessary classes in init file for deribit 2025-01-29 01:03:27 -03:00
Nelson Torres badb7342f7 Minor refactor in open_symbol_search 2025-01-29 01:03:27 -03:00
Nelson Torres b1bac1dc9f stream_quotes now using FeedInit 2025-01-29 01:03:27 -03:00
Nelson Torres be660f0275 symbol_info refactor 2025-01-29 01:03:27 -03:00
Nelson Torres 41e69af090 search_symbols output type fix 2025-01-29 01:03:27 -03:00
Nelson Torres 01e8fe18dd add get_mkt_pairs method 2025-01-29 01:03:27 -03:00
Nelson Torres b104e603db get_assets refactor 2025-01-29 01:03:27 -03:00
Nelson Torres 67424562ba formatting 2025-01-29 01:03:27 -03:00
Nelson Torres 0a92f54b59 created exch_info in api class 2025-01-29 01:03:27 -03:00
Nelson Torres bf1e2da1a7 modify self_pairs type to ChainMap 2025-01-29 01:03:27 -03:00
Nelson Torres d11e1d1ee4 Necessary imports 2025-01-29 01:03:27 -03:00
Nelson Torres cf2b507453 add get_market_info 2025-01-29 01:03:27 -03:00
Nelson Torres 61bde5a58d Necessary imports 2025-01-29 01:03:27 -03:00
Nelson Torres 43ec2e1897 minor fixes in venues 2025-01-29 01:03:27 -03:00
Nelson Torres d7669ba18b add class Pair in venues, PAIRTYPES for future uses 2025-01-29 01:03:27 -03:00
Nelson Torres 3821be082b fix syms for venues.
little refactor in get_config, and created get_fh_config for cryptofeed.
2025-01-29 01:03:27 -03:00
Nelson Torres 25d60d6bdd venues for deribit 2025-01-29 01:03:27 -03:00
Nelson Torres a9178b211d Added cryptofeed and pyarrow (required for the feed) and enabled deribit
in the brokers init file. At this point the feed is working; to check
the tables use the vd tool.
2025-01-29 01:03:27 -03:00
Nelson Torres ec2203168a `default.nix` created with the required nixos deps "wiring".
`pyproject.toml` updated to use pikers.dev/goodboy/tractor.git, branch: `aio_abandons`.
2025-01-29 00:34:25 -03:00
Nelson Torres 69b1bb57ea Updated tractor method name. 2025-01-29 00:17:36 -03:00
Tyler Goodlet 1f2755ee36 Drop `.cancel_actor()` from `maybe_spawn_daemon()`
Since `tractor`'s new and improved inter-actor cancellation semantics
are much more pedantic, AND bc we use the `ServiceMngr` for spawning
service actors on-demand, the caller of `maybe_spawn_daemon()` should
NEVER conduct a so called "out of band" `Actor`-runtime cancel request
since this is precisely the job of our `ServiceMngr` XD

Add a super in depth note explaining the underlying issue and adding
a todo list of how we should prolly augment `tractor` to make such cases
easier to grok and fix in the future!
2025-01-29 00:17:36 -03:00
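A loose, generic `trio`-only analogy (not actual `tractor`/piker code; all names here are hypothetical) of the rule above: the supervisor that spawned a service owns its cancellation, so callers ask the manager to shut a service down rather than cancelling it "out of band" themselves.

import trio

class Supervisor:
    def __init__(self, nursery: trio.Nursery):
        self._n = nursery
        self._scopes: dict[str, trio.CancelScope] = {}

    async def start(self, name: str, fn) -> None:
        async def _run():
            with trio.CancelScope() as cs:
                self._scopes[name] = cs
                await fn()
        self._n.start_soon(_run)

    def cancel_service(self, name: str) -> None:
        # "in band": the owner of the scope performs the cancel
        # (simplified: assumes the service task has already started)
        self._scopes[name].cancel()

async def demo_service():
    await trio.sleep_forever()

async def main():
    async with trio.open_nursery() as n:
        sup = Supervisor(n)
        await sup.start('demo', demo_service)
        await trio.sleep(0.1)
        # ask the supervisor; don't reach in and cancel the task yourself
        sup.cancel_service('demo')

trio.run(main)
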
Tyler Goodlet ff171af168 Prep service mngr for move to `tractor.hilevel`
Given it's a fairly simple yet useful abstraction, it makes sense to
offer this sub-sys alongside the core `tractor` runtime lib.

Without going into extreme detail on the impl changes (it'll come in
the commit that moves to the other repo) here is the high level summary:
------ - ------
- rename `Services` -> `ServiceMngr` and use a factory `@acm`
  to guarantee a single instance per actor via a niche
  default-keyword-arg singleton approach B) (see the sketch below)
  - the mod level `open_service_mngr()` and `get_service_mngr()` are the
    new allocation/access API.
- add a `ServiceMngr.start_service()` method which does the work of both
  spawning a new subactor (for the daemon) and uses its portal to start
  the mngr side supervision task.
- open actor/task nurseries inside the `@acm` allocator.

Adjust other dependent subsystems to match:
------ - ------
- use `open_service_mngr()` when first allocated in `open_pikerd()`.
- use `get_service_mngr()` instead of importing the class ref inside
  `.service.maybe_spawn_daemon()`, `.brokers._daemon.spawn_brokerd()`
  and `.data._sampling.spawn_samplerd()` using a `partial` to pack in
  the endpoint ctx kwargs (unpacked inside `.start_service()` XD).
2025-01-29 00:17:36 -03:00
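A condensed, hedged sketch of the default-keyword-arg singleton trick the new `open_service_mngr()`/`get_service_mngr()` API relies on (the real implementation appears in the `_mngr.py` diff further below); `Mngr`, `open_mngr()` and `get_mngr()` are stand-in names.

from contextlib import asynccontextmanager as acm
import inspect

class Mngr:
    def __init__(self, debug_mode: bool = False):
        self.debug_mode = debug_mode

@acm
async def open_mngr(
    *,
    debug_mode: bool = False,
    # the default list is created ONCE at def-time, so it persists
    # across calls and acts as per-process storage for the singleton
    _singleton: list[Mngr | None] = [None],
):
    if (mngr := _singleton[0]) is None:
        mngr = _singleton[0] = Mngr(debug_mode=debug_mode)
    yield mngr

def get_mngr() -> Mngr:
    # reach the same default-kwarg storage via introspection
    mngr = inspect.signature(open_mngr).parameters['_singleton'].default[0]
    if mngr is None:
        raise RuntimeError('call `async with open_mngr()` first!')
    return mngr
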
Tyler Goodlet aa4a89451a Lel, forgot to add a `SPOT` venue for `binance`.. 2025-01-29 00:17:36 -03:00
Tyler Goodlet a461fcc3e3 Mask no-data pause-point, add perps to sig.
Was orig for debugging an issue with `kucoin` I think but definitely
shouldn't be left in XD

Also add `'perpetual_future'` to `start_backfill()` input literal set.
2025-01-29 00:17:36 -03:00
17 changed files with 578 additions and 156 deletions

default.nix 100644
View File

@ -0,0 +1,82 @@
with (import <nixpkgs> {});
with python312Packages;
let
  glibStorePath = lib.getLib glib;
  qtpyStorePath = lib.getLib qtpy;
  pyqt6StorePath = lib.getLib pyqt6;
  pyqt6SipStorePath = lib.getLib pyqt6-sip;
  qt6baseStorePath = lib.getLib qt6.qtbase;
  rapidfuzzStorePath = lib.getLib rapidfuzz;
  qdarkstyleStorePath = lib.getLib qdarkstyle;
in
stdenv.mkDerivation {
  name = "piker-qt6-poetry-shell";
  buildInputs = [
    # System requirements.
    glib
    qt6.qtbase
    libgcc.lib

    # Python requirements.
    python312Full
    poetry-core
    qdarkstyle
    rapidfuzz
    pyqt6
    qtpy
  ];
  src = null;
  shellHook = ''
    set -e

    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${libgcc.lib}/lib:${glibStorePath}/lib

    # Set the Qt plugin path
    # export QT_DEBUG_PLUGINS=1
    QTBASE_PATH="${qt6baseStorePath}"
    echo "qtbase path: $QTBASE_PATH"
    echo ""

    export QT_PLUGIN_PATH="$QTBASE_PATH/lib/qt-6/plugins"
    export QT_QPA_PLATFORM_PLUGIN_PATH="$QT_PLUGIN_PATH/platforms"
    echo "qt plugin path: $QT_PLUGIN_PATH"
    echo ""

    # Maybe create venv & install deps
    poetry install --with uis

    # Use pyqt6 from System, patch activate script
    ACTIVATE_SCRIPT_PATH="$(poetry env info --path)/bin/activate"

    export RPDFUZZ_PATH="${rapidfuzzStorePath}/lib/python3.12/site-packages"
    export QDRKSTYLE_PATH="${qdarkstyleStorePath}/lib/python3.12/site-packages"
    export QTPY_PATH="${qtpyStorePath}/lib/python3.12/site-packages"
    export PYQT6_PATH="${pyqt6StorePath}/lib/python3.12/site-packages"
    export PYQT6_SIP_PATH="${pyqt6SipStorePath}/lib/python3.12/site-packages"

    echo "rapidfuzz at: $RPDFUZZ_PATH"
    echo "qdarkstyle at: $QDRKSTYLE_PATH"
    echo "qtpy at: $QTPY_PATH"
    echo "pyqt6 at: $PYQT6_PATH"
    echo "pyqt6-sip at: $PYQT6_SIP_PATH"
    echo ""

    PATCH="export PYTHONPATH=\""
    PATCH="$PATCH\$RPDFUZZ_PATH"
    PATCH="$PATCH:\$QDRKSTYLE_PATH"
    PATCH="$PATCH:\$QTPY_PATH"
    PATCH="$PATCH:\$PYQT6_PATH"
    PATCH="$PATCH:\$PYQT6_SIP_PATH"
    PATCH="$PATCH\""

    if grep -q "$PATCH" "$ACTIVATE_SCRIPT_PATH"; then
      echo "venv is already patched."
    else
      echo "patching $ACTIVATE_SCRIPT_PATH to use pyqt6 from nixos..."
      sed -i "\$i$PATCH" $ACTIVATE_SCRIPT_PATH
    fi

    poetry shell
  '';
}

View File

@ -23,6 +23,7 @@ from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from functools import partial
from types import ModuleType
from typing import (
TYPE_CHECKING,
@ -190,14 +191,17 @@ def broker_init(
async def spawn_brokerd(
brokername: str,
loglevel: str | None = None,
**tractor_kwargs,
) -> bool:
'''
Spawn a `brokerd.<backendname>` subactor service daemon
using `pikerd`'s service mngr.
'''
from piker.service._util import log # use service mngr log
log.info(f'Spawning {brokername} broker daemon')
@ -217,27 +221,35 @@ async def spawn_brokerd(
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
from piker.service import Services
from piker.service import (
get_service_mngr,
ServiceMngr,
)
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
portal = await Services.actor_n.start_actor(
dname,
enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
debug_mode=Services.debug_mode,
mngr: ServiceMngr = get_service_mngr()
ctx: tractor.Context = await mngr.start_service(
daemon_name=dname,
ctx_ep=partial(
# signature of target root-task endpoint
daemon_fixture_ep,
# passed to daemon_fixture_ep(**kwargs)
brokername=brokername,
loglevel=loglevel,
),
debug_mode=mngr.debug_mode,
loglevel=loglevel,
enable_modules=(
_data_mods
+
tractor_kwargs.pop('enable_modules')
),
**tractor_kwargs
)
# NOTE: the service mngr expects an already spawned actor + its
# portal ref in order to do non-blocking setup of brokerd
# service nursery.
await Services.start_service_task(
dname,
portal,
# signature of target root-task endpoint
daemon_fixture_ep,
brokername=brokername,
loglevel=loglevel,
assert (
not ctx.cancel_called
and ctx.portal # parent side
and dname in ctx.chan.uid # subactor is named as desired
)
return True
@ -262,8 +274,7 @@ async def maybe_spawn_brokerd(
from piker.service import maybe_spawn_daemon
async with maybe_spawn_daemon(
f'brokerd.{brokername}',
service_name=f'brokerd.{brokername}',
service_task_target=spawn_brokerd,
spawn_args={
'brokername': brokername,

View File

@ -567,6 +567,7 @@ class Client:
) -> str:
return {
'USDTM': 'usdtm_futes',
'SPOT': 'spot',
# 'COINM': 'coin_futes',
# ^-TODO-^ bc someone might want it..?
}[pair.venue]

View File

@ -335,7 +335,7 @@ def services(config, tl, ports):
name='service_query',
loglevel=config['loglevel'] if tl else None,
),
tractor.get_arbiter(
tractor.get_registry(
host=host,
port=ports[0]
) as portal

View File

@ -25,6 +25,7 @@ from collections import (
defaultdict,
)
from contextlib import asynccontextmanager as acm
from functools import partial
import time
from typing import (
Any,
@ -42,7 +43,7 @@ from tractor.trionics import (
maybe_open_nursery,
)
import trio
from trio_typing import TaskStatus
from trio import TaskStatus
from .ticktools import (
frame_ticks,
@ -70,6 +71,7 @@ if TYPE_CHECKING:
_default_delay_s: float = 1.0
# TODO: use new `tractor.singleton_acm` API for this!
class Sampler:
'''
Global sampling engine registry.
@ -79,9 +81,9 @@ class Sampler:
This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see
``.service.maybe_open_samplerd()`` and the below
``register_with_sampler()``.
time-step-sample a (real-time) quote feeds, see
`.service.maybe_open_samplerd()` and the below
`register_with_sampler()`.
'''
service_nursery: None | trio.Nursery = None
@ -375,7 +377,10 @@ async def register_with_sampler(
assert Sampler.ohlcv_shms
# unblock caller
await ctx.started(set(Sampler.ohlcv_shms.keys()))
await ctx.started(
# XXX bc msgpack only allows one array type!
list(Sampler.ohlcv_shms.keys())
)
if open_index_stream:
try:
@ -419,7 +424,6 @@ async def register_with_sampler(
async def spawn_samplerd(
loglevel: str | None = None,
**extra_tractor_kwargs
@ -429,7 +433,10 @@ async def spawn_samplerd(
update and increment count write and stream broadcasting.
'''
from piker.service import Services
from piker.service import (
get_service_mngr,
ServiceMngr,
)
dname = 'samplerd'
log.info(f'Spawning `{dname}`')
@ -437,26 +444,33 @@ async def spawn_samplerd(
# singleton lock creation of ``samplerd`` since we only ever want
# one daemon per ``pikerd`` proc tree.
# TODO: make this built-into the service api?
async with Services.locks[dname + '_singleton']:
mngr: ServiceMngr = get_service_mngr()
already_started: bool = dname in mngr.service_tasks
if dname not in Services.service_tasks:
portal = await Services.actor_n.start_actor(
dname,
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
await Services.start_service_task(
dname,
portal,
async with mngr._locks[dname + '_singleton']:
ctx: Context = await mngr.start_service(
daemon_name=dname,
ctx_ep=partial(
register_with_sampler,
period_s=1,
sub_for_broadcasts=False,
),
debug_mode=mngr.debug_mode, # set by pikerd flag
# proxy-through to tractor
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
**extra_tractor_kwargs
)
if not already_started:
assert (
ctx
and
ctx.portal
and
not ctx.cancel_called
)
return True
@ -889,6 +903,7 @@ async def uniform_rate_send(
# to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors!
trio.EndOfChannel,
trio.ClosedResourceError,
trio.BrokenResourceError,
ConnectionResetError,

View File

@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for,
=> TODO: maybe to (re)move elsewhere?
'''
from ._mngr import Services as Services
from ._mngr import (
get_service_mngr as get_service_mngr,
open_service_mngr as open_service_mngr,
ServiceMngr as ServiceMngr,
)
from ._registry import (
_tractor_kwargs as _tractor_kwargs,
_default_reg_addr as _default_reg_addr,

View File

@ -21,7 +21,6 @@
from __future__ import annotations
import os
from typing import (
Optional,
Any,
ClassVar,
)
@ -30,13 +29,13 @@ from contextlib import (
)
import tractor
import trio
from ._util import (
get_console_log,
)
from ._mngr import (
Services,
open_service_mngr,
ServiceMngr,
)
from ._registry import ( # noqa
_tractor_kwargs,
@ -59,7 +58,7 @@ async def open_piker_runtime(
registry_addrs: list[tuple[str, int]] = [],
enable_modules: list[str] = [],
loglevel: Optional[str] = None,
loglevel: str|None = None,
# XXX NOTE XXX: you should pretty much never want debug mode
# for data daemons when running in production.
@ -69,7 +68,7 @@ async def open_piker_runtime(
# and spawn the service tree distributed per that.
start_method: str = 'trio',
tractor_runtime_overrides: dict | None = None,
tractor_runtime_overrides: dict|None = None,
**tractor_kwargs,
) -> tuple[
@ -119,6 +118,10 @@ async def open_piker_runtime(
# spawn other specialized daemons I think?
enable_modules=enable_modules,
# TODO: how to configure this?
# keep it on by default if debug mode is set?
# maybe_enable_greenback=debug_mode,
**tractor_kwargs,
) as actor,
@ -167,12 +170,13 @@ async def open_pikerd(
**kwargs,
) -> Services:
) -> ServiceMngr:
'''
Start a root piker daemon with an indefinite lifetime.
Start a root piker daemon actor (aka `pikerd`) with an indefinite
lifetime.
A root actor nursery is created which can be used to create and keep
alive underling services (see below).
A root actor-nursery is created which can be used to spawn and
supervise underling service sub-actors (see below).
'''
# NOTE: for the root daemon we always enable the root
@ -199,8 +203,6 @@ async def open_pikerd(
root_actor,
reg_addrs,
),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
):
for addr in reg_addrs:
if addr not in root_actor.accept_addrs:
@ -209,25 +211,17 @@ async def open_pikerd(
'Maybe you have another daemon already running?'
)
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_nursery
Services.debug_mode = debug_mode
try:
yield Services
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel()
mngr: ServiceMngr
async with open_service_mngr(
debug_mode=debug_mode,
) as mngr:
yield mngr
# TODO: do we even need this?
# @acm
# async def maybe_open_runtime(
# loglevel: Optional[str] = None,
# loglevel: str|None = None,
# **kwargs,
# ) -> None:
@ -256,7 +250,7 @@ async def maybe_open_pikerd(
loglevel: str | None = None,
**kwargs,
) -> tractor._portal.Portal | ClassVar[Services]:
) -> tractor._portal.Portal | ClassVar[ServiceMngr]:
'''
If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self

View File

@ -49,7 +49,7 @@ from requests.exceptions import (
ReadTimeout,
)
from ._mngr import Services
from ._mngr import ServiceMngr
from ._util import (
log, # sub-sys logger
get_console_log,
@ -453,7 +453,7 @@ async def open_ahabd(
@acm
async def start_ahab_service(
services: Services,
services: ServiceMngr,
service_name: str,
# endpoint config passed as **kwargs
@ -549,7 +549,8 @@ async def start_ahab_service(
log.warning('Failed to cancel root permsed container')
except (
trio.MultiError,
# trio.MultiError,
ExceptionGroup,
) as err:
for subexc in err.exceptions:
if isinstance(subexc, PermissionError):

View File

@ -26,14 +26,17 @@ from typing import (
from contextlib import (
asynccontextmanager as acm,
)
from collections import defaultdict
import tractor
import trio
from ._util import (
log, # sub-sys logger
)
from ._mngr import (
Services,
get_service_mngr,
ServiceMngr,
)
from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service
@ -41,15 +44,14 @@ from ._registry import find_service
@acm
async def maybe_spawn_daemon(
service_name: str,
service_task_target: Callable,
spawn_args: dict[str, Any],
loglevel: str | None = None,
singleton: bool = False,
_locks = defaultdict(trio.Lock),
**pikerd_kwargs,
) -> tractor.Portal:
@ -67,7 +69,7 @@ async def maybe_spawn_daemon(
'''
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Services.locks[service_name]
lock = _locks[service_name]
await lock.acquire()
async with find_service(
@ -132,7 +134,65 @@ async def maybe_spawn_daemon(
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
await portal.cancel_actor()
# --- ---- ---
# XXX NOTE XXX
# --- ---- ---
# DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
#
# Doing so will cause an "out-of-band" ctxc
# (`tractor.ContextCancelled`) to be raised inside the
# `ServiceMngr.open_context_in_task()`'s call to
# `ctx.wait_for_result()` AND the internal self-ctxc
# "graceful capture" WILL NOT CATCH IT!
#
# This can cause certain types of operations to raise
# that ctxc BEFORE THEY `return`, resulting in
# a "false-negative" ctxc being raised when really
# nothing actually failed, other then our semantic
# "failure" to suppress an expected, graceful,
# self-cancel scenario..
#
# bUt wHy duZ It WorK lIKe dis..
# ------------------------------
# from the perspective of the `tractor.Context` this
# cancel request was conducted "out of band" since
# `Context.cancel()` was never called and thus the
# `._cancel_called: bool` was never set. Despite the
# remote `.canceller` being set to `pikerd` (i.e. the
# same `Actor.uid` of the raising service-mngr task) the
# service-task's ctx itself was never marked as having
# requested cancellation and thus still raises the ctxc
# bc it was unaware of any such request.
#
# How to make grokin these cases easier tho?
# ------------------------------------------
# Because `Portal.cancel_actor()` was called it requests
# "full-`Actor`-runtime-cancellation" of it's peer
# process which IS NOT THE SAME as a single inter-actor
# RPC task cancelling its local context with a remote
# peer `Task` in that same peer process.
#
# ?TODO? It might be better if we do one (or all) of the
# following:
#
# -[ ] at least set a special message for the
# `ContextCancelled` when raised locally by the
# unaware ctx task such that we check for the
# `.canceller` being *our `Actor`* and in the case
# where `Context._cancel_called == False` we specially
# note that this is likely an "out-of-band"
# runtime-cancel request triggered by some call to
# `Portal.cancel_actor()`, possibly even reporting the
# exact LOC of that caller by tracking it inside our
# portal-type?
# -[ ] possibly add another field `ContextCancelled` like
# maybe a,
# `.request_type: Literal['os', 'proc', 'actor',
# 'ctx']` type thing which would allow immediately
# being able to tell what kind of cancellation caused
# the unexpected ctxc?
# -[ ] REMOVE THIS COMMENT, once we've settled on how to
# better augment `tractor` to be more explicit on this!
async def spawn_emsd(
@ -147,21 +207,22 @@ async def spawn_emsd(
"""
log.info('Spawning emsd')
portal = await Services.actor_n.start_actor(
smngr: ServiceMngr = get_service_mngr()
portal = await smngr.actor_n.start_actor(
'emsd',
enable_modules=[
'piker.clearing._ems',
'piker.clearing._client',
],
loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag
debug_mode=smngr.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
# non-blocking setup of clearing service
from ..clearing._ems import _setup_persistent_emsd
await Services.start_service_task(
await smngr.start_service_task(
'emsd',
portal,

View File

@ -18,16 +18,29 @@
daemon-service management API.
"""
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import (
Callable,
Any,
)
import trio
from trio_typing import TaskStatus
import msgspec
import tractor
import trio
from trio import TaskStatus
from tractor import (
ActorNursery,
current_actor,
ContextCancelled,
Context,
@ -39,6 +52,130 @@ from ._util import (
)
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# impl deat which ensures a single global instance
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open a multi-subactor-as-service-daemon tree supervisor.
The delivered `ServiceMngr` is a singleton instance for each
actor-process and is allocated on first open and never
de-allocated unless explicitly deleted by al call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'actor_n': an,
'service_n': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.actor_n = an
mngr.service_n = tn
else:
assert (
mngr.actor_n
and
mngr.service_tn
)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_tasks:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
# TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the
# library.
@ -46,31 +183,46 @@ from ._util import (
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
class Services:
@dataclass
class ServiceMngr:
# class ServiceMngr(msgspec.Struct):
'''
A multi-subactor-as-service manager.
actor_n: tractor._supervise.ActorNursery
Spawn, supervise and monitor service/daemon subactors in a SC
process tree.
'''
actor_n: ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
Context,
Portal,
trio.Event,
]
] = {}
locks = defaultdict(trio.Lock)
] = field(default_factory=dict)
# internal per-service task mutexs
_locks = defaultdict(trio.Lock)
@classmethod
async def start_service_task(
self,
name: str,
portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
target: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> (trio.CancelScope, Context):
) -> (trio.CancelScope, Context, Any):
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown.
@ -83,6 +235,7 @@ class Services:
task_status: TaskStatus[
tuple[
trio.CancelScope,
Context,
trio.Event,
Any,
]
@ -90,64 +243,87 @@ class Services:
) -> Any:
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs:
try:
async with portal.open_context(
target,
allow_overruns=allow_overruns,
**ctx_kwargs,
async with portal.open_context(
target,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, started):
) as (ctx, first):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((cs, complete, first))
log.info(
f'`pikerd` service {name} started with value {first}'
)
try:
# unblock once the remote context has started
complete = trio.Event()
task_status.started((
cs,
ctx,
complete,
started,
))
log.info(
f'`pikerd` service {name} started with value {started}'
)
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.result()
ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (await portal.result(), ctx_res)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.channel.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service {name} was remotely cancelled?\n'
f'remote canceller: {canceller}\n'
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
)
else:
raise
return (
await portal.wait_for_result(),
ctx_res,
)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.chan.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
finally:
await portal.cancel_actor()
complete.set()
self.service_tasks.pop(name)
f'cancellee: {portal.chan.uid}\n'
f'canceller: {canceller}\n'
)
else:
raise
cs, complete, first = await self.service_n.start(open_context_in_task)
finally:
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor()
complete.set()
self.service_tasks.pop(name)
cs, sub_ctx, complete, started = await self.service_n.start(
open_context_in_task
)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (cs, portal, complete)
self.service_tasks[name] = (cs, sub_ctx, portal, complete)
return cs, sub_ctx, started
return cs, first
@classmethod
async def cancel_service(
self,
name: str,
@ -158,8 +334,80 @@ class Services:
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, portal, complete = self.service_tasks[name]
cs.cancel()
cs, sub_ctx, portal, complete = self.service_tasks[name]
# cs.cancel()
await sub_ctx.cancel()
await complete.wait()
assert name not in self.service_tasks, \
f'Serice task for {name} not terminated?'
if name in self.service_tasks:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Serice task for {name} not terminated?'
)
# assert name not in self.service_tasks, \
# f'Serice task for {name} not terminated?'
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
debug_mode: bool = False,
**tractor_actor_kwargs,
) -> Context:
'''
Start a "service" task in a new sub-actor (daemon) and manage it's lifetime
indefinitely.
Services can be cancelled/shutdown using `.cancel_service()`.
'''
entry: tuple|None = self.service_tasks.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_tasks:
portal = await self.actor_n.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**tractor_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(cs, sub_ctx, started) = await self.start_service_task(
daemon_name,
portal,
ctx_ep,
**ctx_kwargs,
)
return sub_ctx
# TODO:
# -[ ] factor all the common shit from `.data._sampling`
# and `.brokers._daemon` into here / `ServiceMngr`
# in terms of allocating the `Portal` as part of the
# "service-in-subactor" starting!
# -[ ] move to `tractor.hilevel._service`, import and use here!
# NOTE: purposely leaks the ref to the mod-scope Bo
# import tractor
# from tractor.hilevel import (
# open_service_mngr,
# ServiceMngr,
# )
# mngr: ServiceMngr|None = None
# with tractor.hilevel.open_service_mngr() as mngr:
# Services = proxy(mngr)

View File

@ -21,11 +21,13 @@ from typing import (
TYPE_CHECKING,
)
# TODO: oof, needs to be changed to `httpx`!
import asks
if TYPE_CHECKING:
import docker
from ._ahab import DockerContainer
from . import ServiceMngr
from ._util import log # sub-sys logger
from ._util import (
@ -127,7 +129,7 @@ def start_elasticsearch(
@acm
async def start_ahab_daemon(
service_mngr: Services,
service_mngr: ServiceMngr,
user_config: dict | None = None,
loglevel: str | None = None,

View File

@ -53,7 +53,7 @@ import pendulum
# import purerpc
from ..data.feed import maybe_open_feed
from . import Services
from . import ServiceMngr
from ._util import (
log, # sub-sys logger
get_console_log,
@ -233,7 +233,7 @@ def start_marketstore(
@acm
async def start_ahab_daemon(
service_mngr: Services,
service_mngr: ServiceMngr,
user_config: dict | None = None,
loglevel: str | None = None,

View File

@ -458,13 +458,15 @@ async def start_backfill(
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
)
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
if timeframe > 1:
await tractor.pause()
# UGH: what's a better way?
# TODO: backends are responsible for being correct on
# this right!?
# -[ ] in the `ib` case we could maybe offer some way
# to halt the request loop until the condition is
# resolved or should the backend be entirely in
# charge of solving such faults? yes, right?
# if timeframe > 1:
# await tractor.pause()
return
assert (
@ -578,6 +580,7 @@ async def start_backfill(
'crypto',
'crypto_currency',
'fiat', # a "forex pair"
'perpetual_future', # stupid "perps" from cex land
}:
# for now, our table key schema is not including
# the dst[/src] source asset token.

View File

@ -50,10 +50,8 @@ attrs = "^23.1.0"
bidict = "^0.22.1"
colorama = "^0.4.6"
colorlog = "^6.7.0"
cython = "^3.0.0"
greenback = "^1.1.1"
ib-insync = "^0.9.86"
msgspec = "^0.18.0"
msgspec = "^0.18.6"
numba = "^0.59.0"
numpy = "^1.25"
polars = "^0.18.13"
@ -76,8 +74,8 @@ pyarrow = "^17.0.0"
[tool.poetry.dependencies.tractor]
develop = true
git = 'https://github.com/goodboy/tractor.git'
branch = 'asyncio_debugger_support'
git = 'https://pikers.dev/goodboy/tractor.git'
branch = 'aio_abandons'
# path = "../tractor"
[tool.poetry.dependencies.asyncvnc]
@ -111,6 +109,8 @@ pytest = "^6.0.0"
elasticsearch = "^8.9.0"
xonsh = "^0.14.2"
prompt-toolkit = "3.0.40"
cython = "^3.0.0"
greenback = "^1.1.1"
# console ehancements and eventually remote debugging
# extras/helpers.

View File

@ -10,7 +10,7 @@ from piker import (
config,
)
from piker.service import (
Services,
get_service_mngr,
)
from piker.log import get_console_log
@ -129,7 +129,7 @@ async def _open_test_pikerd(
) as service_manager,
):
# this proc/actor is the pikerd
assert service_manager is Services
assert service_manager is get_service_mngr()
async with tractor.wait_for_actor(
'pikerd',

View File

@ -26,7 +26,7 @@ import pytest
import tractor
from uuid import uuid4
from piker.service import Services
from piker.service import ServiceMngr
from piker.log import get_logger
from piker.clearing._messages import (
Order,
@ -158,7 +158,7 @@ def load_and_check_pos(
def test_ems_err_on_bad_broker(
open_test_pikerd: Services,
open_test_pikerd: ServiceMngr,
loglevel: str,
):
async def load_bad_fqme():

View File

@ -15,7 +15,7 @@ import tractor
from piker.service import (
find_service,
Services,
ServiceMngr,
)
from piker.data import (
open_feed,
@ -44,7 +44,7 @@ def test_runtime_boot(
async def main():
port = 6666
daemon_addr = ('127.0.0.1', port)
services: Services
services: ServiceMngr
async with (
open_test_pikerd(