Task lock bus loading, always close feed stream on disconnect

cached_feeds
Tyler Goodlet 2021-04-05 07:58:28 -04:00
parent 8069bbe105
commit 100e27ac12
1 changed files with 127 additions and 111 deletions

View File

@ -96,6 +96,7 @@ class _FeedsBus(BaseModel):
nursery: trio.Nursery nursery: trio.Nursery
feeds: Dict[str, trio.CancelScope] = {} feeds: Dict[str, trio.CancelScope] = {}
subscribers: Dict[str, List[tractor.Context]] = {} subscribers: Dict[str, List[tractor.Context]] = {}
task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
class Config: class Config:
arbitrary_types_allowed = True arbitrary_types_allowed = True
@ -115,7 +116,7 @@ def get_feed_bus(
) -> _FeedsBus: ) -> _FeedsBus:
""" """
Retreive broker-daemon-local data feeds bus from process global Retreive broker-daemon-local data feeds bus from process global
scope. scope. Serialize task access to lock.
""" """
@ -152,6 +153,7 @@ async def _setup_persistent_brokerd(brokername: str) -> None:
# parent actor decides to tear it down # parent actor decides to tear it down
await trio.sleep_forever() await trio.sleep_forever()
finally: finally:
# TODO: this needs to be shielded?
await bus.cancel_all() await bus.cancel_all()
@ -187,7 +189,7 @@ async def allocate_persistent_feed(
# if not opened: # if not opened:
# raise RuntimeError("Persistent shm for sym was already open?!") # raise RuntimeError("Persistent shm for sym was already open?!")
send, quote_stream = trio.open_memory_channel(2**8) send, quote_stream = trio.open_memory_channel(10)
feed_is_live = trio.Event() feed_is_live = trio.Event()
# establish broker backend quote stream # establish broker backend quote stream
@ -204,14 +206,13 @@ async def allocate_persistent_feed(
) )
init_msg[symbol]['shm_token'] = shm.token init_msg[symbol]['shm_token'] = shm.token
cs = trio.CancelScope() cs = bus.nursery.cancel_scope
# TODO: make this into a composed type which also # TODO: make this into a composed type which also
# contains the backfiller cs for individual super-based # contains the backfiller cs for individual super-based
# resspawns when needed. # resspawns when needed.
bus.feeds[symbol] = (cs, init_msg, first_quote) bus.feeds[symbol] = (cs, init_msg, first_quote)
with cs:
if opened: if opened:
# start history backfill task ``backfill_bars()`` is # start history backfill task ``backfill_bars()`` is
@ -230,15 +231,10 @@ async def allocate_persistent_feed(
await feed_is_live.wait() await feed_is_live.wait()
# # tell incrementer task it can start
# shm_incrementing(shm.token['shm_name']).set()
# start shm incrementingn for OHLC sampling
# subscribe_ohlc_for_increment(shm, delay_s)
if opened: if opened:
_shms.setdefault(delay_s, []).append(shm) _shms.setdefault(delay_s, []).append(shm)
# start shm incrementing for OHLC sampling
if _incrementers.get(delay_s) is None: if _incrementers.get(delay_s) is None:
cs = await bus.nursery.start(increment_ohlc_buffer, delay_s) cs = await bus.nursery.start(increment_ohlc_buffer, delay_s)
@ -246,9 +242,6 @@ async def allocate_persistent_feed(
'shm_write_opts', {} 'shm_write_opts', {}
).get('sum_tick_vlm', True) ).get('sum_tick_vlm', True)
# begin shm write loop and broadcast to subscribers
async with quote_stream:
log.info("Started shared mem bar writer") log.info("Started shared mem bar writer")
# iterate stream delivered by broker # iterate stream delivered by broker
@ -262,7 +255,8 @@ async def allocate_persistent_feed(
# at the yield such that the array write isn't delayed # at the yield such that the array write isn't delayed
# while another consumer is serviced.. # while another consumer is serviced..
# start writing the shm buffer with appropriate trade data # start writing the shm buffer with appropriate
# trade data
for tick in quote['ticks']: for tick in quote['ticks']:
# if tick['type'] in ('utrade',): # if tick['type'] in ('utrade',):
@ -309,13 +303,22 @@ async def allocate_persistent_feed(
volume, volume,
) )
for ctx in bus.subscribers[sym]: # XXX: we need to be very cautious here that no
# context-channel is left lingering which doesn't have
# a far end receiver actor-task. In such a case you can
# end up triggering backpressure which which will
# eventually block this producer end of the feed and
# thus other consumers still attached.
subs = bus.subscribers[sym]
for ctx in subs:
# print(f'sub is {ctx.chan.uid}')
try: try:
await ctx.send_yield({sym: quote}) await ctx.send_yield({sym: quote})
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError trio.ClosedResourceError
): ):
subs.remove(ctx)
log.error(f'{ctx.chan.uid} dropped connection') log.error(f'{ctx.chan.uid} dropped connection')
@ -327,6 +330,7 @@ async def attach_feed_bus(
loglevel: str, loglevel: str,
): ):
# try:
if loglevel is None: if loglevel is None:
loglevel = tractor.current_actor().loglevel loglevel = tractor.current_actor().loglevel
@ -337,8 +341,10 @@ async def attach_feed_bus(
assert 'brokerd' in tractor.current_actor().name assert 'brokerd' in tractor.current_actor().name
bus = get_feed_bus(brokername) bus = get_feed_bus(brokername)
async with bus.task_lock:
task_cs = bus.feeds.get(symbol) task_cs = bus.feeds.get(symbol)
bus.subscribers.setdefault(symbol, []).append(ctx) sub_only: bool = False
# if no cached feed for this symbol has been created for this # if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in # brokerd yet, start persistent stream and shm writer task in
@ -354,18 +360,23 @@ async def attach_feed_bus(
loglevel=loglevel, loglevel=loglevel,
) )
) )
bus.subscribers.setdefault(symbol, []).append(ctx)
else:
sub_only = True
# XXX: ``first_quote`` may be outdated here if this is secondary subscriber # XXX: ``first_quote`` may be outdated here if this is secondary
# subscriber
cs, init_msg, first_quote = bus.feeds[symbol] cs, init_msg, first_quote = bus.feeds[symbol]
# send this even to subscribers to existing feed? # send this even to subscribers to existing feed?
await ctx.send_yield(init_msg) await ctx.send_yield(init_msg)
await ctx.send_yield(first_quote) await ctx.send_yield(first_quote)
try: if sub_only:
# just block while the stream pumps bus.subscribers[symbol].append(ctx)
await trio.sleep_forever()
try:
await trio.sleep_forever()
finally: finally:
bus.subscribers[symbol].remove(ctx) bus.subscribers[symbol].remove(ctx)
@ -484,11 +495,10 @@ async def open_feed(
# https://github.com/goodboy/tractor/issues/53 # https://github.com/goodboy/tractor/issues/53
init_msg = await stream.receive() init_msg = await stream.receive()
# we can only read from shm
shm = attach_shm_array( shm = attach_shm_array(
token=init_msg[sym]['shm_token'], token=init_msg[sym]['shm_token'],
readonly=True,
# we are the buffer writer
readonly=False,
) )
feed = Feed( feed = Feed(
@ -522,4 +532,10 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates) feed._max_sample_rate = max(ohlc_sample_rates)
try:
yield feed yield feed
finally:
# always cancel the far end producer task
with trio.CancelScope(shield=True):
await stream.aclose()