Feed detach must explicitly unsub throttled streams

If a client attaches to a quotes data feed and requests a throttle rate,
be sure to unsub that side-band memchan + task when it detaches and
especially so on any transport connection error.

Also, use an explicit `tractor.Context.cancel()` on the client feed
block exit since we removed the implicit cancel option from the
`tractor` api.
backup_asyncify_input_modes
Tyler Goodlet 2021-07-05 09:41:35 -04:00
parent 77baad1e92
commit b306d1573b
2 changed files with 33 additions and 20 deletions

View File

@ -233,12 +233,13 @@ async def sample_and_broadcast(
for (stream, tick_throttle) in subs:
try:
if tick_throttle:
await stream.send(quote)
else:
try:
await stream.send({sym: quote})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
@ -247,7 +248,10 @@ async def sample_and_broadcast(
# if it's done in the fee bus code?
# so far seems like no since this should all
# be single-threaded.
log.error(f'{stream._ctx.chan.uid} dropped connection')
log.warning(
f'{stream._ctx.chan.uid} dropped '
'`brokerd`-quotes-feed connection'
)
subs.remove((stream, tick_throttle))

View File

@ -305,6 +305,11 @@ async def attach_feed_bus(
):
if tick_throttle:
# open a bg task which receives quotes over a mem chan
# and only pushes them to the target actor-consumer at
# a max ``tick_throttle`` instantaneous rate.
send, recv = trio.open_memory_channel(2**10)
n.start_soon(
uniform_rate_send,
@ -321,7 +326,12 @@ async def attach_feed_bus(
try:
await trio.sleep_forever()
finally:
log.info(
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
if tick_throttle:
n.cancel_scope.cancel()
bus._subscribers[symbol].remove(sub)
@ -473,11 +483,6 @@ async def open_feed(
ctx.open_stream() as stream,
):
# TODO: can we make this work better with the proposed
# context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53
# init_msg = await stream.receive()
# we can only read from shm
shm = attach_shm_array(
token=init_msg[sym]['shm_token'],
@ -520,4 +525,8 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates)
try:
yield feed
finally:
# drop the infinite stream connection
await ctx.cancel()