Feed detach must explicitly unsub throttled streams
If a client attaches to a quotes data feed and requests a throttle rate, be sure to unsub that side-band memchan + task when it detaches and especially so on any transport connection error. Also, use an explicit `tractor.Context.cancel()` on the client feed block exit since we removed the implicit cancel option from the `tractor` api.minimal_brokerd_trade_dialogues
parent
9f897ba75c
commit
f4a998655b
|
@ -233,22 +233,26 @@ async def sample_and_broadcast(
|
||||||
|
|
||||||
for (stream, tick_throttle) in subs:
|
for (stream, tick_throttle) in subs:
|
||||||
|
|
||||||
if tick_throttle:
|
try:
|
||||||
await stream.send(quote)
|
if tick_throttle:
|
||||||
|
await stream.send(quote)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
try:
|
|
||||||
await stream.send({sym: quote})
|
await stream.send({sym: quote})
|
||||||
except (
|
|
||||||
trio.BrokenResourceError,
|
except (
|
||||||
trio.ClosedResourceError
|
trio.BrokenResourceError,
|
||||||
):
|
trio.ClosedResourceError
|
||||||
# XXX: do we need to deregister here
|
):
|
||||||
# if it's done in the fee bus code?
|
# XXX: do we need to deregister here
|
||||||
# so far seems like no since this should all
|
# if it's done in the fee bus code?
|
||||||
# be single-threaded.
|
# so far seems like no since this should all
|
||||||
log.error(f'{stream._ctx.chan.uid} dropped connection')
|
# be single-threaded.
|
||||||
subs.remove((stream, tick_throttle))
|
log.warning(
|
||||||
|
f'{stream._ctx.chan.uid} dropped '
|
||||||
|
'`brokerd`-quotes-feed connection'
|
||||||
|
)
|
||||||
|
subs.remove((stream, tick_throttle))
|
||||||
|
|
||||||
|
|
||||||
async def uniform_rate_send(
|
async def uniform_rate_send(
|
||||||
|
|
|
@ -305,6 +305,11 @@ async def attach_feed_bus(
|
||||||
):
|
):
|
||||||
|
|
||||||
if tick_throttle:
|
if tick_throttle:
|
||||||
|
|
||||||
|
# open a bg task which receives quotes over a mem chan
|
||||||
|
# and only pushes them to the target actor-consumer at
|
||||||
|
# a max ``tick_throttle`` instantaneous rate.
|
||||||
|
|
||||||
send, recv = trio.open_memory_channel(2**10)
|
send, recv = trio.open_memory_channel(2**10)
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
uniform_rate_send,
|
uniform_rate_send,
|
||||||
|
@ -321,7 +326,12 @@ async def attach_feed_bus(
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
log.info(
|
||||||
|
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
|
||||||
|
if tick_throttle:
|
||||||
|
n.cancel_scope.cancel()
|
||||||
bus._subscribers[symbol].remove(sub)
|
bus._subscribers[symbol].remove(sub)
|
||||||
|
|
||||||
|
|
||||||
|
@ -473,11 +483,6 @@ async def open_feed(
|
||||||
ctx.open_stream() as stream,
|
ctx.open_stream() as stream,
|
||||||
):
|
):
|
||||||
|
|
||||||
# TODO: can we make this work better with the proposed
|
|
||||||
# context based bidirectional streaming style api proposed in:
|
|
||||||
# https://github.com/goodboy/tractor/issues/53
|
|
||||||
# init_msg = await stream.receive()
|
|
||||||
|
|
||||||
# we can only read from shm
|
# we can only read from shm
|
||||||
shm = attach_shm_array(
|
shm = attach_shm_array(
|
||||||
token=init_msg[sym]['shm_token'],
|
token=init_msg[sym]['shm_token'],
|
||||||
|
@ -520,4 +525,8 @@ async def open_feed(
|
||||||
|
|
||||||
feed._max_sample_rate = max(ohlc_sample_rates)
|
feed._max_sample_rate = max(ohlc_sample_rates)
|
||||||
|
|
||||||
yield feed
|
try:
|
||||||
|
yield feed
|
||||||
|
finally:
|
||||||
|
# drop the infinite stream connection
|
||||||
|
await ctx.cancel()
|
||||||
|
|
Loading…
Reference in New Issue