Compare commits
10 Commits
19205d57a1
...
b8cfee7d2f
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | b8cfee7d2f | |
Tyler Goodlet | 2d7aba0193 | |
Tyler Goodlet | 260b632f07 | |
Tyler Goodlet | b8704e1b7f | |
Tyler Goodlet | 800fe7446a | |
Tyler Goodlet | 536d1ff0d1 | |
Tyler Goodlet | be7c047e2f | |
Tyler Goodlet | d3e6ed3ba4 | |
Tyler Goodlet | 329e833e96 | |
Tyler Goodlet | 4e85d1d395 |
|
@ -33,7 +33,41 @@ class SymbolNotFound(BrokerError):
|
|||
|
||||
|
||||
class NoData(BrokerError):
|
||||
"Symbol data not permitted"
|
||||
'''
|
||||
Symbol data not permitted or no data
|
||||
for time range found.
|
||||
|
||||
'''
|
||||
def __init__(
|
||||
self,
|
||||
*args,
|
||||
frame_size: int = 1000,
|
||||
|
||||
) -> None:
|
||||
super().__init__(*args)
|
||||
|
||||
# when raised, machinery can check if the backend
|
||||
# set a "frame size" for doing datetime calcs.
|
||||
self.frame_size: int = 1000
|
||||
|
||||
|
||||
class DataUnavailable(BrokerError):
|
||||
'''
|
||||
Signal storage requests to terminate.
|
||||
|
||||
'''
|
||||
# TODO: add in a reason that can be displayed in the
|
||||
# UI (for eg. `kraken` is bs and you should complain
|
||||
# to them that you can't pull more OHLC data..)
|
||||
|
||||
|
||||
class DataThrottle(BrokerError):
|
||||
'''
|
||||
Broker throttled request rate for data.
|
||||
|
||||
'''
|
||||
# TODO: add in throttle metrics/feedback
|
||||
|
||||
|
||||
|
||||
def resproc(
|
||||
|
@ -50,12 +84,12 @@ def resproc(
|
|||
if not resp.status_code == 200:
|
||||
raise BrokerError(resp.body)
|
||||
try:
|
||||
json = resp.json()
|
||||
msg = resp.json()
|
||||
except json.decoder.JSONDecodeError:
|
||||
log.exception(f"Failed to process {resp}:\n{resp.text}")
|
||||
raise BrokerError(resp.text)
|
||||
|
||||
if log_resp:
|
||||
log.debug(f"Received json contents:\n{colorize_json(json)}")
|
||||
log.debug(f"Received json contents:\n{colorize_json(msg)}")
|
||||
|
||||
return json if return_json else resp
|
||||
return msg if return_json else resp
|
||||
|
|
|
@ -1482,7 +1482,9 @@ async def get_bars(
|
|||
|
||||
if 'No market data permissions for' in msg:
|
||||
# TODO: signalling for no permissions searches
|
||||
raise NoData(f'Symbol: {fqsn}')
|
||||
raise NoData(
|
||||
f'Symbol: {fqsn}',
|
||||
)
|
||||
break
|
||||
|
||||
elif (
|
||||
|
@ -1562,7 +1564,10 @@ async def open_history_client(
|
|||
if out is None:
|
||||
# could be trying to retreive bars over weekend
|
||||
log.error(f"Can't grab bars starting at {end_dt}!?!?")
|
||||
raise NoData(f'{end_dt}')
|
||||
raise NoData(
|
||||
f'{end_dt}',
|
||||
frame_size=2000,
|
||||
)
|
||||
|
||||
bars, bars_array, first_dt, last_dt = out
|
||||
|
||||
|
|
|
@ -20,7 +20,8 @@ Kraken backend.
|
|||
'''
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from dataclasses import asdict, field
|
||||
from typing import Any, Optional, AsyncIterator, Callable
|
||||
from datetime import datetime
|
||||
from typing import Any, Optional, AsyncIterator, Callable, Union
|
||||
import time
|
||||
|
||||
from trio_typing import TaskStatus
|
||||
|
@ -40,7 +41,13 @@ import base64
|
|||
|
||||
from .. import config
|
||||
from .._cacheables import open_cached_client
|
||||
from ._util import resproc, SymbolNotFound, BrokerError
|
||||
from ._util import (
|
||||
resproc,
|
||||
SymbolNotFound,
|
||||
BrokerError,
|
||||
DataThrottle,
|
||||
DataUnavailable,
|
||||
)
|
||||
from ..log import get_logger, get_console_log
|
||||
from ..data import ShmArray
|
||||
from ..data._web_bs import open_autorecon_ws, NoBsWs
|
||||
|
@ -391,17 +398,26 @@ class Client:
|
|||
async def bars(
|
||||
self,
|
||||
symbol: str = 'XBTUSD',
|
||||
|
||||
# UTC 2017-07-02 12:53:20
|
||||
since: int = None,
|
||||
since: Optional[Union[int, datetime]] = None,
|
||||
count: int = 720, # <- max allowed per query
|
||||
as_np: bool = True,
|
||||
|
||||
) -> dict:
|
||||
|
||||
if since is None:
|
||||
since = pendulum.now('UTC').start_of('minute').subtract(
|
||||
minutes=count).timestamp()
|
||||
|
||||
elif isinstance(since, int):
|
||||
since = pendulum.from_timestamp(since).timestamp()
|
||||
|
||||
else: # presumably a pendulum datetime
|
||||
since = since.timestamp()
|
||||
|
||||
# UTC 2017-07-02 12:53:20 is oldest seconds value
|
||||
since = str(max(1499000000, since))
|
||||
since = str(max(1499000000, int(since)))
|
||||
json = await self._public(
|
||||
'OHLC',
|
||||
data={
|
||||
|
@ -445,7 +461,16 @@ class Client:
|
|||
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
|
||||
return array
|
||||
except KeyError:
|
||||
raise SymbolNotFound(json['error'][0] + f': {symbol}')
|
||||
errmsg = json['error'][0]
|
||||
|
||||
if 'not found' in errmsg:
|
||||
raise SymbolNotFound(errmsg + f': {symbol}')
|
||||
|
||||
elif 'Too many requests' in errmsg:
|
||||
raise DataThrottle(f'{symbol}')
|
||||
|
||||
else:
|
||||
raise BrokerError(errmsg)
|
||||
|
||||
|
||||
@acm
|
||||
|
@ -668,8 +693,8 @@ async def handle_order_requests(
|
|||
oid=msg.oid,
|
||||
reqid=msg.reqid,
|
||||
symbol=msg.symbol,
|
||||
# TODO: maybe figure out if pending cancels will
|
||||
# eventually get cancelled
|
||||
# TODO: maybe figure out if pending
|
||||
# cancels will eventually get cancelled
|
||||
reason="Order cancel is still pending?",
|
||||
broker_details=resp
|
||||
).dict()
|
||||
|
@ -1003,7 +1028,45 @@ async def open_history_client(
|
|||
|
||||
# TODO implement history getter for the new storage layer.
|
||||
async with open_cached_client('kraken') as client:
|
||||
yield client
|
||||
|
||||
# lol, kraken won't send any more then the "last"
|
||||
# 720 1m bars.. so we have to just ignore further
|
||||
# requests of this type..
|
||||
queries: int = 0
|
||||
|
||||
async def get_ohlc(
|
||||
end_dt: Optional[datetime] = None,
|
||||
start_dt: Optional[datetime] = None,
|
||||
|
||||
) -> tuple[
|
||||
np.ndarray,
|
||||
datetime, # start
|
||||
datetime, # end
|
||||
]:
|
||||
|
||||
nonlocal queries
|
||||
if queries > 0:
|
||||
raise DataUnavailable
|
||||
|
||||
count = 0
|
||||
while count <= 3:
|
||||
try:
|
||||
array = await client.bars(
|
||||
symbol,
|
||||
since=end_dt,
|
||||
)
|
||||
count += 1
|
||||
queries += 1
|
||||
break
|
||||
except DataThrottle:
|
||||
log.warning(f'kraken OHLC throttle for {symbol}')
|
||||
await trio.sleep(1)
|
||||
|
||||
start_dt = pendulum.from_timestamp(array[0]['time'])
|
||||
end_dt = pendulum.from_timestamp(array[-1]['time'])
|
||||
return array, start_dt, end_dt
|
||||
|
||||
yield get_ohlc
|
||||
|
||||
|
||||
async def backfill_bars(
|
||||
|
|
|
@ -148,7 +148,7 @@ def storesh(
|
|||
enable_modules=['piker.data._ahab'],
|
||||
):
|
||||
symbol = symbols[0]
|
||||
await tsdb_history_update()
|
||||
await tsdb_history_update(symbol)
|
||||
|
||||
trio.run(main)
|
||||
|
||||
|
|
|
@ -67,6 +67,10 @@ from ._sampling import (
|
|||
sample_and_broadcast,
|
||||
uniform_rate_send,
|
||||
)
|
||||
from ..brokers._util import (
|
||||
NoData,
|
||||
DataUnavailable,
|
||||
)
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
@ -273,7 +277,19 @@ async def start_backfill(
|
|||
# and count < mx_fills
|
||||
):
|
||||
count += 1
|
||||
try:
|
||||
array, start_dt, end_dt = await hist(end_dt=start_dt)
|
||||
|
||||
except NoData:
|
||||
# decrement by the diff in time last delivered.
|
||||
end_dt = start_dt.subtract(seconds=(end_dt - start_dt).seconds)
|
||||
continue
|
||||
|
||||
except DataUnavailable:
|
||||
# broker is being a bish and we can't pull
|
||||
# any more..
|
||||
break
|
||||
|
||||
to_push = diff_history(
|
||||
array,
|
||||
start_dt,
|
||||
|
|
|
@ -270,6 +270,7 @@ class Storage:
|
|||
self,
|
||||
fqsn: str,
|
||||
timeframe: Optional[Union[int, str]] = None,
|
||||
end: Optional[int] = None,
|
||||
|
||||
) -> tuple[
|
||||
MarketstoreClient,
|
||||
|
@ -287,6 +288,7 @@ class Storage:
|
|||
symbols=fqsn,
|
||||
timeframe=tfstr,
|
||||
attrgroup='OHLCV',
|
||||
end=end,
|
||||
# limit_from_start=True,
|
||||
|
||||
# TODO: figure the max limit here given the
|
||||
|
@ -346,6 +348,7 @@ class Storage:
|
|||
self,
|
||||
fqsn: str,
|
||||
ohlcv: np.ndarray,
|
||||
append_and_duplicate: bool = True,
|
||||
|
||||
) -> None:
|
||||
# build mkts schema compat array for writing
|
||||
|
@ -373,7 +376,7 @@ class Storage:
|
|||
# NOTE: will will append duplicates
|
||||
# for the same timestamp-index.
|
||||
# TODO: pre deduplicate?
|
||||
isvariablelength=True,
|
||||
isvariablelength=append_and_duplicate,
|
||||
)
|
||||
|
||||
log.info(
|
||||
|
@ -443,17 +446,17 @@ async def tsdb_history_update(
|
|||
async with (
|
||||
open_storage_client(fqsn) as storage,
|
||||
|
||||
# maybe_open_feed(
|
||||
# [fqsn],
|
||||
# start_stream=False,
|
||||
maybe_open_feed(
|
||||
[fqsn],
|
||||
start_stream=False,
|
||||
|
||||
# ) as (feed, stream),
|
||||
) as (feed, stream),
|
||||
):
|
||||
profiler(f'opened feed for {fqsn}')
|
||||
|
||||
|
||||
# to_append = feed.shm.array
|
||||
# to_prepend = None
|
||||
to_append = feed.shm.array
|
||||
to_prepend = None
|
||||
|
||||
if fqsn:
|
||||
symbol = feed.symbols.get(fqsn)
|
||||
|
@ -477,10 +480,11 @@ async def tsdb_history_update(
|
|||
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
|
||||
profiler(f'listed symbols {syms}')
|
||||
|
||||
# TODO: ask if user wants to write history for detected
|
||||
# available shm buffers?
|
||||
from tractor.trionics import ipython_embed
|
||||
await ipython_embed()
|
||||
|
||||
|
||||
# for array in [to_append, to_prepend]:
|
||||
# if array is None:
|
||||
# continue
|
||||
|
@ -490,7 +494,7 @@ async def tsdb_history_update(
|
|||
# )
|
||||
# await storage.write_ohlcv(fqsn, array)
|
||||
|
||||
profiler('Finished db writes')
|
||||
# profiler('Finished db writes')
|
||||
|
||||
|
||||
async def ingest_quote_stream(
|
||||
|
|
|
@ -167,6 +167,7 @@ def _wma(
|
|||
|
||||
assert length == len(weights)
|
||||
|
||||
# lol, for long sequences this is nutso slow and expensive..
|
||||
return np.convolve(signal, weights, 'valid')
|
||||
|
||||
|
||||
|
|
|
@ -309,7 +309,7 @@ async def flow_rates(
|
|||
|
||||
if period > 1:
|
||||
trade_rate_wma = _wma(
|
||||
dvlm_shm.array['trade_count'],
|
||||
dvlm_shm.array['trade_count'][-period:],
|
||||
period,
|
||||
weights=weights,
|
||||
)
|
||||
|
@ -332,7 +332,7 @@ async def flow_rates(
|
|||
|
||||
if period > 1:
|
||||
dark_trade_rate_wma = _wma(
|
||||
dvlm_shm.array['dark_trade_count'],
|
||||
dvlm_shm.array['dark_trade_count'][-period:],
|
||||
period,
|
||||
weights=weights,
|
||||
)
|
||||
|
|
|
@ -429,10 +429,7 @@ class FastAppendCurve(pg.GraphicsObject):
|
|||
if (
|
||||
# std m4 downsample conditions
|
||||
px_width
|
||||
and uppx_diff >= 1
|
||||
or uppx_diff <= -1
|
||||
or self._step_mode and abs(uppx_diff) >= 2
|
||||
|
||||
and abs(uppx_diff) >= 1
|
||||
):
|
||||
log.info(
|
||||
f'{self._name} sampler change: {self._last_uppx} -> {uppx}'
|
||||
|
|
|
@ -61,7 +61,7 @@ from ..log import get_logger
|
|||
log = get_logger(__name__)
|
||||
|
||||
# TODO: load this from a config.toml!
|
||||
_quote_throttle_rate: int = 12 # Hz
|
||||
_quote_throttle_rate: int = 22 # Hz
|
||||
|
||||
|
||||
# a working tick-type-classes template
|
||||
|
@ -689,15 +689,17 @@ async def display_symbol_data(
|
|||
# plot historical vwap if available
|
||||
wap_in_history = False
|
||||
|
||||
if brokermod._show_wap_in_history:
|
||||
# XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?!
|
||||
# if brokermod._show_wap_in_history:
|
||||
|
||||
if 'bar_wap' in bars.dtype.fields:
|
||||
wap_in_history = True
|
||||
chart.draw_curve(
|
||||
name='bar_wap',
|
||||
data=bars,
|
||||
add_label=False,
|
||||
)
|
||||
# if 'bar_wap' in bars.dtype.fields:
|
||||
# wap_in_history = True
|
||||
# chart.draw_curve(
|
||||
# name='bar_wap',
|
||||
# shm=ohlcv,
|
||||
# color='default_light',
|
||||
# add_label=False,
|
||||
# )
|
||||
|
||||
# size view to data once at outset
|
||||
chart.cv._set_yrange()
|
||||
|
|
|
@ -320,7 +320,7 @@ class Flow(msgspec.Struct): # , frozen=True):
|
|||
render: bool = True,
|
||||
array_key: Optional[str] = None,
|
||||
|
||||
profiler=None,
|
||||
profiler: Optional[pg.debug.Profiler] = None,
|
||||
|
||||
**kwargs,
|
||||
|
||||
|
@ -524,7 +524,10 @@ class Flow(msgspec.Struct): # , frozen=True):
|
|||
view_range=(ivl, ivr), # hack
|
||||
profiler=profiler,
|
||||
# should_redraw=False,
|
||||
# do_append=False,
|
||||
|
||||
# NOTE: already passed through by display loop?
|
||||
# do_append=uppx < 16,
|
||||
**kwargs,
|
||||
)
|
||||
curve.show()
|
||||
profiler('updated ds curve')
|
||||
|
@ -589,6 +592,7 @@ class Flow(msgspec.Struct): # , frozen=True):
|
|||
else:
|
||||
# ``FastAppendCurve`` case:
|
||||
array_key = array_key or self.name
|
||||
uppx = graphics.x_uppx()
|
||||
|
||||
if graphics._step_mode and self.gy is None:
|
||||
self._iflat_first = self.shm._first.value
|
||||
|
@ -834,7 +838,9 @@ class Flow(msgspec.Struct): # , frozen=True):
|
|||
slice_to_head=-2,
|
||||
|
||||
should_redraw=bool(append_diff),
|
||||
# do_append=False,
|
||||
|
||||
# NOTE: already passed through by display loop?
|
||||
# do_append=uppx < 16,
|
||||
|
||||
**kwargs
|
||||
)
|
||||
|
@ -866,6 +872,8 @@ class Flow(msgspec.Struct): # , frozen=True):
|
|||
|
||||
view_range=(ivl, ivr) if use_vr else None,
|
||||
|
||||
# NOTE: already passed through by display loop?
|
||||
# do_append=uppx < 16,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in New Issue