Compare commits


No commits in common. "b8cfee7d2f4b86fc07a7f6a488ac2712089570b2" and "19205d57a1b312d09a16b28d9e2829d570f2f9ed" have entirely different histories.

11 changed files with 44 additions and 174 deletions

View File

@@ -33,41 +33,7 @@ class SymbolNotFound(BrokerError):


 class NoData(BrokerError):
-    '''
-    Symbol data not permitted or no data
-    for time range found.
-
-    '''
-    def __init__(
-        self,
-        *args,
-        frame_size: int = 1000,
-    ) -> None:
-        super().__init__(*args)
-
-        # when raised, machinery can check if the backend
-        # set a "frame size" for doing datetime calcs.
-        self.frame_size: int = 1000
-
-
-class DataUnavailable(BrokerError):
-    '''
-    Signal storage requests to terminate.
-
-    '''
-    # TODO: add in a reason that can be displayed in the
-    # UI (for eg. `kraken` is bs and you should complain
-    # to them that you can't pull more OHLC data..)
-
-
-class DataThrottle(BrokerError):
-    '''
-    Broker throttled request rate for data.
-
-    '''
-    # TODO: add in throttle metrics/feedback
+    "Symbol data not permitted"


 def resproc(
@@ -84,12 +50,12 @@ def resproc(
     if not resp.status_code == 200:
         raise BrokerError(resp.body)

     try:
-        msg = resp.json()
+        json = resp.json()
     except json.decoder.JSONDecodeError:
         log.exception(f"Failed to process {resp}:\n{resp.text}")
         raise BrokerError(resp.text)

     if log_resp:
-        log.debug(f"Received json contents:\n{colorize_json(msg)}")
+        log.debug(f"Received json contents:\n{colorize_json(json)}")

-    return msg if return_json else resp
+    return json if return_json else resp
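
Side note on the `resproc` hunk: rebinding the decoded payload to the name `json` makes `json` a local variable for the whole function body, so the `except json.decoder.JSONDecodeError` clause can no longer reach the imported module on the failure path. A minimal standalone sketch of the hazard (the `resp` stub is illustrative, not this repo's client):

import json
import types

# stand-in for an HTTP response whose body is not valid JSON
resp = types.SimpleNamespace(json=lambda: json.loads('not json'))


def resproc_sketch(resp):
    try:
        # assignment makes `json` a local name for the whole function..
        json = resp.json()
    except json.decoder.JSONDecodeError:
        # ..so this lookup hits the unbound local and raises
        # `UnboundLocalError` instead of matching the decode error.
        return None
    return json


resproc_sketch(resp)  # -> UnboundLocalError, not a clean None

Keeping the left column's `msg` binding sidesteps the shadowing entirely.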

View File

@@ -1482,9 +1482,7 @@ async def get_bars(
                 if 'No market data permissions for' in msg:
                     # TODO: signalling for no permissions searches
-                    raise NoData(
-                        f'Symbol: {fqsn}',
-                    )
+                    raise NoData(f'Symbol: {fqsn}')
                     break

                 elif (
@@ -1564,10 +1562,7 @@ async def open_history_client(
             if out is None:
                 # could be trying to retreive bars over weekend
                 log.error(f"Can't grab bars starting at {end_dt}!?!?")
-                raise NoData(
-                    f'{end_dt}',
-                    frame_size=2000,
-                )
+                raise NoData(f'{end_dt}')

             bars, bars_array, first_dt, last_dt = out
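
For context on the dropped `frame_size=2000` kwarg: per the removed `_util.py` comment above, the attribute exists so the history machinery "can check if the backend set a 'frame size' for doing datetime calcs". A hypothetical sketch of that consumption pattern, not this repo's actual loop (`hist` is assumed to return `(array, start_dt, end_dt)` with pendulum datetimes, so `.subtract()` is available):

class NoData(Exception):
    def __init__(self, *args, frame_size: int = 1000) -> None:
        super().__init__(*args)
        # bars-per-frame hint optionally set by the backend
        self.frame_size: int = frame_size


async def backfill(hist, end_dt, oldest):
    frames = []
    while end_dt > oldest:
        try:
            array, start_dt, end_dt = await hist(end_dt=end_dt)
        except NoData as err:
            # empty frame: step the query window back by one
            # backend-sized frame of (1s) bars and retry
            end_dt = end_dt.subtract(seconds=err.frame_size)
            continue

        frames.append(array)
        end_dt = start_dt  # keep walking further back in history
    return frames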

View File

@@ -20,8 +20,7 @@ Kraken backend.

 '''
 from contextlib import asynccontextmanager as acm
 from dataclasses import asdict, field
-from datetime import datetime
-from typing import Any, Optional, AsyncIterator, Callable, Union
+from typing import Any, Optional, AsyncIterator, Callable
 import time

 from trio_typing import TaskStatus
@@ -41,13 +40,7 @@ import base64

 from .. import config
 from .._cacheables import open_cached_client
-from ._util import (
-    resproc,
-    SymbolNotFound,
-    BrokerError,
-    DataThrottle,
-    DataUnavailable,
-)
+from ._util import resproc, SymbolNotFound, BrokerError
 from ..log import get_logger, get_console_log
 from ..data import ShmArray
 from ..data._web_bs import open_autorecon_ws, NoBsWs
@@ -398,26 +391,17 @@ class Client:

     async def bars(
         self,
         symbol: str = 'XBTUSD',
         # UTC 2017-07-02 12:53:20
-        since: Optional[Union[int, datetime]] = None,
+        since: int = None,
         count: int = 720,  # <- max allowed per query
         as_np: bool = True,
     ) -> dict:
         if since is None:
             since = pendulum.now('UTC').start_of(
                 'minute').subtract(minutes=count).timestamp()
-
-        elif isinstance(since, int):
-            since = pendulum.from_timestamp(since).timestamp()
-
-        else:  # presumably a pendulum datetime
-            since = since.timestamp()

         # UTC 2017-07-02 12:53:20 is oldest seconds value
-        since = str(max(1499000000, int(since)))
+        since = str(max(1499000000, since))

         json = await self._public(
             'OHLC',
             data={
@@ -461,16 +445,7 @@ class Client:
             array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
             return array
         except KeyError:
-            errmsg = json['error'][0]
-
-            if 'not found' in errmsg:
-                raise SymbolNotFound(errmsg + f': {symbol}')
-
-            elif 'Too many requests' in errmsg:
-                raise DataThrottle(f'{symbol}')
-
-            else:
-                raise BrokerError(errmsg)
+            raise SymbolNotFound(json['error'][0] + f': {symbol}')


 @acm
@@ -693,8 +668,8 @@ async def handle_order_requests(
                     oid=msg.oid,
                     reqid=msg.reqid,
                     symbol=msg.symbol,
-                    # TODO: maybe figure out if pending
-                    # cancels will eventually get cancelled
+                    # TODO: maybe figure out if pending cancels will
+                    # eventually get cancelled
                     reason="Order cancel is still pending?",
                     broker_details=resp
                 ).dict()
@@ -1028,45 +1003,7 @@ async def open_history_client(

     # TODO implement history getter for the new storage layer.
     async with open_cached_client('kraken') as client:
-
-        # lol, kraken won't send any more then the "last"
-        # 720 1m bars.. so we have to just ignore further
-        # requests of this type..
-        queries: int = 0
-
-        async def get_ohlc(
-            end_dt: Optional[datetime] = None,
-            start_dt: Optional[datetime] = None,
-        ) -> tuple[
-            np.ndarray,
-            datetime,  # start
-            datetime,  # end
-        ]:
-
-            nonlocal queries
-            if queries > 0:
-                raise DataUnavailable
-
-            count = 0
-            while count <= 3:
-                try:
-                    array = await client.bars(
-                        symbol,
-                        since=end_dt,
-                    )
-                    count += 1
-                    queries += 1
-                    break
-                except DataThrottle:
-                    log.warning(f'kraken OHLC throttle for {symbol}')
-                    await trio.sleep(1)
-
-            start_dt = pendulum.from_timestamp(array[0]['time'])
-            end_dt = pendulum.from_timestamp(array[-1]['time'])
-            return array, start_dt, end_dt
-
-        yield get_ohlc
+        yield client


 async def backfill_bars(
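
The `since` normalization removed from `Client.bars()` above matters because the (also removed) `get_ohlc()` passed a pendulum datetime straight through as `since=end_dt`, while the right column's `since: int = None` signature only covers the epoch-seconds and `None` cases. As a standalone sketch, the deleted branches did roughly:

from datetime import datetime
from typing import Optional, Union

import pendulum


def normalize_since(
    since: Optional[Union[int, datetime]] = None,
    count: int = 720,  # <- kraken's max bars per query
) -> str:
    if since is None:
        # default to `count` minutes back from the current UTC minute
        since = pendulum.now('UTC').start_of(
            'minute').subtract(minutes=count).timestamp()
    elif isinstance(since, int):
        since = pendulum.from_timestamp(since).timestamp()
    else:  # presumably a pendulum datetime
        since = since.timestamp()

    # UTC 2017-07-02 12:53:20 is the oldest seconds value kraken serves
    return str(max(1499000000, int(since)))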

View File

@@ -148,7 +148,7 @@ def storesh(
         enable_modules=['piker.data._ahab'],
     ):
         symbol = symbols[0]
-        await tsdb_history_update(symbol)
+        await tsdb_history_update()

     trio.run(main)

View File

@@ -67,10 +67,6 @@ from ._sampling import (
     sample_and_broadcast,
     uniform_rate_send,
 )
-from ..brokers._util import (
-    NoData,
-    DataUnavailable,
-)

 log = get_logger(__name__)
@@ -277,19 +273,7 @@ async def start_backfill(
             # and count < mx_fills
         ):
             count += 1
-            try:
-                array, start_dt, end_dt = await hist(end_dt=start_dt)
-
-            except NoData:
-                # decrement by the diff in time last delivered.
-                end_dt = start_dt.subtract(seconds=(end_dt - start_dt).seconds)
-                continue
-
-            except DataUnavailable:
-                # broker is being a bish and we can't pull
-                # any more..
-                break
+            array, start_dt, end_dt = await hist(end_dt=start_dt)

             to_push = diff_history(
                 array,
                 start_dt,

View File

@@ -270,7 +270,6 @@ class Storage:
         self,
         fqsn: str,
         timeframe: Optional[Union[int, str]] = None,
-        end: Optional[int] = None,

     ) -> tuple[
         MarketstoreClient,
@@ -288,7 +287,6 @@ class Storage:
             symbols=fqsn,
             timeframe=tfstr,
             attrgroup='OHLCV',
-            end=end,
             # limit_from_start=True,

             # TODO: figure the max limit here given the
@@ -348,7 +346,6 @@ class Storage:
         self,
         fqsn: str,
         ohlcv: np.ndarray,
-        append_and_duplicate: bool = True,

     ) -> None:

         # build mkts schema compat array for writing
@@ -376,7 +373,7 @@ class Storage:
             # NOTE: will will append duplicates
             # for the same timestamp-index.
             # TODO: pre deduplicate?
-            isvariablelength=append_and_duplicate,
+            isvariablelength=True,
         )

         log.info(
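
Since `isvariablelength=True` preserves the appending of "duplicates for the same timestamp-index" noted above, the "TODO: pre deduplicate?" could be handled client-side before the write. A hypothetical sketch, assuming marketstore's conventional `Epoch` timestamp field (not code from this repo):

import numpy as np


def dedup_by_epoch(ohlcv: np.ndarray) -> np.ndarray:
    # keep only the *last* row recorded for each timestamp-index
    rev = ohlcv[::-1]
    # first occurrence in the reversed view == last in the original;
    # `np.unique` also leaves the result sorted by ascending epoch
    _, idx = np.unique(rev['Epoch'], return_index=True)
    return rev[idx]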
@@ -446,17 +443,17 @@ async def tsdb_history_update(

     async with (
         open_storage_client(fqsn) as storage,
-        maybe_open_feed(
-            [fqsn],
-            start_stream=False,
-        ) as (feed, stream),
+        # maybe_open_feed(
+        #     [fqsn],
+        #     start_stream=False,
+        # ) as (feed, stream),
     ):
         profiler(f'opened feed for {fqsn}')

-        to_append = feed.shm.array
-        to_prepend = None
+        # to_append = feed.shm.array
+        # to_prepend = None

         if fqsn:
             symbol = feed.symbols.get(fqsn)
@@ -480,11 +477,10 @@ async def tsdb_history_update(
         log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
         profiler(f'listed symbols {syms}')

-        # TODO: ask if user wants to write history for detected
-        # available shm buffers?
         from tractor.trionics import ipython_embed
         await ipython_embed()

         # for array in [to_append, to_prepend]:
         #     if array is None:
         #         continue
@@ -494,7 +490,7 @@ async def tsdb_history_update(
         #     )
         #     await storage.write_ohlcv(fqsn, array)

-    # profiler('Finished db writes')
+    profiler('Finished db writes')
async def ingest_quote_stream( async def ingest_quote_stream(

View File

@@ -167,7 +167,6 @@ def _wma(
     assert length == len(weights)

-    # lol, for long sequences this is nutso slow and expensive..
     return np.convolve(signal, weights, 'valid')

View File

@@ -309,7 +309,7 @@ async def flow_rates(
             if period > 1:
                 trade_rate_wma = _wma(
-                    dvlm_shm.array['trade_count'][-period:],
+                    dvlm_shm.array['trade_count'],
                     period,
                     weights=weights,
                 )
@@ -332,7 +332,7 @@ async def flow_rates(
             if period > 1:
                 dark_trade_rate_wma = _wma(
-                    dvlm_shm.array['dark_trade_count'][-period:],
+                    dvlm_shm.array['dark_trade_count'],
                     period,
                     weights=weights,
                 )
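
Dropping the `[-period:]` slices re-introduces exactly the cost the deleted `_wma` comment warned about: `np.convolve(signal, weights, 'valid')` does work proportional to `len(signal) * len(weights)`, while the rate plot only needs the trailing value. A quick sketch of the equivalence (array sizes are made up):

import numpy as np

period = 6
weights = np.arange(1, period + 1, dtype=float)
weights /= weights.sum()

# pretend shm history: 100k per-second trade counts
trade_counts = np.random.default_rng(0).poisson(3, size=100_000)

# convolving the whole buffer: ~len(signal) * period multiply-adds..
full = np.convolve(trade_counts, weights, 'valid')

# ..vs only the trailing window, which yields the same most-recent
# weighted rate for ~period**2 multiply-adds
tail = np.convolve(trade_counts[-period:], weights, 'valid')

assert np.isclose(full[-1], tail[-1])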

View File

@@ -429,7 +429,10 @@ class FastAppendCurve(pg.GraphicsObject):
             if (
                 # std m4 downsample conditions
                 px_width
-                and abs(uppx_diff) >= 1
+                and uppx_diff >= 1
+                or uppx_diff <= -1
+                or self._step_mode and abs(uppx_diff) >= 2
             ):
                 log.info(
                     f'{self._name} sampler change: {self._last_uppx} -> {uppx}'
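
The right column's condition is precedence-sensitive: `and` binds tighter than `or` in Python, so only the first clause is gated on `px_width`. A small sketch of how the two versions disagree (bare variables stand in for the instance state):

# e.g. no pixel width yet, but a large negative uppx delta:
px_width, uppx_diff, step_mode = 0, -3, False

# left column: every case requires a truthy px_width
old = px_width and abs(uppx_diff) >= 1

# right column groups as:
#   (px_width and uppx_diff >= 1)
#   or (uppx_diff <= -1)
#   or (step_mode and abs(uppx_diff) >= 2)
new = (
    px_width and uppx_diff >= 1
    or uppx_diff <= -1
    or step_mode and abs(uppx_diff) >= 2
)

assert not old and new is True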

View File

@@ -61,7 +61,7 @@ from ..log import get_logger
 log = get_logger(__name__)

 # TODO: load this from a config.toml!
-_quote_throttle_rate: int = 22  # Hz
+_quote_throttle_rate: int = 12  # Hz

 # a working tick-type-classes template
@@ -689,17 +689,15 @@ async def display_symbol_data(
         # plot historical vwap if available
         wap_in_history = False

-        # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?!
-        # if brokermod._show_wap_in_history:
-
-        #     if 'bar_wap' in bars.dtype.fields:
-        #         wap_in_history = True
-        #         chart.draw_curve(
-        #             name='bar_wap',
-        #             shm=ohlcv,
-        #             color='default_light',
-        #             add_label=False,
-        #         )
+        if brokermod._show_wap_in_history:
+
+            if 'bar_wap' in bars.dtype.fields:
+                wap_in_history = True
+                chart.draw_curve(
+                    name='bar_wap',
+                    data=bars,
+                    add_label=False,
+                )

         # size view to data once at outset
         chart.cv._set_yrange()

View File

@@ -320,7 +320,7 @@ class Flow(msgspec.Struct):  # , frozen=True):
         render: bool = True,
         array_key: Optional[str] = None,

-        profiler: Optional[pg.debug.Profiler] = None,
+        profiler=None,

         **kwargs,
@@ -524,10 +524,7 @@ class Flow(msgspec.Struct):  # , frozen=True):
                     view_range=(ivl, ivr),  # hack
                     profiler=profiler,
                     # should_redraw=False,
-
-                    # NOTE: already passed through by display loop?
-                    # do_append=uppx < 16,
-                    **kwargs,
+                    # do_append=False,
                 )
                 curve.show()
                 profiler('updated ds curve')
@@ -592,7 +589,6 @@ class Flow(msgspec.Struct):  # , frozen=True):
         else:
             # ``FastAppendCurve`` case:
             array_key = array_key or self.name
-            uppx = graphics.x_uppx()

             if graphics._step_mode and self.gy is None:
                 self._iflat_first = self.shm._first.value
@@ -838,9 +834,7 @@ class Flow(msgspec.Struct):  # , frozen=True):
                     slice_to_head=-2,
                     should_redraw=bool(append_diff),

-                    # NOTE: already passed through by display loop?
-                    # do_append=uppx < 16,
+                    # do_append=False,
                     **kwargs
                 )
@@ -872,8 +866,6 @@ class Flow(msgspec.Struct):  # , frozen=True):
                     view_range=(ivl, ivr) if use_vr else None,

-                    # NOTE: already passed through by display loop?
-                    # do_append=uppx < 16,
                     **kwargs
                 )
) )