diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 3ddb39c1..7bb0231d 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -33,6 +33,11 @@ from typing import ( ) import tractor +from tractor import ( + Context, + MsgStream, + Channel, +) from tractor.trionics import ( maybe_open_nursery, ) @@ -53,7 +58,10 @@ if TYPE_CHECKING: from ._sharedmem import ( ShmArray, ) - from .feed import _FeedsBus + from .feed import ( + _FeedsBus, + Sub, + ) # highest frequency sample step is 1 second by default, though in @@ -94,7 +102,7 @@ class Sampler: float, list[ float, - set[tractor.MsgStream] + set[MsgStream] ], ] = defaultdict( lambda: [ @@ -258,8 +266,8 @@ class Sampler: f'broadcasting {period_s} -> {last_ts}\n' # f'consumers: {subs}' ) - borked: set[tractor.MsgStream] = set() - sent: set[tractor.MsgStream] = set() + borked: set[MsgStream] = set() + sent: set[MsgStream] = set() while True: try: for stream in (subs - sent): @@ -314,7 +322,7 @@ class Sampler: @tractor.context async def register_with_sampler( - ctx: tractor.Context, + ctx: Context, period_s: float, shms_by_period: dict[float, dict] | None = None, @@ -649,12 +657,7 @@ async def sample_and_broadcast( # eventually block this producer end of the feed and # thus other consumers still attached. sub_key: str = broker_symbol.lower() - subs: list[ - tuple[ - tractor.MsgStream | trio.MemorySendChannel, - float | None, # tick throttle in Hz - ] - ] = bus.get_subs(sub_key) + subs: set[Sub] = bus.get_subs(sub_key) # NOTE: by default the broker backend doesn't append # it's own "name" into the fqme schema (but maybe it @@ -663,34 +666,40 @@ async def sample_and_broadcast( fqme: str = f'{broker_symbol}.{brokername}' lags: int = 0 - # TODO: speed up this loop in an AOT compiled lang (like - # rust or nim or zig) and/or instead of doing a fan out to - # TCP sockets here, we add a shm-style tick queue which - # readers can pull from instead of placing the burden of - # broadcast on solely on this `brokerd` actor. see issues: + # XXX TODO XXX: speed up this loop in an AOT compiled + # lang (like rust or nim or zig)! + # AND/OR instead of doing a fan out to TCP sockets + # here, we add a shm-style tick queue which readers can + # pull from instead of placing the burden of broadcast + # on solely on this `brokerd` actor. see issues: # - https://github.com/pikers/piker/issues/98 # - https://github.com/pikers/piker/issues/107 - for (stream, tick_throttle) in subs.copy(): + # for (stream, tick_throttle) in subs.copy(): + for sub in subs.copy(): + ipc: MsgStream = sub.ipc + throttle: float = sub.throttle_rate try: with trio.move_on_after(0.2) as cs: - if tick_throttle: + if throttle: + send_chan: trio.abc.SendChannel = sub.send_chan + # this is a send mem chan that likely # pushes to the ``uniform_rate_send()`` below. try: - stream.send_nowait( + send_chan.send_nowait( (fqme, quote) ) except trio.WouldBlock: overruns[sub_key] += 1 - ctx = stream._ctx - chan = ctx.chan + ctx: Context = ipc._ctx + chan: Channel = ctx.chan log.warning( f'Feed OVERRUN {sub_key}' '@{bus.brokername} -> \n' f'feed @ {chan.uid}\n' - f'throttle = {tick_throttle} Hz' + f'throttle = {throttle} Hz' ) if overruns[sub_key] > 6: @@ -707,10 +716,10 @@ async def sample_and_broadcast( f'{sub_key}:' f'{ctx.cid}@{chan.uid}' ) - await stream.aclose() + await ipc.aclose() raise trio.BrokenResourceError else: - await stream.send( + await ipc.send( {fqme: quote} ) @@ -724,16 +733,16 @@ async def sample_and_broadcast( trio.ClosedResourceError, trio.EndOfChannel, ): - ctx = stream._ctx - chan = ctx.chan + ctx: Context = ipc._ctx + chan: Channel = ctx.chan if ctx: log.warning( 'Dropped `brokerd`-quotes-feed connection:\n' f'{broker_symbol}:' f'{ctx.cid}@{chan.uid}' ) - if tick_throttle: - assert stream._closed + if sub.throttle_rate: + assert ipc._closed # XXX: do we need to deregister here # if it's done in the fee bus code? @@ -742,7 +751,7 @@ async def sample_and_broadcast( # since there seems to be some kinda race.. bus.remove_subs( sub_key, - {(stream, tick_throttle)}, + {sub}, ) @@ -750,7 +759,7 @@ async def uniform_rate_send( rate: float, quote_stream: trio.abc.ReceiveChannel, - stream: tractor.MsgStream, + stream: MsgStream, task_status: TaskStatus = trio.TASK_STATUS_IGNORED, diff --git a/piker/data/feed.py b/piker/data/feed.py index bd9812c8..7264c8e6 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -28,6 +28,7 @@ module. from __future__ import annotations from collections import ( defaultdict, + abc, ) from contextlib import asynccontextmanager as acm from functools import partial @@ -36,7 +37,6 @@ from types import ModuleType from typing import ( Any, AsyncContextManager, - Optional, Awaitable, Sequence, ) @@ -76,6 +76,31 @@ from ._sampling import ( ) +class Sub(Struct, frozen=True): + ''' + A live feed subscription entry. + + Contains meta-data on the remote-actor type (in functionality + terms) as well as refs to IPC streams and sampler runtime + params. + + ''' + ipc: tractor.MsgStream + send_chan: trio.abc.SendChannel | None = None + + # tick throttle rate in Hz; determines how live + # quotes/ticks should be downsampled before relay + # to the receiving remote consumer (process). + throttle_rate: float | None = None + _throttle_cs: trio.CancelScope | None = None + + # TODO: actually stash comms info for the far end to allow + # `.tsp`, `.fsp` and `.data._sampling` sub-systems to re-render + # the data view as needed via msging with the `._remote_ctl` + # ipc ctx. + rc_ui: bool = False + + class _FeedsBus(Struct): ''' Data feeds broadcaster and persistence management. @@ -100,13 +125,7 @@ class _FeedsBus(Struct): _subscribers: defaultdict[ str, - set[ - tuple[ - tractor.MsgStream | trio.MemorySendChannel, - # tractor.Context, - float | None, # tick throttle in Hz - ] - ] + set[Sub] ] = defaultdict(set) async def start_task( @@ -140,31 +159,28 @@ class _FeedsBus(Struct): def get_subs( self, key: str, - ) -> set[ - tuple[ - tractor.MsgStream | trio.MemorySendChannel, - float | None, # tick throttle in Hz - ] - ]: + + ) -> set[Sub]: ''' Get the ``set`` of consumer subscription entries for the given key. ''' return self._subscribers[key] + def subs_items(self) -> abc.ItemsView[str, set[Sub]]: + return self._subscribers.items() + def add_subs( self, key: str, - subs: set[tuple[ - tractor.MsgStream | trio.MemorySendChannel, - float | None, # tick throttle in Hz - ]], - ) -> set[tuple]: + subs: set[Sub], + + ) -> set[Sub]: ''' Add a ``set`` of consumer subscription entries for the given key. ''' - _subs: set[tuple] = self._subscribers[key] + _subs: set[Sub] = self._subscribers.setdefault(key, set()) _subs.update(subs) return _subs @@ -441,8 +457,9 @@ async def open_feed_bus( symbols: list[str], # normally expected to the broker-specific fqme loglevel: str = 'error', - tick_throttle: Optional[float] = None, + tick_throttle: float | None = None, start_stream: bool = True, + allow_remote_ctl_ui: bool = False, ) -> dict[ str, # fqme @@ -519,10 +536,10 @@ async def open_feed_bus( # pack for ``.started()`` sync msg flumes[fqme] = flume - # we use the broker-specific fqme (bs_fqme) for the - # sampler subscription since the backend isn't (yet) expected to - # append it's own name to the fqme, so we filter on keys which - # *do not* include that name (e.g .ib) . + # we use the broker-specific fqme (bs_fqme) for the sampler + # subscription since the backend isn't (yet) expected to + # append it's own name to the fqme, so we filter on keys + # which *do not* include that name (e.g .ib) . bus._subscribers.setdefault(bs_fqme, set()) # sync feed subscribers with flume handles @@ -561,49 +578,60 @@ async def open_feed_bus( # that the ``sample_and_broadcast()`` task (spawned inside # ``allocate_persistent_feed()``) will push real-time quote # (ticks) to this new consumer. - + cs: trio.CancelScope | None = None + send: trio.MemorySendChannel | None = None if tick_throttle: flume.throttle_rate = tick_throttle - # open a bg task which receives quotes over a mem chan - # and only pushes them to the target actor-consumer at - # a max ``tick_throttle`` instantaneous rate. + # open a bg task which receives quotes over a mem + # chan and only pushes them to the target + # actor-consumer at a max ``tick_throttle`` + # (instantaneous) rate. send, recv = trio.open_memory_channel(2**10) - cs = await bus.start_task( + # NOTE: the ``.send`` channel here is a swapped-in + # trio mem chan which gets `.send()`-ed by the normal + # sampler task but instead of being sent directly + # over the IPC msg stream it's the throttle task + # does the work of incrementally forwarding to the + # IPC stream at the throttle rate. + cs: trio.CancelScope = await bus.start_task( uniform_rate_send, tick_throttle, recv, stream, ) - # NOTE: so the ``send`` channel here is actually a swapped - # in trio mem chan which gets pushed by the normal sampler - # task but instead of being sent directly over the IPC msg - # stream it's the throttle task does the work of - # incrementally forwarding to the IPC stream at the throttle - # rate. - send._ctx = ctx # mock internal ``tractor.MsgStream`` ref - sub = (send, tick_throttle) - else: - sub = (stream, tick_throttle) + sub = Sub( + ipc=stream, + send_chan=send, + throttle_rate=tick_throttle, + _throttle_cs=cs, + rc_ui=allow_remote_ctl_ui, + ) # TODO: add an api for this on the bus? # maybe use the current task-id to key the sub list that's # added / removed? Or maybe we can add a general # pause-resume by sub-key api? bs_fqme = fqme.removesuffix(f'.{brokername}') - local_subs.setdefault(bs_fqme, set()).add(sub) - bus.add_subs(bs_fqme, {sub}) + local_subs.setdefault( + bs_fqme, + set() + ).add(sub) + bus.add_subs( + bs_fqme, + {sub} + ) # sync caller with all subs registered state sub_registered.set() - uid = ctx.chan.uid + uid: tuple[str, str] = ctx.chan.uid try: - # ctrl protocol for start/stop of quote streams based on UI - # state (eg. don't need a stream when a symbol isn't being - # displayed). + # ctrl protocol for start/stop of live quote streams + # based on UI state (eg. don't need a stream when + # a symbol isn't being displayed). async for msg in stream: if msg == 'pause': @@ -760,7 +788,7 @@ async def install_brokerd_search( async def maybe_open_feed( fqmes: list[str], - loglevel: Optional[str] = None, + loglevel: str | None = None, **kwargs, @@ -820,6 +848,8 @@ async def open_feed( start_stream: bool = True, tick_throttle: float | None = None, # Hz + allow_remote_ctl_ui: bool = False, + ) -> Feed: ''' Open a "data feed" which provides streamed real-time quotes. @@ -902,6 +932,12 @@ async def open_feed( # of these stream open sequences sequentially per # backend? .. need some thot! allow_overruns=True, + + # NOTE: UI actors (like charts) can allow + # remote control of certain graphics rendering + # capabilities via the + # `.ui._remote_ctl.remote_annotate()` msg loop. + allow_remote_ctl_ui=allow_remote_ctl_ui, ) )