Handle non-fqsn for derivs and don't put brokername in
parent
15630f465d
commit
9fe5cd647a
|
@ -375,8 +375,10 @@ async def manage_history(
|
|||
|
||||
async def allocate_persistent_feed(
|
||||
bus: _FeedsBus,
|
||||
|
||||
brokername: str,
|
||||
symbol: str,
|
||||
|
||||
loglevel: str,
|
||||
start_stream: bool = True,
|
||||
|
||||
|
@ -396,6 +398,7 @@ async def allocate_persistent_feed(
|
|||
- a real-time streaming task which connec
|
||||
|
||||
'''
|
||||
# load backend module
|
||||
try:
|
||||
mod = get_brokermod(brokername)
|
||||
except ImportError:
|
||||
|
@ -452,7 +455,10 @@ async def allocate_persistent_feed(
|
|||
# true fqsn
|
||||
fqsn = '.'.join((bfqsn, brokername))
|
||||
# add a fqsn entry that includes the ``.<broker>`` suffix
|
||||
# and an entry that includes the broker-specific fqsn (including
|
||||
# any new suffixes or elements as injected by the backend).
|
||||
init_msg[fqsn] = msg
|
||||
init_msg[bfqsn] = msg
|
||||
|
||||
# TODO: pretty sure we don't need this? why not just leave 1s as
|
||||
# the fastest "sample period" since we'll probably always want that
|
||||
|
@ -466,13 +472,14 @@ async def allocate_persistent_feed(
|
|||
await some_data_ready.wait()
|
||||
|
||||
# append ``.<broker>`` suffix to each quote symbol
|
||||
bsym = symbol + f'.{brokername}'
|
||||
acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
|
||||
|
||||
generic_first_quotes = {
|
||||
bsym: first_quote,
|
||||
acceptable_not_fqsn_with_broker_suffix: first_quote,
|
||||
fqsn: first_quote,
|
||||
}
|
||||
|
||||
bus.feeds[symbol] = bus.feeds[fqsn] = (
|
||||
bus.feeds[symbol] = bus.feeds[bfqsn] = (
|
||||
init_msg,
|
||||
generic_first_quotes,
|
||||
)
|
||||
|
@ -523,7 +530,7 @@ async def open_feed_bus(
|
|||
|
||||
ctx: tractor.Context,
|
||||
brokername: str,
|
||||
symbol: str,
|
||||
symbol: str, # normally expected to the broker-specific fqsn
|
||||
loglevel: str,
|
||||
tick_throttle: Optional[float] = None,
|
||||
start_stream: bool = True,
|
||||
|
@ -545,7 +552,9 @@ async def open_feed_bus(
|
|||
# TODO: check for any stale shm entries for this symbol
|
||||
# (after we also group them in a nice `/dev/shm/piker/` subdir).
|
||||
# ensure we are who we think we are
|
||||
assert 'brokerd' in tractor.current_actor().name
|
||||
servicename = tractor.current_actor().name
|
||||
assert 'brokerd' in servicename
|
||||
assert brokername in servicename
|
||||
|
||||
bus = get_feed_bus(brokername)
|
||||
|
||||
|
@ -555,7 +564,7 @@ async def open_feed_bus(
|
|||
entry = bus.feeds.get(symbol)
|
||||
if entry is None:
|
||||
# allocate a new actor-local stream bus which
|
||||
# will persist for this `brokerd`.
|
||||
# will persist for this `brokerd`'s service lifetime.
|
||||
async with bus.task_lock:
|
||||
await bus.nursery.start(
|
||||
partial(
|
||||
|
@ -584,7 +593,7 @@ async def open_feed_bus(
|
|||
# true fqsn
|
||||
fqsn = '.'.join([bfqsn, brokername])
|
||||
assert fqsn in first_quotes
|
||||
assert bus.feeds[fqsn]
|
||||
assert bus.feeds[bfqsn]
|
||||
|
||||
# broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
|
||||
bsym = symbol + f'.{brokername}'
|
||||
|
|
Loading…
Reference in New Issue