Compare commits


No commits in common. "8c1905e35a00a6325753e058182f83c8b050823a" and "1967bc79737cbfc5e5158fe4294e1208fd9ad2d7" have entirely different histories.

7 changed files with 121 additions and 302 deletions

View File

@@ -30,7 +30,7 @@ from trio_typing import TaskStatus
import tractor
import docker
import json
from docker.models.containers import Container as DockerContainer
from docker.models.containers import Container
from docker.errors import DockerException, APIError
from requests.exceptions import ConnectionError, ReadTimeout
@@ -133,136 +133,6 @@ async def open_docker(
c.kill()
class Container:
'''
Wrapper around a ``docker.models.containers.Container`` to include
log capture and relay through our native logging system and helper
method(s) for cancellation/teardown.
'''
def __init__(
self,
cntr: DockerContainer,
) -> None:
self.cntr = cntr
# log msg de-duplication
self.seen_so_far = set()
async def process_logs_until(
self,
patt: str,
bp_on_msg: bool = False,
) -> bool:
'''
Attempt to capture container log messages and relay through our
native logging system.
'''
seen_so_far = self.seen_so_far
while True:
logs = self.cntr.logs()
entries = logs.decode().split('\n')
for entry in entries:
# ignore null lines
if not entry:
continue
try:
record = json.loads(entry.strip())
except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)
raise
msg = record['msg']
level = record['level']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()
getattr(log, level, log.error)(f'{msg}')
if patt in msg:
return True
# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0.01)
return False
def try_signal(
self,
signal: str = 'SIGINT',
) -> bool:
try:
# XXX: market store doesn't seem to shutdown nicely all the
# time with this (maybe because there are still open grpc
# connections?) noticeably after client connections have been
# made or are in use/teardown. It works just fine if you
# just start and stop the container tho?..
log.cancel(f'SENDING {signal} to {self.cntr.id}')
self.cntr.kill(signal)
return True
except docker.errors.APIError as err:
# _err = err
if 'is not running' in err.explanation:
return False
async def cancel(
self,
) -> None:
cid = self.cntr.id
self.try_signal('SIGINT')
with trio.move_on_after(0.5) as cs:
cs.shield = True
# print('PROCESSINGN LOGS')
await self.process_logs_until('initiating graceful shutdown')
# print('SHUTDOWN REPORTED BY CONTAINER')
await self.process_logs_until('exiting...',)
for _ in range(10):
with trio.move_on_after(0.5) as cs:
cs.shield = True
# print('waiting on EXITING')
await self.process_logs_until('exiting...',)
# print('got EXITING')
break
if cs.cancelled_caught:
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
try:
log.info(f'Waiting on container shutdown: {cid}')
self.cntr.wait(
timeout=0.1,
condition='not-running',
)
break
except (
ReadTimeout,
ConnectionError,
):
log.error(f'failed to wait on container {cid}')
raise
else:
raise RuntimeError(f'Failed to cancel container {cid}')
log.cancel(f'Container stopped: {cid}')
@tractor.context
async def open_marketstored(
ctx: tractor.Context,
@@ -305,7 +175,7 @@ async def open_marketstored(
type='bind',
)
dcntr: DockerContainer = client.containers.run(
cntr: Container = client.containers.run(
'alpacamarkets/marketstore:latest',
# do we need this for cmds?
# '-i',
@@ -321,10 +191,42 @@ async def open_marketstored(
init=True,
# remove=True,
)
cntr = Container(dcntr)
try:
seen_so_far = set()
with trio.move_on_after(1):
found = await cntr.process_logs_until(
async def process_logs_until(
match: str,
bp_on_msg: bool = False,
):
logs = cntr.logs(stream=True)
for entry in logs:
entry = entry.decode()
try:
record = json.loads(entry.strip())
except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)
msg = record['msg']
level = record['level']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()
getattr(log, level, log.error)(f'{msg}')
# if "launching tcp listener for all services..." in msg:
if match in msg:
return True
# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0)
return False
with trio.move_on_after(0.5):
found = await process_logs_until(
"launching tcp listener for all services...",
)
@@ -333,48 +235,34 @@ async def open_marketstored(
'Failed to start `marketstore` check logs deats'
)
await ctx.started((cntr.cntr.id, os.getpid()))
await ctx.started(cntr.id)
# async with ctx.open_stream() as stream:
try:
# TODO: we might eventually want a proxy-style msg-prot here
# to allow remote control of containers without needing
# callers to have root perms?
await trio.sleep_forever()
# await cntr.cancel()
# with trio.CancelScope(shield=True):
# # block for the expected "teardown log msg"..
# # await cntr.process_logs_until('exiting...',)
# # only msg should be to signal killing the
# # container and this super daemon.
# msg = await stream.receive()
# # print("GOT CANCEL MSG")
# cid = msg['cancel']
# log.cancel(f'Cancelling container {cid}')
# # print("CANCELLING CONTAINER")
# await cntr.cancel()
# # print("SENDING ACK")
# await stream.send('ack')
# block for the expected "teardown log msg"..
await process_logs_until('exiting...',)
except (
BaseException,
# trio.Cancelled,
# KeyboardInterrupt,
):
with trio.CancelScope(shield=True):
await cntr.cancel()
# await stream.send('ack')
cntr.kill('SIGINT')
with trio.move_on_after(0.5) as cs:
cs.shield = True
await process_logs_until('exiting...',)
raise
finally:
try:
cntr.wait(
timeout=0.5,
condition='not-running',
)
except (
ReadTimeout,
ConnectionError,
):
cntr.kill()
async def start_ahab(
service_name: str,
@@ -423,18 +311,9 @@ async def start_ahab(
open_marketstored,
) as (ctx, first):
cid, pid = first
assert str(first)
# run till cancelled
await trio.sleep_forever()
# async with ctx.open_stream() as stream:
# try:
# # run till cancelled
# await trio.sleep_forever()
# finally:
# with trio.CancelScope(shield=True):
# # print('SENDING CANCEL TO MARKETSTORED')
# await stream.send({'cancel': (cid, pid)})
# assert await stream.receive() == 'ack'
# since we demoted root perms in this parent
# we'll get a perms error on proc cleanup in
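
For reference, a minimal, hedged sketch of the supervisor pattern this file's diff reshuffles: poll a detached container's JSON log output and escalate SIGINT -> SIGKILL on teardown. It assumes docker-py plus trio, and the `wait_for_log()`/`teardown()` helpers are illustrative names, not part of this codebase.

```python
# Hedged sketch; only documented docker-py and trio calls are used.
import json

import trio


async def wait_for_log(cntr, patt: str, seen: set) -> bool:
    '''
    Poll ``Container.logs()`` until a (JSON formatted) line whose
    ``msg`` field contains ``patt`` shows up.

    '''
    while True:
        for entry in cntr.logs().decode().split('\n'):
            if not entry or entry in seen:
                continue
            seen.add(entry)
            try:
                msg = json.loads(entry)['msg']
            except (json.JSONDecodeError, KeyError, TypeError):
                msg = entry
            if patt in msg:
                return True
        # checkpoint so cancellation can interrupt the poll loop
        await trio.sleep(0.01)


async def teardown(cntr) -> None:
    '''
    Ask for a graceful shutdown first, then escalate to SIGKILL.

    '''
    seen: set = set()
    cntr.kill('SIGINT')
    with trio.move_on_after(0.5) as cs:
        cs.shield = True
        await wait_for_log(cntr, 'exiting...', seen)
    if cs.cancelled_caught:
        # graceful shutdown stalled; use the big guns
        cntr.kill('SIGKILL')
    cntr.wait(timeout=1, condition='not-running')

# usage (assumes a local docker daemon):
#   import docker
#   cntr = docker.from_env().containers.run(
#       'alpacamarkets/marketstore:latest', detach=True)
#   trio.run(teardown, cntr)
```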

View File

@@ -146,21 +146,13 @@ async def broadcast(
# a given sample period.
subs = sampler.subscribers.get(delay_s, ())
last = -1
if shm is None:
periods = sampler.ohlcv_shms.keys()
# if this is an update triggered by a history update there
# might not actually be any sampling bus setup since there's
# no "live feed" active yet.
if periods:
lowest = min(periods)
lowest = min(sampler.ohlcv_shms.keys())
shm = sampler.ohlcv_shms[lowest][0]
last = shm._last.value
for stream in subs:
try:
await stream.send({'index': last})
await stream.send({'index': shm._last.value})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
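
The hunk above changes how `broadcast()` picks a buffer when called without one (e.g. from a pure history update before any live feed is sampling). A hedged sketch of the guarded fallback on the removed side; `ohlcv_shms` is keyed by sample period as in the diff and the helper name is illustrative:

```python
def default_shm(ohlcv_shms: dict):
    # there may be no sampling bus yet if only a history update
    # triggered this broadcast and no "live feed" is active
    if ohlcv_shms:
        lowest_period = min(ohlcv_shms)
        return ohlcv_shms[lowest_period][0]
    return None
```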

View File

@@ -351,7 +351,7 @@ class ShmArray:
# tries to access ``.array`` (which due to the index
# overlap will be empty). Pretty sure we've fixed it now
# but leaving this here as a reminder.
if prepend and update_first and length:
if prepend and update_first:
assert index < self._first.value
if (

View File

@@ -21,7 +21,6 @@ from __future__ import annotations
from typing import Any
import decimal
from bidict import bidict
import numpy as np
from pydantic import BaseModel
# from numba import from_dtype
@@ -48,16 +47,16 @@ base_ohlc_dtype = np.dtype(ohlc_fields)
# https://github.com/numba/numba/issues/4511
# numba_ohlc_dtype = from_dtype(base_ohlc_dtype)
# map time frame "keys" to seconds values
tf_in_1s = bidict({
1: '1s',
60: '1m',
60*5: '5m',
60*15: '15m',
60*30: '30m',
60*60: '1h',
60*60*24: '1d',
})
# map time frame "keys" to minutes values
tf_in_1m = {
'1m': 1,
'5m': 5,
'15m': 15,
'30m': 30,
'1h': 60,
'4h': 240,
'1d': 1440,
}
def mk_fqsn(
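
The removed `tf_in_1s` table above is a `bidict`, so the seconds-to-key mapping can also be queried in reverse; the plain `tf_in_1m` dict that replaces it only maps one way. A minimal sketch of that property (abbreviated table):

```python
from bidict import bidict

tf_in_1s = bidict({
    1: '1s',
    60: '1m',
    60 * 60: '1h',
})

assert tf_in_1s[60] == '1m'            # seconds -> timeframe key
assert tf_in_1s.inverse['1h'] == 3600  # timeframe key -> seconds
```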

View File

@@ -31,7 +31,6 @@ from typing import (
AsyncIterator, Optional,
Generator,
Awaitable,
TYPE_CHECKING,
)
import trio
@@ -75,8 +74,6 @@ from ..brokers._util import (
DataUnavailable,
)
if TYPE_CHECKING:
from .marketstore import Storage
log = get_logger(__name__)
@@ -211,28 +208,28 @@ def diff_history(
) -> np.ndarray:
to_push = array
if last_tsdb_dt:
s_diff = (start_dt - last_tsdb_dt).seconds
to_push = array[:s_diff]
# if we detect a partial frame's worth of data
# that is new, slice out only that history and
# write to shm.
if (
s_diff < 0
and abs(s_diff) < len(array)
):
# the + 1 is because ``last_tsdb_dt`` is pulled from
# the last row entry for the ``'time'`` field retrieved
# from the tsdb.
to_push = array[abs(s_diff)+1:]
if abs(s_diff) < len(array):
log.info(
f'Pushing partial frame {to_push.size} to shm'
)
# assert last_tsdb_dt > start_dt
# selected = array['time'] > last_tsdb_dt.timestamp()
# to_push = array[selected]
# return to_push
return to_push
else:
return array
async def start_backfill(
mod: ModuleType,
@@ -240,8 +237,6 @@ async def start_backfill(
shm: ShmArray,
last_tsdb_dt: Optional[datetime] = None,
storage: Optional[Storage] = None,
write_tsdb: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@@ -253,17 +248,6 @@ async def start_backfill(
# back to what is recorded in the tsdb
array, start_dt, end_dt = await hist(end_dt=None)
times = array['time']
# sample period step size in seconds
step_size_s = (
pendulum.from_timestamp(times[-1]) -
pendulum.from_timestamp(times[-2])
).seconds
# "frame"'s worth of sample period steps in seconds
frame_size_s = len(array) * step_size_s
to_push = diff_history(
array,
start_dt,
@@ -283,13 +267,20 @@ async def start_backfill(
# let caller unblock and deliver latest history frame
task_status.started((shm, start_dt, end_dt, bf_done))
times = array['time']
step_size_s = (
pendulum.from_timestamp(times[-1]) -
pendulum.from_timestamp(times[-2])
).seconds
frame_size_s = len(to_push) * step_size_s
if last_tsdb_dt is None:
# maybe a better default (they don't seem to define epoch?!)
# based on the sample step size load a certain amount
# history
if step_size_s == 1:
last_tsdb_dt = pendulum.now().subtract(days=2)
last_tsdb_dt = pendulum.now().subtract(days=6)
elif step_size_s == 60:
last_tsdb_dt = pendulum.now().subtract(years=2)
@@ -372,7 +363,7 @@ async def start_backfill(
except NoData:
log.warning(
f'NO DATA for {frame_size_s}s frame @ {input_end_dt} ?!?'
f'NO DATA for {frame_size_s}s frame @ {end_dt} ?!?'
)
return None # discard signal
@@ -411,27 +402,15 @@ async def start_backfill(
)
# reset dtrange gen to new start point
try:
next_end = iter_dts_gen.send(start_dt)
log.info(
f'Reset frame index to start at {start_dt}\n'
f'Was at {next_end}'
)
# NOTE: manually set "earliest end datetime" index-value
# to avoid the request loop getting confused about
# new frames that are earlier in history - i.e. this
# **is not** the case of out-of-order frames from
# an async batch request.
# TODO: can we avoid this?
earliest_end_dt = start_dt
except StopIteration:
# gen already terminated meaning we probably already
# exhausted it via frame requests.
log.info(
"Datetime index already exhausted, can't reset.."
)
to_push = diff_history(
array,
start_dt,
@@ -540,7 +519,6 @@ async def start_backfill(
break
to_push, start_dt, end_dt = frames.pop(epoch)
ln = len(to_push)
# bail gracefully on shm allocation overrun/full condition
try:
@@ -549,30 +527,19 @@ async def start_backfill(
log.info(
f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
)
# await tractor.breakpoint()
break
log.info(
f'Shm pushed {ln} frame:\n'
f'Shm pushed {len(to_push)} frame:\n'
f'{start_dt} -> {end_dt}'
)
# keep track of most recent "prepended" ``start_dt``
# both for detecting gaps and ensuring async
# frame-result order.
earliest_end_dt = start_dt
if (
storage is not None
and write_tsdb
):
log.info(
f'Writing {ln} frame to storage:\n'
f'{start_dt} -> {end_dt}'
)
await storage.write_ohlcv(
f'{bfqsn}.{mod.name}', # lul..
to_push,
)
# TODO: can we only trigger this if the respective
# history is "in view"?!?
# XXX: extremely important, there can be no checkpoints
@@ -641,7 +608,7 @@ async def manage_history(
# shm backfiller approach below.
# start history anal and load missing new data via backend.
series, _, last_tsdb_dt = await storage.load(fqsn)
series, first_dt, last_dt = await storage.load(fqsn)
broker, symbol, expiry = unpack_fqsn(fqsn)
(
@@ -655,8 +622,7 @@ async def manage_history(
mod,
bfqsn,
shm,
last_tsdb_dt=last_tsdb_dt,
storage=storage,
last_tsdb_dt=last_dt,
)
)
@@ -677,10 +643,8 @@ async def manage_history(
# do diff against last start frame of history and only fill
# in from the tsdb an allotment that allows for most recent
# to be loaded into mem *before* tsdb data.
if last_tsdb_dt:
dt_diff_s = (
latest_start_dt - last_tsdb_dt
).seconds
if last_dt:
dt_diff_s = (latest_start_dt - last_dt).seconds
else:
dt_diff_s = 0
@@ -709,7 +673,7 @@ async def manage_history(
field_map=marketstore.ohlc_key_map,
)
# load as much from storage into shm as space will
# load as much from storage into shm as spacec will
# allow according to user's shm size settings.
count = 0
end = fastest['Epoch'][0]
@@ -734,11 +698,15 @@ async def manage_history(
prepend=True,
# update_first=False,
# start=prepend_start,
field_map=marketstore.ohlc_key_map,
field_map={
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
},
)
# manually trigger step update to update charts/fsps
# which need an incremental update.
for delay_s in sampler.subscribers:
await broadcast(delay_s)
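
This file's `diff_history()` hunk above slices out only the portion of a newly fetched frame that isn't already stored in the tsdb. A hedged sketch of that arithmetic with plain epoch seconds standing in for the pendulum datetimes; like the original it implicitly assumes a 1s sample period so a seconds offset doubles as a row offset:

```python
import numpy as np


def slice_new_history(
    array: np.ndarray,      # frame of samples, oldest first
    start_dt_s: float,      # epoch of the frame's first sample
    last_tsdb_dt_s: float,  # epoch of the newest sample already in the tsdb
) -> np.ndarray:
    s_diff = start_dt_s - last_tsdb_dt_s
    if s_diff < 0 and abs(s_diff) < len(array):
        # the frame partially overlaps what the tsdb already holds:
        # keep only the rows newer than ``last_tsdb_dt`` (the +1 skips
        # the row equal to it)
        return array[int(abs(s_diff)) + 1:]
    return array
```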

View File

@@ -191,9 +191,6 @@ class ContentsLabel(pg.LabelItem):
self.setText(
"<b>i</b>:{index}<br/>"
# NB: these fields must be indexed in the correct order via
# the slice syntax below.
"<b>epoch</b>:{}<br/>"
"<b>O</b>:{}<br/>"
"<b>H</b>:{}<br/>"
"<b>L</b>:{}<br/>"
@@ -201,15 +198,7 @@ class ContentsLabel(pg.LabelItem):
"<b>V</b>:{}<br/>"
"<b>wap</b>:{}".format(
*array[index - first][
[
'time',
'open',
'high',
'low',
'close',
'volume',
'bar_wap',
]
['open', 'high', 'low', 'close', 'volume', 'bar_wap']
],
name=name,
index=index,
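
The `ContentsLabel` hunk above relies on numpy's multi-field indexing of a single structured-array record: selecting a list of field names yields the values in that order, ready to splat into `str.format()`. A small sketch with an illustrative dtype:

```python
import numpy as np

ohlc = np.array(
    [(1.0, 2.0, 0.5, 1.5, 100.0)],
    dtype=[
        ('open', 'f8'), ('high', 'f8'), ('low', 'f8'),
        ('close', 'f8'), ('volume', 'f8'),
    ],
)
row = ohlc[0][['open', 'high', 'low', 'close', 'volume']]
print('O:{} H:{} L:{} C:{} V:{}'.format(*row))
```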

View File

@@ -29,7 +29,6 @@ from typing import Optional, Any, Callable
import numpy as np
import tractor
import trio
import pendulum
import pyqtgraph as pg
from .. import brokers
@@ -48,7 +47,6 @@ from ._fsp import (
open_vlm_displays,
)
from ..data._sharedmem import ShmArray
from ..data._source import tf_in_1s
from ._forms import (
FieldsForm,
mk_order_pane_layout,
@@ -662,17 +660,11 @@ async def display_symbol_data(
symbol = feed.symbols[sym]
fqsn = symbol.front_fqsn()
times = bars['time']
end = pendulum.from_timestamp(times[-1])
start = pendulum.from_timestamp(times[times != times[-1]][-1])
step_size_s = (end - start).seconds
tf_key = tf_in_1s[step_size_s]
# load in symbol's ohlc data
godwidget.window.setWindowTitle(
f'{fqsn} '
f'tick:{symbol.tick_size} '
f'step:{tf_key} '
f'step:1s '
)
linked = godwidget.linkedsplits
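
The removed window-title code above derives the bar period from the last two distinct timestamps and maps it through `tf_in_1s`; the replacement hard-codes `step:1s`. A hedged sketch of the removed computation, where `infer_step_key()` is an illustrative wrapper:

```python
import numpy as np
import pendulum


def infer_step_key(times: np.ndarray, tf_in_1s) -> str:
    end = pendulum.from_timestamp(times[-1])
    # last timestamp that differs from the final one
    start = pendulum.from_timestamp(times[times != times[-1]][-1])
    step_size_s = (end - start).seconds
    # fall back to raw seconds if the period isn't in the map
    return tf_in_1s.get(step_size_s, f'{step_size_s}s')
```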