Handle non-fqsn for derivs and don't put brokername in

marketstore_backup
Tyler Goodlet 2022-04-08 11:48:14 -04:00
parent 0790ae8235
commit c3553de1f2
1 changed files with 16 additions and 7 deletions

View File

@ -377,8 +377,10 @@ async def manage_history(
async def allocate_persistent_feed(
bus: _FeedsBus,
brokername: str,
symbol: str,
loglevel: str,
start_stream: bool = True,
@ -398,6 +400,7 @@ async def allocate_persistent_feed(
- a real-time streaming task which connec
'''
# load backend module
try:
mod = get_brokermod(brokername)
except ImportError:
@ -454,7 +457,10 @@ async def allocate_persistent_feed(
# true fqsn
fqsn = '.'.join((bfqsn, brokername))
# add a fqsn entry that includes the ``.<broker>`` suffix
# and an entry that includes the broker-specific fqsn (including
# any new suffixes or elements as injected by the backend).
init_msg[fqsn] = msg
init_msg[bfqsn] = msg
# TODO: pretty sure we don't need this? why not just leave 1s as
# the fastest "sample period" since we'll probably always want that
@ -468,13 +474,14 @@ async def allocate_persistent_feed(
await some_data_ready.wait()
# append ``.<broker>`` suffix to each quote symbol
bsym = symbol + f'.{brokername}'
acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
generic_first_quotes = {
bsym: first_quote,
acceptable_not_fqsn_with_broker_suffix: first_quote,
fqsn: first_quote,
}
bus.feeds[symbol] = bus.feeds[fqsn] = (
bus.feeds[symbol] = bus.feeds[bfqsn] = (
init_msg,
generic_first_quotes,
)
@ -525,7 +532,7 @@ async def open_feed_bus(
ctx: tractor.Context,
brokername: str,
symbol: str,
symbol: str, # normally expected to the broker-specific fqsn
loglevel: str,
tick_throttle: Optional[float] = None,
start_stream: bool = True,
@ -547,7 +554,9 @@ async def open_feed_bus(
# TODO: check for any stale shm entries for this symbol
# (after we also group them in a nice `/dev/shm/piker/` subdir).
# ensure we are who we think we are
assert 'brokerd' in tractor.current_actor().name
servicename = tractor.current_actor().name
assert 'brokerd' in servicename
assert brokername in servicename
bus = get_feed_bus(brokername)
@ -557,7 +566,7 @@ async def open_feed_bus(
entry = bus.feeds.get(symbol)
if entry is None:
# allocate a new actor-local stream bus which
# will persist for this `brokerd`.
# will persist for this `brokerd`'s service lifetime.
async with bus.task_lock:
await bus.nursery.start(
partial(
@ -586,7 +595,7 @@ async def open_feed_bus(
# true fqsn
fqsn = '.'.join([bfqsn, brokername])
assert fqsn in first_quotes
assert bus.feeds[fqsn]
assert bus.feeds[bfqsn]
# broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
bsym = symbol + f'.{brokername}'