Append broker name to symbols before quotes broadcast in sampler task

fqsns
Tyler Goodlet 2022-03-18 15:05:32 -04:00
parent 8462ea8a28
commit 7f36e85815
1 changed files with 15 additions and 5 deletions

View File

@ -181,6 +181,7 @@ async def sample_and_broadcast(
bus: '_FeedsBus', # noqa bus: '_FeedsBus', # noqa
shm: ShmArray, shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
brokername: str,
sum_tick_vlm: bool = True, sum_tick_vlm: bool = True,
) -> None: ) -> None:
@ -190,8 +191,7 @@ async def sample_and_broadcast(
# iterate stream delivered by broker # iterate stream delivered by broker
async for quotes in quote_stream: async for quotes in quote_stream:
# TODO: ``numba`` this! # TODO: ``numba`` this!
for sym, quote in quotes.items(): for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing # TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that # to the sharedmem array to decrease latency, however, that
# will require at least some way to prevent task switching # will require at least some way to prevent task switching
@ -255,7 +255,13 @@ async def sample_and_broadcast(
# end up triggering backpressure which which will # end up triggering backpressure which which will
# eventually block this producer end of the feed and # eventually block this producer end of the feed and
# thus other consumers still attached. # thus other consumers still attached.
subs = bus._subscribers[sym.lower()] subs = bus._subscribers[broker_symbol.lower()]
# NOTE: by default the broker backend doesn't append
# it's own "name" into the fqsn schema (but maybe it
# should?) so we have to manually generate the correct
# key here.
bsym = f'{broker_symbol}.{brokername}'
lags = 0 lags = 0
for (stream, tick_throttle) in subs: for (stream, tick_throttle) in subs:
@ -266,7 +272,9 @@ async def sample_and_broadcast(
# this is a send mem chan that likely # this is a send mem chan that likely
# pushes to the ``uniform_rate_send()`` below. # pushes to the ``uniform_rate_send()`` below.
try: try:
stream.send_nowait((sym, quote)) stream.send_nowait(
(bsym, quote)
)
except trio.WouldBlock: except trio.WouldBlock:
ctx = getattr(stream, '_ctx', None) ctx = getattr(stream, '_ctx', None)
if ctx: if ctx:
@ -280,7 +288,9 @@ async def sample_and_broadcast(
f'feed @ {tick_throttle} Hz' f'feed @ {tick_throttle} Hz'
) )
else: else:
await stream.send({sym: quote}) await stream.send(
{bsym: quote}
)
if cs.cancelled_caught: if cs.cancelled_caught:
lags += 1 lags += 1