Compare commits
No commits in common. "b8cfee7d2f4b86fc07a7f6a488ac2712089570b2" and "19205d57a1b312d09a16b28d9e2829d570f2f9ed" have entirely different histories.
b8cfee7d2f
...
19205d57a1
|
@ -33,41 +33,7 @@ class SymbolNotFound(BrokerError):
|
||||||
|
|
||||||
|
|
||||||
class NoData(BrokerError):
|
class NoData(BrokerError):
|
||||||
'''
|
"Symbol data not permitted"
|
||||||
Symbol data not permitted or no data
|
|
||||||
for time range found.
|
|
||||||
|
|
||||||
'''
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
*args,
|
|
||||||
frame_size: int = 1000,
|
|
||||||
|
|
||||||
) -> None:
|
|
||||||
super().__init__(*args)
|
|
||||||
|
|
||||||
# when raised, machinery can check if the backend
|
|
||||||
# set a "frame size" for doing datetime calcs.
|
|
||||||
self.frame_size: int = 1000
|
|
||||||
|
|
||||||
|
|
||||||
class DataUnavailable(BrokerError):
|
|
||||||
'''
|
|
||||||
Signal storage requests to terminate.
|
|
||||||
|
|
||||||
'''
|
|
||||||
# TODO: add in a reason that can be displayed in the
|
|
||||||
# UI (for eg. `kraken` is bs and you should complain
|
|
||||||
# to them that you can't pull more OHLC data..)
|
|
||||||
|
|
||||||
|
|
||||||
class DataThrottle(BrokerError):
|
|
||||||
'''
|
|
||||||
Broker throttled request rate for data.
|
|
||||||
|
|
||||||
'''
|
|
||||||
# TODO: add in throttle metrics/feedback
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def resproc(
|
def resproc(
|
||||||
|
@ -84,12 +50,12 @@ def resproc(
|
||||||
if not resp.status_code == 200:
|
if not resp.status_code == 200:
|
||||||
raise BrokerError(resp.body)
|
raise BrokerError(resp.body)
|
||||||
try:
|
try:
|
||||||
msg = resp.json()
|
json = resp.json()
|
||||||
except json.decoder.JSONDecodeError:
|
except json.decoder.JSONDecodeError:
|
||||||
log.exception(f"Failed to process {resp}:\n{resp.text}")
|
log.exception(f"Failed to process {resp}:\n{resp.text}")
|
||||||
raise BrokerError(resp.text)
|
raise BrokerError(resp.text)
|
||||||
|
|
||||||
if log_resp:
|
if log_resp:
|
||||||
log.debug(f"Received json contents:\n{colorize_json(msg)}")
|
log.debug(f"Received json contents:\n{colorize_json(json)}")
|
||||||
|
|
||||||
return msg if return_json else resp
|
return json if return_json else resp
|
||||||
|
|
|
@ -1482,9 +1482,7 @@ async def get_bars(
|
||||||
|
|
||||||
if 'No market data permissions for' in msg:
|
if 'No market data permissions for' in msg:
|
||||||
# TODO: signalling for no permissions searches
|
# TODO: signalling for no permissions searches
|
||||||
raise NoData(
|
raise NoData(f'Symbol: {fqsn}')
|
||||||
f'Symbol: {fqsn}',
|
|
||||||
)
|
|
||||||
break
|
break
|
||||||
|
|
||||||
elif (
|
elif (
|
||||||
|
@ -1564,10 +1562,7 @@ async def open_history_client(
|
||||||
if out is None:
|
if out is None:
|
||||||
# could be trying to retreive bars over weekend
|
# could be trying to retreive bars over weekend
|
||||||
log.error(f"Can't grab bars starting at {end_dt}!?!?")
|
log.error(f"Can't grab bars starting at {end_dt}!?!?")
|
||||||
raise NoData(
|
raise NoData(f'{end_dt}')
|
||||||
f'{end_dt}',
|
|
||||||
frame_size=2000,
|
|
||||||
)
|
|
||||||
|
|
||||||
bars, bars_array, first_dt, last_dt = out
|
bars, bars_array, first_dt, last_dt = out
|
||||||
|
|
||||||
|
|
|
@ -20,8 +20,7 @@ Kraken backend.
|
||||||
'''
|
'''
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
from dataclasses import asdict, field
|
from dataclasses import asdict, field
|
||||||
from datetime import datetime
|
from typing import Any, Optional, AsyncIterator, Callable
|
||||||
from typing import Any, Optional, AsyncIterator, Callable, Union
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
|
@ -41,13 +40,7 @@ import base64
|
||||||
|
|
||||||
from .. import config
|
from .. import config
|
||||||
from .._cacheables import open_cached_client
|
from .._cacheables import open_cached_client
|
||||||
from ._util import (
|
from ._util import resproc, SymbolNotFound, BrokerError
|
||||||
resproc,
|
|
||||||
SymbolNotFound,
|
|
||||||
BrokerError,
|
|
||||||
DataThrottle,
|
|
||||||
DataUnavailable,
|
|
||||||
)
|
|
||||||
from ..log import get_logger, get_console_log
|
from ..log import get_logger, get_console_log
|
||||||
from ..data import ShmArray
|
from ..data import ShmArray
|
||||||
from ..data._web_bs import open_autorecon_ws, NoBsWs
|
from ..data._web_bs import open_autorecon_ws, NoBsWs
|
||||||
|
@ -398,26 +391,17 @@ class Client:
|
||||||
async def bars(
|
async def bars(
|
||||||
self,
|
self,
|
||||||
symbol: str = 'XBTUSD',
|
symbol: str = 'XBTUSD',
|
||||||
|
|
||||||
# UTC 2017-07-02 12:53:20
|
# UTC 2017-07-02 12:53:20
|
||||||
since: Optional[Union[int, datetime]] = None,
|
since: int = None,
|
||||||
count: int = 720, # <- max allowed per query
|
count: int = 720, # <- max allowed per query
|
||||||
as_np: bool = True,
|
as_np: bool = True,
|
||||||
|
|
||||||
) -> dict:
|
) -> dict:
|
||||||
|
|
||||||
if since is None:
|
if since is None:
|
||||||
since = pendulum.now('UTC').start_of('minute').subtract(
|
since = pendulum.now('UTC').start_of('minute').subtract(
|
||||||
minutes=count).timestamp()
|
minutes=count).timestamp()
|
||||||
|
|
||||||
elif isinstance(since, int):
|
|
||||||
since = pendulum.from_timestamp(since).timestamp()
|
|
||||||
|
|
||||||
else: # presumably a pendulum datetime
|
|
||||||
since = since.timestamp()
|
|
||||||
|
|
||||||
# UTC 2017-07-02 12:53:20 is oldest seconds value
|
# UTC 2017-07-02 12:53:20 is oldest seconds value
|
||||||
since = str(max(1499000000, int(since)))
|
since = str(max(1499000000, since))
|
||||||
json = await self._public(
|
json = await self._public(
|
||||||
'OHLC',
|
'OHLC',
|
||||||
data={
|
data={
|
||||||
|
@ -461,16 +445,7 @@ class Client:
|
||||||
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
|
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
|
||||||
return array
|
return array
|
||||||
except KeyError:
|
except KeyError:
|
||||||
errmsg = json['error'][0]
|
raise SymbolNotFound(json['error'][0] + f': {symbol}')
|
||||||
|
|
||||||
if 'not found' in errmsg:
|
|
||||||
raise SymbolNotFound(errmsg + f': {symbol}')
|
|
||||||
|
|
||||||
elif 'Too many requests' in errmsg:
|
|
||||||
raise DataThrottle(f'{symbol}')
|
|
||||||
|
|
||||||
else:
|
|
||||||
raise BrokerError(errmsg)
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
@ -693,8 +668,8 @@ async def handle_order_requests(
|
||||||
oid=msg.oid,
|
oid=msg.oid,
|
||||||
reqid=msg.reqid,
|
reqid=msg.reqid,
|
||||||
symbol=msg.symbol,
|
symbol=msg.symbol,
|
||||||
# TODO: maybe figure out if pending
|
# TODO: maybe figure out if pending cancels will
|
||||||
# cancels will eventually get cancelled
|
# eventually get cancelled
|
||||||
reason="Order cancel is still pending?",
|
reason="Order cancel is still pending?",
|
||||||
broker_details=resp
|
broker_details=resp
|
||||||
).dict()
|
).dict()
|
||||||
|
@ -1028,45 +1003,7 @@ async def open_history_client(
|
||||||
|
|
||||||
# TODO implement history getter for the new storage layer.
|
# TODO implement history getter for the new storage layer.
|
||||||
async with open_cached_client('kraken') as client:
|
async with open_cached_client('kraken') as client:
|
||||||
|
yield client
|
||||||
# lol, kraken won't send any more then the "last"
|
|
||||||
# 720 1m bars.. so we have to just ignore further
|
|
||||||
# requests of this type..
|
|
||||||
queries: int = 0
|
|
||||||
|
|
||||||
async def get_ohlc(
|
|
||||||
end_dt: Optional[datetime] = None,
|
|
||||||
start_dt: Optional[datetime] = None,
|
|
||||||
|
|
||||||
) -> tuple[
|
|
||||||
np.ndarray,
|
|
||||||
datetime, # start
|
|
||||||
datetime, # end
|
|
||||||
]:
|
|
||||||
|
|
||||||
nonlocal queries
|
|
||||||
if queries > 0:
|
|
||||||
raise DataUnavailable
|
|
||||||
|
|
||||||
count = 0
|
|
||||||
while count <= 3:
|
|
||||||
try:
|
|
||||||
array = await client.bars(
|
|
||||||
symbol,
|
|
||||||
since=end_dt,
|
|
||||||
)
|
|
||||||
count += 1
|
|
||||||
queries += 1
|
|
||||||
break
|
|
||||||
except DataThrottle:
|
|
||||||
log.warning(f'kraken OHLC throttle for {symbol}')
|
|
||||||
await trio.sleep(1)
|
|
||||||
|
|
||||||
start_dt = pendulum.from_timestamp(array[0]['time'])
|
|
||||||
end_dt = pendulum.from_timestamp(array[-1]['time'])
|
|
||||||
return array, start_dt, end_dt
|
|
||||||
|
|
||||||
yield get_ohlc
|
|
||||||
|
|
||||||
|
|
||||||
async def backfill_bars(
|
async def backfill_bars(
|
||||||
|
|
|
@ -148,7 +148,7 @@ def storesh(
|
||||||
enable_modules=['piker.data._ahab'],
|
enable_modules=['piker.data._ahab'],
|
||||||
):
|
):
|
||||||
symbol = symbols[0]
|
symbol = symbols[0]
|
||||||
await tsdb_history_update(symbol)
|
await tsdb_history_update()
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
|
@ -67,10 +67,6 @@ from ._sampling import (
|
||||||
sample_and_broadcast,
|
sample_and_broadcast,
|
||||||
uniform_rate_send,
|
uniform_rate_send,
|
||||||
)
|
)
|
||||||
from ..brokers._util import (
|
|
||||||
NoData,
|
|
||||||
DataUnavailable,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -277,19 +273,7 @@ async def start_backfill(
|
||||||
# and count < mx_fills
|
# and count < mx_fills
|
||||||
):
|
):
|
||||||
count += 1
|
count += 1
|
||||||
try:
|
|
||||||
array, start_dt, end_dt = await hist(end_dt=start_dt)
|
array, start_dt, end_dt = await hist(end_dt=start_dt)
|
||||||
|
|
||||||
except NoData:
|
|
||||||
# decrement by the diff in time last delivered.
|
|
||||||
end_dt = start_dt.subtract(seconds=(end_dt - start_dt).seconds)
|
|
||||||
continue
|
|
||||||
|
|
||||||
except DataUnavailable:
|
|
||||||
# broker is being a bish and we can't pull
|
|
||||||
# any more..
|
|
||||||
break
|
|
||||||
|
|
||||||
to_push = diff_history(
|
to_push = diff_history(
|
||||||
array,
|
array,
|
||||||
start_dt,
|
start_dt,
|
||||||
|
|
|
@ -270,7 +270,6 @@ class Storage:
|
||||||
self,
|
self,
|
||||||
fqsn: str,
|
fqsn: str,
|
||||||
timeframe: Optional[Union[int, str]] = None,
|
timeframe: Optional[Union[int, str]] = None,
|
||||||
end: Optional[int] = None,
|
|
||||||
|
|
||||||
) -> tuple[
|
) -> tuple[
|
||||||
MarketstoreClient,
|
MarketstoreClient,
|
||||||
|
@ -288,7 +287,6 @@ class Storage:
|
||||||
symbols=fqsn,
|
symbols=fqsn,
|
||||||
timeframe=tfstr,
|
timeframe=tfstr,
|
||||||
attrgroup='OHLCV',
|
attrgroup='OHLCV',
|
||||||
end=end,
|
|
||||||
# limit_from_start=True,
|
# limit_from_start=True,
|
||||||
|
|
||||||
# TODO: figure the max limit here given the
|
# TODO: figure the max limit here given the
|
||||||
|
@ -348,7 +346,6 @@ class Storage:
|
||||||
self,
|
self,
|
||||||
fqsn: str,
|
fqsn: str,
|
||||||
ohlcv: np.ndarray,
|
ohlcv: np.ndarray,
|
||||||
append_and_duplicate: bool = True,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
# build mkts schema compat array for writing
|
# build mkts schema compat array for writing
|
||||||
|
@ -376,7 +373,7 @@ class Storage:
|
||||||
# NOTE: will will append duplicates
|
# NOTE: will will append duplicates
|
||||||
# for the same timestamp-index.
|
# for the same timestamp-index.
|
||||||
# TODO: pre deduplicate?
|
# TODO: pre deduplicate?
|
||||||
isvariablelength=append_and_duplicate,
|
isvariablelength=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
|
@ -446,17 +443,17 @@ async def tsdb_history_update(
|
||||||
async with (
|
async with (
|
||||||
open_storage_client(fqsn) as storage,
|
open_storage_client(fqsn) as storage,
|
||||||
|
|
||||||
maybe_open_feed(
|
# maybe_open_feed(
|
||||||
[fqsn],
|
# [fqsn],
|
||||||
start_stream=False,
|
# start_stream=False,
|
||||||
|
|
||||||
) as (feed, stream),
|
# ) as (feed, stream),
|
||||||
):
|
):
|
||||||
profiler(f'opened feed for {fqsn}')
|
profiler(f'opened feed for {fqsn}')
|
||||||
|
|
||||||
|
|
||||||
to_append = feed.shm.array
|
# to_append = feed.shm.array
|
||||||
to_prepend = None
|
# to_prepend = None
|
||||||
|
|
||||||
if fqsn:
|
if fqsn:
|
||||||
symbol = feed.symbols.get(fqsn)
|
symbol = feed.symbols.get(fqsn)
|
||||||
|
@ -480,11 +477,10 @@ async def tsdb_history_update(
|
||||||
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
|
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
|
||||||
profiler(f'listed symbols {syms}')
|
profiler(f'listed symbols {syms}')
|
||||||
|
|
||||||
# TODO: ask if user wants to write history for detected
|
|
||||||
# available shm buffers?
|
|
||||||
from tractor.trionics import ipython_embed
|
from tractor.trionics import ipython_embed
|
||||||
await ipython_embed()
|
await ipython_embed()
|
||||||
|
|
||||||
|
|
||||||
# for array in [to_append, to_prepend]:
|
# for array in [to_append, to_prepend]:
|
||||||
# if array is None:
|
# if array is None:
|
||||||
# continue
|
# continue
|
||||||
|
@ -494,7 +490,7 @@ async def tsdb_history_update(
|
||||||
# )
|
# )
|
||||||
# await storage.write_ohlcv(fqsn, array)
|
# await storage.write_ohlcv(fqsn, array)
|
||||||
|
|
||||||
# profiler('Finished db writes')
|
profiler('Finished db writes')
|
||||||
|
|
||||||
|
|
||||||
async def ingest_quote_stream(
|
async def ingest_quote_stream(
|
||||||
|
|
|
@ -167,7 +167,6 @@ def _wma(
|
||||||
|
|
||||||
assert length == len(weights)
|
assert length == len(weights)
|
||||||
|
|
||||||
# lol, for long sequences this is nutso slow and expensive..
|
|
||||||
return np.convolve(signal, weights, 'valid')
|
return np.convolve(signal, weights, 'valid')
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -309,7 +309,7 @@ async def flow_rates(
|
||||||
|
|
||||||
if period > 1:
|
if period > 1:
|
||||||
trade_rate_wma = _wma(
|
trade_rate_wma = _wma(
|
||||||
dvlm_shm.array['trade_count'][-period:],
|
dvlm_shm.array['trade_count'],
|
||||||
period,
|
period,
|
||||||
weights=weights,
|
weights=weights,
|
||||||
)
|
)
|
||||||
|
@ -332,7 +332,7 @@ async def flow_rates(
|
||||||
|
|
||||||
if period > 1:
|
if period > 1:
|
||||||
dark_trade_rate_wma = _wma(
|
dark_trade_rate_wma = _wma(
|
||||||
dvlm_shm.array['dark_trade_count'][-period:],
|
dvlm_shm.array['dark_trade_count'],
|
||||||
period,
|
period,
|
||||||
weights=weights,
|
weights=weights,
|
||||||
)
|
)
|
||||||
|
|
|
@ -429,7 +429,10 @@ class FastAppendCurve(pg.GraphicsObject):
|
||||||
if (
|
if (
|
||||||
# std m4 downsample conditions
|
# std m4 downsample conditions
|
||||||
px_width
|
px_width
|
||||||
and abs(uppx_diff) >= 1
|
and uppx_diff >= 1
|
||||||
|
or uppx_diff <= -1
|
||||||
|
or self._step_mode and abs(uppx_diff) >= 2
|
||||||
|
|
||||||
):
|
):
|
||||||
log.info(
|
log.info(
|
||||||
f'{self._name} sampler change: {self._last_uppx} -> {uppx}'
|
f'{self._name} sampler change: {self._last_uppx} -> {uppx}'
|
||||||
|
|
|
@ -61,7 +61,7 @@ from ..log import get_logger
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
# TODO: load this from a config.toml!
|
# TODO: load this from a config.toml!
|
||||||
_quote_throttle_rate: int = 22 # Hz
|
_quote_throttle_rate: int = 12 # Hz
|
||||||
|
|
||||||
|
|
||||||
# a working tick-type-classes template
|
# a working tick-type-classes template
|
||||||
|
@ -689,17 +689,15 @@ async def display_symbol_data(
|
||||||
# plot historical vwap if available
|
# plot historical vwap if available
|
||||||
wap_in_history = False
|
wap_in_history = False
|
||||||
|
|
||||||
# XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?!
|
if brokermod._show_wap_in_history:
|
||||||
# if brokermod._show_wap_in_history:
|
|
||||||
|
|
||||||
# if 'bar_wap' in bars.dtype.fields:
|
if 'bar_wap' in bars.dtype.fields:
|
||||||
# wap_in_history = True
|
wap_in_history = True
|
||||||
# chart.draw_curve(
|
chart.draw_curve(
|
||||||
# name='bar_wap',
|
name='bar_wap',
|
||||||
# shm=ohlcv,
|
data=bars,
|
||||||
# color='default_light',
|
add_label=False,
|
||||||
# add_label=False,
|
)
|
||||||
# )
|
|
||||||
|
|
||||||
# size view to data once at outset
|
# size view to data once at outset
|
||||||
chart.cv._set_yrange()
|
chart.cv._set_yrange()
|
||||||
|
|
|
@ -320,7 +320,7 @@ class Flow(msgspec.Struct): # , frozen=True):
|
||||||
render: bool = True,
|
render: bool = True,
|
||||||
array_key: Optional[str] = None,
|
array_key: Optional[str] = None,
|
||||||
|
|
||||||
profiler: Optional[pg.debug.Profiler] = None,
|
profiler=None,
|
||||||
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
|
@ -524,10 +524,7 @@ class Flow(msgspec.Struct): # , frozen=True):
|
||||||
view_range=(ivl, ivr), # hack
|
view_range=(ivl, ivr), # hack
|
||||||
profiler=profiler,
|
profiler=profiler,
|
||||||
# should_redraw=False,
|
# should_redraw=False,
|
||||||
|
# do_append=False,
|
||||||
# NOTE: already passed through by display loop?
|
|
||||||
# do_append=uppx < 16,
|
|
||||||
**kwargs,
|
|
||||||
)
|
)
|
||||||
curve.show()
|
curve.show()
|
||||||
profiler('updated ds curve')
|
profiler('updated ds curve')
|
||||||
|
@ -592,7 +589,6 @@ class Flow(msgspec.Struct): # , frozen=True):
|
||||||
else:
|
else:
|
||||||
# ``FastAppendCurve`` case:
|
# ``FastAppendCurve`` case:
|
||||||
array_key = array_key or self.name
|
array_key = array_key or self.name
|
||||||
uppx = graphics.x_uppx()
|
|
||||||
|
|
||||||
if graphics._step_mode and self.gy is None:
|
if graphics._step_mode and self.gy is None:
|
||||||
self._iflat_first = self.shm._first.value
|
self._iflat_first = self.shm._first.value
|
||||||
|
@ -838,9 +834,7 @@ class Flow(msgspec.Struct): # , frozen=True):
|
||||||
slice_to_head=-2,
|
slice_to_head=-2,
|
||||||
|
|
||||||
should_redraw=bool(append_diff),
|
should_redraw=bool(append_diff),
|
||||||
|
# do_append=False,
|
||||||
# NOTE: already passed through by display loop?
|
|
||||||
# do_append=uppx < 16,
|
|
||||||
|
|
||||||
**kwargs
|
**kwargs
|
||||||
)
|
)
|
||||||
|
@ -872,8 +866,6 @@ class Flow(msgspec.Struct): # , frozen=True):
|
||||||
|
|
||||||
view_range=(ivl, ivr) if use_vr else None,
|
view_range=(ivl, ivr) if use_vr else None,
|
||||||
|
|
||||||
# NOTE: already passed through by display loop?
|
|
||||||
# do_append=uppx < 16,
|
|
||||||
**kwargs
|
**kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue