Port fsp engine to new tractor stream api

tractor_open_stream_from
Tyler Goodlet 2021-04-29 08:38:51 -04:00
parent 0d9f091a34
commit 56db2c812d
1 changed files with 16 additions and 15 deletions

View File

@ -167,24 +167,25 @@ async def cascade(
# Increment the underlying shared memory buffer on every # Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed. # "increment" msg received from the underlying data feed.
async for msg in await feed.index_stream(): async with feed.index_stream() as stream:
async for msg in stream:
new_len = len(src.array) new_len = len(src.array)
if new_len > last_len + 1: if new_len > last_len + 1:
# respawn the signal compute task if the source # respawn the signal compute task if the source
# signal has been updated # signal has been updated
cs.cancel() cs.cancel()
cs = await n.start(fsp_compute) cs = await n.start(fsp_compute)
# TODO: adopt an incremental update engine/approach # TODO: adopt an incremental update engine/approach
# where possible here eventually! # where possible here eventually!
# read out last shm row # read out last shm row
array = dst.array array = dst.array
last = array[-1:].copy() last = array[-1:].copy()
# write new row to the shm buffer # write new row to the shm buffer
dst.push(last) dst.push(last)
last_len = new_len last_len = new_len