As just added for binance move to using an explicit `.<venue>.kraken`
style for spot markets which makes the current spot symbology expand to
`<PAIR>.SPOT` from the new `Pair.bs_fqme: str`. Reasons for why are
laid out in the equivalent patch for binance. Obviously this also primes
for supporting kraken's futures venue APIs as well 🏄https://docs.futures.kraken.com/#introduction
Detalles:
- add `.spot.kraken` parsing to `get_mkt_info()` so that if the venue
token is not passed by caller we implicitly expand it in.
- change `normalize()` to only return the `quote: dict` not the topic
key.
- rewrite live feed msg loop to use `match:` syntax B)
Since there are indeed multiple futures (perp swaps) contracts including
a set with expiry, we need a way to distinguish through search and
`FutesPair` lookup which contract we're requesting. To solve this extend
the `FutesPair` and `SpotPair` to include a `.bs_fqme` field similar to
`MktPair` and key the `Client._pairs: ChainMap`'s backing tables with
these expanded fqmes. For example the perp swap now expands to
`btcusdt.usdtm.perp` which fills in the venue as `'usdtm'` (the
usd-margined fututes market) and the expiry as `'perp'` (as before).
This allows distinguishing explicitly from, for ex., coin-margined
contracts which could instead (since we haven't added the support yet)
fqmes of the sort `btcusdt.<coin>m.perp.binance` thus making it explicit
and obvious which contract is which B)
Further we interpolate the venue token to `spot` for spot markets going
forward, which again makes cex spot markets explicit in symbology; we'll
need to add this as well to other cex backends ;)
Other misc detalles:
- change USD-M futes `MarketType` key to `'usdtm_futes'`.
- add `Pair.bs_fqme: str` for all pair subtypes with particular
special contract handling for futes including quarterlies, perps and
the weird "DEFI" ones..
- drop `OHLC.bar_wap` since it's no longer in the default time-series
schema and we weren't filling it in here anyway..
- `Client._pairs: ChainMap` is now a read-only fqme-re-keyed view into
the underlying pairs tables (which themselves are ideally keyed
identically cross-venue) which we populate inside `Client.exch_info()`
which itself now does concurrent pairs info fetching via a new
`._cache_pairs()` using a `trio` task per API-venue.
- support klines history query across all venues using same
`Client.mkt_mode_req[Client.mkt_mode]` style as we're doing for
`.exch_info()` B)
- use the venue specific klines history query limits where documented.
- handle new FQME venue / expiry fields inside `get_mkt_info()` ep such
that again the correct `Client.mkt_mode` is selected based on parsing
the desired spot vs. derivative contract.
- do venue-specific-WSS-addr lookup based on output from
`get_mkt_info()`; use usdtm venue WSS addr if a `FutesPair` is loaded.
- set `topic: str` to the `.bs_fqme` value in live feed quotes!
- use `Pair.bs_fqme: str` values for fuzzy-search input set.
The beginning of supporting multi-markets through a common API client.
Change to futes market mode in the client if `.perp.` is matched in the
fqme. Currently the exchange info and live feed ws impl will swap out
for their usd-margin futures market equivalent (endpoints).
Not sure why it seemed like futures pairs didn't have this field but add
it to the parent `Pair` type as well as drop the overridden
`.price/size_tick` fields instead doing the same as in spot as well.
Also moves the `MarketType: Literal` (for the `Client.mkt_mode: str`)
and adds a pair type lookup table for exchange info loading.
Add the usd-futes "Pair" type and thus ability to load all exchange
(info for) contracts settled in USDT. Luckily we don't seem to have to
modify anything in the `Client` interface (yet) other then a new
`.mkt_mode: str` which determines which endpoint set to make requests.
Obviously data received from endpoints will likely need diff handling as
per below.
Deats:
- add a bunch more API and WSS top level domains to `.api` with comments
- start a `.binance.schemas` module to house the structs for loading
different `Pair` subtypes depending on target market: `SpotPair`,
`FutesPair`, .. etc. and implement required `MktPair` fields on the
new futes type for compatibility with the clearing layer.
- add `Client.mkt_mode: str` and a method lookup for endpoint parent
paths depending on market via `.mkt_req: dict`
Also related to live feeds,
- drop `Struct` typecasting instead opting for specific fields both for
speed and simplicity atm.
- breakout `subscribe()` into module level acm from being embedded
closure.
- for now swap over the ws feed to be strictly the futes ep (while
testing) and set the `.mkt_mode = 'usd_futes'`.
- hack in `Client._pairs` to only load `FutesPair`s until we figure out
whether we want separate `Client` instances per market or not..
Instead of having a buncha logic branches for 'get', 'post', etc. just
pass the `method: str` and do a attr lookup on the `asks` sesh.
Also, adjust the `trades_dialogue()` ep to switch to paper mode when no
client API key is detected/loaded.
First draft originally by @guilledk but update by myself 2 years later
xD. Will crash at runtime but at least has the machinery to setup signed
requests for auth-ed endpoints B)
Also adds a generic `NoSignature` error for when credentials are not
present in `brokers.toml` but user is trying to access auth-ed eps with
the client.
Since we only ever want to do incremental y-range calcs based on the
price always skip any tick types emitted by the data daemon which aren't
defined in the fundamental set. Further, toss in a new `debug_n_trade:
bool` toggle which by default turns off all loggin and profiler calls;
if you want to do profiling this has to now be adjusted manually!
Since crypto backends now also may expand an FQME like `xbteur.kraken`
-> `xbteur.spot.kraken` (by filling in the venue token), we need to use
this identifier when looking up per-market order dialogs or submitting
new requests. The simple fix is to simply look up that expanded from
from the `Feed.flumes` table which is always keyed by the `MktPair.fqme:
str` - the expanded form.
This was actually incorrect prior, we were rounding triggered limit
orders with the `.size_tick` value's digits when we should have been
using the `.price_tick` (facepalm). So fix that and compute the rounding
number of digits (as passed to the round(<value>, ndigits=<here>)`
builtin) and store it in the `DarkBook.triggers` tuples so that at
trigger/match time the round call is done *just prior* to msg send to
`brokerd` given the last known live L1 queue price.
Not sure how this lasted so long without complaint (literally since we
added history 1m OHLC it seems; guess it means most backends are pretty
tolerant XD ) but we've been sending 2 cancels per order (dialog) due to
the mirrored lines on each chart: 1s and 1m. This fixes that by
reworking the `OrderMode` methods to be a bit more sane and less
conflated with the graphics (lines) layer.
Deatz:
- add new methods:
- `.oids_from_lines()` line -> oid extraction,
- `.cancel_orders()` which makes the order client cancel requests from
a `oids: list[str]`.
- re-impl `.cancel_all_orders()` and `.cancel_orders_under_cursor()` to
use the above methods thus fixing the original bug B)
Since we want to be able to support user-configurable vnc socketaddrs,
this preps for passing the piker client direct into the vnc hacker
routine so that we can (eventually load) and read the ib brokers config
settings into the client and then read those in the `asyncvnc` task
spawner.
It's been getting setup in the `brokerd` daemon-actor spawn task for
a while now and worker tasks already get a ref to that global log
instance so they don't need to care (in data or trading) task spawn
endpoints.
Also move to the new `open_trade_dialog()` naming for working broker
backends B)
Discovered due to originally having a history loading bug between
btcusdt futes display where the same time series was being loaded into
the graphics system, this avoids the issue where 2 (or more) curves are
measured to have the same dispersion and thus do not get added as unique
entries to the `overlay_table: dict[float, tuple]` during the scaling
phase..
Practically speaking this should never really be a problem if the curves
(and their backing timeseries) are indeed unique but keying the
overlay table by the dispersion and the `Viz` is a minimal performance
hit when looping the sorted table and is a lot nicer then you **do want
to show** duplicate curves then having one overlay just not be ranged
correctly at all XD
Instead of effectively (and poorly) duplicating the trade dialog setup
logic, just use the new helper we exposed in the EMS module B)
Also, handle paper accounts that have no ledger / positions existing.
As part of bringing the brokerd agnostic APIs up to date and modernizing
wrapping CLIs, this adds a new sub-cmd to allow more or less directly
calling the `.get_mkt_info()` broker mod endpoint and dumping the both
the backend specific `Pair`-ish and `.accounting.MktPair` normalized
version to console.
Deatz:
- make the click config's `brokermods` entry a `dict`
- make `.brokers.core.mkt_info()` strip the broker name part from the
input fqme before calling the backend.
Connecting to a `brokerd` daemon's trading dialog via a helper `@acm`
func is handy so that arbitrary trading middleware clients **and** the
ems can setup a trading dialog and, at the least, query existing
position state; this is in fact our immediate need when simply querying
for an account's position status in the `.accounting.cli.ledger` cli.
It's now exposed (for now) as `.clearing._ems.open_brokerd_dialog()` and
is called by the `Router.maybe_open_brokerd_dialog()` for every new
relay allocation or paper-account engine instance.
Changed from the old `store clone` to instead simply load any shm buffer
matching a user provided `FQME: str` pattern; writing to parquet file is
only done if an explicit option flag is passed by user.
Implement new `iter_dfs_from_shms()` generator which allows interatively
loading both 1m and 1s buffers delivering the `Path`, `ShmArray` and
`polars.DataFrame` instances per matching file B)
Also add a todo for a `NativeStorageClient.clear_range()` method.
Also adjust sizing such that the history buffer will backfill the last
six years by default (in 1m OHLC) and the hft buffer will do only 3 days
worth. Also ensure the fsp layer passes the src shm's buffer size when
allocating since the size is now required by allocators in the shm apis.
Avoid unnecessarily re-rendering the wrong (1min OHLC history) chart
and/or other such charts with update tasks listening to the sampler
stream. Instead only redraw in tasks which are updating vizs which match
the actual details of the backfill event.
We can probably also eventually match against a range tuple (emitted in
the msg) and then have the task further only update the formatter layer
unless the range is actually in view?
It's no longer part of the default OHLCV array-buffer schema and just
generally we should be processing and managing **any** non source data
in the FSP subsystem(s) despite it maybe being provided as a default by
some backends.
Explains why stuff always seemed wrong before XD
Previously whenever a time-gappy asset (like a stock due to it's venue
operating hours) was being loaded, we weren't querying for a "durations
worth" of bars and this was causing all sorts of actual gaps in our
data set that shouldn't exist..
Fix that by always attempting to retrieve a min aggregate-time's
worth/duration of bars/datums in the history manager. Actually,
i implemented this in both the feed and api layers for this backend
since it doesn't seem to strictly work just implementing it at the
`Client.bars()` level, not sure why but..
Also, buncha `ruff` linting cleanups and fix the logger nameeee, lel.
For now, just detect and fill in gaps (via fresh backend queries)
*in the shm buffer* but eventually i'm pretty sure we can just write
these direct to the parquet file as well.
Use the new `.data._timeseries.detect_null_time_gap()` to find and fill
in the `ShmArray` index range, re-check it and enter a prompt if it
didn't totally fill.
Also,
- do a massive cleanup and removal of all unused/commented code.
- drop the duplicate frames tracking, don't think we need it after
removing multi-frame concurrent queries.
- change backfill loop variable `end_dt` -> `last_start_dt` which is
more semantically correct.
- fix logic to backfill any missing sub-sequence portion for any frame
query that overruns the shm buffer prependable space by detecting
the available rows left to insert and only push those.
- add a new `shm_push_in_between()` helper to match.
It took a little while (and a lot of commenting out of old no longer
needed code) but, this gets tsdb (from parquet file) loading *before*
final backfilling from the most recent history frame until the most
recent tsdb time stamp!
More or less all the convoluted concurrency shit we had for coping with
`marketstore` IPC junk is no longer needed, particularly all the query
size limits and accompanying load loops.. The recent frame loading
technique/order *has* now changed though since we'd like to show charts
asap once tsdb history loads.
The new load sequence is as follows:
- load mr (most recent) frame from backend.
- load existing history (one shot) from the "tsdb" aka parquet files
with `polars`.
- backfill the gap part from the mr frame back to the tsdb start
incrementally by making (hacky) `ShmArray.push(start=<blah>)` calls
and *not* updating the `._first.value` while doing it XD
Dirtier deatz:
- make `tsdb_backfill()` run per timeframe in a separate task.
- drop all the loop through timeframes and insert `dts_per_tf` crap.
- only spawn a subtask for the `start_backfill()` call which in turn
only does the gap backfilling as mentioned above.
- mask out all the code related to being limited to certain query sizes
(over gRPC) as was restricted by marketstore.. not gonna go through
what all of that was since it's probably getting deleted in a follow
up commit.
- buncha off-by-one tweaks to do with backfilling the gap from mr frame
to tsdb start.. mostly tinkered it to get it all right but seems to be
working correctly B)
- still use the `broadcast_all()` msg stuff when doing the gap backfill
though don't have it really working yet on the UI side (since
previously we were relying on the shm first/last values.. so this will
be "coming soon" :)
For OHLCV time series we normally presume a uniform sampling period
(1s or 60s by default) and it's handy to have tools to ensure a series
is gapless or contains expected gaps based on (legacy) market hours.
For this we leverage `polars`:
- add `.nativedb.with_dts()` a datetime-from-epoch-time-column frame
"column-expander" which inserts datetime-casted, epoch-diff and
dt-diff columns.
- add `.nativedb.detect_time_gaps()` which filters to any larger then
expected sampling period rows.
- wrap the above (for now) in a `piker store anal` (analysis) cmd which
atm always enters a breakpoint for tinkering.
Supporting storage client additions:
- add a `detect_period()` helper for extracting expected OHLC time step.
- add new `NativedbStorageClient` methods and attrs to provide for the above:
- `.mk_path()` to **only** deliver a parquet-file path for use in
other methods.
- `._dfs` to house cached `pl.DataFrame`s loaded from `.parquet` files.
- `.as_df()` which loads cached frames or loads them from disk and
then caches (for next use).
- `_write_ohlcv()` a private-sync version of the public equivalent
meth since we don't currently have any actual async file IO
underneath; add a flag for whether to return as a `numpy.ndarray`.
- drop buncha cruft from `store ls` cmd and make it work for
multi-backend fqme listing.
- including adding an `.address` to the mkts client which shows the
grpc socketaddr details.
- change defauls to new `'nativedb'.
- drop 'marketstore' from built-in backend list (for now)
It was a concurrency-hack mess somewhat due to all sorts of limitations
imposed by marketstore (query size limits, strange datetime/timestamp
errors, slow table loads for large queries..) and we can drastically
simplify. There's still some issues with getting new backfills (not yet
in storage) correctly prepended: there's sometimes little gaps due to shm
races when reading history indexing vs. when the live-feed startup
finishes.
We generally need tests for all this and likely a better rework of the
feed layer's init such that we're showing history in chart afap instead
of waiting on backfills or the live feed to come up.
Much more to come B)
Turns out no backend (including kraken) requires it and really this
kinda of measure should be implemented and recorded from our fsp layer
instead of (hackily) sometimes expecting it to be in "source data".
After much frustration with a particular tsdb (cough) this instead
implements a new native-file (and apache tech based) backend which
stores time series in parquet files (for now) using the `polars` apis
(since we plan to use that lib as well for processing).
Note this code is currently **very** rough and in draft mode.
Details:
- add conversion routines for going from `polars.DataFrame` to
`numpy.ndarray` and back.
- lay out a simple file-name as series key symbology:
`fqme.<datadescriptions>.parquet`, though probably it will evolve.
- implement the entire `StorageClient` interface as it stands.
- adjust `storage.cli` cmds to instead expect to use this new backend,
which means it's a complete mess XD
Main benefits/motivation:
- wayy faster load times with no "datums to load limit" required.
- smaller space footprint and we haven't even touched compression
settings yet!
- wayyy more compatible with other systems which can lever the apache
ecosystem.
- gives us finer grained control over the filesystem usage so we can
choose to swap out stuff like the replication system or networking
access.
Turns out just (over)writing `.parquet` files with >= 1M datums is like
less then a second, and we can likely speed up appends using
`fastparquet` (usage coming soon).
Includes:
- a new `clone` CLI subcmd to test this all out by ad-hoc copy of
(literally hardcoded to a daemon-actor specific shm allocation X) an
existing `/dev/shm/<ShmArray>` and push to `.parquet` file.
- code to convert from our `ShmArray.array: np.ndarray` ->
`polars.DataFrame` (thanks SO).
- timing checks around the file IO and np -> polars conversion.
- a `read` subcmd which i was using to test the sync `pymarketstore`
client against our async one to see if the issues from
https://github.com/pikers/piker/issues/443 were resolved, but nope!
Turns out trying to switch to the old sync client and going back to
using the old json-RPC API (after having had to patch the upstream repo
to not import gRPC machinery to avoid crashes..) I'm basically getting
the exact same issues.
New tinkering results does possibly tell some new stuff:
- the EOF error seems to indeed be due to trying fetch records which haven't been
written (properly) - like asking for a `end=<epoch_int>` that is
earlier then the earliest record.
- the "snappy input corrupt" error seems to have something to do with
the `Params.end` field not being an `int` and/or the int precision not
being chosen correctly?
- toying with this a bunch manually shows that the internals of the
client (particularly `.build_query()` stuff) is parsing/calcing the
`Epoch` and `Nanoseconds` values out incorrectly.. which is likely
part of the problem.
- we also changed `anyio_marketstore.MarketStoreclient.build_query()`
logic when removing `pandas` a while back, which also seems to be
part of the problem on the async side, however reverting those
changes also didn't fix the issue entirely; likely something else
more subtle going on (maybe with the write vs. read `Epoch` field
type we pass?).
Despite all this malarky, we're already underway more or less obsoleting
this whole thing with a much less complex approach of using apache
parquet files and modern filesystem tools to get a more flexible and
numerics-native dataframe-oriented tsdb B)
Turns out the reason we were originally making the `time: float` column in our
ohlcv arrays was bc that's what **only** ib uses XD (and/or 🤦)
Instead we changed the default field type to be an `int` (which is also
more correct to avoid `float` rounding/precision discrepancies) and thus
**do not need to override it** in all other (crypto) backends (except
`ib`). Now we only do the customization (via `._ohlc_dtype`) to `float`
only for `ib` for now (though pretty sure we can also not do that
eventually as well..)!