Don't open stream before starting the fsp context..

fsp_hotfixes
Tyler Goodlet 2021-11-05 10:04:10 -04:00
parent ea9b66d1c3
commit 2b97f98151
1 changed files with 18 additions and 18 deletions

View File

@ -79,7 +79,7 @@ async def filter_quotes_by_sym(
async def fsp_compute( async def fsp_compute(
stream: tractor.MsgStream, ctx: tractor.Context,
symbol: str, symbol: str,
feed: Feed, feed: Feed,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
@ -147,6 +147,7 @@ async def fsp_compute(
# setup a respawn handle # setup a respawn handle
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
tracker = TaskTracker(trio.Event(), cs) tracker = TaskTracker(trio.Event(), cs)
await ctx.started(index)
task_status.started((tracker, index)) task_status.started((tracker, index))
profiler(f'{func_name} yield last index') profiler(f'{func_name} yield last index')
@ -155,23 +156,24 @@ async def fsp_compute(
try: try:
# rt stream # rt stream
async for processed in out_stream: async with ctx.open_stream() as stream:
async for processed in out_stream:
log.debug(f"{func_name}: {processed}") log.debug(f"{func_name}: {processed}")
index = src.index index = src.index
dst.array[-1][func_name] = processed dst.array[-1][func_name] = processed
# NOTE: for now we aren't streaming this to the consumer # NOTE: for now we aren't streaming this to the consumer
# stream latest array index entry which basically just acts # stream latest array index entry which basically just acts
# as trigger msg to tell the consumer to read from shm # as trigger msg to tell the consumer to read from shm
if attach_stream: if attach_stream:
await stream.send(index) await stream.send(index)
# period = time.time() - last # period = time.time() - last
# hz = 1/period if period else float('nan') # hz = 1/period if period else float('nan')
# if hz > 60: # if hz > 60:
# log.info(f'FSP quote too fast: {hz}') # log.info(f'FSP quote too fast: {hz}')
# last = time.time() # last = time.time()
finally: finally:
tracker.complete.set() tracker.complete.set()
@ -229,14 +231,13 @@ async def cascade(
# last_len = new_len = len(src.array) # last_len = new_len = len(src.array)
async with ( async with (
ctx.open_stream() as stream,
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
fsp_target = partial( fsp_target = partial(
fsp_compute, fsp_compute,
stream=stream, ctx=ctx,
symbol=symbol, symbol=symbol,
feed=feed, feed=feed,
quote_stream=quote_stream, quote_stream=quote_stream,
@ -255,7 +256,6 @@ async def cascade(
last = dst.array[-1:] last = dst.array[-1:]
zeroed = np.zeros(last.shape, dtype=last.dtype) zeroed = np.zeros(last.shape, dtype=last.dtype)
await ctx.started(index)
profiler(f'{func_name}: fsp up') profiler(f'{func_name}: fsp up')
async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]: async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]: