It was a concurrency-hack mess somewhat due to all sorts of limitations
imposed by marketstore (query size limits, strange datetime/timestamp
errors, slow table loads for large queries..) and we can drastically
simplify. There's still some issues with getting new backfills (not yet
in storage) correctly prepended: there's sometimes little gaps due to shm
races when reading history indexing vs. when the live-feed startup
finishes.
We generally need tests for all this and likely a better rework of the
feed layer's init such that we're showing history in chart afap instead
of waiting on backfills or the live feed to come up.
Much more to come B)
Turns out no backend (including kraken) requires it and really this
kinda of measure should be implemented and recorded from our fsp layer
instead of (hackily) sometimes expecting it to be in "source data".
Use `def_iohlcv_fields` for a name and instead of copying and inserting
the index field pop it for the non-index version. Drop creating
`np.dtype()` instances since `numpy`'s apis accept both input forms so
this is simpler on our end.
To kick off our (tsdb) storage backends this adds our first implementing
a new `Storage(Protocol)` client interface. Going foward, the top level
`.storage` pkg-module will now expose backend agnostic APIs and helpers
whilst specific backend implementations will adhere to that middle-ware
layer.
Deats:
- add `.storage.marketstore.Storage` as the first client implementation,
moving all needed (import) dependencies out from
`.service.marketstore` as well as `.ohlc_key_map` and `get_client()`.
- move root `conf.toml` loading from `.data.history` into
`.storage.__init__.open_storage_client()` which now takes in a `name:
str` and does all the work of loading the correct backend module, its
config, and determining if a service-instance can be contacted and
a client loaded; in the case where this fails we raise a new
`StorageConnectionError`.
- add a new `.storage.get_storagemod()` just like we have for brokers.
- make `open_storage_client()` also return the backend module such that
the history-data layer can make backend specific calls as needed (eg.
ohlc_key_map).
- fall back to a basic non-tsdb backfill when `open_storage_client()`
raises the new connection error.
The plan is to offer multiple tsdb and other storage backends (for
a variety of use cases) and expose them similarly to how we do for
broker and data providers B)
Including changing to `LinkedSplits.mkt: MktPair` and adding an explicit
setter method for setting it and being sure that nothing breaks
in the display system init!
For this commit we leave in warning access to `LinkedSplits.symbol` but
will remove in following commit.
Stash it for now in the (now mutable by default) `.shm_write_opts` and
have the new `Flume._has_vlm: bool` (only set to false internally by
feed layer) which can be read via new public `.has_vlm()` predicate.
Move out the old `.ui/_fsp` helper logic to this flume method.
`Flume.mkt.fqme` might not be exactly the same as the local
version now since we've had to add some hacks to certain backends
(cough ib) to handle `MktPair.src` not being set as an `Asset` (yet).
Since porting all backends to the new `FeedInit` + `MktPair` + `Asset`
style init, we can now just directly pass a `MktPair` instance to the
history endpoint(s) since it's always called *after* the live feed
`.stream_quotes()` ep B)
This has a lot of benefits including allowing brokerd backends to have
more flexible, pre-processed market endpoint meta-data that piker has
already validated; makes handling special cases in much more straight
forward as well such as forex pairs from legacy brokers XD
First pass changes all crypto backends to expect this new input, ib will
come next after handling said special cases..
Previously we were passing the `fqme: str` which isn't as extensive nor
were we able to pass `MktPair` direct to backend history manager-loading
routines (which should be able to rely on always receiving it since
currently `stream_quotes()` is always called first for setup).
This also starts a slight bit of configuration oriented tsdb info
loading (via a new `conf.toml`) such that a user can decide to host
their (marketstore) db on a remote host and our container spawning and
client code will do the right startup automatically based on the config.
|-> Related to this I've added some comments about doing storage
backend module loading which should get actually written out as part of
patches coming in #486 (or something related).
Don't allow overruns again in history context since it seems it was
never a problem?
We need to allow overruns during the async multi-broker context spawning
init bc some backends might take longer then others to setup (eg.
binance vs. kucoin) and result in some context (stream) being overrun by
the time we get to the `.open_stream()` phase. Ideally, we can maybe
adjust the concurrent setup to be more of a task-per-provider style to
avoid this in the future - which would also in theory result in
more-immediate per-provider setup in terms showing ready feeds asap.
Also, does a bunch of renaming from fqsn -> fqme and drops the lower
casing of input symbols instead expecting the caller to know what the
data backend it's requesting is going to be able to handle in terms of
symbology.
Since it's a bit weird having service specific implementation details
inside the general service `._daemon` mod, and since i'd mentioned
trying this re-org; let's do it B)
Requires enabling the new mod in both `pikerd` and `brokerd` and
obviously a bit more runtime-loading of the service modules in the
`brokerd` service eps to avoid import cycles.
Also moved `_setup_persistent_brokerd()` into the new mod since the
naming would place it there even though the implementation really
wouldn't (longer run) since we want to split up `.data.feed` layer
backend-invoked eps into a separate actor eventually from the "actual"
`brokerd` which will be the actor running **only** the trade control eps
(eg. trades_dialogue()` and friends).
`trio`'s internals don't allow for async generator (and thus by
consequence dynamic reset of async exit stacks containing `@acm`s)
interleaving since doing so corrupts the cancel-scope stack. See details
in:
- https://github.com/python-trio/trio/issues/638
- https://trio-util.readthedocs.io/en/latest/#trio_util.trio_async_generator
- `trio._core._run.MISNESTING_ADVICE`
We originally tried to address this using
`@trio_util.trio_async_generator` in backend streaming code but for
whatever reason stopped working recently (at least for me) and it's more
or less implemented the same way as this patch but with more layers and
an extra dep. I also don't want us to have to address this problem again
if/when that lib isn't able to keep up to date with wtv `trio` is
doing..
So instead this is a complete rewrite of the conc design of our
auto-reconnect ws API to move all reset logic and msg relay into a bg
task which is respawned on reset-requiring events: user spec-ed msg recv
latency, network errors, roaming events.
Deatz:
- drop all usage of `AsyncExitStack` and no longer require client code
to (hackily) call `NoBsWs._connect()` on msg latency conditions,
intead this is all done behind the scenes and the user can instead
pass in a `msg_recv_timeout: float`.
- massively simplify impl of `NoBsWs` and move all reset logic into a
new `_reconnect_forever()` task.
- offer use of `reset_after: int` a count value that determines how many
`msg_recv_timeout` events are allowed to occur before reconnecting the
entire ws from scratch again.
Since we have made `MktPair.bs_mktid` mean something else now, change
all the feed setup var names to instead be more representative of the
actual value: `bs_fqme: str` and use the new `MktPair.bs_fqme` where
necessary.
The legacy version was a `dict` of `dicts` vs. now we want to be handed
a `list[FeedInit]`; process both in a factored way.
Drop `FeedInit.bs_mktid` since it's already defined on `.mkt.bs_mktid`
and we don't really need it top level.
More or less a replacement for what @guilledk did with the initial
attempt at a "broker check" type script a while back except in this case
we're going to always run this validation routine and it now uses a new
`FeedInit` struct to ensure backends are delivering the right schema-ed
data during startup. Also allows us to stick deprecation warnings / and
or strict API compat errors all in one spot (at least for live feeds).
Factors out a bunch of `MktPair` related adapter-logic into a new
`.validate.valiate_backend()` which warns to the backend implementer via
log msgs all the problems outstanding. Ideally we do our backend module
endpoint scan-and-complain regarding missing feature support from here
as well (eg. search, broker/trade ctl, ledger processing, etc.).
In `.feed` and `._sampling` move to using the new
`tractor.Context.open_stream(allow_overruns: bool)` (cough, A BREAKING
CHANGE).
Also set `Flume.mkt` during construction in `.feed.open_feed()`.
Might as well try and flip it over to the new type; make appropriate
dict serialization changes in `.to_msg()`. Alias back to `.symbol:
Symbol` with a property.
Initial attempt at getting the sampling and shm layer to use the new mkt
info meta-data type. Draft out a potential `BackendInitMsg:
msgspec.Struct` for validating the init msg returned from the
`stream_quotes()` start value; obvs don't actually use it yet.
Add `MktPair` handling block for when a backend delivers
a `mkt_info`-field containing init msg. Adjust the original
`Symbol`-style `'symbol_info'` msg processing to do `Decimal` defaults
and convert to `MktPair` including slapping in a hacky `_atype: str`
field XD
General initial name changes to `bs_mktid` and `_fqme` throughout!
Not sure how i missed this (and left in handling of `list.remove()` and
it ever worked for that?) after the `samplerd` impl in 5ec1a72 but, this
adjusts the remove-broken-subscriber loop to catch the correct
`set.remove()` exception type on a missing (likely already removed)
subscription entry.
There's been way too many issues when trying to calculate this
dynamically from the input array, so just expect the caller to know what
it's doing and don't bother with ever hitting the error case of
calculating and incorrect value internally.
Not sure why this was ever allowed but, for slicing to the sample
*before* whatever target time stamp is passed in we should definitely
not return the prior index as for the slice start since that might
include a very large gap prior to whatever sample is scanned to have
the earliest matching time stamp.
This was essential to fixing overlay intersect points searching in our
``ui.view_mode`` machinery..
In situations where clients are (dynamically) subscribing *while*
broadcasts are starting to taking place we need to handle the
`set`-modified-during-iteration case. This scenario seems to be more
common during races on concurrent startup of multiple symbols. The
solution here is to use another set to take note of subscribers which
are successfully sent-to and then skipping them on re-try.
This also contains an attempt to exception-handle throttled stream
overruns caused by higher frequency feeds (like binance) pushing more
quotes then can be handled during (UI) client startup.
Not really sure there's much we can do besides dump Grpc stuff when we
detect an "error" `str` for the moment..
Either way leave a buncha complaints (como siempre) and do linting
fixups..
Previously we would make the `ahabd` supervisor-actor sync to docker
container startup using pseudo-blocking log message processing.
This has issues,
- we're forced to do a hacky "yield back to `trio`" in order to be
"fake async" when reading the log stream and further,
- blocking on a message is fragile and often slow.
Instead, run the log processor in a background task and in the parent
task poll for the container to be in the client list using a similar
pseudo-async poll pattern. This allows the super to `Context.started()`
sooner (when the container is actually registered as "up") and thus
unblock its (remote) caller faster whilst still doing full log msg
proxying!
Deatz:
- adds `Container.cuid: str` a unique container id for logging.
- correctly proxy through the `loglevel: str` from `pikerd` caller task.
- shield around `Container.cancel()` in the teardown block and use
cancel level logging in that method.
With the addition of a new `elastixsearch` docker support in
https://github.com/pikers/piker/pull/464, adjustments were made
to container startup sync logic (particularly the `trio` checkpoint
sleep period - which itself is a hack around a sync client api) which
caused a regression in upstream startup logic wherein container error
logs were not being bubbled up correctly causing a silent failure mode:
- `marketstore` container started with corrupt input config
- `ahabd` super code timed out on startup phase due to a larger log
polling period, skipped processing startup logs from the container,
and continued on as though the container was started
- history client fails on grpc connection with no clear error on why the
connection failed.
Here we revert to the old poll period (1ms) to avoid any more silent
failures and further extend supervisor control through a configuration
override mechanism. To address the underlying design issue, this patch
adds support for container-endpoint-callbacks to override supervisor
startup configuration parameters via the 2nd value in their returned
tuple: the already delivered configuration `dict` value.
The current exposed values include:
{
'startup_timeout': 1.0,
'startup_query_period': 0.001,
'log_msg_key': 'msg',
},
This allows for container specific control over the startup-sync query
period (the hack mentioned above) as well as the expected log msg key
and of course the startup timeout.
Adds a `piker storage` subcmd with a `-d` flag to wipe a particular
fqsn's time series (both 1s and 60s). Obviously this needs to be
extended much more but provides a start point.
In order to support existing `pps.toml` files in the wild which don't
have the `asset_type, price_tick_size, lot_tick_size` fields, we need to
only optionally read them and instead expect that backends will write
the fields going forward (coming in follow patches).
Further this makes some small asset-size (vlm accounting) quantization
related adjustments:
- rename `Symbol.decimal_quant()` -> `.quantize_size()` since that is
explicitly what this method is doing.
- and expect an input `size: float` which we cast to decimal instead of
doing it inside the `.calc_size()` caller code.
- drop `Symbol.iterfqsns()` which wasn't being used anywhere at all..
Additionally, this drafts out a new replacement market-trading-pair data
type to eventually replace `.data._source.Symbol` -> `MktPair` which we
aren't using yet, but serves as the documentation-driven motivator ;)
and, it relates to https://github.com/pikers/piker/issues/467.
Add decimal quantize API to Symbol to simplify by-broker truncation
Add symbol info to `pps.toml`
Move _assert call to outside the _async_main context manager
Minor indentation and styling changes, also convert a few prints to log calls
Fix multi write / race condition on open_pps call
Switch open_pps to not write by default
Fix integer math kraken syminfo _tick_size initialization
For the purposes of avoiding another full format call we can stash the
last rendered 1d xy pre-graphics formats as
`IncrementalFormatter.x/y_1d: np.ndarray`s and allow readers in the viz
and render machinery to use this data easily for things like "only
drawing the last uppx's worth of data as a line". Also add
a `.flat_index_ratio: float` which can be used similarly as a scalar
applied to indexes into the src array but instead when indexing
(flattened) 1d xy formatted outputs. Finally, this drops the way
overdone/noisy `.__repr__()` meth we had XD
We obviously don't want to be debugging a sample-index issue if/when the
market for the asset is closed (since we'll be guaranteed to have
a mismatch, lul). Pass in the `feed_is_live: trio.Event` throughout the
backfilling routines to allow first checking for the live feed being active
so as to avoid breakpointing on false +ves. Also, add a detailed warning
log message for when *actually* investigating a mismatch.
This should never really happen but when it does it appears to be a race
with writing startup pre-graphics-formatter array data where we get
`x_end` epoch value subtracting some really small offset value (like
`-/+0.5`) or the opposite where the `x_start` is epoch and `x_end` is
small.
This adds a warning msg and `breakpoint()` as well as guards around the
entire code downsampling code path so that when resumed the downsampling
cycle should just be skipped and avoid a crash.
Whenever the last datum is in view `slice_from_time()` need to always
spec the final array index (i.e. the len - 1 value we set as
`read_i_max`) to avoid a uniform-step arithmetic error where gaps in the
underlying time series causes an index that's too low to be returned.