Compare commits


20 Commits

Author SHA1 Message Date
Tyler Goodlet ac0f43dc98 Go Python 3.10+ in anticipation of upcoming feature PRs 2022-06-28 10:02:09 -04:00
goodboy 3977f1cc7e
Merge pull request #341 from pikers/contain_mkts
Contain mkts
2022-06-26 13:49:23 -04:00
Tyler Goodlet e45cb9d08a Always cancel container on teardown 2022-06-26 13:36:29 -04:00
Tyler Goodlet 27c523ca74 Speedup: only load a "views worth" of datums on first query 2022-06-23 15:21:09 -04:00
Tyler Goodlet b8b76a32a6 Harden container cancel-and-wait supervisor loop
This should hopefully make teardown more reliable and includes better
logic to fail over to a hard-kill path after a 3 second timeout waiting
for the instance to complete using the `docker-py` wait API. Also
generalize the supervisor teardown loop by allowing the container config
endpoint to return the 2 msgs to expect:
- a startup message that can be read from the container's internal
  process logging and that indicates it is fully up and ready.
- a teardown msg that can be polled for and that indicates the
  container has gracefully terminated after a cancellation request,
  which is passed to our container wrapper's `.cancel()` method.

Make the marketstore config endpoint return the 2 messages we previously
had hard-coded and use this new API.
2022-06-23 10:23:14 -04:00
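[Editor's note: in outline, the hardened supervisor loop amounts to roughly
the following. This is a condensed sketch, not the verbatim code from the
`Container.cancel()` diff further down; `try_signal()` and
`process_logs_until()` are the `Container` wrapper methods shown there, and
the wrapper's `.cntr` attribute is assumed to be the underlying `docker-py`
container handle.]

    import time
    import trio
    from requests.exceptions import ConnectionError, ReadTimeout

    async def cancel(cntr, stop_msg: str) -> None:
        # first ask nicely..
        cntr.try_signal('SIGINT')
        start = time.time()
        for _ in range(30):
            with trio.move_on_after(0.5) as cs:
                cs.shield = True
                # read container logs until the endpoint-provided
                # teardown msg shows up -> graceful exit.
                await cntr.process_logs_until(stop_msg)
                break
            try:
                # otherwise poll the `docker-py` wait API for actual
                # process exit.
                cntr.cntr.wait(timeout=0.1, condition='not-running')
                break
            except ReadTimeout:
                continue  # still running, poll again
            except ConnectionError:
                break  # docker daemon connection issue, stop polling
        else:
            # no graceful exit after ~3s worth of polling: hard kill.
            print(f'no exit after {time.time() - start}s, sending SIGKILL..')
            cntr.try_signal('SIGKILL')
            cntr.cntr.wait(timeout=3, condition='not-running')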
Tyler Goodlet dcee0ddd55 Move/expect all marketstore configs under a `<configdir>/piker/marketstore` subdir 2022-06-23 09:48:32 -04:00
goodboy 67eab85f06
Merge pull request #340 from pikers/slic_fix_v2
Slice fix v2
2022-06-22 19:55:39 -04:00
Tyler Goodlet afc95b8592 Facepalm, get the first x value not the array.. 2022-06-22 19:43:33 -04:00
Tyler Goodlet 14c98d82ee Only warn once when realtime quotes time out 2022-06-22 19:43:23 -04:00
goodboy b87aa30031
Merge pull request #339 from pikers/uppx_slice_fix
Uppx slice fix
2022-06-16 16:20:00 -04:00
Tyler Goodlet 958f53d8e9 Lower re-syncing log msgs to debug level 2022-06-16 15:50:21 -04:00
Tyler Goodlet ba43b54175 Handle edge case for extreme zoom out 2022-06-16 15:50:05 -04:00
Tyler Goodlet de970755d7 Flip back to original daemon port 2022-06-16 15:50:05 -04:00
goodboy 7ddebf6773
Merge pull request #338 from pikers/update_last_datums_in_view
Fix: update last datums in view by `uppx` indexing
2022-06-10 09:38:26 -04:00
Tyler Goodlet 8eb4a427da Revert uppx flooring, causes shift issues 2022-06-10 07:03:21 -04:00
Tyler Goodlet da5dea9f99 Drop cache reset from `Curve.draw_last_datum()` 2022-06-10 07:03:05 -04:00
Tyler Goodlet 3074773662 Fix 'last datum line is uppx's worth of data' rendering
This was introduced in #302 but after thorough testing was clearly not
working XD. Adjust the display loop to update the last graphics
segment on both the OHLC and vlm charts (as well as all deriving fsp
flows) whenever the uppx >= 1 and there is no current path append
taking place (since more datums are needed to span an x-pixel in view).

Summary of tweaks:
- move the vlm chart update code to the end of the cycle routine and
  have that block include the tests for an "interpolated last datum in
  view" line.
- make `do_append: bool` compare with a floor of the uppx value (i.e.
  appends should happen when we're just fractionally over a pixel of
  x units).
- never update the "volume" chart.
2022-06-09 17:57:34 -04:00
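[Editor's note: one plausible reading of the floor-compare in the second
bullet, with illustrative numbers; this is not code from the diff.]

    from math import floor

    uppx = 3.7   # x-axis units (datums) per pixel in the current view
    diff = 3     # datums accumulated since the last path append

    # old: do_append = diff >= uppx   -> False here, append deferred
    # new: compare against the floored value so being "just fractionally
    # over a pixel" of x units still triggers the append
    do_append = diff >= floor(uppx)   # True: 3 >= 3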
Tyler Goodlet 4099b53ea2 Add `Flow.ds_graphics': a downsample curve ref
Allows for optionally updating a "downsampled" graphics type which is
currently necessary in the `BarItems` -> `FlattenedOHLC` curve switching
case; we don't want to be needlessly redrawing the `Flow.graphics`
object (which will be an OHLC curve) when in flattened curve mode.
Further, add an `only_last_uppx: bool` flag to `.draw_last()` to allow
forcing a "last uppx's worth of data max/min" style interpolating line
as needed.
2022-06-09 17:57:34 -04:00
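[Editor's note: conceptually the flow struct now carries two render targets
and the draw-last logic picks between them. A minimal sketch mirroring the
flow diff further down; the `draw_last_target()` helper name is hypothetical
and the types are simplified for brevity.]

    class Flow:
        def __init__(self, graphics, ds_graphics=None):
            self.graphics = graphics        # primary graphic, e.g. OHLC bars
            self.ds_graphics = ds_graphics  # optional downsampled curve
            self._in_ds = False             # currently rendering downsampled?

        def draw_last_target(self, only_last_uppx: bool = False):
            # pick the graphic whose "last datum" should be redrawn:
            # prefer the downsampled curve when in (or forcing) ds mode
            # so the OHLC graphic isn't needlessly redrawn.
            if self._in_ds or only_last_uppx:
                return self.ds_graphics or self.graphics
            return self.graphics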
goodboy 633fa7cc3a
Merge pull request #335 from pikers/ib_subpkg
`pikers.broker.ib` subpackage
2022-06-07 11:41:11 -04:00
goodboy e9f0ea3daa
Merge pull request #327 from pikers/flexxin
Flexxin: `ib` trade reports parsing basics
2022-06-07 09:42:54 -04:00
10 changed files with 280 additions and 173 deletions

View File

@@ -35,7 +35,7 @@ log = get_logger(__name__)

 _root_dname = 'pikerd'

-_registry_addr = ('127.0.0.1', 1616)
+_registry_addr = ('127.0.0.1', 6116)
 _tractor_kwargs: dict[str, Any] = {
     # use a different registry addr then tractor's default
     'arbiter_addr': _registry_addr

View File

@@ -640,6 +640,7 @@ class Client:
         ready = ticker.updateEvent

         # ensure a last price gets filled in before we deliver quote
+        warnset: bool = False
         for _ in range(100):
             if isnan(ticker.last):
@@ -650,17 +651,21 @@ class Client:
                 if ready in done:
                     break
                 else:
-                    log.warning(
-                        f'Quote for {symbol} timed out: market is closed?'
-                    )
+                    if not warnset:
+                        log.warning(
+                            f'Quote for {symbol} timed out: market is closed?'
+                        )
+                        warnset = True

             else:
                 log.info(f'Got first quote for {symbol}')
                 break
         else:
-            log.warning(
-                f'Symbol {symbol} is not returning a quote '
-                'it may be outside trading hours?')
+            if not warnset:
+                log.warning(
+                    f'Symbol {symbol} is not returning a quote '
+                    'it may be outside trading hours?')
+                warnset = True

         return contract, ticker, details

View File

@@ -19,6 +19,7 @@ Supervisor for docker with included specific-image service helpers.

 '''
 import os
+import time
 from typing import (
     Optional,
     Callable,
@@ -186,45 +187,65 @@ class Container:
     async def cancel(
         self,
+        stop_msg: str,
     ) -> None:

         cid = self.cntr.id
+        # first try a graceful cancel
+        log.cancel(
+            f'SIGINT cancelling container: {cid}\n'
+            f'waiting on stop msg: "{stop_msg}"'
+        )
         self.try_signal('SIGINT')

-        with trio.move_on_after(0.5) as cs:
-            cs.shield = True
-            await self.process_logs_until('initiating graceful shutdown')
-            await self.process_logs_until('exiting...',)
-
-        for _ in range(10):
+        start = time.time()
+        for _ in range(30):
             with trio.move_on_after(0.5) as cs:
                 cs.shield = True
-                await self.process_logs_until('exiting...',)
+                await self.process_logs_until(stop_msg)
+
+                # if we aren't cancelled on above checkpoint then we
+                # assume we read the expected stop msg and terminated.
                 break

-            if cs.cancelled_caught:
-                # get out the big guns, bc apparently marketstore
-                # doesn't actually know how to terminate gracefully
-                # :eyeroll:...
-                self.try_signal('SIGKILL')
-
-                try:
-                    log.info('Waiting on container shutdown: {cid}')
-                    self.cntr.wait(
-                        timeout=0.1,
-                        condition='not-running',
-                    )
-                    break
-
-                except (
-                    ReadTimeout,
-                    ConnectionError,
-                ):
-                    log.error(f'failed to wait on container {cid}')
-                    raise
+            try:
+                log.info(f'Polling for container shutdown:\n{cid}')
+                if self.cntr.status not in {'exited', 'not-running'}:
+                    self.cntr.wait(
+                        timeout=0.1,
+                        condition='not-running',
+                    )
+                break
+
+            except (
+                ReadTimeout,
+            ):
+                log.info(f'Still waiting on container:\n{cid}')
+                continue
+
+            except (
+                docker.errors.APIError,
+                ConnectionError,
+            ):
+                log.exception('Docker connection failure')
+                break

         else:
-            raise RuntimeError('Failed to cancel container {cid}')
+            delay = time.time() - start
+            log.error(
+                f'Failed to kill container {cid} after {delay}s\n'
+                'sending SIGKILL..'
+            )
+            # get out the big guns, bc apparently marketstore
+            # doesn't actually know how to terminate gracefully
+            # :eyeroll:...
+            self.try_signal('SIGKILL')
+            self.cntr.wait(
+                timeout=3,
+                condition='not-running',
+            )

+        log.cancel(f'Container stopped: {cid}')
@@ -245,13 +266,16 @@ async def open_ahabd(
     # params, etc. passing to ``Containter.run()``?

     # call into endpoint for container config/init
     ep_func = NamespacePath(endpoint).load_ref()
-    dcntr, cntr_config = ep_func(client)
+    (
+        dcntr,
+        cntr_config,
+        start_msg,
+        stop_msg,
+    ) = ep_func(client)
     cntr = Container(dcntr)

     with trio.move_on_after(1):
-        found = await cntr.process_logs_until(
-            "launching tcp listener for all services...",
-        )
+        found = await cntr.process_logs_until(start_msg)

     if not found and cntr not in client.containers.list():
         raise RuntimeError(
@@ -271,16 +295,9 @@ async def open_ahabd(
         # callers to have root perms?
         await trio.sleep_forever()

-    except (
-        BaseException,
-        # trio.Cancelled,
-        # KeyboardInterrupt,
-    ):
-
+    finally:
         with trio.CancelScope(shield=True):
-            await cntr.cancel()
-        raise
+            await cntr.cancel(stop_msg)


 async def start_ahab(

View File

@@ -127,10 +127,15 @@ def start_marketstore(
     import os
     import docker
     from .. import config

     get_console_log('info', name=__name__)

-    yml_file = os.path.join(config._config_dir, 'mkts.yml')
+    mktsdir = os.path.join(config._config_dir, 'marketstore')
+
+    # create when dne
+    if not os.path.isdir(mktsdir):
+        os.mkdir(mktsdir)
+
+    yml_file = os.path.join(mktsdir, 'mkts.yml')
     if not os.path.isfile(yml_file):
         log.warning(
             f'No `marketstore` config exists?: {yml_file}\n'
@@ -143,14 +148,14 @@ def start_marketstore(
     # create a mount from user's local piker config dir into container
     config_dir_mnt = docker.types.Mount(
         target='/etc',
-        source=config._config_dir,
+        source=mktsdir,
         type='bind',
     )

     # create a user config subdir where the marketstore
     # backing filesystem database can be persisted.
     persistent_data_dir = os.path.join(
-        config._config_dir, 'data',
+        mktsdir, 'data',
     )
     if not os.path.isdir(persistent_data_dir):
         os.mkdir(persistent_data_dir)
@@ -180,7 +185,14 @@ def start_marketstore(
         init=True,
         # remove=True,
     )
-    return dcntr, _config
+    return (
+        dcntr,
+        _config,
+
+        # expected startup and stop msgs
+        "launching tcp listener for all services...",
+        "exiting...",
+    )


 _tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
@@ -383,7 +395,12 @@ class Storage:
     ]:
         first_tsdb_dt, last_tsdb_dt = None, None
-        tsdb_arrays = await self.read_ohlcv(fqsn)
+        tsdb_arrays = await self.read_ohlcv(
+            fqsn,
+            # on first load we don't need to pull the max
+            # history per request size worth.
+            limit=3000,
+        )
         log.info(f'Loaded tsdb history {tsdb_arrays}')

         if tsdb_arrays:
@@ -401,6 +418,7 @@ class Storage:
         fqsn: str,
         timeframe: Optional[Union[int, str]] = None,
         end: Optional[int] = None,
+        limit: int = int(800e3),

     ) -> tuple[
         MarketstoreClient,
@@ -423,7 +441,7 @@ class Storage:
             # TODO: figure the max limit here given the
             # ``purepc`` msg size limit of purerpc: 33554432
-            limit=int(800e3),
+            limit=limit,
         )

         if timeframe is None:

View File

@@ -361,7 +361,7 @@ async def cascade(
     ) -> tuple[TaskTracker, int]:

         # TODO: adopt an incremental update engine/approach
         # where possible here eventually!
-        log.warning(f're-syncing fsp {func_name} to source')
+        log.debug(f're-syncing fsp {func_name} to source')
         tracker.cs.cancel()
         await tracker.complete.wait()
         tracker, index = await n.start(fsp_target)

View File

@@ -379,17 +379,17 @@ class Curve(pg.GraphicsObject):

     ) -> None:
         # default line draw last call
-        with self.reset_cache():
-            x = render_data['index']
-            y = render_data[array_key]
-
-            # draw the "current" step graphic segment so it
-            # lines up with the "middle" of the current
-            # (OHLC) sample.
-            self._last_line = QLineF(
-                x[-2], y[-2],
-                x[-1], y[-1],
-            )
+        # with self.reset_cache():
+        x = render_data['index']
+        y = render_data[array_key]
+
+        # draw the "current" step graphic segment so it
+        # lines up with the "middle" of the current
+        # (OHLC) sample.
+        self._last_line = QLineF(
+            x[-2], y[-2],
+            x[-1], y[-1],
+        )

         return x, y

View File

@@ -426,71 +426,6 @@ def graphics_update_cycle(
         profiler('view incremented')

-    if vlm_chart:
-        # always update y-label
-        ds.vlm_sticky.update_from_data(
-            *array[-1][['index', 'volume']]
-        )
-
-        if (
-            (
-                do_rt_update
-                or do_append
-                and liv
-            )
-            or trigger_all
-        ):
-            # TODO: make it so this doesn't have to be called
-            # once the $vlm is up?
-            vlm_chart.update_graphics_from_flow(
-                'volume',
-                # UGGGh, see ``maxmin()`` impl in `._fsp` for
-                # the overlayed plotitems... we need a better
-                # bay to invoke a maxmin per overlay..
-                render=False,
-                # XXX: ^^^^ THIS IS SUPER IMPORTANT! ^^^^
-                # without this, since we disable the
-                # 'volume' (units) chart after the $vlm starts
-                # up we need to be sure to enable this
-                # auto-ranging otherwise there will be no handler
-                # connected to update accompanying overlay
-                # graphics..
-            )
-            profiler('`vlm_chart.update_graphics_from_flow()`')
-
-            if (
-                mx_vlm_in_view != vars['last_mx_vlm']
-            ):
-                yrange = (0, mx_vlm_in_view * 1.375)
-                vlm_chart.view._set_yrange(
-                    yrange=yrange,
-                )
-                profiler('`vlm_chart.view._set_yrange()`')
-                # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
-                vars['last_mx_vlm'] = mx_vlm_in_view
-
-        for curve_name, flow in vlm_chart._flows.items():
-
-            if not flow.render:
-                continue
-
-            update_fsp_chart(
-                vlm_chart,
-                flow,
-                curve_name,
-                array_key=curve_name,
-                # do_append=uppx < update_uppx,
-                do_append=do_append,
-            )
-            # is this even doing anything?
-            # (pretty sure it's the real-time
-            # resizing from last quote?)
-            fvb = flow.plot.vb
-            fvb._set_yrange(
-                # autoscale_linked_plots=False,
-                name=curve_name,
-            )
-
     ticks_frame = quote.get('ticks', ())

     frames_by_type: dict[str, dict] = {}
@@ -540,15 +475,16 @@ def graphics_update_cycle(
         or do_append
         or trigger_all
     ):
-        # TODO: we should always update the "last" datum
-        # since the current range should at least be updated
-        # to it's max/min on the last pixel.
         chart.update_graphics_from_flow(
             chart.name,
             # do_append=uppx < update_uppx,
             do_append=do_append,
         )

+    # NOTE: we always update the "last" datum
+    # since the current range should at least be updated
+    # to it's max/min on the last pixel.
+
     # iterate in FIFO order per tick-frame
     for typ, tick in lasts.items():
@@ -653,30 +589,115 @@ def graphics_update_cycle(
             vars['last_mx'], vars['last_mn'] = mx, mn

     # run synchronous update on all linked flows
+    # TODO: should the "main" (aka source) flow be special?
     for curve_name, flow in chart._flows.items():
-        if (
-            not (do_rt_update or do_append)
-            and liv
-            # even if we're downsampled bigly
-            # draw the last datum in the final
-            # px column to give the user the mx/mn
-            # range of that set.
-        ):
-            # always update the last datum-element
-            # graphic for all flows
-            flow.draw_last(array_key=curve_name)
-
-        # TODO: should the "main" (aka source) flow be special?
-        if curve_name == chart.data_key:
-            continue
-
-        update_fsp_chart(
-            chart,
-            flow,
-            curve_name,
-            array_key=curve_name,
-        )
+        # update any overlayed fsp flows
+        if curve_name != chart.data_key:
+            update_fsp_chart(
+                chart,
+                flow,
+                curve_name,
+                array_key=curve_name,
+            )
+
+        # even if we're downsampled bigly
+        # draw the last datum in the final
+        # px column to give the user the mx/mn
+        # range of that set.
+        if (
+            not do_append
+            # and not do_rt_update
+            and liv
+        ):
+            flow.draw_last(
+                array_key=curve_name,
+                only_last_uppx=True,
+            )
+
+    # volume chart logic..
+    # TODO: can we unify this with the above loop?
+    if vlm_chart:
+        # always update y-label
+        ds.vlm_sticky.update_from_data(
+            *array[-1][['index', 'volume']]
+        )
+
+        if (
+            (
+                do_rt_update
+                or do_append
+                and liv
+            )
+            or trigger_all
+        ):
+            # TODO: make it so this doesn't have to be called
+            # once the $vlm is up?
+            vlm_chart.update_graphics_from_flow(
+                'volume',
+                # UGGGh, see ``maxmin()`` impl in `._fsp` for
+                # the overlayed plotitems... we need a better
+                # bay to invoke a maxmin per overlay..
+                render=False,
+                # XXX: ^^^^ THIS IS SUPER IMPORTANT! ^^^^
+                # without this, since we disable the
+                # 'volume' (units) chart after the $vlm starts
+                # up we need to be sure to enable this
+                # auto-ranging otherwise there will be no handler
+                # connected to update accompanying overlay
+                # graphics..
+            )
+            profiler('`vlm_chart.update_graphics_from_flow()`')
+
+            if (
+                mx_vlm_in_view != vars['last_mx_vlm']
+            ):
+                yrange = (0, mx_vlm_in_view * 1.375)
+                vlm_chart.view._set_yrange(
+                    yrange=yrange,
+                )
+                profiler('`vlm_chart.view._set_yrange()`')
+                # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
+                vars['last_mx_vlm'] = mx_vlm_in_view
+
+        for curve_name, flow in vlm_chart._flows.items():
+            if (
+                curve_name != 'volume' and
+                flow.render and (
+                    liv and
+                    do_rt_update or do_append
+                )
+            ):
+                update_fsp_chart(
+                    vlm_chart,
+                    flow,
+                    curve_name,
+                    array_key=curve_name,
+                    # do_append=uppx < update_uppx,
+                    do_append=do_append,
+                )
+                # is this even doing anything?
+                # (pretty sure it's the real-time
+                # resizing from last quote?)
+                fvb = flow.plot.vb
+                fvb._set_yrange(
+                    name=curve_name,
+                )
+
+            elif (
+                curve_name != 'volume'
+                and not do_append
+                and liv
+                and uppx >= 1
+                # even if we're downsampled bigly
+                # draw the last datum in the final
+                # px column to give the user the mx/mn
+                # range of that set.
+            ):
+                # always update the last datum-element
+                # graphic for all flows
+                # print(f'drawing last {flow.name}')
+                flow.draw_last(array_key=curve_name)


 async def display_symbol_data(

View File

@@ -175,6 +175,7 @@ def render_baritems(
         name=f'{flow.name}_ds_ohlc',
         color=bars._color,
     )
+    flow.ds_graphics = curve
     curve.hide()
     self.plot.addItem(curve)
@@ -192,18 +193,20 @@ def render_baritems(
     uppx = curve.x_uppx()
     in_line = should_line = curve.isVisible()
     if (
-        should_line
+        in_line
         and uppx < x_gt
     ):
         # print('FLIPPING TO BARS')
         should_line = False
+        flow._in_ds = False

     elif (
-        not should_line
+        not in_line
         and uppx >= x_gt
     ):
         # print('FLIPPING TO LINE')
         should_line = True
+        flow._in_ds = True

     profiler(f'ds logic complete line={should_line}')
@@ -333,7 +336,13 @@ class Flow(msgspec.Struct):  # , frozen=True):

     '''
     name: str
     plot: pg.PlotItem
-    graphics: Curve
+    graphics: Union[Curve, BarItems]
+
+    # in some cases a flow may want to change its
+    # graphical "type" or, "form" when downsampling,
+    # normally this is just a plain line.
+    ds_graphics: Optional[Curve] = None
+
     _shm: ShmArray

     is_ohlc: bool = False
@@ -540,6 +549,7 @@ class Flow(msgspec.Struct):  # , frozen=True):
         should_redraw: bool = False

         rkwargs = {}
+        should_line = False
         if isinstance(graphics, BarItems):
             # XXX: special case where we change out graphics
             # to a line after a certain uppx threshold.
@@ -556,8 +566,8 @@ class Flow(msgspec.Struct):  # , frozen=True):
                 profiler,
                 **kwargs,
             )
-            # bars = True
             should_redraw = changed_to_line or not should_line
+            self._in_ds = should_line

         else:
             r = self._src_r
@@ -661,6 +671,17 @@ class Flow(msgspec.Struct):  # , frozen=True):
             # assign output paths to graphicis obj
             graphics.path = r.path
             graphics.fast_path = r.fast_path

+            # XXX: we don't need this right?
+            # graphics.draw_last_datum(
+            #     path,
+            #     src_array,
+            #     data,
+            #     reset,
+            #     array_key,
+            # )
+            # graphics.update()
+            # profiler('.update()')
+
         else:
             # assign output paths to graphicis obj
             graphics.path = r.path
@@ -673,16 +694,15 @@ class Flow(msgspec.Struct):  # , frozen=True):
                 reset,
                 array_key,
             )

-            # TODO: is this ever better?
-            # graphics.prepareGeometryChange()
-            # profiler('.prepareGeometryChange()')
-
-            graphics.update()
-            profiler('.update()')
+        # TODO: does this actuallly help us in any way (prolly should
+        # look at the source / ask ogi). I think it avoid artifacts on
+        # wheel-scroll downsampling curve updates?
+        graphics.update()
+        profiler('.update()')
+
+        # TODO: is this ever better?
+        # graphics.prepareGeometryChange()
+        # profiler('.prepareGeometryChange()')

         # track downsampled state
         self._in_ds = r._in_ds
@@ -692,6 +712,7 @@ class Flow(msgspec.Struct):  # , frozen=True):
     def draw_last(
         self,
         array_key: Optional[str] = None,
+        only_last_uppx: bool = False,

     ) -> None:
@@ -711,19 +732,41 @@ class Flow(msgspec.Struct):  # , frozen=True):
             array_key,
         )

-        if self._in_ds:
-            # we only care about the last pixel's
-            # worth of data since that's all the screen
-            # can represent on the last column where
-            # the most recent datum is being drawn.
+        # the renderer is downsampling we choose
+        # to always try and updadte a single (interpolating)
+        # line segment that spans and tries to display
+        # the las uppx's worth of datums.
+        # we only care about the last pixel's
+        # worth of data since that's all the screen
+        # can represent on the last column where
+        # the most recent datum is being drawn.
+        if self._in_ds or only_last_uppx:
+            dsg = self.ds_graphics or self.graphics
+
+            # XXX: pretty sure we don't need this?
+            # if isinstance(g, Curve):
+            #     with dsg.reset_cache():
             uppx = self._last_uppx
             y = y[-uppx:]
             ymn, ymx = y.min(), y.max()
             # print(f'drawing uppx={uppx} mxmn line: {ymn}, {ymx}')
-            g._last_line = QLineF(
-                x[-2], ymn,
+            try:
+                iuppx = x[-uppx]
+            except IndexError:
+                # we're less then an x-px wide so just grab the start
+                # datum index.
+                iuppx = x[0]
+
+            dsg._last_line = QLineF(
+                iuppx, ymn,
                 x[-1], ymx,
             )
+            # print(f'updating DS curve {self.name}')
+            dsg.update()

         else:
+            # print(f'updating NOT DS curve {self.name}')
             g.update()

def by_index_and_key(

View File

@@ -440,7 +440,7 @@ class FspAdmin:
             # if the chart isn't hidden try to update
             # the data on screen.
             if not self.linked.isHidden():
-                log.info(f'Re-syncing graphics for fsp: {ns_path}')
+                log.debug(f'Re-syncing graphics for fsp: {ns_path}')
                 self.linked.graphics_cycle(
                     trigger_all=True,
                     prepend_update_index=info['first'],

View File

@@ -57,6 +57,7 @@ setup(
         # from github currently (see requirements.txt)
         # 'trimeter',  # not released yet..
         # 'tractor',
+        # asyncvnc,

         # brokers
         'asks==2.4.8',
@@ -71,32 +72,34 @@ setup(
         # UI
         'PyQt5',
-        'pyqtgraph',
-        'qdarkstyle >= 3.0.2',
-
-        # fuzzy search
-        'fuzzywuzzy[speedup]',
+        # 'pyqtgraph', from our fork see reqs.txt
+        'qdarkstyle >= 3.0.2',  # themeing
+        'fuzzywuzzy[speedup]',  # fuzzy search

         # tsdbs
         'pymarketstore',
+        # anyio-marketstore  # from gh see reqs.txt
     ],
     extras_require={
         # serialization
         'tsdb': [
             'docker',
         ],
     },
     tests_require=['pytest'],
-    python_requires=">=3.9", # literally for ``datetime.datetime.fromisoformat``...
-    keywords=["async", "trading", "finance", "quant", "charting"],
+    python_requires=">=3.10",
+    keywords=[
+        "async",
+        "trading",
+        "finance",
+        "quant",
+        "charting",
+    ],
     classifiers=[
         'Development Status :: 3 - Alpha',
         'License :: OSI Approved :: ',
         'Operating System :: POSIX :: Linux',
         "Programming Language :: Python :: Implementation :: CPython",
         "Programming Language :: Python :: 3 :: Only",
-        "Programming Language :: Python :: 3.9",
+        "Programming Language :: Python :: 3.10",
         'Intended Audience :: Financial and Insurance Industry',
         'Intended Audience :: Science/Research',