diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 3ef85c96..5459b4ba 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -5,6 +5,7 @@ We provide tsdb integrations for retrieving and storing data from your brokers as well as sharing your feeds with other fellow pikers. """ +from dataclasses import dataclass from contextlib import asynccontextmanager from importlib import import_module from types import ModuleType @@ -13,24 +14,28 @@ from typing import ( Sequence, AsyncIterator, Optional ) -import trio import tractor from ..brokers import get_brokermod from ..log import get_logger, get_console_log from ._normalize import iterticks from ._sharedmem import ( - maybe_open_shared_array, attach_shared_array, open_shared_array, + maybe_open_shm_array, + attach_shm_array, + open_shm_array, + SharedArray, + get_shm_token, ) from ._buffer import incr_buffer __all__ = [ - 'maybe_open_shared_array', - 'attach_shared_array', - 'open_shared_array', - 'iterticks', - 'incr_buffer', + 'iterticks', + 'maybe_open_shm_array', + 'attach_shm_array', + 'open_shm_array', + 'get_shm_token', + 'incr_buffer', ] @@ -99,6 +104,46 @@ async def maybe_spawn_brokerd( await nursery.cancel() +@dataclass +class Feed: + """A data feed for client-side interaction with far-process + real-time data sources. + + This is an thin abstraction on top of ``tractor``'s portals for + interacting with IPC streams and conducting automatic + memory buffer orchestration. + """ + name: str + stream: AsyncIterator[Dict[str, Any]] + shm: SharedArray + _broker_portal: tractor._portal.Portal + _index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None + + async def receive(self) -> dict: + return await self.stream.__anext__() + + async def index_stream(self) -> AsyncIterator[int]: + if not self._index_stream: + # XXX: this should be singleton on a host, + # a lone broker-daemon per provider should be + # created for all practical purposes + self._index_stream = await self._broker_portal.run( + 'piker.data', + 'incr_buffer', + shm_token=self.shm.token, + topics=['index'], + ) + + return self._index_stream + + +def sym_to_shm_key( + broker: str, + symbol: str, +) -> str: + return f'{broker}.{symbol}' + + @asynccontextmanager async def open_feed( name: str, @@ -115,40 +160,38 @@ async def open_feed( if loglevel is None: loglevel = tractor.current_actor().loglevel - with maybe_open_shared_array( - name=f'{name}.{symbols[0]}.buf', - readonly=True, # we expect the sub-actor to write - ) as shmarr: - async with maybe_spawn_brokerd( - mod.name, - loglevel=loglevel, - ) as portal: - stream = await portal.run( - mod.__name__, - 'stream_quotes', - symbols=symbols, - shared_array_token=shmarr.token, - topics=symbols, - ) - # Feed is required to deliver an initial quote asap. - # TODO: should we timeout and raise a more explicit error? - # with trio.fail_after(5): - with trio.fail_after(float('inf')): - # Retreive initial quote for each symbol - # such that consumer code can know the data layout - first_quote, child_shmarr_token = await stream.__anext__() - log.info(f"Received first quote {first_quote}") + # attempt to allocate (or attach to) shm array for this + # broker/symbol + shm, opened = maybe_open_shm_array( + key=sym_to_shm_key(name, symbols[0]), - if child_shmarr_token is not None: - # we are the buffer writer task - increment_stream = await portal.run( - 'piker.data', - 'incr_buffer', - shm_token=child_shmarr_token, - ) + # we expect the sub-actor to write + readonly=True, + ) - assert child_shmarr_token == shmarr.token - else: - increment_stream = None + async with maybe_spawn_brokerd( + mod.name, + loglevel=loglevel, + ) as portal: + stream = await portal.run( + mod.__name__, + 'stream_quotes', + symbols=symbols, + shm_token=shm.token, - yield (first_quote, stream, increment_stream, shmarr) + # compat with eventual ``tractor.msg.pub`` + topics=symbols, + ) + shm_token, is_writer = await stream.receive() + shm_token['dtype_descr'] = list(shm_token['dtype_descr']) + assert shm_token == shm.token # sanity + + if is_writer: + log.info("Started shared mem bar writer") + + yield Feed( + name=name, + stream=stream, + shm=shm, + _broker_portal=portal, + )