diff --git a/config/brokers.toml b/config/brokers.toml index 42df5a3e..098a940c 100644 --- a/config/brokers.toml +++ b/config/brokers.toml @@ -1,6 +1,5 @@ -################ # ---- CEXY ---- -################ + [binance] accounts.paper = 'paper' @@ -13,28 +12,41 @@ accounts.spot = 'spot' spot.use_testnet = false spot.api_key = '' spot.api_secret = '' +# ------ binance ------ [deribit] +# std assets key_id = '' key_secret = '' +# options +accounts.option = 'option' +option.use_testnet = false +option.key_id = '' +option.key_secret = '' +# aux logging from `cryptofeed` +option.log.filename = 'cryptofeed.log' +option.log.level = 'DEBUG' +option.log.disabled = true +# ------ deribit ------ [kraken] key_descr = '' api_key = '' secret = '' +# ------ kraken ------ [kucoin] key_id = '' key_secret = '' key_passphrase = '' +# ------ kucoin ------ -################ # -- BROKERZ --- -################ + [questrade] refresh_token = '' access_token = '' @@ -42,44 +54,55 @@ api_server = 'https://api06.iq.questrade.com/' expires_in = 1800 token_type = 'Bearer' expires_at = 1616095326.355846 +# ------ questrade ------ [ib] +# define the (set of) host-port socketaddrs that +# brokerd.ib will scan to connect to an API endpoint +# (ib-gw or ib-tws listening instances) hosts = [ '127.0.0.1', ] -# XXX: the order in which ports will be scanned -# (by the `brokerd` daemon-actor) -# is determined # by the line order here. -# TODO: when we eventually spawn gateways in our -# container, we can just dynamically allocate these -# using IBC. ports = [ 4002, # gw 7497, # tws ] -# XXX: for a paper account the flex web query service -# is not supported so you have to manually download -# and XML report and put it in a location that can be -# accessed by the ``brokerd.ib`` backend code for parsing. -flex_token = '' -flex_trades_query_id = '' # live account - -# when clients are being scanned this determines -# which clients are preferred to be used for data -# feeds based on the order of account names, if -# detected as active on an API client. +# When API endpoints are being scanned durin startup, the order +# of user-defined-account "names" (as defined below) here +# determines which py-client connection is given priority to be +# used for data-feed-requests by according to whichever client +# connected to an API endpoing which reported the equivalent +# account number for that name. prefer_data_account = [ 'paper', 'margin', 'ira', ] +# For long-term trades txn (transaction) history +# processing (i.e your txn ledger with IB) you can +# (automatically for live accounts) query the FLEX +# report system for past history. +# +# (For paper accounts the web query service +# is not supported so you have to manually download +# an XML report and put it in a location that can be +# accessed by our `brokerd.ib` backend code for parsing). +# +flex_token = '' +flex_trades_query_id = '' # live account + +# define "aliases" (names) for each account number +# such that the names can be reffed and logged throughout +# `piker.accounting` subsys and more easily +# referred to by the user. +# +# These keys will be the set exposed through the order-mode +# account-selection UI so that numbers are never shown. [ib.accounts] -# the order in which accounts will be selectable -# in the order mode UI (if found via clients during -# API-app scanning)when a new symbol is loaded. -paper = 'XX0000000' -margin = 'X0000000' -ira = 'X0000000' +paper = 'DU0000000' # <- literal account # +margin = 'U0000000' +ira = 'U0000000' +# ------ ib ------ diff --git a/dockering/ib/README.rst b/dockering/ib/README.rst index 3f9e01b9..ad441213 100644 --- a/dockering/ib/README.rst +++ b/dockering/ib/README.rst @@ -1,30 +1,138 @@ running ``ib`` gateway in ``docker`` ------------------------------------ -We have a config based on the (now defunct) -image from "waytrade": +We have a config based on a well maintained community +image from `@gnzsnz`: -https://github.com/waytrade/ib-gateway-docker +https://github.com/gnzsnz/ib-gateway-docker -To startup this image with our custom settings -simply run the command:: + +To startup this image simply run the command:: docker compose up -And you should have the following socket-available services: +(For further usage^ see the official `docker-compose`_ docs) -- ``x11vnc1@127.0.0.1:3003`` -- ``ib-gw@127.0.0.1:4002`` -You can attach to the container via a VNC client -without password auth. +And you should have the following socket-available services by +default: -SECURITY STUFF!?!?! -------------------- -Though "``ib``" claims they host filter connections outside -localhost (aka ``127.0.0.1``) it's probably better if you filter -the socket at the OS level using a stateless firewall rule:: +- ``x11vnc1 @ 127.0.0.1:5900`` +- ``ib-gw @ 127.0.0.1:4002`` + +You can now attach to the container via a VNC client with password-auth; +here is an example using ``vncclient`` on ``linux``:: + + vncviewer localhost:5900 + +now enter the pw (password) you set via an (see second code blob) +`.env file`_ or pw-file according to the `credentials section`_. + +If you want to change away from their default config see the example +`docker-compose.yml`-config issue and config-section of the readme, + + - https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#configuration + - https://github.com/gnzsnz/ib-gateway-docker/discussions/103 + +.. _.env file: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#how-to-use-it +.. _docker-compose: https://docs.docker.com/compose/ +.. _credentials section: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#credentials + + +Connecting to the API from `piker` +--------------------------------- +In order to expose the container's API endpoint to the +`brokerd/datad/ib` actor, we need to add a section to the user's +`brokers.toml` config (note the below is similar to the repo-shipped +template file), + +.. code:: toml + + [ib] + # define the (set of) host-port socketaddrs that + # brokerd.ib will scan to connect to an API endpoint + # (ib-gw or ib-tws listening instances) + hosts = [ + '127.0.0.1', + ] + ports = [ + 4002, # gw + 7497, # tws + ] + + # When API endpoints are being scanned durin startup, the order + # of user-defined-account "names" (as defined below) here + # determines which py-client connection is given priority to be + # used for data-feed-requests by according to whichever client + # connected to an API endpoing which reported the equivalent + # account number for that name. + prefer_data_account = [ + 'paper', + 'margin', + 'ira', + ] + + # define "aliases" (names) for each account number + # such that the names can be reffed and logged throughout + # `piker.accounting` subsys and more easily + # referred to by the user. + # + # These keys will be the set exposed through the order-mode + # account-selection UI so that numbers are never shown. + [ib.accounts] + paper = 'XX0000000' + margin = 'X0000000' + ira = 'X0000000' + + +the broker daemon can also connect to the container's VNC server for +added functionalies including, + +- viewing the API endpoint program's GUI for manual interventions, +- workarounds for historical data throttling using hotkey hacks, + +Add a further section to `brokers.toml` which maps each API-ep's +port to a table of VNC server connection info like, + +.. code:: toml + + [ib.vnc_addrs] + 4002 = {host = 'localhost', port = 5900, pw = 'doggy'} + +The `pw = 'doggy'` here ^ should the same value as the particular +container instances `.env` file setting (when it was run), + +.. code:: ini + + VNC_SERVER_PASSWORD='doggy' + + +IF you also want to run ``TWS`` +------------------------------- +You can also run it containerized, + +https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#using-tws + + +SECURITY stuff (advanced, only if you're paranoid) +-------------------------------------------------- +First and foremost if doing a "distributed" container setup where you +run the ``ib-gw`` docker container and your connecting API client +(likely ``ib_async`` from python) on **different hosts** be sure to +read the `security considerations`_ section! + +And for a further (somewhat paranoid) perspective from +a long-time-ago serious devops eng.. + +Though "``ib``" claims they filter remote host connections outside +``localhost`` (aka ``127.0.0.1`` on ipv4) it's prolly justified if +you'd like to filter the socket at the *OS level* using a stateless +firewall rule:: ip rule add not unicast iif lo to 0.0.0.0/0 dport 4002 -We will soon have this baked into our own custom image but for -now you'll have to do it urself dawgy. + +We will soon have this either baked into our own custom derivative +image (or patched into the current upstream one after further testin) +but for now you'll have to do it urself, diggity dawg. + +.. _security considerations: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#security-considerations diff --git a/dockering/ib/docker-compose.yml b/dockering/ib/docker-compose.yml index 2f2db58f..54d62ca9 100644 --- a/dockering/ib/docker-compose.yml +++ b/dockering/ib/docker-compose.yml @@ -1,10 +1,15 @@ -# rework from the original @ -# https://github.com/waytrade/ib-gateway-docker/blob/master/docker-compose.yml -version: "3.5" - +# a community maintained IB API container! +# +# https://github.com/gnzsnz/ib-gateway-docker +# +# For piker we (currently) include some minor deviations +# for some config files in the `volumes` section. +# +# See full configuration settings @ +# - https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#configuration +# - https://github.com/gnzsnz/ib-gateway-docker/discussions/103 services: - ib_gw_paper: # apparently java is a mega cukc: @@ -50,16 +55,22 @@ services: target: /root/scripts/run_x11_vnc.sh read_only: true - # NOTE:to fill these out, define an `.env` file in the same dir as - # this compose file which looks something like: - # TWS_USERID='myuser' - # TWS_PASSWORD='guest' + # NOTE: an alt method to fill these out is to + # define an `.env` file in the same dir as + # this compose file. environment: TWS_USERID: ${TWS_USERID} + # TWS_USERID: 'myuser' TWS_PASSWORD: ${TWS_PASSWORD} - TRADING_MODE: 'paper' - VNC_SERVER_PASSWORD: 'doggy' - VNC_SERVER_PORT: '3003' + # TWS_PASSWORD: 'guest' + TRADING_MODE: ${TRADING_MODE} + # TRADING_MODE: 'paper' + VNC_SERVER_PASSWORD: ${VNC_SERVER_PASSWORD} + # VNC_SERVER_PASSWORD: 'doggy' + + # TODO, see if we can get this supported like it + # was on the old `waytrade` image? + # VNC_SERVER_PORT: '3003' # ports: # - target: 4002 @@ -76,6 +87,9 @@ services: # - "127.0.0.1:4002:4002" # - "127.0.0.1:5900:5900" + # TODO, a masked but working example of dual paper + live + # ib-gw instances running in a single app run! + # # ib_gw_live: # image: waytrade/ib-gateway:1012.2i # restart: no diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 6111d307..c1aa88ac 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -22,7 +22,9 @@ routines should be primitive data types where possible. """ import inspect from types import ModuleType -from typing import List, Dict, Any, Optional +from typing import ( + Any, +) import trio @@ -34,8 +36,10 @@ from ..accounting import MktPair async def api(brokername: str, methname: str, **kwargs) -> dict: - """Make (proxy through) a broker API call by name and return its result. - """ + ''' + Make (proxy through) a broker API call by name and return its result. + + ''' brokermod = get_brokermod(brokername) async with brokermod.get_client() as client: meth = getattr(client, methname, None) @@ -62,10 +66,14 @@ async def api(brokername: str, methname: str, **kwargs) -> dict: async def stocks_quote( brokermod: ModuleType, - tickers: List[str] -) -> Dict[str, Dict[str, Any]]: - """Return quotes dict for ``tickers``. - """ + tickers: list[str] + +) -> dict[str, dict[str, Any]]: + ''' + Return a `dict` of snapshot quotes for the provided input + `tickers`: a `list` of fqmes. + + ''' async with brokermod.get_client() as client: return await client.quote(tickers) @@ -74,13 +82,15 @@ async def stocks_quote( async def option_chain( brokermod: ModuleType, symbol: str, - date: Optional[str] = None, -) -> Dict[str, Dict[str, Dict[str, Any]]]: - """Return option chain for ``symbol`` for ``date``. + date: str|None = None, +) -> dict[str, dict[str, dict[str, Any]]]: + ''' + Return option chain for ``symbol`` for ``date``. By default all expiries are returned. If ``date`` is provided then contract quotes for that single expiry are returned. - """ + + ''' async with brokermod.get_client() as client: if date: id = int((await client.tickers2ids([symbol]))[symbol]) @@ -98,7 +108,7 @@ async def option_chain( # async def contracts( # brokermod: ModuleType, # symbol: str, -# ) -> Dict[str, Dict[str, Dict[str, Any]]]: +# ) -> dict[str, dict[str, dict[str, Any]]]: # """Return option contracts (all expiries) for ``symbol``. # """ # async with brokermod.get_client() as client: @@ -110,15 +120,24 @@ async def bars( brokermod: ModuleType, symbol: str, **kwargs, -) -> Dict[str, Dict[str, Dict[str, Any]]]: - """Return option contracts (all expiries) for ``symbol``. - """ +) -> dict[str, dict[str, dict[str, Any]]]: + ''' + Return option contracts (all expiries) for ``symbol``. + + ''' async with brokermod.get_client() as client: return await client.bars(symbol, **kwargs) -async def search_w_brokerd(name: str, pattern: str) -> dict: +async def search_w_brokerd( + name: str, + pattern: str, +) -> dict: + # TODO: WHY NOT WORK!?! + # when we `step` through the next block? + # import tractor + # await tractor.pause() async with open_cached_client(name) as client: # TODO: support multiple asset type concurrent searches. @@ -130,12 +149,12 @@ async def symbol_search( pattern: str, **kwargs, -) -> Dict[str, Dict[str, Dict[str, Any]]]: +) -> dict[str, dict[str, dict[str, Any]]]: ''' Return symbol info from broker. ''' - results = [] + results: list[str] = [] async def search_backend( brokermod: ModuleType @@ -143,6 +162,13 @@ async def symbol_search( brokername: str = mod.name + # TODO: figure this the FUCK OUT + # -> ok so obvi in the root actor any async task that's + # spawned outside the main tractor-root-actor task needs to + # call this.. + # await tractor.devx._debug.maybe_init_greenback() + # tractor.pause_from_sync() + async with maybe_spawn_brokerd( mod.name, infect_asyncio=getattr( @@ -162,7 +188,6 @@ async def symbol_search( )) async with trio.open_nursery() as n: - for mod in brokermods: n.start_soon(search_backend, mod.name) @@ -172,11 +197,13 @@ async def symbol_search( async def mkt_info( brokermod: ModuleType, fqme: str, + **kwargs, ) -> MktPair: ''' - Return MktPair info from broker including src and dst assets. + Return the `piker.accounting.MktPair` info struct from a given + backend broker tradable src/dst asset pair. ''' async with open_cached_client(brokermod.name) as client: diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index 2c71bc46..00b2d233 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -20,6 +20,11 @@ runnable script-programs. ''' from __future__ import annotations +from datetime import ( # noqa + datetime, + date, + tzinfo as TzInfo, +) from functools import partial from typing import ( Literal, @@ -33,7 +38,7 @@ from piker.brokers._util import get_logger if TYPE_CHECKING: from .api import Client - from ib_insync import IB + import i3ipc log = get_logger('piker.brokers.ib') @@ -48,8 +53,39 @@ _reset_tech: Literal[ ] = 'vnc' +no_setup_msg:str = ( + 'No data reset hack test setup for {vnc_sockaddr}!\n' + 'See config setup tips @\n' + 'https://github.com/pikers/piker/tree/master/piker/brokers/ib' +) + + +def try_xdo_manual( + client: Client, +): + ''' + Do the "manual" `xdo`-based screen switch + click + combo since apparently the `asyncvnc` client ain't workin.. + + Note this is only meant as a backup method for Xorg users, + ideally you can use a real vnc client and the `vnc_click_hack()` + impl! + + ''' + global _reset_tech + try: + i3ipc_xdotool_manual_click_hack() + _reset_tech = 'i3ipc_xdotool' + return True + except OSError: + vnc_sockaddr: str = client.conf.vnc_addrs + log.exception( + no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) + ) + return False + + async def data_reset_hack( - # vnc_host: str, client: Client, reset_type: Literal['data', 'connection'], @@ -81,65 +117,60 @@ async def data_reset_hack( that need to be wrangle. ''' - ib_client: IB = client.ib - # look up any user defined vnc socket address mapped from # a particular API socket port. - api_port: str = str(ib_client.client.port) - vnc_host: str - vnc_port: int - vnc_sockaddr: tuple[str] | None = client.conf.get('vnc_addrs') - - no_setup_msg:str = ( - f'No data reset hack test setup for {vnc_sockaddr}!\n' - 'See config setup tips @\n' - 'https://github.com/pikers/piker/tree/master/piker/brokers/ib' - ) - - if not vnc_sockaddr: + vnc_addrs: tuple[str]|None = client.conf.get('vnc_addrs') + if not vnc_addrs: log.warning( - no_setup_msg + no_setup_msg.format(vnc_sockaddr=client.conf) + 'REQUIRES A `vnc_addrs: array` ENTRY' ) - vnc_host, vnc_port = vnc_sockaddr.get( - api_port, - ('localhost', 3003) - ) global _reset_tech - match _reset_tech: case 'vnc': try: await tractor.to_asyncio.run_task( partial( vnc_click_hack, - host=vnc_host, - port=vnc_port, + client=client, ) ) - except OSError: - if vnc_host != 'localhost': - log.warning(no_setup_msg) - return False - + except ( + OSError, # no VNC server avail.. + PermissionError, # asyncvnc pw fail.. + ): try: import i3ipc # noqa (since a deps dynamic check) except ModuleNotFoundError: - log.warning(no_setup_msg) + log.warning( + no_setup_msg.format(vnc_sockaddr=client.conf) + ) return False - try: - i3ipc_xdotool_manual_click_hack() - _reset_tech = 'i3ipc_xdotool' - return True - except OSError: - log.exception(no_setup_msg) - return False + # XXX, Xorg only workaround.. + # TODO? remove now that we have `pyvnc`? + # if vnc_host not in { + # 'localhost', + # '127.0.0.1', + # }: + # focussed, matches = i3ipc_fin_wins_titled() + # if not matches: + # log.warning( + # no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) + # ) + # return False + # else: + # try_xdo_manual(vnc_sockaddr) + + # localhost but no vnc-client or it borked.. + else: + try_xdo_manual(client) case 'i3ipc_xdotool': - i3ipc_xdotool_manual_click_hack() + try_xdo_manual(client) + # i3ipc_xdotool_manual_click_hack() case _ as tech: raise RuntimeError(f'{tech} is not supported for reset tech!?') @@ -149,21 +180,66 @@ async def data_reset_hack( async def vnc_click_hack( - host: str, - port: int, - reset_type: str = 'data' + client: Client, + reset_type: str = 'data', + pw: str|None = None, + ) -> None: ''' Reset the data or network connection for the VNC attached - ib gateway using magic combos. + ib-gateway using a (magic) keybinding combo. + + A vnc-server password can be set either by an input `pw` param or + set in the client's config with the latter loaded from the user's + `brokers.toml` in a vnc-addrs-port-mapping section, + + .. code:: toml + + [ib.vnc_addrs] + 4002 = {host = 'localhost', port = 5900, pw = 'doggy'} ''' + api_port: str = str(client.ib.client.port) + conf: dict = client.conf + vnc_addrs: dict[int, tuple] = conf.get('vnc_addrs') + if not vnc_addrs: + return None + + addr_entry: dict|tuple = vnc_addrs.get( + api_port, + ('localhost', 5900) # a typical default + ) + if pw is None: + match addr_entry: + case ( + host, + port, + ): + pass + + case { + 'host': host, + 'port': port, + 'pw': pw + }: + pass + + case _: + raise ValueError( + f'Invalid `ib.vnc_addrs` entry ?\n' + f'{addr_entry!r}\n' + ) try: - import asyncvnc + from pyvnc import ( + AsyncVNCClient, + VNCConfig, + Point, + MOUSE_BUTTON_LEFT, + ) except ModuleNotFoundError: log.warning( "In order to leverage `piker`'s built-in data reset hacks, install " - "the `asyncvnc` project: https://github.com/barneygale/asyncvnc" + "the `pyvnc` project: https://github.com/regulad/pyvnc.git" ) return @@ -174,24 +250,79 @@ async def vnc_click_hack( 'connection': 'r' }[reset_type] - async with asyncvnc.connect( - host, - port=port, - - # TODO: doesn't work see: - # https://github.com/barneygale/asyncvnc/issues/7 - # password='ibcansmbz', - - ) as client: - - # move to middle of screen - # 640x1800 - client.mouse.move( - x=500, - y=500, + with tractor.devx.open_crash_handler(): + client = await AsyncVNCClient.connect( + VNCConfig( + host=host, + port=port, + password=pw, + ) ) - client.mouse.click() - client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked + async with client: + # move to middle of screen + # 640x1800 + await client.move( + Point( + 500, + 500, + ) + ) + # ensure the ib-gw window is active + await client.click(MOUSE_BUTTON_LEFT) + # send the hotkeys combo B) + await client.press('Ctrl', 'Alt', key) # keys are stacked + + +def i3ipc_fin_wins_titled( + titles: list[str] = [ + 'Interactive Brokers', # tws running in i3 + 'IB Gateway', # gw running in i3 + # 'IB', # gw running in i3 (newer version?) + + # !TODO, remote vnc instance + # -[ ] something in title (or other Con-props) that indicates + # this is explicitly for ibrk sw? + # |_[ ] !can use modden spawn eventually! + 'TigerVNC', + # 'vncviewer', # the terminal.. + ], +) -> tuple[ + i3ipc.Con, # orig focussed win + list[tuple[str, i3ipc.Con]], # matching wins by title +]: + ''' + Attempt to find a local-DE window titled with an entry in + `titles`. + + If found deliver the current focussed window and all matching + `i3ipc.Con`s in a list. + + ''' + import i3ipc + ipc = i3ipc.Connection() + + # TODO: might be worth offering some kinda api for grabbing + # the window id from the pid? + # https://stackoverflow.com/a/2250879 + tree = ipc.get_tree() + focussed: i3ipc.Con = tree.find_focused() + + matches: list[i3ipc.Con] = [] + for name in titles: + results = tree.find_titled(name) + print(f'results for {name}: {results}') + if results: + con = results[0] + matches.append(( + name, + con, + )) + + return ( + focussed, + matches, + ) + def i3ipc_xdotool_manual_click_hack() -> None: @@ -199,65 +330,46 @@ def i3ipc_xdotool_manual_click_hack() -> None: Do the data reset hack but expecting a local X-window using `xdotool`. ''' - import i3ipc - i3 = i3ipc.Connection() - - # TODO: might be worth offering some kinda api for grabbing - # the window id from the pid? - # https://stackoverflow.com/a/2250879 - t = i3.get_tree() - - orig_win_id = t.find_focused().window - - # for tws - win_names: list[str] = [ - 'Interactive Brokers', # tws running in i3 - 'IB Gateway', # gw running in i3 - # 'IB', # gw running in i3 (newer version?) - ] - + focussed, matches = i3ipc_fin_wins_titled() + orig_win_id = focussed.window try: - for name in win_names: - results = t.find_titled(name) - print(f'results for {name}: {results}') - if results: - con = results[0] - print(f'Resetting data feed for {name}') - win_id = str(con.window) - w, h = con.rect.width, con.rect.height + for name, con in matches: + print(f'Resetting data feed for {name}') + win_id = str(con.window) + w, h = con.rect.width, con.rect.height - # TODO: seems to be a few libs for python but not sure - # if they support all the sub commands we need, order of - # most recent commit history: - # https://github.com/rr-/pyxdotool - # https://github.com/ShaneHutter/pyxdotool - # https://github.com/cphyc/pyxdotool + # TODO: seems to be a few libs for python but not sure + # if they support all the sub commands we need, order of + # most recent commit history: + # https://github.com/rr-/pyxdotool + # https://github.com/ShaneHutter/pyxdotool + # https://github.com/cphyc/pyxdotool - # TODO: only run the reconnect (2nd) kc on a detected - # disconnect? - for key_combo, timeout in [ - # only required if we need a connection reset. - # ('ctrl+alt+r', 12), - # data feed reset. - ('ctrl+alt+f', 6) - ]: - subprocess.call([ - 'xdotool', - 'windowactivate', '--sync', win_id, + # TODO: only run the reconnect (2nd) kc on a detected + # disconnect? + for key_combo, timeout in [ + # only required if we need a connection reset. + # ('ctrl+alt+r', 12), + # data feed reset. + ('ctrl+alt+f', 6) + ]: + subprocess.call([ + 'xdotool', + 'windowactivate', '--sync', win_id, - # move mouse to bottom left of window (where - # there should be nothing to click). - 'mousemove_relative', '--sync', str(w-4), str(h-4), + # move mouse to bottom left of window (where + # there should be nothing to click). + 'mousemove_relative', '--sync', str(w-4), str(h-4), - # NOTE: we may need to stick a `--retry 3` in here.. - 'click', '--window', win_id, - '--repeat', '3', '1', + # NOTE: we may need to stick a `--retry 3` in here.. + 'click', '--window', win_id, + '--repeat', '3', '1', - # hackzorzes - 'key', key_combo, - ], - timeout=timeout, - ) + # hackzorzes + 'key', key_combo, + ], + timeout=timeout, + ) # re-activate and focus original window subprocess.call([ @@ -267,3 +379,99 @@ def i3ipc_xdotool_manual_click_hack() -> None: ]) except subprocess.TimeoutExpired: log.exception('xdotool timed out?') + + + +def is_current_time_in_range( + start_dt: datetime, + end_dt: datetime, +) -> bool: + ''' + Check if current time is within the datetime range. + + Use any/the-same timezone as provided by `start_dt.tzinfo` value + in the range. + + ''' + now: datetime = datetime.now(start_dt.tzinfo) + return start_dt <= now <= end_dt + + +# TODO, put this into `._util` and call it from here! +# +# NOTE, this was generated by @guille from a gpt5 prompt +# and was originally thot to be needed before learning about +# `ib_insync.contract.ContractDetails._parseSessions()` and +# it's downstream meths.. +# +# This is still likely useful to keep for now to parse the +# `.tradingHours: str` value manually if we ever decide +# to move off `ib_async` and implement our own `trio`/`anyio` +# based version Bp +# +# >attempt to parse the retarted ib "time stampy thing" they +# >do for "venue hours" with this.. written by +# >gpt5-"thinking", +# + + +def parse_trading_hours( + spec: str, + tz: TzInfo|None = None +) -> dict[ + date, + tuple[datetime, datetime] +]|None: + ''' + Parse venue hours like: + 'YYYYMMDD:HHMM-YYYYMMDD:HHMM;YYYYMMDD:CLOSED;...' + + Returns `dict[date] = (open_dt, close_dt)` or `None` if + closed. + + ''' + if ( + not isinstance(spec, str) + or + not spec + ): + raise ValueError('spec must be a non-empty string') + + out: dict[ + date, + tuple[datetime, datetime] + ]|None = {} + + for part in (p.strip() for p in spec.split(';') if p.strip()): + if part.endswith(':CLOSED'): + day_s, _ = part.split(':', 1) + d = datetime.strptime(day_s, '%Y%m%d').date() + out[d] = None + continue + + try: + start_s, end_s = part.split('-', 1) + start_dt = datetime.strptime(start_s, '%Y%m%d:%H%M') + end_dt = datetime.strptime(end_s, '%Y%m%d:%H%M') + except ValueError as exc: + raise ValueError(f'invalid segment: {part}') from exc + + if tz is not None: + start_dt = start_dt.replace(tzinfo=tz) + end_dt = end_dt.replace(tzinfo=tz) + + out[start_dt.date()] = (start_dt, end_dt) + + return out + + +# ORIG desired usage, +# +# TODO, for non-drunk tomorrow, +# - call above fn and check that `output[today] is not None` +# trading_hrs: dict = parse_trading_hours( +# details.tradingHours +# ) +# liq_hrs: dict = parse_trading_hours( +# details.liquidHours + # ) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 74d03075..5bcc7336 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -334,15 +334,15 @@ class Client: fqme: str, # EST in ISO 8601 format is required... below is EPOCH - start_dt: datetime | str = "1970-01-01T00:00:00.000000-05:00", - end_dt: datetime | str = "", + start_dt: datetime|str = "1970-01-01T00:00:00.000000-05:00", + end_dt: datetime|str = "", # ohlc sample period in seconds sample_period_s: int = 1, # optional "duration of time" equal to the # length of the returned history frame. - duration: str | None = None, + duration: str|None = None, **kwargs, @@ -716,8 +716,8 @@ class Client: async def find_contracts( self, - pattern: str | None = None, - contract: Contract | None = None, + pattern: str|None = None, + contract: Contract|None = None, qualify: bool = True, err_on_qualify: bool = True, @@ -862,7 +862,7 @@ class Client: self, fqme: str, - ) -> datetime | None: + ) -> datetime|None: ''' Return the first datetime stamp for `fqme` or `None` on request failure. @@ -918,7 +918,7 @@ class Client: tries: int = 100, raise_on_timeout: bool = False, - ) -> Ticker | None: + ) -> Ticker|None: ''' Return a single (snap) quote for symbol. @@ -930,7 +930,7 @@ class Client: ready: ticker.TickerUpdateEvent = ticker.updateEvent # ensure a last price gets filled in before we deliver quote - timeouterr: Exception | None = None + timeouterr: Exception|None = None warnset: bool = False for _ in range(tries): @@ -944,6 +944,7 @@ class Client: ) if tkr: break + except TimeoutError as err: timeouterr = err await asyncio.sleep(0.01) @@ -952,7 +953,9 @@ class Client: else: if not warnset: log.warning( - f'Quote req timed out..maybe venue is closed?\n' + f'Quote req timed out..\n' + f'Maybe the venue is closed?\n' + f'\n' f'{asdict(contract)}' ) warnset = True @@ -964,9 +967,11 @@ class Client: ) break else: - if timeouterr and raise_on_timeout: - import pdbp - pdbp.set_trace() + if ( + timeouterr + and + raise_on_timeout + ): raise timeouterr if not warnset: @@ -1363,9 +1368,7 @@ async def load_aio_clients( async def load_clients_for_trio( - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, - + chan: tractor.to_asyncio.LinkedTaskChannel, ) -> None: ''' Pure async mngr proxy to ``load_aio_clients()``. @@ -1378,8 +1381,7 @@ async def load_clients_for_trio( disconnect_on_exit=False, ) as accts2clients: - to_trio.send_nowait(accts2clients) - + chan.started_nowait(accts2clients) # TODO: maybe a sync event to wait on instead? await asyncio.sleep(float('inf')) @@ -1505,7 +1507,7 @@ class MethodProxy: self, pattern: str, - ) -> dict[str, Any] | trio.Event: + ) -> dict[str, Any]|trio.Event: ev = self.event_table.get(pattern) @@ -1526,23 +1528,22 @@ class MethodProxy: async def open_aio_client_method_relay( - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, + chan: tractor.to_asyncio.LinkedTaskChannel, client: Client, event_consumers: dict[str, trio.Event], ) -> None: # sync with `open_client_proxy()` caller - to_trio.send_nowait(client) + chan.started_nowait(client) # TODO: separate channel for error handling? - client.inline_errors(to_trio) + client.inline_errors(chan) # relay all method requests to ``asyncio``-side client and deliver # back results - while not to_trio._closed: - msg: tuple[str, dict] | dict | None = await from_trio.get() + while not chan._to_trio._closed: # <- TODO, better check like `._web_bs`? + msg: tuple[str, dict]|dict|None = await chan.get() match msg: case None: # termination sentinel log.info('asyncio `Client` method-proxy SHUTDOWN!') @@ -1555,7 +1556,7 @@ async def open_aio_client_method_relay( try: resp = await meth(**kwargs) # echo the msg back - to_trio.send_nowait({'result': resp}) + chan.send_nowait({'result': resp}) except ( RequestError, @@ -1563,10 +1564,10 @@ async def open_aio_client_method_relay( # TODO: relay all errors to trio? # BaseException, ) as err: - to_trio.send_nowait({'exception': err}) + chan.send_nowait({'exception': err}) case {'error': content}: - to_trio.send_nowait({'exception': content}) + chan.send_nowait({'exception': content}) case _: raise ValueError(f'Unhandled msg {msg}') diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index b78f2880..5065d678 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -117,7 +117,11 @@ def pack_position( symbol=fqme, currency=con.currency, size=float(pos.position), - avg_price=float(pos.avgCost) / float(con.multiplier or 1.0), + avg_price=( + float(pos.avgCost) + / + float(con.multiplier or 1.0) + ), ), ) @@ -547,7 +551,10 @@ async def open_trade_dialog( ), # TODO: do this as part of `open_account()`!? - open_symcache('ib', only_from_memcache=True) as symcache, + open_symcache( + 'ib', + only_from_memcache=True, + ) as symcache, ): # Open a trade ledgers stack for appending trade records over # multiple accounts. @@ -555,8 +562,10 @@ async def open_trade_dialog( ledgers: dict[str, TransactionLedger] = {} tables: dict[str, Account] = {} order_msgs: list[Status] = [] - conf = get_config() - accounts_def_inv: bidict[str, str] = bidict(conf['accounts']).inverse + conf: dict = get_config() + accounts_def_inv: bidict[str, str] = bidict( + conf['accounts'] + ).inverse with ( ExitStack() as lstack, @@ -706,7 +715,11 @@ async def open_trade_dialog( # client-account and build out position msgs to deliver to # EMS. for acctid, acnt in tables.items(): - active_pps, closed_pps = acnt.dump_active() + active_pps: dict[str, Position] + ( + active_pps, + closed_pps, + ) = acnt.dump_active() for pps in [active_pps, closed_pps]: piker_pps: list[Position] = list(pps.values()) @@ -722,6 +735,7 @@ async def open_trade_dialog( ) if ibpos: bs_mktid: str = str(ibpos.contract.conId) + msg = await update_and_audit_pos_msg( acctid, pikerpos, @@ -1243,32 +1257,47 @@ async def deliver_trade_events( # never relay errors for non-broker related issues # https://interactivebrokers.github.io/tws-api/message_codes.html code: int = err['error_code'] - if code in { - 200, # uhh + reason: str = err['reason'] + reqid: str = str(err['reqid']) + + # "Warning:" msg codes, + # https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes + # - 2109: 'Outside Regular Trading Hours' + if 'Warning:' in reason: + log.warning( + f'Order-API-warning: {code!r}\n' + f'reqid: {reqid!r}\n' + f'\n' + f'{pformat(err)}\n' + # ^TODO? should we just print the `reason` + # not the full `err`-dict? + ) + continue + + # XXX known special (ignore) cases + elif code in { + 200, # uhh.. ni idea # hist pacing / connectivity 162, 165, - # WARNING codes: - # https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes - # Attribute 'Outside Regular Trading Hours' is - # " 'ignored based on the order type and - # destination. PlaceOrder is now ' 'being - # processed.', - 2109, - # XXX: lol this isn't even documented.. # 'No market data during competing live session' 1669, }: + log.error( + f'Order-API-error which is non-cancel-causing ?!\n' + f'\n' + f'{pformat(err)}\n' + ) continue - reqid: str = str(err['reqid']) - reason: str = err['reason'] - if err['reqid'] == -1: - log.error(f'TWS external order error:\n{pformat(err)}') + log.error( + f'TWS external order error ??\n' + f'{pformat(err)}\n' + ) flow: dict = dict( flows.get(reqid) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 062b2c2e..51305ced 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship for pikers) +# Copyright (C) 2018-forever Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -13,10 +13,12 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" -Data feed endpoints pre-wrapped and ready for use with ``tractor``/``trio``. -""" +''' +Data feed endpoints pre-wrapped and ready for use with `tractor`/`trio` +via "infected-asyncio-mode". + +''' from __future__ import annotations import asyncio from contextlib import ( @@ -26,7 +28,6 @@ from dataclasses import asdict from datetime import datetime from functools import partial from pprint import pformat -from math import isnan import time from typing import ( Any, @@ -40,7 +41,6 @@ import numpy as np from pendulum import ( now, from_timestamp, - # DateTime, Duration, duration as mk_duration, ) @@ -69,7 +69,10 @@ from .api import ( Contract, RequestError, ) -from ._util import data_reset_hack +from ._util import ( + data_reset_hack, + is_current_time_in_range, +) from .symbols import get_mkt_info if TYPE_CHECKING: @@ -184,7 +187,8 @@ async def open_history_client( if ( start_dt - and start_dt.timestamp() == 0 + and + start_dt.timestamp() == 0 ): await tractor.pause() @@ -203,14 +207,16 @@ async def open_history_client( ): count += 1 mean += latency / count - print( + log.debug( f'HISTORY FRAME QUERY LATENCY: {latency}\n' f'mean: {mean}' ) # could be trying to retreive bars over weekend if out is None: - log.error(f"Can't grab bars starting at {end_dt}!?!?") + log.error( + f"No bars starting at {end_dt!r} !?!?" + ) if ( end_dt and head_dt @@ -285,8 +291,9 @@ _pacing: str = ( async def wait_on_data_reset( proxy: MethodProxy, + reset_type: str = 'data', - timeout: float = 16, # float('inf'), + timeout: float = 16, task_status: TaskStatus[ tuple[ @@ -295,29 +302,47 @@ async def wait_on_data_reset( ] ] = trio.TASK_STATUS_IGNORED, ) -> bool: + ''' + Wait on a (global-ish) "data-farm" event to be emitted + by the IB api server. - # TODO: we might have to put a task lock around this - # method.. - hist_ev = proxy.status_event( + Allows syncing to reconnect event-messages emitted on the API + console, such as: + + - 'HMDS data farm connection is OK:ushmds' + - 'Market data farm is connecting:usfuture' + - 'Market data farm connection is OK:usfuture' + + Deliver a `(cs, done: Event)` pair to the caller to support it + waiting or cancelling the associated "data-reset-request"; + normally a manual data-reset-req is expected to be the cause and + thus trigger such events (such as our click-hack-magic from + `.ib._util`). + + ''' + # ?TODO, do we need a task-lock around this method? + # + # register for an API "status event" wrapped for `trio`-sync. + hist_ev: trio.Event = proxy.status_event( 'HMDS data farm connection is OK:ushmds' ) - - # TODO: other event messages we might want to try and - # wait for but i wasn't able to get any of this - # reliable.. + # + # ^TODO: other event-messages we might want to support waiting-for + # but i wasn't able to get reliable.. + # # reconnect_start = proxy.status_event( # 'Market data farm is connecting:usfuture' # ) # live_ev = proxy.status_event( # 'Market data farm connection is OK:usfuture' # ) + # try to wait on the reset event(s) to arrive, a timeout # will trigger a retry up to 6 times (for now). client: Client = proxy._aio_ns done = trio.Event() with trio.move_on_after(timeout) as cs: - task_status.started((cs, done)) log.warning( @@ -396,8 +421,9 @@ async def get_bars( bool, # timed out hint ]: ''' - Retrieve historical data from a ``trio``-side task using - a ``MethoProxy``. + Request-n-retrieve historical data frames from a `trio.Task` + using a `MethoProxy` to query the `asyncio`-side's + `.ib.api.Client` methods. ''' global _data_resetter_task, _failed_resets @@ -607,7 +633,10 @@ async def get_bars( # such that simultaneous symbol queries don't try data resettingn # too fast.. unset_resetter: bool = False - async with trio.open_nursery() as tn: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as tn + ): # start history request that we allow # to run indefinitely until a result is acquired @@ -653,14 +682,12 @@ async def get_bars( ) +# per-actor cache of inter-eventloop-chans _quote_streams: dict[str, trio.abc.ReceiveStream] = {} async def _setup_quote_stream( - - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, - + chan: tractor.to_asyncio.LinkedTaskChannel, symbol: str, opts: tuple[int] = ( '375', # RT trade volume (excludes utrades) @@ -678,10 +705,13 @@ async def _setup_quote_stream( ) -> trio.abc.ReceiveChannel: ''' - Stream a ticker using the std L1 api. + Stream L1 quotes via the `Ticker.updateEvent.connect(push)` + callback API by registering a `push` callback which simply + `chan.send_nowait()`s quote msgs back to the calling + parent-`trio.Task`-side. - This task is ``asyncio``-side and must be called from - ``tractor.to_asyncio.open_channel_from()``. + NOTE, that this task-fn is run on the `asyncio.Task`-side ONLY + and is thus run via `tractor.to_asyncio.open_channel_from()`. ''' global _quote_streams @@ -689,39 +719,79 @@ async def _setup_quote_stream( async with load_aio_clients( disconnect_on_exit=False, ) as accts2clients: - caccount_name, client = get_preferred_data_client(accts2clients) - contract = contract or (await client.find_contract(symbol)) - to_trio.send_nowait(contract) # cuz why not - ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) - # NOTE: it's batch-wise and slow af but I guess could - # be good for backchecking? Seems to be every 5s maybe? + # XXX since this is an `asyncio.Task`, we must use + # tractor.pause_from_sync() + + caccount_name, client = get_preferred_data_client(accts2clients) + contract = ( + contract + or + (await client.find_contract(symbol)) + ) + chan.started_nowait(contract) # cuz why not + ticker: Ticker = client.ib.reqMktData( + contract, + ','.join(opts), + ) + maybe_exc: BaseException|None = None + handler_tries: int = 0 + aio_task: asyncio.Task = asyncio.current_task() + + # ?TODO? this API is batch-wise and quite slow-af but, + # - seems to be 5s updates? + # - maybe we could use it for backchecking? + # # ticker: Ticker = client.ib.reqTickByTickData( # contract, 'Last', # ) - # # define a simple queue push routine that streams quote packets - # # to trio over the ``to_trio`` memory channel. - # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore + # define a very naive queue-pushing callback that relays + # quote-packets directly the calling (parent) `trio.Task`. + # Ensure on teardown we cancel the feed via their cancel API. + # def teardown(): + ''' + Disconnect our `push`-er callback and cancel the data-feed + for `contract`. + + ''' + nonlocal maybe_exc ticker.updateEvent.disconnect(push) - log.error( - f'Disconnected stream for `{symbol}`' - ) + report: str = f'Disconnected mkt-data for {symbol!r} due to ' + if maybe_exc is not None: + report += ( + 'error,\n' + f'{maybe_exc!r}\n' + ) + log.error(report) + else: + report += ( + 'cancellation.\n' + ) + log.cancel(report) + client.ib.cancelMktData(contract) # decouple broadcast mem chan _quote_streams.pop(symbol, None) - def push(t: Ticker) -> None: - """ - Push quotes to trio task. + def push( + t: Ticker, + tries_before_raise: int = 6, + ) -> None: + ''' + Push quotes verbatim to parent-side `trio.Task`. - """ - # log.debug(t) + ''' + nonlocal maybe_exc, handler_tries + # log.debug(f'new IB quote: {t}\n') try: - to_trio.send_nowait(t) + chan.send_nowait(t) + # XXX TODO XXX replicate in `tractor` tests + # as per `CancelledError`-handler notes below! + # assert 0 except ( trio.BrokenResourceError, @@ -736,38 +806,107 @@ async def _setup_quote_stream( # resulting in tracebacks spammed to console.. # Manually do the dereg ourselves. teardown() - except trio.WouldBlock: - # log.warning( - # f'channel is blocking symbol feed for {symbol}?' - # f'\n{to_trio.statistics}' - # ) - pass - # except trio.WouldBlock: - # # for slow debugging purposes to avoid clobbering prompt - # # with log msgs - # pass + # for slow debugging purposes to avoid clobbering prompt + # with log msgs + except trio.WouldBlock: + log.exception( + f'Asyncio->Trio `chan.send_nowait()` blocked !?\n' + f'\n' + f'{chan._to_trio.statistics()}\n' + ) + + # ?TODO, handle re-connection attempts? + except BaseException as _berr: + berr = _berr + if handler_tries >= tries_before_raise: + # breakpoint() + maybe_exc = _berr + # task.set_exception(berr) + aio_task.cancel(msg=berr.args) + raise berr + else: + handler_tries += 1 + + log.exception( + f'Failed to push ticker quote !?\n' + f'handler_tries={handler_tries!r}\n' + f'ticker: {t!r}\n' + f'\n' + f'{chan._to_trio.statistics()}\n' + f'\n' + f'CAUSE: {berr}\n' + ) + ticker.updateEvent.connect(push) try: await asyncio.sleep(float('inf')) - finally: - teardown() - # return from_aio + # XXX, for debug.. TODO? can we rm again? + # + # tractor.pause_from_sync() + # while True: + # await asyncio.sleep(1.6) + # if ticker.ticks: + # log.debug( + # f'ticker.ticks = \n' + # f'{ticker.ticks}\n' + # ) + # else: + # log.warning( + # 'UHH no ticker.ticks ??' + # ) + + # XXX TODO XXX !?!? + # apparently **without this handler** and the subsequent + # re-raising of `maybe_exc from _taskc` cancelling the + # `aio_task` from the `push()`-callback will cause a very + # strange chain of exc raising that breaks alll sorts of + # downstream callers, tasks and remote-actor tasks!? + # + # -[ ] we need some lowlevel reproducting tests to replicate + # those worst-case scenarios in `tractor` core!! + # -[ ] likely we should factor-out the `tractor.to_asyncio` + # attempts at workarounds in `.translate_aio_errors()` + # for failed `asyncio.Task.set_exception()` to either + # call `aio_task.cancel()` and/or + # `aio_task._fut_waiter.set_exception()` to a re-useable + # toolset in something like a `.to_asyncio._utils`?? + # + except asyncio.CancelledError as _taskc: + if maybe_exc is not None: + raise maybe_exc from _taskc + + raise _taskc + + except BaseException as _berr: + # stash any crash cause for reporting in `teardown()` + maybe_exc = _berr + raise _berr + + finally: + # always disconnect our `push()` and cancel the + # ib-"mkt-data-feed". + teardown() @acm async def open_aio_quote_stream( - symbol: str, - contract: Contract | None = None, + contract: Contract|None = None, ) -> ( trio.abc.Channel| # iface tractor.to_asyncio.LinkedTaskChannel # actually ): + ''' + Open a real-time `Ticker` quote stream from an `asyncio.Task` + spawned via `tractor.to_asyncio.open_channel_from()`, deliver the + inter-event-loop channel to the `trio.Task` caller and cache it + globally for re-use. + ''' from tractor.trionics import broadcast_receiver global _quote_streams @@ -793,6 +932,10 @@ async def open_aio_quote_stream( assert contract + # TODO? de-reg on teardown of last consumer task? + # -> why aren't we using `.trionics.maybe_open_context()` + # here again?? (we are in `open_client_proxies()` tho?) + # # cache feed for later consumers _quote_streams[symbol] = from_aio @@ -807,7 +950,12 @@ def normalize( calc_price: bool = False ) -> dict: + ''' + Translate `ib_async`'s `Ticker.ticks` values to a `piker` + normalized `dict` form for transmit to downstream `.data` layer + consumers. + ''' # check for special contract types con = ticker.contract fqme, calc_price = con2fqme(con) @@ -826,7 +974,7 @@ def normalize( tbt = ticker.tickByTicks if tbt: - print(f'tickbyticks:\n {ticker.tickByTicks}') + log.info(f'tickbyticks:\n {ticker.tickByTicks}') ticker.ticks = new_ticks @@ -862,27 +1010,39 @@ def normalize( return data +# ?TODO? feels like this task-fn could be factored to reduce some +# indentation levels? +# -[ ] the reconnect while loop on ib-gw "data farm connection.."s +# -[ ] everything embedded under the `async with aclosing(stream):` +# as the "meat" of the quote delivery once the connection is +# stable. +# async def stream_quotes( - send_chan: trio.abc.SendChannel, symbols: list[str], feed_is_live: trio.Event, - loglevel: str = None, + + # TODO? we need to hook into the `ib_async` logger like + # we can with i3ipc from modden! + # loglevel: str|None = None, # startup sync task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: ''' - Stream symbol quotes. + Stream `symbols[0]` quotes back via `send_chan`. - This is a ``trio`` callable routine meant to be invoked - once the brokerd is up. + The `feed_is_live: Event` is set to signal the caller that it can + begin processing msgs from the mem-chan. ''' # TODO: support multiple subscriptions - sym = symbols[0] - log.info(f'request for real-time quotes: {sym}') + sym: str = symbols[0] + log.info( + f'request for real-time quotes\n' + f'sym: {sym!r}\n' + ) init_msgs: list[FeedInit] = [] @@ -891,34 +1051,52 @@ async def stream_quotes( details: ibis.ContractDetails async with ( open_data_client() as proxy, - # trio.open_nursery() as tn, ): mkt, details = await get_mkt_info( sym, proxy=proxy, # passed to avoid implicit client load ) + # is venue active rn? + venue_is_open: bool = any( + is_current_time_in_range( + start_dt=sesh.start, + end_dt=sesh.end, + ) + for sesh in details.tradingSessions() + ) + init_msg = FeedInit(mkt_info=mkt) + # NOTE, tell sampler (via config) to skip vlm summing for dst + # assets which provide no vlm data.. if mkt.dst.atype in { 'fiat', 'index', 'commodity', }: - # tell sampler config that it shouldn't do vlm summing. init_msg.shm_write_opts['sum_tick_vlm'] = False init_msg.shm_write_opts['has_vlm'] = False init_msgs.append(init_msg) con: Contract = details.contract - first_ticker: Ticker | None = None - with trio.move_on_after(1): + first_ticker: Ticker|None = None + + timeout: float = 1.6 + with trio.move_on_after(timeout) as quote_cs: first_ticker: Ticker = await proxy.get_quote( contract=con, raise_on_timeout=False, ) + # XXX should never happen with this ep right? + # but if so then, more then likely mkt is closed? + if quote_cs.cancelled_caught: + log.warning( + f'First quote req timed out after {timeout!r}s' + ) + if first_ticker: first_quote: dict = normalize(first_ticker) @@ -930,28 +1108,27 @@ async def stream_quotes( f'{pformat(first_quote)}\n' ) - # NOTE: it might be outside regular trading hours for - # assets with "standard venue operating hours" so we - # only "pretend the feed is live" when the dst asset - # type is NOT within the NON-NORMAL-venue set: aka not - # commodities, forex or crypto currencies which CAN - # always return a NaN on a snap quote request during - # normal venue hours. In the case of a closed venue - # (equitiies, futes, bonds etc.) we at least try to - # grab the OHLC history. - if ( - first_ticker - and - isnan(first_ticker.last) - # SO, if the last quote price value is NaN we ONLY - # "pretend to do" `feed_is_live.set()` if it's a known - # dst asset venue with a lot of closed operating hours. - and mkt.dst.atype not in { - 'commodity', - 'fiat', - 'crypto', - } - ): + # XXX NOTE: whenever we're "outside regular trading hours" + # (only relevant for assets coming from the "legacy markets" + # space) so we basically (from an API/runtime-operational + # perspective) "pretend the feed is live" even if it's + # actually closed. + # + # IOW, we signal to the effective caller (task) that the live + # feed is "already up" but really we're just indicating that + # the OHLCV history can start being loaded immediately by the + # `piker.data`/`.tsp` layers. + # + # XXX, deats: the "pretend we're live" is just done by + # a `feed_is_live.set()` even though nothing is actually live + # Bp + if not venue_is_open: + log.warning( + f'Venue is closed, unable to establish real-time feed.\n' + f'mkt: {mkt!r}\n' + f'\n' + f'first_ticker: {first_ticker}\n' + ) task_status.started(( init_msgs, first_quote, @@ -962,10 +1139,12 @@ async def stream_quotes( feed_is_live.set() # block and let data history backfill code run. + # XXX obvi given the venue is closed, we never expect feed + # to come up; a taskc should be the only way to + # terminate this task. await trio.sleep_forever() - return # we never expect feed to come up? - # TODO: we should instead spawn a task that waits on a feed + # ?TODO, we could instead spawn a task that waits on a feed # to start and let it wait indefinitely..instead of this # hard coded stuff. # async def wait_for_first_quote(): @@ -987,24 +1166,27 @@ async def stream_quotes( 'Rxed init quote:\n' f'{pformat(first_quote)}' ) - cs: trio.CancelScope | None = None + cs: trio.CancelScope|None = None startup: bool = True iter_quotes: trio.abc.Channel while ( startup - or cs.cancel_called + or + cs.cancel_called ): with trio.CancelScope() as cs: async with ( + tractor.trionics.collapse_eg(), trio.open_nursery() as tn, open_aio_quote_stream( symbol=sym, contract=con, ) as iter_quotes, ): + # ?TODO? can we rm this - particularly for `ib_async`? # ugh, clear ticks since we've consumed them # (ahem, ib_insync is stateful trash) - first_ticker.ticks = [] + # first_ticker.ticks = [] # only on first entry at feed boot up if startup: @@ -1018,8 +1200,8 @@ async def stream_quotes( # data feed event. async def reset_on_feed(): - # TODO: this seems to be surpressed from the - # traceback in ``tractor``? + # ??TODO? this seems to be surpressed from the + # traceback in `tractor`? # assert 0 rt_ev = proxy.status_event( @@ -1065,7 +1247,7 @@ async def stream_quotes( # ugh, clear ticks since we've # consumed them (ahem, ib_insync is # truly stateful trash) - ticker.ticks = [] + # ticker.ticks = [] # XXX: this works because we don't use # ``aclosing()`` above? @@ -1087,8 +1269,12 @@ async def stream_quotes( async for ticker in iter_quotes: quote = normalize(ticker) fqme: str = quote['fqme'] + log.debug( + f'Sending quote\n' + f'{quote}' + ) await send_chan.send({fqme: quote}) # ugh, clear ticks since we've consumed them - ticker.ticks = [] + # ticker.ticks = [] # last = time.time()