Breakout fsp rt loop as non-closure for readability

syseng_tweaks
Tyler Goodlet 2021-05-24 08:47:30 -04:00
parent f6f4a0cd8d
commit efd93d058a
1 changed files with 109 additions and 75 deletions

View File

@ -17,6 +17,7 @@
"""
Financial signal processing for the peeps.
"""
from functools import partial
from typing import AsyncIterator, Callable, Tuple
import trio
@ -29,6 +30,8 @@ from .. import data
from ._momo import _rsi, _wma
from ._volume import _tina_vwap
from ..data import attach_shm_array
from ..data.feed import Feed
from ..data._sharedmem import ShmArray
log = get_logger(__name__)
@ -62,6 +65,97 @@ async def latency(
yield value
async def fsp_compute(
ctx: tractor.Context,
symbol: str,
feed: Feed,
src: ShmArray,
dst: ShmArray,
fsp_func_name: str,
func: Callable,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
# TODO: load appropriate fsp with input args
async def filter_by_sym(
sym: str,
stream,
):
# TODO: make this the actualy first quote from feed
# XXX: this allows for a single iteration to run for history
# processing without waiting on the real-time feed for a new quote
yield {}
# task cancellation won't kill the channel
with stream.shield():
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
out_stream = func(
filter_by_sym(symbol, feed.stream),
feed.shm,
)
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
# prepending a copy of the first value a few times to make
# sub-curves align with the parent bar chart.
# This likely needs to be fixed either by,
# - manually assigning the index and historical data
# seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for incremental compuations.
dst._first.value = src._first.value
dst._last.value = src._first.value
# Conduct a single iteration of fsp with historical bars input
# and get historical output
history_output = await out_stream.__anext__()
# build a struct array which includes an 'index' field to push
# as history
history = np.array(
np.arange(len(history_output)),
dtype=dst.array.dtype
)
history[fsp_func_name] = history_output
# check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
print(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff):
dst.push(history[:1])
# compare with source signal and time align
index = dst.push(history)
await ctx.send_yield(index)
# setup a respawn handle
with trio.CancelScope() as cs:
task_status.started(cs)
# rt stream
async for processed in out_stream:
log.debug(f"{fsp_func_name}: {processed}")
index = src.index
dst.array[-1][fsp_func_name] = processed
# stream latest shm array index entry
await ctx.send_yield(index)
@tractor.stream
async def cascade(
ctx: tractor.Context,
@ -85,84 +179,24 @@ async def cascade(
assert src.token == feed.shm.token
async def fsp_compute(
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
# TODO: load appropriate fsp with input args
async def filter_by_sym(
sym: str,
stream,
):
# task cancellation won't kill the channel
with stream.shield():
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
out_stream = func(
filter_by_sym(symbol, feed.stream),
feed.shm,
)
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
# prepending a copy of the first value a few times to make
# sub-curves align with the parent bar chart.
# This likely needs to be fixed either by,
# - manually assigning the index and historical data
# seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for incremental compuations.
dst._first.value = src._first.value
dst._last.value = src._first.value
# Conduct a single iteration of fsp with historical bars input
# and get historical output
history_output = await out_stream.__anext__()
# build a struct array which includes an 'index' field to push
# as history
history = np.array(
np.arange(len(history_output)),
dtype=dst.array.dtype
)
history[fsp_func_name] = history_output
# check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
print(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff):
dst.push(history[:1])
# compare with source signal and time align
index = dst.push(history)
await ctx.send_yield(index)
# setup a respawn handle
with trio.CancelScope() as cs:
task_status.started(cs)
# rt stream
async for processed in out_stream:
log.debug(f"{fsp_func_name}: {processed}")
index = src.index
dst.array[-1][fsp_func_name] = processed
# stream latest shm array index entry
await ctx.send_yield(index)
last_len = new_len = len(src.array)
fsp_target = partial(
fsp_compute,
ctx=ctx,
symbol=symbol,
feed=feed,
src=src,
dst=dst,
fsp_func_name=fsp_func_name,
func=func
)
async with trio.open_nursery() as n:
cs = await n.start(fsp_compute)
cs = await n.start(fsp_target)
# Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed.
@ -176,7 +210,7 @@ async def cascade(
# respawn the signal compute task if the source
# signal has been updated
cs.cancel()
cs = await n.start(fsp_compute)
cs = await n.start(fsp_target)
# TODO: adopt an incremental update engine/approach
# where possible here eventually!