From 56db2c812d04c706c3c6f8043b6f0d909f36fc5c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 29 Apr 2021 08:38:51 -0400 Subject: [PATCH] Port fsp engine to new tractor stream api --- piker/fsp/__init__.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 0b432e5b..2345b516 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -167,24 +167,25 @@ async def cascade( # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. - async for msg in await feed.index_stream(): + async with feed.index_stream() as stream: + async for msg in stream: - new_len = len(src.array) + new_len = len(src.array) - if new_len > last_len + 1: - # respawn the signal compute task if the source - # signal has been updated - cs.cancel() - cs = await n.start(fsp_compute) + if new_len > last_len + 1: + # respawn the signal compute task if the source + # signal has been updated + cs.cancel() + cs = await n.start(fsp_compute) - # TODO: adopt an incremental update engine/approach - # where possible here eventually! + # TODO: adopt an incremental update engine/approach + # where possible here eventually! - # read out last shm row - array = dst.array - last = array[-1:].copy() + # read out last shm row + array = dst.array + last = array[-1:].copy() - # write new row to the shm buffer - dst.push(last) + # write new row to the shm buffer + dst.push(last) - last_len = new_len + last_len = new_len