Turned out to be super simple to get the first draft to work since the
fast and slow charts now use the same domain; however, it seems like
maybe there's an offset issue still where the fast may be a couple
minutes ahead of the slow?
Need to dig in a bit..
Using a global "last index step" (via module var) obviously has problems
when working with multiple feed sets in a single global app instance:
any separate feed-set will be incremented according to an app-global
index-step and thus won't correctly calc per-feed-set-step update info.
Impl deatz:
- drop `DisplayState.incr_info()` (since previously moved to `Viz`) and
call that method on each appropriate `Viz` instance where necessary,
making sure the appropriate `state: DisplayState` instance is passed in
to each call.
- add `DisplayState.hist_vars: dict` for history chart (sets) to
determine the per-feed (not set) current slow chart (time) step.
- add `DisplayState.globalz: dict` to house a common per-feed-set state
and use it inside the new `Viz.incr_info()` such that
a `should_increment: bool` can be returned and used by the display
loop to determine whether to x-shift the current chart.
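A minimal sketch of the per-feed-set idea, not the actual `Viz.incr_info()` code: the `DisplayState` below is a stripped-down stand-in and the `'last_step_index'` key is a hypothetical name. It only shows why keeping the counter in a per-state `globalz` table avoids one feed-set consuming another's step.

```python
from dataclasses import dataclass, field


@dataclass
class DisplayState:
    # stand-in type: only the per-feed-set table mentioned above;
    # the 'last_step_index' key is an assumption for illustration.
    globalz: dict = field(
        default_factory=lambda: {'last_step_index': 0}
    )


def incr_info(state: DisplayState, step_index: int) -> bool:
    # `should_increment` is True only the first time this feed-set
    # sees a newer step index.
    should_increment = step_index > state.globalz['last_step_index']
    if should_increment:
        state.globalz['last_step_index'] = step_index
    return should_increment


btc_set = DisplayState()
eth_set = DisplayState()

first = incr_info(btc_set, 5)      # new step for this set
repeat = incr_info(btc_set, 5)     # same step again: no x-shift
other_set = incr_info(eth_set, 5)  # separate set still gets its shift
```

With a single module-level counter the third call would report `False`, which is the bug described above.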
Read the `Viz.index_step()` directly to avoid always reading 1 on the
slow chart; this was completely broken before and resulting in not
rendering the bars graphic on the slow chart until at a true uppx of
1 which obviously doesn't work for 60 width bars XD
Further cleanups to `._render` module:
- drop `array` output from `Renderer.render()`, `read_from_key` input
and fix type annot.
- drop `should_line`, `changed_to_line` and `render_kwargs` from
`render_baritems()` outputs and instead calc `should_redraw` logic
inside the func body and return as output.
First allocation vs. first "prepend" of source data to an xy `ndarray`
format **must be mutex** in order to avoid a double prepend.
Previously when both blocks were executed we'd end up with
a `.xy_nd_start` that was decremented (at least) twice as much as it
should be on the first `.format_to_1d()` call which is obviously
incorrect (and causes problems for m4 downsampling as discussed below).
Further, since the underlying `ShmArray` buffer indexing is managed
(i.e. write-updated) completely independently from the incremental
formatter updates and internal xy indexing, we can't use
`ShmArray._first.value` and instead need to use the particular `.diff()`
output's prepend length value to decrement the `.xy_nd_start` on updates
after initial alloc.
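Roughly, the mutex looks like the sketch below. `FormatterSketch` and its `update()` signature are made up for illustration; only the `.xy_nd_start` name and the "use the diff's prepend length" rule come from the notes above.

```python
from typing import Optional


class FormatterSketch:
    # hypothetical stand-in for a formatter tracking `.xy_nd_start`

    def __init__(self) -> None:
        self.xy_nd_start: Optional[int] = None

    def update(self, src_first_index: int, prepend_len: int) -> None:
        if self.xy_nd_start is None:
            # first allocation: adopt the source's start index and do
            # NOT also apply the prepend below.
            self.xy_nd_start = src_first_index
        elif prepend_len:
            # later updates: decrement by the diff's prepend length
            # only, never by reading the shm buffer's own counter.
            self.xy_nd_start -= prepend_len


fmt = FormatterSketch()
fmt.update(src_first_index=1000, prepend_len=200)
after_alloc = fmt.xy_nd_start
fmt.update(src_first_index=800, prepend_len=200)
after_prepend = fmt.xy_nd_start
```

If both branches ran on the first call the start would land at 800 instead of 1000, i.e. the double prepend.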
Problems this resolves with m4:
- m4 uses a x-domain diff to calculate the number of "frames" to
downsample to, this is normally based on the ratio of pixel columns on
screen vs. the size of the input xy data.
- previously using an int-index (not epoch time) the max diff between
first and last index would be the size of the input buffer and thus
would never cause a large mem allocation issue (though it may have
been inefficient in terms of needed size).
- with an epoch time index this max diff could explode if you had some
near-now epoch time stamp **minus** an x-allocation value: generally
some value in `[-0.5, 0.5]`, which would result in a massive frames
count and thus internal `np.ndarray()` allocation causing either
a crash in `numba` code or actual system mem over allocation.
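To put numbers on it, here is a small arithmetic sketch. It does not reproduce the m4 frame-count formula; it only compares the x-domain span that such a formula would be fed, and the leaked `-0.5` placeholder is an assumed example value.

```python
import numpy as np

n = 1_000
int_index = np.arange(n, dtype=float)
epoch_index = 1_670_000_000.0 + 60.0 * np.arange(n)

# simulate a stale x-allocation placeholder leaking into the front of
# an epoch-indexed array (assumed value, for illustration only).
bad_epoch_index = epoch_index.copy()
bad_epoch_index[0] = -0.5


def x_span(x: np.ndarray) -> float:
    # the first-to-last x diff that a frame count would be based on
    return float(x[-1] - x[0])


int_span = x_span(int_index)        # bounded by the buffer size
epoch_span = x_span(epoch_index)    # bounded by n * step
bad_span = x_span(bad_epoch_index)  # on the order of "now" in epoch secs
```

Any allocation sized from `bad_span` is on the order of a billion elements, which lines up with the crash / over-allocation described above.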
Further, put in some more x value checks that trigger breakpoints if we
detect values that caused this issue - we'll remove em after this has
been tested enough.
Turns out we can't seem to avoid the artefacts when click-drag-scrolling
(results in weird repeated "smeared" curve segments) so just go back to
the original code.
Ensures that a "last datum" graphics object exists so that zooming can
read it using `.x_last()`. Also, disable the linked region stuff for now
since it's totally borked after flipping to the time indexing.
Since we don't really need it defined on the "chart widget" move it to
a viz method and rework it to hell:
- always discard the invalid view l > r case.
- use the graphic's UPPX to determine UI-to-scene coordinate scaling for
the L1-label collision detection, if there is no L1 just offset by
a few (index step scaled) datums; this allows us to drop the 2x
x-range calls as was hacked previously.
- handle no-data-in-view cases explicitly and error if we get any
ostensibly impossible cases.
- expect caller to trigger a graphics cycle if needed.
Further, supporting this includes a rework of a slew of other important
details:
- add `Viz.index_step`, an idempotently computed (presumably uniform)
index step value which is needed for variable sample rate graphics
displayed on an epoch (second) time index.
- rework `Viz.datums_range()` to pass view x-endpoints as first and last
elements in return `tuple`; tighten up snap-to-data edge case logic
using `max()`/`min()` calls and better internal var naming.
- adjust all calls to `slice_from_time()` to not expect an "abs" slice.
- drop all `.yrange` resetting since we can just have the `Renderer` do
it when necessary.
If we presume that time indexing uses a uniform step we can calculate
the exact index (using `//`) for the input time presuming the data
set has zero gaps. This gives a massive speedup over `numpy` fancy
indexing and (naive) `numba` iteration. Further in the case where time
gaps are detected, we can use `numpy.searchsorted()` to binary search
for the nearest expected index at lower latency.
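A sketch of the two lookup paths, assuming a sorted 1d array of epoch stamps. `index_from_time()` is a hypothetical helper (not the `slice_from_time()` signature) and the "last sample at or before `t`" semantics are an assumption.

```python
import numpy as np


def index_from_time(times: np.ndarray, t: float, step: float) -> int:
    # return the index of the last sample with time <= t
    last = len(times) - 1

    # fast path: exact when the series has no gaps
    guess = int((t - times[0]) // step)
    guess = max(0, min(guess, last))
    if times[guess] <= t and (guess == last or times[guess + 1] > t):
        return guess

    # gap detected: fall back to a binary search
    return max(0, int(np.searchsorted(times, t, side='right')) - 1)


uniform = 60.0 * np.arange(10)                        # 0, 60, ..., 540
gappy = np.array([0.0, 60.0, 120.0, 300.0, 360.0])    # 2 bars missing

fast_hit = index_from_time(uniform, 125.0, 60.0)
gap_hit = index_from_time(gappy, 300.0, 60.0)
```

The fast path is a couple of scalar ops regardless of array size; the fallback is `O(log n)` via `numpy.searchsorted()`.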
Deatz,
- comment-disable the call to the naive `numba` scan impl.
- add an optional `step: int` input (calced if not provided).
- add todos for caching binary search results in the gap detection
cases.
- drop returning the "absolute buffer indexing" slice since the caller
can always just use the read-relative slice to acquire it.
When we use an epoch index and any sample rate > 1s we need to scale the
"number of bars" to that step in order to place the view correctly in
x-domain terms. For now we're calcing the step in-method but likely,
longer run, we'll pull this from elsewhere (like a ``Viz`` attr).
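For example, with 1m bars on an epoch index (a sketch only; taking the step from the last two samples is an assumption about how the in-method calc might look):

```python
import numpy as np

# epoch-indexed 1m samples
times = 1_670_000_000 + 60 * np.arange(500)

# derive the step from the data (assumed approach)
step = int(times[-1] - times[-2])

bars_in_view = 100
right = int(times[-1])
left = right - bars_in_view * step  # scaled by the step
naive_left = right - bars_in_view   # treats bars as 1s wide
```

Without the scaling the view only spans 100 seconds, i.e. fewer than two 1m bars instead of 100.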
Gives approx a 3-4x speedup using plain old iterate-with-for-loop style
though still not really happy with this .5 to 1 ms latency..
Move the core `@njit` part to a `_slice_from_time()` with a pure python
func with orig name around it. Also, drop the output `mask` array since
we can generally just use the slices in the caller to accomplish the
same input array slicing, duh..
We need to subtract the first index in the array segment read, not the
first index value in the time-sliced output, to get the correct offset
into the non-absolute (`ShmArray.array` read) array..
Further we **do** need the `&` between the advance indexing conditions
and this adds profiling to see that it is indeed real slow (like 20ms
ish even when using `np.where()`).
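Both points in a small sketch on a fake struct array. The field names match the notes but the buffer itself is made up, and this is the slow mask-based approach, not the later `//` version.

```python
import numpy as np

buf = np.zeros(10, dtype=[('index', 'i8'), ('time', 'f8')])
buf['index'] = np.arange(100, 110)  # absolute buffer indices
buf['time'] = 1_670_000_000 + 60 * np.arange(10)

start_t = buf['time'][3]
stop_t = buf['time'][6]

# element-wise `&` is required; a plain `and` between two boolean
# arrays raises `ValueError` (ambiguous truth value).
mask = (buf['time'] >= start_t) & (buf['time'] <= stop_t)
abs_indices = buf['index'][mask]

# offset by the first index of the array segment *read*..
rel = abs_indices - buf['index'][0]
read_slice = slice(int(rel[0]), int(rel[-1]) + 1)

# ..not by the first index of the time-sliced output, which always
# collapses the start to zero.
wrong_start = int(abs_indices[0] - abs_indices[0])
```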
Again, to make epoch indexing a flip-of-switch for testing look up the
`Viz.index_field: str` value when updating labels.
Also, drops the legacy tick-type set tracking which we no longer use
thanks to the new throttler subsys and its framing msgs.
Planning to put the formatters into a new mod and aggregate all path
gen/op helpers into this module.
Further tweaks include:
- moving `path_arrays_from_ohlc()` back to module level
- slice out the last xy datum for `OHLCBarsAsCurveFmtr` 1d formatting
- always copy the new x-value from the source to `.x_nd`
This was a major cause of error (particularly trying to get epoch
indexing working) and really isn't necessary; instead just have
`.diff()` always read from the underlying source array for current
index-step diffing and append/prepend slice construction.
Allows us to,
- drop `._last_read` state management and thus usage.
- better handle startup indexing by setting `.xy_nd_start/stop` to
`None` initially so that the first update can be done in one large
prepend.
- better understand and document the step curve "slice back to previous
level" logic which is now heavily commented B)
- drop all the `slice_to_head` stuff and instead allow each
formatter to choose its 1d segmenting.
In an effort to make it easy to override the indexing scheme.
Further, this repairs the `.datums_range()` special case to handle when
the view box is to-the-right-of the data set (i.e. l > datum_start).
As in make the call to `Flume.slice_from_time()` to try and convert any
time index values from the view range to array-indices; all untested
atm.
Also drop some old/unused/moved methods:
- `._set_xlimits()`
- `.bars_range()`
- `.curve_width_pxs()`
and fix some `flow` -> `viz` var naming.
Don't expect values (array + slice) to be returned and applied by
`.incr_update_xy_nd()` and instead presume this will be implemented
internally in each (sub)formatter.
Attempt to simplify some incr-update routines, (particularly in the step
curve formatter, though most of it was reverted to just a simpler form
of the original implementation XD) including:
- dropping the need for the `slice_to_head: int` control.
- using the `xy_nd_start/stop` index counters over custom lookups.
Remove hardcoded `'index'` field refs from all formatters in a first
attempt at moving towards epoch-time alignment (though don't actually
use it yet).
Adjustments to the formatter interface:
- property for `.xy_nd`, the x/y nd arrays.
- property for `.xy_slice`, the nd format array(s) start->stop index
slice.
Internal routine tweaks:
- drop `read_src_from_key` and always pass full source array on updates
and adjust handlers to expect to have to index the data field of
interest.
- set `.last_read` right after update calls instead of after 1d
conversion.
- drop `slice_to_head` array read slicing.
- add some debug points for testing 'time' indexing (though not used
here yet).
- add `.x_nd` array update logic for when the `.index_field` is not
'index' - i.e. when we begin to try and support epoch time.
- simplify some new y_nd updates to not require use of `np.broadcast()`
where possible.
Probably means it doesn't need to be a `Flume` method but it's
convenient to expect the caller to pass in the `np.ndarray` with
a `'time'` field instead of a `timeframe: str` arg; also, return the
slice mask instead of the sliced array as output (again allowing the
caller to do any slicing). Also, handle the slice-outside-time-range
case by just returning the entire index range with a `None` mask.
Adjust `Viz.view_data()` to instead do timeframe (for rt vs. hist shm
array) lookup and equiv array slicing with the returned mask.
Since these modules no longer contain Qt specific code we might
as well include them in the data sub-package.
Also, add `IncrementalFormatter.index_field` as single point to def the
indexing field that should be used for all x-domain graphics-data
rendering.
Was broken since the `_adhoc_futes_set` rework a while back. Removes the
cmdty symbols from that set into a new one and fixes the contract
case block to catch `Contract(secType='CMDTY')` case. Also makes
`Client.search_symbols()` return details `dict`s so that `piker search`
will work again..
Since higher level charting and fsp management need access to the
new `Flume` indexing apis this adjusts some func sigs to pass through
(and/or create) flume instances:
- `LinkedSplits.add_plot()` and dependents.
- `ChartPlotWidget.draw_curve()` and deps, and it now returns a `Flow`.
- `.ui._fsp.open_fsp_admin()` and `FspAdmin.open_fsp_ui()` related
methods => now we wrap the destination fsp shm in a flume on the admin
side and is returned from `.start_engine_method()`.
Drop a bunch of (unused) chart widget methods including some already
moved to flume methods: `.get_index()`, `.in_view()`,
`.last_bar_in_view()`, `.is_valid_index()`.
Move to expect and process new by-tick-event frames where the display
loop can now just iterate the most recent tick events by type instead of
the entire tick history sequence - thus we reduce iterations inside the
update loop.
Also, go back to using the detected display's refresh rate (minus 6)
as the default feed requested throttle rate since we can now handle
much more bursty-ness in display updates thanks to the new framing
format B)
Factor out the chart widget creation since it's only executed once
during rendering of the first feed/flow whilst keeping plotitem overlay
creation inside the (flume oriented) init loop. Only create one vlm and
FSP chart/chain for now until we figure out if we want FSPs overlayed by
default or selected based on the "front" symbol in use. Add a default
color-palette set using shades of gray when plotting overlays. Presume
that the display loop's quote throttle rate should be uniformly
distributed over all input symbol-feeds for now. Restore feed pausing on
mouse interaction.
Initial support for real-time multi-symbol overlay charts using an
aggregate feed delivered by `Feed.open_multi_stream()`.
The setup steps for constructing the overlayed plot items is still very
very rough and will likely provide incentive for better refactoring high
level "charting APIs". For each fqsn passed into `display_symbol_data()`
we now synchronously,
- make a single call to `LinkedSplits.plot_ohlc_main()` -> `ChartPlotWidget`
where we cache the chart in scope and for all other "sibling" fqsns
we,
- make a call to `ChartPlotWidget.overlay_plotitem()` -> `PlotItem`, hide its axes,
make another call with this plotitem input to
`ChartPlotWidget.draw_curve()`, set a sym-specific view box auto-yrange maxmin callback,
register the plotitem in a global `pis: dict[str, list[pgo.PlotItem, pgo.PlotItem]] = {}`
Once all plots have been created we then asynchronously for each symbol,
- maybe create a volume chart and register it in a similar task-global
table: `vlms: dict[str, ChartPlotWidget] = {}`
- start fsp displays for each symbol
Then common entrypoints are entered once for all symbols:
- a single `graphics_update_loop()` loop-task is started wherein
real-time graphics update components for each symbol are created,
* `L1Labels`
* y-axis last clearing price stickies
* `maxmin()` auto-ranger
* `DisplayState` (stored in a table `dss: dict[str, DisplayState] = {}`)
* an `increment_history_view()` task
and a single call to `Feed.open_multi_stream()` is used to create
a symbol-multiplexed quote stream which drives a single loop over all
symbols wherein for each quote the appropriate components are looked
up and passed to `graphics_update_cycle()`.
- a single call to `open_order_mode()` is made with the first symbol
provided as input, though eventually we want to support passing in the
entire list.
Further internal implementation details:
- special tweaks to the `pg.LinearRegionItem` setup wherein the region
is added with a zero opacity and *after* all plotitem overlays to
avoid an issue where overlays weren't being shown within the region
area in the history chart.
- all symbol-specific graphics oriented update calls are adjusted to
pass in the fqsn:
* `update_fsp_chart()`
* `ChartView._set_yrange()`
* `ChartPlotWidget.update_graphics_from_flow()`
- avoid a double increment on sample step updates by not calling the
increment on any vlm chart since it seems the vlm-ohlc chart linking
already takes care of this now?
- use global counters for the last epoch time step to avoid incrementing
all views more than once per new time step given underlying shm array
buffers may be on different array-index values from one another.
Main "public" API change is to make `GodWidget.get/set_chart_symbol()`
accept and cache-on fqsn tuples to allow handling overlayed chart groups
and adjust method names to be plural to match.
Wrt `LinkedSplits`,
- create all chart widget axes with a `None` plotitem argument and set
the `.pi` field after axis creation (since apparently we have another
object reference causality dilemma..)
- set a monkeyed `PlotItem.chart_widget` for use in axes that still need
the widget reference.
- drop feed pause/resume for now since it's leaking feed tasks on the
`brokerd` side and we probably don't really need it any more, and if
we still do it should be done on the feed not the flume.
Wrt `ChartPlotItem`,
- drop `._add_sticky()` and use the `Axis` method instead and add some
overlay + axis sanity checks.
- refactor `.draw_ohlc()` to be a lighter wrapper around a call to
`.add_plot()`.
We have this method on our `ChartPlotWidget` but it makes more sense to
directly associate axis-labels with, well, the label's parent axis XD.
We add `._stickies: dict[str, YAxisLabel]` to replace
`ChartPlotWidget._ysticks` and pass in the `pg.PlotItem` to each axis
instance, stored as `Axis.pi` instead of handing around linked split
references (which are way out of scope for a single axis).
More work needs to be done to remove dependence on `.chart:
ChartPlotWidget` references in the date axis type as per comments.
Comments out the pixel-cache resetting since it doesn't seem we need it
any more to avoid draw oddities?
For `.fast_path` appends, this nearly got it working except the new path
segments are either not being connected correctly (step curve) or not
being drawn in full since the history path (plain line).
Leaving the attempted code commented in for a retry in the future; my
best guesses are that maybe,
- `.connectPath()` call is being done with incorrect segment length
and/or start point.
- the "appended" data: `appended = array[-append_len-1:slice_to_head]`
(done inside the formatter) isn't correct (i.e. endpoint handling
considering a path append) and needs special handling for different
curve types?
Ensure `.boundingRect()` calcs and `.draw_last_datum()` do geo-sizing
based on source data instead of presuming some `1.0` unit steps in some
spots; we need this to support an epoch index as is needed for overlays.
Further, clean out a bunch of old bounding rect calc code and add some
commented code for trying out `QRectF.united()` on the path + last datum
curve segment. Turns out that approach is slower as per eyeballing the
added profiler points.
After trying to hack epoch indexed time series and failing miserably,
decided to properly factor out all formatting routines into a common
subsystem API: ``IncrementalFormatter`` which provides the interface for
incrementally updating and tracking pre-path-graphics formatted data.
Previously this functionality was mangled into our `Renderer` (which
also does the work of `QPath` generation and update) but splitting it
out also preps for being able to do graphics-buffer downsampling and
caching on a remote host B)
The ``IncrementalFormatter`` (parent type) has the default behaviour of
tracking a single field-array on some source `ShmArray`, updating
a flattened `numpy.ndarray` in-mem allocation, and providing a default
1d conversion for pre-downsampling and path generation.
Changed out of `Renderer`,
- `.allocate_xy()`, `update_xy()` and `format_xy()` all are moved to
more explicitly named formatter methods.
- all `.x/y_data` nd array management and update
- "last view range" tracking
- `.last_read`, `.diff()`
- now calls `IncrementalFormatter.format_to_1d()` inside `.render()`
The new API gets,
- `.diff()`, `.last_read`
- all view range diff tracking through `.track_inview_range()`.
- better nd format array names: `.x/y_nd`, `xy_nd_start/stop`.
- `.format_to_1d()` which renders pre-path formatted arrays ready for
both m4 sampling and path gen.
- better explicit overloadable formatting method names:
* `.allocate_xy()` -> `.allocate_xy_nd()`
* `.update_xy()` -> `.incr_update_xy_nd()`
* `.format_xy()` -> `.format_xy_nd_to_1d()`
Finally this implements per-graphics-type formatters which define
each set of related formatting routines:
- `OHLCBarsFmtr`: std multi-line style bars
- `OHLCBarsAsCurveFmtr`: draws an interpolated line for ohlc sampled data
- `StepCurveFmtr`: handles vlm style curves
More or less a revamp (and possibly first draft for something similar in
`tractor` core) which ensures all actor trees attempt to discover the
`pikerd` registry actor.
Implementation improvements include:
- new `Registry` singleton which houses the `pikerd` discovery
socket-address `Registry.addr` + a `open_registry()` manager which
provides bootstrapped actor-local access.
- refine `open_piker_runtime()` to do the work of opening a root actor
and call the new `open_registry()` depending on whether a runtime has
yet been bootstrapped.
- rejig `[maybe_]open_pikerd()` in terms of the above.
Allows running simultaneous data feed services on the same (linux) host
by avoiding file-name collisions, instead keying shm buffer sets by the
given `brokerd` instance. This allows, for example, either multiple dev
versions of the data layer to run side-by-side or for the test suite to
be seamlessly run alongside a production instance.
Previously we were relying on implicit actor termination in
`maybe_spawn_daemon()` but really on `pikerd` teardown we should be sure
to tear down not only all service tasks in each actor but also the actor
runtimes. This adjusts `Services.cancel_service()` to only cancel the
service task scope and wait on the `complete` event and reworks the
`open_context_in_task()` inner closure body to,
- always cancel the service actor at exit.
- not call `.cancel_service()` (potentially causing recursion issues on
cancellation).
- allocate a `complete: trio.Event` to signal full task + actor termination.
- pop the service task from the `.service_tasks` registry.
Further, add a `maybe_set_global_registry_sockaddr()` helper-cm to do
the work of checking whether a registry socket needs-to/has-been set
and use it for discovery calls to the `pikerd` service tree.
Seems that by default their history indexing rounds down/back to the
previous time step, so make sure we add a minute inside `Client.bars()`
when the `end_dt=None`, indicating "get the latest bar". Add
a breakpoint block that should trigger whenever the latest bar vs. the
latest epoch time is mismatched; we'll remove this after some testing
verifying the history bars issue is resolved.
Further this drops the legacy `backfill_bars()` endpoint which has been
deprecated and unused for a while.
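The `end_dt=None` handling amounts to something like the following sketch. It uses stdlib `datetime` and a hypothetical `resolve_end_dt()` helper; the real `Client.bars()` body and its datetime types aren't shown here.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def resolve_end_dt(
    end_dt: Optional[datetime],
    now: datetime,
) -> datetime:
    # `None` means "get the latest bar": pad by one minute since the
    # provider appears to round back to the previous time step.
    if end_dt is None:
        return now + timedelta(minutes=1)
    return end_dt


now = datetime(2023, 1, 1, 12, 0, 30, tzinfo=timezone.utc)
latest = resolve_end_dt(None, now)
explicit = resolve_end_dt(now, now)
```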
Always use `open_sample_stream()` to register fast and slow quote feed
buffers and get a sampler stream which we use to trigger
`Sampler.broadcast_all()` calls on the service side after backfill
events.
Now spawned under the `pikerd` tree as a singleton-daemon-actor we offer
a slew of new routines in support of this micro-service:
- `maybe_open_samplerd()` and `spawn_samplerd()` which provide the
`._daemon.Services` integration to conduct service spawning.
- `open_sample_stream()` which is a client-side endpoint which does all
the work of (lazily) starting the `samplerd` service (if dne),
registering shm buffers for update and connecting a sample-index
stream for iteration by the caller.
- `register_with_sampler()` which is the `samplerd`-side service task
endpoint implementing all the shm buffer and index-stream registry
details as well as logic to ensure a lone service task runs
`Services.increment_ohlc_buffer()`; it increments at the shortest period
registered which, for now, is the default 1s duration.
Further impl notes:
- fixes to `Services.broadcast()` to ensure broken streams get discarded
gracefully.
- we use a `pikerd` side singleton mutex `trio.Lock()` to ensure
one-and-only-one `samplerd` is ever spawned per `pikerd` actor tree.
Drop the `_services` module level ref and adjust all client code to
match. Drop struct inheritance and convert all methods to class level.
Move `Brokerd.locks` -> `Services.locks` and add sampling mod to pikerd
enabled set.
We're moving toward a single actor managing sampler work and distributed
independently of `brokerd` services such that a user can run samplers on
different hosts than real-time data feed infra. Most of the
implementation details include aggregating `.data._sampling` routines
into a new `Sampler` singleton type.
Move the following methods to class methods:
- `.increment_ohlc_buffer()` to allow a single task to increment all
registered shm buffers.
- `.broadcast()` for IPC relay to all registered clients/shms.
Further add a new `maybe_open_global_sampler()` which allocates
a service nursery and assigns it to the `Sampler.service_nursery`; this
is prep for putting the step incrementer in a singleton service task
higher up the data-layer actor tree.
When we see multiple history frames that are duplicate to the request
set, bail re-trying after a number of tries (6 just cuz) and return
early from the tsdb backfill loop; presume that this many duplicates
means we've hit the beginning of history. Use a `collections.Counter`
for the duplicate counts. Make sure and warn log in such cases.
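The bail-out logic is roughly as sketched here. The frame keys, the `load_until_duplicates()` name and the loop shape are assumptions; only the `collections.Counter`, the limit of 6 and the warning come from the notes.

```python
import logging
from collections import Counter

log = logging.getLogger(__name__)

MAX_DUPLICATES = 6  # "6 just cuz"


def load_until_duplicates(frames):
    # `frames` yields (key, data) pairs, newest first (assumed shape)
    loaded = {}
    dupes = Counter()
    for key, frame in frames:
        if key in loaded:
            dupes[key] += 1
            if dupes[key] >= MAX_DUPLICATES:
                log.warning(
                    f'{dupes[key]} duplicate frames for {key}, '
                    'presuming start of history'
                )
                break
            continue
        loaded[key] = frame
    return loaded, dupes


frames = [('t2', 'b'), ('t1', 'a')] + [('t1', 'a')] * 10 + [('t0', 'z')]
loaded, dupes = load_until_duplicates(frames)
```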
Wow, turns out tick framing was totally borked since we weren't framing
on "greater than throttle period long waits" XD
This moves all the framing logic into a common func and calls it in
every case:
- every (normal) "pre throttle period expires" quote receive
- each "no new quote before throttle period expires" (slow case)
- each "no clearing tick yet received" / only burst on clears case
Add some (untested) data slicing util methods for mapping time ranges to
source data indices:
- `.get_index()` which maps a single input epoch time to an equiv array
(int) index.
- add `slice_from_time()` which returns a view of the shm data from an
input epoch range presuming the underlying struct array contains
a `'time'` field with epoch stamps.
- `.view_data()` which slices out the "in view" data according to the
current state of the passed in `pg.PlotItem`'s view box.
This has been an outstanding idea for a while and changes the framing
format of tick events into a `dict[str, list[dict]]` wherein for each
tick "type" (eg. 'bid', 'ask', 'trade', 'asize'..etc) we create a FIFO
ordered `list` of events (data) and then pack this table into each
(throttled) send. This gives an additional implied downsample reduction
(in terms of iteration on the consumer side) from `N` tick-events to
a (max) `T` tick-types presuming the rx side only needs the latest tick
event.
Drop the `types: set` and adjust clearing event test to use the new
`ticks_by_type` map's keys.
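A sketch of the by-type framing and the keys-based clearing test. The tick dict layout, `frame_ticks()` and the `CLEARING_TYPES` set are illustrative assumptions, not the sampler's actual code.

```python
from collections import defaultdict

# hypothetical set of tick types that count as clearing events
CLEARING_TYPES = {'trade'}


def frame_ticks(ticks: list) -> dict:
    # pack FIFO-ordered tick events into per-type lists
    ticks_by_type = defaultdict(list)
    for tick in ticks:
        ticks_by_type[tick['type']].append(tick)
    return dict(ticks_by_type)


ticks = [
    {'type': 'bid', 'price': 10.0},
    {'type': 'ask', 'price': 10.2},
    {'type': 'trade', 'price': 10.1},
    {'type': 'bid', 'price': 10.05},
]
frame = frame_ticks(ticks)

# the rx side iterates T tick-types, not N tick-events
latest = {typ: events[-1] for typ, events in frame.items()}

# clearing event test straight off the map's keys
has_clear = not CLEARING_TYPES.isdisjoint(frame)
```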
Instead of uniformly distributing the msg send rate for a given
aggregate subscription, choose to be more bursty around clearing ticks
so as to avoid saturating the consumer with L1 book updates vs.
delivering real trade data as-fast-as-possible.
Presuming the consumer is in the "UI land of slow" (eg. modern display
frame rates) such an approach is more useful for seeing "material
changes" in the market as-bursty-as-possible (i.e. more short lived fast
changes in last clearing price vs. many slower changes in the bid-ask
spread queues). Such an approach also lends better to multi-feed
overlays which in aggregate tend to scale linearly with the number of
feeds/overlays; centralization of bursty arrival rates allows for
a higher overall throttle rate if used cleverly with framing.
Likely pertains to helping with stuff in issues #345 and #373 and just
generally is handy to have when processing ledgers / clearing event
tables.
Adds the following helper methods:
- `iter_by_dt()` to iter-sort an arbitrary `Transaction`-like table of
clear entries.
- `Position.iter_clears()` as a convenience wrapper for the above.
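Something along these lines, assuming each clear entry carries a `'dt'` datetime. The table shape is a guess at a `Transaction`-like layout and this is not the actual implementation.

```python
from datetime import datetime, timezone
from typing import Iterator


def iter_by_dt(clears: dict) -> Iterator[tuple]:
    # yield (tid, entry) pairs sorted by each entry's 'dt' field
    for tid, entry in sorted(
        clears.items(),
        key=lambda item: item[1]['dt'],
    ):
        yield tid, entry


clears = {
    'b': {'dt': datetime(2023, 1, 2, tzinfo=timezone.utc), 'size': -1.0},
    'a': {'dt': datetime(2023, 1, 3, tzinfo=timezone.utc), 'size': 2.0},
    'c': {'dt': datetime(2023, 1, 1, tzinfo=timezone.utc), 'size': 1.0},
}
ordered = [tid for tid, _ in iter_by_dt(clears)]
```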
Trying to send a message in the `NoBsWs.fixture()` exit when the ws is
currently disconnected causes a double `._stack.close()` call which
will corrupt `trio`'s coro stack. Instead only do the unsub if we detect
the ws is still up.
Also drops the legacy `backfill_bars()` module endpoint.
Fixes #437
Instead of requiring any `-b` try to import all built-in broker backend
python modules by default and only load those detected from the input symbol
list's fqsn values. In other words the `piker chart` cmd can be run without
`-b` now and that flag is only required if you only want to load
a subset of the built-ins or are trying to load a specific
not-yet-builtin backend.
Allows using `set` ops for subscription management and guarantees no
duplicates per `brokerd` actor. New API is simpler for dynamic
pause/resume changes per `Feed`:
- `_FeedsBus.add_subs()`, `.get_subs()`, `.remove_subs()` all accept multi-sub
`set` inputs.
- `Feed.pause()` / `.resume()` encapsulates management of *only* sending
a msg on each unique underlying IPC msg stream.
Use new api in sampler task.
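The `set`-based table can be sketched as follows. `FeedsBusSketch` is a toy stand-in for `_FeedsBus`, and the `(stream, throttle-rate)` tuple layout is an assumption.

```python
class FeedsBusSketch:
    # toy stand-in: a table of subscription tuples keyed by fqsn

    def __init__(self) -> None:
        self._subscribers: dict = {}

    def get_subs(self, key: str) -> set:
        return self._subscribers.setdefault(key, set())

    def add_subs(self, key: str, subs: set) -> set:
        current = self.get_subs(key)
        current.update(subs)  # set union: duplicates are impossible
        return current

    def remove_subs(self, key: str, subs: set) -> set:
        current = self.get_subs(key)
        current.difference_update(subs)
        return current


bus = FeedsBusSketch()
subs = {('stream-a', None), ('stream-b', 0.1)}
bus.add_subs('btcusdt', subs)
bus.add_subs('btcusdt', subs)  # re-adding is a no-op
count_after_add = len(bus.get_subs('btcusdt'))
remaining = bus.remove_subs('btcusdt', {('stream-a', None)})
```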
Previously we would only detect overruns and drop subscriptions on
non-throttled feed subs, however you can get the same issue with
a wrapping throttler task:
- the intermediate mem chan can be blocked either by the throttler task
being too slow, in which case we still want to warn about it
- the stream's IPC channel actually breaks and we still want to drop
the connection and subscription so it doesn't become a source of
stale backpressure.
Set each quote-stream by matching the provider for each `Flume` and thus
results in some flumes mapping to the same (multiplexed) stream.
Monkey-patch the equivalent `tractor.MsgStream._ctx: tractor.Context` on
each broadcast-receiver subscription to allow use by feed bus methods as
well as other internals which need to reference IPC channel/portal info.
Start a `_FeedsBus` subscription management API:
- add `.get_subs()` which returns the list of tuples registered for the
given key (normally the fqsn).
- add `.remove_sub()` which allows removing by key and tuple value and
provides encapsulation for sampler task(s) which deal with dropped
connections/subscribers.
Adds provider-list-filtered (quote) stream multiplexing support allowing
for merged real-time `tractor.MsgStream`s using an `@acm` interface.
Behind the scenes we are just doing a classic multi-task push to common
mem chan approach.
Details to make it work on `Feed`:
- add `Feed.mods: dict[str, ModuleType]` and
`Feed.portals: dict[ModuleType, tractor.Portal]` which are both populated
during init in `open_feed()`
- drop `Feed.portal` and `Feed.name`
Also fix a final lingering tsdb history loading loop termination bug.
A slight facepalm but, the main issue was a simple indexing logic error:
we need to slice with `tsdb_history[-shm._first.value:]` to push most
recent history not oldest.. This allows cleanup of tsdb backfill loop as
well.
Further, greatly simplify `diff_history()` time slicing by using the
classic `numpy` conditional slice on the epoch field.
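Both slicing points on toy data. `free_slots` stands in for `shm._first.value`, and the cutoff direction in the conditional slice is an assumption since the `diff_history()` body isn't shown here.

```python
import numpy as np

# tsdb history ordered oldest -> newest
tsdb_history = np.arange(10)
free_slots = 4  # stand-in for `shm._first.value`

head = tsdb_history[:free_slots]   # oldest entries
tail = tsdb_history[-free_slots:]  # most recent entries: what we want
# NB: with `free_slots == 0` the `[-0:]` slice returns the whole
# array, so that case needs its own guard.

# classic conditional slice on the epoch field
frame = np.zeros(5, dtype=[('time', 'f8'), ('close', 'f8')])
frame['time'] = [0, 60, 120, 180, 240]
last_tsdb_time = 120.0
to_push = frame[frame['time'] > last_tsdb_time]
```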
This had a bug prior where the end of a frame (a partial) wasn't being
sliced correctly and we'd get odd gaps showing up in the data backfilled from
`brokerd` vs. tsdb end index. Repair this by doing timeframe aware index
diffing in `diff_history()` which seems to resolve it. Also, use the
frame-result's `end_dt: datetime` for the loop exit condition.
Sync per-symbol sampler loop start to subscription registers such that
the loop can't start until the consumer's stream subscription is added;
the task-sync uses a `trio.Event`. This patch also drops a ton of
commented cruft.
Further adjustments needed to get parity with prior functionality:
- pass init msg 'symbol_info' field to the `Symbol.broker_info: dict`.
- ensure the `_FeedsBus._subscriptions` table uses the broker specific
(without brokername suffix) as keys for lookup so that the sampler
loop doesn't have to append in the brokername as a suffix.
- ensure the `open_feed_bus()` flumes-table-msg sent by
`tractor.Context.started()` uses the `.to_msg()` form of all flume
structs.
- ensure `maybe_open_feed()` uses `tractor.MsgStream.subscribe()` on all
`Flume.stream`s on cache hits using the
`tractor.trionics.gather_contexts()` helper.
Orient shm-flow-arrays around the new idea of a `Flume` which provides
access, mgmt and basic measure of real-time data flow sets (see water
flow management semantics).
- We discard the previous idea of an "init message" which contained all
the shm attachment info and instead send a startup message full of
`Flume.to_msg()`s which are symmetrically loaded on the caller actor
side.
- Create data-flows "entries" for every passed in fqsn such that the consumer gets back
streams and shm for each, now all wrapped in `Flume` types. For now we
allocate `brokermod.stream_quotes()` tasks 1-to-1 for each fqsn
(instead of expecting each backend to do multi-plexing, though we
might want that eventually) as well as a `_FeedsBus._subscriber` entry
for each. The pause/resume management loop is adjusted to match.
Previously `Feed`s were allocated 1-to-1 with each fqsn.
- Make `Feed` a `Struct` subtype instead of a `@dataclass` and move all
flow specific attrs to the new `Flume`:
- move `.index_stream()`, `.get_ds_info()` to `Flume`.
- drop `.receive()`: each fqsn entry will now require knowledge of
separate streams by feed users.
- add multi-fqsn tables: `.flumes`, `.streams` which point to the
appropriate per-symbol entries.
- Async load all `Flume`s from all contexts and all quote streams using
`tractor.trionics.gather_contexts()` on the client `open_feed()` side.
- Update feeds test to include streaming 2 symbols on the same (binance)
backend.