From 100e27ac1206c93598fd7071a9de048c3c333524 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 5 Apr 2021 07:58:28 -0400
Subject: [PATCH] Task lock bus loading, always close feed stream on disconnect

---
 piker/data/__init__.py | 238 ++++++++++++++++++++++-------------
 1 file changed, 127 insertions(+), 111 deletions(-)

diff --git a/piker/data/__init__.py b/piker/data/__init__.py
index 907dd6fe..4629bbc4 100644
--- a/piker/data/__init__.py
+++ b/piker/data/__init__.py
@@ -96,6 +96,7 @@ class _FeedsBus(BaseModel):
     nursery: trio.Nursery
     feeds: Dict[str, trio.CancelScope] = {}
     subscribers: Dict[str, List[tractor.Context]] = {}
+    task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()

     class Config:
         arbitrary_types_allowed = True
@@ -115,7 +116,7 @@ def get_feed_bus(
 ) -> _FeedsBus:
     """
     Retreive broker-daemon-local data feeds bus from process global
-    scope.
+    scope. Serialize task access to lock.

     """
@@ -152,6 +153,7 @@ async def _setup_persistent_brokerd(brokername: str) -> None:
         # parent actor decides to tear it down
         await trio.sleep_forever()
     finally:
+        # TODO: this needs to be shielded?
         await bus.cancel_all()


@@ -187,7 +189,7 @@ async def allocate_persistent_feed(
     # if not opened:
     #     raise RuntimeError("Persistent shm for sym was already open?!")

-    send, quote_stream = trio.open_memory_channel(2**8)
+    send, quote_stream = trio.open_memory_channel(10)
     feed_is_live = trio.Event()

     # establish broker backend quote stream
@@ -204,119 +206,120 @@ async def allocate_persistent_feed(
     )
     init_msg[symbol]['shm_token'] = shm.token

-    cs = trio.CancelScope()
+    cs = bus.nursery.cancel_scope

     # TODO: make this into a composed type which also
     # contains the backfiller cs for individual super-based
     # resspawns when needed.
     bus.feeds[symbol] = (cs, init_msg, first_quote)

-    with cs:
-        if opened:
+    if opened:

-            # start history backfill task ``backfill_bars()`` is
-            # a required backend func this must block until shm is
-            # filled with first set of ohlc bars
-            await bus.nursery.start(mod.backfill_bars, symbol, shm)
+        # start history backfill task ``backfill_bars()`` is
+        # a required backend func this must block until shm is
+        # filled with first set of ohlc bars
+        await bus.nursery.start(mod.backfill_bars, symbol, shm)

-        times = shm.array['time']
-        delay_s = times[-1] - times[times != times[-1]][-1]
+    times = shm.array['time']
+    delay_s = times[-1] - times[times != times[-1]][-1]

-        # pass OHLC sample rate in seconds
-        init_msg[symbol]['sample_rate'] = delay_s
+    # pass OHLC sample rate in seconds
+    init_msg[symbol]['sample_rate'] = delay_s

-        # yield back control to starting nursery
-        task_status.started((init_msg, first_quote))
+    # yield back control to starting nursery
+    task_status.started((init_msg, first_quote))

-        await feed_is_live.wait()
+    await feed_is_live.wait()

-        # # tell incrementer task it can start
-        # shm_incrementing(shm.token['shm_name']).set()
+    if opened:
+        _shms.setdefault(delay_s, []).append(shm)

-        # start shm incrementingn for OHLC sampling
-        # subscribe_ohlc_for_increment(shm, delay_s)
+        # start shm incrementing for OHLC sampling
+        if _incrementers.get(delay_s) is None:
+            cs = await bus.nursery.start(increment_ohlc_buffer, delay_s)

-        if opened:
-            _shms.setdefault(delay_s, []).append(shm)
+    sum_tick_vlm: bool = init_msg.get(
+        'shm_write_opts', {}
+    ).get('sum_tick_vlm', True)

-        if _incrementers.get(delay_s) is None:
-            cs = await bus.nursery.start(increment_ohlc_buffer, delay_s)
+    log.info("Started shared mem bar writer")

-        sum_tick_vlm: bool = init_msg.get(
-            'shm_write_opts', {}
-        ).get('sum_tick_vlm', True)
+    # iterate stream delivered by broker
+    async for quotes in quote_stream:
+        for sym, quote in quotes.items():

-        # begin shm write loop and broadcast to subscribers
-        async with quote_stream:
+            # TODO: in theory you can send the IPC msg *before*
+            # writing to the sharedmem array to decrease latency,
+            # however, that will require `tractor.msg.pub` support
+            # here or at least some way to prevent task switching
+            # at the yield such that the array write isn't delayed
+            # while another consumer is serviced..

-            log.info("Started shared mem bar writer")
+            # start writing the shm buffer with appropriate
+            # trade data
+            for tick in quote['ticks']:

-            # iterate stream delivered by broker
-            async for quotes in quote_stream:
-                for sym, quote in quotes.items():
+                # if tick['type'] in ('utrade',):
+                #     print(tick)

-                    # TODO: in theory you can send the IPC msg *before*
-                    # writing to the sharedmem array to decrease latency,
-                    # however, that will require `tractor.msg.pub` support
-                    # here or at least some way to prevent task switching
-                    # at the yield such that the array write isn't delayed
-                    # while another consumer is serviced..
+                # write trade events to shm last OHLC sample
+                if tick['type'] in ('trade', 'utrade'):

-                    # start writing the shm buffer with appropriate trade data
-                    for tick in quote['ticks']:
+                    last = tick['price']

-                        # if tick['type'] in ('utrade',):
-                        #     print(tick)
+                    # update last entry
+                    # benchmarked in the 4-5 us range
+                    o, high, low, v = shm.array[-1][
+                        ['open', 'high', 'low', 'volume']
+                    ]

-                        # write trade events to shm last OHLC sample
-                        if tick['type'] in ('trade', 'utrade'):
+                    new_v = tick.get('size', 0)

-                            last = tick['price']
+                    if v == 0 and new_v:
+                        # no trades for this bar yet so the open
+                        # is also the close/last trade price
+                        o = last

-                            # update last entry
-                            # benchmarked in the 4-5 us range
-                            o, high, low, v = shm.array[-1][
-                                ['open', 'high', 'low', 'volume']
-                            ]
+                    if sum_tick_vlm:
+                        volume = v + new_v
+                    else:
+                        # presume backend takes care of summing
+                        # it's own vlm
+                        volume = quote['volume']

-                            new_v = tick.get('size', 0)
+                    shm.array[[
+                        'open',
+                        'high',
+                        'low',
+                        'close',
+                        'bar_wap',  # can be optionally provided
+                        'volume',
+                    ]][-1] = (
+                        o,
+                        max(high, last),
+                        min(low, last),
+                        last,
+                        quote.get('bar_wap', 0),
+                        volume,
+                    )

-                            if v == 0 and new_v:
-                                # no trades for this bar yet so the open
-                                # is also the close/last trade price
-                                o = last
-
-                            if sum_tick_vlm:
-                                volume = v + new_v
-                            else:
-                                # presume backend takes care of summing
-                                # it's own vlm
-                                volume = quote['volume']
-
-                            shm.array[[
-                                'open',
-                                'high',
-                                'low',
-                                'close',
-                                'bar_wap',  # can be optionally provided
-                                'volume',
-                            ]][-1] = (
-                                o,
-                                max(high, last),
-                                min(low, last),
-                                last,
-                                quote.get('bar_wap', 0),
-                                volume,
-                            )
-
-                    for ctx in bus.subscribers[sym]:
-                        try:
-                            await ctx.send_yield({sym: quote})
-                        except (
-                            trio.BrokenResourceError,
-                            trio.ClosedResourceError
-                        ):
-                            log.error(f'{ctx.chan.uid} dropped connection')
+            # XXX: we need to be very cautious here that no
+            # context-channel is left lingering which doesn't have
+            # a far end receiver actor-task. In such a case you can
+            # end up triggering backpressure which which will
+            # eventually block this producer end of the feed and
+            # thus other consumers still attached.
+            subs = bus.subscribers[sym]
+            for ctx in subs:
+                # print(f'sub is {ctx.chan.uid}')
+                try:
+                    await ctx.send_yield({sym: quote})
+                except (
+                    trio.BrokenResourceError,
+                    trio.ClosedResourceError
+                ):
+                    subs.remove(ctx)
+                    log.error(f'{ctx.chan.uid} dropped connection')
@@ -327,6 +330,7 @@ async def attach_feed_bus(
     loglevel: str,
 ):

+    # try:
     if loglevel is None:
         loglevel = tractor.current_actor().loglevel

@@ -337,35 +341,42 @@ async def attach_feed_bus(
     assert 'brokerd' in tractor.current_actor().name
     bus = get_feed_bus(brokername)
-    task_cs = bus.feeds.get(symbol)
-    bus.subscribers.setdefault(symbol, []).append(ctx)

-    # if no cached feed for this symbol has been created for this
-    # brokerd yet, start persistent stream and shm writer task in
-    # service nursery
-    if task_cs is None:
-        init_msg, first_quote = await bus.nursery.start(
-            partial(
-                allocate_persistent_feed,
-                ctx=ctx,
-                bus=bus,
-                brokername=brokername,
-                symbol=symbol,
-                loglevel=loglevel,
+    async with bus.task_lock:
+        task_cs = bus.feeds.get(symbol)
+        sub_only: bool = False
+
+        # if no cached feed for this symbol has been created for this
+        # brokerd yet, start persistent stream and shm writer task in
+        # service nursery
+        if task_cs is None:
+            init_msg, first_quote = await bus.nursery.start(
+                partial(
+                    allocate_persistent_feed,
+                    ctx=ctx,
+                    bus=bus,
+                    brokername=brokername,
+                    symbol=symbol,
+                    loglevel=loglevel,
+                )
             )
-        )
+            bus.subscribers.setdefault(symbol, []).append(ctx)
+        else:
+            sub_only = True

-    # XXX: ``first_quote`` may be outdated here if this is secondary subscriber
+    # XXX: ``first_quote`` may be outdated here if this is secondary
+    # subscriber
     cs, init_msg, first_quote = bus.feeds[symbol]

     # send this even to subscribers to existing feed?
     await ctx.send_yield(init_msg)
     await ctx.send_yield(first_quote)

-    try:
-        # just block while the stream pumps
-        await trio.sleep_forever()
+    if sub_only:
+        bus.subscribers[symbol].append(ctx)
+    try:
+        await trio.sleep_forever()

     finally:
         bus.subscribers[symbol].remove(ctx)
@@ -484,11 +495,10 @@ async def open_feed(
         # https://github.com/goodboy/tractor/issues/53
         init_msg = await stream.receive()

+        # we can only read from shm
         shm = attach_shm_array(
             token=init_msg[sym]['shm_token'],
-
-            # we are the buffer writer
-            readonly=False,
+            readonly=True,
         )

         feed = Feed(
@@ -522,4 +532,10 @@ async def open_feed(

         feed._max_sample_rate = max(ohlc_sample_rates)

-        yield feed
+        try:
+            yield feed
+
+        finally:
+            # always cancel the far end producer task
+            with trio.CancelScope(shield=True):
+                await stream.aclose()
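For context on the locking change above: `attach_feed_bus()` now holds the new
`_FeedsBus.task_lock` while it checks the feed cache, so concurrent subscribers
for the same symbol cannot race to spawn duplicate `allocate_persistent_feed()`
tasks. The sketch below is a minimal, self-contained illustration of that
pattern under trio, not piker's actual code; the `FeedsBus`, `allocate_feed`
and `attach` names and the fake init payload are invented for the example.

    # sketch only: serialize cached-feed allocation with a FIFO task lock
    import trio


    class FeedsBus:
        def __init__(self, nursery: trio.Nursery) -> None:
            self.nursery = nursery
            self.feeds: dict[str, str] = {}            # symbol -> init payload
            self.subscribers: dict[str, list[str]] = {}
            self.task_lock = trio.StrictFIFOLock()     # serializes allocation


    async def allocate_feed(
        bus: FeedsBus,
        symbol: str,
        *,
        task_status=trio.TASK_STATUS_IGNORED,
    ) -> None:
        # stand-in for shm setup / history backfill work
        await trio.sleep(0.1)
        bus.feeds[symbol] = f'init msg for {symbol}'
        task_status.started(bus.feeds[symbol])
        await trio.sleep_forever()  # keep "streaming" until cancelled


    async def attach(bus: FeedsBus, symbol: str, name: str) -> None:
        async with bus.task_lock:
            if symbol not in bus.feeds:
                # first subscriber: spawn the persistent producer task
                await bus.nursery.start(allocate_feed, bus, symbol)
            # later subscribers only attach to the cached feed
            bus.subscribers.setdefault(symbol, []).append(name)
        print(f'{name} attached: {bus.feeds[symbol]}')


    async def main() -> None:
        async with trio.open_nursery() as nursery:
            bus = FeedsBus(nursery)
            # two concurrent attach requests for the same symbol: the lock
            # guarantees only one allocate_feed() task is ever started
            nursery.start_soon(attach, bus, 'XYZ', 'consumer-1')
            nursery.start_soon(attach, bus, 'XYZ', 'consumer-2')
            await trio.sleep(0.5)
            nursery.cancel_scope.cancel()


    trio.run(main)

The other half of the commit, the `stream.aclose()` in `open_feed()`'s new
finally block, follows the usual trio rule that cleanup which must still run
when the enclosing scope is cancelled is wrapped in
`trio.CancelScope(shield=True)`.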