Merge pull request #236 from pikers/fsp_drunken_alignment

Fsp drunken alignment
teardown_guesmost_via_cs
goodboy 2021-11-03 08:48:45 -04:00 committed by GitHub
commit 186d221dda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 452 additions and 235 deletions

View File

@ -246,7 +246,7 @@ async def sample_and_broadcast(
if tick_throttle:
# this is a send mem chan that likely
# pushes to the ``uniform_rate_send()`` below.
await stream.send(quote)
await stream.send((sym, quote))
else:
await stream.send({sym: quote})
@ -285,10 +285,14 @@ async def uniform_rate_send(
sleep_period = 1/rate - 0.000616
last_send = time.time()
aname = stream._ctx.chan.uid[0]
fsp = False
if 'fsp' in aname:
fsp = True
while True:
first_quote = await quote_stream.receive()
sym, first_quote = await quote_stream.receive()
start = time.time()
# append quotes since last iteration into the last quote's
@ -301,7 +305,7 @@ async def uniform_rate_send(
#
while True:
try:
next_quote = quote_stream.receive_nowait()
sym, next_quote = quote_stream.receive_nowait()
ticks = next_quote.get('ticks')
if ticks:
@ -312,12 +316,12 @@ async def uniform_rate_send(
rate = 1 / (now - last_send)
last_send = now
# print(f'{rate} Hz sending quotes') # \n{first_quote}')
# log.info(f'{rate} Hz sending quotes') # \n{first_quote}')
# TODO: now if only we could sync this to the display
# rate timing exactly lul
try:
await stream.send({first_quote['symbol']: first_quote})
await stream.send({sym: first_quote})
break
except trio.ClosedResourceError:
# if the feed consumer goes down then drop

View File

@ -31,7 +31,7 @@ import tractor
import numpy as np
from ..log import get_logger
from ._source import base_ohlc_dtype, base_iohlc_dtype
from ._source import base_iohlc_dtype
log = get_logger(__name__)
@ -168,6 +168,7 @@ class ShmArray:
self._len = len(shmarr)
self._shm = shm
self._post_init: bool = False
# pushing data does not write the index (aka primary key)
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
@ -196,7 +197,24 @@ class ShmArray:
@property
def array(self) -> np.ndarray:
return self._array[self._first.value:self._last.value]
'''Return an up-to-date ``np.ndarray`` view of the
so-far-written data to the underlying shm buffer.
'''
a = self._array[self._first.value:self._last.value]
# first, last = self._first.value, self._last.value
# a = self._array[first:last]
# TODO: eventually comment this once we've not seen it in the
# wild in a long time..
# XXX: race where first/last indexes cause a reader
# to load an empty array..
if len(a) == 0 and self._post_init:
raise RuntimeError('Empty array race condition hit!?')
# breakpoint()
return a
def last(
self,
@ -209,6 +227,7 @@ class ShmArray:
data: np.ndarray,
prepend: bool = False,
start: Optional[int] = None,
) -> int:
'''Ring buffer like "push" to append data
@ -217,12 +236,18 @@ class ShmArray:
NB: no actual ring logic yet to give a "loop around" on overflow
condition, lel.
'''
self._post_init = True
length = len(data)
index = start or self._last.value
if prepend:
index = self._first.value - length
else:
index = self._last.value
if index < 0:
raise ValueError(
f'Array size of {self._len} was overrun during prepend.\n'
'You have passed {abs(index)} too many datums.'
)
end = index + length
@ -230,11 +255,22 @@ class ShmArray:
try:
self._array[fields][index:end] = data[fields][:]
# NOTE: there was a race here between updating
# the first and last indices and when the next reader
# tries to access ``.array`` (which due to the index
# overlap will be empty). Pretty sure we've fixed it now
# but leaving this here as a reminder.
if prepend:
assert index < self._first.value
if index < self._first.value:
self._first.value = index
else:
self._last.value = end
return end
except ValueError as err:
# shoudl raise if diff detected
self.diff_err_fields(data)
@ -290,20 +326,25 @@ class ShmArray:
# how much is probably dependent on lifestyle
_secs_in_day = int(60 * 60 * 12)
_default_size = 2 * _secs_in_day
_secs_in_day = int(60 * 60 * 24)
# we try for 3 times but only on a run-every-other-day kinda week.
_default_size = 3 * _secs_in_day
def open_shm_array(
key: Optional[str] = None,
size: int = _default_size,
dtype: Optional[np.dtype] = None,
readonly: bool = False,
) -> ShmArray:
"""Open a memory shared ``numpy`` using the standard library.
'''Open a memory shared ``numpy`` using the standard library.
This call unlinks (aka permanently destroys) the buffer on teardown
and thus should be used from the parent-most accessor (process).
"""
'''
# create new shared mem segment for which we
# have write permission
a = np.zeros(size, dtype=dtype)

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# Copyright (C) Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -14,33 +14,17 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Financial signal processing for the peeps.
"""
from functools import partial
from typing import AsyncIterator, Callable, Tuple, Optional
'''
Fin-sig-proc for the peeps!
'''
from typing import AsyncIterator
import trio
from trio_typing import TaskStatus
import tractor
import numpy as np
from ..log import get_logger, get_console_log
from .. import data
from ._momo import _rsi, _wma
from ._volume import _tina_vwap
from ..data import attach_shm_array
from ..data.feed import Feed
from ..data._sharedmem import ShmArray
from ._engine import cascade
log = get_logger(__name__)
_fsps = {
'rsi': _rsi,
'wma': _wma,
'vwap': _tina_vwap,
}
__all__ = ['cascade']
async def latency(
@ -63,183 +47,3 @@ async def latency(
# stack tracing.
value = quote['brokerd_ts'] - quote['broker_ts']
yield value
async def fsp_compute(
ctx: tractor.Context,
symbol: str,
feed: Feed,
stream: trio.abc.ReceiveChannel,
src: ShmArray,
dst: ShmArray,
fsp_func_name: str,
func: Callable,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
# TODO: load appropriate fsp with input args
async def filter_by_sym(
sym: str,
stream,
):
# TODO: make this the actualy first quote from feed
# XXX: this allows for a single iteration to run for history
# processing without waiting on the real-time feed for a new quote
yield {}
# task cancellation won't kill the channel
# since we shielded at the `open_feed()` call
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
out_stream = func(
filter_by_sym(symbol, stream),
feed.shm,
)
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
# prepending a copy of the first value a few times to make
# sub-curves align with the parent bar chart.
# This likely needs to be fixed either by,
# - manually assigning the index and historical data
# seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for incremental compuations.
dst._first.value = src._first.value
dst._last.value = src._first.value
# Conduct a single iteration of fsp with historical bars input
# and get historical output
history_output = await out_stream.__anext__()
# build a struct array which includes an 'index' field to push
# as history
history = np.array(
np.arange(len(history_output)),
dtype=dst.array.dtype
)
history[fsp_func_name] = history_output
# check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff):
dst.push(history[:1])
# compare with source signal and time align
index = dst.push(history)
await ctx.send_yield(index)
# setup a respawn handle
with trio.CancelScope() as cs:
task_status.started(cs)
# rt stream
async for processed in out_stream:
# period = time.time() - last
# hz = 1/period if period else float('nan')
# if hz > 60:
# log.info(f'FSP quote too fast: {hz}')
log.debug(f"{fsp_func_name}: {processed}")
index = src.index
dst.array[-1][fsp_func_name] = processed
# stream latest shm array index entry
await ctx.send_yield(index)
@tractor.stream
async def cascade(
ctx: tractor.Context,
brokername: str,
src_shm_token: dict,
dst_shm_token: Tuple[str, np.dtype],
symbol: str,
fsp_func_name: str,
loglevel: Optional[str] = None,
) -> None:
'''Chain streaming signal processors and deliver output to
destination mem buf.
'''
if loglevel:
get_console_log(loglevel)
src = attach_shm_array(token=src_shm_token)
dst = attach_shm_array(readonly=False, token=dst_shm_token)
func: Callable = _fsps[fsp_func_name]
# open a data feed stream with requested broker
async with data.feed.maybe_open_feed(
brokername,
[symbol],
# TODO:
# tick_throttle=60,
) as (feed, stream):
assert src.token == feed.shm.token
last_len = new_len = len(src.array)
fsp_target = partial(
fsp_compute,
ctx=ctx,
symbol=symbol,
feed=feed,
stream=stream,
src=src,
dst=dst,
fsp_func_name=fsp_func_name,
func=func
)
async with trio.open_nursery() as n:
cs = await n.start(fsp_target)
# Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed.
async with feed.index_stream() as stream:
async for msg in stream:
new_len = len(src.array)
if new_len > last_len + 1:
# respawn the signal compute task if the source
# signal has been updated
cs.cancel()
cs = await n.start(fsp_target)
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
# read out last shm row
array = dst.array
last = array[-1:].copy()
# write new row to the shm buffer
dst.push(last)
last_len = new_len

View File

@ -0,0 +1,342 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
core task logic for processing chains
'''
from dataclasses import dataclass
from functools import partial
from typing import AsyncIterator, Callable, Optional
import numpy as np
import pyqtgraph as pg
import trio
from trio_typing import TaskStatus
import tractor
from ..log import get_logger, get_console_log
from .. import data
from ..data import attach_shm_array
from ..data.feed import Feed
from ..data._sharedmem import ShmArray
from ._momo import _rsi, _wma
from ._volume import _tina_vwap
log = get_logger(__name__)
_fsp_builtins = {
'rsi': _rsi,
'wma': _wma,
'vwap': _tina_vwap,
}
# TODO: things to figure the heck out:
# - how to handle non-plottable values (pyqtgraph has facility for this
# now in `arrayToQPath()`)
# - composition of fsps / implicit chaining syntax (we need an issue)
@dataclass
class TaskTracker:
complete: trio.Event
cs: trio.CancelScope
async def filter_quotes_by_sym(
sym: str,
quote_stream: tractor.MsgStream,
) -> AsyncIterator[dict]:
'''
Filter quote stream by target symbol.
'''
# TODO: make this the actual first quote from feed
# XXX: this allows for a single iteration to run for history
# processing without waiting on the real-time feed for a new quote
yield {}
async for quotes in quote_stream:
quote = quotes.get(sym)
if quote:
yield quote
async def fsp_compute(
stream: tractor.MsgStream,
symbol: str,
feed: Feed,
quote_stream: trio.abc.ReceiveChannel,
src: ShmArray,
dst: ShmArray,
func_name: str,
func: Callable,
attach_stream: bool = False,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
profiler = pg.debug.Profiler(
delayed=False,
disabled=True
)
out_stream = func(
# TODO: do we even need this if we do the feed api right?
# shouldn't a local stream do this before we get a handle
# to the async iterable? it's that or we do some kinda
# async itertools style?
filter_quotes_by_sym(symbol, quote_stream),
feed.shm,
)
# Conduct a single iteration of fsp with historical bars input
# and get historical output
history_output = await out_stream.__anext__()
profiler(f'{func_name} generated history')
# build a struct array which includes an 'index' field to push
# as history
history = np.array(
np.arange(len(history_output)),
dtype=dst.array.dtype
)
history[func_name] = history_output
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
# prepending a copy of the first value a few times to make
# sub-curves align with the parent bar chart.
# This likely needs to be fixed either by,
# - manually assigning the index and historical data
# seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for incremental compuations.
first = dst._first.value = src._first.value
# TODO: can we use this `start` flag instead of the manual
# setting above?
index = dst.push(history, start=first)
profiler(f'{func_name} pushed history')
profiler.finish()
# setup a respawn handle
with trio.CancelScope() as cs:
tracker = TaskTracker(trio.Event(), cs)
task_status.started((tracker, index))
profiler(f'{func_name} yield last index')
# import time
# last = time.time()
try:
# rt stream
async for processed in out_stream:
log.debug(f"{func_name}: {processed}")
index = src.index
dst.array[-1][func_name] = processed
# NOTE: for now we aren't streaming this to the consumer
# stream latest array index entry which basically just acts
# as trigger msg to tell the consumer to read from shm
if attach_stream:
await stream.send(index)
# period = time.time() - last
# hz = 1/period if period else float('nan')
# if hz > 60:
# log.info(f'FSP quote too fast: {hz}')
# last = time.time()
finally:
tracker.complete.set()
@tractor.context
async def cascade(
ctx: tractor.Context,
brokername: str,
src_shm_token: dict,
dst_shm_token: tuple[str, np.dtype],
symbol: str,
func_name: str,
zero_on_step: bool = False,
loglevel: Optional[str] = None,
) -> None:
'''
Chain streaming signal processors and deliver output to
destination shm array buffer.
'''
profiler = pg.debug.Profiler(delayed=False, disabled=False)
if loglevel:
get_console_log(loglevel)
src = attach_shm_array(token=src_shm_token)
dst = attach_shm_array(readonly=False, token=dst_shm_token)
func: Callable = _fsp_builtins.get(func_name)
if not func:
# TODO: assume it's a func target path
raise ValueError('Unknown fsp target: {func_name}')
# open a data feed stream with requested broker
async with data.feed.maybe_open_feed(
brokername,
[symbol],
# TODO throttle tick outputs from *this* daemon since
# it'll emit tons of ticks due to the throttle only
# limits quote arrival periods, so the consumer of *this*
# needs to get throttled the ticks we generate.
# tick_throttle=60,
) as (feed, quote_stream):
profiler(f'{func_name}: feed up')
assert src.token == feed.shm.token
# last_len = new_len = len(src.array)
async with (
ctx.open_stream() as stream,
trio.open_nursery() as n,
):
fsp_target = partial(
fsp_compute,
stream=stream,
symbol=symbol,
feed=feed,
quote_stream=quote_stream,
# shm
src=src,
dst=dst,
func_name=func_name,
func=func
)
tracker, index = await n.start(fsp_target)
if zero_on_step:
last = dst.array[-1:]
zeroed = np.zeros(last.shape, dtype=last.dtype)
await ctx.started(index)
profiler(f'{func_name}: fsp up')
async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]:
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
log.warning(f're-syncing fsp {func_name} to source')
tracker.cs.cancel()
await tracker.complete.wait()
return await n.start(fsp_target)
def is_synced(
src: ShmArray,
dst: ShmArray
) -> tuple[bool, int, int]:
'''Predicate to dertmine if a destination FSP
output array is aligned to its source array.
'''
step_diff = src.index - dst.index
len_diff = abs(len(src.array) - len(dst.array))
return not (
# the source is likely backfilling and we must
# sync history calculations
len_diff > 2 or
# we aren't step synced to the source and may be
# leading/lagging by a step
step_diff > 1 or
step_diff < 0
), step_diff, len_diff
async def poll_and_sync_to_step(
tracker: TaskTracker,
src: ShmArray,
dst: ShmArray,
) -> tuple[TaskTracker, int]:
synced, step_diff, _ = is_synced(src, dst)
while not synced:
tracker, index = await resync(tracker)
synced, step_diff, _ = is_synced(src, dst)
return tracker, step_diff
s, step, ld = is_synced(src, dst)
# Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed.
async with feed.index_stream() as stream:
profiler(f'{func_name}: sample stream up')
profiler.finish()
async for msg in stream:
# respawn the compute task if the source
# array has been updated such that we compute
# new history from the (prepended) source.
synced, step_diff, _ = is_synced(src, dst)
if not synced:
tracker, step_diff = await poll_and_sync_to_step(
tracker,
src,
dst,
)
# skip adding a last bar since we should already
# be step alinged
if step_diff == 0:
continue
# read out last shm row, copy and write new row
array = dst.array
# some metrics like vlm should be reset
# to zero every step.
if zero_on_step:
last = zeroed
else:
last = array[-1:].copy()
dst.push(last)

View File

@ -16,6 +16,7 @@
"""
Momentum bby.
"""
from typing import AsyncIterator, Optional
@ -23,12 +24,9 @@ import numpy as np
from numba import jit, float64, optional, int64
from ..data._normalize import iterticks
from ..data._sharedmem import ShmArray
# TODO: things to figure the fuck out:
# - how to handle non-plottable values
# - composition of fsps / implicit chaining
@jit(
float64[:](
float64[:],
@ -39,11 +37,14 @@ from ..data._normalize import iterticks
nogil=True
)
def ema(
y: 'np.ndarray[float64]',
alpha: optional(float64) = None,
ylast: optional(float64) = None,
) -> 'np.ndarray[float64]':
r"""Exponential weighted moving average owka 'Exponential smoothing'.
r'''
Exponential weighted moving average owka 'Exponential smoothing'.
- https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
- https://en.wikipedia.org/wiki/Exponential_smoothing
@ -68,7 +69,8 @@ def ema(
More discussion here:
https://stackoverflow.com/questions/42869495/numpy-version-of-exponential-weighted-moving-average-equivalent-to-pandas-ewm
"""
'''
n = y.shape[0]
if alpha is None:
@ -105,14 +107,21 @@ def ema(
# nogil=True
# )
def rsi(
# TODO: use https://github.com/ramonhagenaars/nptyping
signal: 'np.ndarray[float64]',
period: int64 = 14,
up_ema_last: float64 = None,
down_ema_last: float64 = None,
) -> 'np.ndarray[float64]':
'''
relative strengggth.
'''
alpha = 1/period
df = np.diff(signal)
df = np.diff(signal, prepend=0)
up = np.where(df > 0, df, 0)
up_ema = ema(up, alpha, up_ema_last)
@ -120,11 +129,12 @@ def rsi(
down = np.where(df < 0, -df, 0)
down_ema = ema(down, alpha, down_ema_last)
# avoid dbz errors
# avoid dbz errors, this leaves the first
# index == 0 right?
rs = np.divide(
up_ema,
down_ema,
out=np.zeros_like(up_ema),
out=np.zeros_like(signal),
where=down_ema != 0
)
@ -137,10 +147,18 @@ def rsi(
def wma(
signal: np.ndarray,
length: int,
weights: Optional[np.ndarray] = None,
) -> np.ndarray:
'''
Compute a windowed moving average of ``signal`` with window
``length`` and optional ``weights`` (must be same size as
``signal``).
'''
if weights is None:
# default is a standard arithmetic mean
seq = np.full((length,), 1)
@ -151,18 +169,22 @@ def wma(
return np.convolve(signal, weights, 'valid')
# @piker.fsp.signal(
# @piker.fsp.emit(
# timeframes=['1s', '5s', '15s', '1m', '5m', '1H'],
# )
async def _rsi(
source: 'QuoteStream[Dict[str, Any]]', # noqa
ohlcv: "ShmArray[T<'close'>]",
ohlcv: ShmArray,
period: int = 14,
) -> AsyncIterator[np.ndarray]:
"""Multi-timeframe streaming RSI.
'''
Multi-timeframe streaming RSI.
https://en.wikipedia.org/wiki/Relative_strength_index
"""
'''
sig = ohlcv.array['close']
# wilder says to seed the RSI EMAs with the SMA for the "period"
@ -170,7 +192,8 @@ async def _rsi(
# TODO: the emas here should be seeded with a period SMA as per
# wilder's original formula..
rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed)
rsi_h, last_up_ema_close, last_down_ema_close = rsi(
sig, period, seed, seed)
up_ema_last = last_up_ema_close
down_ema_last = last_down_ema_close
@ -178,7 +201,6 @@ async def _rsi(
yield rsi_h
index = ohlcv.index
async for quote in source:
# tick based updates
for tick in iterticks(quote):
@ -206,16 +228,20 @@ async def _rsi(
async def _wma(
source, #: AsyncStream[np.ndarray],
length: int,
ohlcv: np.ndarray, # price time-frame "aware"
) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream?
"""Streaming weighted moving average.
'''
Streaming weighted moving average.
``weights`` is a sequence of already scaled values. As an example
for the WMA often found in "techincal analysis":
``weights = np.arange(1, N) * N*(N-1)/2``.
"""
'''
# deliver historical output as "first yield"
yield wma(ohlcv.array['close'], length)

View File

@ -323,7 +323,7 @@ async def fan_out_spawn_fsp_daemons(
conf['shm'] = shm
portal = await n.start_actor(
enable_modules=['piker.fsp'],
enable_modules=['piker.fsp._engine'],
name='fsp.' + display_name,
)