Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet 29ce8de462 Use new container image mentioned on IBC thread 2023-10-29 13:21:32 -04:00
Tyler Goodlet d3dab17939 order_mode: fix to avoid `Dialog.uuid` on null dialog.. 2023-10-20 13:57:52 -04:00
Tyler Goodlet cadc200818 Always ignore untracked-order error msgs from `brokerd` 2023-10-16 13:15:12 -04:00
Tyler Goodlet 363c8dfdb1 Default spec registrar set as empty addr list
Since it probably IS sane to just assume a root-actor-as-registrar
listening on the localhost as a default, AND allows NOT expecting every
caller of `open_piker_runtime()` to not have to pass an addr set XD

This makes a bucha CLI shit work again after breakage due to no
default..
2023-10-03 13:36:22 -04:00
Tyler Goodlet 00c046c280 Factor transport-ep parser/loader into helper
For now def it `.cli.load_trans_eps()` just inside the pkg mod; only
loads the ep for `pikerd` which currently acts as the main service-actor
registrar per host. Delegate to this new `.load_trans_eps()`
as-it-was-used from the `pikerd` cmd body and add fresh support for
`piker chart --maddr <addr: str>` using the routine in the body of the
`piker.cli.cli` cmd group after loading the `conf.toml::network` section
B)

Also, toss in runtime debug mode wrapping around `piker chart` using the
new `tractor.devx.maybe_open_crash_handler()` and pull the switch from
a `--pdb` flag now factored into the `.cli.cli` click group.
2023-10-03 10:00:01 -04:00
Tyler Goodlet 9165515811 ib: more detailed comments on wait-for-quote-task todo 2023-10-02 17:57:47 -04:00
Tyler Goodlet 543c11f377 ib: only normalize and log first quote if it arrives 2023-10-01 19:14:08 -04:00
Tyler Goodlet 637d33d7cc Make `.config.load_accounts()` load `brokers.toml`.. 2023-10-01 19:09:15 -04:00
Tyler Goodlet e5fdb33e31 Port cache-`dict` search to new `rapidfuzz` api 2023-10-01 17:46:46 -04:00
Tyler Goodlet 81a8cd1685 binance: always load the `brokers.toml` file since default is `conf.toml` now 2023-10-01 17:37:09 -04:00
11 changed files with 256 additions and 114 deletions

View File

@ -19,8 +19,9 @@ services:
# other image tags available: # other image tags available:
# https://github.com/waytrade/ib-gateway-docker#supported-tags # https://github.com/waytrade/ib-gateway-docker#supported-tags
# image: waytrade/ib-gateway:981.3j # image: waytrade/ib-gateway:1012.2i
image: waytrade/ib-gateway:1012.2i image: ghcr.io/gnzsnz/ib-gateway:latest
restart: 'no' # restart on boot whenev there's a crash or user clicsk restart: 'no' # restart on boot whenev there's a crash or user clicsk
network_mode: 'host' network_mode: 'host'

View File

@ -83,7 +83,10 @@ def get_config() -> dict:
conf: dict conf: dict
path: Path path: Path
conf, path = config.load(touch_if_dne=True) conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
section = conf.get('binance') section = conf.get('binance')

View File

@ -1000,7 +1000,9 @@ _scan_ignore: set[tuple[str, int]] = set()
def get_config() -> dict[str, Any]: def get_config() -> dict[str, Any]:
conf, path = config.load('brokers') conf, path = config.load(
conf_name='brokers',
)
section = conf.get('ib') section = conf.get('ib')
accounts = section.get('accounts') accounts = section.get('accounts')

View File

@ -25,6 +25,7 @@ from contextlib import (
from dataclasses import asdict from dataclasses import asdict
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial
from pprint import pformat
from math import isnan from math import isnan
import time import time
from typing import ( from typing import (
@ -815,7 +816,10 @@ async def stream_quotes(
proxy: MethodProxy proxy: MethodProxy
mkt: MktPair mkt: MktPair
details: ibis.ContractDetails details: ibis.ContractDetails
async with open_data_client() as proxy: async with (
open_data_client() as proxy,
trio.open_nursery() as tn,
):
mkt, details = await get_mkt_info( mkt, details = await get_mkt_info(
sym, sym,
proxy=proxy, # passed to avoid implicit client load proxy=proxy, # passed to avoid implicit client load
@ -836,29 +840,49 @@ async def stream_quotes(
con: Contract = details.contract con: Contract = details.contract
first_ticker: Ticker = await proxy.get_quote(contract=con) first_ticker: Ticker = await proxy.get_quote(contract=con)
if first_ticker:
first_quote: dict = normalize(first_ticker) first_quote: dict = normalize(first_ticker)
log.info(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
)
log.warning(f'FIRST QUOTE: {first_quote}') # TODO: we should instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
# hard coded stuff.
# async def wait_for_first_quote():
# with trio.CancelScope() as cs:
# TODO: we should instead spawn a task that waits on a feed to start
# and let it wait indefinitely..instead of this hard coded stuff.
with trio.move_on_after(1): with trio.move_on_after(1):
first_ticker = await proxy.get_quote( first_ticker = await proxy.get_quote(
contract=con, contract=con,
raise_on_timeout=True, raise_on_timeout=True,
) )
# it might be outside regular trading hours so see if we can at # NOTE: it might be outside regular trading hours for
# least grab history. # assets with "standard venue operating hours" so we
# only "pretend the feed is live" when the dst asset
# type is NOT within the NON-NORMAL-venue set: aka not
# commodities, forex or crypto currencies which CAN
# always return a NaN on a snap quote request during
# normal venue hours. In the case of a closed venue
# (equitiies, futes, bonds etc.) we at least try to
# grab the OHLC history.
if ( if (
isnan(first_ticker.last) # last quote price value is nan isnan(first_ticker.last)
# SO, if the last quote price value is NaN we ONLY
# "pretend to do" `feed_is_live.set()` if it's a known
# dst asset venue with a lot of closed operating hours.
and mkt.dst.atype not in { and mkt.dst.atype not in {
'commodity', 'commodity',
'fiat', 'fiat',
'crypto', 'crypto',
} }
): ):
task_status.started((init_msgs, first_quote)) task_status.started((
init_msgs,
first_quote,
))
# it's not really live but this will unblock # it's not really live but this will unblock
# the brokerd feed task to tell the ui to update? # the brokerd feed task to tell the ui to update?
@ -888,8 +912,11 @@ async def stream_quotes(
# only on first entry at feed boot up # only on first entry at feed boot up
if startup: if startup:
startup = False startup: bool = False
task_status.started((init_msgs, first_quote)) task_status.started((
init_msgs,
first_quote,
))
# start a stream restarter task which monitors the # start a stream restarter task which monitors the
# data feed event. # data feed event.
@ -913,7 +940,7 @@ async def stream_quotes(
# generally speaking these feeds don't # generally speaking these feeds don't
# include vlm data. # include vlm data.
atype = mkt.dst.atype atype: str = mkt.dst.atype
log.info( log.info(
f'No-vlm {mkt.fqme}@{atype}, skipping quote poll' f'No-vlm {mkt.fqme}@{atype}, skipping quote poll'
) )
@ -949,7 +976,8 @@ async def stream_quotes(
quote = normalize(ticker) quote = normalize(ticker)
log.debug(f"First ticker received {quote}") log.debug(f"First ticker received {quote}")
# tell caller quotes are now coming in live # tell data-layer spawner-caller that live
# quotes are now streaming.
feed_is_live.set() feed_is_live.set()
# last = time.time() # last = time.time()

View File

@ -913,8 +913,17 @@ async def translate_and_relay_brokerd_events(
}: }:
if ( if (
not oid not oid
# try to lookup any order dialog by
# brokerd-side id..
and not (
oid := book._ems2brokerd_ids.inverse.get(reqid)
)
): ):
oid: str = book._ems2brokerd_ids.inverse[reqid] log.warning(
f'Rxed unusable error-msg:\n'
f'{brokerd_msg}'
)
continue
msg = BrokerdError(**brokerd_msg) msg = BrokerdError(**brokerd_msg)

View File

@ -1,18 +1,20 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers) # Copyright (C) 2018-present Tyler Goodlet
# (in stewardship for pikers, everywhere.)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or
# it under the terms of the GNU Affero General Public License as published by # modify it under the terms of the GNU Affero General Public
# the Free Software Foundation, either version 3 of the License, or # License as published by the Free Software Foundation, either
# (at your option) any later version. # version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, # This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of # but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# GNU Affero General Public License for more details. # Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public
# along with this program. If not, see <https://www.gnu.org/licenses/>. # License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
''' '''
CLI commons. CLI commons.
@ -25,6 +27,7 @@ from types import ModuleType
import click import click
import trio import trio
import tractor import tractor
from tractor._multiaddr import parse_maddr
from ..log import ( from ..log import (
get_console_log, get_console_log,
@ -42,6 +45,50 @@ from .. import config
log = get_logger('piker.cli') log = get_logger('piker.cli')
def load_trans_eps(
network: dict | None = None,
maddrs: list[tuple] | None = None,
) -> dict[str, dict[str, dict]]:
# transport-oriented endpoint multi-addresses
eps: dict[
str, # service name, eg. `pikerd`, `emsd`..
# libp2p style multi-addresses parsed into prot layers
list[dict[str, str | int]]
] = {}
if (
network
and not maddrs
):
# load network section and (attempt to) connect all endpoints
# which are reachable B)
for key, maddrs in network.items():
match key:
# TODO: resolve table across multiple discov
# prots Bo
case 'resolv':
pass
case 'pikerd':
dname: str = key
for maddr in maddrs:
layers: dict = parse_maddr(maddr)
eps.setdefault(
dname,
[],
).append(layers)
elif maddrs:
# presume user is manually specifying the root actor ep.
eps['pikerd'] = [parse_maddr(maddr)]
return eps
@click.command() @click.command()
@click.option( @click.option(
'--loglevel', '--loglevel',
@ -76,7 +123,7 @@ log = get_logger('piker.cli')
# help='Enable local ``elasticsearch`` instance' # help='Enable local ``elasticsearch`` instance'
# ) # )
def pikerd( def pikerd(
maddr: str | None, maddr: list[str] | None,
loglevel: str, loglevel: str,
tl: bool, tl: bool,
pdb: bool, pdb: bool,
@ -100,64 +147,35 @@ def pikerd(
"\n" "\n"
)) ))
# service-actor registry endpoint socket-address # service-actor registry endpoint socket-address set
regaddrs: list[tuple[str, int]] | None = None regaddrs: list[tuple[str, int]] = []
conf, _ = config.load( conf, _ = config.load(
conf_name='conf', conf_name='conf',
) )
network: dict = conf.get('network') network: dict = conf.get('network')
if network is None: if (
network is None
and not maddr
):
regaddrs = [( regaddrs = [(
_default_registry_host, _default_registry_host,
_default_registry_port, _default_registry_port,
)] )]
from .. import service
from tractor._multiaddr import parse_maddr
# transport-oriented endpoint multi-addresses
eps: dict[
str, # service name, eg. `pikerd`, `emsd`..
# libp2p style multi-addresses parsed into prot layers
list[dict[str, str | int]]
] = {}
if (
not maddr
and network
):
# load network section and (attempt to) connect all endpoints
# which are reachable B)
for key, maddrs in network.items():
match key:
# TODO: resolve table across multiple discov
# prots Bo
case 'resolv':
pass
case 'pikerd':
dname: str = key
for maddr in maddrs:
layers: dict = parse_maddr(maddr)
eps.setdefault(
dname,
[],
).append(layers)
else: else:
# presume user is manually specifying the root actor ep. eps: dict = load_trans_eps(
eps['pikerd'] = [parse_maddr(maddr)] network,
maddr,
regaddrs: list[tuple[str, int]] = [] )
for layers in eps['pikerd']: for layers in eps['pikerd']:
regaddrs.append(( regaddrs.append((
layers['ipv4']['addr'], layers['ipv4']['addr'],
layers['tcp']['port'], layers['tcp']['port'],
)) ))
from .. import service
async def main(): async def main():
service_mngr: service.Services service_mngr: service.Services
@ -208,8 +226,24 @@ def pikerd(
@click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging') @click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--configdir', '-c', help='Configuration directory') @click.option('--configdir', '-c', help='Configuration directory')
@click.option('--maddr', '-m', default=None, help='Multiaddr to bind') @click.option(
@click.option('--raddr', '-r', default=None, help='Registrar addr to contact') '--pdb',
is_flag=True,
help='Enable runtime debug mode ',
)
@click.option(
'--maddr',
'-m',
default=None,
multiple=True,
help='Multiaddr to bind',
)
@click.option(
'--regaddr',
'-r',
default=None,
help='Registrar addr to contact',
)
@click.pass_context @click.pass_context
def cli( def cli(
ctx: click.Context, ctx: click.Context,
@ -217,10 +251,11 @@ def cli(
loglevel: str, loglevel: str,
tl: bool, tl: bool,
configdir: str, configdir: str,
pdb: bool,
# TODO: make these list[str] with multiple -m maddr0 -m maddr1 # TODO: make these list[str] with multiple -m maddr0 -m maddr1
maddr: str, maddr: list[str],
raddr: str, regaddr: str,
) -> None: ) -> None:
if configdir is not None: if configdir is not None:
@ -245,11 +280,17 @@ def cli(
# - pikerd vs. regd, separate registry daemon? # - pikerd vs. regd, separate registry daemon?
# - expose datad vs. brokerd? # - expose datad vs. brokerd?
# - bind emsd with certain perms on public iface? # - bind emsd with certain perms on public iface?
regaddrs: list[tuple[str, int]] = [( regaddrs: list[tuple[str, int]] = regaddr or [(
_default_registry_host, _default_registry_host,
_default_registry_port, _default_registry_port,
)] )]
# TODO: factor [network] section parsing out from pikerd
# above and call it here as well.
# if maddr:
# for addr in maddr:
# layers: dict = parse_maddr(addr)
ctx.obj.update({ ctx.obj.update({
'brokers': brokers, 'brokers': brokers,
'brokermods': brokermods, 'brokermods': brokermods,
@ -259,6 +300,11 @@ def cli(
'confdir': config._config_dir, 'confdir': config._config_dir,
'wl_path': config._watchlists_data_path, 'wl_path': config._watchlists_data_path,
'registry_addrs': regaddrs, 'registry_addrs': regaddrs,
'pdb': pdb, # debug mode flag
# TODO: endpoint parsing, pinging and binding
# on no existing server.
# 'maddrs': maddr,
}) })
# allow enabling same loglevel in ``tractor`` machinery # allow enabling same loglevel in ``tractor`` machinery

View File

@ -358,7 +358,9 @@ def load_accounts(
) -> bidict[str, str | None]: ) -> bidict[str, str | None]:
conf, path = load() conf, path = load(
conf_name='brokers',
)
accounts = bidict() accounts = bidict()
for provider_name, section in conf.items(): for provider_name, section in conf.items():
accounts_section = section.get('accounts') accounts_section = section.get('accounts')

View File

@ -56,7 +56,7 @@ def get_runtime_vars() -> dict[str, Any]:
@acm @acm
async def open_piker_runtime( async def open_piker_runtime(
name: str, name: str,
registry_addrs: list[tuple[str, int]], registry_addrs: list[tuple[str, int]] = [],
enable_modules: list[str] = [], enable_modules: list[str] = [],
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
@ -93,6 +93,8 @@ async def open_piker_runtime(
'piker_vars' 'piker_vars'
] = tractor_runtime_overrides ] = tractor_runtime_overrides
# NOTE: if no registrar list passed used the default of just
# setting it as the root actor on localhost.
registry_addrs = ( registry_addrs = (
registry_addrs registry_addrs
or [_default_reg_addr] or [_default_reg_addr]

View File

@ -43,7 +43,7 @@ from typing import (
Iterator, Iterator,
) )
import time import time
# from pprint import pformat from pprint import pformat
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
import trio import trio
@ -1139,21 +1139,25 @@ async def search_simple_dict(
) -> dict[str, Any]: ) -> dict[str, Any]:
tokens = [] tokens: list[str] = []
for key in source: for key in source:
if not isinstance(key, str): match key:
tokens.extend(key) case str():
else:
tokens.append(key) tokens.append(key)
case []:
tokens.extend(key)
# search routine can be specified as a function such # search routine can be specified as a function such
# as in the case of the current app's local symbol cache # as in the case of the current app's local symbol cache
matches = fuzzy.extractBests( matches = fuzzy.extract(
text, text,
tokens, tokens,
score_cutoff=90, score_cutoff=90,
) )
log.info(
'cache search results:\n'
f'{pformat(matches)}'
)
return [item[0] for item in matches] return [item[0] for item in matches]

View File

@ -96,9 +96,17 @@ def monitor(config, rate, name, dhost, test, tl):
@click.option('--rate', '-r', default=1, help='Logging level') @click.option('--rate', '-r', default=1, help='Logging level')
@click.argument('symbol', required=True) @click.argument('symbol', required=True)
@click.pass_obj @click.pass_obj
def optschain(config, symbol, date, rate, test): def optschain(
"""Start an option chain UI config,
""" symbol,
date,
rate,
test,
):
'''
Start an option chain UI
'''
# global opts # global opts
loglevel = config['loglevel'] loglevel = config['loglevel']
brokername = config['broker'] brokername = config['broker']
@ -132,18 +140,19 @@ def optschain(config, symbol, date, rate, test):
default=None, default=None,
help='Enable pyqtgraph profiling' help='Enable pyqtgraph profiling'
) )
@click.option( # @click.option(
'--pdb', # '--pdb',
is_flag=True, # is_flag=True,
help='Enable tractor debug mode' # help='Enable tractor debug mode'
) # )
@click.argument('symbols', nargs=-1, required=True) @click.argument('symbols', nargs=-1, required=True)
# @click.pass_context
@click.pass_obj @click.pass_obj
def chart( def chart(
config, config,
# ctx: click.Context,
symbols: list[str], symbols: list[str],
profile, profile,
pdb: bool,
): ):
''' '''
Run chart UI app, spawning service daemons dynamically as Run chart UI app, spawning service daemons dynamically as
@ -174,6 +183,42 @@ def chart(
tractorloglevel = config['tractorloglevel'] tractorloglevel = config['tractorloglevel']
pikerloglevel = config['loglevel'] pikerloglevel = config['loglevel']
maddrs: list[tuple[str, int]] = config.get(
'maddrs',
[],
)
# if maddrs:
# from tractor._multiaddr import parse_maddr
# for addr in maddrs:
# breakpoint()
# layers: dict = parse_maddr(addr)
regaddrs: list[tuple[str, int]] = config.get(
'registry_addrs',
[],
)
from ..config import load
conf, _ = load(
conf_name='conf',
)
network: dict = conf.get('network')
if network:
from ..cli import load_trans_eps
eps: dict = load_trans_eps(
network,
maddrs,
)
for layers in eps['pikerd']:
regaddrs.append((
layers['ipv4']['addr'],
layers['tcp']['port'],
))
from tractor.devx import maybe_open_crash_handler
pdb: bool = config['pdb']
with maybe_open_crash_handler(pdb=pdb):
_main( _main(
syms=symbols, syms=symbols,
brokermods=brokermods, brokermods=brokermods,
@ -182,6 +227,6 @@ def chart(
'debug_mode': pdb, 'debug_mode': pdb,
'loglevel': tractorloglevel, 'loglevel': tractorloglevel,
'name': 'chart', 'name': 'chart',
'registry_addrs': config.get('registry_addrs'), 'registry_addrs': list(set(regaddrs)),
}, },
) )

View File

@ -613,7 +613,7 @@ class OrderMode:
oids: set[str] = set() oids: set[str] = set()
for line in lines: for line in lines:
dialog: Dialog = getattr(line, 'dialog', None) if dialog := getattr(line, 'dialog', None):
oid: str = dialog.uuid oid: str = dialog.uuid
if ( if (
dialog dialog