Track data feed subscribers using a new `Sub(Struct)`

In prep for supporting reverse-ipc connect-back to UI actors from
middle-ware systems (for the purposes of triggering data-view canvas
re-renders and built-in tsp annotations), add a new struct type to
better generalize the management of remote feed subscriptions. Include
a `Sub.rc_ui: bool` for now (with nearby todo-comment) and expose an
`allow_remote_ctl_ui: bool` through the feed endpoints to help drive
/ prep for all that ^

Rework all the sampler tasks to expect the `Sub`'s new iface:

- split up the `Sub.ipc: MsgStream`  and `.send_chan` as separate fields
  since we're handling the throttle case in separate
  `sample_and_broadcast()` logic blocks anyway and avoids needing to
  monkey-patch on the `._ctx` malarky..
- explicitly provide the optional handle to the `_throttle_cs:
  CancelScope` again for the case where throttling/event-downsampling is
  requested.
- add `_FeedsBus.subs_items()` as a public iterator.
distribute_dis
Tyler Goodlet 2023-12-26 20:34:54 -05:00
parent 88f415e5b8
commit 1231c459aa
2 changed files with 122 additions and 77 deletions

View File

@ -33,6 +33,11 @@ from typing import (
) )
import tractor import tractor
from tractor import (
Context,
MsgStream,
Channel,
)
from tractor.trionics import ( from tractor.trionics import (
maybe_open_nursery, maybe_open_nursery,
) )
@ -53,7 +58,10 @@ if TYPE_CHECKING:
from ._sharedmem import ( from ._sharedmem import (
ShmArray, ShmArray,
) )
from .feed import _FeedsBus from .feed import (
_FeedsBus,
Sub,
)
# highest frequency sample step is 1 second by default, though in # highest frequency sample step is 1 second by default, though in
@ -94,7 +102,7 @@ class Sampler:
float, float,
list[ list[
float, float,
set[tractor.MsgStream] set[MsgStream]
], ],
] = defaultdict( ] = defaultdict(
lambda: [ lambda: [
@ -258,8 +266,8 @@ class Sampler:
f'broadcasting {period_s} -> {last_ts}\n' f'broadcasting {period_s} -> {last_ts}\n'
# f'consumers: {subs}' # f'consumers: {subs}'
) )
borked: set[tractor.MsgStream] = set() borked: set[MsgStream] = set()
sent: set[tractor.MsgStream] = set() sent: set[MsgStream] = set()
while True: while True:
try: try:
for stream in (subs - sent): for stream in (subs - sent):
@ -314,7 +322,7 @@ class Sampler:
@tractor.context @tractor.context
async def register_with_sampler( async def register_with_sampler(
ctx: tractor.Context, ctx: Context,
period_s: float, period_s: float,
shms_by_period: dict[float, dict] | None = None, shms_by_period: dict[float, dict] | None = None,
@ -649,12 +657,7 @@ async def sample_and_broadcast(
# eventually block this producer end of the feed and # eventually block this producer end of the feed and
# thus other consumers still attached. # thus other consumers still attached.
sub_key: str = broker_symbol.lower() sub_key: str = broker_symbol.lower()
subs: list[ subs: set[Sub] = bus.get_subs(sub_key)
tuple[
tractor.MsgStream | trio.MemorySendChannel,
float | None, # tick throttle in Hz
]
] = bus.get_subs(sub_key)
# NOTE: by default the broker backend doesn't append # NOTE: by default the broker backend doesn't append
# it's own "name" into the fqme schema (but maybe it # it's own "name" into the fqme schema (but maybe it
@ -663,34 +666,40 @@ async def sample_and_broadcast(
fqme: str = f'{broker_symbol}.{brokername}' fqme: str = f'{broker_symbol}.{brokername}'
lags: int = 0 lags: int = 0
# TODO: speed up this loop in an AOT compiled lang (like # XXX TODO XXX: speed up this loop in an AOT compiled
# rust or nim or zig) and/or instead of doing a fan out to # lang (like rust or nim or zig)!
# TCP sockets here, we add a shm-style tick queue which # AND/OR instead of doing a fan out to TCP sockets
# readers can pull from instead of placing the burden of # here, we add a shm-style tick queue which readers can
# broadcast on solely on this `brokerd` actor. see issues: # pull from instead of placing the burden of broadcast
# on solely on this `brokerd` actor. see issues:
# - https://github.com/pikers/piker/issues/98 # - https://github.com/pikers/piker/issues/98
# - https://github.com/pikers/piker/issues/107 # - https://github.com/pikers/piker/issues/107
for (stream, tick_throttle) in subs.copy(): # for (stream, tick_throttle) in subs.copy():
for sub in subs.copy():
ipc: MsgStream = sub.ipc
throttle: float = sub.throttle_rate
try: try:
with trio.move_on_after(0.2) as cs: with trio.move_on_after(0.2) as cs:
if tick_throttle: if throttle:
send_chan: trio.abc.SendChannel = sub.send_chan
# this is a send mem chan that likely # this is a send mem chan that likely
# pushes to the ``uniform_rate_send()`` below. # pushes to the ``uniform_rate_send()`` below.
try: try:
stream.send_nowait( send_chan.send_nowait(
(fqme, quote) (fqme, quote)
) )
except trio.WouldBlock: except trio.WouldBlock:
overruns[sub_key] += 1 overruns[sub_key] += 1
ctx = stream._ctx ctx: Context = ipc._ctx
chan = ctx.chan chan: Channel = ctx.chan
log.warning( log.warning(
f'Feed OVERRUN {sub_key}' f'Feed OVERRUN {sub_key}'
'@{bus.brokername} -> \n' '@{bus.brokername} -> \n'
f'feed @ {chan.uid}\n' f'feed @ {chan.uid}\n'
f'throttle = {tick_throttle} Hz' f'throttle = {throttle} Hz'
) )
if overruns[sub_key] > 6: if overruns[sub_key] > 6:
@ -707,10 +716,10 @@ async def sample_and_broadcast(
f'{sub_key}:' f'{sub_key}:'
f'{ctx.cid}@{chan.uid}' f'{ctx.cid}@{chan.uid}'
) )
await stream.aclose() await ipc.aclose()
raise trio.BrokenResourceError raise trio.BrokenResourceError
else: else:
await stream.send( await ipc.send(
{fqme: quote} {fqme: quote}
) )
@ -724,16 +733,16 @@ async def sample_and_broadcast(
trio.ClosedResourceError, trio.ClosedResourceError,
trio.EndOfChannel, trio.EndOfChannel,
): ):
ctx = stream._ctx ctx: Context = ipc._ctx
chan = ctx.chan chan: Channel = ctx.chan
if ctx: if ctx:
log.warning( log.warning(
'Dropped `brokerd`-quotes-feed connection:\n' 'Dropped `brokerd`-quotes-feed connection:\n'
f'{broker_symbol}:' f'{broker_symbol}:'
f'{ctx.cid}@{chan.uid}' f'{ctx.cid}@{chan.uid}'
) )
if tick_throttle: if sub.throttle_rate:
assert stream._closed assert ipc._closed
# XXX: do we need to deregister here # XXX: do we need to deregister here
# if it's done in the fee bus code? # if it's done in the fee bus code?
@ -742,7 +751,7 @@ async def sample_and_broadcast(
# since there seems to be some kinda race.. # since there seems to be some kinda race..
bus.remove_subs( bus.remove_subs(
sub_key, sub_key,
{(stream, tick_throttle)}, {sub},
) )
@ -750,7 +759,7 @@ async def uniform_rate_send(
rate: float, rate: float,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
stream: tractor.MsgStream, stream: MsgStream,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED, task_status: TaskStatus = trio.TASK_STATUS_IGNORED,

View File

@ -28,6 +28,7 @@ module.
from __future__ import annotations from __future__ import annotations
from collections import ( from collections import (
defaultdict, defaultdict,
abc,
) )
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
@ -36,7 +37,6 @@ from types import ModuleType
from typing import ( from typing import (
Any, Any,
AsyncContextManager, AsyncContextManager,
Optional,
Awaitable, Awaitable,
Sequence, Sequence,
) )
@ -76,6 +76,31 @@ from ._sampling import (
) )
class Sub(Struct, frozen=True):
'''
A live feed subscription entry.
Contains meta-data on the remote-actor type (in functionality
terms) as well as refs to IPC streams and sampler runtime
params.
'''
ipc: tractor.MsgStream
send_chan: trio.abc.SendChannel | None = None
# tick throttle rate in Hz; determines how live
# quotes/ticks should be downsampled before relay
# to the receiving remote consumer (process).
throttle_rate: float | None = None
_throttle_cs: trio.CancelScope | None = None
# TODO: actually stash comms info for the far end to allow
# `.tsp`, `.fsp` and `.data._sampling` sub-systems to re-render
# the data view as needed via msging with the `._remote_ctl`
# ipc ctx.
rc_ui: bool = False
class _FeedsBus(Struct): class _FeedsBus(Struct):
''' '''
Data feeds broadcaster and persistence management. Data feeds broadcaster and persistence management.
@ -100,13 +125,7 @@ class _FeedsBus(Struct):
_subscribers: defaultdict[ _subscribers: defaultdict[
str, str,
set[ set[Sub]
tuple[
tractor.MsgStream | trio.MemorySendChannel,
# tractor.Context,
float | None, # tick throttle in Hz
]
]
] = defaultdict(set) ] = defaultdict(set)
async def start_task( async def start_task(
@ -140,31 +159,28 @@ class _FeedsBus(Struct):
def get_subs( def get_subs(
self, self,
key: str, key: str,
) -> set[
tuple[ ) -> set[Sub]:
tractor.MsgStream | trio.MemorySendChannel,
float | None, # tick throttle in Hz
]
]:
''' '''
Get the ``set`` of consumer subscription entries for the given key. Get the ``set`` of consumer subscription entries for the given key.
''' '''
return self._subscribers[key] return self._subscribers[key]
def subs_items(self) -> abc.ItemsView[str, set[Sub]]:
return self._subscribers.items()
def add_subs( def add_subs(
self, self,
key: str, key: str,
subs: set[tuple[ subs: set[Sub],
tractor.MsgStream | trio.MemorySendChannel,
float | None, # tick throttle in Hz ) -> set[Sub]:
]],
) -> set[tuple]:
''' '''
Add a ``set`` of consumer subscription entries for the given key. Add a ``set`` of consumer subscription entries for the given key.
''' '''
_subs: set[tuple] = self._subscribers[key] _subs: set[Sub] = self._subscribers.setdefault(key, set())
_subs.update(subs) _subs.update(subs)
return _subs return _subs
@ -441,8 +457,9 @@ async def open_feed_bus(
symbols: list[str], # normally expected to the broker-specific fqme symbols: list[str], # normally expected to the broker-specific fqme
loglevel: str = 'error', loglevel: str = 'error',
tick_throttle: Optional[float] = None, tick_throttle: float | None = None,
start_stream: bool = True, start_stream: bool = True,
allow_remote_ctl_ui: bool = False,
) -> dict[ ) -> dict[
str, # fqme str, # fqme
@ -519,10 +536,10 @@ async def open_feed_bus(
# pack for ``.started()`` sync msg # pack for ``.started()`` sync msg
flumes[fqme] = flume flumes[fqme] = flume
# we use the broker-specific fqme (bs_fqme) for the # we use the broker-specific fqme (bs_fqme) for the sampler
# sampler subscription since the backend isn't (yet) expected to # subscription since the backend isn't (yet) expected to
# append it's own name to the fqme, so we filter on keys which # append it's own name to the fqme, so we filter on keys
# *do not* include that name (e.g .ib) . # which *do not* include that name (e.g .ib) .
bus._subscribers.setdefault(bs_fqme, set()) bus._subscribers.setdefault(bs_fqme, set())
# sync feed subscribers with flume handles # sync feed subscribers with flume handles
@ -561,49 +578,60 @@ async def open_feed_bus(
# that the ``sample_and_broadcast()`` task (spawned inside # that the ``sample_and_broadcast()`` task (spawned inside
# ``allocate_persistent_feed()``) will push real-time quote # ``allocate_persistent_feed()``) will push real-time quote
# (ticks) to this new consumer. # (ticks) to this new consumer.
cs: trio.CancelScope | None = None
send: trio.MemorySendChannel | None = None
if tick_throttle: if tick_throttle:
flume.throttle_rate = tick_throttle flume.throttle_rate = tick_throttle
# open a bg task which receives quotes over a mem chan # open a bg task which receives quotes over a mem
# and only pushes them to the target actor-consumer at # chan and only pushes them to the target
# a max ``tick_throttle`` instantaneous rate. # actor-consumer at a max ``tick_throttle``
# (instantaneous) rate.
send, recv = trio.open_memory_channel(2**10) send, recv = trio.open_memory_channel(2**10)
cs = await bus.start_task( # NOTE: the ``.send`` channel here is a swapped-in
# trio mem chan which gets `.send()`-ed by the normal
# sampler task but instead of being sent directly
# over the IPC msg stream it's the throttle task
# does the work of incrementally forwarding to the
# IPC stream at the throttle rate.
cs: trio.CancelScope = await bus.start_task(
uniform_rate_send, uniform_rate_send,
tick_throttle, tick_throttle,
recv, recv,
stream, stream,
) )
# NOTE: so the ``send`` channel here is actually a swapped
# in trio mem chan which gets pushed by the normal sampler
# task but instead of being sent directly over the IPC msg
# stream it's the throttle task does the work of
# incrementally forwarding to the IPC stream at the throttle
# rate.
send._ctx = ctx # mock internal ``tractor.MsgStream`` ref
sub = (send, tick_throttle)
else: sub = Sub(
sub = (stream, tick_throttle) ipc=stream,
send_chan=send,
throttle_rate=tick_throttle,
_throttle_cs=cs,
rc_ui=allow_remote_ctl_ui,
)
# TODO: add an api for this on the bus? # TODO: add an api for this on the bus?
# maybe use the current task-id to key the sub list that's # maybe use the current task-id to key the sub list that's
# added / removed? Or maybe we can add a general # added / removed? Or maybe we can add a general
# pause-resume by sub-key api? # pause-resume by sub-key api?
bs_fqme = fqme.removesuffix(f'.{brokername}') bs_fqme = fqme.removesuffix(f'.{brokername}')
local_subs.setdefault(bs_fqme, set()).add(sub) local_subs.setdefault(
bus.add_subs(bs_fqme, {sub}) bs_fqme,
set()
).add(sub)
bus.add_subs(
bs_fqme,
{sub}
)
# sync caller with all subs registered state # sync caller with all subs registered state
sub_registered.set() sub_registered.set()
uid = ctx.chan.uid uid: tuple[str, str] = ctx.chan.uid
try: try:
# ctrl protocol for start/stop of quote streams based on UI # ctrl protocol for start/stop of live quote streams
# state (eg. don't need a stream when a symbol isn't being # based on UI state (eg. don't need a stream when
# displayed). # a symbol isn't being displayed).
async for msg in stream: async for msg in stream:
if msg == 'pause': if msg == 'pause':
@ -760,7 +788,7 @@ async def install_brokerd_search(
async def maybe_open_feed( async def maybe_open_feed(
fqmes: list[str], fqmes: list[str],
loglevel: Optional[str] = None, loglevel: str | None = None,
**kwargs, **kwargs,
@ -820,6 +848,8 @@ async def open_feed(
start_stream: bool = True, start_stream: bool = True,
tick_throttle: float | None = None, # Hz tick_throttle: float | None = None, # Hz
allow_remote_ctl_ui: bool = False,
) -> Feed: ) -> Feed:
''' '''
Open a "data feed" which provides streamed real-time quotes. Open a "data feed" which provides streamed real-time quotes.
@ -902,6 +932,12 @@ async def open_feed(
# of these stream open sequences sequentially per # of these stream open sequences sequentially per
# backend? .. need some thot! # backend? .. need some thot!
allow_overruns=True, allow_overruns=True,
# NOTE: UI actors (like charts) can allow
# remote control of certain graphics rendering
# capabilities via the
# `.ui._remote_ctl.remote_annotate()` msg loop.
allow_remote_ctl_ui=allow_remote_ctl_ui,
) )
) )