Compare commits

..

39 Commits

Author SHA1 Message Date
Nelson Torres 91d174b95f Add Plot
Here is the `plot_graph()` that is in char of the bars, scatter and vertical line plot items.

Also all the necessary code  for the graph to be shown.
2025-02-17 21:06:59 -03:00
Nelson Torres 63187e7345 Extract logic from get_max_pain()
All the max pain math now is in this two functions:

- get_total_intrinsic_values(): calculate the total value for all strike_prices and stores then in a dict[str, Decimal]

- `get_intrinsic_value_and_max_pain()` given the `intrinsic_values` dict, returns the `max_pain` strike price and the `total_intrinsic_value` for that `strike_price`
2025-02-17 21:06:59 -03:00
Nelson Torres e1ea85ce3b Max pain daemon:
- To calculate the `max_pain` first we need an expiration date,
get_expiration_dates()` retrieves them and the user then enters one of
the shown, then using the select expiry_date on `get_instruments()` we
are good to build the `oi_by_strikes` important!

- Add `update_oi_by_strikes()`.

- Add `check_if_complete()`.

- `get_max_pain()`: here's where all the action takes place, the
`oi_by_strikes` must be complete to start the calculations,

- Use `maybe_open_oi_feed` for open a oi_feed.

- Add `max_pain_readme.rst`
2025-02-17 16:51:40 -03:00
Nelson Torres b1cae45cab Deribit api key changes introduce:
- `get_timestamp_int`: added this is the hack, so we can aboid use the custom deribit date format.

- `get_currencies`: added so we could get all deribit's available currencies.

- `get_instruments`: for a especific expiration date, it return a list of criptofeed.Symbol.

- `get_expiration_dates`: expirations dates available for btc's option contracts .

- `get_strikes_dict`: all the strike prices for an especific expiration date.

- `aio_open_interest_feed_relay` `open_oi_feed` `maybe_open_oi_feed`: this three handles all the portal stuff and the cryptofeed callbacks for the open interest and trades, for some reason it need both to work, i need to check that out at some point.

- Also a couple of format fixes.
2025-02-17 16:51:40 -03:00
Nelson Torres 2b9300103d Deribit api key changes introduce:
- `get_timestamp_int`: added this is the hack, so we can aboid use the custom deribit date format.

- `get_currencies`: added so we could get all deribit's available currencies.

- Also a couple of format fixes.
2025-02-17 15:44:37 -03:00
Tyler Goodlet fdde87c2d0 `deribit.feed`: fix "trade" event streaming
The main change needed to make `piker.data.feed._FeedsBus` work was
to correctly format the `'trade'` msgs with the (new schema) expected
`'ticks': list[dict]` field which,
- we compute the `piker` quote-msg-`dict` from the (now directly proxied through)
  `cryptofeed.types.Trade`'s fields inside the body of `stream_quotes()`.
- similarly, move the `'l1'` msg processing, **out of** the `asyncio`-side
  `_l1()` callback (defined as a closure in `.api.aio_price_feed_relay()`
  and passed to the `cryptofeed.FeedHandler`) and instead mod the
  callback to simply pass through the `.types.L1Book` ref directly to
  the `piker`/`trio` side task for conversion.

In support of all that,
- mask-to-drop the alt-branch to wait on a first rt event when the
  `cryptofeed.LastTradesResult.trades: list[Trade]` is empty; doesn't
  seem like this ever even happens?
- add a buncha typing, comments and doc-strs to the routines in
  `.deribit.api` including notes on where we can choose to mod the
  `.bs_fqme` for our eventually preferred `piker` style format.
- simplify some nested `@acm` enters to the new single `async with
  <tuple>)` style.
- be particularly pedantic about typing
  `tractor.to_asyncio.LinkedTaskChannel`
- bit of pep8 line-spacing fixes in `.venues`.
2025-02-17 15:44:37 -03:00
Tyler Goodlet dc57569f1c `.deribit.feed`: get live quotes workin (again)
The quote-msg `'topic'` field was being set and sent as the
`OptionPair.symbol: str` value instead of as the `MktPair.bs_fqme: str`
as is required for matching on the `piker.data.feed` side. So change to
that and simplify the actual `.bs_fqme: str` value to NOT include the
ISO-format time (for now) since it's a big ugly and longer term we need
a `piker`-fqme friendly-on-ze-eyes format/style anyway..
2025-02-17 15:44:37 -03:00
Tyler Goodlet dc2b5a37d1 Bit more `cryptofeed` adapter formatting and typing for clarity.. 2025-02-17 15:44:37 -03:00
Tyler Goodlet f1675181d8 .deribit.venues: add todo for an ideal `OptionPair.expiry` fmt/value 2025-02-17 15:44:37 -03:00
Tyler Goodlet 1ae1c3c059 Report the closest (via fuzzy match) pairs on unmatched input 2025-02-17 15:44:37 -03:00
Tyler Goodlet f3a74b279e Signal hist start using `OptionPair.creation_timestamp`
Such that the `get_hist()` query func raises `DataUnavailable` with an
explicit message regarding the start of the (option) contract's
lifetime.

Other,
- mask some unused imports (for now?)
- drop a duplicate `tractor.get_console_log()` call which was causing
  duplicate console emits (it's already setup by brokerd init now).
- comment various unused code bits i found.
- add a info log around live quotes so we can see for the moment when
  they actually occur.. XD
2025-02-17 15:44:37 -03:00
Tyler Goodlet ce2945d6b0 `.deribit.api` bit of tidying/typing
There were some imports missing or unused as well as a variety of spots
that had grokability issues due to missing type hints.

Other tweaks as part some more thorough manual testing:
- always raise when not `brokers.toml` section since the API can never
  work (no free data without keys).
- inline the `Asset.atype='crypto_currency` field despite it maybe not
  being the best value for `OptionPair` instruments..
- tossed in a now-masked pause block for debugging history queries in
  `Client.bars()`.
- commented out all the live order ctl (internal) endpoints for now
  since they're unused.
2025-02-17 15:44:37 -03:00
Tyler Goodlet 40fbccd667 'Fix `Optional` and use `'linear/reverse'` in `OptionPair.venue`' 2025-02-17 15:44:37 -03:00
Nelson Torres 08e8fb48f4 Deribit's feed fix
- `FeedInit` for init_msgs in `stream_quotes`.

- new cache is `client_pairs` so is replacing the old `client.cache_symbols`.

- `get_mkt_info` added

- `get_ohlc` fixed to comply the new ways of the feed.
2025-02-17 15:44:37 -03:00
Nelson Torres 0dcd1236c4 Deribit's api fix
key changes:

- Resolved the issue with the expiration dates from deribits, now we int instead of the crazy custom deribits format.

- The client now has a new  `_json_rpc_auth_wrapper` that adquires a first access token and then will refresh the access token when this expires.

- `get_assets` fixed, now  we use the public endpoint to check the availables assets, in the future probably this will change, but for now is working just fine.

- `get_mkt_pairs` added.

- `exch_info` added.

- `cache_symbols` fixed.

- Also a lot of reformat made in api.
2025-02-17 15:44:37 -03:00
Nelson Torres f12a6f7438 Venues
Moved from api to venues all the msgspecs structs, also added critical imports in api, feed and __init__ mods.
2025-02-17 15:44:37 -03:00
Tyler Goodlet bda23c25b9 Mk jsronrpc's underlying ws timeout `float('inf')`
Since currently we're only using this IPC subsys for `deribit`, and
generally speaking we're primarly supporting options markets (which are
fairly "slow moving"), flip to a default of NOT resetting the `NoBsWs`
on timeout since doing so normally breaks the jsron-rpc IPC session.
Without a proper `fixture` passed to `open_autorecon_ws()` (which we
should eventually implement!!) relying on a timeout-to-reset more or
less will just cause breakage issues - a proper reconnect sequence must
be implemented before using that feature.

Deats,
- expose and proxy through the `msg_recv_timeout` from
  `open_jsonrpc_session()` into the underlying `open_autorecon_ws()`
  call.
2025-02-17 13:40:04 -05:00
Tyler Goodlet 4c6c1029d6 Refine history gap/termination signalling
Namely handling backends which do not provide a default "frame
size-duration" in their init-config by making the backfiller guess the
value based on the first frame received.

Deats,
- adjust `start_backfill()` to take a more explicit
  `def_frame_duration: Duration` expected to be unpacked from any
  backend hist init-config by the `tsdb_backfill()` caller which now
  also computes a value from the first received frame when the config
  section isn't provided.
- in `start_backfill()` we now always expect the `def_frame_duration`
  input and always decrement the query range by this value whenever
  a `NoData` is raised by the provider-backend paired with an explicit
  `log.warning()` about the handling.
- also relay any `DataUnavailable.args[0]` message from the provider
  in the handler.
- repair "gap reporting" which checks for expected frame duration vs.
  that received with much better humanized logging on the missing
  segment using `pendulum.Interval/Duration.in_words()` output.
2025-02-17 13:26:09 -05:00
Tyler Goodlet 3ab9a9b741 Only use `frame_types` if delivered during enter
The `open_history_client()` provider endpoint can *optionally*
deliver a `frame_types: dict[int, pendulum.Duration]` subsection in its
`config: dict[str, dict]` (as was implemented with the `ib` backend).
This allows the `tsp` backfilling machinery to use this "recommended
frame duration" to subtract from the `last_start_dt` any time a `NoData`
gap is signalled by the `get_hist()` call allowing gaps to be ignored
safely without missing history by knowing the next earliest dt we can
query from using the `end_dt`. However, currently all crypto$ providers
haven't implemented this feat yet..

As such only try to use the `frame_types` feature if provided when
handling `NoData` conditions inside `tsp.start_backfill()` and otherwise
raise as normal.
2025-02-17 13:26:02 -05:00
Tyler Goodlet 6aedacd1b4 TOCHERRY service_mng_to_tractor: type fix to `.hilevel.ServicecMngr` 2025-02-13 21:20:35 -05:00
Tyler Goodlet b549e0fe67 Enable `greenback` for `.pause_from_sync()` by default? 2025-02-13 21:20:35 -05:00
Tyler Goodlet 0a42f07d17 `.brokers.cli`: module type and todo for `--pdb` flag to NOT src from sub-cmd 2025-02-13 21:20:35 -05:00
Tyler Goodlet 004870d1b5 Catch using `Sampler.bcast_errors` where possible
In all other possible IPC disconnect handling blocks. Also more
comprehensive typing throughout `uniform_rate_send()`.
2025-02-13 21:20:35 -05:00
Tyler Goodlet 11e4e3b6d9 Group bcast errors as `Sampler.bcast_errors`
A new class var `tuple[Exception]` such that the err set can be reffed
externally as needed for catching other similar pub-sub/IPC failures in
other (related) real-time sub-systems.

Also added some now-masked logging for debugging live-feed stream reading
issues that should ONLY be used for debugging since they'll greatly
degrade HFT perf. Used the new `log.mk_repr()` stuff (that one day we
should prolly pull from `modden` as a dep) for pretty console emissions.
2025-02-13 21:20:35 -05:00
Tyler Goodlet ce4bab43cb Suppress `trio.EndOfChannel`s raised by remote peer
Since now `tractor` will raise this native `trio`-exc translated from
a `Stop` msg when the peer gracefully terminates a `tractor.MsgStream`.
Just `info()` log in such cases versus continuing to warn for the
others.
2025-02-13 21:20:35 -05:00
Tyler Goodlet d1bd73ec8c `.tsp._anal`: add (unused) `detect_vlm_gaps()` 2025-02-13 21:20:35 -05:00
Tyler Goodlet 03e21e1f85 `.storage.cli`: collect gap-markup-aids into `tf2aids: dict` prior to pause for introspection 2025-02-13 21:20:35 -05:00
Tyler Goodlet a21b2dc854 Delegate to `tractor.msg.pretty_struct` since it was factored from here! 2025-02-13 21:20:35 -05:00
Tyler Goodlet 8564676ed3 Teensie `piker.data` styling tweaks
- use more compact optional value style with `|`-union
- fix `.flows` typing-only import since we need `MktPair` to be
  immediately defined for use on a `msgspec.Struct` field.
- more "tree-like" warning msg in `.validate()` reporting.
2025-02-13 21:20:35 -05:00
Tyler Goodlet 37dcb6e6be Invert `getattr()` check for `get_mkt_pairs()` ep
Such that we `return` early when not defined by the provider backend to
reduce an indent level in `SymbologyCache.load()`.
2025-02-13 21:20:35 -05:00
Tyler Goodlet 6ada638e62 Various `.clearing` todos/notes on potential issues with loglevel settings.. 2025-02-13 21:20:35 -05:00
Tyler Goodlet 3828a4eb03 Type loaded backend modules 2025-02-13 21:20:35 -05:00
Tyler Goodlet 14bd7d5fd6 Bump various `.brokers.core` doc string content/style 2025-02-13 21:20:35 -05:00
Tyler Goodlet a2f73258ca Ignore any non-`.parquet` files under `.config/piker/nativedb/` subdir 2025-02-13 21:20:35 -05:00
Tyler Goodlet dcbf8c0eeb Doc-n-clean `.data._web_bs.open_jsonrpc_session()`
Add a doc-string reflecting recent refinements, drop all the old hook
params, rename `n: trio.Nursery` -> `tn` for "task nursery" fitting with
code base's naming style.
2025-02-13 21:20:35 -05:00
Tyler Goodlet b773961471 Allow ledger passes to ignore (symcache) unknown fqmes
For example in the paper-eng, if you have a backend that doesn't fully
support a symcache (yet) it's handy to be able to ignore processing
other paper-eng txns when all you care about at the moment is the
simulated symbol.

NOTE, that currently this will still result in a key-error when you load
more then one mkt with the paper engine (for which the backend does not
have the symcache implemented) since no fqme ad-hoc query was made for
the 2nd symbol (and i'm not sure we should support that kinda hackery
over just encouraging the sym-cache being added?). Def needs a little
more thought depending on how many backends are never going to be able
to (easily) support caching..
2025-02-13 21:20:35 -05:00
Tyler Goodlet dfdcaea337 .clearing._ems: Don't require `first_quote['last']`
Instead just check for the field (which i'm not huge on the key-name for
anyway) and if not found get the "last price" from the real-time shm
buffer's latest 'close' sample.

Unrelatedly, use a `subs.copy()` in the `Router.client_broadcast()` loop
such that if a `client_stream` is popped on connection failure, we don't
RTE for the "size changed on iteration".
2025-02-13 21:20:35 -05:00
Tyler Goodlet f675c3cfb6 data._web_bs: try to raise jsonrpc errors in parent task 2025-02-13 21:20:35 -05:00
Tyler Goodlet eb86bfef76 Mask no-data pause, add perps to no-`/src`-in-fqme asset set
Was orig for debugging an issue with `kucoin` i think but definitely
shouldn't be left in XD

Also add `'perpetual_future'` to the `.start_backfill()` input literal
set since we don't expect the 'btc/usd.perp.binance' for now.
2025-02-13 21:20:35 -05:00
27 changed files with 1741 additions and 864 deletions

View File

View File

@ -0,0 +1,239 @@
#!/usr/bin/env python
from decimal import (
Decimal,
)
import trio
import tractor
from datetime import datetime
from pprint import pformat
from piker.brokers.deribit.api import (
get_client,
maybe_open_oi_feed,
)
import sys
import pyqtgraph as pg
from PyQt6 import QtCore
from pyqtgraph import ScatterPlotItem, InfiniteLine
from PyQt6.QtWidgets import QApplication
def check_if_complete(
oi: dict[str, dict[str, Decimal | None]]
) -> bool:
return all(
oi[strike]['C'] is not None
and
oi[strike]['P'] is not None for strike in oi
)
async def max_pain_daemon(
) -> None:
oi_by_strikes: dict[str, dict[str, Decimal | None]]
instruments: list[Symbol] = []
expiry_dates: list[str]
expiry_date: str
currency: str = 'btc'
kind: str = 'option'
async with get_client(
) as client:
expiry_dates: list[str] = await client.get_expiration_dates(
currency=currency,
kind=kind
)
print(f'Available expiration dates for {currency}-{kind}:')
print(f'{expiry_dates}')
expiry_date = input('Please enter a valid expiration date: ').upper()
print('Starting little daemon...')
oi_by_strikes: dict[str, dict[str, Decimal]]
instruments = await client.get_instruments(
expiry_date=expiry_date,
)
oi_by_strikes = client.get_strikes_dict(instruments)
def get_total_intrinsic_values(
oi_by_strikes: dict[str, dict[str, Decimal]]
) -> dict[str, dict[str, Decimal]]:
call_cash: Decimal = Decimal(0)
put_cash: Decimal = Decimal(0)
intrinsic_values: dict[str, dict[str, Decimal]] = {}
closes: list = sorted(Decimal(close) for close in oi_by_strikes)
for strike, oi in oi_by_strikes.items():
s = Decimal(strike)
call_cash = sum(max(0, (s - c) * oi_by_strikes[str(c)]['C']) for c in closes)
put_cash = sum(max(0, (c - s) * oi_by_strikes[str(c)]['P']) for c in closes)
intrinsic_values[strike] = {
'C': call_cash,
'P': put_cash,
'total': call_cash + put_cash,
}
return intrinsic_values
def get_intrinsic_value_and_max_pain(
intrinsic_values: dict[str, dict[str, Decimal]]
):
# We meed to find the lowest value, so we start at
# infinity to ensure that, and the max_pain must be
# an amount greater than zero.
total_intrinsic_value: Decimal = Decimal('Infinity')
max_pain: Decimal = Decimal(0)
for strike, oi in oi_by_strikes.items():
s = Decimal(strike)
if intrinsic_values[strike]['total'] < total_intrinsic_value:
total_intrinsic_value = intrinsic_values[strike]['total']
max_pain = s
return total_intrinsic_value, max_pain
def plot_graph(
oi_by_strikes: dict[str, dict[str, Decimal]],
plot,
):
"""Update the bar graph with new open interest data."""
plot.clear()
intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
for strike_str in sorted(oi_by_strikes, key=lambda x: int(x)):
strike = int(strike_str)
calls_val = float(oi_by_strikes[strike_str]['C'])
puts_val = float(oi_by_strikes[strike_str]['P'])
bar_c = pg.BarGraphItem(
x=[strike - 100],
height=[calls_val],
width=200,
pen='w',
brush=(0, 0, 255, 150)
)
plot.addItem(bar_c)
bar_p = pg.BarGraphItem(
x=[strike + 100],
height=[puts_val],
width=200,
pen='w',
brush=(255, 0, 0, 150)
)
plot.addItem(bar_p)
total_val = float(intrinsic_values[strike_str]['total']) / 100000
scatter_iv = ScatterPlotItem(
x=[strike],
y=[total_val],
pen=pg.mkPen(color=(0, 255, 0), width=2),
brush=pg.mkBrush(0, 255, 0, 150),
size=3,
symbol='o'
)
plot.addItem(scatter_iv)
_, max_pain = get_intrinsic_value_and_max_pain(intrinsic_values)
vertical_line = InfiniteLine(
pos=max_pain,
angle=90,
pen=pg.mkPen(color='yellow', width=1, style=QtCore.Qt.PenStyle.DotLine),
label=f'Max pain: {max_pain:,.0f}',
labelOpts={
'position': 0.85,
'color': 'yellow',
'movable': True
}
)
plot.addItem(vertical_line)
def update_oi_by_strikes(msg: tuple):
nonlocal oi_by_strikes
if 'oi' == msg[0]:
strike_price = msg[1]['strike_price']
option_type = msg[1]['option_type']
open_interest = msg[1]['open_interest']
oi_by_strikes.setdefault(
strike_price, {}
).update(
{option_type: open_interest}
)
def get_max_pain(
oi_by_strikes: dict[str, dict[str, Decimal]]
) -> dict[str, str | Decimal]:
'''
This method requires only the strike_prices and oi for call
and puts, the closes list are the same as the strike_prices
the idea is to sum all the calls and puts cash for each strike
and the ITM strikes from that strike, the lowest value is what we
are looking for the intrinsic value.
'''
nonlocal timestamp
intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
total_intrinsic_value, max_pain = get_intrinsic_value_and_max_pain(intrinsic_values)
return {
'timestamp': timestamp,
'expiry_date': expiry_date,
'total_intrinsic_value': total_intrinsic_value,
'max_pain': max_pain,
}
async with maybe_open_oi_feed(
instruments,
) as oi_feed:
# Initialize QApplication
app = QApplication(sys.argv)
win = pg.GraphicsLayoutWidget(show=True)
win.setWindowTitle('Calls (blue) vs Puts (red)')
plot = win.addPlot(title='OI by Strikes')
plot.showGrid(x=True, y=True)
print('Plot initialized...')
async for msg in oi_feed:
update_oi_by_strikes(msg)
if check_if_complete(oi_by_strikes):
if 'oi' == msg[0]:
timestamp = msg[1]['timestamp']
max_pain = get_max_pain(oi_by_strikes)
intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
# graph here
plot_graph(oi_by_strikes, plot)
print('-----------------------------------------------')
print(f'timestamp: {datetime.fromtimestamp(max_pain['timestamp'])}')
print(f'expiry_date: {max_pain['expiry_date']}')
print(f'max_pain: {max_pain['max_pain']:,.0f}')
print(f'total intrinsic value: {max_pain['total_intrinsic_value']:,.0f}')
print('-----------------------------------------------')
# Process GUI events to keep the window responsive
app.processEvents()
async def main():
async with tractor.open_nursery() as n:
p: tractor.Portal = await n.start_actor(
'max_pain_daemon',
enable_modules=[__name__],
infect_asyncio=True,
)
await p.run(max_pain_daemon)
if __name__ == '__main__':
trio.run(main)

View File

@ -0,0 +1,19 @@
## Max Pain Calculation for Deribit Options
This feature, which calculates the max pain point for options traded on the Deribit exchange using cryptofeed library.
- Functions in the api module for fetching options data from Deribit. [commit](https://pikers.dev/pikers/piker/commit/da55856dd2876291f55a06eb0561438a912d8241)
- Compute the max pain point based on open interest data using deribit's api. [commit](https://pikers.dev/pikers/piker/commit/0d9d6e15ba0edeb662ec97f7599dd66af3046b94)
### How to test it?
**Before start:** in order to get this working with `uv`, you **must** use my `tractor` [fork](https://pikers.dev/ntorres/tractor/src/branch/aio_abandons) and this branch: `aio_abandons`, the reason is that I cherry-pick the `uv_migration` that guille made, for some reason that a didn't dive into, in my system y need tractor using `uv` too. quite hacky I guess.
1. `uv lock`
2. `uv run --no-dev python examples/max_pain.py`
3. A message should be display, enter one of the expiration date available.
4. The script should be up and running.

View File

@ -30,7 +30,8 @@ from types import ModuleType
from typing import (
Any,
Iterator,
Generator
Generator,
TYPE_CHECKING,
)
import pendulum
@ -59,8 +60,10 @@ from ..clearing._messages import (
BrokerdPosition,
)
from piker.types import Struct
from piker.data._symcache import SymbologyCache
from ..log import get_logger
from piker.log import get_logger
if TYPE_CHECKING:
from piker.data._symcache import SymbologyCache
log = get_logger(__name__)
@ -493,6 +496,17 @@ class Account(Struct):
_mktmap_table: dict[str, MktPair] | None = None,
only_require: list[str]|True = True,
# ^list of fqmes that are "required" to be processed from
# this ledger pass; we often don't care about others and
# definitely shouldn't always error in such cases.
# (eg. broker backend loaded that doesn't yet supsport the
# symcache but also, inside the paper engine we don't ad-hoc
# request `get_mkt_info()` for every symbol in the ledger,
# only the one for which we're simulating against).
# TODO, not sure if there's a better soln for this, ideally
# all backends get symcache support afap i guess..
) -> dict[str, Position]:
'''
Update the internal `.pps[str, Position]` table from input
@ -535,11 +549,32 @@ class Account(Struct):
if _mktmap_table is None:
raise
required: bool = (
only_require is True
or (
only_require is not True
and
fqme in only_require
)
)
# XXX: caller is allowed to provide a fallback
# mktmap table for the case where a new position is
# being added and the preloaded symcache didn't
# have this entry prior (eg. with frickin IB..)
mkt = _mktmap_table[fqme]
if (
not (mkt := _mktmap_table.get(fqme))
and
required
):
raise
elif not required:
continue
else:
# should be an entry retreived somewhere
assert mkt
if not (pos := pps.get(bs_mktid)):
@ -656,7 +691,7 @@ class Account(Struct):
def write_config(self) -> None:
'''
Write the current account state to the user's account TOML file, normally
something like ``pps.toml``.
something like `pps.toml`.
'''
# TODO: show diff output?

View File

@ -51,6 +51,7 @@ __brokers__: list[str] = [
'ib',
'kraken',
'kucoin',
'deribit',
# broken but used to work
# 'questrade',
@ -61,7 +62,6 @@ __brokers__: list[str] = [
# wstrade
# iex
# deribit
# bitso
]
@ -98,13 +98,14 @@ async def open_cached_client(
If one has not been setup do it and cache it.
'''
brokermod = get_brokermod(brokername)
brokermod: ModuleType = get_brokermod(brokername)
# TODO: make abstract or `typing.Protocol`
# client: Client
async with maybe_open_context(
acm_func=brokermod.get_client,
kwargs=kwargs,
) as (cache_hit, client):
if cache_hit:
log.runtime(f'Reusing existing {client}')

View File

@ -471,11 +471,15 @@ def search(
'''
# global opts
brokermods = list(config['brokermods'].values())
brokermods: list[ModuleType] = list(config['brokermods'].values())
# TODO: this is coming from the `search --pdb` NOT from
# the `piker --pdb` XD ..
# -[ ] pull from the parent click ctx's values..dumdum
# assert pdb
# define tractor entrypoint
async def main(func):
async with maybe_open_pikerd(
loglevel=config['loglevel'],
debug_mode=pdb,

View File

@ -22,7 +22,9 @@ routines should be primitive data types where possible.
"""
import inspect
from types import ModuleType
from typing import List, Dict, Any, Optional
from typing import (
Any,
)
import trio
@ -34,8 +36,10 @@ from ..accounting import MktPair
async def api(brokername: str, methname: str, **kwargs) -> dict:
"""Make (proxy through) a broker API call by name and return its result.
"""
'''
Make (proxy through) a broker API call by name and return its result.
'''
brokermod = get_brokermod(brokername)
async with brokermod.get_client() as client:
meth = getattr(client, methname, None)
@ -62,10 +66,14 @@ async def api(brokername: str, methname: str, **kwargs) -> dict:
async def stocks_quote(
brokermod: ModuleType,
tickers: List[str]
) -> Dict[str, Dict[str, Any]]:
"""Return quotes dict for ``tickers``.
"""
tickers: list[str]
) -> dict[str, dict[str, Any]]:
'''
Return a `dict` of snapshot quotes for the provided input
`tickers`: a `list` of fqmes.
'''
async with brokermod.get_client() as client:
return await client.quote(tickers)
@ -74,13 +82,15 @@ async def stocks_quote(
async def option_chain(
brokermod: ModuleType,
symbol: str,
date: Optional[str] = None,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Return option chain for ``symbol`` for ``date``.
date: str|None = None,
) -> dict[str, dict[str, dict[str, Any]]]:
'''
Return option chain for ``symbol`` for ``date``.
By default all expiries are returned. If ``date`` is provided
then contract quotes for that single expiry are returned.
"""
'''
async with brokermod.get_client() as client:
if date:
id = int((await client.tickers2ids([symbol]))[symbol])
@ -98,7 +108,7 @@ async def option_chain(
# async def contracts(
# brokermod: ModuleType,
# symbol: str,
# ) -> Dict[str, Dict[str, Dict[str, Any]]]:
# ) -> dict[str, dict[str, dict[str, Any]]]:
# """Return option contracts (all expiries) for ``symbol``.
# """
# async with brokermod.get_client() as client:
@ -110,15 +120,24 @@ async def bars(
brokermod: ModuleType,
symbol: str,
**kwargs,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Return option contracts (all expiries) for ``symbol``.
"""
) -> dict[str, dict[str, dict[str, Any]]]:
'''
Return option contracts (all expiries) for ``symbol``.
'''
async with brokermod.get_client() as client:
return await client.bars(symbol, **kwargs)
async def search_w_brokerd(name: str, pattern: str) -> dict:
async def search_w_brokerd(
name: str,
pattern: str,
) -> dict:
# TODO: WHY NOT WORK!?!
# when we `step` through the next block?
# import tractor
# await tractor.pause()
async with open_cached_client(name) as client:
# TODO: support multiple asset type concurrent searches.
@ -130,12 +149,12 @@ async def symbol_search(
pattern: str,
**kwargs,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
) -> dict[str, dict[str, dict[str, Any]]]:
'''
Return symbol info from broker.
'''
results = []
results: list[str] = []
async def search_backend(
brokermod: ModuleType
@ -143,6 +162,13 @@ async def symbol_search(
brokername: str = mod.name
# TODO: figure this the FUCK OUT
# -> ok so obvi in the root actor any async task that's
# spawned outside the main tractor-root-actor task needs to
# call this..
# await tractor.devx._debug.maybe_init_greenback()
# tractor.pause_from_sync()
async with maybe_spawn_brokerd(
mod.name,
infect_asyncio=getattr(
@ -162,7 +188,6 @@ async def symbol_search(
))
async with trio.open_nursery() as n:
for mod in brokermods:
n.start_soon(search_backend, mod.name)
@ -172,11 +197,13 @@ async def symbol_search(
async def mkt_info(
brokermod: ModuleType,
fqme: str,
**kwargs,
) -> MktPair:
'''
Return MktPair info from broker including src and dst assets.
Return the `piker.accounting.MktPair` info struct from a given
backend broker tradable src/dst asset pair.
'''
async with open_cached_client(brokermod.name) as client:

View File

@ -25,6 +25,7 @@ from .api import (
get_client,
)
from .feed import (
get_mkt_info,
open_history_client,
open_symbol_search,
stream_quotes,
@ -34,15 +35,20 @@ from .feed import (
# open_trade_dialog,
# norm_trade_records,
# )
from .venues import (
OptionPair,
)
log = get_logger(__name__)
__all__ = [
'get_client',
# 'trades_dialogue',
'get_mkt_info',
'open_history_client',
'open_symbol_search',
'stream_quotes',
'OptionPair',
# 'norm_trade_records',
]

File diff suppressed because it is too large Load Diff

View File

@ -18,38 +18,59 @@
Deribit backend.
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import Any, Optional, Callable
from typing import (
# Any,
# Optional,
Callable,
)
# from pprint import pformat
import time
import cryptofeed
import trio
from trio_typing import TaskStatus
import pendulum
from rapidfuzz import process as fuzzy
from pendulum import (
from_timestamp,
)
import numpy as np
import tractor
from piker.brokers import open_cached_client
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from piker.brokers._util import (
BrokerError,
from piker.accounting import (
Asset,
MktPair,
unpack_fqme,
)
from piker.brokers import (
open_cached_client,
NoData,
DataUnavailable,
)
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
from piker._cacheables import (
async_lifo_cache,
)
from cryptofeed.symbols import Symbol
from piker.log import (
get_logger,
mk_repr,
)
from piker.data.validate import FeedInit
from .api import (
Client, Trade,
get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
Client,
# get_config,
piker_sym_to_cb_sym,
cb_sym_to_deribit_inst,
str_to_cb_sym,
maybe_open_price_feed
)
from .venues import (
Pair,
OptionPair,
Trade,
)
_spawn_kwargs = {
'infect_asyncio': True,
@ -64,90 +85,215 @@ async def open_history_client(
mkt: MktPair,
) -> tuple[Callable, int]:
fnstrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client:
pair: OptionPair = client._pairs[mkt.dst.name]
# XXX NOTE, the cuckers use ms !!!
creation_time_s: int = pair.creation_timestamp/1000
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
timeframe: float,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array = await client.bars(
instrument,
array: np.ndarray = await client.bars(
mkt,
start_dt=start_dt,
end_dt=end_dt,
)
if len(array) == 0:
raise DataUnavailable
if (
end_dt is None
):
raise DataUnavailable(
'No history seems to exist yet?\n\n'
f'{mkt}'
)
elif (
end_dt
and
end_dt.timestamp() < creation_time_s
):
# the contract can't have history
# before it was created.
pair_type_str: str = type(pair).__name__
create_dt: datetime = from_timestamp(creation_time_s)
raise DataUnavailable(
f'No history prior to\n'
f'`{pair_type_str}.creation_timestamp: int = '
f'{pair.creation_timestamp}\n\n'
f'------ deribit sux ------\n'
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
f'creation_time_s: {creation_time_s}\n'
f'create_dt: {create_dt}\n'
)
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
start_dt = from_timestamp(array[0]['time'])
end_dt = from_timestamp(array[-1]['time'])
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3}
yield (
get_ohlc,
{ # backfill config
'erlangs': 3,
'rate': 3,
}
)
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, Pair|OptionPair] | None:
# uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower():
fqme += '.deribit'
mkt_mode: str = ''
broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
# NOTE: we always upper case all tokens to be consistent with
# binance's symbology style for pairs, like `BTCUSDT`, but in
# theory we could also just keep things lower case; as long as
# we're consistent and the symcache matches whatever this func
# returns, always!
expiry: str = expiry.upper()
venue: str = venue.upper()
# venue_lower: str = venue.lower()
mkt_mode: str = 'option'
async with open_cached_client(
'deribit',
) as client:
assets: dict[str, Asset] = await client.get_assets()
pair_str: str = mkt_ep.lower()
pair: Pair = await client.exch_info(
sym=pair_str,
)
mkt_mode = pair.venue
client.mkt_mode = mkt_mode
dst: Asset | None = assets.get(pair.bs_dst_asset)
src: Asset | None = assets.get(pair.bs_src_asset)
mkt = MktPair(
dst=dst,
src=src,
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
venue=mkt_mode,
broker='deribit',
_atype=mkt_mode,
_fqme_without_src=True,
# expiry=pair.expiry,
# XXX TODO, currently we don't use it since it's
# already "described" in the `OptionPair.symbol: str`
# and if we slap in the ISO repr it's kinda hideous..
# -[ ] figure out the best either std
)
return mkt, pair
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
'''
Open a live quote stream for the market set defined by `symbols`.
sym = symbols[0]
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
'''
sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr(
# so we can see `deribit`'s delightfully mega-long bs fields..
maxstring=100,
)
async with (
open_cached_client('deribit') as client,
send_chan as send_chan
):
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym)
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {
'asset_type': 'option',
'price_tick_size': 0.0005
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(
mkt_info=mkt,
)
)
# build `cryptofeed` feed-handle
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
nsym = piker_sym_to_cb_sym(sym)
from_cf: tractor.to_asyncio.LinkedTaskChannel
async with maybe_open_price_feed(sym) as from_cf:
async with maybe_open_price_feed(sym) as stream:
# load the "last trades" summary
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
cb_sym_to_deribit_inst(cf_sym),
count=1,
)
last_trades: list[Trade] = last_trades_res.trades
cache = await client.cache_symbols()
# TODO, do we even need this or will the above always
# work?
# if not last_trades:
# await tractor.pause()
# async for typ, quote in from_cf:
# if typ == 'trade':
# last_trade = Trade(**(quote['data']))
# break
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
# else:
last_trade = Trade(
**(last_trades[0])
)
if len(last_trades) == 0:
last_trade = None
async for typ, quote in stream:
if typ == 'trade':
last_trade = Trade(**(quote['data']))
break
else:
last_trade = Trade(**(last_trades[0]))
first_quote = {
first_quote: dict = {
'symbol': sym,
'last': last_trade.price,
'brokerd_ts': last_trade.timestamp,
@ -158,13 +304,84 @@ async def stream_quotes(
'broker_ts': last_trade.timestamp
}]
}
task_status.started((init_msgs, first_quote))
task_status.started((
init_msgs,
first_quote,
))
feed_is_live.set()
async for typ, quote in stream:
topic = quote['symbol']
await send_chan.send({topic: quote})
# NOTE XXX, static for now!
# => since this only handles ONE mkt feed at a time we
# don't need a lookup table to map interleaved quotes
# from multiple possible mkt-pairs
topic: str = mkt.bs_fqme
# deliver until cancelled
async for typ, ref in from_cf:
match typ:
case 'trade':
trade: cryptofeed.types.Trade = ref
# TODO, re-impl this according to teh ideal
# fqme for opts that we choose!!
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(trade.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'last': trade.price,
'broker_ts': time.time(),
# ^TODO, name this `brokerd/datad_ts` and
# use `time.time_ns()` ??
'ticks': [{
'type': 'trade',
'price': float(trade.price),
'size': float(trade.amount),
'broker_ts': trade.timestamp,
}],
}
log.info(
f'deribit {typ!r} quote for {sym!r}\n\n'
f'{trade}\n\n'
f'{pfmt(piker_quote)}\n'
)
case 'l1':
book: cryptofeed.types.L1Book = ref
# TODO, so this is where we can possibly change things
# and instead lever the `MktPair.bs_fqme: str` output?
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(book.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'ticks': [
{'type': 'bid',
'price': float(book.bid_price),
'size': float(book.bid_size)},
{'type': 'bsize',
'price': float(book.bid_price),
'size': float(book.bid_size),},
{'type': 'ask',
'price': float(book.ask_price),
'size': float(book.ask_size),},
{'type': 'asize',
'price': float(book.ask_price),
'size': float(book.ask_size),}
]
}
await send_chan.send({
topic: piker_quote,
})
@tractor.context
@ -174,12 +391,21 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client:
# load all symbols locally for fast search
cache = await client.cache_symbols()
# cache = client._pairs
await ctx.started()
async with ctx.open_stream() as stream:
pattern: str
async for pattern in stream:
# repack in dict form
await stream.send(
await client.search_symbols(pattern))
# NOTE: pattern fuzzy-matching is done within
# the methd impl.
pairs: dict[str, Pair] = await client.search_symbols(
pattern,
)
# repack in fqme-keyed table
byfqme: dict[str, Pair] = {}
for pair in pairs.values():
byfqme[pair.bs_fqme] = pair
await stream.send(byfqme)

View File

@ -0,0 +1,196 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Per market data-type definitions and schemas types.
"""
from __future__ import annotations
import pendulum
from typing import (
Literal,
Optional,
)
from decimal import Decimal
from piker.types import Struct
# API endpoint paths by venue / sub-API
_domain: str = 'deribit.com'
_url = f'https://www.{_domain}'
# WEBsocketz
_ws_url: str = f'wss://www.{_domain}/ws/api/v2'
# test nets
_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2'
MarketType = Literal[
'option'
]
def get_api_eps(venue: MarketType) -> tuple[str, str]:
'''
Return API ep root paths per venue.
'''
return {
'option': (
_ws_url,
),
}[venue]
class Pair(Struct, frozen=True, kw_only=True):
symbol: str
# src
quote_currency: str # 'BTC'
# dst
base_currency: str # "BTC",
tick_size: float # 0.0001 # [{'above_price': 0.005, 'tick_size': 0.0005}]
tick_size_steps: list[dict[str, float]]
@property
def price_tick(self) -> Decimal:
return Decimal(str(self.tick_size_steps[0]['above_price']))
@property
def size_tick(self) -> Decimal:
return Decimal(str(self.tick_size))
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_mktid(self) -> str:
return f'{self.symbol}.{self.venue}'
class OptionPair(Pair, frozen=True):
taker_commission: float # 0.0003
strike: float # 5000.0
settlement_period: str # 'day'
settlement_currency: str # "BTC",
rfq: bool # false
price_index: str # 'btc_usd'
option_type: str # 'call'
min_trade_amount: float # 0.1
maker_commission: float # 0.0003
kind: str # 'option'
is_active: bool # true
instrument_type: str # 'reversed'
instrument_name: str # 'BTC-1SEP24-55000-C'
instrument_id: int # 364671
expiration_timestamp: int # 1725177600000
creation_timestamp: int # 1724918461000
counter_currency: str # 'USD'
contract_size: float # '1.0'
block_trade_tick_size: float # '0.0001'
block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair'
# TODO, impl this without the MM:SS part of
# the `'THH:MM:SS..'` etc..
@property
def expiry(self) -> str:
iso_date = pendulum.from_timestamp(
self.expiration_timestamp / 1000
).isoformat()
return iso_date
@property
def venue(self) -> str:
return f'{self.instrument_type}_option'
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_src_asset(self) -> str:
return f'{self.quote_currency}'
@property
def bs_dst_asset(self) -> str:
return f'{self.symbol}'
PAIRTYPES: dict[MarketType, Pair] = {
'option': OptionPair,
}
class JSONRPCResult(Struct):
id: int
usIn: int
usOut: int
usDiff: int
testnet: bool
jsonrpc: str = '2.0'
error: Optional[dict] = None
result: Optional[list[dict]] = None
class JSONRPCChannel(Struct):
method: str
params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct):
low: list[float]
cost: list[float]
high: list[float]
open: list[float]
close: list[float]
ticks: list[int]
status: str
volume: list[float]
class Trade(Struct):
iv: float
price: float
amount: float
trade_id: str
contracts: float
direction: str
trade_seq: int
timestamp: int
mark_price: float
index_price: float
tick_direction: int
instrument_name: str
combo_id: Optional[str] = '',
combo_trade_id: Optional[int] = 0,
block_trade_id: Optional[str] = '',
block_trade_leg_count: Optional[int] = 0,
class LastTradesResult(Struct):
trades: list[Trade]
has_more: bool

View File

@ -168,7 +168,6 @@ class OrderClient(Struct):
async def relay_orders_from_sync_code(
client: OrderClient,
symbol_key: str,
to_ems_stream: tractor.MsgStream,
@ -242,6 +241,11 @@ async def open_ems(
async with maybe_open_emsd(
broker,
# XXX NOTE, LOL so this determines the daemon `emsd` loglevel
# then FYI.. that's kinda wrong no?
# -[ ] shouldn't it be set by `pikerd -l` or no?
# -[ ] would make a lot more sense to have a subsys ctl for
# levels.. like `-l emsd.info` or something?
loglevel=loglevel,
) as portal:

View File

@ -653,7 +653,11 @@ class Router(Struct):
flume = feed.flumes[fqme]
first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker)
book.lasts[fqme]: float = float(first_quote['last'])
if not (last := first_quote.get('last')):
last: float = flume.rt_shm.array[-1]['close']
book.lasts[fqme]: float = float(last)
async with self.maybe_open_brokerd_dialog(
brokermod=brokermod,
@ -716,7 +720,7 @@ class Router(Struct):
subs = self.subscribers[sub_key]
sent_some: bool = False
for client_stream in subs:
for client_stream in subs.copy():
try:
await client_stream.send(msg)
sent_some = True
@ -1010,6 +1014,10 @@ async def translate_and_relay_brokerd_events(
status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name']
if not status_msg.req:
# likely some order change state?
await tractor.pause()
else:
await router.client_broadcast(
status_msg.req.symbol,
status_msg,

View File

@ -297,6 +297,8 @@ class PaperBoi(Struct):
# transmit pp msg to ems
pp: Position = self.acnt.pps[bs_mktid]
# TODO, this will break if `require_only=True` was passed to
# `.update_from_ledger()`
pp_msg = BrokerdPosition(
broker=self.broker,
@ -653,6 +655,7 @@ async def open_trade_dialog(
# in) use manually constructed table from calling
# the `.get_mkt_info()` provider EP above.
_mktmap_table=mkt_by_fqme,
only_require=list(mkt_by_fqme),
)
pp_msgs: list[BrokerdPosition] = []

View File

@ -30,6 +30,7 @@ subsys: str = 'piker.clearing'
log = get_logger(subsys)
# TODO, oof doesn't this ignore the `loglevel` then???
get_console_log = partial(
get_console_log,
name=subsys,

View File

@ -140,11 +140,10 @@ def pikerd(
if pdb:
log.warning((
"\n"
"!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
"When a `piker` daemon crashes it will block the "
"task-thread until resumed from console!\n"
"\n"
'\n\n'
'!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n'
'When a `piker` daemon crashes it will block the '
'task-thread until resumed from console!\n'
))
# service-actor registry endpoint socket-address set
@ -177,7 +176,7 @@ def pikerd(
from .. import service
async def main():
service_mngr: service.Services
service_mngr: service.ServiceMngr
async with (
service.open_pikerd(

View File

@ -95,6 +95,12 @@ class Sampler:
# history loading.
incr_task_cs: trio.CancelScope | None = None
bcast_errors: tuple[Exception] = (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
)
# holds all the ``tractor.Context`` remote subscriptions for
# a particular sample period increment event: all subscribers are
# notified on a step.
@ -258,14 +264,15 @@ class Sampler:
subs: set
last_ts, subs = pair
task = trio.lowlevel.current_task()
log.debug(
f'SUBS {self.subscribers}\n'
f'PAIR {pair}\n'
f'TASK: {task}: {id(task)}\n'
f'broadcasting {period_s} -> {last_ts}\n'
# NOTE, for debugging pub-sub issues
# task = trio.lowlevel.current_task()
# log.debug(
# f'AlL-SUBS@{period_s!r}: {self.subscribers}\n'
# f'PAIR: {pair}\n'
# f'TASK: {task}: {id(task)}\n'
# f'broadcasting {period_s} -> {last_ts}\n'
# f'consumers: {subs}'
)
# )
borked: set[MsgStream] = set()
sent: set[MsgStream] = set()
while True:
@ -282,12 +289,11 @@ class Sampler:
await stream.send(msg)
sent.add(stream)
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
except self.bcast_errors as err:
log.error(
f'{stream._ctx.chan.uid} dropped connection'
f'Connection dropped for IPC ctx\n'
f'{stream._ctx}\n\n'
f'Due to {type(err)}'
)
borked.add(stream)
else:
@ -394,7 +400,8 @@ async def register_with_sampler(
finally:
if (
sub_for_broadcasts
and subs
and
subs
):
try:
subs.remove(stream)
@ -561,8 +568,7 @@ async def open_sample_stream(
async def sample_and_broadcast(
bus: _FeedsBus, # noqa
bus: _FeedsBus,
rt_shm: ShmArray,
hist_shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel,
@ -582,11 +588,33 @@ async def sample_and_broadcast(
overruns = Counter()
# NOTE, only used for debugging live-data-feed issues, though
# this should be resolved more correctly in the future using the
# new typed-msgspec feats of `tractor`!
#
# XXX, a multiline nested `dict` formatter (since rn quote-msgs
# are just that).
# pfmt: Callable[[str], str] = mk_repr()
# iterate stream delivered by broker
async for quotes in quote_stream:
# print(quotes)
# TODO: ``numba`` this!
# XXX WARNING XXX only enable for debugging bc ow can cost
# ALOT of perf with HF-feedz!!!
#
# log.info(
# 'Rx live quotes:\n'
# f'{pfmt(quotes)}'
# )
# TODO,
# -[ ] `numba` or `cython`-nize this loop possibly?
# |_alternatively could we do it in rust somehow by upacking
# arrow msgs instead of using `msgspec`?
# -[ ] use `msgspec.Struct` support in new typed-msging from
# `tractor` to ensure only allowed msgs are transmitted?
#
for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that
@ -659,6 +687,21 @@ async def sample_and_broadcast(
sub_key: str = broker_symbol.lower()
subs: set[Sub] = bus.get_subs(sub_key)
# TODO, figure out how to make this useful whilst
# incoporating feed "pausing" ..
#
# if not subs:
# all_bs_fqmes: list[str] = list(
# bus._subscribers.keys()
# )
# log.warning(
# f'No subscribers for {brokername!r} live-quote ??\n'
# f'broker_symbol: {broker_symbol}\n\n'
# f'Maybe the backend-sys symbol does not match one of,\n'
# f'{pfmt(all_bs_fqmes)}\n'
# )
# NOTE: by default the broker backend doesn't append
# it's own "name" into the fqme schema (but maybe it
# should?) so we have to manually generate the correct
@ -728,18 +771,14 @@ async def sample_and_broadcast(
if lags > 10:
await tractor.pause()
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
):
except Sampler.bcast_errors as ipc_err:
ctx: Context = ipc._ctx
chan: Channel = ctx.chan
if ctx:
log.warning(
'Dropped `brokerd`-quotes-feed connection:\n'
f'{broker_symbol}:'
f'{ctx.cid}@{chan.uid}'
f'Dropped `brokerd`-feed for {broker_symbol!r} due to,\n'
f'x>) {ctx.cid}@{chan.uid}'
f'|_{ipc_err!r}\n\n'
)
if sub.throttle_rate:
assert ipc._closed
@ -756,12 +795,11 @@ async def sample_and_broadcast(
async def uniform_rate_send(
rate: float,
quote_stream: trio.abc.ReceiveChannel,
stream: MsgStream,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
@ -779,13 +817,16 @@ async def uniform_rate_send(
https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
'''
# TODO: compute the approx overhead latency per cycle
left_to_sleep = throttle_period = 1/rate - 0.000616
# ?TODO? dynamically compute the **actual** approx overhead latency per cycle
# instead of this magic # bidinezz?
throttle_period: float = 1/rate - 0.000616
left_to_sleep: float = throttle_period
# send cycle state
first_quote: dict|None
first_quote = last_quote = None
last_send = time.time()
diff = 0
last_send: float = time.time()
diff: float = 0
task_status.started()
ticks_by_type: dict[
@ -796,22 +837,28 @@ async def uniform_rate_send(
clear_types = _tick_groups['clears']
while True:
# compute the remaining time to sleep for this throttled cycle
left_to_sleep = throttle_period - diff
left_to_sleep: float = throttle_period - diff
if left_to_sleep > 0:
cs: trio.CancelScope
with trio.move_on_after(left_to_sleep) as cs:
sym: str
last_quote: dict
try:
sym, last_quote = await quote_stream.receive()
except trio.EndOfChannel:
log.exception(f"feed for {stream} ended?")
log.exception(
f'Live stream for feed for ended?\n'
f'<=c\n'
f' |_[{stream!r}\n'
)
break
diff = time.time() - last_send
diff: float = time.time() - last_send
if not first_quote:
first_quote = last_quote
first_quote: float = last_quote
# first_quote['tbt'] = ticks_by_type
if (throttle_period - diff) > 0:
@ -872,7 +919,9 @@ async def uniform_rate_send(
# TODO: now if only we could sync this to the display
# rate timing exactly lul
try:
await stream.send({sym: first_quote})
await stream.send({
sym: first_quote
})
except tractor.RemoteActorError as rme:
if rme.type is not tractor._exceptions.StreamOverrun:
raise
@ -883,19 +932,28 @@ async def uniform_rate_send(
f'{sym}:{ctx.cid}@{chan.uid}'
)
except (
# NOTE: any of these can be raised by ``tractor``'s IPC
# NOTE: any of these can be raised by `tractor`'s IPC
# transport-layer and we want to be highly resilient
# to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors!
trio.ClosedResourceError,
trio.BrokenResourceError,
except (
ConnectionResetError,
):
) + Sampler.bcast_errors as ipc_err:
match ipc_err:
case trio.EndOfChannel():
log.info(
f'{stream} terminated by peer,\n'
f'{ipc_err!r}'
)
case _:
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(f'{stream} closed')
log.warning(
f'{stream} closed due to,\n'
f'{ipc_err!r}'
)
await stream.aclose()
return

View File

@ -31,6 +31,7 @@ from pathlib import Path
from pprint import pformat
from typing import (
Any,
Callable,
Sequence,
Hashable,
TYPE_CHECKING,
@ -56,7 +57,7 @@ from piker.brokers import (
)
if TYPE_CHECKING:
from ..accounting import (
from piker.accounting import (
Asset,
MktPair,
)
@ -149,19 +150,36 @@ class SymbologyCache(Struct):
'Implement `Client.get_assets()`!'
)
if get_mkt_pairs := getattr(client, 'get_mkt_pairs', None):
get_mkt_pairs: Callable|None = getattr(
client,
'get_mkt_pairs',
None,
)
if not get_mkt_pairs:
log.warning(
'No symbology cache `Pair` support for `{provider}`..\n'
'Implement `Client.get_mkt_pairs()`!'
)
return self
pairs: dict[str, Struct] = await get_mkt_pairs()
for bs_fqme, pair in pairs.items():
if not pairs:
log.warning(
'No pairs from intial {provider!r} sym-cache request?\n\n'
'`Client.get_mkt_pairs()` -> {pairs!r} ?'
)
return self
# NOTE: every backend defined pair should
# declare it's ns path for roundtrip
# serialization lookup.
for bs_fqme, pair in pairs.items():
if not getattr(pair, 'ns_path', None):
# XXX: every backend defined pair must declare
# a `.ns_path: tractor.NamespacePath` to enable
# roundtrip serialization lookup from a local
# cache file.
raise TypeError(
f'Pair-struct for {self.mod.name} MUST define a '
'`.ns_path: str`!\n'
f'{pair}'
'`.ns_path: str`!\n\n'
f'{pair!r}'
)
entry = await self.mod.get_mkt_info(pair.bs_fqme)
@ -195,12 +213,6 @@ class SymbologyCache(Struct):
pair,
)
else:
log.warning(
'No symbology cache `Pair` support for `{provider}`..\n'
'Implement `Client.get_mkt_pairs()`!'
)
return self
@classmethod

View File

@ -786,7 +786,6 @@ async def install_brokerd_search(
@acm
async def maybe_open_feed(
fqmes: list[str],
loglevel: str | None = None,
@ -840,13 +839,12 @@ async def maybe_open_feed(
@acm
async def open_feed(
fqmes: list[str],
loglevel: str | None = None,
loglevel: str|None = None,
allow_overruns: bool = True,
start_stream: bool = True,
tick_throttle: float | None = None, # Hz
tick_throttle: float|None = None, # Hz
allow_remote_ctl_ui: bool = False,

View File

@ -36,10 +36,10 @@ from ._sharedmem import (
ShmArray,
_Token,
)
from piker.accounting import MktPair
if TYPE_CHECKING:
from ..accounting import MktPair
from .feed import Feed
from piker.data.feed import Feed
class Flume(Struct):
@ -82,7 +82,7 @@ class Flume(Struct):
# TODO: do we need this really if we can pull the `Portal` from
# ``tractor``'s internals?
feed: Feed | None = None
feed: Feed|None = None
@property
def rt_shm(self) -> ShmArray:

View File

@ -113,9 +113,9 @@ def validate_backend(
)
if ep is None:
log.warning(
f'Provider backend {mod.name} is missing '
f'{daemon_name} support :(\n'
f'The following endpoint is missing: {name}'
f'Provider backend {mod.name!r} is missing '
f'{daemon_name!r} support?\n'
f'|_module endpoint-func missing: {name!r}\n'
)
inits: list[

View File

@ -19,6 +19,10 @@ Log like a forester!
"""
import logging
import json
import reprlib
from typing import (
Callable,
)
import tractor
from pygments import (
@ -84,3 +88,29 @@ def colorize_json(
# likeable styles: algol_nu, tango, monokai
formatters.TerminalTrueColorFormatter(style=style)
)
# TODO, eventually defer to the version in `modden` once
# it becomes a dep!
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
'''
Allocate and deliver a `repr.Repr` instance with provided input
settings using the std-lib's `reprlib` mod,
* https://docs.python.org/3/library/reprlib.html
------ Ex. ------
An up to 6-layer-nested `dict` as multi-line:
- https://stackoverflow.com/a/79102479
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
'''
def_kws: dict[str, int] = dict(
indent=2,
maxlevel=6, # recursion levels
maxstring=66, # match editor line-len limit
)
def_kws |= repr_kws
reprr = reprlib.Repr(**def_kws)
return reprr.repr

View File

@ -119,6 +119,10 @@ async def open_piker_runtime(
# spawn other specialized daemons I think?
enable_modules=enable_modules,
# TODO: how to configure this?
# keep it on by default if debug mode is set?
maybe_enable_greenback=False,
**tractor_kwargs,
) as actor,

View File

@ -21,230 +21,4 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib
types.
'''
from __future__ import annotations
from collections import UserList
from pprint import (
saferepr,
)
from typing import Any
from msgspec import (
msgpack,
Struct as _Struct,
structs,
)
class DiffDump(UserList):
'''
Very simple list delegator that repr() dumps (presumed) tuple
elements of the form `tuple[str, Any, Any]` in a nice
multi-line readable form for analyzing `Struct` diffs.
'''
def __repr__(self) -> str:
if not len(self):
return super().__repr__()
# format by displaying item pair's ``repr()`` on multiple,
# indented lines such that they are more easily visually
# comparable when printed to console when printed to
# console.
repstr: str = '[\n'
for k, left, right in self:
repstr += (
f'({k},\n'
f'\t{repr(left)},\n'
f'\t{repr(right)},\n'
')\n'
)
repstr += ']\n'
return repstr
class Struct(
_Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def _sin_props(self) -> Iterator[
tuple[
structs.FieldIinfo,
str,
Any,
]
]:
'''
Iterate over all non-@property fields of this struct.
'''
fi: structs.FieldInfo
for fi in structs.fields(self):
key: str = fi.name
val: Any = getattr(self, key)
yield fi, key, val
def to_dict(
self,
include_non_members: bool = True,
) -> dict:
'''
Like it sounds.. direct delegation to:
https://jcristharif.com/msgspec/api.html#msgspec.structs.asdict
BUT, by default we pop all non-member (aka not defined as
struct fields) fields by default.
'''
asdict: dict = structs.asdict(self)
if include_non_members:
return asdict
# only return a dict of the struct members
# which were provided as input, NOT anything
# added as type-defined `@property` methods!
sin_props: dict = {}
fi: structs.FieldInfo
for fi, k, v in self._sin_props():
sin_props[k] = asdict[k]
return sin_props
def pformat(
self,
field_indent: int = 2,
indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
# global whitespace indent
ws: str = ' '*indent
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
# qtn: str = ws + self.__class__.__qualname__
qtn: str = self.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo
k: str
v: Any
for fi, k, v in self._sin_props():
# TODO: how can we prefer `Literal['option1', 'option2,
# ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..?
ft: type = fi.type
typ_name: str = getattr(ft, '__name__', str(ft))
# recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct):
val_str: str = v.pformat(
indent=field_indent + indent,
field_indent=indent + field_indent,
)
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v)
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
return (
f'{qtn}(\n'
f'{obj_str}'
f'{ws})'
)
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
# __str__ = __repr__ = pformat
__repr__ = pformat
def copy(
self,
update: dict | None = None,
) -> Struct:
'''
Validate-typecast all self defined fields, return a copy of
us with all such fields.
NOTE: This is kinda like the default behaviour in
`pydantic.BaseModel` except a copy of the object is
returned making it compat with `frozen=True`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# NOTE: roundtrip serialize to validate
# - enode to msgpack binary format,
# - decode that back to a struct.
return msgpack.Decoder(type=type(self)).decode(
msgpack.Encoder().encode(self)
)
def typecast(
self,
# TODO: allow only casting a named subset?
# fields: set[str] | None = None,
) -> None:
'''
Cast all fields using their declared type annotations
(kinda like what `pydantic` does by default).
NOTE: this of course won't work on frozen types, use
``.copy()`` above in such cases.
'''
# https://jcristharif.com/msgspec/api.html#msgspec.structs.fields
fi: structs.FieldInfo
for fi in structs.fields(self):
setattr(
self,
fi.name,
fi.type(getattr(self, fi.name)),
)
def __sub__(
self,
other: Struct,
) -> DiffDump[tuple[str, Any, Any]]:
'''
Compare fields/items key-wise and return a ``DiffDump``
for easy visual REPL comparison B)
'''
diffs: DiffDump[tuple[str, Any, Any]] = DiffDump()
for fi in structs.fields(self):
attr_name: str = fi.name
ours: Any = getattr(self, attr_name)
theirs: Any = getattr(other, attr_name)
if ours != theirs:
diffs.append((
attr_name,
ours,
theirs,
))
return diffs
from tractor.msg import Struct as Struct

View File

@ -18,6 +18,22 @@
requires = ["hatchling"]
build-backend = "hatchling.build"
# ------ - ------
[tool.ruff.lint]
# https://docs.astral.sh/ruff/settings/#lint_ignore
ignore = []
# https://docs.astral.sh/ruff/settings/#lint_per-file-ignores
"piker/ui/qt.py" = [
"E402",
'F401', # unused imports (without __all__ or blah as blah)
# "F841", # unused variable rules
]
# ignore-init-module-imports = false
# ------ - ------
[project]
name = "piker"
version = "0.1.0a0dev0"
@ -87,23 +103,15 @@ uis = [
"pyqt6 >=6.7.0, <7.0.0",
"pyqtgraph",
# for consideration,
# - 'visidata'
# ------ - ------
# TODO: add an `--only daemon` group for running non-ui / pikerd
# service tree in distributed mode B)
# https://docs.astral.sh/uv/concepts/projects/dependencies/#optional-dependencies
# [project.optional-dependencies]
]
[dependency-groups]
# TODO: a toolset that makes debugging a `pikerd` service (tree) easy
# to hack on directly using more or less the local env:
# - xonsh + xxh
# - rsyscall + pdbp
# - actor runtime control console like BEAM/OTP
#
# console ehancements and eventually remote debugging extras/helpers.
# use `uv --dev` to enable
dev = [
"pytest >=6.0.0, <7.0.0",
"elasticsearch >=8.9.0, <9.0.0",
@ -111,7 +119,13 @@ dev = [
"prompt-toolkit ==3.0.40",
"cython >=3.0.0, <4.0.0",
"greenback >=1.1.1, <2.0.0",
"ruff>=0.9.6",
# console ehancements and eventually remote debugging
# extras/helpers.
# TODO: add a toolset that makes debugging a `pikerd` service
# (tree) easy to hack on directly using more or less the local env:
# - xonsh + xxh
# - rsyscall + pdbp
# - actor runtime control console like BEAM/OTP
]
[project.scripts]

View File

@ -1,93 +0,0 @@
# from default `ruff.toml` @
# https://docs.astral.sh/ruff/configuration/
# Exclude a variety of commonly ignored directories.
exclude = [
".bzr",
".direnv",
".eggs",
".git",
".git-rewrite",
".hg",
".ipynb_checkpoints",
".mypy_cache",
".nox",
".pants.d",
".pyenv",
".pytest_cache",
".pytype",
".ruff_cache",
".svn",
".tox",
".venv",
".vscode",
"__pypackages__",
"_build",
"buck-out",
"build",
"dist",
"node_modules",
"site-packages",
"venv",
]
# Same as Black.
line-length = 88
indent-width = 4
# Assume Python 3.9
target-version = "py312"
# ------ - ------
# TODO, stop warnings around `anext()` builtin use?
# tool.ruff.target-version = "py310"
[lint]
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default.
select = ["E4", "E7", "E9", "F"]
ignore = []
ignore-init-module-imports = false
[lint.per-file-ignores]
"piker/ui/qt.py" = [
"E402",
'F401', # unused imports (without __all__ or blah as blah)
# "F841", # unused variable rules
]
# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
unfixable = []
# Allow unused variables when underscore-prefixed.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[format]
# Use single quotes in `ruff format`.
quote-style = "single"
# Like Black, indent with spaces, rather than tabs.
indent-style = "space"
# Like Black, respect magic trailing commas.
skip-magic-trailing-comma = false
# Like Black, automatically detect the appropriate line ending.
line-ending = "auto"
# Enable auto-formatting of code examples in docstrings. Markdown,
# reStructuredText code/literal blocks and doctests are all supported.
#
# This is currently disabled by default, but it is planned for this
# to be opt-out in the future.
docstring-code-format = false
# Set the line length limit used when formatting code snippets in
# docstrings.
#
# This only has an effect when the `docstring-code-format` setting is
# enabled.
docstring-code-line-length = "dynamic"

31
uv.lock
View File

@ -708,7 +708,6 @@ dev = [
{ name = "greenback" },
{ name = "prompt-toolkit" },
{ name = "pytest" },
{ name = "ruff" },
{ name = "xonsh" },
]
@ -740,7 +739,7 @@ requires-dist = [
{ name = "tomli", specifier = ">=2.0.1,<3.0.0" },
{ name = "tomli-w", specifier = ">=1.0.0,<2.0.0" },
{ name = "tomlkit", git = "https://github.com/pikers/tomlkit.git?branch=piker_pin" },
{ name = "tractor", editable = "../tractor" },
{ name = "tractor", directory = "../tractor" },
{ name = "trio", specifier = ">=0.24,<0.25" },
{ name = "trio-util", specifier = ">=0.7.0,<0.8.0" },
{ name = "trio-websocket", specifier = ">=0.10.3,<0.11.0" },
@ -755,7 +754,6 @@ dev = [
{ name = "greenback", specifier = ">=1.1.1,<2.0.0" },
{ name = "prompt-toolkit", specifier = "==3.0.40" },
{ name = "pytest", specifier = ">=6.0.0,<7.0.0" },
{ name = "ruff", specifier = ">=0.9.6" },
{ name = "xonsh", specifier = ">=0.14.2,<0.15.0" },
]
@ -1075,31 +1073,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/19/71/39c7c0d87f8d4e6c020a393182060eaefeeae6c01dab6a84ec346f2567df/rich-13.9.4-py3-none-any.whl", hash = "sha256:6049d5e6ec054bf2779ab3358186963bac2ea89175919d699e378b99738c2a90", size = 242424 },
]
[[package]]
name = "ruff"
version = "0.9.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/2a/e1/e265aba384343dd8ddd3083f5e33536cd17e1566c41453a5517b5dd443be/ruff-0.9.6.tar.gz", hash = "sha256:81761592f72b620ec8fa1068a6fd00e98a5ebee342a3642efd84454f3031dca9", size = 3639454 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/76/e3/3d2c022e687e18cf5d93d6bfa2722d46afc64eaa438c7fbbdd603b3597be/ruff-0.9.6-py3-none-linux_armv6l.whl", hash = "sha256:2f218f356dd2d995839f1941322ff021c72a492c470f0b26a34f844c29cdf5ba", size = 11714128 },
{ url = "https://files.pythonhosted.org/packages/e1/22/aff073b70f95c052e5c58153cba735748c9e70107a77d03420d7850710a0/ruff-0.9.6-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b908ff4df65dad7b251c9968a2e4560836d8f5487c2f0cc238321ed951ea0504", size = 11682539 },
{ url = "https://files.pythonhosted.org/packages/75/a7/f5b7390afd98a7918582a3d256cd3e78ba0a26165a467c1820084587cbf9/ruff-0.9.6-py3-none-macosx_11_0_arm64.whl", hash = "sha256:b109c0ad2ececf42e75fa99dc4043ff72a357436bb171900714a9ea581ddef83", size = 11132512 },
{ url = "https://files.pythonhosted.org/packages/a6/e3/45de13ef65047fea2e33f7e573d848206e15c715e5cd56095589a7733d04/ruff-0.9.6-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1de4367cca3dac99bcbd15c161404e849bb0bfd543664db39232648dc00112dc", size = 11929275 },
{ url = "https://files.pythonhosted.org/packages/7d/f2/23d04cd6c43b2e641ab961ade8d0b5edb212ecebd112506188c91f2a6e6c/ruff-0.9.6-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ac3ee4d7c2c92ddfdaedf0bf31b2b176fa7aa8950efc454628d477394d35638b", size = 11466502 },
{ url = "https://files.pythonhosted.org/packages/b5/6f/3a8cf166f2d7f1627dd2201e6cbc4cb81f8b7d58099348f0c1ff7b733792/ruff-0.9.6-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5dc1edd1775270e6aa2386119aea692039781429f0be1e0949ea5884e011aa8e", size = 12676364 },
{ url = "https://files.pythonhosted.org/packages/f5/c4/db52e2189983c70114ff2b7e3997e48c8318af44fe83e1ce9517570a50c6/ruff-0.9.6-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:4a091729086dffa4bd070aa5dab7e39cc6b9d62eb2bef8f3d91172d30d599666", size = 13335518 },
{ url = "https://files.pythonhosted.org/packages/66/44/545f8a4d136830f08f4d24324e7db957c5374bf3a3f7a6c0bc7be4623a37/ruff-0.9.6-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d1bbc6808bf7b15796cef0815e1dfb796fbd383e7dbd4334709642649625e7c5", size = 12823287 },
{ url = "https://files.pythonhosted.org/packages/c5/26/8208ef9ee7431032c143649a9967c3ae1aae4257d95e6f8519f07309aa66/ruff-0.9.6-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:589d1d9f25b5754ff230dce914a174a7c951a85a4e9270613a2b74231fdac2f5", size = 14592374 },
{ url = "https://files.pythonhosted.org/packages/31/70/e917781e55ff39c5b5208bda384fd397ffd76605e68544d71a7e40944945/ruff-0.9.6-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc61dd5131742e21103fbbdcad683a8813be0e3c204472d520d9a5021ca8b217", size = 12500173 },
{ url = "https://files.pythonhosted.org/packages/84/f5/e4ddee07660f5a9622a9c2b639afd8f3104988dc4f6ba0b73ffacffa9a8c/ruff-0.9.6-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5e2d9126161d0357e5c8f30b0bd6168d2c3872372f14481136d13de9937f79b6", size = 11906555 },
{ url = "https://files.pythonhosted.org/packages/f1/2b/6ff2fe383667075eef8656b9892e73dd9b119b5e3add51298628b87f6429/ruff-0.9.6-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:68660eab1a8e65babb5229a1f97b46e3120923757a68b5413d8561f8a85d4897", size = 11538958 },
{ url = "https://files.pythonhosted.org/packages/3c/db/98e59e90de45d1eb46649151c10a062d5707b5b7f76f64eb1e29edf6ebb1/ruff-0.9.6-py3-none-musllinux_1_2_i686.whl", hash = "sha256:c4cae6c4cc7b9b4017c71114115db0445b00a16de3bcde0946273e8392856f08", size = 12117247 },
{ url = "https://files.pythonhosted.org/packages/ec/bc/54e38f6d219013a9204a5a2015c09e7a8c36cedcd50a4b01ac69a550b9d9/ruff-0.9.6-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:19f505b643228b417c1111a2a536424ddde0db4ef9023b9e04a46ed8a1cb4656", size = 12554647 },
{ url = "https://files.pythonhosted.org/packages/a5/7d/7b461ab0e2404293c0627125bb70ac642c2e8d55bf590f6fce85f508f1b2/ruff-0.9.6-py3-none-win32.whl", hash = "sha256:194d8402bceef1b31164909540a597e0d913c0e4952015a5b40e28c146121b5d", size = 9949214 },
{ url = "https://files.pythonhosted.org/packages/ee/30/c3cee10f915ed75a5c29c1e57311282d1a15855551a64795c1b2bbe5cf37/ruff-0.9.6-py3-none-win_amd64.whl", hash = "sha256:03482d5c09d90d4ee3f40d97578423698ad895c87314c4de39ed2af945633caa", size = 10999914 },
{ url = "https://files.pythonhosted.org/packages/e8/a8/d71f44b93e3aa86ae232af1f2126ca7b95c0f515ec135462b3e1f351441c/ruff-0.9.6-py3-none-win_arm64.whl", hash = "sha256:0e2bb706a2be7ddfea4a4af918562fdc1bcb16df255e5fa595bbd800ce322a5a", size = 10177499 },
]
[[package]]
name = "shellingham"
version = "1.5.4"
@ -1215,7 +1188,7 @@ source = { git = "https://github.com/pikers/tomlkit.git?branch=piker_pin#8e0239a
[[package]]
name = "tractor"
version = "0.1.0a6.dev0"
source = { editable = "../tractor" }
source = { directory = "../tractor" }
dependencies = [
{ name = "colorlog" },
{ name = "msgspec" },