Compare commits


2 Commits

12 changed files with 267 additions and 304 deletions

View File

@@ -35,7 +35,7 @@ log = get_logger(__name__)
_root_dname = 'pikerd'
_registry_addr = ('127.0.0.1', 6116)
_registry_addr = ('127.0.0.1', 1616)
_tractor_kwargs: dict[str, Any] = {
# use a different registry addr than tractor's default
'arbiter_addr': _registry_addr
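For context on this setting: `_tractor_kwargs` presumably gets splatted into `tractor`'s root-actor setup so the daemon registry listens on the custom address. A minimal sketch, assuming `tractor.open_root_actor()` accepts an `arbiter_addr` override as the kwargs above imply (not code from this diff):

import tractor
import trio

async def main() -> None:
    # bind the actor registry to piker's custom address rather
    # than tractor's built-in default
    async with tractor.open_root_actor(
        name=_root_dname,
        arbiter_addr=_registry_addr,
    ):
        await trio.sleep_forever()

trio.run(main)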

View File

@@ -640,7 +640,6 @@ class Client:
ready = ticker.updateEvent
# ensure a last price gets filled in before we deliver quote
warnset: bool = False
for _ in range(100):
if isnan(ticker.last):
@@ -651,21 +650,17 @@ class Client:
if ready in done:
break
else:
if not warnset:
log.warning(
f'Quote for {symbol} timed out: market is closed?'
)
warnset = True
log.warning(
f'Quote for {symbol} timed out: market is closed?'
)
else:
log.info(f'Got first quote for {symbol}')
break
else:
if not warnset:
log.warning(
f'Symbol {symbol} is not returning a quote, '
'it may be outside trading hours?')
warnset = True
log.warning(
f'Symbol {symbol} is not returning a quote, '
'it may be outside trading hours?')
return contract, ticker, details
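Side note on the pattern above: `warnset` is the classic warn-once flag, logging a single warning per polling loop instead of one per iteration. A standalone sketch (the `ticker` object is a hypothetical stand-in, not the actual ib client code):

import logging
from math import isnan

log = logging.getLogger(__name__)

def wait_first_quote(ticker, symbol: str, tries: int = 100) -> None:
    warnset: bool = False
    for _ in range(tries):
        if isnan(ticker.last):
            if not warnset:
                # log at most once while the quote stays empty
                log.warning(f'Quote for {symbol} timed out: market is closed?')
                warnset = True
        else:
            log.info(f'Got first quote for {symbol}')
            break
    else:
        if not warnset:
            log.warning(
                f'Symbol {symbol} is not returning a quote, '
                'it may be outside trading hours?'
            )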

View File

@@ -555,36 +555,56 @@ def load_flex_trades(
report = flexreport.FlexReport(path=path)
trade_entries = report.extract('Trade')
trades = {
# get reverse map to user account names
accounts = conf['accounts'].inverse
trades_by_account = {}
for t in trade_entries:
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave this as an ``int``?
str(t.__dict__['tradeID']): t.__dict__
for t in trade_entries
}
trade = t.__dict__
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(trade['tradeID'])
date = str(trade['tradeDate'])
acctid = accounts[str(trade['accountId'])]
trades_by_account.setdefault(
acctid, {}
).setdefault(date, {})[tid] = trade
ln = len(trades)
ln = len(trades_by_account.values())
log.info(f'Loaded {ln} trades from flex query')
trades_by_account = {}
for tid, trade in trades.items():
trades_by_account.setdefault(
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
str(trade['accountId']), {}
)[tid] = trade
# section = {'ib': trades_by_account}
for acctid, trades_by_id in trades_by_account.items():
with config.open_trade_ledger('ib', acctid) as ledger:
ledger.update({'ib': trades_by_id})
section = {'ib': trades_by_account}
pprint(section)
# pprint(section)
# TODO: load the config first and append in
# the new trades loaded here..
try:
config.write(section, 'trades')
except KeyError:
import pdbpp; pdbpp.set_trace() # noqa
# try:
# config.write(section, 'trades')
# except KeyError:
# import pdbpp; pdbpp.set_trace() # noqa
if __name__ == '__main__':
load_flex_trades()
import sys
import os
args = sys.argv
if len(args) > 1:
args = args[1:]
for arg in args:
path = os.path.abspath(arg)
load_flex_trades(path=path)
else:
# expect brokers.toml to have an entry and
# pull from the web service.
load_flex_trades()
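The rewrite above groups flex-report trades into an account -> date -> trade-id tree via chained `setdefault()` calls. A self-contained sketch of that idiom with made-up trade dicts:

trade_entries = [
    {'tradeID': 1, 'tradeDate': '20220520', 'accountId': 'U111'},
    {'tradeID': 2, 'tradeDate': '20220520', 'accountId': 'U111'},
    {'tradeID': 3, 'tradeDate': '20220523', 'accountId': 'U222'},
]

trades_by_account: dict[str, dict] = {}
for trade in trade_entries:
    # str-ify everything to dodge the ``toml`` int-key quirk noted above
    tid = str(trade['tradeID'])
    date = str(trade['tradeDate'])
    acctid = str(trade['accountId'])
    trades_by_account.setdefault(
        acctid, {}
    ).setdefault(date, {})[tid] = trade

# -> {'U111': {'20220520': {'1': {...}, '2': {...}}},
#     'U222': {'20220523': {'3': {...}}}}
print(trades_by_account)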

View File

@@ -18,9 +18,11 @@
Broker configuration mgmt.
"""
from contextlib import contextmanager as cm
import platform
import sys
import os
from os import path
from os.path import dirname
import shutil
from typing import Optional
@@ -111,6 +113,7 @@ if _parent_user:
_conf_names: set[str] = {
'brokers',
'pp',
'trades',
'watchlists',
}
@@ -147,19 +150,21 @@ def get_conf_path(
conf_name: str = 'brokers',
) -> str:
"""Return the default config path normally under
``~/.config/piker`` on linux.
'''
Return the top-level default config path normally under
``~/.config/piker`` on linux for a given ``conf_name``, the config
name.
Contains files such as:
- brokers.toml
- pp.toml
- watchlists.toml
- trades.toml
# maybe coming soon ;)
- signals.toml
- strats.toml
"""
'''
assert conf_name in _conf_names
fn = _conf_fn_w_ext(conf_name)
return os.path.join(
@@ -168,12 +173,57 @@ def get_conf_path(
)
@cm
def open_trade_ledger(
broker: str,
account: str,
) -> dict:
'''
Idempotently create and read in a trade log file from the
``<configuration_dir>/ledgers/`` directory.
Files are named per broker account of the form
``trades_<brokername>_<accountname>.toml``. The ``accountname`` here is the
name as defined in the user's ``brokers.toml`` config.
'''
ldir = path.join(_config_dir, 'ledgers')
if not path.isdir(ldir):
os.makedirs(ldir)
fname = f'trades_{broker}_{account}.toml'
tradesfile = path.join(ldir, fname)
if not path.isfile(tradesfile):
log.info(
f'Creating new local trades ledger: {tradesfile}'
)
with open(tradesfile, 'w') as cf:
pass # touch
try:
with open(tradesfile, 'r') as cf:
ledger = toml.load(cf)
cpy = ledger.copy()
yield cpy
finally:
if cpy != ledger:
# TODO: show diff output?
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
print(f'Updating ledger for {tradesfile}:\n')
ledger.update(cpy)
# we write on close the mutated ledger data
with open(tradesfile, 'w') as cf:
return toml.dump(ledger, cf)
def repodir():
'''
Return the abspath to the repo directory.
'''
dirpath = os.path.abspath(
dirpath = path.abspath(
# we're 3 levels down in **this** module file
dirname(dirname(os.path.realpath(__file__)))
)
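A usage sketch for the new `open_trade_ledger()` context manager (the account name and trade fields below are made up):

from piker import config

# idempotently opens (creating if needed)
# <configuration_dir>/ledgers/trades_ib_algopaper.toml, i.e.
# under ~/.config/piker on linux, yields the parsed toml as a
# dict, and writes any mutations back to disk on exit
with config.open_trade_ledger('ib', 'algopaper') as ledger:
    ledger.setdefault('ib', {})['0123456789'] = {
        'tradeDate': '20220520',
        'size': 100,
    }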

View File

@@ -19,7 +19,6 @@ Supervisor for docker with included specific-image service helpers.
'''
import os
import time
from typing import (
Optional,
Callable,
@@ -187,65 +186,45 @@ class Container:
async def cancel(
self,
stop_msg: str,
) -> None:
cid = self.cntr.id
# first try a graceful cancel
log.cancel(
f'SIGINT cancelling container: {cid}\n'
f'waiting on stop msg: "{stop_msg}"'
)
self.try_signal('SIGINT')
start = time.time()
for _ in range(30):
with trio.move_on_after(0.5) as cs:
cs.shield = True
await self.process_logs_until('initiating graceful shutdown')
await self.process_logs_until('exiting...',)
for _ in range(10):
with trio.move_on_after(0.5) as cs:
cs.shield = True
await self.process_logs_until(stop_msg)
# if we aren't cancelled on above checkpoint then we
# assume we read the expected stop msg and terminated.
await self.process_logs_until('exiting...',)
break
try:
log.info(f'Polling for container shutdown:\n{cid}')
if cs.cancelled_caught:
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
if self.cntr.status not in {'exited', 'not-running'}:
try:
log.info(f'Waiting on container shutdown: {cid}')
self.cntr.wait(
timeout=0.1,
condition='not-running',
)
break
break
except (
ReadTimeout,
ConnectionError,
):
log.error(f'failed to wait on container {cid}')
raise
except (
ReadTimeout,
):
log.info(f'Still waiting on container:\n{cid}')
continue
except (
docker.errors.APIError,
ConnectionError,
):
log.exception('Docker connection failure')
break
else:
delay = time.time() - start
log.error(
f'Failed to kill container {cid} after {delay}s\n'
'sending SIGKILL..'
)
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
self.cntr.wait(
timeout=3,
condition='not-running',
)
raise RuntimeError(f'Failed to cancel container {cid}')
log.cancel(f'Container stopped: {cid}')
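The cancel flow above is the standard graceful-then-forceful shutdown: SIGINT first, drain logs for the expected stop message under a shielded timeout, and escalate to SIGKILL only when the graceful path stalls. A distilled sketch with hypothetical `try_signal()`/`wait_for_stop_msg()` helpers:

import trio

async def cancel_container(cntr, stop_msg: str) -> None:
    cntr.try_signal('SIGINT')  # ask nicely first
    for _ in range(30):
        # shield so a surrounding cancellation can't interrupt the
        # log drain mid-shutdown; the 0.5s timeout still applies
        with trio.move_on_after(0.5) as cs:
            cs.shield = True
            await cntr.wait_for_stop_msg(stop_msg)
            break
        if cs.cancelled_caught:
            # graceful path timed out -> get out the big guns
            cntr.try_signal('SIGKILL')
    else:
        raise RuntimeError(f'Failed to cancel container {cntr}')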
@@ -266,16 +245,13 @@ async def open_ahabd(
# params, etc. passing to ``Container.run()``?
# call into endpoint for container config/init
ep_func = NamespacePath(endpoint).load_ref()
(
dcntr,
cntr_config,
start_msg,
stop_msg,
) = ep_func(client)
dcntr, cntr_config = ep_func(client)
cntr = Container(dcntr)
with trio.move_on_after(1):
found = await cntr.process_logs_until(start_msg)
found = await cntr.process_logs_until(
"launching tcp listener for all services...",
)
if not found and cntr not in client.containers.list():
raise RuntimeError(
@@ -295,9 +271,16 @@ async def open_ahabd(
# callers to have root perms?
await trio.sleep_forever()
finally:
except (
BaseException,
# trio.Cancelled,
# KeyboardInterrupt,
):
with trio.CancelScope(shield=True):
await cntr.cancel(stop_msg)
await cntr.cancel()
raise
async def start_ahab(
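For reference, one side of this compare has each ahab endpoint return a 4-tuple so the service itself declares the log lines the supervisor waits on. A minimal endpoint sketch in that style (the image name is illustrative; the messages are the ones used in the marketstore diff below):

import docker

def start_myservice(client: docker.DockerClient) -> tuple:
    # start the container detached and hand back its handle,
    # service config, and the expected startup/stop log sentinels
    dcntr = client.containers.run(
        'myservice:latest',
        detach=True,
        init=True,
    )
    return (
        dcntr,
        {},  # service-specific config for the caller
        'launching tcp listener for all services...',  # startup msg
        'exiting...',  # stop msg
    )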

View File

@@ -127,15 +127,10 @@ def start_marketstore(
import os
import docker
from .. import config
get_console_log('info', name=__name__)
mktsdir = os.path.join(config._config_dir, 'marketstore')
# create when dne
if not os.path.isdir(mktsdir):
os.mkdir(mktsdir)
yml_file = os.path.join(mktsdir, 'mkts.yml')
yml_file = os.path.join(config._config_dir, 'mkts.yml')
if not os.path.isfile(yml_file):
log.warning(
f'No `marketstore` config exists?: {yml_file}\n'
@@ -148,14 +143,14 @@ def start_marketstore(
# create a mount from user's local piker config dir into container
config_dir_mnt = docker.types.Mount(
target='/etc',
source=mktsdir,
source=config._config_dir,
type='bind',
)
# create a user config subdir where the marketstore
# backing filesystem database can be persisted.
persistent_data_dir = os.path.join(
mktsdir, 'data',
config._config_dir, 'data',
)
if not os.path.isdir(persistent_data_dir):
os.mkdir(persistent_data_dir)
@@ -185,14 +180,7 @@ def start_marketstore(
init=True,
# remove=True,
)
return (
dcntr,
_config,
# expected startup and stop msgs
"launching tcp listener for all services...",
"exiting...",
)
return dcntr, _config
_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
@@ -395,12 +383,7 @@ class Storage:
]:
first_tsdb_dt, last_tsdb_dt = None, None
tsdb_arrays = await self.read_ohlcv(
fqsn,
# on first load we don't need to pull the max
# history per request size worth.
limit=3000,
)
tsdb_arrays = await self.read_ohlcv(fqsn)
log.info(f'Loaded tsdb history {tsdb_arrays}')
if tsdb_arrays:
@@ -418,7 +401,6 @@ class Storage:
fqsn: str,
timeframe: Optional[Union[int, str]] = None,
end: Optional[int] = None,
limit: int = int(800e3),
) -> tuple[
MarketstoreClient,
@@ -441,7 +423,7 @@ class Storage:
# TODO: figure the max limit here given the
# ``purepc`` msg size limit of purerpc: 33554432
limit=limit,
limit=int(800e3),
)
if timeframe is None:
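Hypothetical call sites showing why `limit` was promoted to a parameter: the first metadata load can stay small while backfills pull full pages (`storage` here is assumed to be a connected `Storage` instance):

async def load_history(storage, fqsn: str):
    # first load: a small page suffices to find the series edges
    arrays = await storage.read_ohlcv(fqsn, limit=3000)
    # backfill: use the purerpc msg-size-bound default page
    arrays = await storage.read_ohlcv(fqsn, limit=int(800e3))
    return arrays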

View File

@@ -361,7 +361,7 @@ async def cascade(
) -> tuple[TaskTracker, int]:
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
log.debug(f're-syncing fsp {func_name} to source')
log.warning(f're-syncing fsp {func_name} to source')
tracker.cs.cancel()
await tracker.complete.wait()
tracker, index = await n.start(fsp_target)

View File

@@ -379,17 +379,17 @@ class Curve(pg.GraphicsObject):
) -> None:
# default line draw last call
# with self.reset_cache():
x = render_data['index']
y = render_data[array_key]
with self.reset_cache():
x = render_data['index']
y = render_data[array_key]
# draw the "current" step graphic segment so it
# lines up with the "middle" of the current
# (OHLC) sample.
self._last_line = QLineF(
x[-2], y[-2],
x[-1], y[-1],
)
# draw the "current" step graphic segment so it
# lines up with the "middle" of the current
# (OHLC) sample.
self._last_line = QLineF(
x[-2], y[-2],
x[-1], y[-1],
)
return x, y
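`reset_cache()` here presumably toggles the Qt paint cache off around the draw so the new segment isn't clipped against a stale cached raster. A sketch of such a context manager under that assumption (not necessarily the actual `Curve` implementation):

from contextlib import contextmanager
from PyQt5 import QtWidgets

@contextmanager
def reset_cache(item: QtWidgets.QGraphicsItem):
    # disable device-coordinate caching while mutating the
    # graphic, then restore it so normal paints stay cheap
    item.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
    try:
        yield
    finally:
        item.setCacheMode(
            QtWidgets.QGraphicsItem.DeviceCoordinateCache
        )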

View File

@@ -426,6 +426,71 @@ def graphics_update_cycle(
profiler('view incremented')
if vlm_chart:
# always update y-label
ds.vlm_sticky.update_from_data(
*array[-1][['index', 'volume']]
)
if (
(
do_rt_update
or do_append
and liv
)
or trigger_all
):
# TODO: make it so this doesn't have to be called
# once the $vlm is up?
vlm_chart.update_graphics_from_flow(
'volume',
# UGGGh, see ``maxmin()`` impl in `._fsp` for
# the overlayed plotitems... we need a better
# way to invoke a maxmin per overlay..
render=False,
# XXX: ^^^^ THIS IS SUPER IMPORTANT! ^^^^
# without this, since we disable the
# 'volume' (units) chart after the $vlm starts
# up we need to be sure to enable this
# auto-ranging otherwise there will be no handler
# connected to update accompanying overlay
# graphics..
)
profiler('`vlm_chart.update_graphics_from_flow()`')
if (
mx_vlm_in_view != vars['last_mx_vlm']
):
yrange = (0, mx_vlm_in_view * 1.375)
vlm_chart.view._set_yrange(
yrange=yrange,
)
profiler('`vlm_chart.view._set_yrange()`')
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
vars['last_mx_vlm'] = mx_vlm_in_view
for curve_name, flow in vlm_chart._flows.items():
if not flow.render:
continue
update_fsp_chart(
vlm_chart,
flow,
curve_name,
array_key=curve_name,
# do_append=uppx < update_uppx,
do_append=do_append,
)
# is this even doing anything?
# (pretty sure it's the real-time
# resizing from last quote?)
fvb = flow.plot.vb
fvb._set_yrange(
# autoscale_linked_plots=False,
name=curve_name,
)
ticks_frame = quote.get('ticks', ())
frames_by_type: dict[str, dict] = {}
@@ -475,16 +540,15 @@ def graphics_update_cycle(
or do_append
or trigger_all
):
# TODO: we should always update the "last" datum
# since the current range should at least be updated
# to its max/min on the last pixel.
chart.update_graphics_from_flow(
chart.name,
# do_append=uppx < update_uppx,
do_append=do_append,
)
# NOTE: we always update the "last" datum
# since the current range should at least be updated
# to its max/min on the last pixel.
# iterate in FIFO order per tick-frame
for typ, tick in lasts.items():
@@ -589,116 +653,31 @@ def graphics_update_cycle(
vars['last_mx'], vars['last_mn'] = mx, mn
# run synchronous update on all linked flows
# TODO: should the "main" (aka source) flow be special?
for curve_name, flow in chart._flows.items():
# update any overlayed fsp flows
if curve_name != chart.data_key:
update_fsp_chart(
chart,
flow,
curve_name,
array_key=curve_name,
)
# even if we're downsampled bigly
# draw the last datum in the final
# px column to give the user the mx/mn
# range of that set.
if (
not do_append
# and not do_rt_update
not (do_rt_update or do_append)
and liv
# even if we're downsampled bigly
# draw the last datum in the final
# px column to give the user the mx/mn
# range of that set.
):
flow.draw_last(
array_key=curve_name,
only_last_uppx=True,
)
# always update the last datum-element
# graphic for all flows
flow.draw_last(array_key=curve_name)
# volume chart logic..
# TODO: can we unify this with the above loop?
if vlm_chart:
# always update y-label
ds.vlm_sticky.update_from_data(
*array[-1][['index', 'volume']]
# TODO: should the "main" (aka source) flow be special?
if curve_name == chart.data_key:
continue
update_fsp_chart(
chart,
flow,
curve_name,
array_key=curve_name,
)
if (
(
do_rt_update
or do_append
and liv
)
or trigger_all
):
# TODO: make it so this doesn't have to be called
# once the $vlm is up?
vlm_chart.update_graphics_from_flow(
'volume',
# UGGGh, see ``maxmin()`` impl in `._fsp` for
# the overlayed plotitems... we need a better
# way to invoke a maxmin per overlay..
render=False,
# XXX: ^^^^ THIS IS SUPER IMPORTANT! ^^^^
# without this, since we disable the
# 'volume' (units) chart after the $vlm starts
# up we need to be sure to enable this
# auto-ranging otherwise there will be no handler
# connected to update accompanying overlay
# graphics..
)
profiler('`vlm_chart.update_graphics_from_flow()`')
if (
mx_vlm_in_view != vars['last_mx_vlm']
):
yrange = (0, mx_vlm_in_view * 1.375)
vlm_chart.view._set_yrange(
yrange=yrange,
)
profiler('`vlm_chart.view._set_yrange()`')
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
vars['last_mx_vlm'] = mx_vlm_in_view
for curve_name, flow in vlm_chart._flows.items():
if (
curve_name != 'volume' and
flow.render and (
liv and
do_rt_update or do_append
)
):
update_fsp_chart(
vlm_chart,
flow,
curve_name,
array_key=curve_name,
# do_append=uppx < update_uppx,
do_append=do_append,
)
# is this even doing anything?
# (pretty sure it's the real-time
# resizing from last quote?)
fvb = flow.plot.vb
fvb._set_yrange(
name=curve_name,
)
elif (
curve_name != 'volume'
and not do_append
and liv
and uppx >= 1
# even if we're downsampled bigly
# draw the last datum in the final
# px column to give the user the mx/mn
# range of that set.
):
# always update the last datum-element
# graphic for all flows
# print(f'drawing last {flow.name}')
flow.draw_last(array_key=curve_name)
async def display_symbol_data(
godwidget: GodWidget,
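One thing worth flagging in the gating predicates above: `do_rt_update or do_append and liv` parses as `do_rt_update or (do_append and liv)` because `and` binds tighter than `or`, which may not match the grouping the inner parentheses suggest:

do_rt_update, do_append, liv = True, False, False

# how Python actually groups the expression used above
assert (do_rt_update or do_append and liv) == (
    do_rt_update or (do_append and liv)
)  # both True for these values

# the (possibly) intended grouping evaluates differently
assert ((do_rt_update or do_append) and liv) is False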

View File

@@ -175,7 +175,6 @@ def render_baritems(
name=f'{flow.name}_ds_ohlc',
color=bars._color,
)
flow.ds_graphics = curve
curve.hide()
self.plot.addItem(curve)
@@ -193,20 +192,18 @@ def render_baritems(
uppx = curve.x_uppx()
in_line = should_line = curve.isVisible()
if (
in_line
should_line
and uppx < x_gt
):
# print('FLIPPING TO BARS')
should_line = False
flow._in_ds = False
elif (
not in_line
not should_line
and uppx >= x_gt
):
# print('FLIPPING TO LINE')
should_line = True
flow._in_ds = True
profiler(f'ds logic complete line={should_line}')
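The flip above is a simple hysteresis switch on uppx (x-units per pixel): once bars pack denser than `x_gt` per pixel, draw a line; once zoomed back below the threshold, restore bars. Distilled (the default threshold is illustrative):

def should_render_line(
    uppx: float,
    in_line: bool,
    x_gt: float = 8,
) -> bool:
    # hypothetical distillation of the uppx threshold logic above
    if in_line and uppx < x_gt:
        return False  # zoomed in enough -> flip back to bars
    if not in_line and uppx >= x_gt:
        return True  # bars are sub-pixel -> flip to a line
    return in_line  # no change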
@@ -336,13 +333,7 @@ class Flow(msgspec.Struct): # , frozen=True):
'''
name: str
plot: pg.PlotItem
graphics: Union[Curve, BarItems]
# in some cases a flow may want to change its
# graphical "type" or, "form" when downsampling,
# normally this is just a plain line.
ds_graphics: Optional[Curve] = None
graphics: Curve
_shm: ShmArray
is_ohlc: bool = False
@@ -549,7 +540,6 @@ class Flow(msgspec.Struct): # , frozen=True):
should_redraw: bool = False
rkwargs = {}
should_line = False
if isinstance(graphics, BarItems):
# XXX: special case where we change out graphics
# to a line after a certain uppx threshold.
@@ -566,8 +556,8 @@ class Flow(msgspec.Struct): # , frozen=True):
profiler,
**kwargs,
)
# bars = True
should_redraw = changed_to_line or not should_line
self._in_ds = should_line
else:
r = self._src_r
@@ -671,17 +661,6 @@ class Flow(msgspec.Struct): # , frozen=True):
# assign output paths to graphics obj
graphics.path = r.path
graphics.fast_path = r.fast_path
# XXX: we don't need this right?
# graphics.draw_last_datum(
# path,
# src_array,
# data,
# reset,
# array_key,
# )
# graphics.update()
# profiler('.update()')
else:
# assign output paths to graphics obj
graphics.path = r.path
@@ -694,15 +673,16 @@ class Flow(msgspec.Struct): # , frozen=True):
reset,
array_key,
)
graphics.update()
profiler('.update()')
# TODO: is this ever better?
# graphics.prepareGeometryChange()
# profiler('.prepareGeometryChange()')
# TODO: does this actually help us in any way (prolly should
# look at the source / ask ogi). I think it avoids artifacts on
# wheel-scroll downsampling curve updates?
# TODO: is this ever better?
# graphics.prepareGeometryChange()
# profiler('.prepareGeometryChange()')
graphics.update()
profiler('.update()')
# track downsampled state
self._in_ds = r._in_ds
@@ -712,7 +692,6 @@ class Flow(msgspec.Struct): # , frozen=True):
def draw_last(
self,
array_key: Optional[str] = None,
only_last_uppx: bool = False,
) -> None:
@@ -732,41 +711,19 @@ class Flow(msgspec.Struct): # , frozen=True):
array_key,
)
# when the renderer is downsampling we choose
# to always try and update a single (interpolating)
# line segment that spans and tries to display
# the last uppx's worth of datums.
# we only care about the last pixel's
# worth of data since that's all the screen
# can represent on the last column where
# the most recent datum is being drawn.
if self._in_ds or only_last_uppx:
dsg = self.ds_graphics or self.graphics
# XXX: pretty sure we don't need this?
# if isinstance(g, Curve):
# with dsg.reset_cache():
if self._in_ds:
# we only care about the last pixel's
# worth of data since that's all the screen
# can represent on the last column where
# the most recent datum is being drawn.
uppx = self._last_uppx
y = y[-uppx:]
ymn, ymx = y.min(), y.max()
# print(f'drawing uppx={uppx} mxmn line: {ymn}, {ymx}')
try:
iuppx = x[-uppx]
except IndexError:
# we're less than an x-px wide so just grab the start
# datum index.
iuppx = x[0]
dsg._last_line = QLineF(
iuppx, ymn,
g._last_line = QLineF(
x[-2], ymn,
x[-1], ymx,
)
# print(f'updating DS curve {self.name}')
dsg.update()
else:
# print(f'updating NOT DS curve {self.name}')
g.update()
def by_index_and_key(
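When downsampled, the screen can only show the last pixel column's min/max, so the "last datum" graphic becomes a single segment spanning that range. A numpy sketch of computing the endpoints (a hypothetical helper mirroring the logic above):

import numpy as np

def last_px_column_segment(
    x: np.ndarray,
    y: np.ndarray,
    uppx: int,
) -> tuple[float, float, float, float]:
    # min/max over the last ``uppx`` datums: one pixel's worth
    tail = y[-uppx:]
    ymn, ymx = tail.min(), tail.max()
    try:
        x_start = x[-uppx]
    except IndexError:
        # fewer datums than one px column -> start at the first index
        x_start = x[0]
    return (x_start, ymn, x[-1], ymx)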

View File

@@ -440,7 +440,7 @@ class FspAdmin:
# if the chart isn't hidden try to update
# the data on screen.
if not self.linked.isHidden():
log.debug(f'Re-syncing graphics for fsp: {ns_path}')
log.info(f'Re-syncing graphics for fsp: {ns_path}')
self.linked.graphics_cycle(
trigger_all=True,
prepend_update_index=info['first'],

View File

@@ -57,7 +57,6 @@ setup(
# from github currently (see requirements.txt)
# 'trimeter', # not released yet..
# 'tractor',
# asyncvnc,
# brokers
'asks==2.4.8',
@@ -72,34 +71,32 @@ setup(
# UI
'PyQt5',
# 'pyqtgraph', from our fork see reqs.txt
'qdarkstyle >= 3.0.2', # theming
'fuzzywuzzy[speedup]', # fuzzy search
'pyqtgraph',
'qdarkstyle >= 3.0.2',
# fuzzy search
'fuzzywuzzy[speedup]',
# tsdbs
# anyio-marketstore # from gh see reqs.txt
'pymarketstore',
],
extras_require={
# serialization
'tsdb': [
'docker',
],
},
tests_require=['pytest'],
python_requires=">=3.10",
keywords=[
"async",
"trading",
"finance",
"quant",
"charting",
],
python_requires=">=3.9", # literally for ``datetime.datetime.fromisoformat``...
keywords=["async", "trading", "finance", "quant", "charting"],
classifiers=[
'Development Status :: 3 - Alpha',
'License :: OSI Approved :: ',
'Operating System :: POSIX :: Linux',
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
'Intended Audience :: Financial and Insurance Industry',
'Intended Audience :: Science/Research',