Convert `iter_ohlc_periods()` to a `@tractor.context`

fqsns
Tyler Goodlet 2022-03-07 07:25:01 -05:00
parent 039d06cc48
commit 97c2a2da3e
1 changed files with 19 additions and 15 deletions

View File

@ -133,18 +133,20 @@ async def increment_ohlc_buffer(
# a given sample period. # a given sample period.
subs = sampler.subscribers.get(delay_s, ()) subs = sampler.subscribers.get(delay_s, ())
for ctx in subs: for stream in subs:
try: try:
await ctx.send_yield({'index': shm._last.value}) await stream.send({'index': shm._last.value})
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError trio.ClosedResourceError
): ):
log.error(f'{ctx.chan.uid} dropped connection') log.error(
subs.remove(ctx) f'{stream._ctx.chan.uid} dropped connection'
)
subs.remove(stream)
@tractor.stream @tractor.context
async def iter_ohlc_periods( async def iter_ohlc_periods(
ctx: tractor.Context, ctx: tractor.Context,
delay_s: int, delay_s: int,
@ -158,18 +160,20 @@ async def iter_ohlc_periods(
''' '''
# add our subscription # add our subscription
subs = sampler.subscribers.setdefault(delay_s, []) subs = sampler.subscribers.setdefault(delay_s, [])
subs.append(ctx) await ctx.started()
async with ctx.open_stream() as stream:
subs.append(stream)
try:
# stream and block until cancelled
await trio.sleep_forever()
finally:
try: try:
subs.remove(ctx) # stream and block until cancelled
except ValueError: await trio.sleep_forever()
log.error( finally:
f'iOHLC step stream was already dropped for {ctx.chan.uid}?' try:
) subs.remove(stream)
except ValueError:
log.error(
f'iOHLC step stream was already dropped {ctx.chan.uid}?'
)
async def sample_and_broadcast( async def sample_and_broadcast(