Compare commits

10 commits: `19205d57a1...b8cfee7d2f`

| Author | SHA1 |
|---|---|
| Tyler Goodlet | b8cfee7d2f |
| Tyler Goodlet | 2d7aba0193 |
| Tyler Goodlet | 260b632f07 |
| Tyler Goodlet | b8704e1b7f |
| Tyler Goodlet | 800fe7446a |
| Tyler Goodlet | 536d1ff0d1 |
| Tyler Goodlet | be7c047e2f |
| Tyler Goodlet | d3e6ed3ba4 |
| Tyler Goodlet | 329e833e96 |
| Tyler Goodlet | 4e85d1d395 |
```diff
@@ -33,7 +33,41 @@ class SymbolNotFound(BrokerError):


 class NoData(BrokerError):
-    "Symbol data not permitted"
+    '''
+    Symbol data not permitted or no data
+    for time range found.
+
+    '''
+    def __init__(
+        self,
+        *args,
+        frame_size: int = 1000,
+
+    ) -> None:
+        super().__init__(*args)
+
+        # when raised, machinery can check if the backend
+        # set a "frame size" for doing datetime calcs.
+        self.frame_size: int = 1000
+
+
+class DataUnavailable(BrokerError):
+    '''
+    Signal storage requests to terminate.
+
+    '''
+    # TODO: add in a reason that can be displayed in the
+    # UI (for eg. `kraken` is bs and you should complain
+    # to them that you can't pull more OHLC data..)
+
+
+class DataThrottle(BrokerError):
+    '''
+    Broker throttled request rate for data.
+
+    '''
+    # TODO: add in throttle metrics/feedback


 def resproc(
```
```diff
@@ -50,12 +84,12 @@ def resproc(
     if not resp.status_code == 200:
         raise BrokerError(resp.body)
     try:
-        json = resp.json()
+        msg = resp.json()
     except json.decoder.JSONDecodeError:
         log.exception(f"Failed to process {resp}:\n{resp.text}")
         raise BrokerError(resp.text)

     if log_resp:
-        log.debug(f"Received json contents:\n{colorize_json(json)}")
+        log.debug(f"Received json contents:\n{colorize_json(msg)}")

-    return json if return_json else resp
+    return msg if return_json else resp
```
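The rename from `json` to `msg` is more than style: assigning to `json` anywhere inside the function makes that name local throughout the function body, so if `resp.json()` raised, the `except json.decoder.JSONDecodeError` clause would hit an `UnboundLocalError` instead of matching the decode error. A small demonstration of the hazard (with a hypothetical `Resp` stand-in):

```python
import json


class Resp:
    '''Hypothetical stand-in for an HTTP response object.'''
    def __init__(self, text: str) -> None:
        self.text = text

    def json(self):
        return json.loads(self.text)


def resproc_old(resp: Resp):
    try:
        # assigning to `json` makes the name local to this function,
        # shadowing the stdlib module imported above.
        json = resp.json()
    except json.decoder.JSONDecodeError:
        # on a decode failure the local `json` was never bound, so
        # evaluating `json.decoder...` here raises UnboundLocalError,
        # masking the original parse error.
        return None
    return json


resproc_old(Resp('not json'))  # -> UnboundLocalError, not a clean None
```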
```diff
@@ -1482,7 +1482,9 @@ async def get_bars(

         if 'No market data permissions for' in msg:
             # TODO: signalling for no permissions searches
-            raise NoData(f'Symbol: {fqsn}')
+            raise NoData(
+                f'Symbol: {fqsn}',
+            )
             break

         elif (
```

```diff
@@ -1562,7 +1564,10 @@ async def open_history_client(
         if out is None:
             # could be trying to retreive bars over weekend
             log.error(f"Can't grab bars starting at {end_dt}!?!?")
-            raise NoData(f'{end_dt}')
+            raise NoData(
+                f'{end_dt}',
+                frame_size=2000,
+            )

         bars, bars_array, first_dt, last_dt = out
```
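Here the ib client advertises a 2000-bar frame when a weekend/holiday gap yields no bars. A sketch of how backfill machinery could consume that hint to rewind its request window (names and the 60s timeframe are illustrative; `start_dt` is assumed to be a pendulum datetime as elsewhere in these hunks):

```python
from piker.brokers._util import NoData


async def request_with_gap_skip(hist, start_dt, timeframe_s: int = 60):
    try:
        return await hist(end_dt=start_dt)
    except NoData as exc:
        # rewind by one backend-advertised frame worth of bars
        # (e.g. 2000 x 60s for ib) and signal the caller to retry.
        rewound = start_dt.subtract(seconds=exc.frame_size * timeframe_s)
        return None, rewound, start_dt
```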
```diff
@@ -20,7 +20,8 @@ Kraken backend.

 '''
 from contextlib import asynccontextmanager as acm
 from dataclasses import asdict, field
-from typing import Any, Optional, AsyncIterator, Callable
+from datetime import datetime
+from typing import Any, Optional, AsyncIterator, Callable, Union
 import time

 from trio_typing import TaskStatus
```

```diff
@@ -40,7 +41,13 @@ import base64

 from .. import config
 from .._cacheables import open_cached_client
-from ._util import resproc, SymbolNotFound, BrokerError
+from ._util import (
+    resproc,
+    SymbolNotFound,
+    BrokerError,
+    DataThrottle,
+    DataUnavailable,
+)
 from ..log import get_logger, get_console_log
 from ..data import ShmArray
 from ..data._web_bs import open_autorecon_ws, NoBsWs
```
```diff
@@ -391,17 +398,26 @@ class Client:
     async def bars(
         self,
         symbol: str = 'XBTUSD',

         # UTC 2017-07-02 12:53:20
-        since: int = None,
+        since: Optional[Union[int, datetime]] = None,
         count: int = 720,  # <- max allowed per query
         as_np: bool = True,

     ) -> dict:

         if since is None:
             since = pendulum.now('UTC').start_of('minute').subtract(
                 minutes=count).timestamp()

+        elif isinstance(since, int):
+            since = pendulum.from_timestamp(since).timestamp()
+
+        else:  # presumably a pendulum datetime
+            since = since.timestamp()
+
         # UTC 2017-07-02 12:53:20 is oldest seconds value
-        since = str(max(1499000000, since))
+        since = str(max(1499000000, int(since)))
         json = await self._public(
             'OHLC',
             data={
```
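`bars()` now accepts either an epoch int or a datetime-like object and normalizes both to integer epoch seconds, clamped to Kraken's oldest supported timestamp. The same normalization in isolation, as a standalone sketch mirroring the hunk above:

```python
from datetime import datetime
from typing import Optional, Union

import pendulum

KRAKEN_EPOCH_FLOOR = 1499000000  # UTC 2017-07-02 12:53:20


def normalize_since(
    since: Optional[Union[int, datetime]] = None,
    count: int = 720,
) -> str:
    # no input: default to `count` minutes back from the current minute
    if since is None:
        since = pendulum.now('UTC').start_of('minute').subtract(
            minutes=count).timestamp()
    elif isinstance(since, int):
        since = pendulum.from_timestamp(since).timestamp()
    else:  # presumably a (pendulum) datetime
        since = since.timestamp()

    # clamp to the oldest value kraken will accept
    return str(max(KRAKEN_EPOCH_FLOOR, int(since)))


print(normalize_since(0))  # -> '1499000000'
```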
```diff
@@ -445,7 +461,16 @@ class Client:
             array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
             return array
         except KeyError:
-            raise SymbolNotFound(json['error'][0] + f': {symbol}')
+            errmsg = json['error'][0]
+
+            if 'not found' in errmsg:
+                raise SymbolNotFound(errmsg + f': {symbol}')
+
+            elif 'Too many requests' in errmsg:
+                raise DataThrottle(f'{symbol}')
+
+            else:
+                raise BrokerError(errmsg)


 @acm
```
```diff
@@ -668,8 +693,8 @@ async def handle_order_requests(
                     oid=msg.oid,
                     reqid=msg.reqid,
                     symbol=msg.symbol,
-                    # TODO: maybe figure out if pending cancels will
-                    # eventually get cancelled
+                    # TODO: maybe figure out if pending
+                    # cancels will eventually get cancelled
                     reason="Order cancel is still pending?",
                     broker_details=resp
                 ).dict()
```
```diff
@@ -1003,7 +1028,45 @@ async def open_history_client(

     # TODO implement history getter for the new storage layer.
     async with open_cached_client('kraken') as client:
-        yield client
+
+        # lol, kraken won't send any more then the "last"
+        # 720 1m bars.. so we have to just ignore further
+        # requests of this type..
+        queries: int = 0
+
+        async def get_ohlc(
+            end_dt: Optional[datetime] = None,
+            start_dt: Optional[datetime] = None,
+
+        ) -> tuple[
+            np.ndarray,
+            datetime,  # start
+            datetime,  # end
+        ]:
+
+            nonlocal queries
+            if queries > 0:
+                raise DataUnavailable
+
+            count = 0
+            while count <= 3:
+                try:
+                    array = await client.bars(
+                        symbol,
+                        since=end_dt,
+                    )
+                    count += 1
+                    queries += 1
+                    break
+                except DataThrottle:
+                    log.warning(f'kraken OHLC throttle for {symbol}')
+                    await trio.sleep(1)
+
+            start_dt = pendulum.from_timestamp(array[0]['time'])
+            end_dt = pendulum.from_timestamp(array[-1]['time'])
+            return array, start_dt, end_dt
+
+        yield get_ohlc


 async def backfill_bars(
```
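The kraken history client now yields a frame-fetching closure instead of the raw client. Because the OHLC endpoint only ever returns roughly the most recent 720 one-minute bars regardless of `since`, the `queries` counter raises `DataUnavailable` on any second call, telling the backfiller (see the `start_backfill` hunk below) to stop paging. A sketch of a driver over this API (the exact `open_history_client` signature isn't shown in the diff, so entering it with the symbol is an assumption):

```python
from piker.brokers._util import DataUnavailable


async def pull_all_history(open_history_client, symbol: str):
    async with open_history_client(symbol) as get_ohlc:
        frames = []
        end_dt = None  # None -> newest available frame
        while True:
            try:
                array, start_dt, end_dt = await get_ohlc(end_dt=end_dt)
            except DataUnavailable:
                # kraken: one useful query, then we're told to stop
                break
            frames.append(array)
            end_dt = start_dt  # page backwards (a no-op for kraken)
        return frames
```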
```diff
@@ -148,7 +148,7 @@ def storesh(
             enable_modules=['piker.data._ahab'],
         ):
             symbol = symbols[0]
-            await tsdb_history_update()
+            await tsdb_history_update(symbol)

     trio.run(main)
```
```diff
@@ -67,6 +67,10 @@ from ._sampling import (
     sample_and_broadcast,
     uniform_rate_send,
 )
+from ..brokers._util import (
+    NoData,
+    DataUnavailable,
+)


 log = get_logger(__name__)
```
```diff
@@ -273,7 +277,19 @@ async def start_backfill(
         # and count < mx_fills
     ):
         count += 1
-        array, start_dt, end_dt = await hist(end_dt=start_dt)
+        try:
+            array, start_dt, end_dt = await hist(end_dt=start_dt)
+
+        except NoData:
+            # decrement by the diff in time last delivered.
+            end_dt = start_dt.subtract(seconds=(end_dt - start_dt).seconds)
+            continue
+
+        except DataUnavailable:
+            # broker is being a bish and we can't pull
+            # any more..
+            break
+
         to_push = diff_history(
             array,
             start_dt,
```
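On `NoData` the backfiller slides its window back by the span of the last delivered frame rather than aborting; on `DataUnavailable` it stops outright. The rewind arithmetic leans on pendulum datetime subtraction, e.g.:

```python
import pendulum

start_dt = pendulum.datetime(2022, 3, 1, 12, 0)
end_dt = pendulum.datetime(2022, 3, 1, 18, 0)

# span in seconds of the last delivered frame (for intraday
# spans like this one, `.seconds` is unambiguous)
span_s = (end_dt - start_dt).seconds
print(span_s)  # 21600

# rewind the next request window by that same span
new_end = start_dt.subtract(seconds=span_s)
print(new_end)  # 2022-03-01T06:00:00+00:00
```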
```diff
@@ -270,6 +270,7 @@ class Storage:
         self,
         fqsn: str,
         timeframe: Optional[Union[int, str]] = None,
+        end: Optional[int] = None,

     ) -> tuple[
         MarketstoreClient,
```

```diff
@@ -287,6 +288,7 @@ class Storage:
             symbols=fqsn,
             timeframe=tfstr,
             attrgroup='OHLCV',
+            end=end,
             # limit_from_start=True,

             # TODO: figure the max limit here given the
```

```diff
@@ -346,6 +348,7 @@ class Storage:
         self,
         fqsn: str,
         ohlcv: np.ndarray,
+        append_and_duplicate: bool = True,

     ) -> None:
         # build mkts schema compat array for writing
```

```diff
@@ -373,7 +376,7 @@ class Storage:
             # NOTE: will will append duplicates
             # for the same timestamp-index.
             # TODO: pre deduplicate?
-            isvariablelength=True,
+            isvariablelength=append_and_duplicate,
         )

         log.info(
```
```diff
@@ -443,17 +446,17 @@ async def tsdb_history_update(
     async with (
         open_storage_client(fqsn) as storage,

-        # maybe_open_feed(
-        #     [fqsn],
-        #     start_stream=False,
-
-        # ) as (feed, stream),
+        maybe_open_feed(
+            [fqsn],
+            start_stream=False,
+
+        ) as (feed, stream),
     ):
         profiler(f'opened feed for {fqsn}')


-        # to_append = feed.shm.array
-        # to_prepend = None
+        to_append = feed.shm.array
+        to_prepend = None

         if fqsn:
             symbol = feed.symbols.get(fqsn)
```

```diff
@@ -477,10 +480,11 @@ async def tsdb_history_update(
         log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
         profiler(f'listed symbols {syms}')

+        # TODO: ask if user wants to write history for detected
+        # available shm buffers?
         from tractor.trionics import ipython_embed
         await ipython_embed()

         # for array in [to_append, to_prepend]:
         #     if array is None:
         #         continue
```

```diff
@@ -490,7 +494,7 @@ async def tsdb_history_update(
         #     )
         #     await storage.write_ohlcv(fqsn, array)

-    profiler('Finished db writes')
+    # profiler('Finished db writes')


 async def ingest_quote_stream(
```
```diff
@@ -167,6 +167,7 @@ def _wma(

     assert length == len(weights)

+    # lol, for long sequences this is nutso slow and expensive..
     return np.convolve(signal, weights, 'valid')
```
```diff
@@ -309,7 +309,7 @@ async def flow_rates(

         if period > 1:
             trade_rate_wma = _wma(
-                dvlm_shm.array['trade_count'],
+                dvlm_shm.array['trade_count'][-period:],
                 period,
                 weights=weights,
             )
```

```diff
@@ -332,7 +332,7 @@ async def flow_rates(

         if period > 1:
             dark_trade_rate_wma = _wma(
-                dvlm_shm.array['dark_trade_count'],
+                dvlm_shm.array['dark_trade_count'][-period:],
                 period,
                 weights=weights,
             )
```
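Slicing the input down to the last `period` samples matters because `np.convolve(signal, weights, 'valid')` produces `len(signal) - len(weights) + 1` outputs: convolving the whole shm column recomputes the WMA over every historical window on every pass, while the slice yields exactly the single newest value. A standalone illustration (the weight construction here is a plausible stand-in, not the module's actual one):

```python
import numpy as np


def _wma(signal: np.ndarray, length: int, weights: np.ndarray) -> np.ndarray:
    # weighted moving average via convolution, as in the hunk above
    assert length == len(weights)
    return np.convolve(signal, weights, 'valid')


period = 6
# linearly rising weights, normalized to sum to 1
weights = np.linspace(1, period, period)
weights /= weights.sum()

counts = np.arange(10_000, dtype=float)  # stand-in for the shm column

full = _wma(counts, period, weights)            # 9995 windows recomputed
last = _wma(counts[-period:], period, weights)  # just the newest window

assert last.shape == (1,)
assert np.isclose(full[-1], last[0])
```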
```diff
@@ -429,10 +429,7 @@ class FastAppendCurve(pg.GraphicsObject):
         if (
             # std m4 downsample conditions
             px_width
-            and uppx_diff >= 1
-            or uppx_diff <= -1
-            or self._step_mode and abs(uppx_diff) >= 2
+            and abs(uppx_diff) >= 1

         ):
             log.info(
                 f'{self._name} sampler change: {self._last_uppx} -> {uppx}'
```
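Beyond being shorter, the rewrite closes an operator-precedence trap: `and` binds tighter than `or`, so the old predicate could fire on `uppx_diff <= -1` even when `px_width` was falsy. A quick demonstration with illustrative values:

```python
px_width = 0      # e.g. no pixel width reported yet
uppx_diff = -3
_step_mode = False

# old predicate: the `or` branches bypass the `px_width` gate
old = (
    px_width
    and uppx_diff >= 1
    or uppx_diff <= -1
    or _step_mode and abs(uppx_diff) >= 2
)

# new predicate: `px_width` gates everything
new = px_width and abs(uppx_diff) >= 1

print(bool(old), bool(new))  # True False
```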
```diff
@@ -61,7 +61,7 @@ from ..log import get_logger
 log = get_logger(__name__)

 # TODO: load this from a config.toml!
-_quote_throttle_rate: int = 12  # Hz
+_quote_throttle_rate: int = 22  # Hz


 # a working tick-type-classes template
```
```diff
@@ -689,15 +689,17 @@ async def display_symbol_data(
     # plot historical vwap if available
     wap_in_history = False

-    if brokermod._show_wap_in_history:
-
-        if 'bar_wap' in bars.dtype.fields:
-            wap_in_history = True
-            chart.draw_curve(
-                name='bar_wap',
-                data=bars,
-                add_label=False,
-            )
+    # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?!
+    # if brokermod._show_wap_in_history:
+
+    #     if 'bar_wap' in bars.dtype.fields:
+    #         wap_in_history = True
+    #         chart.draw_curve(
+    #             name='bar_wap',
+    #             shm=ohlcv,
+    #             color='default_light',
+    #             add_label=False,
+    #         )

     # size view to data once at outset
     chart.cv._set_yrange()
```
```diff
@@ -320,7 +320,7 @@ class Flow(msgspec.Struct):  # , frozen=True):
         render: bool = True,
         array_key: Optional[str] = None,

-        profiler=None,
+        profiler: Optional[pg.debug.Profiler] = None,

         **kwargs,
```

```diff
@@ -524,7 +524,10 @@ class Flow(msgspec.Struct):  # , frozen=True):
                 view_range=(ivl, ivr),  # hack
                 profiler=profiler,
                 # should_redraw=False,
-                # do_append=False,
+
+                # NOTE: already passed through by display loop?
+                # do_append=uppx < 16,
+                **kwargs,
             )
             curve.show()
             profiler('updated ds curve')
```

```diff
@@ -589,6 +592,7 @@ class Flow(msgspec.Struct):  # , frozen=True):
         else:
             # ``FastAppendCurve`` case:
             array_key = array_key or self.name
+            uppx = graphics.x_uppx()

             if graphics._step_mode and self.gy is None:
                 self._iflat_first = self.shm._first.value
```

```diff
@@ -834,7 +838,9 @@ class Flow(msgspec.Struct):  # , frozen=True):
                     slice_to_head=-2,

                     should_redraw=bool(append_diff),
-                    # do_append=False,
+
+                    # NOTE: already passed through by display loop?
+                    # do_append=uppx < 16,

                     **kwargs
                 )
```

```diff
@@ -866,6 +872,8 @@ class Flow(msgspec.Struct):  # , frozen=True):

                 view_range=(ivl, ivr) if use_vr else None,

+                # NOTE: already passed through by display loop?
+                # do_append=uppx < 16,
                 **kwargs
             )
```