Try out he new broadcast channels from `tractor` for data feeds
we already have cached. Any time there's a cache hit we load the
cached feed and just slap a broadcast receiver on it for the local
consumer task.
If a client attaches to a quotes data feed and requests a throttle rate,
be sure to unsub that side-band memchan + task when it detaches and
especially so on any transport connection error.
Also, use an explicit `tractor.Context.cancel()` on the client feed
block exit since we removed the implicit cancel option from the
`tractor` api.
Adding binance's "hft" ws feeds has resulted in a lot of context
switching in our Qt charts, so much so it's chewin CPU and definitely
worth it to throttle to the detected display rate as per discussion in
issue #192.
This is a first very very naive attempt at throttling L1 tick feeds on
the `brokerd` end (producer side) using a constant and uniform delivery
rate by way of a `trio` task + mem chan. The new func is
`data._sampling.uniform_rate_send()`. Basically if a client request
a feed and provides a throttle rate we just spawn a task and queue up
ticks until approximately the next display rate's worth period of time
has passed before forwarding. It's definitely nothing fancy but does
provide fodder and a start point for an up and coming queueing eng to
start digging into both #107 and #109 ;)
This allows for more deterministically managing long running sub-daemon
services under `pikerd` using the new context api from `tractor`.
The contexts are allocated in an async exit stack and torn down at root
daemon termination. Spawn brokerds using this method by changing the
persistence entry point to be a `@tractor.context`.
Avoid bothering with a trio event and expect the caller to do manual shm
registering with the write loop. Provide OHLC sample period indexing
through a re-branded pub-sub func ``iter_ohlc_periods()``.
Move all feed/stream agnostic logic and shared mem writing into a new
set of routines inside the ``data`` sub-package. This lets us move
toward a more standard API for broker and data backends to provide
cache-able persistent streams to client apps.
The data layer now takes care of
- starting a single background brokerd task to start a stream for as
symbol if none yet exists and register that stream for later lookups
- the existing broker backend actor is now always re-used if possible
if it can be found in a service tree
- synchronization with the brokerd stream's startup sequence is now
oriented around fast startup concurrency such that client code gets
a handle to historical data and quote schema as fast as possible
- historical data loading is delegated to the backend more formally by
starting a ``backfill_bars()`` task
- write shared mem in the brokerd task and only destruct it once requested
either from the parent actor or further clients
- fully de-duplicate stream data by using a dynamic pub-sub strategy
where new clients register for copies of the same quote set per symbol
This new API is entirely working with the IB backend; others will need
to be ported. That's to come shortly.
The min tick size is the smallest step an instrument can move in value
(think the number of decimals places of precision the value can have).
We start leveraging this in a few places:
- make our internal "symbol" type expose it as part of it's api
so that it can be passed around by UI components
- in y-axis view box scaling, use it to keep the bid/ask spread (L1 UI)
always on screen even in the case where the spread has moved further
out of view then the last clearing price
- allows the EMS to determine dark order live order submission offsets
If you have a common broker feed daemon then likely you don't want to
create superfluous shared mem buffers for the same symbol. This adds an
ad hoc little context manger which keeps a bool state of whether
a buffer writer task currently is running in this process. Before we
were checking the shared array token cache and **not** clearing it when
the writer task exited, resulting in incorrect writer/loader logic on
the next entry..
Really, we need a better set of SC semantics around the shared mem stuff
presuming there's only ever one writer per shared buffer at given time.
Hopefully that will come soon!
Wraps the growing tuple of items being delivered by `open_feed()`.
Add lazy loading of the broker's signal step stream with
a `Feed.index_stream()` method.
Add an internal `_Token` to do interchange (un)packing for passing
"references" to shm blocks between actors. Part of the token involves
providing the `numpy.dtype` in a cross-actor format. Add a module
variable for caching "known tokens" per actor. Drop use of context
managers since they tear down shm blocks too soon in debug mode and
there seems to be no reason to unlink/close shm before the process has
terminated; if code needs it torn down explicitly, it can.
Adjust the `data.open_feed()` api to take a shm token so the
broker-daemon can attach a previously created (by the parent actor) mem
buf and push real-time tick data. There's still some sloppiness here in
terms of ensuring only one mem buf per symbol (can be seen in
`stream_quotes()`) which should really managed at the data api level.
Add a bar incrementing stream-task which delivers increment msgs to any
consumers.
Logic in `SharedArray.push()` was totally wrong.
Remove all the `multiprocessing.resource_tracker` crap such that we
aren't loading an extra process at every layer and we don't get tons of
errors when cleaning on in an SC way.
This adds a shared memory "incrementing array" sub-sys interface
for single writer, multi-reader style data passing. The main motivation
is to avoid multiple copies of the same `numpy` array across actors
(plus now we can start being fancy like ray).
There still seems to be some odd issues with the "resource tracker"
complaining at teardown (likely partially to do with SIGINT stuff) so
some further digging in the stdlib code is likely coming.
Pertains to #107 and #98
Since the new FSP system will require time aligned data amongst actors,
it makes sense to share broker data feeds as much as possible on a local
system. There doesn't seem to be downside to this approach either since
if not fanning-out in our code, the broker (server) has to do it anyway
(and who knows how junk their implementation is) though with more
clients, sockets etc. in memory on our end. It also preps the code for
introducing a more "serious" pub-sub systems like zeromq/nanomessage.
Wrap the sync client in an async interface in anticipation of an actual
async client. This starts support for the `open_fee()`/`stream_quotes()`
api though the tick normalization isn't correct yet.