In order to support instruments with lifetimes (aka derivatives) we need
generally need special symbol annotations which detail such meta data
(such as `MNQ.GLOBEX.20220717` for daq futes). Further there is really
no reason for the public api for this feed layer to care about getting
a special "brokername" field since generally the data is coming directly
from UIs (eg. search selection) so we might as well accept a fqsn (fully
qualified symbol name) which includes the broker name; for now a suffix
like `'.ib'`. We may change this schema (soon) but this at least gets us
to a point where we expect the full name including broker/provider.
An additional detail: for certain "generic" symbol names (like for
futes) we will pull a so called "front contract" and map this to
a specific fqsn underneath, so there is a double (cached) entry for that
entry such that other consumers can use it the same way if desired.
Some other machinery changes:
- expect the `stream_quotes()` endpoint to deliver it's `.started()` msg
almost immediately since we now need it deliver any fqsn asap (yes
this means the ep should no longer wait on a "live" first quote and
instead deliver what quote data it can right away.
- expect the quotes ohlc sampler task to add in the broker name before
broadcast to remote (actor) consumers since the backend isn't (yet)
expected to do that add in itself.
- obviously we start using all the new fqsn related `Symbol` apis
Break up real-time quote feed and history loading into 2 separate tasks
and deliver a client side `data.Feed` as soon as history is loaded
(instead of waiting for a rt quote - the previous logic). If
a symbol doesn't have history then likely the feed shouldn't be loaded
(since presumably client code will need at least "some" datums history
to do anything) and waiting on a real-time quote is dumb, since it'll
hang if the market isn't open XD. If a symbol doesn't have history we
can always write a zero/null array when we run into that case. This also
greatly speeds up feed loading when both history and quotes are available.
TL;DR summary:
- add a `_Feedsbus.start_task()` one-cancel-scope-per-task method for
assisting with (re-)starting and stopping long running persistent
feeds (basically a "one cancels one" style nursery API).
- add a `manage_history()` task which does all history loading (and
eventually real-time writing) which has an independent signal and
start it in a separate task.
- drop the "sample rate per symbol" stuff since client code doesn't really
care when it can just inspect shm indexing/time-steps itself.
- run throttle tasks in the bus nursery thus avoiding cancelling the
underlying sampler task on feed client disconnects.
- don't store a repeated ref the bus nursery's cancel scope..
This should in theory result in increased burstiness since we remove
the plain `trio.sleep()` and instead always wait on the receive channel
as much as possible until the `trio.move_on_after()` (+ time diffing
calcs) times out and signals the next throttled send cycle. This also is
slightly easier to grok code-wise instead of the `try, except` and
another tight while loop until a `trio.WouldBlock`. The only simpler
way i can think to do it is with 2 tasks: 1 to collect ticks and the
other to read and send at the throttle rate.
Comment out the log msg for now to avoid latency and add much more
detailed comments. Add an overrun log msg to the main sample loop.
There was a lingering issue where the fsp daemon would sync its shm
array with the source data and we'd set the start/end indices to the
same value. Under some races a reader would then read an empty `.array`
which it wasn't expecting. This fixes that as well as tidies up the
`ShmArray.push()` logic and adds a temporary check in `.array` for zero
length if the array hasn't been written yet.
We can now start removing read array length checks in consumer code
and hopefully no more races will show up.
Try out he new broadcast channels from `tractor` for data feeds
we already have cached. Any time there's a cache hit we load the
cached feed and just slap a broadcast receiver on it for the local
consumer task.
If a client attaches to a quotes data feed and requests a throttle rate,
be sure to unsub that side-band memchan + task when it detaches and
especially so on any transport connection error.
Also, use an explicit `tractor.Context.cancel()` on the client feed
block exit since we removed the implicit cancel option from the
`tractor` api.
Adding binance's "hft" ws feeds has resulted in a lot of context
switching in our Qt charts, so much so it's chewin CPU and definitely
worth it to throttle to the detected display rate as per discussion in
issue #192.
This is a first very very naive attempt at throttling L1 tick feeds on
the `brokerd` end (producer side) using a constant and uniform delivery
rate by way of a `trio` task + mem chan. The new func is
`data._sampling.uniform_rate_send()`. Basically if a client request
a feed and provides a throttle rate we just spawn a task and queue up
ticks until approximately the next display rate's worth period of time
has passed before forwarding. It's definitely nothing fancy but does
provide fodder and a start point for an up and coming queueing eng to
start digging into both #107 and #109 ;)
This allows for more deterministically managing long running sub-daemon
services under `pikerd` using the new context api from `tractor`.
The contexts are allocated in an async exit stack and torn down at root
daemon termination. Spawn brokerds using this method by changing the
persistence entry point to be a `@tractor.context`.
Avoid bothering with a trio event and expect the caller to do manual shm
registering with the write loop. Provide OHLC sample period indexing
through a re-branded pub-sub func ``iter_ohlc_periods()``.
Move all feed/stream agnostic logic and shared mem writing into a new
set of routines inside the ``data`` sub-package. This lets us move
toward a more standard API for broker and data backends to provide
cache-able persistent streams to client apps.
The data layer now takes care of
- starting a single background brokerd task to start a stream for as
symbol if none yet exists and register that stream for later lookups
- the existing broker backend actor is now always re-used if possible
if it can be found in a service tree
- synchronization with the brokerd stream's startup sequence is now
oriented around fast startup concurrency such that client code gets
a handle to historical data and quote schema as fast as possible
- historical data loading is delegated to the backend more formally by
starting a ``backfill_bars()`` task
- write shared mem in the brokerd task and only destruct it once requested
either from the parent actor or further clients
- fully de-duplicate stream data by using a dynamic pub-sub strategy
where new clients register for copies of the same quote set per symbol
This new API is entirely working with the IB backend; others will need
to be ported. That's to come shortly.
The min tick size is the smallest step an instrument can move in value
(think the number of decimals places of precision the value can have).
We start leveraging this in a few places:
- make our internal "symbol" type expose it as part of it's api
so that it can be passed around by UI components
- in y-axis view box scaling, use it to keep the bid/ask spread (L1 UI)
always on screen even in the case where the spread has moved further
out of view then the last clearing price
- allows the EMS to determine dark order live order submission offsets
If you have a common broker feed daemon then likely you don't want to
create superfluous shared mem buffers for the same symbol. This adds an
ad hoc little context manger which keeps a bool state of whether
a buffer writer task currently is running in this process. Before we
were checking the shared array token cache and **not** clearing it when
the writer task exited, resulting in incorrect writer/loader logic on
the next entry..
Really, we need a better set of SC semantics around the shared mem stuff
presuming there's only ever one writer per shared buffer at given time.
Hopefully that will come soon!
Wraps the growing tuple of items being delivered by `open_feed()`.
Add lazy loading of the broker's signal step stream with
a `Feed.index_stream()` method.
Add an internal `_Token` to do interchange (un)packing for passing
"references" to shm blocks between actors. Part of the token involves
providing the `numpy.dtype` in a cross-actor format. Add a module
variable for caching "known tokens" per actor. Drop use of context
managers since they tear down shm blocks too soon in debug mode and
there seems to be no reason to unlink/close shm before the process has
terminated; if code needs it torn down explicitly, it can.
Adjust the `data.open_feed()` api to take a shm token so the
broker-daemon can attach a previously created (by the parent actor) mem
buf and push real-time tick data. There's still some sloppiness here in
terms of ensuring only one mem buf per symbol (can be seen in
`stream_quotes()`) which should really managed at the data api level.
Add a bar incrementing stream-task which delivers increment msgs to any
consumers.
Logic in `SharedArray.push()` was totally wrong.
Remove all the `multiprocessing.resource_tracker` crap such that we
aren't loading an extra process at every layer and we don't get tons of
errors when cleaning on in an SC way.
This adds a shared memory "incrementing array" sub-sys interface
for single writer, multi-reader style data passing. The main motivation
is to avoid multiple copies of the same `numpy` array across actors
(plus now we can start being fancy like ray).
There still seems to be some odd issues with the "resource tracker"
complaining at teardown (likely partially to do with SIGINT stuff) so
some further digging in the stdlib code is likely coming.
Pertains to #107 and #98
Since the new FSP system will require time aligned data amongst actors,
it makes sense to share broker data feeds as much as possible on a local
system. There doesn't seem to be downside to this approach either since
if not fanning-out in our code, the broker (server) has to do it anyway
(and who knows how junk their implementation is) though with more
clients, sockets etc. in memory on our end. It also preps the code for
introducing a more "serious" pub-sub systems like zeromq/nanomessage.
Wrap the sync client in an async interface in anticipation of an actual
async client. This starts support for the `open_fee()`/`stream_quotes()`
api though the tick normalization isn't correct yet.