Compare commits

..

4 Commits

Author SHA1 Message Date
Nelson Torres 4c486e6dd2 Port `binance` backend to `httpx` 2024-05-23 18:12:31 -03:00
Tyler Goodlet 44b8c70521 Change type-annots to use `httpx.Response` 2024-05-20 12:55:45 -04:00
Tyler Goodlet e6af97c596 Port `kucoin` backend to `httpx` 2024-05-20 11:09:30 -04:00
Tyler Goodlet 95ace5acb8 Port `kraken` backend to `httpx` 2024-05-20 11:09:10 -04:00
23 changed files with 598 additions and 2786 deletions

View File

@ -1,161 +1,162 @@
piker
-----
trading gear for hackers
trading gear for hackers.
|gh_actions|
.. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fpikers%2Fpiker%2Fbadge&style=popout-square
:target: https://actions-badge.atrox.dev/piker/pikers/goto
``piker`` is a broker agnostic, next-gen FOSS toolset and runtime for
real-time computational trading targeted at `hardcore Linux users
<comp_trader>`_ .
``piker`` is a broker agnostic, next-gen FOSS toolset for real-time
computational trading targeted at `hardcore Linux users <comp_trader>`_ .
we use much bleeding edge tech including (but not limited to):
we use as much bleeding edge tech as possible including (but not limited to):
- latest python for glue_
- uv_ for packaging and distribution
- trio_ & tractor_ for our distributed `structured concurrency`_ runtime
- Qt_ for pristine low latency UIs
- pyqtgraph_ (which we've extended) for real-time charting and graphics
- ``polars`` ``numpy`` and ``numba`` for redic `fast numerics`_
- `apache arrow and parquet`_ for time-series storage
- trio_ & tractor_ for our distributed, multi-core, real-time streaming
`structured concurrency`_ runtime B)
- Qt_ for pristine high performance UIs
- pyqtgraph_ for real-time charting
- ``polars`` ``numpy`` and ``numba`` for `fast numerics`_
- `apache arrow and parquet`_ for time series history management
persistence and sharing
- (prototyped) techtonicdb_ for L2 book storage
potential projects we might integrate with soon,
- (already prototyped in ) techtonicdb_ for L2 book storage
.. _comp_trader: https://jfaleiro.wordpress.com/2019/10/09/computational-trader/
.. _glue: https://numpy.org/doc/stable/user/c-info.python-as-glue.html#using-python-as-glue
.. _uv: https://docs.astral.sh/uv/
.. |travis| image:: https://img.shields.io/travis/pikers/piker/master.svg
:target: https://travis-ci.org/pikers/piker
.. _trio: https://github.com/python-trio/trio
.. _tractor: https://github.com/goodboy/tractor
.. _structured concurrency: https://trio.discourse.group/
.. _marketstore: https://github.com/alpacahq/marketstore
.. _techtonicdb: https://github.com/0b01/tectonicdb
.. _Qt: https://www.qt.io/
.. _pyqtgraph: https://github.com/pyqtgraph/pyqtgraph
.. _glue: https://numpy.org/doc/stable/user/c-info.python-as-glue.html#using-python-as-glue
.. _apache arrow and parquet: https://arrow.apache.org/faq/
.. _fast numerics: https://zerowithdot.com/python-numpy-and-pandas-performance/
.. _techtonicdb: https://github.com/0b01/tectonicdb
.. _comp_trader: https://jfaleiro.wordpress.com/2019/10/09/computational-trader/
focus and feats:
****************
fitting with these tenets, we're always open to new
framework/lib/service interop suggestions and ideas!
focus and features:
*******************
- 100% federated: your code, your hardware, your data feeds, your broker fills.
- zero web: low latency, native software that doesn't try to re-invent the OS
- maximal **privacy**: prevent brokers and mms from knowing your
planz; smack their spreads with dark volume.
- zero clutter: modal, context oriented UIs that echew minimalism, reduce
thought noise and encourage un-emotion.
- first class parallelism: built from the ground up on next-gen structured concurrency
primitives.
- traders first: broker/exchange/asset-class agnostic
- systems grounded: real-time financial signal processing that will
make any queuing or DSP eng juice their shorts.
- non-tina UX: sleek, powerful keyboard driven interaction with expected use in tiling wms
- data collaboration: every process and protocol is multi-host scalable.
- fight club ready: zero interest in adoption by suits; no corporate friendly license, ever.
- **100% federated**:
your code, your hardware, your data feeds, your broker fills.
fitting with these tenets, we're always open to new framework suggestions and ideas.
- **zero web**:
low latency as a prime objective, native UIs and modern IPC
protocols without trying to re-invent the "OS-as-an-app"..
- **maximal privacy**:
prevent brokers and mms from knowing your planz; smack their
spreads with dark volume from a VPN tunnel.
- **zero clutter**:
modal, context oriented UIs that echew minimalism, reduce thought
noise and encourage un-emotion.
- **first class parallelism**:
built from the ground up on a next-gen structured concurrency
supervision sys.
- **traders first**:
broker/exchange/venue/asset-class/money-sys agnostic
- **systems grounded**:
real-time financial signal processing (fsp) that will make any
queuing or DSP eng juice their shorts.
- **non-tina UX**:
sleek, powerful keyboard driven interaction with expected use in
tiling wms (or maybe even a DDE).
- **data collab at scale**:
every actor-process and protocol is multi-host aware.
- **fight club ready**:
zero interest in adoption by suits; no corporate friendly license,
ever.
building the hottest looking, fastest, most reliable, keyboard
friendly FOSS trading platform is the dream; join the cause.
building the best looking, most reliable, keyboard friendly trading
platform is the dream; join the cause.
a sane install with `uv`
************************
bc why install with `python` when you can faster with `rust` ::
sane install with `poetry`
**************************
TODO!
uv lock
rigorous install on ``nixos`` using ``poetry2nix``
**************************************************
TODO!
hacky install on nixos
**********************
``NixOS`` is our core devs' distro of choice for which we offer
`NixOS` is our core devs' distro of choice for which we offer
a stringently defined development shell envoirment that can be loaded with::
nix-shell default.nix
nix-shell develop.nix
this will setup the required python environment to run piker, make sure to
run::
pip install -r requirements.txt -e .
once after loading the shell
start a chart
*************
run a realtime OHLCV chart stand-alone::
install wild-west style via `pip`
*********************************
``piker`` is currently under heavy pre-alpha development and as such
should be cloned from this repo and hacked on directly.
piker -l info chart btcusdt.spot.binance xmrusdt.spot.kraken
for a development install::
this runs a chart UI (with 1m sampled OHLCV) and shows 2 spot markets from 2 diff cexes
overlayed on the same graph. Use of `piker` without first starting
a daemon (`pikerd` - see below) means there is an implicit spawning of the
multi-actor-runtime (implemented as a `tractor` app).
For additional subsystem feats available through our chart UI see the
various sub-readmes:
- order control using a mouse-n-keyboard UX B)
- cross venue market-pair (what most call "symbol") search, select, overlay Bo
- financial-signal-processing (`piker.fsp`) write-n-reload to sub-chart BO
- src-asset derivatives scan for anal, like the infamous "max pain" XO
git clone git@github.com:pikers/piker.git
cd piker
virtualenv env
source ./env/bin/activate
pip install -r requirements.txt -e .
spawn a daemon standalone
*************************
we call the root actor-process the ``pikerd``. it can be (and is
recommended normally to be) started separately from the ``piker
chart`` program::
check out our charts
********************
bet you weren't expecting this from the foss::
piker -l info -b kraken -b binance chart btcusdt.binance --pdb
this runs the main chart (currently with 1m sampled OHLC) in in debug
mode and you can practice paper trading using the following
micro-manual:
``order_mode`` (
edge triggered activation by any of the following keys,
``mouse-click`` on y-level to submit at that price
):
- ``f``/ ``ctl-f`` to stage buy
- ``d``/ ``ctl-d`` to stage sell
- ``a`` to stage alert
``search_mode`` (
``ctl-l`` or ``ctl-space`` to open,
``ctl-c`` or ``ctl-space`` to close
) :
- begin typing to have symbol search automatically lookup
symbols from all loaded backend (broker) providers
- arrow keys and mouse click to navigate selection
- vi-like ``ctl-[hjkl]`` for navigation
you can also configure your position allocation limits from the
sidepane.
run in distributed mode
***********************
start the service manager and data feed daemon in the background and
connect to it::
pikerd -l info --pdb
the daemon does nothing until a ``piker``-client (like ``piker
chart``) connects and requests some particular sub-system. for
a connecting chart ``pikerd`` will spawn and manage at least,
- a data-feed daemon: ``datad`` which does all the work of comms with
the backend provider (in this case the ``binance`` cex).
- a paper-trading engine instance, ``paperboi.binance``, (if no live
account has been configured) which allows for auto/manual order
control against the live quote stream.
connect your chart::
*using* an actor-service (aka micro-daemon) manager which dynamically
supervises various sub-subsystems-as-services throughout the ``piker``
runtime-stack.
piker -l info -b kraken -b binance chart xmrusdt.binance --pdb
now you can (implicitly) connect your chart::
piker chart btcusdt.spot.binance
since ``pikerd`` was started separately you can now enjoy a persistent
real-time data stream tied to the daemon-tree's lifetime. i.e. the next
time you spawn a chart it will obviously not only load much faster
(since the underlying ``datad.binance`` is left running with its
in-memory IPC data structures) but also the data-feed and any order
mgmt states should be persistent until you finally cancel ``pikerd``.
enjoy persistent real-time data feeds tied to daemon lifetime. the next
time you spawn a chart it will load much faster since the data feed has
been cached and is now always running live in the background until you
kill ``pikerd``.
if anyone asks you what this project is about
*********************************************
you don't talk about it; just use it.
you don't talk about it.
how do i get involved?
@ -165,15 +166,6 @@ enter the matrix.
how come there ain't that many docs
***********************************
i mean we want/need them but building the core right has been higher
prio then marketting (and likely will stay that way Bp).
soo, suck it up bc,
- no one is trying to sell you on anything
- learning the code base is prolly way more valuable
- the UI/UXs are intended to be "intuitive" for any hacker..
we obviously need tonz help so if you want to start somewhere and
can't necessarily write "advanced" concurrent python/rust code, this
helping document literally anything might be the place for you!
suck it up, learn the code; no one is trying to sell you on anything.
also, we need lotsa help so if you want to start somewhere and can't
necessarily write serious code, this might be the place for you!

View File

@ -1,134 +0,0 @@
with (import <nixpkgs> {});
let
glibStorePath = lib.getLib glib;
zlibStorePath = lib.getLib zlib;
zstdStorePath = lib.getLib zstd;
dbusStorePath = lib.getLib dbus;
libGLStorePath = lib.getLib libGL;
freetypeStorePath = lib.getLib freetype;
qt6baseStorePath = lib.getLib qt6.qtbase;
fontconfigStorePath = lib.getLib fontconfig;
libxkbcommonStorePath = lib.getLib libxkbcommon;
xcbutilcursorStorePath = lib.getLib xcb-util-cursor;
qtpyStorePath = lib.getLib python312Packages.qtpy;
pyqt6StorePath = lib.getLib python312Packages.pyqt6;
pyqt6SipStorePath = lib.getLib python312Packages.pyqt6-sip;
rapidfuzzStorePath = lib.getLib python312Packages.rapidfuzz;
qdarkstyleStorePath = lib.getLib python312Packages.qdarkstyle;
xorgLibX11StorePath = lib.getLib xorg.libX11;
xorgLibxcbStorePath = lib.getLib xorg.libxcb;
xorgxcbutilwmStorePath = lib.getLib xorg.xcbutilwm;
xorgxcbutilimageStorePath = lib.getLib xorg.xcbutilimage;
xorgxcbutilerrorsStorePath = lib.getLib xorg.xcbutilerrors;
xorgxcbutilkeysymsStorePath = lib.getLib xorg.xcbutilkeysyms;
xorgxcbutilrenderutilStorePath = lib.getLib xorg.xcbutilrenderutil;
in
stdenv.mkDerivation {
name = "piker-qt6-uv";
buildInputs = [
# System requirements.
glib
zlib
dbus
zstd
libGL
freetype
qt6.qtbase
libgcc.lib
fontconfig
libxkbcommon
# Xorg requirements
xcb-util-cursor
xorg.libxcb
xorg.libX11
xorg.xcbutilwm
xorg.xcbutilimage
xorg.xcbutilerrors
xorg.xcbutilkeysyms
xorg.xcbutilrenderutil
# Python requirements.
python312Full
python312Packages.uv
python312Packages.qdarkstyle
python312Packages.rapidfuzz
python312Packages.pyqt6
python312Packages.qtpy
];
src = null;
shellHook = ''
set -e
# Set the Qt plugin path
# export QT_DEBUG_PLUGINS=1
QTBASE_PATH="${qt6baseStorePath}/lib"
QT_PLUGIN_PATH="$QTBASE_PATH/qt-6/plugins"
QT_QPA_PLATFORM_PLUGIN_PATH="$QT_PLUGIN_PATH/platforms"
LIB_GCC_PATH="${libgcc.lib}/lib"
GLIB_PATH="${glibStorePath}/lib"
ZSTD_PATH="${zstdStorePath}/lib"
ZLIB_PATH="${zlibStorePath}/lib"
DBUS_PATH="${dbusStorePath}/lib"
LIBGL_PATH="${libGLStorePath}/lib"
FREETYPE_PATH="${freetypeStorePath}/lib"
FONTCONFIG_PATH="${fontconfigStorePath}/lib"
LIB_XKB_COMMON_PATH="${libxkbcommonStorePath}/lib"
XCB_UTIL_CURSOR_PATH="${xcbutilcursorStorePath}/lib"
XORG_LIB_X11_PATH="${xorgLibX11StorePath}/lib"
XORG_LIB_XCB_PATH="${xorgLibxcbStorePath}/lib"
XORG_XCB_UTIL_IMAGE_PATH="${xorgxcbutilimageStorePath}/lib"
XORG_XCB_UTIL_WM_PATH="${xorgxcbutilwmStorePath}/lib"
XORG_XCB_UTIL_RENDER_UTIL_PATH="${xorgxcbutilrenderutilStorePath}/lib"
XORG_XCB_UTIL_KEYSYMS_PATH="${xorgxcbutilkeysymsStorePath}/lib"
XORG_XCB_UTIL_ERRORS_PATH="${xorgxcbutilerrorsStorePath}/lib"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$QTBASE_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$QT_PLUGIN_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$QT_QPA_PLATFORM_PLUGIN_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$LIB_GCC_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$DBUS_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$GLIB_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$ZLIB_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$ZSTD_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$LIBGL_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$FONTCONFIG_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$FREETYPE_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$LIB_XKB_COMMON_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XCB_UTIL_CURSOR_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_LIB_X11_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_LIB_XCB_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_IMAGE_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_WM_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_RENDER_UTIL_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_KEYSYMS_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_ERRORS_PATH"
export LD_LIBRARY_PATH
RPDFUZZ_PATH="${rapidfuzzStorePath}/lib/python3.12/site-packages"
QDRKSTYLE_PATH="${qdarkstyleStorePath}/lib/python3.12/site-packages"
QTPY_PATH="${qtpyStorePath}/lib/python3.12/site-packages"
PYQT6_PATH="${pyqt6StorePath}/lib/python3.12/site-packages"
PYQT6_SIP_PATH="${pyqt6SipStorePath}/lib/python3.12/site-packages"
PATCH="$PATCH:$RPDFUZZ_PATH"
PATCH="$PATCH:$QDRKSTYLE_PATH"
PATCH="$PATCH:$QTPY_PATH"
PATCH="$PATCH:$PYQT6_PATH"
PATCH="$PATCH:$PYQT6_SIP_PATH"
export PATCH
# Install deps
uv lock
'';
}

View File

@ -1,8 +1,8 @@
# piker: trading gear for hackers
# Copyright (C)
# Guillermo Rodriguez (aka ze jefe)
# Tyler Goodlet
# (in stewardship for pikers)
# Guillermo Rodriguez (aka ze jefe)
# Tyler Goodlet
# (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -25,13 +25,14 @@ from __future__ import annotations
from collections import ChainMap
from contextlib import (
asynccontextmanager as acm,
AsyncExitStack,
)
from datetime import datetime
from pprint import pformat
from typing import (
Any,
Callable,
Hashable,
Sequence,
Type,
)
import hmac
@ -43,6 +44,7 @@ from pendulum import (
now,
)
import httpx
from rapidfuzz import process as fuzzy
import numpy as np
from piker import config
@ -52,7 +54,6 @@ from piker.clearing._messages import (
from piker.accounting import (
Asset,
digits_to_dec,
MktPair,
)
from piker.types import Struct
from piker.data import (
@ -68,6 +69,7 @@ from .venues import (
PAIRTYPES,
Pair,
MarketType,
_spot_url,
_futes_url,
_testnet_futes_url,
@ -77,18 +79,19 @@ from .venues import (
log = get_logger('piker.brokers.binance')
def get_config() -> dict[str, Any]:
def get_config() -> dict:
conf: dict
path: Path
conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
section: dict = conf.get('binance')
section = conf.get('binance')
if not section:
log.warning(
f'No config section found for binance in {path}'
)
log.warning(f'No config section found for binance in {path}')
return {}
return section
@ -144,7 +147,7 @@ def binance_timestamp(
class Client:
'''
Async ReST API client using `trio` + `httpx` B)
Async ReST API client using ``trio`` + ``httpx`` B)
Supports all of the spot, margin and futures endpoints depending
on method.
@ -153,17 +156,11 @@ class Client:
def __init__(
self,
venue_sessions: dict[
str, # venue key
tuple[httpx.AsyncClient, str] # session, eps path
],
conf: dict[str, Any],
# TODO: change this to `Client.[mkt_]venue: MarketType`?
mkt_mode: MarketType = 'spot',
httpx_client: httpx.AsyncClient,
) -> None:
self.conf = conf
# build out pair info tables for each market type
# and wrap in a chain-map view for search / query.
self._spot_pairs: dict[str, Pair] = {} # spot info table
@ -190,13 +187,28 @@ class Client:
# market symbols for use by search. See `.exch_info()`.
self._pairs: ChainMap[str, Pair] = ChainMap()
self._create_sessions(httpx_client)
# global client "venue selection" mode.
# set this when you want to switch venues and not have to
# specify the venue for the next request.
self.mkt_mode: MarketType = mkt_mode
# per-mkt-venue API client table
self.venue_sesh = venue_sessions
# per 8
self.venue_sesh: dict[
str, # venue key
tuple[httpx.AsyncClient, str] # session, eps path
] = {
'spot': (self._sesh, '/api/v3/'),
'spot_testnet': (self._test_sesh, '/fapi/v1/'),
'margin': (self._sapi_sesh, '/sapi/v1/'),
'usdtm_futes': (self._fapi_sesh, '/fapi/v1/'),
'usdtm_futes_testnet': (self._test_fapi_sesh, '/fapi/v1/'),
# 'futes_coin': self._dapi, # TODO
}
# lookup for going from `.mkt_mode: str` to the config
# subsection `key: str`
@ -211,6 +223,69 @@ class Client:
'futes': ['usdtm_futes'],
}
# for creating API keys see,
# https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072
self.conf: dict = get_config()
self._setup_api_keys()
def _setup_api_keys(
self
) -> None:
"""
Set up API keys for the configured venues and sessions.
"""
for key, subconf in self.conf.items():
if api_key := subconf.get('api_key', ''):
venue_keys: list[str] = self.confkey2venuekeys[key]
venue_key: str
sesh: httpx.AsyncClient
for venue_key in venue_keys:
sesh, _ = self.venue_sesh[venue_key]
api_key_header: dict = {
# taken from official:
# https://github.com/binance/binance-futures-connector-python/blob/main/binance/api.py#L47
"Content-Type": "application/json;charset=utf-8",
# TODO: prolly should just always query and copy
# in the real latest ver?
"User-Agent": "binance-connector/6.1.6smbz6",
"X-MBX-APIKEY": api_key,
}
sesh.headers.update(api_key_header)
# if `.use_tesnet = true` in the config then
# also add headers for the testnet session which
# will be used for all order control
if subconf.get('use_testnet', False):
testnet_sesh, _ = self.venue_sesh[
venue_key + '_testnet'
]
testnet_sesh.headers.update(api_key_header)
def _create_sessions(
self,
httpx_client: httpx.AsyncClient
) -> None:
"""
Create the necessary AsyncClient sessions for different endpoints.
"""
# spot EPs sesh
self._sesh: httpx.AsyncClient = httpx_client.AsyncClient(base_url=_spot_url)
# spot testnet
self._test_sesh: httpx.AsyncClient = httpx_client.AsyncClient(base_url=__testnet_spot_url)
# margin and extended spot endpoints session.
self._sapi_sesh: httpx.AsyncClient = httpx_client.AsyncClient(base_url=_spot_url)
# futes EPs sesh
self._fapi_sesh: httpx.AsyncClient = httpx_client.AsyncClient(base_url=_futes_url)
# futes testnet
self._test_fapi_sesh: httpx.AsyncClient = httpx_client.AsyncClient(base_url=_testnet_futes_url)
def _mk_sig(
self,
data: dict,
@ -229,6 +304,7 @@ class Client:
'to define the creds for auth-ed endpoints!?'
)
# XXX: Info on security and authentification
# https://binance-docs.github.io/apidocs/#endpoint-security-type
if not (api_secret := subconf.get('api_secret')):
@ -257,7 +333,7 @@ class Client:
params: dict,
method: str = 'get',
venue: str|None = None, # if None use `.mkt_mode` state
venue: str | None = None, # if None use `.mkt_mode` state
signed: bool = False,
allow_testnet: bool = False,
@ -268,9 +344,8 @@ class Client:
- /fapi/v3/ USD-M FUTURES, or
- /api/v3/ SPOT/MARGIN
account/market endpoint request depending on either passed in
`venue: str` or the current setting `.mkt_mode: str` setting,
default `'spot'`.
account/market endpoint request depending on either passed in `venue: str`
or the current setting `.mkt_mode: str` setting, default `'spot'`.
Docs per venue API:
@ -299,6 +374,9 @@ class Client:
venue=venue_key,
)
sesh: httpx.AsyncClient
path: str
# Check if we're configured to route order requests to the
# venue equivalent's testnet.
use_testnet: bool = False
@ -323,12 +401,11 @@ class Client:
# ctl machinery B)
venue_key += '_testnet'
client: httpx.AsyncClient
path: str
client, path = self.venue_sesh[venue_key]
meth: Callable = getattr(client, method)
sesh, path = self.venue_sesh[venue_key]
meth: Callable = getattr(sesh, method)
resp = await meth(
url=path + endpoint,
path=path + endpoint,
params=params,
timeout=float('inf'),
)
@ -370,15 +447,7 @@ class Client:
item['filters'] = filters
pair_type: Type = PAIRTYPES[venue]
try:
pair: Pair = pair_type(**item)
except Exception as e:
e.add_note(
"\nDon't panic, prolly stupid binance changed their symbology schema again..\n"
'Check out their API docs here:\n\n'
'https://binance-docs.github.io/apidocs/spot/en/#exchange-information'
)
raise
pair: Pair = pair_type(**item)
pair_table[pair.symbol.upper()] = pair
# update an additional top-level-cross-venue-table
@ -473,9 +542,7 @@ class Client:
'''
pair_table: dict[str, Pair] = self._venue2pairs[
venue
or
self.mkt_mode
venue or self.mkt_mode
]
if (
expiry
@ -494,9 +561,9 @@ class Client:
venues: list[str] = [venue]
# batch per-venue download of all exchange infos
async with trio.open_nursery() as tn:
async with trio.open_nursery() as rn:
for ven in venues:
tn.start_soon(
rn.start_soon(
self._cache_pairs,
ven,
)
@ -549,11 +616,11 @@ class Client:
) -> dict[str, Any]:
fq_pairs: dict[str, Pair] = await self.exch_info()
fq_pairs: dict = await self.exch_info()
# TODO: cache this list like we were in
# `open_symbol_search()`?
# keys: list[str] = list(fq_pairs)
keys: list[str] = list(fq_pairs)
return match_from_pairs(
pairs=fq_pairs,
@ -561,20 +628,9 @@ class Client:
score_cutoff=50,
)
def pair2venuekey(
self,
pair: Pair,
) -> str:
return {
'USDTM': 'usdtm_futes',
'SPOT': 'spot',
# 'COINM': 'coin_futes',
# ^-TODO-^ bc someone might want it..?
}[pair.venue]
async def bars(
self,
mkt: MktPair,
symbol: str,
start_dt: datetime | None = None,
end_dt: datetime | None = None,
@ -604,20 +660,16 @@ class Client:
start_time = binance_timestamp(start_dt)
end_time = binance_timestamp(end_dt)
bs_pair: Pair = self._pairs[mkt.bs_fqme.upper()]
# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
bars = await self._api(
'klines',
params={
# NOTE: always query using their native symbology!
'symbol': mkt.bs_mktid.upper(),
'symbol': symbol.upper(),
'interval': '1m',
'startTime': start_time,
'endTime': end_time,
'limit': limit
},
venue=self.pair2venuekey(bs_pair),
allow_testnet=False,
)
new_bars: list[tuple] = []
@ -934,148 +986,17 @@ class Client:
await self.close_listen_key(key)
_venue_urls: dict[str, str] = {
'spot': (
_spot_url,
'/api/v3/',
),
'spot_testnet': (
_testnet_spot_url,
'/fapi/v1/'
),
# margin and extended spot endpoints session.
# TODO: did this ever get implemented fully?
# 'margin': (
# _spot_url,
# '/sapi/v1/'
# ),
'usdtm_futes': (
_futes_url,
'/fapi/v1/',
),
'usdtm_futes_testnet': (
_testnet_futes_url,
'/fapi/v1/',
),
# TODO: for anyone who actually needs it ;P
# 'coin_futes': ()
}
def init_api_keys(
client: Client,
conf: dict[str, Any],
) -> None:
'''
Set up per-venue API keys each http client according to the user's
`brokers.conf`.
For ex, to use spot-testnet and live usdt futures APIs:
```toml
[binance]
# spot test net
spot.use_testnet = true
spot.api_key = '<spot_api_key_from_binance_account>'
spot.api_secret = '<spot_api_key_password>'
# futes live
futes.use_testnet = false
accounts.usdtm = 'futes'
futes.api_key = '<futes_api_key_from_binance>'
futes.api_secret = '<futes_api_key_password>''
# if uncommented will use the built-in paper engine and not
# connect to `binance` API servers for order ctl.
# accounts.paper = 'paper'
```
'''
for key, subconf in conf.items():
if api_key := subconf.get('api_key', ''):
venue_keys: list[str] = client.confkey2venuekeys[key]
venue_key: str
client: httpx.AsyncClient
for venue_key in venue_keys:
client, _ = client.venue_sesh[venue_key]
api_key_header: dict = {
# taken from official:
# https://github.com/binance/binance-futures-connector-python/blob/main/binance/api.py#L47
"Content-Type": "application/json;charset=utf-8",
# TODO: prolly should just always query and copy
# in the real latest ver?
"User-Agent": "binance-connector/6.1.6smbz6",
"X-MBX-APIKEY": api_key,
}
client.headers.update(api_key_header)
# if `.use_tesnet = true` in the config then
# also add headers for the testnet session which
# will be used for all order control
if subconf.get('use_testnet', False):
testnet_sesh, _ = client.venue_sesh[
venue_key + '_testnet'
]
testnet_sesh.headers.update(api_key_header)
@acm
async def get_client(
mkt_mode: MarketType = 'spot',
) -> Client:
'''
Construct an single `piker` client which composes multiple underlying venue
specific API clients both for live and test networks.
async def get_client() -> Client:
'''
venue_sessions: dict[
str, # venue key
tuple[httpx.AsyncClient, str] # session, eps path
] = {}
async with AsyncExitStack() as client_stack:
for name, (base_url, path) in _venue_urls.items():
api: httpx.AsyncClient = await client_stack.enter_async_context(
httpx.AsyncClient(
base_url=base_url,
# headers={},
client = Client()
await client.exch_info()
log.info(
f'{client} in {client.mkt_mode} mode: caching exchange infos..\n'
'Cached multi-market pairs:\n'
f'spot: {len(client._spot_pairs)}\n'
f'usdtm_futes: {len(client._ufutes_pairs)}\n'
f'Total: {len(client._pairs)}\n'
)
# TODO: is there a way to numerate this?
# https://www.python-httpx.org/advanced/clients/#why-use-a-client
# connections=4
)
)
venue_sessions[name] = (
api,
path,
)
conf: dict[str, Any] = get_config()
# for creating API keys see,
# https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072
client = Client(
venue_sessions=venue_sessions,
conf=conf,
mkt_mode=mkt_mode,
)
init_api_keys(
client=client,
conf=conf,
)
fq_pairs: dict[str, Pair] = await client.exch_info()
assert fq_pairs
log.info(
f'Loaded multi-venue `Client` in mkt_mode={client.mkt_mode!r}\n\n'
f'Symbology Summary:\n'
f'------ - ------\n'
f'spot: {len(client._spot_pairs)}\n'
f'usdtm_futes: {len(client._ufutes_pairs)}\n'
'------ - ------\n'
f'total: {len(client._pairs)}\n'
)
yield client
yield client

View File

@ -264,20 +264,15 @@ async def open_trade_dialog(
# do a open_symcache() call.. though maybe we can hide
# this in a new async version of open_account()?
async with open_cached_client('binance') as client:
subconf: dict|None = client.conf.get(venue_name)
subconf: dict = client.conf[venue_name]
use_testnet = subconf.get('use_testnet', False)
# XXX: if no futes.api_key or spot.api_key has been set we
# always fall back to the paper engine!
if (
not subconf
or
not subconf.get('api_key')
):
if not subconf.get('api_key'):
await ctx.started('paper')
return
use_testnet: bool = subconf.get('use_testnet', False)
async with (
open_cached_client('binance') as client,
):

View File

@ -42,12 +42,12 @@ from trio_typing import TaskStatus
from pendulum import (
from_timestamp,
)
from rapidfuzz import process as fuzzy
import numpy as np
import tractor
from piker.brokers import (
open_cached_client,
NoData,
)
from piker._cacheables import (
async_lifo_cache,
@ -110,7 +110,6 @@ class AggTrade(Struct, frozen=True):
async def stream_messages(
ws: NoBsWs,
) -> AsyncGenerator[NoBsWs, dict]:
# TODO: match syntax here!
@ -221,8 +220,6 @@ def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
}
# TODO, why aren't frame resp `log.info()`s showing in upstream
# code?!
@acm
async def open_history_client(
mkt: MktPair,
@ -255,30 +252,24 @@ async def open_history_client(
else:
client.mkt_mode = 'spot'
array: np.ndarray = await client.bars(
mkt=mkt,
# NOTE: always query using their native symbology!
mktid: str = mkt.bs_mktid
array = await client.bars(
mktid,
start_dt=start_dt,
end_dt=end_dt,
)
if array.size == 0:
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (
end_dt is None
):
inow = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
start_dt = from_timestamp(times[0])
end_dt = from_timestamp(times[-1])
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3}
@ -465,8 +456,6 @@ async def stream_quotes(
):
init_msgs: list[FeedInit] = []
for sym in symbols:
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec
@ -515,6 +504,7 @@ async def stream_quotes(
# start streaming
async for typ, quote in msg_gen:
# period = time.time() - last
# hz = 1/period if period else float('inf')
# if hz > 60:
@ -550,7 +540,7 @@ async def open_symbol_search(
)
# repack in fqme-keyed table
byfqme: dict[str, Pair] = {}
byfqme: dict[start, Pair] = {}
for pair in pairs.values():
byfqme[pair.bs_fqme] = pair

View File

@ -137,12 +137,10 @@ class SpotPair(Pair, frozen=True):
quoteOrderQtyMarketAllowed: bool
isSpotTradingAllowed: bool
isMarginTradingAllowed: bool
otoAllowed: bool
defaultSelfTradePreventionMode: str
allowedSelfTradePreventionModes: list[str]
permissions: list[str]
permissionSets: list[list[str]]
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.binance:SpotPair'
@ -181,6 +179,7 @@ class FutesPair(Pair):
quoteAsset: str # 'USDT',
quotePrecision: int # 8,
requiredMarginPercent: float # '5.0000',
settlePlan: int # 0,
timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'],
triggerProtect: float # '0.0500',
underlyingSubType: list[str] # ['PoW'],

View File

@ -100,7 +100,7 @@ async def data_reset_hack(
log.warning(
no_setup_msg
+
'REQUIRES A `vnc_addrs: array` ENTRY'
f'REQUIRES A `vnc_addrs: array` ENTRY'
)
vnc_host, vnc_port = vnc_sockaddr.get(
@ -259,7 +259,7 @@ def i3ipc_xdotool_manual_click_hack() -> None:
timeout=timeout,
)
# re-activate and focus original window
# re-activate and focus original window
subprocess.call([
'xdotool',
'windowactivate', '--sync', str(orig_win_id),

View File

@ -287,31 +287,9 @@ class Client:
self.conf = config
# NOTE: the ib.client here is "throttled" to 45 rps by default
self.ib: IB = ib
self.ib = ib
self.ib.RaiseRequestErrors: bool = True
# self._acnt_names: set[str] = {}
self._acnt_names: list[str] = []
@property
def acnts(self) -> list[str]:
# return list(self._acnt_names)
return self._acnt_names
def __repr__(self) -> str:
return (
f'<{type(self).__name__}('
f'ib={self.ib} '
f'acnts={self.acnts}'
# TODO: we need to mask out acnt-#s and other private
# infos if we're going to console this!
# f' |_.conf:\n'
# f' {pformat(self.conf)}\n'
')>'
)
async def get_fills(self) -> list[Fill]:
'''
Return list of rents `Fills` from trading session.
@ -398,63 +376,55 @@ class Client:
# whatToShow='MIDPOINT',
# whatToShow='TRADES',
)
log.info(
f'REQUESTING {ib_duration_str} worth {bar_size} BARS\n'
f'fqme: {fqme}\n'
f'global _enters: {_enters}\n'
f'kwargs: {pformat(kwargs)}\n'
)
bars = await self.ib.reqHistoricalDataAsync(
**kwargs,
)
query_info: str = (
f'REQUESTING IB history BARS\n'
f' ------ - ------\n'
f'dt_duration: {dt_duration}\n'
f'ib_duration_str: {ib_duration_str}\n'
f'bar_size: {bar_size}\n'
f'fqme: {fqme}\n'
f'actor-global _enters: {_enters}\n'
f'kwargs: {pformat(kwargs)}\n'
)
# tail case if no history for range or none prior.
# NOTE: there's actually 3 cases here to handle (and
# this should be read alongside the implementation of
# `.reqHistoricalDataAsync()`):
# - a timeout occurred in which case insync internals return
# an empty list thing with bars.clear()...
# - no data exists for the period likely due to
# a weekend, holiday or other non-trading period prior to
# ``end_dt`` which exceeds the ``duration``,
# - LITERALLY this is the start of the mkt's history!
if not bars:
# TODO: figure out wut's going on here.
# NOTE: there's actually 3 cases here to handle (and
# this should be read alongside the implementation of
# `.reqHistoricalDataAsync()`):
# - a timeout occurred in which case insync internals return
# an empty list thing with bars.clear()...
# - no data exists for the period likely due to
# a weekend, holiday or other non-trading period prior to
# ``end_dt`` which exceeds the ``duration``,
# - LITERALLY this is the start of the mkt's history!
# TODO: is this handy, a sync requester for tinkering
# with empty frame cases?
# def get_hist():
# return self.ib.reqHistoricalData(**kwargs)
# import pdbp
# pdbp.set_trace()
log.critical(
'STUPID IB SAYS NO HISTORY\n\n'
+ query_info
)
# sync requester for debugging empty frame cases
def get_hist():
return self.ib.reqHistoricalData(**kwargs)
assert get_hist
import pdbp
pdbp.set_trace()
# TODO: we could maybe raise ``NoData`` instead if we
# rewrite the method in the first case?
# right now there's no way to detect a timeout..
return [], np.empty(0), dt_duration
# TODO: we could maybe raise ``NoData`` instead if we
# rewrite the method in the first case? right now there's no
# way to detect a timeout.
log.info(query_info)
# NOTE XXX: ensure minimum duration in bars?
# => recursively call this method until we get at least as
# many bars such that they sum in aggregate to the the
# desired total time (duration) at most.
# - if you query over a gap and get no data
# that may short circuit the history
# NOTE XXX: ensure minimum duration in bars B)
# => we recursively call this method until we get at least
# as many bars such that they sum in aggregate to the the
# desired total time (duration) at most.
# XXX XXX XXX
# WHY DID WE EVEN NEED THIS ORIGINALLY!?
# XXX XXX XXX
# - if you query over a gap and get no data
# that may short circuit the history
if (
# XXX XXX XXX
# => WHY DID WE EVEN NEED THIS ORIGINALLY!? <=
# XXX XXX XXX
False
and end_dt
end_dt
and False
):
nparr: np.ndarray = bars_to_np(bars)
times: np.ndarray = nparr['time']
@ -957,10 +927,7 @@ class Client:
warnset = True
else:
log.info(
'Got first quote for contract\n'
f'{contract}\n'
)
log.info(f'Got first quote for {contract}')
break
else:
if timeouterr and raise_on_timeout:
@ -1024,12 +991,8 @@ class Client:
outsideRth=True,
optOutSmartRouting=True,
# TODO: need to understand this setting better as
# it pertains to shit ass mms..
routeMarketableToBbo=True,
designatedLocation='SMART',
# TODO: make all orders GTC?
# https://interactivebrokers.github.io/tws-api/classIBApi_1_1Order.html#a95539081751afb9980f4c6bd1655a6ba
# goodTillDate=f"yyyyMMdd-HH:mm:ss",
@ -1157,8 +1120,8 @@ def get_config() -> dict[str, Any]:
names = list(accounts.keys())
accts = section['accounts'] = bidict(accounts)
log.info(
f'{path} defines {len(accts)} account aliases:\n'
f'{pformat(names)}\n'
f'brokers.toml defines {len(accts)} accounts: '
f'{pformat(names)}'
)
if section is None:
@ -1225,7 +1188,7 @@ async def load_aio_clients(
try_ports = list(try_ports.values())
_err = None
accounts_def: dict[str, str] = config.load_accounts(['ib'])
accounts_def = config.load_accounts(['ib'])
ports = try_ports if port is None else [port]
combos = list(itertools.product(hosts, ports))
accounts_found: dict[str, Client] = {}
@ -1250,12 +1213,6 @@ async def load_aio_clients(
for i in range(connect_retries):
try:
log.info(
'Trying `ib_async` connect\n'
f'{host}: {port}\n'
f'clientId: {client_id}\n'
f'timeout: {connect_timeout}\n'
)
await ib.connectAsync(
host,
port,
@ -1270,9 +1227,7 @@ async def load_aio_clients(
client = Client(ib=ib, config=conf)
# update all actor-global caches
log.runtime(
f'Connected and caching `Client` @ {sockaddr!r}'
)
log.info(f"Caching client for {sockaddr}")
_client_cache[sockaddr] = client
break
@ -1287,59 +1242,37 @@ async def load_aio_clients(
OSError,
) as ce:
_err = ce
message: str = (
f'Failed to connect on {host}:{port} after {i} tries with\n'
f'{ib.client.apiError.value()!r}\n\n'
'Retrying with a new client id..\n'
)
log.runtime(message)
else:
# XXX report loudly if we never established after all
# re-tries
log.warning(message)
log.warning(
f'Failed to connect on {host}:{port} for {i} time with,\n'
f'{ib.client.apiError.value()}\n'
'retrying with a new client id..')
# Pre-collect all accounts available for this
# connection and map account names to this client
# instance.
for value in ib.accountValues():
acct_number: str = value.account
acct_number = value.account
acnt_alias: str = accounts_def.inverse.get(acct_number)
if not acnt_alias:
# TODO: should we constuct the below reco-ex from
# the existing config content?
_, path = config.load(
conf_name='brokers',
)
entry = accounts_def.inverse.get(acct_number)
if not entry:
raise ValueError(
'No alias in account section for account!\n'
f'Please add an acnt alias entry to your {path}\n'
'For example,\n\n'
'[ib.accounts]\n'
'margin = {accnt_number!r}\n'
'^^^^^^ <- you need this part!\n\n'
'This ensures `piker` will not leak private acnt info '
'to console output by default!\n'
'No section in brokers.toml for account:'
f' {acct_number}\n'
f'Please add entry to continue using this API client'
)
# surjection of account names to operating clients.
if acnt_alias not in accounts_found:
accounts_found[acnt_alias] = client
# client._acnt_names.add(acnt_alias)
client._acnt_names.append(acnt_alias)
if acct_number not in accounts_found:
accounts_found[entry] = client
if accounts_found:
log.info(
f'Loaded accounts for api client\n\n'
f'{pformat(accounts_found)}\n'
)
log.info(
f'Loaded accounts for client @ {host}:{port}\n'
f'{pformat(accounts_found)}'
)
# XXX: why aren't we just updating this directy above
# instead of using the intermediary `accounts_found`?
_accounts2clients.update(accounts_found)
# XXX: why aren't we just updating this directy above
# instead of using the intermediary `accounts_found`?
_accounts2clients.update(accounts_found)
# if we have no clients after the scan loop then error out.
if not _client_cache:
@ -1373,9 +1306,7 @@ async def load_clients_for_trio(
a ``tractor.to_asyncio.open_channel_from()``.
'''
async with load_aio_clients(
disconnect_on_exit=False,
) as accts2clients:
async with load_aio_clients() as accts2clients:
to_trio.send_nowait(accts2clients)
@ -1541,7 +1472,7 @@ async def open_aio_client_method_relay(
msg: tuple[str, dict] | dict | None = await from_trio.get()
match msg:
case None: # termination sentinel
log.info('asyncio `Client` method-proxy SHUTDOWN!')
print('asyncio PROXY-RELAY SHUTDOWN')
break
case (meth_name, kwargs):

View File

@ -1183,14 +1183,7 @@ async def deliver_trade_events(
pos
and fill
):
now_cr: CommissionReport = fill.commissionReport
if (now_cr != cr):
log.warning(
'UhhHh ib updated the commission report mid-fill..?\n'
f'was: {pformat(cr)}\n'
f'now: {pformat(now_cr)}\n'
)
assert fill.commissionReport == cr
await emit_pp_update(
ems_stream,
accounts_def,

View File

@ -671,8 +671,8 @@ async def _setup_quote_stream(
# making them mostly useless and explains why the scanner
# is always slow XD
# '293', # Trade count for day
# '294', # Trade rate / minute
# '295', # Vlm rate / minute
'294', # Trade rate / minute
'295', # Vlm rate / minute
),
contract: Contract | None = None,
@ -915,13 +915,9 @@ async def stream_quotes(
if first_ticker:
first_quote: dict = normalize(first_ticker)
# TODO: we need a stack-oriented log levels filters for
# this!
# log.info(message, filter={'stack': 'live_feed'}) ?
log.runtime(
'Rxed init quote:\n\n'
f'{pformat(first_quote)}\n'
log.info(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
)
# NOTE: it might be outside regular trading hours for
@ -973,11 +969,7 @@ async def stream_quotes(
raise_on_timeout=True,
)
first_quote: dict = normalize(first_ticker)
# TODO: we need a stack-oriented log levels filters for
# this!
# log.info(message, filter={'stack': 'live_feed'}) ?
log.runtime(
log.info(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
)

View File

@ -31,11 +31,7 @@ from typing import (
)
from bidict import bidict
from pendulum import (
DateTime,
parse,
from_timestamp,
)
import pendulum
from ib_insync import (
Contract,
Commodity,
@ -70,11 +66,10 @@ tx_sort: Callable = partial(
iter_by_dt,
parsers={
'dateTime': parse_flex_dt,
'datetime': parse,
# XXX: for some some fucking 2022 and
# back options records.. f@#$ me..
'date': parse,
'datetime': pendulum.parse,
# for some some fucking 2022 and
# back options records...fuck me.
'date': pendulum.parse,
}
)
@ -94,38 +89,15 @@ def norm_trade(
conid: int = str(record.get('conId') or record['conid'])
bs_mktid: str = str(conid)
comms = record.get('commission')
if comms is None:
comms = -1*record['ibCommission']
# NOTE: sometimes weird records (like BTTX?)
# have no field for this?
comms: float = -1 * (
record.get('commission')
or record.get('ibCommission')
or 0
)
if not comms:
log.warning(
'No commissions found for record?\n'
f'{pformat(record)}\n'
)
price: float = (
record.get('price')
or record.get('tradePrice')
)
if price is None:
log.warning(
'No `price` field found in record?\n'
'Skipping normalization..\n'
f'{pformat(record)}\n'
)
return None
price = record.get('price') or record['tradePrice']
# the api doesn't do the -/+ on the quantity for you but flex
# records do.. are you fucking serious ib...!?
size: float|int = (
record.get('quantity')
or record['shares']
) * {
size = record.get('quantity') or record['shares'] * {
'BOT': 1,
'SLD': -1,
}[record['side']]
@ -156,31 +128,26 @@ def norm_trade(
# otype = tail[6]
# strike = tail[7:]
log.warning(
f'Skipping option contract -> NO SUPPORT YET!\n'
f'{symbol}\n'
)
print(f'skipping opts contract {symbol}')
return None
# timestamping is way different in API records
dtstr: str = record.get('datetime')
date: str = record.get('date')
flex_dtstr: str = record.get('dateTime')
dtstr = record.get('datetime')
date = record.get('date')
flex_dtstr = record.get('dateTime')
if dtstr or date:
dt: DateTime = parse(dtstr or date)
dt = pendulum.parse(dtstr or date)
elif flex_dtstr:
# probably a flex record with a wonky non-std timestamp..
dt: DateTime = parse_flex_dt(record['dateTime'])
dt = parse_flex_dt(record['dateTime'])
# special handling of symbol extraction from
# flex records using some ad-hoc schema parsing.
asset_type: str = (
record.get('assetCategory')
or record.get('secType')
or 'STK'
)
asset_type: str = record.get(
'assetCategory'
) or record.get('secType', 'STK')
if (expiry := (
record.get('lastTradeDateOrContractMonth')
@ -390,7 +357,6 @@ def norm_trade_records(
if txn is None:
continue
# inject txns sorted by datetime
insort(
records,
txn,
@ -439,7 +405,7 @@ def api_trades_to_ledger_entries(
txn_dict[attr_name] = val
tid = str(txn_dict['execId'])
dt = from_timestamp(txn_dict['time'])
dt = pendulum.from_timestamp(txn_dict['time'])
txn_dict['datetime'] = str(dt)
acctid = accounts[txn_dict['acctNumber']]

View File

@ -209,10 +209,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
break
ib_client = proxy._aio_ns.ib
log.info(
f'Using API client for symbol-search\n'
f'{ib_client}\n'
)
log.info(f'Using {ib_client} for symbol search')
last = time.time()
async for pattern in stream:
@ -297,7 +294,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
elif stock_results:
break
# else:
# await tractor.pause()
await tractor.pause()
# # match against our ad-hoc set immediately
# adhoc_matches = fuzzy.extract(
@ -525,21 +522,7 @@ async def get_mkt_info(
venue = con.primaryExchange or con.exchange
price_tick: Decimal = Decimal(str(details.minTick))
ib_min_tick_gt_2: Decimal = Decimal('0.01')
if (
price_tick < ib_min_tick_gt_2
):
# TODO: we need to add some kinda dynamic rounding sys
# to our MktPair i guess?
# not sure where the logic should sit, but likely inside
# the `.clearing._ems` i suppose...
log.warning(
'IB seems to disallow a min price tick < 0.01 '
'when the price is > 2.0..?\n'
f'Decreasing min tick precision for {fqme} to 0.01'
)
# price_tick = ib_min_tick
# await tractor.pause()
# price_tick: Decimal = Decimal('0.01')
if atype == 'stock':
# XXX: GRRRR they don't support fractional share sizes for

View File

@ -62,7 +62,7 @@ from piker._cacheables import (
)
from piker.log import get_logger
from piker.data.validate import FeedInit
from piker.types import Struct # NOTE, this is already a `tractor.msg.Struct`
from piker.types import Struct
from piker.data import (
def_iohlcv_fields,
match_from_pairs,
@ -98,18 +98,9 @@ class KucoinMktPair(Struct, frozen=True):
def size_tick(self) -> Decimal:
return Decimal(str(self.quoteMinSize))
callauctionFirstStageStartTime: None|float
callauctionIsEnabled: bool
callauctionPriceCeiling: float|None
callauctionPriceFloor: float|None
callauctionSecondStageStartTime: float|None
callauctionThirdStageStartTime: float|None
enableTrading: bool
feeCategory: int
feeCurrency: str
isMarginEnabled: bool
makerFeeCoefficient: float
market: str
minFunds: float
name: str
@ -119,10 +110,7 @@ class KucoinMktPair(Struct, frozen=True):
quoteIncrement: float
quoteMaxSize: float
quoteMinSize: float
st: bool
symbol: str # our bs_mktid, kucoin's internal id
takerFeeCoefficient: float
tradingStartTime: float|None
class AccountTrade(Struct, frozen=True):
@ -317,10 +305,9 @@ class Client:
headers=headers,
)
json: dict = res.json()
if (data := json.get('data')) is not None:
if data := json.get('data'):
return data
else:
api_url: str = self._http.base_url
log.error(
f'Error making request to {api_url} ->\n'
f'{pformat(res)}'
@ -404,13 +391,7 @@ class Client:
pairs: dict[str, KucoinMktPair] = {}
fqmes2mktids: bidict[str, str] = bidict()
for item in entries:
try:
pair = pairs[item['name']] = KucoinMktPair(**item)
except TypeError as te:
raise TypeError(
'`KucoinMktPair` and reponse fields do not match ??\n'
f'{KucoinMktPair.fields_diff(item)}\n'
) from te
pair = pairs[item['name']] = KucoinMktPair(**item)
fqmes2mktids[
item['name'].lower().replace('-', '')
] = pair.name
@ -611,7 +592,7 @@ async def get_client() -> AsyncGenerator[Client, None]:
'''
async with (
httpx.AsyncClient(
base_url='https://api.kucoin.com/api',
base_url=f'https://api.kucoin.com/api',
) as trio_client,
):
client = Client(httpx_client=trio_client)
@ -655,7 +636,7 @@ async def open_ping_task(
await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'})
log.warning('Starting ping task for kucoin ws connection')
log.info('Starting ping task for kucoin ws connection')
n.start_soon(ping_server)
yield
@ -667,14 +648,9 @@ async def open_ping_task(
async def get_mkt_info(
fqme: str,
) -> tuple[
MktPair,
KucoinMktPair,
]:
) -> tuple[MktPair, KucoinMktPair]:
'''
Query for and return both a `piker.accounting.MktPair` and
`KucoinMktPair` from provided `fqme: str`
(fully-qualified-market-endpoint).
Query for and return a `MktPair` and `KucoinMktPair`.
'''
async with open_cached_client('kucoin') as client:
@ -749,8 +725,6 @@ async def stream_quotes(
log.info(f'Starting up quote stream(s) for {symbols}')
for sym_str in symbols:
mkt: MktPair
pair: KucoinMktPair
mkt, pair = await get_mkt_info(sym_str)
init_msgs.append(
FeedInit(mkt_info=mkt)
@ -758,11 +732,7 @@ async def stream_quotes(
ws: NoBsWs
token, ping_interval = await client._get_ws_token()
log.info('API reported ping_interval: {ping_interval}\n')
connect_id: str = str(uuid4())
typ: str
quote: dict
connect_id = str(uuid4())
async with (
open_autorecon_ws(
(
@ -776,37 +746,20 @@ async def stream_quotes(
),
) as ws,
open_ping_task(ws, ping_interval, connect_id),
aclosing(
iter_normed_quotes(
ws, sym_str
)
) as iter_quotes,
aclosing(stream_messages(ws, sym_str)) as msg_gen,
):
typ, quote = await anext(iter_quotes)
typ, quote = await anext(msg_gen)
# take care to not unblock here until we get a real
# trade quote?
# ^TODO, remove this right?
# -[ ] what often blocks chart boot/new-feed switching
# since we'ere waiting for a live quote instead of just
# loading history afap..
# |_ XXX, not sure if we require a bit of rework to core
# feed init logic or if backends justg gotta be
# changed up.. feel like there was some causality
# dilema prolly only seen with IB too..
# while typ != 'trade':
# typ, quote = await anext(iter_quotes)
while typ != 'trade':
# take care to not unblock here until we get a real
# trade quote
typ, quote = await anext(msg_gen)
task_status.started((init_msgs, quote))
feed_is_live.set()
# XXX NOTE, DO NOT include the `.<backend>` suffix!
# OW the sampling loop will not broadcast correctly..
# since `bus._subscribers.setdefault(bs_fqme, set())`
# is used inside `.data.open_feed_bus()` !!!
topic: str = mkt.bs_fqme
async for typ, quote in iter_quotes:
await send_chan.send({topic: quote})
async for typ, msg in msg_gen:
await send_chan.send({sym_str: msg})
@acm
@ -861,7 +814,7 @@ async def subscribe(
)
async def iter_normed_quotes(
async def stream_messages(
ws: NoBsWs,
sym: str,
@ -892,9 +845,6 @@ async def iter_normed_quotes(
yield 'trade', {
'symbol': sym,
# TODO, is 'last' even used elsewhere/a-good
# semantic? can't we just read the ticks with our
# .data.ticktools.frame_ticks()`/
'last': trade_data.price,
'brokerd_ts': last_trade_ts,
'ticks': [
@ -987,7 +937,7 @@ async def open_history_client(
if end_dt is None:
inow = round(time.time())
log.debug(
print(
f'difference in time between load and processing'
f'{inow - times[-1]}'
)

View File

@ -1,49 +0,0 @@
piker.clearing
______________
trade execution-n-control subsys for both live and paper trading as
well as algo-trading manual override/interaction across any backend
broker and data provider.
avail UIs
*********
order ctl
---------
the `piker.clearing` subsys is exposed mainly though
the `piker chart` GUI as a "chart trader" style UX and
is automatically enabled whenever a chart is opened.
.. ^TODO, more prose here!
the "manual" order control features are exposed via the
`piker.ui.order_mode` API and can pretty much always be
used (at least) in simulated-trading mode, aka "paper"-mode, and
the micro-manual is as follows:
``order_mode`` (
edge triggered activation by any of the following keys,
``mouse-click`` on y-level to submit at that price
):
- ``f``/ ``ctl-f`` to stage buy
- ``d``/ ``ctl-d`` to stage sell
- ``a`` to stage alert
``search_mode`` (
``ctl-l`` or ``ctl-space`` to open,
``ctl-c`` or ``ctl-space`` to close
) :
- begin typing to have symbol search automatically lookup
symbols from all loaded backend (broker) providers
- arrow keys and mouse click to navigate selection
- vi-like ``ctl-[hjkl]`` for navigation
position (pp) mgmt
------------------
you can also configure your position allocation limits from the
sidepane.
.. ^TODO, explain and provide tut once more refined!

View File

@ -104,15 +104,14 @@ def get_app_dir(
# `tractor`) with the testing dir and check for it whenever we
# detect `pytest` is being used (which it isn't under normal
# operation).
# if "pytest" in sys.modules:
# import tractor
# actor = tractor.current_actor(err_on_no_runtime=False)
# if actor: # runtime is up
# rvs = tractor._state._runtime_vars
# import pdbp; pdbp.set_trace()
# testdirpath = Path(rvs['piker_vars']['piker_test_dir'])
# assert testdirpath.exists(), 'piker test harness might be borked!?'
# app_name = str(testdirpath)
if "pytest" in sys.modules:
import tractor
actor = tractor.current_actor(err_on_no_runtime=False)
if actor: # runtime is up
rvs = tractor._state._runtime_vars
testdirpath = Path(rvs['piker_vars']['piker_test_dir'])
assert testdirpath.exists(), 'piker test harness might be borked!?'
app_name = str(testdirpath)
if platform.system() == 'Windows':
key = "APPDATA" if roaming else "LOCALAPPDATA"

View File

@ -273,7 +273,7 @@ async def _reconnect_forever(
nobsws._connected.set()
await trio.sleep_forever()
except HandshakeError:
log.exception('Retrying connection')
log.exception(f'Retrying connection')
# ws & nursery block ends
@ -359,8 +359,8 @@ async def open_autorecon_ws(
'''
JSONRPC response-request style machinery for transparent multiplexing
of msgs over a `NoBsWs`.
JSONRPC response-request style machinery for transparent multiplexing of msgs
over a NoBsWs.
'''
@ -377,82 +377,43 @@ async def open_jsonrpc_session(
url: str,
start_id: int = 0,
response_type: type = JSONRPCResult,
msg_recv_timeout: float = float('inf'),
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm
# and options mkts are generally "slow moving"..
#
# FURTHER if we break the underlying ws connection then since we
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
# broken and never restored with wtv init sequence is required to
# re-establish a working req-resp session.
request_type: Optional[type] = None,
request_hook: Optional[Callable] = None,
error_hook: Optional[Callable] = None,
) -> Callable[[str, dict], dict]:
'''
Init a json-RPC-over-websocket connection to the provided `url`.
A `json_rpc: Callable[[str, dict], dict` is delivered to the
caller for sending requests and a bg-`trio.Task` handles
processing of response msgs including error reporting/raising in
the parent/caller task.
'''
# NOTE, store all request msgs so we can raise errors on the
# caller side!
req_msgs: dict[int, dict] = {}
async with (
trio.open_nursery() as tn,
open_autorecon_ws(
url=url,
msg_recv_timeout=msg_recv_timeout,
) as ws
trio.open_nursery() as n,
open_autorecon_ws(url) as ws
):
rpc_id: Iterable[int] = count(start_id)
rpc_id: Iterable = count(start_id)
rpc_results: dict[int, dict] = {}
async def json_rpc(
method: str,
params: dict,
) -> dict:
async def json_rpc(method: str, params: dict) -> dict:
'''
perform a json rpc call and wait for the result, raise exception in
case of error field present on response
'''
nonlocal req_msgs
req_id: int = next(rpc_id)
msg = {
'jsonrpc': '2.0',
'id': req_id,
'id': next(rpc_id),
'method': method,
'params': params
}
_id = msg['id']
result = rpc_results[_id] = {
rpc_results[_id] = {
'result': None,
'error': None,
'event': trio.Event(), # signal caller resp arrived
'event': trio.Event()
}
req_msgs[_id] = msg
await ws.send_msg(msg)
# wait for reponse before unblocking requester code
await rpc_results[_id]['event'].wait()
if (maybe_result := result['result']):
ret = maybe_result
del rpc_results[_id]
ret = rpc_results[_id]['result']
else:
err = result['error']
raise Exception(
f'JSONRPC request failed\n'
f'req: {msg}\n'
f'resp: {err}\n'
)
del rpc_results[_id]
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
@ -467,7 +428,6 @@ async def open_jsonrpc_session(
the server side.
'''
nonlocal req_msgs
async for msg in ws:
match msg:
case {
@ -491,28 +451,19 @@ async def open_jsonrpc_session(
'params': _,
}:
log.debug(f'Recieved\n{msg}')
if request_hook:
await request_hook(request_type(**msg))
case {
'error': error
}:
# retreive orig request msg, set error
# response in original "result" msg,
# THEN FINALLY set the event to signal caller
# to raise the error in the parent task.
req_id: int = error['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error
result['event'].set()
log.error(
f'JSONRPC request failed\n'
f'req: {req_msg}\n'
f'resp: {error}\n'
)
log.warning(f'Recieved\n{error}')
if error_hook:
await error_hook(response_type(**msg))
case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
tn.start_soon(recv_task)
n.start_soon(recv_task)
yield json_rpc
tn.cancel_scope.cancel()
n.cancel_scope.cancel()

View File

@ -386,8 +386,6 @@ def ldshm(
open_annot_ctl() as actl,
):
shm_df: pl.DataFrame | None = None
tf2aids: dict[float, dict] = {}
for (
shmfile,
shm,
@ -528,17 +526,16 @@ def ldshm(
new_df,
step_gaps,
)
# last chance manual overwrites in REPL
# await tractor.pause()
await tractor.pause()
assert aids
tf2aids[period_s] = aids
else:
# allow interaction even when no ts problems.
assert not diff
await tractor.pause()
# assert not diff
await tractor.pause()
log.info('Exiting TSP shm anal-izer!')
if shm_df is None:
log.error(

View File

@ -161,13 +161,7 @@ class NativeStorageClient:
def index_files(self):
for path in self._datadir.iterdir():
if (
path.is_dir()
or
'.parquet' not in str(path)
# or
# path.name in {'borked', 'expired',}
):
if path.name in {'borked', 'expired',}:
continue
key: str = path.name.rstrip('.parquet')

View File

@ -44,10 +44,8 @@ import trio
from trio_typing import TaskStatus
import tractor
from pendulum import (
Interval,
DateTime,
Duration,
duration as mk_duration,
from_timestamp,
)
import numpy as np
@ -216,8 +214,7 @@ async def maybe_fill_null_segments(
# pair, immediately stop backfilling?
if (
start_dt
and
end_dt < start_dt
and end_dt < start_dt
):
await tractor.pause()
break
@ -265,7 +262,6 @@ async def maybe_fill_null_segments(
except tractor.ContextCancelled:
# log.exception
await tractor.pause()
raise
null_segs_detected.set()
# RECHECK for more null-gaps
@ -353,7 +349,7 @@ async def maybe_fill_null_segments(
async def start_backfill(
get_hist,
def_frame_duration: Duration,
frame_types: dict[str, Duration] | None,
mod: ModuleType,
mkt: MktPair,
shm: ShmArray,
@ -383,23 +379,22 @@ async def start_backfill(
update_start_on_prepend: bool = False
if backfill_until_dt is None:
# TODO: per-provider default history-durations?
# -[ ] inside the `open_history_client()` config allow
# declaring the history duration limits instead of
# guessing and/or applying the same limits to all?
#
# -[ ] allow declaring (default) per-provider backfill
# limits inside a [storage] sub-section in conf.toml?
#
# NOTE, when no tsdb "last datum" is provided, we just
# load some near-term history by presuming a "decently
# large" 60s duration limit and a much shorter 1s range.
# TODO: drop this right and just expose the backfill
# limits inside a [storage] section in conf.toml?
# when no tsdb "last datum" is provided, we just load
# some near-term history.
# periods = {
# 1: {'days': 1},
# 60: {'days': 14},
# }
# do a decently sized backfill and load it into storage.
periods = {
1: {'days': 2},
60: {'years': 6},
}
period_duration: int = periods[timeframe]
update_start_on_prepend: bool = True
update_start_on_prepend = True
# NOTE: manually set the "latest" datetime which we intend to
# backfill history "until" so as to adhere to the history
@ -421,6 +416,7 @@ async def start_backfill(
f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n'
)
try:
(
array,
@ -430,114 +426,71 @@ async def start_backfill(
timeframe,
end_dt=last_start_dt,
)
except NoData as _daterr:
orig_last_start_dt: datetime = last_start_dt
gap_report: str = (
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
f'last_start_dt: {orig_last_start_dt}\n\n'
f'bf_until: {backfill_until_dt}\n'
)
# EMPTY FRAME signal with 3 (likely) causes:
#
# 1. range contains legit gap in venue history
# 2. history actually (edge case) **began** at the
# value `last_start_dt`
# 3. some other unknown error (ib blocking the
# history-query bc they don't want you seeing how
# they cucked all the tinas.. like with options
# hist)
#
if def_frame_duration:
# decrement by a duration's (frame) worth of time
# as maybe indicated by the backend to see if we
# can get older data before this possible
# "history gap".
last_start_dt: datetime = last_start_dt.subtract(
seconds=def_frame_duration.total_seconds()
# 3 cases:
# - frame in the middle of a legit venue gap
# - history actually began at the `last_start_dt`
# - some other unknown error (ib blocking the
# history bc they don't want you seeing how they
# cucked all the tinas..)
if dur := frame_types.get(timeframe):
# decrement by a frame's worth of duration and
# retry a few times.
last_start_dt.subtract(
seconds=dur.total_seconds()
)
gap_report += (
f'Decrementing `end_dt` and retrying with,\n'
f'def_frame_duration: {def_frame_duration}\n'
f'(new) last_start_dt: {last_start_dt}\n'
log.warning(
f'{mod.name} -> EMPTY FRAME for end_dt?\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'Decrementing `end_dt` by {dur} and retry..\n'
)
log.warning(gap_report)
# skip writing to shm/tsdb and try the next
# duration's worth of prior history.
continue
else:
# await tractor.pause()
raise DataUnavailable(gap_report)
# broker says there never was or is no more history to pull
except DataUnavailable as due:
message: str = due.args[0]
except DataUnavailable:
log.warning(
f'Provider {mod.name!r} halted backfill due to,\n\n'
f'{message}\n'
f'fqme: {mkt.fqme}\n'
f'timeframe: {timeframe}\n'
f'last_start_dt: {last_start_dt}\n'
f'bf_until: {backfill_until_dt}\n'
f'NO-MORE-DATA in range?\n'
f'`{mod.name}` halted history:\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
)
# UGH: what's a better way?
# TODO: backends are responsible for being correct on
# this right!?
# -[ ] in the `ib` case we could maybe offer some way
# to halt the request loop until the condition is
# resolved or should the backend be entirely in
# charge of solving such faults? yes, right?
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
if timeframe > 1:
await tractor.pause()
return
time: np.ndarray = array['time']
assert (
time[0]
array['time'][0]
==
next_start_dt.timestamp()
)
assert time[-1] == next_end_dt.timestamp()
expected_dur: Interval = last_start_dt - next_start_dt
diff = last_start_dt - next_start_dt
frame_time_diff_s = diff.seconds
# frame's worth of sample-period-steps, in seconds
frame_size_s: float = len(array) * timeframe
recv_frame_dur: Duration = (
from_timestamp(array[-1]['time'])
-
from_timestamp(array[0]['time'])
)
if (
(lt_frame := (recv_frame_dur < expected_dur))
or
(null_frame := (frame_size_s == 0))
# ^XXX, should NEVER hit now!
):
expected_frame_size_s: float = frame_size_s + timeframe
if frame_time_diff_s > expected_frame_size_s:
# XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.)
# so just report it to console for now.
if lt_frame:
reason = 'Possible GAP (or first-datum)'
else:
assert null_frame
reason = 'NULL-FRAME'
missing_dur: Interval = expected_dur.end - recv_frame_dur.end
log.warning(
f'{timeframe}s-series {reason} detected!\n'
f'fqme: {mkt.fqme}\n'
f'last_start_dt: {last_start_dt}\n\n'
f'recv interval: {recv_frame_dur}\n'
f'expected interval: {expected_dur}\n\n'
f'Missing duration of history of {missing_dur.in_words()!r}\n'
f'{missing_dur}\n'
'GAP DETECTED:\n'
f'last_start_dt: {last_start_dt}\n'
f'diff: {diff}\n'
f'frame_time_diff_s: {frame_time_diff_s}\n'
)
# await tractor.pause()
to_push = diff_history(
array,
@ -612,27 +565,22 @@ async def start_backfill(
# long-term storage.
if (
storage is not None
and
write_tsdb
and write_tsdb
):
log.info(
f'Writing {ln} frame to storage:\n'
f'{next_start_dt} -> {last_start_dt}'
)
# NOTE, always drop the src asset token for
# always drop the src asset token for
# non-currency-pair like market types (for now)
#
# THAT IS, for now our table key schema is NOT
# including the dst[/src] source asset token. SO,
# 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
# historical reasons ONLY.
if mkt.dst.atype not in {
'crypto',
'crypto_currency',
'fiat', # a "forex pair"
'perpetual_future', # stupid "perps" from cex land
}:
# for now, our table key schema is not including
# the dst[/src] source asset token.
col_sym_key: str = mkt.get_fqme(
delim_char='',
without_src=True,
@ -737,7 +685,7 @@ async def back_load_from_tsdb(
last_tsdb_dt
and latest_start_dt
):
backfilled_size_s: Duration = (
backfilled_size_s = (
latest_start_dt - last_tsdb_dt
).seconds
# if the shm buffer len is not large enough to contain
@ -960,8 +908,6 @@ async def tsdb_backfill(
f'{pformat(config)}\n'
)
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn:
tn.start_soon(
@ -972,6 +918,7 @@ async def tsdb_backfill(
timeframe,
config,
)
tsdb_entry: tuple = await load_tsdb_hist(
storage,
mkt,
@ -1000,25 +947,6 @@ async def tsdb_backfill(
mr_end_dt,
) = dt_eps
first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
calced_frame_size: Duration = mk_duration(
seconds=first_frame_dur_s,
)
# NOTE, attempt to use the backend declared default frame
# sizing (as allowed by their time-series query APIs) and
# if not provided try to construct a default from the
# first frame received above.
def_frame_durs: dict[
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
# NOTE: when there's no offline data, there's 2 cases:
# - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and
@ -1049,7 +977,7 @@ async def tsdb_backfill(
partial(
start_backfill,
get_hist=get_hist,
def_frame_duration=def_frame_size,
frame_types=config.get('frame_types', None),
mod=mod,
mkt=mkt,
shm=shm,

View File

@ -616,18 +616,6 @@ def detect_price_gaps(
# ])
...
# TODO: probably just use the null_segs impl above?
def detect_vlm_gaps(
df: pl.DataFrame,
col: str = 'volume',
) -> pl.DataFrame:
vnull: pl.DataFrame = w_dts.filter(
pl.col(col) == 0
)
return vnull
def dedupe(
src_df: pl.DataFrame,
@ -638,6 +626,7 @@ def dedupe(
) -> tuple[
pl.DataFrame, # with dts
pl.DataFrame, # gaps
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
int, # len diff between input and deduped
]:
@ -650,22 +639,19 @@ def dedupe(
'''
wdts: pl.DataFrame = with_dts(src_df)
deduped = wdts
# remove duplicated datetime samples/sections
deduped: pl.DataFrame = wdts.unique(
# subset=['dt'],
subset=['time'],
maintain_order=True,
)
# maybe sort on any time field
if sort:
deduped = deduped.sort(by='time')
wdts = wdts.sort(by='time')
# TODO: detect out-of-order segments which were corrected!
# -[ ] report in log msg
# -[ ] possibly return segment sections which were moved?
# remove duplicated datetime samples/sections
deduped: pl.DataFrame = wdts.unique(
subset=['dt'],
maintain_order=True,
)
diff: int = (
wdts.height
-

View File

@ -15,119 +15,140 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[project]
# ------ - ------
[tool.ruff.lint]
# https://docs.astral.sh/ruff/settings/#lint_ignore
ignore = []
# https://docs.astral.sh/ruff/settings/#lint_per-file-ignores
"piker/ui/qt.py" = [
"E402",
'F401', # unused imports (without __all__ or blah as blah)
# "F841", # unused variable rules
]
# ignore-init-module-imports = false
# ------ - ------
[tool.poetry]
name = "piker"
version = "0.1.0a0dev0"
version = "0.1.0.alpha0.dev0"
description = "trading gear for hackers"
authors = [{ name = "Tyler Goodlet", email = "goodboy_foss@protonmail.com" }]
requires-python = ">=3.12, <3.13"
license = "AGPL-3.0-or-later"
authors = ["Tyler Goodlet <goodboy_foss@protonmail.com>"]
license = "AGPLv3"
readme = "README.rst"
keywords = [
"async",
"trading",
"finance",
"quant",
"charting",
]
classifiers = [
"Development Status :: 3 - Alpha",
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Intended Audience :: Financial and Insurance Industry",
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
"Intended Audience :: Education",
]
dependencies = [
"async-generator >=1.10, <2.0.0",
"attrs >=23.1.0, <24.0.0",
"bidict >=0.22.1, <0.23.0",
"colorama >=0.4.6, <0.5.0",
"colorlog >=6.7.0, <7.0.0",
"ib-insync >=0.9.86, <0.10.0",
"numba >=0.59.0, <0.60.0",
"numpy >=1.25, <2.0",
"polars >=0.18.13, <0.19.0",
"pygments >=2.16.1, <3.0.0",
"rich >=13.5.2, <14.0.0",
"tomli >=2.0.1, <3.0.0",
"tomli-w >=1.0.0, <2.0.0",
"trio-util >=0.7.0, <0.8.0",
"trio-websocket >=0.10.3, <0.11.0",
"typer >=0.9.0, <1.0.0",
"rapidfuzz >=3.5.2, <4.0.0",
"pdbp >=1.5.0, <2.0.0",
"trio >=0.24, <0.25",
"pendulum >=3.0.0, <4.0.0",
"httpx >=0.27.0, <0.28.0",
"cryptofeed >=2.4.0, <3.0.0",
"pyarrow >=17.0.0, <18.0.0",
"websockets ==12.0",
"msgspec",
"tractor",
"asyncvnc",
"tomlkit",
]
[project.optional-dependencies]
uis = [
# https://docs.astral.sh/uv/concepts/projects/dependencies/#optional-dependencies
# TODO: make sure the levenshtein shit compiles on nix..
# rapidfuzz = {extras = ["speedup"], version = "^0.18.0"}
"rapidfuzz >=3.2.0, <4.0.0",
"qdarkstyle >=3.0.2, <4.0.0",
"pyqt6 >=6.7.0, <7.0.0",
"pyqtgraph",
# ------ - ------
# for consideration,
# - 'visidata'
[tool.poetry.dependencies]
async-generator = "^1.10"
attrs = "^23.1.0"
bidict = "^0.22.1"
colorama = "^0.4.6"
colorlog = "^6.7.0"
cython = "^3.0.0"
greenback = "^1.1.1"
ib-insync = "^0.9.86"
msgspec = "^0.18.0"
numba = "^0.59.0"
numpy = "^1.25"
polars = "^0.18.13"
pygments = "^2.16.1"
python = ">=3.11, <3.13"
rich = "^13.5.2"
# setuptools = "^68.0.0"
tomli = "^2.0.1"
tomli-w = "^1.0.0"
trio-util = "^0.7.0"
trio-websocket = "^0.10.3"
typer = "^0.9.0"
rapidfuzz = "^3.5.2"
pdbp = "^1.5.0"
trio = "^0.24"
pendulum = "^3.0.0"
httpx = "^0.27.0"
# TODO: add an `--only daemon` group for running non-ui / pikerd
# service tree in distributed mode B)
# https://docs.astral.sh/uv/concepts/projects/dependencies/#optional-dependencies
]
[tool.poetry.dependencies.tractor]
develop = true
git = 'https://github.com/goodboy/tractor.git'
branch = 'asyncio_debugger_support'
# path = "../tractor"
[dependency-groups]
# TODO: a toolset that makes debugging a `pikerd` service (tree) easy
# to hack on directly using more or less the local env:
[tool.poetry.dependencies.asyncvnc]
git = 'https://github.com/pikers/asyncvnc.git'
branch = 'main'
[tool.poetry.dependencies.tomlkit]
develop = true
git = 'https://github.com/pikers/tomlkit.git'
branch = 'piker_pin'
# path = "../tomlkit/"
[tool.poetry.group.uis]
optional = true
[tool.poetry.group.uis.dependencies]
# https://python-poetry.org/docs/managing-dependencies/#dependency-groups
# TODO: make sure the levenshtein shit compiles on nix..
# rapidfuzz = {extras = ["speedup"], version = "^0.18.0"}
rapidfuzz = "^3.2.0"
qdarkstyle = ">=3.0.2"
pyqtgraph = { git = 'https://github.com/pikers/pyqtgraph.git' }
# ------ - ------
pyqt6 = "^6.7.0"
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
# testing / CI
pytest = "^6.0.0"
elasticsearch = "^8.9.0"
xonsh = "^0.14.2"
prompt-toolkit = "3.0.40"
# console ehancements and eventually remote debugging
# extras/helpers.
# TODO: add a toolset that makes debugging a `pikerd` service
# (tree) easy to hack on directly using more or less the local env:
# - xonsh + xxh
# - rsyscall + pdbp
# - actor runtime control console like BEAM/OTP
#
# console ehancements and eventually remote debugging extras/helpers.
# use `uv --dev` to enable
dev = [
"pytest >=6.0.0, <7.0.0",
"elasticsearch >=8.9.0, <9.0.0",
"xonsh >=0.14.2, <0.15.0",
"prompt-toolkit ==3.0.40",
"cython >=3.0.0, <4.0.0",
"greenback >=1.1.1, <2.0.0",
"ruff>=0.9.6",
# ------ - ------
# TODO: add an `--only daemon` group for running non-ui / pikerd
# service tree in distributed mode B)
# https://python-poetry.org/docs/managing-dependencies/#installing-group-dependencies
# [tool.poetry.group.daemon.dependencies]
[tool.poetry.scripts]
piker = 'piker.cli:cli'
pikerd = 'piker.cli:pikerd'
ledger = 'piker.accounting.cli:ledger'
[project]
keywords=[
"async",
"trading",
"finance",
"quant",
"charting",
]
classifiers=[
'Development Status :: 3 - Alpha',
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
'Operating System :: POSIX :: Linux',
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
'Intended Audience :: Financial and Insurance Industry',
'Intended Audience :: Science/Research',
'Intended Audience :: Developers',
'Intended Audience :: Education',
]
[project.scripts]
piker = "piker.cli:cli"
pikerd = "piker.cli:pikerd"
ledger = "piker.accounting.cli:ledger"
[tool.hatch.build.targets.sdist]
include = ["piker"]
[tool.hatch.build.targets.wheel]
include = ["piker"]
[tool.uv.sources]
pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" }
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
msgspec = { git = "https://github.com/jcrist/msgspec.git" }
tractor = { path = "../tractor", editable = true }

View File

@ -1,93 +0,0 @@
# from default `ruff.toml` @
# https://docs.astral.sh/ruff/configuration/
# Exclude a variety of commonly ignored directories.
exclude = [
".bzr",
".direnv",
".eggs",
".git",
".git-rewrite",
".hg",
".ipynb_checkpoints",
".mypy_cache",
".nox",
".pants.d",
".pyenv",
".pytest_cache",
".pytype",
".ruff_cache",
".svn",
".tox",
".venv",
".vscode",
"__pypackages__",
"_build",
"buck-out",
"build",
"dist",
"node_modules",
"site-packages",
"venv",
]
# Same as Black.
line-length = 88
indent-width = 4
# Assume Python 3.9
target-version = "py312"
# ------ - ------
# TODO, stop warnings around `anext()` builtin use?
# tool.ruff.target-version = "py310"
[lint]
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default.
select = ["E4", "E7", "E9", "F"]
ignore = []
ignore-init-module-imports = false
[lint.per-file-ignores]
"piker/ui/qt.py" = [
"E402",
'F401', # unused imports (without __all__ or blah as blah)
# "F841", # unused variable rules
]
# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
unfixable = []
# Allow unused variables when underscore-prefixed.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[format]
# Use single quotes in `ruff format`.
quote-style = "single"
# Like Black, indent with spaces, rather than tabs.
indent-style = "space"
# Like Black, respect magic trailing commas.
skip-magic-trailing-comma = false
# Like Black, automatically detect the appropriate line ending.
line-ending = "auto"
# Enable auto-formatting of code examples in docstrings. Markdown,
# reStructuredText code/literal blocks and doctests are all supported.
#
# This is currently disabled by default, but it is planned for this
# to be opt-out in the future.
docstring-code-format = false
# Set the line length limit used when formatting code snippets in
# docstrings.
#
# This only has an effect when the `docstring-code-format` setting is
# enabled.
docstring-code-line-length = "dynamic"

1500
uv.lock

File diff suppressed because it is too large Load Diff