diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index c6e9299d..dda27ccf 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -79,7 +79,7 @@ async def filter_quotes_by_sym( async def fsp_compute( - stream: tractor.MsgStream, + ctx: tractor.Context, symbol: str, feed: Feed, quote_stream: trio.abc.ReceiveChannel, @@ -147,6 +147,7 @@ async def fsp_compute( # setup a respawn handle with trio.CancelScope() as cs: tracker = TaskTracker(trio.Event(), cs) + await ctx.started(index) task_status.started((tracker, index)) profiler(f'{func_name} yield last index') @@ -155,23 +156,24 @@ async def fsp_compute( try: # rt stream - async for processed in out_stream: + async with ctx.open_stream() as stream: + async for processed in out_stream: - log.debug(f"{func_name}: {processed}") - index = src.index - dst.array[-1][func_name] = processed + log.debug(f"{func_name}: {processed}") + index = src.index + dst.array[-1][func_name] = processed - # NOTE: for now we aren't streaming this to the consumer - # stream latest array index entry which basically just acts - # as trigger msg to tell the consumer to read from shm - if attach_stream: - await stream.send(index) + # NOTE: for now we aren't streaming this to the consumer + # stream latest array index entry which basically just acts + # as trigger msg to tell the consumer to read from shm + if attach_stream: + await stream.send(index) - # period = time.time() - last - # hz = 1/period if period else float('nan') - # if hz > 60: - # log.info(f'FSP quote too fast: {hz}') - # last = time.time() + # period = time.time() - last + # hz = 1/period if period else float('nan') + # if hz > 60: + # log.info(f'FSP quote too fast: {hz}') + # last = time.time() finally: tracker.complete.set() @@ -229,14 +231,13 @@ async def cascade( # last_len = new_len = len(src.array) async with ( - ctx.open_stream() as stream, trio.open_nursery() as n, ): fsp_target = partial( fsp_compute, - stream=stream, + ctx=ctx, symbol=symbol, feed=feed, quote_stream=quote_stream, @@ -255,7 +256,6 @@ async def cascade( last = dst.array[-1:] zeroed = np.zeros(last.shape, dtype=last.dtype) - await ctx.started(index) profiler(f'{func_name}: fsp up') async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]: