Append broker name to symbols before quotes broadcast in sampler task
parent
d03cd23571
commit
c0d1facf3b
|
@ -184,6 +184,7 @@ async def sample_and_broadcast(
|
||||||
bus: '_FeedsBus', # noqa
|
bus: '_FeedsBus', # noqa
|
||||||
shm: ShmArray,
|
shm: ShmArray,
|
||||||
quote_stream: trio.abc.ReceiveChannel,
|
quote_stream: trio.abc.ReceiveChannel,
|
||||||
|
brokername: str,
|
||||||
sum_tick_vlm: bool = True,
|
sum_tick_vlm: bool = True,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -193,8 +194,7 @@ async def sample_and_broadcast(
|
||||||
# iterate stream delivered by broker
|
# iterate stream delivered by broker
|
||||||
async for quotes in quote_stream:
|
async for quotes in quote_stream:
|
||||||
# TODO: ``numba`` this!
|
# TODO: ``numba`` this!
|
||||||
for sym, quote in quotes.items():
|
for broker_symbol, quote in quotes.items():
|
||||||
|
|
||||||
# TODO: in theory you can send the IPC msg *before* writing
|
# TODO: in theory you can send the IPC msg *before* writing
|
||||||
# to the sharedmem array to decrease latency, however, that
|
# to the sharedmem array to decrease latency, however, that
|
||||||
# will require at least some way to prevent task switching
|
# will require at least some way to prevent task switching
|
||||||
|
@ -258,7 +258,13 @@ async def sample_and_broadcast(
|
||||||
# end up triggering backpressure which which will
|
# end up triggering backpressure which which will
|
||||||
# eventually block this producer end of the feed and
|
# eventually block this producer end of the feed and
|
||||||
# thus other consumers still attached.
|
# thus other consumers still attached.
|
||||||
subs = bus._subscribers[sym.lower()]
|
subs = bus._subscribers[broker_symbol.lower()]
|
||||||
|
|
||||||
|
# NOTE: by default the broker backend doesn't append
|
||||||
|
# it's own "name" into the fqsn schema (but maybe it
|
||||||
|
# should?) so we have to manually generate the correct
|
||||||
|
# key here.
|
||||||
|
bsym = f'{broker_symbol}.{brokername}'
|
||||||
|
|
||||||
lags = 0
|
lags = 0
|
||||||
for (stream, tick_throttle) in subs:
|
for (stream, tick_throttle) in subs:
|
||||||
|
@ -269,7 +275,9 @@ async def sample_and_broadcast(
|
||||||
# this is a send mem chan that likely
|
# this is a send mem chan that likely
|
||||||
# pushes to the ``uniform_rate_send()`` below.
|
# pushes to the ``uniform_rate_send()`` below.
|
||||||
try:
|
try:
|
||||||
stream.send_nowait((sym, quote))
|
stream.send_nowait(
|
||||||
|
(bsym, quote)
|
||||||
|
)
|
||||||
except trio.WouldBlock:
|
except trio.WouldBlock:
|
||||||
ctx = getattr(stream, '_ctx', None)
|
ctx = getattr(stream, '_ctx', None)
|
||||||
if ctx:
|
if ctx:
|
||||||
|
@ -283,7 +291,9 @@ async def sample_and_broadcast(
|
||||||
f'feed @ {tick_throttle} Hz'
|
f'feed @ {tick_throttle} Hz'
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
await stream.send({sym: quote})
|
await stream.send(
|
||||||
|
{bsym: quote}
|
||||||
|
)
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
lags += 1
|
lags += 1
|
||||||
|
|
Loading…
Reference in New Issue