Compare commits

..

7 Commits

18 changed files with 3414 additions and 3157 deletions

View File

@ -26,13 +26,6 @@ ports = [
7497, # tws
]
# XXX: for a paper account the flex web query service
# is not supported so you have to manually download
# and XML report and put it in a location that can be
# accessed by the ``brokerd.ib`` backend code for parsing.
flex_token = '666666666666666666666666'
flex_trades_query_id = '666666' # live account
# when clients are being scanned this determines
# which clients are preferred to be used for data
# feeds based on the order of account names, if

View File

@ -35,7 +35,7 @@ log = get_logger(__name__)
_root_dname = 'pikerd'
_registry_addr = ('127.0.0.1', 6116)
_registry_addr = ('127.0.0.1', 1616)
_tractor_kwargs: dict[str, Any] = {
# use a different registry addr then tractor's default
'arbiter_addr': _registry_addr
@ -426,19 +426,9 @@ async def spawn_brokerd(
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
modpath = brokermod.__name__
broker_enable = [modpath]
for submodname in getattr(
brokermod,
'__enable_modules__',
[],
):
subpath = f'{modpath}.{submodname}'
broker_enable.append(subpath)
portal = await _services.actor_n.start_actor(
dname,
enable_modules=_data_mods + broker_enable,
enable_modules=_data_mods + [brokermod.__name__],
loglevel=loglevel,
debug_mode=_services.debug_mode,
**tractor_kwargs

View File

@ -21,11 +21,15 @@ import os
from functools import partial
from operator import attrgetter
from operator import itemgetter
from typing import (
Any,
)
import click
import trio
import tractor
# from .._daemon import maybe_open_runtime
from ..cli import cli
from .. import watchlists as wl
from ..log import get_console_log, colorize_json, get_logger
@ -39,6 +43,142 @@ _config_dir = click.get_app_dir('piker')
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
@cli.command()
@click.argument('broker', nargs=1, required=True)
@click.pass_obj
def brokercheck(config, broker):
'''
Test broker apis for completeness.
'''
OK = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
def print_ok(s: str, **kwargs):
print(OK + s + ENDC, **kwargs)
def print_error(s: str, **kwargs):
print(FAIL + s + ENDC, **kwargs)
async def run_method(
client,
meth_name: str,
**kwargs,
) -> Any:
print(f'checking client for method \'{meth_name}\'...', end='', flush=True)
method = getattr(client, meth_name, None)
assert method, f'.{meth_name} does not exist for {client}!'
print_ok('found!, running...', end='', flush=True)
result = await method(**kwargs)
print_ok(f'done! result: {type(result)}')
return result
async def run_test(broker_name: str):
brokermod = get_brokermod(broker_name)
total = 0
passed = 0
failed = 0
print('getting client...', end='', flush=True)
if not hasattr(brokermod, 'get_client'):
print_error('fail! no \'get_client\' context manager found.')
return
# extra_tractor_kwargs = getattr(
# brokermod,
# '_spawn_kwargs',
# {},
# )
# TODO: eventually avoid this hack for `ib` XD
import inspect
get_client = brokermod.get_client
if 'is_brokercheck' in inspect.signature(get_client).parameters:
kwargs = {'is_brokercheck': True}
else:
kwargs = {}
async with (
# TODO: in theory we can actually spawn a local `brokerd`
# and then try to make some basic feed queries?
# maybe_open_runtime(**extra_tractor_kwargs),
brokermod.get_client(**kwargs) as client,
):
print_ok('done! inside client context.')
# check for methods present on brokermod
method_list = [
# not required eps i'm pretty sure?
# 'backfill_bars',
# 'stream_messages',
'open_history_client',
'stream_quotes',
'open_symbol_search',
'trades_dialogue',
]
for method in method_list:
print(
f"checking brokermod for method '{method}'...",
end='',
flush=True,
)
if not hasattr(brokermod, method):
print_error(f"fail! method '{method}' not found.")
failed += 1
else:
print_ok('done!')
passed += 1
total += 1
# check for methods present con brokermod.Client and attempt
# to use them and gather output results.
symbol_info = getattr(client, 'symbol_info', None)
if symbol_info:
syms = await run_method(
client,
'symbol_info',
)
total += 1
if len(syms) == 0:
raise BaseException('Empty Symbol list?')
passed += 1
first_sym = tuple(syms.keys())[0]
method_list = [
('cache_symbols', {}),
('search_symbols', {'pattern': first_sym[:-1]}),
('bars', {'symbol': first_sym})
]
for method_name, method_kwargs in method_list:
try:
await run_method(client, method_name, **method_kwargs)
passed += 1
except AssertionError:
print_error(f'fail! method \'{method_name}\' not found.')
failed += 1
total += 1
print(f'total: {total}, passed: {passed}, failed: {failed}')
trio.run(run_test, broker)
@cli.command()
@click.option('--keys', '-k', multiple=True,
help='Return results only for these keys')
@ -193,6 +333,8 @@ def contracts(ctx, loglevel, broker, symbol, ids):
brokermod = get_brokermod(broker)
get_console_log(loglevel)
contracts = trio.run(partial(core.contracts, brokermod, symbol))
if not ids:
# just print out expiry dates which can be used with

View File

@ -0,0 +1,483 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Deribit backend
"""
import asyncio
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import (
Any, Union, Optional, List,
AsyncGenerator, Callable,
)
import time
import trio
from trio_typing import TaskStatus
import pendulum
import asks
from fuzzywuzzy import process as fuzzy
import numpy as np
import tractor
from tractor import to_asyncio
from pydantic.dataclasses import dataclass
from pydantic import BaseModel
import wsproto
from .. import config
from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws, NoBsWs
from cryptofeed import FeedHandler
from cryptofeed.callback import (
L1BookCallback,
TradeCallback
)
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
_spawn_kwargs = {
'infect_asyncio': True,
}
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('deribit')
if section is None:
log.warning(f'No config section found for deribit in {path}')
return {}
conf['log'] = {}
conf['log']['filename'] = 'feedhandler.log'
conf['log']['level'] = 'WARNING'
return conf
log = get_logger(__name__)
_url = 'https://www.deribit.com'
# Broker specific ohlc schema (rest)
_ohlc_dtype = [
('index', int),
('time', int),
('open', float),
('high', float),
('low', float),
('close', float),
('volume', float),
('bar_wap', float), # will be zeroed by sampler if not filled
]
class KLinesResult(BaseModel):
close: List[float]
cost: List[float]
high: List[float]
low: List[float]
open: List[float]
status: str
ticks: List[int]
volume: List[float]
class KLines(BaseModel):
jsonrpc: str = '2.0'
result: KLinesResult
usIn: int
usOut: int
usDiff: int
testnet: bool
# convert datetime obj timestamp to unixtime in milliseconds
def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
class Client:
def __init__(self) -> None:
self._sesh = asks.Session(connections=4)
self._sesh.base_location = _url
self._pairs: dict[str, Any] = {}
async def _api(
self,
method: str,
params: dict,
) -> dict[str, Any]:
resp = await self._sesh.get(
path=f'/api/v2/public/{method}',
params=params,
timeout=float('inf')
)
return resproc(resp, log)
async def symbol_info(
self,
instrument: Optional[str] = None,
currency: str = 'btc', # BTC, ETH, SOL, USDC
kind: str = 'option',
expired: bool = False
) -> dict[str, Any]:
'''Get symbol info for the exchange.
'''
# TODO: we can load from our self._pairs cache
# on repeat calls...
# will retrieve all symbols by default
params = {
'currency': currency.upper(),
'kind': kind,
'expired': str(expired).lower()
}
resp = await self._api(
'get_instruments', params=params)
results = resp['result']
instruments = {
item['instrument_name']: item for item in results}
if instrument is not None:
return instruments[instrument]
else:
return instruments
async def cache_symbols(
self,
) -> dict:
if not self._pairs:
self._pairs = await self.symbol_info()
return self._pairs
async def search_symbols(
self,
pattern: str,
limit: int = None,
) -> dict[str, Any]:
if self._pairs is not None:
data = self._pairs
else:
data = await self.symbol_info()
matches = fuzzy.extractBests(
pattern,
data,
score_cutoff=50,
)
# repack in dict form
return {item[0]['instrument_name']: item[0]
for item in matches}
async def bars(
self,
symbol: str,
start_dt: Optional[datetime] = None,
end_dt: Optional[datetime] = None,
limit: int = 1000,
as_np: bool = True,
) -> dict:
instrument = symbol
if end_dt is None:
end_dt = pendulum.now('UTC')
if start_dt is None:
start_dt = end_dt.start_of(
'minute').subtract(minutes=limit)
start_time = deribit_timestamp(start_dt)
end_time = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data
response = await self._api(
'get_tradingview_chart_data',
params={
'instrument_name': instrument.upper(),
'start_timestamp': start_time,
'end_timestamp': end_time,
'resolution': '1'
}
)
klines = KLines(**response)
result = klines.result
new_bars = []
for i in range(len(result.close)):
_open = result.open[i]
high = result.high[i]
low = result.low[i]
close = result.close[i]
volume = result.volume[i]
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i],
result.high[i],
result.low[i],
result.close[i],
result.volume[i]
]
new_bars.append((i,) + tuple(row))
array = np.array(
[i, ], dtype=_ohlc_dtype) if as_np else klines
return array
@acm
async def get_client() -> Client:
client = Client()
await client.cache_symbols()
yield client
# inside here we are in an asyncio context
async def open_aio_cryptofeed_relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
instruments: List[str] = []
) -> None:
conf = get_config()
def format_sym(name: str) -> str:
base, expiry_date, strike_price, option_type = tuple(
name.upper().split('-'))
quote = base
if option_type == 'P':
option_type = PUT
elif option_type == 'C':
option_type = CALL
else:
raise BaseException("Instrument name must end in 'c' for calls or 'p' for puts")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date.upper()).normalized
instruments = [format_sym(i) for i in instruments]
async def trade_cb(data: dict, receipt_timestamp):
breakpoint()
# to_trio.send_nowait(('trade', {
# 'symbol': data.symbol.lower(),
# 'last': data.
# 'broker_ts': time.time(),
# 'data': data.to_dict(),
# 'receipt': receipt_timestamp}))
async def l1_book_cb(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
'symbol': data.symbol.lower(),
'ticks': [
{'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'bsize',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'ask',
'price': float(data.ask_price), 'size': float(data.ask_size)},
{'type': 'asize',
'price': float(data.ask_price), 'size': float(data.ask_size)}
]}))
fh = FeedHandler(config=conf)
fh.run(start_loop=False)
fh.add_feed(
DERIBIT,
channels=[L1_BOOK, TRADES],
symbols=instruments,
callbacks={
L1_BOOK: L1BookCallback(l1_book_cb),
TRADES: TradeCallback(trade_cb)
})
# sync with trio
to_trio.send_nowait(None)
await from_trio.get()
async def open_cryptofeeds(
instruments: List[str],
to_chart: trio.abc.SendChannel,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
):
async with to_asyncio.open_channel_from(
open_aio_cryptofeed_relay,
instruments=instruments,
) as (first, chan):
assert first is None
await chan.send(None)
async with chan.subscribe() as msg_stream:
task_status.started()
async for msg in msg_stream:
await to_chart.send(msg)
@acm
async def open_history_client(
instrument: str,
) -> tuple[Callable, int]:
# TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client:
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
array = await client.bars(
instrument,
start_dt=start_dt,
end_dt=end_dt,
)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3}
async def backfill_bars(
symbol: str,
shm: ShmArray, # type: ignore # noqa
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Fill historical bars into shared mem / storage afap.
"""
instrument = symbol
with trio.CancelScope() as cs:
async with open_cached_client('deribit') as client:
bars = await client.bars(instrument)
shm.push(bars)
task_status.started(cs)
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
sym = symbols[0]
to_chart, from_feed = trio.open_memory_channel(1)
async with (
open_cached_client('deribit') as client,
send_chan as send_chan,
trio.open_nursery() as n
):
await n.start(
open_cryptofeeds, symbols, to_chart)
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
# keep client cached for real-time section
cache = await client.cache_symbols()
async with from_feed:
typ, quote = await anext(from_feed)
while typ != 'trade':
typ, quote = await anext(from_feed)
task_status.started((init_msgs, quote))
async for typ, msg in from_feed:
topic = msg['symbol']
await send_chan.send({topic: msg})
@tractor.context
async def open_symbol_search(
ctx: tractor.Context,
) -> Client:
async with open_cached_client('deribit') as client:
# load all symbols locally for fast search
cache = await client.cache_symbols()
await ctx.started()
async with ctx.open_stream() as stream:
async for pattern in stream:
# results = await client.symbol_info(sym=pattern.upper())
matches = fuzzy.extractBests(
pattern,
cache,
score_cutoff=50,
)
# repack in dict form
await stream.send(
{item[0]['instrument_name']: item[0]
for item in matches}
)

2621
piker/brokers/ib.py 100644

File diff suppressed because it is too large Load Diff

View File

@ -1,67 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Interactive Brokers API backend.
Sub-modules within break into the core functionalities:
- ``broker.py`` part for orders / trading endpoints
- ``data.py`` for real-time data feed endpoints
- ``client.py`` for the core API machinery which is ``trio``-ized
wrapping around ``ib_insync``.
- ``report.py`` for the hackery to build manual pp calcs
to avoid ib's absolute bullshit FIFO style position
tracking..
"""
from .api import (
get_client,
)
from .feed import (
open_history_client,
open_symbol_search,
stream_quotes,
)
from .broker import trades_dialogue
__all__ = [
'get_client',
'trades_dialogue',
'open_history_client',
'open_symbol_search',
'stream_quotes',
]
# tractor RPC enable arg
__enable_modules__: list[str] = [
'api',
'feed',
'broker',
]
# passed to ``tractor.ActorNursery.start_actor()``
_spawn_kwargs = {
'infect_asyncio': True,
}
# annotation to let backend agnostic code
# know if ``brokerd`` should be spawned with
# ``tractor``'s aio mode.
_infect_asyncio: bool = True

File diff suppressed because it is too large Load Diff

View File

@ -1,590 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Order and trades endpoints for use with ``piker``'s EMS.
"""
from __future__ import annotations
from dataclasses import asdict
from functools import partial
from pprint import pformat
import time
from typing import (
Any,
Optional,
AsyncIterator,
)
import trio
from trio_typing import TaskStatus
import tractor
from ib_insync.contract import (
Contract,
Option,
)
from ib_insync.order import (
Trade,
OrderStatus,
)
from ib_insync.objects import (
Fill,
Execution,
)
from ib_insync.objects import Position
from piker import config
from piker.log import get_console_log
from piker.clearing._messages import (
BrokerdOrder,
BrokerdOrderAck,
BrokerdStatus,
BrokerdPosition,
BrokerdCancel,
BrokerdFill,
BrokerdError,
)
from .api import (
_accounts2clients,
_adhoc_futes_set,
log,
get_config,
open_client_proxies,
Client,
)
def pack_position(
pos: Position
) -> dict[str, Any]:
con = pos.contract
if isinstance(con, Option):
# TODO: option symbol parsing and sane display:
symbol = con.localSymbol.replace(' ', '')
else:
# TODO: lookup fqsn even for derivs.
symbol = con.symbol.lower()
exch = (con.primaryExchange or con.exchange).lower()
symkey = '.'.join((symbol, exch))
if not exch:
# attempt to lookup the symbol from our
# hacked set..
for sym in _adhoc_futes_set:
if symbol in sym:
symkey = sym
break
expiry = con.lastTradeDateOrContractMonth
if expiry:
symkey += f'.{expiry}'
# TODO: options contracts into a sane format..
return BrokerdPosition(
broker='ib',
account=pos.account,
symbol=symkey,
currency=con.currency,
size=float(pos.position),
avg_price=float(pos.avgCost) / float(con.multiplier or 1.0),
)
async def handle_order_requests(
ems_order_stream: tractor.MsgStream,
accounts_def: dict[str, str],
) -> None:
request_msg: dict
async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
action = request_msg['action']
account = request_msg['account']
acct_number = accounts_def.get(account)
if not acct_number:
log.error(
f'An IB account number for name {account} is not found?\n'
'Make sure you have all TWS and GW instances running.'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?',
).dict())
continue
client = _accounts2clients.get(account)
if not client:
log.error(
f'An IB client for account name {account} is not found.\n'
'Make sure you have all TWS and GW instances running.'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?',
).dict())
continue
if action in {'buy', 'sell'}:
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
reqid = client.submit_limit(
oid=order.oid,
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
account=acct_number,
# XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int
# counter - collision prone..)
reqid=order.reqid,
)
if reqid is None:
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason='Order already active?',
).dict())
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
time_ns=time.time_ns(),
account=account,
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
client.submit_cancel(reqid=msg.reqid)
else:
log.error(f'Unknown order command: {request_msg}')
async def recv_trade_updates(
client: Client,
to_trio: trio.abc.SendChannel,
) -> None:
"""Stream a ticker using the std L1 api.
"""
client.inline_errors(to_trio)
# sync with trio task
to_trio.send_nowait(None)
def push_tradesies(eventkit_obj, obj, fill=None):
"""Push events to trio task.
"""
if fill is not None:
# execution details event
item = ('fill', (obj, fill))
elif eventkit_obj.name() == 'positionEvent':
item = ('position', obj)
else:
item = ('status', obj)
log.info(f'eventkit event ->\n{pformat(item)}')
try:
to_trio.send_nowait(item)
except trio.BrokenResourceError:
log.exception(f'Disconnected from {eventkit_obj} updates')
eventkit_obj.disconnect(push_tradesies)
# hook up to the weird eventkit object - event stream api
for ev_name in [
'orderStatusEvent', # all order updates
'execDetailsEvent', # all "fill" updates
'positionEvent', # avg price updates per symbol per account
# 'commissionReportEvent',
# XXX: ugh, it is a separate event from IB and it's
# emitted as follows:
# self.ib.commissionReportEvent.emit(trade, fill, report)
# XXX: not sure yet if we need these
# 'updatePortfolioEvent',
# XXX: these all seem to be weird ib_insync intrernal
# events that we probably don't care that much about
# given the internal design is wonky af..
# 'newOrderEvent',
# 'orderModifyEvent',
# 'cancelOrderEvent',
# 'openOrderEvent',
]:
eventkit_obj = getattr(client.ib, ev_name)
handler = partial(push_tradesies, eventkit_obj)
eventkit_obj.connect(handler)
# let the engine run and stream
await client.ib.disconnectedEvent
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None,
) -> AsyncIterator[dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
accounts_def = config.load_accounts(['ib'])
global _client_cache
# deliver positions to subscriber before anything else
all_positions = []
accounts = set()
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
async with (
trio.open_nursery() as nurse,
open_client_proxies() as (proxies, aioclients),
):
for account, proxy in proxies.items():
client = aioclients[account]
async def open_stream(
task_status: TaskStatus[
trio.abc.ReceiveChannel
] = trio.TASK_STATUS_IGNORED,
):
# each api client has a unique event stream
async with tractor.to_asyncio.open_channel_from(
recv_trade_updates,
client=client,
) as (first, trade_event_stream):
task_status.started(trade_event_stream)
await trio.sleep_forever()
trade_event_stream = await nurse.start(open_stream)
clients.append((client, trade_event_stream))
assert account in accounts_def
accounts.add(account)
for client in aioclients.values():
for pos in client.positions():
msg = pack_position(pos)
msg.account = accounts_def.inverse[msg.account]
assert msg.account in accounts, (
f'Position for unknown account: {msg.account}')
all_positions.append(msg.dict())
trades: list[dict] = []
for proxy in proxies.values():
trades.append(await proxy.trades())
log.info(f'Loaded {len(trades)} from this session')
# TODO: write trades to local ``trades.toml``
# - use above per-session trades data and write to local file
# - get the "flex reports" working and pull historical data and
# also save locally.
await ctx.started((
all_positions,
tuple(name for name in accounts_def if name in accounts),
))
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
# start order request handler **before** local trades event loop
n.start_soon(handle_order_requests, ems_stream, accounts_def)
# allocate event relay tasks for each client connection
for client, stream in clients:
n.start_soon(
deliver_trade_events,
stream,
ems_stream,
accounts_def
)
# block until cancelled
await trio.sleep_forever()
async def deliver_trade_events(
trade_event_stream: trio.MemoryReceiveChannel,
ems_stream: tractor.MsgStream,
accounts_def: dict[str, str],
) -> None:
'''Format and relay all trade events for a given client to the EMS.
'''
action_map = {'BOT': 'buy', 'SLD': 'sell'}
# TODO: for some reason we can receive a ``None`` here when the
# ib-gw goes down? Not sure exactly how that's happening looking
# at the eventkit code above but we should probably handle it...
async for event_name, item in trade_event_stream:
log.info(f'ib sending {event_name}:\n{pformat(item)}')
# TODO: templating the ib statuses in comparison with other
# brokers is likely the way to go:
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
# short list:
# - PendingSubmit
# - PendingCancel
# - PreSubmitted (simulated orders)
# - ApiCancelled (cancelled by client before submission
# to routing)
# - Cancelled
# - Filled
# - Inactive (reject or cancelled but not by trader)
# XXX: here's some other sucky cases from the api
# - short-sale but securities haven't been located, in this
# case we should probably keep the order in some kind of
# weird state or cancel it outright?
# status='PendingSubmit', message=''),
# status='Cancelled', message='Error 404,
# reqId 1550: Order held while securities are located.'),
# status='PreSubmitted', message='')],
if event_name == 'status':
# XXX: begin normalization of nonsense ib_insync internal
# object-state tracking representations...
# unwrap needed data from ib_insync internal types
trade: Trade = item
status: OrderStatus = trade.orderStatus
# skip duplicate filled updates - we get the deats
# from the execution details event
msg = BrokerdStatus(
reqid=trade.order.orderId,
time_ns=time.time_ns(), # cuz why not
account=accounts_def.inverse[trade.order.account],
# everyone doin camel case..
status=status.status.lower(), # force lower case
filled=status.filled,
reason=status.whyHeld,
# this seems to not be necessarily up to date in the
# execDetails event.. so we have to send it here I guess?
remaining=status.remaining,
broker_details={'name': 'ib'},
)
elif event_name == 'fill':
# for wtv reason this is a separate event type
# from IB, not sure why it's needed other then for extra
# complexity and over-engineering :eyeroll:.
# we may just end up dropping these events (or
# translating them to ``Status`` msgs) if we can
# show the equivalent status events are no more latent.
# unpack ib_insync types
# pep-0526 style:
# https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations
trade: Trade
fill: Fill
trade, fill = item
execu: Execution = fill.execution
# TODO: normalize out commissions details?
details = {
'contract': asdict(fill.contract),
'execution': asdict(fill.execution),
'commissions': asdict(fill.commissionReport),
'broker_time': execu.time, # supposedly server fill time
'name': 'ib',
}
msg = BrokerdFill(
# should match the value returned from `.submit_limit()`
reqid=execu.orderId,
time_ns=time.time_ns(), # cuz why not
action=action_map[execu.side],
size=execu.shares,
price=execu.price,
broker_details=details,
# XXX: required by order mode currently
broker_time=details['broker_time'],
)
elif event_name == 'error':
err: dict = item
# f$#$% gawd dammit insync..
con = err['contract']
if isinstance(con, Contract):
err['contract'] = asdict(con)
if err['reqid'] == -1:
log.error(f'TWS external order error:\n{pformat(err)}')
# TODO: what schema for this msg if we're going to make it
# portable across all backends?
# msg = BrokerdError(**err)
continue
elif event_name == 'position':
msg = pack_position(item)
msg.account = accounts_def.inverse[msg.account]
elif event_name == 'event':
# it's either a general system status event or an external
# trade event?
log.info(f"TWS system status: \n{pformat(item)}")
# TODO: support this again but needs parsing at the callback
# level...
# reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
continue
# msg.reqid = 'tws-' + str(-1 * reqid)
# mark msg as from "external system"
# TODO: probably something better then this.. and start
# considering multiplayer/group trades tracking
# msg.broker_details['external_src'] = 'tws'
# XXX: we always serialize to a dict for msgpack
# translations, ideally we can move to an msgspec (or other)
# encoder # that can be enabled in ``tractor`` ahead of
# time so we can pass through the message types directly.
await ems_stream.send(msg.dict())
def load_flex_trades(
path: Optional[str] = None,
) -> dict[str, str]:
from pprint import pprint
from ib_insync import flexreport, util
conf = get_config()
if not path:
# load ``brokers.toml`` and try to get the flex
# token and query id that must be previously defined
# by the user.
token = conf.get('flex_token')
if not token:
raise ValueError(
'You must specify a ``flex_token`` field in your'
'`brokers.toml` in order load your trade log, see our'
'intructions for how to set this up here:\n'
'PUT LINK HERE!'
)
qid = conf['flex_trades_query_id']
# TODO: hack this into our logging
# system like we do with the API client..
util.logToConsole()
# TODO: rewrite the query part of this with async..httpx?
report = flexreport.FlexReport(
token=token,
queryId=qid,
)
else:
# XXX: another project we could potentially look at,
# https://pypi.org/project/ibflex/
report = flexreport.FlexReport(path=path)
trade_entries = report.extract('Trade')
trades = {
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave this as an ``int``?
str(t.__dict__['tradeID']): t.__dict__
for t in trade_entries
}
ln = len(trades)
log.info(f'Loaded {ln} trades from flex query')
trades_by_account = {}
for tid, trade in trades.items():
trades_by_account.setdefault(
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
str(trade['accountId']), {}
)[tid] = trade
section = {'ib': trades_by_account}
pprint(section)
# TODO: load the config first and append in
# the new trades loaded here..
try:
config.write(section, 'trades')
except KeyError:
import pdbpp; pdbpp.set_trace() # noqa
if __name__ == '__main__':
load_flex_trades()

View File

@ -1,938 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Data feed endpoints pre-wrapped and ready for use with ``tractor``/``trio``.
"""
from __future__ import annotations
import asyncio
from contextlib import asynccontextmanager as acm
from dataclasses import asdict
from datetime import datetime
from math import isnan
import time
from typing import (
Callable,
Optional,
Awaitable,
)
from async_generator import aclosing
from fuzzywuzzy import process as fuzzy
import numpy as np
import pendulum
import tractor
import trio
from trio_typing import TaskStatus
from piker.data._sharedmem import ShmArray
from .._util import SymbolNotFound, NoData
from .api import (
_adhoc_futes_set,
log,
load_aio_clients,
ibis,
MethodProxy,
open_client_proxies,
get_preferred_data_client,
Ticker,
RequestError,
Contract,
)
# https://interactivebrokers.github.io/tws-api/tick_types.html
tick_types = {
77: 'trade',
# a "utrade" aka an off exchange "unreportable" (dark) vlm:
# https://interactivebrokers.github.io/tws-api/tick_types.html#rt_volume
48: 'dark_trade',
# standard L1 ticks
0: 'bsize',
1: 'bid',
2: 'ask',
3: 'asize',
4: 'last',
5: 'size',
8: 'volume',
# ``ib_insync`` already packs these into
# quotes under the following fields.
# 55: 'trades_per_min', # `'tradeRate'`
# 56: 'vlm_per_min', # `'volumeRate'`
# 89: 'shortable', # `'shortableShares'`
}
@acm
async def open_data_client() -> MethodProxy:
'''
Open the first found preferred "data client" as defined in the
user's ``brokers.toml`` in the ``ib.prefer_data_account`` variable
and deliver that client wrapped in a ``MethodProxy``.
'''
async with (
open_client_proxies() as (proxies, clients),
):
account_name, client = get_preferred_data_client(clients)
proxy = proxies.get(f'ib.{account_name}')
if not proxy:
raise ValueError(
f'No preferred data client could be found for {account_name}!'
)
yield proxy
@acm
async def open_history_client(
symbol: str,
) -> tuple[Callable, int]:
'''
History retreival endpoint - delivers a historical frame callble
that takes in ``pendulum.datetime`` and returns ``numpy`` arrays.
'''
async with open_data_client() as proxy:
async def get_hist(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[np.ndarray, str]:
out, fails = await get_bars(proxy, symbol, end_dt=end_dt)
# TODO: add logic here to handle tradable hours and only grab
# valid bars in the range
if out is None:
# could be trying to retreive bars over weekend
log.error(f"Can't grab bars starting at {end_dt}!?!?")
raise NoData(
f'{end_dt}',
frame_size=2000,
)
bars, bars_array, first_dt, last_dt = out
# volume cleaning since there's -ve entries,
# wood luv to know what crookery that is..
vlm = bars_array['volume']
vlm[vlm < 0] = 0
return bars_array, first_dt, last_dt
# TODO: it seems like we can do async queries for ohlc
# but getting the order right still isn't working and I'm not
# quite sure why.. needs some tinkering and probably
# a lookthrough of the ``ib_insync`` machinery, for eg. maybe
# we have to do the batch queries on the `asyncio` side?
yield get_hist, {'erlangs': 1, 'rate': 6}
_pacing: str = (
'Historical Market Data Service error '
'message:Historical data request pacing violation'
)
async def get_bars(
proxy: MethodProxy,
fqsn: str,
# blank to start which tells ib to look up the latest datum
end_dt: str = '',
) -> (dict, np.ndarray):
'''
Retrieve historical data from a ``trio``-side task using
a ``MethoProxy``.
'''
fails = 0
bars: Optional[list] = None
first_dt: datetime = None
last_dt: datetime = None
if end_dt:
last_dt = pendulum.from_timestamp(end_dt.timestamp())
for _ in range(10):
try:
out = await proxy.bars(
fqsn=fqsn,
end_dt=end_dt,
)
if out:
bars, bars_array = out
else:
await tractor.breakpoint()
if bars_array is None:
raise SymbolNotFound(fqsn)
first_dt = pendulum.from_timestamp(
bars[0].date.timestamp())
last_dt = pendulum.from_timestamp(
bars[-1].date.timestamp())
time = bars_array['time']
assert time[-1] == last_dt.timestamp()
assert time[0] == first_dt.timestamp()
log.info(
f'{len(bars)} bars retreived for {first_dt} -> {last_dt}'
)
return (bars, bars_array, first_dt, last_dt), fails
except RequestError as err:
msg = err.message
# why do we always need to rebind this?
# _err = err
if 'No market data permissions for' in msg:
# TODO: signalling for no permissions searches
raise NoData(
f'Symbol: {fqsn}',
)
elif (
err.code == 162
and 'HMDS query returned no data' in err.message
):
# XXX: this is now done in the storage mgmt layer
# and we shouldn't implicitly decrement the frame dt
# index since the upper layer may be doing so
# concurrently and we don't want to be delivering frames
# that weren't asked for.
log.warning(
f'NO DATA found ending @ {end_dt}\n'
)
# try to decrement start point and look further back
# end_dt = last_dt = last_dt.subtract(seconds=2000)
raise NoData(
f'Symbol: {fqsn}',
frame_size=2000,
)
elif _pacing in msg:
log.warning(
'History throttle rate reached!\n'
'Resetting farms with `ctrl-alt-f` hack\n'
)
# TODO: we might have to put a task lock around this
# method..
hist_ev = proxy.status_event(
'HMDS data farm connection is OK:ushmds'
)
# XXX: other event messages we might want to try and
# wait for but i wasn't able to get any of this
# reliable..
# reconnect_start = proxy.status_event(
# 'Market data farm is connecting:usfuture'
# )
# live_ev = proxy.status_event(
# 'Market data farm connection is OK:usfuture'
# )
# try to wait on the reset event(s) to arrive, a timeout
# will trigger a retry up to 6 times (for now).
tries: int = 2
timeout: float = 10
# try 3 time with a data reset then fail over to
# a connection reset.
for i in range(1, tries):
log.warning('Sending DATA RESET request')
await data_reset_hack(reset_type='data')
with trio.move_on_after(timeout) as cs:
for name, ev in [
# TODO: not sure if waiting on other events
# is all that useful here or not. in theory
# you could wait on one of the ones above
# first to verify the reset request was
# sent?
('history', hist_ev),
]:
await ev.wait()
log.info(f"{name} DATA RESET")
break
if cs.cancelled_caught:
fails += 1
log.warning(
f'Data reset {name} timeout, retrying {i}.'
)
continue
else:
log.warning('Sending CONNECTION RESET')
await data_reset_hack(reset_type='connection')
with trio.move_on_after(timeout) as cs:
for name, ev in [
# TODO: not sure if waiting on other events
# is all that useful here or not. in theory
# you could wait on one of the ones above
# first to verify the reset request was
# sent?
('history', hist_ev),
]:
await ev.wait()
log.info(f"{name} DATA RESET")
if cs.cancelled_caught:
fails += 1
log.warning('Data CONNECTION RESET timeout!?')
else:
raise
return None, None
# else: # throttle wasn't fixed so error out immediately
# raise _err
async def backfill_bars(
fqsn: str,
shm: ShmArray, # type: ignore # noqa
# TODO: we want to avoid overrunning the underlying shm array buffer
# and we should probably calc the number of calls to make depending
# on that until we have the `marketstore` daemon in place in which
# case the shm size will be driven by user config and available sys
# memory.
count: int = 16,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Fill historical bars into shared mem / storage afap.
TODO: avoid pacing constraints:
https://github.com/pikers/piker/issues/128
'''
# last_dt1 = None
last_dt = None
with trio.CancelScope() as cs:
async with open_data_client() as proxy:
out, fails = await get_bars(proxy, fqsn)
if out is None:
raise RuntimeError("Could not pull currrent history?!")
(first_bars, bars_array, first_dt, last_dt) = out
vlm = bars_array['volume']
vlm[vlm < 0] = 0
last_dt = first_dt
# write historical data to buffer
shm.push(bars_array)
task_status.started(cs)
i = 0
while i < count:
out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
if out is None:
# could be trying to retreive bars over weekend
# TODO: add logic here to handle tradable hours and
# only grab valid bars in the range
log.error(f"Can't grab bars starting at {first_dt}!?!?")
# XXX: get_bars() should internally decrement dt by
# 2k seconds and try again.
continue
(first_bars, bars_array, first_dt, last_dt) = out
# last_dt1 = last_dt
# last_dt = first_dt
# volume cleaning since there's -ve entries,
# wood luv to know what crookery that is..
vlm = bars_array['volume']
vlm[vlm < 0] = 0
# TODO we should probably dig into forums to see what peeps
# think this data "means" and then use it as an indicator of
# sorts? dinkus has mentioned that $vlms for the day dont'
# match other platforms nor the summary stat tws shows in
# the monitor - it's probably worth investigating.
shm.push(bars_array, prepend=True)
i += 1
asset_type_map = {
'STK': 'stock',
'OPT': 'option',
'FUT': 'future',
'CONTFUT': 'continuous_future',
'CASH': 'forex',
'IND': 'index',
'CFD': 'cfd',
'BOND': 'bond',
'CMDTY': 'commodity',
'FOP': 'futures_option',
'FUND': 'mutual_fund',
'WAR': 'warrant',
'IOPT': 'warran',
'BAG': 'bag',
# 'NEWS': 'news',
}
_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
async def _setup_quote_stream(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
symbol: str,
opts: tuple[int] = (
'375', # RT trade volume (excludes utrades)
'233', # RT trade volume (includes utrades)
'236', # Shortable shares
# these all appear to only be updated every 25s thus
# making them mostly useless and explains why the scanner
# is always slow XD
# '293', # Trade count for day
'294', # Trade rate / minute
'295', # Vlm rate / minute
),
contract: Optional[Contract] = None,
) -> trio.abc.ReceiveChannel:
'''
Stream a ticker using the std L1 api.
This task is ``asyncio``-side and must be called from
``tractor.to_asyncio.open_channel_from()``.
'''
global _quote_streams
to_trio.send_nowait(None)
async with load_aio_clients() as accts2clients:
caccount_name, client = get_preferred_data_client(accts2clients)
contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
# NOTE: it's batch-wise and slow af but I guess could
# be good for backchecking? Seems to be every 5s maybe?
# ticker: Ticker = client.ib.reqTickByTickData(
# contract, 'Last',
# )
# # define a simple queue push routine that streams quote packets
# # to trio over the ``to_trio`` memory channel.
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def teardown():
ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`")
client.ib.cancelMktData(contract)
# decouple broadcast mem chan
_quote_streams.pop(symbol, None)
def push(t: Ticker) -> None:
"""
Push quotes to trio task.
"""
# log.debug(t)
try:
to_trio.send_nowait(t)
except (
trio.BrokenResourceError,
# XXX: HACK, not sure why this gets left stale (probably
# due to our terrible ``tractor.to_asyncio``
# implementation for streams.. but if the mem chan
# gets left here and starts blocking just kill the feed?
# trio.WouldBlock,
):
# XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
teardown()
except trio.WouldBlock:
log.warning(
f'channel is blocking symbol feed for {symbol}?'
f'\n{to_trio.statistics}'
)
# except trio.WouldBlock:
# # for slow debugging purposes to avoid clobbering prompt
# # with log msgs
# pass
ticker.updateEvent.connect(push)
try:
await asyncio.sleep(float('inf'))
finally:
teardown()
# return from_aio
@acm
async def open_aio_quote_stream(
symbol: str,
contract: Optional[Contract] = None,
) -> trio.abc.ReceiveStream:
from tractor.trionics import broadcast_receiver
global _quote_streams
from_aio = _quote_streams.get(symbol)
if from_aio:
# if we already have a cached feed deliver a rx side clone to consumer
async with broadcast_receiver(
from_aio,
2**6,
) as from_aio:
yield from_aio
return
async with tractor.to_asyncio.open_channel_from(
_setup_quote_stream,
symbol=symbol,
contract=contract,
) as (first, from_aio):
# cache feed for later consumers
_quote_streams[symbol] = from_aio
yield from_aio
# TODO: cython/mypyc/numba this!
def normalize(
ticker: Ticker,
calc_price: bool = False
) -> dict:
# should be real volume for this contract by default
calc_price = False
# check for special contract types
con = ticker.contract
if type(con) in (
ibis.Commodity,
ibis.Forex,
):
# commodities and forex don't have an exchange name and
# no real volume so we have to calculate the price
suffix = con.secType
# no real volume on this tract
calc_price = True
else:
suffix = con.primaryExchange
if not suffix:
suffix = con.exchange
# append a `.<suffix>` to the returned symbol
# key for derivatives that normally is the expiry
# date key.
expiry = con.lastTradeDateOrContractMonth
if expiry:
suffix += f'.{expiry}'
# convert named tuples to dicts so we send usable keys
new_ticks = []
for tick in ticker.ticks:
if tick and not isinstance(tick, dict):
td = tick._asdict()
td['type'] = tick_types.get(
td['tickType'],
'n/a',
)
new_ticks.append(td)
tbt = ticker.tickByTicks
if tbt:
print(f'tickbyticks:\n {ticker.tickByTicks}')
ticker.ticks = new_ticks
# some contracts don't have volume so we may want to calculate
# a midpoint price based on data we can acquire (such as bid / ask)
if calc_price:
ticker.ticks.append(
{'type': 'trade', 'price': ticker.marketPrice()}
)
# serialize for transport
data = asdict(ticker)
# generate fqsn with possible specialized suffix
# for derivatives, note the lowercase.
data['symbol'] = data['fqsn'] = '.'.join(
(con.symbol, suffix)
).lower()
# convert named tuples to dicts for transport
tbts = data.get('tickByTicks')
if tbts:
data['tickByTicks'] = [tbt._asdict() for tbt in tbts]
# add time stamps for downstream latency measurements
data['brokerd_ts'] = time.time()
# stupid stupid shit...don't even care any more..
# leave it until we do a proper latency study
# if ticker.rtTime is not None:
# data['broker_ts'] = data['rtTime_s'] = float(
# ticker.rtTime.timestamp) / 1000.
data.pop('rtTime')
return data
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Stream symbol quotes.
This is a ``trio`` callable routine meant to be invoked
once the brokerd is up.
'''
# TODO: support multiple subscriptions
sym = symbols[0]
log.info(f'request for real-time quotes: {sym}')
async with open_data_client() as proxy:
con, first_ticker, details = await proxy.get_sym_details(symbol=sym)
first_quote = normalize(first_ticker)
# print(f'first quote: {first_quote}')
def mk_init_msgs() -> dict[str, dict]:
'''
Collect a bunch of meta-data useful for feed startup and
pack in a `dict`-msg.
'''
# pass back some symbol info like min_tick, trading_hours, etc.
syminfo = asdict(details)
syminfo.update(syminfo['contract'])
# nested dataclass we probably don't need and that won't IPC
# serialize
syminfo.pop('secIdList')
# TODO: more consistent field translation
atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']]
# for stocks it seems TWS reports too small a tick size
# such that you can't submit orders with that granularity?
min_tick = 0.01 if atype == 'stock' else 0
syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick)
# for "traditional" assets, volume is normally discreet, not
# a float
syminfo['lot_tick_size'] = 0.0
ibclient = proxy._aio_ns.ib.client
host, port = ibclient.host, ibclient.port
# TODO: for loop through all symbols passed in
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': syminfo,
'fqsn': first_quote['fqsn'],
},
'status': {
'data_ep': f'{host}:{port}',
},
}
return init_msgs
init_msgs = mk_init_msgs()
# TODO: we should instead spawn a task that waits on a feed to start
# and let it wait indefinitely..instead of this hard coded stuff.
with trio.move_on_after(1):
contract, first_ticker, details = await proxy.get_quote(symbol=sym)
# it might be outside regular trading hours so see if we can at
# least grab history.
if isnan(first_ticker.last):
task_status.started((init_msgs, first_quote))
# it's not really live but this will unblock
# the brokerd feed task to tell the ui to update?
feed_is_live.set()
# block and let data history backfill code run.
await trio.sleep_forever()
return # we never expect feed to come up?
async with open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream:
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
task_status.started((init_msgs, first_quote))
async with aclosing(stream):
if type(first_ticker.contract) not in (
ibis.Commodity,
ibis.Forex
):
# wait for real volume on feed (trading might be closed)
while True:
ticker = await stream.receive()
# for a real volume contract we rait for the first
# "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
):
# spin consuming tickers until we get a real
# market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first real volume tick")
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is truly stateful trash)
ticker.ticks = []
# XXX: this works because we don't use
# ``aclosing()`` above?
break
quote = normalize(ticker)
log.debug(f"First ticker received {quote}")
# tell caller quotes are now coming in live
feed_is_live.set()
# last = time.time()
async for ticker in stream:
quote = normalize(ticker)
await send_chan.send({quote['fqsn']: quote})
# ugh, clear ticks since we've consumed them
ticker.ticks = []
# last = time.time()
async def data_reset_hack(
reset_type: str = 'data',
) -> None:
'''
Run key combos for resetting data feeds and yield back to caller
when complete.
This is a linux-only hack around:
https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations
TODOs:
- a return type that hopefully determines if the hack was
successful.
- other OS support?
- integration with ``ib-gw`` run in docker + Xorg?
'''
async def vnc_click_hack(
reset_type: str = 'data'
) -> None:
'''
Reset the data or netowork connection for the VNC attached
ib gateway using magic combos.
'''
key = {'data': 'f', 'connection': 'r'}[reset_type]
import asyncvnc
async with asyncvnc.connect(
'localhost',
port=3003,
# password='ibcansmbz',
) as client:
# move to middle of screen
# 640x1800
client.mouse.move(
x=500,
y=500,
)
client.mouse.click()
client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked
await tractor.to_asyncio.run_task(vnc_click_hack)
# we don't really need the ``xdotool`` approach any more B)
return True
@tractor.context
async def open_symbol_search(
ctx: tractor.Context,
) -> None:
# TODO: load user defined symbol set locally for fast search?
await ctx.started({})
async with open_data_client() as proxy:
async with ctx.open_stream() as stream:
last = time.time()
async for pattern in stream:
log.debug(f'received {pattern}')
now = time.time()
assert pattern, 'IB can not accept blank search pattern'
# throttle search requests to no faster then 1Hz
diff = now - last
if diff < 1.0:
log.debug('throttle sleeping')
await trio.sleep(diff)
try:
pattern = stream.receive_nowait()
except trio.WouldBlock:
pass
if not pattern or pattern.isspace():
log.warning('empty pattern received, skipping..')
# TODO: *BUG* if nothing is returned here the client
# side will cache a null set result and not showing
# anything to the use on re-searches when this query
# timed out. We probably need a special "timeout" msg
# or something...
# XXX: this unblocks the far end search task which may
# hold up a multi-search nursery block
await stream.send({})
continue
log.debug(f'searching for {pattern}')
last = time.time()
# async batch search using api stocks endpoint and module
# defined adhoc symbol set.
stock_results = []
async def stash_results(target: Awaitable[list]):
stock_results.extend(await target)
async with trio.open_nursery() as sn:
sn.start_soon(
stash_results,
proxy.search_symbols(
pattern=pattern,
upto=5,
),
)
# trigger async request
await trio.sleep(0)
# match against our ad-hoc set immediately
adhoc_matches = fuzzy.extractBests(
pattern,
list(_adhoc_futes_set),
score_cutoff=90,
)
log.info(f'fuzzy matched adhocs: {adhoc_matches}')
adhoc_match_results = {}
if adhoc_matches:
# TODO: do we need to pull contract details?
adhoc_match_results = {i[0]: {} for i in adhoc_matches}
log.debug(f'fuzzy matching stocks {stock_results}')
stock_matches = fuzzy.extractBests(
pattern,
stock_results,
score_cutoff=50,
)
matches = adhoc_match_results | {
item[0]: {} for item in stock_matches
}
# TODO: we used to deliver contract details
# {item[2]: item[0] for item in stock_matches}
log.debug(f"sending matches: {matches.keys()}")
await stream.send(matches)

View File

@ -19,7 +19,6 @@ Supervisor for docker with included specific-image service helpers.
'''
import os
import time
from typing import (
Optional,
Callable,
@ -187,65 +186,45 @@ class Container:
async def cancel(
self,
stop_msg: str,
) -> None:
cid = self.cntr.id
# first try a graceful cancel
log.cancel(
f'SIGINT cancelling container: {cid}\n'
f'waiting on stop msg: "{stop_msg}"'
)
self.try_signal('SIGINT')
start = time.time()
for _ in range(30):
with trio.move_on_after(0.5) as cs:
cs.shield = True
await self.process_logs_until('initiating graceful shutdown')
await self.process_logs_until('exiting...',)
for _ in range(10):
with trio.move_on_after(0.5) as cs:
cs.shield = True
await self.process_logs_until(stop_msg)
# if we aren't cancelled on above checkpoint then we
# assume we read the expected stop msg and terminated.
await self.process_logs_until('exiting...',)
break
try:
log.info(f'Polling for container shutdown:\n{cid}')
if cs.cancelled_caught:
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
if self.cntr.status not in {'exited', 'not-running'}:
try:
log.info('Waiting on container shutdown: {cid}')
self.cntr.wait(
timeout=0.1,
condition='not-running',
)
break
break
except (
ReadTimeout,
ConnectionError,
):
log.error(f'failed to wait on container {cid}')
raise
except (
ReadTimeout,
):
log.info(f'Still waiting on container:\n{cid}')
continue
except (
docker.errors.APIError,
ConnectionError,
):
log.exception('Docker connection failure')
break
else:
delay = time.time() - start
log.error(
f'Failed to kill container {cid} after {delay}s\n'
'sending SIGKILL..'
)
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
self.cntr.wait(
timeout=3,
condition='not-running',
)
raise RuntimeError('Failed to cancel container {cid}')
log.cancel(f'Container stopped: {cid}')
@ -266,16 +245,13 @@ async def open_ahabd(
# params, etc. passing to ``Containter.run()``?
# call into endpoint for container config/init
ep_func = NamespacePath(endpoint).load_ref()
(
dcntr,
cntr_config,
start_msg,
stop_msg,
) = ep_func(client)
dcntr, cntr_config = ep_func(client)
cntr = Container(dcntr)
with trio.move_on_after(1):
found = await cntr.process_logs_until(start_msg)
found = await cntr.process_logs_until(
"launching tcp listener for all services...",
)
if not found and cntr not in client.containers.list():
raise RuntimeError(
@ -295,9 +271,16 @@ async def open_ahabd(
# callers to have root perms?
await trio.sleep_forever()
finally:
except (
BaseException,
# trio.Cancelled,
# KeyboardInterrupt,
):
with trio.CancelScope(shield=True):
await cntr.cancel(stop_msg)
await cntr.cancel()
raise
async def start_ahab(

View File

@ -700,7 +700,6 @@ async def manage_history(
bfqsn = fqsn.replace('.' + mod.name, '')
open_history_client = getattr(mod, 'open_history_client', None)
assert open_history_client
if is_up and opened and open_history_client:

View File

@ -127,15 +127,10 @@ def start_marketstore(
import os
import docker
from .. import config
get_console_log('info', name=__name__)
mktsdir = os.path.join(config._config_dir, 'marketstore')
# create when dne
if not os.path.isdir(mktsdir):
os.mkdir(mktsdir)
yml_file = os.path.join(mktsdir, 'mkts.yml')
yml_file = os.path.join(config._config_dir, 'mkts.yml')
if not os.path.isfile(yml_file):
log.warning(
f'No `marketstore` config exists?: {yml_file}\n'
@ -148,14 +143,14 @@ def start_marketstore(
# create a mount from user's local piker config dir into container
config_dir_mnt = docker.types.Mount(
target='/etc',
source=mktsdir,
source=config._config_dir,
type='bind',
)
# create a user config subdir where the marketstore
# backing filesystem database can be persisted.
persistent_data_dir = os.path.join(
mktsdir, 'data',
config._config_dir, 'data',
)
if not os.path.isdir(persistent_data_dir):
os.mkdir(persistent_data_dir)
@ -185,14 +180,7 @@ def start_marketstore(
init=True,
# remove=True,
)
return (
dcntr,
_config,
# expected startup and stop msgs
"launching tcp listener for all services...",
"exiting...",
)
return dcntr, _config
_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
@ -395,12 +383,7 @@ class Storage:
]:
first_tsdb_dt, last_tsdb_dt = None, None
tsdb_arrays = await self.read_ohlcv(
fqsn,
# on first load we don't need to pull the max
# history per request size worth.
limit=3000,
)
tsdb_arrays = await self.read_ohlcv(fqsn)
log.info(f'Loaded tsdb history {tsdb_arrays}')
if tsdb_arrays:
@ -418,7 +401,6 @@ class Storage:
fqsn: str,
timeframe: Optional[Union[int, str]] = None,
end: Optional[int] = None,
limit: int = int(800e3),
) -> tuple[
MarketstoreClient,
@ -441,7 +423,7 @@ class Storage:
# TODO: figure the max limit here given the
# ``purepc`` msg size limit of purerpc: 33554432
limit=limit,
limit=int(800e3),
)
if timeframe is None:

View File

@ -361,7 +361,7 @@ async def cascade(
) -> tuple[TaskTracker, int]:
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
log.debug(f're-syncing fsp {func_name} to source')
log.warning(f're-syncing fsp {func_name} to source')
tracker.cs.cancel()
await tracker.complete.wait()
tracker, index = await n.start(fsp_target)

View File

@ -379,17 +379,17 @@ class Curve(pg.GraphicsObject):
) -> None:
# default line draw last call
# with self.reset_cache():
x = render_data['index']
y = render_data[array_key]
with self.reset_cache():
x = render_data['index']
y = render_data[array_key]
# draw the "current" step graphic segment so it
# lines up with the "middle" of the current
# (OHLC) sample.
self._last_line = QLineF(
x[-2], y[-2],
x[-1], y[-1],
)
# draw the "current" step graphic segment so it
# lines up with the "middle" of the current
# (OHLC) sample.
self._last_line = QLineF(
x[-2], y[-2],
x[-1], y[-1],
)
return x, y

View File

@ -426,6 +426,71 @@ def graphics_update_cycle(
profiler('view incremented')
if vlm_chart:
# always update y-label
ds.vlm_sticky.update_from_data(
*array[-1][['index', 'volume']]
)
if (
(
do_rt_update
or do_append
and liv
)
or trigger_all
):
# TODO: make it so this doesn't have to be called
# once the $vlm is up?
vlm_chart.update_graphics_from_flow(
'volume',
# UGGGh, see ``maxmin()`` impl in `._fsp` for
# the overlayed plotitems... we need a better
# bay to invoke a maxmin per overlay..
render=False,
# XXX: ^^^^ THIS IS SUPER IMPORTANT! ^^^^
# without this, since we disable the
# 'volume' (units) chart after the $vlm starts
# up we need to be sure to enable this
# auto-ranging otherwise there will be no handler
# connected to update accompanying overlay
# graphics..
)
profiler('`vlm_chart.update_graphics_from_flow()`')
if (
mx_vlm_in_view != vars['last_mx_vlm']
):
yrange = (0, mx_vlm_in_view * 1.375)
vlm_chart.view._set_yrange(
yrange=yrange,
)
profiler('`vlm_chart.view._set_yrange()`')
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
vars['last_mx_vlm'] = mx_vlm_in_view
for curve_name, flow in vlm_chart._flows.items():
if not flow.render:
continue
update_fsp_chart(
vlm_chart,
flow,
curve_name,
array_key=curve_name,
# do_append=uppx < update_uppx,
do_append=do_append,
)
# is this even doing anything?
# (pretty sure it's the real-time
# resizing from last quote?)
fvb = flow.plot.vb
fvb._set_yrange(
# autoscale_linked_plots=False,
name=curve_name,
)
ticks_frame = quote.get('ticks', ())
frames_by_type: dict[str, dict] = {}
@ -475,16 +540,15 @@ def graphics_update_cycle(
or do_append
or trigger_all
):
# TODO: we should always update the "last" datum
# since the current range should at least be updated
# to it's max/min on the last pixel.
chart.update_graphics_from_flow(
chart.name,
# do_append=uppx < update_uppx,
do_append=do_append,
)
# NOTE: we always update the "last" datum
# since the current range should at least be updated
# to it's max/min on the last pixel.
# iterate in FIFO order per tick-frame
for typ, tick in lasts.items():
@ -589,116 +653,31 @@ def graphics_update_cycle(
vars['last_mx'], vars['last_mn'] = mx, mn
# run synchronous update on all linked flows
# TODO: should the "main" (aka source) flow be special?
for curve_name, flow in chart._flows.items():
# update any overlayed fsp flows
if curve_name != chart.data_key:
update_fsp_chart(
chart,
flow,
curve_name,
array_key=curve_name,
)
# even if we're downsampled bigly
# draw the last datum in the final
# px column to give the user the mx/mn
# range of that set.
if (
not do_append
# and not do_rt_update
not (do_rt_update or do_append)
and liv
# even if we're downsampled bigly
# draw the last datum in the final
# px column to give the user the mx/mn
# range of that set.
):
flow.draw_last(
array_key=curve_name,
only_last_uppx=True,
)
# always update the last datum-element
# graphic for all flows
flow.draw_last(array_key=curve_name)
# volume chart logic..
# TODO: can we unify this with the above loop?
if vlm_chart:
# always update y-label
ds.vlm_sticky.update_from_data(
*array[-1][['index', 'volume']]
# TODO: should the "main" (aka source) flow be special?
if curve_name == chart.data_key:
continue
update_fsp_chart(
chart,
flow,
curve_name,
array_key=curve_name,
)
if (
(
do_rt_update
or do_append
and liv
)
or trigger_all
):
# TODO: make it so this doesn't have to be called
# once the $vlm is up?
vlm_chart.update_graphics_from_flow(
'volume',
# UGGGh, see ``maxmin()`` impl in `._fsp` for
# the overlayed plotitems... we need a better
# bay to invoke a maxmin per overlay..
render=False,
# XXX: ^^^^ THIS IS SUPER IMPORTANT! ^^^^
# without this, since we disable the
# 'volume' (units) chart after the $vlm starts
# up we need to be sure to enable this
# auto-ranging otherwise there will be no handler
# connected to update accompanying overlay
# graphics..
)
profiler('`vlm_chart.update_graphics_from_flow()`')
if (
mx_vlm_in_view != vars['last_mx_vlm']
):
yrange = (0, mx_vlm_in_view * 1.375)
vlm_chart.view._set_yrange(
yrange=yrange,
)
profiler('`vlm_chart.view._set_yrange()`')
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
vars['last_mx_vlm'] = mx_vlm_in_view
for curve_name, flow in vlm_chart._flows.items():
if (
curve_name != 'volume' and
flow.render and (
liv and
do_rt_update or do_append
)
):
update_fsp_chart(
vlm_chart,
flow,
curve_name,
array_key=curve_name,
# do_append=uppx < update_uppx,
do_append=do_append,
)
# is this even doing anything?
# (pretty sure it's the real-time
# resizing from last quote?)
fvb = flow.plot.vb
fvb._set_yrange(
name=curve_name,
)
elif (
curve_name != 'volume'
and not do_append
and liv
and uppx >= 1
# even if we're downsampled bigly
# draw the last datum in the final
# px column to give the user the mx/mn
# range of that set.
):
# always update the last datum-element
# graphic for all flows
# print(f'drawing last {flow.name}')
flow.draw_last(array_key=curve_name)
async def display_symbol_data(
godwidget: GodWidget,

View File

@ -175,7 +175,6 @@ def render_baritems(
name=f'{flow.name}_ds_ohlc',
color=bars._color,
)
flow.ds_graphics = curve
curve.hide()
self.plot.addItem(curve)
@ -193,20 +192,18 @@ def render_baritems(
uppx = curve.x_uppx()
in_line = should_line = curve.isVisible()
if (
in_line
should_line
and uppx < x_gt
):
# print('FLIPPING TO BARS')
should_line = False
flow._in_ds = False
elif (
not in_line
not should_line
and uppx >= x_gt
):
# print('FLIPPING TO LINE')
should_line = True
flow._in_ds = True
profiler(f'ds logic complete line={should_line}')
@ -336,13 +333,7 @@ class Flow(msgspec.Struct): # , frozen=True):
'''
name: str
plot: pg.PlotItem
graphics: Union[Curve, BarItems]
# in some cases a flow may want to change its
# graphical "type" or, "form" when downsampling,
# normally this is just a plain line.
ds_graphics: Optional[Curve] = None
graphics: Curve
_shm: ShmArray
is_ohlc: bool = False
@ -549,7 +540,6 @@ class Flow(msgspec.Struct): # , frozen=True):
should_redraw: bool = False
rkwargs = {}
should_line = False
if isinstance(graphics, BarItems):
# XXX: special case where we change out graphics
# to a line after a certain uppx threshold.
@ -566,8 +556,8 @@ class Flow(msgspec.Struct): # , frozen=True):
profiler,
**kwargs,
)
# bars = True
should_redraw = changed_to_line or not should_line
self._in_ds = should_line
else:
r = self._src_r
@ -671,17 +661,6 @@ class Flow(msgspec.Struct): # , frozen=True):
# assign output paths to graphicis obj
graphics.path = r.path
graphics.fast_path = r.fast_path
# XXX: we don't need this right?
# graphics.draw_last_datum(
# path,
# src_array,
# data,
# reset,
# array_key,
# )
# graphics.update()
# profiler('.update()')
else:
# assign output paths to graphicis obj
graphics.path = r.path
@ -694,15 +673,16 @@ class Flow(msgspec.Struct): # , frozen=True):
reset,
array_key,
)
graphics.update()
profiler('.update()')
# TODO: is this ever better?
# graphics.prepareGeometryChange()
# profiler('.prepareGeometryChange()')
# TODO: does this actuallly help us in any way (prolly should
# look at the source / ask ogi). I think it avoid artifacts on
# wheel-scroll downsampling curve updates?
# TODO: is this ever better?
# graphics.prepareGeometryChange()
# profiler('.prepareGeometryChange()')
graphics.update()
profiler('.update()')
# track downsampled state
self._in_ds = r._in_ds
@ -712,7 +692,6 @@ class Flow(msgspec.Struct): # , frozen=True):
def draw_last(
self,
array_key: Optional[str] = None,
only_last_uppx: bool = False,
) -> None:
@ -732,41 +711,19 @@ class Flow(msgspec.Struct): # , frozen=True):
array_key,
)
# the renderer is downsampling we choose
# to always try and updadte a single (interpolating)
# line segment that spans and tries to display
# the las uppx's worth of datums.
# we only care about the last pixel's
# worth of data since that's all the screen
# can represent on the last column where
# the most recent datum is being drawn.
if self._in_ds or only_last_uppx:
dsg = self.ds_graphics or self.graphics
# XXX: pretty sure we don't need this?
# if isinstance(g, Curve):
# with dsg.reset_cache():
if self._in_ds:
# we only care about the last pixel's
# worth of data since that's all the screen
# can represent on the last column where
# the most recent datum is being drawn.
uppx = self._last_uppx
y = y[-uppx:]
ymn, ymx = y.min(), y.max()
# print(f'drawing uppx={uppx} mxmn line: {ymn}, {ymx}')
try:
iuppx = x[-uppx]
except IndexError:
# we're less then an x-px wide so just grab the start
# datum index.
iuppx = x[0]
dsg._last_line = QLineF(
iuppx, ymn,
g._last_line = QLineF(
x[-2], ymn,
x[-1], ymx,
)
# print(f'updating DS curve {self.name}')
dsg.update()
else:
# print(f'updating NOT DS curve {self.name}')
g.update()
def by_index_and_key(

View File

@ -440,7 +440,7 @@ class FspAdmin:
# if the chart isn't hidden try to update
# the data on screen.
if not self.linked.isHidden():
log.debug(f'Re-syncing graphics for fsp: {ns_path}')
log.info(f'Re-syncing graphics for fsp: {ns_path}')
self.linked.graphics_cycle(
trigger_all=True,
prepend_update_index=info['first'],

View File

@ -57,11 +57,11 @@ setup(
# from github currently (see requirements.txt)
# 'trimeter', # not released yet..
# 'tractor',
# asyncvnc,
# brokers
'asks==2.4.8',
'ib_insync',
'cryptofeed',
# numerics
'pendulum', # easier datetimes
@ -72,34 +72,32 @@ setup(
# UI
'PyQt5',
# 'pyqtgraph', from our fork see reqs.txt
'qdarkstyle >= 3.0.2', # themeing
'fuzzywuzzy[speedup]', # fuzzy search
'pyqtgraph',
'qdarkstyle >= 3.0.2',
# fuzzy search
'fuzzywuzzy[speedup]',
# tsdbs
# anyio-marketstore # from gh see reqs.txt
'pymarketstore',
],
extras_require={
# serialization
'tsdb': [
'docker',
],
},
tests_require=['pytest'],
python_requires=">=3.10",
keywords=[
"async",
"trading",
"finance",
"quant",
"charting",
],
python_requires=">=3.9", # literally for ``datetime.datetime.fromisoformat``...
keywords=["async", "trading", "finance", "quant", "charting"],
classifiers=[
'Development Status :: 3 - Alpha',
'License :: OSI Approved :: ',
'Operating System :: POSIX :: Linux',
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
'Intended Audience :: Financial and Insurance Industry',
'Intended Audience :: Science/Research',