From 550d81ee2c2366faf0779301014a9b6b87473840 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Apr 2022 11:48:14 -0400 Subject: [PATCH] Handle non-fqsn for derivs and don't put brokername in --- piker/data/feed.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 6cbfbebc..e6f4990e 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -375,8 +375,10 @@ async def manage_history( async def allocate_persistent_feed( bus: _FeedsBus, + brokername: str, symbol: str, + loglevel: str, start_stream: bool = True, @@ -396,6 +398,7 @@ async def allocate_persistent_feed( - a real-time streaming task which connec ''' + # load backend module try: mod = get_brokermod(brokername) except ImportError: @@ -452,7 +455,10 @@ async def allocate_persistent_feed( # true fqsn fqsn = '.'.join((bfqsn, brokername)) # add a fqsn entry that includes the ``.`` suffix + # and an entry that includes the broker-specific fqsn (including + # any new suffixes or elements as injected by the backend). init_msg[fqsn] = msg + init_msg[bfqsn] = msg # TODO: pretty sure we don't need this? why not just leave 1s as # the fastest "sample period" since we'll probably always want that @@ -466,13 +472,14 @@ async def allocate_persistent_feed( await some_data_ready.wait() # append ``.`` suffix to each quote symbol - bsym = symbol + f'.{brokername}' + acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}' + generic_first_quotes = { - bsym: first_quote, + acceptable_not_fqsn_with_broker_suffix: first_quote, fqsn: first_quote, } - bus.feeds[symbol] = bus.feeds[fqsn] = ( + bus.feeds[symbol] = bus.feeds[bfqsn] = ( init_msg, generic_first_quotes, ) @@ -523,7 +530,7 @@ async def open_feed_bus( ctx: tractor.Context, brokername: str, - symbol: str, + symbol: str, # normally expected to the broker-specific fqsn loglevel: str, tick_throttle: Optional[float] = None, start_stream: bool = True, @@ -545,7 +552,9 @@ async def open_feed_bus( # TODO: check for any stale shm entries for this symbol # (after we also group them in a nice `/dev/shm/piker/` subdir). # ensure we are who we think we are - assert 'brokerd' in tractor.current_actor().name + servicename = tractor.current_actor().name + assert 'brokerd' in servicename + assert brokername in servicename bus = get_feed_bus(brokername) @@ -555,7 +564,7 @@ async def open_feed_bus( entry = bus.feeds.get(symbol) if entry is None: # allocate a new actor-local stream bus which - # will persist for this `brokerd`. + # will persist for this `brokerd`'s service lifetime. async with bus.task_lock: await bus.nursery.start( partial( @@ -584,7 +593,7 @@ async def open_feed_bus( # true fqsn fqsn = '.'.join([bfqsn, brokername]) assert fqsn in first_quotes - assert bus.feeds[fqsn] + assert bus.feeds[bfqsn] # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib) bsym = symbol + f'.{brokername}'