Merge pull request #256 from pikers/misc_backend_fixes

Misc backend fixes
py3.10_support
goodboy 2022-01-25 07:58:30 -05:00 committed by GitHub
commit 0131160896
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 125 additions and 47 deletions

28
notes_to_self.rst 100644
View File

@ -0,0 +1,28 @@
Notes to self
=============
chicken scratch we shan't forget, consider this staging
for actual feature issues on wtv git wrapper-provider we're
using (no we shan't stick with GH long term likely).
cool chart features
-------------------
- allow right-click to spawn shell with current in view
data passed to the new process via ``msgpack-numpy``.
- expand OHLC datum to lower time frame.
- auto-highlight current time range on tick feed
features from IB charting
-------------------------
- vlm diffing from ticks and compare when bar arrives from historical
- should help isolate dark vlm / trades
chart ux ideas
--------------
- hotkey to zoom to order intersection (horizontal line) with previous
price levels (+ some margin obvs).
- L1 "lines" (queue size repr) should normalize to some fixed x width
such that when levels with more vlm appear other smaller levels are
scaled down giving an immediate indication of the liquidity diff.

View File

@ -21,7 +21,7 @@ Profiling wrappers for internal libs.
import time
from functools import wraps
_pg_profile: bool = False
_pg_profile: bool = True
def pg_profile_enabled() -> bool:

View File

@ -19,7 +19,7 @@ Binance backend
"""
from contextlib import asynccontextmanager
from typing import List, Dict, Any, Tuple, Union, Optional
from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator
import time
import trio
@ -37,7 +37,7 @@ from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws
from ..data._web_bs import open_autorecon_ws, NoBsWs
log = get_logger(__name__)
@ -213,7 +213,7 @@ class Client:
)
# repack in dict form
return {item[0]['symbol']: item[0]
for item in matches}
for item in matches}
async def bars(
self,
@ -295,7 +295,7 @@ class AggTrade(BaseModel):
M: bool # Ignore
async def stream_messages(ws):
async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
timeouts = 0
while True:
@ -487,11 +487,20 @@ async def stream_quotes(
# signal to caller feed is ready for consumption
feed_is_live.set()
# import time
# last = time.time()
# start streaming
async for typ, msg in msg_gen:
# period = time.time() - last
# hz = 1/period if period else float('inf')
# if hz > 60:
# log.info(f'Binance quotez : {hz}')
topic = msg['symbol'].lower()
await send_chan.send({topic: msg})
# last = time.time()
@tractor.context

View File

@ -27,27 +27,32 @@ _mag2suffix = bidict({3: 'k', 6: 'M', 9: 'B'})
def humanize(
number: float,
digits: int = 1
) -> str:
'''Convert large numbers to something with at most ``digits`` and
'''
Convert large numbers to something with at most ``digits`` and
a letter suffix (eg. k: thousand, M: million, B: billion).
'''
try:
float(number)
except ValueError:
return 0
return '0'
if not number or number <= 0:
return round(number, ndigits=digits)
return str(round(number, ndigits=digits))
mag = math.floor(math.log(number, 10))
mag = round(math.log(number, 10))
if mag < 3:
return round(number, ndigits=digits)
return str(round(number, ndigits=digits))
maxmag = max(itertools.takewhile(lambda key: mag >= key, _mag2suffix))
maxmag = max(
itertools.takewhile(
lambda key: mag >= key, _mag2suffix
)
)
return "{value}{suffix}".format(
value=round(number/10**maxmag, ndigits=digits),

View File

@ -20,6 +20,7 @@ In da suit parlances: "Execution management systems"
"""
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from math import isnan
from pprint import pformat
import time
from typing import AsyncIterator, Callable
@ -47,9 +48,11 @@ log = get_logger(__name__)
# TODO: numba all of this
def mk_check(
trigger_price: float,
known_last: float,
action: str,
) -> Callable[[float, float], bool]:
"""Create a predicate for given ``exec_price`` based on last known
price, ``known_last``.
@ -77,8 +80,7 @@ def mk_check(
return check_lt
else:
return None
raise ValueError('trigger: {trigger_price}, last: {known_last}')
@dataclass
@ -177,7 +179,15 @@ async def clear_dark_triggers(
tuple(execs.items())
):
if not pred or (ttype not in tf) or (not pred(price)):
if (
not pred or
ttype not in tf or
not pred(price)
):
log.debug(
f'skipping quote for {sym} '
f'{pred}, {ttype} not in {tf}?, {pred(price)}'
)
# majority of iterations will be non-matches
continue
@ -246,7 +256,11 @@ async def clear_dark_triggers(
# remove exec-condition from set
log.info(f'removing pred for {oid}')
execs.pop(oid)
pred = execs.pop(oid, None)
if not pred:
log.warning(
f'pred for {oid} was already removed!?'
)
await ems_client_order_stream.send(msg)
@ -1005,7 +1019,8 @@ async def _emsd_main(
first_quote = feed.first_quotes[symbol]
book = _router.get_dark_book(broker)
book.lasts[(broker, symbol)] = first_quote['last']
last = book.lasts[(broker, symbol)] = first_quote['last']
assert not isnan(last) # ib is a cucker but we've fixed it in the backend
# open a stream with the brokerd backend for order
# flow dialogue

View File

@ -107,7 +107,7 @@ def services(config, tl, names):
async with tractor.get_arbiter(
*_tractor_kwargs['arbiter_addr']
) as portal:
registry = await portal.run('self', 'get_registry')
registry = await portal.run_from_ns('self', 'get_registry')
json_d = {}
for uid, socket in registry.items():
name, uuid = uid

View File

@ -135,6 +135,7 @@ async def increment_ohlc_buffer(
async def iter_ohlc_periods(
ctx: tractor.Context,
delay_s: int,
) -> None:
"""
Subscribe to OHLC sampling "step" events: when the time
@ -252,11 +253,17 @@ async def sample_and_broadcast(
try:
stream.send_nowait((sym, quote))
except trio.WouldBlock:
log.warning(
f'Feed overrun {bus.brokername} ->'
f'{stream._ctx.channel.uid} !!!'
)
ctx = getattr(sream, '_ctx', None)
if ctx:
log.warning(
f'Feed overrun {bus.brokername} ->'
f'{ctx.channel.uid} !!!'
)
else:
log.warning(
f'Feed overrun {bus.brokername} -> '
f'feed @ {tick_throttle} Hz'
)
else:
await stream.send({sym: quote})
@ -270,18 +277,20 @@ async def sample_and_broadcast(
trio.ClosedResourceError,
trio.EndOfChannel,
):
ctx = getattr(stream, '_ctx', None)
if ctx:
log.warning(
f'{ctx.chan.uid} dropped '
'`brokerd`-quotes-feed connection'
)
if tick_throttle:
assert stream.closed()
# XXX: do we need to deregister here
# if it's done in the fee bus code?
# so far seems like no since this should all
# be single-threaded.
log.warning(
f'{stream._ctx.chan.uid} dropped '
'`brokerd`-quotes-feed connection'
)
if tick_throttle:
assert stream.closed()
# await stream.aclose()
# be single-threaded. Doing it anyway though
# since there seems to be some kinda race..
subs.remove((stream, tick_throttle))

View File

@ -394,6 +394,7 @@ def open_shm_array(
# "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack
tractor._actor._lifetime_stack.callback(shmarr.close)
tractor._actor._lifetime_stack.callback(shmarr.destroy)

View File

@ -133,9 +133,11 @@ def mk_symbol(
def from_df(
df: pd.DataFrame,
source=None,
default_tf=None
) -> np.recarray:
"""Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``.

View File

@ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols.
"""
from contextlib import asynccontextmanager, AsyncExitStack
from types import ModuleType
from typing import Any, Callable
from typing import Any, Callable, AsyncGenerator
import json
import trio
@ -127,7 +127,7 @@ async def open_autorecon_ws(
# TODO: proper type annot smh
fixture: Callable,
):
) -> AsyncGenerator[tuple[...], NoBsWs]:
"""Apparently we can QoS for all sorts of reasons..so catch em.
"""

View File

@ -592,7 +592,8 @@ async def maybe_open_feed(
**kwargs,
) -> (Feed, ReceiveChannel[dict[str, Any]]):
'''Maybe open a data to a ``brokerd`` daemon only if there is no
'''
Maybe open a data to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver.

View File

@ -328,8 +328,10 @@ class FastAppendCurve(pg.PlotCurveItem):
profiler = pg.debug.Profiler(disabled=not pg_profile_enabled())
# p.setRenderHint(p.Antialiasing, True)
if self._step_mode:
if (
self._step_mode
and self._last_step_rect
):
brush = self.opts['brush']
# p.drawLines(*tuple(filter(bool, self._last_step_lines)))
# p.drawRect(self._last_step_rect)

View File

@ -26,7 +26,9 @@ import trio
from PyQt5 import QtCore
from PyQt5.QtCore import QEvent, pyqtBoundSignal
from PyQt5.QtWidgets import QWidget
from PyQt5.QtWidgets import QGraphicsSceneMouseEvent as gs_mouse
from PyQt5.QtWidgets import (
QGraphicsSceneMouseEvent as gs_mouse,
)
MOUSE_EVENTS = {
@ -129,6 +131,8 @@ class EventRelay(QtCore.QObject):
# TODO: is there a global setting for this?
if ev.isAutoRepeat() and self._filter_auto_repeats:
ev.ignore()
# filter out this event and stop it's processing
# https://doc.qt.io/qt-5/qobject.html#installEventFilter
return True
# NOTE: the event object instance coming out
@ -152,9 +156,6 @@ class EventRelay(QtCore.QObject):
# **do not** filter out this event
# and instead forward to the source widget
return False
# filter out this event
# https://doc.qt.io/qt-5/qobject.html#installEventFilter
return False

View File

@ -174,7 +174,6 @@ class Selection(QComboBox):
def __init__(
self,
parent=None,
) -> None:
self._items: dict[str, int] = {}
@ -200,7 +199,6 @@ class Selection(QComboBox):
def set_style(
self,
color: str,
font_size: int,
@ -217,6 +215,7 @@ class Selection(QComboBox):
def resize(
self,
char: str = 'W',
) -> None:
br = _font.boundingRect(str(char))
_, h = br.width(), br.height()
@ -238,9 +237,11 @@ class Selection(QComboBox):
keys: list[str],
) -> None:
'''Write keys to the selection verbatim.
'''
Write keys to the selection verbatim.
All other items are cleared beforehand.
'''
self.clear()
self._items.clear()
@ -536,7 +537,8 @@ async def open_form_input_handling(
class FillStatusBar(QProgressBar):
'''A status bar for fills up to a position limit.
'''
A status bar for fills up to a position limit.
'''
border_px: int = 2
@ -663,6 +665,7 @@ def mk_fill_status_bar(
)
bar_labels_lhs.addSpacing(5/8 * bar_h)
bar_labels_lhs.addWidget(
left_label,
# XXX: doesn't seem to actually push up against

View File

@ -347,7 +347,8 @@ class CompleterView(QTreeView):
clear_all: bool = False,
) -> None:
'''Set result-rows for depth = 1 tree section ``section``.
'''
Set result-rows for depth = 1 tree section ``section``.
'''
model = self.model()
@ -438,7 +439,8 @@ class SearchBar(Edit):
class SearchWidget(QtWidgets.QWidget):
'''Composed widget of ``SearchBar`` + ``CompleterView``.
'''
Composed widget of ``SearchBar`` + ``CompleterView``.
Includes helper methods for item management in the sub-widgets.