Add trades data streaming support

basic_orders
Tyler Goodlet 2021-01-09 10:54:09 -05:00
parent 620f5fee6e
commit 280739b51a
1 changed files with 132 additions and 24 deletions

View File

@ -35,6 +35,7 @@ import time
from async_generator import aclosing from async_generator import aclosing
from ib_insync.wrapper import RequestError from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails from ib_insync.contract import Contract, ContractDetails
from ib_insync.order import Order
from ib_insync.ticker import Ticker from ib_insync.ticker import Ticker
import ib_insync as ibis import ib_insync as ibis
from ib_insync.wrapper import Wrapper from ib_insync.wrapper import Wrapper
@ -53,7 +54,6 @@ from ..data import (
) )
from ..data._source import from_df from ..data._source import from_df
from ._util import SymbolNotFound from ._util import SymbolNotFound
from .._async_utils import maybe_with_if
log = get_logger(__name__) log = get_logger(__name__)
@ -150,6 +150,9 @@ class Client:
self.ib = ib self.ib = ib
self.ib.RaiseRequestErrors = True self.ib.RaiseRequestErrors = True
# contract cache
self._contracts: Dict[str, Contract] = {}
# NOTE: the ib.client here is "throttled" to 45 rps by default # NOTE: the ib.client here is "throttled" to 45 rps by default
async def bars( async def bars(
@ -283,6 +286,20 @@ class Client:
currency: str = 'USD', currency: str = 'USD',
**kwargs, **kwargs,
) -> Contract: ) -> Contract:
# TODO: we can't use this currently because
# ``wrapper.starTicker()`` currently cashes ticker instances
# which means getting a singel quote will potentially look up
# a quote for a ticker that it already streaming and thus run
# into state clobbering (eg. List: Ticker.ticks). It probably
# makes sense to try this once we get the pub-sub working on
# individual symbols...
# try:
# # give the cache a go
# return self._contracts[symbol]
# except KeyError:
# log.debug(f'Looking up contract for {symbol}')
# use heuristics to figure out contract "type" # use heuristics to figure out contract "type"
try: try:
sym, exch = symbol.upper().rsplit('.', maxsplit=1) sym, exch = symbol.upper().rsplit('.', maxsplit=1)
@ -336,6 +353,8 @@ class Client:
except IndexError: except IndexError:
raise ValueError(f"No contract could be found {con}") raise ValueError(f"No contract could be found {con}")
self._contracts[symbol] = contract
return contract return contract
async def get_head_time( async def get_head_time(
@ -401,7 +420,65 @@ class Client:
contract, contract,
snapshot=True, snapshot=True,
) )
return contract, (await ticker.updateEvent) ticker = await ticker.updateEvent
return contract, ticker
async def submit_limit(
self,
contract: Contract,
price: float,
action: str = 'BUY',
quantity: int = 100,
) -> None:
self.ib.placeOrder(
Order(
self,
orderType='LMT',
action=action,
totalQuantity=quantity,
lmtPrice=price,
# **kwargs
)
)
async def recv_trade_updates(
self,
to_trio,
) -> None:
"""Stream a ticker using the std L1 api.
"""
# contract = contract or (await self.find_contract(symbol))
def push(eventkit_obj, trade):
"""Push events to trio task.
"""
print(f'{eventkit_obj}: {trade}')
log.debug(trade)
if trade is None:
print("YO WTF NONE")
try:
to_trio.send_nowait(trade)
except trio.BrokenResourceError:
# XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
log.exception(f'Disconnected from {eventkit_obj} updates')
eventkit_obj.disconnect(push)
# hook up to the weird eventkit object - event stream api
for ev_name in [
# 'newOrderEvent', 'orderModifyEvent', 'cancelOrderEvent',
'openOrderEvent', 'orderStatusEvent', 'execDetailsEvent',
# 'commissionReportEvent', 'updatePortfolioEvent', 'positionEvent',
]:
eventkit_obj = getattr(self.ib, ev_name)
handler = partial(push, eventkit_obj)
eventkit_obj.connect(handler)
# let the engine run and stream
await self.ib.disconnectedEvent
# default config ports # default config ports
@ -586,6 +663,7 @@ def normalize(
# convert named tuples to dicts so we send usable keys # convert named tuples to dicts so we send usable keys
new_ticks = [] new_ticks = []
for tick in ticker.ticks: for tick in ticker.ticks:
if tick:
td = tick._asdict() td = tick._asdict()
td['type'] = tick_types.get(td['tickType'], 'n/a') td['type'] = tick_types.get(td['tickType'], 'n/a')
@ -788,7 +866,7 @@ async def stream_quotes(
# no real volume so we have to calculate the price # no real volume so we have to calculate the price
suffix = 'secType' suffix = 'secType'
calc_price = True calc_price = True
ticker = first_ticker # ticker = first_ticker
# pass first quote asap # pass first quote asap
quote = normalize(first_ticker, calc_price=calc_price) quote = normalize(first_ticker, calc_price=calc_price)
@ -814,6 +892,7 @@ async def stream_quotes(
calc_price = False # should be real volume for contract calc_price = False # should be real volume for contract
with trio.move_on_after(10) as cs:
# wait for real volume on feed (trading might be closed) # wait for real volume on feed (trading might be closed)
async for ticker in stream: async for ticker in stream:
@ -836,6 +915,10 @@ async def stream_quotes(
# ``aclosing()`` above? # ``aclosing()`` above?
break break
if cs.cancelled_caught:
await tractor.breakpoint()
# real-time stream # real-time stream
async for ticker in stream: async for ticker in stream:
@ -895,3 +978,28 @@ async def stream_quotes(
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
ticker.ticks = [] ticker.ticks = []
@tractor.msg.pub
async def stream_trades(
loglevel: str = None,
get_topics: Callable = None,
) -> AsyncIterator[Dict[str, Any]]:
log.error('startedddd daa tradeeeez feeeedddzzz')
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
stream = await _trio_run_client_method(
method='recv_trade_updates',
)
# more great work by our friend ib_insync...
# brutallll bby.
none = await stream.__anext__()
print(f'Cuz sending {none} makes sense..')
async for trade_event in stream:
msg = asdict(trade_event)
yield {'all': msg}