Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet b8cfee7d2f Drop `bar_wap` curve for now, seems to also be causing hangs?! 2022-04-29 11:27:18 -04:00
Tyler Goodlet 2d7aba0193 Add back fqsn passthrough and feed opening 2022-04-29 11:26:49 -04:00
Tyler Goodlet 260b632f07 Implement `open_history_client()` correctly for `kraken` 2022-04-29 11:26:15 -04:00
Tyler Goodlet b8704e1b7f Add `DataUnavailable` to signal broker won't deliver more data 2022-04-29 11:25:22 -04:00
Tyler Goodlet 800fe7446a Add profiler passthrough type annot, comments about appends vs. uppx 2022-04-29 11:24:21 -04:00
Tyler Goodlet 536d1ff0d1 Relay frame size in `NoData` due to null-result history 2022-04-29 10:05:52 -04:00
Tyler Goodlet be7c047e2f Add `NoData.frame_size`, indicates hist size to decrement to storage logic 2022-04-29 08:12:29 -04:00
Tyler Goodlet d3e6ed3ba4 An absolute uppx diff of >= 1 seems more than fine 2022-04-27 17:19:08 -04:00
Tyler Goodlet 329e833e96 Up the display throttle rate to 22Hz 2022-04-27 17:18:11 -04:00
Tyler Goodlet 4e85d1d395 Truncate trade rate wma window sizes 2022-04-27 17:17:40 -04:00
11 changed files with 174 additions and 44 deletions

View File

@@ -33,7 +33,41 @@ class SymbolNotFound(BrokerError):
 class NoData(BrokerError):
-    "Symbol data not permitted"
+    '''
+    Symbol data not permitted or no data
+    for time range found.
+    '''
+    def __init__(
+        self,
+        *args,
+        frame_size: int = 1000,
+    ) -> None:
+        super().__init__(*args)
+        # when raised, machinery can check if the backend
+        # set a "frame size" for doing datetime calcs.
+        self.frame_size: int = frame_size
+
+
+class DataUnavailable(BrokerError):
+    '''
+    Signal storage requests to terminate.
+    '''
+    # TODO: add in a reason that can be displayed in the
+    # UI (for eg. `kraken` is bs and you should complain
+    # to them that you can't pull more OHLC data..)
+
+
+class DataThrottle(BrokerError):
+    '''
+    Broker throttled request rate for data.
+    '''
+    # TODO: add in throttle metrics/feedback
+
+
 def resproc(
@@ -50,12 +84,12 @@ def resproc(
     if not resp.status_code == 200:
         raise BrokerError(resp.body)
     try:
-        json = resp.json()
+        msg = resp.json()
     except json.decoder.JSONDecodeError:
         log.exception(f"Failed to process {resp}:\n{resp.text}")
         raise BrokerError(resp.text)

     if log_resp:
-        log.debug(f"Received json contents:\n{colorize_json(json)}")
+        log.debug(f"Received json contents:\n{colorize_json(msg)}")

-    return json if return_json else resp
+    return msg if return_json else resp
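
This hunk gives the data machinery three distinct failure signals: `NoData` (a null frame plus a hint of how far to step back), `DataUnavailable` (stop asking entirely), and `DataThrottle` (back off and retry). A minimal sketch, not part of this changeset, of how a backend frame fetcher might raise them; `fetch_frame()` and its response shape are hypothetical stand-ins:

from piker.brokers._util import NoData, DataThrottle, DataUnavailable

async def get_frame(fqsn: str, end_dt) -> list:
    resp = await fetch_frame(fqsn, end_dt)  # hypothetical transport call

    if resp.throttled:
        # transient: caller may sleep then retry
        raise DataThrottle(fqsn)

    if resp.no_more_history:
        # terminal: tells storage/backfill logic to stop requesting
        raise DataUnavailable

    if not resp.bars:
        # null result for this window: advertise the backend's frame
        # size so the caller knows how far to decrement its query range
        raise NoData(f'{end_dt}', frame_size=1000)

    return resp.bars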

View File

@@ -1482,7 +1482,9 @@ async def get_bars(
         if 'No market data permissions for' in msg:
             # TODO: signalling for no permissions searches
-            raise NoData(f'Symbol: {fqsn}')
+            raise NoData(
+                f'Symbol: {fqsn}',
+            )
             break

         elif (
@@ -1562,7 +1564,10 @@ async def open_history_client(
         if out is None:
             # could be trying to retrieve bars over weekend
             log.error(f"Can't grab bars starting at {end_dt}!?!?")
-            raise NoData(f'{end_dt}')
+            raise NoData(
+                f'{end_dt}',
+                frame_size=2000,
+            )

         bars, bars_array, first_dt, last_dt = out

View File

@@ -20,7 +20,8 @@ Kraken backend.
 '''
 from contextlib import asynccontextmanager as acm
 from dataclasses import asdict, field
-from typing import Any, Optional, AsyncIterator, Callable
+from datetime import datetime
+from typing import Any, Optional, AsyncIterator, Callable, Union
 import time

 from trio_typing import TaskStatus
@@ -40,7 +41,13 @@ import base64
 from .. import config
 from .._cacheables import open_cached_client
-from ._util import resproc, SymbolNotFound, BrokerError
+from ._util import (
+    resproc,
+    SymbolNotFound,
+    BrokerError,
+    DataThrottle,
+    DataUnavailable,
+)
 from ..log import get_logger, get_console_log
 from ..data import ShmArray
 from ..data._web_bs import open_autorecon_ws, NoBsWs
@@ -305,7 +312,7 @@ class Client:
         action: str,
         size: float,
         reqid: str = None,
-        validate: bool = False # set True test call without a real submission
+        validate: bool = False  # set True test call without a real submission
     ) -> dict:
         '''
         Place an order and return integer request id provided by client.
@@ -391,17 +398,26 @@ class Client:
     async def bars(
         self,
         symbol: str = 'XBTUSD',
         # UTC 2017-07-02 12:53:20
-        since: int = None,
+        since: Optional[Union[int, datetime]] = None,
         count: int = 720,  # <- max allowed per query
         as_np: bool = True,
     ) -> dict:
         if since is None:
             since = pendulum.now('UTC').start_of('minute').subtract(
                 minutes=count).timestamp()
+
+        elif isinstance(since, int):
+            since = pendulum.from_timestamp(since).timestamp()
+
+        else:  # presumably a pendulum datetime
+            since = since.timestamp()
+
         # UTC 2017-07-02 12:53:20 is oldest seconds value
-        since = str(max(1499000000, since))
+        since = str(max(1499000000, int(since)))
         json = await self._public(
             'OHLC',
             data={
@@ -445,7 +461,16 @@ class Client:
             array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
             return array
         except KeyError:
-            raise SymbolNotFound(json['error'][0] + f': {symbol}')
+            errmsg = json['error'][0]
+
+            if 'not found' in errmsg:
+                raise SymbolNotFound(errmsg + f': {symbol}')
+
+            elif 'Too many requests' in errmsg:
+                raise DataThrottle(f'{symbol}')
+
+            else:
+                raise BrokerError(errmsg)


 @acm
@@ -668,8 +693,8 @@ async def handle_order_requests(
                     oid=msg.oid,
                     reqid=msg.reqid,
                     symbol=msg.symbol,
-                    # TODO: maybe figure out if pending cancels will
-                    # eventually get cancelled
+                    # TODO: maybe figure out if pending
+                    # cancels will eventually get cancelled
                     reason="Order cancel is still pending?",
                     broker_details=resp
                 ).dict()
@@ -1003,7 +1028,45 @@ async def open_history_client(
     # TODO implement history getter for the new storage layer.
     async with open_cached_client('kraken') as client:
-        yield client
+
+        # lol, kraken won't send any more than the "last"
+        # 720 1m bars.. so we have to just ignore further
+        # requests of this type..
+        queries: int = 0
+
+        async def get_ohlc(
+            end_dt: Optional[datetime] = None,
+            start_dt: Optional[datetime] = None,
+        ) -> tuple[
+            np.ndarray,
+            datetime,  # start
+            datetime,  # end
+        ]:
+            nonlocal queries
+            if queries > 0:
+                raise DataUnavailable
+
+            count = 0
+            while count <= 3:
+                try:
+                    array = await client.bars(
+                        symbol,
+                        since=end_dt,
+                    )
+                    count += 1
+                    queries += 1
+                    break
+
+                except DataThrottle:
+                    log.warning(f'kraken OHLC throttle for {symbol}')
+                    await trio.sleep(1)
+
+            start_dt = pendulum.from_timestamp(array[0]['time'])
+            end_dt = pendulum.from_timestamp(array[-1]['time'])
+            return array, start_dt, end_dt
+
+        yield get_ohlc


 async def backfill_bars(
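
Net effect: the first `get_ohlc()` call retries through throttles and returns kraken's newest ~720 1m bars; every call after that raises `DataUnavailable`, which ends backfill. Roughly, from the caller's side (a sketch assuming `open_history_client()` is entered as an async context manager with the target symbol; the feed layer drives this via `hist(end_dt=...)` as shown below):

async with open_history_client('XBTUSD') as get_ohlc:
    # first call: sleeps through any `DataThrottle` then returns
    # the most recent ~720 1m bars
    array, start_dt, end_dt = await get_ohlc()

    # kraken can't serve older 1m history, so a second request
    # is refused outright
    await get_ohlc(end_dt=start_dt)  # raises DataUnavailable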

View File

@@ -148,7 +148,7 @@ def storesh(
         enable_modules=['piker.data._ahab'],
     ):
         symbol = symbols[0]
-        await tsdb_history_update()
+        await tsdb_history_update(symbol)

     trio.run(main)

View File

@@ -67,6 +67,10 @@ from ._sampling import (
     sample_and_broadcast,
     uniform_rate_send,
 )

+from ..brokers._util import (
+    NoData,
+    DataUnavailable,
+)

 log = get_logger(__name__)
@@ -273,7 +277,19 @@ async def start_backfill(
                 # and count < mx_fills
             ):
                 count += 1
-                array, start_dt, end_dt = await hist(end_dt=start_dt)
+                try:
+                    array, start_dt, end_dt = await hist(end_dt=start_dt)
+
+                except NoData:
+                    # decrement by the diff in time last delivered.
+                    end_dt = start_dt.subtract(seconds=(end_dt - start_dt).seconds)
+                    continue
+
+                except DataUnavailable:
+                    # broker is being a bish and we can't pull
+                    # any more..
+                    break

                 to_push = diff_history(
                     array,
                     start_dt,
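
A subtlety in the `NoData` path: the `await` raises before the tuple unpack, so `start_dt`/`end_dt` still hold the previous frame's bounds inside the handler and the decrement steps back by exactly the duration of the last delivered frame. Worked out with pendulum (values assumed; note that if the diff behaves like a plain `timedelta`, `.seconds` only captures the sub-day component):

import pendulum

# bounds of the last successfully delivered frame (assumed values)
start_dt = pendulum.datetime(2022, 4, 29, 12, 0, tz='UTC')
end_dt = start_dt.add(minutes=720)

# step back by one frame's duration: 720 * 60 == 43200 seconds
stepped = start_dt.subtract(seconds=(end_dt - start_dt).seconds)
assert stepped == start_dt.subtract(minutes=720)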

View File

@@ -270,6 +270,7 @@ class Storage:
         self,
         fqsn: str,
         timeframe: Optional[Union[int, str]] = None,
+        end: Optional[int] = None,

     ) -> tuple[
         MarketstoreClient,
@@ -287,6 +288,7 @@ class Storage:
             symbols=fqsn,
             timeframe=tfstr,
             attrgroup='OHLCV',
+            end=end,
             # limit_from_start=True,

             # TODO: figure the max limit here given the
@@ -346,6 +348,7 @@ class Storage:
         self,
         fqsn: str,
         ohlcv: np.ndarray,
+        append_and_duplicate: bool = True,

     ) -> None:

         # build mkts schema compat array for writing
@@ -373,7 +376,7 @@ class Storage:
             # NOTE: will append duplicates
             # for the same timestamp-index.
             # TODO: pre deduplicate?
-            isvariablelength=True,
+            isvariablelength=append_and_duplicate,
         )

         log.info(
@@ -443,17 +446,17 @@ async def tsdb_history_update(
     async with (
         open_storage_client(fqsn) as storage,
-        # maybe_open_feed(
-        #     [fqsn],
-        #     start_stream=False,
-        # ) as (feed, stream),
+        maybe_open_feed(
+            [fqsn],
+            start_stream=False,
+        ) as (feed, stream),
     ):
         profiler(f'opened feed for {fqsn}')

-        # to_append = feed.shm.array
-        # to_prepend = None
+        to_append = feed.shm.array
+        to_prepend = None

         if fqsn:
             symbol = feed.symbols.get(fqsn)
@@ -477,10 +480,11 @@ async def tsdb_history_update(
     log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
     profiler(f'listed symbols {syms}')

+    # TODO: ask if user wants to write history for detected
+    # available shm buffers?
+    from tractor.trionics import ipython_embed
+    await ipython_embed()

     # for array in [to_append, to_prepend]:
     #     if array is None:
     #         continue
@@ -490,7 +494,7 @@ async def tsdb_history_update(
     #     )
     #     await storage.write_ohlcv(fqsn, array)

-    profiler('Finished db writes')
+    # profiler('Finished db writes')


 async def ingest_quote_stream(
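
For context on the two `Storage` changes above: the new `append_and_duplicate` flag simply forwards to marketstore's `isvariablelength`, which is what allows multiple records per timestamp-index, and the new `end` kwarg bounds a read query's upper time range. Hypothetical calls against the patched API (the read-method name and epoch-stamp semantics are assumed from context):

# False -> fixed-length records, so same-timestamp rows
# overwrite instead of accumulating
await storage.write_ohlcv(fqsn, ohlcv, append_and_duplicate=False)

# bound the query to records at or before an epoch stamp
await storage.read_ohlcv(fqsn, timeframe=60, end=1651233600)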

View File

@@ -167,6 +167,7 @@ def _wma(
     assert length == len(weights)

+    # lol, for long sequences this is nutso slow and expensive..
     return np.convolve(signal, weights, 'valid')

View File

@@ -309,7 +309,7 @@ async def flow_rates(
             if period > 1:
                 trade_rate_wma = _wma(
-                    dvlm_shm.array['trade_count'],
+                    dvlm_shm.array['trade_count'][-period:],
                     period,
                     weights=weights,
                 )
@@ -332,7 +332,7 @@ async def flow_rates(
             if period > 1:
                 dark_trade_rate_wma = _wma(
-                    dvlm_shm.array['dark_trade_count'],
+                    dvlm_shm.array['dark_trade_count'][-period:],
                     period,
                     weights=weights,
                 )
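
The `[-period:]` slice is the actual win here: with mode 'valid', `np.convolve` emits len(signal) - len(weights) + 1 samples, so passing the full shm history recomputes a WMA for every historical point when only the newest value gets used. A small synthetic demonstration:

import numpy as np

period = 4
signal = np.arange(100, dtype=float)      # stand-in for trade counts
weights = np.linspace(1, period, period)
weights /= weights.sum()                  # normalized linear weights

full = np.convolve(signal, weights, 'valid')            # 97 samples
last = np.convolve(signal[-period:], weights, 'valid')  # 1 sample
assert np.isclose(full[-1], last[0])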

View File

@@ -429,10 +429,7 @@ class FastAppendCurve(pg.GraphicsObject):
         if (
             # std m4 downsample conditions
             px_width
-            and uppx_diff >= 1
-            or uppx_diff <= -1
-            or self._step_mode and abs(uppx_diff) >= 2
+            and abs(uppx_diff) >= 1
         ):
             log.info(
                 f'{self._name} sampler change: {self._last_uppx} -> {uppx}'
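
Besides being shorter, the collapsed predicate removes an operator-precedence wrinkle: `and` binds tighter than `or`, so the old `uppx_diff <= -1` (and step-mode) branches fired even when `px_width` was falsy, while the new form gates both directions on `px_width`. A quick check (step-mode clause omitted):

px_width, uppx_diff = 0, -2  # no pixel width yet, uppx shrinking

old = bool(
    px_width
    and uppx_diff >= 1
    or uppx_diff <= -1
)
new = bool(px_width and abs(uppx_diff) >= 1)

assert old is True    # fired despite px_width == 0
assert new is False   # gated on px_width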

View File

@@ -61,7 +61,7 @@ from ..log import get_logger
 log = get_logger(__name__)

 # TODO: load this from a config.toml!
-_quote_throttle_rate: int = 12  # Hz
+_quote_throttle_rate: int = 22  # Hz

 # a working tick-type-classes template
@@ -689,15 +689,17 @@ async def display_symbol_data(
     # plot historical vwap if available
     wap_in_history = False
-    if brokermod._show_wap_in_history:
-        if 'bar_wap' in bars.dtype.fields:
-            wap_in_history = True
-            chart.draw_curve(
-                name='bar_wap',
-                data=bars,
-                add_label=False,
-            )
+
+    # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?!
+    # if brokermod._show_wap_in_history:
+    #     if 'bar_wap' in bars.dtype.fields:
+    #         wap_in_history = True
+    #         chart.draw_curve(
+    #             name='bar_wap',
+    #             shm=ohlcv,
+    #             color='default_light',
+    #             add_label=False,
+    #         )

     # size view to data once at outset
     chart.cv._set_yrange()

View File

@@ -320,7 +320,7 @@ class Flow(msgspec.Struct):  # , frozen=True):
         render: bool = True,
         array_key: Optional[str] = None,
-        profiler=None,
+        profiler: Optional[pg.debug.Profiler] = None,

         **kwargs,
@@ -524,7 +524,10 @@ class Flow(msgspec.Struct):  # , frozen=True):
                     view_range=(ivl, ivr),  # hack
                     profiler=profiler,
                     # should_redraw=False,
-                    # do_append=False,
+
+                    # NOTE: already passed through by display loop?
+                    # do_append=uppx < 16,
                     **kwargs,
                 )
                 curve.show()
                 profiler('updated ds curve')
@@ -589,6 +592,7 @@ class Flow(msgspec.Struct):  # , frozen=True):
         else:
             # ``FastAppendCurve`` case:
             array_key = array_key or self.name
+            uppx = graphics.x_uppx()

             if graphics._step_mode and self.gy is None:
                 self._iflat_first = self.shm._first.value
@@ -834,7 +838,9 @@ class Flow(msgspec.Struct):  # , frozen=True):
                 slice_to_head=-2,
                 should_redraw=bool(append_diff),
-                # do_append=False,
+
+                # NOTE: already passed through by display loop?
+                # do_append=uppx < 16,
                 **kwargs
             )
@@ -866,6 +872,8 @@ class Flow(msgspec.Struct):  # , frozen=True):
                 view_range=(ivl, ivr) if use_vr else None,

+                # NOTE: already passed through by display loop?
+                # do_append=uppx < 16,
+
                 **kwargs
             )