Compare commits

..

No commits in common. "b8cfee7d2f4b86fc07a7f6a488ac2712089570b2" and "19205d57a1b312d09a16b28d9e2829d570f2f9ed" have entirely different histories.

11 changed files with 44 additions and 174 deletions

View File

@ -33,41 +33,7 @@ class SymbolNotFound(BrokerError):
class NoData(BrokerError):
'''
Symbol data not permitted or no data
for time range found.
'''
def __init__(
self,
*args,
frame_size: int = 1000,
) -> None:
super().__init__(*args)
# when raised, machinery can check if the backend
# set a "frame size" for doing datetime calcs.
self.frame_size: int = 1000
class DataUnavailable(BrokerError):
'''
Signal storage requests to terminate.
'''
# TODO: add in a reason that can be displayed in the
# UI (for eg. `kraken` is bs and you should complain
# to them that you can't pull more OHLC data..)
class DataThrottle(BrokerError):
'''
Broker throttled request rate for data.
'''
# TODO: add in throttle metrics/feedback
"Symbol data not permitted"
def resproc(
@ -84,12 +50,12 @@ def resproc(
if not resp.status_code == 200:
raise BrokerError(resp.body)
try:
msg = resp.json()
json = resp.json()
except json.decoder.JSONDecodeError:
log.exception(f"Failed to process {resp}:\n{resp.text}")
raise BrokerError(resp.text)
if log_resp:
log.debug(f"Received json contents:\n{colorize_json(msg)}")
log.debug(f"Received json contents:\n{colorize_json(json)}")
return msg if return_json else resp
return json if return_json else resp

View File

@ -1482,9 +1482,7 @@ async def get_bars(
if 'No market data permissions for' in msg:
# TODO: signalling for no permissions searches
raise NoData(
f'Symbol: {fqsn}',
)
raise NoData(f'Symbol: {fqsn}')
break
elif (
@ -1564,10 +1562,7 @@ async def open_history_client(
if out is None:
# could be trying to retrieve bars over weekend
log.error(f"Can't grab bars starting at {end_dt}!?!?")
raise NoData(
f'{end_dt}',
frame_size=2000,
)
raise NoData(f'{end_dt}')
bars, bars_array, first_dt, last_dt = out

View File

@ -20,8 +20,7 @@ Kraken backend.
'''
from contextlib import asynccontextmanager as acm
from dataclasses import asdict, field
from datetime import datetime
from typing import Any, Optional, AsyncIterator, Callable, Union
from typing import Any, Optional, AsyncIterator, Callable
import time
from trio_typing import TaskStatus
@ -41,13 +40,7 @@ import base64
from .. import config
from .._cacheables import open_cached_client
from ._util import (
resproc,
SymbolNotFound,
BrokerError,
DataThrottle,
DataUnavailable,
)
from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws, NoBsWs
@ -312,7 +305,7 @@ class Client:
action: str,
size: float,
reqid: str = None,
validate: bool = False # set True test call without a real submission
validate: bool = False # set True test call without a real submission
) -> dict:
'''
Place an order and return integer request id provided by client.
@ -398,26 +391,17 @@ class Client:
async def bars(
self,
symbol: str = 'XBTUSD',
# UTC 2017-07-02 12:53:20
since: Optional[Union[int, datetime]] = None,
since: int = None,
count: int = 720, # <- max allowed per query
as_np: bool = True,
) -> dict:
if since is None:
since = pendulum.now('UTC').start_of('minute').subtract(
minutes=count).timestamp()
elif isinstance(since, int):
since = pendulum.from_timestamp(since).timestamp()
else: # presumably a pendulum datetime
since = since.timestamp()
# UTC 2017-07-02 12:53:20 is oldest seconds value
since = str(max(1499000000, int(since)))
since = str(max(1499000000, since))
json = await self._public(
'OHLC',
data={
@ -461,16 +445,7 @@ class Client:
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array
except KeyError:
errmsg = json['error'][0]
if 'not found' in errmsg:
raise SymbolNotFound(errmsg + f': {symbol}')
elif 'Too many requests' in errmsg:
raise DataThrottle(f'{symbol}')
else:
raise BrokerError(errmsg)
raise SymbolNotFound(json['error'][0] + f': {symbol}')
@acm
@ -693,8 +668,8 @@ async def handle_order_requests(
oid=msg.oid,
reqid=msg.reqid,
symbol=msg.symbol,
# TODO: maybe figure out if pending
# cancels will eventually get cancelled
# TODO: maybe figure out if pending cancels will
# eventually get cancelled
reason="Order cancel is still pending?",
broker_details=resp
).dict()
@ -1028,45 +1003,7 @@ async def open_history_client(
# TODO implement history getter for the new storage layer.
async with open_cached_client('kraken') as client:
# lol, kraken won't send any more than the "last"
# 720 1m bars.. so we have to just ignore further
# requests of this type..
queries: int = 0
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
nonlocal queries
if queries > 0:
raise DataUnavailable
count = 0
while count <= 3:
try:
array = await client.bars(
symbol,
since=end_dt,
)
count += 1
queries += 1
break
except DataThrottle:
log.warning(f'kraken OHLC throttle for {symbol}')
await trio.sleep(1)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield get_ohlc
yield client
async def backfill_bars(

View File

@ -148,7 +148,7 @@ def storesh(
enable_modules=['piker.data._ahab'],
):
symbol = symbols[0]
await tsdb_history_update(symbol)
await tsdb_history_update()
trio.run(main)

View File

@ -67,10 +67,6 @@ from ._sampling import (
sample_and_broadcast,
uniform_rate_send,
)
from ..brokers._util import (
NoData,
DataUnavailable,
)
log = get_logger(__name__)
@ -277,19 +273,7 @@ async def start_backfill(
# and count < mx_fills
):
count += 1
try:
array, start_dt, end_dt = await hist(end_dt=start_dt)
except NoData:
# decrement by the diff in time last delivered.
end_dt = start_dt.subtract(seconds=(end_dt - start_dt).seconds)
continue
except DataUnavailable:
# broker is being a bish and we can't pull
# any more..
break
array, start_dt, end_dt = await hist(end_dt=start_dt)
to_push = diff_history(
array,
start_dt,

View File

@ -270,7 +270,6 @@ class Storage:
self,
fqsn: str,
timeframe: Optional[Union[int, str]] = None,
end: Optional[int] = None,
) -> tuple[
MarketstoreClient,
@ -288,7 +287,6 @@ class Storage:
symbols=fqsn,
timeframe=tfstr,
attrgroup='OHLCV',
end=end,
# limit_from_start=True,
# TODO: figure the max limit here given the
@ -348,7 +346,6 @@ class Storage:
self,
fqsn: str,
ohlcv: np.ndarray,
append_and_duplicate: bool = True,
) -> None:
# build mkts schema compat array for writing
@ -376,7 +373,7 @@ class Storage:
# NOTE: we will append duplicates
# for the same timestamp-index.
# TODO: pre deduplicate?
isvariablelength=append_and_duplicate,
isvariablelength=True,
)
log.info(
@ -446,17 +443,17 @@ async def tsdb_history_update(
async with (
open_storage_client(fqsn) as storage,
maybe_open_feed(
[fqsn],
start_stream=False,
# maybe_open_feed(
# [fqsn],
# start_stream=False,
) as (feed, stream),
# ) as (feed, stream),
):
profiler(f'opened feed for {fqsn}')
to_append = feed.shm.array
to_prepend = None
# to_append = feed.shm.array
# to_prepend = None
if fqsn:
symbol = feed.symbols.get(fqsn)
@ -480,11 +477,10 @@ async def tsdb_history_update(
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
profiler(f'listed symbols {syms}')
# TODO: ask if user wants to write history for detected
# available shm buffers?
from tractor.trionics import ipython_embed
await ipython_embed()
# for array in [to_append, to_prepend]:
# if array is None:
# continue
@ -494,7 +490,7 @@ async def tsdb_history_update(
# )
# await storage.write_ohlcv(fqsn, array)
# profiler('Finished db writes')
profiler('Finished db writes')
async def ingest_quote_stream(

View File

@ -167,7 +167,6 @@ def _wma(
assert length == len(weights)
# lol, for long sequences this is nutso slow and expensive..
return np.convolve(signal, weights, 'valid')

View File

@ -309,7 +309,7 @@ async def flow_rates(
if period > 1:
trade_rate_wma = _wma(
dvlm_shm.array['trade_count'][-period:],
dvlm_shm.array['trade_count'],
period,
weights=weights,
)
@ -332,7 +332,7 @@ async def flow_rates(
if period > 1:
dark_trade_rate_wma = _wma(
dvlm_shm.array['dark_trade_count'][-period:],
dvlm_shm.array['dark_trade_count'],
period,
weights=weights,
)

View File

@ -429,7 +429,10 @@ class FastAppendCurve(pg.GraphicsObject):
if (
# std m4 downsample conditions
px_width
and abs(uppx_diff) >= 1
and uppx_diff >= 1
or uppx_diff <= -1
or self._step_mode and abs(uppx_diff) >= 2
):
log.info(
f'{self._name} sampler change: {self._last_uppx} -> {uppx}'

View File

@ -61,7 +61,7 @@ from ..log import get_logger
log = get_logger(__name__)
# TODO: load this from a config.toml!
_quote_throttle_rate: int = 22 # Hz
_quote_throttle_rate: int = 12 # Hz
# a working tick-type-classes template
@ -689,17 +689,15 @@ async def display_symbol_data(
# plot historical vwap if available
wap_in_history = False
# XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?!
# if brokermod._show_wap_in_history:
if brokermod._show_wap_in_history:
# if 'bar_wap' in bars.dtype.fields:
# wap_in_history = True
# chart.draw_curve(
# name='bar_wap',
# shm=ohlcv,
# color='default_light',
# add_label=False,
# )
if 'bar_wap' in bars.dtype.fields:
wap_in_history = True
chart.draw_curve(
name='bar_wap',
data=bars,
add_label=False,
)
# size view to data once at outset
chart.cv._set_yrange()

View File

@ -320,7 +320,7 @@ class Flow(msgspec.Struct): # , frozen=True):
render: bool = True,
array_key: Optional[str] = None,
profiler: Optional[pg.debug.Profiler] = None,
profiler=None,
**kwargs,
@ -524,10 +524,7 @@ class Flow(msgspec.Struct): # , frozen=True):
view_range=(ivl, ivr), # hack
profiler=profiler,
# should_redraw=False,
# NOTE: already passed through by display loop?
# do_append=uppx < 16,
**kwargs,
# do_append=False,
)
curve.show()
profiler('updated ds curve')
@ -592,7 +589,6 @@ class Flow(msgspec.Struct): # , frozen=True):
else:
# ``FastAppendCurve`` case:
array_key = array_key or self.name
uppx = graphics.x_uppx()
if graphics._step_mode and self.gy is None:
self._iflat_first = self.shm._first.value
@ -838,9 +834,7 @@ class Flow(msgspec.Struct): # , frozen=True):
slice_to_head=-2,
should_redraw=bool(append_diff),
# NOTE: already passed through by display loop?
# do_append=uppx < 16,
# do_append=False,
**kwargs
)
@ -872,8 +866,6 @@ class Flow(msgspec.Struct): # , frozen=True):
view_range=(ivl, ivr) if use_vr else None,
# NOTE: already passed through by display loop?
# do_append=uppx < 16,
**kwargs
)