Compare commits


21 Commits

Author SHA1 Message Date
Tyler Goodlet e87939e1f2 data._web_bs: try to raise jsonrpc errors in parent task 2025-02-03 18:52:11 -03:00
Nelson Torres 3630c02f48 Updated tractor method name. 2025-02-03 18:52:11 -03:00
Tyler Goodlet fdf34e51bb Drop `.cancel_actor()` from `maybe_spawn_daemon()`
Since `tractor`'s new and improved inter-actor cancellation semantics
are much more pedantic, AND bc we use the `ServiceMngr` for spawning
service actors on-demand, the caller of `maybe_spawn_daemon()` should
NEVER conduct a so-called "out of band" `Actor`-runtime cancel request
since this is precisely the job of our `ServiceMngr` XD

Add a super in-depth note explaining the underlying issue and adding
a todo list of how we should prolly augment `tractor` to make such cases
easier to grok and fix in the future!
2025-02-03 18:52:11 -03:00
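For illustration, the caller-side pattern this implies, sketched against the `maybe_spawn_daemon()` API shown in the diffs below (the `use_brokerd()` wrapper name is hypothetical)::

    from contextlib import asynccontextmanager as acm

    @acm
    async def use_brokerd(brokername: str):
        # hypothetical caller sketch: yield the daemon's portal but
        # conduct NO explicit teardown of the service actor..
        async with maybe_spawn_daemon(
            service_name=f'brokerd.{brokername}',
            service_task_target=spawn_brokerd,
            spawn_args={'brokername': brokername},
        ) as portal:
            yield portal
            # NOTE: no `await portal.cancel_actor()` here! such an
            # "out-of-band" runtime-cancel is exactly what the
            # `ServiceMngr` exists to avoid; it tears down the
            # service actor itself.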
Tyler Goodlet a0b540022e Prep service mngr for move to `tractor.hilevel`
Given it's a fairly simple yet useful abstraction, it makes sense to
offer this sub-sys alongside the core `tractor` runtime lib.

Without going into extreme detail on the impl changes (it'll come in
the commit that moves to the other repo) here is the high level summary:
------ - ------
- rename `Services` -> `ServiceMngr` and use a factory `@acm`
  to guarantee a single-instance-per-actor, via a niche approach to
  singleton objects using a default keyword-arg B)
  - the mod level `open_service_mngr()` and `get_service_mngr()` are the
    new allocation/access API.
- add a `ServiceMngr.start_service()` method which does the work of both
  spawning a new subactor (for the daemon) and using its portal to start
  the mngr-side supervision task.
- open actor/task nurseries inside the `@acm` allocator.

Adjust other dependent subsystems to match:
------ - ------
- use `open_service_mngr()` when first allocated in `open_pikerd()`.
- use `get_service_mngr()` instead of importing the class ref inside
  `.service.maybe_spawn_daemon()`, `.brokers._daemon.spawn_brokerd()`
  and `.data._sampling.spawn_samplerd()` using a `partial` to pack in
  the endpoint ctx kwargs (unpacked inside `.start_service()` XD).
2025-02-03 18:52:11 -03:00
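A condensed sketch of the default-keyword-arg singleton trick described above (the full impl is in the `_mngr.py` diff below; the `ServiceMngr` stub here is only for illustration)::

    from contextlib import asynccontextmanager as acm
    from dataclasses import dataclass

    @dataclass
    class ServiceMngr:  # stand-in stub for the real type
        debug_mode: bool = False

    @acm
    async def open_service_mngr(
        *,
        debug_mode: bool = False,
        # mutable default kwarg: evaluated once per process, so it
        # doubles as per-actor singleton storage B)
        _singleton: list[ServiceMngr|None] = [None],
        **init_kwargs,
    ):
        if (mngr := _singleton[0]) is None:
            mngr = _singleton[0] = ServiceMngr(**init_kwargs)
        mngr.debug_mode = debug_mode
        yield mngr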
Tyler Goodlet 20f6343be2 Lel, forgot to add a `SPOT` venue for `binance`.. 2025-02-03 18:52:11 -03:00
Tyler Goodlet 2f30fc4fb8 Mask no-data pause-point, add perps to sig.
Was orig for debugging an issue with `kucoin` I think, but definitely
shouldn't be left in XD

Also add `'perpetual_future'` to `start_backfill()` input literal set.
2025-02-03 18:52:11 -03:00
Nelson Torres 76b5547c2d uv migration 2025-02-03 18:49:07 -03:00
Nelson Torres 08811f9a61 Default.nix fix:
Since the last `poetry` update the command `poetry shell` has been
moved to a plugin; this affects the entire installation, so now we
need to add more deps to the `buildInputs` and add them to the
`LD_LIBRARY_PATH`.

To run a command now you need to do something like this:

`poetry run piker ...`
2025-02-03 18:46:20 -03:00
Tyler Goodlet b577180773 ib: `.api` mod and log-fmt cleaning
About time we tidy'd a buncha status logging in this backend..
particularly for boot-up where there's lots of client-try-connect poll
looping with account detection from the user config.

`.api.Client` pprint and logging fmt improvements:
- add `Client.__repr__()` which shows the minimally useful set of info
  from the underlying `.ib: IB` as well as a new `.acnts: list[str]`
  of the account aliases defined in the user's `brokers.toml`.
- mk `.bars()` define a comprehensive `query_info: str` with all the
  request deats but only display it if there's a problem with the
  response data.
- mk `.get_config()` report both the config file path and the acnt
  aliases (NOT the actual account #s).
- move all `.load_aio_clients()` client poll loop requests to
  `log.runtime()` statuses, only falling through to a `.warning()` when
  the loop fails to connect the client to the spec-ed API-gw addr, and
 |_ don't allow loading accounts for which the user has not defined an
    alias in `brokers.toml::[ib]`; raise a value-error in such cases
    with a message indicating how to mod the config.
 |_ only `log.info()` about acnts if some were loaded..

Other mod logging de-noising:
- better status fmting in `.symbols.open_symbol_search()` with
  `repr(Client)`.
- for `.feed.stream_quotes()` first quote reporting use `.runtime()`.
2024-06-20 14:40:21 -04:00
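A minimal sketch of the kind of `Client.__repr__()` described above (the exact attr set shown is an assumption)::

    class Client:
        def __repr__(self) -> str:
            # minimally useful subset of the underlying `.ib: IB`
            # state plus our `brokers.toml` account aliases
            return (
                f'<{type(self).__name__}('
                f'ib={self.ib!r}, '
                f'acnts={self.acnts!r})>'
            )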
Tyler Goodlet f12c452d96 ib: warn about mkt precision cuckups that `Contract`s clearly deliver wrong.. 2024-06-18 12:42:21 -04:00
Tyler Goodlet 3531c2edc1 ib: mask out trade and vlm rates for now 2024-06-18 10:03:34 -04:00
Tyler Goodlet 97dd7e766a ib: more trade record edge case handling
- timestamps came as `'date'`-keyed from 2022 and before but now are
  `'datetime'`..
- some symbols seem to have no commission field, so handle that..
- when no `'price'` field found return `None` from `norm_trade()`.
- add a warn log on mid-fill commission updates.
2024-06-18 10:00:18 -04:00
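Sketched as code, the edge cases above might be handled like so (record field names per the bullets, everything else assumed)::

    import logging
    log = logging.getLogger(__name__)

    def norm_trade(record: dict) -> dict | None:
        # timestamps came `'date'`-keyed from 2022 and before but
        # are `'datetime'`-keyed in newer records
        ts = record.get('datetime') or record.get('date')

        # no price field -> nothing we can normalize
        if (price := record.get('price')) is None:
            return None

        # some symbols seem to have no commission field at all
        commission = record.get('commission')
        if commission is None:
            log.warning(f'No commission for trade @ {ts}!?')

        return {
            'ts': ts,
            'price': price,
            'commission': commission,
            # ... remaining normalized fields elided ...
        }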
Tyler Goodlet ab1463d942 Port binance to `httpx`
Like other backends, use the `AsyncClient` for all venue-specific
client-sessions but change to allocating them inside `get_client()`
using an `AsyncExitStack` and inserting directly in the
`Client.venue_sesh: dict` table during init.

Supporting impl tweaks:
- remove most of the API client session building logic and instead make
  `Client.__init__()` take in a `venue_sessions: dict` (set it to
  `.venue_sesh`) and `conf: dict`, instead opting to do the http client
  configuration inside `get_client()` since all that code only needs to
  be run once.
 |_load config inside `get_client()` once.
 |_move session token creation into a new util func `init_api_keys()` and
  also call it from `get_client()` factory; toss in an ex. toml section
  config to the doc string.
- define `_venue_urls: dict[str, str]` (content taken from old static
  `.venue_sesh` dict) at module level and feed them as `base_url: str`
  inputs to the client create loop.
- adjust all call sigs in httpx-sesh-using methods, namely just
  `._api()`.
- do a `.exch_info()` call in `get_client()` to cache the symbology
  set.

Unrelated changes for various other outstanding buggers:
- to get futures feeds correctly loading when selected
  from search (like 'XMRUSDT.USDTM.PERP'), expect a `MktPair` input to
  `Client.bars()` such that the exact venue-key can be looked up (via
  a new `.pair2venuekey()` meth) and then passed to `._api()`.
- adjust `.broker.open_trade_dialog()` to failover to paper engine when
  there's no `api_key` key set for the `subconf` venue-key.
2024-06-12 09:41:23 -04:00
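The allocation flow summarized above, as a rough sketch (the single `_venue_urls` entry, `load_config()`, and the `Client` signature are assumptions)::

    from contextlib import (
        AsyncExitStack,
        asynccontextmanager as acm,
    )
    import httpx

    # module level venue-key -> base-url table (real content elided)
    _venue_urls: dict[str, str] = {
        'spot': 'https://api.binance.com/api/v3/',  # example entry
    }

    @acm
    async def get_client():
        # one `httpx.AsyncClient` per venue, entered on a common
        # exit-stack then inserted directly in the
        # `Client.venue_sesh: dict` table during init
        venue_sessions: dict[str, httpx.AsyncClient] = {}
        async with AsyncExitStack() as stack:
            for venue_key, base_url in _venue_urls.items():
                venue_sessions[venue_key] = await stack.enter_async_context(
                    httpx.AsyncClient(base_url=base_url)
                )

            conf: dict = load_config()  # loaded once, helper name assumed
            init_api_keys(conf)  # session token creation util
            client = Client(
                venue_sessions=venue_sessions,
                conf=conf,
            )
            await client.exch_info()  # pre-cache the symbology set
            yield client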
Nelson Torres 5314cb79d4 Added note to exception when missing field in SpotPair class 2024-06-11 16:57:59 -04:00
Nelson Torres 0c0b7116e3 Added new fields to SpotPair class in venues 2024-06-11 16:57:59 -04:00
Tyler Goodlet 19c343e8b2 binance: raise `NoData` on null hist arrays
Like we do with other history backends to indicate lack of a data set.
This avoids any raise that will bring down the backloader task with
some downstream error.

Raise a `ValueError` on no time index for now.
2024-06-11 10:28:56 -04:00
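The guard described above might look something like this (the `NoData` import path and frame handling are assumptions)::

    import numpy as np
    from piker.data import NoData  # exact import path assumed

    def check_frame(array: np.ndarray, fqme: str) -> None:
        # null/empty hist frame: signal "no data set" to the
        # backloader instead of crashing its task downstream
        if array is None or array.size == 0:
            raise NoData(f'No history frame for {fqme}!?')

        # missing time index -> a plain `ValueError` (for now)
        if 'time' not in (array.dtype.names or ()):
            raise ValueError(f'Frame for {fqme} has no time index!?')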
Tyler Goodlet b7883325a9 Woops, `data` can be an empty list XD 2024-05-28 16:19:28 -04:00
Tyler Goodlet 37ca081555 Woops, fix missing `api_url` ref in error log 2024-05-24 12:24:25 -04:00
Tyler Goodlet 44b8c70521 Change type-annots to use `httpx.Response` 2024-05-20 12:55:45 -04:00
Tyler Goodlet e6af97c596 Port `kucoin` backend to `httpx` 2024-05-20 11:09:30 -04:00
Tyler Goodlet 95ace5acb8 Port `kraken` backend to `httpx` 2024-05-20 11:09:10 -04:00
28 changed files with 735 additions and 730 deletions

View File

@@ -1,161 +1,162 @@
 piker
 -----
-trading gear for hackers
+trading gear for hackers.

 |gh_actions|

 .. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fpikers%2Fpiker%2Fbadge&style=popout-square
     :target: https://actions-badge.atrox.dev/piker/pikers/goto

-``piker`` is a broker agnostic, next-gen FOSS toolset and runtime for
-real-time computational trading targeted at `hardcore Linux users
-<comp_trader>`_ .
+``piker`` is a broker agnostic, next-gen FOSS toolset for real-time
+computational trading targeted at `hardcore Linux users <comp_trader>`_ .

-we use much bleeding edge tech including (but not limited to):
+we use as much bleeding edge tech as possible including (but not limited to):

 - latest python for glue_
-- uv_ for packaging and distribution
-- trio_ & tractor_ for our distributed `structured concurrency`_ runtime
-- Qt_ for pristine low latency UIs
-- pyqtgraph_ (which we've extended) for real-time charting and graphics
-- ``polars`` ``numpy`` and ``numba`` for redic `fast numerics`_
-- `apache arrow and parquet`_ for time-series storage
-  persistence and sharing
-- (prototyped) techtonicdb_ for L2 book storage
-
-potential projects we might integrate with soon,
-
-.. _comp_trader: https://jfaleiro.wordpress.com/2019/10/09/computational-trader/
-.. _glue: https://numpy.org/doc/stable/user/c-info.python-as-glue.html#using-python-as-glue
-.. _uv: https://docs.astral.sh/uv/
+- trio_ & tractor_ for our distributed, multi-core, real-time streaming
+  `structured concurrency`_ runtime B)
+- Qt_ for pristine high performance UIs
+- pyqtgraph_ for real-time charting
+- ``polars`` ``numpy`` and ``numba`` for `fast numerics`_
+- `apache arrow and parquet`_ for time series history management
+
+.. |travis| image:: https://img.shields.io/travis/pikers/piker/master.svg
+    :target: https://travis-ci.org/pikers/piker
+
+- (already prototyped in ) techtonicdb_ for L2 book storage
+
 .. _trio: https://github.com/python-trio/trio
 .. _tractor: https://github.com/goodboy/tractor
 .. _structured concurrency: https://trio.discourse.group/
+.. _marketstore: https://github.com/alpacahq/marketstore
+.. _techtonicdb: https://github.com/0b01/tectonicdb
 .. _Qt: https://www.qt.io/
 .. _pyqtgraph: https://github.com/pyqtgraph/pyqtgraph
+.. _glue: https://numpy.org/doc/stable/user/c-info.python-as-glue.html#using-python-as-glue
 .. _apache arrow and parquet: https://arrow.apache.org/faq/
 .. _fast numerics: https://zerowithdot.com/python-numpy-and-pandas-performance/
-.. _techtonicdb: https://github.com/0b01/tectonicdb
+.. _comp_trader: https://jfaleiro.wordpress.com/2019/10/09/computational-trader/

-focus and feats:
-****************
-fitting with these tenets, we're always open to new
-framework/lib/service interop suggestions and ideas!
-
-- **100% federated**:
-  your code, your hardware, your data feeds, your broker fills.
-- **zero web**:
-  low latency as a prime objective, native UIs and modern IPC
-  protocols without trying to re-invent the "OS-as-an-app"..
-- **maximal privacy**:
-  prevent brokers and mms from knowing your planz; smack their
-  spreads with dark volume from a VPN tunnel.
-- **zero clutter**:
-  modal, context oriented UIs that echew minimalism, reduce thought
-  noise and encourage un-emotion.
-- **first class parallelism**:
-  built from the ground up on a next-gen structured concurrency
-  supervision sys.
-- **traders first**:
-  broker/exchange/venue/asset-class/money-sys agnostic
-- **systems grounded**:
-  real-time financial signal processing (fsp) that will make any
-  queuing or DSP eng juice their shorts.
-- **non-tina UX**:
-  sleek, powerful keyboard driven interaction with expected use in
-  tiling wms (or maybe even a DDE).
-- **data collab at scale**:
-  every actor-process and protocol is multi-host aware.
-- **fight club ready**:
-  zero interest in adoption by suits; no corporate friendly license,
-  ever.
-
-building the hottest looking, fastest, most reliable, keyboard
-friendly FOSS trading platform is the dream; join the cause.
+focus and features:
+*******************
+- 100% federated: your code, your hardware, your data feeds, your broker fills.
+- zero web: low latency, native software that doesn't try to re-invent the OS
+- maximal **privacy**: prevent brokers and mms from knowing your
+  planz; smack their spreads with dark volume.
+- zero clutter: modal, context oriented UIs that echew minimalism, reduce
+  thought noise and encourage un-emotion.
+- first class parallelism: built from the ground up on next-gen structured concurrency
+  primitives.
+- traders first: broker/exchange/asset-class agnostic
+- systems grounded: real-time financial signal processing that will
+  make any queuing or DSP eng juice their shorts.
+- non-tina UX: sleek, powerful keyboard driven interaction with expected use in tiling wms
+- data collaboration: every process and protocol is multi-host scalable.
+- fight club ready: zero interest in adoption by suits; no corporate friendly license, ever.
+
+fitting with these tenets, we're always open to new framework suggestions and ideas.
+
+building the best looking, most reliable, keyboard friendly trading
+platform is the dream; join the cause.

-a sane install with `uv`
-************************
-bc why install with `python` when you can faster with `rust` ::
-
-    uv lock
+sane install with `poetry`
+**************************
+TODO!
+
+rigorous install on ``nixos`` using ``poetry2nix``
+**************************************************
+TODO!

 hacky install on nixos
 **********************
-``NixOS`` is our core devs' distro of choice for which we offer
+`NixOS` is our core devs' distro of choice for which we offer
 a stringently defined development shell envoirment that can be loaded with::

-    nix-shell default.nix
+    nix-shell develop.nix
+
+this will setup the required python environment to run piker, make sure to
+run::
+
+    pip install -r requirements.txt -e .
+
+once after loading the shell

-start a chart
-*************
-run a realtime OHLCV chart stand-alone::
-
-    piker -l info chart btcusdt.spot.binance xmrusdt.spot.kraken
-
-this runs a chart UI (with 1m sampled OHLCV) and shows 2 spot markets from 2 diff cexes
-overlayed on the same graph. Use of `piker` without first starting
-a daemon (`pikerd` - see below) means there is an implicit spawning of the
-multi-actor-runtime (implemented as a `tractor` app).
-
-For additional subsystem feats available through our chart UI see the
-various sub-readmes:
-
-- order control using a mouse-n-keyboard UX B)
-- cross venue market-pair (what most call "symbol") search, select, overlay Bo
-- financial-signal-processing (`piker.fsp`) write-n-reload to sub-chart BO
-- src-asset derivatives scan for anal, like the infamous "max pain" XO
-
-spawn a daemon standalone
-*************************
-we call the root actor-process the ``pikerd``. it can be (and is
-recommended normally to be) started separately from the ``piker
-chart`` program::
+install wild-west style via `pip`
+*********************************
+``piker`` is currently under heavy pre-alpha development and as such
+should be cloned from this repo and hacked on directly.
+
+for a development install::
+
+    git clone git@github.com:pikers/piker.git
+    cd piker
+    virtualenv env
+    source ./env/bin/activate
+    pip install -r requirements.txt -e .
+
+check out our charts
+********************
+bet you weren't expecting this from the foss::
+
+    piker -l info -b kraken -b binance chart btcusdt.binance --pdb
+
+this runs the main chart (currently with 1m sampled OHLC) in in debug
+mode and you can practice paper trading using the following
+micro-manual:
+
+``order_mode`` (
+    edge triggered activation by any of the following keys,
+    ``mouse-click`` on y-level to submit at that price
+    ):
+
+    - ``f``/ ``ctl-f`` to stage buy
+    - ``d``/ ``ctl-d`` to stage sell
+    - ``a`` to stage alert
+
+``search_mode`` (
+    ``ctl-l`` or ``ctl-space`` to open,
+    ``ctl-c`` or ``ctl-space`` to close
+    ) :
+
+    - begin typing to have symbol search automatically lookup
+      symbols from all loaded backend (broker) providers
+    - arrow keys and mouse click to navigate selection
+    - vi-like ``ctl-[hjkl]`` for navigation
+
+you can also configure your position allocation limits from the
+sidepane.
+
+run in distributed mode
+***********************
+start the service manager and data feed daemon in the background and
+connect to it::

     pikerd -l info --pdb

-the daemon does nothing until a ``piker``-client (like ``piker
-chart``) connects and requests some particular sub-system. for
-a connecting chart ``pikerd`` will spawn and manage at least,
-
-- a data-feed daemon: ``datad`` which does all the work of comms with
-  the backend provider (in this case the ``binance`` cex).
-- a paper-trading engine instance, ``paperboi.binance``, (if no live
-  account has been configured) which allows for auto/manual order
-  control against the live quote stream.
-
-*using* an actor-service (aka micro-daemon) manager which dynamically
-supervises various sub-subsystems-as-services throughout the ``piker``
-runtime-stack.
-
-now you can (implicitly) connect your chart::
-
-    piker chart btcusdt.spot.binance
-
-since ``pikerd`` was started separately you can now enjoy a persistent
-real-time data stream tied to the daemon-tree's lifetime. i.e. the next
-time you spawn a chart it will obviously not only load much faster
-(since the underlying ``datad.binance`` is left running with its
-in-memory IPC data structures) but also the data-feed and any order
-mgmt states should be persistent until you finally cancel ``pikerd``.
+connect your chart::
+
+    piker -l info -b kraken -b binance chart xmrusdt.binance --pdb
+
+enjoy persistent real-time data feeds tied to daemon lifetime. the next
+time you spawn a chart it will load much faster since the data feed has
+been cached and is now always running live in the background until you
+kill ``pikerd``.

 if anyone asks you what this project is about
 *********************************************
-you don't talk about it; just use it.
+you don't talk about it.

 how do i get involved?
@@ -165,15 +166,6 @@ enter the matrix.

 how come there ain't that many docs
 ***********************************
-i mean we want/need them but building the core right has been higher
-prio then marketting (and likely will stay that way Bp).
-
-soo, suck it up bc,
-
-- no one is trying to sell you on anything
-- learning the code base is prolly way more valuable
-- the UI/UXs are intended to be "intuitive" for any hacker..
-
-we obviously need tonz help so if you want to start somewhere and
-can't necessarily write "advanced" concurrent python/rust code, this
-helping document literally anything might be the place for you!
+suck it up, learn the code; no one is trying to sell you on anything.
+also, we need lotsa help so if you want to start somewhere and can't
+necessarily write serious code, this might be the place for you!
View File

@@ -23,6 +23,7 @@ from __future__ import annotations
 from contextlib import (
     asynccontextmanager as acm,
 )
+from functools import partial
 from types import ModuleType
 from typing import (
     TYPE_CHECKING,
@@ -190,14 +191,17 @@ def broker_init(
 async def spawn_brokerd(
     brokername: str,
     loglevel: str | None = None,
     **tractor_kwargs,

 ) -> bool:
+    '''
+    Spawn a `brokerd.<backendname>` subactor service daemon
+    using `pikerd`'s service mngr.
+
+    '''
     from piker.service._util import log  # use service mngr log
     log.info(f'Spawning {brokername} broker daemon')
@@ -217,27 +221,35 @@ async def spawn_brokerd(
     # ask `pikerd` to spawn a new sub-actor and manage it under its
     # actor nursery
-    from piker.service import Services
-
-    dname: str = tractor_kwargs.pop('name')  # f'brokerd.{brokername}'
-    portal = await Services.actor_n.start_actor(
-        dname,
-        enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
-        debug_mode=Services.debug_mode,
-        **tractor_kwargs
-    )
-
-    # NOTE: the service mngr expects an already spawned actor + its
-    # portal ref in order to do non-blocking setup of brokerd
-    # service nursery.
-    await Services.start_service_task(
-        dname,
-        portal,
-
-        # signature of target root-task endpoint
-        daemon_fixture_ep,
-        brokername=brokername,
-        loglevel=loglevel,
-    )
+    from piker.service import (
+        get_service_mngr,
+        ServiceMngr,
+    )
+    dname: str = tractor_kwargs.pop('name')  # f'brokerd.{brokername}'
+    mngr: ServiceMngr = get_service_mngr()
+    ctx: tractor.Context = await mngr.start_service(
+        daemon_name=dname,
+        ctx_ep=partial(
+            # signature of target root-task endpoint
+            daemon_fixture_ep,
+
+            # passed to daemon_fixture_ep(**kwargs)
+            brokername=brokername,
+            loglevel=loglevel,
+        ),
+        debug_mode=mngr.debug_mode,
+        loglevel=loglevel,
+        enable_modules=(
+            _data_mods
+            +
+            tractor_kwargs.pop('enable_modules')
+        ),
+        **tractor_kwargs
+    )
+    assert (
+        not ctx.cancel_called
+        and ctx.portal  # parent side
+        and dname in ctx.chan.uid  # subactor is named as desired
+    )
     return True
@@ -262,8 +274,7 @@ async def maybe_spawn_brokerd(
     from piker.service import maybe_spawn_daemon

     async with maybe_spawn_daemon(
-        f'brokerd.{brokername}',
+        service_name=f'brokerd.{brokername}',
         service_task_target=spawn_brokerd,
         spawn_args={
             'brokername': brokername,

View File

@@ -42,6 +42,7 @@ from trio_typing import TaskStatus
 from pendulum import (
     from_timestamp,
 )
+from rapidfuzz import process as fuzzy
 import numpy as np
 import tractor
@@ -110,7 +111,6 @@ class AggTrade(Struct, frozen=True):
 async def stream_messages(
     ws: NoBsWs,
 ) -> AsyncGenerator[NoBsWs, dict]:
-
     # TODO: match syntax here!
@@ -221,8 +221,6 @@ def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
     }

-# TODO, why aren't frame resp `log.info()`s showing in upstream
-# code?!

 @acm
 async def open_history_client(
     mkt: MktPair,
@@ -465,8 +463,6 @@ async def stream_quotes(
 ):
     init_msgs: list[FeedInit] = []
     for sym in symbols:
-        mkt: MktPair
-        pair: Pair
         mkt, pair = await get_mkt_info(sym)

         # build out init msgs according to latest spec
@@ -515,6 +511,7 @@ async def stream_quotes(

     # start streaming
     async for typ, quote in msg_gen:
+
         # period = time.time() - last
         # hz = 1/period if period else float('inf')
         # if hz > 60:
@@ -550,7 +547,7 @@ async def open_symbol_search(
     )

     # repack in fqme-keyed table
-    byfqme: dict[str, Pair] = {}
+    byfqme: dict[start, Pair] = {}
     for pair in pairs.values():
         byfqme[pair.bs_fqme] = pair

View File

@@ -181,6 +181,7 @@ class FutesPair(Pair):
     quoteAsset: str  # 'USDT',
     quotePrecision: int  # 8,
     requiredMarginPercent: float  # '5.0000',
+    settlePlan: int  # 0,
     timeInForce: list[str]  # ['GTC', 'IOC', 'FOK', 'GTX'],
     triggerProtect: float  # '0.0500',
     underlyingSubType: list[str]  # ['PoW'],

View File

@@ -1250,12 +1250,6 @@ async def load_aio_clients(
     for i in range(connect_retries):
         try:
-            log.info(
-                'Trying `ib_async` connect\n'
-                f'{host}: {port}\n'
-                f'clientId: {client_id}\n'
-                f'timeout: {connect_timeout}\n'
-            )
             await ib.connectAsync(
                 host,
                 port,
@@ -1373,9 +1367,7 @@ async def load_clients_for_trio(
     a ``tractor.to_asyncio.open_channel_from()``.

     '''
-    async with load_aio_clients(
-        disconnect_on_exit=False,
-    ) as accts2clients:
+    async with load_aio_clients() as accts2clients:
         to_trio.send_nowait(accts2clients)

View File

@@ -62,7 +62,7 @@ from piker._cacheables import (
 )
 from piker.log import get_logger
 from piker.data.validate import FeedInit
-from piker.types import Struct  # NOTE, this is already a `tractor.msg.Struct`
+from piker.types import Struct
 from piker.data import (
     def_iohlcv_fields,
     match_from_pairs,
@@ -98,18 +98,9 @@ class KucoinMktPair(Struct, frozen=True):
     def size_tick(self) -> Decimal:
         return Decimal(str(self.quoteMinSize))

-    callauctionFirstStageStartTime: None|float
-    callauctionIsEnabled: bool
-    callauctionPriceCeiling: float|None
-    callauctionPriceFloor: float|None
-    callauctionSecondStageStartTime: float|None
-    callauctionThirdStageStartTime: float|None
     enableTrading: bool
-    feeCategory: int
     feeCurrency: str
     isMarginEnabled: bool
-    makerFeeCoefficient: float
     market: str
     minFunds: float
     name: str
@@ -119,10 +110,7 @@ class KucoinMktPair(Struct, frozen=True):
     quoteIncrement: float
     quoteMaxSize: float
     quoteMinSize: float
-    st: bool
     symbol: str  # our bs_mktid, kucoin's internal id
-    takerFeeCoefficient: float
-    tradingStartTime: float|None

 class AccountTrade(Struct, frozen=True):
@@ -404,13 +392,7 @@ class Client:
         pairs: dict[str, KucoinMktPair] = {}
         fqmes2mktids: bidict[str, str] = bidict()
         for item in entries:
-            try:
-                pair = pairs[item['name']] = KucoinMktPair(**item)
-            except TypeError as te:
-                raise TypeError(
-                    '`KucoinMktPair` and reponse fields do not match ??\n'
-                    f'{KucoinMktPair.fields_diff(item)}\n'
-                ) from te
+            pair = pairs[item['name']] = KucoinMktPair(**item)
             fqmes2mktids[
                 item['name'].lower().replace('-', '')
             ] = pair.name
@@ -611,7 +593,7 @@ async def get_client() -> AsyncGenerator[Client, None]:
     '''
     async with (
         httpx.AsyncClient(
-            base_url='https://api.kucoin.com/api',
+            base_url=f'https://api.kucoin.com/api',
         ) as trio_client,
     ):
         client = Client(httpx_client=trio_client)
@@ -655,7 +637,7 @@ async def open_ping_task(
             await trio.sleep((ping_interval - 1000) / 1000)
             await ws.send_msg({'id': connect_id, 'type': 'ping'})

-    log.warning('Starting ping task for kucoin ws connection')
+    log.info('Starting ping task for kucoin ws connection')
     n.start_soon(ping_server)

     yield
@@ -667,14 +649,9 @@ async def open_ping_task(
 async def get_mkt_info(
     fqme: str,

-) -> tuple[
-    MktPair,
-    KucoinMktPair,
-]:
+) -> tuple[MktPair, KucoinMktPair]:
     '''
-    Query for and return both a `piker.accounting.MktPair` and
-    `KucoinMktPair` from provided `fqme: str`
-    (fully-qualified-market-endpoint).
+    Query for and return a `MktPair` and `KucoinMktPair`.

     '''
     async with open_cached_client('kucoin') as client:
@@ -749,8 +726,6 @@ async def stream_quotes(
     log.info(f'Starting up quote stream(s) for {symbols}')

     for sym_str in symbols:
-        mkt: MktPair
-        pair: KucoinMktPair
         mkt, pair = await get_mkt_info(sym_str)
         init_msgs.append(
             FeedInit(mkt_info=mkt)
@@ -758,11 +733,7 @@ async def stream_quotes(

     ws: NoBsWs
     token, ping_interval = await client._get_ws_token()
-    log.info('API reported ping_interval: {ping_interval}\n')
-
-    connect_id: str = str(uuid4())
-    typ: str
-    quote: dict
+    connect_id = str(uuid4())
     async with (
         open_autorecon_ws(
             (
@@ -776,37 +747,20 @@ async def stream_quotes(
             ),
         ) as ws,
         open_ping_task(ws, ping_interval, connect_id),
-        aclosing(
-            iter_normed_quotes(
-                ws, sym_str
-            )
-        ) as iter_quotes,
+        aclosing(stream_messages(ws, sym_str)) as msg_gen,
     ):
-        typ, quote = await anext(iter_quotes)
-
-        # take care to not unblock here until we get a real
-        # trade quote?
-        # ^TODO, remove this right?
-        # -[ ] what often blocks chart boot/new-feed switching
-        #    since we'ere waiting for a live quote instead of just
-        #    loading history afap..
-        #  |_ XXX, not sure if we require a bit of rework to core
-        #     feed init logic or if backends justg gotta be
-        #     changed up.. feel like there was some causality
-        #     dilema prolly only seen with IB too..
-        # while typ != 'trade':
-        #     typ, quote = await anext(iter_quotes)
+        typ, quote = await anext(msg_gen)
+        while typ != 'trade':
+            # take care to not unblock here until we get a real
+            # trade quote
+            typ, quote = await anext(msg_gen)

         task_status.started((init_msgs, quote))
         feed_is_live.set()

-        # XXX NOTE, DO NOT include the `.<backend>` suffix!
-        # OW the sampling loop will not broadcast correctly..
-        # since `bus._subscribers.setdefault(bs_fqme, set())`
-        # is used inside `.data.open_feed_bus()` !!!
-        topic: str = mkt.bs_fqme
-        async for typ, quote in iter_quotes:
-            await send_chan.send({topic: quote})
+        async for typ, msg in msg_gen:
+            await send_chan.send({sym_str: msg})

 @acm
@@ -861,7 +815,7 @@ async def subscribe(
     )

-async def iter_normed_quotes(
+async def stream_messages(
     ws: NoBsWs,
     sym: str,
@@ -892,9 +846,6 @@ async def iter_normed_quotes(
             yield 'trade', {
                 'symbol': sym,
-                # TODO, is 'last' even used elsewhere/a-good
-                # semantic? can't we just read the ticks with our
-                # .data.ticktools.frame_ticks()`/
                 'last': trade_data.price,
                 'brokerd_ts': last_trade_ts,
                 'ticks': [
@@ -987,7 +938,7 @@ async def open_history_client(
             if end_dt is None:
                 inow = round(time.time())
-                log.debug(
+                print(
                     f'difference in time between load and processing'
                     f'{inow - times[-1]}'
                 )

View File

@ -1,49 +0,0 @@
piker.clearing
______________
trade execution-n-control subsys for both live and paper trading as
well as algo-trading manual override/interaction across any backend
broker and data provider.
avail UIs
*********
order ctl
---------
the `piker.clearing` subsys is exposed mainly though
the `piker chart` GUI as a "chart trader" style UX and
is automatically enabled whenever a chart is opened.
.. ^TODO, more prose here!
the "manual" order control features are exposed via the
`piker.ui.order_mode` API and can pretty much always be
used (at least) in simulated-trading mode, aka "paper"-mode, and
the micro-manual is as follows:
``order_mode`` (
edge triggered activation by any of the following keys,
``mouse-click`` on y-level to submit at that price
):
- ``f``/ ``ctl-f`` to stage buy
- ``d``/ ``ctl-d`` to stage sell
- ``a`` to stage alert
``search_mode`` (
``ctl-l`` or ``ctl-space`` to open,
``ctl-c`` or ``ctl-space`` to close
) :
- begin typing to have symbol search automatically lookup
symbols from all loaded backend (broker) providers
- arrow keys and mouse click to navigate selection
- vi-like ``ctl-[hjkl]`` for navigation
position (pp) mgmt
------------------
you can also configure your position allocation limits from the
sidepane.
.. ^TODO, explain and provide tut once more refined!

View File

@@ -335,7 +335,7 @@ def services(config, tl, ports):
             name='service_query',
             loglevel=config['loglevel'] if tl else None,
         ),
-        tractor.get_arbiter(
+        tractor.get_registry(
             host=host,
             port=ports[0]
         ) as portal

View File

@@ -104,15 +104,14 @@ def get_app_dir(
     # `tractor`) with the testing dir and check for it whenever we
     # detect `pytest` is being used (which it isn't under normal
     # operation).
-    # if "pytest" in sys.modules:
-    #     import tractor
-    #     actor = tractor.current_actor(err_on_no_runtime=False)
-    #     if actor:  # runtime is up
-    #         rvs = tractor._state._runtime_vars
-    #         import pdbp; pdbp.set_trace()
-    #         testdirpath = Path(rvs['piker_vars']['piker_test_dir'])
-    #         assert testdirpath.exists(), 'piker test harness might be borked!?'
-    #         app_name = str(testdirpath)
+    if "pytest" in sys.modules:
+        import tractor
+        actor = tractor.current_actor(err_on_no_runtime=False)
+        if actor:  # runtime is up
+            rvs = tractor._state._runtime_vars
+            testdirpath = Path(rvs['piker_vars']['piker_test_dir'])
+            assert testdirpath.exists(), 'piker test harness might be borked!?'
+            app_name = str(testdirpath)

     if platform.system() == 'Windows':
         key = "APPDATA" if roaming else "LOCALAPPDATA"

View File

@@ -25,6 +25,7 @@ from collections import (
     defaultdict,
 )
 from contextlib import asynccontextmanager as acm
+from functools import partial
 import time
 from typing import (
     Any,
@@ -42,7 +43,7 @@ from tractor.trionics import (
     maybe_open_nursery,
 )
 import trio
-from trio_typing import TaskStatus
+from trio import TaskStatus

 from .ticktools import (
     frame_ticks,
@@ -70,6 +71,7 @@ if TYPE_CHECKING:
 _default_delay_s: float = 1.0

+# TODO: use new `tractor.singleton_acm` API for this!
 class Sampler:
     '''
     Global sampling engine registry.
@@ -79,9 +81,9 @@ class Sampler:
     This non-instantiated type is meant to be a singleton within
     a `samplerd` actor-service spawned once by the user wishing to
-    time-step-sample (real-time) quote feeds, see
-    ``.service.maybe_open_samplerd()`` and the below
-    ``register_with_sampler()``.
+    time-step-sample a (real-time) quote feeds, see
+    `.service.maybe_open_samplerd()` and the below
+    `register_with_sampler()`.

     '''
     service_nursery: None | trio.Nursery = None
@@ -375,7 +377,10 @@ async def register_with_sampler(
         assert Sampler.ohlcv_shms

     # unblock caller
-    await ctx.started(set(Sampler.ohlcv_shms.keys()))
+    await ctx.started(
+        # XXX bc msgpack only allows one array type!
+        list(Sampler.ohlcv_shms.keys())
+    )

     if open_index_stream:
         try:
@@ -419,7 +424,6 @@ async def register_with_sampler(

 async def spawn_samplerd(
-
     loglevel: str | None = None,
     **extra_tractor_kwargs
@@ -429,7 +433,10 @@ async def spawn_samplerd(
     update and increment count write and stream broadcasting.

     '''
-    from piker.service import Services
+    from piker.service import (
+        get_service_mngr,
+        ServiceMngr,
+    )

     dname = 'samplerd'
     log.info(f'Spawning `{dname}`')
@@ -437,26 +444,33 @@ async def spawn_samplerd(
     # singleton lock creation of ``samplerd`` since we only ever want
     # one daemon per ``pikerd`` proc tree.
     # TODO: make this built-into the service api?
-    async with Services.locks[dname + '_singleton']:
-
-        if dname not in Services.service_tasks:
-
-            portal = await Services.actor_n.start_actor(
-                dname,
-                enable_modules=[
-                    'piker.data._sampling',
-                ],
-                loglevel=loglevel,
-                debug_mode=Services.debug_mode,  # set by pikerd flag
-                **extra_tractor_kwargs
-            )
-
-            await Services.start_service_task(
-                dname,
-                portal,
-                register_with_sampler,
-                period_s=1,
-                sub_for_broadcasts=False,
-            )
-            return True
+    mngr: ServiceMngr = get_service_mngr()
+    already_started: bool = dname in mngr.service_tasks
+    async with mngr._locks[dname + '_singleton']:
+        ctx: Context = await mngr.start_service(
+            daemon_name=dname,
+            ctx_ep=partial(
+                register_with_sampler,
+                period_s=1,
+                sub_for_broadcasts=False,
+            ),
+            debug_mode=mngr.debug_mode,  # set by pikerd flag
+
+            # proxy-through to tractor
+            enable_modules=[
+                'piker.data._sampling',
+            ],
+            loglevel=loglevel,
+            **extra_tractor_kwargs
+        )
+        if not already_started:
+            assert (
+                ctx
+                and
+                ctx.portal
+                and
+                not ctx.cancel_called
+            )
+        return True
@@ -889,6 +903,7 @@ async def uniform_rate_send(
             # to consumers which crash or lose network connection.
             # I.e. we **DO NOT** want to crash and propagate up to
             # ``pikerd`` these kinds of errors!
+            trio.EndOfChannel,
             trio.ClosedResourceError,
             trio.BrokenResourceError,
             ConnectionResetError,

View File

@@ -360,7 +360,7 @@ async def open_autorecon_ws(

 '''
 JSONRPC response-request style machinery for transparent multiplexing
-of msgs over a `NoBsWs`.
+of msgs over a NoBsWs.

 '''
@@ -377,44 +377,23 @@ async def open_jsonrpc_session(
     url: str,
     start_id: int = 0,
     response_type: type = JSONRPCResult,
-    msg_recv_timeout: float = float('inf'),
-    # ^NOTE, since only `deribit` is using this jsonrpc stuff atm
-    # and options mkts are generally "slow moving"..
-    #
-    # FURTHER if we break the underlying ws connection then since we
-    # don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
-    # `_reconnect_forever()`, the jsonrpc "transport pipe" get's
-    # broken and never restored with wtv init sequence is required to
-    # re-establish a working req-resp session.
+    # request_type: Optional[type] = None,
+    # request_hook: Optional[Callable] = None,
+    # error_hook: Optional[Callable] = None,
 ) -> Callable[[str, dict], dict]:
-    '''
-    Init a json-RPC-over-websocket connection to the provided `url`.
-
-    A `json_rpc: Callable[[str, dict], dict` is delivered to the
-    caller for sending requests and a bg-`trio.Task` handles
-    processing of response msgs including error reporting/raising in
-    the parent/caller task.
-
-    '''
     # NOTE, store all request msgs so we can raise errors on the
     # caller side!
     req_msgs: dict[int, dict] = {}

     async with (
-        trio.open_nursery() as tn,
-        open_autorecon_ws(
-            url=url,
-            msg_recv_timeout=msg_recv_timeout,
-        ) as ws
+        trio.open_nursery() as n,
+        open_autorecon_ws(url) as ws
     ):
         rpc_id: Iterable[int] = count(start_id)
         rpc_results: dict[int, dict] = {}

-        async def json_rpc(
-            method: str,
-            params: dict,
-        ) -> dict:
+        async def json_rpc(method: str, params: dict) -> dict:
             '''
             perform a json rpc call and wait for the result, raise exception in
             case of error field present on response
@@ -491,10 +470,15 @@ async def open_jsonrpc_session(
                     'params': _,
                 }:
                     log.debug(f'Recieved\n{msg}')
+                    # if request_hook:
+                    #     await request_hook(request_type(**msg))

                 case {
                     'error': error
                 }:
+                    # if error_hook:
+                    #     await error_hook(response_type(**msg))
+
                     # retreive orig request msg, set error
                     # response in original "result" msg,
                     # THEN FINALLY set the event to signal caller
@@ -513,6 +497,6 @@ async def open_jsonrpc_session(
                 case _:
                     log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')

-        tn.start_soon(recv_task)
+        n.start_soon(recv_task)
         yield json_rpc
-        tn.cancel_scope.cancel()
+        n.cancel_scope.cancel()
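For orientation, a hypothetical use of the request API delivered by this acm (url and method names are placeholders)::

    async with open_jsonrpc_session(
        'wss://example-exchange.com/ws/api/v2',  # placeholder url
    ) as json_rpc:
        # request-response over the multiplexed ws; an `error`
        # field on the response is raised in this (parent) task
        res = await json_rpc(
            'public/get_instruments',
            {'currency': 'BTC'},
        )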

View File

@@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for,
 => TODO: maybe to (re)move elsewhere?
 '''
-from ._mngr import Services as Services
+from ._mngr import (
+    get_service_mngr as get_service_mngr,
+    open_service_mngr as open_service_mngr,
+    ServiceMngr as ServiceMngr,
+)
 from ._registry import (
     _tractor_kwargs as _tractor_kwargs,
     _default_reg_addr as _default_reg_addr,

View File

@@ -21,7 +21,6 @@
 from __future__ import annotations
 import os
 from typing import (
-    Optional,
     Any,
     ClassVar,
 )
@@ -30,13 +29,13 @@ from contextlib import (
 )

 import tractor
-import trio

 from ._util import (
     get_console_log,
 )
 from ._mngr import (
-    Services,
+    open_service_mngr,
+    ServiceMngr,
 )
 from ._registry import (  # noqa
     _tractor_kwargs,
@@ -59,7 +58,7 @@ async def open_piker_runtime(
     registry_addrs: list[tuple[str, int]] = [],
     enable_modules: list[str] = [],
-    loglevel: Optional[str] = None,
+    loglevel: str|None = None,

     # XXX NOTE XXX: you should pretty much never want debug mode
     # for data daemons when running in production.
@@ -69,7 +68,7 @@ async def open_piker_runtime(
     # and spawn the service tree distributed per that.
     start_method: str = 'trio',

-    tractor_runtime_overrides: dict | None = None,
+    tractor_runtime_overrides: dict|None = None,
     **tractor_kwargs,

 ) -> tuple[
@@ -119,6 +118,10 @@ async def open_piker_runtime(
             # spawn other specialized daemons I think?
             enable_modules=enable_modules,

+            # TODO: how to configure this?
+            # keep it on by default if debug mode is set?
+            # maybe_enable_greenback=debug_mode,
+
             **tractor_kwargs,
         ) as actor,
@@ -167,12 +170,13 @@ async def open_pikerd(
     **kwargs,

-) -> Services:
+) -> ServiceMngr:
     '''
-    Start a root piker daemon with an indefinite lifetime.
+    Start a root piker daemon actor (aka `pikerd`) with an indefinite
+    lifetime.

-    A root actor nursery is created which can be used to create and keep
-    alive underling services (see below).
+    A root actor-nursery is created which can be used to spawn and
+    supervise underling service sub-actors (see below).

     '''
     # NOTE: for the root daemon we always enable the root
@@ -199,8 +203,6 @@ async def open_pikerd(
             root_actor,
             reg_addrs,
         ),
-        tractor.open_nursery() as actor_nursery,
-        trio.open_nursery() as service_nursery,
     ):
         for addr in reg_addrs:
             if addr not in root_actor.accept_addrs:
@@ -209,25 +211,17 @@ async def open_pikerd(
                 'Maybe you have another daemon already running?'
             )

-        # assign globally for future daemon/task creation
-        Services.actor_n = actor_nursery
-        Services.service_n = service_nursery
-        Services.debug_mode = debug_mode
-
-        try:
-            yield Services
-        finally:
-            # TODO: is this more clever/efficient?
-            # if 'samplerd' in Services.service_tasks:
-            #     await Services.cancel_service('samplerd')
-            service_nursery.cancel_scope.cancel()
+        mngr: ServiceMngr
+        async with open_service_mngr(
+            debug_mode=debug_mode,
+        ) as mngr:
+            yield mngr

 # TODO: do we even need this?
 # @acm
 # async def maybe_open_runtime(
-#     loglevel: Optional[str] = None,
+#     loglevel: str|None = None,
 #     **kwargs,
 # ) -> None:
@@ -256,7 +250,7 @@ async def maybe_open_pikerd(
     loglevel: str | None = None,
     **kwargs,

-) -> tractor._portal.Portal | ClassVar[Services]:
+) -> tractor._portal.Portal | ClassVar[ServiceMngr]:
     '''
     If no ``pikerd`` daemon-root-actor can be found start it and
     yield up (we should probably figure out returning a portal to self

View File

@@ -49,7 +49,7 @@ from requests.exceptions import (
     ReadTimeout,
 )

-from ._mngr import Services
+from ._mngr import ServiceMngr
 from ._util import (
     log,  # sub-sys logger
     get_console_log,
@@ -453,7 +453,7 @@ async def open_ahabd(

 @acm
 async def start_ahab_service(
-    services: Services,
+    services: ServiceMngr,
     service_name: str,

     # endpoint config passed as **kwargs
@@ -549,7 +549,8 @@ async def start_ahab_service(
             log.warning('Failed to cancel root permsed container')

         except (
-            trio.MultiError,
+            # trio.MultiError,
+            ExceptionGroup,
         ) as err:
             for subexc in err.exceptions:
                 if isinstance(subexc, PermissionError):

View File

@@ -26,14 +26,17 @@ from typing import (
 from contextlib import (
     asynccontextmanager as acm,
 )
+from collections import defaultdict

 import tractor
+import trio

 from ._util import (
     log,  # sub-sys logger
 )
 from ._mngr import (
-    Services,
+    get_service_mngr,
+    ServiceMngr,
 )
 from ._actor_runtime import maybe_open_pikerd
 from ._registry import find_service
@@ -41,15 +44,14 @@ from ._registry import find_service
 @acm
 async def maybe_spawn_daemon(
     service_name: str,
-
     service_task_target: Callable,
     spawn_args: dict[str, Any],
     loglevel: str | None = None,
-
     singleton: bool = False,
+    _locks = defaultdict(trio.Lock),
     **pikerd_kwargs,

 ) -> tractor.Portal:
@@ -67,7 +69,7 @@ async def maybe_spawn_daemon(
     '''
     # serialize access to this section to avoid
     # 2 or more tasks racing to create a daemon
-    lock = Services.locks[service_name]
+    lock = _locks[service_name]
     await lock.acquire()

     async with find_service(
@@ -132,7 +134,65 @@ async def maybe_spawn_daemon(
     async with tractor.wait_for_actor(service_name) as portal:
         lock.release()
         yield portal
-        await portal.cancel_actor()
+        # --- ---- ---
+        # XXX NOTE XXX
+        # --- ---- ---
+        # DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
+        #
+        # Doing so will cause an "out-of-band" ctxc
+        # (`tractor.ContextCancelled`) to be raised inside the
+        # `ServiceMngr.open_context_in_task()`'s call to
+        # `ctx.wait_for_result()` AND the internal self-ctxc
+        # "graceful capture" WILL NOT CATCH IT!
+        #
+        # This can cause certain types of operations to raise
+        # that ctxc BEFORE THEY `return`, resulting in
+        # a "false-negative" ctxc being raised when really
+        # nothing actually failed, other then our semantic
+        # "failure" to suppress an expected, graceful,
+        # self-cancel scenario..
+        #
+        # bUt wHy duZ It WorK lIKe dis..
+        # ------------------------------
+        # from the perspective of the `tractor.Context` this
+        # cancel request was conducted "out of band" since
+        # `Context.cancel()` was never called and thus the
+        # `._cancel_called: bool` was never set. Despite the
+        # remote `.canceller` being set to `pikerd` (i.e. the
+        # same `Actor.uid` of the raising service-mngr task) the
+        # service-task's ctx itself was never marked as having
+        # requested cancellation and thus still raises the ctxc
+        # bc it was unaware of any such request.
+        #
+        # How to make grokin these cases easier tho?
+        # ------------------------------------------
+        # Because `Portal.cancel_actor()` was called it requests
+        # "full-`Actor`-runtime-cancellation" of it's peer
+        # process which IS NOT THE SAME as a single inter-actor
+        # RPC task cancelling its local context with a remote
+        # peer `Task` in that same peer process.
+        #
+        # ?TODO? It might be better if we do one (or all) of the
+        # following:
+        #
+        # -[ ] at least set a special message for the
+        #    `ContextCancelled` when raised locally by the
+        #    unaware ctx task such that we check for the
+        #    `.canceller` being *our `Actor`* and in the case
+        #    where `Context._cancel_called == False` we specially
+        #    note that this is likely an "out-of-band"
+        #    runtime-cancel request triggered by some call to
+        #    `Portal.cancel_actor()`, possibly even reporting the
+        #    exact LOC of that caller by tracking it inside our
+        #    portal-type?
+        # -[ ] possibly add another field `ContextCancelled` like
+        #    maybe a,
+        #    `.request_type: Literal['os', 'proc', 'actor',
+        #    'ctx']` type thing which would allow immediately
+        #    being able to tell what kind of cancellation caused
+        #    the unexpected ctxc?
+        # -[ ] REMOVE THIS COMMENT, once we've settled on how to
+        #    better augment `tractor` to be more explicit on this!

 async def spawn_emsd(
@@ -147,21 +207,22 @@ async def spawn_emsd(
     """
     log.info('Spawning emsd')

-    portal = await Services.actor_n.start_actor(
+    smngr: ServiceMngr = get_service_mngr()
+    portal = await smngr.actor_n.start_actor(
         'emsd',
         enable_modules=[
             'piker.clearing._ems',
             'piker.clearing._client',
         ],
         loglevel=loglevel,
-        debug_mode=Services.debug_mode,  # set by pikerd flag
+        debug_mode=smngr.debug_mode,  # set by pikerd flag
         **extra_tractor_kwargs
     )

     # non-blocking setup of clearing service
     from ..clearing._ems import _setup_persistent_emsd

-    await Services.start_service_task(
+    await smngr.start_service_task(
         'emsd',
         portal,

View File

@ -18,16 +18,29 @@
daemon-service management API. daemon-service management API.
""" """
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import ( from typing import (
Callable, Callable,
Any, Any,
) )
import trio import msgspec
from trio_typing import TaskStatus
import tractor import tractor
import trio
from trio import TaskStatus
from tractor import ( from tractor import (
ActorNursery,
current_actor, current_actor,
ContextCancelled, ContextCancelled,
Context, Context,
@ -39,6 +52,130 @@ from ._util import (
) )
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# impl deat which ensures a single global instance
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open a multi-subactor-as-service-daemon tree supervisor.
The delivered `ServiceMngr` is a singleton instance for each
actor-process and is allocated on first open and never
de-allocated unless explicitly deleted by al call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'actor_n': an,
'service_n': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.actor_n = an
mngr.service_n = tn
else:
assert (
mngr.actor_n
and
mngr.service_tn
)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_tasks:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
# TODO: we need remote wrapping and a general soln: # TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the # - factor this into a ``tractor.highlevel`` extension # pack for the
# library. # library.
@ -46,31 +183,46 @@ from ._util import (
# to the pikerd actor for starting services remotely! # to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns # - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion? # new actors and supervises them to completion?
-class Services:
+@dataclass
+class ServiceMngr:
+# class ServiceMngr(msgspec.Struct):
+    '''
+    A multi-subactor-as-service manager.

-    actor_n: tractor._supervise.ActorNursery
+    Spawn, supervise and monitor service/daemon subactors in a SC
+    process tree.
+
+    '''
+    actor_n: ActorNursery
    service_n: trio.Nursery
-    debug_mode: bool  # tractor sub-actor debug mode flag
+    debug_mode: bool = False  # tractor sub-actor debug mode flag
+
    service_tasks: dict[
        str,
        tuple[
            trio.CancelScope,
+            Context,
            Portal,
            trio.Event,
        ]
-    ] = {}
-    locks = defaultdict(trio.Lock)
+    ] = field(default_factory=dict)
+
+    # internal per-service task mutexs
+    _locks = defaultdict(trio.Lock)
-    @classmethod
    async def start_service_task(
        self,
        name: str,
        portal: Portal,
+
+        # TODO: typevar for the return type of the target and then
+        # use it below for `ctx_res`?
        target: Callable,
        allow_overruns: bool = False,
        **ctx_kwargs,

-    ) -> (trio.CancelScope, Context):
+    ) -> (trio.CancelScope, Context, Any):
        '''
        Open a context in a service sub-actor, add to a stack
        that gets unwound at ``pikerd`` teardown.
@@ -83,6 +235,7 @@ class Services:
            task_status: TaskStatus[
                tuple[
                    trio.CancelScope,
+                    Context,
                    trio.Event,
                    Any,
                ]
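
For background, the `task_status` plumbing extended here (to also pass back the `Context`) is `trio`'s standard startup handshake: the spawned task hands values back via `task_status.started()`, which unblocks the `nursery.start()` caller. A minimal sketch, not the `piker` code:

.. code:: python

    import trio

    async def serve(
        task_status=trio.TASK_STATUS_IGNORED,
    ):
        with trio.CancelScope() as cs:
            # ..open service resources here..
            task_status.started(cs)  # resumes the `nursery.start()` caller
            await trio.sleep_forever()

    async def main():
        async with trio.open_nursery() as tn:
            cs = await tn.start(serve)  # blocks until `.started()` fires
            cs.cancel()  # caller now owns a handle to cancel the service

    trio.run(main)
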
@@ -90,64 +243,87 @@ class Services:
        ) -> Any:

+            # TODO: use the ctx._scope directly here instead?
+            # -[ ] actually what semantics do we expect for this
+            #   usage!?
            with trio.CancelScope() as cs:
+                try:
                    async with portal.open_context(
                        target,
                        allow_overruns=allow_overruns,
                        **ctx_kwargs,

-                    ) as (ctx, first):
+                    ) as (ctx, started):

                        # unblock once the remote context has started
                        complete = trio.Event()
-                        task_status.started((cs, complete, first))
+                        task_status.started((
+                            cs,
+                            ctx,
+                            complete,
+                            started,
+                        ))
                        log.info(
-                            f'`pikerd` service {name} started with value {first}'
+                            f'`pikerd` service {name} started with value {started}'
                        )
-                    try:
                        # wait on any context's return value
                        # and any final portal result from the
                        # sub-actor.
-                        ctx_res: Any = await ctx.result()
+                        ctx_res: Any = await ctx.wait_for_result()

                        # NOTE: blocks indefinitely until cancelled
                        # either by error from the target context
                        # function or by being cancelled here by the
                        # surrounding cancel scope.
-                        return (await portal.result(), ctx_res)
+                        return (
+                            await portal.wait_for_result(),
+                            ctx_res,
+                        )

                except ContextCancelled as ctxe:
                    canceller: tuple[str, str] = ctxe.canceller
                    our_uid: tuple[str, str] = current_actor().uid
                    if (
-                        canceller != portal.channel.uid
+                        canceller != portal.chan.uid
                        and
                        canceller != our_uid
                    ):
                        log.cancel(
-                            f'Actor-service {name} was remotely cancelled?\n'
-                            f'remote canceller: {canceller}\n'
-                            f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
+                            f'Actor-service `{name}` was remotely cancelled by a peer?\n'
+
+                            # TODO: this would be a good spot to use
+                            # a respawn feature Bo
+                            f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
+
+                            f'cancellee: {portal.chan.uid}\n'
+                            f'canceller: {canceller}\n'
                        )
                    else:
                        raise

                finally:
+                    # NOTE: the ctx MUST be cancelled first if we
+                    # don't want the above `ctx.wait_for_result()` to
+                    # raise a self-ctxc. WHY, well since from the ctx's
+                    # perspective the cancel request will have
+                    # arrived out-of-band at the `Actor.cancel()`
+                    # level, thus `Context.cancel_called == False`,
+                    # meaning `ctx._is_self_cancelled() == False`.
+                    # with trio.CancelScope(shield=True):
+                    #     await ctx.cancel()
                    await portal.cancel_actor()
                    complete.set()
                    self.service_tasks.pop(name)

-        cs, complete, first = await self.service_n.start(open_context_in_task)
+        cs, sub_ctx, complete, started = await self.service_n.start(
+            open_context_in_task
+        )

        # store the cancel scope and portal for later cancellation or
        # restart if needed.
-        self.service_tasks[name] = (cs, portal, complete)
-
-        return cs, first
+        self.service_tasks[name] = (cs, sub_ctx, portal, complete)
+
+        return cs, sub_ctx, started
-    @classmethod
    async def cancel_service(
        self,
        name: str,
@@ -158,8 +334,80 @@ class Services:
        '''
        log.info(f'Cancelling `pikerd` service {name}')
-        cs, portal, complete = self.service_tasks[name]
-        cs.cancel()
+        cs, sub_ctx, portal, complete = self.service_tasks[name]
+
+        # cs.cancel()
+        await sub_ctx.cancel()
        await complete.wait()
-        assert name not in self.service_tasks, \
-            f'Service task for {name} not terminated?'
+
+        if name in self.service_tasks:
+            # TODO: custom err?
+            # raise ServiceError(
+            raise RuntimeError(
+                f'Service task for {name} not terminated?'
+            )
+
+        # assert name not in self.service_tasks, \
+        #     f'Service task for {name} not terminated?'
+    async def start_service(
+        self,
+        daemon_name: str,
+        ctx_ep: Callable,  # kwargs must `partial`-ed in!
+
+        debug_mode: bool = False,
+        **tractor_actor_kwargs,
+
+    ) -> Context:
+        '''
+        Start a "service" task in a new sub-actor (daemon) and
+        manage its lifetime indefinitely.
+
+        Services can be cancelled/shutdown using `.cancel_service()`.
+
+        '''
+        entry: tuple|None = self.service_tasks.get(daemon_name)
+        if entry:
+            (cs, sub_ctx, portal, complete) = entry
+            return sub_ctx
+
+        if daemon_name not in self.service_tasks:
+            portal = await self.actor_n.start_actor(
+                daemon_name,
+                debug_mode=(  # maybe set globally during allocate
+                    debug_mode
+                    or
+                    self.debug_mode
+                ),
+                **tractor_actor_kwargs,
+            )
+            ctx_kwargs: dict[str, Any] = {}
+            if isinstance(ctx_ep, functools.partial):
+                ctx_kwargs: dict[str, Any] = ctx_ep.keywords
+                ctx_ep: Callable = ctx_ep.func
+
+            (cs, sub_ctx, started) = await self.start_service_task(
+                daemon_name,
+                portal,
+                ctx_ep,
+                **ctx_kwargs,
+            )
+
+            return sub_ctx
+
+# TODO:
+# -[ ] factor all the common shit from `.data._sampling`
+#     and `.brokers._daemon` into here / `ServiceMngr`
+#     in terms of allocating the `Portal` as part of the
+#     "service-in-subactor" starting!
+# -[ ] move to `tractor.hilevel._service`, import and use here!
+#     NOTE: purposely leaks the ref to the mod-scope Bo
+# import tractor
+# from tractor.hilevel import (
+#     open_service_mngr,
+#     ServiceMngr,
+# )
+# mngr: ServiceMngr|None = None
+# with tractor.hilevel.open_service_mngr() as mngr:
+#     Services = proxy(mngr)
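
Per the commit message, callers pack endpoint kwargs into a `functools.partial` which `.start_service()` unpacks via the `.func`/`.keywords` split shown above. A hedged usage sketch with made-up endpoint and daemon names:

.. code:: python

    import functools

    async def example_daemon_ep(ctx, loglevel: str = 'info'):
        ...  # hypothetical `@tractor.context` endpoint

    ep = functools.partial(example_daemon_ep, loglevel='debug')
    assert ep.func is example_daemon_ep
    assert ep.keywords == {'loglevel': 'debug'}

    # then from a task with an allocated mngr:
    # sub_ctx = await mngr.start_service('exampled', ctx_ep=ep)
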


@@ -21,11 +21,13 @@ from typing import (
    TYPE_CHECKING,
)

+# TODO: oof, needs to be changed to `httpx`!
import asks

if TYPE_CHECKING:
    import docker
    from ._ahab import DockerContainer

+from . import ServiceMngr
from ._util import log  # sub-sys logger
from ._util import (
@@ -127,7 +129,7 @@ def start_elasticsearch(
@acm
async def start_ahab_daemon(
-    service_mngr: Services,
+    service_mngr: ServiceMngr,
    user_config: dict | None = None,
    loglevel: str | None = None,


@@ -53,7 +53,7 @@ import pendulum
# import purerpc

from ..data.feed import maybe_open_feed
-from . import Services
+from . import ServiceMngr
from ._util import (
    log,  # sub-sys logger
    get_console_log,
@@ -233,7 +233,7 @@ def start_marketstore(
@acm
async def start_ahab_daemon(
-    service_mngr: Services,
+    service_mngr: ServiceMngr,
    user_config: dict | None = None,
    loglevel: str | None = None,


@@ -386,8 +386,6 @@ def ldshm(
        open_annot_ctl() as actl,
    ):
        shm_df: pl.DataFrame | None = None
-        tf2aids: dict[float, dict] = {}
-
        for (
            shmfile,
            shm,
@@ -528,17 +526,16 @@ def ldshm(
                    new_df,
                    step_gaps,
                )

                # last chance manual overwrites in REPL
-                # await tractor.pause()
+                await tractor.pause()
                assert aids
-                tf2aids[period_s] = aids
-
            else:
                # allow interaction even when no ts problems.
-                assert not diff
                await tractor.pause()
-                log.info('Exiting TSP shm anal-izer!')
+                # assert not diff

    if shm_df is None:
        log.error(


@@ -161,13 +161,7 @@ class NativeStorageClient:
    def index_files(self):
        for path in self._datadir.iterdir():
-            if (
-                path.is_dir()
-                or
-                '.parquet' not in str(path)
-                # or
-                # path.name in {'borked', 'expired',}
-            ):
+            if path.name in {'borked', 'expired',}:
                continue

            key: str = path.name.rstrip('.parquet')
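
One aside on the unchanged `path.name.rstrip('.parquet')` line: `str.rstrip()` strips a trailing character *set*, not a suffix, so any stem ending in one of `.parquet`'s letters gets over-trimmed; `str.removesuffix()` (3.9+) is the safe spelling:

.. code:: python

    assert 'btcusdt.binance.parquet'.rstrip('.parquet') == 'btcusdt.binanc'  # oops
    assert 'btcusdt.binance.parquet'.removesuffix('.parquet') == 'btcusdt.binance'
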


@@ -44,10 +44,8 @@ import trio
from trio_typing import TaskStatus
import tractor
from pendulum import (
-    Interval,
    DateTime,
    Duration,
-    duration as mk_duration,
    from_timestamp,
)
import numpy as np
import numpy as np import numpy as np
@@ -216,8 +214,7 @@ async def maybe_fill_null_segments(
            # pair, immediately stop backfilling?
            if (
                start_dt
-                and
-                end_dt < start_dt
+                and end_dt < start_dt
            ):
                await tractor.pause()
                break
@@ -265,7 +262,6 @@ async def maybe_fill_null_segments(
        except tractor.ContextCancelled:
            # log.exception
            await tractor.pause()
-            raise

    null_segs_detected.set()
    # RECHECK for more null-gaps
@@ -353,7 +349,7 @@ async def maybe_fill_null_segments(
async def start_backfill(
    get_hist,
-    def_frame_duration: Duration,
+    frame_types: dict[str, Duration] | None,
    mod: ModuleType,
    mkt: MktPair,
    shm: ShmArray,
@@ -383,23 +379,22 @@ async def start_backfill(
    update_start_on_prepend: bool = False
    if backfill_until_dt is None:
-        # TODO: per-provider default history-durations?
-        # -[ ] inside the `open_history_client()` config allow
-        #    declaring the history duration limits instead of
-        #    guessing and/or applying the same limits to all?
-        #
-        # -[ ] allow declaring (default) per-provider backfill
-        #    limits inside a [storage] sub-section in conf.toml?
-        #
-        # NOTE, when no tsdb "last datum" is provided, we just
-        # load some near-term history by presuming a "decently
-        # large" 60s duration limit and a much shorter 1s range.
+        # TODO: drop this right and just expose the backfill
+        # limits inside a [storage] section in conf.toml?
+        # when no tsdb "last datum" is provided, we just load
+        # some near-term history.
+        # periods = {
+        #     1: {'days': 1},
+        #     60: {'days': 14},
+        # }
+
+        # do a decently sized backfill and load it into storage.
        periods = {
            1: {'days': 2},
            60: {'years': 6},
        }
        period_duration: int = periods[timeframe]
-        update_start_on_prepend: bool = True
+        update_start_on_prepend = True

    # NOTE: manually set the "latest" datetime which we intend to
    # backfill history "until" so as to adhere to the history
@@ -421,6 +416,7 @@ async def start_backfill(
                f'backfill_until_dt: {backfill_until_dt}\n'
                f'last_start_dt: {last_start_dt}\n'
            )
+
            try:
                (
                    array,
@@ -430,58 +426,37 @@ async def start_backfill(
                    timeframe,
                    end_dt=last_start_dt,
                )

            except NoData as _daterr:
-                orig_last_start_dt: datetime = last_start_dt
-                gap_report: str = (
-                    f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
-                    f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
-                    f'last_start_dt: {orig_last_start_dt}\n\n'
-                    f'bf_until: {backfill_until_dt}\n'
-                )
-                # EMPTY FRAME signal with 3 (likely) causes:
-                #
-                # 1. range contains legit gap in venue history
-                # 2. history actually (edge case) **began** at the
-                #    value `last_start_dt`
-                # 3. some other unknown error (ib blocking the
-                #    history-query bc they don't want you seeing how
-                #    they cucked all the tinas.. like with options
-                #    hist)
-                #
-                if def_frame_duration:
-                    # decrement by a duration's (frame) worth of time
-                    # as maybe indicated by the backend to see if we
-                    # can get older data before this possible
-                    # "history gap".
-                    last_start_dt: datetime = last_start_dt.subtract(
-                        seconds=def_frame_duration.total_seconds()
-                    )
-                    gap_report += (
-                        f'Decrementing `end_dt` and retrying with,\n'
-                        f'def_frame_duration: {def_frame_duration}\n'
-                        f'(new) last_start_dt: {last_start_dt}\n'
-                    )
-                    log.warning(gap_report)
-                    # skip writing to shm/tsdb and try the next
-                    # duration's worth of prior history.
-                    continue
-
-                else:
-                    # await tractor.pause()
-                    raise DataUnavailable(gap_report)
+                # 3 cases:
+                # - frame in the middle of a legit venue gap
+                # - history actually began at the `last_start_dt`
+                # - some other unknown error (ib blocking the
+                #   history bc they don't want you seeing how they
+                #   cucked all the tinas..)
+                if dur := frame_types.get(timeframe):
+                    # decrement by a frame's worth of duration and
+                    # retry a few times.
+                    last_start_dt.subtract(
+                        seconds=dur.total_seconds()
+                    )
+                    log.warning(
+                        f'{mod.name} -> EMPTY FRAME for end_dt?\n'
+                        f'tf@fqme: {timeframe}@{mkt.fqme}\n'
+                        'bf_until <- last_start_dt:\n'
+                        f'{backfill_until_dt} <- {last_start_dt}\n'
+                        f'Decrementing `end_dt` by {dur} and retry..\n'
+                    )
+                    continue

            # broker says there never was or is no more history to pull
-            except DataUnavailable as due:
-                message: str = due.args[0]
+            except DataUnavailable:
                log.warning(
-                    f'Provider {mod.name!r} halted backfill due to,\n\n'
-                    f'{message}\n'
-                    f'fqme: {mkt.fqme}\n'
-                    f'timeframe: {timeframe}\n'
-                    f'last_start_dt: {last_start_dt}\n'
-                    f'bf_until: {backfill_until_dt}\n'
+                    f'NO-MORE-DATA in range?\n'
+                    f'`{mod.name}` halted history:\n'
+                    f'tf@fqme: {timeframe}@{mkt.fqme}\n'
+                    'bf_until <- last_start_dt:\n'
+                    f'{backfill_until_dt} <- {last_start_dt}\n'
                )
                # UGH: what's a better way?
                # TODO: backends are responsible for being correct on
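
Worth flagging in the retry branch above: `pendulum` datetimes are immutable, so the `+` side's bare `last_start_dt.subtract(seconds=...)` discards its result (note the `-` side re-assigns). A quick demo of the semantics:

.. code:: python

    import pendulum

    dt = pendulum.datetime(2024, 1, 1, 12, 0)
    dt.subtract(seconds=60)  # returns a new instance; `dt` unchanged
    assert (dt.hour, dt.minute) == (12, 0)

    dt = dt.subtract(seconds=60)  # re-assign to actually step back
    assert (dt.hour, dt.minute) == (11, 59)
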
@@ -490,54 +465,34 @@ async def start_backfill(
                # to halt the request loop until the condition is
                # resolved or should the backend be entirely in
                # charge of solving such faults? yes, right?
-                # if timeframe > 1:
-                #     await tractor.pause()
                return

-            time: np.ndarray = array['time']
            assert (
-                time[0]
+                array['time'][0]
                ==
                next_start_dt.timestamp()
            )
-            assert time[-1] == next_end_dt.timestamp()
-
-            expected_dur: Interval = last_start_dt - next_start_dt
+
+            diff = last_start_dt - next_start_dt
+            frame_time_diff_s = diff.seconds

            # frame's worth of sample-period-steps, in seconds
            frame_size_s: float = len(array) * timeframe
-            recv_frame_dur: Duration = (
-                from_timestamp(array[-1]['time'])
-                -
-                from_timestamp(array[0]['time'])
-            )
-            if (
-                (lt_frame := (recv_frame_dur < expected_dur))
-                or
-                (null_frame := (frame_size_s == 0))
-                # ^XXX, should NEVER hit now!
-            ):
+            expected_frame_size_s: float = frame_size_s + timeframe
+            if frame_time_diff_s > expected_frame_size_s:

                # XXX: query result includes a start point prior to our
                # expected "frame size" and thus is likely some kind of
                # history gap (eg. market closed period, outage, etc.)
                # so just report it to console for now.
-                if lt_frame:
-                    reason = 'Possible GAP (or first-datum)'
-                else:
-                    assert null_frame
-                    reason = 'NULL-FRAME'
-
-                missing_dur: Interval = expected_dur.end - recv_frame_dur.end
                log.warning(
-                    f'{timeframe}s-series {reason} detected!\n'
-                    f'fqme: {mkt.fqme}\n'
-                    f'last_start_dt: {last_start_dt}\n\n'
-                    f'recv interval: {recv_frame_dur}\n'
-                    f'expected interval: {expected_dur}\n\n'
-                    f'Missing duration of history of {missing_dur.in_words()!r}\n'
-                    f'{missing_dur}\n'
+                    'GAP DETECTED:\n'
+                    f'last_start_dt: {last_start_dt}\n'
+                    f'diff: {diff}\n'
+                    f'frame_time_diff_s: {frame_time_diff_s}\n'
                )
-                # await tractor.pause()

            to_push = diff_history(
                array,
@@ -612,27 +567,23 @@ async def start_backfill(
            # long-term storage.
            if (
                storage is not None
-                and
-                write_tsdb
+                and write_tsdb
            ):
                log.info(
                    f'Writing {ln} frame to storage:\n'
                    f'{next_start_dt} -> {last_start_dt}'
                )

-                # NOTE, always drop the src asset token for
+                # always drop the src asset token for
                # non-currency-pair like market types (for now)
-                #
-                # THAT IS, for now our table key schema is NOT
-                # including the dst[/src] source asset token. SO,
-                # 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
-                # historical reasons ONLY.
                if mkt.dst.atype not in {
                    'crypto',
                    'crypto_currency',
                    'fiat',  # a "forex pair"
                    'perpetual_future',  # stupid "perps" from cex land
                }:
+                    # for now, our table key schema is not including
+                    # the dst[/src] source asset token.
                    col_sym_key: str = mkt.get_fqme(
                        delim_char='',
                        without_src=True,
@@ -737,7 +688,7 @@ async def back_load_from_tsdb(
        last_tsdb_dt
        and latest_start_dt
    ):
-        backfilled_size_s: Duration = (
+        backfilled_size_s = (
            latest_start_dt - last_tsdb_dt
        ).seconds
    # if the shm buffer len is not large enough to contain
@@ -960,8 +911,6 @@ async def tsdb_backfill(
        f'{pformat(config)}\n'
    )

-    # concurrently load the provider's most-recent-frame AND any
-    # pre-existing tsdb history already saved in `piker` storage.
    dt_eps: list[DateTime, DateTime] = []
    async with trio.open_nursery() as tn:
        tn.start_soon(
@@ -972,6 +921,7 @@ async def tsdb_backfill(
            timeframe,
            config,
        )
+
        tsdb_entry: tuple = await load_tsdb_hist(
            storage,
            mkt,
@@ -1000,25 +950,6 @@ async def tsdb_backfill(
        mr_end_dt,
    ) = dt_eps

-    first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
-    calced_frame_size: Duration = mk_duration(
-        seconds=first_frame_dur_s,
-    )
-    # NOTE, attempt to use the backend declared default frame
-    # sizing (as allowed by their time-series query APIs) and
-    # if not provided try to construct a default from the
-    # first frame received above.
-    def_frame_durs: dict[
-        int,
-        Duration,
-    ]|None = config.get('frame_types', None)
-    if def_frame_durs:
-        def_frame_size: Duration = def_frame_durs[timeframe]
-        assert def_frame_size == calced_frame_size
-    else:
-        # use what we calced from first frame above.
-        def_frame_size = calced_frame_size
-
    # NOTE: when there's no offline data, there's 2 cases:
    # - data backend doesn't support timeframe/sample
    #   period (in which case `dt_eps` should be `None` and
@@ -1049,7 +980,7 @@ async def tsdb_backfill(
            partial(
                start_backfill,
                get_hist=get_hist,
-                def_frame_duration=def_frame_size,
+                frame_types=config.get('frame_types', None),
                mod=mod,
                mkt=mkt,
                shm=shm,


@@ -616,18 +616,6 @@ def detect_price_gaps(
    # ])
    ...

-
-# TODO: probably just use the null_segs impl above?
-def detect_vlm_gaps(
-    df: pl.DataFrame,
-    col: str = 'volume',
-
-) -> pl.DataFrame:
-
-    vnull: pl.DataFrame = w_dts.filter(
-        pl.col(col) == 0
-    )
-    return vnull

def dedupe(
    src_df: pl.DataFrame,
@@ -638,6 +626,7 @@ def dedupe(
) -> tuple[
    pl.DataFrame,  # with dts
+    pl.DataFrame,  # gaps
    pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal)
    int,  # len diff between input and deduped
]:
@@ -650,22 +639,19 @@ def dedupe(
    '''
    wdts: pl.DataFrame = with_dts(src_df)

-    deduped = wdts
-
-    # remove duplicated datetime samples/sections
-    deduped: pl.DataFrame = wdts.unique(
-        # subset=['dt'],
-        subset=['time'],
-        maintain_order=True,
-    )
-
    # maybe sort on any time field
    if sort:
-        deduped = deduped.sort(by='time')
+        wdts = wdts.sort(by='time')
        # TODO: detect out-of-order segments which were corrected!
        # -[ ] report in log msg
        # -[ ] possibly return segment sections which were moved?

+    # remove duplicated datetime samples/sections
+    deduped: pl.DataFrame = wdts.unique(
+        subset=['dt'],
+        maintain_order=True,
+    )
+
    diff: int = (
        wdts.height
        -
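
A toy run of the reordered dedupe flow above (sort first, then `.unique()`); `keep='first'` is passed explicitly here since the default retention policy has varied across `polars` releases:

.. code:: python

    import polars as pl

    df = pl.DataFrame({
        'time': [3, 1, 1, 2],
        'dt':   ['c', 'a', 'a', 'b'],
    })
    wdts = df.sort(by='time')
    deduped = wdts.unique(
        subset=['dt'],
        keep='first',
        maintain_order=True,
    )
    assert deduped['time'].to_list() == [1, 2, 3]
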


@@ -18,6 +18,22 @@
requires = ["hatchling"]
build-backend = "hatchling.build"

+# ------ - ------
+[tool.ruff.lint]
+# https://docs.astral.sh/ruff/settings/#lint_ignore
+ignore = []
+
+# https://docs.astral.sh/ruff/settings/#lint_per-file-ignores
+"piker/ui/qt.py" = [
+    "E402",
+    'F401',  # unused imports (without __all__ or blah as blah)
+    # "F841", # unused variable rules
+]
+# ignore-init-module-imports = false
+# ------ - ------
+
[project]
name = "piker"
version = "0.1.0a0dev0"
@@ -87,23 +103,15 @@ uis = [
    "pyqt6 >=6.7.0, <7.0.0",
    "pyqtgraph",

-    # for consideration,
-    # - 'visidata'
+    # ------ - ------

    # TODO: add an `--only daemon` group for running non-ui / pikerd
    # service tree in distributed mode B)
    # https://docs.astral.sh/uv/concepts/projects/dependencies/#optional-dependencies
+    # [project.optional-dependencies]
]

[dependency-groups]
-# TODO: a toolset that makes debugging a `pikerd` service (tree) easy
-# to hack on directly using more or less the local env:
-# - xonsh + xxh
-# - rsyscall + pdbp
-# - actor runtime control console like BEAM/OTP
-#
-# console enhancements and eventually remote debugging extras/helpers.
-# use `uv --dev` to enable
dev = [
    "pytest >=6.0.0, <7.0.0",
    "elasticsearch >=8.9.0, <9.0.0",
@@ -111,7 +119,13 @@ dev = [
    "prompt-toolkit ==3.0.40",
    "cython >=3.0.0, <4.0.0",
    "greenback >=1.1.1, <2.0.0",
-    "ruff>=0.9.6",
+    # console enhancements and eventually remote debugging
+    # extras/helpers.
+    # TODO: add a toolset that makes debugging a `pikerd` service
+    # (tree) easy to hack on directly using more or less the local env:
+    # - xonsh + xxh
+    # - rsyscall + pdbp
+    # - actor runtime control console like BEAM/OTP
]
[project.scripts] [project.scripts]
@@ -130,4 +144,4 @@ pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" }
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
msgspec = { git = "https://github.com/jcrist/msgspec.git" }
-tractor = { path = "../tractor", editable = true }
+tractor = { path = "../tractor" }


@@ -1,93 +0,0 @@
# from default `ruff.toml` @
# https://docs.astral.sh/ruff/configuration/
# Exclude a variety of commonly ignored directories.
exclude = [
".bzr",
".direnv",
".eggs",
".git",
".git-rewrite",
".hg",
".ipynb_checkpoints",
".mypy_cache",
".nox",
".pants.d",
".pyenv",
".pytest_cache",
".pytype",
".ruff_cache",
".svn",
".tox",
".venv",
".vscode",
"__pypackages__",
"_build",
"buck-out",
"build",
"dist",
"node_modules",
"site-packages",
"venv",
]
# Same as Black.
line-length = 88
indent-width = 4
# Assume Python 3.9
target-version = "py312"
# ------ - ------
# TODO, stop warnings around `anext()` builtin use?
# tool.ruff.target-version = "py310"
[lint]
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default.
select = ["E4", "E7", "E9", "F"]
ignore = []
ignore-init-module-imports = false
[lint.per-file-ignores]
"piker/ui/qt.py" = [
"E402",
'F401', # unused imports (without __all__ or blah as blah)
# "F841", # unused variable rules
]
# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
unfixable = []
# Allow unused variables when underscore-prefixed.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[format]
# Use single quotes in `ruff format`.
quote-style = "single"
# Like Black, indent with spaces, rather than tabs.
indent-style = "space"
# Like Black, respect magic trailing commas.
skip-magic-trailing-comma = false
# Like Black, automatically detect the appropriate line ending.
line-ending = "auto"
# Enable auto-formatting of code examples in docstrings. Markdown,
# reStructuredText code/literal blocks and doctests are all supported.
#
# This is currently disabled by default, but it is planned for this
# to be opt-out in the future.
docstring-code-format = false
# Set the line length limit used when formatting code snippets in
# docstrings.
#
# This only has an effect when the `docstring-code-format` setting is
# enabled.
docstring-code-line-length = "dynamic"


@@ -10,7 +10,7 @@ from piker import (
    config,
)
from piker.service import (
-    Services,
+    get_service_mngr,
)
from piker.log import get_console_log
@@ -129,7 +129,7 @@ async def _open_test_pikerd(
        ) as service_manager,
    ):
        # this proc/actor is the pikerd
-        assert service_manager is Services
+        assert service_manager is get_service_mngr()

        async with tractor.wait_for_actor(
            'pikerd',


@@ -26,7 +26,7 @@ import pytest
import tractor
from uuid import uuid4

-from piker.service import Services
+from piker.service import ServiceMngr
from piker.log import get_logger
from piker.clearing._messages import (
    Order,
@@ -158,7 +158,7 @@ def load_and_check_pos(
def test_ems_err_on_bad_broker(
-    open_test_pikerd: Services,
+    open_test_pikerd: ServiceMngr,
    loglevel: str,
):
    async def load_bad_fqme():


@@ -15,7 +15,7 @@ import tractor
from piker.service import (
    find_service,
-    Services,
+    ServiceMngr,
)
from piker.data import (
    open_feed,
@@ -44,7 +44,7 @@ def test_runtime_boot(
    async def main():
        port = 6666
        daemon_addr = ('127.0.0.1', port)
-        services: Services
+        services: ServiceMngr

        async with (
            open_test_pikerd(
uv.lock

@@ -708,7 +708,6 @@ dev = [
    { name = "greenback" },
    { name = "prompt-toolkit" },
    { name = "pytest" },
-    { name = "ruff" },
    { name = "xonsh" },
]
@@ -740,7 +739,7 @@ requires-dist = [
    { name = "tomli", specifier = ">=2.0.1,<3.0.0" },
    { name = "tomli-w", specifier = ">=1.0.0,<2.0.0" },
    { name = "tomlkit", git = "https://github.com/pikers/tomlkit.git?branch=piker_pin" },
-    { name = "tractor", editable = "../tractor" },
+    { name = "tractor", directory = "../tractor" },
    { name = "trio", specifier = ">=0.24,<0.25" },
    { name = "trio-util", specifier = ">=0.7.0,<0.8.0" },
    { name = "trio-websocket", specifier = ">=0.10.3,<0.11.0" },
@@ -755,7 +754,6 @@ dev = [
    { name = "greenback", specifier = ">=1.1.1,<2.0.0" },
    { name = "prompt-toolkit", specifier = "==3.0.40" },
    { name = "pytest", specifier = ">=6.0.0,<7.0.0" },
-    { name = "ruff", specifier = ">=0.9.6" },
    { name = "xonsh", specifier = ">=0.14.2,<0.15.0" },
]
@@ -1075,31 +1073,6 @@ wheels = [
    { url = "https://files.pythonhosted.org/packages/19/71/39c7c0d87f8d4e6c020a393182060eaefeeae6c01dab6a84ec346f2567df/rich-13.9.4-py3-none-any.whl", hash = "sha256:6049d5e6ec054bf2779ab3358186963bac2ea89175919d699e378b99738c2a90", size = 242424 },
]
[[package]]
name = "ruff"
version = "0.9.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/2a/e1/e265aba384343dd8ddd3083f5e33536cd17e1566c41453a5517b5dd443be/ruff-0.9.6.tar.gz", hash = "sha256:81761592f72b620ec8fa1068a6fd00e98a5ebee342a3642efd84454f3031dca9", size = 3639454 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/76/e3/3d2c022e687e18cf5d93d6bfa2722d46afc64eaa438c7fbbdd603b3597be/ruff-0.9.6-py3-none-linux_armv6l.whl", hash = "sha256:2f218f356dd2d995839f1941322ff021c72a492c470f0b26a34f844c29cdf5ba", size = 11714128 },
{ url = "https://files.pythonhosted.org/packages/e1/22/aff073b70f95c052e5c58153cba735748c9e70107a77d03420d7850710a0/ruff-0.9.6-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b908ff4df65dad7b251c9968a2e4560836d8f5487c2f0cc238321ed951ea0504", size = 11682539 },
{ url = "https://files.pythonhosted.org/packages/75/a7/f5b7390afd98a7918582a3d256cd3e78ba0a26165a467c1820084587cbf9/ruff-0.9.6-py3-none-macosx_11_0_arm64.whl", hash = "sha256:b109c0ad2ececf42e75fa99dc4043ff72a357436bb171900714a9ea581ddef83", size = 11132512 },
{ url = "https://files.pythonhosted.org/packages/a6/e3/45de13ef65047fea2e33f7e573d848206e15c715e5cd56095589a7733d04/ruff-0.9.6-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1de4367cca3dac99bcbd15c161404e849bb0bfd543664db39232648dc00112dc", size = 11929275 },
{ url = "https://files.pythonhosted.org/packages/7d/f2/23d04cd6c43b2e641ab961ade8d0b5edb212ecebd112506188c91f2a6e6c/ruff-0.9.6-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ac3ee4d7c2c92ddfdaedf0bf31b2b176fa7aa8950efc454628d477394d35638b", size = 11466502 },
{ url = "https://files.pythonhosted.org/packages/b5/6f/3a8cf166f2d7f1627dd2201e6cbc4cb81f8b7d58099348f0c1ff7b733792/ruff-0.9.6-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5dc1edd1775270e6aa2386119aea692039781429f0be1e0949ea5884e011aa8e", size = 12676364 },
{ url = "https://files.pythonhosted.org/packages/f5/c4/db52e2189983c70114ff2b7e3997e48c8318af44fe83e1ce9517570a50c6/ruff-0.9.6-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:4a091729086dffa4bd070aa5dab7e39cc6b9d62eb2bef8f3d91172d30d599666", size = 13335518 },
{ url = "https://files.pythonhosted.org/packages/66/44/545f8a4d136830f08f4d24324e7db957c5374bf3a3f7a6c0bc7be4623a37/ruff-0.9.6-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d1bbc6808bf7b15796cef0815e1dfb796fbd383e7dbd4334709642649625e7c5", size = 12823287 },
{ url = "https://files.pythonhosted.org/packages/c5/26/8208ef9ee7431032c143649a9967c3ae1aae4257d95e6f8519f07309aa66/ruff-0.9.6-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:589d1d9f25b5754ff230dce914a174a7c951a85a4e9270613a2b74231fdac2f5", size = 14592374 },
{ url = "https://files.pythonhosted.org/packages/31/70/e917781e55ff39c5b5208bda384fd397ffd76605e68544d71a7e40944945/ruff-0.9.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc61dd5131742e21103fbbdcad683a8813be0e3c204472d520d9a5021ca8b217", size = 12500173 },
{ url = "https://files.pythonhosted.org/packages/84/f5/e4ddee07660f5a9622a9c2b639afd8f3104988dc4f6ba0b73ffacffa9a8c/ruff-0.9.6-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5e2d9126161d0357e5c8f30b0bd6168d2c3872372f14481136d13de9937f79b6", size = 11906555 },
{ url = "https://files.pythonhosted.org/packages/f1/2b/6ff2fe383667075eef8656b9892e73dd9b119b5e3add51298628b87f6429/ruff-0.9.6-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:68660eab1a8e65babb5229a1f97b46e3120923757a68b5413d8561f8a85d4897", size = 11538958 },
{ url = "https://files.pythonhosted.org/packages/3c/db/98e59e90de45d1eb46649151c10a062d5707b5b7f76f64eb1e29edf6ebb1/ruff-0.9.6-py3-none-musllinux_1_2_i686.whl", hash = "sha256:c4cae6c4cc7b9b4017c71114115db0445b00a16de3bcde0946273e8392856f08", size = 12117247 },
{ url = "https://files.pythonhosted.org/packages/ec/bc/54e38f6d219013a9204a5a2015c09e7a8c36cedcd50a4b01ac69a550b9d9/ruff-0.9.6-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:19f505b643228b417c1111a2a536424ddde0db4ef9023b9e04a46ed8a1cb4656", size = 12554647 },
{ url = "https://files.pythonhosted.org/packages/a5/7d/7b461ab0e2404293c0627125bb70ac642c2e8d55bf590f6fce85f508f1b2/ruff-0.9.6-py3-none-win32.whl", hash = "sha256:194d8402bceef1b31164909540a597e0d913c0e4952015a5b40e28c146121b5d", size = 9949214 },
{ url = "https://files.pythonhosted.org/packages/ee/30/c3cee10f915ed75a5c29c1e57311282d1a15855551a64795c1b2bbe5cf37/ruff-0.9.6-py3-none-win_amd64.whl", hash = "sha256:03482d5c09d90d4ee3f40d97578423698ad895c87314c4de39ed2af945633caa", size = 10999914 },
{ url = "https://files.pythonhosted.org/packages/e8/a8/d71f44b93e3aa86ae232af1f2126ca7b95c0f515ec135462b3e1f351441c/ruff-0.9.6-py3-none-win_arm64.whl", hash = "sha256:0e2bb706a2be7ddfea4a4af918562fdc1bcb16df255e5fa595bbd800ce322a5a", size = 10177499 },
]
[[package]]
name = "shellingham"
version = "1.5.4"
@@ -1215,7 +1188,7 @@ source = { git = "https://github.com/pikers/tomlkit.git?branch=piker_pin#8e0239a
[[package]]
name = "tractor"
version = "0.1.0a6.dev0"
-source = { editable = "../tractor" }
+source = { directory = "../tractor" }
dependencies = [
    { name = "colorlog" },
    { name = "msgspec" },