Compare commits

..

No commits in common. "alt_tpts_for_perf" and "gitea_feats" have entirely different histories.

54 changed files with 1244 additions and 3051 deletions

View File

@ -88,23 +88,7 @@ a sane install with `uv`
************************
bc why install with `python` when you can faster with `rust` ::
uv sync
# ^ astral's docs,
# https://docs.astral.sh/uv/concepts/projects/sync/
include all GUIs ::
uv sync --extra uis
AND with all our hacking tools::
uv sync --dev --extra uis
Ensure you can run the root-daemon::
uv run pikerd [-l info --pdb]
uv lock
hacky install on nixos
@ -119,18 +103,7 @@ start a chart
*************
run a realtime OHLCV chart stand-alone::
[uv run] piker -l info chart btcusdt.spot.binance xmrusdt.spot.kraken
# ^^^ iff you haven't activated the py-env,
# - https://docs.astral.sh/uv/concepts/projects/run/
#
# in order to create an explicit virt-env see,
# - https://docs.astral.sh/uv/concepts/projects/layout/#the-project-environment
# - https://docs.astral.sh/uv/pip/environments/
#
# use $UV_PROJECT_ENVIRONMENT to select any non-`.venv/`
# as the venv sudir in the repo's root.
# - https://docs.astral.sh/uv/reference/environment/#uv_project_environment
piker -l info chart btcusdt.spot.binance xmrusdt.spot.kraken
this runs a chart UI (with 1m sampled OHLCV) and shows 2 spot markets from 2 diff cexes
overlayed on the same graph. Use of `piker` without first starting

View File

@ -121,7 +121,6 @@ async def bot_main():
# tick_throttle=10,
) as feed,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
assert accounts

View File

@ -1,338 +0,0 @@
#!/usr/bin/env python
from decimal import (
Decimal,
)
from pathlib import Path
import numpy as np
# import polars as pl
import trio
import tractor
from datetime import datetime
# from pprint import pformat
from piker.brokers.deribit.api import (
get_client,
maybe_open_oi_feed,
)
from piker.storage import open_storage_client, StorageClient
from piker.log import get_logger
import sys
import pyqtgraph as pg
from PyQt6 import QtCore
from pyqtgraph import ScatterPlotItem, InfiniteLine
from PyQt6.QtWidgets import QApplication
from cryptofeed.symbols import Symbol
log = get_logger(__name__)
# XXX, use 2 newlines between top level LOC (even between these
# imports and the next function line ;)
def check_if_complete(
oi: dict[str, dict[str, Decimal | None]]
) -> bool:
return all(
oi[strike]['C'] is not None
and
oi[strike]['P'] is not None for strike in oi
)
async def max_pain_daemon(
) -> None:
oi_by_strikes: dict[str, dict[str, Decimal | None]]
instruments: list[Symbol] = []
expiry_dates: list[str]
expiry_date: str
currency: str = 'btc'
kind: str = 'option'
async with get_client(
) as client:
expiry_dates: list[str] = await client.get_expiration_dates(
currency=currency,
kind=kind
)
log.info(
f'Available expiries for {currency!r}-{kind}:\n'
f'{expiry_dates}\n'
)
expiry_date: str = input(
'Please enter a valid expiration date: '
).upper()
print('Starting little daemon...')
# maybe move this type annot down to the assignment line?
oi_by_strikes: dict[str, dict[str, Decimal]]
instruments = await client.get_instruments(
expiry_date=expiry_date,
)
oi_by_strikes = client.get_strikes_dict(instruments)
def get_total_intrinsic_values(
oi_by_strikes: dict[str, dict[str, Decimal]]
) -> dict[str, dict[str, Decimal]]:
call_cash: Decimal = Decimal(0)
put_cash: Decimal = Decimal(0)
intrinsic_values: dict[str, dict[str, Decimal]] = {}
closes: list = sorted(Decimal(close) for close in oi_by_strikes)
for strike, oi in oi_by_strikes.items():
s = Decimal(strike)
call_cash = sum(max(0, (s - c) * oi_by_strikes[str(c)]['C']) for c in closes)
put_cash = sum(max(0, (c - s) * oi_by_strikes[str(c)]['P']) for c in closes)
intrinsic_values[strike] = {
'C': call_cash,
'P': put_cash,
'total': call_cash + put_cash,
}
return intrinsic_values
def get_intrinsic_value_and_max_pain(
intrinsic_values: dict[str, dict[str, Decimal]]
):
# We meed to find the lowest value, so we start at
# infinity to ensure that, and the max_pain must be
# an amount greater than zero.
total_intrinsic_value: Decimal = Decimal('Infinity')
max_pain: Decimal = Decimal(0)
for strike, oi in oi_by_strikes.items():
s = Decimal(strike)
if intrinsic_values[strike]['total'] < total_intrinsic_value:
total_intrinsic_value = intrinsic_values[strike]['total']
max_pain = s
return total_intrinsic_value, max_pain
def plot_graph(
oi_by_strikes: dict[str, dict[str, Decimal]],
plot,
):
"""Update the bar graph with new open interest data."""
plot.clear()
intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
for strike_str in sorted(oi_by_strikes, key=lambda x: int(x)):
strike = int(strike_str)
calls_val = float(oi_by_strikes[strike_str]['C'])
puts_val = float(oi_by_strikes[strike_str]['P'])
bar_c = pg.BarGraphItem(
x=[strike - 100],
height=[calls_val],
width=200,
pen='w',
brush=(0, 0, 255, 150)
)
plot.addItem(bar_c)
bar_p = pg.BarGraphItem(
x=[strike + 100],
height=[puts_val],
width=200,
pen='w',
brush=(255, 0, 0, 150)
)
plot.addItem(bar_p)
total_val = float(intrinsic_values[strike_str]['total']) / 100000
scatter_iv = ScatterPlotItem(
x=[strike],
y=[total_val],
pen=pg.mkPen(color=(0, 255, 0), width=2),
brush=pg.mkBrush(0, 255, 0, 150),
size=3,
symbol='o'
)
plot.addItem(scatter_iv)
_, max_pain = get_intrinsic_value_and_max_pain(intrinsic_values)
vertical_line = InfiniteLine(
pos=max_pain,
angle=90,
pen=pg.mkPen(color='yellow', width=1, style=QtCore.Qt.PenStyle.DotLine),
label=f'Max pain: {max_pain:,.0f}',
labelOpts={
'position': 0.85,
'color': 'yellow',
'movable': True
}
)
plot.addItem(vertical_line)
def update_oi_by_strikes(msg: tuple):
nonlocal oi_by_strikes
if 'oi' == msg[0]:
strike_price = msg[1]['strike_price']
option_type = msg[1]['option_type']
open_interest = msg[1]['open_interest']
oi_by_strikes.setdefault(
strike_price, {}
).update(
{option_type: open_interest}
)
# Define the structured dtype
dtype = np.dtype([
('time', int),
('oi', float),
('oi_calc', float),
])
async def write_open_interest_on_file(msg: tuple, client: StorageClient):
if 'oi' == msg[0]:
nonlocal expiry_date
timestamp = msg[1]['timestamp']
strike_price = msg[1]["strike_price"]
option_type = msg[1]['option_type'].lower()
col_sym_key = f'btc-{expiry_date.lower()}-{strike_price}-{option_type}'
# Create the numpy array with sample data
data = np.array([
(
int(timestamp),
float(msg[1]['open_interest']),
np.nan,
),
], dtype=dtype)
path: Path = await client.write_oi(
col_sym_key,
data,
)
# TODO, use std logging like this throughout for status
# emissions on console!
log.info(f'Wrote OI history to {path}')
def get_max_pain(
oi_by_strikes: dict[str, dict[str, Decimal]]
) -> dict[str, str | Decimal]:
'''
This method requires only the strike_prices and oi for call
and puts, the closes list are the same as the strike_prices
the idea is to sum all the calls and puts cash for each strike
and the ITM strikes from that strike, the lowest value is what we
are looking for the intrinsic value.
'''
nonlocal timestamp
intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
total_intrinsic_value, max_pain = get_intrinsic_value_and_max_pain(intrinsic_values)
return {
'timestamp': timestamp,
'expiry_date': expiry_date,
'total_intrinsic_value': total_intrinsic_value,
'max_pain': max_pain,
}
async with (
open_storage_client() as (_, storage),
maybe_open_oi_feed(
instruments,
) as oi_feed,
):
# Initialize QApplication
app = QApplication(sys.argv)
win = pg.GraphicsLayoutWidget(show=True)
win.setWindowTitle('Calls (blue) vs Puts (red)')
plot = win.addPlot(title='OI by Strikes')
plot.showGrid(x=True, y=True)
print('Plot initialized...')
async for msg in oi_feed:
# In memory oi_by_strikes dict, all message are filtered here
# and the dict is updated with the open interest data
update_oi_by_strikes(msg)
# Write on file using storage client
await write_open_interest_on_file(msg, storage)
# Max pain calcs, before start we must gather all the open interest for
# all the strike prices and option types available for a expiration date
if check_if_complete(oi_by_strikes):
if 'oi' == msg[0]:
# Here we must read for the filesystem all the latest open interest value for
# each instrument for that specific expiration date, that means look up for the
# last update got the instrument btc-{expity_date}-*oi1s.parquet (1s because is
# hardcoded to something, sorry.)
timestamp = msg[1]['timestamp']
max_pain = get_max_pain(oi_by_strikes)
# intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
# graph here
plot_graph(oi_by_strikes, plot)
# TODO, use a single multiline string with `()`
# and drop the multiple `print()` calls (this
# should be done elsewhere in this file as well!
#
# As per the docs,
# https://docs.python.org/3/reference/lexical_analysis.html#string-literal-concatenation
# you could instead do,
# print(
# '-----------------------------------------------\n'
# f'timestamp: {datetime.fromtimestamp(max_pain['timestamp'])}\n'
# )
# WHY?
# |_ less ctx-switches/calls to `print()`
# |_ the `str` can then be modified / passed
# around as a variable more easily if needed in
# the future ;)
#
# ALSO, i believe there already is a stdlib
# module to do "alignment" of text which you
# could try for doing the right-side alignment,
# https://docs.python.org/3/library/textwrap.html#textwrap.indent
#
print('-----------------------------------------------')
print(f'timestamp: {datetime.fromtimestamp(max_pain['timestamp'])}')
print(f'expiry_date: {max_pain['expiry_date']}')
print(f'max_pain: {max_pain['max_pain']:,.0f}')
print(f'total intrinsic value: {max_pain['total_intrinsic_value']:,.0f}')
print('-----------------------------------------------')
# Process GUI events to keep the window responsive
app.processEvents()
async def main():
async with tractor.open_nursery(
debug_mode=True,
loglevel='info',
) as an:
from tractor import log
log.get_console_log(level='info')
ptl: tractor.Portal = await an.start_actor(
'max_pain_daemon',
enable_modules=[__name__],
infect_asyncio=True,
# ^TODO, we can actually run this in the root-actor now
# if needed as per 2nd "section" in,
# https://pikers.dev/goodboy/tractor/pulls/2
#
# NOTE, will first require us porting to modern
# `tractor:main` though ofc!
)
await ptl.run(max_pain_daemon)
if __name__ == '__main__':
trio.run(main)

View File

@ -1,29 +0,0 @@
## Max Pain Calculation for Deribit Options
This feature, which calculates the max pain point for options traded
on the Deribit exchange using cryptofeed library.
- Functions in the api module for fetching options data from Deribit.
[commit](https://pikers.dev/pikers/piker/commit/da55856dd2876291f55a06eb0561438a912d8241)
- Compute the max pain point based on open interest data using
deribit's api.
[commit](https://pikers.dev/pikers/piker/commit/0d9d6e15ba0edeb662ec97f7599dd66af3046b94)
### How to test it?
**Before start:** in order to get this working with `uv`, you
**must** use my [`tractor` fork](https://pikers.dev/ntorres/tractor/src/branch/aio_abandons)
and this branch: `aio_abandons`, the reason is that I cherry-pick the
`uv_migration` that guille made, for some reason that a didn't dive
into, in my system y need tractor using `uv` too. quite hacky
I guess.
1. `uv lock`
2. `uv run --no-dev python examples/max_pain.py`
3. A message should be display, enter one of the expiration date
available.
4. The script should be up and running.

View File

@ -42,6 +42,7 @@ from ._mktinfo import (
dec_digits,
digits_to_dec,
MktPair,
Symbol,
unpack_fqme,
_derivs as DerivTypes,
)
@ -59,6 +60,7 @@ __all__ = [
'Asset',
'MktPair',
'Position',
'Symbol',
'Transaction',
'TransactionLedger',
'dec_digits',

View File

@ -390,8 +390,8 @@ class MktPair(Struct, frozen=True):
cls,
fqme: str,
price_tick: float|str,
size_tick: float|str,
price_tick: float | str,
size_tick: float | str,
bs_mktid: str,
broker: str | None = None,
@ -677,3 +677,90 @@ def unpack_fqme(
# '.'.join([mkt_ep, venue]),
suffix,
)
class Symbol(Struct):
'''
I guess this is some kinda container thing for dealing with
all the different meta-data formats from brokers?
'''
key: str
broker: str = ''
venue: str = ''
# precision descriptors for price and vlm
tick_size: Decimal = Decimal('0.01')
lot_tick_size: Decimal = Decimal('0.0')
suffix: str = ''
broker_info: dict[str, dict[str, Any]] = {}
@classmethod
def from_fqme(
cls,
fqsn: str,
info: dict[str, Any],
) -> Symbol:
broker, mktep, venue, suffix = unpack_fqme(fqsn)
tick_size = info.get('price_tick_size', 0.01)
lot_size = info.get('lot_tick_size', 0.0)
return Symbol(
broker=broker,
key=mktep,
tick_size=tick_size,
lot_tick_size=lot_size,
venue=venue,
suffix=suffix,
broker_info={broker: info},
)
@property
def type_key(self) -> str:
return list(self.broker_info.values())[0]['asset_type']
@property
def tick_size_digits(self) -> int:
return float_digits(self.tick_size)
@property
def lot_size_digits(self) -> int:
return float_digits(self.lot_tick_size)
@property
def price_tick(self) -> Decimal:
return Decimal(str(self.tick_size))
@property
def size_tick(self) -> Decimal:
return Decimal(str(self.lot_tick_size))
@property
def broker(self) -> str:
return list(self.broker_info.keys())[0]
@property
def fqme(self) -> str:
return maybe_cons_tokens([
self.key, # final "pair name" (eg. qqq[/usd], btcusdt)
self.venue,
self.suffix, # includes expiry and other con info
self.broker,
])
def quantize(
self,
size: float,
) -> Decimal:
digits = float_digits(self.lot_tick_size)
return Decimal(size).quantize(
Decimal(f'1.{"0".ljust(digits, "0")}'),
rounding=ROUND_HALF_EVEN
)
# NOTE: when cast to `str` return fqme
def __str__(self) -> str:
return self.fqme

View File

@ -362,11 +362,7 @@ class Position(Struct):
# added: bool = False
tid: str = t.tid
if tid in self._events:
log.debug(
f'Txn is already added?\n'
f'\n'
f'{t}\n'
)
log.warning(f'{t} is already added?!')
# return added
# TODO: apparently this IS possible with a dict but not
@ -700,7 +696,7 @@ class Account(Struct):
else:
# TODO: we reallly need a diff set of
# loglevels/colors per subsys.
log.debug(
log.warning(
f'Recent position for {fqme} was closed!'
)

View File

@ -22,9 +22,7 @@ you know when you're losing money (if possible) XD
from __future__ import annotations
from collections.abc import ValuesView
from contextlib import contextmanager as cm
from functools import partial
from math import copysign
from pprint import pformat
from typing import (
Any,
Callable,
@ -39,16 +37,12 @@ from pendulum import (
parse,
)
from ..log import get_logger
if TYPE_CHECKING:
from ._ledger import (
Transaction,
TransactionLedger,
)
log = get_logger(__name__)
def ppu(
clears: Iterator[Transaction],
@ -244,9 +238,6 @@ def iter_by_dt(
def dyn_parse_to_dt(
tx: tuple[str, dict[str, Any]] | Transaction,
debug: bool = False,
_invalid: list|None = None,
) -> DateTime:
# handle `.items()` inputs
@ -259,16 +250,11 @@ def iter_by_dt(
# get best parser for this record..
for k in parsers:
if (
(v := getattr(tx, k, None))
or
(
isdict
and
(v := tx.get(k))
)
isdict and k in tx
or getattr(tx, k, None)
):
# TODO? remove yah?
# v = tx[k] if isdict else tx.dt
v = tx[k] if isdict else tx.dt
assert v is not None, f'No valid value for `{k}`!?'
# only call parser on the value if not None from
# the `parsers` table above (when NOT using
@ -276,54 +262,21 @@ def iter_by_dt(
# sort on it directly
if (
not isinstance(v, DateTime)
and
(parser := parsers.get(k))
and (parser := parsers.get(k))
):
ret = parser(v)
return parser(v)
else:
ret = v
return v
return ret
else:
continue
# XXX: should never get here..
else:
if debug:
import tractor
with tractor.devx.maybe_open_crash_handler():
raise ValueError(
f'Invalid txn time ??\n'
f'txn-id: {k!r}\n'
f'{k!r}: {v!r}\n'
)
# assert v is not None, f'No valid value for `{k}`!?'
# XXX: should never get here..
breakpoint()
if _invalid is not None:
_invalid.append(tx)
return from_timestamp(0.)
# breakpoint()
entry: tuple[str, dict]|Transaction
invalid: list = []
entry: tuple[str, dict] | Transaction
for entry in sorted(
records,
key=key or partial(
dyn_parse_to_dt,
_invalid=invalid,
),
key=key or dyn_parse_to_dt,
):
if entry in invalid:
log.warning(
f'Ignoring txn w invalid timestamp ??\n'
f'{pformat(entry)}\n'
# f'txn-id: {k!r}\n'
# f'{k!r}: {v!r}\n'
)
continue
# NOTE the type sig above; either pairs or txns B)
yield entry

View File

@ -51,7 +51,6 @@ __brokers__: list[str] = [
'ib',
'kraken',
'kucoin',
'deribit',
# broken but used to work
# 'questrade',
@ -62,6 +61,7 @@ __brokers__: list[str] = [
# wstrade
# iex
# deribit
# bitso
]

View File

@ -96,10 +96,7 @@ async def _setup_persistent_brokerd(
# - `open_symbol_search()`
# NOTE: see ep invocation details inside `.data.feed`.
try:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as service_nursery
):
async with trio.open_nursery() as service_nursery:
bus: _FeedsBus = feed.get_feed_bus(
brokername,
service_nursery,

View File

@ -374,14 +374,9 @@ class Client:
pair: Pair = pair_type(**item)
except Exception as e:
e.add_note(
f'\n'
f'New or removed field we need to codify!\n'
f'pair-type: {pair_type!r}\n'
f'\n'
f"Don't panic, prolly stupid binance changed their symbology schema again..\n"
f'Check out their API docs here:\n'
f'\n'
f'https://binance-docs.github.io/apidocs/spot/en/#exchange-information\n'
"\nDon't panic, prolly stupid binance changed their symbology schema again..\n"
'Check out their API docs here:\n\n'
'https://binance-docs.github.io/apidocs/spot/en/#exchange-information'
)
raise
pair_table[pair.symbol.upper()] = pair

View File

@ -440,7 +440,6 @@ async def open_trade_dialog(
# - ledger: TransactionLedger
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
ctx.open_stream() as ems_stream,
):

View File

@ -97,13 +97,6 @@ class Pair(Struct, frozen=True, kw_only=True):
baseAsset: str
baseAssetPrecision: int
permissionSets: list[list[str]]
# https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
# will become non-optional 2025-08-28?
# https://developers.binance.com/docs/binance-spot-api-docs#future-changes
pegInstructionsAllowed: bool|None = None
filters: dict[
str,
str | int | float,
@ -149,11 +142,7 @@ class SpotPair(Pair, frozen=True):
defaultSelfTradePreventionMode: str
allowedSelfTradePreventionModes: list[str]
permissions: list[str]
# can the paint botz creat liq gaps even easier on this asset?
# Bp
# https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority
amendAllowed: bool
permissionSets: list[list[str]]
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.binance:SpotPair'

View File

@ -25,7 +25,6 @@ from .api import (
get_client,
)
from .feed import (
get_mkt_info,
open_history_client,
open_symbol_search,
stream_quotes,
@ -35,20 +34,15 @@ from .feed import (
# open_trade_dialog,
# norm_trade_records,
# )
from .venues import (
OptionPair,
)
log = get_logger(__name__)
__all__ = [
'get_client',
# 'trades_dialogue',
'get_mkt_info',
'open_history_client',
'open_symbol_search',
'stream_quotes',
'OptionPair',
# 'norm_trade_records',
]

File diff suppressed because it is too large Load Diff

View File

@ -18,59 +18,38 @@
Deribit backend.
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import (
# Any,
# Optional,
Callable,
)
# from pprint import pformat
from typing import Any, Optional, Callable
import time
import cryptofeed
import trio
from trio_typing import TaskStatus
from pendulum import (
from_timestamp,
)
import pendulum
from rapidfuzz import process as fuzzy
import numpy as np
import tractor
from piker.accounting import (
Asset,
MktPair,
unpack_fqme,
)
from piker.brokers import (
open_cached_client,
NoData,
from piker.brokers import open_cached_client
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from piker.brokers._util import (
BrokerError,
DataUnavailable,
)
from piker._cacheables import (
async_lifo_cache,
)
from piker.log import (
get_logger,
mk_repr,
)
from piker.data.validate import FeedInit
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
from .api import (
Client,
# get_config,
piker_sym_to_cb_sym,
cb_sym_to_deribit_inst,
str_to_cb_sym,
Client, Trade,
get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
maybe_open_price_feed
)
from .venues import (
Pair,
OptionPair,
Trade,
)
_spawn_kwargs = {
'infect_asyncio': True,
@ -85,215 +64,90 @@ async def open_history_client(
mkt: MktPair,
) -> tuple[Callable, int]:
fnstrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client:
pair: OptionPair = client._pairs[mkt.dst.name]
# XXX NOTE, the cuckers use ms !!!
creation_time_s: int = pair.creation_timestamp/1000
async def get_ohlc(
timeframe: float,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array: np.ndarray = await client.bars(
mkt,
array = await client.bars(
instrument,
start_dt=start_dt,
end_dt=end_dt,
)
if len(array) == 0:
if (
end_dt is None
):
raise DataUnavailable(
'No history seems to exist yet?\n\n'
f'{mkt}'
)
elif (
end_dt
and
end_dt.timestamp() < creation_time_s
):
# the contract can't have history
# before it was created.
pair_type_str: str = type(pair).__name__
create_dt: datetime = from_timestamp(creation_time_s)
raise DataUnavailable(
f'No history prior to\n'
f'`{pair_type_str}.creation_timestamp: int = '
f'{pair.creation_timestamp}\n\n'
f'------ deribit sux ------\n'
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
f'creation_time_s: {creation_time_s}\n'
f'create_dt: {create_dt}\n'
)
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
raise DataUnavailable
start_dt = from_timestamp(array[0]['time'])
end_dt = from_timestamp(array[-1]['time'])
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield (
get_ohlc,
{ # backfill config
'erlangs': 3,
'rate': 3,
}
)
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, Pair|OptionPair] | None:
# uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower():
fqme += '.deribit'
mkt_mode: str = ''
broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
# NOTE: we always upper case all tokens to be consistent with
# binance's symbology style for pairs, like `BTCUSDT`, but in
# theory we could also just keep things lower case; as long as
# we're consistent and the symcache matches whatever this func
# returns, always!
expiry: str = expiry.upper()
venue: str = venue.upper()
# venue_lower: str = venue.lower()
mkt_mode: str = 'option'
async with open_cached_client(
'deribit',
) as client:
assets: dict[str, Asset] = await client.get_assets()
pair_str: str = mkt_ep.lower()
pair: Pair = await client.exch_info(
sym=pair_str,
)
mkt_mode = pair.venue
client.mkt_mode = mkt_mode
dst: Asset | None = assets.get(pair.bs_dst_asset)
src: Asset | None = assets.get(pair.bs_src_asset)
mkt = MktPair(
dst=dst,
src=src,
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
venue=mkt_mode,
broker='deribit',
_atype=mkt_mode,
_fqme_without_src=True,
# expiry=pair.expiry,
# XXX TODO, currently we don't use it since it's
# already "described" in the `OptionPair.symbol: str`
# and if we slap in the ISO repr it's kinda hideous..
# -[ ] figure out the best either std
)
return mkt, pair
yield get_ohlc, {'erlangs': 3, 'rate': 3}
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Open a live quote stream for the market set defined by `symbols`.
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
'''
sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr(
# so we can see `deribit`'s delightfully mega-long bs fields..
maxstring=100,
)
sym = symbols[0]
async with (
open_cached_client('deribit') as client,
send_chan as send_chan
):
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(
mkt_info=mkt,
)
)
# build `cryptofeed` feed-handle
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {
'asset_type': 'option',
'price_tick_size': 0.0005
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
from_cf: tractor.to_asyncio.LinkedTaskChannel
async with maybe_open_price_feed(sym) as from_cf:
nsym = piker_sym_to_cb_sym(sym)
# load the "last trades" summary
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
cb_sym_to_deribit_inst(cf_sym),
count=1,
)
last_trades: list[Trade] = last_trades_res.trades
async with maybe_open_price_feed(sym) as stream:
# TODO, do we even need this or will the above always
# work?
# if not last_trades:
# await tractor.pause()
# async for typ, quote in from_cf:
# if typ == 'trade':
# last_trade = Trade(**(quote['data']))
# break
cache = await client.cache_symbols()
# else:
last_trade = Trade(
**(last_trades[0])
)
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
first_quote: dict = {
if len(last_trades) == 0:
last_trade = None
async for typ, quote in stream:
if typ == 'trade':
last_trade = Trade(**(quote['data']))
break
else:
last_trade = Trade(**(last_trades[0]))
first_quote = {
'symbol': sym,
'last': last_trade.price,
'brokerd_ts': last_trade.timestamp,
@ -304,84 +158,13 @@ async def stream_quotes(
'broker_ts': last_trade.timestamp
}]
}
task_status.started((
init_msgs,
first_quote,
))
task_status.started((init_msgs, first_quote))
feed_is_live.set()
# NOTE XXX, static for now!
# => since this only handles ONE mkt feed at a time we
# don't need a lookup table to map interleaved quotes
# from multiple possible mkt-pairs
topic: str = mkt.bs_fqme
# deliver until cancelled
async for typ, ref in from_cf:
match typ:
case 'trade':
trade: cryptofeed.types.Trade = ref
# TODO, re-impl this according to teh ideal
# fqme for opts that we choose!!
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(trade.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'last': trade.price,
'broker_ts': time.time(),
# ^TODO, name this `brokerd/datad_ts` and
# use `time.time_ns()` ??
'ticks': [{
'type': 'trade',
'price': float(trade.price),
'size': float(trade.amount),
'broker_ts': trade.timestamp,
}],
}
log.info(
f'deribit {typ!r} quote for {sym!r}\n\n'
f'{trade}\n\n'
f'{pfmt(piker_quote)}\n'
)
case 'l1':
book: cryptofeed.types.L1Book = ref
# TODO, so this is where we can possibly change things
# and instead lever the `MktPair.bs_fqme: str` output?
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(book.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'ticks': [
{'type': 'bid',
'price': float(book.bid_price),
'size': float(book.bid_size)},
{'type': 'bsize',
'price': float(book.bid_price),
'size': float(book.bid_size),},
{'type': 'ask',
'price': float(book.ask_price),
'size': float(book.ask_size),},
{'type': 'asize',
'price': float(book.ask_price),
'size': float(book.ask_size),}
]
}
await send_chan.send({
topic: piker_quote,
})
async for typ, quote in stream:
topic = quote['symbol']
await send_chan.send({topic: quote})
@tractor.context
@ -391,21 +174,12 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client:
# load all symbols locally for fast search
# cache = client._pairs
cache = await client.cache_symbols()
await ctx.started()
async with ctx.open_stream() as stream:
pattern: str
async for pattern in stream:
# NOTE: pattern fuzzy-matching is done within
# the methd impl.
pairs: dict[str, Pair] = await client.search_symbols(
pattern,
)
# repack in fqme-keyed table
byfqme: dict[str, Pair] = {}
for pair in pairs.values():
byfqme[pair.bs_fqme] = pair
await stream.send(byfqme)
# repack in dict form
await stream.send(
await client.search_symbols(pattern))

View File

@ -1,196 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Per market data-type definitions and schemas types.
"""
from __future__ import annotations
import pendulum
from typing import (
Literal,
Optional,
)
from decimal import Decimal
from piker.types import Struct
# API endpoint paths by venue / sub-API
_domain: str = 'deribit.com'
_url = f'https://www.{_domain}'
# WEBsocketz
_ws_url: str = f'wss://www.{_domain}/ws/api/v2'
# test nets
_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2'
MarketType = Literal[
'option'
]
def get_api_eps(venue: MarketType) -> tuple[str, str]:
'''
Return API ep root paths per venue.
'''
return {
'option': (
_ws_url,
),
}[venue]
class Pair(Struct, frozen=True, kw_only=True):
symbol: str
# src
quote_currency: str # 'BTC'
# dst
base_currency: str # "BTC",
tick_size: float # 0.0001 # [{'above_price': 0.005, 'tick_size': 0.0005}]
tick_size_steps: list[dict[str, float]]
@property
def price_tick(self) -> Decimal:
return Decimal(str(self.tick_size_steps[0]['above_price']))
@property
def size_tick(self) -> Decimal:
return Decimal(str(self.tick_size))
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_mktid(self) -> str:
return f'{self.symbol}.{self.venue}'
class OptionPair(Pair, frozen=True):
taker_commission: float # 0.0003
strike: float # 5000.0
settlement_period: str # 'day'
settlement_currency: str # "BTC",
rfq: bool # false
price_index: str # 'btc_usd'
option_type: str # 'call'
min_trade_amount: float # 0.1
maker_commission: float # 0.0003
kind: str # 'option'
is_active: bool # true
instrument_type: str # 'reversed'
instrument_name: str # 'BTC-1SEP24-55000-C'
instrument_id: int # 364671
expiration_timestamp: int # 1725177600000
creation_timestamp: int # 1724918461000
counter_currency: str # 'USD'
contract_size: float # '1.0'
block_trade_tick_size: float # '0.0001'
block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair'
# TODO, impl this without the MM:SS part of
# the `'THH:MM:SS..'` etc..
@property
def expiry(self) -> str:
iso_date = pendulum.from_timestamp(
self.expiration_timestamp / 1000
).isoformat()
return iso_date
@property
def venue(self) -> str:
return f'{self.instrument_type}_option'
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_src_asset(self) -> str:
return f'{self.quote_currency}'
@property
def bs_dst_asset(self) -> str:
return f'{self.symbol}'
PAIRTYPES: dict[MarketType, Pair] = {
'option': OptionPair,
}
class JSONRPCResult(Struct):
id: int
usIn: int
usOut: int
usDiff: int
testnet: bool
jsonrpc: str = '2.0'
error: Optional[dict] = None
result: Optional[list[dict]] = None
class JSONRPCChannel(Struct):
method: str
params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct):
low: list[float]
cost: list[float]
high: list[float]
open: list[float]
close: list[float]
ticks: list[int]
status: str
volume: list[float]
class Trade(Struct):
iv: float
price: float
amount: float
trade_id: str
contracts: float
direction: str
trade_seq: int
timestamp: int
mark_price: float
index_price: float
tick_direction: int
instrument_name: str
combo_id: Optional[str] = '',
combo_trade_id: Optional[int] = 0,
block_trade_id: Optional[str] = '',
block_trade_leg_count: Optional[int] = 0,
class LastTradesResult(Struct):
trades: list[Trade]
has_more: bool

View File

@ -34,7 +34,6 @@ from piker.brokers._util import get_logger
if TYPE_CHECKING:
from .api import Client
from ib_insync import IB
import i3ipc
log = get_logger('piker.brokers.ib')
@ -49,37 +48,6 @@ _reset_tech: Literal[
] = 'vnc'
no_setup_msg:str = (
'No data reset hack test setup for {vnc_sockaddr}!\n'
'See config setup tips @\n'
'https://github.com/pikers/piker/tree/master/piker/brokers/ib'
)
def try_xdo_manual(
vnc_sockaddr: str,
):
'''
Do the "manual" `xdo`-based screen switch + click
combo since apparently the `asyncvnc` client ain't workin..
Note this is only meant as a backup method for Xorg users,
ideally you can use a real vnc client and the `vnc_click_hack()`
impl!
'''
global _reset_tech
try:
i3ipc_xdotool_manual_click_hack()
_reset_tech = 'i3ipc_xdotool'
return True
except OSError:
log.exception(
no_setup_msg.format(vnc_sockaddr)
)
return False
async def data_reset_hack(
# vnc_host: str,
client: Client,
@ -122,9 +90,15 @@ async def data_reset_hack(
vnc_port: int
vnc_sockaddr: tuple[str] | None = client.conf.get('vnc_addrs')
no_setup_msg:str = (
f'No data reset hack test setup for {vnc_sockaddr}!\n'
'See config setup tips @\n'
'https://github.com/pikers/piker/tree/master/piker/brokers/ib'
)
if not vnc_sockaddr:
log.warning(
no_setup_msg.format(vnc_sockaddr)
no_setup_msg
+
'REQUIRES A `vnc_addrs: array` ENTRY'
)
@ -145,38 +119,27 @@ async def data_reset_hack(
port=vnc_port,
)
)
except (
OSError, # no VNC server avail..
PermissionError, # asyncvnc pw fail..
):
except OSError:
if vnc_host != 'localhost':
log.warning(no_setup_msg)
return False
try:
import i3ipc # noqa (since a deps dynamic check)
except ModuleNotFoundError:
log.warning(
no_setup_msg.format(vnc_sockaddr)
)
log.warning(no_setup_msg)
return False
if vnc_host not in {
'localhost',
'127.0.0.1',
}:
focussed, matches = i3ipc_fin_wins_titled()
if not matches:
log.warning(
no_setup_msg.format(vnc_sockaddr)
)
return False
else:
try_xdo_manual(vnc_sockaddr)
# localhost but no vnc-client or it borked..
else:
try_xdo_manual(vnc_sockaddr)
try:
i3ipc_xdotool_manual_click_hack()
_reset_tech = 'i3ipc_xdotool'
return True
except OSError:
log.exception(no_setup_msg)
return False
case 'i3ipc_xdotool':
try_xdo_manual(vnc_sockaddr)
# i3ipc_xdotool_manual_click_hack()
i3ipc_xdotool_manual_click_hack()
case _ as tech:
raise RuntimeError(f'{tech} is not supported for reset tech!?')
@ -215,9 +178,9 @@ async def vnc_click_hack(
host,
port=port,
# TODO: doesn't work?
# see, https://github.com/barneygale/asyncvnc/issues/7
password='doggy',
# TODO: doesn't work see:
# https://github.com/barneygale/asyncvnc/issues/7
# password='ibcansmbz',
) as client:
@ -231,103 +194,70 @@ async def vnc_click_hack(
client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked
def i3ipc_fin_wins_titled(
titles: list[str] = [
'Interactive Brokers', # tws running in i3
'IB Gateway', # gw running in i3
# 'IB', # gw running in i3 (newer version?)
# !TODO, remote vnc instance
# -[ ] something in title (or other Con-props) that indicates
# this is explicitly for ibrk sw?
# |_[ ] !can use modden spawn eventually!
'TigerVNC',
# 'vncviewer', # the terminal..
],
) -> tuple[
i3ipc.Con, # orig focussed win
list[tuple[str, i3ipc.Con]], # matching wins by title
]:
'''
Attempt to find a local-DE window titled with an entry in
`titles`.
If found deliver the current focussed window and all matching
`i3ipc.Con`s in a list.
'''
import i3ipc
ipc = i3ipc.Connection()
# TODO: might be worth offering some kinda api for grabbing
# the window id from the pid?
# https://stackoverflow.com/a/2250879
tree = ipc.get_tree()
focussed: i3ipc.Con = tree.find_focused()
matches: list[i3ipc.Con] = []
for name in titles:
results = tree.find_titled(name)
print(f'results for {name}: {results}')
if results:
con = results[0]
matches.append((
name,
con,
))
return (
focussed,
matches,
)
def i3ipc_xdotool_manual_click_hack() -> None:
'''
Do the data reset hack but expecting a local X-window using `xdotool`.
'''
focussed, matches = i3ipc_fin_wins_titled()
orig_win_id = focussed.window
import i3ipc
i3 = i3ipc.Connection()
# TODO: might be worth offering some kinda api for grabbing
# the window id from the pid?
# https://stackoverflow.com/a/2250879
t = i3.get_tree()
orig_win_id = t.find_focused().window
# for tws
win_names: list[str] = [
'Interactive Brokers', # tws running in i3
'IB Gateway', # gw running in i3
# 'IB', # gw running in i3 (newer version?)
]
try:
for name, con in matches:
print(f'Resetting data feed for {name}')
win_id = str(con.window)
w, h = con.rect.width, con.rect.height
for name in win_names:
results = t.find_titled(name)
print(f'results for {name}: {results}')
if results:
con = results[0]
print(f'Resetting data feed for {name}')
win_id = str(con.window)
w, h = con.rect.width, con.rect.height
# TODO: seems to be a few libs for python but not sure
# if they support all the sub commands we need, order of
# most recent commit history:
# https://github.com/rr-/pyxdotool
# https://github.com/ShaneHutter/pyxdotool
# https://github.com/cphyc/pyxdotool
# TODO: seems to be a few libs for python but not sure
# if they support all the sub commands we need, order of
# most recent commit history:
# https://github.com/rr-/pyxdotool
# https://github.com/ShaneHutter/pyxdotool
# https://github.com/cphyc/pyxdotool
# TODO: only run the reconnect (2nd) kc on a detected
# disconnect?
for key_combo, timeout in [
# only required if we need a connection reset.
# ('ctrl+alt+r', 12),
# data feed reset.
('ctrl+alt+f', 6)
]:
subprocess.call([
'xdotool',
'windowactivate', '--sync', win_id,
# TODO: only run the reconnect (2nd) kc on a detected
# disconnect?
for key_combo, timeout in [
# only required if we need a connection reset.
# ('ctrl+alt+r', 12),
# data feed reset.
('ctrl+alt+f', 6)
]:
subprocess.call([
'xdotool',
'windowactivate', '--sync', win_id,
# move mouse to bottom left of window (where
# there should be nothing to click).
'mousemove_relative', '--sync', str(w-4), str(h-4),
# move mouse to bottom left of window (where
# there should be nothing to click).
'mousemove_relative', '--sync', str(w-4), str(h-4),
# NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id,
'--repeat', '3', '1',
# NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id,
'--repeat', '3', '1',
# hackzorzes
'key', key_combo,
],
timeout=timeout,
)
# hackzorzes
'key', key_combo,
],
timeout=timeout,
)
# re-activate and focus original window
subprocess.call([

View File

@ -48,7 +48,6 @@ from bidict import bidict
import trio
import tractor
from tractor import to_asyncio
from tractor import trionics
from pendulum import (
from_timestamp,
DateTime,
@ -97,10 +96,6 @@ from ._util import (
get_logger,
)
# ?TODO? this can now be removed since it was originally to extend
# with a `bar_vwap` field that we removed from the default ohlcv
# dtype since it's better calculated in an FSP func
#
_bar_load_dtype: list[tuple[str, type]] = [
# NOTE XXX: only part that's diff
# from our default fields where
@ -1374,8 +1369,8 @@ async def load_clients_for_trio(
'''
Pure async mngr proxy to ``load_aio_clients()``.
This is a bootstrap entrypoint to call from
a `tractor.to_asyncio.open_channel_from()`.
This is a bootstrap entrypoing to call from
a ``tractor.to_asyncio.open_channel_from()``.
'''
async with load_aio_clients(
@ -1396,10 +1391,7 @@ async def open_client_proxies() -> tuple[
async with (
tractor.trionics.maybe_open_context(
acm_func=tractor.to_asyncio.open_channel_from,
kwargs={
'target': load_clients_for_trio,
# ^XXX, kwarg to `open_channel_from()`
},
kwargs={'target': load_clients_for_trio},
# lock around current actor task access
# TODO: maybe this should be the default in tractor?
@ -1592,8 +1584,7 @@ async def open_client_proxy(
event_consumers=event_table,
) as (first, chan),
trionics.collapse_eg(), # loose-ify
trio.open_nursery() as relay_tn,
trio.open_nursery() as relay_n,
):
assert isinstance(first, Client)
@ -1633,7 +1624,7 @@ async def open_client_proxy(
continue
relay_tn.start_soon(relay_events)
relay_n.start_soon(relay_events)
yield proxy

View File

@ -34,7 +34,6 @@ import trio
from trio_typing import TaskStatus
import tractor
from tractor.to_asyncio import LinkedTaskChannel
from tractor import trionics
from ib_insync.contract import (
Contract,
)
@ -408,7 +407,7 @@ async def update_and_audit_pos_msg(
# TODO: make this a "propaganda" log level?
if ibpos.avgCost != msg.avg_price:
log.debug(
log.warning(
f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n'
f'ib: {ibfmtmsg}\n'
'---------------------------\n'
@ -739,7 +738,7 @@ async def open_trade_dialog(
f'UNEXPECTED POSITION says IB => {msg.symbol}\n'
'Maybe they LIQUIDATED YOU or your ledger is wrong?\n'
)
log.debug(logmsg)
log.error(logmsg)
await ctx.started((
all_positions,
@ -748,22 +747,21 @@ async def open_trade_dialog(
async with (
ctx.open_stream() as ems_stream,
trionics.collapse_eg(),
trio.open_nursery() as tn,
trio.open_nursery() as n,
):
# relay existing open orders to ems
for msg in order_msgs:
await ems_stream.send(msg)
for client in set(aioclients.values()):
trade_event_stream: LinkedTaskChannel = await tn.start(
trade_event_stream: LinkedTaskChannel = await n.start(
open_trade_event_stream,
client,
)
# start order request handler **before** local trades
# event loop
tn.start_soon(
n.start_soon(
handle_order_requests,
ems_stream,
accounts_def,
@ -771,7 +769,7 @@ async def open_trade_dialog(
)
# allocate event relay tasks for each client connection
tn.start_soon(
n.start_soon(
deliver_trade_events,
trade_event_stream,
@ -1243,47 +1241,32 @@ async def deliver_trade_events(
# never relay errors for non-broker related issues
# https://interactivebrokers.github.io/tws-api/message_codes.html
code: int = err['error_code']
reason: str = err['reason']
reqid: str = str(err['reqid'])
# "Warning:" msg codes,
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
# - 2109: 'Outside Regular Trading Hours'
if 'Warning:' in reason:
log.warning(
f'Order-API-warning: {code!r}\n'
f'reqid: {reqid!r}\n'
f'\n'
f'{pformat(err)}\n'
# ^TODO? should we just print the `reason`
# not the full `err`-dict?
)
continue
# XXX known special (ignore) cases
elif code in {
200, # uhh.. ni idea
if code in {
200, # uhh
# hist pacing / connectivity
162,
165,
# WARNING codes:
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
# Attribute 'Outside Regular Trading Hours' is
# " 'ignored based on the order type and
# destination. PlaceOrder is now ' 'being
# processed.',
2109,
# XXX: lol this isn't even documented..
# 'No market data during competing live session'
1669,
}:
log.error(
f'Order-API-error which is non-cancel-causing ?!\n'
f'\n'
f'{pformat(err)}\n'
)
continue
reqid: str = str(err['reqid'])
reason: str = err['reason']
if err['reqid'] == -1:
log.error(
f'TWS external order error ??\n'
f'{pformat(err)}\n'
)
log.error(f'TWS external order error:\n{pformat(err)}')
flow: dict = dict(
flows.get(reqid)

View File

@ -34,7 +34,6 @@ import urllib.parse
import hashlib
import hmac
import base64
import tractor
import trio
from piker import config
@ -373,7 +372,8 @@ class Client:
# 1658347714, 'status': 'Success'}]}
if xfers:
await tractor.pause()
import tractor
await tractor.pp()
trans: dict[str, Transaction] = {}
for entry in xfers:
@ -501,8 +501,7 @@ class Client:
for xkey, data in resp['result'].items():
# NOTE: always cache in pairs tables for faster lookup
with tractor.devx.maybe_open_crash_handler(): # as bxerr:
pair = Pair(xname=xkey, **data)
pair = Pair(xname=xkey, **data)
# register the above `Pair` structs for all
# key-sets/monikers: a set of 4 (frickin) tables

View File

@ -175,8 +175,9 @@ async def handle_order_requests(
case {
'account': 'kraken.spot' as account,
'action': 'buy'|'sell',
}:
'action': action,
} if action in {'buy', 'sell'}:
# validate
order = BrokerdOrder(**msg)
@ -261,12 +262,6 @@ async def handle_order_requests(
} | extra
log.info(f'Submitting WS order request:\n{pformat(req)}')
# NOTE HOWTO, debug order requests
#
# if 'XRP' in pair:
# await tractor.pause()
await ws.send_msg(req)
# placehold for sanity checking in relay loop
@ -1090,8 +1085,6 @@ async def handle_order_updates(
f'Failed to {action} order {reqid}:\n'
f'{errmsg}'
)
# if tractor._state.debug_mode():
# await tractor.pause()
symbol: str = 'N/A'
if chain := apiflows.get(reqid):

View File

@ -21,6 +21,7 @@ Symbology defs and search.
from decimal import Decimal
import tractor
from rapidfuzz import process as fuzzy
from piker._cacheables import (
async_lifo_cache,
@ -40,13 +41,8 @@ from piker.accounting._mktinfo import (
)
# https://www.kraken.com/features/api#get-tradable-pairs
class Pair(Struct):
'''
A tradable asset pair as schema-defined by,
https://docs.kraken.com/api/docs/rest-api/get-tradable-asset-pairs
'''
xname: str # idiotic bs_mktid equiv i guess?
altname: str # alternate pair name
wsname: str # WebSocket pair name (if available)
@ -57,6 +53,7 @@ class Pair(Struct):
lot: str # volume lot size
cost_decimals: int
costmin: float
pair_decimals: int # scaling decimal places for pair
lot_decimals: int # scaling decimal places for volume
@ -82,7 +79,6 @@ class Pair(Struct):
tick_size: float # min price step size
status: str
costmin: str|None = None # XXX, only some mktpairs?
short_position_limit: float = 0
long_position_limit: float = float('inf')

View File

@ -25,10 +25,7 @@ from typing import TYPE_CHECKING
import trio
import tractor
from tractor.trionics import (
broadcast_receiver,
collapse_eg,
)
from tractor.trionics import broadcast_receiver
from ._util import (
log, # sub-sys logger
@ -284,11 +281,8 @@ async def open_ems(
client._ems_stream = trades_stream
# start sync code order msg delivery task
async with (
collapse_eg(),
trio.open_nursery() as tn,
):
tn.start_soon(
async with trio.open_nursery() as n:
n.start_soon(
relay_orders_from_sync_code,
client,
fqme,
@ -304,4 +298,4 @@ async def open_ems(
)
# stop the sync-msg-relay task on exit.
tn.cancel_scope.cancel()
n.cancel_scope.cancel()

View File

@ -42,7 +42,6 @@ from bidict import bidict
import trio
from trio_typing import TaskStatus
import tractor
from tractor import trionics
from ._util import (
log, # sub-sys logger
@ -77,6 +76,7 @@ if TYPE_CHECKING:
# TODO: numba all of this
def mk_check(
trigger_price: float,
known_last: float,
action: str,
@ -162,7 +162,7 @@ async def clear_dark_triggers(
router: Router,
brokerd_orders_stream: tractor.MsgStream,
quote_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa
broker: str,
fqme: str,
@ -178,7 +178,6 @@ async def clear_dark_triggers(
'''
# XXX: optimize this for speed!
# TODO:
# - port to the new ringbuf stuff in `tractor.ipc`!
# - numba all this!
# - this stream may eventually contain multiple symbols
quote_stream._raise_on_lag = False
@ -501,7 +500,7 @@ class Router(Struct):
'''
# setup at actor spawn time
_tn: trio.Nursery
nursery: trio.Nursery
# broker to book map
books: dict[str, DarkBook] = {}
@ -667,7 +666,7 @@ class Router(Struct):
# dark book clearing loop, also lives with parent
# daemon to allow dark order clearing while no
# client is connected.
self._tn.start_soon(
self.nursery.start_soon(
clear_dark_triggers,
self,
relay.brokerd_stream,
@ -690,7 +689,7 @@ class Router(Struct):
# spawn a ``brokerd`` order control dialog stream
# that syncs lifetime with the parent `emsd` daemon.
self._tn.start_soon(
self.nursery.start_soon(
translate_and_relay_brokerd_events,
broker,
relay.brokerd_stream,
@ -764,12 +763,10 @@ async def _setup_persistent_emsd(
global _router
# open a root "service task-nursery" for the `emsd`-actor
async with (
trionics.collapse_eg(),
trio.open_nursery() as tn
):
_router = Router(_tn=tn)
# open a root "service nursery" for the ``emsd`` actor
async with trio.open_nursery() as service_nursery:
_router = Router(nursery=service_nursery)
# TODO: send back the full set of persistent
# orders/execs?
@ -1185,16 +1182,12 @@ async def process_client_order_cmds(
submitting live orders immediately if requested by the client.
'''
# TODO, only allow `msgspec.Struct` form!
cmd: dict
# cmd: dict
async for cmd in client_order_stream:
log.info(
f'Received order cmd:\n'
f'{pformat(cmd)}\n'
)
log.info(f'Received order cmd:\n{pformat(cmd)}')
# CAWT DAMN we need struct support!
oid: str = str(cmd['oid'])
oid = str(cmd['oid'])
# register this stream as an active order dialog (msg flow) for
# this order id such that translated message from the brokerd
@ -1300,7 +1293,7 @@ async def process_client_order_cmds(
case {
'oid': oid,
'symbol': fqme,
'price': price,
'price': trigger_price,
'size': size,
'action': ('buy' | 'sell') as action,
'exec_mode': ('live' | 'paper'),
@ -1332,7 +1325,7 @@ async def process_client_order_cmds(
symbol=sym,
action=action,
price=price,
price=trigger_price,
size=size,
account=req.account,
)
@ -1354,11 +1347,7 @@ async def process_client_order_cmds(
# (``translate_and_relay_brokerd_events()`` above) will
# handle relaying the ems side responses back to
# the client/cmd sender from this request
log.info(
f'Sending live order to {broker}:\n'
f'{pformat(msg)}'
)
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck``
@ -1374,7 +1363,7 @@ async def process_client_order_cmds(
case {
'oid': oid,
'symbol': fqme,
'price': price,
'price': trigger_price,
'size': size,
'exec_mode': exec_mode,
'action': action,
@ -1402,12 +1391,7 @@ async def process_client_order_cmds(
if isnan(last):
last = flume.rt_shm.array[-1]['close']
trigger_price: float = float(price)
pred = mk_check(
trigger_price,
last,
action,
)
pred = mk_check(trigger_price, last, action)
# NOTE: for dark orders currently we submit
# the triggered live order at a price 5 ticks
@ -1514,7 +1498,7 @@ async def maybe_open_trade_relays(
loglevel: str = 'info',
):
fqme, relay, feed, client_ready = await _router._tn.start(
fqme, relay, feed, client_ready = await _router.nursery.start(
_router.open_trade_relays,
fqme,
exec_mode,
@ -1547,7 +1531,7 @@ async def _emsd_main(
ctx: tractor.Context,
fqme: str,
exec_mode: str, # ('paper', 'live')
loglevel: str|None = None,
loglevel: str | None = None,
) -> tuple[
dict[

View File

@ -19,7 +19,6 @@ Clearing sub-system message and protocols.
"""
from __future__ import annotations
from decimal import Decimal
from typing import (
Literal,
)
@ -72,15 +71,7 @@ class Order(Struct):
symbol: str # | MktPair
account: str # should we set a default as '' ?
# https://docs.python.org/3/library/decimal.html#decimal-objects
#
# ?TODO? decimal usage throughout?
# -[ ] possibly leverage the `Encoder(decimal_format='number')`
# bit?
# |_https://jcristharif.com/msgspec/supported-types.html#decimal
# -[ ] should we also use it for .size?
#
price: Decimal
price: float
size: float # -ve is "sell", +ve is "buy"
brokers: list[str] = []
@ -187,7 +178,7 @@ class BrokerdOrder(Struct):
time_ns: int
symbol: str # fqme
price: Decimal
price: float
size: float
# TODO: if we instead rely on a +ve/-ve size to determine

View File

@ -508,7 +508,7 @@ async def handle_order_requests(
reqid = await client.submit_limit(
oid=order.oid,
symbol=f'{order.symbol}.{client.broker}',
price=float(order.price),
price=order.price,
action=order.action,
size=order.size,
# XXX: by default 0 tells ``ib_insync`` methods that

View File

@ -184,12 +184,33 @@ def pikerd(
registry_addrs=regaddrs,
loglevel=loglevel,
debug_mode=pdb,
enable_transports=['uds'],
) as service_mngr,
) as service_mngr, # normally delivers a ``Services`` handle
# AsyncExitStack() as stack,
):
assert service_mngr
# ?TODO? spawn all other sub-actor daemons according to
# TODO: spawn all other sub-actor daemons according to
# multiaddress endpoint spec defined by user config
assert service_mngr
# if tsdb:
# dname, conf = await stack.enter_async_context(
# service.marketstore.start_ahab_daemon(
# service_mngr,
# loglevel=loglevel,
# )
# )
# log.info(f'TSDB `{dname}` up with conf:\n{conf}')
# if es:
# dname, conf = await stack.enter_async_context(
# service.elastic.start_ahab_daemon(
# service_mngr,
# loglevel=loglevel,
# )
# )
# log.info(f'DB `{dname}` up with conf:\n{conf}')
await trio.sleep_forever()
trio.run(main)
@ -314,7 +335,7 @@ def services(config, tl, ports):
name='service_query',
loglevel=config['loglevel'] if tl else None,
),
tractor.get_registry(
tractor.get_arbiter(
host=host,
port=ports[0]
) as portal

View File

@ -284,8 +284,7 @@ class Sampler:
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
trio.ClosedResourceError
):
log.error(
f'{stream._ctx.chan.uid} dropped connection'
@ -698,7 +697,7 @@ async def sample_and_broadcast(
log.warning(
f'Feed OVERRUN {sub_key}'
f'@{bus.brokername} -> \n'
'@{bus.brokername} -> \n'
f'feed @ {chan.uid}\n'
f'throttle = {throttle} Hz'
)
@ -877,7 +876,6 @@ async def uniform_rate_send(
except tractor.RemoteActorError as rme:
if rme.type is not tractor._exceptions.StreamOverrun:
raise
ctx = stream._ctx
chan = ctx.chan
log.warning(
@ -894,7 +892,6 @@ async def uniform_rate_send(
trio.ClosedResourceError,
trio.BrokenResourceError,
ConnectionResetError,
trio.EndOfChannel,
):
# if the feed consumer goes down then drop
# out of this rate limiter

View File

@ -39,7 +39,6 @@ from typing import (
AsyncContextManager,
Awaitable,
Sequence,
TYPE_CHECKING,
)
import trio
@ -76,10 +75,6 @@ from ._sampling import (
uniform_rate_send,
)
if TYPE_CHECKING:
from tractor._addr import Address
from tractor.msg.types import Aid
class Sub(Struct, frozen=True):
'''
@ -728,10 +723,7 @@ class Feed(Struct):
async for msg in stream:
await tx.send(msg)
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as nurse
):
async with trio.open_nursery() as nurse:
# spawn a relay task for each stream so that they all
# multiplex to a common channel.
for brokername in mods:
@ -907,19 +899,19 @@ async def open_feed(
feed.portals[brokermod] = portal
# fill out "status info" that the UI can show
chan: tractor.Channel = portal.chan
raddr: Address = chan.raddr
aid: Aid = chan.aid
# TAG_feed_status_update
host, port = portal.channel.raddr
if host == '127.0.0.1':
host = 'localhost'
feed.status.update({
'actor_id': aid,
'actor_short_id': f'{aid.name}@{aid.pid}',
'ipc': chan.raddr.proto_key,
'ipc_addr': raddr,
'actor_name': portal.channel.uid[0],
'host': host,
'port': port,
'hist_shm': 'NA',
'rt_shm': 'NA',
'throttle_hz': tick_throttle,
'throttle_rate': tick_throttle,
})
# feed.status.update(init_msg.pop('status', {}))
# (allocate and) connect to any feed bus for this broker
bus_ctxs.append(

View File

@ -498,7 +498,6 @@ async def cascade(
func_name: str = func.__name__
async with (
tractor.trionics.collapse_eg(), # avoid multi-taskc tb in console
trio.open_nursery() as tn,
):
# TODO: might be better to just make a "restart" method where

View File

@ -18,11 +18,7 @@
Log like a forester!
"""
import logging
import reprlib
import json
from typing import (
Callable,
)
import tractor
from pygments import (
@ -88,27 +84,3 @@ def colorize_json(
# likeable styles: algol_nu, tango, monokai
formatters.TerminalTrueColorFormatter(style=style)
)
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
'''
Allocate and deliver a `repr.Repr` instance with provided input
settings using the std-lib's `reprlib` mod,
* https://docs.python.org/3/library/reprlib.html
------ Ex. ------
An up to 6-layer-nested `dict` as multi-line:
- https://stackoverflow.com/a/79102479
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
'''
def_kws: dict[str, int] = dict(
indent=2,
maxlevel=6, # recursion levels
maxstring=66, # match editor line-len limit
)
def_kws |= repr_kws
reprr = reprlib.Repr(**def_kws)
return reprr.repr

View File

@ -200,8 +200,7 @@ async def open_pikerd(
reg_addrs,
),
tractor.open_nursery() as actor_nursery,
tractor.trionics.collapse_eg(),
trio.open_nursery() as service_tn,
trio.open_nursery() as service_nursery,
):
for addr in reg_addrs:
if addr not in root_actor.accept_addrs:
@ -212,7 +211,7 @@ async def open_pikerd(
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_tn
Services.service_n = service_nursery
Services.debug_mode = debug_mode
try:
@ -222,7 +221,7 @@ async def open_pikerd(
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_tn.cancel_scope.cancel()
service_nursery.cancel_scope.cancel()
# TODO: do we even need this?

View File

@ -138,16 +138,6 @@ class StorageClient(
) -> None:
...
async def write_oi(
self,
fqme: str,
oi: np.ndarray,
append_and_duplicate: bool = True,
limit: int = int(800e3),
) -> None:
...
class TimeseriesNotFound(Exception):
'''

View File

@ -111,24 +111,6 @@ def mk_ohlcv_shm_keyed_filepath(
return path
def mk_oi_shm_keyed_filepath(
fqme: str,
period: float | int,
datadir: Path,
) -> Path:
if period < 1.:
raise ValueError('Sample period should be >= 1.!?')
path: Path = (
datadir
/
f'{fqme}.oi{int(period)}s.parquet'
)
return path
def unpack_fqme_from_parquet_filepath(path: Path) -> str:
filename: str = str(path.name)
@ -190,11 +172,7 @@ class NativeStorageClient:
key: str = path.name.rstrip('.parquet')
fqme, _, descr = key.rpartition('.')
if 'ohlcv' in descr:
prefix, _, suffix = descr.partition('ohlcv')
elif 'oi' in descr:
prefix, _, suffix = descr.partition('oi')
prefix, _, suffix = descr.partition('ohlcv')
period: int = int(suffix.strip('s'))
# cache description data
@ -391,61 +369,6 @@ class NativeStorageClient:
timeframe,
)
def _write_oi(
self,
fqme: str,
oi: np.ndarray,
) -> Path:
'''
Sync version of the public interface meth, since we don't
currently actually need or support an async impl.
'''
path: Path = mk_oi_shm_keyed_filepath(
fqme=fqme,
period=1,
datadir=self._datadir,
)
if isinstance(oi, np.ndarray):
new_df: pl.DataFrame = tsp.np2pl(oi)
else:
new_df = oi
if path.exists():
old_df = pl.read_parquet(path)
df = pl.concat([old_df, new_df])
else:
df = new_df
start = time.time()
df.write_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'parquet write took {delay} secs\n'
f'file path: {path}'
)
return path
async def write_oi(
self,
fqme: str,
oi: np.ndarray,
) -> Path:
'''
Write input oi time series for fqme and sampling period
to (local) disk.
'''
return self._write_oi(
fqme,
oi,
)
async def delete_ts(
self,
key: str,

View File

@ -963,10 +963,7 @@ async def tsdb_backfill(
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = []
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
async with trio.open_nursery() as tn:
tn.start_soon(
push_latest_frame,
dt_eps,
@ -1015,16 +1012,9 @@ async def tsdb_backfill(
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
if def_frame_size != calced_frame_size:
log.warning(
f'Expected frame size {def_frame_size}\n'
f'Rxed frame {calced_frame_size}\n'
)
# await tractor.pause()
assert def_frame_size == calced_frame_size
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
@ -1053,9 +1043,7 @@ async def tsdb_backfill(
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
async with trio.open_nursery(
strict_exception_groups=False,
) as tn:
async with trio.open_nursery() as tn:
bf_done = await tn.start(
partial(
@ -1320,7 +1308,6 @@ async def manage_history(
# sampling period) data set since normally differently
# sampled timeseries can be loaded / process independently
# ;)
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
log.info(

View File

@ -517,7 +517,7 @@ def with_dts(
'''
return df.with_columns([
pl.col(time_col).shift(1).name.suffix('_prev'),
pl.col(time_col).shift(1).suffix('_prev'),
pl.col(time_col).diff().alias('s_diff'),
pl.from_epoch(pl.col(time_col)).alias('dt'),
]).with_columns([
@ -623,7 +623,7 @@ def detect_vlm_gaps(
) -> pl.DataFrame:
vnull: pl.DataFrame = df.filter(
vnull: pl.DataFrame = w_dts.filter(
pl.col(col) == 0
)
return vnull

View File

@ -21,7 +21,6 @@ Main app startup and run.
from functools import partial
from types import ModuleType
import tractor
import trio
from piker.ui.qt import (
@ -117,7 +116,6 @@ async def _async_main(
needed_brokermods[brokername] = brokers[brokername]
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as root_n,
):
# set root nursery and task stack for spawning other charts/feeds

View File

@ -33,6 +33,7 @@ import trio
from piker.ui.qt import (
QtCore,
QtWidgets,
Qt,
QLineF,
QFrame,

View File

@ -1445,10 +1445,7 @@ async def display_symbol_data(
# for pause/resume on mouse interaction
rt_chart.feed = feed
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as ln,
):
async with trio.open_nursery() as ln:
# if available load volume related built-in display(s)
vlm_charts: dict[
str,

View File

@ -22,10 +22,7 @@ from contextlib import asynccontextmanager as acm
from typing import Callable
import trio
from tractor.trionics import (
gather_contexts,
collapse_eg,
)
from tractor.trionics import gather_contexts
from piker.ui.qt import (
QtCore,
@ -210,10 +207,7 @@ async def open_signal_handler(
async for args in recv:
await async_handler(*args)
async with (
collapse_eg(),
trio.open_nursery() as tn
):
async with trio.open_nursery() as tn:
tn.start_soon(proxy_to_handler)
async with send:
yield
@ -248,7 +242,6 @@ async def open_handlers(
widget: QWidget
streams: list[trio.abc.ReceiveChannel]
async with (
collapse_eg(),
trio.open_nursery() as tn,
gather_contexts([
open_event_stream(

View File

@ -18,11 +18,10 @@
Feed status and controls widget(s) for embedding in a UI-pane.
"""
from __future__ import annotations
from typing import (
Any,
TYPE_CHECKING,
)
from textwrap import dedent
from typing import TYPE_CHECKING
# from PyQt5.QtCore import Qt
@ -50,55 +49,35 @@ def mk_feed_label(
a feed control protocol.
'''
status: dict[str, Any] = feed.status
status = feed.status
assert status
# SO tips on ws/nls,
# https://stackoverflow.com/a/15721400
ws: str = '&nbsp;'
# nl: str = '<br>' # dun work?
actor_info_repr: str = (
f')> **{status["actor_short_id"]}**\n'
'\n' # bc md?
)
msg = dedent("""
actor: **{actor_name}**\n
|_ @**{host}:{port}**\n
""")
# fields to select *IN* for display
# (see `.data.feed.open_feed()` status
# update -> TAG_feed_status_update)
for key in [
'ipc',
'hist_shm',
'rt_shm',
'throttle_hz',
]:
# NOTE, the 2nd key is filled via `.format()` updates.
actor_info_repr += (
f'\n' # bc md?
f'{ws}|_{key}: **{{{key}}}**\n'
)
# ^TODO? formatting and content..
# -[ ] showing which fqme is "forward" on the
# chart/fsp/order-mode?
# '|_ flows: **{symbols}**\n'
#
# -[x] why isn't the indent working?
# => markdown, now solved..
for key, val in status.items():
if key in ('host', 'port', 'actor_name'):
continue
msg += f'\n|_ {key}: **{{{key}}}**\n'
feed_label = FormatLabel(
fmt_str=actor_info_repr,
fmt_str=msg,
# |_ streams: **{symbols}**\n
font=_font.font,
font_size=_font_small.px_size,
font_color='default_lightest',
)
# ?TODO, remove this?
# form.vbox.setAlignment(feed_label, Qt.AlignBottom)
# form.vbox.setAlignment(Qt.AlignBottom)
# _ = chart.height() - (
# form.height() +
# form.fill_bar.height()
# # feed_label.height()
# )
_ = chart.height() - (
form.height() +
form.fill_bar.height()
# feed_label.height()
)
feed_label.format(**feed.status)
return feed_label

View File

@ -600,7 +600,6 @@ async def open_fsp_admin(
kwargs=kwargs,
) as (cache_hit, cluster_map),
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
if cache_hit:
@ -614,8 +613,6 @@ async def open_fsp_admin(
)
try:
yield admin
# ??TODO, does this *need* to be inside a finally?
finally:
# terminate all tasks via signals
for key, entry in admin._registry.items():

View File

@ -285,20 +285,18 @@ class FormatLabel(QLabel):
font_size: int,
font_color: str,
use_md: bool = True,
parent=None,
) -> None:
super().__init__(parent)
# by default set the format string verbatim and expect user
# to call ``.format()`` later (presumably they'll notice the
# by default set the format string verbatim and expect user to
# call ``.format()`` later (presumably they'll notice the
# unformatted content if ``fmt_str`` isn't meant to be
# unformatted).
self.fmt_str = fmt_str
# self.setText(fmt_str) # ?TODO, why here?
self.setText(fmt_str)
self.setStyleSheet(
f"""QLabel {{
@ -308,10 +306,9 @@ class FormatLabel(QLabel):
"""
)
self.setFont(_font.font)
if use_md:
self.setTextFormat(
Qt.TextFormat.MarkdownText
)
self.setTextFormat(
Qt.TextFormat.MarkdownText
)
self.setMargin(0)
self.setSizePolicy(
@ -319,10 +316,7 @@ class FormatLabel(QLabel):
size_policy.Expanding,
)
self.setAlignment(
Qt.AlignLeft
|
Qt.AlignBottom
# Qt.AlignVCenter
Qt.AlignVCenter | Qt.AlignLeft
)
self.setText(self.fmt_str)

View File

@ -269,8 +269,6 @@ def hcolor(name: str) -> str:
# default ohlc-bars/curve gray
'bracket': '#666666', # like the logo
'pikers': '#616161', # a trader shade of..
'beast': '#161616', # in the dark alone.
# bluish
'charcoal': '#36454F',

View File

@ -21,7 +21,6 @@ Chart trading, the only way to scalp.
from __future__ import annotations
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from decimal import Decimal
from functools import partial
from pprint import pformat
import time
@ -42,6 +41,7 @@ from piker.accounting import (
Position,
mk_allocator,
MktPair,
Symbol,
)
from piker.clearing import (
open_ems,
@ -143,15 +143,6 @@ class OrderMode:
}
_staged_order: Order | None = None
@property
def curr_mkt(self) -> MktPair:
'''
Deliver the currently selected `MktPair` according
chart state.
'''
return self.chart.linked.mkt
def on_level_change_update_next_order_info(
self,
level: float,
@ -181,11 +172,7 @@ class OrderMode:
line.update_labels(order_info)
# update bound-in staged order
mkt: MktPair = self.curr_mkt
order.price: Decimal = mkt.quantize(
size=level,
quantity_type='price',
)
order.price = level
order.size = order_info['size']
# when an order is changed we flip the settings side-pane to
@ -200,9 +187,7 @@ class OrderMode:
) -> LevelLine:
# TODO, if we instead just always decimalize at the ems layer
# we can avoid this back-n-forth casting?
level = float(order.price)
level = order.price
line = order_line(
chart or self.chart,
@ -239,11 +224,7 @@ class OrderMode:
# the order mode allocator but we still need to update the
# "staged" order message we'll send to the ems
def update_order_price(y: float) -> None:
mkt: MktPair = self.curr_mkt
order.price: Decimal = mkt.quantize(
size=y,
quantity_type='price',
)
order.price = y
line._on_level_change = update_order_price
@ -294,31 +275,34 @@ class OrderMode:
chart = cursor.linked.chart
if (
not chart
and
cursor
and
cursor.active_plot
and cursor
and cursor.active_plot
):
return
chart = cursor.active_plot
price: float = cursor._datum_xy[1]
price = cursor._datum_xy[1]
if not price:
# zero prices are not supported by any means
# since that's illogical / a no-op.
return
mkt: MktPair = self.chart.linked.mkt
# NOTE : we could also use instead,
# mkt.quantize(price, quantity_type='price')
# but it returns a Decimal and it's probably gonna
# be slower?
# TODO: should we be enforcing this precision
# at a different layer in the stack?
# |_ might require `MktPair` tracking in the EMS?
# |_ right now any precision error will be relayed
# all the way back from the backend and vice-versa..
#
mkt: MktPair = self.curr_mkt
price: Decimal = mkt.quantize(
size=price,
quantity_type='price',
# at a different layer in the stack? right now
# any precision error will literally be relayed
# all the way back from the backend.
price = round(
price,
ndigits=mkt.price_tick_digits,
)
order = self._staged_order = Order(
action=action,
price=price,
@ -394,7 +378,7 @@ class OrderMode:
'oid': oid,
})
if float(order.price) <= 0:
if order.price <= 0:
log.error(
'*!? Invalid `Order.price <= 0` ?!*\n'
# TODO: make this present multi-line in object form
@ -531,15 +515,14 @@ class OrderMode:
# if an order msg is provided update the line
# **from** that msg.
if order:
price: float = float(order.price)
if price <= 0:
if order.price <= 0:
log.error(f'Order has 0 price, cancelling..\n{order}')
self.cancel_orders([order.oid])
return None
line.set_level(price)
line.set_level(order.price)
self.on_level_change_update_next_order_info(
level=price,
level=order.price,
line=line,
order=order,
# use the corresponding position tracker for the
@ -698,9 +681,9 @@ class OrderMode:
) -> Dialog | None:
# NOTE: the `.order` attr **must** be set with the
# equivalent order msg in order to be loaded.
order: Order = msg.req
order = msg.req
oid = str(msg.oid)
symbol: str = order.symbol
symbol = order.symbol
# TODO: MEGA UGGG ZONEEEE!
src = msg.src
@ -719,22 +702,13 @@ class OrderMode:
order.oid = str(order.oid)
order.brokers = [brokername]
# ?TODO? change this over to `MktPair`, but it's gonna be
# tough since we don't have any such data really in our
# clearing msg schema..
# BUT WAIT! WHY do we even want/need this!?
#
# order.symbol = self.curr_mkt
#
# XXX, the old approach.. which i don't quire member why..
# -[ ] verify we for sure don't require this any more!
# |_https://github.com/pikers/piker/issues/517
#
# order.symbol = Symbol.from_fqme(
# fqsn=fqme,
# info={},
# )
# TODO: change this over to `MktPair`, but it's
# gonna be tough since we don't have any such data
# really in our clearing msg schema..
order.symbol = Symbol.from_fqme(
fqsn=fqme,
info={},
)
maybe_dialog: Dialog | None = self.submit_order(
send_msg=False,
order=order,
@ -792,7 +766,6 @@ async def open_order_mode(
brokerd_accounts,
ems_dialog_msgs,
),
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
@ -1128,7 +1101,7 @@ async def process_trade_msg(
)
)
):
msg.req: Order = order
msg.req = order
dialog: (
Dialog
# NOTE: on an invalid order submission (eg.
@ -1193,7 +1166,7 @@ async def process_trade_msg(
tm = time.time()
mode.on_fill(
oid,
price=float(req.price),
price=req.price,
time_s=tm,
)
mode.lines.remove_line(uuid=oid)
@ -1248,7 +1221,7 @@ async def process_trade_msg(
tm = details['broker_time']
mode.on_fill(
oid,
price=float(details['price']),
price=details['price'],
time_s=tm,
pointing='up' if action == 'buy' else 'down',
)

View File

@ -23,7 +23,7 @@ name = "piker"
version = "0.1.0a0dev0"
description = "trading gear for hackers"
authors = [{ name = "Tyler Goodlet", email = "goodboy_foss@protonmail.com" }]
requires-python = ">=3.12"
requires-python = ">=3.12, <3.13"
license = "AGPL-3.0-or-later"
readme = "README.rst"
keywords = [
@ -39,8 +39,8 @@ classifiers = [
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Intended Audience :: Financial and Insurance Industry",
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
@ -49,13 +49,13 @@ classifiers = [
dependencies = [
"async-generator >=1.10, <2.0.0",
"attrs >=23.1.0, <24.0.0",
"bidict >=0.23.1",
"bidict >=0.22.1, <0.23.0",
"colorama >=0.4.6, <0.5.0",
"colorlog >=6.7.0, <7.0.0",
"ib-insync >=0.9.86, <0.10.0",
"numpy>=2.0",
"polars >=0.20.6",
"polars-fuzzy-match>=0.1.5",
"numba >=0.59.0, <0.60.0",
"numpy >=1.25, <2.0",
"polars >=0.18.13, <0.19.0",
"pygments >=2.16.1, <3.0.0",
"rich >=13.5.2, <14.0.0",
"tomli >=2.0.1, <3.0.0",
@ -65,18 +65,16 @@ dependencies = [
"typer >=0.9.0, <1.0.0",
"rapidfuzz >=3.5.2, <4.0.0",
"pdbp >=1.5.0, <2.0.0",
"trio >=0.27",
"trio >=0.24, <0.25",
"pendulum >=3.0.0, <4.0.0",
"httpx >=0.27.0, <0.28.0",
"cryptofeed >=2.4.0, <3.0.0",
"pyarrow>=18.0.0",
"pyarrow >=17.0.0, <18.0.0",
"websockets ==12.0",
"msgspec>=0.19.0,<0.20",
"msgspec",
"tractor",
"asyncvnc",
"tomlkit",
"trio-typing>=0.10.0",
"numba>=0.61.0",
]
[project.optional-dependencies]
@ -114,7 +112,6 @@ dev = [
"cython >=3.0.0, <4.0.0",
"greenback >=1.1.1, <2.0.0",
"ruff>=0.9.6",
"pyperclip>=1.9.0",
]
[project.scripts]
@ -128,21 +125,9 @@ include = ["piker"]
[tool.hatch.build.targets.wheel]
include = ["piker"]
# TODO? move to a `uv.toml`?
[tool.uv]
python-preference = 'system'
python-downloads = 'manual'
[tool.uv.sources]
pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" }
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
# tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "main" }
# tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "moar_eg_smoothing" }
# XXX for @goodboy's hackin, usually there's something new in the
# runtime being seriously tested here Bp
msgspec = { git = "https://github.com/jcrist/msgspec.git" }
tractor = { path = "../tractor", editable = true }

View File

@ -62,9 +62,8 @@ ignore-init-module-imports = false
fixable = ["ALL"]
unfixable = []
# TODO? uhh why no work!?
# Allow unused variables when underscore-prefixed.
# dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[format]
# Use single quotes in `ruff format`.

1
tags
View File

@ -1 +0,0 @@
TAG_feed_status_update ./piker/data/feed.py /TAG_feed_status_update/

View File

@ -179,7 +179,7 @@ def test_ems_err_on_bad_broker(
# NOTE: emsd should error on the actor's enabled modules
# import phase, when looking for a backend named `doggy`.
except tractor.RemoteActorError as re:
assert re.type is ModuleNotFoundError
assert re.type == ModuleNotFoundError
run_and_tollerate_cancels(load_bad_fqme)

View File

@ -142,12 +142,7 @@ async def test_concurrent_tokens_refresh(us_symbols, loglevel):
# async with tractor.open_nursery() as n:
# await n.run_in_actor('other', intermittently_refresh_tokens)
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery(
# strict_exception_groups=False,
) as n
):
async with trio.open_nursery() as n:
quoter = await qt.stock_quoter(client, us_symbols)
@ -388,9 +383,7 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
else:
symbols = [tmx_symbols]
async with trio.open_nursery(
strict_exception_groups=False,
) as n:
async with trio.open_nursery() as n:
for syms, func in zip(symbols, stream_what):
n.start_soon(func, feed, syms)

1326
uv.lock

File diff suppressed because it is too large Load Diff