Add a `data.Feed` type

Wraps the growing tuple of items being delivered by `open_feed()`.
Add lazy loading of the broker's signal step stream with
a `Feed.index_stream()` method.
bar_select
Tyler Goodlet 2020-09-22 12:14:24 -04:00
parent 38469bd6ef
commit b1093dc71d
1 changed files with 84 additions and 41 deletions

View File

@ -5,6 +5,7 @@ We provide tsdb integrations for retrieving
and storing data from your brokers as well as and storing data from your brokers as well as
sharing your feeds with other fellow pikers. sharing your feeds with other fellow pikers.
""" """
from dataclasses import dataclass
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from importlib import import_module from importlib import import_module
from types import ModuleType from types import ModuleType
@ -13,23 +14,27 @@ from typing import (
Sequence, AsyncIterator, Optional Sequence, AsyncIterator, Optional
) )
import trio
import tractor import tractor
from ..brokers import get_brokermod from ..brokers import get_brokermod
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ._normalize import iterticks from ._normalize import iterticks
from ._sharedmem import ( from ._sharedmem import (
maybe_open_shared_array, attach_shared_array, open_shared_array, maybe_open_shm_array,
attach_shm_array,
open_shm_array,
SharedArray,
get_shm_token,
) )
from ._buffer import incr_buffer from ._buffer import incr_buffer
__all__ = [ __all__ = [
'maybe_open_shared_array',
'attach_shared_array',
'open_shared_array',
'iterticks', 'iterticks',
'maybe_open_shm_array',
'attach_shm_array',
'open_shm_array',
'get_shm_token',
'incr_buffer', 'incr_buffer',
] ]
@ -99,6 +104,46 @@ async def maybe_spawn_brokerd(
await nursery.cancel() await nursery.cancel()
@dataclass
class Feed:
"""A data feed for client-side interaction with far-process
real-time data sources.
This is an thin abstraction on top of ``tractor``'s portals for
interacting with IPC streams and conducting automatic
memory buffer orchestration.
"""
name: str
stream: AsyncIterator[Dict[str, Any]]
shm: SharedArray
_broker_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
async def receive(self) -> dict:
return await self.stream.__anext__()
async def index_stream(self) -> AsyncIterator[int]:
if not self._index_stream:
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
self._index_stream = await self._broker_portal.run(
'piker.data',
'incr_buffer',
shm_token=self.shm.token,
topics=['index'],
)
return self._index_stream
def sym_to_shm_key(
broker: str,
symbol: str,
) -> str:
return f'{broker}.{symbol}'
@asynccontextmanager @asynccontextmanager
async def open_feed( async def open_feed(
name: str, name: str,
@ -115,10 +160,15 @@ async def open_feed(
if loglevel is None: if loglevel is None:
loglevel = tractor.current_actor().loglevel loglevel = tractor.current_actor().loglevel
with maybe_open_shared_array( # attempt to allocate (or attach to) shm array for this
name=f'{name}.{symbols[0]}.buf', # broker/symbol
readonly=True, # we expect the sub-actor to write shm, opened = maybe_open_shm_array(
) as shmarr: key=sym_to_shm_key(name, symbols[0]),
# we expect the sub-actor to write
readonly=True,
)
async with maybe_spawn_brokerd( async with maybe_spawn_brokerd(
mod.name, mod.name,
loglevel=loglevel, loglevel=loglevel,
@ -127,28 +177,21 @@ async def open_feed(
mod.__name__, mod.__name__,
'stream_quotes', 'stream_quotes',
symbols=symbols, symbols=symbols,
shared_array_token=shmarr.token, shm_token=shm.token,
# compat with eventual ``tractor.msg.pub``
topics=symbols, topics=symbols,
) )
# Feed is required to deliver an initial quote asap. shm_token, is_writer = await stream.receive()
# TODO: should we timeout and raise a more explicit error? shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
# with trio.fail_after(5): assert shm_token == shm.token # sanity
with trio.fail_after(float('inf')):
# Retreive initial quote for each symbol
# such that consumer code can know the data layout
first_quote, child_shmarr_token = await stream.__anext__()
log.info(f"Received first quote {first_quote}")
if child_shmarr_token is not None: if is_writer:
# we are the buffer writer task log.info("Started shared mem bar writer")
increment_stream = await portal.run(
'piker.data', yield Feed(
'incr_buffer', name=name,
shm_token=child_shmarr_token, stream=stream,
shm=shm,
_broker_portal=portal,
) )
assert child_shmarr_token == shmarr.token
else:
increment_stream = None
yield (first_quote, stream, increment_stream, shmarr)