Port fsp daemon to tractor's context api

fsp_drunken_alignment
Tyler Goodlet 2021-09-24 17:47:43 -04:00
parent 31f4dbef7d
commit 33d1f56440
1 changed files with 87 additions and 59 deletions

View File

@ -36,7 +36,7 @@ from ..data._sharedmem import ShmArray
log = get_logger(__name__) log = get_logger(__name__)
_fsps = { _fsp_builtins = {
'rsi': _rsi, 'rsi': _rsi,
'wma': _wma, 'wma': _wma,
'vwap': _tina_vwap, 'vwap': _tina_vwap,
@ -65,16 +65,39 @@ async def latency(
yield value yield value
async def filter_quotes_by_sym(
sym: str,
quote_stream,
) -> AsyncIterator[dict]:
'''Filter quote stream by target symbol.
'''
# TODO: make this the actualy first quote from feed
# XXX: this allows for a single iteration to run for history
# processing without waiting on the real-time feed for a new quote
yield {}
# task cancellation won't kill the channel
# since we shielded at the `open_feed()` call
async for quotes in quote_stream:
for symbol, quote in quotes.items():
if symbol == sym:
yield quote
async def fsp_compute( async def fsp_compute(
ctx: tractor.Context,
stream: tractor.MsgStream,
symbol: str, symbol: str,
feed: Feed, feed: Feed,
stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
src: ShmArray, src: ShmArray,
dst: ShmArray, dst: ShmArray,
fsp_func_name: str, func_name: str,
func: Callable, func: Callable,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
@ -83,25 +106,13 @@ async def fsp_compute(
# TODO: load appropriate fsp with input args # TODO: load appropriate fsp with input args
async def filter_by_sym(
sym: str,
stream,
):
# TODO: make this the actualy first quote from feed
# XXX: this allows for a single iteration to run for history
# processing without waiting on the real-time feed for a new quote
yield {}
# task cancellation won't kill the channel
# since we shielded at the `open_feed()` call
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
out_stream = func( out_stream = func(
filter_by_sym(symbol, stream),
# TODO: do we even need this if we do the feed api right?
# shouldn't a local stream do this before we get a handle
# to the async iterable? it's that or we do some kinda
# async itertools style?
filter_quotes_by_sym(symbol, quote_stream),
feed.shm, feed.shm,
) )
@ -129,11 +140,11 @@ async def fsp_compute(
np.arange(len(history_output)), np.arange(len(history_output)),
dtype=dst.array.dtype dtype=dst.array.dtype
) )
history[fsp_func_name] = history_output history[func_name] = history_output
# check for data length mis-allignment and fill missing values # check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history) diff = len(src.array) - len(history)
if diff >= 0: if diff > 0:
log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}") log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff): for _ in range(diff):
dst.push(history[:1]) dst.push(history[:1])
@ -141,36 +152,44 @@ async def fsp_compute(
# compare with source signal and time align # compare with source signal and time align
index = dst.push(history) index = dst.push(history)
await ctx.send_yield(index)
# setup a respawn handle # setup a respawn handle
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
task_status.started(cs) task_status.started((cs, index))
import time
last = time.time()
# rt stream # rt stream
async for processed in out_stream: async for processed in out_stream:
# period = time.time() - last period = time.time() - last
# hz = 1/period if period else float('nan') hz = 1/period if period else float('nan')
# if hz > 60: if hz > 60:
# log.info(f'FSP quote too fast: {hz}') log.info(f'FSP quote too fast: {hz}')
log.debug(f"{fsp_func_name}: {processed}") log.debug(f"{func_name}: {processed}")
index = src.index index = src.index
dst.array[-1][fsp_func_name] = processed dst.array[-1][func_name] = processed
# stream latest shm array index entry # stream latest array index entry which basically just acts
await ctx.send_yield(index) # as trigger msg to tell the consumer to read from shm
await stream.send(index)
last = time.time()
@tractor.stream @tractor.context
async def cascade( async def cascade(
ctx: tractor.Context, ctx: tractor.Context,
brokername: str, brokername: str,
src_shm_token: dict, src_shm_token: dict,
dst_shm_token: Tuple[str, np.dtype], dst_shm_token: Tuple[str, np.dtype],
symbol: str, symbol: str,
fsp_func_name: str, func_name: str,
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
) -> None: ) -> None:
@ -184,39 +203,50 @@ async def cascade(
src = attach_shm_array(token=src_shm_token) src = attach_shm_array(token=src_shm_token)
dst = attach_shm_array(readonly=False, token=dst_shm_token) dst = attach_shm_array(readonly=False, token=dst_shm_token)
func: Callable = _fsps[fsp_func_name] func: Callable = _fsp_builtins.get(func_name)
if not func:
# TODO: assume it's a func target path
raise ValueError('Unknown fsp target: {func_name}')
# open a data feed stream with requested broker # open a data feed stream with requested broker
async with data.feed.maybe_open_feed( async with data.feed.maybe_open_feed(
brokername, brokername,
[symbol], [symbol],
# TODO: # TODO throttle tick outputs from *this* daemon since
# it'll emit tons of ticks due to the throttle only
# limits quote arrival periods, so the consumer of *this*
# needs to get throttled the ticks we generate.
# tick_throttle=60, # tick_throttle=60,
) as (feed, stream): ) as (feed, quote_stream):
assert src.token == feed.shm.token assert src.token == feed.shm.token
last_len = new_len = len(src.array) last_len = new_len = len(src.array)
fsp_target = partial( async with (
fsp_compute, ctx.open_stream() as stream,
ctx=ctx, trio.open_nursery() as n,
symbol=symbol, ):
feed=feed,
stream=stream,
src=src, fsp_target = partial(
dst=dst,
fsp_func_name=fsp_func_name, fsp_compute,
func=func stream=stream,
) symbol=symbol,
feed=feed,
quote_stream=quote_stream,
async with trio.open_nursery() as n: # shm
src=src,
dst=dst,
cs = await n.start(fsp_target) func_name=func_name,
func=func
)
cs, index = await n.start(fsp_target)
await ctx.started(index)
# Increment the underlying shared memory buffer on every # Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed. # "increment" msg received from the underlying data feed.
@ -229,17 +259,15 @@ async def cascade(
if new_len > last_len + 1: if new_len > last_len + 1:
# respawn the signal compute task if the source # respawn the signal compute task if the source
# signal has been updated # signal has been updated
log.warning(f'Re-spawning fsp {func_name}')
cs.cancel() cs.cancel()
cs = await n.start(fsp_target) cs, index = await n.start(fsp_target)
# TODO: adopt an incremental update engine/approach # TODO: adopt an incremental update engine/approach
# where possible here eventually! # where possible here eventually!
# read out last shm row # read out last shm row, copy and write new row
array = dst.array array = dst.array
last = array[-1:].copy() last = array[-1:].copy()
# write new row to the shm buffer
dst.push(last) dst.push(last)
last_len = new_len last_len = new_len