Handle non-fqsn for derivs and don't put brokername in

m4_corrections
Tyler Goodlet 2022-04-08 11:48:14 -04:00
parent e430756944
commit 550d81ee2c
1 changed files with 16 additions and 7 deletions

View File

@ -375,8 +375,10 @@ async def manage_history(
async def allocate_persistent_feed( async def allocate_persistent_feed(
bus: _FeedsBus, bus: _FeedsBus,
brokername: str, brokername: str,
symbol: str, symbol: str,
loglevel: str, loglevel: str,
start_stream: bool = True, start_stream: bool = True,
@ -396,6 +398,7 @@ async def allocate_persistent_feed(
- a real-time streaming task which connec - a real-time streaming task which connec
''' '''
# load backend module
try: try:
mod = get_brokermod(brokername) mod = get_brokermod(brokername)
except ImportError: except ImportError:
@ -452,7 +455,10 @@ async def allocate_persistent_feed(
# true fqsn # true fqsn
fqsn = '.'.join((bfqsn, brokername)) fqsn = '.'.join((bfqsn, brokername))
# add a fqsn entry that includes the ``.<broker>`` suffix # add a fqsn entry that includes the ``.<broker>`` suffix
# and an entry that includes the broker-specific fqsn (including
# any new suffixes or elements as injected by the backend).
init_msg[fqsn] = msg init_msg[fqsn] = msg
init_msg[bfqsn] = msg
# TODO: pretty sure we don't need this? why not just leave 1s as # TODO: pretty sure we don't need this? why not just leave 1s as
# the fastest "sample period" since we'll probably always want that # the fastest "sample period" since we'll probably always want that
@ -466,13 +472,14 @@ async def allocate_persistent_feed(
await some_data_ready.wait() await some_data_ready.wait()
# append ``.<broker>`` suffix to each quote symbol # append ``.<broker>`` suffix to each quote symbol
bsym = symbol + f'.{brokername}' acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
generic_first_quotes = { generic_first_quotes = {
bsym: first_quote, acceptable_not_fqsn_with_broker_suffix: first_quote,
fqsn: first_quote, fqsn: first_quote,
} }
bus.feeds[symbol] = bus.feeds[fqsn] = ( bus.feeds[symbol] = bus.feeds[bfqsn] = (
init_msg, init_msg,
generic_first_quotes, generic_first_quotes,
) )
@ -523,7 +530,7 @@ async def open_feed_bus(
ctx: tractor.Context, ctx: tractor.Context,
brokername: str, brokername: str,
symbol: str, symbol: str, # normally expected to the broker-specific fqsn
loglevel: str, loglevel: str,
tick_throttle: Optional[float] = None, tick_throttle: Optional[float] = None,
start_stream: bool = True, start_stream: bool = True,
@ -545,7 +552,9 @@ async def open_feed_bus(
# TODO: check for any stale shm entries for this symbol # TODO: check for any stale shm entries for this symbol
# (after we also group them in a nice `/dev/shm/piker/` subdir). # (after we also group them in a nice `/dev/shm/piker/` subdir).
# ensure we are who we think we are # ensure we are who we think we are
assert 'brokerd' in tractor.current_actor().name servicename = tractor.current_actor().name
assert 'brokerd' in servicename
assert brokername in servicename
bus = get_feed_bus(brokername) bus = get_feed_bus(brokername)
@ -555,7 +564,7 @@ async def open_feed_bus(
entry = bus.feeds.get(symbol) entry = bus.feeds.get(symbol)
if entry is None: if entry is None:
# allocate a new actor-local stream bus which # allocate a new actor-local stream bus which
# will persist for this `brokerd`. # will persist for this `brokerd`'s service lifetime.
async with bus.task_lock: async with bus.task_lock:
await bus.nursery.start( await bus.nursery.start(
partial( partial(
@ -584,7 +593,7 @@ async def open_feed_bus(
# true fqsn # true fqsn
fqsn = '.'.join([bfqsn, brokername]) fqsn = '.'.join([bfqsn, brokername])
assert fqsn in first_quotes assert fqsn in first_quotes
assert bus.feeds[fqsn] assert bus.feeds[bfqsn]
# broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib) # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
bsym = symbol + f'.{brokername}' bsym = symbol + f'.{brokername}'