Compare commits
16 Commits
ae7cd8df7c
...
87385a4e2d
| Author | SHA1 | Date |
|---|---|---|
|
|
87385a4e2d | |
|
|
b3c5478017 | |
|
|
6c9a78c5a0 | |
|
|
da223f7a55 | |
|
|
49fe0a3398 | |
|
|
29fc3b8a8b | |
|
|
1bfe777637 | |
|
|
c694d915f1 | |
|
|
c120cb51a4 | |
|
|
7c20231f16 | |
|
|
d809c79788 | |
|
|
9f2f8a1664 | |
|
|
9f141635d1 | |
|
|
0604ca7c82 | |
|
|
82c2256271 | |
|
|
a743fa28b5 |
|
|
@ -1,71 +1,30 @@
|
||||||
running ``ib`` gateway in ``docker``
|
running ``ib`` gateway in ``docker``
|
||||||
------------------------------------
|
------------------------------------
|
||||||
We have a config based on a well maintained community
|
We have a config based on the (now defunct)
|
||||||
image from `@gnzsnz`:
|
image from "waytrade":
|
||||||
|
|
||||||
https://github.com/gnzsnz/ib-gateway-docker
|
https://github.com/waytrade/ib-gateway-docker
|
||||||
|
|
||||||
|
To startup this image with our custom settings
|
||||||
To startup this image simply run the command::
|
simply run the command::
|
||||||
|
|
||||||
docker compose up
|
docker compose up
|
||||||
|
|
||||||
(For further usage^ see the official `docker-compose`_ docs)
|
And you should have the following socket-available services:
|
||||||
|
|
||||||
|
- ``x11vnc1@127.0.0.1:3003``
|
||||||
|
- ``ib-gw@127.0.0.1:4002``
|
||||||
|
|
||||||
And you should have the following socket-available services by
|
You can attach to the container via a VNC client
|
||||||
default:
|
without password auth.
|
||||||
|
|
||||||
- ``x11vnc1 @ 127.0.0.1:5900``
|
SECURITY STUFF!?!?!
|
||||||
- ``ib-gw @ 127.0.0.1:4002``
|
-------------------
|
||||||
|
Though "``ib``" claims they host filter connections outside
|
||||||
You can now attach to the container via a VNC client with password-auth;
|
localhost (aka ``127.0.0.1``) it's probably better if you filter
|
||||||
here is an example using ``vncclient`` on ``linux``::
|
the socket at the OS level using a stateless firewall rule::
|
||||||
|
|
||||||
vncviewer localhost:5900
|
|
||||||
|
|
||||||
|
|
||||||
now enter the pw you set via an (see second code blob) `.env file`_
|
|
||||||
or pw-file according to the `credentials section`_.
|
|
||||||
|
|
||||||
If you want to change away from their default config see the example
|
|
||||||
`docker-compose.yml`-config issue and config-section of the readme,
|
|
||||||
|
|
||||||
- https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#configuration
|
|
||||||
- https://github.com/gnzsnz/ib-gateway-docker/discussions/103
|
|
||||||
|
|
||||||
.. _.env file: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#how-to-use-it
|
|
||||||
.. _docker-compose: https://docs.docker.com/compose/
|
|
||||||
.. _credentials section: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#credentials
|
|
||||||
|
|
||||||
|
|
||||||
IF you also want to run ``TWS``
|
|
||||||
-------------------------------
|
|
||||||
You can also run it containerized,
|
|
||||||
|
|
||||||
https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#using-tws
|
|
||||||
|
|
||||||
|
|
||||||
SECURITY stuff (advanced, only if you're paranoid)
|
|
||||||
--------------------------------------------------
|
|
||||||
First and foremost if doing a "distributed" container setup where you
|
|
||||||
run the ``ib-gw`` docker container and your connecting API client
|
|
||||||
(likely ``ib_async`` from python) on **different hosts** be sure to
|
|
||||||
read the `security considerations`_ section!
|
|
||||||
|
|
||||||
And for a further (somewhat paranoid) perspective from
|
|
||||||
a long-time-ago serious devops eng..
|
|
||||||
|
|
||||||
Though "``ib``" claims they filter remote host connections outside
|
|
||||||
``localhost`` (aka ``127.0.0.1`` on ipv4) it's prolly justified if
|
|
||||||
you'd like to filter the socket at the *OS level* using a stateless
|
|
||||||
firewall rule::
|
|
||||||
|
|
||||||
ip rule add not unicast iif lo to 0.0.0.0/0 dport 4002
|
ip rule add not unicast iif lo to 0.0.0.0/0 dport 4002
|
||||||
|
|
||||||
|
We will soon have this baked into our own custom image but for
|
||||||
We will soon have this either baked into our own custom derivative
|
now you'll have to do it urself dawgy.
|
||||||
image (or patched into the current upstream one after further testin)
|
|
||||||
but for now you'll have to do it urself, diggity dawg.
|
|
||||||
|
|
||||||
.. _security considerations: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#security-considerations
|
|
||||||
|
|
|
||||||
|
|
@ -94,15 +94,13 @@ class L1(Struct):
|
||||||
|
|
||||||
|
|
||||||
# validation type
|
# validation type
|
||||||
# https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams/Aggregate-Trade-Streams#response-example
|
|
||||||
class AggTrade(Struct, frozen=True):
|
class AggTrade(Struct, frozen=True):
|
||||||
e: str # Event type
|
e: str # Event type
|
||||||
E: int # Event time
|
E: int # Event time
|
||||||
s: str # Symbol
|
s: str # Symbol
|
||||||
a: int # Aggregate trade ID
|
a: int # Aggregate trade ID
|
||||||
p: float # Price
|
p: float # Price
|
||||||
q: float # Quantity with all the market trades
|
q: float # Quantity
|
||||||
nq: float # Normal quantity without the trades involving RPI orders
|
|
||||||
f: int # First trade ID
|
f: int # First trade ID
|
||||||
l: int # noqa Last trade ID
|
l: int # noqa Last trade ID
|
||||||
T: int # Trade time
|
T: int # Trade time
|
||||||
|
|
|
||||||
|
|
@ -102,10 +102,7 @@ class Pair(Struct, frozen=True, kw_only=True):
|
||||||
# https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
|
# https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
|
||||||
# will become non-optional 2025-08-28?
|
# will become non-optional 2025-08-28?
|
||||||
# https://developers.binance.com/docs/binance-spot-api-docs#future-changes
|
# https://developers.binance.com/docs/binance-spot-api-docs#future-changes
|
||||||
pegInstructionsAllowed: bool = False
|
pegInstructionsAllowed: bool|None = None
|
||||||
|
|
||||||
# https://developers.binance.com/docs/binance-spot-api-docs#2025-12-02
|
|
||||||
opoAllowed: bool = False
|
|
||||||
|
|
||||||
filters: dict[
|
filters: dict[
|
||||||
str,
|
str,
|
||||||
|
|
@ -223,10 +220,7 @@ class FutesPair(Pair):
|
||||||
assert pair == self.pair # sanity
|
assert pair == self.pair # sanity
|
||||||
return f'{expiry}'
|
return f'{expiry}'
|
||||||
|
|
||||||
case (
|
case 'PERPETUAL':
|
||||||
'PERPETUAL'
|
|
||||||
| 'TRADIFI_PERPETUAL'
|
|
||||||
):
|
|
||||||
return 'PERP'
|
return 'PERP'
|
||||||
|
|
||||||
case '':
|
case '':
|
||||||
|
|
@ -255,10 +249,7 @@ class FutesPair(Pair):
|
||||||
margin: str = self.marginAsset
|
margin: str = self.marginAsset
|
||||||
|
|
||||||
match ctype:
|
match ctype:
|
||||||
case (
|
case 'PERPETUAL':
|
||||||
'PERPETUAL'
|
|
||||||
| 'TRADIFI_PERPETUAL'
|
|
||||||
):
|
|
||||||
return f'{margin}M'
|
return f'{margin}M'
|
||||||
|
|
||||||
case (
|
case (
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,9 @@ routines should be primitive data types where possible.
|
||||||
"""
|
"""
|
||||||
import inspect
|
import inspect
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
from typing import List, Dict, Any, Optional
|
from typing import (
|
||||||
|
Any,
|
||||||
|
)
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
|
@ -34,8 +36,10 @@ from ..accounting import MktPair
|
||||||
|
|
||||||
|
|
||||||
async def api(brokername: str, methname: str, **kwargs) -> dict:
|
async def api(brokername: str, methname: str, **kwargs) -> dict:
|
||||||
"""Make (proxy through) a broker API call by name and return its result.
|
'''
|
||||||
"""
|
Make (proxy through) a broker API call by name and return its result.
|
||||||
|
|
||||||
|
'''
|
||||||
brokermod = get_brokermod(brokername)
|
brokermod = get_brokermod(brokername)
|
||||||
async with brokermod.get_client() as client:
|
async with brokermod.get_client() as client:
|
||||||
meth = getattr(client, methname, None)
|
meth = getattr(client, methname, None)
|
||||||
|
|
@ -62,10 +66,14 @@ async def api(brokername: str, methname: str, **kwargs) -> dict:
|
||||||
|
|
||||||
async def stocks_quote(
|
async def stocks_quote(
|
||||||
brokermod: ModuleType,
|
brokermod: ModuleType,
|
||||||
tickers: List[str]
|
tickers: list[str]
|
||||||
) -> Dict[str, Dict[str, Any]]:
|
|
||||||
"""Return quotes dict for ``tickers``.
|
) -> dict[str, dict[str, Any]]:
|
||||||
"""
|
'''
|
||||||
|
Return a `dict` of snapshot quotes for the provided input
|
||||||
|
`tickers`: a `list` of fqmes.
|
||||||
|
|
||||||
|
'''
|
||||||
async with brokermod.get_client() as client:
|
async with brokermod.get_client() as client:
|
||||||
return await client.quote(tickers)
|
return await client.quote(tickers)
|
||||||
|
|
||||||
|
|
@ -74,13 +82,15 @@ async def stocks_quote(
|
||||||
async def option_chain(
|
async def option_chain(
|
||||||
brokermod: ModuleType,
|
brokermod: ModuleType,
|
||||||
symbol: str,
|
symbol: str,
|
||||||
date: Optional[str] = None,
|
date: str|None = None,
|
||||||
) -> Dict[str, Dict[str, Dict[str, Any]]]:
|
) -> dict[str, dict[str, dict[str, Any]]]:
|
||||||
"""Return option chain for ``symbol`` for ``date``.
|
'''
|
||||||
|
Return option chain for ``symbol`` for ``date``.
|
||||||
|
|
||||||
By default all expiries are returned. If ``date`` is provided
|
By default all expiries are returned. If ``date`` is provided
|
||||||
then contract quotes for that single expiry are returned.
|
then contract quotes for that single expiry are returned.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
async with brokermod.get_client() as client:
|
async with brokermod.get_client() as client:
|
||||||
if date:
|
if date:
|
||||||
id = int((await client.tickers2ids([symbol]))[symbol])
|
id = int((await client.tickers2ids([symbol]))[symbol])
|
||||||
|
|
@ -98,7 +108,7 @@ async def option_chain(
|
||||||
# async def contracts(
|
# async def contracts(
|
||||||
# brokermod: ModuleType,
|
# brokermod: ModuleType,
|
||||||
# symbol: str,
|
# symbol: str,
|
||||||
# ) -> Dict[str, Dict[str, Dict[str, Any]]]:
|
# ) -> dict[str, dict[str, dict[str, Any]]]:
|
||||||
# """Return option contracts (all expiries) for ``symbol``.
|
# """Return option contracts (all expiries) for ``symbol``.
|
||||||
# """
|
# """
|
||||||
# async with brokermod.get_client() as client:
|
# async with brokermod.get_client() as client:
|
||||||
|
|
@ -110,15 +120,24 @@ async def bars(
|
||||||
brokermod: ModuleType,
|
brokermod: ModuleType,
|
||||||
symbol: str,
|
symbol: str,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) -> Dict[str, Dict[str, Dict[str, Any]]]:
|
) -> dict[str, dict[str, dict[str, Any]]]:
|
||||||
"""Return option contracts (all expiries) for ``symbol``.
|
'''
|
||||||
"""
|
Return option contracts (all expiries) for ``symbol``.
|
||||||
|
|
||||||
|
'''
|
||||||
async with brokermod.get_client() as client:
|
async with brokermod.get_client() as client:
|
||||||
return await client.bars(symbol, **kwargs)
|
return await client.bars(symbol, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
async def search_w_brokerd(name: str, pattern: str) -> dict:
|
async def search_w_brokerd(
|
||||||
|
name: str,
|
||||||
|
pattern: str,
|
||||||
|
) -> dict:
|
||||||
|
|
||||||
|
# TODO: WHY NOT WORK!?!
|
||||||
|
# when we `step` through the next block?
|
||||||
|
# import tractor
|
||||||
|
# await tractor.pause()
|
||||||
async with open_cached_client(name) as client:
|
async with open_cached_client(name) as client:
|
||||||
|
|
||||||
# TODO: support multiple asset type concurrent searches.
|
# TODO: support multiple asset type concurrent searches.
|
||||||
|
|
@ -130,12 +149,12 @@ async def symbol_search(
|
||||||
pattern: str,
|
pattern: str,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> Dict[str, Dict[str, Dict[str, Any]]]:
|
) -> dict[str, dict[str, dict[str, Any]]]:
|
||||||
'''
|
'''
|
||||||
Return symbol info from broker.
|
Return symbol info from broker.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
results = []
|
results: list[str] = []
|
||||||
|
|
||||||
async def search_backend(
|
async def search_backend(
|
||||||
brokermod: ModuleType
|
brokermod: ModuleType
|
||||||
|
|
@ -143,6 +162,13 @@ async def symbol_search(
|
||||||
|
|
||||||
brokername: str = mod.name
|
brokername: str = mod.name
|
||||||
|
|
||||||
|
# TODO: figure this the FUCK OUT
|
||||||
|
# -> ok so obvi in the root actor any async task that's
|
||||||
|
# spawned outside the main tractor-root-actor task needs to
|
||||||
|
# call this..
|
||||||
|
# await tractor.devx._debug.maybe_init_greenback()
|
||||||
|
# tractor.pause_from_sync()
|
||||||
|
|
||||||
async with maybe_spawn_brokerd(
|
async with maybe_spawn_brokerd(
|
||||||
mod.name,
|
mod.name,
|
||||||
infect_asyncio=getattr(
|
infect_asyncio=getattr(
|
||||||
|
|
@ -162,7 +188,6 @@ async def symbol_search(
|
||||||
))
|
))
|
||||||
|
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as n:
|
||||||
|
|
||||||
for mod in brokermods:
|
for mod in brokermods:
|
||||||
n.start_soon(search_backend, mod.name)
|
n.start_soon(search_backend, mod.name)
|
||||||
|
|
||||||
|
|
@ -172,11 +197,13 @@ async def symbol_search(
|
||||||
async def mkt_info(
|
async def mkt_info(
|
||||||
brokermod: ModuleType,
|
brokermod: ModuleType,
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> MktPair:
|
) -> MktPair:
|
||||||
'''
|
'''
|
||||||
Return MktPair info from broker including src and dst assets.
|
Return the `piker.accounting.MktPair` info struct from a given
|
||||||
|
backend broker tradable src/dst asset pair.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async with open_cached_client(brokermod.name) as client:
|
async with open_cached_client(brokermod.name) as client:
|
||||||
|
|
|
||||||
|
|
@ -20,11 +20,6 @@ runnable script-programs.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from datetime import ( # noqa
|
|
||||||
datetime,
|
|
||||||
date,
|
|
||||||
tzinfo as TzInfo,
|
|
||||||
)
|
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from typing import (
|
from typing import (
|
||||||
Literal,
|
Literal,
|
||||||
|
|
@ -38,6 +33,7 @@ from piker.brokers._util import get_logger
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .api import Client
|
from .api import Client
|
||||||
|
from ib_insync import IB
|
||||||
import i3ipc
|
import i3ipc
|
||||||
|
|
||||||
log = get_logger('piker.brokers.ib')
|
log = get_logger('piker.brokers.ib')
|
||||||
|
|
@ -61,7 +57,7 @@ no_setup_msg:str = (
|
||||||
|
|
||||||
|
|
||||||
def try_xdo_manual(
|
def try_xdo_manual(
|
||||||
client: Client,
|
vnc_sockaddr: str,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Do the "manual" `xdo`-based screen switch + click
|
Do the "manual" `xdo`-based screen switch + click
|
||||||
|
|
@ -78,14 +74,14 @@ def try_xdo_manual(
|
||||||
_reset_tech = 'i3ipc_xdotool'
|
_reset_tech = 'i3ipc_xdotool'
|
||||||
return True
|
return True
|
||||||
except OSError:
|
except OSError:
|
||||||
vnc_sockaddr: str = client.conf.vnc_addrs
|
|
||||||
log.exception(
|
log.exception(
|
||||||
no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
|
no_setup_msg.format(vnc_sockaddr)
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
async def data_reset_hack(
|
async def data_reset_hack(
|
||||||
|
# vnc_host: str,
|
||||||
client: Client,
|
client: Client,
|
||||||
reset_type: Literal['data', 'connection'],
|
reset_type: Literal['data', 'connection'],
|
||||||
|
|
||||||
|
|
@ -117,24 +113,36 @@ async def data_reset_hack(
|
||||||
that need to be wrangle.
|
that need to be wrangle.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
ib_client: IB = client.ib
|
||||||
|
|
||||||
# look up any user defined vnc socket address mapped from
|
# look up any user defined vnc socket address mapped from
|
||||||
# a particular API socket port.
|
# a particular API socket port.
|
||||||
vnc_addrs: tuple[str]|None = client.conf.get('vnc_addrs')
|
api_port: str = str(ib_client.client.port)
|
||||||
if not vnc_addrs:
|
vnc_host: str
|
||||||
|
vnc_port: int
|
||||||
|
vnc_sockaddr: tuple[str] | None = client.conf.get('vnc_addrs')
|
||||||
|
|
||||||
|
if not vnc_sockaddr:
|
||||||
log.warning(
|
log.warning(
|
||||||
no_setup_msg.format(vnc_sockaddr=client.conf)
|
no_setup_msg.format(vnc_sockaddr)
|
||||||
+
|
+
|
||||||
'REQUIRES A `vnc_addrs: array` ENTRY'
|
'REQUIRES A `vnc_addrs: array` ENTRY'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
vnc_host, vnc_port = vnc_sockaddr.get(
|
||||||
|
api_port,
|
||||||
|
('localhost', 3003)
|
||||||
|
)
|
||||||
global _reset_tech
|
global _reset_tech
|
||||||
|
|
||||||
match _reset_tech:
|
match _reset_tech:
|
||||||
case 'vnc':
|
case 'vnc':
|
||||||
try:
|
try:
|
||||||
await tractor.to_asyncio.run_task(
|
await tractor.to_asyncio.run_task(
|
||||||
partial(
|
partial(
|
||||||
vnc_click_hack,
|
vnc_click_hack,
|
||||||
client=client,
|
host=vnc_host,
|
||||||
|
port=vnc_port,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
except (
|
except (
|
||||||
|
|
@ -145,31 +153,29 @@ async def data_reset_hack(
|
||||||
import i3ipc # noqa (since a deps dynamic check)
|
import i3ipc # noqa (since a deps dynamic check)
|
||||||
except ModuleNotFoundError:
|
except ModuleNotFoundError:
|
||||||
log.warning(
|
log.warning(
|
||||||
no_setup_msg.format(vnc_sockaddr=client.conf)
|
no_setup_msg.format(vnc_sockaddr)
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# XXX, Xorg only workaround..
|
if vnc_host not in {
|
||||||
# TODO? remove now that we have `pyvnc`?
|
'localhost',
|
||||||
# if vnc_host not in {
|
'127.0.0.1',
|
||||||
# 'localhost',
|
}:
|
||||||
# '127.0.0.1',
|
focussed, matches = i3ipc_fin_wins_titled()
|
||||||
# }:
|
if not matches:
|
||||||
# focussed, matches = i3ipc_fin_wins_titled()
|
log.warning(
|
||||||
# if not matches:
|
no_setup_msg.format(vnc_sockaddr)
|
||||||
# log.warning(
|
)
|
||||||
# no_setup_msg.format(vnc_sockaddr=vnc_sockaddr)
|
return False
|
||||||
# )
|
else:
|
||||||
# return False
|
try_xdo_manual(vnc_sockaddr)
|
||||||
# else:
|
|
||||||
# try_xdo_manual(vnc_sockaddr)
|
|
||||||
|
|
||||||
# localhost but no vnc-client or it borked..
|
# localhost but no vnc-client or it borked..
|
||||||
else:
|
else:
|
||||||
try_xdo_manual(client)
|
try_xdo_manual(vnc_sockaddr)
|
||||||
|
|
||||||
case 'i3ipc_xdotool':
|
case 'i3ipc_xdotool':
|
||||||
try_xdo_manual(client)
|
try_xdo_manual(vnc_sockaddr)
|
||||||
# i3ipc_xdotool_manual_click_hack()
|
# i3ipc_xdotool_manual_click_hack()
|
||||||
|
|
||||||
case _ as tech:
|
case _ as tech:
|
||||||
|
|
@ -180,66 +186,21 @@ async def data_reset_hack(
|
||||||
|
|
||||||
|
|
||||||
async def vnc_click_hack(
|
async def vnc_click_hack(
|
||||||
client: Client,
|
host: str,
|
||||||
reset_type: str = 'data',
|
port: int,
|
||||||
pw: str|None = None,
|
reset_type: str = 'data'
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Reset the data or network connection for the VNC attached
|
Reset the data or network connection for the VNC attached
|
||||||
ib-gateway using a (magic) keybinding combo.
|
ib gateway using magic combos.
|
||||||
|
|
||||||
A vnc-server password can be set either by an input `pw` param or
|
|
||||||
set in the client's config with the latter loaded from the user's
|
|
||||||
`brokers.toml` in a vnc-addrs-port-mapping section,
|
|
||||||
|
|
||||||
.. code:: toml
|
|
||||||
|
|
||||||
[ib.vnc_addrs]
|
|
||||||
4002 = {host = 'localhost', port = 5900, pw = 'doggy'}
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
api_port: str = str(client.ib.client.port)
|
|
||||||
conf: dict = client.conf
|
|
||||||
vnc_addrs: dict[int, tuple] = conf.get('vnc_addrs')
|
|
||||||
if not vnc_addrs:
|
|
||||||
return None
|
|
||||||
|
|
||||||
addr_entry: dict|tuple = vnc_addrs.get(
|
|
||||||
api_port,
|
|
||||||
('localhost', 5900) # a typical default
|
|
||||||
)
|
|
||||||
if pw is None:
|
|
||||||
match addr_entry:
|
|
||||||
case (
|
|
||||||
host,
|
|
||||||
port,
|
|
||||||
):
|
|
||||||
pass
|
|
||||||
|
|
||||||
case {
|
|
||||||
'host': host,
|
|
||||||
'port': port,
|
|
||||||
'pw': pw
|
|
||||||
}:
|
|
||||||
pass
|
|
||||||
|
|
||||||
case _:
|
|
||||||
raise ValueError(
|
|
||||||
f'Invalid `ib.vnc_addrs` entry ?\n'
|
|
||||||
f'{addr_entry!r}\n'
|
|
||||||
)
|
|
||||||
try:
|
try:
|
||||||
from pyvnc import (
|
import asyncvnc
|
||||||
AsyncVNCClient,
|
|
||||||
VNCConfig,
|
|
||||||
Point,
|
|
||||||
MOUSE_BUTTON_LEFT,
|
|
||||||
)
|
|
||||||
except ModuleNotFoundError:
|
except ModuleNotFoundError:
|
||||||
log.warning(
|
log.warning(
|
||||||
"In order to leverage `piker`'s built-in data reset hacks, install "
|
"In order to leverage `piker`'s built-in data reset hacks, install "
|
||||||
"the `pyvnc` project: https://github.com/regulad/pyvnc.git"
|
"the `asyncvnc` project: https://github.com/barneygale/asyncvnc"
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
@ -250,27 +211,24 @@ async def vnc_click_hack(
|
||||||
'connection': 'r'
|
'connection': 'r'
|
||||||
}[reset_type]
|
}[reset_type]
|
||||||
|
|
||||||
with tractor.devx.open_crash_handler():
|
async with asyncvnc.connect(
|
||||||
client = await AsyncVNCClient.connect(
|
host,
|
||||||
VNCConfig(
|
port=port,
|
||||||
host=host,
|
|
||||||
port=port,
|
# TODO: doesn't work?
|
||||||
password=pw,
|
# see, https://github.com/barneygale/asyncvnc/issues/7
|
||||||
)
|
password='doggy',
|
||||||
|
|
||||||
|
) as client:
|
||||||
|
|
||||||
|
# move to middle of screen
|
||||||
|
# 640x1800
|
||||||
|
client.mouse.move(
|
||||||
|
x=500,
|
||||||
|
y=500,
|
||||||
)
|
)
|
||||||
async with client:
|
client.mouse.click()
|
||||||
# move to middle of screen
|
client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked
|
||||||
# 640x1800
|
|
||||||
await client.move(
|
|
||||||
Point(
|
|
||||||
500,
|
|
||||||
500,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
# ensure the ib-gw window is active
|
|
||||||
await client.click(MOUSE_BUTTON_LEFT)
|
|
||||||
# send the hotkeys combo B)
|
|
||||||
await client.press('Ctrl', 'Alt', key) # keys are stacked
|
|
||||||
|
|
||||||
|
|
||||||
def i3ipc_fin_wins_titled(
|
def i3ipc_fin_wins_titled(
|
||||||
|
|
@ -379,99 +337,3 @@ def i3ipc_xdotool_manual_click_hack() -> None:
|
||||||
])
|
])
|
||||||
except subprocess.TimeoutExpired:
|
except subprocess.TimeoutExpired:
|
||||||
log.exception('xdotool timed out?')
|
log.exception('xdotool timed out?')
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def is_current_time_in_range(
|
|
||||||
start_dt: datetime,
|
|
||||||
end_dt: datetime,
|
|
||||||
) -> bool:
|
|
||||||
'''
|
|
||||||
Check if current time is within the datetime range.
|
|
||||||
|
|
||||||
Use any/the-same timezone as provided by `start_dt.tzinfo` value
|
|
||||||
in the range.
|
|
||||||
|
|
||||||
'''
|
|
||||||
now: datetime = datetime.now(start_dt.tzinfo)
|
|
||||||
return start_dt <= now <= end_dt
|
|
||||||
|
|
||||||
|
|
||||||
# TODO, put this into `._util` and call it from here!
|
|
||||||
#
|
|
||||||
# NOTE, this was generated by @guille from a gpt5 prompt
|
|
||||||
# and was originally thot to be needed before learning about
|
|
||||||
# `ib_insync.contract.ContractDetails._parseSessions()` and
|
|
||||||
# it's downstream meths..
|
|
||||||
#
|
|
||||||
# This is still likely useful to keep for now to parse the
|
|
||||||
# `.tradingHours: str` value manually if we ever decide
|
|
||||||
# to move off `ib_async` and implement our own `trio`/`anyio`
|
|
||||||
# based version Bp
|
|
||||||
#
|
|
||||||
# >attempt to parse the retarted ib "time stampy thing" they
|
|
||||||
# >do for "venue hours" with this.. written by
|
|
||||||
# >gpt5-"thinking",
|
|
||||||
#
|
|
||||||
|
|
||||||
|
|
||||||
def parse_trading_hours(
|
|
||||||
spec: str,
|
|
||||||
tz: TzInfo|None = None
|
|
||||||
) -> dict[
|
|
||||||
date,
|
|
||||||
tuple[datetime, datetime]
|
|
||||||
]|None:
|
|
||||||
'''
|
|
||||||
Parse venue hours like:
|
|
||||||
'YYYYMMDD:HHMM-YYYYMMDD:HHMM;YYYYMMDD:CLOSED;...'
|
|
||||||
|
|
||||||
Returns `dict[date] = (open_dt, close_dt)` or `None` if
|
|
||||||
closed.
|
|
||||||
|
|
||||||
'''
|
|
||||||
if (
|
|
||||||
not isinstance(spec, str)
|
|
||||||
or
|
|
||||||
not spec
|
|
||||||
):
|
|
||||||
raise ValueError('spec must be a non-empty string')
|
|
||||||
|
|
||||||
out: dict[
|
|
||||||
date,
|
|
||||||
tuple[datetime, datetime]
|
|
||||||
]|None = {}
|
|
||||||
|
|
||||||
for part in (p.strip() for p in spec.split(';') if p.strip()):
|
|
||||||
if part.endswith(':CLOSED'):
|
|
||||||
day_s, _ = part.split(':', 1)
|
|
||||||
d = datetime.strptime(day_s, '%Y%m%d').date()
|
|
||||||
out[d] = None
|
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
|
||||||
start_s, end_s = part.split('-', 1)
|
|
||||||
start_dt = datetime.strptime(start_s, '%Y%m%d:%H%M')
|
|
||||||
end_dt = datetime.strptime(end_s, '%Y%m%d:%H%M')
|
|
||||||
except ValueError as exc:
|
|
||||||
raise ValueError(f'invalid segment: {part}') from exc
|
|
||||||
|
|
||||||
if tz is not None:
|
|
||||||
start_dt = start_dt.replace(tzinfo=tz)
|
|
||||||
end_dt = end_dt.replace(tzinfo=tz)
|
|
||||||
|
|
||||||
out[start_dt.date()] = (start_dt, end_dt)
|
|
||||||
|
|
||||||
return out
|
|
||||||
|
|
||||||
|
|
||||||
# ORIG desired usage,
|
|
||||||
#
|
|
||||||
# TODO, for non-drunk tomorrow,
|
|
||||||
# - call above fn and check that `output[today] is not None`
|
|
||||||
# trading_hrs: dict = parse_trading_hours(
|
|
||||||
# details.tradingHours
|
|
||||||
# )
|
|
||||||
# liq_hrs: dict = parse_trading_hours(
|
|
||||||
# details.liquidHours
|
|
||||||
# )
|
|
||||||
|
|
|
||||||
|
|
@ -333,15 +333,15 @@ class Client:
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
|
||||||
# EST in ISO 8601 format is required... below is EPOCH
|
# EST in ISO 8601 format is required... below is EPOCH
|
||||||
start_dt: datetime|str = "1970-01-01T00:00:00.000000-05:00",
|
start_dt: datetime | str = "1970-01-01T00:00:00.000000-05:00",
|
||||||
end_dt: datetime|str = "",
|
end_dt: datetime | str = "",
|
||||||
|
|
||||||
# ohlc sample period in seconds
|
# ohlc sample period in seconds
|
||||||
sample_period_s: int = 1,
|
sample_period_s: int = 1,
|
||||||
|
|
||||||
# optional "duration of time" equal to the
|
# optional "duration of time" equal to the
|
||||||
# length of the returned history frame.
|
# length of the returned history frame.
|
||||||
duration: str|None = None,
|
duration: str | None = None,
|
||||||
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
|
|
@ -715,8 +715,8 @@ class Client:
|
||||||
|
|
||||||
async def find_contracts(
|
async def find_contracts(
|
||||||
self,
|
self,
|
||||||
pattern: str|None = None,
|
pattern: str | None = None,
|
||||||
contract: Contract|None = None,
|
contract: Contract | None = None,
|
||||||
qualify: bool = True,
|
qualify: bool = True,
|
||||||
err_on_qualify: bool = True,
|
err_on_qualify: bool = True,
|
||||||
|
|
||||||
|
|
@ -861,7 +861,7 @@ class Client:
|
||||||
self,
|
self,
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
|
||||||
) -> datetime|None:
|
) -> datetime | None:
|
||||||
'''
|
'''
|
||||||
Return the first datetime stamp for `fqme` or `None`
|
Return the first datetime stamp for `fqme` or `None`
|
||||||
on request failure.
|
on request failure.
|
||||||
|
|
@ -917,7 +917,7 @@ class Client:
|
||||||
tries: int = 100,
|
tries: int = 100,
|
||||||
raise_on_timeout: bool = False,
|
raise_on_timeout: bool = False,
|
||||||
|
|
||||||
) -> Ticker|None:
|
) -> Ticker | None:
|
||||||
'''
|
'''
|
||||||
Return a single (snap) quote for symbol.
|
Return a single (snap) quote for symbol.
|
||||||
|
|
||||||
|
|
@ -929,7 +929,7 @@ class Client:
|
||||||
ready: ticker.TickerUpdateEvent = ticker.updateEvent
|
ready: ticker.TickerUpdateEvent = ticker.updateEvent
|
||||||
|
|
||||||
# ensure a last price gets filled in before we deliver quote
|
# ensure a last price gets filled in before we deliver quote
|
||||||
timeouterr: Exception|None = None
|
timeouterr: Exception | None = None
|
||||||
warnset: bool = False
|
warnset: bool = False
|
||||||
for _ in range(tries):
|
for _ in range(tries):
|
||||||
|
|
||||||
|
|
@ -943,7 +943,6 @@ class Client:
|
||||||
)
|
)
|
||||||
if tkr:
|
if tkr:
|
||||||
break
|
break
|
||||||
|
|
||||||
except TimeoutError as err:
|
except TimeoutError as err:
|
||||||
timeouterr = err
|
timeouterr = err
|
||||||
await asyncio.sleep(0.01)
|
await asyncio.sleep(0.01)
|
||||||
|
|
@ -952,9 +951,7 @@ class Client:
|
||||||
else:
|
else:
|
||||||
if not warnset:
|
if not warnset:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Quote req timed out..\n'
|
f'Quote req timed out..maybe venue is closed?\n'
|
||||||
f'Maybe the venue is closed?\n'
|
|
||||||
f'\n'
|
|
||||||
f'{asdict(contract)}'
|
f'{asdict(contract)}'
|
||||||
)
|
)
|
||||||
warnset = True
|
warnset = True
|
||||||
|
|
@ -966,11 +963,9 @@ class Client:
|
||||||
)
|
)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
if (
|
if timeouterr and raise_on_timeout:
|
||||||
timeouterr
|
import pdbp
|
||||||
and
|
pdbp.set_trace()
|
||||||
raise_on_timeout
|
|
||||||
):
|
|
||||||
raise timeouterr
|
raise timeouterr
|
||||||
|
|
||||||
if not warnset:
|
if not warnset:
|
||||||
|
|
@ -1367,7 +1362,9 @@ async def load_aio_clients(
|
||||||
|
|
||||||
|
|
||||||
async def load_clients_for_trio(
|
async def load_clients_for_trio(
|
||||||
chan: tractor.to_asyncio.LinkedTaskChannel,
|
from_trio: asyncio.Queue,
|
||||||
|
to_trio: trio.abc.SendChannel,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Pure async mngr proxy to ``load_aio_clients()``.
|
Pure async mngr proxy to ``load_aio_clients()``.
|
||||||
|
|
@ -1380,7 +1377,8 @@ async def load_clients_for_trio(
|
||||||
disconnect_on_exit=False,
|
disconnect_on_exit=False,
|
||||||
) as accts2clients:
|
) as accts2clients:
|
||||||
|
|
||||||
chan.started_nowait(accts2clients)
|
to_trio.send_nowait(accts2clients)
|
||||||
|
|
||||||
# TODO: maybe a sync event to wait on instead?
|
# TODO: maybe a sync event to wait on instead?
|
||||||
await asyncio.sleep(float('inf'))
|
await asyncio.sleep(float('inf'))
|
||||||
|
|
||||||
|
|
@ -1503,7 +1501,7 @@ class MethodProxy:
|
||||||
self,
|
self,
|
||||||
pattern: str,
|
pattern: str,
|
||||||
|
|
||||||
) -> dict[str, Any]|trio.Event:
|
) -> dict[str, Any] | trio.Event:
|
||||||
|
|
||||||
ev = self.event_table.get(pattern)
|
ev = self.event_table.get(pattern)
|
||||||
|
|
||||||
|
|
@ -1524,22 +1522,23 @@ class MethodProxy:
|
||||||
|
|
||||||
|
|
||||||
async def open_aio_client_method_relay(
|
async def open_aio_client_method_relay(
|
||||||
chan: tractor.to_asyncio.LinkedTaskChannel,
|
from_trio: asyncio.Queue,
|
||||||
|
to_trio: trio.abc.SendChannel,
|
||||||
client: Client,
|
client: Client,
|
||||||
event_consumers: dict[str, trio.Event],
|
event_consumers: dict[str, trio.Event],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
# sync with `open_client_proxy()` caller
|
# sync with `open_client_proxy()` caller
|
||||||
chan.started_nowait(client)
|
to_trio.send_nowait(client)
|
||||||
|
|
||||||
# TODO: separate channel for error handling?
|
# TODO: separate channel for error handling?
|
||||||
client.inline_errors(chan)
|
client.inline_errors(to_trio)
|
||||||
|
|
||||||
# relay all method requests to ``asyncio``-side client and deliver
|
# relay all method requests to ``asyncio``-side client and deliver
|
||||||
# back results
|
# back results
|
||||||
while not chan._to_trio._closed: # <- TODO, better check like `._web_bs`?
|
while not to_trio._closed:
|
||||||
msg: tuple[str, dict]|dict|None = await chan.get()
|
msg: tuple[str, dict] | dict | None = await from_trio.get()
|
||||||
match msg:
|
match msg:
|
||||||
case None: # termination sentinel
|
case None: # termination sentinel
|
||||||
log.info('asyncio `Client` method-proxy SHUTDOWN!')
|
log.info('asyncio `Client` method-proxy SHUTDOWN!')
|
||||||
|
|
@ -1552,7 +1551,7 @@ async def open_aio_client_method_relay(
|
||||||
try:
|
try:
|
||||||
resp = await meth(**kwargs)
|
resp = await meth(**kwargs)
|
||||||
# echo the msg back
|
# echo the msg back
|
||||||
chan.send_nowait({'result': resp})
|
to_trio.send_nowait({'result': resp})
|
||||||
|
|
||||||
except (
|
except (
|
||||||
RequestError,
|
RequestError,
|
||||||
|
|
@ -1560,10 +1559,10 @@ async def open_aio_client_method_relay(
|
||||||
# TODO: relay all errors to trio?
|
# TODO: relay all errors to trio?
|
||||||
# BaseException,
|
# BaseException,
|
||||||
) as err:
|
) as err:
|
||||||
chan.send_nowait({'exception': err})
|
to_trio.send_nowait({'exception': err})
|
||||||
|
|
||||||
case {'error': content}:
|
case {'error': content}:
|
||||||
chan.send_nowait({'exception': content})
|
to_trio.send_nowait({'exception': content})
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
raise ValueError(f'Unhandled msg {msg}')
|
raise ValueError(f'Unhandled msg {msg}')
|
||||||
|
|
|
||||||
|
|
@ -116,11 +116,7 @@ def pack_position(
|
||||||
symbol=fqme,
|
symbol=fqme,
|
||||||
currency=con.currency,
|
currency=con.currency,
|
||||||
size=float(pos.position),
|
size=float(pos.position),
|
||||||
avg_price=(
|
avg_price=float(pos.avgCost) / float(con.multiplier or 1.0),
|
||||||
float(pos.avgCost)
|
|
||||||
/
|
|
||||||
float(con.multiplier or 1.0)
|
|
||||||
),
|
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -361,10 +357,6 @@ async def update_and_audit_pos_msg(
|
||||||
size=ibpos.position,
|
size=ibpos.position,
|
||||||
|
|
||||||
avg_price=pikerpos.ppu,
|
avg_price=pikerpos.ppu,
|
||||||
|
|
||||||
# XXX ensures matching even if multiple venue-names
|
|
||||||
# in `.bs_fqme`, likely from txn records..
|
|
||||||
bs_mktid=mkt.bs_mktid,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
ibfmtmsg: str = pformat(ibpos._asdict())
|
ibfmtmsg: str = pformat(ibpos._asdict())
|
||||||
|
|
@ -433,8 +425,7 @@ async def aggr_open_orders(
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Collect all open orders from client and fill in `order_msgs:
|
Collect all open orders from client and fill in `order_msgs: list`.
|
||||||
list`.
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
trades: list[Trade] = client.ib.openTrades()
|
trades: list[Trade] = client.ib.openTrades()
|
||||||
|
|
@ -555,10 +546,7 @@ async def open_trade_dialog(
|
||||||
),
|
),
|
||||||
|
|
||||||
# TODO: do this as part of `open_account()`!?
|
# TODO: do this as part of `open_account()`!?
|
||||||
open_symcache(
|
open_symcache('ib', only_from_memcache=True) as symcache,
|
||||||
'ib',
|
|
||||||
only_from_memcache=True,
|
|
||||||
) as symcache,
|
|
||||||
):
|
):
|
||||||
# Open a trade ledgers stack for appending trade records over
|
# Open a trade ledgers stack for appending trade records over
|
||||||
# multiple accounts.
|
# multiple accounts.
|
||||||
|
|
@ -566,10 +554,8 @@ async def open_trade_dialog(
|
||||||
ledgers: dict[str, TransactionLedger] = {}
|
ledgers: dict[str, TransactionLedger] = {}
|
||||||
tables: dict[str, Account] = {}
|
tables: dict[str, Account] = {}
|
||||||
order_msgs: list[Status] = []
|
order_msgs: list[Status] = []
|
||||||
conf: dict = get_config()
|
conf = get_config()
|
||||||
accounts_def_inv: bidict[str, str] = bidict(
|
accounts_def_inv: bidict[str, str] = bidict(conf['accounts']).inverse
|
||||||
conf['accounts']
|
|
||||||
).inverse
|
|
||||||
|
|
||||||
with (
|
with (
|
||||||
ExitStack() as lstack,
|
ExitStack() as lstack,
|
||||||
|
|
@ -719,11 +705,7 @@ async def open_trade_dialog(
|
||||||
# client-account and build out position msgs to deliver to
|
# client-account and build out position msgs to deliver to
|
||||||
# EMS.
|
# EMS.
|
||||||
for acctid, acnt in tables.items():
|
for acctid, acnt in tables.items():
|
||||||
active_pps: dict[str, Position]
|
active_pps, closed_pps = acnt.dump_active()
|
||||||
(
|
|
||||||
active_pps,
|
|
||||||
closed_pps,
|
|
||||||
) = acnt.dump_active()
|
|
||||||
|
|
||||||
for pps in [active_pps, closed_pps]:
|
for pps in [active_pps, closed_pps]:
|
||||||
piker_pps: list[Position] = list(pps.values())
|
piker_pps: list[Position] = list(pps.values())
|
||||||
|
|
@ -739,7 +721,6 @@ async def open_trade_dialog(
|
||||||
)
|
)
|
||||||
if ibpos:
|
if ibpos:
|
||||||
bs_mktid: str = str(ibpos.contract.conId)
|
bs_mktid: str = str(ibpos.contract.conId)
|
||||||
|
|
||||||
msg = await update_and_audit_pos_msg(
|
msg = await update_and_audit_pos_msg(
|
||||||
acctid,
|
acctid,
|
||||||
pikerpos,
|
pikerpos,
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
# piker: trading gear for hackers
|
# piker: trading gear for hackers
|
||||||
# Copyright (C) 2018-forever Tyler Goodlet (in stewardship for pikers)
|
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
||||||
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
# This program is free software: you can redistribute it and/or modify
|
||||||
# it under the terms of the GNU Affero General Public License as published by
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
|
@ -13,12 +13,10 @@
|
||||||
|
|
||||||
# You should have received a copy of the GNU Affero General Public License
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
"""
|
||||||
|
Data feed endpoints pre-wrapped and ready for use with ``tractor``/``trio``.
|
||||||
|
|
||||||
'''
|
"""
|
||||||
Data feed endpoints pre-wrapped and ready for use with `tractor`/`trio`
|
|
||||||
via "infected-asyncio-mode".
|
|
||||||
|
|
||||||
'''
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
import asyncio
|
import asyncio
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
|
|
@ -28,6 +26,7 @@ from dataclasses import asdict
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
|
from math import isnan
|
||||||
import time
|
import time
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
|
|
@ -41,6 +40,7 @@ import numpy as np
|
||||||
from pendulum import (
|
from pendulum import (
|
||||||
now,
|
now,
|
||||||
from_timestamp,
|
from_timestamp,
|
||||||
|
# DateTime,
|
||||||
Duration,
|
Duration,
|
||||||
duration as mk_duration,
|
duration as mk_duration,
|
||||||
)
|
)
|
||||||
|
|
@ -69,10 +69,7 @@ from .api import (
|
||||||
Contract,
|
Contract,
|
||||||
RequestError,
|
RequestError,
|
||||||
)
|
)
|
||||||
from ._util import (
|
from ._util import data_reset_hack
|
||||||
data_reset_hack,
|
|
||||||
is_current_time_in_range,
|
|
||||||
)
|
|
||||||
from .symbols import get_mkt_info
|
from .symbols import get_mkt_info
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
|
@ -187,8 +184,7 @@ async def open_history_client(
|
||||||
|
|
||||||
if (
|
if (
|
||||||
start_dt
|
start_dt
|
||||||
and
|
and start_dt.timestamp() == 0
|
||||||
start_dt.timestamp() == 0
|
|
||||||
):
|
):
|
||||||
await tractor.pause()
|
await tractor.pause()
|
||||||
|
|
||||||
|
|
@ -207,16 +203,14 @@ async def open_history_client(
|
||||||
):
|
):
|
||||||
count += 1
|
count += 1
|
||||||
mean += latency / count
|
mean += latency / count
|
||||||
log.debug(
|
print(
|
||||||
f'HISTORY FRAME QUERY LATENCY: {latency}\n'
|
f'HISTORY FRAME QUERY LATENCY: {latency}\n'
|
||||||
f'mean: {mean}'
|
f'mean: {mean}'
|
||||||
)
|
)
|
||||||
|
|
||||||
# could be trying to retreive bars over weekend
|
# could be trying to retreive bars over weekend
|
||||||
if out is None:
|
if out is None:
|
||||||
log.error(
|
log.error(f"Can't grab bars starting at {end_dt}!?!?")
|
||||||
f"No bars starting at {end_dt!r} !?!?"
|
|
||||||
)
|
|
||||||
if (
|
if (
|
||||||
end_dt
|
end_dt
|
||||||
and head_dt
|
and head_dt
|
||||||
|
|
@ -291,9 +285,8 @@ _pacing: str = (
|
||||||
|
|
||||||
async def wait_on_data_reset(
|
async def wait_on_data_reset(
|
||||||
proxy: MethodProxy,
|
proxy: MethodProxy,
|
||||||
|
|
||||||
reset_type: str = 'data',
|
reset_type: str = 'data',
|
||||||
timeout: float = 16,
|
timeout: float = 16, # float('inf'),
|
||||||
|
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
tuple[
|
tuple[
|
||||||
|
|
@ -302,47 +295,29 @@ async def wait_on_data_reset(
|
||||||
]
|
]
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
'''
|
|
||||||
Wait on a (global-ish) "data-farm" event to be emitted
|
|
||||||
by the IB api server.
|
|
||||||
|
|
||||||
Allows syncing to reconnect event-messages emitted on the API
|
# TODO: we might have to put a task lock around this
|
||||||
console, such as:
|
# method..
|
||||||
|
hist_ev = proxy.status_event(
|
||||||
- 'HMDS data farm connection is OK:ushmds'
|
|
||||||
- 'Market data farm is connecting:usfuture'
|
|
||||||
- 'Market data farm connection is OK:usfuture'
|
|
||||||
|
|
||||||
Deliver a `(cs, done: Event)` pair to the caller to support it
|
|
||||||
waiting or cancelling the associated "data-reset-request";
|
|
||||||
normally a manual data-reset-req is expected to be the cause and
|
|
||||||
thus trigger such events (such as our click-hack-magic from
|
|
||||||
`.ib._util`).
|
|
||||||
|
|
||||||
'''
|
|
||||||
# ?TODO, do we need a task-lock around this method?
|
|
||||||
#
|
|
||||||
# register for an API "status event" wrapped for `trio`-sync.
|
|
||||||
hist_ev: trio.Event = proxy.status_event(
|
|
||||||
'HMDS data farm connection is OK:ushmds'
|
'HMDS data farm connection is OK:ushmds'
|
||||||
)
|
)
|
||||||
#
|
|
||||||
# ^TODO: other event-messages we might want to support waiting-for
|
# TODO: other event messages we might want to try and
|
||||||
# but i wasn't able to get reliable..
|
# wait for but i wasn't able to get any of this
|
||||||
#
|
# reliable..
|
||||||
# reconnect_start = proxy.status_event(
|
# reconnect_start = proxy.status_event(
|
||||||
# 'Market data farm is connecting:usfuture'
|
# 'Market data farm is connecting:usfuture'
|
||||||
# )
|
# )
|
||||||
# live_ev = proxy.status_event(
|
# live_ev = proxy.status_event(
|
||||||
# 'Market data farm connection is OK:usfuture'
|
# 'Market data farm connection is OK:usfuture'
|
||||||
# )
|
# )
|
||||||
|
|
||||||
# try to wait on the reset event(s) to arrive, a timeout
|
# try to wait on the reset event(s) to arrive, a timeout
|
||||||
# will trigger a retry up to 6 times (for now).
|
# will trigger a retry up to 6 times (for now).
|
||||||
client: Client = proxy._aio_ns
|
client: Client = proxy._aio_ns
|
||||||
|
|
||||||
done = trio.Event()
|
done = trio.Event()
|
||||||
with trio.move_on_after(timeout) as cs:
|
with trio.move_on_after(timeout) as cs:
|
||||||
|
|
||||||
task_status.started((cs, done))
|
task_status.started((cs, done))
|
||||||
|
|
||||||
log.warning(
|
log.warning(
|
||||||
|
|
@ -421,9 +396,8 @@ async def get_bars(
|
||||||
bool, # timed out hint
|
bool, # timed out hint
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Request-n-retrieve historical data frames from a `trio.Task`
|
Retrieve historical data from a ``trio``-side task using
|
||||||
using a `MethoProxy` to query the `asyncio`-side's
|
a ``MethoProxy``.
|
||||||
`.ib.api.Client` methods.
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
global _data_resetter_task, _failed_resets
|
global _data_resetter_task, _failed_resets
|
||||||
|
|
@ -633,10 +607,7 @@ async def get_bars(
|
||||||
# such that simultaneous symbol queries don't try data resettingn
|
# such that simultaneous symbol queries don't try data resettingn
|
||||||
# too fast..
|
# too fast..
|
||||||
unset_resetter: bool = False
|
unset_resetter: bool = False
|
||||||
async with (
|
async with trio.open_nursery() as tn:
|
||||||
tractor.trionics.collapse_eg(),
|
|
||||||
trio.open_nursery() as tn
|
|
||||||
):
|
|
||||||
|
|
||||||
# start history request that we allow
|
# start history request that we allow
|
||||||
# to run indefinitely until a result is acquired
|
# to run indefinitely until a result is acquired
|
||||||
|
|
@ -682,12 +653,14 @@ async def get_bars(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# per-actor cache of inter-eventloop-chans
|
|
||||||
_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
|
_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
|
||||||
|
|
||||||
|
|
||||||
async def _setup_quote_stream(
|
async def _setup_quote_stream(
|
||||||
chan: tractor.to_asyncio.LinkedTaskChannel,
|
|
||||||
|
from_trio: asyncio.Queue,
|
||||||
|
to_trio: trio.abc.SendChannel,
|
||||||
|
|
||||||
symbol: str,
|
symbol: str,
|
||||||
opts: tuple[int] = (
|
opts: tuple[int] = (
|
||||||
'375', # RT trade volume (excludes utrades)
|
'375', # RT trade volume (excludes utrades)
|
||||||
|
|
@ -705,13 +678,10 @@ async def _setup_quote_stream(
|
||||||
|
|
||||||
) -> trio.abc.ReceiveChannel:
|
) -> trio.abc.ReceiveChannel:
|
||||||
'''
|
'''
|
||||||
Stream L1 quotes via the `Ticker.updateEvent.connect(push)`
|
Stream a ticker using the std L1 api.
|
||||||
callback API by registering a `push` callback which simply
|
|
||||||
`chan.send_nowait()`s quote msgs back to the calling
|
|
||||||
parent-`trio.Task`-side.
|
|
||||||
|
|
||||||
NOTE, that this task-fn is run on the `asyncio.Task`-side ONLY
|
This task is ``asyncio``-side and must be called from
|
||||||
and is thus run via `tractor.to_asyncio.open_channel_from()`.
|
``tractor.to_asyncio.open_channel_from()``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
global _quote_streams
|
global _quote_streams
|
||||||
|
|
@ -719,79 +689,39 @@ async def _setup_quote_stream(
|
||||||
async with load_aio_clients(
|
async with load_aio_clients(
|
||||||
disconnect_on_exit=False,
|
disconnect_on_exit=False,
|
||||||
) as accts2clients:
|
) as accts2clients:
|
||||||
|
|
||||||
# XXX since this is an `asyncio.Task`, we must use
|
|
||||||
# tractor.pause_from_sync()
|
|
||||||
|
|
||||||
caccount_name, client = get_preferred_data_client(accts2clients)
|
caccount_name, client = get_preferred_data_client(accts2clients)
|
||||||
contract = (
|
contract = contract or (await client.find_contract(symbol))
|
||||||
contract
|
to_trio.send_nowait(contract) # cuz why not
|
||||||
or
|
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
|
||||||
(await client.find_contract(symbol))
|
|
||||||
)
|
|
||||||
chan.started_nowait(contract) # cuz why not
|
|
||||||
ticker: Ticker = client.ib.reqMktData(
|
|
||||||
contract,
|
|
||||||
','.join(opts),
|
|
||||||
)
|
|
||||||
maybe_exc: BaseException|None = None
|
|
||||||
handler_tries: int = 0
|
|
||||||
aio_task: asyncio.Task = asyncio.current_task()
|
|
||||||
|
|
||||||
# ?TODO? this API is batch-wise and quite slow-af but,
|
# NOTE: it's batch-wise and slow af but I guess could
|
||||||
# - seems to be 5s updates?
|
# be good for backchecking? Seems to be every 5s maybe?
|
||||||
# - maybe we could use it for backchecking?
|
|
||||||
#
|
|
||||||
# ticker: Ticker = client.ib.reqTickByTickData(
|
# ticker: Ticker = client.ib.reqTickByTickData(
|
||||||
# contract, 'Last',
|
# contract, 'Last',
|
||||||
# )
|
# )
|
||||||
|
|
||||||
# define a very naive queue-pushing callback that relays
|
# # define a simple queue push routine that streams quote packets
|
||||||
# quote-packets directly the calling (parent) `trio.Task`.
|
# # to trio over the ``to_trio`` memory channel.
|
||||||
# Ensure on teardown we cancel the feed via their cancel API.
|
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
|
||||||
#
|
|
||||||
def teardown():
|
def teardown():
|
||||||
'''
|
|
||||||
Disconnect our `push`-er callback and cancel the data-feed
|
|
||||||
for `contract`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
nonlocal maybe_exc
|
|
||||||
ticker.updateEvent.disconnect(push)
|
ticker.updateEvent.disconnect(push)
|
||||||
report: str = f'Disconnected mkt-data for {symbol!r} due to '
|
log.error(
|
||||||
if maybe_exc is not None:
|
f'Disconnected stream for `{symbol}`'
|
||||||
report += (
|
)
|
||||||
'error,\n'
|
|
||||||
f'{maybe_exc!r}\n'
|
|
||||||
)
|
|
||||||
log.error(report)
|
|
||||||
else:
|
|
||||||
report += (
|
|
||||||
'cancellation.\n'
|
|
||||||
)
|
|
||||||
log.cancel(report)
|
|
||||||
|
|
||||||
client.ib.cancelMktData(contract)
|
client.ib.cancelMktData(contract)
|
||||||
|
|
||||||
# decouple broadcast mem chan
|
# decouple broadcast mem chan
|
||||||
_quote_streams.pop(symbol, None)
|
_quote_streams.pop(symbol, None)
|
||||||
|
|
||||||
def push(
|
def push(t: Ticker) -> None:
|
||||||
t: Ticker,
|
"""
|
||||||
tries_before_raise: int = 6,
|
Push quotes to trio task.
|
||||||
) -> None:
|
|
||||||
'''
|
|
||||||
Push quotes verbatim to parent-side `trio.Task`.
|
|
||||||
|
|
||||||
'''
|
"""
|
||||||
nonlocal maybe_exc, handler_tries
|
# log.debug(t)
|
||||||
# log.debug(f'new IB quote: {t}\n')
|
|
||||||
try:
|
try:
|
||||||
chan.send_nowait(t)
|
to_trio.send_nowait(t)
|
||||||
|
|
||||||
# XXX TODO XXX replicate in `tractor` tests
|
|
||||||
# as per `CancelledError`-handler notes below!
|
|
||||||
# assert 0
|
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
|
|
||||||
|
|
@ -806,107 +736,38 @@ async def _setup_quote_stream(
|
||||||
# resulting in tracebacks spammed to console..
|
# resulting in tracebacks spammed to console..
|
||||||
# Manually do the dereg ourselves.
|
# Manually do the dereg ourselves.
|
||||||
teardown()
|
teardown()
|
||||||
|
|
||||||
# for slow debugging purposes to avoid clobbering prompt
|
|
||||||
# with log msgs
|
|
||||||
except trio.WouldBlock:
|
except trio.WouldBlock:
|
||||||
log.exception(
|
# log.warning(
|
||||||
f'Asyncio->Trio `chan.send_nowait()` blocked !?\n'
|
# f'channel is blocking symbol feed for {symbol}?'
|
||||||
f'\n'
|
# f'\n{to_trio.statistics}'
|
||||||
f'{chan._to_trio.statistics()}\n'
|
# )
|
||||||
)
|
pass
|
||||||
|
|
||||||
# ?TODO, handle re-connection attempts?
|
|
||||||
except BaseException as _berr:
|
|
||||||
berr = _berr
|
|
||||||
if handler_tries >= tries_before_raise:
|
|
||||||
# breakpoint()
|
|
||||||
maybe_exc = _berr
|
|
||||||
# task.set_exception(berr)
|
|
||||||
aio_task.cancel(msg=berr.args)
|
|
||||||
raise berr
|
|
||||||
else:
|
|
||||||
handler_tries += 1
|
|
||||||
|
|
||||||
log.exception(
|
|
||||||
f'Failed to push ticker quote !?\n'
|
|
||||||
f'handler_tries={handler_tries!r}\n'
|
|
||||||
f'ticker: {t!r}\n'
|
|
||||||
f'\n'
|
|
||||||
f'{chan._to_trio.statistics()}\n'
|
|
||||||
f'\n'
|
|
||||||
f'CAUSE: {berr}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
# except trio.WouldBlock:
|
||||||
|
# # for slow debugging purposes to avoid clobbering prompt
|
||||||
|
# # with log msgs
|
||||||
|
# pass
|
||||||
|
|
||||||
ticker.updateEvent.connect(push)
|
ticker.updateEvent.connect(push)
|
||||||
try:
|
try:
|
||||||
await asyncio.sleep(float('inf'))
|
await asyncio.sleep(float('inf'))
|
||||||
|
|
||||||
# XXX, for debug.. TODO? can we rm again?
|
|
||||||
#
|
|
||||||
# tractor.pause_from_sync()
|
|
||||||
# while True:
|
|
||||||
# await asyncio.sleep(1.6)
|
|
||||||
# if ticker.ticks:
|
|
||||||
# log.debug(
|
|
||||||
# f'ticker.ticks = \n'
|
|
||||||
# f'{ticker.ticks}\n'
|
|
||||||
# )
|
|
||||||
# else:
|
|
||||||
# log.warning(
|
|
||||||
# 'UHH no ticker.ticks ??'
|
|
||||||
# )
|
|
||||||
|
|
||||||
# XXX TODO XXX !?!?
|
|
||||||
# apparently **without this handler** and the subsequent
|
|
||||||
# re-raising of `maybe_exc from _taskc` cancelling the
|
|
||||||
# `aio_task` from the `push()`-callback will cause a very
|
|
||||||
# strange chain of exc raising that breaks alll sorts of
|
|
||||||
# downstream callers, tasks and remote-actor tasks!?
|
|
||||||
#
|
|
||||||
# -[ ] we need some lowlevel reproducting tests to replicate
|
|
||||||
# those worst-case scenarios in `tractor` core!!
|
|
||||||
# -[ ] likely we should factor-out the `tractor.to_asyncio`
|
|
||||||
# attempts at workarounds in `.translate_aio_errors()`
|
|
||||||
# for failed `asyncio.Task.set_exception()` to either
|
|
||||||
# call `aio_task.cancel()` and/or
|
|
||||||
# `aio_task._fut_waiter.set_exception()` to a re-useable
|
|
||||||
# toolset in something like a `.to_asyncio._utils`??
|
|
||||||
#
|
|
||||||
except asyncio.CancelledError as _taskc:
|
|
||||||
if maybe_exc is not None:
|
|
||||||
raise maybe_exc from _taskc
|
|
||||||
|
|
||||||
raise _taskc
|
|
||||||
|
|
||||||
except BaseException as _berr:
|
|
||||||
# stash any crash cause for reporting in `teardown()`
|
|
||||||
maybe_exc = _berr
|
|
||||||
raise _berr
|
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# always disconnect our `push()` and cancel the
|
|
||||||
# ib-"mkt-data-feed".
|
|
||||||
teardown()
|
teardown()
|
||||||
|
|
||||||
|
# return from_aio
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_aio_quote_stream(
|
async def open_aio_quote_stream(
|
||||||
|
|
||||||
symbol: str,
|
symbol: str,
|
||||||
contract: Contract|None = None,
|
contract: Contract | None = None,
|
||||||
|
|
||||||
) -> (
|
) -> (
|
||||||
trio.abc.Channel| # iface
|
trio.abc.Channel| # iface
|
||||||
tractor.to_asyncio.LinkedTaskChannel # actually
|
tractor.to_asyncio.LinkedTaskChannel # actually
|
||||||
):
|
):
|
||||||
'''
|
|
||||||
Open a real-time `Ticker` quote stream from an `asyncio.Task`
|
|
||||||
spawned via `tractor.to_asyncio.open_channel_from()`, deliver the
|
|
||||||
inter-event-loop channel to the `trio.Task` caller and cache it
|
|
||||||
globally for re-use.
|
|
||||||
|
|
||||||
'''
|
|
||||||
from tractor.trionics import broadcast_receiver
|
from tractor.trionics import broadcast_receiver
|
||||||
global _quote_streams
|
global _quote_streams
|
||||||
|
|
||||||
|
|
@ -932,10 +793,6 @@ async def open_aio_quote_stream(
|
||||||
|
|
||||||
assert contract
|
assert contract
|
||||||
|
|
||||||
# TODO? de-reg on teardown of last consumer task?
|
|
||||||
# -> why aren't we using `.trionics.maybe_open_context()`
|
|
||||||
# here again?? (we are in `open_client_proxies()` tho?)
|
|
||||||
#
|
|
||||||
# cache feed for later consumers
|
# cache feed for later consumers
|
||||||
_quote_streams[symbol] = from_aio
|
_quote_streams[symbol] = from_aio
|
||||||
|
|
||||||
|
|
@ -950,12 +807,7 @@ def normalize(
|
||||||
calc_price: bool = False
|
calc_price: bool = False
|
||||||
|
|
||||||
) -> dict:
|
) -> dict:
|
||||||
'''
|
|
||||||
Translate `ib_async`'s `Ticker.ticks` values to a `piker`
|
|
||||||
normalized `dict` form for transmit to downstream `.data` layer
|
|
||||||
consumers.
|
|
||||||
|
|
||||||
'''
|
|
||||||
# check for special contract types
|
# check for special contract types
|
||||||
con = ticker.contract
|
con = ticker.contract
|
||||||
fqme, calc_price = con2fqme(con)
|
fqme, calc_price = con2fqme(con)
|
||||||
|
|
@ -974,7 +826,7 @@ def normalize(
|
||||||
|
|
||||||
tbt = ticker.tickByTicks
|
tbt = ticker.tickByTicks
|
||||||
if tbt:
|
if tbt:
|
||||||
log.info(f'tickbyticks:\n {ticker.tickByTicks}')
|
print(f'tickbyticks:\n {ticker.tickByTicks}')
|
||||||
|
|
||||||
ticker.ticks = new_ticks
|
ticker.ticks = new_ticks
|
||||||
|
|
||||||
|
|
@ -1010,39 +862,27 @@ def normalize(
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
# ?TODO? feels like this task-fn could be factored to reduce some
|
|
||||||
# indentation levels?
|
|
||||||
# -[ ] the reconnect while loop on ib-gw "data farm connection.."s
|
|
||||||
# -[ ] everything embedded under the `async with aclosing(stream):`
|
|
||||||
# as the "meat" of the quote delivery once the connection is
|
|
||||||
# stable.
|
|
||||||
#
|
|
||||||
async def stream_quotes(
|
async def stream_quotes(
|
||||||
|
|
||||||
send_chan: trio.abc.SendChannel,
|
send_chan: trio.abc.SendChannel,
|
||||||
symbols: list[str],
|
symbols: list[str],
|
||||||
feed_is_live: trio.Event,
|
feed_is_live: trio.Event,
|
||||||
|
loglevel: str = None,
|
||||||
# TODO? we need to hook into the `ib_async` logger like
|
|
||||||
# we can with i3ipc from modden!
|
|
||||||
# loglevel: str|None = None,
|
|
||||||
|
|
||||||
# startup sync
|
# startup sync
|
||||||
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Stream `symbols[0]` quotes back via `send_chan`.
|
Stream symbol quotes.
|
||||||
|
|
||||||
The `feed_is_live: Event` is set to signal the caller that it can
|
This is a ``trio`` callable routine meant to be invoked
|
||||||
begin processing msgs from the mem-chan.
|
once the brokerd is up.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# TODO: support multiple subscriptions
|
# TODO: support multiple subscriptions
|
||||||
sym: str = symbols[0]
|
sym = symbols[0]
|
||||||
log.info(
|
log.info(f'request for real-time quotes: {sym}')
|
||||||
f'request for real-time quotes\n'
|
|
||||||
f'sym: {sym!r}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
init_msgs: list[FeedInit] = []
|
init_msgs: list[FeedInit] = []
|
||||||
|
|
||||||
|
|
@ -1051,52 +891,34 @@ async def stream_quotes(
|
||||||
details: ibis.ContractDetails
|
details: ibis.ContractDetails
|
||||||
async with (
|
async with (
|
||||||
open_data_client() as proxy,
|
open_data_client() as proxy,
|
||||||
|
# trio.open_nursery() as tn,
|
||||||
):
|
):
|
||||||
mkt, details = await get_mkt_info(
|
mkt, details = await get_mkt_info(
|
||||||
sym,
|
sym,
|
||||||
proxy=proxy, # passed to avoid implicit client load
|
proxy=proxy, # passed to avoid implicit client load
|
||||||
)
|
)
|
||||||
|
|
||||||
# is venue active rn?
|
|
||||||
venue_is_open: bool = any(
|
|
||||||
is_current_time_in_range(
|
|
||||||
start_dt=sesh.start,
|
|
||||||
end_dt=sesh.end,
|
|
||||||
)
|
|
||||||
for sesh in details.tradingSessions()
|
|
||||||
)
|
|
||||||
|
|
||||||
init_msg = FeedInit(mkt_info=mkt)
|
init_msg = FeedInit(mkt_info=mkt)
|
||||||
|
|
||||||
# NOTE, tell sampler (via config) to skip vlm summing for dst
|
|
||||||
# assets which provide no vlm data..
|
|
||||||
if mkt.dst.atype in {
|
if mkt.dst.atype in {
|
||||||
'fiat',
|
'fiat',
|
||||||
'index',
|
'index',
|
||||||
'commodity',
|
'commodity',
|
||||||
}:
|
}:
|
||||||
|
# tell sampler config that it shouldn't do vlm summing.
|
||||||
init_msg.shm_write_opts['sum_tick_vlm'] = False
|
init_msg.shm_write_opts['sum_tick_vlm'] = False
|
||||||
init_msg.shm_write_opts['has_vlm'] = False
|
init_msg.shm_write_opts['has_vlm'] = False
|
||||||
|
|
||||||
init_msgs.append(init_msg)
|
init_msgs.append(init_msg)
|
||||||
|
|
||||||
con: Contract = details.contract
|
con: Contract = details.contract
|
||||||
first_ticker: Ticker|None = None
|
first_ticker: Ticker | None = None
|
||||||
|
with trio.move_on_after(1):
|
||||||
timeout: float = 1.6
|
|
||||||
with trio.move_on_after(timeout) as quote_cs:
|
|
||||||
first_ticker: Ticker = await proxy.get_quote(
|
first_ticker: Ticker = await proxy.get_quote(
|
||||||
contract=con,
|
contract=con,
|
||||||
raise_on_timeout=False,
|
raise_on_timeout=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX should never happen with this ep right?
|
|
||||||
# but if so then, more then likely mkt is closed?
|
|
||||||
if quote_cs.cancelled_caught:
|
|
||||||
log.warning(
|
|
||||||
f'First quote req timed out after {timeout!r}s'
|
|
||||||
)
|
|
||||||
|
|
||||||
if first_ticker:
|
if first_ticker:
|
||||||
first_quote: dict = normalize(first_ticker)
|
first_quote: dict = normalize(first_ticker)
|
||||||
|
|
||||||
|
|
@ -1108,27 +930,28 @@ async def stream_quotes(
|
||||||
f'{pformat(first_quote)}\n'
|
f'{pformat(first_quote)}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX NOTE: whenever we're "outside regular trading hours"
|
# NOTE: it might be outside regular trading hours for
|
||||||
# (only relevant for assets coming from the "legacy markets"
|
# assets with "standard venue operating hours" so we
|
||||||
# space) so we basically (from an API/runtime-operational
|
# only "pretend the feed is live" when the dst asset
|
||||||
# perspective) "pretend the feed is live" even if it's
|
# type is NOT within the NON-NORMAL-venue set: aka not
|
||||||
# actually closed.
|
# commodities, forex or crypto currencies which CAN
|
||||||
#
|
# always return a NaN on a snap quote request during
|
||||||
# IOW, we signal to the effective caller (task) that the live
|
# normal venue hours. In the case of a closed venue
|
||||||
# feed is "already up" but really we're just indicating that
|
# (equitiies, futes, bonds etc.) we at least try to
|
||||||
# the OHLCV history can start being loaded immediately by the
|
# grab the OHLC history.
|
||||||
# `piker.data`/`.tsp` layers.
|
if (
|
||||||
#
|
first_ticker
|
||||||
# XXX, deats: the "pretend we're live" is just done by
|
and
|
||||||
# a `feed_is_live.set()` even though nothing is actually live
|
isnan(first_ticker.last)
|
||||||
# Bp
|
# SO, if the last quote price value is NaN we ONLY
|
||||||
if not venue_is_open:
|
# "pretend to do" `feed_is_live.set()` if it's a known
|
||||||
log.warning(
|
# dst asset venue with a lot of closed operating hours.
|
||||||
f'Venue is closed, unable to establish real-time feed.\n'
|
and mkt.dst.atype not in {
|
||||||
f'mkt: {mkt!r}\n'
|
'commodity',
|
||||||
f'\n'
|
'fiat',
|
||||||
f'first_ticker: {first_ticker}\n'
|
'crypto',
|
||||||
)
|
}
|
||||||
|
):
|
||||||
task_status.started((
|
task_status.started((
|
||||||
init_msgs,
|
init_msgs,
|
||||||
first_quote,
|
first_quote,
|
||||||
|
|
@ -1139,12 +962,10 @@ async def stream_quotes(
|
||||||
feed_is_live.set()
|
feed_is_live.set()
|
||||||
|
|
||||||
# block and let data history backfill code run.
|
# block and let data history backfill code run.
|
||||||
# XXX obvi given the venue is closed, we never expect feed
|
|
||||||
# to come up; a taskc should be the only way to
|
|
||||||
# terminate this task.
|
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
return # we never expect feed to come up?
|
||||||
|
|
||||||
# ?TODO, we could instead spawn a task that waits on a feed
|
# TODO: we should instead spawn a task that waits on a feed
|
||||||
# to start and let it wait indefinitely..instead of this
|
# to start and let it wait indefinitely..instead of this
|
||||||
# hard coded stuff.
|
# hard coded stuff.
|
||||||
# async def wait_for_first_quote():
|
# async def wait_for_first_quote():
|
||||||
|
|
@ -1166,27 +987,24 @@ async def stream_quotes(
|
||||||
'Rxed init quote:\n'
|
'Rxed init quote:\n'
|
||||||
f'{pformat(first_quote)}'
|
f'{pformat(first_quote)}'
|
||||||
)
|
)
|
||||||
cs: trio.CancelScope|None = None
|
cs: trio.CancelScope | None = None
|
||||||
startup: bool = True
|
startup: bool = True
|
||||||
iter_quotes: trio.abc.Channel
|
iter_quotes: trio.abc.Channel
|
||||||
while (
|
while (
|
||||||
startup
|
startup
|
||||||
or
|
or cs.cancel_called
|
||||||
cs.cancel_called
|
|
||||||
):
|
):
|
||||||
with trio.CancelScope() as cs:
|
with trio.CancelScope() as cs:
|
||||||
async with (
|
async with (
|
||||||
tractor.trionics.collapse_eg(),
|
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
open_aio_quote_stream(
|
open_aio_quote_stream(
|
||||||
symbol=sym,
|
symbol=sym,
|
||||||
contract=con,
|
contract=con,
|
||||||
) as iter_quotes,
|
) as iter_quotes,
|
||||||
):
|
):
|
||||||
# ?TODO? can we rm this - particularly for `ib_async`?
|
|
||||||
# ugh, clear ticks since we've consumed them
|
# ugh, clear ticks since we've consumed them
|
||||||
# (ahem, ib_insync is stateful trash)
|
# (ahem, ib_insync is stateful trash)
|
||||||
# first_ticker.ticks = []
|
first_ticker.ticks = []
|
||||||
|
|
||||||
# only on first entry at feed boot up
|
# only on first entry at feed boot up
|
||||||
if startup:
|
if startup:
|
||||||
|
|
@ -1200,8 +1018,8 @@ async def stream_quotes(
|
||||||
# data feed event.
|
# data feed event.
|
||||||
async def reset_on_feed():
|
async def reset_on_feed():
|
||||||
|
|
||||||
# ??TODO? this seems to be surpressed from the
|
# TODO: this seems to be surpressed from the
|
||||||
# traceback in `tractor`?
|
# traceback in ``tractor``?
|
||||||
# assert 0
|
# assert 0
|
||||||
|
|
||||||
rt_ev = proxy.status_event(
|
rt_ev = proxy.status_event(
|
||||||
|
|
@ -1247,7 +1065,7 @@ async def stream_quotes(
|
||||||
# ugh, clear ticks since we've
|
# ugh, clear ticks since we've
|
||||||
# consumed them (ahem, ib_insync is
|
# consumed them (ahem, ib_insync is
|
||||||
# truly stateful trash)
|
# truly stateful trash)
|
||||||
# ticker.ticks = []
|
ticker.ticks = []
|
||||||
|
|
||||||
# XXX: this works because we don't use
|
# XXX: this works because we don't use
|
||||||
# ``aclosing()`` above?
|
# ``aclosing()`` above?
|
||||||
|
|
@ -1269,12 +1087,8 @@ async def stream_quotes(
|
||||||
async for ticker in iter_quotes:
|
async for ticker in iter_quotes:
|
||||||
quote = normalize(ticker)
|
quote = normalize(ticker)
|
||||||
fqme: str = quote['fqme']
|
fqme: str = quote['fqme']
|
||||||
log.debug(
|
|
||||||
f'Sending quote\n'
|
|
||||||
f'{quote}'
|
|
||||||
)
|
|
||||||
await send_chan.send({fqme: quote})
|
await send_chan.send({fqme: quote})
|
||||||
|
|
||||||
# ugh, clear ticks since we've consumed them
|
# ugh, clear ticks since we've consumed them
|
||||||
# ticker.ticks = []
|
ticker.ticks = []
|
||||||
# last = time.time()
|
# last = time.time()
|
||||||
|
|
|
||||||
|
|
@ -740,7 +740,7 @@ async def sample_and_broadcast(
|
||||||
|
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Feed OVERRUN {sub_key}'
|
f'Feed OVERRUN {sub_key}'
|
||||||
'@{bus.brokername} -> \n'
|
f'@{bus.brokername} -> \n'
|
||||||
f'feed @ {chan.uid}\n'
|
f'feed @ {chan.uid}\n'
|
||||||
f'throttle = {throttle} Hz'
|
f'throttle = {throttle} Hz'
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -786,7 +786,6 @@ async def install_brokerd_search(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def maybe_open_feed(
|
async def maybe_open_feed(
|
||||||
|
|
||||||
fqmes: list[str],
|
fqmes: list[str],
|
||||||
loglevel: str | None = None,
|
loglevel: str | None = None,
|
||||||
|
|
||||||
|
|
@ -840,13 +839,12 @@ async def maybe_open_feed(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_feed(
|
async def open_feed(
|
||||||
|
|
||||||
fqmes: list[str],
|
fqmes: list[str],
|
||||||
|
|
||||||
loglevel: str | None = None,
|
loglevel: str|None = None,
|
||||||
allow_overruns: bool = True,
|
allow_overruns: bool = True,
|
||||||
start_stream: bool = True,
|
start_stream: bool = True,
|
||||||
tick_throttle: float | None = None, # Hz
|
tick_throttle: float|None = None, # Hz
|
||||||
|
|
||||||
allow_remote_ctl_ui: bool = False,
|
allow_remote_ctl_ui: bool = False,
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -36,10 +36,10 @@ from ._sharedmem import (
|
||||||
ShmArray,
|
ShmArray,
|
||||||
_Token,
|
_Token,
|
||||||
)
|
)
|
||||||
|
from piker.accounting import MktPair
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ..accounting import MktPair
|
from piker.data.feed import Feed
|
||||||
from .feed import Feed
|
|
||||||
|
|
||||||
|
|
||||||
class Flume(Struct):
|
class Flume(Struct):
|
||||||
|
|
@ -82,7 +82,7 @@ class Flume(Struct):
|
||||||
|
|
||||||
# TODO: do we need this really if we can pull the `Portal` from
|
# TODO: do we need this really if we can pull the `Portal` from
|
||||||
# ``tractor``'s internals?
|
# ``tractor``'s internals?
|
||||||
feed: Feed | None = None
|
feed: Feed|None = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def rt_shm(self) -> ShmArray:
|
def rt_shm(self) -> ShmArray:
|
||||||
|
|
|
||||||
|
|
@ -113,9 +113,9 @@ def validate_backend(
|
||||||
)
|
)
|
||||||
if ep is None:
|
if ep is None:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Provider backend {mod.name} is missing '
|
f'Provider backend {mod.name!r} is missing '
|
||||||
f'{daemon_name} support :(\n'
|
f'{daemon_name!r} support?\n'
|
||||||
f'The following endpoint is missing: {name}'
|
f'|_module endpoint-func missing: {name!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
inits: list[
|
inits: list[
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue