Merge pull request #414 from pikers/agg_feedz

Agg feedz
dark_clearing_improvements
goodboy 2023-01-13 12:20:47 -05:00 committed by GitHub
commit 8d1eb81f16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1314 additions and 746 deletions

View File

@ -14,6 +14,27 @@ on:
jobs:
# test that we can generate a software distribution and install it
# thus avoid missing file issues after packaging.
sdist-linux:
name: 'sdist'
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Setup python
uses: actions/setup-python@v2
with:
python-version: '3.10'
- name: Build sdist
run: python setup.py sdist --formats=zip
- name: Install sdist from .zips
run: python -m pip install dist/*.zip
testing:
name: 'install + test-suite'
runs-on: ubuntu-latest

View File

@ -62,39 +62,39 @@ services:
# - "127.0.0.1:4002:4002"
# - "127.0.0.1:5900:5900"
ib_gw_live:
image: waytrade/ib-gateway:1012.2i
restart: always
network_mode: 'host'
# ib_gw_live:
# image: waytrade/ib-gateway:1012.2i
# restart: always
# network_mode: 'host'
volumes:
- type: bind
source: ./jts_live.ini
target: /root/jts/jts.ini
# don't let ibc clobber this file for
# the main reason of not having a stupid
# timezone set..
read_only: true
# volumes:
# - type: bind
# source: ./jts_live.ini
# target: /root/jts/jts.ini
# # don't let ibc clobber this file for
# # the main reason of not having a stupid
# # timezone set..
# read_only: true
# force our own ibc config
- type: bind
source: ./ibc.ini
target: /root/ibc/config.ini
# # force our own ibc config
# - type: bind
# source: ./ibc.ini
# target: /root/ibc/config.ini
# force our noop script - socat isn't needed in host mode.
- type: bind
source: ./fork_ports_delayed.sh
target: /root/scripts/fork_ports_delayed.sh
# # force our noop script - socat isn't needed in host mode.
# - type: bind
# source: ./fork_ports_delayed.sh
# target: /root/scripts/fork_ports_delayed.sh
# force our noop script - socat isn't needed in host mode.
- type: bind
source: ./run_x11_vnc.sh
target: /root/scripts/run_x11_vnc.sh
read_only: true
# # force our noop script - socat isn't needed in host mode.
# - type: bind
# source: ./run_x11_vnc.sh
# target: /root/scripts/run_x11_vnc.sh
# read_only: true
# NOTE: to fill these out, define an `.env` file in the same dir as
# this compose file which looks something like:
environment:
TRADING_MODE: 'live'
VNC_SERVER_PASSWORD: 'doggy'
VNC_SERVER_PORT: '3004'
# # NOTE: to fill these out, define an `.env` file in the same dir as
# # this compose file which looks something like:
# environment:
# TRADING_MODE: 'live'
# VNC_SERVER_PASSWORD: 'doggy'
# VNC_SERVER_PORT: '3004'

View File

@ -35,12 +35,17 @@ log = get_logger(__name__)
_root_dname = 'pikerd'
_registry_host: str = '127.0.0.1'
_registry_port: int = 6116
_registry_addr = (
_registry_host,
_registry_port,
_default_registry_host: str = '127.0.0.1'
_default_registry_port: int = 6116
_default_reg_addr: tuple[str, int] = (
_default_registry_host,
_default_registry_port,
)
# NOTE: this value is set as an actor-global once the first endpoint
# who is capable, spawns a `pikerd` service tree.
_registry_addr: tuple[str, int] | None = None
_tractor_kwargs: dict[str, Any] = {
# use a different registry addr then tractor's default
'arbiter_addr': _registry_addr
@ -152,13 +157,20 @@ async def open_pikerd(
'''
global _services
global _registry_addr
if (
_registry_addr is None
or registry_addr
):
_registry_addr = registry_addr or _default_reg_addr
# XXX: this may open a root actor as well
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=registry_addr or _registry_addr,
arbiter_addr=_registry_addr,
name=_root_dname,
loglevel=loglevel,
debug_mode=debug_mode,
@ -197,7 +209,7 @@ async def open_piker_runtime(
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = _registry_addr,
registry_addr: None | tuple[str, int] = None,
) -> tractor.Actor:
'''
@ -206,13 +218,20 @@ async def open_piker_runtime(
'''
global _services
global _registry_addr
if (
_registry_addr is None
or registry_addr
):
_registry_addr = registry_addr or _default_reg_addr
# XXX: this may open a root actor as well
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=registry_addr,
arbiter_addr=_registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,

View File

@ -26,10 +26,21 @@ asks.init('trio')
__brokers__ = [
'binance',
'questrade',
'robinhood',
'ib',
'kraken',
# broken but used to work
# 'questrade',
# 'robinhood',
# TODO: we should get on these stat!
# alpaca
# wstrade
# iex
# deribit
# kucoin
# bitso
]

View File

@ -41,10 +41,15 @@ from ._util import (
SymbolNotFound,
DataUnavailable,
)
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..log import (
get_logger,
get_console_log,
)
from ..data.types import Struct
from ..data._web_bs import open_autorecon_ws, NoBsWs
from ..data._web_bs import (
open_autorecon_ws,
NoBsWs,
)
log = get_logger(__name__)
@ -142,7 +147,9 @@ class OHLC(Struct):
# convert datetime obj timestamp to unixtime in milliseconds
def binance_timestamp(when):
def binance_timestamp(
when: datetime
) -> int:
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
@ -181,7 +188,7 @@ class Client:
params = {}
if sym is not None:
sym = sym.upper()
sym = sym.lower()
params = {'symbol': sym}
resp = await self._api(
@ -238,7 +245,7 @@ class Client:
) -> dict:
if end_dt is None:
end_dt = pendulum.now('UTC')
end_dt = pendulum.now('UTC').add(minutes=1)
if start_dt is None:
start_dt = end_dt.start_of(
@ -396,8 +403,8 @@ async def open_history_client(
async def get_ohlc(
timeframe: float,
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[
np.ndarray,
@ -412,27 +419,22 @@ async def open_history_client(
start_dt=start_dt,
end_dt=end_dt,
)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
times = array['time']
if (
end_dt is None
):
inow = round(time.time())
if (inow - times[-1]) > 60:
await tractor.breakpoint()
start_dt = pendulum.from_timestamp(times[0])
end_dt = pendulum.from_timestamp(times[-1])
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3}
async def backfill_bars(
sym: str,
shm: ShmArray, # type: ignore # noqa
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Fill historical bars into shared mem / storage afap.
"""
with trio.CancelScope() as cs:
async with open_cached_client('binance') as client:
bars = await client.bars(symbol=sym)
shm.push(bars)
task_status.started(cs)
async def stream_quotes(
send_chan: trio.abc.SendChannel,
@ -465,7 +467,7 @@ async def stream_quotes(
si = sym_infos[sym] = syminfo.to_dict()
filters = {}
for entry in syminfo.filters:
ftype = entry.pop('filterType')
ftype = entry['filterType']
filters[ftype] = entry
# XXX: after manually inspecting the response format we

View File

@ -371,8 +371,8 @@ async def update_and_audit_msgs(
else:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
raise ValueError(
# log.error(
# raise ValueError(
log.error(
f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n'
f'piker: {msg}\n'
@ -575,8 +575,9 @@ async def trades_dialogue(
# if new trades are detected from the API, prepare
# them for the ledger file and update the pptable.
if api_to_ledger_entries:
trade_entries = api_to_ledger_entries[acctid]
trade_entries = api_to_ledger_entries.get(acctid)
if trade_entries:
# write ledger with all new trades **AFTER**
# we've updated the `pps.toml` from the
# original ledger state! (i.e. this is
@ -883,7 +884,7 @@ async def deliver_trade_events(
# execdict.pop('acctNumber')
fill_msg = BrokerdFill(
# should match the value returned from
# NOTE: should match the value returned from
# `.submit_limit()`
reqid=execu.orderId,
time_ns=time.time_ns(), # cuz why not

View File

@ -1047,7 +1047,13 @@ async def open_symbol_search(
stock_results = []
async def stash_results(target: Awaitable[list]):
stock_results.extend(await target)
try:
results = await target
except tractor.trionics.Lagged:
print("IB SYM-SEARCH OVERRUN?!?")
return
stock_results.extend(results)
for i in range(10):
with trio.move_on_after(3) as cs:

View File

@ -27,6 +27,7 @@ from contextlib import asynccontextmanager as acm
from math import isnan
from pprint import pformat
import time
from types import ModuleType
from typing import (
AsyncIterator,
Any,
@ -48,6 +49,7 @@ from ..data._source import (
)
from ..data.feed import (
Feed,
Flume,
maybe_open_feed,
)
from ..ui._notify import notify_from_ems_status_msg
@ -380,14 +382,15 @@ class Router(Struct):
@acm
async def maybe_open_brokerd_dialog(
self,
feed: Feed,
brokermod: ModuleType,
portal: tractor.Portal,
exec_mode: str,
symbol: str,
loglevel: str,
) -> None:
brokermod = feed.mod
broker = brokermod.name
relay: TradesRelay = self.relays.get(broker)
if (
relay
@ -426,7 +429,7 @@ class Router(Struct):
else:
# open live brokerd trades endpoint
open_trades_endpoint = feed.portal.open_context(
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
@ -523,18 +526,22 @@ class Router(Struct):
maybe_open_feed(
[fqsn],
loglevel=loglevel,
) as (feed, quote_stream),
) as feed,
):
brokermod = feed.mod
brokername, _, _ = unpack_fqsn(fqsn)
brokermod = feed.mods[brokername]
broker = brokermod.name
portal = feed.portals[brokermod]
# XXX: this should be initial price quote from target provider
first_quote: dict = feed.first_quotes[fqsn]
flume = feed.flumes[fqsn]
first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker)
book.lasts[fqsn]: float = first_quote['last']
async with self.maybe_open_brokerd_dialog(
feed=feed,
brokermod=brokermod,
portal=portal,
exec_mode=exec_mode,
symbol=symbol,
loglevel=loglevel,
@ -547,14 +554,16 @@ class Router(Struct):
clear_dark_triggers,
self,
relay.brokerd_stream,
quote_stream,
flume.stream,
broker,
fqsn, # form: <name>.<venue>.<suffix>.<broker>
book
)
client_ready = trio.Event()
task_status.started((relay, feed, client_ready))
task_status.started(
(relay, feed, client_ready)
)
# sync to the client side by waiting for the stream
# connection setup before relaying any existing live
@ -1014,7 +1023,7 @@ async def process_client_order_cmds(
brokerd_order_stream: tractor.MsgStream,
fqsn: str,
feed: Feed,
flume: Flume,
dark_book: DarkBook,
router: Router,
@ -1212,7 +1221,7 @@ async def process_client_order_cmds(
'size': size,
'exec_mode': exec_mode,
'action': action,
'brokers': brokers, # list
'brokers': _, # list
} if (
# "DARK" triggers
# submit order to local EMS book and scan loop,
@ -1234,13 +1243,12 @@ async def process_client_order_cmds(
# sometimes the real-time feed hasn't come up
# so just pull from the latest history.
if isnan(last):
last = feed.rt_shm.array[-1]['close']
last = flume.rt_shm.array[-1]['close']
pred = mk_check(trigger_price, last, action)
spread_slap: float = 5
sym = fqsn.replace(f'.{brokers[0]}', '')
min_tick = feed.symbols[sym].tick_size
min_tick = flume.symbol.tick_size
if action == 'buy':
tickfilter = ('ask', 'last', 'trade')
@ -1453,11 +1461,12 @@ async def _emsd_main(
# start inbound (from attached client) order request processing
# main entrypoint, run here until cancelled.
try:
flume = feed.flumes[fqsn]
await process_client_order_cmds(
client_stream,
brokerd_stream,
fqsn,
feed,
flume,
dark_book,
_router,
)

View File

@ -578,7 +578,7 @@ async def trades_dialogue(
)
# paper engine simulator clearing task
await simulate_fills(feed.stream, client)
await simulate_fills(feed.streams[broker], client)
@asynccontextmanager

View File

@ -29,14 +29,13 @@ from ..log import get_console_log, get_logger, colorize_json
from ..brokers import get_brokermod
from .._daemon import (
_tractor_kwargs,
_registry_host,
_registry_port,
_default_registry_host,
_default_registry_port,
)
from .. import config
log = get_logger('cli')
DEFAULT_BROKER = 'questrade'
@click.command()
@ -77,8 +76,8 @@ def pikerd(
reg_addr: None | tuple[str, int] = None
if host or port:
reg_addr = (
host or _registry_host,
int(port) or _registry_port,
host or _default_registry_host,
int(port) or _default_registry_port,
)
async def main():
@ -118,7 +117,7 @@ def pikerd(
@click.group(context_settings=config._context_defaults)
@click.option(
'--brokers', '-b',
default=[DEFAULT_BROKER],
default=None,
multiple=True,
help='Broker backend to use'
)
@ -144,16 +143,19 @@ def cli(
ctx.ensure_object(dict)
if len(brokers) == 1:
brokermods = [get_brokermod(brokers[0])]
else:
if not brokers:
# (try to) load all (supposedly) supported data/broker backends
from piker.brokers import __brokers__
brokers = __brokers__
brokermods = [get_brokermod(broker) for broker in brokers]
assert brokermods
reg_addr: None | tuple[str, int] = None
if host or port:
reg_addr = (
host or _registry_host,
int(port) or _registry_port,
host or _default_registry_host,
int(port) or _default_registry_port,
)
ctx.obj.update({

View File

@ -197,6 +197,9 @@ def load(
'''
path = path or get_conf_path(conf_name)
if not os.path.isdir(_config_dir):
os.mkdir(_config_dir)
if not os.path.isfile(path):
fn = _conf_fn_w_ext(conf_name)
@ -210,8 +213,8 @@ def load(
if os.path.isfile(template):
shutil.copyfile(template, path)
else:
with open(path, 'w'):
pass # touch
with open(path, 'r'):
pass # touch it
config = toml.load(path, **tomlkws)
log.debug(f"Read config file {path}")

View File

@ -22,7 +22,9 @@ financial data flows.
from __future__ import annotations
from collections import Counter
import time
from typing import TYPE_CHECKING, Optional, Union
from typing import (
TYPE_CHECKING,
)
import tractor
import trio
@ -147,7 +149,7 @@ async def increment_ohlc_buffer(
async def broadcast(
delay_s: int,
shm: Optional[ShmArray] = None,
shm: ShmArray | None = None,
) -> None:
'''
@ -241,6 +243,8 @@ async def sample_and_broadcast(
# iterate stream delivered by broker
async for quotes in quote_stream:
# print(quotes)
# TODO: ``numba`` this!
for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing
@ -304,29 +308,29 @@ async def sample_and_broadcast(
volume,
)
# TODO: PUT THIS IN A ``_FeedsBus.broadcast()`` method!
# XXX: we need to be very cautious here that no
# context-channel is left lingering which doesn't have
# a far end receiver actor-task. In such a case you can
# end up triggering backpressure which which will
# eventually block this producer end of the feed and
# thus other consumers still attached.
sub_key: str = broker_symbol.lower()
subs: list[
tuple[
Union[tractor.MsgStream, trio.MemorySendChannel],
tractor.Context,
Optional[float], # tick throttle in Hz
tractor.MsgStream | trio.MemorySendChannel,
float | None, # tick throttle in Hz
]
] = bus._subscribers[broker_symbol.lower()]
] = bus.get_subs(sub_key)
# NOTE: by default the broker backend doesn't append
# it's own "name" into the fqsn schema (but maybe it
# should?) so we have to manually generate the correct
# key here.
bsym = f'{broker_symbol}.{brokername}'
fqsn = f'{broker_symbol}.{brokername}'
lags: int = 0
for (stream, ctx, tick_throttle) in subs:
for (stream, tick_throttle) in subs.copy():
try:
with trio.move_on_after(0.2) as cs:
if tick_throttle:
@ -334,47 +338,39 @@ async def sample_and_broadcast(
# pushes to the ``uniform_rate_send()`` below.
try:
stream.send_nowait(
(bsym, quote)
(fqsn, quote)
)
except trio.WouldBlock:
overruns[sub_key] += 1
ctx = stream._ctx
chan = ctx.chan
if ctx:
log.warning(
f'Feed overrun {bus.brokername} ->'
f'{chan.uid} !!!'
f'Feed OVERRUN {sub_key}'
'@{bus.brokername} -> \n'
f'feed @ {chan.uid}\n'
f'throttle = {tick_throttle} Hz'
)
else:
key = id(stream)
overruns[key] += 1
log.warning(
f'Feed overrun {broker_symbol}'
'@{bus.brokername} -> '
f'feed @ {tick_throttle} Hz'
)
if overruns[key] > 6:
if overruns[sub_key] > 6:
# TODO: should we check for the
# context being cancelled? this
# could happen but the
# channel-ipc-pipe is still up.
if not chan.connected():
if (
not chan.connected()
or ctx._cancel_called
):
log.warning(
'Dropping broken consumer:\n'
f'{broker_symbol}:'
f'{sub_key}:'
f'{ctx.cid}@{chan.uid}'
)
await stream.aclose()
raise trio.BrokenResourceError
else:
log.warning(
'Feed getting overrun bro!\n'
f'{broker_symbol}:'
f'{ctx.cid}@{chan.uid}'
)
continue
else:
await stream.send(
{bsym: quote}
{fqsn: quote}
)
if cs.cancelled_caught:
@ -402,13 +398,9 @@ async def sample_and_broadcast(
# so far seems like no since this should all
# be single-threaded. Doing it anyway though
# since there seems to be some kinda race..
try:
subs.remove((stream, tick_throttle))
except ValueError:
log.error(
f'Stream was already removed from subs!?\n'
f'{broker_symbol}:'
f'{ctx.cid}@{chan.uid}'
bus.remove_subs(
sub_key,
{(stream, tick_throttle)},
)

View File

@ -23,7 +23,8 @@ import decimal
from bidict import bidict
import numpy as np
from msgspec import Struct
from .types import Struct
# from numba import from_dtype
@ -217,6 +218,10 @@ class Symbol(Struct):
else:
return (key, broker)
@property
def fqsn(self) -> str:
return '.'.join(self.tokens()).lower()
def front_fqsn(self) -> str:
'''
fqsn = "fully qualified symbol name"

File diff suppressed because it is too large Load Diff

View File

@ -38,7 +38,7 @@ from math import isnan
from bidict import bidict
from msgspec.msgpack import encode, decode
import pyqtgraph as pg
# import pyqtgraph as pg
import numpy as np
import tractor
from trio_websocket import open_websocket_url
@ -429,10 +429,7 @@ class Storage:
end: Optional[int] = None,
limit: int = int(800e3),
) -> dict[
int,
Union[dict, np.ndarray],
]:
) -> np.ndarray:
client = self.client
syms = await client.list_symbols()
@ -661,7 +658,7 @@ async def tsdb_history_update(
[fqsn],
start_stream=False,
) as (feed, stream),
) as feed,
):
profiler(f'opened feed for {fqsn}')
@ -669,12 +666,13 @@ async def tsdb_history_update(
# to_prepend = None
if fqsn:
symbol = feed.symbols.get(fqsn)
flume = feed.flumes[fqsn]
symbol = flume.symbol
if symbol:
fqsn = symbol.front_fqsn()
fqsn = symbol.fqsn
# diff db history with shm and only write the missing portions
# ohlcv = feed.hist_shm.array
# ohlcv = flume.hist_shm.array
# TODO: use pg profiler
# for secs in (1, 60):

View File

@ -42,16 +42,17 @@ class Struct(
for f in self.__struct_fields__
}
def __repr__(self):
# only turn on pprint when we detect a python REPL
# at runtime B)
if (
hasattr(sys, 'ps1')
# TODO: check if we're in pdb
):
return self.pformat()
# Lul, doesn't seem to work that well..
# def __repr__(self):
# # only turn on pprint when we detect a python REPL
# # at runtime B)
# if (
# hasattr(sys, 'ps1')
# # TODO: check if we're in pdb
# ):
# return self.pformat()
return super().__repr__()
# return super().__repr__()
def pformat(self) -> str:
return f'Struct({pformat(self.to_dict())})'

View File

@ -26,7 +26,6 @@ from typing import (
)
import numpy as np
import pyqtgraph as pg
import trio
from trio_typing import TaskStatus
import tractor
@ -35,7 +34,9 @@ from tractor.msg import NamespacePath
from ..log import get_logger, get_console_log
from .. import data
from ..data import attach_shm_array
from ..data.feed import Feed
from ..data.feed import (
Flume,
)
from ..data._sharedmem import ShmArray
from ..data._sampling import _default_delay_s
from ..data._source import Symbol
@ -79,7 +80,7 @@ async def filter_quotes_by_sym(
async def fsp_compute(
symbol: Symbol,
feed: Feed,
flume: Flume,
quote_stream: trio.abc.ReceiveChannel,
src: ShmArray,
@ -107,7 +108,7 @@ async def fsp_compute(
filter_quotes_by_sym(fqsn, quote_stream),
# XXX: currently the ``ohlcv`` arg
feed.rt_shm,
flume.rt_shm,
)
# Conduct a single iteration of fsp with historical bars input
@ -310,12 +311,12 @@ async def cascade(
# needs to get throttled the ticks we generate.
# tick_throttle=60,
) as (feed, quote_stream):
symbol = feed.symbols[fqsn]
) as feed:
flume = feed.flumes[fqsn]
symbol = flume.symbol
assert src.token == flume.rt_shm.token
profiler(f'{func}: feed up')
assert src.token == feed.rt_shm.token
# last_len = new_len = len(src.array)
func_name = func.__name__
@ -327,8 +328,8 @@ async def cascade(
fsp_compute,
symbol=symbol,
feed=feed,
quote_stream=quote_stream,
flume=flume,
quote_stream=flume.stream,
# shm
src=src,
@ -430,7 +431,7 @@ async def cascade(
# Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed.
async with feed.index_stream(
async with flume.index_stream(
int(delay_s)
) as istream:

View File

@ -19,15 +19,16 @@ Main app startup and run.
'''
from functools import partial
from types import ModuleType
from PyQt5.QtCore import QEvent
import trio
from .._daemon import maybe_spawn_brokerd
from ..brokers import get_brokermod
from . import _event
from ._exec import run_qtractor
from ..data.feed import install_brokerd_search
from ..data._source import unpack_fqsn
from . import _search
from ._chart import GodWidget
from ..log import get_logger
@ -36,27 +37,26 @@ log = get_logger(__name__)
async def load_provider_search(
broker: str,
brokermod: str,
loglevel: str,
) -> None:
log.info(f'loading brokerd for {broker}..')
name = brokermod.name
log.info(f'loading brokerd for {name}..')
async with (
maybe_spawn_brokerd(
broker,
name,
loglevel=loglevel
) as portal,
install_brokerd_search(
portal,
get_brokermod(broker),
brokermod,
),
):
# keep search engine stream up until cancelled
await trio.sleep_forever()
@ -66,8 +66,8 @@ async def _async_main(
# implicit required argument provided by ``qtractor_run()``
main_widget: GodWidget,
sym: str,
brokernames: str,
syms: list[str],
brokers: dict[str, ModuleType],
loglevel: str,
) -> None:
@ -99,6 +99,11 @@ async def _async_main(
sbar = godwidget.window.status_bar
starting_done = sbar.open_status('starting ze sexy chartz')
needed_brokermods: dict[str, ModuleType] = {}
for fqsn in syms:
brokername, *_ = unpack_fqsn(fqsn)
needed_brokermods[brokername] = brokers[brokername]
async with (
trio.open_nursery() as root_n,
):
@ -113,12 +118,16 @@ async def _async_main(
# godwidget.hbox.addWidget(search)
godwidget.search = search
symbols: list[str] = []
for sym in syms:
symbol, _, provider = sym.rpartition('.')
symbols.append(symbol)
# this internally starts a ``display_symbol_data()`` task above
order_mode_ready = await godwidget.load_symbol(
order_mode_ready = await godwidget.load_symbols(
provider,
symbol,
symbols,
loglevel
)
@ -136,8 +145,12 @@ async def _async_main(
):
# load other providers into search **after**
# the chart's select cache
for broker in brokernames:
root_n.start_soon(load_provider_search, broker, loglevel)
for brokername, mod in needed_brokermods.items():
root_n.start_soon(
load_provider_search,
mod,
loglevel,
)
await order_mode_ready.wait()
@ -166,8 +179,8 @@ async def _async_main(
def _main(
sym: str,
brokernames: [str],
syms: list[str],
brokermods: list[ModuleType],
piker_loglevel: str,
tractor_kwargs,
) -> None:
@ -178,7 +191,11 @@ def _main(
'''
run_qtractor(
func=_async_main,
args=(sym, brokernames, piker_loglevel),
args=(
syms,
{mod.name: mod for mod in brokermods},
piker_loglevel,
),
main_widget_type=GodWidget,
tractor_kwargs=tractor_kwargs,
)

View File

@ -186,10 +186,10 @@ class GodWidget(QWidget):
) -> tuple[LinkedSplits, LinkedSplits]: # type: ignore
return self._chart_cache.get(symbol_key)
async def load_symbol(
async def load_symbols(
self,
providername: str,
symbol_key: str,
symbol_keys: list[str],
loglevel: str,
reset: bool = False,
@ -200,12 +200,20 @@ class GodWidget(QWidget):
Expects a ``numpy`` structured array containing all the ohlcv fields.
'''
fqsns: list[str] = []
# our symbol key style is always lower case
symbol_key = symbol_key.lower()
for key in list(map(str.lower, symbol_keys)):
# fully qualified symbol name (SNS i guess is what we're making?)
fqsn = '.'.join([symbol_key, providername])
all_linked = self.get_chart_symbol(fqsn)
fqsn = '.'.join([key, providername])
fqsns.append(fqsn)
# NOTE: for now we use the first symbol in the set as the "key"
# for the overlay of feeds on the chart.
group_key = fqsns[0]
all_linked = self.get_chart_symbol(group_key)
order_mode_started = trio.Event()
if not self.vbox.isEmpty():
@ -238,7 +246,7 @@ class GodWidget(QWidget):
display_symbol_data,
self,
providername,
symbol_key,
fqsns,
loglevel,
order_mode_started,
)
@ -907,6 +915,7 @@ class ChartPlotWidget(pg.PlotWidget):
def resume_all_feeds(self):
try:
for feed in self._feeds.values():
for flume in feed.flumes.values():
self.linked.godwidget._root_n.start_soon(feed.resume)
except RuntimeError:
# TODO: cancel the qtractor runtime here?
@ -914,6 +923,7 @@ class ChartPlotWidget(pg.PlotWidget):
def pause_all_feeds(self):
for feed in self._feeds.values():
for flume in feed.flumes.values():
self.linked.godwidget._root_n.start_soon(feed.pause)
@property

View File

@ -33,6 +33,7 @@ import pyqtgraph as pg
from ..data.feed import (
open_feed,
Feed,
Flume,
)
from ..data.types import Struct
from ._axes import YAxisLabel
@ -228,7 +229,7 @@ async def graphics_update_loop(
nurse: trio.Nursery,
godwidget: GodWidget,
feed: Feed,
flume: Flume,
wap_in_history: bool = False,
vlm_chart: Optional[ChartPlotWidget] = None,
@ -255,8 +256,8 @@ async def graphics_update_loop(
fast_chart = linked.chart
hist_chart = godwidget.hist_linked.chart
ohlcv = feed.rt_shm
hist_ohlcv = feed.hist_shm
ohlcv = flume.rt_shm
hist_ohlcv = flume.hist_shm
# update last price sticky
last_price_sticky = fast_chart._ysticks[fast_chart.name]
@ -347,9 +348,9 @@ async def graphics_update_loop(
'i_last_append': i_last,
'i_last': i_last,
}
_, hist_step_size_s, _ = feed.get_ds_info()
_, hist_step_size_s, _ = flume.get_ds_info()
async with feed.index_stream(
async with flume.index_stream(
# int(hist_step_size_s)
# TODO: seems this is more reliable at keeping the slow
# chart incremented in view more correctly?
@ -393,7 +394,7 @@ async def graphics_update_loop(
nurse.start_soon(increment_history_view)
# main real-time quotes update loop
stream: tractor.MsgStream = feed.stream
stream: tractor.MsgStream = flume.stream
async for quotes in stream:
ds.quotes = quotes
@ -813,13 +814,13 @@ def graphics_update_cycle(
async def link_views_with_region(
rt_chart: ChartPlotWidget,
hist_chart: ChartPlotWidget,
feed: Feed,
flume: Flume,
) -> None:
# these value are be only pulled once during shm init/startup
izero_hist = feed.izero_hist
izero_rt = feed.izero_rt
izero_hist = flume.izero_hist
izero_rt = flume.izero_rt
# Add the LinearRegionItem to the ViewBox, but tell the ViewBox
# to exclude this item when doing auto-range calculations.
@ -846,7 +847,7 @@ async def link_views_with_region(
# poll for datums load and timestep detection
for _ in range(100):
try:
_, _, ratio = feed.get_ds_info()
_, _, ratio = flume.get_ds_info()
break
except IndexError:
await trio.sleep(0.01)
@ -947,7 +948,7 @@ async def link_views_with_region(
async def display_symbol_data(
godwidget: GodWidget,
provider: str,
sym: str,
fqsns: list[str],
loglevel: str,
order_mode_started: trio.Event,
@ -961,11 +962,6 @@ async def display_symbol_data(
'''
sbar = godwidget.window.status_bar
loading_sym_key = sbar.open_status(
f'loading {sym}.{provider} ->',
group_key=True
)
# historical data fetch
# brokermod = brokers.get_brokermod(provider)
@ -974,10 +970,17 @@ async def display_symbol_data(
# clear_on_next=True,
# group_key=loading_sym_key,
# )
fqsn = '.'.join((sym, provider))
for fqsn in fqsns:
loading_sym_key = sbar.open_status(
f'loading {fqsn} ->',
group_key=True
)
feed: Feed
async with open_feed(
[fqsn],
fqsns,
loglevel=loglevel,
# limit to at least display's FPS
@ -985,11 +988,17 @@ async def display_symbol_data(
tick_throttle=_quote_throttle_rate,
) as feed:
ohlcv: ShmArray = feed.rt_shm
hist_ohlcv: ShmArray = feed.hist_shm
symbol = feed.symbols[sym]
fqsn = symbol.front_fqsn()
# TODO: right now we only show one symbol on charts, but
# overlays are coming muy pronto guey..
assert len(feed.flumes) == 1
flume = list(feed.flumes.values())[0]
ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm
symbol = flume.symbol
fqsn = symbol.fqsn
step_size_s = 1
tf_key = tf_in_1s[step_size_s]
@ -1009,7 +1018,7 @@ async def display_symbol_data(
hist_linked._symbol = symbol
hist_chart = hist_linked.plot_ohlc_main(
symbol,
feed.hist_shm,
hist_ohlcv,
# in the case of history chart we explicitly set `False`
# to avoid internal pane creation.
# sidepane=False,
@ -1025,7 +1034,7 @@ async def display_symbol_data(
godwidget.pp_pane = pp_pane
# create main OHLC chart
chart = rt_linked.plot_ohlc_main(
ohlc_chart = rt_linked.plot_ohlc_main(
symbol,
ohlcv,
# in the case of history chart we explicitly set `False`
@ -1033,8 +1042,8 @@ async def display_symbol_data(
sidepane=pp_pane,
)
chart._feeds[symbol.key] = feed
chart.setFocus()
ohlc_chart._feeds[symbol.key] = feed
ohlc_chart.setFocus()
# XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?!
# plot historical vwap if available
@ -1044,7 +1053,7 @@ async def display_symbol_data(
# and 'bar_wap' in bars.dtype.fields
# ):
# wap_in_history = True
# chart.draw_curve(
# ohlc_chart.draw_curve(
# name='bar_wap',
# shm=ohlcv,
# color='default_light',
@ -1097,7 +1106,7 @@ async def display_symbol_data(
graphics_update_loop,
ln,
godwidget,
feed,
flume,
wap_in_history,
vlm_chart,
)
@ -1105,7 +1114,7 @@ async def display_symbol_data(
await trio.sleep(0)
# size view to data prior to order mode init
chart.default_view()
ohlc_chart.default_view()
rt_linked.graphics_cycle()
await trio.sleep(0)
@ -1119,9 +1128,9 @@ async def display_symbol_data(
godwidget.resize_all()
await link_views_with_region(
chart,
ohlc_chart,
hist_chart,
feed,
flume,
)
mode: OrderMode
@ -1135,7 +1144,7 @@ async def display_symbol_data(
):
if not vlm_chart:
# trigger another view reset if no sub-chart
chart.default_view()
ohlc_chart.default_view()
rt_linked.mode = mode

View File

@ -304,7 +304,7 @@ class PlotItemOverlay:
# NOTE: required for scene layering/relaying; this guarantees
# the "root" plot receives priority for interaction
# events/signals.
root_plotitem.vb.setZValue(1000)
root_plotitem.vb.setZValue(10)
self.overlays: list[PlotItem] = []
self.layout = ComposedGridLayout(root_plotitem)
@ -494,6 +494,8 @@ class PlotItemOverlay:
root.vb.setFocus()
assert root.vb.focusWidget()
vb.setZValue(100)
def get_axis(
self,
plot: PlotItem,

View File

@ -45,7 +45,10 @@ from ..calc import humanize, pnl, puterize
from ..clearing._allocate import Allocator
from ..pp import Position
from ..data._normalize import iterticks
from ..data.feed import Feed
from ..data.feed import (
Feed,
Flume,
)
from ..data.types import Struct
from ._label import Label
from ._lines import LevelLine, order_line
@ -64,7 +67,7 @@ _pnl_tasks: dict[str, bool] = {}
async def update_pnl_from_feed(
feed: Feed,
flume: Flume,
order_mode: OrderMode, # noqa
tracker: PositionTracker,
@ -95,7 +98,7 @@ async def update_pnl_from_feed(
# real-time update pnl on the status pane
try:
async with feed.stream.subscribe() as bstream:
async with flume.stream.subscribe() as bstream:
# last_tick = time.time()
async for quotes in bstream:
@ -390,12 +393,12 @@ class SettingsPane:
mode = self.order_mode
sym = mode.chart.linked.symbol
size = tracker.live_pp.size
feed = mode.quote_feed
flume: Feed = mode.feed.flumes[sym.fqsn]
pnl_value = 0
if size:
# last historical close price
last = feed.rt_shm.array[-1][['close']][0]
last = flume.rt_shm.array[-1][['close']][0]
pnl_value = copysign(1, size) * pnl(
tracker.live_pp.ppu,
last,
@ -408,7 +411,7 @@ class SettingsPane:
_pnl_tasks[fqsn] = True
self.order_mode.nursery.start_soon(
update_pnl_from_feed,
feed,
flume,
mode,
tracker,
)

View File

@ -665,9 +665,9 @@ class SearchWidget(QtWidgets.QWidget):
log.info(f'Requesting symbol: {symbol}.{provider}')
await godw.load_symbol(
await godw.load_symbols(
provider,
symbol,
[symbol],
'info',
)

View File

@ -166,16 +166,16 @@ def chart(
))
return
# global opts
brokernames = config['brokers']
brokermods = config['brokermods']
assert brokermods
tractorloglevel = config['tractorloglevel']
pikerloglevel = config['loglevel']
_main(
sym=symbols[0],
brokernames=brokernames,
syms=symbols,
brokermods=brokermods,
piker_loglevel=pikerloglevel,
tractor_kwargs={
'debug_mode': pdb,

View File

@ -44,7 +44,10 @@ from ..clearing._allocate import (
)
from ._style import _font
from ..data._source import Symbol
from ..data.feed import Feed
from ..data.feed import (
Feed,
Flume,
)
from ..data.types import Struct
from ..log import get_logger
from ._editors import LineEditor, ArrowEditor
@ -118,7 +121,6 @@ class OrderMode:
chart: ChartPlotWidget # type: ignore # noqa
hist_chart: ChartPlotWidget # type: ignore # noqa
nursery: trio.Nursery # used by ``ui._position`` code?
quote_feed: Feed
book: OrderBook
lines: LineEditor
arrows: ArrowEditor
@ -514,12 +516,13 @@ class OrderMode:
# XXX: seems to fail on certain types of races?
# assert len(lines) == 2
if lines:
_, _, ratio = self.feed.get_ds_info()
flume: Flume = self.feed.flumes[self.chart.linked.symbol.fqsn]
_, _, ratio = flume.get_ds_info()
for i, chart in [
(arrow_index, self.chart),
(self.feed.izero_hist
(flume.izero_hist
+
round((arrow_index - self.feed.izero_rt)/ratio),
round((arrow_index - flume.izero_rt)/ratio),
self.hist_chart)
]:
self.arrows.add(
@ -801,7 +804,6 @@ async def open_order_mode(
chart,
hist_chart,
tn,
feed,
book,
lines,
arrows,

View File

@ -1,7 +1,7 @@
# we require a pinned dev branch to get some edge features that
# are often untested in tractor's CI and/or being tested by us
# first before committing as core features in tractor's base.
-e git+https://github.com/goodboy/tractor.git@master#egg=tractor
-e git+https://github.com/goodboy/tractor.git@piker_pin#egg=tractor
# `pyqtgraph` peeps keep breaking, fixing, improving so might as well
# pin this to a dev branch that we have more control over especially

View File

@ -1,10 +1,12 @@
from contextlib import asynccontextmanager as acm
import os
import pytest
import tractor
import trio
from piker import log, config
from piker.brokers import questrade
from piker import (
# log,
config,
)
def pytest_addoption(parser):
@ -14,15 +16,6 @@ def pytest_addoption(parser):
help="Use a practice API account")
@pytest.fixture(scope='session', autouse=True)
def loglevel(request):
orig = tractor.log._default_loglevel
level = tractor.log._default_loglevel = request.config.option.loglevel
log.get_console_log(level)
yield level
tractor.log._default_loglevel = orig
@pytest.fixture(scope='session')
def test_config():
dirname = os.path.dirname
@ -37,9 +30,11 @@ def test_config():
@pytest.fixture(scope='session', autouse=True)
def confdir(request, test_config):
"""If the `--confdir` flag is not passed use the
'''
If the `--confdir` flag is not passed use the
broker config file found in that dir.
"""
'''
confdir = request.config.option.confdir
if confdir is not None:
config._override_config_dir(confdir)
@ -47,49 +42,61 @@ def confdir(request, test_config):
return confdir
@pytest.fixture(scope='session', autouse=True)
def travis(confdir):
is_travis = os.environ.get('TRAVIS', False)
if is_travis:
# this directory is cached, see .travis.yaml
conf_file = config.get_broker_conf_path()
refresh_token = os.environ['QT_REFRESH_TOKEN']
# @pytest.fixture(scope='session', autouse=True)
# def travis(confdir):
# is_travis = os.environ.get('TRAVIS', False)
# if is_travis:
# # this directory is cached, see .travis.yaml
# conf_file = config.get_broker_conf_path()
# refresh_token = os.environ['QT_REFRESH_TOKEN']
def write_with_token(token):
# XXX don't pass the dir path here since may be
# written behind the scenes in the `confdir fixture`
if not os.path.isfile(conf_file):
open(conf_file, 'w').close()
conf, path = config.load()
conf.setdefault('questrade', {}).update(
{'refresh_token': token,
'is_practice': 'True'}
)
config.write(conf, path)
# def write_with_token(token):
# # XXX don't pass the dir path here since may be
# # written behind the scenes in the `confdir fixture`
# if not os.path.isfile(conf_file):
# open(conf_file, 'w').close()
# conf, path = config.load()
# conf.setdefault('questrade', {}).update(
# {'refresh_token': token,
# 'is_practice': 'True'}
# )
# config.write(conf, path)
async def ensure_config():
# try to refresh current token using cached brokers config
# if it fails fail try using the refresh token provided by the
# env var and if that fails stop the test run here.
try:
async with questrade.get_client(ask_user=False):
pass
except (
FileNotFoundError, ValueError,
questrade.BrokerError, questrade.QuestradeError,
trio.MultiError,
):
# 3 cases:
# - config doesn't have a ``refresh_token`` k/v
# - cache dir does not exist yet
# - current token is expired; take it form env var
write_with_token(refresh_token)
# async def ensure_config():
# # try to refresh current token using cached brokers config
# # if it fails fail try using the refresh token provided by the
# # env var and if that fails stop the test run here.
# try:
# async with questrade.get_client(ask_user=False):
# pass
# except (
# FileNotFoundError, ValueError,
# questrade.BrokerError, questrade.QuestradeError,
# trio.MultiError,
# ):
# # 3 cases:
# # - config doesn't have a ``refresh_token`` k/v
# # - cache dir does not exist yet
# # - current token is expired; take it form env var
# write_with_token(refresh_token)
async with questrade.get_client(ask_user=False):
pass
# async with questrade.get_client(ask_user=False):
# pass
# XXX ``pytest_trio`` doesn't support scope or autouse
trio.run(ensure_config)
# # XXX ``pytest_trio`` doesn't support scope or autouse
# trio.run(ensure_config)
_ci_env: bool = os.environ.get('CI', False)
@pytest.fixture(scope='session')
def ci_env() -> bool:
'''
Detect CI envoirment.
'''
return _ci_env
@pytest.fixture
@ -105,3 +112,56 @@ def tmx_symbols():
@pytest.fixture
def cse_symbols():
return ['TRUL.CN', 'CWEB.CN', 'SNN.CN']
@acm
async def _open_test_pikerd(
reg_addr: tuple[str, int] | None = None,
**kwargs,
) -> tuple[
str,
int,
tractor.Portal
]:
'''
Testing helper to startup the service tree and runtime on
a different port then the default to allow testing alongside
a running stack.
'''
import random
from piker._daemon import maybe_open_pikerd
if reg_addr is None:
port = random.randint(6e3, 7e3)
reg_addr = ('127.0.0.1', port)
async with (
maybe_open_pikerd(
registry_addr=reg_addr,
**kwargs,
),
):
async with tractor.wait_for_actor(
'pikerd',
arbiter_sockaddr=reg_addr,
) as portal:
raddr = portal.channel.raddr
assert raddr == reg_addr
yield (
raddr[0],
raddr[1],
portal,
)
@pytest.fixture
def open_test_pikerd():
yield _open_test_pikerd
# TODO: teardown checks such as,
# - no leaked subprocs or shm buffers
# - all requested container service are torn down
# - certain ``tractor`` runtime state?

128
tests/test_feeds.py 100644
View File

@ -0,0 +1,128 @@
'''
Data feed layer APIs, performance, msg throttling.
'''
from collections import Counter
from pprint import pprint
from typing import AsyncContextManager
import pytest
# import tractor
import trio
from piker.data import (
ShmArray,
open_feed,
)
from piker.data._source import (
unpack_fqsn,
)
@pytest.mark.parametrize(
'fqsns',
[
# binance
(100, {'btcusdt.binance', 'ethusdt.binance'}, False),
# kraken
(20, {'ethusdt.kraken', 'xbtusd.kraken'}, True),
# binance + kraken
(100, {'btcusdt.binance', 'xbtusd.kraken'}, False),
],
ids=lambda param: f'quotes={param[0]}@fqsns={param[1]}',
)
def test_multi_fqsn_feed(
open_test_pikerd: AsyncContextManager,
fqsns: set[str],
ci_env: bool
):
'''
Start a real-time data feed for provided fqsn and pull
a few quotes then simply shut down.
'''
max_quotes, fqsns, run_in_ci = fqsns
if (
ci_env
and not run_in_ci
):
pytest.skip('Skipping CI disabled test due to feed restrictions')
brokers = set()
for fqsn in fqsns:
brokername, key, suffix = unpack_fqsn(fqsn)
brokers.add(brokername)
async def main():
async with (
open_test_pikerd(),
open_feed(
fqsns,
loglevel='info',
# TODO: ensure throttle rate is applied
# limit to at least display's FPS
# avoiding needless Qt-in-guest-mode context switches
# tick_throttle=_quote_throttle_rate,
) as feed
):
# verify shm buffers exist
for fqin in fqsns:
flume = feed.flumes[fqin]
ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm
async with feed.open_multi_stream(brokers) as stream:
# pull the first startup quotes, one for each fqsn, and
# ensure they match each flume's startup quote value.
fqsns_copy = fqsns.copy()
with trio.fail_after(0.5):
for _ in range(1):
first_quotes = await stream.receive()
for fqsn, quote in first_quotes.items():
# XXX: TODO: WTF apparently this error will get
# supressed and only show up in the teardown
# excgroup if we don't have the fix from
# <tractorbugurl>
# assert 0
fqsns_copy.remove(fqsn)
flume = feed.flumes[fqsn]
assert quote['last'] == flume.first_quote['last']
cntr = Counter()
with trio.fail_after(6):
async for quotes in stream:
for fqsn, quote in quotes.items():
cntr[fqsn] += 1
# await tractor.breakpoint()
flume = feed.flumes[fqsn]
ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm
# print quote msg, rt and history
# buffer values on console.
rt_row = ohlcv.array[-1]
hist_row = hist_ohlcv.array[-1]
# last = quote['last']
# assert last == rt_row['close']
# assert last == hist_row['close']
pprint(
f'{fqsn}: {quote}\n'
f'rt_ohlc: {rt_row}\n'
f'hist_ohlc: {hist_row}\n'
)
if cntr.total() >= max_quotes:
break
assert set(cntr.keys()) == fqsns
trio.run(main)