Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet b8cfee7d2f Drop `bar_wap` curve for now, seems to also be causing hangs?! 2022-04-29 11:27:18 -04:00
Tyler Goodlet 2d7aba0193 Add back fqsn passthrough and feed opening 2022-04-29 11:26:49 -04:00
Tyler Goodlet 260b632f07 Implement `open_history_client()` correctly for `kraken` 2022-04-29 11:26:15 -04:00
Tyler Goodlet b8704e1b7f Add to signal broker won't deliver more data 2022-04-29 11:25:22 -04:00
Tyler Goodlet 800fe7446a Add profiler passthrough type annot, comments about appends vs. uppx 2022-04-29 11:24:21 -04:00
Tyler Goodlet 536d1ff0d1 Relay frame size in `NoData` due to null-result history 2022-04-29 10:05:52 -04:00
Tyler Goodlet be7c047e2f Add , indicates hist size to decrement to storage logic 2022-04-29 08:12:29 -04:00
Tyler Goodlet d3e6ed3ba4 An absolute uppx diff of >= 1 seems more then fine 2022-04-27 17:19:08 -04:00
Tyler Goodlet 329e833e96 Up the display throttle rate to 22Hz 2022-04-27 17:18:11 -04:00
Tyler Goodlet 4e85d1d395 Truncate trade rate wma window sizes 2022-04-27 17:17:40 -04:00
11 changed files with 174 additions and 44 deletions

View File

@ -33,7 +33,41 @@ class SymbolNotFound(BrokerError):
class NoData(BrokerError): class NoData(BrokerError):
"Symbol data not permitted" '''
Symbol data not permitted or no data
for time range found.
'''
def __init__(
self,
*args,
frame_size: int = 1000,
) -> None:
super().__init__(*args)
# when raised, machinery can check if the backend
# set a "frame size" for doing datetime calcs.
self.frame_size: int = 1000
class DataUnavailable(BrokerError):
'''
Signal storage requests to terminate.
'''
# TODO: add in a reason that can be displayed in the
# UI (for eg. `kraken` is bs and you should complain
# to them that you can't pull more OHLC data..)
class DataThrottle(BrokerError):
'''
Broker throttled request rate for data.
'''
# TODO: add in throttle metrics/feedback
def resproc( def resproc(
@ -50,12 +84,12 @@ def resproc(
if not resp.status_code == 200: if not resp.status_code == 200:
raise BrokerError(resp.body) raise BrokerError(resp.body)
try: try:
json = resp.json() msg = resp.json()
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
log.exception(f"Failed to process {resp}:\n{resp.text}") log.exception(f"Failed to process {resp}:\n{resp.text}")
raise BrokerError(resp.text) raise BrokerError(resp.text)
if log_resp: if log_resp:
log.debug(f"Received json contents:\n{colorize_json(json)}") log.debug(f"Received json contents:\n{colorize_json(msg)}")
return json if return_json else resp return msg if return_json else resp

View File

@ -1482,7 +1482,9 @@ async def get_bars(
if 'No market data permissions for' in msg: if 'No market data permissions for' in msg:
# TODO: signalling for no permissions searches # TODO: signalling for no permissions searches
raise NoData(f'Symbol: {fqsn}') raise NoData(
f'Symbol: {fqsn}',
)
break break
elif ( elif (
@ -1562,7 +1564,10 @@ async def open_history_client(
if out is None: if out is None:
# could be trying to retreive bars over weekend # could be trying to retreive bars over weekend
log.error(f"Can't grab bars starting at {end_dt}!?!?") log.error(f"Can't grab bars starting at {end_dt}!?!?")
raise NoData(f'{end_dt}') raise NoData(
f'{end_dt}',
frame_size=2000,
)
bars, bars_array, first_dt, last_dt = out bars, bars_array, first_dt, last_dt = out

View File

@ -20,7 +20,8 @@ Kraken backend.
''' '''
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from dataclasses import asdict, field from dataclasses import asdict, field
from typing import Any, Optional, AsyncIterator, Callable from datetime import datetime
from typing import Any, Optional, AsyncIterator, Callable, Union
import time import time
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -40,7 +41,13 @@ import base64
from .. import config from .. import config
from .._cacheables import open_cached_client from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError from ._util import (
resproc,
SymbolNotFound,
BrokerError,
DataThrottle,
DataUnavailable,
)
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws, NoBsWs from ..data._web_bs import open_autorecon_ws, NoBsWs
@ -305,7 +312,7 @@ class Client:
action: str, action: str,
size: float, size: float,
reqid: str = None, reqid: str = None,
validate: bool = False # set True test call without a real submission validate: bool = False # set True test call without a real submission
) -> dict: ) -> dict:
''' '''
Place an order and return integer request id provided by client. Place an order and return integer request id provided by client.
@ -391,17 +398,26 @@ class Client:
async def bars( async def bars(
self, self,
symbol: str = 'XBTUSD', symbol: str = 'XBTUSD',
# UTC 2017-07-02 12:53:20 # UTC 2017-07-02 12:53:20
since: int = None, since: Optional[Union[int, datetime]] = None,
count: int = 720, # <- max allowed per query count: int = 720, # <- max allowed per query
as_np: bool = True, as_np: bool = True,
) -> dict: ) -> dict:
if since is None: if since is None:
since = pendulum.now('UTC').start_of('minute').subtract( since = pendulum.now('UTC').start_of('minute').subtract(
minutes=count).timestamp() minutes=count).timestamp()
elif isinstance(since, int):
since = pendulum.from_timestamp(since).timestamp()
else: # presumably a pendulum datetime
since = since.timestamp()
# UTC 2017-07-02 12:53:20 is oldest seconds value # UTC 2017-07-02 12:53:20 is oldest seconds value
since = str(max(1499000000, since)) since = str(max(1499000000, int(since)))
json = await self._public( json = await self._public(
'OHLC', 'OHLC',
data={ data={
@ -445,7 +461,16 @@ class Client:
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array return array
except KeyError: except KeyError:
raise SymbolNotFound(json['error'][0] + f': {symbol}') errmsg = json['error'][0]
if 'not found' in errmsg:
raise SymbolNotFound(errmsg + f': {symbol}')
elif 'Too many requests' in errmsg:
raise DataThrottle(f'{symbol}')
else:
raise BrokerError(errmsg)
@acm @acm
@ -668,8 +693,8 @@ async def handle_order_requests(
oid=msg.oid, oid=msg.oid,
reqid=msg.reqid, reqid=msg.reqid,
symbol=msg.symbol, symbol=msg.symbol,
# TODO: maybe figure out if pending cancels will # TODO: maybe figure out if pending
# eventually get cancelled # cancels will eventually get cancelled
reason="Order cancel is still pending?", reason="Order cancel is still pending?",
broker_details=resp broker_details=resp
).dict() ).dict()
@ -1003,7 +1028,45 @@ async def open_history_client(
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('kraken') as client: async with open_cached_client('kraken') as client:
yield client
# lol, kraken won't send any more then the "last"
# 720 1m bars.. so we have to just ignore further
# requests of this type..
queries: int = 0
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
nonlocal queries
if queries > 0:
raise DataUnavailable
count = 0
while count <= 3:
try:
array = await client.bars(
symbol,
since=end_dt,
)
count += 1
queries += 1
break
except DataThrottle:
log.warning(f'kraken OHLC throttle for {symbol}')
await trio.sleep(1)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield get_ohlc
async def backfill_bars( async def backfill_bars(

View File

@ -148,7 +148,7 @@ def storesh(
enable_modules=['piker.data._ahab'], enable_modules=['piker.data._ahab'],
): ):
symbol = symbols[0] symbol = symbols[0]
await tsdb_history_update() await tsdb_history_update(symbol)
trio.run(main) trio.run(main)

View File

@ -67,6 +67,10 @@ from ._sampling import (
sample_and_broadcast, sample_and_broadcast,
uniform_rate_send, uniform_rate_send,
) )
from ..brokers._util import (
NoData,
DataUnavailable,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -273,7 +277,19 @@ async def start_backfill(
# and count < mx_fills # and count < mx_fills
): ):
count += 1 count += 1
array, start_dt, end_dt = await hist(end_dt=start_dt) try:
array, start_dt, end_dt = await hist(end_dt=start_dt)
except NoData:
# decrement by the diff in time last delivered.
end_dt = start_dt.subtract(seconds=(end_dt - start_dt).seconds)
continue
except DataUnavailable:
# broker is being a bish and we can't pull
# any more..
break
to_push = diff_history( to_push = diff_history(
array, array,
start_dt, start_dt,

View File

@ -270,6 +270,7 @@ class Storage:
self, self,
fqsn: str, fqsn: str,
timeframe: Optional[Union[int, str]] = None, timeframe: Optional[Union[int, str]] = None,
end: Optional[int] = None,
) -> tuple[ ) -> tuple[
MarketstoreClient, MarketstoreClient,
@ -287,6 +288,7 @@ class Storage:
symbols=fqsn, symbols=fqsn,
timeframe=tfstr, timeframe=tfstr,
attrgroup='OHLCV', attrgroup='OHLCV',
end=end,
# limit_from_start=True, # limit_from_start=True,
# TODO: figure the max limit here given the # TODO: figure the max limit here given the
@ -346,6 +348,7 @@ class Storage:
self, self,
fqsn: str, fqsn: str,
ohlcv: np.ndarray, ohlcv: np.ndarray,
append_and_duplicate: bool = True,
) -> None: ) -> None:
# build mkts schema compat array for writing # build mkts schema compat array for writing
@ -373,7 +376,7 @@ class Storage:
# NOTE: will will append duplicates # NOTE: will will append duplicates
# for the same timestamp-index. # for the same timestamp-index.
# TODO: pre deduplicate? # TODO: pre deduplicate?
isvariablelength=True, isvariablelength=append_and_duplicate,
) )
log.info( log.info(
@ -443,17 +446,17 @@ async def tsdb_history_update(
async with ( async with (
open_storage_client(fqsn) as storage, open_storage_client(fqsn) as storage,
# maybe_open_feed( maybe_open_feed(
# [fqsn], [fqsn],
# start_stream=False, start_stream=False,
# ) as (feed, stream), ) as (feed, stream),
): ):
profiler(f'opened feed for {fqsn}') profiler(f'opened feed for {fqsn}')
# to_append = feed.shm.array to_append = feed.shm.array
# to_prepend = None to_prepend = None
if fqsn: if fqsn:
symbol = feed.symbols.get(fqsn) symbol = feed.symbols.get(fqsn)
@ -477,10 +480,11 @@ async def tsdb_history_update(
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
profiler(f'listed symbols {syms}') profiler(f'listed symbols {syms}')
# TODO: ask if user wants to write history for detected
# available shm buffers?
from tractor.trionics import ipython_embed from tractor.trionics import ipython_embed
await ipython_embed() await ipython_embed()
# for array in [to_append, to_prepend]: # for array in [to_append, to_prepend]:
# if array is None: # if array is None:
# continue # continue
@ -490,7 +494,7 @@ async def tsdb_history_update(
# ) # )
# await storage.write_ohlcv(fqsn, array) # await storage.write_ohlcv(fqsn, array)
profiler('Finished db writes') # profiler('Finished db writes')
async def ingest_quote_stream( async def ingest_quote_stream(

View File

@ -167,6 +167,7 @@ def _wma(
assert length == len(weights) assert length == len(weights)
# lol, for long sequences this is nutso slow and expensive..
return np.convolve(signal, weights, 'valid') return np.convolve(signal, weights, 'valid')

View File

@ -309,7 +309,7 @@ async def flow_rates(
if period > 1: if period > 1:
trade_rate_wma = _wma( trade_rate_wma = _wma(
dvlm_shm.array['trade_count'], dvlm_shm.array['trade_count'][-period:],
period, period,
weights=weights, weights=weights,
) )
@ -332,7 +332,7 @@ async def flow_rates(
if period > 1: if period > 1:
dark_trade_rate_wma = _wma( dark_trade_rate_wma = _wma(
dvlm_shm.array['dark_trade_count'], dvlm_shm.array['dark_trade_count'][-period:],
period, period,
weights=weights, weights=weights,
) )

View File

@ -429,10 +429,7 @@ class FastAppendCurve(pg.GraphicsObject):
if ( if (
# std m4 downsample conditions # std m4 downsample conditions
px_width px_width
and uppx_diff >= 1 and abs(uppx_diff) >= 1
or uppx_diff <= -1
or self._step_mode and abs(uppx_diff) >= 2
): ):
log.info( log.info(
f'{self._name} sampler change: {self._last_uppx} -> {uppx}' f'{self._name} sampler change: {self._last_uppx} -> {uppx}'

View File

@ -61,7 +61,7 @@ from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
# TODO: load this from a config.toml! # TODO: load this from a config.toml!
_quote_throttle_rate: int = 12 # Hz _quote_throttle_rate: int = 22 # Hz
# a working tick-type-classes template # a working tick-type-classes template
@ -689,15 +689,17 @@ async def display_symbol_data(
# plot historical vwap if available # plot historical vwap if available
wap_in_history = False wap_in_history = False
if brokermod._show_wap_in_history: # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?!
# if brokermod._show_wap_in_history:
if 'bar_wap' in bars.dtype.fields: # if 'bar_wap' in bars.dtype.fields:
wap_in_history = True # wap_in_history = True
chart.draw_curve( # chart.draw_curve(
name='bar_wap', # name='bar_wap',
data=bars, # shm=ohlcv,
add_label=False, # color='default_light',
) # add_label=False,
# )
# size view to data once at outset # size view to data once at outset
chart.cv._set_yrange() chart.cv._set_yrange()

View File

@ -320,7 +320,7 @@ class Flow(msgspec.Struct): # , frozen=True):
render: bool = True, render: bool = True,
array_key: Optional[str] = None, array_key: Optional[str] = None,
profiler=None, profiler: Optional[pg.debug.Profiler] = None,
**kwargs, **kwargs,
@ -524,7 +524,10 @@ class Flow(msgspec.Struct): # , frozen=True):
view_range=(ivl, ivr), # hack view_range=(ivl, ivr), # hack
profiler=profiler, profiler=profiler,
# should_redraw=False, # should_redraw=False,
# do_append=False,
# NOTE: already passed through by display loop?
# do_append=uppx < 16,
**kwargs,
) )
curve.show() curve.show()
profiler('updated ds curve') profiler('updated ds curve')
@ -589,6 +592,7 @@ class Flow(msgspec.Struct): # , frozen=True):
else: else:
# ``FastAppendCurve`` case: # ``FastAppendCurve`` case:
array_key = array_key or self.name array_key = array_key or self.name
uppx = graphics.x_uppx()
if graphics._step_mode and self.gy is None: if graphics._step_mode and self.gy is None:
self._iflat_first = self.shm._first.value self._iflat_first = self.shm._first.value
@ -834,7 +838,9 @@ class Flow(msgspec.Struct): # , frozen=True):
slice_to_head=-2, slice_to_head=-2,
should_redraw=bool(append_diff), should_redraw=bool(append_diff),
# do_append=False,
# NOTE: already passed through by display loop?
# do_append=uppx < 16,
**kwargs **kwargs
) )
@ -866,6 +872,8 @@ class Flow(msgspec.Struct): # , frozen=True):
view_range=(ivl, ivr) if use_vr else None, view_range=(ivl, ivr) if use_vr else None,
# NOTE: already passed through by display loop?
# do_append=uppx < 16,
**kwargs **kwargs
) )