Compare commits

..

No commits in common. "testing_utils" and "gitea_feats" have entirely different histories.

66 changed files with 1589 additions and 3874 deletions

2
.gitignore vendored
View File

@ -99,8 +99,6 @@ ENV/
# extra scripts dir
/snippets
# testing-utils aren't actively in use.
/piker/_testing
# mypy
.mypy_cache/

View File

@ -88,23 +88,7 @@ a sane install with `uv`
************************
bc why install with `python` when you can faster with `rust` ::
uv sync
# ^ astral's docs,
# https://docs.astral.sh/uv/concepts/projects/sync/
include all GUIs ::
uv sync --extra uis
AND with all our hacking tools::
uv sync --dev --extra uis
Ensure you can run the root-daemon::
uv run pikerd [-l info --pdb]
uv lock
hacky install on nixos
@ -119,18 +103,7 @@ start a chart
*************
run a realtime OHLCV chart stand-alone::
[uv run] piker -l info chart btcusdt.spot.binance xmrusdt.spot.kraken
# ^^^ iff you haven't activated the py-env,
# - https://docs.astral.sh/uv/concepts/projects/run/
#
# in order to create an explicit virt-env see,
# - https://docs.astral.sh/uv/concepts/projects/layout/#the-project-environment
# - https://docs.astral.sh/uv/pip/environments/
#
# use $UV_PROJECT_ENVIRONMENT to select any non-`.venv/`
# as the venv sudir in the repo's root.
# - https://docs.astral.sh/uv/reference/environment/#uv_project_environment
piker -l info chart btcusdt.spot.binance xmrusdt.spot.kraken
this runs a chart UI (with 1m sampled OHLCV) and shows 2 spot markets from 2 diff cexes
overlayed on the same graph. Use of `piker` without first starting

View File

@ -121,7 +121,6 @@ async def bot_main():
# tick_throttle=10,
) as feed,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
assert accounts

View File

@ -1,338 +0,0 @@
#!/usr/bin/env python
from decimal import (
Decimal,
)
from pathlib import Path
import numpy as np
# import polars as pl
import trio
import tractor
from datetime import datetime
# from pprint import pformat
from piker.brokers.deribit.api import (
get_client,
maybe_open_oi_feed,
)
from piker.storage import open_storage_client, StorageClient
from piker.log import get_logger
import sys
import pyqtgraph as pg
from PyQt6 import QtCore
from pyqtgraph import ScatterPlotItem, InfiniteLine
from PyQt6.QtWidgets import QApplication
from cryptofeed.symbols import Symbol
log = get_logger(__name__)
# XXX, use 2 newlines between top level LOC (even between these
# imports and the next function line ;)
def check_if_complete(
oi: dict[str, dict[str, Decimal | None]]
) -> bool:
return all(
oi[strike]['C'] is not None
and
oi[strike]['P'] is not None for strike in oi
)
async def max_pain_daemon(
) -> None:
oi_by_strikes: dict[str, dict[str, Decimal | None]]
instruments: list[Symbol] = []
expiry_dates: list[str]
expiry_date: str
currency: str = 'btc'
kind: str = 'option'
async with get_client(
) as client:
expiry_dates: list[str] = await client.get_expiration_dates(
currency=currency,
kind=kind
)
log.info(
f'Available expiries for {currency!r}-{kind}:\n'
f'{expiry_dates}\n'
)
expiry_date: str = input(
'Please enter a valid expiration date: '
).upper()
print('Starting little daemon...')
# maybe move this type annot down to the assignment line?
oi_by_strikes: dict[str, dict[str, Decimal]]
instruments = await client.get_instruments(
expiry_date=expiry_date,
)
oi_by_strikes = client.get_strikes_dict(instruments)
def get_total_intrinsic_values(
oi_by_strikes: dict[str, dict[str, Decimal]]
) -> dict[str, dict[str, Decimal]]:
call_cash: Decimal = Decimal(0)
put_cash: Decimal = Decimal(0)
intrinsic_values: dict[str, dict[str, Decimal]] = {}
closes: list = sorted(Decimal(close) for close in oi_by_strikes)
for strike, oi in oi_by_strikes.items():
s = Decimal(strike)
call_cash = sum(max(0, (s - c) * oi_by_strikes[str(c)]['C']) for c in closes)
put_cash = sum(max(0, (c - s) * oi_by_strikes[str(c)]['P']) for c in closes)
intrinsic_values[strike] = {
'C': call_cash,
'P': put_cash,
'total': call_cash + put_cash,
}
return intrinsic_values
def get_intrinsic_value_and_max_pain(
intrinsic_values: dict[str, dict[str, Decimal]]
):
# We meed to find the lowest value, so we start at
# infinity to ensure that, and the max_pain must be
# an amount greater than zero.
total_intrinsic_value: Decimal = Decimal('Infinity')
max_pain: Decimal = Decimal(0)
for strike, oi in oi_by_strikes.items():
s = Decimal(strike)
if intrinsic_values[strike]['total'] < total_intrinsic_value:
total_intrinsic_value = intrinsic_values[strike]['total']
max_pain = s
return total_intrinsic_value, max_pain
def plot_graph(
oi_by_strikes: dict[str, dict[str, Decimal]],
plot,
):
"""Update the bar graph with new open interest data."""
plot.clear()
intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
for strike_str in sorted(oi_by_strikes, key=lambda x: int(x)):
strike = int(strike_str)
calls_val = float(oi_by_strikes[strike_str]['C'])
puts_val = float(oi_by_strikes[strike_str]['P'])
bar_c = pg.BarGraphItem(
x=[strike - 100],
height=[calls_val],
width=200,
pen='w',
brush=(0, 0, 255, 150)
)
plot.addItem(bar_c)
bar_p = pg.BarGraphItem(
x=[strike + 100],
height=[puts_val],
width=200,
pen='w',
brush=(255, 0, 0, 150)
)
plot.addItem(bar_p)
total_val = float(intrinsic_values[strike_str]['total']) / 100000
scatter_iv = ScatterPlotItem(
x=[strike],
y=[total_val],
pen=pg.mkPen(color=(0, 255, 0), width=2),
brush=pg.mkBrush(0, 255, 0, 150),
size=3,
symbol='o'
)
plot.addItem(scatter_iv)
_, max_pain = get_intrinsic_value_and_max_pain(intrinsic_values)
vertical_line = InfiniteLine(
pos=max_pain,
angle=90,
pen=pg.mkPen(color='yellow', width=1, style=QtCore.Qt.PenStyle.DotLine),
label=f'Max pain: {max_pain:,.0f}',
labelOpts={
'position': 0.85,
'color': 'yellow',
'movable': True
}
)
plot.addItem(vertical_line)
def update_oi_by_strikes(msg: tuple):
nonlocal oi_by_strikes
if 'oi' == msg[0]:
strike_price = msg[1]['strike_price']
option_type = msg[1]['option_type']
open_interest = msg[1]['open_interest']
oi_by_strikes.setdefault(
strike_price, {}
).update(
{option_type: open_interest}
)
# Define the structured dtype
dtype = np.dtype([
('time', int),
('oi', float),
('oi_calc', float),
])
async def write_open_interest_on_file(msg: tuple, client: StorageClient):
if 'oi' == msg[0]:
nonlocal expiry_date
timestamp = msg[1]['timestamp']
strike_price = msg[1]["strike_price"]
option_type = msg[1]['option_type'].lower()
col_sym_key = f'btc-{expiry_date.lower()}-{strike_price}-{option_type}'
# Create the numpy array with sample data
data = np.array([
(
int(timestamp),
float(msg[1]['open_interest']),
np.nan,
),
], dtype=dtype)
path: Path = await client.write_oi(
col_sym_key,
data,
)
# TODO, use std logging like this throughout for status
# emissions on console!
log.info(f'Wrote OI history to {path}')
def get_max_pain(
oi_by_strikes: dict[str, dict[str, Decimal]]
) -> dict[str, str | Decimal]:
'''
This method requires only the strike_prices and oi for call
and puts, the closes list are the same as the strike_prices
the idea is to sum all the calls and puts cash for each strike
and the ITM strikes from that strike, the lowest value is what we
are looking for the intrinsic value.
'''
nonlocal timestamp
intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
total_intrinsic_value, max_pain = get_intrinsic_value_and_max_pain(intrinsic_values)
return {
'timestamp': timestamp,
'expiry_date': expiry_date,
'total_intrinsic_value': total_intrinsic_value,
'max_pain': max_pain,
}
async with (
open_storage_client() as (_, storage),
maybe_open_oi_feed(
instruments,
) as oi_feed,
):
# Initialize QApplication
app = QApplication(sys.argv)
win = pg.GraphicsLayoutWidget(show=True)
win.setWindowTitle('Calls (blue) vs Puts (red)')
plot = win.addPlot(title='OI by Strikes')
plot.showGrid(x=True, y=True)
print('Plot initialized...')
async for msg in oi_feed:
# In memory oi_by_strikes dict, all message are filtered here
# and the dict is updated with the open interest data
update_oi_by_strikes(msg)
# Write on file using storage client
await write_open_interest_on_file(msg, storage)
# Max pain calcs, before start we must gather all the open interest for
# all the strike prices and option types available for a expiration date
if check_if_complete(oi_by_strikes):
if 'oi' == msg[0]:
# Here we must read for the filesystem all the latest open interest value for
# each instrument for that specific expiration date, that means look up for the
# last update got the instrument btc-{expity_date}-*oi1s.parquet (1s because is
# hardcoded to something, sorry.)
timestamp = msg[1]['timestamp']
max_pain = get_max_pain(oi_by_strikes)
# intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
# graph here
plot_graph(oi_by_strikes, plot)
# TODO, use a single multiline string with `()`
# and drop the multiple `print()` calls (this
# should be done elsewhere in this file as well!
#
# As per the docs,
# https://docs.python.org/3/reference/lexical_analysis.html#string-literal-concatenation
# you could instead do,
# print(
# '-----------------------------------------------\n'
# f'timestamp: {datetime.fromtimestamp(max_pain['timestamp'])}\n'
# )
# WHY?
# |_ less ctx-switches/calls to `print()`
# |_ the `str` can then be modified / passed
# around as a variable more easily if needed in
# the future ;)
#
# ALSO, i believe there already is a stdlib
# module to do "alignment" of text which you
# could try for doing the right-side alignment,
# https://docs.python.org/3/library/textwrap.html#textwrap.indent
#
print('-----------------------------------------------')
print(f'timestamp: {datetime.fromtimestamp(max_pain['timestamp'])}')
print(f'expiry_date: {max_pain['expiry_date']}')
print(f'max_pain: {max_pain['max_pain']:,.0f}')
print(f'total intrinsic value: {max_pain['total_intrinsic_value']:,.0f}')
print('-----------------------------------------------')
# Process GUI events to keep the window responsive
app.processEvents()
async def main():
async with tractor.open_nursery(
debug_mode=True,
loglevel='info',
) as an:
from tractor import log
log.get_console_log(level='info')
ptl: tractor.Portal = await an.start_actor(
'max_pain_daemon',
enable_modules=[__name__],
infect_asyncio=True,
# ^TODO, we can actually run this in the root-actor now
# if needed as per 2nd "section" in,
# https://pikers.dev/goodboy/tractor/pulls/2
#
# NOTE, will first require us porting to modern
# `tractor:main` though ofc!
)
await ptl.run(max_pain_daemon)
if __name__ == '__main__':
trio.run(main)

View File

@ -1,29 +0,0 @@
## Max Pain Calculation for Deribit Options
This feature, which calculates the max pain point for options traded
on the Deribit exchange using cryptofeed library.
- Functions in the api module for fetching options data from Deribit.
[commit](https://pikers.dev/pikers/piker/commit/da55856dd2876291f55a06eb0561438a912d8241)
- Compute the max pain point based on open interest data using
deribit's api.
[commit](https://pikers.dev/pikers/piker/commit/0d9d6e15ba0edeb662ec97f7599dd66af3046b94)
### How to test it?
**Before start:** in order to get this working with `uv`, you
**must** use my [`tractor` fork](https://pikers.dev/ntorres/tractor/src/branch/aio_abandons)
and this branch: `aio_abandons`, the reason is that I cherry-pick the
`uv_migration` that guille made, for some reason that a didn't dive
into, in my system y need tractor using `uv` too. quite hacky
I guess.
1. `uv lock`
2. `uv run --no-dev python examples/max_pain.py`
3. A message should be display, enter one of the expiration date
available.
4. The script should be up and running.

View File

@ -1,81 +0,0 @@
# manual label testing cruft
import trio
import tractor
async def test_bed(
ohlcv,
chart,
lc,
):
from ._graphics._lines import order_line
sleep = 6
# from PyQt5.QtCore import QPointF
vb = chart._vb
# scene = vb.scene()
# raxis = chart.getAxis('right')
# vb_right = vb.boundingRect().right()
last, i_end = ohlcv.array[-1][['close', 'index']]
line = order_line(
chart,
level=last,
level_digits=2
)
# eps = line.getEndpoints()
# llabel = line._labels[1][1]
line.update_labels({'level': last})
return
# rl = eps[1]
# rlabel.setPos(rl)
# ti = pg.TextItem(text='Fuck you')
# ti.setPos(pg.Point(i_end, last))
# ti.setParentItem(line)
# ti.setAnchor(pg.Point(1, 1))
# vb.addItem(ti)
# chart.plotItem.addItem(ti)
from ._label import Label
txt = Label(
vb,
fmt_str='fuck {it}',
)
txt.format(it='boy')
txt.place_on_scene('left')
txt.set_view_y(last)
# txt = QtGui.QGraphicsTextItem()
# txt.setPlainText("FUCK YOU")
# txt.setFont(_font.font)
# txt.setDefaultTextColor(pg.mkColor(hcolor('bracket')))
# # txt.setParentItem(vb)
# w = txt.boundingRect().width()
# scene.addItem(txt)
# txt.setParentItem(line)
# d_coords = vb.mapFromView(QPointF(i_end, last))
# txt.setPos(vb_right - w, d_coords.y())
# txt.show()
# txt.update()
# rlabel.setPos(vb_right - 2*w, d_coords.y())
# rlabel.show()
i = 0
while True:
await trio.sleep(sleep)
await tractor.breakpoint()
txt.format(it=f'dog_{i}')
# d_coords = vb.mapFromView(QPointF(i_end, last))
# txt.setPos(vb_right - w, d_coords.y())
# txt.setPlainText(f"FUCK YOU {i}")
i += 1

View File

@ -42,6 +42,7 @@ from ._mktinfo import (
dec_digits,
digits_to_dec,
MktPair,
Symbol,
unpack_fqme,
_derivs as DerivTypes,
)
@ -59,6 +60,7 @@ __all__ = [
'Asset',
'MktPair',
'Position',
'Symbol',
'Transaction',
'TransactionLedger',
'dec_digits',

View File

@ -390,8 +390,8 @@ class MktPair(Struct, frozen=True):
cls,
fqme: str,
price_tick: float|str,
size_tick: float|str,
price_tick: float | str,
size_tick: float | str,
bs_mktid: str,
broker: str | None = None,
@ -677,3 +677,90 @@ def unpack_fqme(
# '.'.join([mkt_ep, venue]),
suffix,
)
class Symbol(Struct):
'''
I guess this is some kinda container thing for dealing with
all the different meta-data formats from brokers?
'''
key: str
broker: str = ''
venue: str = ''
# precision descriptors for price and vlm
tick_size: Decimal = Decimal('0.01')
lot_tick_size: Decimal = Decimal('0.0')
suffix: str = ''
broker_info: dict[str, dict[str, Any]] = {}
@classmethod
def from_fqme(
cls,
fqsn: str,
info: dict[str, Any],
) -> Symbol:
broker, mktep, venue, suffix = unpack_fqme(fqsn)
tick_size = info.get('price_tick_size', 0.01)
lot_size = info.get('lot_tick_size', 0.0)
return Symbol(
broker=broker,
key=mktep,
tick_size=tick_size,
lot_tick_size=lot_size,
venue=venue,
suffix=suffix,
broker_info={broker: info},
)
@property
def type_key(self) -> str:
return list(self.broker_info.values())[0]['asset_type']
@property
def tick_size_digits(self) -> int:
return float_digits(self.tick_size)
@property
def lot_size_digits(self) -> int:
return float_digits(self.lot_tick_size)
@property
def price_tick(self) -> Decimal:
return Decimal(str(self.tick_size))
@property
def size_tick(self) -> Decimal:
return Decimal(str(self.lot_tick_size))
@property
def broker(self) -> str:
return list(self.broker_info.keys())[0]
@property
def fqme(self) -> str:
return maybe_cons_tokens([
self.key, # final "pair name" (eg. qqq[/usd], btcusdt)
self.venue,
self.suffix, # includes expiry and other con info
self.broker,
])
def quantize(
self,
size: float,
) -> Decimal:
digits = float_digits(self.lot_tick_size)
return Decimal(size).quantize(
Decimal(f'1.{"0".ljust(digits, "0")}'),
rounding=ROUND_HALF_EVEN
)
# NOTE: when cast to `str` return fqme
def __str__(self) -> str:
return self.fqme

View File

@ -362,11 +362,7 @@ class Position(Struct):
# added: bool = False
tid: str = t.tid
if tid in self._events:
log.debug(
f'Txn is already added?\n'
f'\n'
f'{t}\n'
)
log.warning(f'{t} is already added?!')
# return added
# TODO: apparently this IS possible with a dict but not
@ -700,7 +696,7 @@ class Account(Struct):
else:
# TODO: we reallly need a diff set of
# loglevels/colors per subsys.
log.debug(
log.warning(
f'Recent position for {fqme} was closed!'
)

View File

@ -22,9 +22,7 @@ you know when you're losing money (if possible) XD
from __future__ import annotations
from collections.abc import ValuesView
from contextlib import contextmanager as cm
from functools import partial
from math import copysign
from pprint import pformat
from typing import (
Any,
Callable,
@ -39,16 +37,12 @@ from pendulum import (
parse,
)
from ..log import get_logger
if TYPE_CHECKING:
from ._ledger import (
Transaction,
TransactionLedger,
)
log = get_logger(__name__)
def ppu(
clears: Iterator[Transaction],
@ -244,9 +238,6 @@ def iter_by_dt(
def dyn_parse_to_dt(
tx: tuple[str, dict[str, Any]] | Transaction,
debug: bool = False,
_invalid: list|None = None,
) -> DateTime:
# handle `.items()` inputs
@ -259,16 +250,11 @@ def iter_by_dt(
# get best parser for this record..
for k in parsers:
if (
(v := getattr(tx, k, None))
or
(
isdict
and
(v := tx.get(k))
)
isdict and k in tx
or getattr(tx, k, None)
):
# TODO? remove yah?
# v = tx[k] if isdict else tx.dt
v = tx[k] if isdict else tx.dt
assert v is not None, f'No valid value for `{k}`!?'
# only call parser on the value if not None from
# the `parsers` table above (when NOT using
@ -276,54 +262,21 @@ def iter_by_dt(
# sort on it directly
if (
not isinstance(v, DateTime)
and
(parser := parsers.get(k))
and (parser := parsers.get(k))
):
ret = parser(v)
return parser(v)
else:
ret = v
return v
return ret
else:
continue
# XXX: should never get here..
else:
if debug:
import tractor
with tractor.devx.maybe_open_crash_handler():
raise ValueError(
f'Invalid txn time ??\n'
f'txn-id: {k!r}\n'
f'{k!r}: {v!r}\n'
)
# assert v is not None, f'No valid value for `{k}`!?'
# XXX: should never get here..
breakpoint()
if _invalid is not None:
_invalid.append(tx)
return from_timestamp(0.)
# breakpoint()
entry: tuple[str, dict]|Transaction
invalid: list = []
entry: tuple[str, dict] | Transaction
for entry in sorted(
records,
key=key or partial(
dyn_parse_to_dt,
_invalid=invalid,
),
key=key or dyn_parse_to_dt,
):
if entry in invalid:
log.warning(
f'Ignoring txn w invalid timestamp ??\n'
f'{pformat(entry)}\n'
# f'txn-id: {k!r}\n'
# f'{k!r}: {v!r}\n'
)
continue
# NOTE the type sig above; either pairs or txns B)
yield entry

View File

@ -51,7 +51,6 @@ __brokers__: list[str] = [
'ib',
'kraken',
'kucoin',
'deribit',
# broken but used to work
# 'questrade',
@ -62,6 +61,7 @@ __brokers__: list[str] = [
# wstrade
# iex
# deribit
# bitso
]

View File

@ -96,10 +96,7 @@ async def _setup_persistent_brokerd(
# - `open_symbol_search()`
# NOTE: see ep invocation details inside `.data.feed`.
try:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as service_nursery
):
async with trio.open_nursery() as service_nursery:
bus: _FeedsBus = feed.get_feed_bus(
brokername,
service_nursery,

View File

@ -374,14 +374,9 @@ class Client:
pair: Pair = pair_type(**item)
except Exception as e:
e.add_note(
f'\n'
f'New or removed field we need to codify!\n'
f'pair-type: {pair_type!r}\n'
f'\n'
f"Don't panic, prolly stupid binance changed their symbology schema again..\n"
f'Check out their API docs here:\n'
f'\n'
f'https://binance-docs.github.io/apidocs/spot/en/#exchange-information\n'
"\nDon't panic, prolly stupid binance changed their symbology schema again..\n"
'Check out their API docs here:\n\n'
'https://binance-docs.github.io/apidocs/spot/en/#exchange-information'
)
raise
pair_table[pair.symbol.upper()] = pair

View File

@ -440,7 +440,6 @@ async def open_trade_dialog(
# - ledger: TransactionLedger
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
ctx.open_stream() as ems_stream,
):

View File

@ -448,6 +448,7 @@ async def subscribe(
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
@ -459,7 +460,6 @@ async def stream_quotes(
) -> None:
async with (
tractor.trionics.maybe_raise_from_masking_exc(),
send_chan as send_chan,
open_cached_client('binance') as client,
):

View File

@ -97,13 +97,6 @@ class Pair(Struct, frozen=True, kw_only=True):
baseAsset: str
baseAssetPrecision: int
permissionSets: list[list[str]]
# https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
# will become non-optional 2025-08-28?
# https://developers.binance.com/docs/binance-spot-api-docs#future-changes
pegInstructionsAllowed: bool|None = None
filters: dict[
str,
str | int | float,
@ -149,11 +142,7 @@ class SpotPair(Pair, frozen=True):
defaultSelfTradePreventionMode: str
allowedSelfTradePreventionModes: list[str]
permissions: list[str]
# can the paint botz creat liq gaps even easier on this asset?
# Bp
# https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority
amendAllowed: bool
permissionSets: list[list[str]]
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.binance:SpotPair'

View File

@ -25,7 +25,6 @@ from .api import (
get_client,
)
from .feed import (
get_mkt_info,
open_history_client,
open_symbol_search,
stream_quotes,
@ -35,20 +34,15 @@ from .feed import (
# open_trade_dialog,
# norm_trade_records,
# )
from .venues import (
OptionPair,
)
log = get_logger(__name__)
__all__ = [
'get_client',
# 'trades_dialogue',
'get_mkt_info',
'open_history_client',
'open_symbol_search',
'stream_quotes',
'OptionPair',
# 'norm_trade_records',
]

File diff suppressed because it is too large Load Diff

View File

@ -18,59 +18,38 @@
Deribit backend.
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import (
# Any,
# Optional,
Callable,
)
# from pprint import pformat
from typing import Any, Optional, Callable
import time
import cryptofeed
import trio
from trio_typing import TaskStatus
from pendulum import (
from_timestamp,
)
import pendulum
from rapidfuzz import process as fuzzy
import numpy as np
import tractor
from piker.accounting import (
Asset,
MktPair,
unpack_fqme,
)
from piker.brokers import (
open_cached_client,
NoData,
from piker.brokers import open_cached_client
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from piker.brokers._util import (
BrokerError,
DataUnavailable,
)
from piker._cacheables import (
async_lifo_cache,
)
from piker.log import (
get_logger,
mk_repr,
)
from piker.data.validate import FeedInit
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
from .api import (
Client,
# get_config,
piker_sym_to_cb_sym,
cb_sym_to_deribit_inst,
str_to_cb_sym,
Client, Trade,
get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
maybe_open_price_feed
)
from .venues import (
Pair,
OptionPair,
Trade,
)
_spawn_kwargs = {
'infect_asyncio': True,
@ -85,215 +64,90 @@ async def open_history_client(
mkt: MktPair,
) -> tuple[Callable, int]:
fnstrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client:
pair: OptionPair = client._pairs[mkt.dst.name]
# XXX NOTE, the cuckers use ms !!!
creation_time_s: int = pair.creation_timestamp/1000
async def get_ohlc(
timeframe: float,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array: np.ndarray = await client.bars(
mkt,
array = await client.bars(
instrument,
start_dt=start_dt,
end_dt=end_dt,
)
if len(array) == 0:
if (
end_dt is None
):
raise DataUnavailable(
'No history seems to exist yet?\n\n'
f'{mkt}'
)
elif (
end_dt
and
end_dt.timestamp() < creation_time_s
):
# the contract can't have history
# before it was created.
pair_type_str: str = type(pair).__name__
create_dt: datetime = from_timestamp(creation_time_s)
raise DataUnavailable(
f'No history prior to\n'
f'`{pair_type_str}.creation_timestamp: int = '
f'{pair.creation_timestamp}\n\n'
f'------ deribit sux ------\n'
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
f'creation_time_s: {creation_time_s}\n'
f'create_dt: {create_dt}\n'
)
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
raise DataUnavailable
start_dt = from_timestamp(array[0]['time'])
end_dt = from_timestamp(array[-1]['time'])
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield (
get_ohlc,
{ # backfill config
'erlangs': 3,
'rate': 3,
}
)
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, Pair|OptionPair] | None:
# uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower():
fqme += '.deribit'
mkt_mode: str = ''
broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
# NOTE: we always upper case all tokens to be consistent with
# binance's symbology style for pairs, like `BTCUSDT`, but in
# theory we could also just keep things lower case; as long as
# we're consistent and the symcache matches whatever this func
# returns, always!
expiry: str = expiry.upper()
venue: str = venue.upper()
# venue_lower: str = venue.lower()
mkt_mode: str = 'option'
async with open_cached_client(
'deribit',
) as client:
assets: dict[str, Asset] = await client.get_assets()
pair_str: str = mkt_ep.lower()
pair: Pair = await client.exch_info(
sym=pair_str,
)
mkt_mode = pair.venue
client.mkt_mode = mkt_mode
dst: Asset | None = assets.get(pair.bs_dst_asset)
src: Asset | None = assets.get(pair.bs_src_asset)
mkt = MktPair(
dst=dst,
src=src,
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
venue=mkt_mode,
broker='deribit',
_atype=mkt_mode,
_fqme_without_src=True,
# expiry=pair.expiry,
# XXX TODO, currently we don't use it since it's
# already "described" in the `OptionPair.symbol: str`
# and if we slap in the ISO repr it's kinda hideous..
# -[ ] figure out the best either std
)
return mkt, pair
yield get_ohlc, {'erlangs': 3, 'rate': 3}
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Open a live quote stream for the market set defined by `symbols`.
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
'''
sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr(
# so we can see `deribit`'s delightfully mega-long bs fields..
maxstring=100,
)
sym = symbols[0]
async with (
open_cached_client('deribit') as client,
send_chan as send_chan
):
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(
mkt_info=mkt,
)
)
# build `cryptofeed` feed-handle
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {
'asset_type': 'option',
'price_tick_size': 0.0005
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
from_cf: tractor.to_asyncio.LinkedTaskChannel
async with maybe_open_price_feed(sym) as from_cf:
nsym = piker_sym_to_cb_sym(sym)
# load the "last trades" summary
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
cb_sym_to_deribit_inst(cf_sym),
count=1,
)
last_trades: list[Trade] = last_trades_res.trades
async with maybe_open_price_feed(sym) as stream:
# TODO, do we even need this or will the above always
# work?
# if not last_trades:
# await tractor.pause()
# async for typ, quote in from_cf:
# if typ == 'trade':
# last_trade = Trade(**(quote['data']))
# break
cache = await client.cache_symbols()
# else:
last_trade = Trade(
**(last_trades[0])
)
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
first_quote: dict = {
if len(last_trades) == 0:
last_trade = None
async for typ, quote in stream:
if typ == 'trade':
last_trade = Trade(**(quote['data']))
break
else:
last_trade = Trade(**(last_trades[0]))
first_quote = {
'symbol': sym,
'last': last_trade.price,
'brokerd_ts': last_trade.timestamp,
@ -304,84 +158,13 @@ async def stream_quotes(
'broker_ts': last_trade.timestamp
}]
}
task_status.started((
init_msgs,
first_quote,
))
task_status.started((init_msgs, first_quote))
feed_is_live.set()
# NOTE XXX, static for now!
# => since this only handles ONE mkt feed at a time we
# don't need a lookup table to map interleaved quotes
# from multiple possible mkt-pairs
topic: str = mkt.bs_fqme
# deliver until cancelled
async for typ, ref in from_cf:
match typ:
case 'trade':
trade: cryptofeed.types.Trade = ref
# TODO, re-impl this according to teh ideal
# fqme for opts that we choose!!
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(trade.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'last': trade.price,
'broker_ts': time.time(),
# ^TODO, name this `brokerd/datad_ts` and
# use `time.time_ns()` ??
'ticks': [{
'type': 'trade',
'price': float(trade.price),
'size': float(trade.amount),
'broker_ts': trade.timestamp,
}],
}
log.info(
f'deribit {typ!r} quote for {sym!r}\n\n'
f'{trade}\n\n'
f'{pfmt(piker_quote)}\n'
)
case 'l1':
book: cryptofeed.types.L1Book = ref
# TODO, so this is where we can possibly change things
# and instead lever the `MktPair.bs_fqme: str` output?
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(book.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'ticks': [
{'type': 'bid',
'price': float(book.bid_price),
'size': float(book.bid_size)},
{'type': 'bsize',
'price': float(book.bid_price),
'size': float(book.bid_size),},
{'type': 'ask',
'price': float(book.ask_price),
'size': float(book.ask_size),},
{'type': 'asize',
'price': float(book.ask_price),
'size': float(book.ask_size),}
]
}
await send_chan.send({
topic: piker_quote,
})
async for typ, quote in stream:
topic = quote['symbol']
await send_chan.send({topic: quote})
@tractor.context
@ -391,21 +174,12 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client:
# load all symbols locally for fast search
# cache = client._pairs
cache = await client.cache_symbols()
await ctx.started()
async with ctx.open_stream() as stream:
pattern: str
async for pattern in stream:
# NOTE: pattern fuzzy-matching is done within
# the methd impl.
pairs: dict[str, Pair] = await client.search_symbols(
pattern,
)
# repack in fqme-keyed table
byfqme: dict[str, Pair] = {}
for pair in pairs.values():
byfqme[pair.bs_fqme] = pair
await stream.send(byfqme)
# repack in dict form
await stream.send(
await client.search_symbols(pattern))

View File

@ -1,196 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Per market data-type definitions and schemas types.
"""
from __future__ import annotations
import pendulum
from typing import (
Literal,
Optional,
)
from decimal import Decimal
from piker.types import Struct
# API endpoint paths by venue / sub-API
_domain: str = 'deribit.com'
_url = f'https://www.{_domain}'
# WEBsocketz
_ws_url: str = f'wss://www.{_domain}/ws/api/v2'
# test nets
_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2'
MarketType = Literal[
'option'
]
def get_api_eps(venue: MarketType) -> tuple[str, str]:
'''
Return API ep root paths per venue.
'''
return {
'option': (
_ws_url,
),
}[venue]
class Pair(Struct, frozen=True, kw_only=True):
symbol: str
# src
quote_currency: str # 'BTC'
# dst
base_currency: str # "BTC",
tick_size: float # 0.0001 # [{'above_price': 0.005, 'tick_size': 0.0005}]
tick_size_steps: list[dict[str, float]]
@property
def price_tick(self) -> Decimal:
return Decimal(str(self.tick_size_steps[0]['above_price']))
@property
def size_tick(self) -> Decimal:
return Decimal(str(self.tick_size))
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_mktid(self) -> str:
return f'{self.symbol}.{self.venue}'
class OptionPair(Pair, frozen=True):
taker_commission: float # 0.0003
strike: float # 5000.0
settlement_period: str # 'day'
settlement_currency: str # "BTC",
rfq: bool # false
price_index: str # 'btc_usd'
option_type: str # 'call'
min_trade_amount: float # 0.1
maker_commission: float # 0.0003
kind: str # 'option'
is_active: bool # true
instrument_type: str # 'reversed'
instrument_name: str # 'BTC-1SEP24-55000-C'
instrument_id: int # 364671
expiration_timestamp: int # 1725177600000
creation_timestamp: int # 1724918461000
counter_currency: str # 'USD'
contract_size: float # '1.0'
block_trade_tick_size: float # '0.0001'
block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair'
# TODO, impl this without the MM:SS part of
# the `'THH:MM:SS..'` etc..
@property
def expiry(self) -> str:
iso_date = pendulum.from_timestamp(
self.expiration_timestamp / 1000
).isoformat()
return iso_date
@property
def venue(self) -> str:
return f'{self.instrument_type}_option'
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_src_asset(self) -> str:
return f'{self.quote_currency}'
@property
def bs_dst_asset(self) -> str:
return f'{self.symbol}'
PAIRTYPES: dict[MarketType, Pair] = {
'option': OptionPair,
}
class JSONRPCResult(Struct):
id: int
usIn: int
usOut: int
usDiff: int
testnet: bool
jsonrpc: str = '2.0'
error: Optional[dict] = None
result: Optional[list[dict]] = None
class JSONRPCChannel(Struct):
method: str
params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct):
low: list[float]
cost: list[float]
high: list[float]
open: list[float]
close: list[float]
ticks: list[int]
status: str
volume: list[float]
class Trade(Struct):
iv: float
price: float
amount: float
trade_id: str
contracts: float
direction: str
trade_seq: int
timestamp: int
mark_price: float
index_price: float
tick_direction: int
instrument_name: str
combo_id: Optional[str] = '',
combo_trade_id: Optional[int] = 0,
block_trade_id: Optional[str] = '',
block_trade_leg_count: Optional[int] = 0,
class LastTradesResult(Struct):
trades: list[Trade]
has_more: bool

View File

@ -20,11 +20,6 @@ runnable script-programs.
'''
from __future__ import annotations
from datetime import ( # noqa
datetime,
date,
tzinfo as TzInfo,
)
from functools import partial
from typing import (
Literal,
@ -39,7 +34,6 @@ from piker.brokers._util import get_logger
if TYPE_CHECKING:
from .api import Client
from ib_insync import IB
import i3ipc
log = get_logger('piker.brokers.ib')
@ -54,37 +48,6 @@ _reset_tech: Literal[
] = 'vnc'
no_setup_msg:str = (
'No data reset hack test setup for {vnc_sockaddr}!\n'
'See config setup tips @\n'
'https://github.com/pikers/piker/tree/master/piker/brokers/ib'
)
def try_xdo_manual(
vnc_sockaddr: str,
):
'''
Do the "manual" `xdo`-based screen switch + click
combo since apparently the `asyncvnc` client ain't workin..
Note this is only meant as a backup method for Xorg users,
ideally you can use a real vnc client and the `vnc_click_hack()`
impl!
'''
global _reset_tech
try:
i3ipc_xdotool_manual_click_hack()
_reset_tech = 'i3ipc_xdotool'
return True
except OSError:
log.exception(
no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
)
return False
async def data_reset_hack(
# vnc_host: str,
client: Client,
@ -127,9 +90,15 @@ async def data_reset_hack(
vnc_port: int
vnc_sockaddr: tuple[str] | None = client.conf.get('vnc_addrs')
no_setup_msg:str = (
f'No data reset hack test setup for {vnc_sockaddr}!\n'
'See config setup tips @\n'
'https://github.com/pikers/piker/tree/master/piker/brokers/ib'
)
if not vnc_sockaddr:
log.warning(
no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
no_setup_msg
+
'REQUIRES A `vnc_addrs: array` ENTRY'
)
@ -150,38 +119,27 @@ async def data_reset_hack(
port=vnc_port,
)
)
except (
OSError, # no VNC server avail..
PermissionError, # asyncvnc pw fail..
):
except OSError:
if vnc_host != 'localhost':
log.warning(no_setup_msg)
return False
try:
import i3ipc # noqa (since a deps dynamic check)
except ModuleNotFoundError:
log.warning(
no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
)
log.warning(no_setup_msg)
return False
if vnc_host not in {
'localhost',
'127.0.0.1',
}:
focussed, matches = i3ipc_fin_wins_titled()
if not matches:
log.warning(
no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
)
return False
else:
try_xdo_manual(vnc_sockaddr)
# localhost but no vnc-client or it borked..
else:
try_xdo_manual(vnc_sockaddr)
try:
i3ipc_xdotool_manual_click_hack()
_reset_tech = 'i3ipc_xdotool'
return True
except OSError:
log.exception(no_setup_msg)
return False
case 'i3ipc_xdotool':
try_xdo_manual(vnc_sockaddr)
# i3ipc_xdotool_manual_click_hack()
i3ipc_xdotool_manual_click_hack()
case _ as tech:
raise RuntimeError(f'{tech} is not supported for reset tech!?')
@ -220,9 +178,9 @@ async def vnc_click_hack(
host,
port=port,
# TODO: doesn't work?
# see, https://github.com/barneygale/asyncvnc/issues/7
password='doggy',
# TODO: doesn't work see:
# https://github.com/barneygale/asyncvnc/issues/7
# password='ibcansmbz',
) as client:
@ -236,103 +194,70 @@ async def vnc_click_hack(
client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked
def i3ipc_fin_wins_titled(
titles: list[str] = [
'Interactive Brokers', # tws running in i3
'IB Gateway', # gw running in i3
# 'IB', # gw running in i3 (newer version?)
# !TODO, remote vnc instance
# -[ ] something in title (or other Con-props) that indicates
# this is explicitly for ibrk sw?
# |_[ ] !can use modden spawn eventually!
'TigerVNC',
# 'vncviewer', # the terminal..
],
) -> tuple[
i3ipc.Con, # orig focussed win
list[tuple[str, i3ipc.Con]], # matching wins by title
]:
'''
Attempt to find a local-DE window titled with an entry in
`titles`.
If found deliver the current focussed window and all matching
`i3ipc.Con`s in a list.
'''
import i3ipc
ipc = i3ipc.Connection()
# TODO: might be worth offering some kinda api for grabbing
# the window id from the pid?
# https://stackoverflow.com/a/2250879
tree = ipc.get_tree()
focussed: i3ipc.Con = tree.find_focused()
matches: list[i3ipc.Con] = []
for name in titles:
results = tree.find_titled(name)
print(f'results for {name}: {results}')
if results:
con = results[0]
matches.append((
name,
con,
))
return (
focussed,
matches,
)
def i3ipc_xdotool_manual_click_hack() -> None:
'''
Do the data reset hack but expecting a local X-window using `xdotool`.
'''
focussed, matches = i3ipc_fin_wins_titled()
orig_win_id = focussed.window
import i3ipc
i3 = i3ipc.Connection()
# TODO: might be worth offering some kinda api for grabbing
# the window id from the pid?
# https://stackoverflow.com/a/2250879
t = i3.get_tree()
orig_win_id = t.find_focused().window
# for tws
win_names: list[str] = [
'Interactive Brokers', # tws running in i3
'IB Gateway', # gw running in i3
# 'IB', # gw running in i3 (newer version?)
]
try:
for name, con in matches:
print(f'Resetting data feed for {name}')
win_id = str(con.window)
w, h = con.rect.width, con.rect.height
for name in win_names:
results = t.find_titled(name)
print(f'results for {name}: {results}')
if results:
con = results[0]
print(f'Resetting data feed for {name}')
win_id = str(con.window)
w, h = con.rect.width, con.rect.height
# TODO: seems to be a few libs for python but not sure
# if they support all the sub commands we need, order of
# most recent commit history:
# https://github.com/rr-/pyxdotool
# https://github.com/ShaneHutter/pyxdotool
# https://github.com/cphyc/pyxdotool
# TODO: seems to be a few libs for python but not sure
# if they support all the sub commands we need, order of
# most recent commit history:
# https://github.com/rr-/pyxdotool
# https://github.com/ShaneHutter/pyxdotool
# https://github.com/cphyc/pyxdotool
# TODO: only run the reconnect (2nd) kc on a detected
# disconnect?
for key_combo, timeout in [
# only required if we need a connection reset.
# ('ctrl+alt+r', 12),
# data feed reset.
('ctrl+alt+f', 6)
]:
subprocess.call([
'xdotool',
'windowactivate', '--sync', win_id,
# TODO: only run the reconnect (2nd) kc on a detected
# disconnect?
for key_combo, timeout in [
# only required if we need a connection reset.
# ('ctrl+alt+r', 12),
# data feed reset.
('ctrl+alt+f', 6)
]:
subprocess.call([
'xdotool',
'windowactivate', '--sync', win_id,
# move mouse to bottom left of window (where
# there should be nothing to click).
'mousemove_relative', '--sync', str(w-4), str(h-4),
# move mouse to bottom left of window (where
# there should be nothing to click).
'mousemove_relative', '--sync', str(w-4), str(h-4),
# NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id,
'--repeat', '3', '1',
# NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id,
'--repeat', '3', '1',
# hackzorzes
'key', key_combo,
],
timeout=timeout,
)
# hackzorzes
'key', key_combo,
],
timeout=timeout,
)
# re-activate and focus original window
subprocess.call([
@ -342,99 +267,3 @@ def i3ipc_xdotool_manual_click_hack() -> None:
])
except subprocess.TimeoutExpired:
log.exception('xdotool timed out?')
def is_current_time_in_range(
start_dt: datetime,
end_dt: datetime,
) -> bool:
'''
Check if current time is within the datetime range.
Use any/the-same timezone as provided by `start_dt.tzinfo` value
in the range.
'''
now: datetime = datetime.now(start_dt.tzinfo)
return start_dt <= now <= end_dt
# TODO, put this into `._util` and call it from here!
#
# NOTE, this was generated by @guille from a gpt5 prompt
# and was originally thot to be needed before learning about
# `ib_insync.contract.ContractDetails._parseSessions()` and
# it's downstream meths..
#
# This is still likely useful to keep for now to parse the
# `.tradingHours: str` value manually if we ever decide
# to move off `ib_async` and implement our own `trio`/`anyio`
# based version Bp
#
# >attempt to parse the retarted ib "time stampy thing" they
# >do for "venue hours" with this.. written by
# >gpt5-"thinking",
#
def parse_trading_hours(
spec: str,
tz: TzInfo|None = None
) -> dict[
date,
tuple[datetime, datetime]
]|None:
'''
Parse venue hours like:
'YYYYMMDD:HHMM-YYYYMMDD:HHMM;YYYYMMDD:CLOSED;...'
Returns `dict[date] = (open_dt, close_dt)` or `None` if
closed.
'''
if (
not isinstance(spec, str)
or
not spec
):
raise ValueError('spec must be a non-empty string')
out: dict[
date,
tuple[datetime, datetime]
]|None = {}
for part in (p.strip() for p in spec.split(';') if p.strip()):
if part.endswith(':CLOSED'):
day_s, _ = part.split(':', 1)
d = datetime.strptime(day_s, '%Y%m%d').date()
out[d] = None
continue
try:
start_s, end_s = part.split('-', 1)
start_dt = datetime.strptime(start_s, '%Y%m%d:%H%M')
end_dt = datetime.strptime(end_s, '%Y%m%d:%H%M')
except ValueError as exc:
raise ValueError(f'invalid segment: {part}') from exc
if tz is not None:
start_dt = start_dt.replace(tzinfo=tz)
end_dt = end_dt.replace(tzinfo=tz)
out[start_dt.date()] = (start_dt, end_dt)
return out
# ORIG desired usage,
#
# TODO, for non-drunk tomorrow,
# - call above fn and check that `output[today] is not None`
# trading_hrs: dict = parse_trading_hours(
# details.tradingHours
# )
# liq_hrs: dict = parse_trading_hours(
# details.liquidHours
# )

View File

@ -48,7 +48,6 @@ from bidict import bidict
import trio
import tractor
from tractor import to_asyncio
from tractor import trionics
from pendulum import (
from_timestamp,
DateTime,
@ -97,10 +96,6 @@ from ._util import (
get_logger,
)
# ?TODO? this can now be removed since it was originally to extend
# with a `bar_vwap` field that we removed from the default ohlcv
# dtype since it's better calculated in an FSP func
#
_bar_load_dtype: list[tuple[str, type]] = [
# NOTE XXX: only part that's diff
# from our default fields where
@ -338,15 +333,15 @@ class Client:
fqme: str,
# EST in ISO 8601 format is required... below is EPOCH
start_dt: datetime|str = "1970-01-01T00:00:00.000000-05:00",
end_dt: datetime|str = "",
start_dt: datetime | str = "1970-01-01T00:00:00.000000-05:00",
end_dt: datetime | str = "",
# ohlc sample period in seconds
sample_period_s: int = 1,
# optional "duration of time" equal to the
# length of the returned history frame.
duration: str|None = None,
duration: str | None = None,
**kwargs,
@ -720,8 +715,8 @@ class Client:
async def find_contracts(
self,
pattern: str|None = None,
contract: Contract|None = None,
pattern: str | None = None,
contract: Contract | None = None,
qualify: bool = True,
err_on_qualify: bool = True,
@ -866,7 +861,7 @@ class Client:
self,
fqme: str,
) -> datetime|None:
) -> datetime | None:
'''
Return the first datetime stamp for `fqme` or `None`
on request failure.
@ -922,7 +917,7 @@ class Client:
tries: int = 100,
raise_on_timeout: bool = False,
) -> Ticker|None:
) -> Ticker | None:
'''
Return a single (snap) quote for symbol.
@ -934,7 +929,7 @@ class Client:
ready: ticker.TickerUpdateEvent = ticker.updateEvent
# ensure a last price gets filled in before we deliver quote
timeouterr: Exception|None = None
timeouterr: Exception | None = None
warnset: bool = False
for _ in range(tries):
@ -1374,8 +1369,8 @@ async def load_clients_for_trio(
'''
Pure async mngr proxy to ``load_aio_clients()``.
This is a bootstrap entrypoint to call from
a `tractor.to_asyncio.open_channel_from()`.
This is a bootstrap entrypoing to call from
a ``tractor.to_asyncio.open_channel_from()``.
'''
async with load_aio_clients(
@ -1396,10 +1391,7 @@ async def open_client_proxies() -> tuple[
async with (
tractor.trionics.maybe_open_context(
acm_func=tractor.to_asyncio.open_channel_from,
kwargs={
'target': load_clients_for_trio,
# ^XXX, kwarg to `open_channel_from()`
},
kwargs={'target': load_clients_for_trio},
# lock around current actor task access
# TODO: maybe this should be the default in tractor?
@ -1509,7 +1501,7 @@ class MethodProxy:
self,
pattern: str,
) -> dict[str, Any]|trio.Event:
) -> dict[str, Any] | trio.Event:
ev = self.event_table.get(pattern)
@ -1546,7 +1538,7 @@ async def open_aio_client_method_relay(
# relay all method requests to ``asyncio``-side client and deliver
# back results
while not to_trio._closed:
msg: tuple[str, dict]|dict|None = await from_trio.get()
msg: tuple[str, dict] | dict | None = await from_trio.get()
match msg:
case None: # termination sentinel
log.info('asyncio `Client` method-proxy SHUTDOWN!')
@ -1592,8 +1584,7 @@ async def open_client_proxy(
event_consumers=event_table,
) as (first, chan),
trionics.collapse_eg(), # loose-ify
trio.open_nursery() as relay_tn,
trio.open_nursery() as relay_n,
):
assert isinstance(first, Client)
@ -1633,7 +1624,7 @@ async def open_client_proxy(
continue
relay_tn.start_soon(relay_events)
relay_n.start_soon(relay_events)
yield proxy

View File

@ -34,7 +34,6 @@ import trio
from trio_typing import TaskStatus
import tractor
from tractor.to_asyncio import LinkedTaskChannel
from tractor import trionics
from ib_insync.contract import (
Contract,
)
@ -408,7 +407,7 @@ async def update_and_audit_pos_msg(
# TODO: make this a "propaganda" log level?
if ibpos.avgCost != msg.avg_price:
log.debug(
log.warning(
f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n'
f'ib: {ibfmtmsg}\n'
'---------------------------\n'
@ -547,10 +546,7 @@ async def open_trade_dialog(
),
# TODO: do this as part of `open_account()`!?
open_symcache(
'ib',
only_from_memcache=True,
) as symcache,
open_symcache('ib', only_from_memcache=True) as symcache,
):
# Open a trade ledgers stack for appending trade records over
# multiple accounts.
@ -559,9 +555,7 @@ async def open_trade_dialog(
tables: dict[str, Account] = {}
order_msgs: list[Status] = []
conf = get_config()
accounts_def_inv: bidict[str, str] = bidict(
conf['accounts']
).inverse
accounts_def_inv: bidict[str, str] = bidict(conf['accounts']).inverse
with (
ExitStack() as lstack,
@ -711,11 +705,7 @@ async def open_trade_dialog(
# client-account and build out position msgs to deliver to
# EMS.
for acctid, acnt in tables.items():
active_pps: dict[str, Position]
(
active_pps,
closed_pps,
) = acnt.dump_active()
active_pps, closed_pps = acnt.dump_active()
for pps in [active_pps, closed_pps]:
piker_pps: list[Position] = list(pps.values())
@ -731,7 +721,6 @@ async def open_trade_dialog(
)
if ibpos:
bs_mktid: str = str(ibpos.contract.conId)
msg = await update_and_audit_pos_msg(
acctid,
pikerpos,
@ -749,7 +738,7 @@ async def open_trade_dialog(
f'UNEXPECTED POSITION says IB => {msg.symbol}\n'
'Maybe they LIQUIDATED YOU or your ledger is wrong?\n'
)
log.debug(logmsg)
log.error(logmsg)
await ctx.started((
all_positions,
@ -758,22 +747,21 @@ async def open_trade_dialog(
async with (
ctx.open_stream() as ems_stream,
trionics.collapse_eg(),
trio.open_nursery() as tn,
trio.open_nursery() as n,
):
# relay existing open orders to ems
for msg in order_msgs:
await ems_stream.send(msg)
for client in set(aioclients.values()):
trade_event_stream: LinkedTaskChannel = await tn.start(
trade_event_stream: LinkedTaskChannel = await n.start(
open_trade_event_stream,
client,
)
# start order request handler **before** local trades
# event loop
tn.start_soon(
n.start_soon(
handle_order_requests,
ems_stream,
accounts_def,
@ -781,7 +769,7 @@ async def open_trade_dialog(
)
# allocate event relay tasks for each client connection
tn.start_soon(
n.start_soon(
deliver_trade_events,
trade_event_stream,
@ -1253,47 +1241,32 @@ async def deliver_trade_events(
# never relay errors for non-broker related issues
# https://interactivebrokers.github.io/tws-api/message_codes.html
code: int = err['error_code']
reason: str = err['reason']
reqid: str = str(err['reqid'])
# "Warning:" msg codes,
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
# - 2109: 'Outside Regular Trading Hours'
if 'Warning:' in reason:
log.warning(
f'Order-API-warning: {code!r}\n'
f'reqid: {reqid!r}\n'
f'\n'
f'{pformat(err)}\n'
# ^TODO? should we just print the `reason`
# not the full `err`-dict?
)
continue
# XXX known special (ignore) cases
elif code in {
200, # uhh.. ni idea
if code in {
200, # uhh
# hist pacing / connectivity
162,
165,
# WARNING codes:
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
# Attribute 'Outside Regular Trading Hours' is
# " 'ignored based on the order type and
# destination. PlaceOrder is now ' 'being
# processed.',
2109,
# XXX: lol this isn't even documented..
# 'No market data during competing live session'
1669,
}:
log.error(
f'Order-API-error which is non-cancel-causing ?!\n'
f'\n'
f'{pformat(err)}\n'
)
continue
reqid: str = str(err['reqid'])
reason: str = err['reason']
if err['reqid'] == -1:
log.error(
f'TWS external order error ??\n'
f'{pformat(err)}\n'
)
log.error(f'TWS external order error:\n{pformat(err)}')
flow: dict = dict(
flows.get(reqid)

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) 2018-forever Tyler Goodlet (in stewardship for pikers)
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -13,12 +13,10 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Data feed endpoints pre-wrapped and ready for use with ``tractor``/``trio``.
'''
Data feed endpoints pre-wrapped and ready for use with `tractor`/`trio`
via "infected-asyncio-mode".
'''
"""
from __future__ import annotations
import asyncio
from contextlib import (
@ -28,6 +26,7 @@ from dataclasses import asdict
from datetime import datetime
from functools import partial
from pprint import pformat
from math import isnan
import time
from typing import (
Any,
@ -41,6 +40,7 @@ import numpy as np
from pendulum import (
now,
from_timestamp,
# DateTime,
Duration,
duration as mk_duration,
)
@ -69,10 +69,7 @@ from .api import (
Contract,
RequestError,
)
from ._util import (
data_reset_hack,
is_current_time_in_range,
)
from ._util import data_reset_hack
from .symbols import get_mkt_info
if TYPE_CHECKING:
@ -187,8 +184,7 @@ async def open_history_client(
if (
start_dt
and
start_dt.timestamp() == 0
and start_dt.timestamp() == 0
):
await tractor.pause()
@ -207,7 +203,7 @@ async def open_history_client(
):
count += 1
mean += latency / count
log.debug(
print(
f'HISTORY FRAME QUERY LATENCY: {latency}\n'
f'mean: {mean}'
)
@ -289,9 +285,8 @@ _pacing: str = (
async def wait_on_data_reset(
proxy: MethodProxy,
reset_type: str = 'data',
timeout: float = 16,
timeout: float = 16, # float('inf'),
task_status: TaskStatus[
tuple[
@ -300,47 +295,29 @@ async def wait_on_data_reset(
]
] = trio.TASK_STATUS_IGNORED,
) -> bool:
'''
Wait on a (global-ish) "data-farm" event to be emitted
by the IB api server.
Allows syncing to reconnect event-messages emitted on the API
console, such as:
- 'HMDS data farm connection is OK:ushmds'
- 'Market data farm is connecting:usfuture'
- 'Market data farm connection is OK:usfuture'
Deliver a `(cs, done: Event)` pair to the caller to support it
waiting or cancelling the associated "data-reset-request";
normally a manual data-reset-req is expected to be the cause and
thus trigger such events (such as our click-hack-magic from
`.ib._util`).
'''
# ?TODO, do we need a task-lock around this method?
#
# register for an API "status event" wrapped for `trio`-sync.
hist_ev: trio.Event = proxy.status_event(
# TODO: we might have to put a task lock around this
# method..
hist_ev = proxy.status_event(
'HMDS data farm connection is OK:ushmds'
)
#
# ^TODO: other event-messages we might want to support waiting-for
# but i wasn't able to get reliable..
#
# TODO: other event messages we might want to try and
# wait for but i wasn't able to get any of this
# reliable..
# reconnect_start = proxy.status_event(
# 'Market data farm is connecting:usfuture'
# )
# live_ev = proxy.status_event(
# 'Market data farm connection is OK:usfuture'
# )
# try to wait on the reset event(s) to arrive, a timeout
# will trigger a retry up to 6 times (for now).
client: Client = proxy._aio_ns
done = trio.Event()
with trio.move_on_after(timeout) as cs:
task_status.started((cs, done))
log.warning(
@ -419,9 +396,8 @@ async def get_bars(
bool, # timed out hint
]:
'''
Request-n-retrieve historical data frames from a `trio.Task`
using a `MethoProxy` to query the `asyncio`-side's
`.ib.api.Client` methods.
Retrieve historical data from a ``trio``-side task using
a ``MethoProxy``.
'''
global _data_resetter_task, _failed_resets
@ -631,10 +607,7 @@ async def get_bars(
# such that simultaneous symbol queries don't try data resettingn
# too fast..
unset_resetter: bool = False
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as nurse
):
async with trio.open_nursery() as nurse:
# start history request that we allow
# to run indefinitely until a result is acquired
@ -680,14 +653,14 @@ async def get_bars(
)
# per-actor cache of inter-eventloop-chans
_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
# TODO! update to the new style sig with,
# `chan: to_asyncio.LinkedTaskChannel,`
async def _setup_quote_stream(
chan: tractor.to_asyncio.LinkedTaskChannel,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
symbol: str,
opts: tuple[int] = (
'375', # RT trade volume (excludes utrades)
@ -705,13 +678,10 @@ async def _setup_quote_stream(
) -> trio.abc.ReceiveChannel:
'''
Stream L1 quotes via the `Ticker.updateEvent.connect(push)`
callback API by registering a `push` callback which simply
`chan.send_nowait()`s quote msgs back to the calling
parent-`trio.Task`-side.
Stream a ticker using the std L1 api.
NOTE, that this task-fn is run on the `asyncio.Task`-side ONLY
and is thus run via `tractor.to_asyncio.open_channel_from()`.
This task is ``asyncio``-side and must be called from
``tractor.to_asyncio.open_channel_from()``.
'''
global _quote_streams
@ -719,79 +689,37 @@ async def _setup_quote_stream(
async with load_aio_clients(
disconnect_on_exit=False,
) as accts2clients:
# XXX since this is an `asyncio.Task`, we must use
# tractor.pause_from_sync()
caccount_name, client = get_preferred_data_client(accts2clients)
contract = (
contract
or
(await client.find_contract(symbol))
)
chan.started_nowait(contract) # cuz why not
ticker: Ticker = client.ib.reqMktData(
contract,
','.join(opts),
)
maybe_exc: BaseException|None = None
handler_tries: int = 0
aio_task: asyncio.Task = asyncio.current_task()
contract = contract or (await client.find_contract(symbol))
to_trio.send_nowait(contract) # cuz why not
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
# ?TODO? this API is batch-wise and quite slow-af but,
# - seems to be 5s updates?
# - maybe we could use it for backchecking?
#
# NOTE: it's batch-wise and slow af but I guess could
# be good for backchecking? Seems to be every 5s maybe?
# ticker: Ticker = client.ib.reqTickByTickData(
# contract, 'Last',
# )
# define a very naive queue-pushing callback that relays
# quote-packets directly the calling (parent) `trio.Task`.
# Ensure on teardown we cancel the feed via their cancel API.
#
# # define a simple queue push routine that streams quote packets
# # to trio over the ``to_trio`` memory channel.
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def teardown():
'''
Disconnect our `push`-er callback and cancel the data-feed
for `contract`.
'''
nonlocal maybe_exc
ticker.updateEvent.disconnect(push)
report: str = f'Disconnected mkt-data for {symbol!r} due to '
if maybe_exc is not None:
report += (
'error,\n'
f'{maybe_exc!r}\n'
)
log.error(report)
else:
report += (
'cancellation.\n'
)
log.cancel(report)
log.error(f"Disconnected stream for `{symbol}`")
client.ib.cancelMktData(contract)
# decouple broadcast mem chan
_quote_streams.pop(symbol, None)
def push(
t: Ticker,
tries_before_raise: int = 6,
) -> None:
'''
Push quotes verbatim to parent-side `trio.Task`.
def push(t: Ticker) -> None:
"""
Push quotes to trio task.
'''
nonlocal maybe_exc, handler_tries
# log.debug(f'new IB quote: {t}\n')
"""
# log.debug(t)
try:
chan.send_nowait(t)
to_trio.send_nowait(t)
# XXX TODO XXX replicate in `tractor` tests
# as per `CancelledError`-handler notes below!
# assert 0
except (
trio.BrokenResourceError,
@ -806,104 +734,35 @@ async def _setup_quote_stream(
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
teardown()
# for slow debugging purposes to avoid clobbering prompt
# with log msgs
except trio.WouldBlock:
log.exception(
f'Asyncio->Trio `chan.send_nowait()` blocked !?\n'
f'\n'
f'{chan._to_trio.statistics()}\n'
)
# ?TODO, handle re-connection attempts?
except BaseException as _berr:
berr = _berr
if handler_tries >= tries_before_raise:
# breakpoint()
maybe_exc = _berr
# task.set_exception(berr)
aio_task.cancel(msg=berr.args)
raise berr
else:
handler_tries += 1
log.exception(
f'Failed to push ticker quote !?\n'
f'handler_tries={handler_tries!r}\n'
f'ticker: {t!r}\n'
f'\n'
f'{chan._to_trio.statistics()}\n'
f'\n'
f'CAUSE: {berr}\n'
)
# log.warning(
# f'channel is blocking symbol feed for {symbol}?'
# f'\n{to_trio.statistics}'
# )
pass
# except trio.WouldBlock:
# # for slow debugging purposes to avoid clobbering prompt
# # with log msgs
# pass
ticker.updateEvent.connect(push)
try:
await asyncio.sleep(float('inf'))
# XXX, for debug.. TODO? can we rm again?
#
# tractor.pause_from_sync()
# while True:
# await asyncio.sleep(1.6)
# if ticker.ticks:
# log.debug(
# f'ticker.ticks = \n'
# f'{ticker.ticks}\n'
# )
# else:
# log.warning(
# 'UHH no ticker.ticks ??'
# )
# XXX TODO XXX !?!?
# apparently **without this handler** and the subsequent
# re-raising of `maybe_exc from _taskc` cancelling the
# `aio_task` from the `push()`-callback will cause a very
# strange chain of exc raising that breaks alll sorts of
# downstream callers, tasks and remote-actor tasks!?
#
# -[ ] we need some lowlevel reproducting tests to replicate
# those worst-case scenarios in `tractor` core!!
# -[ ] likely we should factor-out the `tractor.to_asyncio`
# attempts at workarounds in `.translate_aio_errors()`
# for failed `asyncio.Task.set_exception()` to either
# call `aio_task.cancel()` and/or
# `aio_task._fut_waiter.set_exception()` to a re-useable
# toolset in something like a `.to_asyncio._utils`??
#
except asyncio.CancelledError as _taskc:
if maybe_exc is not None:
raise maybe_exc from _taskc
raise _taskc
except BaseException as _berr:
# stash any crash cause for reporting in `teardown()`
maybe_exc = _berr
raise _berr
finally:
# always disconnect our `push()` and cancel the
# ib-"mkt-data-feed".
teardown()
# return from_aio
@acm
async def open_aio_quote_stream(
symbol: str,
contract: Contract|None = None,
contract: Contract | None = None,
) -> trio.abc.ReceiveStream:
'''
Open a real-time `Ticker` quote stream from an `asyncio.Task`
spawned via `tractor.to_asyncio.open_channel_from()`, deliver the
inter-event-loop channel to the `trio.Task` caller and cache it
globally for re-use.
'''
from tractor.trionics import broadcast_receiver
global _quote_streams
@ -928,10 +787,6 @@ async def open_aio_quote_stream(
assert contract
# TODO? de-reg on teardown of last consumer task?
# -> why aren't we using `.trionics.maybe_open_context()`
# here again?? (we are in `open_client_proxies()` tho?)
#
# cache feed for later consumers
_quote_streams[symbol] = from_aio
@ -946,12 +801,7 @@ def normalize(
calc_price: bool = False
) -> dict:
'''
Translate `ib_async`'s `Ticker.ticks` values to a `piker`
normalized `dict` form for transmit to downstream `.data` layer
consumers.
'''
# check for special contract types
con = ticker.contract
fqme, calc_price = con2fqme(con)
@ -970,7 +820,7 @@ def normalize(
tbt = ticker.tickByTicks
if tbt:
log.info(f'tickbyticks:\n {ticker.tickByTicks}')
print(f'tickbyticks:\n {ticker.tickByTicks}')
ticker.ticks = new_ticks
@ -1006,39 +856,27 @@ def normalize(
return data
# ?TODO? feels like this task-fn could be factored to reduce some
# indentation levels?
# -[ ] the reconnect while loop on ib-gw "data farm connection.."s
# -[ ] everything embedded under the `async with aclosing(stream):`
# as the "meat" of the quote delivery once the connection is
# stable.
#
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
# TODO? we need to hook into the `ib_async` logger like
# we can with i3ipc from modden!
# loglevel: str|None = None,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Stream `symbols[0]` quotes back via `send_chan`.
Stream symbol quotes.
The `feed_is_live: Event` is set to signal the caller that it can
begin processing msgs from the mem-chan.
This is a ``trio`` callable routine meant to be invoked
once the brokerd is up.
'''
# TODO: support multiple subscriptions
sym: str = symbols[0]
log.info(
f'request for real-time quotes\n'
f'sym: {sym!r}\n'
)
sym = symbols[0]
log.info(f'request for real-time quotes: {sym}')
init_msgs: list[FeedInit] = []
@ -1047,49 +885,34 @@ async def stream_quotes(
details: ibis.ContractDetails
async with (
open_data_client() as proxy,
# trio.open_nursery() as tn,
):
mkt, details = await get_mkt_info(
sym,
proxy=proxy, # passed to avoid implicit client load
)
# is venue active rn?
venue_is_open: bool = any(
is_current_time_in_range(
start_dt=sesh.start,
end_dt=sesh.end,
)
for sesh in details.tradingSessions()
)
init_msg = FeedInit(mkt_info=mkt)
# NOTE, tell sampler (via config) to skip vlm summing for dst
# assets which provide no vlm data..
if mkt.dst.atype in {
'fiat',
'index',
'commodity',
}:
# tell sampler config that it shouldn't do vlm summing.
init_msg.shm_write_opts['sum_tick_vlm'] = False
init_msg.shm_write_opts['has_vlm'] = False
init_msgs.append(init_msg)
con: Contract = details.contract
first_ticker: Ticker|None = None
with trio.move_on_after(1.6) as quote_cs:
first_ticker: Ticker | None = None
with trio.move_on_after(1):
first_ticker: Ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=False,
)
# XXX should never happen with this ep right?
# but if so then, more then likely mkt is closed?
if quote_cs.cancelled_caught:
await tractor.pause()
if first_ticker:
first_quote: dict = normalize(first_ticker)
@ -1101,27 +924,28 @@ async def stream_quotes(
f'{pformat(first_quote)}\n'
)
# XXX NOTE: whenever we're "outside regular trading hours"
# (only relevant for assets coming from the "legacy markets"
# space) so we basically (from an API/runtime-operational
# perspective) "pretend the feed is live" even if it's
# actually closed.
#
# IOW, we signal to the effective caller (task) that the live
# feed is "already up" but really we're just indicating that
# the OHLCV history can start being loaded immediately by the
# `piker.data`/`.tsp` layers.
#
# XXX, deats: the "pretend we're live" is just done by
# a `feed_is_live.set()` even though nothing is actually live
# Bp
if not venue_is_open:
log.warning(
f'Venue is closed, unable to establish real-time feed.\n'
f'mkt: {mkt!r}\n'
f'\n'
f'first_ticker: {first_ticker}\n'
)
# NOTE: it might be outside regular trading hours for
# assets with "standard venue operating hours" so we
# only "pretend the feed is live" when the dst asset
# type is NOT within the NON-NORMAL-venue set: aka not
# commodities, forex or crypto currencies which CAN
# always return a NaN on a snap quote request during
# normal venue hours. In the case of a closed venue
# (equitiies, futes, bonds etc.) we at least try to
# grab the OHLC history.
if (
first_ticker
and
isnan(first_ticker.last)
# SO, if the last quote price value is NaN we ONLY
# "pretend to do" `feed_is_live.set()` if it's a known
# dst asset venue with a lot of closed operating hours.
and mkt.dst.atype not in {
'commodity',
'fiat',
'crypto',
}
):
task_status.started((
init_msgs,
first_quote,
@ -1132,12 +956,10 @@ async def stream_quotes(
feed_is_live.set()
# block and let data history backfill code run.
# XXX obvi given the venue is closed, we never expect feed
# to come up; a taskc should be the only way to
# terminate this task.
await trio.sleep_forever()
return # we never expect feed to come up?
# ?TODO, we could instead spawn a task that waits on a feed
# TODO: we should instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
# hard coded stuff.
# async def wait_for_first_quote():
@ -1159,26 +981,23 @@ async def stream_quotes(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
)
cs: trio.CancelScope|None = None
cs: trio.CancelScope | None = None
startup: bool = True
while (
startup
or
cs.cancel_called
or cs.cancel_called
):
with trio.CancelScope() as cs:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as nurse,
open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream,
):
# ?TODO? can we rm this - particularly for `ib_async`?
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
# first_ticker.ticks = []
first_ticker.ticks = []
# only on first entry at feed boot up
if startup:
@ -1192,8 +1011,8 @@ async def stream_quotes(
# data feed event.
async def reset_on_feed():
# ??TODO? this seems to be surpressed from the
# traceback in `tractor`?
# TODO: this seems to be surpressed from the
# traceback in ``tractor``?
# assert 0
rt_ev = proxy.status_event(
@ -1237,7 +1056,7 @@ async def stream_quotes(
# ugh, clear ticks since we've
# consumed them (ahem, ib_insync is
# truly stateful trash)
# ticker.ticks = []
ticker.ticks = []
# XXX: this works because we don't use
# ``aclosing()`` above?
@ -1254,12 +1073,8 @@ async def stream_quotes(
async for ticker in stream:
quote = normalize(ticker)
fqme = quote['fqme']
log.debug(
f'Sending quote\n'
f'{quote}'
)
await send_chan.send({fqme: quote})
# ugh, clear ticks since we've consumed them
# ticker.ticks = []
ticker.ticks = []
# last = time.time()

View File

@ -34,7 +34,6 @@ import urllib.parse
import hashlib
import hmac
import base64
import tractor
import trio
from piker import config
@ -373,7 +372,8 @@ class Client:
# 1658347714, 'status': 'Success'}]}
if xfers:
await tractor.pause()
import tractor
await tractor.pp()
trans: dict[str, Transaction] = {}
for entry in xfers:
@ -501,8 +501,7 @@ class Client:
for xkey, data in resp['result'].items():
# NOTE: always cache in pairs tables for faster lookup
with tractor.devx.maybe_open_crash_handler(): # as bxerr:
pair = Pair(xname=xkey, **data)
pair = Pair(xname=xkey, **data)
# register the above `Pair` structs for all
# key-sets/monikers: a set of 4 (frickin) tables

View File

@ -175,8 +175,9 @@ async def handle_order_requests(
case {
'account': 'kraken.spot' as account,
'action': 'buy'|'sell',
}:
'action': action,
} if action in {'buy', 'sell'}:
# validate
order = BrokerdOrder(**msg)
@ -261,12 +262,6 @@ async def handle_order_requests(
} | extra
log.info(f'Submitting WS order request:\n{pformat(req)}')
# NOTE HOWTO, debug order requests
#
# if 'XRP' in pair:
# await tractor.pause()
await ws.send_msg(req)
# placehold for sanity checking in relay loop
@ -1090,8 +1085,6 @@ async def handle_order_updates(
f'Failed to {action} order {reqid}:\n'
f'{errmsg}'
)
# if tractor._state.debug_mode():
# await tractor.pause()
symbol: str = 'N/A'
if chain := apiflows.get(reqid):

View File

@ -21,6 +21,7 @@ Symbology defs and search.
from decimal import Decimal
import tractor
from rapidfuzz import process as fuzzy
from piker._cacheables import (
async_lifo_cache,
@ -40,13 +41,8 @@ from piker.accounting._mktinfo import (
)
# https://www.kraken.com/features/api#get-tradable-pairs
class Pair(Struct):
'''
A tradable asset pair as schema-defined by,
https://docs.kraken.com/api/docs/rest-api/get-tradable-asset-pairs
'''
xname: str # idiotic bs_mktid equiv i guess?
altname: str # alternate pair name
wsname: str # WebSocket pair name (if available)
@ -57,6 +53,7 @@ class Pair(Struct):
lot: str # volume lot size
cost_decimals: int
costmin: float
pair_decimals: int # scaling decimal places for pair
lot_decimals: int # scaling decimal places for volume
@ -82,7 +79,6 @@ class Pair(Struct):
tick_size: float # min price step size
status: str
costmin: str|None = None # XXX, only some mktpairs?
short_position_limit: float = 0
long_position_limit: float = float('inf')

View File

@ -25,10 +25,7 @@ from typing import TYPE_CHECKING
import trio
import tractor
from tractor.trionics import (
broadcast_receiver,
collapse_eg,
)
from tractor.trionics import broadcast_receiver
from ._util import (
log, # sub-sys logger
@ -284,11 +281,8 @@ async def open_ems(
client._ems_stream = trades_stream
# start sync code order msg delivery task
async with (
collapse_eg(),
trio.open_nursery() as tn,
):
tn.start_soon(
async with trio.open_nursery() as n:
n.start_soon(
relay_orders_from_sync_code,
client,
fqme,
@ -304,4 +298,4 @@ async def open_ems(
)
# stop the sync-msg-relay task on exit.
tn.cancel_scope.cancel()
n.cancel_scope.cancel()

View File

@ -42,7 +42,6 @@ from bidict import bidict
import trio
from trio_typing import TaskStatus
import tractor
from tractor import trionics
from ._util import (
log, # sub-sys logger
@ -77,6 +76,7 @@ if TYPE_CHECKING:
# TODO: numba all of this
def mk_check(
trigger_price: float,
known_last: float,
action: str,
@ -162,7 +162,7 @@ async def clear_dark_triggers(
router: Router,
brokerd_orders_stream: tractor.MsgStream,
quote_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa
broker: str,
fqme: str,
@ -178,7 +178,6 @@ async def clear_dark_triggers(
'''
# XXX: optimize this for speed!
# TODO:
# - port to the new ringbuf stuff in `tractor.ipc`!
# - numba all this!
# - this stream may eventually contain multiple symbols
quote_stream._raise_on_lag = False
@ -501,7 +500,7 @@ class Router(Struct):
'''
# setup at actor spawn time
_tn: trio.Nursery
nursery: trio.Nursery
# broker to book map
books: dict[str, DarkBook] = {}
@ -667,7 +666,7 @@ class Router(Struct):
# dark book clearing loop, also lives with parent
# daemon to allow dark order clearing while no
# client is connected.
self._tn.start_soon(
self.nursery.start_soon(
clear_dark_triggers,
self,
relay.brokerd_stream,
@ -690,7 +689,7 @@ class Router(Struct):
# spawn a ``brokerd`` order control dialog stream
# that syncs lifetime with the parent `emsd` daemon.
self._tn.start_soon(
self.nursery.start_soon(
translate_and_relay_brokerd_events,
broker,
relay.brokerd_stream,
@ -764,12 +763,10 @@ async def _setup_persistent_emsd(
global _router
# open a root "service task-nursery" for the `emsd`-actor
async with (
trionics.collapse_eg(),
trio.open_nursery() as tn
):
_router = Router(_tn=tn)
# open a root "service nursery" for the ``emsd`` actor
async with trio.open_nursery() as service_nursery:
_router = Router(nursery=service_nursery)
# TODO: send back the full set of persistent
# orders/execs?
@ -1185,16 +1182,12 @@ async def process_client_order_cmds(
submitting live orders immediately if requested by the client.
'''
# TODO, only allow `msgspec.Struct` form!
cmd: dict
# cmd: dict
async for cmd in client_order_stream:
log.info(
f'Received order cmd:\n'
f'{pformat(cmd)}\n'
)
log.info(f'Received order cmd:\n{pformat(cmd)}')
# CAWT DAMN we need struct support!
oid: str = str(cmd['oid'])
oid = str(cmd['oid'])
# register this stream as an active order dialog (msg flow) for
# this order id such that translated message from the brokerd
@ -1300,7 +1293,7 @@ async def process_client_order_cmds(
case {
'oid': oid,
'symbol': fqme,
'price': price,
'price': trigger_price,
'size': size,
'action': ('buy' | 'sell') as action,
'exec_mode': ('live' | 'paper'),
@ -1332,7 +1325,7 @@ async def process_client_order_cmds(
symbol=sym,
action=action,
price=price,
price=trigger_price,
size=size,
account=req.account,
)
@ -1354,11 +1347,7 @@ async def process_client_order_cmds(
# (``translate_and_relay_brokerd_events()`` above) will
# handle relaying the ems side responses back to
# the client/cmd sender from this request
log.info(
f'Sending live order to {broker}:\n'
f'{pformat(msg)}'
)
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck``
@ -1374,7 +1363,7 @@ async def process_client_order_cmds(
case {
'oid': oid,
'symbol': fqme,
'price': price,
'price': trigger_price,
'size': size,
'exec_mode': exec_mode,
'action': action,
@ -1402,12 +1391,7 @@ async def process_client_order_cmds(
if isnan(last):
last = flume.rt_shm.array[-1]['close']
trigger_price: float = float(price)
pred = mk_check(
trigger_price,
last,
action,
)
pred = mk_check(trigger_price, last, action)
# NOTE: for dark orders currently we submit
# the triggered live order at a price 5 ticks
@ -1514,7 +1498,7 @@ async def maybe_open_trade_relays(
loglevel: str = 'info',
):
fqme, relay, feed, client_ready = await _router._tn.start(
fqme, relay, feed, client_ready = await _router.nursery.start(
_router.open_trade_relays,
fqme,
exec_mode,
@ -1547,7 +1531,7 @@ async def _emsd_main(
ctx: tractor.Context,
fqme: str,
exec_mode: str, # ('paper', 'live')
loglevel: str|None = None,
loglevel: str | None = None,
) -> tuple[
dict[

View File

@ -19,7 +19,6 @@ Clearing sub-system message and protocols.
"""
from __future__ import annotations
from decimal import Decimal
from typing import (
Literal,
)
@ -72,15 +71,7 @@ class Order(Struct):
symbol: str # | MktPair
account: str # should we set a default as '' ?
# https://docs.python.org/3/library/decimal.html#decimal-objects
#
# ?TODO? decimal usage throughout?
# -[ ] possibly leverage the `Encoder(decimal_format='number')`
# bit?
# |_https://jcristharif.com/msgspec/supported-types.html#decimal
# -[ ] should we also use it for .size?
#
price: Decimal
price: float
size: float # -ve is "sell", +ve is "buy"
brokers: list[str] = []
@ -187,7 +178,7 @@ class BrokerdOrder(Struct):
time_ns: int
symbol: str # fqme
price: Decimal
price: float
size: float
# TODO: if we instead rely on a +ve/-ve size to determine

View File

@ -508,7 +508,7 @@ async def handle_order_requests(
reqid = await client.submit_limit(
oid=order.oid,
symbol=f'{order.symbol}.{client.broker}',
price=float(order.price),
price=order.price,
action=order.action,
size=order.size,
# XXX: by default 0 tells ``ib_insync`` methods that

View File

@ -134,65 +134,86 @@ def pikerd(
Spawn the piker broker-daemon.
'''
# from tractor.devx import maybe_open_crash_handler
# with maybe_open_crash_handler(pdb=False):
log = get_console_log(loglevel, name='cli')
from tractor.devx import maybe_open_crash_handler
with maybe_open_crash_handler(pdb=pdb):
log = get_console_log(loglevel, name='cli')
if pdb:
log.warning((
"\n"
"!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
"When a `piker` daemon crashes it will block the "
"task-thread until resumed from console!\n"
"\n"
))
# service-actor registry endpoint socket-address set
regaddrs: list[tuple[str, int]] = []
conf, _ = config.load(
conf_name='conf',
)
network: dict = conf.get('network')
if (
network is None
and not maddr
):
regaddrs = [(
_default_registry_host,
_default_registry_port,
)]
else:
eps: dict = load_trans_eps(
network,
maddr,
)
for layers in eps['pikerd']:
regaddrs.append((
layers['ipv4']['addr'],
layers['tcp']['port'],
if pdb:
log.warning((
"\n"
"!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
"When a `piker` daemon crashes it will block the "
"task-thread until resumed from console!\n"
"\n"
))
from .. import service
# service-actor registry endpoint socket-address set
regaddrs: list[tuple[str, int]] = []
async def main():
service_mngr: service.Services
async with (
service.open_pikerd(
registry_addrs=regaddrs,
loglevel=loglevel,
debug_mode=pdb,
enable_transports=['uds'],
# enable_transports=['tcp'],
) as service_mngr,
conf, _ = config.load(
conf_name='conf',
)
network: dict = conf.get('network')
if (
network is None
and not maddr
):
assert service_mngr
# ?TODO? spawn all other sub-actor daemons according to
# multiaddress endpoint spec defined by user config
await trio.sleep_forever()
regaddrs = [(
_default_registry_host,
_default_registry_port,
)]
trio.run(main)
else:
eps: dict = load_trans_eps(
network,
maddr,
)
for layers in eps['pikerd']:
regaddrs.append((
layers['ipv4']['addr'],
layers['tcp']['port'],
))
from .. import service
async def main():
service_mngr: service.Services
async with (
service.open_pikerd(
registry_addrs=regaddrs,
loglevel=loglevel,
debug_mode=pdb,
) as service_mngr, # normally delivers a ``Services`` handle
# AsyncExitStack() as stack,
):
# TODO: spawn all other sub-actor daemons according to
# multiaddress endpoint spec defined by user config
assert service_mngr
# if tsdb:
# dname, conf = await stack.enter_async_context(
# service.marketstore.start_ahab_daemon(
# service_mngr,
# loglevel=loglevel,
# )
# )
# log.info(f'TSDB `{dname}` up with conf:\n{conf}')
# if es:
# dname, conf = await stack.enter_async_context(
# service.elastic.start_ahab_daemon(
# service_mngr,
# loglevel=loglevel,
# )
# )
# log.info(f'DB `{dname}` up with conf:\n{conf}')
await trio.sleep_forever()
trio.run(main)
@click.group(context_settings=config._context_defaults)
@ -307,10 +328,6 @@ def services(config, tl, ports):
if not ports:
ports = [_default_registry_port]
addr = tractor._addr.wrap_address(
addr=(host, ports[0])
)
async def list_services():
nonlocal host
async with (
@ -318,18 +335,16 @@ def services(config, tl, ports):
name='service_query',
loglevel=config['loglevel'] if tl else None,
),
tractor.get_registry(
addr=addr,
tractor.get_arbiter(
host=host,
port=ports[0]
) as portal
):
registry = await portal.run_from_ns(
'self',
'get_registry',
)
registry = await portal.run_from_ns('self', 'get_registry')
json_d = {}
for key, socket in registry.items():
json_d[key] = f'{socket}'
host, port = socket
json_d[key] = f'{host}:{port}'
click.echo(f"{colorize_json(json_d)}")
trio.run(list_services)

View File

@ -284,8 +284,7 @@ class Sampler:
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
trio.ClosedResourceError
):
log.error(
f'{stream._ctx.chan.uid} dropped connection'
@ -698,7 +697,7 @@ async def sample_and_broadcast(
log.warning(
f'Feed OVERRUN {sub_key}'
f'@{bus.brokername} -> \n'
'@{bus.brokername} -> \n'
f'feed @ {chan.uid}\n'
f'throttle = {throttle} Hz'
)
@ -877,7 +876,6 @@ async def uniform_rate_send(
except tractor.RemoteActorError as rme:
if rme.type is not tractor._exceptions.StreamOverrun:
raise
ctx = stream._ctx
chan = ctx.chan
log.warning(
@ -894,7 +892,6 @@ async def uniform_rate_send(
trio.ClosedResourceError,
trio.BrokenResourceError,
ConnectionResetError,
trio.EndOfChannel,
):
# if the feed consumer goes down then drop
# out of this rate limiter

View File

@ -27,6 +27,7 @@ from functools import partial
from types import ModuleType
from typing import (
Any,
Optional,
Callable,
AsyncContextManager,
AsyncGenerator,
@ -34,7 +35,6 @@ from typing import (
)
import json
import tractor
import trio
from trio_typing import TaskStatus
from trio_websocket import (
@ -167,7 +167,7 @@ async def _reconnect_forever(
async def proxy_msgs(
ws: WebSocketConnection,
rent_cs: trio.CancelScope, # parent cancel scope
pcs: trio.CancelScope, # parent cancel scope
):
'''
Receive (under `timeout` deadline) all msgs from from underlying
@ -192,7 +192,7 @@ async def _reconnect_forever(
f'{url} connection bail with:'
)
await trio.sleep(0.5)
rent_cs.cancel()
pcs.cancel()
# go back to reonnect loop in parent task
return
@ -204,7 +204,7 @@ async def _reconnect_forever(
f'{src_mod}\n'
'WS feed seems down and slow af.. reconnecting\n'
)
rent_cs.cancel()
pcs.cancel()
# go back to reonnect loop in parent task
return
@ -228,12 +228,7 @@ async def _reconnect_forever(
nobsws._connected = trio.Event()
task_status.started()
mc_state: trio._channel.MemoryChannelState = snd._state
while (
mc_state.open_receive_channels > 0
and
mc_state.open_send_channels > 0
):
while not snd._closed:
log.info(
f'{src_mod}\n'
f'{url} trying (RE)CONNECT'
@ -242,11 +237,10 @@ async def _reconnect_forever(
ws: WebSocketConnection
try:
async with (
trio.open_nursery() as n,
open_websocket_url(url) as ws,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
cs = nobsws._cs = tn.cancel_scope
cs = nobsws._cs = n.cancel_scope
nobsws._ws = ws
log.info(
f'{src_mod}\n'
@ -254,7 +248,7 @@ async def _reconnect_forever(
)
# begin relay loop to forward msgs
tn.start_soon(
n.start_soon(
proxy_msgs,
ws,
cs,
@ -268,7 +262,7 @@ async def _reconnect_forever(
# TODO: should we return an explicit sub-cs
# from this fixture task?
await tn.start(
await n.start(
open_fixture,
fixture,
nobsws,
@ -278,23 +272,11 @@ async def _reconnect_forever(
# to let tasks run **inside** the ws open block above.
nobsws._connected.set()
await trio.sleep_forever()
except (
HandshakeError,
ConnectionRejected,
):
except HandshakeError:
log.exception('Retrying connection')
await trio.sleep(0.5) # throttle
except BaseException as _berr:
berr = _berr
log.exception(
'Reconnect-attempt failed ??\n'
)
await trio.sleep(0.2) # throttle
raise berr
# ws & nursery block ends
#|_ws & nursery block ends
nobsws._connected = trio.Event()
if cs.cancelled_caught:
log.cancel(
@ -342,25 +324,21 @@ async def open_autorecon_ws(
connetivity errors, or some user defined recv timeout.
You can provide a ``fixture`` async-context-manager which will be
entered/exitted around each connection reset; eg. for
(re)requesting subscriptions without requiring streaming setup
code to rerun.
entered/exitted around each connection reset; eg. for (re)requesting
subscriptions without requiring streaming setup code to rerun.
'''
snd: trio.MemorySendChannel
rcv: trio.MemoryReceiveChannel
snd, rcv = trio.open_memory_channel(616)
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
async with trio.open_nursery() as n:
nobsws = NoBsWs(
url,
rcv,
msg_recv_timeout=msg_recv_timeout,
)
await tn.start(
await n.start(
partial(
_reconnect_forever,
url,
@ -373,10 +351,11 @@ async def open_autorecon_ws(
await nobsws._connected.wait()
assert nobsws._cs
assert nobsws.connected()
try:
yield nobsws
finally:
tn.cancel_scope.cancel()
n.cancel_scope.cancel()
'''
@ -389,8 +368,8 @@ of msgs over a `NoBsWs`.
class JSONRPCResult(Struct):
id: int
jsonrpc: str = '2.0'
result: dict|None = None
error: dict|None = None
result: Optional[dict] = None
error: Optional[dict] = None
@acm

View File

@ -39,7 +39,6 @@ from typing import (
AsyncContextManager,
Awaitable,
Sequence,
TYPE_CHECKING,
)
import trio
@ -76,10 +75,6 @@ from ._sampling import (
uniform_rate_send,
)
if TYPE_CHECKING:
from tractor._addr import Address
from tractor.msg.types import Aid
class Sub(Struct, frozen=True):
'''
@ -728,10 +723,7 @@ class Feed(Struct):
async for msg in stream:
await tx.send(msg)
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as nurse
):
async with trio.open_nursery() as nurse:
# spawn a relay task for each stream so that they all
# multiplex to a common channel.
for brokername in mods:
@ -907,19 +899,19 @@ async def open_feed(
feed.portals[brokermod] = portal
# fill out "status info" that the UI can show
chan: tractor.Channel = portal.chan
raddr: Address = chan.raddr
aid: Aid = chan.aid
# TAG_feed_status_update
host, port = portal.channel.raddr
if host == '127.0.0.1':
host = 'localhost'
feed.status.update({
'actor_id': aid,
'actor_short_id': f'{aid.name}@{aid.pid}',
'ipc': chan.raddr.proto_key,
'ipc_addr': raddr,
'actor_name': portal.channel.uid[0],
'host': host,
'port': port,
'hist_shm': 'NA',
'rt_shm': 'NA',
'throttle_hz': tick_throttle,
'throttle_rate': tick_throttle,
})
# feed.status.update(init_msg.pop('status', {}))
# (allocate and) connect to any feed bus for this broker
bus_ctxs.append(

View File

@ -498,7 +498,6 @@ async def cascade(
func_name: str = func.__name__
async with (
tractor.trionics.collapse_eg(), # avoid multi-taskc tb in console
trio.open_nursery() as tn,
):
# TODO: might be better to just make a "restart" method where

View File

@ -18,11 +18,7 @@
Log like a forester!
"""
import logging
import reprlib
import json
from typing import (
Callable,
)
import tractor
from pygments import (
@ -88,27 +84,3 @@ def colorize_json(
# likeable styles: algol_nu, tango, monokai
formatters.TerminalTrueColorFormatter(style=style)
)
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
'''
Allocate and deliver a `repr.Repr` instance with provided input
settings using the std-lib's `reprlib` mod,
* https://docs.python.org/3/library/reprlib.html
------ Ex. ------
An up to 6-layer-nested `dict` as multi-line:
- https://stackoverflow.com/a/79102479
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
'''
def_kws: dict[str, int] = dict(
indent=2,
maxlevel=6, # recursion levels
maxstring=66, # match editor line-len limit
)
def_kws |= repr_kws
reprr = reprlib.Repr(**def_kws)
return reprr.repr

View File

@ -107,22 +107,17 @@ async def open_piker_runtime(
async with (
tractor.open_root_actor(
# passed through to `open_root_actor`
# passed through to ``open_root_actor``
registry_addrs=registry_addrs,
name=name,
start_method=start_method,
loglevel=loglevel,
debug_mode=debug_mode,
# XXX NOTE MEMBER DAT der's a perf hit yo!!
# https://greenback.readthedocs.io/en/latest/principle.html#performance
maybe_enable_greenback=True,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=enable_modules,
hide_tb=False,
**tractor_kwargs,
) as actor,
@ -205,8 +200,7 @@ async def open_pikerd(
reg_addrs,
),
tractor.open_nursery() as actor_nursery,
tractor.trionics.collapse_eg(),
trio.open_nursery() as service_tn,
trio.open_nursery() as service_nursery,
):
for addr in reg_addrs:
if addr not in root_actor.accept_addrs:
@ -217,7 +211,7 @@ async def open_pikerd(
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_tn
Services.service_n = service_nursery
Services.debug_mode = debug_mode
try:
@ -227,7 +221,7 @@ async def open_pikerd(
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_tn.cancel_scope.cancel()
service_nursery.cancel_scope.cancel()
# TODO: do we even need this?
@ -262,10 +256,7 @@ async def maybe_open_pikerd(
loglevel: str | None = None,
**kwargs,
) -> (
tractor._portal.Portal
|ClassVar[Services]
):
) -> tractor._portal.Portal | ClassVar[Services]:
'''
If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self
@ -290,11 +281,10 @@ async def maybe_open_pikerd(
registry_addrs: list[tuple[str, int]] = (
registry_addrs
or
[_default_reg_addr]
or [_default_reg_addr]
)
pikerd_portal: tractor.Portal|None
pikerd_portal: tractor.Portal | None
async with (
open_piker_runtime(
name=query_name,

View File

@ -28,7 +28,6 @@ from contextlib import (
)
import tractor
from trio.lowlevel import current_task
from ._util import (
log, # sub-sys logger
@ -71,84 +70,69 @@ async def maybe_spawn_daemon(
lock = Services.locks[service_name]
await lock.acquire()
try:
async with find_service(
service_name,
registry_addrs=[('127.0.0.1', 6116)],
) as portal:
if portal is not None:
lock.release()
yield portal
return
log.warning(
f"Couldn't find any existing {service_name}\n"
'Attempting to spawn new daemon-service..'
)
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel,
**pikerd_kwargs,
) as pikerd_portal:
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
started: bool
if pikerd_portal is None:
started = await service_task_target(
loglevel=loglevel,
**spawn_args,
)
else:
# request a remote `pikerd` (service manager) to start the
# target daemon-task, the target can't return
# a non-serializable value since it is expected that service
# starting is non-blocking and the target task will persist
# running "under" or "within" the `pikerd` actor tree after
# the questing client disconnects. in other words this
# spawns a persistent daemon actor that continues to live
# for the lifespan of whatever the service manager inside
# `pikerd` says it should.
started = await pikerd_portal.run(
service_task_target,
loglevel=loglevel,
**spawn_args,
)
if started:
log.info(f'Service {service_name} started!')
# block until we can discover (by IPC connection) to the newly
# spawned daemon-actor and then deliver the portal to the
# caller.
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
await portal.cancel_actor()
except BaseException as _err:
err = _err
if (
lock.locked()
and
lock.statistics().owner is current_task()
):
log.exception(
f'Releasing stale lock after crash..?'
f'{err!r}\n'
)
async with find_service(
service_name,
registry_addrs=[('127.0.0.1', 6116)],
) as portal:
if portal is not None:
lock.release()
raise err
yield portal
return
log.warning(
f"Couldn't find any existing {service_name}\n"
'Attempting to spawn new daemon-service..'
)
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel,
**pikerd_kwargs,
) as pikerd_portal:
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
started: bool
if pikerd_portal is None:
started = await service_task_target(
loglevel=loglevel,
**spawn_args,
)
else:
# request a remote `pikerd` (service manager) to start the
# target daemon-task, the target can't return
# a non-serializable value since it is expected that service
# starting is non-blocking and the target task will persist
# running "under" or "within" the `pikerd` actor tree after
# the questing client disconnects. in other words this
# spawns a persistent daemon actor that continues to live
# for the lifespan of whatever the service manager inside
# `pikerd` says it should.
started = await pikerd_portal.run(
service_task_target,
loglevel=loglevel,
**spawn_args,
)
if started:
log.info(f'Service {service_name} started!')
# block until we can discover (by IPC connection) to the newly
# spawned daemon-actor and then deliver the portal to the
# caller.
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
await portal.cancel_actor()
async def spawn_emsd(

View File

@ -109,7 +109,7 @@ class Services:
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.wait_for_result()
ctx_res: Any = await ctx.result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context

View File

@ -101,15 +101,13 @@ async def open_registry(
if (
not tractor.is_root_process()
and
not Registry.addrs
and not Registry.addrs
):
Registry.addrs.extend(actor.reg_addrs)
if (
ensure_exists
and
not Registry.addrs
and not Registry.addrs
):
raise RuntimeError(
f"`{uid}` registry should already exist but doesn't?"
@ -148,7 +146,7 @@ async def find_service(
| list[Portal]
| None
):
# try:
reg_addrs: list[tuple[str, int]]
async with open_registry(
addrs=(
@ -159,39 +157,22 @@ async def find_service(
or Registry.addrs
),
) as reg_addrs:
log.info(f'Scanning for service `{service_name}`')
log.info(
f'Scanning for service {service_name!r}'
)
maybe_portals: list[Portal] | Portal | None
# attach to existing daemon by name if possible
maybe_portals: list[Portal]|Portal|None
async with tractor.find_actor(
service_name,
registry_addrs=reg_addrs,
only_first=first_only, # if set only returns single ref
) as maybe_portals:
if not maybe_portals:
# log.info(
print(
f'Could NOT find service {service_name!r} -> {maybe_portals!r}'
)
yield None
return
# log.info(
print(
f'Found service {service_name!r} -> {maybe_portals}'
)
yield maybe_portals
# except BaseException as _berr:
# berr = _berr
# log.exception(
# 'tractor.find_actor() failed with,\n'
# )
# raise berr
async def check_for_service(
service_name: str,

View File

@ -138,16 +138,6 @@ class StorageClient(
) -> None:
...
async def write_oi(
self,
fqme: str,
oi: np.ndarray,
append_and_duplicate: bool = True,
limit: int = int(800e3),
) -> None:
...
class TimeseriesNotFound(Exception):
'''

View File

@ -111,24 +111,6 @@ def mk_ohlcv_shm_keyed_filepath(
return path
def mk_oi_shm_keyed_filepath(
fqme: str,
period: float | int,
datadir: Path,
) -> Path:
if period < 1.:
raise ValueError('Sample period should be >= 1.!?')
path: Path = (
datadir
/
f'{fqme}.oi{int(period)}s.parquet'
)
return path
def unpack_fqme_from_parquet_filepath(path: Path) -> str:
filename: str = str(path.name)
@ -190,11 +172,7 @@ class NativeStorageClient:
key: str = path.name.rstrip('.parquet')
fqme, _, descr = key.rpartition('.')
if 'ohlcv' in descr:
prefix, _, suffix = descr.partition('ohlcv')
elif 'oi' in descr:
prefix, _, suffix = descr.partition('oi')
prefix, _, suffix = descr.partition('ohlcv')
period: int = int(suffix.strip('s'))
# cache description data
@ -391,61 +369,6 @@ class NativeStorageClient:
timeframe,
)
def _write_oi(
self,
fqme: str,
oi: np.ndarray,
) -> Path:
'''
Sync version of the public interface meth, since we don't
currently actually need or support an async impl.
'''
path: Path = mk_oi_shm_keyed_filepath(
fqme=fqme,
period=1,
datadir=self._datadir,
)
if isinstance(oi, np.ndarray):
new_df: pl.DataFrame = tsp.np2pl(oi)
else:
new_df = oi
if path.exists():
old_df = pl.read_parquet(path)
df = pl.concat([old_df, new_df])
else:
df = new_df
start = time.time()
df.write_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'parquet write took {delay} secs\n'
f'file path: {path}'
)
return path
async def write_oi(
self,
fqme: str,
oi: np.ndarray,
) -> Path:
'''
Write input oi time series for fqme and sampling period
to (local) disk.
'''
return self._write_oi(
fqme,
oi,
)
async def delete_ts(
self,
key: str,

View File

@ -963,10 +963,7 @@ async def tsdb_backfill(
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = []
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
async with trio.open_nursery() as tn:
tn.start_soon(
push_latest_frame,
dt_eps,
@ -1015,16 +1012,9 @@ async def tsdb_backfill(
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
if def_frame_size != calced_frame_size:
log.warning(
f'Expected frame size {def_frame_size}\n'
f'Rxed frame {calced_frame_size}\n'
)
# await tractor.pause()
assert def_frame_size == calced_frame_size
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
@ -1053,9 +1043,7 @@ async def tsdb_backfill(
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
async with trio.open_nursery(
strict_exception_groups=False,
) as tn:
async with trio.open_nursery() as tn:
bf_done = await tn.start(
partial(
@ -1320,7 +1308,6 @@ async def manage_history(
# sampling period) data set since normally differently
# sampled timeseries can be loaded / process independently
# ;)
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
log.info(

View File

@ -517,7 +517,7 @@ def with_dts(
'''
return df.with_columns([
pl.col(time_col).shift(1).name.suffix('_prev'),
pl.col(time_col).shift(1).suffix('_prev'),
pl.col(time_col).diff().alias('s_diff'),
pl.from_epoch(pl.col(time_col)).alias('dt'),
]).with_columns([
@ -623,7 +623,7 @@ def detect_vlm_gaps(
) -> pl.DataFrame:
vnull: pl.DataFrame = df.filter(
vnull: pl.DataFrame = w_dts.filter(
pl.col(col) == 0
)
return vnull

View File

@ -21,7 +21,6 @@ Main app startup and run.
from functools import partial
from types import ModuleType
import tractor
import trio
from piker.ui.qt import (
@ -117,7 +116,6 @@ async def _async_main(
needed_brokermods[brokername] = brokers[brokername]
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as root_n,
):
# set root nursery and task stack for spawning other charts/feeds

View File

@ -33,6 +33,7 @@ import trio
from piker.ui.qt import (
QtCore,
QtWidgets,
Qt,
QLineF,
QFrame,

View File

@ -1445,10 +1445,7 @@ async def display_symbol_data(
# for pause/resume on mouse interaction
rt_chart.feed = feed
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as ln,
):
async with trio.open_nursery() as ln:
# if available load volume related built-in display(s)
vlm_charts: dict[
str,

View File

@ -22,10 +22,7 @@ from contextlib import asynccontextmanager as acm
from typing import Callable
import trio
from tractor.trionics import (
gather_contexts,
collapse_eg,
)
from tractor.trionics import gather_contexts
from piker.ui.qt import (
QtCore,
@ -210,10 +207,7 @@ async def open_signal_handler(
async for args in recv:
await async_handler(*args)
async with (
collapse_eg(),
trio.open_nursery() as tn
):
async with trio.open_nursery() as tn:
tn.start_soon(proxy_to_handler)
async with send:
yield
@ -248,7 +242,6 @@ async def open_handlers(
widget: QWidget
streams: list[trio.abc.ReceiveChannel]
async with (
collapse_eg(),
trio.open_nursery() as tn,
gather_contexts([
open_event_stream(

View File

@ -18,11 +18,10 @@
Feed status and controls widget(s) for embedding in a UI-pane.
"""
from __future__ import annotations
from typing import (
Any,
TYPE_CHECKING,
)
from textwrap import dedent
from typing import TYPE_CHECKING
# from PyQt5.QtCore import Qt
@ -50,55 +49,35 @@ def mk_feed_label(
a feed control protocol.
'''
status: dict[str, Any] = feed.status
status = feed.status
assert status
# SO tips on ws/nls,
# https://stackoverflow.com/a/15721400
ws: str = '&nbsp;'
# nl: str = '<br>' # dun work?
actor_info_repr: str = (
f')> **{status["actor_short_id"]}**\n'
'\n' # bc md?
)
msg = dedent("""
actor: **{actor_name}**\n
|_ @**{host}:{port}**\n
""")
# fields to select *IN* for display
# (see `.data.feed.open_feed()` status
# update -> TAG_feed_status_update)
for key in [
'ipc',
'hist_shm',
'rt_shm',
'throttle_hz',
]:
# NOTE, the 2nd key is filled via `.format()` updates.
actor_info_repr += (
f'\n' # bc md?
f'{ws}|_{key}: **{{{key}}}**\n'
)
# ^TODO? formatting and content..
# -[ ] showing which fqme is "forward" on the
# chart/fsp/order-mode?
# '|_ flows: **{symbols}**\n'
#
# -[x] why isn't the indent working?
# => markdown, now solved..
for key, val in status.items():
if key in ('host', 'port', 'actor_name'):
continue
msg += f'\n|_ {key}: **{{{key}}}**\n'
feed_label = FormatLabel(
fmt_str=actor_info_repr,
fmt_str=msg,
# |_ streams: **{symbols}**\n
font=_font.font,
font_size=_font_small.px_size,
font_color='default_lightest',
)
# ?TODO, remove this?
# form.vbox.setAlignment(feed_label, Qt.AlignBottom)
# form.vbox.setAlignment(Qt.AlignBottom)
# _ = chart.height() - (
# form.height() +
# form.fill_bar.height()
# # feed_label.height()
# )
_ = chart.height() - (
form.height() +
form.fill_bar.height()
# feed_label.height()
)
feed_label.format(**feed.status)
return feed_label

View File

@ -600,7 +600,6 @@ async def open_fsp_admin(
kwargs=kwargs,
) as (cache_hit, cluster_map),
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
if cache_hit:
@ -614,8 +613,6 @@ async def open_fsp_admin(
)
try:
yield admin
# ??TODO, does this *need* to be inside a finally?
finally:
# terminate all tasks via signals
for key, entry in admin._registry.items():

View File

@ -285,20 +285,18 @@ class FormatLabel(QLabel):
font_size: int,
font_color: str,
use_md: bool = True,
parent=None,
) -> None:
super().__init__(parent)
# by default set the format string verbatim and expect user
# to call ``.format()`` later (presumably they'll notice the
# by default set the format string verbatim and expect user to
# call ``.format()`` later (presumably they'll notice the
# unformatted content if ``fmt_str`` isn't meant to be
# unformatted).
self.fmt_str = fmt_str
# self.setText(fmt_str) # ?TODO, why here?
self.setText(fmt_str)
self.setStyleSheet(
f"""QLabel {{
@ -308,10 +306,9 @@ class FormatLabel(QLabel):
"""
)
self.setFont(_font.font)
if use_md:
self.setTextFormat(
Qt.TextFormat.MarkdownText
)
self.setTextFormat(
Qt.TextFormat.MarkdownText
)
self.setMargin(0)
self.setSizePolicy(
@ -319,10 +316,7 @@ class FormatLabel(QLabel):
size_policy.Expanding,
)
self.setAlignment(
Qt.AlignLeft
|
Qt.AlignBottom
# Qt.AlignVCenter
Qt.AlignVCenter | Qt.AlignLeft
)
self.setText(self.fmt_str)

View File

@ -15,8 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
qompleterz: embeddable search and complete using trio, Qt and
rapidfuzz.
qompleterz: embeddable search and complete using trio, Qt and rapidfuzz.
"""
@ -47,7 +46,6 @@ import time
from pprint import pformat
from rapidfuzz import process as fuzzy
import tractor
import trio
from trio_typing import TaskStatus
@ -55,7 +53,7 @@ from piker.ui.qt import (
size_policy,
align_flag,
Qt,
# QtCore,
QtCore,
QtWidgets,
QModelIndex,
QItemSelectionModel,
@ -922,10 +920,7 @@ async def fill_results(
# issue multi-provider fan-out search request and place
# "searching.." statuses on outstanding results providers
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
async with trio.open_nursery() as n:
for provider, (search, pause) in (
_searcher_cache.copy().items()
@ -949,7 +944,7 @@ async def fill_results(
status_field='-> searchin..',
)
await tn.start(
await n.start(
pack_matches,
view,
has_results,
@ -1009,14 +1004,12 @@ async def handle_keyboard_input(
view.set_font_size(searchbar.dpi_font.px_size)
send, recv = trio.open_memory_channel(616)
async with (
tractor.trionics.collapse_eg(), # needed?
trio.open_nursery() as tn
):
async with trio.open_nursery() as n:
# start a background multi-searcher task which receives
# patterns relayed from this keyboard input handler and
# async updates the completer view's results.
tn.start_soon(
n.start_soon(
partial(
fill_results,
searchw,

View File

@ -269,8 +269,6 @@ def hcolor(name: str) -> str:
# default ohlc-bars/curve gray
'bracket': '#666666', # like the logo
'pikers': '#616161', # a trader shade of..
'beast': '#161616', # in the dark alone.
# bluish
'charcoal': '#36454F',

View File

@ -21,7 +21,6 @@ Chart trading, the only way to scalp.
from __future__ import annotations
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from decimal import Decimal
from functools import partial
from pprint import pformat
import time
@ -42,6 +41,7 @@ from piker.accounting import (
Position,
mk_allocator,
MktPair,
Symbol,
)
from piker.clearing import (
open_ems,
@ -143,15 +143,6 @@ class OrderMode:
}
_staged_order: Order | None = None
@property
def curr_mkt(self) -> MktPair:
'''
Deliver the currently selected `MktPair` according
chart state.
'''
return self.chart.linked.mkt
def on_level_change_update_next_order_info(
self,
level: float,
@ -181,11 +172,7 @@ class OrderMode:
line.update_labels(order_info)
# update bound-in staged order
mkt: MktPair = self.curr_mkt
order.price: Decimal = mkt.quantize(
size=level,
quantity_type='price',
)
order.price = level
order.size = order_info['size']
# when an order is changed we flip the settings side-pane to
@ -200,9 +187,7 @@ class OrderMode:
) -> LevelLine:
# TODO, if we instead just always decimalize at the ems layer
# we can avoid this back-n-forth casting?
level = float(order.price)
level = order.price
line = order_line(
chart or self.chart,
@ -239,11 +224,7 @@ class OrderMode:
# the order mode allocator but we still need to update the
# "staged" order message we'll send to the ems
def update_order_price(y: float) -> None:
mkt: MktPair = self.curr_mkt
order.price: Decimal = mkt.quantize(
size=y,
quantity_type='price',
)
order.price = y
line._on_level_change = update_order_price
@ -294,31 +275,34 @@ class OrderMode:
chart = cursor.linked.chart
if (
not chart
and
cursor
and
cursor.active_plot
and cursor
and cursor.active_plot
):
return
chart = cursor.active_plot
price: float = cursor._datum_xy[1]
price = cursor._datum_xy[1]
if not price:
# zero prices are not supported by any means
# since that's illogical / a no-op.
return
mkt: MktPair = self.chart.linked.mkt
# NOTE : we could also use instead,
# mkt.quantize(price, quantity_type='price')
# but it returns a Decimal and it's probably gonna
# be slower?
# TODO: should we be enforcing this precision
# at a different layer in the stack?
# |_ might require `MktPair` tracking in the EMS?
# |_ right now any precision error will be relayed
# all the way back from the backend and vice-versa..
#
mkt: MktPair = self.curr_mkt
price: Decimal = mkt.quantize(
size=price,
quantity_type='price',
# at a different layer in the stack? right now
# any precision error will literally be relayed
# all the way back from the backend.
price = round(
price,
ndigits=mkt.price_tick_digits,
)
order = self._staged_order = Order(
action=action,
price=price,
@ -394,7 +378,7 @@ class OrderMode:
'oid': oid,
})
if float(order.price) <= 0:
if order.price <= 0:
log.error(
'*!? Invalid `Order.price <= 0` ?!*\n'
# TODO: make this present multi-line in object form
@ -531,15 +515,14 @@ class OrderMode:
# if an order msg is provided update the line
# **from** that msg.
if order:
price: float = float(order.price)
if price <= 0:
if order.price <= 0:
log.error(f'Order has 0 price, cancelling..\n{order}')
self.cancel_orders([order.oid])
return None
line.set_level(price)
line.set_level(order.price)
self.on_level_change_update_next_order_info(
level=price,
level=order.price,
line=line,
order=order,
# use the corresponding position tracker for the
@ -698,9 +681,9 @@ class OrderMode:
) -> Dialog | None:
# NOTE: the `.order` attr **must** be set with the
# equivalent order msg in order to be loaded.
order: Order = msg.req
order = msg.req
oid = str(msg.oid)
symbol: str = order.symbol
symbol = order.symbol
# TODO: MEGA UGGG ZONEEEE!
src = msg.src
@ -719,22 +702,13 @@ class OrderMode:
order.oid = str(order.oid)
order.brokers = [brokername]
# ?TODO? change this over to `MktPair`, but it's gonna be
# tough since we don't have any such data really in our
# clearing msg schema..
# BUT WAIT! WHY do we even want/need this!?
#
# order.symbol = self.curr_mkt
#
# XXX, the old approach.. which i don't quire member why..
# -[ ] verify we for sure don't require this any more!
# |_https://github.com/pikers/piker/issues/517
#
# order.symbol = Symbol.from_fqme(
# fqsn=fqme,
# info={},
# )
# TODO: change this over to `MktPair`, but it's
# gonna be tough since we don't have any such data
# really in our clearing msg schema..
order.symbol = Symbol.from_fqme(
fqsn=fqme,
info={},
)
maybe_dialog: Dialog | None = self.submit_order(
send_msg=False,
order=order,
@ -792,7 +766,6 @@ async def open_order_mode(
brokerd_accounts,
ems_dialog_msgs,
),
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
@ -1128,7 +1101,7 @@ async def process_trade_msg(
)
)
):
msg.req: Order = order
msg.req = order
dialog: (
Dialog
# NOTE: on an invalid order submission (eg.
@ -1193,7 +1166,7 @@ async def process_trade_msg(
tm = time.time()
mode.on_fill(
oid,
price=float(req.price),
price=req.price,
time_s=tm,
)
mode.lines.remove_line(uuid=oid)
@ -1248,7 +1221,7 @@ async def process_trade_msg(
tm = details['broker_time']
mode.on_fill(
oid,
price=float(details['price']),
price=details['price'],
time_s=tm,
pointing='up' if action == 'buy' else 'down',
)

View File

@ -23,7 +23,7 @@ name = "piker"
version = "0.1.0a0dev0"
description = "trading gear for hackers"
authors = [{ name = "Tyler Goodlet", email = "goodboy_foss@protonmail.com" }]
requires-python = ">=3.12"
requires-python = ">=3.12, <3.13"
license = "AGPL-3.0-or-later"
readme = "README.rst"
keywords = [
@ -39,8 +39,8 @@ classifiers = [
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Intended Audience :: Financial and Insurance Industry",
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
@ -49,13 +49,13 @@ classifiers = [
dependencies = [
"async-generator >=1.10, <2.0.0",
"attrs >=23.1.0, <24.0.0",
"bidict >=0.23.1",
"bidict >=0.22.1, <0.23.0",
"colorama >=0.4.6, <0.5.0",
"colorlog >=6.7.0, <7.0.0",
"ib-insync >=0.9.86, <0.10.0",
"numpy>=2.0",
"polars >=0.20.6",
"polars-fuzzy-match>=0.1.5",
"numba >=0.59.0, <0.60.0",
"numpy >=1.25, <2.0",
"polars >=0.18.13, <0.19.0",
"pygments >=2.16.1, <3.0.0",
"rich >=13.5.2, <14.0.0",
"tomli >=2.0.1, <3.0.0",
@ -65,18 +65,16 @@ dependencies = [
"typer >=0.9.0, <1.0.0",
"rapidfuzz >=3.5.2, <4.0.0",
"pdbp >=1.5.0, <2.0.0",
"trio >=0.27",
"trio >=0.24, <0.25",
"pendulum >=3.0.0, <4.0.0",
"httpx >=0.27.0, <0.28.0",
"cryptofeed >=2.4.0, <3.0.0",
"pyarrow>=18.0.0",
"pyarrow >=17.0.0, <18.0.0",
"websockets ==12.0",
"msgspec>=0.19.0,<0.20",
"msgspec",
"tractor",
"asyncvnc",
"tomlkit",
"trio-typing>=0.10.0",
"numba>=0.61.0",
]
[project.optional-dependencies]
@ -109,13 +107,11 @@ uis = [
dev = [
"pytest >=6.0.0, <7.0.0",
"elasticsearch >=8.9.0, <9.0.0",
'xonsh',
"xonsh >=0.14.2, <0.15.0",
"prompt-toolkit ==3.0.40",
"cython >=3.0.0, <4.0.0",
"greenback >=1.1.1, <2.0.0",
"ruff>=0.9.6",
"pyperclip>=1.9.0",
"i3ipc>=2.2.1",
]
[project.scripts]
@ -129,24 +125,9 @@ include = ["piker"]
[tool.hatch.build.targets.wheel]
include = ["piker"]
# TODO? move to a `uv.toml`?
[tool.uv]
python-preference = 'system'
python-downloads = 'manual'
[tool.uv.sources]
pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" }
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
# TODO, long term we should be synced to upstream `main` branch!
# tractor = { git = "https://github.com/goodboy/tractor.git", branch ="piker_pin" }
tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "piker_pin" }
# goodboy's dev-env
# XXX for @goodboy's hackin dev env, usually there's something new in
# the runtime being seriously tested here Bp
# tractor = { path = "../tractor/", editable = true }
# xonsh = { path = "../xonsh", editable = true }
msgspec = { git = "https://github.com/jcrist/msgspec.git" }
tractor = { path = "../tractor", editable = true }

View File

@ -62,9 +62,8 @@ ignore-init-module-imports = false
fixable = ["ALL"]
unfixable = []
# TODO? uhh why no work!?
# Allow unused variables when underscore-prefixed.
# dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[format]
# Use single quotes in `ruff format`.

View File

@ -1,22 +1,4 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
A per-display, DPI (scaling) info dumper.
"""
Resource list for mucking with DPIs on multiple screens:
- https://stackoverflow.com/questions/42141354/convert-pixel-size-to-point-size-for-fonts-on-multiple-platforms
@ -30,86 +12,89 @@ Resource list for mucking with DPIs on multiple screens:
- https://stackoverflow.com/questions/16561879/what-is-the-difference-between-logicaldpix-and-physicaldpix-in-qt
- https://doc.qt.io/qt-5/qguiapplication.html#screenAt
'''
"""
from pyqtgraph import QtGui
from PyQt6 import (
QtCore,
QtWidgets,
)
from PyQt6.QtCore import (
Qt,
QCoreApplication,
QSize,
QRect,
from PyQt5.QtCore import (
Qt, QCoreApplication
)
# Proper high DPI scaling is available in Qt >= 5.6.0. This attibute
# must be set before creating the application
if hasattr(Qt, 'AA_EnableHighDpiScaling'):
QCoreApplication.setAttribute(
Qt.AA_EnableHighDpiScaling,
True,
)
QCoreApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True)
if hasattr(Qt, 'AA_UseHighDpiPixmaps'):
QCoreApplication.setAttribute(
Qt.AA_UseHighDpiPixmaps,
True,
)
QCoreApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True)
app = QtWidgets.QApplication([])
window = QtWidgets.QMainWindow()
main_widget = QtWidgets.QWidget()
app = QtGui.QApplication([])
window = QtGui.QMainWindow()
main_widget = QtGui.QWidget()
window.setCentralWidget(main_widget)
window.show()
pxr: float = main_widget.devicePixelRatioF()
pxr = main_widget.devicePixelRatioF()
# explicitly get main widget and primary displays
current_screen: QtGui.QScreen = app.screenAt(
main_widget.geometry().center()
# screen_num = app.desktop().screenNumber()
# screen = app.screens()[screen_num]
screen = app.screenAt(main_widget.geometry().center())
name = screen.name()
size = screen.size()
geo = screen.availableGeometry()
phydpi = screen.physicalDotsPerInch()
logdpi = screen.logicalDotsPerInch()
print(
# f'screen number: {screen_num}\n',
f'screen name: {name}\n'
f'screen size: {size}\n'
f'screen geometry: {geo}\n\n'
f'devicePixelRationF(): {pxr}\n'
f'physical dpi: {phydpi}\n'
f'logical dpi: {logdpi}\n'
)
primary_screen: QtGui.QScreen = app.primaryScreen()
screen: QtGui.QScreen
for screen in app.screens():
name: str = screen.name()
model: str = screen.model().rstrip()
size: QSize = screen.size()
geo: QRect = screen.availableGeometry()
phydpi: float = screen.physicalDotsPerInch()
logdpi: float = screen.logicalDotsPerInch()
is_primary: bool = screen is primary_screen
is_current: bool = screen is current_screen
print('-'*50)
print(
f'------ screen name: {name} ------\n'
f'|_primary: {is_primary}\n'
f' _current: {is_current}\n'
f' _model: {model}\n'
f' _screen size: {size}\n'
f' _screen geometry: {geo}\n'
f' _devicePixelRationF(): {pxr}\n'
f' _physical dpi: {phydpi}\n'
f' _logical dpi: {logdpi}\n'
)
screen = app.primaryScreen()
# app-wide font info
name = screen.name()
size = screen.size()
geo = screen.availableGeometry()
phydpi = screen.physicalDotsPerInch()
logdpi = screen.logicalDotsPerInch()
print(
# f'screen number: {screen_num}\n',
f'screen name: {name}\n'
f'screen size: {size}\n'
f'screen geometry: {geo}\n\n'
f'devicePixelRationF(): {pxr}\n'
f'physical dpi: {phydpi}\n'
f'logical dpi: {logdpi}\n'
)
# app-wide font
font = QtGui.QFont("Hack")
# use pixel size to be cross-resolution compatible?
font.setPixelSize(6)
fm = QtGui.QFontMetrics(font)
fontdpi: float = fm.fontDpi()
font_h: int = fm.height()
string: str = '10000'
str_br: QtCore.QRect = fm.boundingRect(string)
str_w: int = str_br.width()
fm = QtGui.QFontMetrics(font)
fontdpi = fm.fontDpi()
font_h = fm.height()
string = '10000'
str_br = fm.boundingRect(string)
str_w = str_br.width()
print(
f'------ global font settings ------\n'
# f'screen number: {screen_num}\n',
f'font dpi: {fontdpi}\n'
f'font height: {font_h}\n'
f'string bounding rect: {str_br}\n'

1
tags
View File

@ -1 +0,0 @@
TAG_feed_status_update ./piker/data/feed.py /TAG_feed_status_update/

View File

@ -179,7 +179,7 @@ def test_ems_err_on_bad_broker(
# NOTE: emsd should error on the actor's enabled modules
# import phase, when looking for a backend named `doggy`.
except tractor.RemoteActorError as re:
assert re.type is ModuleNotFoundError
assert re.type == ModuleNotFoundError
run_and_tollerate_cancels(load_bad_fqme)

View File

@ -142,12 +142,7 @@ async def test_concurrent_tokens_refresh(us_symbols, loglevel):
# async with tractor.open_nursery() as n:
# await n.run_in_actor('other', intermittently_refresh_tokens)
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery(
# strict_exception_groups=False,
) as n
):
async with trio.open_nursery() as n:
quoter = await qt.stock_quoter(client, us_symbols)
@ -388,9 +383,7 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
else:
symbols = [tmx_symbols]
async with trio.open_nursery(
strict_exception_groups=False,
) as n:
async with trio.open_nursery() as n:
for syms, func in zip(symbols, stream_what):
n.start_soon(func, feed, syms)

1364
uv.lock

File diff suppressed because it is too large Load Diff