Compare commits
60 Commits
gitea_feat
...
alt_tpts_f
Author | SHA1 | Date |
---|---|---|
|
390a57c96d | |
|
69eac7bb15 | |
|
a45de0b710 | |
|
9df1988aa6 | |
|
f7caa75228 | |
|
e9613e46f6 | |
|
6637ca9e4f | |
|
7e139e6a8e | |
|
c2d9283db4 | |
|
28ba1392bb | |
|
f50202a6af | |
|
baff466ee0 | |
|
b01edcf65a | |
|
2545def7bb | |
|
1b74417688 | |
|
4d4f5d0af5 | |
|
7e82bf0729 | |
|
f1b4550483 | |
|
bdaf74a19a | |
|
b87ca76700 | |
|
94caa248e7 | |
|
da953b6b0c | |
|
fb8375f608 | |
|
d5faf4f59d | |
|
df5e72f7ae | |
|
bf33cb93b1 | |
|
d655e81290 | |
|
bc72e3d206 | |
|
35cb538a69 | |
|
8a768af5bb | |
|
8b0fac3b6c | |
|
36cc0cf750 | |
|
3ff0a86741 | |
|
705f0e86ac | |
|
2a24d1d50c | |
|
84ad34f51e | |
|
cbbf674737 | |
|
ec71dc2018 | |
|
17aebf44a9 | |
|
5f347c9f6a | |
|
cdb41e4881 | |
|
289b63bb2a | |
|
8f1e082c91 | |
|
b9321dbb49 | |
|
21d051b05f | |
|
3118d0f140 | |
|
4278d8e2f1 | |
|
b209512eb6 | |
|
8a9d21468a | |
|
75ddba09f7 | |
|
dae17bb043 | |
|
8bd0a182cf | |
|
04421e5ad2 | |
|
1e0c3da32d | |
|
5b87b3c2a6 | |
|
438e69e42c | |
|
ec6dd7cafc | |
|
f1436c93db | |
|
1061103f76 | |
|
3aea296caa |
31
README.rst
31
README.rst
|
@ -88,7 +88,23 @@ a sane install with `uv`
|
|||
************************
|
||||
bc why install with `python` when you can faster with `rust` ::
|
||||
|
||||
uv lock
|
||||
uv sync
|
||||
|
||||
# ^ astral's docs,
|
||||
# https://docs.astral.sh/uv/concepts/projects/sync/
|
||||
|
||||
include all GUIs ::
|
||||
|
||||
uv sync --extra uis
|
||||
|
||||
AND with all our hacking tools::
|
||||
|
||||
uv sync --dev --extra uis
|
||||
|
||||
|
||||
Ensure you can run the root-daemon::
|
||||
|
||||
uv run pikerd [-l info --pdb]
|
||||
|
||||
|
||||
hacky install on nixos
|
||||
|
@ -103,7 +119,18 @@ start a chart
|
|||
*************
|
||||
run a realtime OHLCV chart stand-alone::
|
||||
|
||||
piker -l info chart btcusdt.spot.binance xmrusdt.spot.kraken
|
||||
[uv run] piker -l info chart btcusdt.spot.binance xmrusdt.spot.kraken
|
||||
|
||||
# ^^^ iff you haven't activated the py-env,
|
||||
# - https://docs.astral.sh/uv/concepts/projects/run/
|
||||
#
|
||||
# in order to create an explicit virt-env see,
|
||||
# - https://docs.astral.sh/uv/concepts/projects/layout/#the-project-environment
|
||||
# - https://docs.astral.sh/uv/pip/environments/
|
||||
#
|
||||
# use $UV_PROJECT_ENVIRONMENT to select any non-`.venv/`
|
||||
# as the venv sudir in the repo's root.
|
||||
# - https://docs.astral.sh/uv/reference/environment/#uv_project_environment
|
||||
|
||||
this runs a chart UI (with 1m sampled OHLCV) and shows 2 spot markets from 2 diff cexes
|
||||
overlayed on the same graph. Use of `piker` without first starting
|
||||
|
|
|
@ -121,6 +121,7 @@ async def bot_main():
|
|||
# tick_throttle=10,
|
||||
) as feed,
|
||||
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
assert accounts
|
||||
|
|
|
@ -0,0 +1,338 @@
|
|||
#!/usr/bin/env python
|
||||
from decimal import (
|
||||
Decimal,
|
||||
)
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
# import polars as pl
|
||||
import trio
|
||||
import tractor
|
||||
from datetime import datetime
|
||||
# from pprint import pformat
|
||||
from piker.brokers.deribit.api import (
|
||||
get_client,
|
||||
maybe_open_oi_feed,
|
||||
)
|
||||
from piker.storage import open_storage_client, StorageClient
|
||||
from piker.log import get_logger
|
||||
import sys
|
||||
import pyqtgraph as pg
|
||||
from PyQt6 import QtCore
|
||||
from pyqtgraph import ScatterPlotItem, InfiniteLine
|
||||
from PyQt6.QtWidgets import QApplication
|
||||
from cryptofeed.symbols import Symbol
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
# XXX, use 2 newlines between top level LOC (even between these
|
||||
# imports and the next function line ;)
|
||||
|
||||
def check_if_complete(
|
||||
oi: dict[str, dict[str, Decimal | None]]
|
||||
) -> bool:
|
||||
return all(
|
||||
oi[strike]['C'] is not None
|
||||
and
|
||||
oi[strike]['P'] is not None for strike in oi
|
||||
)
|
||||
|
||||
|
||||
async def max_pain_daemon(
|
||||
) -> None:
|
||||
oi_by_strikes: dict[str, dict[str, Decimal | None]]
|
||||
instruments: list[Symbol] = []
|
||||
expiry_dates: list[str]
|
||||
expiry_date: str
|
||||
currency: str = 'btc'
|
||||
kind: str = 'option'
|
||||
|
||||
async with get_client(
|
||||
) as client:
|
||||
expiry_dates: list[str] = await client.get_expiration_dates(
|
||||
currency=currency,
|
||||
kind=kind
|
||||
)
|
||||
|
||||
log.info(
|
||||
f'Available expiries for {currency!r}-{kind}:\n'
|
||||
f'{expiry_dates}\n'
|
||||
)
|
||||
expiry_date: str = input(
|
||||
'Please enter a valid expiration date: '
|
||||
).upper()
|
||||
print('Starting little daemon...')
|
||||
|
||||
# maybe move this type annot down to the assignment line?
|
||||
oi_by_strikes: dict[str, dict[str, Decimal]]
|
||||
instruments = await client.get_instruments(
|
||||
expiry_date=expiry_date,
|
||||
)
|
||||
oi_by_strikes = client.get_strikes_dict(instruments)
|
||||
|
||||
|
||||
def get_total_intrinsic_values(
|
||||
oi_by_strikes: dict[str, dict[str, Decimal]]
|
||||
) -> dict[str, dict[str, Decimal]]:
|
||||
call_cash: Decimal = Decimal(0)
|
||||
put_cash: Decimal = Decimal(0)
|
||||
intrinsic_values: dict[str, dict[str, Decimal]] = {}
|
||||
closes: list = sorted(Decimal(close) for close in oi_by_strikes)
|
||||
|
||||
for strike, oi in oi_by_strikes.items():
|
||||
s = Decimal(strike)
|
||||
call_cash = sum(max(0, (s - c) * oi_by_strikes[str(c)]['C']) for c in closes)
|
||||
put_cash = sum(max(0, (c - s) * oi_by_strikes[str(c)]['P']) for c in closes)
|
||||
|
||||
intrinsic_values[strike] = {
|
||||
'C': call_cash,
|
||||
'P': put_cash,
|
||||
'total': call_cash + put_cash,
|
||||
}
|
||||
|
||||
return intrinsic_values
|
||||
|
||||
def get_intrinsic_value_and_max_pain(
|
||||
intrinsic_values: dict[str, dict[str, Decimal]]
|
||||
):
|
||||
# We meed to find the lowest value, so we start at
|
||||
# infinity to ensure that, and the max_pain must be
|
||||
# an amount greater than zero.
|
||||
total_intrinsic_value: Decimal = Decimal('Infinity')
|
||||
max_pain: Decimal = Decimal(0)
|
||||
|
||||
for strike, oi in oi_by_strikes.items():
|
||||
s = Decimal(strike)
|
||||
if intrinsic_values[strike]['total'] < total_intrinsic_value:
|
||||
total_intrinsic_value = intrinsic_values[strike]['total']
|
||||
max_pain = s
|
||||
|
||||
return total_intrinsic_value, max_pain
|
||||
|
||||
def plot_graph(
|
||||
oi_by_strikes: dict[str, dict[str, Decimal]],
|
||||
plot,
|
||||
):
|
||||
"""Update the bar graph with new open interest data."""
|
||||
plot.clear()
|
||||
|
||||
intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
|
||||
|
||||
for strike_str in sorted(oi_by_strikes, key=lambda x: int(x)):
|
||||
strike = int(strike_str)
|
||||
calls_val = float(oi_by_strikes[strike_str]['C'])
|
||||
puts_val = float(oi_by_strikes[strike_str]['P'])
|
||||
|
||||
bar_c = pg.BarGraphItem(
|
||||
x=[strike - 100],
|
||||
height=[calls_val],
|
||||
width=200,
|
||||
pen='w',
|
||||
brush=(0, 0, 255, 150)
|
||||
)
|
||||
plot.addItem(bar_c)
|
||||
|
||||
bar_p = pg.BarGraphItem(
|
||||
x=[strike + 100],
|
||||
height=[puts_val],
|
||||
width=200,
|
||||
pen='w',
|
||||
brush=(255, 0, 0, 150)
|
||||
)
|
||||
plot.addItem(bar_p)
|
||||
|
||||
total_val = float(intrinsic_values[strike_str]['total']) / 100000
|
||||
|
||||
scatter_iv = ScatterPlotItem(
|
||||
x=[strike],
|
||||
y=[total_val],
|
||||
pen=pg.mkPen(color=(0, 255, 0), width=2),
|
||||
brush=pg.mkBrush(0, 255, 0, 150),
|
||||
size=3,
|
||||
symbol='o'
|
||||
)
|
||||
plot.addItem(scatter_iv)
|
||||
|
||||
_, max_pain = get_intrinsic_value_and_max_pain(intrinsic_values)
|
||||
|
||||
vertical_line = InfiniteLine(
|
||||
pos=max_pain,
|
||||
angle=90,
|
||||
pen=pg.mkPen(color='yellow', width=1, style=QtCore.Qt.PenStyle.DotLine),
|
||||
label=f'Max pain: {max_pain:,.0f}',
|
||||
labelOpts={
|
||||
'position': 0.85,
|
||||
'color': 'yellow',
|
||||
'movable': True
|
||||
}
|
||||
)
|
||||
plot.addItem(vertical_line)
|
||||
|
||||
def update_oi_by_strikes(msg: tuple):
|
||||
nonlocal oi_by_strikes
|
||||
if 'oi' == msg[0]:
|
||||
strike_price = msg[1]['strike_price']
|
||||
option_type = msg[1]['option_type']
|
||||
open_interest = msg[1]['open_interest']
|
||||
oi_by_strikes.setdefault(
|
||||
strike_price, {}
|
||||
).update(
|
||||
{option_type: open_interest}
|
||||
)
|
||||
|
||||
# Define the structured dtype
|
||||
dtype = np.dtype([
|
||||
('time', int),
|
||||
('oi', float),
|
||||
('oi_calc', float),
|
||||
])
|
||||
async def write_open_interest_on_file(msg: tuple, client: StorageClient):
|
||||
if 'oi' == msg[0]:
|
||||
nonlocal expiry_date
|
||||
timestamp = msg[1]['timestamp']
|
||||
strike_price = msg[1]["strike_price"]
|
||||
option_type = msg[1]['option_type'].lower()
|
||||
col_sym_key = f'btc-{expiry_date.lower()}-{strike_price}-{option_type}'
|
||||
|
||||
# Create the numpy array with sample data
|
||||
data = np.array([
|
||||
(
|
||||
int(timestamp),
|
||||
float(msg[1]['open_interest']),
|
||||
np.nan,
|
||||
),
|
||||
], dtype=dtype)
|
||||
|
||||
path: Path = await client.write_oi(
|
||||
col_sym_key,
|
||||
data,
|
||||
)
|
||||
# TODO, use std logging like this throughout for status
|
||||
# emissions on console!
|
||||
log.info(f'Wrote OI history to {path}')
|
||||
|
||||
def get_max_pain(
|
||||
oi_by_strikes: dict[str, dict[str, Decimal]]
|
||||
) -> dict[str, str | Decimal]:
|
||||
'''
|
||||
This method requires only the strike_prices and oi for call
|
||||
and puts, the closes list are the same as the strike_prices
|
||||
the idea is to sum all the calls and puts cash for each strike
|
||||
and the ITM strikes from that strike, the lowest value is what we
|
||||
are looking for the intrinsic value.
|
||||
|
||||
'''
|
||||
|
||||
nonlocal timestamp
|
||||
|
||||
intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
|
||||
|
||||
total_intrinsic_value, max_pain = get_intrinsic_value_and_max_pain(intrinsic_values)
|
||||
|
||||
return {
|
||||
'timestamp': timestamp,
|
||||
'expiry_date': expiry_date,
|
||||
'total_intrinsic_value': total_intrinsic_value,
|
||||
'max_pain': max_pain,
|
||||
}
|
||||
|
||||
async with (
|
||||
open_storage_client() as (_, storage),
|
||||
|
||||
maybe_open_oi_feed(
|
||||
instruments,
|
||||
) as oi_feed,
|
||||
):
|
||||
# Initialize QApplication
|
||||
app = QApplication(sys.argv)
|
||||
|
||||
win = pg.GraphicsLayoutWidget(show=True)
|
||||
win.setWindowTitle('Calls (blue) vs Puts (red)')
|
||||
|
||||
plot = win.addPlot(title='OI by Strikes')
|
||||
plot.showGrid(x=True, y=True)
|
||||
print('Plot initialized...')
|
||||
|
||||
async for msg in oi_feed:
|
||||
|
||||
# In memory oi_by_strikes dict, all message are filtered here
|
||||
# and the dict is updated with the open interest data
|
||||
update_oi_by_strikes(msg)
|
||||
|
||||
# Write on file using storage client
|
||||
await write_open_interest_on_file(msg, storage)
|
||||
|
||||
# Max pain calcs, before start we must gather all the open interest for
|
||||
# all the strike prices and option types available for a expiration date
|
||||
if check_if_complete(oi_by_strikes):
|
||||
if 'oi' == msg[0]:
|
||||
# Here we must read for the filesystem all the latest open interest value for
|
||||
# each instrument for that specific expiration date, that means look up for the
|
||||
# last update got the instrument btc-{expity_date}-*oi1s.parquet (1s because is
|
||||
# hardcoded to something, sorry.)
|
||||
timestamp = msg[1]['timestamp']
|
||||
max_pain = get_max_pain(oi_by_strikes)
|
||||
# intrinsic_values = get_total_intrinsic_values(oi_by_strikes)
|
||||
|
||||
# graph here
|
||||
plot_graph(oi_by_strikes, plot)
|
||||
|
||||
# TODO, use a single multiline string with `()`
|
||||
# and drop the multiple `print()` calls (this
|
||||
# should be done elsewhere in this file as well!
|
||||
#
|
||||
# As per the docs,
|
||||
# https://docs.python.org/3/reference/lexical_analysis.html#string-literal-concatenation
|
||||
# you could instead do,
|
||||
# print(
|
||||
# '-----------------------------------------------\n'
|
||||
# f'timestamp: {datetime.fromtimestamp(max_pain['timestamp'])}\n'
|
||||
# )
|
||||
# WHY?
|
||||
# |_ less ctx-switches/calls to `print()`
|
||||
# |_ the `str` can then be modified / passed
|
||||
# around as a variable more easily if needed in
|
||||
# the future ;)
|
||||
#
|
||||
# ALSO, i believe there already is a stdlib
|
||||
# module to do "alignment" of text which you
|
||||
# could try for doing the right-side alignment,
|
||||
# https://docs.python.org/3/library/textwrap.html#textwrap.indent
|
||||
#
|
||||
print('-----------------------------------------------')
|
||||
print(f'timestamp: {datetime.fromtimestamp(max_pain['timestamp'])}')
|
||||
print(f'expiry_date: {max_pain['expiry_date']}')
|
||||
print(f'max_pain: {max_pain['max_pain']:,.0f}')
|
||||
print(f'total intrinsic value: {max_pain['total_intrinsic_value']:,.0f}')
|
||||
print('-----------------------------------------------')
|
||||
|
||||
# Process GUI events to keep the window responsive
|
||||
app.processEvents()
|
||||
|
||||
|
||||
async def main():
|
||||
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=True,
|
||||
loglevel='info',
|
||||
) as an:
|
||||
from tractor import log
|
||||
log.get_console_log(level='info')
|
||||
|
||||
ptl: tractor.Portal = await an.start_actor(
|
||||
'max_pain_daemon',
|
||||
enable_modules=[__name__],
|
||||
infect_asyncio=True,
|
||||
# ^TODO, we can actually run this in the root-actor now
|
||||
# if needed as per 2nd "section" in,
|
||||
# https://pikers.dev/goodboy/tractor/pulls/2
|
||||
#
|
||||
# NOTE, will first require us porting to modern
|
||||
# `tractor:main` though ofc!
|
||||
|
||||
)
|
||||
await ptl.run(max_pain_daemon)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
trio.run(main)
|
|
@ -0,0 +1,29 @@
|
|||
## Max Pain Calculation for Deribit Options
|
||||
|
||||
This feature, which calculates the max pain point for options traded
|
||||
on the Deribit exchange using cryptofeed library.
|
||||
|
||||
- Functions in the api module for fetching options data from Deribit.
|
||||
[commit](https://pikers.dev/pikers/piker/commit/da55856dd2876291f55a06eb0561438a912d8241)
|
||||
|
||||
- Compute the max pain point based on open interest data using
|
||||
deribit's api.
|
||||
[commit](https://pikers.dev/pikers/piker/commit/0d9d6e15ba0edeb662ec97f7599dd66af3046b94)
|
||||
|
||||
### How to test it?
|
||||
|
||||
**Before start:** in order to get this working with `uv`, you
|
||||
**must** use my [`tractor` fork](https://pikers.dev/ntorres/tractor/src/branch/aio_abandons)
|
||||
and this branch: `aio_abandons`, the reason is that I cherry-pick the
|
||||
`uv_migration` that guille made, for some reason that a didn't dive
|
||||
into, in my system y need tractor using `uv` too. quite hacky
|
||||
I guess.
|
||||
|
||||
1. `uv lock`
|
||||
|
||||
2. `uv run --no-dev python examples/max_pain.py`
|
||||
|
||||
3. A message should be display, enter one of the expiration date
|
||||
available.
|
||||
|
||||
4. The script should be up and running.
|
|
@ -42,7 +42,6 @@ from ._mktinfo import (
|
|||
dec_digits,
|
||||
digits_to_dec,
|
||||
MktPair,
|
||||
Symbol,
|
||||
unpack_fqme,
|
||||
_derivs as DerivTypes,
|
||||
)
|
||||
|
@ -60,7 +59,6 @@ __all__ = [
|
|||
'Asset',
|
||||
'MktPair',
|
||||
'Position',
|
||||
'Symbol',
|
||||
'Transaction',
|
||||
'TransactionLedger',
|
||||
'dec_digits',
|
||||
|
|
|
@ -677,90 +677,3 @@ def unpack_fqme(
|
|||
# '.'.join([mkt_ep, venue]),
|
||||
suffix,
|
||||
)
|
||||
|
||||
|
||||
class Symbol(Struct):
|
||||
'''
|
||||
I guess this is some kinda container thing for dealing with
|
||||
all the different meta-data formats from brokers?
|
||||
|
||||
'''
|
||||
key: str
|
||||
|
||||
broker: str = ''
|
||||
venue: str = ''
|
||||
|
||||
# precision descriptors for price and vlm
|
||||
tick_size: Decimal = Decimal('0.01')
|
||||
lot_tick_size: Decimal = Decimal('0.0')
|
||||
|
||||
suffix: str = ''
|
||||
broker_info: dict[str, dict[str, Any]] = {}
|
||||
|
||||
@classmethod
|
||||
def from_fqme(
|
||||
cls,
|
||||
fqsn: str,
|
||||
info: dict[str, Any],
|
||||
|
||||
) -> Symbol:
|
||||
broker, mktep, venue, suffix = unpack_fqme(fqsn)
|
||||
tick_size = info.get('price_tick_size', 0.01)
|
||||
lot_size = info.get('lot_tick_size', 0.0)
|
||||
|
||||
return Symbol(
|
||||
broker=broker,
|
||||
key=mktep,
|
||||
tick_size=tick_size,
|
||||
lot_tick_size=lot_size,
|
||||
venue=venue,
|
||||
suffix=suffix,
|
||||
broker_info={broker: info},
|
||||
)
|
||||
|
||||
@property
|
||||
def type_key(self) -> str:
|
||||
return list(self.broker_info.values())[0]['asset_type']
|
||||
|
||||
@property
|
||||
def tick_size_digits(self) -> int:
|
||||
return float_digits(self.tick_size)
|
||||
|
||||
@property
|
||||
def lot_size_digits(self) -> int:
|
||||
return float_digits(self.lot_tick_size)
|
||||
|
||||
@property
|
||||
def price_tick(self) -> Decimal:
|
||||
return Decimal(str(self.tick_size))
|
||||
|
||||
@property
|
||||
def size_tick(self) -> Decimal:
|
||||
return Decimal(str(self.lot_tick_size))
|
||||
|
||||
@property
|
||||
def broker(self) -> str:
|
||||
return list(self.broker_info.keys())[0]
|
||||
|
||||
@property
|
||||
def fqme(self) -> str:
|
||||
return maybe_cons_tokens([
|
||||
self.key, # final "pair name" (eg. qqq[/usd], btcusdt)
|
||||
self.venue,
|
||||
self.suffix, # includes expiry and other con info
|
||||
self.broker,
|
||||
])
|
||||
|
||||
def quantize(
|
||||
self,
|
||||
size: float,
|
||||
) -> Decimal:
|
||||
digits = float_digits(self.lot_tick_size)
|
||||
return Decimal(size).quantize(
|
||||
Decimal(f'1.{"0".ljust(digits, "0")}'),
|
||||
rounding=ROUND_HALF_EVEN
|
||||
)
|
||||
|
||||
# NOTE: when cast to `str` return fqme
|
||||
def __str__(self) -> str:
|
||||
return self.fqme
|
||||
|
|
|
@ -362,7 +362,11 @@ class Position(Struct):
|
|||
# added: bool = False
|
||||
tid: str = t.tid
|
||||
if tid in self._events:
|
||||
log.warning(f'{t} is already added?!')
|
||||
log.debug(
|
||||
f'Txn is already added?\n'
|
||||
f'\n'
|
||||
f'{t}\n'
|
||||
)
|
||||
# return added
|
||||
|
||||
# TODO: apparently this IS possible with a dict but not
|
||||
|
@ -696,7 +700,7 @@ class Account(Struct):
|
|||
else:
|
||||
# TODO: we reallly need a diff set of
|
||||
# loglevels/colors per subsys.
|
||||
log.warning(
|
||||
log.debug(
|
||||
f'Recent position for {fqme} was closed!'
|
||||
)
|
||||
|
||||
|
|
|
@ -22,7 +22,9 @@ you know when you're losing money (if possible) XD
|
|||
from __future__ import annotations
|
||||
from collections.abc import ValuesView
|
||||
from contextlib import contextmanager as cm
|
||||
from functools import partial
|
||||
from math import copysign
|
||||
from pprint import pformat
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
|
@ -37,12 +39,16 @@ from pendulum import (
|
|||
parse,
|
||||
)
|
||||
|
||||
from ..log import get_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._ledger import (
|
||||
Transaction,
|
||||
TransactionLedger,
|
||||
)
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
def ppu(
|
||||
clears: Iterator[Transaction],
|
||||
|
@ -238,6 +244,9 @@ def iter_by_dt(
|
|||
|
||||
def dyn_parse_to_dt(
|
||||
tx: tuple[str, dict[str, Any]] | Transaction,
|
||||
|
||||
debug: bool = False,
|
||||
_invalid: list|None = None,
|
||||
) -> DateTime:
|
||||
|
||||
# handle `.items()` inputs
|
||||
|
@ -250,11 +259,16 @@ def iter_by_dt(
|
|||
# get best parser for this record..
|
||||
for k in parsers:
|
||||
if (
|
||||
isdict and k in tx
|
||||
or getattr(tx, k, None)
|
||||
(v := getattr(tx, k, None))
|
||||
or
|
||||
(
|
||||
isdict
|
||||
and
|
||||
(v := tx.get(k))
|
||||
)
|
||||
):
|
||||
v = tx[k] if isdict else tx.dt
|
||||
assert v is not None, f'No valid value for `{k}`!?'
|
||||
# TODO? remove yah?
|
||||
# v = tx[k] if isdict else tx.dt
|
||||
|
||||
# only call parser on the value if not None from
|
||||
# the `parsers` table above (when NOT using
|
||||
|
@ -262,21 +276,54 @@ def iter_by_dt(
|
|||
# sort on it directly
|
||||
if (
|
||||
not isinstance(v, DateTime)
|
||||
and (parser := parsers.get(k))
|
||||
and
|
||||
(parser := parsers.get(k))
|
||||
):
|
||||
return parser(v)
|
||||
ret = parser(v)
|
||||
else:
|
||||
return v
|
||||
ret = v
|
||||
|
||||
return ret
|
||||
|
||||
else:
|
||||
continue
|
||||
|
||||
# XXX: should never get here..
|
||||
breakpoint()
|
||||
else:
|
||||
if debug:
|
||||
import tractor
|
||||
with tractor.devx.maybe_open_crash_handler():
|
||||
raise ValueError(
|
||||
f'Invalid txn time ??\n'
|
||||
f'txn-id: {k!r}\n'
|
||||
f'{k!r}: {v!r}\n'
|
||||
)
|
||||
# assert v is not None, f'No valid value for `{k}`!?'
|
||||
|
||||
if _invalid is not None:
|
||||
_invalid.append(tx)
|
||||
return from_timestamp(0.)
|
||||
|
||||
# breakpoint()
|
||||
|
||||
entry: tuple[str, dict]|Transaction
|
||||
invalid: list = []
|
||||
for entry in sorted(
|
||||
records,
|
||||
key=key or dyn_parse_to_dt,
|
||||
key=key or partial(
|
||||
dyn_parse_to_dt,
|
||||
_invalid=invalid,
|
||||
),
|
||||
):
|
||||
if entry in invalid:
|
||||
log.warning(
|
||||
f'Ignoring txn w invalid timestamp ??\n'
|
||||
f'{pformat(entry)}\n'
|
||||
# f'txn-id: {k!r}\n'
|
||||
# f'{k!r}: {v!r}\n'
|
||||
)
|
||||
continue
|
||||
|
||||
# NOTE the type sig above; either pairs or txns B)
|
||||
yield entry
|
||||
|
||||
|
|
|
@ -51,6 +51,7 @@ __brokers__: list[str] = [
|
|||
'ib',
|
||||
'kraken',
|
||||
'kucoin',
|
||||
'deribit',
|
||||
|
||||
# broken but used to work
|
||||
# 'questrade',
|
||||
|
@ -61,7 +62,6 @@ __brokers__: list[str] = [
|
|||
# wstrade
|
||||
# iex
|
||||
|
||||
# deribit
|
||||
# bitso
|
||||
]
|
||||
|
||||
|
|
|
@ -96,7 +96,10 @@ async def _setup_persistent_brokerd(
|
|||
# - `open_symbol_search()`
|
||||
# NOTE: see ep invocation details inside `.data.feed`.
|
||||
try:
|
||||
async with trio.open_nursery() as service_nursery:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as service_nursery
|
||||
):
|
||||
bus: _FeedsBus = feed.get_feed_bus(
|
||||
brokername,
|
||||
service_nursery,
|
||||
|
|
|
@ -374,9 +374,14 @@ class Client:
|
|||
pair: Pair = pair_type(**item)
|
||||
except Exception as e:
|
||||
e.add_note(
|
||||
"\nDon't panic, prolly stupid binance changed their symbology schema again..\n"
|
||||
'Check out their API docs here:\n\n'
|
||||
'https://binance-docs.github.io/apidocs/spot/en/#exchange-information'
|
||||
f'\n'
|
||||
f'New or removed field we need to codify!\n'
|
||||
f'pair-type: {pair_type!r}\n'
|
||||
f'\n'
|
||||
f"Don't panic, prolly stupid binance changed their symbology schema again..\n"
|
||||
f'Check out their API docs here:\n'
|
||||
f'\n'
|
||||
f'https://binance-docs.github.io/apidocs/spot/en/#exchange-information\n'
|
||||
)
|
||||
raise
|
||||
pair_table[pair.symbol.upper()] = pair
|
||||
|
|
|
@ -440,6 +440,7 @@ async def open_trade_dialog(
|
|||
# - ledger: TransactionLedger
|
||||
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
ctx.open_stream() as ems_stream,
|
||||
):
|
||||
|
|
|
@ -97,6 +97,13 @@ class Pair(Struct, frozen=True, kw_only=True):
|
|||
baseAsset: str
|
||||
baseAssetPrecision: int
|
||||
|
||||
permissionSets: list[list[str]]
|
||||
|
||||
# https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
|
||||
# will become non-optional 2025-08-28?
|
||||
# https://developers.binance.com/docs/binance-spot-api-docs#future-changes
|
||||
pegInstructionsAllowed: bool|None = None
|
||||
|
||||
filters: dict[
|
||||
str,
|
||||
str | int | float,
|
||||
|
@ -142,7 +149,11 @@ class SpotPair(Pair, frozen=True):
|
|||
defaultSelfTradePreventionMode: str
|
||||
allowedSelfTradePreventionModes: list[str]
|
||||
permissions: list[str]
|
||||
permissionSets: list[list[str]]
|
||||
|
||||
# can the paint botz creat liq gaps even easier on this asset?
|
||||
# Bp
|
||||
# https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority
|
||||
amendAllowed: bool
|
||||
|
||||
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
|
||||
ns_path: str = 'piker.brokers.binance:SpotPair'
|
||||
|
|
|
@ -25,6 +25,7 @@ from .api import (
|
|||
get_client,
|
||||
)
|
||||
from .feed import (
|
||||
get_mkt_info,
|
||||
open_history_client,
|
||||
open_symbol_search,
|
||||
stream_quotes,
|
||||
|
@ -34,15 +35,20 @@ from .feed import (
|
|||
# open_trade_dialog,
|
||||
# norm_trade_records,
|
||||
# )
|
||||
from .venues import (
|
||||
OptionPair,
|
||||
)
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
__all__ = [
|
||||
'get_client',
|
||||
# 'trades_dialogue',
|
||||
'get_mkt_info',
|
||||
'open_history_client',
|
||||
'open_symbol_search',
|
||||
'stream_quotes',
|
||||
'OptionPair',
|
||||
# 'norm_trade_records',
|
||||
]
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -18,38 +18,59 @@
|
|||
Deribit backend.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from datetime import datetime
|
||||
from typing import Any, Optional, Callable
|
||||
from typing import (
|
||||
# Any,
|
||||
# Optional,
|
||||
Callable,
|
||||
)
|
||||
# from pprint import pformat
|
||||
import time
|
||||
|
||||
import cryptofeed
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
import pendulum
|
||||
from rapidfuzz import process as fuzzy
|
||||
from pendulum import (
|
||||
from_timestamp,
|
||||
)
|
||||
import numpy as np
|
||||
import tractor
|
||||
|
||||
from piker.brokers import open_cached_client
|
||||
from piker.log import get_logger, get_console_log
|
||||
from piker.data import ShmArray
|
||||
from piker.brokers._util import (
|
||||
BrokerError,
|
||||
from piker.accounting import (
|
||||
Asset,
|
||||
MktPair,
|
||||
unpack_fqme,
|
||||
)
|
||||
from piker.brokers import (
|
||||
open_cached_client,
|
||||
NoData,
|
||||
DataUnavailable,
|
||||
)
|
||||
|
||||
from cryptofeed import FeedHandler
|
||||
from cryptofeed.defines import (
|
||||
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
|
||||
from piker._cacheables import (
|
||||
async_lifo_cache,
|
||||
)
|
||||
from cryptofeed.symbols import Symbol
|
||||
from piker.log import (
|
||||
get_logger,
|
||||
mk_repr,
|
||||
)
|
||||
from piker.data.validate import FeedInit
|
||||
|
||||
|
||||
from .api import (
|
||||
Client, Trade,
|
||||
get_config,
|
||||
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
|
||||
Client,
|
||||
# get_config,
|
||||
piker_sym_to_cb_sym,
|
||||
cb_sym_to_deribit_inst,
|
||||
str_to_cb_sym,
|
||||
maybe_open_price_feed
|
||||
)
|
||||
from .venues import (
|
||||
Pair,
|
||||
OptionPair,
|
||||
Trade,
|
||||
)
|
||||
|
||||
_spawn_kwargs = {
|
||||
'infect_asyncio': True,
|
||||
|
@ -64,90 +85,215 @@ async def open_history_client(
|
|||
mkt: MktPair,
|
||||
) -> tuple[Callable, int]:
|
||||
|
||||
fnstrument: str = mkt.bs_fqme
|
||||
# TODO implement history getter for the new storage layer.
|
||||
async with open_cached_client('deribit') as client:
|
||||
|
||||
pair: OptionPair = client._pairs[mkt.dst.name]
|
||||
# XXX NOTE, the cuckers use ms !!!
|
||||
creation_time_s: int = pair.creation_timestamp/1000
|
||||
|
||||
async def get_ohlc(
|
||||
end_dt: Optional[datetime] = None,
|
||||
start_dt: Optional[datetime] = None,
|
||||
timeframe: float,
|
||||
end_dt: datetime | None = None,
|
||||
start_dt: datetime | None = None,
|
||||
|
||||
) -> tuple[
|
||||
np.ndarray,
|
||||
datetime, # start
|
||||
datetime, # end
|
||||
]:
|
||||
if timeframe != 60:
|
||||
raise DataUnavailable('Only 1m bars are supported')
|
||||
|
||||
array = await client.bars(
|
||||
instrument,
|
||||
array: np.ndarray = await client.bars(
|
||||
mkt,
|
||||
start_dt=start_dt,
|
||||
end_dt=end_dt,
|
||||
)
|
||||
if len(array) == 0:
|
||||
raise DataUnavailable
|
||||
if (
|
||||
end_dt is None
|
||||
):
|
||||
raise DataUnavailable(
|
||||
'No history seems to exist yet?\n\n'
|
||||
f'{mkt}'
|
||||
)
|
||||
elif (
|
||||
end_dt
|
||||
and
|
||||
end_dt.timestamp() < creation_time_s
|
||||
):
|
||||
# the contract can't have history
|
||||
# before it was created.
|
||||
pair_type_str: str = type(pair).__name__
|
||||
create_dt: datetime = from_timestamp(creation_time_s)
|
||||
raise DataUnavailable(
|
||||
f'No history prior to\n'
|
||||
f'`{pair_type_str}.creation_timestamp: int = '
|
||||
f'{pair.creation_timestamp}\n\n'
|
||||
f'------ deribit sux ------\n'
|
||||
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
|
||||
f'creation_time_s: {creation_time_s}\n'
|
||||
f'create_dt: {create_dt}\n'
|
||||
)
|
||||
raise NoData(
|
||||
f'No frame for {start_dt} -> {end_dt}\n'
|
||||
)
|
||||
|
||||
start_dt = pendulum.from_timestamp(array[0]['time'])
|
||||
end_dt = pendulum.from_timestamp(array[-1]['time'])
|
||||
start_dt = from_timestamp(array[0]['time'])
|
||||
end_dt = from_timestamp(array[-1]['time'])
|
||||
|
||||
times = array['time']
|
||||
if not times.any():
|
||||
raise ValueError(
|
||||
'Bad frame with null-times?\n\n'
|
||||
f'{times}'
|
||||
)
|
||||
|
||||
if end_dt is None:
|
||||
inow: int = round(time.time())
|
||||
if (inow - times[-1]) > 60:
|
||||
await tractor.pause()
|
||||
|
||||
return array, start_dt, end_dt
|
||||
|
||||
yield get_ohlc, {'erlangs': 3, 'rate': 3}
|
||||
yield (
|
||||
get_ohlc,
|
||||
{ # backfill config
|
||||
'erlangs': 3,
|
||||
'rate': 3,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@async_lifo_cache()
|
||||
async def get_mkt_info(
|
||||
fqme: str,
|
||||
|
||||
) -> tuple[MktPair, Pair|OptionPair] | None:
|
||||
|
||||
# uppercase since kraken bs_mktid is always upper
|
||||
if 'deribit' not in fqme.lower():
|
||||
fqme += '.deribit'
|
||||
|
||||
mkt_mode: str = ''
|
||||
broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
|
||||
|
||||
# NOTE: we always upper case all tokens to be consistent with
|
||||
# binance's symbology style for pairs, like `BTCUSDT`, but in
|
||||
# theory we could also just keep things lower case; as long as
|
||||
# we're consistent and the symcache matches whatever this func
|
||||
# returns, always!
|
||||
expiry: str = expiry.upper()
|
||||
venue: str = venue.upper()
|
||||
# venue_lower: str = venue.lower()
|
||||
|
||||
mkt_mode: str = 'option'
|
||||
|
||||
async with open_cached_client(
|
||||
'deribit',
|
||||
) as client:
|
||||
|
||||
assets: dict[str, Asset] = await client.get_assets()
|
||||
pair_str: str = mkt_ep.lower()
|
||||
|
||||
pair: Pair = await client.exch_info(
|
||||
sym=pair_str,
|
||||
)
|
||||
mkt_mode = pair.venue
|
||||
client.mkt_mode = mkt_mode
|
||||
|
||||
dst: Asset | None = assets.get(pair.bs_dst_asset)
|
||||
src: Asset | None = assets.get(pair.bs_src_asset)
|
||||
|
||||
mkt = MktPair(
|
||||
dst=dst,
|
||||
src=src,
|
||||
price_tick=pair.price_tick,
|
||||
size_tick=pair.size_tick,
|
||||
bs_mktid=pair.symbol,
|
||||
venue=mkt_mode,
|
||||
broker='deribit',
|
||||
_atype=mkt_mode,
|
||||
_fqme_without_src=True,
|
||||
|
||||
# expiry=pair.expiry,
|
||||
# XXX TODO, currently we don't use it since it's
|
||||
# already "described" in the `OptionPair.symbol: str`
|
||||
# and if we slap in the ISO repr it's kinda hideous..
|
||||
# -[ ] figure out the best either std
|
||||
)
|
||||
return mkt, pair
|
||||
|
||||
|
||||
async def stream_quotes(
|
||||
|
||||
send_chan: trio.abc.SendChannel,
|
||||
symbols: list[str],
|
||||
feed_is_live: trio.Event,
|
||||
loglevel: str = None,
|
||||
|
||||
# startup sync
|
||||
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
||||
get_console_log(loglevel or tractor.current_actor().loglevel)
|
||||
'''
|
||||
Open a live quote stream for the market set defined by `symbols`.
|
||||
|
||||
sym = symbols[0]
|
||||
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
|
||||
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
|
||||
|
||||
'''
|
||||
sym = symbols[0].split('.')[0]
|
||||
init_msgs: list[FeedInit] = []
|
||||
|
||||
# multiline nested `dict` formatter (since rn quote-msgs are
|
||||
# just that).
|
||||
pfmt: Callable[[str], str] = mk_repr(
|
||||
# so we can see `deribit`'s delightfully mega-long bs fields..
|
||||
maxstring=100,
|
||||
)
|
||||
|
||||
async with (
|
||||
open_cached_client('deribit') as client,
|
||||
send_chan as send_chan
|
||||
):
|
||||
mkt: MktPair
|
||||
pair: Pair
|
||||
mkt, pair = await get_mkt_info(sym)
|
||||
|
||||
init_msgs = {
|
||||
# pass back token, and bool, signalling if we're the writer
|
||||
# and that history has been written
|
||||
sym: {
|
||||
'symbol_info': {
|
||||
'asset_type': 'option',
|
||||
'price_tick_size': 0.0005
|
||||
},
|
||||
'shm_write_opts': {'sum_tick_vml': False},
|
||||
'fqsn': sym,
|
||||
},
|
||||
}
|
||||
# build out init msgs according to latest spec
|
||||
init_msgs.append(
|
||||
FeedInit(
|
||||
mkt_info=mkt,
|
||||
)
|
||||
)
|
||||
# build `cryptofeed` feed-handle
|
||||
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
|
||||
|
||||
nsym = piker_sym_to_cb_sym(sym)
|
||||
from_cf: tractor.to_asyncio.LinkedTaskChannel
|
||||
async with maybe_open_price_feed(sym) as from_cf:
|
||||
|
||||
async with maybe_open_price_feed(sym) as stream:
|
||||
# load the "last trades" summary
|
||||
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
|
||||
cb_sym_to_deribit_inst(cf_sym),
|
||||
count=1,
|
||||
)
|
||||
last_trades: list[Trade] = last_trades_res.trades
|
||||
|
||||
cache = await client.cache_symbols()
|
||||
# TODO, do we even need this or will the above always
|
||||
# work?
|
||||
# if not last_trades:
|
||||
# await tractor.pause()
|
||||
# async for typ, quote in from_cf:
|
||||
# if typ == 'trade':
|
||||
# last_trade = Trade(**(quote['data']))
|
||||
# break
|
||||
|
||||
last_trades = (await client.last_trades(
|
||||
cb_sym_to_deribit_inst(nsym), count=1)).trades
|
||||
# else:
|
||||
last_trade = Trade(
|
||||
**(last_trades[0])
|
||||
)
|
||||
|
||||
if len(last_trades) == 0:
|
||||
last_trade = None
|
||||
async for typ, quote in stream:
|
||||
if typ == 'trade':
|
||||
last_trade = Trade(**(quote['data']))
|
||||
break
|
||||
|
||||
else:
|
||||
last_trade = Trade(**(last_trades[0]))
|
||||
|
||||
first_quote = {
|
||||
first_quote: dict = {
|
||||
'symbol': sym,
|
||||
'last': last_trade.price,
|
||||
'brokerd_ts': last_trade.timestamp,
|
||||
|
@ -158,13 +304,84 @@ async def stream_quotes(
|
|||
'broker_ts': last_trade.timestamp
|
||||
}]
|
||||
}
|
||||
task_status.started((init_msgs, first_quote))
|
||||
task_status.started((
|
||||
init_msgs,
|
||||
first_quote,
|
||||
))
|
||||
|
||||
feed_is_live.set()
|
||||
|
||||
async for typ, quote in stream:
|
||||
topic = quote['symbol']
|
||||
await send_chan.send({topic: quote})
|
||||
# NOTE XXX, static for now!
|
||||
# => since this only handles ONE mkt feed at a time we
|
||||
# don't need a lookup table to map interleaved quotes
|
||||
# from multiple possible mkt-pairs
|
||||
topic: str = mkt.bs_fqme
|
||||
|
||||
# deliver until cancelled
|
||||
async for typ, ref in from_cf:
|
||||
match typ:
|
||||
case 'trade':
|
||||
trade: cryptofeed.types.Trade = ref
|
||||
|
||||
# TODO, re-impl this according to teh ideal
|
||||
# fqme for opts that we choose!!
|
||||
bs_fqme: str = cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(trade.symbol)
|
||||
).lower()
|
||||
|
||||
piker_quote: dict = {
|
||||
'symbol': bs_fqme,
|
||||
'last': trade.price,
|
||||
'broker_ts': time.time(),
|
||||
# ^TODO, name this `brokerd/datad_ts` and
|
||||
# use `time.time_ns()` ??
|
||||
'ticks': [{
|
||||
'type': 'trade',
|
||||
'price': float(trade.price),
|
||||
'size': float(trade.amount),
|
||||
'broker_ts': trade.timestamp,
|
||||
}],
|
||||
}
|
||||
log.info(
|
||||
f'deribit {typ!r} quote for {sym!r}\n\n'
|
||||
f'{trade}\n\n'
|
||||
f'{pfmt(piker_quote)}\n'
|
||||
)
|
||||
|
||||
case 'l1':
|
||||
book: cryptofeed.types.L1Book = ref
|
||||
|
||||
# TODO, so this is where we can possibly change things
|
||||
# and instead lever the `MktPair.bs_fqme: str` output?
|
||||
bs_fqme: str = cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(book.symbol)
|
||||
).lower()
|
||||
|
||||
piker_quote: dict = {
|
||||
'symbol': bs_fqme,
|
||||
'ticks': [
|
||||
|
||||
{'type': 'bid',
|
||||
'price': float(book.bid_price),
|
||||
'size': float(book.bid_size)},
|
||||
|
||||
{'type': 'bsize',
|
||||
'price': float(book.bid_price),
|
||||
'size': float(book.bid_size),},
|
||||
|
||||
{'type': 'ask',
|
||||
'price': float(book.ask_price),
|
||||
'size': float(book.ask_size),},
|
||||
|
||||
{'type': 'asize',
|
||||
'price': float(book.ask_price),
|
||||
'size': float(book.ask_size),}
|
||||
]
|
||||
}
|
||||
|
||||
await send_chan.send({
|
||||
topic: piker_quote,
|
||||
})
|
||||
|
||||
|
||||
@tractor.context
|
||||
|
@ -174,12 +391,21 @@ async def open_symbol_search(
|
|||
async with open_cached_client('deribit') as client:
|
||||
|
||||
# load all symbols locally for fast search
|
||||
cache = await client.cache_symbols()
|
||||
# cache = client._pairs
|
||||
await ctx.started()
|
||||
|
||||
async with ctx.open_stream() as stream:
|
||||
|
||||
pattern: str
|
||||
async for pattern in stream:
|
||||
# repack in dict form
|
||||
await stream.send(
|
||||
await client.search_symbols(pattern))
|
||||
|
||||
# NOTE: pattern fuzzy-matching is done within
|
||||
# the methd impl.
|
||||
pairs: dict[str, Pair] = await client.search_symbols(
|
||||
pattern,
|
||||
)
|
||||
# repack in fqme-keyed table
|
||||
byfqme: dict[str, Pair] = {}
|
||||
for pair in pairs.values():
|
||||
byfqme[pair.bs_fqme] = pair
|
||||
|
||||
await stream.send(byfqme)
|
||||
|
|
|
@ -0,0 +1,196 @@
|
|||
# piker: trading gear for hackers
|
||||
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
Per market data-type definitions and schemas types.
|
||||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import pendulum
|
||||
from typing import (
|
||||
Literal,
|
||||
Optional,
|
||||
)
|
||||
from decimal import Decimal
|
||||
|
||||
from piker.types import Struct
|
||||
|
||||
|
||||
# API endpoint paths by venue / sub-API
|
||||
_domain: str = 'deribit.com'
|
||||
_url = f'https://www.{_domain}'
|
||||
|
||||
# WEBsocketz
|
||||
_ws_url: str = f'wss://www.{_domain}/ws/api/v2'
|
||||
|
||||
# test nets
|
||||
_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2'
|
||||
|
||||
MarketType = Literal[
|
||||
'option'
|
||||
]
|
||||
|
||||
|
||||
def get_api_eps(venue: MarketType) -> tuple[str, str]:
|
||||
'''
|
||||
Return API ep root paths per venue.
|
||||
|
||||
'''
|
||||
return {
|
||||
'option': (
|
||||
_ws_url,
|
||||
),
|
||||
}[venue]
|
||||
|
||||
|
||||
class Pair(Struct, frozen=True, kw_only=True):
|
||||
|
||||
symbol: str
|
||||
|
||||
# src
|
||||
quote_currency: str # 'BTC'
|
||||
|
||||
# dst
|
||||
base_currency: str # "BTC",
|
||||
|
||||
tick_size: float # 0.0001 # [{'above_price': 0.005, 'tick_size': 0.0005}]
|
||||
tick_size_steps: list[dict[str, float]]
|
||||
|
||||
@property
|
||||
def price_tick(self) -> Decimal:
|
||||
return Decimal(str(self.tick_size_steps[0]['above_price']))
|
||||
|
||||
@property
|
||||
def size_tick(self) -> Decimal:
|
||||
return Decimal(str(self.tick_size))
|
||||
|
||||
@property
|
||||
def bs_fqme(self) -> str:
|
||||
return f'{self.symbol}'
|
||||
|
||||
@property
|
||||
def bs_mktid(self) -> str:
|
||||
return f'{self.symbol}.{self.venue}'
|
||||
|
||||
|
||||
class OptionPair(Pair, frozen=True):
|
||||
|
||||
taker_commission: float # 0.0003
|
||||
strike: float # 5000.0
|
||||
settlement_period: str # 'day'
|
||||
settlement_currency: str # "BTC",
|
||||
rfq: bool # false
|
||||
price_index: str # 'btc_usd'
|
||||
option_type: str # 'call'
|
||||
min_trade_amount: float # 0.1
|
||||
maker_commission: float # 0.0003
|
||||
kind: str # 'option'
|
||||
is_active: bool # true
|
||||
instrument_type: str # 'reversed'
|
||||
instrument_name: str # 'BTC-1SEP24-55000-C'
|
||||
instrument_id: int # 364671
|
||||
expiration_timestamp: int # 1725177600000
|
||||
creation_timestamp: int # 1724918461000
|
||||
counter_currency: str # 'USD'
|
||||
contract_size: float # '1.0'
|
||||
block_trade_tick_size: float # '0.0001'
|
||||
block_trade_min_trade_amount: int # '25'
|
||||
block_trade_commission: float # '0.003'
|
||||
|
||||
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
|
||||
ns_path: str = 'piker.brokers.deribit:OptionPair'
|
||||
|
||||
# TODO, impl this without the MM:SS part of
|
||||
# the `'THH:MM:SS..'` etc..
|
||||
@property
|
||||
def expiry(self) -> str:
|
||||
iso_date = pendulum.from_timestamp(
|
||||
self.expiration_timestamp / 1000
|
||||
).isoformat()
|
||||
return iso_date
|
||||
|
||||
@property
|
||||
def venue(self) -> str:
|
||||
return f'{self.instrument_type}_option'
|
||||
|
||||
@property
|
||||
def bs_fqme(self) -> str:
|
||||
return f'{self.symbol}'
|
||||
|
||||
@property
|
||||
def bs_src_asset(self) -> str:
|
||||
return f'{self.quote_currency}'
|
||||
|
||||
@property
|
||||
def bs_dst_asset(self) -> str:
|
||||
return f'{self.symbol}'
|
||||
|
||||
|
||||
PAIRTYPES: dict[MarketType, Pair] = {
|
||||
'option': OptionPair,
|
||||
}
|
||||
|
||||
|
||||
class JSONRPCResult(Struct):
|
||||
id: int
|
||||
usIn: int
|
||||
usOut: int
|
||||
usDiff: int
|
||||
testnet: bool
|
||||
jsonrpc: str = '2.0'
|
||||
error: Optional[dict] = None
|
||||
result: Optional[list[dict]] = None
|
||||
|
||||
|
||||
class JSONRPCChannel(Struct):
|
||||
method: str
|
||||
params: dict
|
||||
jsonrpc: str = '2.0'
|
||||
|
||||
|
||||
class KLinesResult(Struct):
|
||||
low: list[float]
|
||||
cost: list[float]
|
||||
high: list[float]
|
||||
open: list[float]
|
||||
close: list[float]
|
||||
ticks: list[int]
|
||||
status: str
|
||||
volume: list[float]
|
||||
|
||||
|
||||
class Trade(Struct):
|
||||
iv: float
|
||||
price: float
|
||||
amount: float
|
||||
trade_id: str
|
||||
contracts: float
|
||||
direction: str
|
||||
trade_seq: int
|
||||
timestamp: int
|
||||
mark_price: float
|
||||
index_price: float
|
||||
tick_direction: int
|
||||
instrument_name: str
|
||||
combo_id: Optional[str] = '',
|
||||
combo_trade_id: Optional[int] = 0,
|
||||
block_trade_id: Optional[str] = '',
|
||||
block_trade_leg_count: Optional[int] = 0,
|
||||
|
||||
|
||||
class LastTradesResult(Struct):
|
||||
trades: list[Trade]
|
||||
has_more: bool
|
|
@ -34,6 +34,7 @@ from piker.brokers._util import get_logger
|
|||
if TYPE_CHECKING:
|
||||
from .api import Client
|
||||
from ib_insync import IB
|
||||
import i3ipc
|
||||
|
||||
log = get_logger('piker.brokers.ib')
|
||||
|
||||
|
@ -48,6 +49,37 @@ _reset_tech: Literal[
|
|||
] = 'vnc'
|
||||
|
||||
|
||||
no_setup_msg:str = (
|
||||
'No data reset hack test setup for {vnc_sockaddr}!\n'
|
||||
'See config setup tips @\n'
|
||||
'https://github.com/pikers/piker/tree/master/piker/brokers/ib'
|
||||
)
|
||||
|
||||
|
||||
def try_xdo_manual(
|
||||
vnc_sockaddr: str,
|
||||
):
|
||||
'''
|
||||
Do the "manual" `xdo`-based screen switch + click
|
||||
combo since apparently the `asyncvnc` client ain't workin..
|
||||
|
||||
Note this is only meant as a backup method for Xorg users,
|
||||
ideally you can use a real vnc client and the `vnc_click_hack()`
|
||||
impl!
|
||||
|
||||
'''
|
||||
global _reset_tech
|
||||
try:
|
||||
i3ipc_xdotool_manual_click_hack()
|
||||
_reset_tech = 'i3ipc_xdotool'
|
||||
return True
|
||||
except OSError:
|
||||
log.exception(
|
||||
no_setup_msg.format(vnc_sockaddr)
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
async def data_reset_hack(
|
||||
# vnc_host: str,
|
||||
client: Client,
|
||||
|
@ -90,15 +122,9 @@ async def data_reset_hack(
|
|||
vnc_port: int
|
||||
vnc_sockaddr: tuple[str] | None = client.conf.get('vnc_addrs')
|
||||
|
||||
no_setup_msg:str = (
|
||||
f'No data reset hack test setup for {vnc_sockaddr}!\n'
|
||||
'See config setup tips @\n'
|
||||
'https://github.com/pikers/piker/tree/master/piker/brokers/ib'
|
||||
)
|
||||
|
||||
if not vnc_sockaddr:
|
||||
log.warning(
|
||||
no_setup_msg
|
||||
no_setup_msg.format(vnc_sockaddr)
|
||||
+
|
||||
'REQUIRES A `vnc_addrs: array` ENTRY'
|
||||
)
|
||||
|
@ -119,27 +145,38 @@ async def data_reset_hack(
|
|||
port=vnc_port,
|
||||
)
|
||||
)
|
||||
except OSError:
|
||||
if vnc_host != 'localhost':
|
||||
log.warning(no_setup_msg)
|
||||
return False
|
||||
|
||||
except (
|
||||
OSError, # no VNC server avail..
|
||||
PermissionError, # asyncvnc pw fail..
|
||||
):
|
||||
try:
|
||||
import i3ipc # noqa (since a deps dynamic check)
|
||||
except ModuleNotFoundError:
|
||||
log.warning(no_setup_msg)
|
||||
log.warning(
|
||||
no_setup_msg.format(vnc_sockaddr)
|
||||
)
|
||||
return False
|
||||
|
||||
try:
|
||||
i3ipc_xdotool_manual_click_hack()
|
||||
_reset_tech = 'i3ipc_xdotool'
|
||||
return True
|
||||
except OSError:
|
||||
log.exception(no_setup_msg)
|
||||
if vnc_host not in {
|
||||
'localhost',
|
||||
'127.0.0.1',
|
||||
}:
|
||||
focussed, matches = i3ipc_fin_wins_titled()
|
||||
if not matches:
|
||||
log.warning(
|
||||
no_setup_msg.format(vnc_sockaddr)
|
||||
)
|
||||
return False
|
||||
else:
|
||||
try_xdo_manual(vnc_sockaddr)
|
||||
|
||||
# localhost but no vnc-client or it borked..
|
||||
else:
|
||||
try_xdo_manual(vnc_sockaddr)
|
||||
|
||||
case 'i3ipc_xdotool':
|
||||
i3ipc_xdotool_manual_click_hack()
|
||||
try_xdo_manual(vnc_sockaddr)
|
||||
# i3ipc_xdotool_manual_click_hack()
|
||||
|
||||
case _ as tech:
|
||||
raise RuntimeError(f'{tech} is not supported for reset tech!?')
|
||||
|
@ -178,9 +215,9 @@ async def vnc_click_hack(
|
|||
host,
|
||||
port=port,
|
||||
|
||||
# TODO: doesn't work see:
|
||||
# https://github.com/barneygale/asyncvnc/issues/7
|
||||
# password='ibcansmbz',
|
||||
# TODO: doesn't work?
|
||||
# see, https://github.com/barneygale/asyncvnc/issues/7
|
||||
password='doggy',
|
||||
|
||||
) as client:
|
||||
|
||||
|
@ -194,34 +231,67 @@ async def vnc_click_hack(
|
|||
client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked
|
||||
|
||||
|
||||
def i3ipc_fin_wins_titled(
|
||||
titles: list[str] = [
|
||||
'Interactive Brokers', # tws running in i3
|
||||
'IB Gateway', # gw running in i3
|
||||
# 'IB', # gw running in i3 (newer version?)
|
||||
|
||||
# !TODO, remote vnc instance
|
||||
# -[ ] something in title (or other Con-props) that indicates
|
||||
# this is explicitly for ibrk sw?
|
||||
# |_[ ] !can use modden spawn eventually!
|
||||
'TigerVNC',
|
||||
# 'vncviewer', # the terminal..
|
||||
],
|
||||
) -> tuple[
|
||||
i3ipc.Con, # orig focussed win
|
||||
list[tuple[str, i3ipc.Con]], # matching wins by title
|
||||
]:
|
||||
'''
|
||||
Attempt to find a local-DE window titled with an entry in
|
||||
`titles`.
|
||||
|
||||
If found deliver the current focussed window and all matching
|
||||
`i3ipc.Con`s in a list.
|
||||
|
||||
'''
|
||||
import i3ipc
|
||||
ipc = i3ipc.Connection()
|
||||
|
||||
# TODO: might be worth offering some kinda api for grabbing
|
||||
# the window id from the pid?
|
||||
# https://stackoverflow.com/a/2250879
|
||||
tree = ipc.get_tree()
|
||||
focussed: i3ipc.Con = tree.find_focused()
|
||||
|
||||
matches: list[i3ipc.Con] = []
|
||||
for name in titles:
|
||||
results = tree.find_titled(name)
|
||||
print(f'results for {name}: {results}')
|
||||
if results:
|
||||
con = results[0]
|
||||
matches.append((
|
||||
name,
|
||||
con,
|
||||
))
|
||||
|
||||
return (
|
||||
focussed,
|
||||
matches,
|
||||
)
|
||||
|
||||
|
||||
|
||||
def i3ipc_xdotool_manual_click_hack() -> None:
|
||||
'''
|
||||
Do the data reset hack but expecting a local X-window using `xdotool`.
|
||||
|
||||
'''
|
||||
import i3ipc
|
||||
i3 = i3ipc.Connection()
|
||||
|
||||
# TODO: might be worth offering some kinda api for grabbing
|
||||
# the window id from the pid?
|
||||
# https://stackoverflow.com/a/2250879
|
||||
t = i3.get_tree()
|
||||
|
||||
orig_win_id = t.find_focused().window
|
||||
|
||||
# for tws
|
||||
win_names: list[str] = [
|
||||
'Interactive Brokers', # tws running in i3
|
||||
'IB Gateway', # gw running in i3
|
||||
# 'IB', # gw running in i3 (newer version?)
|
||||
]
|
||||
|
||||
focussed, matches = i3ipc_fin_wins_titled()
|
||||
orig_win_id = focussed.window
|
||||
try:
|
||||
for name in win_names:
|
||||
results = t.find_titled(name)
|
||||
print(f'results for {name}: {results}')
|
||||
if results:
|
||||
con = results[0]
|
||||
for name, con in matches:
|
||||
print(f'Resetting data feed for {name}')
|
||||
win_id = str(con.window)
|
||||
w, h = con.rect.width, con.rect.height
|
||||
|
|
|
@ -48,6 +48,7 @@ from bidict import bidict
|
|||
import trio
|
||||
import tractor
|
||||
from tractor import to_asyncio
|
||||
from tractor import trionics
|
||||
from pendulum import (
|
||||
from_timestamp,
|
||||
DateTime,
|
||||
|
@ -96,6 +97,10 @@ from ._util import (
|
|||
get_logger,
|
||||
)
|
||||
|
||||
# ?TODO? this can now be removed since it was originally to extend
|
||||
# with a `bar_vwap` field that we removed from the default ohlcv
|
||||
# dtype since it's better calculated in an FSP func
|
||||
#
|
||||
_bar_load_dtype: list[tuple[str, type]] = [
|
||||
# NOTE XXX: only part that's diff
|
||||
# from our default fields where
|
||||
|
@ -1369,8 +1374,8 @@ async def load_clients_for_trio(
|
|||
'''
|
||||
Pure async mngr proxy to ``load_aio_clients()``.
|
||||
|
||||
This is a bootstrap entrypoing to call from
|
||||
a ``tractor.to_asyncio.open_channel_from()``.
|
||||
This is a bootstrap entrypoint to call from
|
||||
a `tractor.to_asyncio.open_channel_from()`.
|
||||
|
||||
'''
|
||||
async with load_aio_clients(
|
||||
|
@ -1391,7 +1396,10 @@ async def open_client_proxies() -> tuple[
|
|||
async with (
|
||||
tractor.trionics.maybe_open_context(
|
||||
acm_func=tractor.to_asyncio.open_channel_from,
|
||||
kwargs={'target': load_clients_for_trio},
|
||||
kwargs={
|
||||
'target': load_clients_for_trio,
|
||||
# ^XXX, kwarg to `open_channel_from()`
|
||||
},
|
||||
|
||||
# lock around current actor task access
|
||||
# TODO: maybe this should be the default in tractor?
|
||||
|
@ -1584,7 +1592,8 @@ async def open_client_proxy(
|
|||
event_consumers=event_table,
|
||||
) as (first, chan),
|
||||
|
||||
trio.open_nursery() as relay_n,
|
||||
trionics.collapse_eg(), # loose-ify
|
||||
trio.open_nursery() as relay_tn,
|
||||
):
|
||||
|
||||
assert isinstance(first, Client)
|
||||
|
@ -1624,7 +1633,7 @@ async def open_client_proxy(
|
|||
|
||||
continue
|
||||
|
||||
relay_n.start_soon(relay_events)
|
||||
relay_tn.start_soon(relay_events)
|
||||
|
||||
yield proxy
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ import trio
|
|||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
from tractor.to_asyncio import LinkedTaskChannel
|
||||
from tractor import trionics
|
||||
from ib_insync.contract import (
|
||||
Contract,
|
||||
)
|
||||
|
@ -407,7 +408,7 @@ async def update_and_audit_pos_msg(
|
|||
|
||||
# TODO: make this a "propaganda" log level?
|
||||
if ibpos.avgCost != msg.avg_price:
|
||||
log.warning(
|
||||
log.debug(
|
||||
f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n'
|
||||
f'ib: {ibfmtmsg}\n'
|
||||
'---------------------------\n'
|
||||
|
@ -738,7 +739,7 @@ async def open_trade_dialog(
|
|||
f'UNEXPECTED POSITION says IB => {msg.symbol}\n'
|
||||
'Maybe they LIQUIDATED YOU or your ledger is wrong?\n'
|
||||
)
|
||||
log.error(logmsg)
|
||||
log.debug(logmsg)
|
||||
|
||||
await ctx.started((
|
||||
all_positions,
|
||||
|
@ -747,21 +748,22 @@ async def open_trade_dialog(
|
|||
|
||||
async with (
|
||||
ctx.open_stream() as ems_stream,
|
||||
trio.open_nursery() as n,
|
||||
trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
# relay existing open orders to ems
|
||||
for msg in order_msgs:
|
||||
await ems_stream.send(msg)
|
||||
|
||||
for client in set(aioclients.values()):
|
||||
trade_event_stream: LinkedTaskChannel = await n.start(
|
||||
trade_event_stream: LinkedTaskChannel = await tn.start(
|
||||
open_trade_event_stream,
|
||||
client,
|
||||
)
|
||||
|
||||
# start order request handler **before** local trades
|
||||
# event loop
|
||||
n.start_soon(
|
||||
tn.start_soon(
|
||||
handle_order_requests,
|
||||
ems_stream,
|
||||
accounts_def,
|
||||
|
@ -769,7 +771,7 @@ async def open_trade_dialog(
|
|||
)
|
||||
|
||||
# allocate event relay tasks for each client connection
|
||||
n.start_soon(
|
||||
tn.start_soon(
|
||||
deliver_trade_events,
|
||||
|
||||
trade_event_stream,
|
||||
|
@ -1241,32 +1243,47 @@ async def deliver_trade_events(
|
|||
# never relay errors for non-broker related issues
|
||||
# https://interactivebrokers.github.io/tws-api/message_codes.html
|
||||
code: int = err['error_code']
|
||||
if code in {
|
||||
200, # uhh
|
||||
reason: str = err['reason']
|
||||
reqid: str = str(err['reqid'])
|
||||
|
||||
# "Warning:" msg codes,
|
||||
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
|
||||
# - 2109: 'Outside Regular Trading Hours'
|
||||
if 'Warning:' in reason:
|
||||
log.warning(
|
||||
f'Order-API-warning: {code!r}\n'
|
||||
f'reqid: {reqid!r}\n'
|
||||
f'\n'
|
||||
f'{pformat(err)}\n'
|
||||
# ^TODO? should we just print the `reason`
|
||||
# not the full `err`-dict?
|
||||
)
|
||||
continue
|
||||
|
||||
# XXX known special (ignore) cases
|
||||
elif code in {
|
||||
200, # uhh.. ni idea
|
||||
|
||||
# hist pacing / connectivity
|
||||
162,
|
||||
165,
|
||||
|
||||
# WARNING codes:
|
||||
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
|
||||
# Attribute 'Outside Regular Trading Hours' is
|
||||
# " 'ignored based on the order type and
|
||||
# destination. PlaceOrder is now ' 'being
|
||||
# processed.',
|
||||
2109,
|
||||
|
||||
# XXX: lol this isn't even documented..
|
||||
# 'No market data during competing live session'
|
||||
1669,
|
||||
}:
|
||||
log.error(
|
||||
f'Order-API-error which is non-cancel-causing ?!\n'
|
||||
f'\n'
|
||||
f'{pformat(err)}\n'
|
||||
)
|
||||
continue
|
||||
|
||||
reqid: str = str(err['reqid'])
|
||||
reason: str = err['reason']
|
||||
|
||||
if err['reqid'] == -1:
|
||||
log.error(f'TWS external order error:\n{pformat(err)}')
|
||||
log.error(
|
||||
f'TWS external order error ??\n'
|
||||
f'{pformat(err)}\n'
|
||||
)
|
||||
|
||||
flow: dict = dict(
|
||||
flows.get(reqid)
|
||||
|
|
|
@ -34,6 +34,7 @@ import urllib.parse
|
|||
import hashlib
|
||||
import hmac
|
||||
import base64
|
||||
import tractor
|
||||
import trio
|
||||
|
||||
from piker import config
|
||||
|
@ -372,8 +373,7 @@ class Client:
|
|||
# 1658347714, 'status': 'Success'}]}
|
||||
|
||||
if xfers:
|
||||
import tractor
|
||||
await tractor.pp()
|
||||
await tractor.pause()
|
||||
|
||||
trans: dict[str, Transaction] = {}
|
||||
for entry in xfers:
|
||||
|
@ -501,6 +501,7 @@ class Client:
|
|||
for xkey, data in resp['result'].items():
|
||||
|
||||
# NOTE: always cache in pairs tables for faster lookup
|
||||
with tractor.devx.maybe_open_crash_handler(): # as bxerr:
|
||||
pair = Pair(xname=xkey, **data)
|
||||
|
||||
# register the above `Pair` structs for all
|
||||
|
|
|
@ -175,9 +175,8 @@ async def handle_order_requests(
|
|||
|
||||
case {
|
||||
'account': 'kraken.spot' as account,
|
||||
'action': action,
|
||||
} if action in {'buy', 'sell'}:
|
||||
|
||||
'action': 'buy'|'sell',
|
||||
}:
|
||||
# validate
|
||||
order = BrokerdOrder(**msg)
|
||||
|
||||
|
@ -262,6 +261,12 @@ async def handle_order_requests(
|
|||
} | extra
|
||||
|
||||
log.info(f'Submitting WS order request:\n{pformat(req)}')
|
||||
|
||||
# NOTE HOWTO, debug order requests
|
||||
#
|
||||
# if 'XRP' in pair:
|
||||
# await tractor.pause()
|
||||
|
||||
await ws.send_msg(req)
|
||||
|
||||
# placehold for sanity checking in relay loop
|
||||
|
@ -1085,6 +1090,8 @@ async def handle_order_updates(
|
|||
f'Failed to {action} order {reqid}:\n'
|
||||
f'{errmsg}'
|
||||
)
|
||||
# if tractor._state.debug_mode():
|
||||
# await tractor.pause()
|
||||
|
||||
symbol: str = 'N/A'
|
||||
if chain := apiflows.get(reqid):
|
||||
|
|
|
@ -21,7 +21,6 @@ Symbology defs and search.
|
|||
from decimal import Decimal
|
||||
|
||||
import tractor
|
||||
from rapidfuzz import process as fuzzy
|
||||
|
||||
from piker._cacheables import (
|
||||
async_lifo_cache,
|
||||
|
@ -41,8 +40,13 @@ from piker.accounting._mktinfo import (
|
|||
)
|
||||
|
||||
|
||||
# https://www.kraken.com/features/api#get-tradable-pairs
|
||||
class Pair(Struct):
|
||||
'''
|
||||
A tradable asset pair as schema-defined by,
|
||||
|
||||
https://docs.kraken.com/api/docs/rest-api/get-tradable-asset-pairs
|
||||
|
||||
'''
|
||||
xname: str # idiotic bs_mktid equiv i guess?
|
||||
altname: str # alternate pair name
|
||||
wsname: str # WebSocket pair name (if available)
|
||||
|
@ -53,7 +57,6 @@ class Pair(Struct):
|
|||
lot: str # volume lot size
|
||||
|
||||
cost_decimals: int
|
||||
costmin: float
|
||||
pair_decimals: int # scaling decimal places for pair
|
||||
lot_decimals: int # scaling decimal places for volume
|
||||
|
||||
|
@ -79,6 +82,7 @@ class Pair(Struct):
|
|||
tick_size: float # min price step size
|
||||
status: str
|
||||
|
||||
costmin: str|None = None # XXX, only some mktpairs?
|
||||
short_position_limit: float = 0
|
||||
long_position_limit: float = float('inf')
|
||||
|
||||
|
|
|
@ -25,7 +25,10 @@ from typing import TYPE_CHECKING
|
|||
|
||||
import trio
|
||||
import tractor
|
||||
from tractor.trionics import broadcast_receiver
|
||||
from tractor.trionics import (
|
||||
broadcast_receiver,
|
||||
collapse_eg,
|
||||
)
|
||||
|
||||
from ._util import (
|
||||
log, # sub-sys logger
|
||||
|
@ -281,8 +284,11 @@ async def open_ems(
|
|||
client._ems_stream = trades_stream
|
||||
|
||||
# start sync code order msg delivery task
|
||||
async with trio.open_nursery() as n:
|
||||
n.start_soon(
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
tn.start_soon(
|
||||
relay_orders_from_sync_code,
|
||||
client,
|
||||
fqme,
|
||||
|
@ -298,4 +304,4 @@ async def open_ems(
|
|||
)
|
||||
|
||||
# stop the sync-msg-relay task on exit.
|
||||
n.cancel_scope.cancel()
|
||||
tn.cancel_scope.cancel()
|
||||
|
|
|
@ -42,6 +42,7 @@ from bidict import bidict
|
|||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
from tractor import trionics
|
||||
|
||||
from ._util import (
|
||||
log, # sub-sys logger
|
||||
|
@ -76,7 +77,6 @@ if TYPE_CHECKING:
|
|||
|
||||
# TODO: numba all of this
|
||||
def mk_check(
|
||||
|
||||
trigger_price: float,
|
||||
known_last: float,
|
||||
action: str,
|
||||
|
@ -162,7 +162,7 @@ async def clear_dark_triggers(
|
|||
|
||||
router: Router,
|
||||
brokerd_orders_stream: tractor.MsgStream,
|
||||
quote_stream: tractor.ReceiveMsgStream, # noqa
|
||||
quote_stream: tractor.MsgStream,
|
||||
broker: str,
|
||||
fqme: str,
|
||||
|
||||
|
@ -178,6 +178,7 @@ async def clear_dark_triggers(
|
|||
'''
|
||||
# XXX: optimize this for speed!
|
||||
# TODO:
|
||||
# - port to the new ringbuf stuff in `tractor.ipc`!
|
||||
# - numba all this!
|
||||
# - this stream may eventually contain multiple symbols
|
||||
quote_stream._raise_on_lag = False
|
||||
|
@ -500,7 +501,7 @@ class Router(Struct):
|
|||
|
||||
'''
|
||||
# setup at actor spawn time
|
||||
nursery: trio.Nursery
|
||||
_tn: trio.Nursery
|
||||
|
||||
# broker to book map
|
||||
books: dict[str, DarkBook] = {}
|
||||
|
@ -666,7 +667,7 @@ class Router(Struct):
|
|||
# dark book clearing loop, also lives with parent
|
||||
# daemon to allow dark order clearing while no
|
||||
# client is connected.
|
||||
self.nursery.start_soon(
|
||||
self._tn.start_soon(
|
||||
clear_dark_triggers,
|
||||
self,
|
||||
relay.brokerd_stream,
|
||||
|
@ -689,7 +690,7 @@ class Router(Struct):
|
|||
|
||||
# spawn a ``brokerd`` order control dialog stream
|
||||
# that syncs lifetime with the parent `emsd` daemon.
|
||||
self.nursery.start_soon(
|
||||
self._tn.start_soon(
|
||||
translate_and_relay_brokerd_events,
|
||||
broker,
|
||||
relay.brokerd_stream,
|
||||
|
@ -763,10 +764,12 @@ async def _setup_persistent_emsd(
|
|||
|
||||
global _router
|
||||
|
||||
# open a root "service nursery" for the ``emsd`` actor
|
||||
async with trio.open_nursery() as service_nursery:
|
||||
|
||||
_router = Router(nursery=service_nursery)
|
||||
# open a root "service task-nursery" for the `emsd`-actor
|
||||
async with (
|
||||
trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn
|
||||
):
|
||||
_router = Router(_tn=tn)
|
||||
|
||||
# TODO: send back the full set of persistent
|
||||
# orders/execs?
|
||||
|
@ -1182,12 +1185,16 @@ async def process_client_order_cmds(
|
|||
submitting live orders immediately if requested by the client.
|
||||
|
||||
'''
|
||||
# cmd: dict
|
||||
# TODO, only allow `msgspec.Struct` form!
|
||||
cmd: dict
|
||||
async for cmd in client_order_stream:
|
||||
log.info(f'Received order cmd:\n{pformat(cmd)}')
|
||||
log.info(
|
||||
f'Received order cmd:\n'
|
||||
f'{pformat(cmd)}\n'
|
||||
)
|
||||
|
||||
# CAWT DAMN we need struct support!
|
||||
oid = str(cmd['oid'])
|
||||
oid: str = str(cmd['oid'])
|
||||
|
||||
# register this stream as an active order dialog (msg flow) for
|
||||
# this order id such that translated message from the brokerd
|
||||
|
@ -1293,7 +1300,7 @@ async def process_client_order_cmds(
|
|||
case {
|
||||
'oid': oid,
|
||||
'symbol': fqme,
|
||||
'price': trigger_price,
|
||||
'price': price,
|
||||
'size': size,
|
||||
'action': ('buy' | 'sell') as action,
|
||||
'exec_mode': ('live' | 'paper'),
|
||||
|
@ -1325,7 +1332,7 @@ async def process_client_order_cmds(
|
|||
|
||||
symbol=sym,
|
||||
action=action,
|
||||
price=trigger_price,
|
||||
price=price,
|
||||
size=size,
|
||||
account=req.account,
|
||||
)
|
||||
|
@ -1347,7 +1354,11 @@ async def process_client_order_cmds(
|
|||
# (``translate_and_relay_brokerd_events()`` above) will
|
||||
# handle relaying the ems side responses back to
|
||||
# the client/cmd sender from this request
|
||||
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
|
||||
log.info(
|
||||
f'Sending live order to {broker}:\n'
|
||||
f'{pformat(msg)}'
|
||||
)
|
||||
|
||||
await brokerd_order_stream.send(msg)
|
||||
|
||||
# an immediate response should be ``BrokerdOrderAck``
|
||||
|
@ -1363,7 +1374,7 @@ async def process_client_order_cmds(
|
|||
case {
|
||||
'oid': oid,
|
||||
'symbol': fqme,
|
||||
'price': trigger_price,
|
||||
'price': price,
|
||||
'size': size,
|
||||
'exec_mode': exec_mode,
|
||||
'action': action,
|
||||
|
@ -1391,7 +1402,12 @@ async def process_client_order_cmds(
|
|||
if isnan(last):
|
||||
last = flume.rt_shm.array[-1]['close']
|
||||
|
||||
pred = mk_check(trigger_price, last, action)
|
||||
trigger_price: float = float(price)
|
||||
pred = mk_check(
|
||||
trigger_price,
|
||||
last,
|
||||
action,
|
||||
)
|
||||
|
||||
# NOTE: for dark orders currently we submit
|
||||
# the triggered live order at a price 5 ticks
|
||||
|
@ -1498,7 +1514,7 @@ async def maybe_open_trade_relays(
|
|||
loglevel: str = 'info',
|
||||
):
|
||||
|
||||
fqme, relay, feed, client_ready = await _router.nursery.start(
|
||||
fqme, relay, feed, client_ready = await _router._tn.start(
|
||||
_router.open_trade_relays,
|
||||
fqme,
|
||||
exec_mode,
|
||||
|
|
|
@ -19,6 +19,7 @@ Clearing sub-system message and protocols.
|
|||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
from decimal import Decimal
|
||||
from typing import (
|
||||
Literal,
|
||||
)
|
||||
|
@ -71,7 +72,15 @@ class Order(Struct):
|
|||
symbol: str # | MktPair
|
||||
account: str # should we set a default as '' ?
|
||||
|
||||
price: float
|
||||
# https://docs.python.org/3/library/decimal.html#decimal-objects
|
||||
#
|
||||
# ?TODO? decimal usage throughout?
|
||||
# -[ ] possibly leverage the `Encoder(decimal_format='number')`
|
||||
# bit?
|
||||
# |_https://jcristharif.com/msgspec/supported-types.html#decimal
|
||||
# -[ ] should we also use it for .size?
|
||||
#
|
||||
price: Decimal
|
||||
size: float # -ve is "sell", +ve is "buy"
|
||||
|
||||
brokers: list[str] = []
|
||||
|
@ -178,7 +187,7 @@ class BrokerdOrder(Struct):
|
|||
time_ns: int
|
||||
|
||||
symbol: str # fqme
|
||||
price: float
|
||||
price: Decimal
|
||||
size: float
|
||||
|
||||
# TODO: if we instead rely on a +ve/-ve size to determine
|
||||
|
|
|
@ -508,7 +508,7 @@ async def handle_order_requests(
|
|||
reqid = await client.submit_limit(
|
||||
oid=order.oid,
|
||||
symbol=f'{order.symbol}.{client.broker}',
|
||||
price=order.price,
|
||||
price=float(order.price),
|
||||
action=order.action,
|
||||
size=order.size,
|
||||
# XXX: by default 0 tells ``ib_insync`` methods that
|
||||
|
|
|
@ -184,33 +184,12 @@ def pikerd(
|
|||
registry_addrs=regaddrs,
|
||||
loglevel=loglevel,
|
||||
debug_mode=pdb,
|
||||
|
||||
) as service_mngr, # normally delivers a ``Services`` handle
|
||||
|
||||
# AsyncExitStack() as stack,
|
||||
enable_transports=['uds'],
|
||||
) as service_mngr,
|
||||
):
|
||||
# TODO: spawn all other sub-actor daemons according to
|
||||
# multiaddress endpoint spec defined by user config
|
||||
assert service_mngr
|
||||
|
||||
# if tsdb:
|
||||
# dname, conf = await stack.enter_async_context(
|
||||
# service.marketstore.start_ahab_daemon(
|
||||
# service_mngr,
|
||||
# loglevel=loglevel,
|
||||
# )
|
||||
# )
|
||||
# log.info(f'TSDB `{dname}` up with conf:\n{conf}')
|
||||
|
||||
# if es:
|
||||
# dname, conf = await stack.enter_async_context(
|
||||
# service.elastic.start_ahab_daemon(
|
||||
# service_mngr,
|
||||
# loglevel=loglevel,
|
||||
# )
|
||||
# )
|
||||
# log.info(f'DB `{dname}` up with conf:\n{conf}')
|
||||
|
||||
# ?TODO? spawn all other sub-actor daemons according to
|
||||
# multiaddress endpoint spec defined by user config
|
||||
await trio.sleep_forever()
|
||||
|
||||
trio.run(main)
|
||||
|
@ -335,7 +314,7 @@ def services(config, tl, ports):
|
|||
name='service_query',
|
||||
loglevel=config['loglevel'] if tl else None,
|
||||
),
|
||||
tractor.get_arbiter(
|
||||
tractor.get_registry(
|
||||
host=host,
|
||||
port=ports[0]
|
||||
) as portal
|
||||
|
|
|
@ -284,7 +284,8 @@ class Sampler:
|
|||
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError
|
||||
trio.ClosedResourceError,
|
||||
trio.EndOfChannel,
|
||||
):
|
||||
log.error(
|
||||
f'{stream._ctx.chan.uid} dropped connection'
|
||||
|
@ -697,7 +698,7 @@ async def sample_and_broadcast(
|
|||
|
||||
log.warning(
|
||||
f'Feed OVERRUN {sub_key}'
|
||||
'@{bus.brokername} -> \n'
|
||||
f'@{bus.brokername} -> \n'
|
||||
f'feed @ {chan.uid}\n'
|
||||
f'throttle = {throttle} Hz'
|
||||
)
|
||||
|
@ -876,6 +877,7 @@ async def uniform_rate_send(
|
|||
except tractor.RemoteActorError as rme:
|
||||
if rme.type is not tractor._exceptions.StreamOverrun:
|
||||
raise
|
||||
|
||||
ctx = stream._ctx
|
||||
chan = ctx.chan
|
||||
log.warning(
|
||||
|
@ -892,6 +894,7 @@ async def uniform_rate_send(
|
|||
trio.ClosedResourceError,
|
||||
trio.BrokenResourceError,
|
||||
ConnectionResetError,
|
||||
trio.EndOfChannel,
|
||||
):
|
||||
# if the feed consumer goes down then drop
|
||||
# out of this rate limiter
|
||||
|
|
|
@ -39,6 +39,7 @@ from typing import (
|
|||
AsyncContextManager,
|
||||
Awaitable,
|
||||
Sequence,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
import trio
|
||||
|
@ -75,6 +76,10 @@ from ._sampling import (
|
|||
uniform_rate_send,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from tractor._addr import Address
|
||||
from tractor.msg.types import Aid
|
||||
|
||||
|
||||
class Sub(Struct, frozen=True):
|
||||
'''
|
||||
|
@ -723,7 +728,10 @@ class Feed(Struct):
|
|||
async for msg in stream:
|
||||
await tx.send(msg)
|
||||
|
||||
async with trio.open_nursery() as nurse:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as nurse
|
||||
):
|
||||
# spawn a relay task for each stream so that they all
|
||||
# multiplex to a common channel.
|
||||
for brokername in mods:
|
||||
|
@ -899,19 +907,19 @@ async def open_feed(
|
|||
feed.portals[brokermod] = portal
|
||||
|
||||
# fill out "status info" that the UI can show
|
||||
host, port = portal.channel.raddr
|
||||
if host == '127.0.0.1':
|
||||
host = 'localhost'
|
||||
|
||||
chan: tractor.Channel = portal.chan
|
||||
raddr: Address = chan.raddr
|
||||
aid: Aid = chan.aid
|
||||
# TAG_feed_status_update
|
||||
feed.status.update({
|
||||
'actor_name': portal.channel.uid[0],
|
||||
'host': host,
|
||||
'port': port,
|
||||
'actor_id': aid,
|
||||
'actor_short_id': f'{aid.name}@{aid.pid}',
|
||||
'ipc': chan.raddr.proto_key,
|
||||
'ipc_addr': raddr,
|
||||
'hist_shm': 'NA',
|
||||
'rt_shm': 'NA',
|
||||
'throttle_rate': tick_throttle,
|
||||
'throttle_hz': tick_throttle,
|
||||
})
|
||||
# feed.status.update(init_msg.pop('status', {}))
|
||||
|
||||
# (allocate and) connect to any feed bus for this broker
|
||||
bus_ctxs.append(
|
||||
|
|
|
@ -498,6 +498,7 @@ async def cascade(
|
|||
|
||||
func_name: str = func.__name__
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(), # avoid multi-taskc tb in console
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
# TODO: might be better to just make a "restart" method where
|
||||
|
|
28
piker/log.py
28
piker/log.py
|
@ -18,7 +18,11 @@
|
|||
Log like a forester!
|
||||
"""
|
||||
import logging
|
||||
import reprlib
|
||||
import json
|
||||
from typing import (
|
||||
Callable,
|
||||
)
|
||||
|
||||
import tractor
|
||||
from pygments import (
|
||||
|
@ -84,3 +88,27 @@ def colorize_json(
|
|||
# likeable styles: algol_nu, tango, monokai
|
||||
formatters.TerminalTrueColorFormatter(style=style)
|
||||
)
|
||||
|
||||
|
||||
def mk_repr(
|
||||
**repr_kws,
|
||||
) -> Callable[[str], str]:
|
||||
'''
|
||||
Allocate and deliver a `repr.Repr` instance with provided input
|
||||
settings using the std-lib's `reprlib` mod,
|
||||
* https://docs.python.org/3/library/reprlib.html
|
||||
|
||||
------ Ex. ------
|
||||
An up to 6-layer-nested `dict` as multi-line:
|
||||
- https://stackoverflow.com/a/79102479
|
||||
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
|
||||
|
||||
'''
|
||||
def_kws: dict[str, int] = dict(
|
||||
indent=2,
|
||||
maxlevel=6, # recursion levels
|
||||
maxstring=66, # match editor line-len limit
|
||||
)
|
||||
def_kws |= repr_kws
|
||||
reprr = reprlib.Repr(**def_kws)
|
||||
return reprr.repr
|
||||
|
|
|
@ -200,7 +200,8 @@ async def open_pikerd(
|
|||
reg_addrs,
|
||||
),
|
||||
tractor.open_nursery() as actor_nursery,
|
||||
trio.open_nursery() as service_nursery,
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as service_tn,
|
||||
):
|
||||
for addr in reg_addrs:
|
||||
if addr not in root_actor.accept_addrs:
|
||||
|
@ -211,7 +212,7 @@ async def open_pikerd(
|
|||
|
||||
# assign globally for future daemon/task creation
|
||||
Services.actor_n = actor_nursery
|
||||
Services.service_n = service_nursery
|
||||
Services.service_n = service_tn
|
||||
Services.debug_mode = debug_mode
|
||||
|
||||
try:
|
||||
|
@ -221,7 +222,7 @@ async def open_pikerd(
|
|||
# TODO: is this more clever/efficient?
|
||||
# if 'samplerd' in Services.service_tasks:
|
||||
# await Services.cancel_service('samplerd')
|
||||
service_nursery.cancel_scope.cancel()
|
||||
service_tn.cancel_scope.cancel()
|
||||
|
||||
|
||||
# TODO: do we even need this?
|
||||
|
|
|
@ -138,6 +138,16 @@ class StorageClient(
|
|||
) -> None:
|
||||
...
|
||||
|
||||
async def write_oi(
|
||||
self,
|
||||
fqme: str,
|
||||
oi: np.ndarray,
|
||||
append_and_duplicate: bool = True,
|
||||
limit: int = int(800e3),
|
||||
|
||||
) -> None:
|
||||
...
|
||||
|
||||
|
||||
class TimeseriesNotFound(Exception):
|
||||
'''
|
||||
|
|
|
@ -111,6 +111,24 @@ def mk_ohlcv_shm_keyed_filepath(
|
|||
return path
|
||||
|
||||
|
||||
def mk_oi_shm_keyed_filepath(
|
||||
fqme: str,
|
||||
period: float | int,
|
||||
datadir: Path,
|
||||
|
||||
) -> Path:
|
||||
|
||||
if period < 1.:
|
||||
raise ValueError('Sample period should be >= 1.!?')
|
||||
|
||||
path: Path = (
|
||||
datadir
|
||||
/
|
||||
f'{fqme}.oi{int(period)}s.parquet'
|
||||
)
|
||||
return path
|
||||
|
||||
|
||||
def unpack_fqme_from_parquet_filepath(path: Path) -> str:
|
||||
|
||||
filename: str = str(path.name)
|
||||
|
@ -172,7 +190,11 @@ class NativeStorageClient:
|
|||
|
||||
key: str = path.name.rstrip('.parquet')
|
||||
fqme, _, descr = key.rpartition('.')
|
||||
if 'ohlcv' in descr:
|
||||
prefix, _, suffix = descr.partition('ohlcv')
|
||||
elif 'oi' in descr:
|
||||
prefix, _, suffix = descr.partition('oi')
|
||||
|
||||
period: int = int(suffix.strip('s'))
|
||||
|
||||
# cache description data
|
||||
|
@ -369,6 +391,61 @@ class NativeStorageClient:
|
|||
timeframe,
|
||||
)
|
||||
|
||||
def _write_oi(
|
||||
self,
|
||||
fqme: str,
|
||||
oi: np.ndarray,
|
||||
|
||||
) -> Path:
|
||||
'''
|
||||
Sync version of the public interface meth, since we don't
|
||||
currently actually need or support an async impl.
|
||||
|
||||
'''
|
||||
path: Path = mk_oi_shm_keyed_filepath(
|
||||
fqme=fqme,
|
||||
period=1,
|
||||
datadir=self._datadir,
|
||||
)
|
||||
if isinstance(oi, np.ndarray):
|
||||
new_df: pl.DataFrame = tsp.np2pl(oi)
|
||||
else:
|
||||
new_df = oi
|
||||
|
||||
if path.exists():
|
||||
old_df = pl.read_parquet(path)
|
||||
df = pl.concat([old_df, new_df])
|
||||
else:
|
||||
df = new_df
|
||||
|
||||
start = time.time()
|
||||
df.write_parquet(path)
|
||||
delay: float = round(
|
||||
time.time() - start,
|
||||
ndigits=6,
|
||||
)
|
||||
log.info(
|
||||
f'parquet write took {delay} secs\n'
|
||||
f'file path: {path}'
|
||||
)
|
||||
return path
|
||||
|
||||
async def write_oi(
|
||||
self,
|
||||
fqme: str,
|
||||
oi: np.ndarray,
|
||||
|
||||
) -> Path:
|
||||
'''
|
||||
Write input oi time series for fqme and sampling period
|
||||
to (local) disk.
|
||||
|
||||
'''
|
||||
return self._write_oi(
|
||||
fqme,
|
||||
oi,
|
||||
)
|
||||
|
||||
async def delete_ts(
|
||||
self,
|
||||
key: str,
|
||||
|
|
|
@ -963,7 +963,10 @@ async def tsdb_backfill(
|
|||
# concurrently load the provider's most-recent-frame AND any
|
||||
# pre-existing tsdb history already saved in `piker` storage.
|
||||
dt_eps: list[DateTime, DateTime] = []
|
||||
async with trio.open_nursery() as tn:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn
|
||||
):
|
||||
tn.start_soon(
|
||||
push_latest_frame,
|
||||
dt_eps,
|
||||
|
@ -1012,9 +1015,16 @@ async def tsdb_backfill(
|
|||
int,
|
||||
Duration,
|
||||
]|None = config.get('frame_types', None)
|
||||
|
||||
if def_frame_durs:
|
||||
def_frame_size: Duration = def_frame_durs[timeframe]
|
||||
assert def_frame_size == calced_frame_size
|
||||
|
||||
if def_frame_size != calced_frame_size:
|
||||
log.warning(
|
||||
f'Expected frame size {def_frame_size}\n'
|
||||
f'Rxed frame {calced_frame_size}\n'
|
||||
)
|
||||
# await tractor.pause()
|
||||
else:
|
||||
# use what we calced from first frame above.
|
||||
def_frame_size = calced_frame_size
|
||||
|
@ -1043,7 +1053,9 @@ async def tsdb_backfill(
|
|||
# if there is a gap to backfill from the first
|
||||
# history frame until the last datum loaded from the tsdb
|
||||
# continue that now in the background
|
||||
async with trio.open_nursery() as tn:
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as tn:
|
||||
|
||||
bf_done = await tn.start(
|
||||
partial(
|
||||
|
@ -1308,6 +1320,7 @@ async def manage_history(
|
|||
# sampling period) data set since normally differently
|
||||
# sampled timeseries can be loaded / process independently
|
||||
# ;)
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
log.info(
|
||||
|
|
|
@ -517,7 +517,7 @@ def with_dts(
|
|||
|
||||
'''
|
||||
return df.with_columns([
|
||||
pl.col(time_col).shift(1).suffix('_prev'),
|
||||
pl.col(time_col).shift(1).name.suffix('_prev'),
|
||||
pl.col(time_col).diff().alias('s_diff'),
|
||||
pl.from_epoch(pl.col(time_col)).alias('dt'),
|
||||
]).with_columns([
|
||||
|
@ -623,7 +623,7 @@ def detect_vlm_gaps(
|
|||
|
||||
) -> pl.DataFrame:
|
||||
|
||||
vnull: pl.DataFrame = w_dts.filter(
|
||||
vnull: pl.DataFrame = df.filter(
|
||||
pl.col(col) == 0
|
||||
)
|
||||
return vnull
|
||||
|
|
|
@ -21,6 +21,7 @@ Main app startup and run.
|
|||
from functools import partial
|
||||
from types import ModuleType
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
|
||||
from piker.ui.qt import (
|
||||
|
@ -116,6 +117,7 @@ async def _async_main(
|
|||
needed_brokermods[brokername] = brokers[brokername]
|
||||
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as root_n,
|
||||
):
|
||||
# set root nursery and task stack for spawning other charts/feeds
|
||||
|
|
|
@ -33,7 +33,6 @@ import trio
|
|||
|
||||
from piker.ui.qt import (
|
||||
QtCore,
|
||||
QtWidgets,
|
||||
Qt,
|
||||
QLineF,
|
||||
QFrame,
|
||||
|
|
|
@ -1445,7 +1445,10 @@ async def display_symbol_data(
|
|||
# for pause/resume on mouse interaction
|
||||
rt_chart.feed = feed
|
||||
|
||||
async with trio.open_nursery() as ln:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as ln,
|
||||
):
|
||||
# if available load volume related built-in display(s)
|
||||
vlm_charts: dict[
|
||||
str,
|
||||
|
|
|
@ -22,7 +22,10 @@ from contextlib import asynccontextmanager as acm
|
|||
from typing import Callable
|
||||
|
||||
import trio
|
||||
from tractor.trionics import gather_contexts
|
||||
from tractor.trionics import (
|
||||
gather_contexts,
|
||||
collapse_eg,
|
||||
)
|
||||
|
||||
from piker.ui.qt import (
|
||||
QtCore,
|
||||
|
@ -207,7 +210,10 @@ async def open_signal_handler(
|
|||
async for args in recv:
|
||||
await async_handler(*args)
|
||||
|
||||
async with trio.open_nursery() as tn:
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn
|
||||
):
|
||||
tn.start_soon(proxy_to_handler)
|
||||
async with send:
|
||||
yield
|
||||
|
@ -242,6 +248,7 @@ async def open_handlers(
|
|||
widget: QWidget
|
||||
streams: list[trio.abc.ReceiveChannel]
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
gather_contexts([
|
||||
open_event_stream(
|
||||
|
|
|
@ -18,10 +18,11 @@
|
|||
Feed status and controls widget(s) for embedding in a UI-pane.
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
from textwrap import dedent
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import (
|
||||
Any,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
# from PyQt5.QtCore import Qt
|
||||
|
||||
|
@ -49,35 +50,55 @@ def mk_feed_label(
|
|||
a feed control protocol.
|
||||
|
||||
'''
|
||||
status = feed.status
|
||||
status: dict[str, Any] = feed.status
|
||||
assert status
|
||||
|
||||
msg = dedent("""
|
||||
actor: **{actor_name}**\n
|
||||
|_ @**{host}:{port}**\n
|
||||
""")
|
||||
# SO tips on ws/nls,
|
||||
# https://stackoverflow.com/a/15721400
|
||||
ws: str = ' '
|
||||
# nl: str = '<br>' # dun work?
|
||||
actor_info_repr: str = (
|
||||
f')> **{status["actor_short_id"]}**\n'
|
||||
'\n' # bc md?
|
||||
)
|
||||
|
||||
for key, val in status.items():
|
||||
if key in ('host', 'port', 'actor_name'):
|
||||
continue
|
||||
msg += f'\n|_ {key}: **{{{key}}}**\n'
|
||||
# fields to select *IN* for display
|
||||
# (see `.data.feed.open_feed()` status
|
||||
# update -> TAG_feed_status_update)
|
||||
for key in [
|
||||
'ipc',
|
||||
'hist_shm',
|
||||
'rt_shm',
|
||||
'throttle_hz',
|
||||
]:
|
||||
# NOTE, the 2nd key is filled via `.format()` updates.
|
||||
actor_info_repr += (
|
||||
f'\n' # bc md?
|
||||
f'{ws}|_{key}: **{{{key}}}**\n'
|
||||
)
|
||||
# ^TODO? formatting and content..
|
||||
# -[ ] showing which fqme is "forward" on the
|
||||
# chart/fsp/order-mode?
|
||||
# '|_ flows: **{symbols}**\n'
|
||||
#
|
||||
# -[x] why isn't the indent working?
|
||||
# => markdown, now solved..
|
||||
|
||||
feed_label = FormatLabel(
|
||||
fmt_str=msg,
|
||||
# |_ streams: **{symbols}**\n
|
||||
fmt_str=actor_info_repr,
|
||||
font=_font.font,
|
||||
font_size=_font_small.px_size,
|
||||
font_color='default_lightest',
|
||||
)
|
||||
|
||||
# ?TODO, remove this?
|
||||
# form.vbox.setAlignment(feed_label, Qt.AlignBottom)
|
||||
# form.vbox.setAlignment(Qt.AlignBottom)
|
||||
_ = chart.height() - (
|
||||
form.height() +
|
||||
form.fill_bar.height()
|
||||
# feed_label.height()
|
||||
)
|
||||
# _ = chart.height() - (
|
||||
# form.height() +
|
||||
# form.fill_bar.height()
|
||||
# # feed_label.height()
|
||||
# )
|
||||
|
||||
feed_label.format(**feed.status)
|
||||
|
||||
return feed_label
|
||||
|
|
|
@ -600,6 +600,7 @@ async def open_fsp_admin(
|
|||
kwargs=kwargs,
|
||||
) as (cache_hit, cluster_map),
|
||||
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
if cache_hit:
|
||||
|
@ -613,6 +614,8 @@ async def open_fsp_admin(
|
|||
)
|
||||
try:
|
||||
yield admin
|
||||
|
||||
# ??TODO, does this *need* to be inside a finally?
|
||||
finally:
|
||||
# terminate all tasks via signals
|
||||
for key, entry in admin._registry.items():
|
||||
|
|
|
@ -285,18 +285,20 @@ class FormatLabel(QLabel):
|
|||
font_size: int,
|
||||
font_color: str,
|
||||
|
||||
use_md: bool = True,
|
||||
|
||||
parent=None,
|
||||
|
||||
) -> None:
|
||||
|
||||
super().__init__(parent)
|
||||
|
||||
# by default set the format string verbatim and expect user to
|
||||
# call ``.format()`` later (presumably they'll notice the
|
||||
# by default set the format string verbatim and expect user
|
||||
# to call ``.format()`` later (presumably they'll notice the
|
||||
# unformatted content if ``fmt_str`` isn't meant to be
|
||||
# unformatted).
|
||||
self.fmt_str = fmt_str
|
||||
self.setText(fmt_str)
|
||||
# self.setText(fmt_str) # ?TODO, why here?
|
||||
|
||||
self.setStyleSheet(
|
||||
f"""QLabel {{
|
||||
|
@ -306,6 +308,7 @@ class FormatLabel(QLabel):
|
|||
"""
|
||||
)
|
||||
self.setFont(_font.font)
|
||||
if use_md:
|
||||
self.setTextFormat(
|
||||
Qt.TextFormat.MarkdownText
|
||||
)
|
||||
|
@ -316,7 +319,10 @@ class FormatLabel(QLabel):
|
|||
size_policy.Expanding,
|
||||
)
|
||||
self.setAlignment(
|
||||
Qt.AlignVCenter | Qt.AlignLeft
|
||||
Qt.AlignLeft
|
||||
|
|
||||
Qt.AlignBottom
|
||||
# Qt.AlignVCenter
|
||||
)
|
||||
self.setText(self.fmt_str)
|
||||
|
||||
|
|
|
@ -269,6 +269,8 @@ def hcolor(name: str) -> str:
|
|||
|
||||
# default ohlc-bars/curve gray
|
||||
'bracket': '#666666', # like the logo
|
||||
'pikers': '#616161', # a trader shade of..
|
||||
'beast': '#161616', # in the dark alone.
|
||||
|
||||
# bluish
|
||||
'charcoal': '#36454F',
|
||||
|
|
|
@ -21,6 +21,7 @@ Chart trading, the only way to scalp.
|
|||
from __future__ import annotations
|
||||
from contextlib import asynccontextmanager
|
||||
from dataclasses import dataclass, field
|
||||
from decimal import Decimal
|
||||
from functools import partial
|
||||
from pprint import pformat
|
||||
import time
|
||||
|
@ -41,7 +42,6 @@ from piker.accounting import (
|
|||
Position,
|
||||
mk_allocator,
|
||||
MktPair,
|
||||
Symbol,
|
||||
)
|
||||
from piker.clearing import (
|
||||
open_ems,
|
||||
|
@ -143,6 +143,15 @@ class OrderMode:
|
|||
}
|
||||
_staged_order: Order | None = None
|
||||
|
||||
@property
|
||||
def curr_mkt(self) -> MktPair:
|
||||
'''
|
||||
Deliver the currently selected `MktPair` according
|
||||
chart state.
|
||||
|
||||
'''
|
||||
return self.chart.linked.mkt
|
||||
|
||||
def on_level_change_update_next_order_info(
|
||||
self,
|
||||
level: float,
|
||||
|
@ -172,7 +181,11 @@ class OrderMode:
|
|||
line.update_labels(order_info)
|
||||
|
||||
# update bound-in staged order
|
||||
order.price = level
|
||||
mkt: MktPair = self.curr_mkt
|
||||
order.price: Decimal = mkt.quantize(
|
||||
size=level,
|
||||
quantity_type='price',
|
||||
)
|
||||
order.size = order_info['size']
|
||||
|
||||
# when an order is changed we flip the settings side-pane to
|
||||
|
@ -187,7 +200,9 @@ class OrderMode:
|
|||
|
||||
) -> LevelLine:
|
||||
|
||||
level = order.price
|
||||
# TODO, if we instead just always decimalize at the ems layer
|
||||
# we can avoid this back-n-forth casting?
|
||||
level = float(order.price)
|
||||
|
||||
line = order_line(
|
||||
chart or self.chart,
|
||||
|
@ -224,7 +239,11 @@ class OrderMode:
|
|||
# the order mode allocator but we still need to update the
|
||||
# "staged" order message we'll send to the ems
|
||||
def update_order_price(y: float) -> None:
|
||||
order.price = y
|
||||
mkt: MktPair = self.curr_mkt
|
||||
order.price: Decimal = mkt.quantize(
|
||||
size=y,
|
||||
quantity_type='price',
|
||||
)
|
||||
|
||||
line._on_level_change = update_order_price
|
||||
|
||||
|
@ -275,34 +294,31 @@ class OrderMode:
|
|||
chart = cursor.linked.chart
|
||||
if (
|
||||
not chart
|
||||
and cursor
|
||||
and cursor.active_plot
|
||||
and
|
||||
cursor
|
||||
and
|
||||
cursor.active_plot
|
||||
):
|
||||
return
|
||||
|
||||
chart = cursor.active_plot
|
||||
price = cursor._datum_xy[1]
|
||||
price: float = cursor._datum_xy[1]
|
||||
if not price:
|
||||
# zero prices are not supported by any means
|
||||
# since that's illogical / a no-op.
|
||||
return
|
||||
|
||||
mkt: MktPair = self.chart.linked.mkt
|
||||
|
||||
# NOTE : we could also use instead,
|
||||
# mkt.quantize(price, quantity_type='price')
|
||||
# but it returns a Decimal and it's probably gonna
|
||||
# be slower?
|
||||
# TODO: should we be enforcing this precision
|
||||
# at a different layer in the stack? right now
|
||||
# any precision error will literally be relayed
|
||||
# all the way back from the backend.
|
||||
|
||||
price = round(
|
||||
price,
|
||||
ndigits=mkt.price_tick_digits,
|
||||
# at a different layer in the stack?
|
||||
# |_ might require `MktPair` tracking in the EMS?
|
||||
# |_ right now any precision error will be relayed
|
||||
# all the way back from the backend and vice-versa..
|
||||
#
|
||||
mkt: MktPair = self.curr_mkt
|
||||
price: Decimal = mkt.quantize(
|
||||
size=price,
|
||||
quantity_type='price',
|
||||
)
|
||||
|
||||
order = self._staged_order = Order(
|
||||
action=action,
|
||||
price=price,
|
||||
|
@ -378,7 +394,7 @@ class OrderMode:
|
|||
'oid': oid,
|
||||
})
|
||||
|
||||
if order.price <= 0:
|
||||
if float(order.price) <= 0:
|
||||
log.error(
|
||||
'*!? Invalid `Order.price <= 0` ?!*\n'
|
||||
# TODO: make this present multi-line in object form
|
||||
|
@ -515,14 +531,15 @@ class OrderMode:
|
|||
# if an order msg is provided update the line
|
||||
# **from** that msg.
|
||||
if order:
|
||||
if order.price <= 0:
|
||||
price: float = float(order.price)
|
||||
if price <= 0:
|
||||
log.error(f'Order has 0 price, cancelling..\n{order}')
|
||||
self.cancel_orders([order.oid])
|
||||
return None
|
||||
|
||||
line.set_level(order.price)
|
||||
line.set_level(price)
|
||||
self.on_level_change_update_next_order_info(
|
||||
level=order.price,
|
||||
level=price,
|
||||
line=line,
|
||||
order=order,
|
||||
# use the corresponding position tracker for the
|
||||
|
@ -681,9 +698,9 @@ class OrderMode:
|
|||
) -> Dialog | None:
|
||||
# NOTE: the `.order` attr **must** be set with the
|
||||
# equivalent order msg in order to be loaded.
|
||||
order = msg.req
|
||||
order: Order = msg.req
|
||||
oid = str(msg.oid)
|
||||
symbol = order.symbol
|
||||
symbol: str = order.symbol
|
||||
|
||||
# TODO: MEGA UGGG ZONEEEE!
|
||||
src = msg.src
|
||||
|
@ -702,13 +719,22 @@ class OrderMode:
|
|||
order.oid = str(order.oid)
|
||||
order.brokers = [brokername]
|
||||
|
||||
# TODO: change this over to `MktPair`, but it's
|
||||
# gonna be tough since we don't have any such data
|
||||
# really in our clearing msg schema..
|
||||
order.symbol = Symbol.from_fqme(
|
||||
fqsn=fqme,
|
||||
info={},
|
||||
)
|
||||
# ?TODO? change this over to `MktPair`, but it's gonna be
|
||||
# tough since we don't have any such data really in our
|
||||
# clearing msg schema..
|
||||
# BUT WAIT! WHY do we even want/need this!?
|
||||
#
|
||||
# order.symbol = self.curr_mkt
|
||||
#
|
||||
# XXX, the old approach.. which i don't quire member why..
|
||||
# -[ ] verify we for sure don't require this any more!
|
||||
# |_https://github.com/pikers/piker/issues/517
|
||||
#
|
||||
# order.symbol = Symbol.from_fqme(
|
||||
# fqsn=fqme,
|
||||
# info={},
|
||||
# )
|
||||
|
||||
maybe_dialog: Dialog | None = self.submit_order(
|
||||
send_msg=False,
|
||||
order=order,
|
||||
|
@ -766,6 +792,7 @@ async def open_order_mode(
|
|||
brokerd_accounts,
|
||||
ems_dialog_msgs,
|
||||
),
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
|
||||
):
|
||||
|
@ -1101,7 +1128,7 @@ async def process_trade_msg(
|
|||
)
|
||||
)
|
||||
):
|
||||
msg.req = order
|
||||
msg.req: Order = order
|
||||
dialog: (
|
||||
Dialog
|
||||
# NOTE: on an invalid order submission (eg.
|
||||
|
@ -1166,7 +1193,7 @@ async def process_trade_msg(
|
|||
tm = time.time()
|
||||
mode.on_fill(
|
||||
oid,
|
||||
price=req.price,
|
||||
price=float(req.price),
|
||||
time_s=tm,
|
||||
)
|
||||
mode.lines.remove_line(uuid=oid)
|
||||
|
@ -1221,7 +1248,7 @@ async def process_trade_msg(
|
|||
tm = details['broker_time']
|
||||
mode.on_fill(
|
||||
oid,
|
||||
price=details['price'],
|
||||
price=float(details['price']),
|
||||
time_s=tm,
|
||||
pointing='up' if action == 'buy' else 'down',
|
||||
)
|
||||
|
|
|
@ -23,7 +23,7 @@ name = "piker"
|
|||
version = "0.1.0a0dev0"
|
||||
description = "trading gear for hackers"
|
||||
authors = [{ name = "Tyler Goodlet", email = "goodboy_foss@protonmail.com" }]
|
||||
requires-python = ">=3.12, <3.13"
|
||||
requires-python = ">=3.12"
|
||||
license = "AGPL-3.0-or-later"
|
||||
readme = "README.rst"
|
||||
keywords = [
|
||||
|
@ -39,8 +39,8 @@ classifiers = [
|
|||
"Operating System :: POSIX :: Linux",
|
||||
"Programming Language :: Python :: Implementation :: CPython",
|
||||
"Programming Language :: Python :: 3 :: Only",
|
||||
"Programming Language :: Python :: 3.11",
|
||||
"Programming Language :: Python :: 3.12",
|
||||
"Programming Language :: Python :: 3.13",
|
||||
"Intended Audience :: Financial and Insurance Industry",
|
||||
"Intended Audience :: Science/Research",
|
||||
"Intended Audience :: Developers",
|
||||
|
@ -49,13 +49,13 @@ classifiers = [
|
|||
dependencies = [
|
||||
"async-generator >=1.10, <2.0.0",
|
||||
"attrs >=23.1.0, <24.0.0",
|
||||
"bidict >=0.22.1, <0.23.0",
|
||||
"bidict >=0.23.1",
|
||||
"colorama >=0.4.6, <0.5.0",
|
||||
"colorlog >=6.7.0, <7.0.0",
|
||||
"ib-insync >=0.9.86, <0.10.0",
|
||||
"numba >=0.59.0, <0.60.0",
|
||||
"numpy >=1.25, <2.0",
|
||||
"polars >=0.18.13, <0.19.0",
|
||||
"numpy>=2.0",
|
||||
"polars >=0.20.6",
|
||||
"polars-fuzzy-match>=0.1.5",
|
||||
"pygments >=2.16.1, <3.0.0",
|
||||
"rich >=13.5.2, <14.0.0",
|
||||
"tomli >=2.0.1, <3.0.0",
|
||||
|
@ -65,16 +65,18 @@ dependencies = [
|
|||
"typer >=0.9.0, <1.0.0",
|
||||
"rapidfuzz >=3.5.2, <4.0.0",
|
||||
"pdbp >=1.5.0, <2.0.0",
|
||||
"trio >=0.24, <0.25",
|
||||
"trio >=0.27",
|
||||
"pendulum >=3.0.0, <4.0.0",
|
||||
"httpx >=0.27.0, <0.28.0",
|
||||
"cryptofeed >=2.4.0, <3.0.0",
|
||||
"pyarrow >=17.0.0, <18.0.0",
|
||||
"pyarrow>=18.0.0",
|
||||
"websockets ==12.0",
|
||||
"msgspec",
|
||||
"msgspec>=0.19.0,<0.20",
|
||||
"tractor",
|
||||
"asyncvnc",
|
||||
"tomlkit",
|
||||
"trio-typing>=0.10.0",
|
||||
"numba>=0.61.0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
|
@ -112,6 +114,7 @@ dev = [
|
|||
"cython >=3.0.0, <4.0.0",
|
||||
"greenback >=1.1.1, <2.0.0",
|
||||
"ruff>=0.9.6",
|
||||
"pyperclip>=1.9.0",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
@ -125,9 +128,21 @@ include = ["piker"]
|
|||
[tool.hatch.build.targets.wheel]
|
||||
include = ["piker"]
|
||||
|
||||
|
||||
# TODO? move to a `uv.toml`?
|
||||
[tool.uv]
|
||||
python-preference = 'system'
|
||||
python-downloads = 'manual'
|
||||
|
||||
|
||||
[tool.uv.sources]
|
||||
pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
|
||||
asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" }
|
||||
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
|
||||
msgspec = { git = "https://github.com/jcrist/msgspec.git" }
|
||||
|
||||
# tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "main" }
|
||||
# tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "moar_eg_smoothing" }
|
||||
|
||||
# XXX for @goodboy's hackin, usually there's something new in the
|
||||
# runtime being seriously tested here Bp
|
||||
tractor = { path = "../tractor", editable = true }
|
||||
|
|
|
@ -62,8 +62,9 @@ ignore-init-module-imports = false
|
|||
fixable = ["ALL"]
|
||||
unfixable = []
|
||||
|
||||
# TODO? uhh why no work!?
|
||||
# Allow unused variables when underscore-prefixed.
|
||||
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
|
||||
# dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
|
||||
|
||||
[format]
|
||||
# Use single quotes in `ruff format`.
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
TAG_feed_status_update ./piker/data/feed.py /TAG_feed_status_update/
|
|
@ -179,7 +179,7 @@ def test_ems_err_on_bad_broker(
|
|||
# NOTE: emsd should error on the actor's enabled modules
|
||||
# import phase, when looking for a backend named `doggy`.
|
||||
except tractor.RemoteActorError as re:
|
||||
assert re.type == ModuleNotFoundError
|
||||
assert re.type is ModuleNotFoundError
|
||||
|
||||
run_and_tollerate_cancels(load_bad_fqme)
|
||||
|
||||
|
|
|
@ -142,7 +142,12 @@ async def test_concurrent_tokens_refresh(us_symbols, loglevel):
|
|||
# async with tractor.open_nursery() as n:
|
||||
# await n.run_in_actor('other', intermittently_refresh_tokens)
|
||||
|
||||
async with trio.open_nursery() as n:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery(
|
||||
# strict_exception_groups=False,
|
||||
) as n
|
||||
):
|
||||
|
||||
quoter = await qt.stock_quoter(client, us_symbols)
|
||||
|
||||
|
@ -383,7 +388,9 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
|
|||
else:
|
||||
symbols = [tmx_symbols]
|
||||
|
||||
async with trio.open_nursery() as n:
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as n:
|
||||
for syms, func in zip(symbols, stream_what):
|
||||
n.start_soon(func, feed, syms)
|
||||
|
||||
|
|
Loading…
Reference in New Issue