Compare commits

..

21 Commits

Author SHA1 Message Date
Tyler Goodlet 008a3bcc2f Official service-mngr to `tractor.hilevel` move
Such that we maintain that subsys in the actor-runtime repo (with
hopefully an extensive test suite XD).

Port deats,
- rewrite `open_service_mngr()` as a thin wrapper that delegates into
  the new `tractor.hilevel.open_service_mngr()` but with maintenance of
  the `Services` class-singleton for now.
- port `.service._daemon` usage to the new
  `ServiceMngr.start_service_ctx()` a rename from
  `.start_service_task()` which is now likely destined for the soon
  supported `tractor.trionics.TaskMngr` nursery extension.
- ref the new `ServiceMngr.an: ActorNursery` instance var name.

Other,
- always enable the `tractor.pause_from_sync()` support via `greenback`
  whenever `debug_mode` is set at `pikerd` init.
2025-02-11 15:55:19 -05:00
Nelson Torres ce308b8754 Updated tractor method name. 2025-02-11 14:56:48 -05:00
Tyler Goodlet 592798c76f More service-mngr clarity notes
Nothing changing functionally here just adding more `tractor`
operational notes, tips for debug tooling and typing fixes B)

Of particular note is adding further details about the reason we do not
need to call `Context.cancel()` inside the `finally:` block of
`.open_context_in_task()` thanks to `tractor`'s new and improved
inter-actor cancellation semantics Bo
2025-02-11 14:56:48 -05:00
Tyler Goodlet 06bd305430 Drop `.cancel_actor()` from `maybe_spawn_daemon()`
Since `tractor`'s new and improved inter-actor cancellation semantics
are much more pedantic, AND bc we use the `ServiceMngr` for spawning
service actors on-demand, the caller of `maybe_spawn_daemon()` should
NEVER conduct a so called "out of band" `Actor`-runtime cancel request
since this is precisely the job of our `ServiceMngr` XD

Add a super in depth note explaining the underlying issue and adding
a todo list of how we should prolly augment `tractor` to make such cases
easier to grok and fix in the future!
2025-02-11 14:56:48 -05:00
Tyler Goodlet babb28d733 Lel, forgot to add a `SPOT` venue for `binance`.. 2025-02-11 14:56:48 -05:00
Tyler Goodlet 8c9d19e551 Mask no-data pause-point, add perps to sig.
Was orig for debugging an issue with `kucoin` i think but definitely
shouldn't be left in XD

Also add `'perpetual_future'` to `start_backfill()` input literal set.
2025-02-11 14:56:48 -05:00
Tyler Goodlet 911e929758 Prep service mngr for move to `tractor.hilevel`
Given it's a fairly simple yet useful abstraction, it makes sense to
offer this sub-sys alongside the core `tractor` runtime lib.

Without going into extreme detail on the impl changes (it'll come in
the commit that moves to the other repo) here is the high level summary:
------ - ------
- rename `Services` -> `ServiceMngr` and use an factory `@acm`
  to guarantee a single-instance-per-actor using a niche approach for a
  singleton object using a default keyword-arg B)
  - the mod level `open_service_mngr()` and `get_service_mngr()` are the
    new allocation/access API.
- add a `ServiceMngr.start_service()` method which does the work of both
  spawning a new subactor (for the daemon) and uses its portal to start
  the mngr side supervision task.
- open actor/task nurseries inside the `@acm` allocator.

Adjust other dependent subsystems to match:
------ - ------
- use `open_service_mngr()` when first allocated in `open_pikerd()`.
- use `get_service_mngr()` instead of importing the class ref inside
  `.service.maybe_spawn_daemon()`, `.brokers._daemon.spawn_brokerd()`
  and `.data._sampling.spawn_samplerd()` using a `partial` to pack in
  the endpoint ctx kwargs (unpacked inside `.start_service()` XD).
2025-02-11 14:56:48 -05:00
Tyler Goodlet b064ded813 Port binance to `httpx`
Like other backends use the `AsyncClient` for all venue specific
client-sessions but change to allocating them inside `get_client()`
using an `AsyncExitStack` and inserting directly in the
`Client.venue_sesh: dict` table during init.

Supporting impl tweaks:
- remove most of the API client session building logic and instead make
  `Client.__init__()` take in a `venue_sessions: dict` (set it to
  `.venue_sesh`) and `conf: dict`, instead opting to do the http client
  configuration inside `get_client()` since all that code only needs to
  be run once.
 |_load config inside `get_client()` once.
 |_move session token creation into a new util func `init_api_keys()` and
  also call it from `get_client()` factory; toss in an ex. toml section
  config to the doc string.
- define `_venue_urls: dict[str, str]` (content taken from old static
  `.venue_sesh` dict) at module level and feed them as `base_url: str`
  inputs to the client create loop.
- adjust all call sigs in httpx-sesh-using methods, namely just
  `._api()`.
- do a `.exch_info()` call in `get_client()` to cache the symbology
  set.

Unrelated changes for various other outstanding buggers:
- to get futures feeds correctly loading when selected
  from search (like 'XMRUSDT.USDTM.PERP'), expect a `MktPair` input to
  `Client.bars()` such that the exact venue-key can be looked up (via
  a new `.pair2venuekey()` meth) and then passed to `._api()`.
- adjust `.broker.open_trade_dialog()` to failover to paper engine when
  there's no `api_key` key set for the `subconf` venue-key.
2025-02-11 14:56:30 -05:00
Nelson Torres fea0bee1aa Added note to exception when missing field in SpotPair class 2025-02-11 14:56:30 -05:00
Nelson Torres 9e7e02b44f Added new fields to SpotPair class in venues 2025-02-11 14:56:30 -05:00
Tyler Goodlet d65e8cad7e binance: raise `NoData` on null hist arrays
Like we do with other history backends to indicate lack of a data set.
This avoids any raise that will will bring down the backloader task with
some downstream error.

Raise a `ValueError` on no time index for now.
2025-02-11 14:56:30 -05:00
Tyler Goodlet 968d202ebc Woops, `data` can be an empty list XD 2025-02-11 14:56:30 -05:00
Tyler Goodlet 9b7a409dbb Woops, fix missing `api_url` ref in error log 2025-02-11 14:56:30 -05:00
Tyler Goodlet aec652e05c Change type-annots to use `httpx.Response` 2025-02-11 14:56:30 -05:00
Tyler Goodlet ed248225df Port `kucoin` backend to `httpx` 2025-02-11 14:56:30 -05:00
Tyler Goodlet f3620f3d42 Port `kraken` backend to `httpx` 2025-02-11 14:56:30 -05:00
Tyler Goodlet cb92abbc38 ib: add connect status info emit 2025-02-11 14:56:17 -05:00
Tyler Goodlet 70332e375b ib: `.api` mod and log-fmt cleaning
About time we tidy'd a buncha status logging in this backend..
particularly for boot-up where there's lots of client-try-connect poll
looping with account detection from the user config.

`.api.Client` pprint and logging fmt improvements:
- add `Client.__repr__()` which shows the minimally useful set of info
  from the underlying `.ib: IB` as well as a new `.acnts: list[str]`
  of the account aliases defined in the user's `brokers.toml`.
- mk `.bars()` define a comprehensive `query_info: str` with all the
  request deats but only display if there's a problem with the response
  data.
- mk `.get_config()` report both the config file path and the acnt
  aliases (NOT the actual account #s).
- move all `.load_aio_clients()` client poll loop requests do
  `log.runtime()` statuses, only falling through to a `.warning()` when
  the loop fails to connect the client to the spec-ed API-gw addr, and
 |_ don't allow loading accounts for which the user has not defined an
    alias in `brokers.toml::[ib]`; raise a value-error in such cases
    with a message indicating how to mod the config.
 |_ only `log.info()` about acnts if some were loaded..

Other mod logging de-noising:
- better status fmting in `.symbols.open_symbol_search()` with
  `repr(Client)`.
- for `.feed.stream_quotes()` first quote reporting use `.runtime()`.
2025-02-11 14:56:17 -05:00
Tyler Goodlet 4940aabe05 ib: warn about mkt precision cuckups that `Contract`s clearly deliver wrong.. 2025-02-11 14:56:17 -05:00
Tyler Goodlet 4f9998e9fb ib: mask out trade and vlm rates for now 2025-02-11 14:56:17 -05:00
Tyler Goodlet c92a236196 ib: more trade record edge case handling
- timestamps came as `'date'`-keyed from 2022 and before but now are
  `'datetime'`..
- some symbols seem to have no commission field, so handle that..
- when no `'price'` field found return `None` from `norm_trade()`.
- add a warn log on mid-fill commission updates.
2025-02-11 14:56:17 -05:00
2 changed files with 31 additions and 428 deletions

View File

@ -104,6 +104,12 @@ async def maybe_spawn_daemon(
# service task for that actor. # service task for that actor.
started: bool started: bool
if pikerd_portal is None: if pikerd_portal is None:
# await tractor.pause()
if tractor_kwargs.get('debug_mode', False):
from tractor.devx._debug import maybe_init_greenback
await maybe_init_greenback()
started = await service_task_target( started = await service_task_target(
loglevel=loglevel, loglevel=loglevel,
**spawn_args, **spawn_args,
@ -208,7 +214,7 @@ async def spawn_emsd(
log.info('Spawning emsd') log.info('Spawning emsd')
smngr: ServiceMngr = get_service_mngr() smngr: ServiceMngr = get_service_mngr()
portal = await smngr.actor_n.start_actor( portal = await smngr.an.start_actor(
'emsd', 'emsd',
enable_modules=[ enable_modules=[
'piker.clearing._ems', 'piker.clearing._ems',
@ -222,12 +228,10 @@ async def spawn_emsd(
# non-blocking setup of clearing service # non-blocking setup of clearing service
from ..clearing._ems import _setup_persistent_emsd from ..clearing._ems import _setup_persistent_emsd
await smngr.start_service_task( await smngr.start_service_ctx(
'emsd', name='emsd',
portal, portal=portal,
ctx_fn=_setup_persistent_emsd,
# signature of target root-task endpoint
_setup_persistent_emsd,
loglevel=loglevel, loglevel=loglevel,
) )
return True return True

View File

@ -18,425 +18,16 @@
daemon-service management API. daemon-service management API.
""" """
from __future__ import annotations
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import (
Callable,
Any,
) )
import msgspec
import tractor import tractor
import trio from tractor.hilevel import (
from trio import TaskStatus ServiceMngr,
from tractor import ( # open_service_mngr as _open_service_mngr,
ActorNursery, get_service_mngr as get_service_mngr,
current_actor,
ContextCancelled,
Context,
Portal,
) )
from ._util import (
log, # sub-sys logger
)
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# impl deat which ensures a single global instance
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open a multi-subactor-as-service-daemon tree supervisor.
The delivered `ServiceMngr` is a singleton instance for each
actor-process and is allocated on first open and never
de-allocated unless explicitly deleted by al call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'actor_n': an,
'service_n': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.actor_n = an
mngr.service_n = tn
else:
assert (
mngr.actor_n
and
mngr.service_tn
)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_tasks:
# await mngr.cancel_service('samplerd')
# await tractor.pause(shield=True)
# ^XXX, if needed mk sure to shield it ;)
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
# TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the
# library.
# - wrap a "remote api" wherein you can get a method proxy
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
@dataclass
class ServiceMngr:
# class ServiceMngr(msgspec.Struct):
'''
A multi-subactor-as-service manager.
Spawn, supervise and monitor service/daemon subactors in a SC
process tree.
'''
actor_n: ActorNursery
service_n: trio.Nursery
debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
Context,
Portal,
trio.Event,
]
] = field(default_factory=dict)
# internal per-service task mutexs
_locks = defaultdict(trio.Lock)
async def start_service_task(
self,
name: str,
portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
target: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> (trio.CancelScope, Context, Any):
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown.
This allows for allocating long-running sub-services in our main
daemon and explicitly controlling their lifetimes.
'''
async def open_context_in_task(
task_status: TaskStatus[
tuple[
trio.CancelScope,
Context,
trio.Event,
Any,
]
] = trio.TASK_STATUS_IGNORED,
) -> tuple[
trio.CancelScope,
Context,
Any, # started value from ctx
]:
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs:
try:
async with portal.open_context(
target,
allow_overruns=allow_overruns,
# hide_tb=False,
# ^XXX^ HAWT TIPZ
**ctx_kwargs,
) as (ctx, started):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((
cs,
ctx,
complete,
started,
))
log.info(
f'`pikerd` service {name} started with value {started}'
)
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.wait_for_result(
# hide_tb=False,
)
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (
await portal.wait_for_result(),
ctx_res,
)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.chan.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
f'cancellee: {portal.chan.uid}\n'
f'canceller: {canceller}\n'
)
else:
raise
finally:
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc.
#
# WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-out-of-band at the `Actor.cancel()`
# level (since pikerd will have called
# `Portal.cancel_actor()`), and thus
# `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
#
# HOWEVER, this should happen implicitly WITHOUT
# a manual `ctx.cancel()` call HERE since,
#
# - in the mngr shutdown case the surrounding
# `.service_n.cancel_scope` should be
# `.cancel_called == True` and the
# `Portal.open_context()` internals should take
# care of it.
#
# - in the specific-service cancellation case,
# `.cancel_service()` makes the manual
# `ctx.cancel()` call for us which SHOULD mean
# the ctxc is never raised above (since, again,
# it will be gracefully suppressed by
# `.open_context()` internals) and thus we only
# need to shut down the service actor.
await portal.cancel_actor()
self.service_tasks.pop(name)
complete.set()
(
cs, # internally allocated
sub_ctx, # RPC peer-actor ctx
complete, # termination syncing
started, # proxyed from internal `.open_context()` entry.
) = await self.service_n.start(
open_context_in_task
)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (cs, sub_ctx, portal, complete)
return (
cs,
sub_ctx,
started,
)
async def cancel_service(
self,
name: str,
) -> Any:
'''
Cancel the service task and actor for the given ``name``.
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, sub_ctx, portal, complete = self.service_tasks[name]
# cs.cancel()
await sub_ctx.cancel()
await complete.wait()
if name in self.service_tasks:
raise RuntimeError(
f'Serice task for {name} not terminated?'
)
# raise ServiceError(
# ^TODO? custom err type?
# assert name not in self.service_tasks, \
# f'Serice task for {name} not terminated?'
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
debug_mode: bool = False,
**tractor_actor_kwargs,
) -> Context:
'''
Start a "service" task in a new sub-actor (daemon) and manage it's lifetime
indefinitely.
Services can be cancelled/shutdown using `.cancel_service()`.
'''
entry: tuple|None = self.service_tasks.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_tasks:
portal = await self.actor_n.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**tractor_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(cs, sub_ctx, started) = await self.start_service_task(
daemon_name,
portal,
ctx_ep,
**ctx_kwargs,
)
return sub_ctx
# TODO: # TODO:
# -[ ] factor all the common shit from `.data._sampling` # -[ ] factor all the common shit from `.data._sampling`
# and `.brokers._daemon` into here / `ServiceMngr` # and `.brokers._daemon` into here / `ServiceMngr`
@ -444,11 +35,19 @@ class ServiceMngr:
# "service-in-subactor" starting! # "service-in-subactor" starting!
# -[ ] move to `tractor.hilevel._service`, import and use here! # -[ ] move to `tractor.hilevel._service`, import and use here!
# NOTE: purposely leaks the ref to the mod-scope Bo # NOTE: purposely leaks the ref to the mod-scope Bo
# import tractor
# from tractor.hilevel import ( Services: ServiceMngr|None = None
# open_service_mngr,
# ServiceMngr, @acm
# ) async def open_service_mngr(
# mngr: ServiceMngr|None = None **kwargs,
# with tractor.hilevel.open_service_mngr() as mngr: ) -> ServiceMngr:
# Services = proxy(mngr)
global Services
async with tractor.hilevel.open_service_mngr(
**kwargs,
) as mngr:
# Services = proxy(mngr)
Services = mngr
yield mngr
Services = None