Compare commits

..

8 Commits

Author SHA1 Message Date
Tyler Goodlet e69796ad6d Allow ledger passes to ignore (symcache) unknown fqmes
For example in the paper-eng, if you have a backend that doesn't fully
support a symcache (yet) it's handy to be able to ignore processing
other paper-eng txns when all you care about at the moment is the
simulated symbol.

NOTE, that currently this will still result in a key-error when you load
more then one mkt with the paper engine (for which the backend does not
have the symcache implemented) since no fqme ad-hoc query was made for
the 2nd symbol (and i'm not sure we should support that kinda hackery
over just encouraging the sym-cache being added?). Def needs a little
more thought depending on how many backends are never going to be able
to (easily) support caching..
2025-02-11 18:42:45 -05:00
Tyler Goodlet 46bf4ef217 .clearing._ems: Don't require `first_quote['last']`
Instead just check for the field (which i'm not huge on the key-name for
anyway) and if not found get the "last price" from the real-time shm
buffer's latest 'close' sample.

Unrelatedly, use a `subs.copy()` in the `Router.client_broadcast()` loop
such that if a `client_stream` is popped on connection failure, we don't
RTE for the "size changed on iteration".
2025-02-11 18:42:45 -05:00
Tyler Goodlet d2a43c38f7 `kucoin`: repair live quotes streaming..
This must have broke at some point during the new `MktPair` and thus
`.fqme: str` updates; mas-o-menos the symbol key in the quote-msg-`dict`
was NOT set to the `MktPair.bs_fqme: str` value and thus wasn't being
processed by the downstream sampling and feed subsys.

So fix that as well as a few other refinements,
- set the `topic: mkt.bs_fqme` in quote msgs obvi.
- drop the "wait for first clearing vlm" quote poll loop; going to fix
  the sampler to handle a `first_quote` without a `'last'` key.
- add some typing around calls to `get_mkt_info()`.
- rename `stream_messages()` -> `iter_normed_quotes()`.
2025-02-11 18:42:45 -05:00
Nelson Torres 086729186f Deleted settlePlan field from binance FutesPair. 2025-02-11 18:42:45 -05:00
Nelson Torres 1fcee30156 Added missing fields for kucoin.
feeCategory, makerFeeCoefficient, takerFeeCoefficient and st.
2025-02-11 18:42:45 -05:00
Tyler Goodlet 79c9d38f30 data._web_bs: try to raise jsonrpc errors in parent task 2025-02-11 18:42:45 -05:00
Tyler Goodlet 923ca0cda2 Lel, forgot to add a `SPOT` venue for `binance`.. 2025-02-11 18:42:45 -05:00
Tyler Goodlet 5d96fa01a1 Mask no-data pause-point, add perps to sig.
Was orig for debugging an issue with `kucoin` i think but definitely
shouldn't be left in XD

Also add `'perpetual_future'` to `start_backfill()` input literal set.
2025-02-11 18:42:45 -05:00
13 changed files with 235 additions and 215 deletions

View File

@ -23,7 +23,6 @@ from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from functools import partial
from types import ModuleType
from typing import (
TYPE_CHECKING,
@ -191,17 +190,14 @@ def broker_init(
async def spawn_brokerd(
brokername: str,
loglevel: str | None = None,
**tractor_kwargs,
) -> bool:
'''
Spawn a `brokerd.<backendname>` subactor service daemon
using `pikerd`'s service mngr.
'''
from piker.service._util import log # use service mngr log
log.info(f'Spawning {brokername} broker daemon')
@ -221,35 +217,27 @@ async def spawn_brokerd(
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
from piker.service import (
get_service_mngr,
ServiceMngr,
)
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
mngr: ServiceMngr = get_service_mngr()
ctx: tractor.Context = await mngr.start_service(
daemon_name=dname,
ctx_ep=partial(
# signature of target root-task endpoint
daemon_fixture_ep,
from piker.service import Services
# passed to daemon_fixture_ep(**kwargs)
brokername=brokername,
loglevel=loglevel,
),
debug_mode=mngr.debug_mode,
loglevel=loglevel,
enable_modules=(
_data_mods
+
tractor_kwargs.pop('enable_modules')
),
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
portal = await Services.actor_n.start_actor(
dname,
enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
debug_mode=Services.debug_mode,
**tractor_kwargs
)
assert (
not ctx.cancel_called
and ctx.portal # parent side
and dname in ctx.chan.uid # subactor is named as desired
# NOTE: the service mngr expects an already spawned actor + its
# portal ref in order to do non-blocking setup of brokerd
# service nursery.
await Services.start_service_task(
dname,
portal,
# signature of target root-task endpoint
daemon_fixture_ep,
brokername=brokername,
loglevel=loglevel,
)
return True
@ -274,7 +262,8 @@ async def maybe_spawn_brokerd(
from piker.service import maybe_spawn_daemon
async with maybe_spawn_daemon(
service_name=f'brokerd.{brokername}',
f'brokerd.{brokername}',
service_task_target=spawn_brokerd,
spawn_args={
'brokername': brokername,

View File

@ -335,7 +335,7 @@ def services(config, tl, ports):
name='service_query',
loglevel=config['loglevel'] if tl else None,
),
tractor.get_registry(
tractor.get_arbiter(
host=host,
port=ports[0]
) as portal

View File

@ -25,7 +25,6 @@ from collections import (
defaultdict,
)
from contextlib import asynccontextmanager as acm
from functools import partial
import time
from typing import (
Any,
@ -43,7 +42,7 @@ from tractor.trionics import (
maybe_open_nursery,
)
import trio
from trio import TaskStatus
from trio_typing import TaskStatus
from .ticktools import (
frame_ticks,
@ -71,7 +70,6 @@ if TYPE_CHECKING:
_default_delay_s: float = 1.0
# TODO: use new `tractor.singleton_acm` API for this!
class Sampler:
'''
Global sampling engine registry.
@ -81,9 +79,9 @@ class Sampler:
This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to
time-step-sample a (real-time) quote feeds, see
`.service.maybe_open_samplerd()` and the below
`register_with_sampler()`.
time-step-sample (real-time) quote feeds, see
``.service.maybe_open_samplerd()`` and the below
``register_with_sampler()``.
'''
service_nursery: None | trio.Nursery = None
@ -377,10 +375,7 @@ async def register_with_sampler(
assert Sampler.ohlcv_shms
# unblock caller
await ctx.started(
# XXX bc msgpack only allows one array type!
list(Sampler.ohlcv_shms.keys())
)
await ctx.started(set(Sampler.ohlcv_shms.keys()))
if open_index_stream:
try:
@ -424,6 +419,7 @@ async def register_with_sampler(
async def spawn_samplerd(
loglevel: str | None = None,
**extra_tractor_kwargs
@ -433,10 +429,7 @@ async def spawn_samplerd(
update and increment count write and stream broadcasting.
'''
from piker.service import (
get_service_mngr,
ServiceMngr,
)
from piker.service import Services
dname = 'samplerd'
log.info(f'Spawning `{dname}`')
@ -444,33 +437,26 @@ async def spawn_samplerd(
# singleton lock creation of ``samplerd`` since we only ever want
# one daemon per ``pikerd`` proc tree.
# TODO: make this built-into the service api?
mngr: ServiceMngr = get_service_mngr()
already_started: bool = dname in mngr.service_tasks
async with Services.locks[dname + '_singleton']:
async with mngr._locks[dname + '_singleton']:
ctx: Context = await mngr.start_service(
daemon_name=dname,
ctx_ep=partial(
if dname not in Services.service_tasks:
portal = await Services.actor_n.start_actor(
dname,
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
await Services.start_service_task(
dname,
portal,
register_with_sampler,
period_s=1,
sub_for_broadcasts=False,
),
debug_mode=mngr.debug_mode, # set by pikerd flag
# proxy-through to tractor
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
**extra_tractor_kwargs
)
if not already_started:
assert (
ctx
and
ctx.portal
and
not ctx.cancel_called
)
return True
@ -903,7 +889,6 @@ async def uniform_rate_send(
# to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors!
trio.EndOfChannel,
trio.ClosedResourceError,
trio.BrokenResourceError,
ConnectionResetError,

View File

@ -30,11 +30,7 @@ Actor runtime primtives and (distributed) service APIs for,
=> TODO: maybe to (re)move elsewhere?
'''
from ._mngr import (
get_service_mngr as get_service_mngr,
open_service_mngr as open_service_mngr,
ServiceMngr as ServiceMngr,
)
from ._mngr import Services as Services
from ._registry import (
_tractor_kwargs as _tractor_kwargs,
_default_reg_addr as _default_reg_addr,

View File

@ -21,6 +21,7 @@
from __future__ import annotations
import os
from typing import (
Optional,
Any,
ClassVar,
)
@ -29,13 +30,13 @@ from contextlib import (
)
import tractor
import trio
from ._util import (
get_console_log,
)
from ._mngr import (
open_service_mngr,
ServiceMngr,
Services,
)
from ._registry import ( # noqa
_tractor_kwargs,
@ -58,7 +59,7 @@ async def open_piker_runtime(
registry_addrs: list[tuple[str, int]] = [],
enable_modules: list[str] = [],
loglevel: str|None = None,
loglevel: Optional[str] = None,
# XXX NOTE XXX: you should pretty much never want debug mode
# for data daemons when running in production.
@ -68,7 +69,7 @@ async def open_piker_runtime(
# and spawn the service tree distributed per that.
start_method: str = 'trio',
tractor_runtime_overrides: dict|None = None,
tractor_runtime_overrides: dict | None = None,
**tractor_kwargs,
) -> tuple[
@ -118,10 +119,6 @@ async def open_piker_runtime(
# spawn other specialized daemons I think?
enable_modules=enable_modules,
# TODO: how to configure this?
# keep it on by default if debug mode is set?
# maybe_enable_greenback=debug_mode,
**tractor_kwargs,
) as actor,
@ -170,13 +167,12 @@ async def open_pikerd(
**kwargs,
) -> ServiceMngr:
) -> Services:
'''
Start a root piker daemon actor (aka `pikerd`) with an indefinite
lifetime.
Start a root piker daemon with an indefinite lifetime.
A root actor-nursery is created which can be used to spawn and
supervise underling service sub-actors (see below).
A root actor nursery is created which can be used to create and keep
alive underling services (see below).
'''
# NOTE: for the root daemon we always enable the root
@ -203,6 +199,8 @@ async def open_pikerd(
root_actor,
reg_addrs,
),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
):
for addr in reg_addrs:
if addr not in root_actor.accept_addrs:
@ -211,17 +209,25 @@ async def open_pikerd(
'Maybe you have another daemon already running?'
)
mngr: ServiceMngr
async with open_service_mngr(
debug_mode=debug_mode,
) as mngr:
yield mngr
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_nursery
Services.debug_mode = debug_mode
try:
yield Services
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel()
# TODO: do we even need this?
# @acm
# async def maybe_open_runtime(
# loglevel: str|None = None,
# loglevel: Optional[str] = None,
# **kwargs,
# ) -> None:
@ -250,7 +256,7 @@ async def maybe_open_pikerd(
loglevel: str | None = None,
**kwargs,
) -> tractor._portal.Portal | ClassVar[ServiceMngr]:
) -> tractor._portal.Portal | ClassVar[Services]:
'''
If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self

View File

@ -49,7 +49,7 @@ from requests.exceptions import (
ReadTimeout,
)
from ._mngr import ServiceMngr
from ._mngr import Services
from ._util import (
log, # sub-sys logger
get_console_log,
@ -453,7 +453,7 @@ async def open_ahabd(
@acm
async def start_ahab_service(
services: ServiceMngr,
services: Services,
service_name: str,
# endpoint config passed as **kwargs
@ -549,8 +549,7 @@ async def start_ahab_service(
log.warning('Failed to cancel root permsed container')
except (
# trio.MultiError,
ExceptionGroup,
trio.MultiError,
) as err:
for subexc in err.exceptions:
if isinstance(subexc, PermissionError):

View File

@ -26,17 +26,14 @@ from typing import (
from contextlib import (
asynccontextmanager as acm,
)
from collections import defaultdict
import tractor
import trio
from ._util import (
log, # sub-sys logger
)
from ._mngr import (
get_service_mngr,
ServiceMngr,
Services,
)
from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service
@ -44,14 +41,15 @@ from ._registry import find_service
@acm
async def maybe_spawn_daemon(
service_name: str,
service_task_target: Callable,
spawn_args: dict[str, Any],
loglevel: str | None = None,
singleton: bool = False,
_locks = defaultdict(trio.Lock),
**pikerd_kwargs,
) -> tractor.Portal:
@ -69,7 +67,7 @@ async def maybe_spawn_daemon(
'''
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = _locks[service_name]
lock = Services.locks[service_name]
await lock.acquire()
async with find_service(
@ -104,12 +102,6 @@ async def maybe_spawn_daemon(
# service task for that actor.
started: bool
if pikerd_portal is None:
# await tractor.pause()
if tractor_kwargs.get('debug_mode', False):
from tractor.devx._debug import maybe_init_greenback
await maybe_init_greenback()
started = await service_task_target(
loglevel=loglevel,
**spawn_args,
@ -140,65 +132,7 @@ async def maybe_spawn_daemon(
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
# --- ---- ---
# XXX NOTE XXX
# --- ---- ---
# DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
#
# Doing so will cause an "out-of-band" ctxc
# (`tractor.ContextCancelled`) to be raised inside the
# `ServiceMngr.open_context_in_task()`'s call to
# `ctx.wait_for_result()` AND the internal self-ctxc
# "graceful capture" WILL NOT CATCH IT!
#
# This can cause certain types of operations to raise
# that ctxc BEFORE THEY `return`, resulting in
# a "false-negative" ctxc being raised when really
# nothing actually failed, other then our semantic
# "failure" to suppress an expected, graceful,
# self-cancel scenario..
#
# bUt wHy duZ It WorK lIKe dis..
# ------------------------------
# from the perspective of the `tractor.Context` this
# cancel request was conducted "out of band" since
# `Context.cancel()` was never called and thus the
# `._cancel_called: bool` was never set. Despite the
# remote `.canceller` being set to `pikerd` (i.e. the
# same `Actor.uid` of the raising service-mngr task) the
# service-task's ctx itself was never marked as having
# requested cancellation and thus still raises the ctxc
# bc it was unaware of any such request.
#
# How to make grokin these cases easier tho?
# ------------------------------------------
# Because `Portal.cancel_actor()` was called it requests
# "full-`Actor`-runtime-cancellation" of it's peer
# process which IS NOT THE SAME as a single inter-actor
# RPC task cancelling its local context with a remote
# peer `Task` in that same peer process.
#
# ?TODO? It might be better if we do one (or all) of the
# following:
#
# -[ ] at least set a special message for the
# `ContextCancelled` when raised locally by the
# unaware ctx task such that we check for the
# `.canceller` being *our `Actor`* and in the case
# where `Context._cancel_called == False` we specially
# note that this is likely an "out-of-band"
# runtime-cancel request triggered by some call to
# `Portal.cancel_actor()`, possibly even reporting the
# exact LOC of that caller by tracking it inside our
# portal-type?
# -[ ] possibly add another field `ContextCancelled` like
# maybe a,
# `.request_type: Literal['os', 'proc', 'actor',
# 'ctx']` type thing which would allow immediately
# being able to tell what kind of cancellation caused
# the unexpected ctxc?
# -[ ] REMOVE THIS COMMENT, once we've settled on how to
# better augment `tractor` to be more explicit on this!
await portal.cancel_actor()
async def spawn_emsd(
@ -213,25 +147,26 @@ async def spawn_emsd(
"""
log.info('Spawning emsd')
smngr: ServiceMngr = get_service_mngr()
portal = await smngr.an.start_actor(
portal = await Services.actor_n.start_actor(
'emsd',
enable_modules=[
'piker.clearing._ems',
'piker.clearing._client',
],
loglevel=loglevel,
debug_mode=smngr.debug_mode, # set by pikerd flag
debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
# non-blocking setup of clearing service
from ..clearing._ems import _setup_persistent_emsd
await smngr.start_service_ctx(
name='emsd',
portal=portal,
ctx_fn=_setup_persistent_emsd,
await Services.start_service_task(
'emsd',
portal,
# signature of target root-task endpoint
_setup_persistent_emsd,
loglevel=loglevel,
)
return True

View File

@ -18,36 +18,148 @@
daemon-service management API.
"""
from contextlib import (
asynccontextmanager as acm,
from collections import defaultdict
from typing import (
Callable,
Any,
)
import trio
from trio_typing import TaskStatus
import tractor
from tractor.hilevel import (
ServiceMngr,
# open_service_mngr as _open_service_mngr,
get_service_mngr as get_service_mngr,
from tractor import (
current_actor,
ContextCancelled,
Context,
Portal,
)
# TODO:
# -[ ] factor all the common shit from `.data._sampling`
# and `.brokers._daemon` into here / `ServiceMngr`
# in terms of allocating the `Portal` as part of the
# "service-in-subactor" starting!
# -[ ] move to `tractor.hilevel._service`, import and use here!
# NOTE: purposely leaks the ref to the mod-scope Bo
Services: ServiceMngr|None = None
from ._util import (
log, # sub-sys logger
)
@acm
async def open_service_mngr(
**kwargs,
) -> ServiceMngr:
global Services
async with tractor.hilevel.open_service_mngr(
**kwargs,
) as mngr:
# Services = proxy(mngr)
Services = mngr
yield mngr
Services = None
# TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the
# library.
# - wrap a "remote api" wherein you can get a method proxy
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
class Services:
actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
Portal,
trio.Event,
]
] = {}
locks = defaultdict(trio.Lock)
@classmethod
async def start_service_task(
self,
name: str,
portal: Portal,
target: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> (trio.CancelScope, Context):
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown.
This allows for allocating long-running sub-services in our main
daemon and explicitly controlling their lifetimes.
'''
async def open_context_in_task(
task_status: TaskStatus[
tuple[
trio.CancelScope,
trio.Event,
Any,
]
] = trio.TASK_STATUS_IGNORED,
) -> Any:
with trio.CancelScope() as cs:
async with portal.open_context(
target,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, first):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((cs, complete, first))
log.info(
f'`pikerd` service {name} started with value {first}'
)
try:
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (await portal.result(), ctx_res)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.channel.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service {name} was remotely cancelled?\n'
f'remote canceller: {canceller}\n'
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
)
else:
raise
finally:
await portal.cancel_actor()
complete.set()
self.service_tasks.pop(name)
cs, complete, first = await self.service_n.start(open_context_in_task)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (cs, portal, complete)
return cs, first
@classmethod
async def cancel_service(
self,
name: str,
) -> Any:
'''
Cancel the service task and actor for the given ``name``.
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, portal, complete = self.service_tasks[name]
cs.cancel()
await complete.wait()
assert name not in self.service_tasks, \
f'Serice task for {name} not terminated?'

View File

@ -21,13 +21,11 @@ from typing import (
TYPE_CHECKING,
)
# TODO: oof, needs to be changed to `httpx`!
import asks
if TYPE_CHECKING:
import docker
from ._ahab import DockerContainer
from . import ServiceMngr
from ._util import log # sub-sys logger
from ._util import (
@ -129,7 +127,7 @@ def start_elasticsearch(
@acm
async def start_ahab_daemon(
service_mngr: ServiceMngr,
service_mngr: Services,
user_config: dict | None = None,
loglevel: str | None = None,

View File

@ -53,7 +53,7 @@ import pendulum
# import purerpc
from ..data.feed import maybe_open_feed
from . import ServiceMngr
from . import Services
from ._util import (
log, # sub-sys logger
get_console_log,
@ -233,7 +233,7 @@ def start_marketstore(
@acm
async def start_ahab_daemon(
service_mngr: ServiceMngr,
service_mngr: Services,
user_config: dict | None = None,
loglevel: str | None = None,

View File

@ -10,7 +10,7 @@ from piker import (
config,
)
from piker.service import (
get_service_mngr,
Services,
)
from piker.log import get_console_log
@ -129,7 +129,7 @@ async def _open_test_pikerd(
) as service_manager,
):
# this proc/actor is the pikerd
assert service_manager is get_service_mngr()
assert service_manager is Services
async with tractor.wait_for_actor(
'pikerd',

View File

@ -26,7 +26,7 @@ import pytest
import tractor
from uuid import uuid4
from piker.service import ServiceMngr
from piker.service import Services
from piker.log import get_logger
from piker.clearing._messages import (
Order,
@ -158,7 +158,7 @@ def load_and_check_pos(
def test_ems_err_on_bad_broker(
open_test_pikerd: ServiceMngr,
open_test_pikerd: Services,
loglevel: str,
):
async def load_bad_fqme():

View File

@ -15,7 +15,7 @@ import tractor
from piker.service import (
find_service,
ServiceMngr,
Services,
)
from piker.data import (
open_feed,
@ -44,7 +44,7 @@ def test_runtime_boot(
async def main():
port = 6666
daemon_addr = ('127.0.0.1', port)
services: ServiceMngr
services: Services
async with (
open_test_pikerd(