Compare commits

..

46 Commits

Author SHA1 Message Date
Nelson Torres 6555ccfbba config refactor
only one get_config method for api class and cryptofeed feed handler
2024-11-15 15:24:08 -03:00
Nelson Torres 75d1d007fb move constants to venue 2024-11-15 14:41:47 -03:00
Nelson Torres 2bdbe0f20e refactor redundant code 2024-11-15 14:26:16 -03:00
Nelson Torres a117177759 name formatting fixes 2024-11-15 11:23:05 -03:00
Nelson Torres 30060a83c9 get_mkt_info cleanup 2024-11-15 11:22:27 -03:00
Nelson Torres 156a35b606 cache_symbols refactor 2024-11-15 11:20:48 -03:00
Nelson Torres 89e241c132 json_rpc_auth_wrapper 2024-11-15 11:18:55 -03:00
Nelson Torres df8d1274ae move object classes to venue 2024-11-15 11:15:25 -03:00
Nelson Torres 0916b707e2 Added options symbols to get_assets 2024-11-14 17:39:52 -03:00
Tyler Goodlet 45788b0b53 .clearing._ems: Don't require `first_quote['last']`
Instead just check for the field (which i'm not huge on the key-name for
anyway) and if not found get the "last price" from the real-time shm
buffer's latest 'close' sample.

Unrelatedly, use a `subs.copy()` in the `Router.client_broadcast()` loop
such that if a `client_stream` is popped on connection failure, we don't
RTE for the "size changed on iteration".
2024-11-13 13:50:43 +00:00
Tyler Goodlet 38a1f0b9ee `kucoin`: repair live quotes streaming..
This must have broke at some point during the new `MktPair` and thus
`.fqme: str` updates; mas-o-menos the symbol key in the quote-msg-`dict`
was NOT set to the `MktPair.bs_fqme: str` value and thus wasn't being
processed by the downstream sampling and feed subsys.

So fix that as well as a few other refinements,
- set the `topic: mkt.bs_fqme` in quote msgs obvi.
- drop the "wait for first clearing vlm" quote poll loop; going to fix
  the sampler to handle a `first_quote` without a `'last'` key.
- add some typing around calls to `get_mkt_info()`.
- rename `stream_messages()` -> `iter_normed_quotes()`.
2024-11-13 13:50:16 +00:00
Nelson Torres f291654dbe Deleted settlePlan field from binance FutesPair. 2024-11-13 13:49:49 +00:00
Nelson Torres e9fa422916 Added missing fields for kucoin.
feeCategory, makerFeeCoefficient, takerFeeCoefficient and st.
2024-11-13 13:49:32 +00:00
Nelson Torres 5304a36b87 get_assets now uses public endpoint
It's better if the data is available through a public endpoint.
2024-11-13 10:43:30 -03:00
Nelson Torres 089c79e905 now using exch_info in search_symbols 2024-11-13 10:40:05 -03:00
Nelson Torres d848050b52 Fix bs_fqme using venue and expiry 2024-11-12 16:07:46 -03:00
Nelson Torres ddffe2bec6 Added expiry property for OptionPair 2024-11-12 16:06:59 -03:00
Nelson Torres 19b4ca9d85 No longer needed 2024-11-12 16:05:54 -03:00
Nelson Torres f037f851d8 bs_mktid instead bs_fqme for deribits options 2024-11-12 12:22:28 -03:00
Nelson Torres a3ab8dd8fe Fixed pair instrument name in search_symbols endpoint.
Fixed instrument in bars endpoint, for options in deribits bs_mktid instead bs_fqme.
Fixed the id is in msg.
2024-11-12 12:16:07 -03:00
Tyler Goodlet 6fa0d4bcf3 data._web_bs: try to raise jsonrpc errors in parent task 2024-11-11 13:07:39 +00:00
Nelson Torres a4f7fa9c1a Add necessary classes in init file for deribit 2024-11-08 21:58:45 +00:00
Nelson Torres 266ecf6206 Minor refactor in open_symbol_search 2024-11-08 21:58:06 +00:00
Nelson Torres ea6126d310 stream_quotes now using FeedInit 2024-11-08 21:57:35 +00:00
Nelson Torres 1f4a5b80c4 symbol_info refactor 2024-11-08 21:53:10 +00:00
Nelson Torres ac6f52088a search_symbols output type fix 2024-11-08 21:48:28 +00:00
Nelson Torres 960298514c add get_mkt_pairs method 2024-11-08 21:47:24 +00:00
Nelson Torres 71f3a0a4cd get_assets refactor 2024-11-08 21:46:58 +00:00
Nelson Torres b25a7699ab formatting 2024-11-08 21:45:27 +00:00
Nelson Torres b39affc96e created exch_info in api class 2024-11-08 21:42:42 +00:00
Nelson Torres be8629929b modify self_pairs type to ChainMap 2024-11-08 21:41:52 +00:00
Nelson Torres 4776be6736 Necessary imports 2024-11-08 21:38:08 +00:00
Nelson Torres 008e68174b add get_market_info 2024-11-08 21:36:23 +00:00
Nelson Torres b4a9b86783 Necessary imports 2024-11-08 21:35:09 +00:00
Nelson Torres d3ca571c0e minor fixes in venues 2024-11-08 21:34:03 +00:00
Nelson Torres b3bbef30c0 add class Pair in venues, PAIRTYPES for future uses 2024-11-06 14:45:11 +00:00
Nelson Torres 499b2d0090 fix syms for venues.
little refactor in get_config, and created get_fh_config for cryptofeed.
2024-11-04 19:36:16 +00:00
Nelson Torres 8b0f1e7045 venues for deribit 2024-11-04 09:34:21 -03:00
Nelson Torres b2cfa3444f Added cryptofeed and pyarrow necessary for the feed, enable deribit
in the brokers init file, at this point the feed is working, to check
the tables use vd tool.
2024-08-28 23:58:48 +00:00
Nelson Torres 0be454c3d6 Updated tractor method name. 2024-08-23 18:06:05 +00:00
Tyler Goodlet de6189da4d Drop `.cancel_actor()` from `maybe_spawn_daemon()`
Since `tractor`'s new and improved inter-actor cancellation semantics
are much more pedantic, AND bc we use the `ServiceMngr` for spawning
service actors on-demand, the caller of `maybe_spawn_daemon()` should
NEVER conduct a so called "out of band" `Actor`-runtime cancel request
since this is precisely the job of our `ServiceMngr` XD

Add a super in depth note explaining the underlying issue and adding
a todo list of how we should prolly augment `tractor` to make such cases
easier to grok and fix in the future!
2024-08-21 17:37:40 +00:00
Tyler Goodlet cc5b21a7e6 Prep service mngr for move to `tractor.hilevel`
Given it's a fairly simple yet useful abstraction, it makes sense to
offer this sub-sys alongside the core `tractor` runtime lib.

Without going into extreme detail on the impl changes (it'll come in
the commit that moves to the other repo) here is the high level summary:
------ - ------
- rename `Services` -> `ServiceMngr` and use an factory `@acm`
  to guarantee a single-instance-per-actor using a niche approach for a
  singleton object using a default keyword-arg B)
  - the mod level `open_service_mngr()` and `get_service_mngr()` are the
    new allocation/access API.
- add a `ServiceMngr.start_service()` method which does the work of both
  spawning a new subactor (for the daemon) and uses its portal to start
  the mngr side supervision task.
- open actor/task nurseries inside the `@acm` allocator.

Adjust other dependent subsystems to match:
------ - ------
- use `open_service_mngr()` when first allocated in `open_pikerd()`.
- use `get_service_mngr()` instead of importing the class ref inside
  `.service.maybe_spawn_daemon()`, `.brokers._daemon.spawn_brokerd()`
  and `.data._sampling.spawn_samplerd()` using a `partial` to pack in
  the endpoint ctx kwargs (unpacked inside `.start_service()` XD).
2024-08-20 23:01:51 -03:00
Nelson Torres 35a9d8ec9d tractor branch updated, msgspec version upgraded, cython and greenback dependencies moved under dev group 2024-08-14 23:22:30 +00:00
Nelson Torres a831212c86 default.nix file for qt6 uis 2024-08-14 20:17:14 -03:00
Tyler Goodlet e987d7d7c4 Lel, forgot to add a `SPOT` venue for `binance`.. 2024-08-14 22:51:23 +00:00
Tyler Goodlet 5ec756234a Mask no-data pause-point, add perps to sig.
Was orig for debugging an issue with `kucoin` i think but definitely
shouldn't be left in XD

Also add `'perpetual_future'` to `start_backfill()` input literal set.
2024-08-14 19:35:24 -03:00
17 changed files with 578 additions and 156 deletions

82
default.nix 100644
View File

@ -0,0 +1,82 @@
with (import <nixpkgs> {});
with python312Packages;
let
glibStorePath = lib.getLib glib;
qtpyStorePath = lib.getLib qtpy;
pyqt6StorePath = lib.getLib pyqt6;
pyqt6SipStorePath = lib.getLib pyqt6-sip;
qt6baseStorePath = lib.getLib qt6.qtbase;
rapidfuzzStorePath = lib.getLib rapidfuzz;
qdarkstyleStorePath = lib.getLib qdarkstyle;
in
stdenv.mkDerivation {
name = "piker-qt6-poetry-shell";
buildInputs = [
# System requirements.
glib
qt6.qtbase
libgcc.lib
# Python requirements.
python312Full
poetry-core
qdarkstyle
rapidfuzz
pyqt6
qtpy
];
src = null;
shellHook = ''
set -e
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${libgcc.lib}/lib:${glibStorePath}/lib
# Set the Qt plugin path
# export QT_DEBUG_PLUGINS=1
QTBASE_PATH="${qt6baseStorePath}"
echo "qtbase path: $QTBASE_PATH"
echo ""
export QT_PLUGIN_PATH="$QTBASE_PATH/lib/qt-6/plugins"
export QT_QPA_PLATFORM_PLUGIN_PATH="$QT_PLUGIN_PATH/platforms"
echo "qt plugin path: $QT_PLUGIN_PATH"
echo ""
# Maybe create venv & install deps
poetry install --with uis
# Use pyqt6 from System, patch activate script
ACTIVATE_SCRIPT_PATH="$(poetry env info --path)/bin/activate"
export RPDFUZZ_PATH="${rapidfuzzStorePath}/lib/python3.12/site-packages"
export QDRKSTYLE_PATH="${qdarkstyleStorePath}/lib/python3.12/site-packages"
export QTPY_PATH="${qtpyStorePath}/lib/python3.12/site-packages"
export PYQT6_PATH="${pyqt6StorePath}/lib/python3.12/site-packages"
export PYQT6_SIP_PATH="${pyqt6SipStorePath}/lib/python3.12/site-packages"
echo "rapidfuzz at: $RPDFUZZ_PATH"
echo "qdarkstyle at: $QDRKSTYLE_PATH"
echo "qtpy at: $QTPY_PATH"
echo "pyqt6 at: $PYQT6_PATH"
echo "pyqt6-sip at: $PYQT6_SIP_PATH"
echo ""
PATCH="export PYTHONPATH=\""
PATCH="$PATCH\$RPDFUZZ_PATH"
PATCH="$PATCH:\$QDRKSTYLE_PATH"
PATCH="$PATCH:\$QTPY_PATH"
PATCH="$PATCH:\$PYQT6_PATH"
PATCH="$PATCH:\$PYQT6_SIP_PATH"
PATCH="$PATCH\""
if grep -q "$PATCH" "$ACTIVATE_SCRIPT_PATH"; then
echo "venv is already patched."
else
echo "patching $ACTIVATE_SCRIPT_PATH to use pyqt6 from nixos..."
sed -i "\$i$PATCH" $ACTIVATE_SCRIPT_PATH
fi
poetry shell
'';
}

View File

@ -23,6 +23,7 @@ from __future__ import annotations
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from functools import partial
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
@ -190,14 +191,17 @@ def broker_init(
async def spawn_brokerd( async def spawn_brokerd(
brokername: str, brokername: str,
loglevel: str | None = None, loglevel: str | None = None,
**tractor_kwargs, **tractor_kwargs,
) -> bool: ) -> bool:
'''
Spawn a `brokerd.<backendname>` subactor service daemon
using `pikerd`'s service mngr.
'''
from piker.service._util import log # use service mngr log from piker.service._util import log # use service mngr log
log.info(f'Spawning {brokername} broker daemon') log.info(f'Spawning {brokername} broker daemon')
@ -217,27 +221,35 @@ async def spawn_brokerd(
# ask `pikerd` to spawn a new sub-actor and manage it under its # ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery # actor nursery
from piker.service import Services from piker.service import (
get_service_mngr,
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}' ServiceMngr,
portal = await Services.actor_n.start_actor(
dname,
enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
debug_mode=Services.debug_mode,
**tractor_kwargs
) )
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
# NOTE: the service mngr expects an already spawned actor + its mngr: ServiceMngr = get_service_mngr()
# portal ref in order to do non-blocking setup of brokerd ctx: tractor.Context = await mngr.start_service(
# service nursery. daemon_name=dname,
await Services.start_service_task( ctx_ep=partial(
dname,
portal,
# signature of target root-task endpoint # signature of target root-task endpoint
daemon_fixture_ep, daemon_fixture_ep,
# passed to daemon_fixture_ep(**kwargs)
brokername=brokername, brokername=brokername,
loglevel=loglevel, loglevel=loglevel,
),
debug_mode=mngr.debug_mode,
loglevel=loglevel,
enable_modules=(
_data_mods
+
tractor_kwargs.pop('enable_modules')
),
**tractor_kwargs
)
assert (
not ctx.cancel_called
and ctx.portal # parent side
and dname in ctx.chan.uid # subactor is named as desired
) )
return True return True
@ -262,8 +274,7 @@ async def maybe_spawn_brokerd(
from piker.service import maybe_spawn_daemon from piker.service import maybe_spawn_daemon
async with maybe_spawn_daemon( async with maybe_spawn_daemon(
service_name=f'brokerd.{brokername}',
f'brokerd.{brokername}',
service_task_target=spawn_brokerd, service_task_target=spawn_brokerd,
spawn_args={ spawn_args={
'brokername': brokername, 'brokername': brokername,

View File

@ -567,6 +567,7 @@ class Client:
) -> str: ) -> str:
return { return {
'USDTM': 'usdtm_futes', 'USDTM': 'usdtm_futes',
'SPOT': 'spot',
# 'COINM': 'coin_futes', # 'COINM': 'coin_futes',
# ^-TODO-^ bc someone might want it..? # ^-TODO-^ bc someone might want it..?
}[pair.venue] }[pair.venue]

View File

@ -335,7 +335,7 @@ def services(config, tl, ports):
name='service_query', name='service_query',
loglevel=config['loglevel'] if tl else None, loglevel=config['loglevel'] if tl else None,
), ),
tractor.get_arbiter( tractor.get_registry(
host=host, host=host,
port=ports[0] port=ports[0]
) as portal ) as portal

View File

@ -25,6 +25,7 @@ from collections import (
defaultdict, defaultdict,
) )
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial
import time import time
from typing import ( from typing import (
Any, Any,
@ -42,7 +43,7 @@ from tractor.trionics import (
maybe_open_nursery, maybe_open_nursery,
) )
import trio import trio
from trio_typing import TaskStatus from trio import TaskStatus
from .ticktools import ( from .ticktools import (
frame_ticks, frame_ticks,
@ -70,6 +71,7 @@ if TYPE_CHECKING:
_default_delay_s: float = 1.0 _default_delay_s: float = 1.0
# TODO: use new `tractor.singleton_acm` API for this!
class Sampler: class Sampler:
''' '''
Global sampling engine registry. Global sampling engine registry.
@ -79,9 +81,9 @@ class Sampler:
This non-instantiated type is meant to be a singleton within This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see time-step-sample a (real-time) quote feeds, see
``.service.maybe_open_samplerd()`` and the below `.service.maybe_open_samplerd()` and the below
``register_with_sampler()``. `register_with_sampler()`.
''' '''
service_nursery: None | trio.Nursery = None service_nursery: None | trio.Nursery = None
@ -375,7 +377,10 @@ async def register_with_sampler(
assert Sampler.ohlcv_shms assert Sampler.ohlcv_shms
# unblock caller # unblock caller
await ctx.started(set(Sampler.ohlcv_shms.keys())) await ctx.started(
# XXX bc msgpack only allows one array type!
list(Sampler.ohlcv_shms.keys())
)
if open_index_stream: if open_index_stream:
try: try:
@ -419,7 +424,6 @@ async def register_with_sampler(
async def spawn_samplerd( async def spawn_samplerd(
loglevel: str | None = None, loglevel: str | None = None,
**extra_tractor_kwargs **extra_tractor_kwargs
@ -429,7 +433,10 @@ async def spawn_samplerd(
update and increment count write and stream broadcasting. update and increment count write and stream broadcasting.
''' '''
from piker.service import Services from piker.service import (
get_service_mngr,
ServiceMngr,
)
dname = 'samplerd' dname = 'samplerd'
log.info(f'Spawning `{dname}`') log.info(f'Spawning `{dname}`')
@ -437,26 +444,33 @@ async def spawn_samplerd(
# singleton lock creation of ``samplerd`` since we only ever want # singleton lock creation of ``samplerd`` since we only ever want
# one daemon per ``pikerd`` proc tree. # one daemon per ``pikerd`` proc tree.
# TODO: make this built-into the service api? # TODO: make this built-into the service api?
async with Services.locks[dname + '_singleton']: mngr: ServiceMngr = get_service_mngr()
already_started: bool = dname in mngr.service_tasks
if dname not in Services.service_tasks: async with mngr._locks[dname + '_singleton']:
ctx: Context = await mngr.start_service(
daemon_name=dname,
ctx_ep=partial(
register_with_sampler,
period_s=1,
sub_for_broadcasts=False,
),
debug_mode=mngr.debug_mode, # set by pikerd flag
portal = await Services.actor_n.start_actor( # proxy-through to tractor
dname,
enable_modules=[ enable_modules=[
'piker.data._sampling', 'piker.data._sampling',
], ],
loglevel=loglevel, loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs **extra_tractor_kwargs
) )
if not already_started:
await Services.start_service_task( assert (
dname, ctx
portal, and
register_with_sampler, ctx.portal
period_s=1, and
sub_for_broadcasts=False, not ctx.cancel_called
) )
return True return True
@ -889,6 +903,7 @@ async def uniform_rate_send(
# to consumers which crash or lose network connection. # to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to # I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors! # ``pikerd`` these kinds of errors!
trio.EndOfChannel,
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
ConnectionResetError, ConnectionResetError,

View File

@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for,
=> TODO: maybe to (re)move elsewhere? => TODO: maybe to (re)move elsewhere?
''' '''
from ._mngr import Services as Services from ._mngr import (
get_service_mngr as get_service_mngr,
open_service_mngr as open_service_mngr,
ServiceMngr as ServiceMngr,
)
from ._registry import ( from ._registry import (
_tractor_kwargs as _tractor_kwargs, _tractor_kwargs as _tractor_kwargs,
_default_reg_addr as _default_reg_addr, _default_reg_addr as _default_reg_addr,

View File

@ -21,7 +21,6 @@
from __future__ import annotations from __future__ import annotations
import os import os
from typing import ( from typing import (
Optional,
Any, Any,
ClassVar, ClassVar,
) )
@ -30,13 +29,13 @@ from contextlib import (
) )
import tractor import tractor
import trio
from ._util import ( from ._util import (
get_console_log, get_console_log,
) )
from ._mngr import ( from ._mngr import (
Services, open_service_mngr,
ServiceMngr,
) )
from ._registry import ( # noqa from ._registry import ( # noqa
_tractor_kwargs, _tractor_kwargs,
@ -59,7 +58,7 @@ async def open_piker_runtime(
registry_addrs: list[tuple[str, int]] = [], registry_addrs: list[tuple[str, int]] = [],
enable_modules: list[str] = [], enable_modules: list[str] = [],
loglevel: Optional[str] = None, loglevel: str|None = None,
# XXX NOTE XXX: you should pretty much never want debug mode # XXX NOTE XXX: you should pretty much never want debug mode
# for data daemons when running in production. # for data daemons when running in production.
@ -119,6 +118,10 @@ async def open_piker_runtime(
# spawn other specialized daemons I think? # spawn other specialized daemons I think?
enable_modules=enable_modules, enable_modules=enable_modules,
# TODO: how to configure this?
# keep it on by default if debug mode is set?
# maybe_enable_greenback=debug_mode,
**tractor_kwargs, **tractor_kwargs,
) as actor, ) as actor,
@ -167,12 +170,13 @@ async def open_pikerd(
**kwargs, **kwargs,
) -> Services: ) -> ServiceMngr:
''' '''
Start a root piker daemon with an indefinite lifetime. Start a root piker daemon actor (aka `pikerd`) with an indefinite
lifetime.
A root actor nursery is created which can be used to create and keep A root actor-nursery is created which can be used to spawn and
alive underling services (see below). supervise underling service sub-actors (see below).
''' '''
# NOTE: for the root daemon we always enable the root # NOTE: for the root daemon we always enable the root
@ -199,8 +203,6 @@ async def open_pikerd(
root_actor, root_actor,
reg_addrs, reg_addrs,
), ),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
): ):
for addr in reg_addrs: for addr in reg_addrs:
if addr not in root_actor.accept_addrs: if addr not in root_actor.accept_addrs:
@ -209,25 +211,17 @@ async def open_pikerd(
'Maybe you have another daemon already running?' 'Maybe you have another daemon already running?'
) )
# assign globally for future daemon/task creation mngr: ServiceMngr
Services.actor_n = actor_nursery async with open_service_mngr(
Services.service_n = service_nursery debug_mode=debug_mode,
Services.debug_mode = debug_mode ) as mngr:
yield mngr
try:
yield Services
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel()
# TODO: do we even need this? # TODO: do we even need this?
# @acm # @acm
# async def maybe_open_runtime( # async def maybe_open_runtime(
# loglevel: Optional[str] = None, # loglevel: str|None = None,
# **kwargs, # **kwargs,
# ) -> None: # ) -> None:
@ -256,7 +250,7 @@ async def maybe_open_pikerd(
loglevel: str | None = None, loglevel: str | None = None,
**kwargs, **kwargs,
) -> tractor._portal.Portal | ClassVar[Services]: ) -> tractor._portal.Portal | ClassVar[ServiceMngr]:
''' '''
If no ``pikerd`` daemon-root-actor can be found start it and If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self yield up (we should probably figure out returning a portal to self

View File

@ -49,7 +49,7 @@ from requests.exceptions import (
ReadTimeout, ReadTimeout,
) )
from ._mngr import Services from ._mngr import ServiceMngr
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
get_console_log, get_console_log,
@ -453,7 +453,7 @@ async def open_ahabd(
@acm @acm
async def start_ahab_service( async def start_ahab_service(
services: Services, services: ServiceMngr,
service_name: str, service_name: str,
# endpoint config passed as **kwargs # endpoint config passed as **kwargs
@ -549,7 +549,8 @@ async def start_ahab_service(
log.warning('Failed to cancel root permsed container') log.warning('Failed to cancel root permsed container')
except ( except (
trio.MultiError, # trio.MultiError,
ExceptionGroup,
) as err: ) as err:
for subexc in err.exceptions: for subexc in err.exceptions:
if isinstance(subexc, PermissionError): if isinstance(subexc, PermissionError):

View File

@ -26,14 +26,17 @@ from typing import (
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from collections import defaultdict
import tractor import tractor
import trio
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
) )
from ._mngr import ( from ._mngr import (
Services, get_service_mngr,
ServiceMngr,
) )
from ._actor_runtime import maybe_open_pikerd from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service from ._registry import find_service
@ -41,15 +44,14 @@ from ._registry import find_service
@acm @acm
async def maybe_spawn_daemon( async def maybe_spawn_daemon(
service_name: str, service_name: str,
service_task_target: Callable, service_task_target: Callable,
spawn_args: dict[str, Any], spawn_args: dict[str, Any],
loglevel: str | None = None, loglevel: str | None = None,
singleton: bool = False, singleton: bool = False,
_locks = defaultdict(trio.Lock),
**pikerd_kwargs, **pikerd_kwargs,
) -> tractor.Portal: ) -> tractor.Portal:
@ -67,7 +69,7 @@ async def maybe_spawn_daemon(
''' '''
# serialize access to this section to avoid # serialize access to this section to avoid
# 2 or more tasks racing to create a daemon # 2 or more tasks racing to create a daemon
lock = Services.locks[service_name] lock = _locks[service_name]
await lock.acquire() await lock.acquire()
async with find_service( async with find_service(
@ -132,7 +134,65 @@ async def maybe_spawn_daemon(
async with tractor.wait_for_actor(service_name) as portal: async with tractor.wait_for_actor(service_name) as portal:
lock.release() lock.release()
yield portal yield portal
await portal.cancel_actor() # --- ---- ---
# XXX NOTE XXX
# --- ---- ---
# DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
#
# Doing so will cause an "out-of-band" ctxc
# (`tractor.ContextCancelled`) to be raised inside the
# `ServiceMngr.open_context_in_task()`'s call to
# `ctx.wait_for_result()` AND the internal self-ctxc
# "graceful capture" WILL NOT CATCH IT!
#
# This can cause certain types of operations to raise
# that ctxc BEFORE THEY `return`, resulting in
# a "false-negative" ctxc being raised when really
# nothing actually failed, other then our semantic
# "failure" to suppress an expected, graceful,
# self-cancel scenario..
#
# bUt wHy duZ It WorK lIKe dis..
# ------------------------------
# from the perspective of the `tractor.Context` this
# cancel request was conducted "out of band" since
# `Context.cancel()` was never called and thus the
# `._cancel_called: bool` was never set. Despite the
# remote `.canceller` being set to `pikerd` (i.e. the
# same `Actor.uid` of the raising service-mngr task) the
# service-task's ctx itself was never marked as having
# requested cancellation and thus still raises the ctxc
# bc it was unaware of any such request.
#
# How to make grokin these cases easier tho?
# ------------------------------------------
# Because `Portal.cancel_actor()` was called it requests
# "full-`Actor`-runtime-cancellation" of it's peer
# process which IS NOT THE SAME as a single inter-actor
# RPC task cancelling its local context with a remote
# peer `Task` in that same peer process.
#
# ?TODO? It might be better if we do one (or all) of the
# following:
#
# -[ ] at least set a special message for the
# `ContextCancelled` when raised locally by the
# unaware ctx task such that we check for the
# `.canceller` being *our `Actor`* and in the case
# where `Context._cancel_called == False` we specially
# note that this is likely an "out-of-band"
# runtime-cancel request triggered by some call to
# `Portal.cancel_actor()`, possibly even reporting the
# exact LOC of that caller by tracking it inside our
# portal-type?
# -[ ] possibly add another field `ContextCancelled` like
# maybe a,
# `.request_type: Literal['os', 'proc', 'actor',
# 'ctx']` type thing which would allow immediately
# being able to tell what kind of cancellation caused
# the unexpected ctxc?
# -[ ] REMOVE THIS COMMENT, once we've settled on how to
# better augment `tractor` to be more explicit on this!
async def spawn_emsd( async def spawn_emsd(
@ -147,21 +207,22 @@ async def spawn_emsd(
""" """
log.info('Spawning emsd') log.info('Spawning emsd')
portal = await Services.actor_n.start_actor( smngr: ServiceMngr = get_service_mngr()
portal = await smngr.actor_n.start_actor(
'emsd', 'emsd',
enable_modules=[ enable_modules=[
'piker.clearing._ems', 'piker.clearing._ems',
'piker.clearing._client', 'piker.clearing._client',
], ],
loglevel=loglevel, loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag debug_mode=smngr.debug_mode, # set by pikerd flag
**extra_tractor_kwargs **extra_tractor_kwargs
) )
# non-blocking setup of clearing service # non-blocking setup of clearing service
from ..clearing._ems import _setup_persistent_emsd from ..clearing._ems import _setup_persistent_emsd
await Services.start_service_task( await smngr.start_service_task(
'emsd', 'emsd',
portal, portal,

View File

@ -18,16 +18,29 @@
daemon-service management API. daemon-service management API.
""" """
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import ( from typing import (
Callable, Callable,
Any, Any,
) )
import trio import msgspec
from trio_typing import TaskStatus
import tractor import tractor
import trio
from trio import TaskStatus
from tractor import ( from tractor import (
ActorNursery,
current_actor, current_actor,
ContextCancelled, ContextCancelled,
Context, Context,
@ -39,6 +52,130 @@ from ._util import (
) )
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# impl deat which ensures a single global instance
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open a multi-subactor-as-service-daemon tree supervisor.
The delivered `ServiceMngr` is a singleton instance for each
actor-process and is allocated on first open and never
de-allocated unless explicitly deleted by al call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'actor_n': an,
'service_n': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.actor_n = an
mngr.service_n = tn
else:
assert (
mngr.actor_n
and
mngr.service_tn
)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_tasks:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
# TODO: we need remote wrapping and a general soln: # TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the # - factor this into a ``tractor.highlevel`` extension # pack for the
# library. # library.
@ -46,31 +183,46 @@ from ._util import (
# to the pikerd actor for starting services remotely! # to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns # - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion? # new actors and supervises them to completion?
class Services: @dataclass
class ServiceMngr:
# class ServiceMngr(msgspec.Struct):
'''
A multi-subactor-as-service manager.
actor_n: tractor._supervise.ActorNursery Spawn, supervise and monitor service/daemon subactors in a SC
process tree.
'''
actor_n: ActorNursery
service_n: trio.Nursery service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[ service_tasks: dict[
str, str,
tuple[ tuple[
trio.CancelScope, trio.CancelScope,
Context,
Portal, Portal,
trio.Event, trio.Event,
] ]
] = {} ] = field(default_factory=dict)
locks = defaultdict(trio.Lock)
# internal per-service task mutexs
_locks = defaultdict(trio.Lock)
@classmethod
async def start_service_task( async def start_service_task(
self, self,
name: str, name: str,
portal: Portal, portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
target: Callable, target: Callable,
allow_overruns: bool = False, allow_overruns: bool = False,
**ctx_kwargs, **ctx_kwargs,
) -> (trio.CancelScope, Context): ) -> (trio.CancelScope, Context, Any):
''' '''
Open a context in a service sub-actor, add to a stack Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown. that gets unwound at ``pikerd`` teardown.
@ -83,6 +235,7 @@ class Services:
task_status: TaskStatus[ task_status: TaskStatus[
tuple[ tuple[
trio.CancelScope, trio.CancelScope,
Context,
trio.Event, trio.Event,
Any, Any,
] ]
@ -90,64 +243,87 @@ class Services:
) -> Any: ) -> Any:
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
try:
async with portal.open_context( async with portal.open_context(
target, target,
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
**ctx_kwargs, **ctx_kwargs,
) as (ctx, first): ) as (ctx, started):
# unblock once the remote context has started # unblock once the remote context has started
complete = trio.Event() complete = trio.Event()
task_status.started((cs, complete, first)) task_status.started((
cs,
ctx,
complete,
started,
))
log.info( log.info(
f'`pikerd` service {name} started with value {first}' f'`pikerd` service {name} started with value {started}'
) )
try:
# wait on any context's return value # wait on any context's return value
# and any final portal result from the # and any final portal result from the
# sub-actor. # sub-actor.
ctx_res: Any = await ctx.result() ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled # NOTE: blocks indefinitely until cancelled
# either by error from the target context # either by error from the target context
# function or by being cancelled here by the # function or by being cancelled here by the
# surrounding cancel scope. # surrounding cancel scope.
return (await portal.result(), ctx_res) return (
await portal.wait_for_result(),
ctx_res,
)
except ContextCancelled as ctxe: except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid our_uid: tuple[str, str] = current_actor().uid
if ( if (
canceller != portal.channel.uid canceller != portal.chan.uid
and and
canceller != our_uid canceller != our_uid
): ):
log.cancel( log.cancel(
f'Actor-service {name} was remotely cancelled?\n' f'Actor-service `{name}` was remotely cancelled by a peer?\n'
f'remote canceller: {canceller}\n'
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n' # TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
f'cancellee: {portal.chan.uid}\n'
f'canceller: {canceller}\n'
) )
else: else:
raise raise
finally: finally:
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor() await portal.cancel_actor()
complete.set() complete.set()
self.service_tasks.pop(name) self.service_tasks.pop(name)
cs, complete, first = await self.service_n.start(open_context_in_task) cs, sub_ctx, complete, started = await self.service_n.start(
open_context_in_task
)
# store the cancel scope and portal for later cancellation or # store the cancel scope and portal for later cancellation or
# retstart if needed. # retstart if needed.
self.service_tasks[name] = (cs, portal, complete) self.service_tasks[name] = (cs, sub_ctx, portal, complete)
return cs, sub_ctx, started
return cs, first
@classmethod
async def cancel_service( async def cancel_service(
self, self,
name: str, name: str,
@ -158,8 +334,80 @@ class Services:
''' '''
log.info(f'Cancelling `pikerd` service {name}') log.info(f'Cancelling `pikerd` service {name}')
cs, portal, complete = self.service_tasks[name] cs, sub_ctx, portal, complete = self.service_tasks[name]
cs.cancel()
# cs.cancel()
await sub_ctx.cancel()
await complete.wait() await complete.wait()
assert name not in self.service_tasks, \
if name in self.service_tasks:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Serice task for {name} not terminated?' f'Serice task for {name} not terminated?'
)
# assert name not in self.service_tasks, \
# f'Serice task for {name} not terminated?'
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
debug_mode: bool = False,
**tractor_actor_kwargs,
) -> Context:
'''
Start a "service" task in a new sub-actor (daemon) and manage it's lifetime
indefinitely.
Services can be cancelled/shutdown using `.cancel_service()`.
'''
entry: tuple|None = self.service_tasks.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_tasks:
portal = await self.actor_n.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**tractor_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(cs, sub_ctx, started) = await self.start_service_task(
daemon_name,
portal,
ctx_ep,
**ctx_kwargs,
)
return sub_ctx
# TODO:
# -[ ] factor all the common shit from `.data._sampling`
# and `.brokers._daemon` into here / `ServiceMngr`
# in terms of allocating the `Portal` as part of the
# "service-in-subactor" starting!
# -[ ] move to `tractor.hilevel._service`, import and use here!
# NOTE: purposely leaks the ref to the mod-scope Bo
# import tractor
# from tractor.hilevel import (
# open_service_mngr,
# ServiceMngr,
# )
# mngr: ServiceMngr|None = None
# with tractor.hilevel.open_service_mngr() as mngr:
# Services = proxy(mngr)

View File

@ -21,11 +21,13 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
# TODO: oof, needs to be changed to `httpx`!
import asks import asks
if TYPE_CHECKING: if TYPE_CHECKING:
import docker import docker
from ._ahab import DockerContainer from ._ahab import DockerContainer
from . import ServiceMngr
from ._util import log # sub-sys logger from ._util import log # sub-sys logger
from ._util import ( from ._util import (
@ -127,7 +129,7 @@ def start_elasticsearch(
@acm @acm
async def start_ahab_daemon( async def start_ahab_daemon(
service_mngr: Services, service_mngr: ServiceMngr,
user_config: dict | None = None, user_config: dict | None = None,
loglevel: str | None = None, loglevel: str | None = None,

View File

@ -53,7 +53,7 @@ import pendulum
# import purerpc # import purerpc
from ..data.feed import maybe_open_feed from ..data.feed import maybe_open_feed
from . import Services from . import ServiceMngr
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
get_console_log, get_console_log,
@ -233,7 +233,7 @@ def start_marketstore(
@acm @acm
async def start_ahab_daemon( async def start_ahab_daemon(
service_mngr: Services, service_mngr: ServiceMngr,
user_config: dict | None = None, user_config: dict | None = None,
loglevel: str | None = None, loglevel: str | None = None,

View File

@ -458,13 +458,15 @@ async def start_backfill(
'bf_until <- last_start_dt:\n' 'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n' f'{backfill_until_dt} <- {last_start_dt}\n'
) )
# UGH: what's a better way?
# ugh, what's a better way? # TODO: backends are responsible for being correct on
# TODO: fwiw, we probably want a way to signal a throttle # this right!?
# condition (eg. with ib) so that we can halt the # -[ ] in the `ib` case we could maybe offer some way
# request loop until the condition is resolved? # to halt the request loop until the condition is
if timeframe > 1: # resolved or should the backend be entirely in
await tractor.pause() # charge of solving such faults? yes, right?
# if timeframe > 1:
# await tractor.pause()
return return
assert ( assert (
@ -578,6 +580,7 @@ async def start_backfill(
'crypto', 'crypto',
'crypto_currency', 'crypto_currency',
'fiat', # a "forex pair" 'fiat', # a "forex pair"
'perpetual_future', # stupid "perps" from cex land
}: }:
# for now, our table key schema is not including # for now, our table key schema is not including
# the dst[/src] source asset token. # the dst[/src] source asset token.

View File

@ -50,10 +50,8 @@ attrs = "^23.1.0"
bidict = "^0.22.1" bidict = "^0.22.1"
colorama = "^0.4.6" colorama = "^0.4.6"
colorlog = "^6.7.0" colorlog = "^6.7.0"
cython = "^3.0.0"
greenback = "^1.1.1"
ib-insync = "^0.9.86" ib-insync = "^0.9.86"
msgspec = "^0.18.0" msgspec = "^0.18.6"
numba = "^0.59.0" numba = "^0.59.0"
numpy = "^1.25" numpy = "^1.25"
polars = "^0.18.13" polars = "^0.18.13"
@ -76,8 +74,8 @@ pyarrow = "^17.0.0"
[tool.poetry.dependencies.tractor] [tool.poetry.dependencies.tractor]
develop = true develop = true
git = 'https://github.com/goodboy/tractor.git' git = 'https://pikers.dev/goodboy/tractor.git'
branch = 'asyncio_debugger_support' branch = 'aio_abandons'
# path = "../tractor" # path = "../tractor"
[tool.poetry.dependencies.asyncvnc] [tool.poetry.dependencies.asyncvnc]
@ -111,6 +109,8 @@ pytest = "^6.0.0"
elasticsearch = "^8.9.0" elasticsearch = "^8.9.0"
xonsh = "^0.14.2" xonsh = "^0.14.2"
prompt-toolkit = "3.0.40" prompt-toolkit = "3.0.40"
cython = "^3.0.0"
greenback = "^1.1.1"
# console ehancements and eventually remote debugging # console ehancements and eventually remote debugging
# extras/helpers. # extras/helpers.

View File

@ -10,7 +10,7 @@ from piker import (
config, config,
) )
from piker.service import ( from piker.service import (
Services, get_service_mngr,
) )
from piker.log import get_console_log from piker.log import get_console_log
@ -129,7 +129,7 @@ async def _open_test_pikerd(
) as service_manager, ) as service_manager,
): ):
# this proc/actor is the pikerd # this proc/actor is the pikerd
assert service_manager is Services assert service_manager is get_service_mngr()
async with tractor.wait_for_actor( async with tractor.wait_for_actor(
'pikerd', 'pikerd',

View File

@ -26,7 +26,7 @@ import pytest
import tractor import tractor
from uuid import uuid4 from uuid import uuid4
from piker.service import Services from piker.service import ServiceMngr
from piker.log import get_logger from piker.log import get_logger
from piker.clearing._messages import ( from piker.clearing._messages import (
Order, Order,
@ -158,7 +158,7 @@ def load_and_check_pos(
def test_ems_err_on_bad_broker( def test_ems_err_on_bad_broker(
open_test_pikerd: Services, open_test_pikerd: ServiceMngr,
loglevel: str, loglevel: str,
): ):
async def load_bad_fqme(): async def load_bad_fqme():

View File

@ -15,7 +15,7 @@ import tractor
from piker.service import ( from piker.service import (
find_service, find_service,
Services, ServiceMngr,
) )
from piker.data import ( from piker.data import (
open_feed, open_feed,
@ -44,7 +44,7 @@ def test_runtime_boot(
async def main(): async def main():
port = 6666 port = 6666
daemon_addr = ('127.0.0.1', port) daemon_addr = ('127.0.0.1', port)
services: Services services: ServiceMngr
async with ( async with (
open_test_pikerd( open_test_pikerd(