diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 0b432e5b..2345b516 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -167,24 +167,25 @@ async def cascade( # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. - async for msg in await feed.index_stream(): + async with feed.index_stream() as stream: + async for msg in stream: - new_len = len(src.array) + new_len = len(src.array) - if new_len > last_len + 1: - # respawn the signal compute task if the source - # signal has been updated - cs.cancel() - cs = await n.start(fsp_compute) + if new_len > last_len + 1: + # respawn the signal compute task if the source + # signal has been updated + cs.cancel() + cs = await n.start(fsp_compute) - # TODO: adopt an incremental update engine/approach - # where possible here eventually! + # TODO: adopt an incremental update engine/approach + # where possible here eventually! - # read out last shm row - array = dst.array - last = array[-1:].copy() + # read out last shm row + array = dst.array + last = array[-1:].copy() - # write new row to the shm buffer - dst.push(last) + # write new row to the shm buffer + dst.push(last) - last_len = new_len + last_len = new_len