Finally, sanely normalize local trades event data

basic_orders
Tyler Goodlet 2021-01-15 19:40:09 -05:00
parent c835cc10e0
commit bdc02010cf
1 changed files with 82 additions and 61 deletions

View File

@ -32,6 +32,8 @@ import inspect
import itertools import itertools
import time import time
import trio
import tractor
from async_generator import aclosing from async_generator import aclosing
from ib_insync.wrapper import RequestError from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails from ib_insync.contract import Contract, ContractDetails
@ -40,15 +42,12 @@ from ib_insync.ticker import Ticker
import ib_insync as ibis import ib_insync as ibis
from ib_insync.wrapper import Wrapper from ib_insync.wrapper import Wrapper
from ib_insync.client import Client as ib_Client from ib_insync.client import Client as ib_Client
import trio
import tractor
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ( from ..data import (
maybe_spawn_brokerd, maybe_spawn_brokerd,
iterticks, iterticks,
attach_shm_array, attach_shm_array,
# get_shm_token,
subscribe_ohlc_for_increment, subscribe_ohlc_for_increment,
_buffer, _buffer,
) )
@ -217,7 +216,6 @@ class Client:
# barSizeSetting='1 min', # barSizeSetting='1 min',
# always use extended hours # always use extended hours
useRTH=False, useRTH=False,
@ -306,6 +304,10 @@ class Client:
# into state clobbering (eg. List: Ticker.ticks). It probably # into state clobbering (eg. List: Ticker.ticks). It probably
# makes sense to try this once we get the pub-sub working on # makes sense to try this once we get the pub-sub working on
# individual symbols... # individual symbols...
# XXX UPDATE: we can probably do the tick/trades scraping
# inside our eventkit handler instead to bypass this entirely?
# try: # try:
# # give the cache a go # # give the cache a go
# return self._contracts[symbol] # return self._contracts[symbol]
@ -386,7 +388,6 @@ class Client:
to_trio, to_trio,
opts: Tuple[int] = ('375', '233',), opts: Tuple[int] = ('375', '233',),
contract: Optional[Contract] = None, contract: Optional[Contract] = None,
# opts: Tuple[int] = ('459',),
) -> None: ) -> None:
"""Stream a ticker using the std L1 api. """Stream a ticker using the std L1 api.
""" """
@ -435,11 +436,11 @@ class Client:
# async to be consistent for the client proxy, and cuz why not. # async to be consistent for the client proxy, and cuz why not.
async def submit_limit( async def submit_limit(
self, self,
oid: str, oid: str, # XXX: see return value
symbol: str, symbol: str,
price: float, price: float,
action: str = 'BUY', action: str,
quantity: int = 100, size: int = 100,
) -> int: ) -> int:
"""Place an order and return integer request id provided by client. """Place an order and return integer request id provided by client.
@ -452,16 +453,14 @@ class Client:
# against non-known prices. # against non-known prices.
raise RuntimeError("Can not order {symbol}, no live feed?") raise RuntimeError("Can not order {symbol}, no live feed?")
# contract.exchange = 'SMART'
trade = self.ib.placeOrder( trade = self.ib.placeOrder(
contract, contract,
Order( Order(
# orderId=oid, # orderId=oid, # stupid api devs..
action=action.upper(), # BUY/SELL action=action.upper(), # BUY/SELL
orderType='LMT', orderType='LMT',
lmtPrice=price, lmtPrice=price,
totalQuantity=quantity, totalQuantity=size,
outsideRth=True, outsideRth=True,
optOutSmartRouting=True, optOutSmartRouting=True,
@ -469,18 +468,21 @@ class Client:
designatedLocation='SMART', designatedLocation='SMART',
), ),
) )
# ib doesn't support setting your own id outside
# their own weird client int counting ids..
return trade.order.orderId return trade.order.orderId
async def submit_cancel( async def submit_cancel(
self, self,
oid: str, reqid: str,
) -> None: ) -> None:
"""Send cancel request for order id ``oid``. """Send cancel request for order id ``oid``.
""" """
self.ib.cancelOrder( self.ib.cancelOrder(
Order( Order(
orderId=oid, orderId=reqid,
clientId=self.ib.client.clientId, clientId=self.ib.client.clientId,
) )
) )
@ -491,43 +493,37 @@ class Client:
) -> None: ) -> None:
"""Stream a ticker using the std L1 api. """Stream a ticker using the std L1 api.
""" """
# contract = contract or (await self.find_contract(symbol))
self.inline_errors(to_trio) self.inline_errors(to_trio)
def push_tradesies(eventkit_obj, trade, fill=None): def push_tradesies(eventkit_obj, trade, fill=None):
"""Push events to trio task. """Push events to trio task.
""" """
# if fill is not None: if fill is not None:
# heyoo we executed, and thanks to ib_insync # execution details event
# we have to handle the callback signature differently item = ('fill', (trade, fill))
# due to its consistently non-consistent design. else:
item = ('status', trade)
# yet again convert the datetime since they aren't log.info(f'{eventkit_obj}: {item}')
# ipc serializable...
# fill.time = fill.time.timestamp
# trade.fill = fill
print(f'{eventkit_obj}: {trade}')
log.debug(trade)
if trade is None:
print("YO WTF NONE")
try: try:
to_trio.send_nowait(trade) to_trio.send_nowait(item)
except trio.BrokenResourceError: except trio.BrokenResourceError:
# XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
log.exception(f'Disconnected from {eventkit_obj} updates') log.exception(f'Disconnected from {eventkit_obj} updates')
eventkit_obj.disconnect(push_tradesies) eventkit_obj.disconnect(push_tradesies)
# hook up to the weird eventkit object - event stream api # hook up to the weird eventkit object - event stream api
for ev_name in [ for ev_name in [
'orderStatusEvent', 'orderStatusEvent', # all order updates
'execDetailsEvent', 'execDetailsEvent', # all "fill" updates
# XXX: not sure yet if we need these
# 'commissionReportEvent', # 'commissionReportEvent',
# XXX: ugh, it is a separate event from IB and it's
# emitted as follows:
# self.ib.commissionReportEvent.emit(trade, fill, report)
# XXX: not sure yet if we need these
# 'updatePortfolioEvent', # 'updatePortfolioEvent',
# 'positionEvent', # 'positionEvent',
@ -559,13 +555,13 @@ class Client:
) -> None: ) -> None:
log.error(errorString) log.error(errorString)
try: try:
to_trio.send_nowait( to_trio.send_nowait((
{'error': { 'error',
'brid': reqId, # error "object"
{'reqid': reqId,
'message': errorString, 'message': errorString,
'contract': contract, 'contract': contract}
}} ))
)
except trio.BrokenResourceError: except trio.BrokenResourceError:
# XXX: eventkit's ``Event.emit()`` for whatever redic # XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions # reason will catch and ignore regular exceptions
@ -838,8 +834,8 @@ async def fill_bars(
method='bars', method='bars',
symbol=sym, symbol=sym,
end_dt=next_dt, end_dt=next_dt,
) )
shm.push(bars_array, prepend=True) shm.push(bars_array, prepend=True)
i += 1 i += 1
next_dt = bars[0].date next_dt = bars[0].date
@ -1092,25 +1088,50 @@ async def stream_trades(
method='recv_trade_updates', method='recv_trade_updates',
) )
# init startup msg # startup msg
yield {'trade_events': 'started'} yield {'local_trades': 'start'}
async for event in stream: async for event_name, item in stream:
from pprint import pprint
if not isinstance(event, dict): # XXX: begin normalization of nonsense ib_insync internal
# remove trade log entries for now until we figure out if we # object-state tracking representations...
# even want to retreive them this way and because they're using
# datetimes
event = asdict(event)
pprint(event)
event.pop('log', None)
# fills = event.get('fills') if event_name == 'status':
# if fills:
# await tractor.breakpoint()
# for fill in fills:
# fill['time'] = fill['time'].timestamp
# exec = fill.pop('execution')
yield {'trade_events': event} # unwrap needed data from ib_insync internal objects
trade = item
status = trade.orderStatus
# skip duplicate filled updates - we get the deats
# from the execution details event
msg = {
'reqid': trade.order.orderId,
'status': status.status,
'filled': status.filled,
'reason': status.whyHeld,
# this seems to not be necessarily up to date in the
# execDetails event.. so we have to send it here I guess?
'remaining': status.remaining,
}
elif event_name == 'fill':
trade, fill = item
execu = fill.execution
msg = {
'reqid': execu.orderId,
'execid': execu.execId,
# supposedly IB server fill time
'broker_time': execu.time, # converted to float by us
'time': fill.time, # ns in main TCP handler by us
'time_ns': time.time_ns(), # cuz why not
'action': {'BOT': 'buy', 'SLD': 'sell'}[execu.side],
'size': execu.shares,
'price': execu.price,
}
elif event_name == 'error':
msg = item
yield {'local_trades': (event_name, msg)}