Add a `maybe_open_feed()` which uses new broadcast chans

Try out he new broadcast channels from `tractor` for data feeds
we already have cached. Any time there's a cache hit we load the
cached feed and just slap a broadcast receiver on it for the local
consumer task.
pause_feeds_on_sym_switch
Tyler Goodlet 2021-08-10 16:00:14 -04:00
parent 224dbbc4e3
commit a7d3afc9b1
2 changed files with 58 additions and 27 deletions

View File

@ -146,9 +146,8 @@ async def maybe_open_ctx(
key: Hashable, key: Hashable,
mngr: AsyncContextManager[T], mngr: AsyncContextManager[T],
loglevel: str,
) -> T: ) -> (bool, T):
'''Maybe open a context manager if there is not already a cached '''Maybe open a context manager if there is not already a cached
version for the provided ``key``. Return the cached instance on version for the provided ``key``. Return the cached instance on
a cache hit. a cache hit.
@ -161,7 +160,7 @@ async def maybe_open_ctx(
log.info(f'Reusing cached feed for {key}') log.info(f'Reusing cached feed for {key}')
try: try:
cache.users += 1 cache.users += 1
yield feed yield True, feed
finally: finally:
cache.users -= 1 cache.users -= 1
if cache.users == 0: if cache.users == 0:
@ -170,7 +169,7 @@ async def maybe_open_ctx(
try: try:
with get_and_use() as feed: with get_and_use() as feed:
yield feed yield True, feed
except KeyError: except KeyError:
# lock feed acquisition around task racing / ``trio``'s # lock feed acquisition around task racing / ``trio``'s
# scheduler protocol # scheduler protocol
@ -193,7 +192,7 @@ async def maybe_open_ctx(
cache.ctxs[key] = value cache.ctxs[key] = value
cache.lock.release() cache.lock.release()
try: try:
yield value yield True, value
finally: finally:
# don't tear down the feed until there are zero # don't tear down the feed until there are zero
# users of it left. # users of it left.

View File

@ -31,11 +31,14 @@ from typing import (
) )
import trio import trio
from trio.abc import ReceiveChannel
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor import _broadcast
from pydantic import BaseModel from pydantic import BaseModel
from ..brokers import get_brokermod from ..brokers import get_brokermod
from .._cacheables import maybe_open_ctx
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from .._daemon import ( from .._daemon import (
maybe_spawn_brokerd, maybe_spawn_brokerd,
@ -345,10 +348,10 @@ class Feed:
memory buffer orchestration. memory buffer orchestration.
""" """
name: str name: str
stream: AsyncIterator[dict[str, Any]]
shm: ShmArray shm: ShmArray
mod: ModuleType mod: ModuleType
first_quote: dict first_quote: dict
stream: trio.abc.ReceiveChannel[dict[str, Any]]
_brokerd_portal: tractor._portal.Portal _brokerd_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[int]] = None _index_stream: Optional[AsyncIterator[int]] = None
@ -362,7 +365,7 @@ class Feed:
symbols: dict[str, Symbol] = field(default_factory=dict) symbols: dict[str, Symbol] = field(default_factory=dict)
async def receive(self) -> dict: async def receive(self) -> dict:
return await self.stream.__anext__() return await self.stream.receive()
@asynccontextmanager @asynccontextmanager
async def index_stream( async def index_stream(
@ -376,8 +379,10 @@ class Feed:
# a lone broker-daemon per provider should be # a lone broker-daemon per provider should be
# created for all practical purposes # created for all practical purposes
async with self._brokerd_portal.open_stream_from( async with self._brokerd_portal.open_stream_from(
iter_ohlc_periods, iter_ohlc_periods,
delay_s=delay_s or self._max_sample_rate, delay_s=delay_s or self._max_sample_rate,
) as self._index_stream: ) as self._index_stream:
yield self._index_stream yield self._index_stream
@ -395,7 +400,7 @@ def sym_to_shm_key(
@asynccontextmanager @asynccontextmanager
async def install_brokerd_search( async def install_brokerd_search(
portal: tractor._portal.Portal, portal: tractor.Portal,
brokermod: ModuleType, brokermod: ModuleType,
) -> None: ) -> None:
@ -434,34 +439,21 @@ async def open_feed(
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
tick_throttle: Optional[float] = None, # Hz tick_throttle: Optional[float] = None, # Hz
shielded_stream: bool = False,
) -> AsyncIterator[dict[str, Any]]: ) -> ReceiveChannel[dict[str, Any]]:
''' '''
Open a "data feed" which provides streamed real-time quotes. Open a "data feed" which provides streamed real-time quotes.
''' '''
sym = symbols[0].lower() sym = symbols[0].lower()
# TODO: feed cache locking, right now this is causing
# issues when reconnecting to a long running emsd?
# global _searcher_cache
# async with _cache_lock:
# feed = _searcher_cache.get((brokername, sym))
# # if feed is not None and sym in feed.symbols:
# if feed is not None:
# yield feed
# # short circuit
# return
try: try:
mod = get_brokermod(brokername) mod = get_brokermod(brokername)
except ImportError: except ImportError:
mod = get_ingestormod(brokername) mod = get_ingestormod(brokername)
# no feed for broker exists so maybe spawn a data brokerd # no feed for broker exists so maybe spawn a data brokerd
async with ( async with (
maybe_spawn_brokerd( maybe_spawn_brokerd(
@ -480,21 +472,25 @@ async def open_feed(
) as (ctx, (init_msg, first_quote)), ) as (ctx, (init_msg, first_quote)),
ctx.open_stream() as stream, ctx.open_stream(shield=shielded_stream) as stream,
):
):
# we can only read from shm # we can only read from shm
shm = attach_shm_array( shm = attach_shm_array(
token=init_msg[sym]['shm_token'], token=init_msg[sym]['shm_token'],
readonly=True, readonly=True,
) )
bstream = _broadcast.broadcast_receiver(
stream,
2**10,
)
feed = Feed( feed = Feed(
name=brokername, name=brokername,
stream=stream,
shm=shm, shm=shm,
mod=mod, mod=mod,
first_quote=first_quote, first_quote=first_quote,
stream=bstream, #brx_stream,
_brokerd_portal=portal, _brokerd_portal=portal,
) )
ohlc_sample_rates = [] ohlc_sample_rates = []
@ -526,7 +522,43 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates) feed._max_sample_rate = max(ohlc_sample_rates)
try: try:
yield feed yield feed, bstream
finally: finally:
# drop the infinite stream connection # drop the infinite stream connection
await ctx.cancel() await ctx.cancel()
@asynccontextmanager
async def maybe_open_feed(
brokername: str,
symbols: Sequence[str],
loglevel: Optional[str] = None,
tick_throttle: Optional[float] = None, # Hz
shielded_stream: bool = False,
) -> ReceiveChannel[dict[str, Any]]:
'''Maybe open a data to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver.
'''
sym = symbols[0].lower()
async with maybe_open_ctx(
key=(brokername, sym),
mngr=open_feed(
brokername,
[sym],
loglevel=loglevel,
),
) as (cache_hit, (feed, stream)):
if cache_hit:
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with stream.subscribe() as bstream:
yield feed, bstream
else:
yield feed, stream