Sync per-symbol sampler loop start to subscription registers such that
the loop can't start until the consumer's stream subscription is added;
the task-sync uses a `trio.Event`. This patch also drops a ton of
commented cruft.
Further adjustments needed to get parity with prior functionality:
- pass init msg 'symbol_info' field to the `Symbol.broker_info: dict`.
- ensure the `_FeedsBus._subscriptions` table uses the broker specific
(without brokername suffix) as keys for lookup so that the sampler
loop doesn't have to append in the brokername as a suffix.
- ensure the `open_feed_bus()` flumes-table-msg returned sent by
`tractor.Context.started()` uses the `.to_msg()` form of all flume
structs.
- ensure `maybe_open_feed()` uses `tractor.MsgStream.subscribe()` on all
`Flume.stream`s on cache hits using the
`tractor.trionics.gather_contexts()` helper.
Orient shm-flow-arrays around the new idea of a `Flume` which provides
access, mgmt and basic measure of real-time data flow sets (see water
flow management semantics).
- We discard the previous idea of a "init message" which contained all
the shm attachment info and instead send a startup message full of
`Flume.to_msg()`s which are symmetrically loaded on the caller actor
side.
- Create data-flows "entries" for every passed in fqsn such that the consumer gets back
streams and shm for each, now all wrapped in `Flume` types. For now we
allocate `brokermod.stream_quotes()` tasks 1-to-1 for each fqsn
(instead of expecting each backend to do multi-plexing, though we
might want that eventually) as well a `_FeedsBus._subscriber` entry
for each. The pause/resume management loop is adjusted to match.
Previously `Feed`s were allocated 1-to-1 with each fqsn.
- Make `Feed` a `Struct` subtype instead of a `@dataclass` and move all
flow specific attrs to the new `Flume`:
- move `.index_stream()`, `.get_ds_info()` to `Flume`.
- drop `.receive()`: each fqsn entry will now require knowledge of
separate streams by feed users.
- add multi-fqsn tables: `.flumes`, `.streams` which point to the
appropriate per-symbol entries.
- Async load all `Flume`s from all contexts and all quote streams using
`tractor.trionics.gather_contexts()` on the client `open_feed()` side.
- Update feeds test to include streaming 2 symbols on the same (binance)
backend.
Initial test that starts a `binance` feed and reads the quote messages
alongside shm buffers for 1s and 1m OHLC; just prints to console for
now.
Template out parametrization for multi-symbol quote-multiplexed feeds
which coming soon B)
They need to be run with a local private API key and haven't been ported
to latest data apis anyway. It's also a broker most peeps aren't going
to be using any time soon.
Questrade is the default broker backend (for now) so the CI
can run using a practice account token handed down through an
env variable. If we add a cached directory to the build then the token
should remain persistent in the brokers config and will only need to be
updated if something goes wrong.
Also, add a `--confdir` flag for pytest much in the same way as for
the `piker` cli.