Compare commits

..

29 Commits

Author SHA1 Message Date
Tyler Goodlet 7eacab0ac5 Allow ledger passes to ignore (symcache) unknown fqmes
For example in the paper-eng, if you have a backend that doesn't fully
support a symcache (yet) it's handy to be able to ignore processing
other paper-eng txns when all you care about at the moment is the
simulated symbol.

NOTE, that currently this will still result in a key-error when you load
more then one mkt with the paper engine (for which the backend does not
have the symcache implemented) since no fqme ad-hoc query was made for
the 2nd symbol (and i'm not sure we should support that kinda hackery
over just encouraging the sym-cache being added?). Def needs a little
more thought depending on how many backends are never going to be able
to (easily) support caching..
2025-02-11 18:27:41 -05:00
Tyler Goodlet 16e8acf3c4 .clearing._ems: Don't require `first_quote['last']`
Instead just check for the field (which i'm not huge on the key-name for
anyway) and if not found get the "last price" from the real-time shm
buffer's latest 'close' sample.

Unrelatedly, use a `subs.copy()` in the `Router.client_broadcast()` loop
such that if a `client_stream` is popped on connection failure, we don't
RTE for the "size changed on iteration".
2025-02-11 18:27:41 -05:00
Tyler Goodlet 78d76f64cd `kucoin`: repair live quotes streaming..
This must have broke at some point during the new `MktPair` and thus
`.fqme: str` updates; mas-o-menos the symbol key in the quote-msg-`dict`
was NOT set to the `MktPair.bs_fqme: str` value and thus wasn't being
processed by the downstream sampling and feed subsys.

So fix that as well as a few other refinements,
- set the `topic: mkt.bs_fqme` in quote msgs obvi.
- drop the "wait for first clearing vlm" quote poll loop; going to fix
  the sampler to handle a `first_quote` without a `'last'` key.
- add some typing around calls to `get_mkt_info()`.
- rename `stream_messages()` -> `iter_normed_quotes()`.
2025-02-11 18:27:41 -05:00
Nelson Torres 77477bf6a2 Deleted settlePlan field from binance FutesPair. 2025-02-11 18:27:41 -05:00
Nelson Torres d4067b31fb Added missing fields for kucoin.
feeCategory, makerFeeCoefficient, takerFeeCoefficient and st.
2025-02-11 18:27:41 -05:00
Tyler Goodlet 7928d88b5f data._web_bs: try to raise jsonrpc errors in parent task 2025-02-11 18:27:41 -05:00
Tyler Goodlet 40602d40ca Official service-mngr to `tractor.hilevel` move
Such that we maintain that subsys in the actor-runtime repo (with
hopefully an extensive test suite XD).

Port deats,
- rewrite `open_service_mngr()` as a thin wrapper that delegates into
  the new `tractor.hilevel.open_service_mngr()` but with maintenance of
  the `Services` class-singleton for now.
- port `.service._daemon` usage to the new
  `ServiceMngr.start_service_ctx()` a rename from
  `.start_service_task()` which is now likely destined for the soon
  supported `tractor.trionics.TaskMngr` nursery extension.
- ref the new `ServiceMngr.an: ActorNursery` instance var name.

Other,
- always enable the `tractor.pause_from_sync()` support via `greenback`
  whenever `debug_mode` is set at `pikerd` init.
2025-02-11 18:01:48 -05:00
Nelson Torres f148114adb Updated tractor method name. 2025-02-11 18:01:48 -05:00
Tyler Goodlet 9498e5f102 More service-mngr clarity notes
Nothing changing functionally here just adding more `tractor`
operational notes, tips for debug tooling and typing fixes B)

Of particular note is adding further details about the reason we do not
need to call `Context.cancel()` inside the `finally:` block of
`.open_context_in_task()` thanks to `tractor`'s new and improved
inter-actor cancellation semantics Bo
2025-02-11 18:01:48 -05:00
Tyler Goodlet 53eb6b8f91 Drop `.cancel_actor()` from `maybe_spawn_daemon()`
Since `tractor`'s new and improved inter-actor cancellation semantics
are much more pedantic, AND bc we use the `ServiceMngr` for spawning
service actors on-demand, the caller of `maybe_spawn_daemon()` should
NEVER conduct a so called "out of band" `Actor`-runtime cancel request
since this is precisely the job of our `ServiceMngr` XD

Add a super in depth note explaining the underlying issue and adding
a todo list of how we should prolly augment `tractor` to make such cases
easier to grok and fix in the future!
2025-02-11 18:01:48 -05:00
Tyler Goodlet c7d11a68c1 Lel, forgot to add a `SPOT` venue for `binance`.. 2025-02-11 18:01:48 -05:00
Tyler Goodlet c36c38f432 Mask no-data pause-point, add perps to sig.
Was orig for debugging an issue with `kucoin` i think but definitely
shouldn't be left in XD

Also add `'perpetual_future'` to `start_backfill()` input literal set.
2025-02-11 18:01:48 -05:00
Tyler Goodlet 41b0584588 Prep service mngr for move to `tractor.hilevel`
Given it's a fairly simple yet useful abstraction, it makes sense to
offer this sub-sys alongside the core `tractor` runtime lib.

Without going into extreme detail on the impl changes (it'll come in
the commit that moves to the other repo) here is the high level summary:
------ - ------
- rename `Services` -> `ServiceMngr` and use an factory `@acm`
  to guarantee a single-instance-per-actor using a niche approach for a
  singleton object using a default keyword-arg B)
  - the mod level `open_service_mngr()` and `get_service_mngr()` are the
    new allocation/access API.
- add a `ServiceMngr.start_service()` method which does the work of both
  spawning a new subactor (for the daemon) and uses its portal to start
  the mngr side supervision task.
- open actor/task nurseries inside the `@acm` allocator.

Adjust other dependent subsystems to match:
------ - ------
- use `open_service_mngr()` when first allocated in `open_pikerd()`.
- use `get_service_mngr()` instead of importing the class ref inside
  `.service.maybe_spawn_daemon()`, `.brokers._daemon.spawn_brokerd()`
  and `.data._sampling.spawn_samplerd()` using a `partial` to pack in
  the endpoint ctx kwargs (unpacked inside `.start_service()` XD).
2025-02-11 18:01:48 -05:00
Tyler Goodlet 844544ed8e Port binance to `httpx`
Like other backends use the `AsyncClient` for all venue specific
client-sessions but change to allocating them inside `get_client()`
using an `AsyncExitStack` and inserting directly in the
`Client.venue_sesh: dict` table during init.

Supporting impl tweaks:
- remove most of the API client session building logic and instead make
  `Client.__init__()` take in a `venue_sessions: dict` (set it to
  `.venue_sesh`) and `conf: dict`, instead opting to do the http client
  configuration inside `get_client()` since all that code only needs to
  be run once.
 |_load config inside `get_client()` once.
 |_move session token creation into a new util func `init_api_keys()` and
  also call it from `get_client()` factory; toss in an ex. toml section
  config to the doc string.
- define `_venue_urls: dict[str, str]` (content taken from old static
  `.venue_sesh` dict) at module level and feed them as `base_url: str`
  inputs to the client create loop.
- adjust all call sigs in httpx-sesh-using methods, namely just
  `._api()`.
- do a `.exch_info()` call in `get_client()` to cache the symbology
  set.

Unrelated changes for various other outstanding buggers:
- to get futures feeds correctly loading when selected
  from search (like 'XMRUSDT.USDTM.PERP'), expect a `MktPair` input to
  `Client.bars()` such that the exact venue-key can be looked up (via
  a new `.pair2venuekey()` meth) and then passed to `._api()`.
- adjust `.broker.open_trade_dialog()` to failover to paper engine when
  there's no `api_key` key set for the `subconf` venue-key.
2025-02-11 16:27:28 -05:00
Nelson Torres f479252d26 Added note to exception when missing field in SpotPair class 2025-02-11 16:27:28 -05:00
Nelson Torres 033ef2e35e Added new fields to SpotPair class in venues 2025-02-11 16:27:28 -05:00
Tyler Goodlet 2cdece244c binance: raise `NoData` on null hist arrays
Like we do with other history backends to indicate lack of a data set.
This avoids any raise that will will bring down the backloader task with
some downstream error.

Raise a `ValueError` on no time index for now.
2025-02-11 16:27:28 -05:00
Tyler Goodlet 018694bbdb Woops, `data` can be an empty list XD 2025-02-11 16:27:28 -05:00
Tyler Goodlet 128a2d507f Woops, fix missing `api_url` ref in error log 2025-02-11 16:27:28 -05:00
Tyler Goodlet 430650a6a7 Change type-annots to use `httpx.Response` 2025-02-11 16:27:28 -05:00
Tyler Goodlet 1da3cf5698 Port `kucoin` backend to `httpx` 2025-02-11 16:27:28 -05:00
Tyler Goodlet a348603fc4 Port `kraken` backend to `httpx` 2025-02-11 16:27:28 -05:00
goodboy 86047824d8 Merge pull request '`.brokers.ib` random fixes-n-improvements from various other dev branches..' (#27) from ib_refinements into gitea_feats
Merged-in: #27
2025-02-11 21:26:20 +00:00
Tyler Goodlet cb92abbc38 ib: add connect status info emit 2025-02-11 14:56:17 -05:00
Tyler Goodlet 70332e375b ib: `.api` mod and log-fmt cleaning
About time we tidy'd a buncha status logging in this backend..
particularly for boot-up where there's lots of client-try-connect poll
looping with account detection from the user config.

`.api.Client` pprint and logging fmt improvements:
- add `Client.__repr__()` which shows the minimally useful set of info
  from the underlying `.ib: IB` as well as a new `.acnts: list[str]`
  of the account aliases defined in the user's `brokers.toml`.
- mk `.bars()` define a comprehensive `query_info: str` with all the
  request deats but only display if there's a problem with the response
  data.
- mk `.get_config()` report both the config file path and the acnt
  aliases (NOT the actual account #s).
- move all `.load_aio_clients()` client poll loop requests do
  `log.runtime()` statuses, only falling through to a `.warning()` when
  the loop fails to connect the client to the spec-ed API-gw addr, and
 |_ don't allow loading accounts for which the user has not defined an
    alias in `brokers.toml::[ib]`; raise a value-error in such cases
    with a message indicating how to mod the config.
 |_ only `log.info()` about acnts if some were loaded..

Other mod logging de-noising:
- better status fmting in `.symbols.open_symbol_search()` with
  `repr(Client)`.
- for `.feed.stream_quotes()` first quote reporting use `.runtime()`.
2025-02-11 14:56:17 -05:00
Tyler Goodlet 4940aabe05 ib: warn about mkt precision cuckups that `Contract`s clearly deliver wrong.. 2025-02-11 14:56:17 -05:00
Tyler Goodlet 4f9998e9fb ib: mask out trade and vlm rates for now 2025-02-11 14:56:17 -05:00
Tyler Goodlet c92a236196 ib: more trade record edge case handling
- timestamps came as `'date'`-keyed from 2022 and before but now are
  `'datetime'`..
- some symbols seem to have no commission field, so handle that..
- when no `'price'` field found return `None` from `norm_trade()`.
- add a warn log on mid-fill commission updates.
2025-02-11 14:56:17 -05:00
goodboy e4cd1f85f6 Merge pull request 'pyqt6' (#3) from pyqt6 into gitea_feats
Reviewed-on: #3 (well by fomo anyway..)
2025-02-11 17:25:03 +00:00
6 changed files with 155 additions and 2103 deletions

View File

@ -1,134 +0,0 @@
with (import <nixpkgs> {});
let
glibStorePath = lib.getLib glib;
zlibStorePath = lib.getLib zlib;
zstdStorePath = lib.getLib zstd;
dbusStorePath = lib.getLib dbus;
libGLStorePath = lib.getLib libGL;
freetypeStorePath = lib.getLib freetype;
qt6baseStorePath = lib.getLib qt6.qtbase;
fontconfigStorePath = lib.getLib fontconfig;
libxkbcommonStorePath = lib.getLib libxkbcommon;
xcbutilcursorStorePath = lib.getLib xcb-util-cursor;
qtpyStorePath = lib.getLib python312Packages.qtpy;
pyqt6StorePath = lib.getLib python312Packages.pyqt6;
pyqt6SipStorePath = lib.getLib python312Packages.pyqt6-sip;
rapidfuzzStorePath = lib.getLib python312Packages.rapidfuzz;
qdarkstyleStorePath = lib.getLib python312Packages.qdarkstyle;
xorgLibX11StorePath = lib.getLib xorg.libX11;
xorgLibxcbStorePath = lib.getLib xorg.libxcb;
xorgxcbutilwmStorePath = lib.getLib xorg.xcbutilwm;
xorgxcbutilimageStorePath = lib.getLib xorg.xcbutilimage;
xorgxcbutilerrorsStorePath = lib.getLib xorg.xcbutilerrors;
xorgxcbutilkeysymsStorePath = lib.getLib xorg.xcbutilkeysyms;
xorgxcbutilrenderutilStorePath = lib.getLib xorg.xcbutilrenderutil;
in
stdenv.mkDerivation {
name = "piker-qt6-uv";
buildInputs = [
# System requirements.
glib
zlib
dbus
zstd
libGL
freetype
qt6.qtbase
libgcc.lib
fontconfig
libxkbcommon
# Xorg requirements
xcb-util-cursor
xorg.libxcb
xorg.libX11
xorg.xcbutilwm
xorg.xcbutilimage
xorg.xcbutilerrors
xorg.xcbutilkeysyms
xorg.xcbutilrenderutil
# Python requirements.
python312Full
python312Packages.uv
python312Packages.qdarkstyle
python312Packages.rapidfuzz
python312Packages.pyqt6
python312Packages.qtpy
];
src = null;
shellHook = ''
set -e
# Set the Qt plugin path
# export QT_DEBUG_PLUGINS=1
QTBASE_PATH="${qt6baseStorePath}/lib"
QT_PLUGIN_PATH="$QTBASE_PATH/qt-6/plugins"
QT_QPA_PLATFORM_PLUGIN_PATH="$QT_PLUGIN_PATH/platforms"
LIB_GCC_PATH="${libgcc.lib}/lib"
GLIB_PATH="${glibStorePath}/lib"
ZSTD_PATH="${zstdStorePath}/lib"
ZLIB_PATH="${zlibStorePath}/lib"
DBUS_PATH="${dbusStorePath}/lib"
LIBGL_PATH="${libGLStorePath}/lib"
FREETYPE_PATH="${freetypeStorePath}/lib"
FONTCONFIG_PATH="${fontconfigStorePath}/lib"
LIB_XKB_COMMON_PATH="${libxkbcommonStorePath}/lib"
XCB_UTIL_CURSOR_PATH="${xcbutilcursorStorePath}/lib"
XORG_LIB_X11_PATH="${xorgLibX11StorePath}/lib"
XORG_LIB_XCB_PATH="${xorgLibxcbStorePath}/lib"
XORG_XCB_UTIL_IMAGE_PATH="${xorgxcbutilimageStorePath}/lib"
XORG_XCB_UTIL_WM_PATH="${xorgxcbutilwmStorePath}/lib"
XORG_XCB_UTIL_RENDER_UTIL_PATH="${xorgxcbutilrenderutilStorePath}/lib"
XORG_XCB_UTIL_KEYSYMS_PATH="${xorgxcbutilkeysymsStorePath}/lib"
XORG_XCB_UTIL_ERRORS_PATH="${xorgxcbutilerrorsStorePath}/lib"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$QTBASE_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$QT_PLUGIN_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$QT_QPA_PLATFORM_PLUGIN_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$LIB_GCC_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$DBUS_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$GLIB_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$ZLIB_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$ZSTD_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$LIBGL_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$FONTCONFIG_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$FREETYPE_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$LIB_XKB_COMMON_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XCB_UTIL_CURSOR_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_LIB_X11_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_LIB_XCB_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_IMAGE_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_WM_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_RENDER_UTIL_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_KEYSYMS_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_ERRORS_PATH"
export LD_LIBRARY_PATH
RPDFUZZ_PATH="${rapidfuzzStorePath}/lib/python3.12/site-packages"
QDRKSTYLE_PATH="${qdarkstyleStorePath}/lib/python3.12/site-packages"
QTPY_PATH="${qtpyStorePath}/lib/python3.12/site-packages"
PYQT6_PATH="${pyqt6StorePath}/lib/python3.12/site-packages"
PYQT6_SIP_PATH="${pyqt6SipStorePath}/lib/python3.12/site-packages"
PATCH="$PATCH:$RPDFUZZ_PATH"
PATCH="$PATCH:$QDRKSTYLE_PATH"
PATCH="$PATCH:$QTPY_PATH"
PATCH="$PATCH:$PYQT6_PATH"
PATCH="$PATCH:$PYQT6_SIP_PATH"
export PATCH
# Install deps
uv lock
'';
}

View File

@ -1250,6 +1250,12 @@ async def load_aio_clients(
for i in range(connect_retries):
try:
log.info(
'Trying `ib_async` connect\n'
f'{host}: {port}\n'
f'clientId: {client_id}\n'
f'timeout: {connect_timeout}\n'
)
await ib.connectAsync(
host,
port,
@ -1367,7 +1373,9 @@ async def load_clients_for_trio(
a ``tractor.to_asyncio.open_channel_from()``.
'''
async with load_aio_clients() as accts2clients:
async with load_aio_clients(
disconnect_on_exit=False,
) as accts2clients:
to_trio.send_nowait(accts2clients)

View File

@ -104,6 +104,12 @@ async def maybe_spawn_daemon(
# service task for that actor.
started: bool
if pikerd_portal is None:
# await tractor.pause()
if tractor_kwargs.get('debug_mode', False):
from tractor.devx._debug import maybe_init_greenback
await maybe_init_greenback()
started = await service_task_target(
loglevel=loglevel,
**spawn_args,
@ -208,7 +214,7 @@ async def spawn_emsd(
log.info('Spawning emsd')
smngr: ServiceMngr = get_service_mngr()
portal = await smngr.actor_n.start_actor(
portal = await smngr.an.start_actor(
'emsd',
enable_modules=[
'piker.clearing._ems',
@ -222,12 +228,10 @@ async def spawn_emsd(
# non-blocking setup of clearing service
from ..clearing._ems import _setup_persistent_emsd
await smngr.start_service_task(
'emsd',
portal,
# signature of target root-task endpoint
_setup_persistent_emsd,
await smngr.start_service_ctx(
name='emsd',
portal=portal,
ctx_fn=_setup_persistent_emsd,
loglevel=loglevel,
)
return True

View File

@ -18,384 +18,16 @@
daemon-service management API.
"""
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import (
Callable,
Any,
)
import msgspec
import tractor
import trio
from trio import TaskStatus
from tractor import (
ActorNursery,
current_actor,
ContextCancelled,
Context,
Portal,
from tractor.hilevel import (
ServiceMngr,
# open_service_mngr as _open_service_mngr,
get_service_mngr as get_service_mngr,
)
from ._util import (
log, # sub-sys logger
)
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# impl deat which ensures a single global instance
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open a multi-subactor-as-service-daemon tree supervisor.
The delivered `ServiceMngr` is a singleton instance for each
actor-process and is allocated on first open and never
de-allocated unless explicitly deleted by al call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'actor_n': an,
'service_n': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.actor_n = an
mngr.service_n = tn
else:
assert (
mngr.actor_n
and
mngr.service_tn
)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_tasks:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
# TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the
# library.
# - wrap a "remote api" wherein you can get a method proxy
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
@dataclass
class ServiceMngr:
# class ServiceMngr(msgspec.Struct):
'''
A multi-subactor-as-service manager.
Spawn, supervise and monitor service/daemon subactors in a SC
process tree.
'''
actor_n: ActorNursery
service_n: trio.Nursery
debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
Context,
Portal,
trio.Event,
]
] = field(default_factory=dict)
# internal per-service task mutexs
_locks = defaultdict(trio.Lock)
async def start_service_task(
self,
name: str,
portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
target: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> (trio.CancelScope, Context, Any):
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown.
This allows for allocating long-running sub-services in our main
daemon and explicitly controlling their lifetimes.
'''
async def open_context_in_task(
task_status: TaskStatus[
tuple[
trio.CancelScope,
Context,
trio.Event,
Any,
]
] = trio.TASK_STATUS_IGNORED,
) -> Any:
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs:
try:
async with portal.open_context(
target,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, started):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((
cs,
ctx,
complete,
started,
))
log.info(
f'`pikerd` service {name} started with value {started}'
)
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (
await portal.wait_for_result(),
ctx_res,
)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.chan.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
f'cancellee: {portal.chan.uid}\n'
f'canceller: {canceller}\n'
)
else:
raise
finally:
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor()
complete.set()
self.service_tasks.pop(name)
cs, sub_ctx, complete, started = await self.service_n.start(
open_context_in_task
)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (cs, sub_ctx, portal, complete)
return cs, sub_ctx, started
async def cancel_service(
self,
name: str,
) -> Any:
'''
Cancel the service task and actor for the given ``name``.
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, sub_ctx, portal, complete = self.service_tasks[name]
# cs.cancel()
await sub_ctx.cancel()
await complete.wait()
if name in self.service_tasks:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Serice task for {name} not terminated?'
)
# assert name not in self.service_tasks, \
# f'Serice task for {name} not terminated?'
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
debug_mode: bool = False,
**tractor_actor_kwargs,
) -> Context:
'''
Start a "service" task in a new sub-actor (daemon) and manage it's lifetime
indefinitely.
Services can be cancelled/shutdown using `.cancel_service()`.
'''
entry: tuple|None = self.service_tasks.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_tasks:
portal = await self.actor_n.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**tractor_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(cs, sub_ctx, started) = await self.start_service_task(
daemon_name,
portal,
ctx_ep,
**ctx_kwargs,
)
return sub_ctx
# TODO:
# -[ ] factor all the common shit from `.data._sampling`
# and `.brokers._daemon` into here / `ServiceMngr`
@ -403,11 +35,19 @@ class ServiceMngr:
# "service-in-subactor" starting!
# -[ ] move to `tractor.hilevel._service`, import and use here!
# NOTE: purposely leaks the ref to the mod-scope Bo
# import tractor
# from tractor.hilevel import (
# open_service_mngr,
# ServiceMngr,
# )
# mngr: ServiceMngr|None = None
# with tractor.hilevel.open_service_mngr() as mngr:
# Services = proxy(mngr)
Services: ServiceMngr|None = None
@acm
async def open_service_mngr(
**kwargs,
) -> ServiceMngr:
global Services
async with tractor.hilevel.open_service_mngr(
**kwargs,
) as mngr:
# Services = proxy(mngr)
Services = mngr
yield mngr
Services = None

View File

@ -15,8 +15,8 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
# ------ - ------
@ -34,114 +34,121 @@ ignore = []
# ------ - ------
[project]
[tool.poetry]
name = "piker"
version = "0.1.0a0dev0"
version = "0.1.0.alpha0.dev0"
description = "trading gear for hackers"
authors = [{ name = "Tyler Goodlet", email = "goodboy_foss@protonmail.com" }]
requires-python = ">=3.12, <3.13"
license = "AGPL-3.0-or-later"
authors = ["Tyler Goodlet <goodboy_foss@protonmail.com>"]
license = "AGPLv3"
readme = "README.rst"
keywords = [
"async",
"trading",
"finance",
"quant",
"charting",
# ------ - ------
[tool.poetry.dependencies]
async-generator = "^1.10"
attrs = "^23.1.0"
bidict = "^0.22.1"
colorama = "^0.4.6"
colorlog = "^6.7.0"
cython = "^3.0.0"
greenback = "^1.1.1"
ib-insync = "^0.9.86"
msgspec = "^0.18.0"
numba = "^0.59.0"
numpy = "^1.25"
polars = "^0.18.13"
pygments = "^2.16.1"
python = ">=3.11, <3.13"
rich = "^13.5.2"
# setuptools = "^68.0.0"
tomli = "^2.0.1"
tomli-w = "^1.0.0"
trio-util = "^0.7.0"
trio-websocket = "^0.10.3"
typer = "^0.9.0"
rapidfuzz = "^3.5.2"
pdbp = "^1.5.0"
trio = "^0.24"
pendulum = "^3.0.0"
httpx = "^0.27.0"
[tool.poetry.dependencies.tractor]
develop = true
git = 'https://github.com/goodboy/tractor.git'
branch = 'asyncio_debugger_support'
# path = "../tractor"
[tool.poetry.dependencies.asyncvnc]
git = 'https://github.com/pikers/asyncvnc.git'
branch = 'main'
[tool.poetry.dependencies.tomlkit]
develop = true
git = 'https://github.com/pikers/tomlkit.git'
branch = 'piker_pin'
# path = "../tomlkit/"
[tool.poetry.group.uis]
optional = true
[tool.poetry.group.uis.dependencies]
# https://python-poetry.org/docs/managing-dependencies/#dependency-groups
# TODO: make sure the levenshtein shit compiles on nix..
# rapidfuzz = {extras = ["speedup"], version = "^0.18.0"}
rapidfuzz = "^3.2.0"
qdarkstyle = ">=3.0.2"
pyqtgraph = { git = 'https://github.com/pikers/pyqtgraph.git' }
# ------ - ------
pyqt6 = "^6.7.0"
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
# testing / CI
pytest = "^6.0.0"
elasticsearch = "^8.9.0"
xonsh = "^0.14.2"
prompt-toolkit = "3.0.40"
# console ehancements and eventually remote debugging
# extras/helpers.
# TODO: add a toolset that makes debugging a `pikerd` service
# (tree) easy to hack on directly using more or less the local env:
# - xonsh + xxh
# - rsyscall + pdbp
# - actor runtime control console like BEAM/OTP
# ------ - ------
# TODO: add an `--only daemon` group for running non-ui / pikerd
# service tree in distributed mode B)
# https://python-poetry.org/docs/managing-dependencies/#installing-group-dependencies
# [tool.poetry.group.daemon.dependencies]
[tool.poetry.scripts]
piker = 'piker.cli:cli'
pikerd = 'piker.cli:pikerd'
ledger = 'piker.accounting.cli:ledger'
[project]
keywords=[
"async",
"trading",
"finance",
"quant",
"charting",
]
classifiers = [
"Development Status :: 3 - Alpha",
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Intended Audience :: Financial and Insurance Industry",
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
"Intended Audience :: Education",
classifiers=[
'Development Status :: 3 - Alpha',
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
'Operating System :: POSIX :: Linux',
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
'Intended Audience :: Financial and Insurance Industry',
'Intended Audience :: Science/Research',
'Intended Audience :: Developers',
'Intended Audience :: Education',
]
dependencies = [
"async-generator >=1.10, <2.0.0",
"attrs >=23.1.0, <24.0.0",
"bidict >=0.22.1, <0.23.0",
"colorama >=0.4.6, <0.5.0",
"colorlog >=6.7.0, <7.0.0",
"ib-insync >=0.9.86, <0.10.0",
"numba >=0.59.0, <0.60.0",
"numpy >=1.25, <2.0",
"polars >=0.18.13, <0.19.0",
"pygments >=2.16.1, <3.0.0",
"rich >=13.5.2, <14.0.0",
"tomli >=2.0.1, <3.0.0",
"tomli-w >=1.0.0, <2.0.0",
"trio-util >=0.7.0, <0.8.0",
"trio-websocket >=0.10.3, <0.11.0",
"typer >=0.9.0, <1.0.0",
"rapidfuzz >=3.5.2, <4.0.0",
"pdbp >=1.5.0, <2.0.0",
"trio >=0.24, <0.25",
"pendulum >=3.0.0, <4.0.0",
"httpx >=0.27.0, <0.28.0",
"cryptofeed >=2.4.0, <3.0.0",
"pyarrow >=17.0.0, <18.0.0",
"websockets ==12.0",
"msgspec",
"tractor",
"asyncvnc",
"tomlkit",
]
[project.optional-dependencies]
uis = [
# https://docs.astral.sh/uv/concepts/projects/dependencies/#optional-dependencies
# TODO: make sure the levenshtein shit compiles on nix..
# rapidfuzz = {extras = ["speedup"], version = "^0.18.0"}
"rapidfuzz >=3.2.0, <4.0.0",
"qdarkstyle >=3.0.2, <4.0.0",
"pyqt6 >=6.7.0, <7.0.0",
"pyqtgraph",
# ------ - ------
# TODO: add an `--only daemon` group for running non-ui / pikerd
# service tree in distributed mode B)
# https://docs.astral.sh/uv/concepts/projects/dependencies/#optional-dependencies
# [project.optional-dependencies]
]
[dependency-groups]
dev = [
"pytest >=6.0.0, <7.0.0",
"elasticsearch >=8.9.0, <9.0.0",
"xonsh >=0.14.2, <0.15.0",
"prompt-toolkit ==3.0.40",
"cython >=3.0.0, <4.0.0",
"greenback >=1.1.1, <2.0.0",
# console ehancements and eventually remote debugging
# extras/helpers.
# TODO: add a toolset that makes debugging a `pikerd` service
# (tree) easy to hack on directly using more or less the local env:
# - xonsh + xxh
# - rsyscall + pdbp
# - actor runtime control console like BEAM/OTP
]
[project.scripts]
piker = "piker.cli:cli"
pikerd = "piker.cli:pikerd"
ledger = "piker.accounting.cli:ledger"
[tool.hatch.build.targets.sdist]
include = ["piker"]
[tool.hatch.build.targets.wheel]
include = ["piker"]
[tool.uv.sources]
pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" }
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
msgspec = { git = "https://github.com/jcrist/msgspec.git" }
tractor = { path = "../tractor" }

1473
uv.lock

File diff suppressed because it is too large Load Diff