Compare commits

..

No commits in common. "29ce8de46266fe9f545a080673734bcf18f84f0f" and "a382f01c850d6d31ac420eac57e5bdcc5d81ddf4" have entirely different histories.

11 changed files with 115 additions and 257 deletions

View File

@ -19,9 +19,8 @@ services:
# other image tags available: # other image tags available:
# https://github.com/waytrade/ib-gateway-docker#supported-tags # https://github.com/waytrade/ib-gateway-docker#supported-tags
# image: waytrade/ib-gateway:1012.2i # image: waytrade/ib-gateway:981.3j
image: ghcr.io/gnzsnz/ib-gateway:latest image: waytrade/ib-gateway:1012.2i
restart: 'no' # restart on boot whenev there's a crash or user clicsk restart: 'no' # restart on boot whenev there's a crash or user clicsk
network_mode: 'host' network_mode: 'host'

View File

@ -83,10 +83,7 @@ def get_config() -> dict:
conf: dict conf: dict
path: Path path: Path
conf, path = config.load( conf, path = config.load(touch_if_dne=True)
conf_name='brokers',
touch_if_dne=True,
)
section = conf.get('binance') section = conf.get('binance')

View File

@ -1000,9 +1000,7 @@ _scan_ignore: set[tuple[str, int]] = set()
def get_config() -> dict[str, Any]: def get_config() -> dict[str, Any]:
conf, path = config.load( conf, path = config.load('brokers')
conf_name='brokers',
)
section = conf.get('ib') section = conf.get('ib')
accounts = section.get('accounts') accounts = section.get('accounts')

View File

@ -25,7 +25,6 @@ from contextlib import (
from dataclasses import asdict from dataclasses import asdict
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial
from pprint import pformat
from math import isnan from math import isnan
import time import time
from typing import ( from typing import (
@ -816,10 +815,7 @@ async def stream_quotes(
proxy: MethodProxy proxy: MethodProxy
mkt: MktPair mkt: MktPair
details: ibis.ContractDetails details: ibis.ContractDetails
async with ( async with open_data_client() as proxy:
open_data_client() as proxy,
trio.open_nursery() as tn,
):
mkt, details = await get_mkt_info( mkt, details = await get_mkt_info(
sym, sym,
proxy=proxy, # passed to avoid implicit client load proxy=proxy, # passed to avoid implicit client load
@ -840,49 +836,29 @@ async def stream_quotes(
con: Contract = details.contract con: Contract = details.contract
first_ticker: Ticker = await proxy.get_quote(contract=con) first_ticker: Ticker = await proxy.get_quote(contract=con)
if first_ticker:
first_quote: dict = normalize(first_ticker) first_quote: dict = normalize(first_ticker)
log.info(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
)
# TODO: we should instead spawn a task that waits on a feed log.warning(f'FIRST QUOTE: {first_quote}')
# to start and let it wait indefinitely..instead of this
# hard coded stuff.
# async def wait_for_first_quote():
# with trio.CancelScope() as cs:
# TODO: we should instead spawn a task that waits on a feed to start
# and let it wait indefinitely..instead of this hard coded stuff.
with trio.move_on_after(1): with trio.move_on_after(1):
first_ticker = await proxy.get_quote( first_ticker = await proxy.get_quote(
contract=con, contract=con,
raise_on_timeout=True, raise_on_timeout=True,
) )
# NOTE: it might be outside regular trading hours for # it might be outside regular trading hours so see if we can at
# assets with "standard venue operating hours" so we # least grab history.
# only "pretend the feed is live" when the dst asset
# type is NOT within the NON-NORMAL-venue set: aka not
# commodities, forex or crypto currencies which CAN
# always return a NaN on a snap quote request during
# normal venue hours. In the case of a closed venue
# (equitiies, futes, bonds etc.) we at least try to
# grab the OHLC history.
if ( if (
isnan(first_ticker.last) isnan(first_ticker.last) # last quote price value is nan
# SO, if the last quote price value is NaN we ONLY
# "pretend to do" `feed_is_live.set()` if it's a known
# dst asset venue with a lot of closed operating hours.
and mkt.dst.atype not in { and mkt.dst.atype not in {
'commodity', 'commodity',
'fiat', 'fiat',
'crypto', 'crypto',
} }
): ):
task_status.started(( task_status.started((init_msgs, first_quote))
init_msgs,
first_quote,
))
# it's not really live but this will unblock # it's not really live but this will unblock
# the brokerd feed task to tell the ui to update? # the brokerd feed task to tell the ui to update?
@ -912,11 +888,8 @@ async def stream_quotes(
# only on first entry at feed boot up # only on first entry at feed boot up
if startup: if startup:
startup: bool = False startup = False
task_status.started(( task_status.started((init_msgs, first_quote))
init_msgs,
first_quote,
))
# start a stream restarter task which monitors the # start a stream restarter task which monitors the
# data feed event. # data feed event.
@ -940,7 +913,7 @@ async def stream_quotes(
# generally speaking these feeds don't # generally speaking these feeds don't
# include vlm data. # include vlm data.
atype: str = mkt.dst.atype atype = mkt.dst.atype
log.info( log.info(
f'No-vlm {mkt.fqme}@{atype}, skipping quote poll' f'No-vlm {mkt.fqme}@{atype}, skipping quote poll'
) )
@ -976,8 +949,7 @@ async def stream_quotes(
quote = normalize(ticker) quote = normalize(ticker)
log.debug(f"First ticker received {quote}") log.debug(f"First ticker received {quote}")
# tell data-layer spawner-caller that live # tell caller quotes are now coming in live
# quotes are now streaming.
feed_is_live.set() feed_is_live.set()
# last = time.time() # last = time.time()

View File

@ -913,17 +913,8 @@ async def translate_and_relay_brokerd_events(
}: }:
if ( if (
not oid not oid
# try to lookup any order dialog by
# brokerd-side id..
and not (
oid := book._ems2brokerd_ids.inverse.get(reqid)
)
): ):
log.warning( oid: str = book._ems2brokerd_ids.inverse[reqid]
f'Rxed unusable error-msg:\n'
f'{brokerd_msg}'
)
continue
msg = BrokerdError(**brokerd_msg) msg = BrokerdError(**brokerd_msg)

View File

@ -1,20 +1,18 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet # Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
# (in stewardship for pikers, everywhere.)
# This program is free software: you can redistribute it and/or # This program is free software: you can redistribute it and/or modify
# modify it under the terms of the GNU Affero General Public # it under the terms of the GNU Affero General Public License as published by
# License as published by the Free Software Foundation, either # the Free Software Foundation, either version 3 of the License, or
# version 3 of the License, or (at your option) any later version. # (at your option) any later version.
# This program is distributed in the hope that it will be useful, # This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of # but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Affero General Public License for more details. # GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public # You should have received a copy of the GNU Affero General Public License
# License along with this program. If not, see # along with this program. If not, see <https://www.gnu.org/licenses/>.
# <https://www.gnu.org/licenses/>.
''' '''
CLI commons. CLI commons.
@ -27,7 +25,6 @@ from types import ModuleType
import click import click
import trio import trio
import tractor import tractor
from tractor._multiaddr import parse_maddr
from ..log import ( from ..log import (
get_console_log, get_console_log,
@ -45,50 +42,6 @@ from .. import config
log = get_logger('piker.cli') log = get_logger('piker.cli')
def load_trans_eps(
network: dict | None = None,
maddrs: list[tuple] | None = None,
) -> dict[str, dict[str, dict]]:
# transport-oriented endpoint multi-addresses
eps: dict[
str, # service name, eg. `pikerd`, `emsd`..
# libp2p style multi-addresses parsed into prot layers
list[dict[str, str | int]]
] = {}
if (
network
and not maddrs
):
# load network section and (attempt to) connect all endpoints
# which are reachable B)
for key, maddrs in network.items():
match key:
# TODO: resolve table across multiple discov
# prots Bo
case 'resolv':
pass
case 'pikerd':
dname: str = key
for maddr in maddrs:
layers: dict = parse_maddr(maddr)
eps.setdefault(
dname,
[],
).append(layers)
elif maddrs:
# presume user is manually specifying the root actor ep.
eps['pikerd'] = [parse_maddr(maddr)]
return eps
@click.command() @click.command()
@click.option( @click.option(
'--loglevel', '--loglevel',
@ -123,7 +76,7 @@ def load_trans_eps(
# help='Enable local ``elasticsearch`` instance' # help='Enable local ``elasticsearch`` instance'
# ) # )
def pikerd( def pikerd(
maddr: list[str] | None, maddr: str | None,
loglevel: str, loglevel: str,
tl: bool, tl: bool,
pdb: bool, pdb: bool,
@ -147,35 +100,64 @@ def pikerd(
"\n" "\n"
)) ))
# service-actor registry endpoint socket-address set # service-actor registry endpoint socket-address
regaddrs: list[tuple[str, int]] = [] regaddrs: list[tuple[str, int]] | None = None
conf, _ = config.load( conf, _ = config.load(
conf_name='conf', conf_name='conf',
) )
network: dict = conf.get('network') network: dict = conf.get('network')
if ( if network is None:
network is None
and not maddr
):
regaddrs = [( regaddrs = [(
_default_registry_host, _default_registry_host,
_default_registry_port, _default_registry_port,
)] )]
from .. import service
from tractor._multiaddr import parse_maddr
# transport-oriented endpoint multi-addresses
eps: dict[
str, # service name, eg. `pikerd`, `emsd`..
# libp2p style multi-addresses parsed into prot layers
list[dict[str, str | int]]
] = {}
if (
not maddr
and network
):
# load network section and (attempt to) connect all endpoints
# which are reachable B)
for key, maddrs in network.items():
match key:
# TODO: resolve table across multiple discov
# prots Bo
case 'resolv':
pass
case 'pikerd':
dname: str = key
for maddr in maddrs:
layers: dict = parse_maddr(maddr)
eps.setdefault(
dname,
[],
).append(layers)
else: else:
eps: dict = load_trans_eps( # presume user is manually specifying the root actor ep.
network, eps['pikerd'] = [parse_maddr(maddr)]
maddr,
) regaddrs: list[tuple[str, int]] = []
for layers in eps['pikerd']: for layers in eps['pikerd']:
regaddrs.append(( regaddrs.append((
layers['ipv4']['addr'], layers['ipv4']['addr'],
layers['tcp']['port'], layers['tcp']['port'],
)) ))
from .. import service
async def main(): async def main():
service_mngr: service.Services service_mngr: service.Services
@ -226,24 +208,8 @@ def pikerd(
@click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging') @click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--configdir', '-c', help='Configuration directory') @click.option('--configdir', '-c', help='Configuration directory')
@click.option( @click.option('--maddr', '-m', default=None, help='Multiaddr to bind')
'--pdb', @click.option('--raddr', '-r', default=None, help='Registrar addr to contact')
is_flag=True,
help='Enable runtime debug mode ',
)
@click.option(
'--maddr',
'-m',
default=None,
multiple=True,
help='Multiaddr to bind',
)
@click.option(
'--regaddr',
'-r',
default=None,
help='Registrar addr to contact',
)
@click.pass_context @click.pass_context
def cli( def cli(
ctx: click.Context, ctx: click.Context,
@ -251,11 +217,10 @@ def cli(
loglevel: str, loglevel: str,
tl: bool, tl: bool,
configdir: str, configdir: str,
pdb: bool,
# TODO: make these list[str] with multiple -m maddr0 -m maddr1 # TODO: make these list[str] with multiple -m maddr0 -m maddr1
maddr: list[str], maddr: str,
regaddr: str, raddr: str,
) -> None: ) -> None:
if configdir is not None: if configdir is not None:
@ -280,17 +245,11 @@ def cli(
# - pikerd vs. regd, separate registry daemon? # - pikerd vs. regd, separate registry daemon?
# - expose datad vs. brokerd? # - expose datad vs. brokerd?
# - bind emsd with certain perms on public iface? # - bind emsd with certain perms on public iface?
regaddrs: list[tuple[str, int]] = regaddr or [( regaddrs: list[tuple[str, int]] = [(
_default_registry_host, _default_registry_host,
_default_registry_port, _default_registry_port,
)] )]
# TODO: factor [network] section parsing out from pikerd
# above and call it here as well.
# if maddr:
# for addr in maddr:
# layers: dict = parse_maddr(addr)
ctx.obj.update({ ctx.obj.update({
'brokers': brokers, 'brokers': brokers,
'brokermods': brokermods, 'brokermods': brokermods,
@ -300,11 +259,6 @@ def cli(
'confdir': config._config_dir, 'confdir': config._config_dir,
'wl_path': config._watchlists_data_path, 'wl_path': config._watchlists_data_path,
'registry_addrs': regaddrs, 'registry_addrs': regaddrs,
'pdb': pdb, # debug mode flag
# TODO: endpoint parsing, pinging and binding
# on no existing server.
# 'maddrs': maddr,
}) })
# allow enabling same loglevel in ``tractor`` machinery # allow enabling same loglevel in ``tractor`` machinery

View File

@ -358,9 +358,7 @@ def load_accounts(
) -> bidict[str, str | None]: ) -> bidict[str, str | None]:
conf, path = load( conf, path = load()
conf_name='brokers',
)
accounts = bidict() accounts = bidict()
for provider_name, section in conf.items(): for provider_name, section in conf.items():
accounts_section = section.get('accounts') accounts_section = section.get('accounts')

View File

@ -56,7 +56,7 @@ def get_runtime_vars() -> dict[str, Any]:
@acm @acm
async def open_piker_runtime( async def open_piker_runtime(
name: str, name: str,
registry_addrs: list[tuple[str, int]] = [], registry_addrs: list[tuple[str, int]],
enable_modules: list[str] = [], enable_modules: list[str] = [],
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
@ -93,8 +93,6 @@ async def open_piker_runtime(
'piker_vars' 'piker_vars'
] = tractor_runtime_overrides ] = tractor_runtime_overrides
# NOTE: if no registrar list passed used the default of just
# setting it as the root actor on localhost.
registry_addrs = ( registry_addrs = (
registry_addrs registry_addrs
or [_default_reg_addr] or [_default_reg_addr]

View File

@ -43,7 +43,7 @@ from typing import (
Iterator, Iterator,
) )
import time import time
from pprint import pformat # from pprint import pformat
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
import trio import trio
@ -1139,25 +1139,21 @@ async def search_simple_dict(
) -> dict[str, Any]: ) -> dict[str, Any]:
tokens: list[str] = [] tokens = []
for key in source: for key in source:
match key: if not isinstance(key, str):
case str():
tokens.append(key)
case []:
tokens.extend(key) tokens.extend(key)
else:
tokens.append(key)
# search routine can be specified as a function such # search routine can be specified as a function such
# as in the case of the current app's local symbol cache # as in the case of the current app's local symbol cache
matches = fuzzy.extract( matches = fuzzy.extractBests(
text, text,
tokens, tokens,
score_cutoff=90, score_cutoff=90,
) )
log.info(
'cache search results:\n'
f'{pformat(matches)}'
)
return [item[0] for item in matches] return [item[0] for item in matches]

View File

@ -96,17 +96,9 @@ def monitor(config, rate, name, dhost, test, tl):
@click.option('--rate', '-r', default=1, help='Logging level') @click.option('--rate', '-r', default=1, help='Logging level')
@click.argument('symbol', required=True) @click.argument('symbol', required=True)
@click.pass_obj @click.pass_obj
def optschain( def optschain(config, symbol, date, rate, test):
config, """Start an option chain UI
symbol, """
date,
rate,
test,
):
'''
Start an option chain UI
'''
# global opts # global opts
loglevel = config['loglevel'] loglevel = config['loglevel']
brokername = config['broker'] brokername = config['broker']
@ -140,19 +132,18 @@ def optschain(
default=None, default=None,
help='Enable pyqtgraph profiling' help='Enable pyqtgraph profiling'
) )
# @click.option( @click.option(
# '--pdb', '--pdb',
# is_flag=True, is_flag=True,
# help='Enable tractor debug mode' help='Enable tractor debug mode'
# ) )
@click.argument('symbols', nargs=-1, required=True) @click.argument('symbols', nargs=-1, required=True)
# @click.pass_context
@click.pass_obj @click.pass_obj
def chart( def chart(
config, config,
# ctx: click.Context,
symbols: list[str], symbols: list[str],
profile, profile,
pdb: bool,
): ):
''' '''
Run chart UI app, spawning service daemons dynamically as Run chart UI app, spawning service daemons dynamically as
@ -183,42 +174,6 @@ def chart(
tractorloglevel = config['tractorloglevel'] tractorloglevel = config['tractorloglevel']
pikerloglevel = config['loglevel'] pikerloglevel = config['loglevel']
maddrs: list[tuple[str, int]] = config.get(
'maddrs',
[],
)
# if maddrs:
# from tractor._multiaddr import parse_maddr
# for addr in maddrs:
# breakpoint()
# layers: dict = parse_maddr(addr)
regaddrs: list[tuple[str, int]] = config.get(
'registry_addrs',
[],
)
from ..config import load
conf, _ = load(
conf_name='conf',
)
network: dict = conf.get('network')
if network:
from ..cli import load_trans_eps
eps: dict = load_trans_eps(
network,
maddrs,
)
for layers in eps['pikerd']:
regaddrs.append((
layers['ipv4']['addr'],
layers['tcp']['port'],
))
from tractor.devx import maybe_open_crash_handler
pdb: bool = config['pdb']
with maybe_open_crash_handler(pdb=pdb):
_main( _main(
syms=symbols, syms=symbols,
brokermods=brokermods, brokermods=brokermods,
@ -227,6 +182,6 @@ def chart(
'debug_mode': pdb, 'debug_mode': pdb,
'loglevel': tractorloglevel, 'loglevel': tractorloglevel,
'name': 'chart', 'name': 'chart',
'registry_addrs': list(set(regaddrs)), 'registry_addrs': config.get('registry_addrs'),
}, },
) )

View File

@ -613,7 +613,7 @@ class OrderMode:
oids: set[str] = set() oids: set[str] = set()
for line in lines: for line in lines:
if dialog := getattr(line, 'dialog', None): dialog: Dialog = getattr(line, 'dialog', None)
oid: str = dialog.uuid oid: str = dialog.uuid
if ( if (
dialog dialog