diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 793417e9..490ae4b0 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -233,22 +233,26 @@ async def sample_and_broadcast( for (stream, tick_throttle) in subs: - if tick_throttle: - await stream.send(quote) + try: + if tick_throttle: + await stream.send(quote) - else: - try: + else: await stream.send({sym: quote}) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - # XXX: do we need to deregister here - # if it's done in the fee bus code? - # so far seems like no since this should all - # be single-threaded. - log.error(f'{stream._ctx.chan.uid} dropped connection') - subs.remove((stream, tick_throttle)) + + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + # XXX: do we need to deregister here + # if it's done in the fee bus code? + # so far seems like no since this should all + # be single-threaded. + log.warning( + f'{stream._ctx.chan.uid} dropped ' + '`brokerd`-quotes-feed connection' + ) + subs.remove((stream, tick_throttle)) async def uniform_rate_send( diff --git a/piker/data/feed.py b/piker/data/feed.py index 477e7bac..ed24a095 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -305,6 +305,11 @@ async def attach_feed_bus( ): if tick_throttle: + + # open a bg task which receives quotes over a mem chan + # and only pushes them to the target actor-consumer at + # a max ``tick_throttle`` instantaneous rate. + send, recv = trio.open_memory_channel(2**10) n.start_soon( uniform_rate_send, @@ -321,7 +326,12 @@ async def attach_feed_bus( try: await trio.sleep_forever() + finally: + log.info( + f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}') + if tick_throttle: + n.cancel_scope.cancel() bus._subscribers[symbol].remove(sub) @@ -473,11 +483,6 @@ async def open_feed( ctx.open_stream() as stream, ): - # TODO: can we make this work better with the proposed - # context based bidirectional streaming style api proposed in: - # https://github.com/goodboy/tractor/issues/53 - # init_msg = await stream.receive() - # we can only read from shm shm = attach_shm_array( token=init_msg[sym]['shm_token'], @@ -520,4 +525,8 @@ async def open_feed( feed._max_sample_rate = max(ohlc_sample_rates) - yield feed + try: + yield feed + finally: + # drop the infinite stream connection + await ctx.cancel()