Compare commits

..

No commits in common. "310_plus" and "slic_fix_v2" have entirely different histories.

3 changed files with 51 additions and 89 deletions

View File

@ -19,7 +19,6 @@ Supervisor for docker with included specific-image service helpers.
''' '''
import os import os
import time
from typing import ( from typing import (
Optional, Optional,
Callable, Callable,
@ -187,65 +186,45 @@ class Container:
async def cancel( async def cancel(
self, self,
stop_msg: str,
) -> None: ) -> None:
cid = self.cntr.id cid = self.cntr.id
# first try a graceful cancel
log.cancel(
f'SIGINT cancelling container: {cid}\n'
f'waiting on stop msg: "{stop_msg}"'
)
self.try_signal('SIGINT') self.try_signal('SIGINT')
start = time.time()
for _ in range(30):
with trio.move_on_after(0.5) as cs: with trio.move_on_after(0.5) as cs:
cs.shield = True cs.shield = True
await self.process_logs_until(stop_msg) await self.process_logs_until('initiating graceful shutdown')
await self.process_logs_until('exiting...',)
# if we aren't cancelled on above checkpoint then we for _ in range(10):
# assume we read the expected stop msg and terminated. with trio.move_on_after(0.5) as cs:
cs.shield = True
await self.process_logs_until('exiting...',)
break break
try: if cs.cancelled_caught:
log.info(f'Polling for container shutdown:\n{cid}')
if self.cntr.status not in {'exited', 'not-running'}:
self.cntr.wait(
timeout=0.1,
condition='not-running',
)
break
except (
ReadTimeout,
):
log.info(f'Still waiting on container:\n{cid}')
continue
except (
docker.errors.APIError,
ConnectionError,
):
log.exception('Docker connection failure')
break
else:
delay = time.time() - start
log.error(
f'Failed to kill container {cid} after {delay}s\n'
'sending SIGKILL..'
)
# get out the big guns, bc apparently marketstore # get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully # doesn't actually know how to terminate gracefully
# :eyeroll:... # :eyeroll:...
self.try_signal('SIGKILL') self.try_signal('SIGKILL')
try:
log.info('Waiting on container shutdown: {cid}')
self.cntr.wait( self.cntr.wait(
timeout=3, timeout=0.1,
condition='not-running', condition='not-running',
) )
break
except (
ReadTimeout,
ConnectionError,
):
log.error(f'failed to wait on container {cid}')
raise
else:
raise RuntimeError('Failed to cancel container {cid}')
log.cancel(f'Container stopped: {cid}') log.cancel(f'Container stopped: {cid}')
@ -266,16 +245,13 @@ async def open_ahabd(
# params, etc. passing to ``Containter.run()``? # params, etc. passing to ``Containter.run()``?
# call into endpoint for container config/init # call into endpoint for container config/init
ep_func = NamespacePath(endpoint).load_ref() ep_func = NamespacePath(endpoint).load_ref()
( dcntr, cntr_config = ep_func(client)
dcntr,
cntr_config,
start_msg,
stop_msg,
) = ep_func(client)
cntr = Container(dcntr) cntr = Container(dcntr)
with trio.move_on_after(1): with trio.move_on_after(1):
found = await cntr.process_logs_until(start_msg) found = await cntr.process_logs_until(
"launching tcp listener for all services...",
)
if not found and cntr not in client.containers.list(): if not found and cntr not in client.containers.list():
raise RuntimeError( raise RuntimeError(
@ -295,9 +271,16 @@ async def open_ahabd(
# callers to have root perms? # callers to have root perms?
await trio.sleep_forever() await trio.sleep_forever()
finally: except (
BaseException,
# trio.Cancelled,
# KeyboardInterrupt,
):
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await cntr.cancel(stop_msg) await cntr.cancel()
raise
async def start_ahab( async def start_ahab(

View File

@ -127,15 +127,10 @@ def start_marketstore(
import os import os
import docker import docker
from .. import config from .. import config
get_console_log('info', name=__name__) get_console_log('info', name=__name__)
mktsdir = os.path.join(config._config_dir, 'marketstore') yml_file = os.path.join(config._config_dir, 'mkts.yml')
# create when dne
if not os.path.isdir(mktsdir):
os.mkdir(mktsdir)
yml_file = os.path.join(mktsdir, 'mkts.yml')
if not os.path.isfile(yml_file): if not os.path.isfile(yml_file):
log.warning( log.warning(
f'No `marketstore` config exists?: {yml_file}\n' f'No `marketstore` config exists?: {yml_file}\n'
@ -148,14 +143,14 @@ def start_marketstore(
# create a mount from user's local piker config dir into container # create a mount from user's local piker config dir into container
config_dir_mnt = docker.types.Mount( config_dir_mnt = docker.types.Mount(
target='/etc', target='/etc',
source=mktsdir, source=config._config_dir,
type='bind', type='bind',
) )
# create a user config subdir where the marketstore # create a user config subdir where the marketstore
# backing filesystem database can be persisted. # backing filesystem database can be persisted.
persistent_data_dir = os.path.join( persistent_data_dir = os.path.join(
mktsdir, 'data', config._config_dir, 'data',
) )
if not os.path.isdir(persistent_data_dir): if not os.path.isdir(persistent_data_dir):
os.mkdir(persistent_data_dir) os.mkdir(persistent_data_dir)
@ -185,14 +180,7 @@ def start_marketstore(
init=True, init=True,
# remove=True, # remove=True,
) )
return ( return dcntr, _config
dcntr,
_config,
# expected startup and stop msgs
"launching tcp listener for all services...",
"exiting...",
)
_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK') _tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
@ -395,12 +383,7 @@ class Storage:
]: ]:
first_tsdb_dt, last_tsdb_dt = None, None first_tsdb_dt, last_tsdb_dt = None, None
tsdb_arrays = await self.read_ohlcv( tsdb_arrays = await self.read_ohlcv(fqsn)
fqsn,
# on first load we don't need to pull the max
# history per request size worth.
limit=3000,
)
log.info(f'Loaded tsdb history {tsdb_arrays}') log.info(f'Loaded tsdb history {tsdb_arrays}')
if tsdb_arrays: if tsdb_arrays:
@ -418,7 +401,6 @@ class Storage:
fqsn: str, fqsn: str,
timeframe: Optional[Union[int, str]] = None, timeframe: Optional[Union[int, str]] = None,
end: Optional[int] = None, end: Optional[int] = None,
limit: int = int(800e3),
) -> tuple[ ) -> tuple[
MarketstoreClient, MarketstoreClient,
@ -441,7 +423,7 @@ class Storage:
# TODO: figure the max limit here given the # TODO: figure the max limit here given the
# ``purepc`` msg size limit of purerpc: 33554432 # ``purepc`` msg size limit of purerpc: 33554432
limit=limit, limit=int(800e3),
) )
if timeframe is None: if timeframe is None:

View File

@ -57,7 +57,6 @@ setup(
# from github currently (see requirements.txt) # from github currently (see requirements.txt)
# 'trimeter', # not released yet.. # 'trimeter', # not released yet..
# 'tractor', # 'tractor',
# asyncvnc,
# brokers # brokers
'asks==2.4.8', 'asks==2.4.8',
@ -72,34 +71,32 @@ setup(
# UI # UI
'PyQt5', 'PyQt5',
# 'pyqtgraph', from our fork see reqs.txt 'pyqtgraph',
'qdarkstyle >= 3.0.2', # themeing 'qdarkstyle >= 3.0.2',
'fuzzywuzzy[speedup]', # fuzzy search # fuzzy search
'fuzzywuzzy[speedup]',
# tsdbs # tsdbs
# anyio-marketstore # from gh see reqs.txt 'pymarketstore',
], ],
extras_require={ extras_require={
# serialization
'tsdb': [ 'tsdb': [
'docker', 'docker',
], ],
}, },
tests_require=['pytest'], tests_require=['pytest'],
python_requires=">=3.10", python_requires=">=3.9", # literally for ``datetime.datetime.fromisoformat``...
keywords=[ keywords=["async", "trading", "finance", "quant", "charting"],
"async",
"trading",
"finance",
"quant",
"charting",
],
classifiers=[ classifiers=[
'Development Status :: 3 - Alpha', 'Development Status :: 3 - Alpha',
'License :: OSI Approved :: ', 'License :: OSI Approved :: ',
'Operating System :: POSIX :: Linux', 'Operating System :: POSIX :: Linux',
"Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only", "Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.10",
'Intended Audience :: Financial and Insurance Industry', 'Intended Audience :: Financial and Insurance Industry',
'Intended Audience :: Science/Research', 'Intended Audience :: Science/Research',