Merge pull request #181 from pikers/syseng_tweaks

Syseng tweaks
binance_syminfo_and_mintick
goodboy 2021-05-25 08:39:06 -04:00 committed by GitHub
commit 9c821c8cfd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 282 additions and 201 deletions

View File

@ -19,6 +19,8 @@ piker: trading gear for hackers.
"""
import msgpack # noqa
# TODO: remove this now right?
import msgpack_numpy
# patch msgpack for numpy arrays

View File

@ -19,8 +19,9 @@ Structured, daemon tree service management.
"""
from functools import partial
from typing import Optional, Union
from contextlib import asynccontextmanager
from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager, AsyncExitStack
from collections import defaultdict
from pydantic import BaseModel
import trio
@ -33,6 +34,10 @@ from .brokers import get_brokermod
log = get_logger(__name__)
_root_dname = 'pikerd'
_tractor_kwargs: dict[str, Any] = {
# use a different registry addr then tractor's default
'arbiter_addr': ('127.0.0.1', 6116),
}
_root_modules = [
__name__,
'piker.clearing._ems',
@ -44,10 +49,34 @@ class Services(BaseModel):
actor_n: tractor._trionics.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
ctx_stack: AsyncExitStack
class Config:
arbitrary_types_allowed = True
async def open_remote_ctx(
self,
portal: tractor.Portal,
target: Callable,
**kwargs,
) -> tractor.Context:
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` tearodwn.
This allows for allocating long-running sub-services in our main
daemon and explicitly controlling their lifetimes.
'''
ctx, first = await self.ctx_stack.enter_async_context(
portal.open_context(
target,
**kwargs,
)
)
return ctx
_services: Optional[Services] = None
@ -62,20 +91,21 @@ async def open_pikerd(
debug_mode: bool = False,
) -> Optional[tractor._portal.Portal]:
"""
'''
Start a root piker daemon who's lifetime extends indefinitely
until cancelled.
A root actor nursery is created which can be used to create and keep
alive underling services (see below).
"""
'''
global _services
assert _services is None
# XXX: this may open a root actor as well
async with tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=_tractor_kwargs['arbiter_addr'],
name=_root_dname,
loglevel=loglevel,
debug_mode=debug_mode,
@ -90,14 +120,18 @@ async def open_pikerd(
) as _, tractor.open_nursery() as actor_nursery:
async with trio.open_nursery() as service_nursery:
# assign globally for future daemon/task creation
_services = Services(
actor_n=actor_nursery,
service_n=service_nursery,
debug_mode=debug_mode,
)
# setup service mngr singleton instance
async with AsyncExitStack() as stack:
yield _services
# assign globally for future daemon/task creation
_services = Services(
actor_n=actor_nursery,
service_n=service_nursery,
debug_mode=debug_mode,
ctx_stack=stack,
)
yield _services
@asynccontextmanager
@ -140,6 +174,7 @@ async def maybe_open_pikerd(
# presume pikerd role
async with open_pikerd(
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
) as _:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
@ -193,17 +228,19 @@ async def spawn_brokerd(
# call with and then have the ability to unwind the call whenevs.
# non-blocking setup of brokerd service nursery
_services.service_n.start_soon(
partial(
portal.run,
_setup_persistent_brokerd,
brokername=brokername,
)
await _services.open_remote_ctx(
portal,
_setup_persistent_brokerd,
brokername=brokername,
)
return dname
class Brokerd:
locks = defaultdict(trio.Lock)
@asynccontextmanager
async def maybe_spawn_brokerd(
@ -222,9 +259,15 @@ async def maybe_spawn_brokerd(
dname = f'brokerd.{brokername}'
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Brokerd.locks[brokername]
await lock.acquire()
# attach to existing brokerd if possible
async with tractor.find_actor(dname) as portal:
if portal is not None:
lock.release()
yield portal
return
@ -249,6 +292,7 @@ async def maybe_spawn_brokerd(
)
async with tractor.wait_for_actor(dname) as portal:
lock.release()
yield portal

View File

@ -341,6 +341,10 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
class AutoReconWs:
"""Make ``trio_websocketw` sockets stay up no matter the bs.
TODO:
apply any more insights from this:
https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
"""
recon_errors = (
ConnectionClosed,

View File

@ -179,31 +179,40 @@ async def execute_triggers(
# majority of iterations will be non-matches
continue
# submit_price = price + price*percent_away
submit_price = price + abs_diff_away
action = cmd['action']
log.info(
f'Dark order triggered for price {price}\n'
f'Submitting order @ price {submit_price}')
if action != 'alert':
# executable order submission
reqid = await client.submit_limit(
oid=oid,
# submit_price = price + price*percent_away
submit_price = price + abs_diff_away
# this is a brand new order request for the
# underlying broker so we set out "broker request
# id" (brid) as nothing so that the broker
# client knows that we aren't trying to modify
# an existing order.
brid=None,
log.info(
f'Dark order triggered for price {price}\n'
f'Submitting order @ price {submit_price}')
symbol=sym,
action=cmd['action'],
price=submit_price,
size=cmd['size'],
)
reqid = await client.submit_limit(
oid=oid,
# register broker request id to ems id
book._broker2ems_ids[reqid] = oid
# this is a brand new order request for the
# underlying broker so we set out "broker request
# id" (brid) as nothing so that the broker
# client knows that we aren't trying to modify
# an existing order.
brid=None,
symbol=sym,
action=cmd['action'],
price=submit_price,
size=cmd['size'],
)
# register broker request id to ems id
book._broker2ems_ids[reqid] = oid
else:
# alerts have no broker request id
reqid = ''
resp = {
'resp': 'dark_executed',
@ -233,19 +242,20 @@ async def execute_triggers(
async def exec_loop(
ctx: tractor.Context,
feed: 'Feed', # noqa
broker: str,
symbol: str,
_exec_mode: str,
task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED,
) -> AsyncIterator[dict]:
"""Main scan loop for order execution conditions and submission
to brokers.
"""
# TODO: get initial price quote from target broker
# XXX: this should be initial price quote from target provider
first_quote = await feed.receive()
book = get_dark_book(broker)
@ -342,6 +352,7 @@ async def process_broker_trades(
# TODO: make this a context
# in the paper engine case this is just a mem receive channel
async with feed.receive_trades_data() as trades_stream:
first = await trades_stream.__anext__()
# startup msg expected as first from broker backend
@ -642,35 +653,18 @@ async def _emsd_main(
dark_book = get_dark_book(broker)
# get a portal back to the client
async with tractor.wait_for_actor(client_actor_name) as portal:
# spawn one task per broker feed
async with trio.open_nursery() as n:
# spawn one task per broker feed
async with trio.open_nursery() as n:
# TODO: eventually support N-brokers
async with data.open_feed(
broker,
[symbol],
loglevel='info',
) as feed:
# TODO: eventually support N-brokers
async with data.open_feed(
broker,
[symbol],
loglevel='info',
) as feed:
# start the condition scan loop
quote, feed, client = await n.start(
exec_loop,
ctx,
feed,
broker,
symbol,
_mode,
)
await n.start(
process_broker_trades,
ctx,
feed,
dark_book,
)
# get a portal back to the client
async with tractor.wait_for_actor(client_actor_name) as portal:
# connect back to the calling actor (the one that is
# acting as an EMS client and will submit orders) to
@ -681,6 +675,23 @@ async def _emsd_main(
symbol_key=symbol,
) as order_stream:
# start the condition scan loop
quote, feed, client = await n.start(
exec_loop,
ctx,
feed,
broker,
symbol,
_mode,
)
await n.start(
process_broker_trades,
ctx,
feed,
dark_book,
)
# start inbound order request processing
await process_order_cmds(
ctx,

View File

@ -37,7 +37,7 @@ _context_defaults = dict(
def pikerd(loglevel, host, tl, pdb):
"""Spawn the piker broker-daemon.
"""
from .._daemon import _data_mods, open_pikerd
from .._daemon import open_pikerd
log = get_console_log(loglevel)
if pdb:
@ -112,11 +112,11 @@ def services(config, tl, names):
def _load_clis() -> None:
from ..data import marketstore as _
from ..data import cli as _
from ..brokers import cli as _ # noqa
from ..ui import cli as _ # noqa
from ..watchlists import cli as _ # noqa
from ..data import marketstore # noqa
from ..data import cli # noqa
from ..brokers import cli # noqa
from ..ui import cli # noqa
from ..watchlists import cli # noqa
# load downstream cli modules

View File

@ -227,7 +227,7 @@ async def sample_and_broadcast(
# end up triggering backpressure which which will
# eventually block this producer end of the feed and
# thus other consumers still attached.
subs = bus.subscribers[sym]
subs = bus._subscribers[sym]
for ctx in subs:
# print(f'sub is {ctx.chan.uid}')
try:
@ -236,5 +236,8 @@ async def sample_and_broadcast(
trio.BrokenResourceError,
trio.ClosedResourceError
):
subs.remove(ctx)
# XXX: do we need to deregister here
# if it's done in the fee bus code?
# so far seems like no since this should all
# be single-threaded.
log.error(f'{ctx.chan.uid} dropped connection')

View File

@ -67,11 +67,19 @@ class _FeedsBus(BaseModel):
brokername: str
nursery: trio.Nursery
feeds: Dict[str, trio.CancelScope] = {}
subscribers: Dict[str, List[tractor.Context]] = {}
task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
# XXX: so weird but, apparently without this being `._` private
# pydantic will complain about private `tractor.Context` instance
# vars (namely `._portal` and `._cancel_scope`) at import time.
# Reported this bug:
# https://github.com/samuelcolvin/pydantic/issues/2816
_subscribers: Dict[str, List[tractor.Context]] = {}
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
async def cancel_all(self) -> None:
for sym, (cs, msg, quote) in self.feeds.items():
@ -108,7 +116,11 @@ def get_feed_bus(
return _bus
async def _setup_persistent_brokerd(brokername: str) -> None:
@tractor.context
async def _setup_persistent_brokerd(
ctx: tractor.Context,
brokername: str
) -> None:
"""Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
@ -121,6 +133,9 @@ async def _setup_persistent_brokerd(brokername: str) -> None:
# background tasks from clients
bus = get_feed_bus(brokername, service_nursery)
# unblock caller
await ctx.started()
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
@ -224,7 +239,7 @@ async def attach_feed_bus(
brokername: str,
symbol: str,
loglevel: str,
):
) -> None:
# try:
if loglevel is None:
@ -256,7 +271,7 @@ async def attach_feed_bus(
loglevel=loglevel,
)
)
bus.subscribers.setdefault(symbol, []).append(ctx)
bus._subscribers.setdefault(symbol, []).append(ctx)
else:
sub_only = True
@ -266,15 +281,17 @@ async def attach_feed_bus(
# send this even to subscribers to existing feed?
await ctx.send_yield(init_msg)
# deliver a first quote asap
await ctx.send_yield(first_quote)
if sub_only:
bus.subscribers[symbol].append(ctx)
bus._subscribers[symbol].append(ctx)
try:
await trio.sleep_forever()
finally:
bus.subscribers[symbol].remove(ctx)
bus._subscribers[symbol].remove(ctx)
@dataclass

View File

@ -17,6 +17,7 @@
"""
Financial signal processing for the peeps.
"""
from functools import partial
from typing import AsyncIterator, Callable, Tuple
import trio
@ -29,6 +30,8 @@ from .. import data
from ._momo import _rsi, _wma
from ._volume import _tina_vwap
from ..data import attach_shm_array
from ..data.feed import Feed
from ..data._sharedmem import ShmArray
log = get_logger(__name__)
@ -62,6 +65,97 @@ async def latency(
yield value
async def fsp_compute(
ctx: tractor.Context,
symbol: str,
feed: Feed,
src: ShmArray,
dst: ShmArray,
fsp_func_name: str,
func: Callable,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
# TODO: load appropriate fsp with input args
async def filter_by_sym(
sym: str,
stream,
):
# TODO: make this the actualy first quote from feed
# XXX: this allows for a single iteration to run for history
# processing without waiting on the real-time feed for a new quote
yield {}
# task cancellation won't kill the channel
with stream.shield():
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
out_stream = func(
filter_by_sym(symbol, feed.stream),
feed.shm,
)
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
# prepending a copy of the first value a few times to make
# sub-curves align with the parent bar chart.
# This likely needs to be fixed either by,
# - manually assigning the index and historical data
# seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for incremental compuations.
dst._first.value = src._first.value
dst._last.value = src._first.value
# Conduct a single iteration of fsp with historical bars input
# and get historical output
history_output = await out_stream.__anext__()
# build a struct array which includes an 'index' field to push
# as history
history = np.array(
np.arange(len(history_output)),
dtype=dst.array.dtype
)
history[fsp_func_name] = history_output
# check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
print(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff):
dst.push(history[:1])
# compare with source signal and time align
index = dst.push(history)
await ctx.send_yield(index)
# setup a respawn handle
with trio.CancelScope() as cs:
task_status.started(cs)
# rt stream
async for processed in out_stream:
log.debug(f"{fsp_func_name}: {processed}")
index = src.index
dst.array[-1][fsp_func_name] = processed
# stream latest shm array index entry
await ctx.send_yield(index)
@tractor.stream
async def cascade(
ctx: tractor.Context,
@ -85,84 +179,24 @@ async def cascade(
assert src.token == feed.shm.token
async def fsp_compute(
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
# TODO: load appropriate fsp with input args
async def filter_by_sym(
sym: str,
stream,
):
# task cancellation won't kill the channel
with stream.shield():
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
out_stream = func(
filter_by_sym(symbol, feed.stream),
feed.shm,
)
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
# prepending a copy of the first value a few times to make
# sub-curves align with the parent bar chart.
# This likely needs to be fixed either by,
# - manually assigning the index and historical data
# seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for incremental compuations.
dst._first.value = src._first.value
dst._last.value = src._first.value
# Conduct a single iteration of fsp with historical bars input
# and get historical output
history_output = await out_stream.__anext__()
# build a struct array which includes an 'index' field to push
# as history
history = np.array(
np.arange(len(history_output)),
dtype=dst.array.dtype
)
history[fsp_func_name] = history_output
# check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
print(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff):
dst.push(history[:1])
# compare with source signal and time align
index = dst.push(history)
await ctx.send_yield(index)
# setup a respawn handle
with trio.CancelScope() as cs:
task_status.started(cs)
# rt stream
async for processed in out_stream:
log.debug(f"{fsp_func_name}: {processed}")
index = src.index
dst.array[-1][fsp_func_name] = processed
# stream latest shm array index entry
await ctx.send_yield(index)
last_len = new_len = len(src.array)
fsp_target = partial(
fsp_compute,
ctx=ctx,
symbol=symbol,
feed=feed,
src=src,
dst=dst,
fsp_func_name=fsp_func_name,
func=func
)
async with trio.open_nursery() as n:
cs = await n.start(fsp_compute)
cs = await n.start(fsp_target)
# Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed.
@ -176,7 +210,7 @@ async def cascade(
# respawn the signal compute task if the source
# signal has been updated
cs.cancel()
cs = await n.start(fsp_compute)
cs = await n.start(fsp_target)
# TODO: adopt an incremental update engine/approach
# where possible here eventually!

View File

@ -42,7 +42,7 @@ import trio
import tractor
from outcome import Error
from .._daemon import maybe_open_pikerd
from .._daemon import maybe_open_pikerd, _tractor_kwargs
from ..log import get_logger
from ._pg_overrides import _do_overrides
@ -196,6 +196,10 @@ def run_qtractor(
'main': instance,
}
# override tractor's defaults
tractor_kwargs.update(_tractor_kwargs)
# define tractor entrypoint
async def main():

View File

@ -79,7 +79,7 @@ class DpiAwareFont:
return self._qfont
@property
def px_size(self):
def px_size(self) -> int:
return self._qfont.pixelSize()
def configure_to_dpi(self, screen: Optional[QtGui.QScreen] = None):

View File

@ -319,14 +319,10 @@ async def start_order_mode(
) -> None:
# spawn EMS actor-service
async with open_ems(
brokername,
symbol,
) as (book, trades_stream), open_order_mode(
symbol,
chart,
book,
) as order_mode:
async with (
open_ems(brokername, symbol) as (book, trades_stream),
open_order_mode(symbol, chart, book) as order_mode
):
def get_index(time: float):
@ -337,7 +333,7 @@ async def start_order_mode(
indexes = ohlc['time'] >= time
if any(indexes):
return ohlc['index'][indexes[-1]]
return ohlc['index'][indexes][-1]
else:
return ohlc['index'][-1]

View File

@ -5,41 +5,7 @@ import pyqtgraph as pg
from PyQt5 import QtCore, QtGui
class SampleLegendItem(pg.graphicsItems.LegendItem.ItemSample):
def paint(self, p, *args):
p.setRenderHint(p.Antialiasing)
if isinstance(self.item, tuple):
positive = self.item[0].opts
negative = self.item[1].opts
p.setPen(pg.mkPen(positive['pen']))
p.setBrush(pg.mkBrush(positive['brush']))
p.drawPolygon(
QtGui.QPolygonF(
[
QtCore.QPointF(0, 0),
QtCore.QPointF(18, 0),
QtCore.QPointF(18, 18),
]
)
)
p.setPen(pg.mkPen(negative['pen']))
p.setBrush(pg.mkBrush(negative['brush']))
p.drawPolygon(
QtGui.QPolygonF(
[
QtCore.QPointF(0, 0),
QtCore.QPointF(0, 18),
QtCore.QPointF(18, 18),
]
)
)
else:
opts = self.item.opts
p.setPen(pg.mkPen(opts['pen']))
p.drawRect(0, 10, 18, 0.5)
# TODO: test this out as our help menu
class CenteredTextItem(QtGui.QGraphicsTextItem):
def __init__(
self,