Compare commits

..

No commits in common. "gitea_feats" and "ib_refinements" have entirely different histories.

21 changed files with 575 additions and 2695 deletions

View File

@ -1,161 +1,162 @@
piker piker
----- -----
trading gear for hackers trading gear for hackers.
|gh_actions| |gh_actions|
.. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fpikers%2Fpiker%2Fbadge&style=popout-square .. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fpikers%2Fpiker%2Fbadge&style=popout-square
:target: https://actions-badge.atrox.dev/piker/pikers/goto :target: https://actions-badge.atrox.dev/piker/pikers/goto
``piker`` is a broker agnostic, next-gen FOSS toolset and runtime for ``piker`` is a broker agnostic, next-gen FOSS toolset for real-time
real-time computational trading targeted at `hardcore Linux users computational trading targeted at `hardcore Linux users <comp_trader>`_ .
<comp_trader>`_ .
we use much bleeding edge tech including (but not limited to): we use as much bleeding edge tech as possible including (but not limited to):
- latest python for glue_ - latest python for glue_
- uv_ for packaging and distribution - trio_ & tractor_ for our distributed, multi-core, real-time streaming
- trio_ & tractor_ for our distributed `structured concurrency`_ runtime `structured concurrency`_ runtime B)
- Qt_ for pristine low latency UIs - Qt_ for pristine high performance UIs
- pyqtgraph_ (which we've extended) for real-time charting and graphics - pyqtgraph_ for real-time charting
- ``polars`` ``numpy`` and ``numba`` for redic `fast numerics`_ - ``polars`` ``numpy`` and ``numba`` for `fast numerics`_
- `apache arrow and parquet`_ for time-series storage - `apache arrow and parquet`_ for time series history management
persistence and sharing
- (prototyped) techtonicdb_ for L2 book storage
potential projects we might integrate with soon, .. |travis| image:: https://img.shields.io/travis/pikers/piker/master.svg
:target: https://travis-ci.org/pikers/piker
- (already prototyped in ) techtonicdb_ for L2 book storage
.. _comp_trader: https://jfaleiro.wordpress.com/2019/10/09/computational-trader/
.. _glue: https://numpy.org/doc/stable/user/c-info.python-as-glue.html#using-python-as-glue
.. _uv: https://docs.astral.sh/uv/
.. _trio: https://github.com/python-trio/trio .. _trio: https://github.com/python-trio/trio
.. _tractor: https://github.com/goodboy/tractor .. _tractor: https://github.com/goodboy/tractor
.. _structured concurrency: https://trio.discourse.group/ .. _structured concurrency: https://trio.discourse.group/
.. _marketstore: https://github.com/alpacahq/marketstore
.. _techtonicdb: https://github.com/0b01/tectonicdb
.. _Qt: https://www.qt.io/ .. _Qt: https://www.qt.io/
.. _pyqtgraph: https://github.com/pyqtgraph/pyqtgraph .. _pyqtgraph: https://github.com/pyqtgraph/pyqtgraph
.. _glue: https://numpy.org/doc/stable/user/c-info.python-as-glue.html#using-python-as-glue
.. _apache arrow and parquet: https://arrow.apache.org/faq/ .. _apache arrow and parquet: https://arrow.apache.org/faq/
.. _fast numerics: https://zerowithdot.com/python-numpy-and-pandas-performance/ .. _fast numerics: https://zerowithdot.com/python-numpy-and-pandas-performance/
.. _techtonicdb: https://github.com/0b01/tectonicdb .. _comp_trader: https://jfaleiro.wordpress.com/2019/10/09/computational-trader/
focus and feats: focus and features:
**************** *******************
fitting with these tenets, we're always open to new - 100% federated: your code, your hardware, your data feeds, your broker fills.
framework/lib/service interop suggestions and ideas! - zero web: low latency, native software that doesn't try to re-invent the OS
- maximal **privacy**: prevent brokers and mms from knowing your
planz; smack their spreads with dark volume.
- zero clutter: modal, context oriented UIs that echew minimalism, reduce
thought noise and encourage un-emotion.
- first class parallelism: built from the ground up on next-gen structured concurrency
primitives.
- traders first: broker/exchange/asset-class agnostic
- systems grounded: real-time financial signal processing that will
make any queuing or DSP eng juice their shorts.
- non-tina UX: sleek, powerful keyboard driven interaction with expected use in tiling wms
- data collaboration: every process and protocol is multi-host scalable.
- fight club ready: zero interest in adoption by suits; no corporate friendly license, ever.
- **100% federated**: fitting with these tenets, we're always open to new framework suggestions and ideas.
your code, your hardware, your data feeds, your broker fills.
- **zero web**: building the best looking, most reliable, keyboard friendly trading
low latency as a prime objective, native UIs and modern IPC platform is the dream; join the cause.
protocols without trying to re-invent the "OS-as-an-app"..
- **maximal privacy**:
prevent brokers and mms from knowing your planz; smack their
spreads with dark volume from a VPN tunnel.
- **zero clutter**:
modal, context oriented UIs that echew minimalism, reduce thought
noise and encourage un-emotion.
- **first class parallelism**:
built from the ground up on a next-gen structured concurrency
supervision sys.
- **traders first**:
broker/exchange/venue/asset-class/money-sys agnostic
- **systems grounded**:
real-time financial signal processing (fsp) that will make any
queuing or DSP eng juice their shorts.
- **non-tina UX**:
sleek, powerful keyboard driven interaction with expected use in
tiling wms (or maybe even a DDE).
- **data collab at scale**:
every actor-process and protocol is multi-host aware.
- **fight club ready**:
zero interest in adoption by suits; no corporate friendly license,
ever.
building the hottest looking, fastest, most reliable, keyboard
friendly FOSS trading platform is the dream; join the cause.
a sane install with `uv` sane install with `poetry`
************************ **************************
bc why install with `python` when you can faster with `rust` :: TODO!
uv lock
rigorous install on ``nixos`` using ``poetry2nix``
**************************************************
TODO!
hacky install on nixos hacky install on nixos
********************** **********************
``NixOS`` is our core devs' distro of choice for which we offer `NixOS` is our core devs' distro of choice for which we offer
a stringently defined development shell envoirment that can be loaded with:: a stringently defined development shell envoirment that can be loaded with::
nix-shell default.nix nix-shell develop.nix
this will setup the required python environment to run piker, make sure to
run::
pip install -r requirements.txt -e .
once after loading the shell
start a chart install wild-west style via `pip`
************* *********************************
run a realtime OHLCV chart stand-alone:: ``piker`` is currently under heavy pre-alpha development and as such
should be cloned from this repo and hacked on directly.
piker -l info chart btcusdt.spot.binance xmrusdt.spot.kraken for a development install::
this runs a chart UI (with 1m sampled OHLCV) and shows 2 spot markets from 2 diff cexes git clone git@github.com:pikers/piker.git
overlayed on the same graph. Use of `piker` without first starting cd piker
a daemon (`pikerd` - see below) means there is an implicit spawning of the virtualenv env
multi-actor-runtime (implemented as a `tractor` app). source ./env/bin/activate
pip install -r requirements.txt -e .
For additional subsystem feats available through our chart UI see the
various sub-readmes:
- order control using a mouse-n-keyboard UX B)
- cross venue market-pair (what most call "symbol") search, select, overlay Bo
- financial-signal-processing (`piker.fsp`) write-n-reload to sub-chart BO
- src-asset derivatives scan for anal, like the infamous "max pain" XO
spawn a daemon standalone check out our charts
************************* ********************
we call the root actor-process the ``pikerd``. it can be (and is bet you weren't expecting this from the foss::
recommended normally to be) started separately from the ``piker
chart`` program:: piker -l info -b kraken -b binance chart btcusdt.binance --pdb
this runs the main chart (currently with 1m sampled OHLC) in in debug
mode and you can practice paper trading using the following
micro-manual:
``order_mode`` (
edge triggered activation by any of the following keys,
``mouse-click`` on y-level to submit at that price
):
- ``f``/ ``ctl-f`` to stage buy
- ``d``/ ``ctl-d`` to stage sell
- ``a`` to stage alert
``search_mode`` (
``ctl-l`` or ``ctl-space`` to open,
``ctl-c`` or ``ctl-space`` to close
) :
- begin typing to have symbol search automatically lookup
symbols from all loaded backend (broker) providers
- arrow keys and mouse click to navigate selection
- vi-like ``ctl-[hjkl]`` for navigation
you can also configure your position allocation limits from the
sidepane.
run in distributed mode
***********************
start the service manager and data feed daemon in the background and
connect to it::
pikerd -l info --pdb pikerd -l info --pdb
the daemon does nothing until a ``piker``-client (like ``piker
chart``) connects and requests some particular sub-system. for
a connecting chart ``pikerd`` will spawn and manage at least,
- a data-feed daemon: ``datad`` which does all the work of comms with connect your chart::
the backend provider (in this case the ``binance`` cex).
- a paper-trading engine instance, ``paperboi.binance``, (if no live
account has been configured) which allows for auto/manual order
control against the live quote stream.
*using* an actor-service (aka micro-daemon) manager which dynamically piker -l info -b kraken -b binance chart xmrusdt.binance --pdb
supervises various sub-subsystems-as-services throughout the ``piker``
runtime-stack.
now you can (implicitly) connect your chart::
piker chart btcusdt.spot.binance enjoy persistent real-time data feeds tied to daemon lifetime. the next
time you spawn a chart it will load much faster since the data feed has
since ``pikerd`` was started separately you can now enjoy a persistent been cached and is now always running live in the background until you
real-time data stream tied to the daemon-tree's lifetime. i.e. the next kill ``pikerd``.
time you spawn a chart it will obviously not only load much faster
(since the underlying ``datad.binance`` is left running with its
in-memory IPC data structures) but also the data-feed and any order
mgmt states should be persistent until you finally cancel ``pikerd``.
if anyone asks you what this project is about if anyone asks you what this project is about
********************************************* *********************************************
you don't talk about it; just use it. you don't talk about it.
how do i get involved? how do i get involved?
@ -165,15 +166,6 @@ enter the matrix.
how come there ain't that many docs how come there ain't that many docs
*********************************** ***********************************
i mean we want/need them but building the core right has been higher suck it up, learn the code; no one is trying to sell you on anything.
prio then marketting (and likely will stay that way Bp). also, we need lotsa help so if you want to start somewhere and can't
necessarily write serious code, this might be the place for you!
soo, suck it up bc,
- no one is trying to sell you on anything
- learning the code base is prolly way more valuable
- the UI/UXs are intended to be "intuitive" for any hacker..
we obviously need tonz help so if you want to start somewhere and
can't necessarily write "advanced" concurrent python/rust code, this
helping document literally anything might be the place for you!

View File

@ -1,134 +0,0 @@
with (import <nixpkgs> {});
let
glibStorePath = lib.getLib glib;
zlibStorePath = lib.getLib zlib;
zstdStorePath = lib.getLib zstd;
dbusStorePath = lib.getLib dbus;
libGLStorePath = lib.getLib libGL;
freetypeStorePath = lib.getLib freetype;
qt6baseStorePath = lib.getLib qt6.qtbase;
fontconfigStorePath = lib.getLib fontconfig;
libxkbcommonStorePath = lib.getLib libxkbcommon;
xcbutilcursorStorePath = lib.getLib xcb-util-cursor;
qtpyStorePath = lib.getLib python312Packages.qtpy;
pyqt6StorePath = lib.getLib python312Packages.pyqt6;
pyqt6SipStorePath = lib.getLib python312Packages.pyqt6-sip;
rapidfuzzStorePath = lib.getLib python312Packages.rapidfuzz;
qdarkstyleStorePath = lib.getLib python312Packages.qdarkstyle;
xorgLibX11StorePath = lib.getLib xorg.libX11;
xorgLibxcbStorePath = lib.getLib xorg.libxcb;
xorgxcbutilwmStorePath = lib.getLib xorg.xcbutilwm;
xorgxcbutilimageStorePath = lib.getLib xorg.xcbutilimage;
xorgxcbutilerrorsStorePath = lib.getLib xorg.xcbutilerrors;
xorgxcbutilkeysymsStorePath = lib.getLib xorg.xcbutilkeysyms;
xorgxcbutilrenderutilStorePath = lib.getLib xorg.xcbutilrenderutil;
in
stdenv.mkDerivation {
name = "piker-qt6-uv";
buildInputs = [
# System requirements.
glib
zlib
dbus
zstd
libGL
freetype
qt6.qtbase
libgcc.lib
fontconfig
libxkbcommon
# Xorg requirements
xcb-util-cursor
xorg.libxcb
xorg.libX11
xorg.xcbutilwm
xorg.xcbutilimage
xorg.xcbutilerrors
xorg.xcbutilkeysyms
xorg.xcbutilrenderutil
# Python requirements.
python312Full
python312Packages.uv
python312Packages.qdarkstyle
python312Packages.rapidfuzz
python312Packages.pyqt6
python312Packages.qtpy
];
src = null;
shellHook = ''
set -e
# Set the Qt plugin path
# export QT_DEBUG_PLUGINS=1
QTBASE_PATH="${qt6baseStorePath}/lib"
QT_PLUGIN_PATH="$QTBASE_PATH/qt-6/plugins"
QT_QPA_PLATFORM_PLUGIN_PATH="$QT_PLUGIN_PATH/platforms"
LIB_GCC_PATH="${libgcc.lib}/lib"
GLIB_PATH="${glibStorePath}/lib"
ZSTD_PATH="${zstdStorePath}/lib"
ZLIB_PATH="${zlibStorePath}/lib"
DBUS_PATH="${dbusStorePath}/lib"
LIBGL_PATH="${libGLStorePath}/lib"
FREETYPE_PATH="${freetypeStorePath}/lib"
FONTCONFIG_PATH="${fontconfigStorePath}/lib"
LIB_XKB_COMMON_PATH="${libxkbcommonStorePath}/lib"
XCB_UTIL_CURSOR_PATH="${xcbutilcursorStorePath}/lib"
XORG_LIB_X11_PATH="${xorgLibX11StorePath}/lib"
XORG_LIB_XCB_PATH="${xorgLibxcbStorePath}/lib"
XORG_XCB_UTIL_IMAGE_PATH="${xorgxcbutilimageStorePath}/lib"
XORG_XCB_UTIL_WM_PATH="${xorgxcbutilwmStorePath}/lib"
XORG_XCB_UTIL_RENDER_UTIL_PATH="${xorgxcbutilrenderutilStorePath}/lib"
XORG_XCB_UTIL_KEYSYMS_PATH="${xorgxcbutilkeysymsStorePath}/lib"
XORG_XCB_UTIL_ERRORS_PATH="${xorgxcbutilerrorsStorePath}/lib"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$QTBASE_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$QT_PLUGIN_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$QT_QPA_PLATFORM_PLUGIN_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$LIB_GCC_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$DBUS_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$GLIB_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$ZLIB_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$ZSTD_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$LIBGL_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$FONTCONFIG_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$FREETYPE_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$LIB_XKB_COMMON_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XCB_UTIL_CURSOR_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_LIB_X11_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_LIB_XCB_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_IMAGE_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_WM_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_RENDER_UTIL_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_KEYSYMS_PATH"
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_ERRORS_PATH"
export LD_LIBRARY_PATH
RPDFUZZ_PATH="${rapidfuzzStorePath}/lib/python3.12/site-packages"
QDRKSTYLE_PATH="${qdarkstyleStorePath}/lib/python3.12/site-packages"
QTPY_PATH="${qtpyStorePath}/lib/python3.12/site-packages"
PYQT6_PATH="${pyqt6StorePath}/lib/python3.12/site-packages"
PYQT6_SIP_PATH="${pyqt6SipStorePath}/lib/python3.12/site-packages"
PATCH="$PATCH:$RPDFUZZ_PATH"
PATCH="$PATCH:$QDRKSTYLE_PATH"
PATCH="$PATCH:$QTPY_PATH"
PATCH="$PATCH:$PYQT6_PATH"
PATCH="$PATCH:$PYQT6_SIP_PATH"
export PATCH
# Install deps
uv lock
'';
}

View File

@ -50,7 +50,7 @@ __brokers__: list[str] = [
'binance', 'binance',
'ib', 'ib',
'kraken', 'kraken',
'kucoin', 'kucoin'
# broken but used to work # broken but used to work
# 'questrade', # 'questrade',
@ -71,7 +71,7 @@ def get_brokermod(brokername: str) -> ModuleType:
Return the imported broker module by name. Return the imported broker module by name.
''' '''
module: ModuleType = import_module('.' + brokername, 'piker.brokers') module = import_module('.' + brokername, 'piker.brokers')
# we only allow monkeying because it's for internal keying # we only allow monkeying because it's for internal keying
module.name = module.__name__.split('.')[-1] module.name = module.__name__.split('.')[-1]
return module return module

View File

@ -18,11 +18,10 @@
Handy cross-broker utils. Handy cross-broker utils.
""" """
from __future__ import annotations
from functools import partial from functools import partial
import json import json
import httpx import asks
import logging import logging
from ..log import ( from ..log import (
@ -61,11 +60,11 @@ class NoData(BrokerError):
def __init__( def __init__(
self, self,
*args, *args,
info: dict|None = None, info: dict,
) -> None: ) -> None:
super().__init__(*args) super().__init__(*args)
self.info: dict|None = info self.info: dict = info
# when raised, machinery can check if the backend # when raised, machinery can check if the backend
# set a "frame size" for doing datetime calcs. # set a "frame size" for doing datetime calcs.
@ -91,18 +90,16 @@ class DataThrottle(BrokerError):
def resproc( def resproc(
resp: httpx.Response, resp: asks.response_objects.Response,
log: logging.Logger, log: logging.Logger,
return_json: bool = True, return_json: bool = True,
log_resp: bool = False, log_resp: bool = False,
) -> httpx.Response: ) -> asks.response_objects.Response:
''' """Process response and return its json content.
Process response and return its json content.
Raise the appropriate error on non-200 OK responses. Raise the appropriate error on non-200 OK responses.
"""
'''
if not resp.status_code == 200: if not resp.status_code == 200:
raise BrokerError(resp.body) raise BrokerError(resp.body)
try: try:

View File

@ -25,13 +25,14 @@ from __future__ import annotations
from collections import ChainMap from collections import ChainMap
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
AsyncExitStack,
) )
from datetime import datetime from datetime import datetime
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
Any, Any,
Callable, Callable,
Hashable,
Sequence,
Type, Type,
) )
import hmac import hmac
@ -42,7 +43,8 @@ import trio
from pendulum import ( from pendulum import (
now, now,
) )
import httpx import asks
from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
from piker import config from piker import config
@ -52,7 +54,6 @@ from piker.clearing._messages import (
from piker.accounting import ( from piker.accounting import (
Asset, Asset,
digits_to_dec, digits_to_dec,
MktPair,
) )
from piker.types import Struct from piker.types import Struct
from piker.data import ( from piker.data import (
@ -68,6 +69,7 @@ from .venues import (
PAIRTYPES, PAIRTYPES,
Pair, Pair,
MarketType, MarketType,
_spot_url, _spot_url,
_futes_url, _futes_url,
_testnet_futes_url, _testnet_futes_url,
@ -77,18 +79,19 @@ from .venues import (
log = get_logger('piker.brokers.binance') log = get_logger('piker.brokers.binance')
def get_config() -> dict[str, Any]: def get_config() -> dict:
conf: dict conf: dict
path: Path path: Path
conf, path = config.load( conf, path = config.load(
conf_name='brokers', conf_name='brokers',
touch_if_dne=True, touch_if_dne=True,
) )
section: dict = conf.get('binance')
section = conf.get('binance')
if not section: if not section:
log.warning( log.warning(f'No config section found for binance in {path}')
f'No config section found for binance in {path}'
)
return {} return {}
return section return section
@ -144,7 +147,7 @@ def binance_timestamp(
class Client: class Client:
''' '''
Async ReST API client using `trio` + `httpx` B) Async ReST API client using ``trio`` + ``asks`` B)
Supports all of the spot, margin and futures endpoints depending Supports all of the spot, margin and futures endpoints depending
on method. on method.
@ -153,17 +156,10 @@ class Client:
def __init__( def __init__(
self, self,
venue_sessions: dict[
str, # venue key
tuple[httpx.AsyncClient, str] # session, eps path
],
conf: dict[str, Any],
# TODO: change this to `Client.[mkt_]venue: MarketType`? # TODO: change this to `Client.[mkt_]venue: MarketType`?
mkt_mode: MarketType = 'spot', mkt_mode: MarketType = 'spot',
) -> None: ) -> None:
self.conf = conf
# build out pair info tables for each market type # build out pair info tables for each market type
# and wrap in a chain-map view for search / query. # and wrap in a chain-map view for search / query.
self._spot_pairs: dict[str, Pair] = {} # spot info table self._spot_pairs: dict[str, Pair] = {} # spot info table
@ -190,13 +186,44 @@ class Client:
# market symbols for use by search. See `.exch_info()`. # market symbols for use by search. See `.exch_info()`.
self._pairs: ChainMap[str, Pair] = ChainMap() self._pairs: ChainMap[str, Pair] = ChainMap()
# spot EPs sesh
self._sesh = asks.Session(connections=4)
self._sesh.base_location: str = _spot_url
# spot testnet
self._test_sesh: asks.Session = asks.Session(connections=4)
self._test_sesh.base_location: str = _testnet_spot_url
# margin and extended spot endpoints session.
self._sapi_sesh = asks.Session(connections=4)
self._sapi_sesh.base_location: str = _spot_url
# futes EPs sesh
self._fapi_sesh = asks.Session(connections=4)
self._fapi_sesh.base_location: str = _futes_url
# futes testnet
self._test_fapi_sesh: asks.Session = asks.Session(connections=4)
self._test_fapi_sesh.base_location: str = _testnet_futes_url
# global client "venue selection" mode. # global client "venue selection" mode.
# set this when you want to switch venues and not have to # set this when you want to switch venues and not have to
# specify the venue for the next request. # specify the venue for the next request.
self.mkt_mode: MarketType = mkt_mode self.mkt_mode: MarketType = mkt_mode
# per-mkt-venue API client table # per 8
self.venue_sesh = venue_sessions self.venue_sesh: dict[
str, # venue key
tuple[asks.Session, str] # session, eps path
] = {
'spot': (self._sesh, '/api/v3/'),
'spot_testnet': (self._test_sesh, '/fapi/v1/'),
'margin': (self._sapi_sesh, '/sapi/v1/'),
'usdtm_futes': (self._fapi_sesh, '/fapi/v1/'),
'usdtm_futes_testnet': (self._test_fapi_sesh, '/fapi/v1/'),
# 'futes_coin': self._dapi, # TODO
}
# lookup for going from `.mkt_mode: str` to the config # lookup for going from `.mkt_mode: str` to the config
# subsection `key: str` # subsection `key: str`
@ -211,6 +238,40 @@ class Client:
'futes': ['usdtm_futes'], 'futes': ['usdtm_futes'],
} }
# for creating API keys see,
# https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072
self.conf: dict = get_config()
for key, subconf in self.conf.items():
if api_key := subconf.get('api_key', ''):
venue_keys: list[str] = self.confkey2venuekeys[key]
venue_key: str
sesh: asks.Session
for venue_key in venue_keys:
sesh, _ = self.venue_sesh[venue_key]
api_key_header: dict = {
# taken from official:
# https://github.com/binance/binance-futures-connector-python/blob/main/binance/api.py#L47
"Content-Type": "application/json;charset=utf-8",
# TODO: prolly should just always query and copy
# in the real latest ver?
"User-Agent": "binance-connector/6.1.6smbz6",
"X-MBX-APIKEY": api_key,
}
sesh.headers.update(api_key_header)
# if `.use_tesnet = true` in the config then
# also add headers for the testnet session which
# will be used for all order control
if subconf.get('use_testnet', False):
testnet_sesh, _ = self.venue_sesh[
venue_key + '_testnet'
]
testnet_sesh.headers.update(api_key_header)
def _mk_sig( def _mk_sig(
self, self,
data: dict, data: dict,
@ -229,6 +290,7 @@ class Client:
'to define the creds for auth-ed endpoints!?' 'to define the creds for auth-ed endpoints!?'
) )
# XXX: Info on security and authentification # XXX: Info on security and authentification
# https://binance-docs.github.io/apidocs/#endpoint-security-type # https://binance-docs.github.io/apidocs/#endpoint-security-type
if not (api_secret := subconf.get('api_secret')): if not (api_secret := subconf.get('api_secret')):
@ -268,9 +330,8 @@ class Client:
- /fapi/v3/ USD-M FUTURES, or - /fapi/v3/ USD-M FUTURES, or
- /api/v3/ SPOT/MARGIN - /api/v3/ SPOT/MARGIN
account/market endpoint request depending on either passed in account/market endpoint request depending on either passed in `venue: str`
`venue: str` or the current setting `.mkt_mode: str` setting, or the current setting `.mkt_mode: str` setting, default `'spot'`.
default `'spot'`.
Docs per venue API: Docs per venue API:
@ -299,6 +360,9 @@ class Client:
venue=venue_key, venue=venue_key,
) )
sesh: asks.Session
path: str
# Check if we're configured to route order requests to the # Check if we're configured to route order requests to the
# venue equivalent's testnet. # venue equivalent's testnet.
use_testnet: bool = False use_testnet: bool = False
@ -323,12 +387,11 @@ class Client:
# ctl machinery B) # ctl machinery B)
venue_key += '_testnet' venue_key += '_testnet'
client: httpx.AsyncClient sesh, path = self.venue_sesh[venue_key]
path: str
client, path = self.venue_sesh[venue_key] meth: Callable = getattr(sesh, method)
meth: Callable = getattr(client, method)
resp = await meth( resp = await meth(
url=path + endpoint, path=path + endpoint,
params=params, params=params,
timeout=float('inf'), timeout=float('inf'),
) )
@ -370,15 +433,7 @@ class Client:
item['filters'] = filters item['filters'] = filters
pair_type: Type = PAIRTYPES[venue] pair_type: Type = PAIRTYPES[venue]
try:
pair: Pair = pair_type(**item) pair: Pair = pair_type(**item)
except Exception as e:
e.add_note(
"\nDon't panic, prolly stupid binance changed their symbology schema again..\n"
'Check out their API docs here:\n\n'
'https://binance-docs.github.io/apidocs/spot/en/#exchange-information'
)
raise
pair_table[pair.symbol.upper()] = pair pair_table[pair.symbol.upper()] = pair
# update an additional top-level-cross-venue-table # update an additional top-level-cross-venue-table
@ -473,9 +528,7 @@ class Client:
''' '''
pair_table: dict[str, Pair] = self._venue2pairs[ pair_table: dict[str, Pair] = self._venue2pairs[
venue venue or self.mkt_mode
or
self.mkt_mode
] ]
if ( if (
expiry expiry
@ -494,9 +547,9 @@ class Client:
venues: list[str] = [venue] venues: list[str] = [venue]
# batch per-venue download of all exchange infos # batch per-venue download of all exchange infos
async with trio.open_nursery() as tn: async with trio.open_nursery() as rn:
for ven in venues: for ven in venues:
tn.start_soon( rn.start_soon(
self._cache_pairs, self._cache_pairs,
ven, ven,
) )
@ -549,11 +602,11 @@ class Client:
) -> dict[str, Any]: ) -> dict[str, Any]:
fq_pairs: dict[str, Pair] = await self.exch_info() fq_pairs: dict = await self.exch_info()
# TODO: cache this list like we were in # TODO: cache this list like we were in
# `open_symbol_search()`? # `open_symbol_search()`?
# keys: list[str] = list(fq_pairs) keys: list[str] = list(fq_pairs)
return match_from_pairs( return match_from_pairs(
pairs=fq_pairs, pairs=fq_pairs,
@ -561,20 +614,9 @@ class Client:
score_cutoff=50, score_cutoff=50,
) )
def pair2venuekey(
self,
pair: Pair,
) -> str:
return {
'USDTM': 'usdtm_futes',
'SPOT': 'spot',
# 'COINM': 'coin_futes',
# ^-TODO-^ bc someone might want it..?
}[pair.venue]
async def bars( async def bars(
self, self,
mkt: MktPair, symbol: str,
start_dt: datetime | None = None, start_dt: datetime | None = None,
end_dt: datetime | None = None, end_dt: datetime | None = None,
@ -604,20 +646,16 @@ class Client:
start_time = binance_timestamp(start_dt) start_time = binance_timestamp(start_dt)
end_time = binance_timestamp(end_dt) end_time = binance_timestamp(end_dt)
bs_pair: Pair = self._pairs[mkt.bs_fqme.upper()]
# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
bars = await self._api( bars = await self._api(
'klines', 'klines',
params={ params={
# NOTE: always query using their native symbology! 'symbol': symbol.upper(),
'symbol': mkt.bs_mktid.upper(),
'interval': '1m', 'interval': '1m',
'startTime': start_time, 'startTime': start_time,
'endTime': end_time, 'endTime': end_time,
'limit': limit 'limit': limit
}, },
venue=self.pair2venuekey(bs_pair),
allow_testnet=False, allow_testnet=False,
) )
new_bars: list[tuple] = [] new_bars: list[tuple] = []
@ -934,148 +972,17 @@ class Client:
await self.close_listen_key(key) await self.close_listen_key(key)
_venue_urls: dict[str, str] = {
'spot': (
_spot_url,
'/api/v3/',
),
'spot_testnet': (
_testnet_spot_url,
'/fapi/v1/'
),
# margin and extended spot endpoints session.
# TODO: did this ever get implemented fully?
# 'margin': (
# _spot_url,
# '/sapi/v1/'
# ),
'usdtm_futes': (
_futes_url,
'/fapi/v1/',
),
'usdtm_futes_testnet': (
_testnet_futes_url,
'/fapi/v1/',
),
# TODO: for anyone who actually needs it ;P
# 'coin_futes': ()
}
def init_api_keys(
client: Client,
conf: dict[str, Any],
) -> None:
'''
Set up per-venue API keys each http client according to the user's
`brokers.conf`.
For ex, to use spot-testnet and live usdt futures APIs:
```toml
[binance]
# spot test net
spot.use_testnet = true
spot.api_key = '<spot_api_key_from_binance_account>'
spot.api_secret = '<spot_api_key_password>'
# futes live
futes.use_testnet = false
accounts.usdtm = 'futes'
futes.api_key = '<futes_api_key_from_binance>'
futes.api_secret = '<futes_api_key_password>''
# if uncommented will use the built-in paper engine and not
# connect to `binance` API servers for order ctl.
# accounts.paper = 'paper'
```
'''
for key, subconf in conf.items():
if api_key := subconf.get('api_key', ''):
venue_keys: list[str] = client.confkey2venuekeys[key]
venue_key: str
client: httpx.AsyncClient
for venue_key in venue_keys:
client, _ = client.venue_sesh[venue_key]
api_key_header: dict = {
# taken from official:
# https://github.com/binance/binance-futures-connector-python/blob/main/binance/api.py#L47
"Content-Type": "application/json;charset=utf-8",
# TODO: prolly should just always query and copy
# in the real latest ver?
"User-Agent": "binance-connector/6.1.6smbz6",
"X-MBX-APIKEY": api_key,
}
client.headers.update(api_key_header)
# if `.use_tesnet = true` in the config then
# also add headers for the testnet session which
# will be used for all order control
if subconf.get('use_testnet', False):
testnet_sesh, _ = client.venue_sesh[
venue_key + '_testnet'
]
testnet_sesh.headers.update(api_key_header)
@acm @acm
async def get_client( async def get_client() -> Client:
mkt_mode: MarketType = 'spot',
) -> Client:
'''
Construct an single `piker` client which composes multiple underlying venue
specific API clients both for live and test networks.
''' client = Client()
venue_sessions: dict[ await client.exch_info()
str, # venue key
tuple[httpx.AsyncClient, str] # session, eps path
] = {}
async with AsyncExitStack() as client_stack:
for name, (base_url, path) in _venue_urls.items():
api: httpx.AsyncClient = await client_stack.enter_async_context(
httpx.AsyncClient(
base_url=base_url,
# headers={},
# TODO: is there a way to numerate this?
# https://www.python-httpx.org/advanced/clients/#why-use-a-client
# connections=4
)
)
venue_sessions[name] = (
api,
path,
)
conf: dict[str, Any] = get_config()
# for creating API keys see,
# https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072
client = Client(
venue_sessions=venue_sessions,
conf=conf,
mkt_mode=mkt_mode,
)
init_api_keys(
client=client,
conf=conf,
)
fq_pairs: dict[str, Pair] = await client.exch_info()
assert fq_pairs
log.info( log.info(
f'Loaded multi-venue `Client` in mkt_mode={client.mkt_mode!r}\n\n' f'{client} in {client.mkt_mode} mode: caching exchange infos..\n'
f'Symbology Summary:\n' 'Cached multi-market pairs:\n'
f'------ - ------\n'
f'spot: {len(client._spot_pairs)}\n' f'spot: {len(client._spot_pairs)}\n'
f'usdtm_futes: {len(client._ufutes_pairs)}\n' f'usdtm_futes: {len(client._ufutes_pairs)}\n'
'------ - ------\n' f'Total: {len(client._pairs)}\n'
f'total: {len(client._pairs)}\n'
) )
yield client yield client

View File

@ -264,20 +264,15 @@ async def open_trade_dialog(
# do a open_symcache() call.. though maybe we can hide # do a open_symcache() call.. though maybe we can hide
# this in a new async version of open_account()? # this in a new async version of open_account()?
async with open_cached_client('binance') as client: async with open_cached_client('binance') as client:
subconf: dict|None = client.conf.get(venue_name) subconf: dict = client.conf[venue_name]
use_testnet = subconf.get('use_testnet', False)
# XXX: if no futes.api_key or spot.api_key has been set we # XXX: if no futes.api_key or spot.api_key has been set we
# always fall back to the paper engine! # always fall back to the paper engine!
if ( if not subconf.get('api_key'):
not subconf
or
not subconf.get('api_key')
):
await ctx.started('paper') await ctx.started('paper')
return return
use_testnet: bool = subconf.get('use_testnet', False)
async with ( async with (
open_cached_client('binance') as client, open_cached_client('binance') as client,
): ):

View File

@ -42,12 +42,12 @@ from trio_typing import TaskStatus
from pendulum import ( from pendulum import (
from_timestamp, from_timestamp,
) )
from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from piker.brokers import ( from piker.brokers import (
open_cached_client, open_cached_client,
NoData,
) )
from piker._cacheables import ( from piker._cacheables import (
async_lifo_cache, async_lifo_cache,
@ -110,7 +110,6 @@ class AggTrade(Struct, frozen=True):
async def stream_messages( async def stream_messages(
ws: NoBsWs, ws: NoBsWs,
) -> AsyncGenerator[NoBsWs, dict]: ) -> AsyncGenerator[NoBsWs, dict]:
# TODO: match syntax here! # TODO: match syntax here!
@ -221,8 +220,6 @@ def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
} }
# TODO, why aren't frame resp `log.info()`s showing in upstream
# code?!
@acm @acm
async def open_history_client( async def open_history_client(
mkt: MktPair, mkt: MktPair,
@ -255,30 +252,24 @@ async def open_history_client(
else: else:
client.mkt_mode = 'spot' client.mkt_mode = 'spot'
array: np.ndarray = await client.bars( # NOTE: always query using their native symbology!
mkt=mkt, mktid: str = mkt.bs_mktid
array = await client.bars(
mktid,
start_dt=start_dt, start_dt=start_dt,
end_dt=end_dt, end_dt=end_dt,
) )
if array.size == 0:
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
times = array['time'] times = array['time']
if not times.any(): if (
raise ValueError( end_dt is None
'Bad frame with null-times?\n\n' ):
f'{times}' inow = round(time.time())
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60: if (inow - times[-1]) > 60:
await tractor.pause() await tractor.pause()
start_dt = from_timestamp(times[0]) start_dt = from_timestamp(times[0])
end_dt = from_timestamp(times[-1]) end_dt = from_timestamp(times[-1])
return array, start_dt, end_dt return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3} yield get_ohlc, {'erlangs': 3, 'rate': 3}
@ -465,8 +456,6 @@ async def stream_quotes(
): ):
init_msgs: list[FeedInit] = [] init_msgs: list[FeedInit] = []
for sym in symbols: for sym in symbols:
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym) mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec # build out init msgs according to latest spec
@ -515,6 +504,7 @@ async def stream_quotes(
# start streaming # start streaming
async for typ, quote in msg_gen: async for typ, quote in msg_gen:
# period = time.time() - last # period = time.time() - last
# hz = 1/period if period else float('inf') # hz = 1/period if period else float('inf')
# if hz > 60: # if hz > 60:
@ -550,7 +540,7 @@ async def open_symbol_search(
) )
# repack in fqme-keyed table # repack in fqme-keyed table
byfqme: dict[str, Pair] = {} byfqme: dict[start, Pair] = {}
for pair in pairs.values(): for pair in pairs.values():
byfqme[pair.bs_fqme] = pair byfqme[pair.bs_fqme] = pair

View File

@ -137,12 +137,10 @@ class SpotPair(Pair, frozen=True):
quoteOrderQtyMarketAllowed: bool quoteOrderQtyMarketAllowed: bool
isSpotTradingAllowed: bool isSpotTradingAllowed: bool
isMarginTradingAllowed: bool isMarginTradingAllowed: bool
otoAllowed: bool
defaultSelfTradePreventionMode: str defaultSelfTradePreventionMode: str
allowedSelfTradePreventionModes: list[str] allowedSelfTradePreventionModes: list[str]
permissions: list[str] permissions: list[str]
permissionSets: list[list[str]]
# NOTE: see `.data._symcache.SymbologyCache.load()` for why # NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.binance:SpotPair' ns_path: str = 'piker.brokers.binance:SpotPair'
@ -181,6 +179,7 @@ class FutesPair(Pair):
quoteAsset: str # 'USDT', quoteAsset: str # 'USDT',
quotePrecision: int # 8, quotePrecision: int # 8,
requiredMarginPercent: float # '5.0000', requiredMarginPercent: float # '5.0000',
settlePlan: int # 0,
timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'], timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'],
triggerProtect: float # '0.0500', triggerProtect: float # '0.0500',
underlyingSubType: list[str] # ['PoW'], underlyingSubType: list[str] # ['PoW'],

View File

@ -27,8 +27,8 @@ from typing import (
) )
import time import time
import httpx
import pendulum import pendulum
import asks
import numpy as np import numpy as np
import urllib.parse import urllib.parse
import hashlib import hashlib
@ -60,11 +60,6 @@ log = get_logger('piker.brokers.kraken')
# <uri>/<version>/ # <uri>/<version>/
_url = 'https://api.kraken.com/0' _url = 'https://api.kraken.com/0'
_headers: dict[str, str] = {
'User-Agent': 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
}
# TODO: this is the only backend providing this right? # TODO: this is the only backend providing this right?
# in which case we should drop it from the defaults and # in which case we should drop it from the defaults and
# instead make a custom fields descr in this module! # instead make a custom fields descr in this module!
@ -140,15 +135,16 @@ class Client:
def __init__( def __init__(
self, self,
config: dict[str, str], config: dict[str, str],
httpx_client: httpx.AsyncClient,
name: str = '', name: str = '',
api_key: str = '', api_key: str = '',
secret: str = '' secret: str = ''
) -> None: ) -> None:
self._sesh = asks.Session(connections=4)
self._sesh: httpx.AsyncClient = httpx_client self._sesh.base_location = _url
self._sesh.headers.update({
'User-Agent':
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
})
self._name = name self._name = name
self._api_key = api_key self._api_key = api_key
self._secret = secret self._secret = secret
@ -170,9 +166,10 @@ class Client:
method: str, method: str,
data: dict, data: dict,
) -> dict[str, Any]: ) -> dict[str, Any]:
resp: httpx.Response = await self._sesh.post( resp = await self._sesh.post(
url=f'/public/{method}', path=f'/public/{method}',
json=data, json=data,
timeout=float('inf')
) )
return resproc(resp, log) return resproc(resp, log)
@ -183,18 +180,18 @@ class Client:
uri_path: str uri_path: str
) -> dict[str, Any]: ) -> dict[str, Any]:
headers = { headers = {
'Content-Type': 'application/x-www-form-urlencoded', 'Content-Type':
'API-Key': self._api_key, 'application/x-www-form-urlencoded',
'API-Sign': get_kraken_signature( 'API-Key':
uri_path, self._api_key,
data, 'API-Sign':
self._secret, get_kraken_signature(uri_path, data, self._secret)
),
} }
resp: httpx.Response = await self._sesh.post( resp = await self._sesh.post(
url=f'/private/{method}', path=f'/private/{method}',
data=data, data=data,
headers=headers, headers=headers,
timeout=float('inf')
) )
return resproc(resp, log) return resproc(resp, log)
@ -668,19 +665,10 @@ class Client:
@acm @acm
async def get_client() -> Client: async def get_client() -> Client:
conf: dict[str, Any] = get_config() conf = get_config()
async with httpx.AsyncClient(
base_url=_url,
headers=_headers,
# TODO: is there a way to numerate this?
# https://www.python-httpx.org/advanced/clients/#why-use-a-client
# connections=4
) as trio_client:
if conf: if conf:
client = Client( client = Client(
conf, conf,
httpx_client=trio_client,
# TODO: don't break these up and just do internal # TODO: don't break these up and just do internal
# conf lookups instead.. # conf lookups instead..
@ -689,10 +677,7 @@ async def get_client() -> Client:
secret=conf['secret'] secret=conf['secret']
) )
else: else:
client = Client( client = Client({})
conf={},
httpx_client=trio_client,
)
# at startup, load all symbols, and asset info in # at startup, load all symbols, and asset info in
# batch requests. # batch requests.

View File

@ -612,18 +612,18 @@ async def open_trade_dialog(
# enter relay loop # enter relay loop
await handle_order_updates( await handle_order_updates(
client=client, client,
ws=ws, ws,
ws_stream=stream, stream,
ems_stream=ems_stream, ems_stream,
apiflows=apiflows, apiflows,
ids=ids, ids,
reqids2txids=reqids2txids, reqids2txids,
acnt=acnt, acnt,
ledger=ledger, api_trans,
acctid=acctid, acctid,
acc_name=acc_name, acc_name,
token=token, token,
) )
@ -639,8 +639,7 @@ async def handle_order_updates(
# transaction records which will be updated # transaction records which will be updated
# on new trade clearing events (aka order "fills") # on new trade clearing events (aka order "fills")
ledger: TransactionLedger, ledger_trans: dict[str, Transaction],
# ledger_trans: dict[str, Transaction],
acctid: str, acctid: str,
acc_name: str, acc_name: str,
token: str, token: str,
@ -700,8 +699,7 @@ async def handle_order_updates(
# if tid not in ledger_trans # if tid not in ledger_trans
} }
for tid, trade in trades.items(): for tid, trade in trades.items():
# assert tid not in ledger_trans assert tid not in ledger_trans
assert tid not in ledger
txid = trade['ordertxid'] txid = trade['ordertxid']
reqid = trade.get('userref') reqid = trade.get('userref')
@ -749,17 +747,11 @@ async def handle_order_updates(
client, client,
api_name_set='wsname', api_name_set='wsname',
) )
ppmsgs: list[BrokerdPosition] = trades2pps( ppmsgs = trades2pps(
acnt=acnt, acnt,
ledger=ledger, acctid,
acctid=acctid, new_trans,
new_trans=new_trans,
) )
# ppmsgs = trades2pps(
# acnt,
# acctid,
# new_trans,
# )
for pp_msg in ppmsgs: for pp_msg in ppmsgs:
await ems_stream.send(pp_msg) await ems_stream.send(pp_msg)

View File

@ -16,9 +16,10 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
Kucoin cex API backend. Kucoin broker backend
''' '''
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
aclosing, aclosing,
@ -41,7 +42,7 @@ import wsproto
from uuid import uuid4 from uuid import uuid4
from trio_typing import TaskStatus from trio_typing import TaskStatus
import httpx import asks
from bidict import bidict from bidict import bidict
import numpy as np import numpy as np
import pendulum import pendulum
@ -62,7 +63,7 @@ from piker._cacheables import (
) )
from piker.log import get_logger from piker.log import get_logger
from piker.data.validate import FeedInit from piker.data.validate import FeedInit
from piker.types import Struct # NOTE, this is already a `tractor.msg.Struct` from piker.types import Struct
from piker.data import ( from piker.data import (
def_iohlcv_fields, def_iohlcv_fields,
match_from_pairs, match_from_pairs,
@ -98,18 +99,9 @@ class KucoinMktPair(Struct, frozen=True):
def size_tick(self) -> Decimal: def size_tick(self) -> Decimal:
return Decimal(str(self.quoteMinSize)) return Decimal(str(self.quoteMinSize))
callauctionFirstStageStartTime: None|float
callauctionIsEnabled: bool
callauctionPriceCeiling: float|None
callauctionPriceFloor: float|None
callauctionSecondStageStartTime: float|None
callauctionThirdStageStartTime: float|None
enableTrading: bool enableTrading: bool
feeCategory: int
feeCurrency: str feeCurrency: str
isMarginEnabled: bool isMarginEnabled: bool
makerFeeCoefficient: float
market: str market: str
minFunds: float minFunds: float
name: str name: str
@ -119,10 +111,7 @@ class KucoinMktPair(Struct, frozen=True):
quoteIncrement: float quoteIncrement: float
quoteMaxSize: float quoteMaxSize: float
quoteMinSize: float quoteMinSize: float
st: bool
symbol: str # our bs_mktid, kucoin's internal id symbol: str # our bs_mktid, kucoin's internal id
takerFeeCoefficient: float
tradingStartTime: float|None
class AccountTrade(Struct, frozen=True): class AccountTrade(Struct, frozen=True):
@ -223,11 +212,7 @@ def get_config() -> BrokerConfig | None:
class Client: class Client:
def __init__( def __init__(self) -> None:
self,
httpx_client: httpx.AsyncClient,
) -> None:
self._http: httpx.AsyncClient = httpx_client
self._config: BrokerConfig | None = get_config() self._config: BrokerConfig | None = get_config()
self._pairs: dict[str, KucoinMktPair] = {} self._pairs: dict[str, KucoinMktPair] = {}
self._fqmes2mktids: bidict[str, str] = bidict() self._fqmes2mktids: bidict[str, str] = bidict()
@ -242,24 +227,18 @@ class Client:
) -> dict[str, str | bytes]: ) -> dict[str, str | bytes]:
''' '''
Generate authenticated request headers: Generate authenticated request headers
https://docs.kucoin.com/#authentication https://docs.kucoin.com/#authentication
https://www.kucoin.com/docs/basic-info/connection-method/authentication/creating-a-request
https://www.kucoin.com/docs/basic-info/connection-method/authentication/signing-a-message
''' '''
if not self._config: if not self._config:
raise ValueError( raise ValueError(
'No config found when trying to send authenticated request' 'No config found when trying to send authenticated request')
)
str_to_sign = ( str_to_sign = (
str(int(time.time() * 1000)) str(int(time.time() * 1000))
+ + action + f'/api/{api}/{endpoint.lstrip("/")}'
action
+
f'/api/{api}/{endpoint.lstrip("/")}'
) )
signature = base64.b64encode( signature = base64.b64encode(
@ -270,7 +249,6 @@ class Client:
).digest() ).digest()
) )
# TODO: can we cache this between calls?
passphrase = base64.b64encode( passphrase = base64.b64encode(
hmac.new( hmac.new(
self._config.key_secret.encode('utf-8'), self._config.key_secret.encode('utf-8'),
@ -292,10 +270,8 @@ class Client:
self, self,
action: Literal['POST', 'GET'], action: Literal['POST', 'GET'],
endpoint: str, endpoint: str,
api: str = 'v2', api: str = 'v2',
headers: dict = {}, headers: dict = {},
) -> Any: ) -> Any:
''' '''
Generic request wrapper for Kucoin API Generic request wrapper for Kucoin API
@ -308,19 +284,14 @@ class Client:
api, api,
) )
req_meth: Callable = getattr( api_url = f'https://api.kucoin.com/api/{api}/{endpoint}'
self._http,
action.lower(), res = await asks.request(action, api_url, headers=headers)
)
res = await req_meth( json = res.json()
url=f'/{api}/{endpoint}', if 'data' in json:
headers=headers, return json['data']
)
json: dict = res.json()
if (data := json.get('data')) is not None:
return data
else: else:
api_url: str = self._http.base_url
log.error( log.error(
f'Error making request to {api_url} ->\n' f'Error making request to {api_url} ->\n'
f'{pformat(res)}' f'{pformat(res)}'
@ -378,8 +349,8 @@ class Client:
currencies: dict[str, Currency] = {} currencies: dict[str, Currency] = {}
entries: list[dict] = await self._request( entries: list[dict] = await self._request(
'GET', 'GET',
endpoint='currencies',
api='v1', api='v1',
endpoint='currencies',
) )
for entry in entries: for entry in entries:
curr = Currency(**entry).copy() curr = Currency(**entry).copy()
@ -395,22 +366,13 @@ class Client:
dict[str, KucoinMktPair], dict[str, KucoinMktPair],
bidict[str, KucoinMktPair], bidict[str, KucoinMktPair],
]: ]:
entries = await self._request( entries = await self._request('GET', 'symbols')
'GET',
endpoint='symbols',
)
log.info(f' {len(entries)} Kucoin market pairs fetched') log.info(f' {len(entries)} Kucoin market pairs fetched')
pairs: dict[str, KucoinMktPair] = {} pairs: dict[str, KucoinMktPair] = {}
fqmes2mktids: bidict[str, str] = bidict() fqmes2mktids: bidict[str, str] = bidict()
for item in entries: for item in entries:
try:
pair = pairs[item['name']] = KucoinMktPair(**item) pair = pairs[item['name']] = KucoinMktPair(**item)
except TypeError as te:
raise TypeError(
'`KucoinMktPair` and reponse fields do not match ??\n'
f'{KucoinMktPair.fields_diff(item)}\n'
) from te
fqmes2mktids[ fqmes2mktids[
item['name'].lower().replace('-', '') item['name'].lower().replace('-', '')
] = pair.name ] = pair.name
@ -605,18 +567,10 @@ def fqme_to_kucoin_sym(
@acm @acm
async def get_client() -> AsyncGenerator[Client, None]: async def get_client() -> AsyncGenerator[Client, None]:
''' client = Client()
Load an API `Client` preconfigured from user settings
''' async with trio.open_nursery() as n:
async with ( n.start_soon(client.get_mkt_pairs)
httpx.AsyncClient(
base_url='https://api.kucoin.com/api',
) as trio_client,
):
client = Client(httpx_client=trio_client)
async with trio.open_nursery() as tn:
tn.start_soon(client.get_mkt_pairs)
await client.get_currencies() await client.get_currencies()
yield client yield client
@ -655,7 +609,7 @@ async def open_ping_task(
await trio.sleep((ping_interval - 1000) / 1000) await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'}) await ws.send_msg({'id': connect_id, 'type': 'ping'})
log.warning('Starting ping task for kucoin ws connection') log.info('Starting ping task for kucoin ws connection')
n.start_soon(ping_server) n.start_soon(ping_server)
yield yield
@ -667,14 +621,9 @@ async def open_ping_task(
async def get_mkt_info( async def get_mkt_info(
fqme: str, fqme: str,
) -> tuple[ ) -> tuple[MktPair, KucoinMktPair]:
MktPair,
KucoinMktPair,
]:
''' '''
Query for and return both a `piker.accounting.MktPair` and Query for and return a `MktPair` and `KucoinMktPair`.
`KucoinMktPair` from provided `fqme: str`
(fully-qualified-market-endpoint).
''' '''
async with open_cached_client('kucoin') as client: async with open_cached_client('kucoin') as client:
@ -749,8 +698,6 @@ async def stream_quotes(
log.info(f'Starting up quote stream(s) for {symbols}') log.info(f'Starting up quote stream(s) for {symbols}')
for sym_str in symbols: for sym_str in symbols:
mkt: MktPair
pair: KucoinMktPair
mkt, pair = await get_mkt_info(sym_str) mkt, pair = await get_mkt_info(sym_str)
init_msgs.append( init_msgs.append(
FeedInit(mkt_info=mkt) FeedInit(mkt_info=mkt)
@ -758,11 +705,7 @@ async def stream_quotes(
ws: NoBsWs ws: NoBsWs
token, ping_interval = await client._get_ws_token() token, ping_interval = await client._get_ws_token()
log.info('API reported ping_interval: {ping_interval}\n') connect_id = str(uuid4())
connect_id: str = str(uuid4())
typ: str
quote: dict
async with ( async with (
open_autorecon_ws( open_autorecon_ws(
( (
@ -776,37 +719,20 @@ async def stream_quotes(
), ),
) as ws, ) as ws,
open_ping_task(ws, ping_interval, connect_id), open_ping_task(ws, ping_interval, connect_id),
aclosing( aclosing(stream_messages(ws, sym_str)) as msg_gen,
iter_normed_quotes(
ws, sym_str
)
) as iter_quotes,
): ):
typ, quote = await anext(iter_quotes) typ, quote = await anext(msg_gen)
while typ != 'trade':
# take care to not unblock here until we get a real # take care to not unblock here until we get a real
# trade quote? # trade quote
# ^TODO, remove this right? typ, quote = await anext(msg_gen)
# -[ ] what often blocks chart boot/new-feed switching
# since we'ere waiting for a live quote instead of just
# loading history afap..
# |_ XXX, not sure if we require a bit of rework to core
# feed init logic or if backends justg gotta be
# changed up.. feel like there was some causality
# dilema prolly only seen with IB too..
# while typ != 'trade':
# typ, quote = await anext(iter_quotes)
task_status.started((init_msgs, quote)) task_status.started((init_msgs, quote))
feed_is_live.set() feed_is_live.set()
# XXX NOTE, DO NOT include the `.<backend>` suffix! async for typ, msg in msg_gen:
# OW the sampling loop will not broadcast correctly.. await send_chan.send({sym_str: msg})
# since `bus._subscribers.setdefault(bs_fqme, set())`
# is used inside `.data.open_feed_bus()` !!!
topic: str = mkt.bs_fqme
async for typ, quote in iter_quotes:
await send_chan.send({topic: quote})
@acm @acm
@ -861,7 +787,7 @@ async def subscribe(
) )
async def iter_normed_quotes( async def stream_messages(
ws: NoBsWs, ws: NoBsWs,
sym: str, sym: str,
@ -892,9 +818,6 @@ async def iter_normed_quotes(
yield 'trade', { yield 'trade', {
'symbol': sym, 'symbol': sym,
# TODO, is 'last' even used elsewhere/a-good
# semantic? can't we just read the ticks with our
# .data.ticktools.frame_ticks()`/
'last': trade_data.price, 'last': trade_data.price,
'brokerd_ts': last_trade_ts, 'brokerd_ts': last_trade_ts,
'ticks': [ 'ticks': [
@ -987,7 +910,7 @@ async def open_history_client(
if end_dt is None: if end_dt is None:
inow = round(time.time()) inow = round(time.time())
log.debug( print(
f'difference in time between load and processing' f'difference in time between load and processing'
f'{inow - times[-1]}' f'{inow - times[-1]}'
) )

View File

@ -1,49 +0,0 @@
piker.clearing
______________
trade execution-n-control subsys for both live and paper trading as
well as algo-trading manual override/interaction across any backend
broker and data provider.
avail UIs
*********
order ctl
---------
the `piker.clearing` subsys is exposed mainly though
the `piker chart` GUI as a "chart trader" style UX and
is automatically enabled whenever a chart is opened.
.. ^TODO, more prose here!
the "manual" order control features are exposed via the
`piker.ui.order_mode` API and can pretty much always be
used (at least) in simulated-trading mode, aka "paper"-mode, and
the micro-manual is as follows:
``order_mode`` (
edge triggered activation by any of the following keys,
``mouse-click`` on y-level to submit at that price
):
- ``f``/ ``ctl-f`` to stage buy
- ``d``/ ``ctl-d`` to stage sell
- ``a`` to stage alert
``search_mode`` (
``ctl-l`` or ``ctl-space`` to open,
``ctl-c`` or ``ctl-space`` to close
) :
- begin typing to have symbol search automatically lookup
symbols from all loaded backend (broker) providers
- arrow keys and mouse click to navigate selection
- vi-like ``ctl-[hjkl]`` for navigation
position (pp) mgmt
------------------
you can also configure your position allocation limits from the
sidepane.
.. ^TODO, explain and provide tut once more refined!

View File

@ -104,15 +104,14 @@ def get_app_dir(
# `tractor`) with the testing dir and check for it whenever we # `tractor`) with the testing dir and check for it whenever we
# detect `pytest` is being used (which it isn't under normal # detect `pytest` is being used (which it isn't under normal
# operation). # operation).
# if "pytest" in sys.modules: if "pytest" in sys.modules:
# import tractor import tractor
# actor = tractor.current_actor(err_on_no_runtime=False) actor = tractor.current_actor(err_on_no_runtime=False)
# if actor: # runtime is up if actor: # runtime is up
# rvs = tractor._state._runtime_vars rvs = tractor._state._runtime_vars
# import pdbp; pdbp.set_trace() testdirpath = Path(rvs['piker_vars']['piker_test_dir'])
# testdirpath = Path(rvs['piker_vars']['piker_test_dir']) assert testdirpath.exists(), 'piker test harness might be borked!?'
# assert testdirpath.exists(), 'piker test harness might be borked!?' app_name = str(testdirpath)
# app_name = str(testdirpath)
if platform.system() == 'Windows': if platform.system() == 'Windows':
key = "APPDATA" if roaming else "LOCALAPPDATA" key = "APPDATA" if roaming else "LOCALAPPDATA"

View File

@ -273,7 +273,7 @@ async def _reconnect_forever(
nobsws._connected.set() nobsws._connected.set()
await trio.sleep_forever() await trio.sleep_forever()
except HandshakeError: except HandshakeError:
log.exception('Retrying connection') log.exception(f'Retrying connection')
# ws & nursery block ends # ws & nursery block ends
@ -359,8 +359,8 @@ async def open_autorecon_ws(
''' '''
JSONRPC response-request style machinery for transparent multiplexing JSONRPC response-request style machinery for transparent multiplexing of msgs
of msgs over a `NoBsWs`. over a NoBsWs.
''' '''
@ -377,82 +377,43 @@ async def open_jsonrpc_session(
url: str, url: str,
start_id: int = 0, start_id: int = 0,
response_type: type = JSONRPCResult, response_type: type = JSONRPCResult,
msg_recv_timeout: float = float('inf'), request_type: Optional[type] = None,
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm request_hook: Optional[Callable] = None,
# and options mkts are generally "slow moving".. error_hook: Optional[Callable] = None,
#
# FURTHER if we break the underlying ws connection then since we
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
# broken and never restored with wtv init sequence is required to
# re-establish a working req-resp session.
) -> Callable[[str, dict], dict]: ) -> Callable[[str, dict], dict]:
'''
Init a json-RPC-over-websocket connection to the provided `url`.
A `json_rpc: Callable[[str, dict], dict` is delivered to the
caller for sending requests and a bg-`trio.Task` handles
processing of response msgs including error reporting/raising in
the parent/caller task.
'''
# NOTE, store all request msgs so we can raise errors on the
# caller side!
req_msgs: dict[int, dict] = {}
async with ( async with (
trio.open_nursery() as tn, trio.open_nursery() as n,
open_autorecon_ws( open_autorecon_ws(url) as ws
url=url,
msg_recv_timeout=msg_recv_timeout,
) as ws
): ):
rpc_id: Iterable[int] = count(start_id) rpc_id: Iterable = count(start_id)
rpc_results: dict[int, dict] = {} rpc_results: dict[int, dict] = {}
async def json_rpc( async def json_rpc(method: str, params: dict) -> dict:
method: str,
params: dict,
) -> dict:
''' '''
perform a json rpc call and wait for the result, raise exception in perform a json rpc call and wait for the result, raise exception in
case of error field present on response case of error field present on response
''' '''
nonlocal req_msgs
req_id: int = next(rpc_id)
msg = { msg = {
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'id': req_id, 'id': next(rpc_id),
'method': method, 'method': method,
'params': params 'params': params
} }
_id = msg['id'] _id = msg['id']
result = rpc_results[_id] = { rpc_results[_id] = {
'result': None, 'result': None,
'error': None, 'event': trio.Event()
'event': trio.Event(), # signal caller resp arrived
} }
req_msgs[_id] = msg
await ws.send_msg(msg) await ws.send_msg(msg)
# wait for reponse before unblocking requester code
await rpc_results[_id]['event'].wait() await rpc_results[_id]['event'].wait()
if (maybe_result := result['result']): ret = rpc_results[_id]['result']
ret = maybe_result
del rpc_results[_id]
else: del rpc_results[_id]
err = result['error']
raise Exception(
f'JSONRPC request failed\n'
f'req: {msg}\n'
f'resp: {err}\n'
)
if ret.error is not None: if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4)) raise Exception(json.dumps(ret.error, indent=4))
@ -467,7 +428,6 @@ async def open_jsonrpc_session(
the server side. the server side.
''' '''
nonlocal req_msgs
async for msg in ws: async for msg in ws:
match msg: match msg:
case { case {
@ -491,28 +451,19 @@ async def open_jsonrpc_session(
'params': _, 'params': _,
}: }:
log.debug(f'Recieved\n{msg}') log.debug(f'Recieved\n{msg}')
if request_hook:
await request_hook(request_type(**msg))
case { case {
'error': error 'error': error
}: }:
# retreive orig request msg, set error log.warning(f'Recieved\n{error}')
# response in original "result" msg, if error_hook:
# THEN FINALLY set the event to signal caller await error_hook(response_type(**msg))
# to raise the error in the parent task.
req_id: int = error['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error
result['event'].set()
log.error(
f'JSONRPC request failed\n'
f'req: {req_msg}\n'
f'resp: {error}\n'
)
case _: case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
tn.start_soon(recv_task) n.start_soon(recv_task)
yield json_rpc yield json_rpc
tn.cancel_scope.cancel() n.cancel_scope.cancel()

View File

@ -386,8 +386,6 @@ def ldshm(
open_annot_ctl() as actl, open_annot_ctl() as actl,
): ):
shm_df: pl.DataFrame | None = None shm_df: pl.DataFrame | None = None
tf2aids: dict[float, dict] = {}
for ( for (
shmfile, shmfile,
shm, shm,
@ -528,17 +526,16 @@ def ldshm(
new_df, new_df,
step_gaps, step_gaps,
) )
# last chance manual overwrites in REPL # last chance manual overwrites in REPL
# await tractor.pause() await tractor.pause()
assert aids assert aids
tf2aids[period_s] = aids
else: else:
# allow interaction even when no ts problems. # allow interaction even when no ts problems.
assert not diff
await tractor.pause() await tractor.pause()
log.info('Exiting TSP shm anal-izer!') # assert not diff
if shm_df is None: if shm_df is None:
log.error( log.error(

View File

@ -161,13 +161,7 @@ class NativeStorageClient:
def index_files(self): def index_files(self):
for path in self._datadir.iterdir(): for path in self._datadir.iterdir():
if ( if path.name in {'borked', 'expired',}:
path.is_dir()
or
'.parquet' not in str(path)
# or
# path.name in {'borked', 'expired',}
):
continue continue
key: str = path.name.rstrip('.parquet') key: str = path.name.rstrip('.parquet')

View File

@ -44,10 +44,8 @@ import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from pendulum import ( from pendulum import (
Interval,
DateTime, DateTime,
Duration, Duration,
duration as mk_duration,
from_timestamp, from_timestamp,
) )
import numpy as np import numpy as np
@ -216,8 +214,7 @@ async def maybe_fill_null_segments(
# pair, immediately stop backfilling? # pair, immediately stop backfilling?
if ( if (
start_dt start_dt
and and end_dt < start_dt
end_dt < start_dt
): ):
await tractor.pause() await tractor.pause()
break break
@ -265,7 +262,6 @@ async def maybe_fill_null_segments(
except tractor.ContextCancelled: except tractor.ContextCancelled:
# log.exception # log.exception
await tractor.pause() await tractor.pause()
raise
null_segs_detected.set() null_segs_detected.set()
# RECHECK for more null-gaps # RECHECK for more null-gaps
@ -353,7 +349,7 @@ async def maybe_fill_null_segments(
async def start_backfill( async def start_backfill(
get_hist, get_hist,
def_frame_duration: Duration, frame_types: dict[str, Duration] | None,
mod: ModuleType, mod: ModuleType,
mkt: MktPair, mkt: MktPair,
shm: ShmArray, shm: ShmArray,
@ -383,23 +379,22 @@ async def start_backfill(
update_start_on_prepend: bool = False update_start_on_prepend: bool = False
if backfill_until_dt is None: if backfill_until_dt is None:
# TODO: per-provider default history-durations? # TODO: drop this right and just expose the backfill
# -[ ] inside the `open_history_client()` config allow # limits inside a [storage] section in conf.toml?
# declaring the history duration limits instead of # when no tsdb "last datum" is provided, we just load
# guessing and/or applying the same limits to all? # some near-term history.
# # periods = {
# -[ ] allow declaring (default) per-provider backfill # 1: {'days': 1},
# limits inside a [storage] sub-section in conf.toml? # 60: {'days': 14},
# # }
# NOTE, when no tsdb "last datum" is provided, we just
# load some near-term history by presuming a "decently # do a decently sized backfill and load it into storage.
# large" 60s duration limit and a much shorter 1s range.
periods = { periods = {
1: {'days': 2}, 1: {'days': 2},
60: {'years': 6}, 60: {'years': 6},
} }
period_duration: int = periods[timeframe] period_duration: int = periods[timeframe]
update_start_on_prepend: bool = True update_start_on_prepend = True
# NOTE: manually set the "latest" datetime which we intend to # NOTE: manually set the "latest" datetime which we intend to
# backfill history "until" so as to adhere to the history # backfill history "until" so as to adhere to the history
@ -421,6 +416,7 @@ async def start_backfill(
f'backfill_until_dt: {backfill_until_dt}\n' f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n' f'last_start_dt: {last_start_dt}\n'
) )
try: try:
( (
array, array,
@ -430,114 +426,71 @@ async def start_backfill(
timeframe, timeframe,
end_dt=last_start_dt, end_dt=last_start_dt,
) )
except NoData as _daterr: except NoData as _daterr:
orig_last_start_dt: datetime = last_start_dt # 3 cases:
gap_report: str = ( # - frame in the middle of a legit venue gap
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n' # - history actually began at the `last_start_dt`
f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n' # - some other unknown error (ib blocking the
f'last_start_dt: {orig_last_start_dt}\n\n' # history bc they don't want you seeing how they
f'bf_until: {backfill_until_dt}\n' # cucked all the tinas..)
if dur := frame_types.get(timeframe):
# decrement by a frame's worth of duration and
# retry a few times.
last_start_dt.subtract(
seconds=dur.total_seconds()
) )
# EMPTY FRAME signal with 3 (likely) causes: log.warning(
# f'{mod.name} -> EMPTY FRAME for end_dt?\n'
# 1. range contains legit gap in venue history f'tf@fqme: {timeframe}@{mkt.fqme}\n'
# 2. history actually (edge case) **began** at the 'bf_until <- last_start_dt:\n'
# value `last_start_dt` f'{backfill_until_dt} <- {last_start_dt}\n'
# 3. some other unknown error (ib blocking the f'Decrementing `end_dt` by {dur} and retry..\n'
# history-query bc they don't want you seeing how
# they cucked all the tinas.. like with options
# hist)
#
if def_frame_duration:
# decrement by a duration's (frame) worth of time
# as maybe indicated by the backend to see if we
# can get older data before this possible
# "history gap".
last_start_dt: datetime = last_start_dt.subtract(
seconds=def_frame_duration.total_seconds()
) )
gap_report += (
f'Decrementing `end_dt` and retrying with,\n'
f'def_frame_duration: {def_frame_duration}\n'
f'(new) last_start_dt: {last_start_dt}\n'
)
log.warning(gap_report)
# skip writing to shm/tsdb and try the next
# duration's worth of prior history.
continue continue
else:
# await tractor.pause()
raise DataUnavailable(gap_report)
# broker says there never was or is no more history to pull # broker says there never was or is no more history to pull
except DataUnavailable as due: except DataUnavailable:
message: str = due.args[0]
log.warning( log.warning(
f'Provider {mod.name!r} halted backfill due to,\n\n' f'NO-MORE-DATA in range?\n'
f'`{mod.name}` halted history:\n'
f'{message}\n' f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'fqme: {mkt.fqme}\n' f'{backfill_until_dt} <- {last_start_dt}\n'
f'timeframe: {timeframe}\n'
f'last_start_dt: {last_start_dt}\n'
f'bf_until: {backfill_until_dt}\n'
) )
# UGH: what's a better way?
# TODO: backends are responsible for being correct on # ugh, what's a better way?
# this right!? # TODO: fwiw, we probably want a way to signal a throttle
# -[ ] in the `ib` case we could maybe offer some way # condition (eg. with ib) so that we can halt the
# to halt the request loop until the condition is # request loop until the condition is resolved?
# resolved or should the backend be entirely in if timeframe > 1:
# charge of solving such faults? yes, right? await tractor.pause()
return return
time: np.ndarray = array['time']
assert ( assert (
time[0] array['time'][0]
== ==
next_start_dt.timestamp() next_start_dt.timestamp()
) )
assert time[-1] == next_end_dt.timestamp() diff = last_start_dt - next_start_dt
frame_time_diff_s = diff.seconds
expected_dur: Interval = last_start_dt - next_start_dt
# frame's worth of sample-period-steps, in seconds # frame's worth of sample-period-steps, in seconds
frame_size_s: float = len(array) * timeframe frame_size_s: float = len(array) * timeframe
recv_frame_dur: Duration = ( expected_frame_size_s: float = frame_size_s + timeframe
from_timestamp(array[-1]['time']) if frame_time_diff_s > expected_frame_size_s:
-
from_timestamp(array[0]['time'])
)
if (
(lt_frame := (recv_frame_dur < expected_dur))
or
(null_frame := (frame_size_s == 0))
# ^XXX, should NEVER hit now!
):
# XXX: query result includes a start point prior to our # XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of # expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.) # history gap (eg. market closed period, outage, etc.)
# so just report it to console for now. # so just report it to console for now.
if lt_frame:
reason = 'Possible GAP (or first-datum)'
else:
assert null_frame
reason = 'NULL-FRAME'
missing_dur: Interval = expected_dur.end - recv_frame_dur.end
log.warning( log.warning(
f'{timeframe}s-series {reason} detected!\n' 'GAP DETECTED:\n'
f'fqme: {mkt.fqme}\n' f'last_start_dt: {last_start_dt}\n'
f'last_start_dt: {last_start_dt}\n\n' f'diff: {diff}\n'
f'recv interval: {recv_frame_dur}\n' f'frame_time_diff_s: {frame_time_diff_s}\n'
f'expected interval: {expected_dur}\n\n'
f'Missing duration of history of {missing_dur.in_words()!r}\n'
f'{missing_dur}\n'
) )
# await tractor.pause()
to_push = diff_history( to_push = diff_history(
array, array,
@ -612,27 +565,22 @@ async def start_backfill(
# long-term storage. # long-term storage.
if ( if (
storage is not None storage is not None
and and write_tsdb
write_tsdb
): ):
log.info( log.info(
f'Writing {ln} frame to storage:\n' f'Writing {ln} frame to storage:\n'
f'{next_start_dt} -> {last_start_dt}' f'{next_start_dt} -> {last_start_dt}'
) )
# NOTE, always drop the src asset token for # always drop the src asset token for
# non-currency-pair like market types (for now) # non-currency-pair like market types (for now)
#
# THAT IS, for now our table key schema is NOT
# including the dst[/src] source asset token. SO,
# 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
# historical reasons ONLY.
if mkt.dst.atype not in { if mkt.dst.atype not in {
'crypto', 'crypto',
'crypto_currency', 'crypto_currency',
'fiat', # a "forex pair" 'fiat', # a "forex pair"
'perpetual_future', # stupid "perps" from cex land
}: }:
# for now, our table key schema is not including
# the dst[/src] source asset token.
col_sym_key: str = mkt.get_fqme( col_sym_key: str = mkt.get_fqme(
delim_char='', delim_char='',
without_src=True, without_src=True,
@ -737,7 +685,7 @@ async def back_load_from_tsdb(
last_tsdb_dt last_tsdb_dt
and latest_start_dt and latest_start_dt
): ):
backfilled_size_s: Duration = ( backfilled_size_s = (
latest_start_dt - last_tsdb_dt latest_start_dt - last_tsdb_dt
).seconds ).seconds
# if the shm buffer len is not large enough to contain # if the shm buffer len is not large enough to contain
@ -960,8 +908,6 @@ async def tsdb_backfill(
f'{pformat(config)}\n' f'{pformat(config)}\n'
) )
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = [] dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn: async with trio.open_nursery() as tn:
tn.start_soon( tn.start_soon(
@ -972,6 +918,7 @@ async def tsdb_backfill(
timeframe, timeframe,
config, config,
) )
tsdb_entry: tuple = await load_tsdb_hist( tsdb_entry: tuple = await load_tsdb_hist(
storage, storage,
mkt, mkt,
@ -1000,25 +947,6 @@ async def tsdb_backfill(
mr_end_dt, mr_end_dt,
) = dt_eps ) = dt_eps
first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
calced_frame_size: Duration = mk_duration(
seconds=first_frame_dur_s,
)
# NOTE, attempt to use the backend declared default frame
# sizing (as allowed by their time-series query APIs) and
# if not provided try to construct a default from the
# first frame received above.
def_frame_durs: dict[
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
# NOTE: when there's no offline data, there's 2 cases: # NOTE: when there's no offline data, there's 2 cases:
# - data backend doesn't support timeframe/sample # - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and # period (in which case `dt_eps` should be `None` and
@ -1049,7 +977,7 @@ async def tsdb_backfill(
partial( partial(
start_backfill, start_backfill,
get_hist=get_hist, get_hist=get_hist,
def_frame_duration=def_frame_size, frame_types=config.get('frame_types', None),
mod=mod, mod=mod,
mkt=mkt, mkt=mkt,
shm=shm, shm=shm,

View File

@ -616,18 +616,6 @@ def detect_price_gaps(
# ]) # ])
... ...
# TODO: probably just use the null_segs impl above?
def detect_vlm_gaps(
df: pl.DataFrame,
col: str = 'volume',
) -> pl.DataFrame:
vnull: pl.DataFrame = w_dts.filter(
pl.col(col) == 0
)
return vnull
def dedupe( def dedupe(
src_df: pl.DataFrame, src_df: pl.DataFrame,
@ -638,6 +626,7 @@ def dedupe(
) -> tuple[ ) -> tuple[
pl.DataFrame, # with dts pl.DataFrame, # with dts
pl.DataFrame, # gaps
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal) pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
int, # len diff between input and deduped int, # len diff between input and deduped
]: ]:
@ -650,22 +639,19 @@ def dedupe(
''' '''
wdts: pl.DataFrame = with_dts(src_df) wdts: pl.DataFrame = with_dts(src_df)
deduped = wdts
# remove duplicated datetime samples/sections
deduped: pl.DataFrame = wdts.unique(
# subset=['dt'],
subset=['time'],
maintain_order=True,
)
# maybe sort on any time field # maybe sort on any time field
if sort: if sort:
deduped = deduped.sort(by='time') wdts = wdts.sort(by='time')
# TODO: detect out-of-order segments which were corrected! # TODO: detect out-of-order segments which were corrected!
# -[ ] report in log msg # -[ ] report in log msg
# -[ ] possibly return segment sections which were moved? # -[ ] possibly return segment sections which were moved?
# remove duplicated datetime samples/sections
deduped: pl.DataFrame = wdts.unique(
subset=['dt'],
maintain_order=True,
)
diff: int = ( diff: int = (
wdts.height wdts.height
- -

View File

@ -15,17 +15,123 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
[build-system] [build-system]
requires = ["hatchling"] requires = ["poetry-core"]
build-backend = "hatchling.build" build-backend = "poetry.core.masonry.api"
# ------ - ------
[tool.ruff.lint]
# https://docs.astral.sh/ruff/settings/#lint_ignore
ignore = []
# https://docs.astral.sh/ruff/settings/#lint_per-file-ignores
"piker/ui/qt.py" = [
"E402",
'F401', # unused imports (without __all__ or blah as blah)
# "F841", # unused variable rules
]
# ignore-init-module-imports = false
# ------ - ------
[tool.poetry]
name = "piker"
version = "0.1.0.alpha0.dev0"
description = "trading gear for hackers"
authors = ["Tyler Goodlet <goodboy_foss@protonmail.com>"]
license = "AGPLv3"
readme = "README.rst"
# ------ - ------
[tool.poetry.dependencies]
async-generator = "^1.10"
attrs = "^23.1.0"
bidict = "^0.22.1"
colorama = "^0.4.6"
colorlog = "^6.7.0"
cython = "^3.0.0"
greenback = "^1.1.1"
ib-insync = "^0.9.86"
msgspec = "^0.18.0"
numba = "^0.59.0"
numpy = "^1.25"
polars = "^0.18.13"
pygments = "^2.16.1"
python = ">=3.11, <3.13"
rich = "^13.5.2"
# setuptools = "^68.0.0"
tomli = "^2.0.1"
tomli-w = "^1.0.0"
trio-util = "^0.7.0"
trio-websocket = "^0.10.3"
typer = "^0.9.0"
rapidfuzz = "^3.5.2"
pdbp = "^1.5.0"
trio = "^0.24"
pendulum = "^3.0.0"
httpx = "^0.27.0"
[tool.poetry.dependencies.tractor]
develop = true
git = 'https://github.com/goodboy/tractor.git'
branch = 'asyncio_debugger_support'
# path = "../tractor"
[tool.poetry.dependencies.asyncvnc]
git = 'https://github.com/pikers/asyncvnc.git'
branch = 'main'
[tool.poetry.dependencies.tomlkit]
develop = true
git = 'https://github.com/pikers/tomlkit.git'
branch = 'piker_pin'
# path = "../tomlkit/"
[tool.poetry.group.uis]
optional = true
[tool.poetry.group.uis.dependencies]
# https://python-poetry.org/docs/managing-dependencies/#dependency-groups
# TODO: make sure the levenshtein shit compiles on nix..
# rapidfuzz = {extras = ["speedup"], version = "^0.18.0"}
rapidfuzz = "^3.2.0"
qdarkstyle = ">=3.0.2"
pyqtgraph = { git = 'https://github.com/pikers/pyqtgraph.git' }
# ------ - ------
pyqt6 = "^6.7.0"
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
# testing / CI
pytest = "^6.0.0"
elasticsearch = "^8.9.0"
xonsh = "^0.14.2"
prompt-toolkit = "3.0.40"
# console ehancements and eventually remote debugging
# extras/helpers.
# TODO: add a toolset that makes debugging a `pikerd` service
# (tree) easy to hack on directly using more or less the local env:
# - xonsh + xxh
# - rsyscall + pdbp
# - actor runtime control console like BEAM/OTP
# ------ - ------
# TODO: add an `--only daemon` group for running non-ui / pikerd
# service tree in distributed mode B)
# https://python-poetry.org/docs/managing-dependencies/#installing-group-dependencies
# [tool.poetry.group.daemon.dependencies]
[tool.poetry.scripts]
piker = 'piker.cli:cli'
pikerd = 'piker.cli:pikerd'
ledger = 'piker.accounting.cli:ledger'
[project] [project]
name = "piker"
version = "0.1.0a0dev0"
description = "trading gear for hackers"
authors = [{ name = "Tyler Goodlet", email = "goodboy_foss@protonmail.com" }]
requires-python = ">=3.12, <3.13"
license = "AGPL-3.0-or-later"
readme = "README.rst"
keywords=[ keywords=[
"async", "async",
"trading", "trading",
@ -34,100 +140,15 @@ keywords = [
"charting", "charting",
] ]
classifiers=[ classifiers=[
"Development Status :: 3 - Alpha", 'Development Status :: 3 - Alpha',
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)", "License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Operating System :: POSIX :: Linux", 'Operating System :: POSIX :: Linux',
"Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only", "Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.12",
"Intended Audience :: Financial and Insurance Industry", 'Intended Audience :: Financial and Insurance Industry',
"Intended Audience :: Science/Research", 'Intended Audience :: Science/Research',
"Intended Audience :: Developers", 'Intended Audience :: Developers',
"Intended Audience :: Education", 'Intended Audience :: Education',
] ]
dependencies = [
"async-generator >=1.10, <2.0.0",
"attrs >=23.1.0, <24.0.0",
"bidict >=0.22.1, <0.23.0",
"colorama >=0.4.6, <0.5.0",
"colorlog >=6.7.0, <7.0.0",
"ib-insync >=0.9.86, <0.10.0",
"numba >=0.59.0, <0.60.0",
"numpy >=1.25, <2.0",
"polars >=0.18.13, <0.19.0",
"pygments >=2.16.1, <3.0.0",
"rich >=13.5.2, <14.0.0",
"tomli >=2.0.1, <3.0.0",
"tomli-w >=1.0.0, <2.0.0",
"trio-util >=0.7.0, <0.8.0",
"trio-websocket >=0.10.3, <0.11.0",
"typer >=0.9.0, <1.0.0",
"rapidfuzz >=3.5.2, <4.0.0",
"pdbp >=1.5.0, <2.0.0",
"trio >=0.24, <0.25",
"pendulum >=3.0.0, <4.0.0",
"httpx >=0.27.0, <0.28.0",
"cryptofeed >=2.4.0, <3.0.0",
"pyarrow >=17.0.0, <18.0.0",
"websockets ==12.0",
"msgspec",
"tractor",
"asyncvnc",
"tomlkit",
]
[project.optional-dependencies]
uis = [
# https://docs.astral.sh/uv/concepts/projects/dependencies/#optional-dependencies
# TODO: make sure the levenshtein shit compiles on nix..
# rapidfuzz = {extras = ["speedup"], version = "^0.18.0"}
"rapidfuzz >=3.2.0, <4.0.0",
"qdarkstyle >=3.0.2, <4.0.0",
"pyqt6 >=6.7.0, <7.0.0",
"pyqtgraph",
# for consideration,
# - 'visidata'
# TODO: add an `--only daemon` group for running non-ui / pikerd
# service tree in distributed mode B)
# https://docs.astral.sh/uv/concepts/projects/dependencies/#optional-dependencies
]
[dependency-groups]
# TODO: a toolset that makes debugging a `pikerd` service (tree) easy
# to hack on directly using more or less the local env:
# - xonsh + xxh
# - rsyscall + pdbp
# - actor runtime control console like BEAM/OTP
#
# console ehancements and eventually remote debugging extras/helpers.
# use `uv --dev` to enable
dev = [
"pytest >=6.0.0, <7.0.0",
"elasticsearch >=8.9.0, <9.0.0",
"xonsh >=0.14.2, <0.15.0",
"prompt-toolkit ==3.0.40",
"cython >=3.0.0, <4.0.0",
"greenback >=1.1.1, <2.0.0",
"ruff>=0.9.6",
]
[project.scripts]
piker = "piker.cli:cli"
pikerd = "piker.cli:pikerd"
ledger = "piker.accounting.cli:ledger"
[tool.hatch.build.targets.sdist]
include = ["piker"]
[tool.hatch.build.targets.wheel]
include = ["piker"]
[tool.uv.sources]
pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" }
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
msgspec = { git = "https://github.com/jcrist/msgspec.git" }
tractor = { path = "../tractor", editable = true }

View File

@ -1,93 +0,0 @@
# from default `ruff.toml` @
# https://docs.astral.sh/ruff/configuration/
# Exclude a variety of commonly ignored directories.
exclude = [
".bzr",
".direnv",
".eggs",
".git",
".git-rewrite",
".hg",
".ipynb_checkpoints",
".mypy_cache",
".nox",
".pants.d",
".pyenv",
".pytest_cache",
".pytype",
".ruff_cache",
".svn",
".tox",
".venv",
".vscode",
"__pypackages__",
"_build",
"buck-out",
"build",
"dist",
"node_modules",
"site-packages",
"venv",
]
# Same as Black.
line-length = 88
indent-width = 4
# Assume Python 3.9
target-version = "py312"
# ------ - ------
# TODO, stop warnings around `anext()` builtin use?
# tool.ruff.target-version = "py310"
[lint]
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default.
select = ["E4", "E7", "E9", "F"]
ignore = []
ignore-init-module-imports = false
[lint.per-file-ignores]
"piker/ui/qt.py" = [
"E402",
'F401', # unused imports (without __all__ or blah as blah)
# "F841", # unused variable rules
]
# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
unfixable = []
# Allow unused variables when underscore-prefixed.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[format]
# Use single quotes in `ruff format`.
quote-style = "single"
# Like Black, indent with spaces, rather than tabs.
indent-style = "space"
# Like Black, respect magic trailing commas.
skip-magic-trailing-comma = false
# Like Black, automatically detect the appropriate line ending.
line-ending = "auto"
# Enable auto-formatting of code examples in docstrings. Markdown,
# reStructuredText code/literal blocks and doctests are all supported.
#
# This is currently disabled by default, but it is planned for this
# to be opt-out in the future.
docstring-code-format = false
# Set the line length limit used when formatting code snippets in
# docstrings.
#
# This only has an effect when the `docstring-code-format` setting is
# enabled.
docstring-code-line-length = "dynamic"

1500
uv.lock

File diff suppressed because it is too large Load Diff