diff --git a/piker/_ems.py b/piker/_ems.py
index e860199d..65374ace 100644
--- a/piker/_ems.py
+++ b/piker/_ems.py
@@ -15,16 +15,18 @@
# along with this program. If not, see .
"""
-In suit parlance: "Execution management systems"
+In da suit parlances: "Execution management systems"
"""
from pprint import pformat
import time
+from datetime import datetime
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import (
AsyncIterator, Dict, Callable, Tuple,
)
+import uuid
from bidict import bidict
import trio
@@ -73,10 +75,13 @@ def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]:
@dataclass
-class _ExecBook:
- """EMS-side execution book.
+class _DarkBook:
+ """Client-side execution book.
+
+ Contains conditions for executions (aka "orders") which are not
+ exposed to brokers and thus the market; i.e. these are privacy
+ focussed "client side" orders.
- Contains conditions for executions (aka "orders").
A singleton instance is created per EMS actor (for now).
"""
@@ -105,13 +110,13 @@ class _ExecBook:
_broker2ems_ids: Dict[str, str] = field(default_factory=bidict)
-_books: Dict[str, _ExecBook] = {}
+_books: Dict[str, _DarkBook] = {}
-def get_book(broker: str) -> _ExecBook:
+def get_dark_book(broker: str) -> _DarkBook:
global _books
- return _books.setdefault(broker, _ExecBook(broker))
+ return _books.setdefault(broker, _DarkBook(broker))
# XXX: this is in place to prevent accidental positions that are too
@@ -129,9 +134,20 @@ class PaperBoi:
requirements.
"""
+ broker: str
_to_trade_stream: trio.abc.SendChannel
trade_stream: trio.abc.ReceiveChannel
+ # map of paper "live" orders which be used
+ # to simulate fills based on paper engine settings
+ _buys: bidict
+ _sells: bidict
+ _reqids: bidict
+
+ # init edge case L1 spread
+ last_ask: Tuple[float, float] = (float('inf'), 0) # price, size
+ last_bid: Tuple[float, float] = (0, 0)
+
async def submit_limit(
self,
oid: str, # XXX: see return value
@@ -143,6 +159,46 @@ class PaperBoi:
"""Place an order and return integer request id provided by client.
"""
+ # the trades stream expects events in the form
+ # {'local_trades': (event_name, msg)}
+ reqid = str(uuid.uuid4())
+
+ # register this submissions as a paper live order
+ if action == 'buy':
+ orders = self._buys
+
+ elif action == 'sell':
+ orders = self._sells
+
+ # buys/sells: (symbol -> (price -> order))
+ orders.setdefault(symbol, {})[price] = (size, oid, reqid, action)
+
+ self._reqids[reqid] = (oid, symbol, action, price)
+
+ # TODO: net latency model
+ # we checkpoint here quickly particulalry
+ # for dark orders since we want the dark_executed
+ # to trigger first thus creating a lookup entry
+ # in the broker trades event processing loop
+ await trio.sleep(0.05)
+
+ await self._to_trade_stream.send({
+
+ 'local_trades': ('status', {
+
+ 'time_ns': time.time_ns(),
+ 'reqid': reqid,
+
+ 'status': 'submitted',
+ 'broker': self.broker,
+ # 'cmd': cmd, # original request message
+
+ 'paper_info': {
+ 'oid': oid,
+ },
+ }),
+ })
+ return reqid
async def submit_cancel(
self,
@@ -150,12 +206,303 @@ class PaperBoi:
) -> None:
# TODO: fake market simulation effects
- self._to_trade_stream()
+ # await self._to_trade_stream.send(
+ oid, symbol, action, price = self._reqids[reqid]
- def emulate_fill(
- self
+ if action == 'buy':
+ self._buys[symbol].pop(price)
+ elif action == 'sell':
+ self._sells[symbol].pop(price)
+
+ # TODO: net latency model
+ await trio.sleep(0.05)
+
+ await self._to_trade_stream.send({
+
+ 'local_trades': ('status', {
+
+ 'time_ns': time.time_ns(),
+ 'oid': oid,
+ 'reqid': reqid,
+
+ 'status': 'cancelled',
+ 'broker': self.broker,
+ # 'cmd': cmd, # original request message
+
+ 'paper': True,
+ }),
+ })
+
+ async def fake_fill(
+ self,
+ price: float,
+ size: float,
+ action: str, # one of {'buy', 'sell'}
+
+ reqid: str,
+ oid: str,
+
+ # determine whether to send a filled status that has zero
+ # remaining lots to fill
+ order_complete: bool = True,
+ remaining: float = 0,
) -> None:
- ...
+ """Pretend to fill a broker order @ price and size.
+
+ """
+ # TODO: net latency model
+ await trio.sleep(0.05)
+
+ await self._to_trade_stream.send({
+
+ 'local_trades': ('fill', {
+
+ 'status': 'filled',
+ 'broker': self.broker,
+ # converted to float by us in ib backend
+ 'broker_time': datetime.now().timestamp(),
+
+ 'action': action,
+ 'size': size,
+ 'price': price,
+ 'remaining': 0 if order_complete else remaining,
+
+ # normally filled by real `brokerd` daemon
+ 'time': time.time_ns(),
+ 'time_ns': time.time_ns(), # cuz why not
+
+ # fake ids
+ 'reqid': reqid,
+
+ 'paper_info': {
+ 'oid': oid,
+ },
+
+ # XXX: fields we might not need to emulate?
+ # execution id from broker
+ # 'execid': execu.execId,
+ # 'cmd': cmd, # original request message?
+ }),
+ })
+ if order_complete:
+ await self._to_trade_stream.send({
+
+ 'local_trades': ('status', {
+ 'reqid': reqid,
+ 'status': 'filled',
+ 'broker': self.broker,
+ 'filled': size,
+ 'remaining': 0 if order_complete else remaining,
+
+ # converted to float by us in ib backend
+ 'broker_time': datetime.now().timestamp(),
+ 'paper_info': {
+ 'oid': oid,
+ },
+ }),
+ })
+
+
+async def simulate_fills(
+ quote_stream: 'tractor.ReceiveStream', # noqa
+ client: PaperBoi,
+) -> None:
+
+ # TODO: more machinery to better simulate real-world market things:
+
+ # - slippage models, check what quantopian has:
+ # https://github.com/quantopian/zipline/blob/master/zipline/finance/slippage.py
+ # * this should help with simulating partial fills in a fast moving mkt
+ # afaiu
+
+ # - commisions models, also quantopian has em:
+ # https://github.com/quantopian/zipline/blob/master/zipline/finance/commission.py
+
+ # - network latency models ??
+
+ # - position tracking:
+ # https://github.com/quantopian/zipline/blob/master/zipline/finance/ledger.py
+
+ # this stream may eventually contain multiple symbols
+ async for quotes in quote_stream:
+ for sym, quote in quotes.items():
+
+ buys, sells = client._buys.get(sym), client._sells.get(sym)
+
+ if not (buys or sells):
+ continue
+
+ for tick in iterticks(
+ quote,
+ # dark order price filter(s)
+ types=('ask', 'bid', 'trade', 'last')
+ ):
+ print(tick)
+ tick_price = tick.get('price')
+ ttype = tick['type']
+
+ if ttype in ('ask',) and buys:
+
+ client.last_ask = (
+ tick_price,
+ tick.get('size', client.last_ask[1]),
+ )
+
+ # iterate book prices descending
+ for our_bid in reversed(sorted(buys.keys())):
+
+ if tick_price < our_bid:
+
+ # retreive order info
+ (size, oid, reqid, action) = buys.pop(our_bid)
+
+ # clearing price would have filled entirely
+ await client.fake_fill(
+ # todo slippage to determine fill price
+ tick_price,
+ size,
+ action,
+ reqid,
+ oid,
+ )
+ else:
+ # prices are interated in sorted order so
+ # we're done
+ break
+
+ if ttype in ('bid',) and sells:
+
+ # iterate book prices ascending
+ for our_ask in sorted(sells.keys()):
+
+ client.last_bid = (
+ tick_price,
+ tick.get('bid', client.last_bid[1]),
+ )
+
+ if tick_price > our_ask:
+
+ # retreive order info
+ (size, oid, reqid, action) = sells.pop(our_ask)
+
+ # clearing price would have filled entirely
+ await client.fake_fill(
+ tick_price,
+ size,
+ action,
+ reqid,
+ oid,
+ )
+ else:
+ # prices are interated in sorted order so
+ # we're done
+ break
+
+ if ttype in ('trade', 'last'):
+ # TODO: simulate actual book queues and our orders
+ # place in it, might require full L2 data?
+ pass
+
+
+async def execute_triggers(
+ broker: str,
+ symbol: str,
+ stream: 'tractor.ReceiveStream', # noqa
+ ctx: tractor.Context,
+ client: 'Client', # noqa
+ book: _DarkBook,
+) -> None:
+ """Core dark order trigger loop.
+
+ Scan the (price) data feed and submit triggered orders
+ to broker.
+
+ """
+ # this stream may eventually contain multiple symbols
+ async for quotes in stream:
+
+ # TODO: numba all this!
+
+ # start = time.time()
+ for sym, quote in quotes.items():
+
+ execs = book.orders.get(sym, None)
+ if execs is None:
+ continue
+
+ for tick in iterticks(
+ quote,
+ # dark order price filter(s)
+ types=('ask', 'bid', 'trade', 'last')
+ ):
+ price = tick.get('price')
+ ttype = tick['type']
+
+ # lel, fuck you ib
+ # if price < 0:
+ # log.error(f'!!?!?!VOLUME TICK {tick}!?!?')
+ # continue
+
+ # update to keep new cmds informed
+ book.lasts[(broker, symbol)] = price
+
+ for oid, (
+ pred,
+ tf,
+ cmd,
+ percent_away,
+ abs_diff_away
+ ) in (
+ tuple(execs.items())
+ ):
+
+ if (ttype not in tf) or (not pred(price)):
+ # majority of iterations will be non-matches
+ continue
+
+ # submit_price = price + price*percent_away
+ submit_price = price + abs_diff_away
+
+ log.info(
+ f'Dark order triggered for price {price}\n'
+ f'Submitting order @ price {submit_price}')
+
+ reqid = await client.submit_limit(
+ oid=oid,
+ symbol=sym,
+ action=cmd['action'],
+ price=submit_price,
+ size=cmd['size'],
+ )
+
+ # register broker request id to ems id
+ book._broker2ems_ids[reqid] = oid
+
+ resp = {
+ 'resp': 'dark_executed',
+ 'time_ns': time.time_ns(),
+ 'trigger_price': price,
+
+ 'cmd': cmd, # original request message
+
+ 'broker_reqid': reqid,
+ 'broker': broker,
+ 'oid': oid, # piker order id
+
+ }
+
+ # remove exec-condition from set
+ log.info(f'removing pred for {oid}')
+ execs.pop(oid)
+
+ await ctx.send_yield(resp)
+
+ else: # condition scan loop complete
+ log.debug(f'execs are {execs}')
+ if execs:
+ book.orders[symbol] = execs
+
+ # print(f'execs scan took: {time.time() - start}')
async def exec_loop(
@@ -165,7 +512,10 @@ async def exec_loop(
_exec_mode: str,
task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED,
) -> AsyncIterator[dict]:
+ """Main scan loop for order execution conditions and submission
+ to brokers.
+ """
async with data.open_feed(
broker,
[symbol],
@@ -174,32 +524,40 @@ async def exec_loop(
# TODO: get initial price quote from target broker
first_quote = await feed.receive()
- book = get_book(broker)
+
+ book = get_dark_book(broker)
book.lasts[(broker, symbol)] = first_quote[symbol]['last']
# TODO: wrap this in a more re-usable general api
client_factory = getattr(feed.mod, 'get_client_proxy', None)
- # we have an order API for this broker
if client_factory is not None and _exec_mode != 'paper':
+
+ # we have an order API for this broker
client = client_factory(feed._brokerd_portal)
- # force paper mode
else:
- log.warning(
- f'No order client is yet supported for {broker}, '
- 'entering paper mode')
+ # force paper mode
+ log.warning(f'Entering paper trading mode for {broker}')
- client = PaperBoi(*trio.open_memory_channel(100))
+ client = PaperBoi(
+ broker,
+ *trio.open_memory_channel(100),
+ _buys={},
+ _sells={},
+ _reqids={},
+ )
# for paper mode we need to mock this trades response feed
# so we pass a duck-typed feed-looking mem chan which is fed
# fill and submission events from the exec loop
- feed._set_fake_trades_stream(client.trade_stream)
+ feed._trade_stream = client.trade_stream
# init the trades stream
client._to_trade_stream.send_nowait({'local_trades': 'start'})
+ _exec_mode = 'paper'
+
# return control to parent task
task_status.started((first_quote, feed, client))
@@ -211,92 +569,19 @@ async def exec_loop(
# shield this field so the remote brokerd does not get cancelled
stream = feed.stream
with stream.shield():
+ async with trio.open_nursery() as n:
+ n.start_soon(
+ execute_triggers,
+ broker,
+ symbol,
+ stream,
+ ctx,
+ client,
+ book
+ )
- # this stream may eventually contain multiple symbols
- async for quotes in stream:
-
- # TODO: numba all this!
-
- # start = time.time()
- for sym, quote in quotes.items():
-
- execs = book.orders.get(sym, None)
- if execs is None:
- continue
-
- for tick in iterticks(
- quote,
- # dark order price filter(s)
- types=('ask', 'bid', 'trade', 'last')
- ):
- price = tick.get('price')
- ttype = tick['type']
-
- # lel, fuck you ib
- if price < 0:
- log.error(f'!!?!?!VOLUME TICK {tick}!?!?')
- continue
-
- # update to keep new cmds informed
- book.lasts[(broker, symbol)] = price
-
- for oid, (
- pred,
- tf,
- cmd,
- percent_away,
- abs_diff_away
- ) in (
- tuple(execs.items())
- ):
-
- if (ttype not in tf) or (not pred(price)):
- # majority of iterations will be non-matches
- continue
-
- # submit_price = price + price*percent_away
- submit_price = price + abs_diff_away
-
- log.info(
- f'Dark order triggered for price {price}\n'
- f'Submitting order @ price {submit_price}')
-
- reqid = await client.submit_limit(
- oid=oid,
- symbol=sym,
- action=cmd['action'],
- price=round(submit_price, 2),
- size=cmd['size'],
- )
- # register broker request id to ems id
- book._broker2ems_ids[reqid] = oid
-
- resp = {
- 'resp': 'dark_executed',
- 'time_ns': time.time_ns(),
- 'trigger_price': price,
- 'broker_reqid': reqid,
- 'broker': broker,
- 'oid': oid,
- 'cmd': cmd, # original request message
-
- # current shm array index - this needed?
- # 'ohlc_index': feed.shm._last.value - 1,
- }
-
- # remove exec-condition from set
- log.info(f'removing pred for {oid}')
- execs.pop(oid)
-
- await ctx.send_yield(resp)
-
- else: # condition scan loop complete
- log.debug(f'execs are {execs}')
- if execs:
- book.orders[symbol] = execs
-
- # print(f'execs scan took: {time.time() - start}')
- # feed teardown
+ if _exec_mode == 'paper':
+ n.start_soon(simulate_fills, stream.clone(), client)
# TODO: lots of cases still to handle
@@ -312,7 +597,7 @@ async def exec_loop(
async def process_broker_trades(
ctx: tractor.Context,
feed: 'Feed', # noqa
- book: _ExecBook,
+ book: _DarkBook,
task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED,
) -> AsyncIterator[dict]:
"""Trades update loop - receive updates from broker, convert
@@ -329,17 +614,18 @@ async def process_broker_trades(
'status' -> relabel as 'broker_', if complete send 'executed'
'fill' -> 'broker_filled'
- Currently accepted status values from IB
+ Currently accepted status values from IB:
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
"""
broker = feed.mod.name
with trio.fail_after(5):
+ # in the paper engine case this is just a mem receive channel
trades_stream = await feed.recv_trades_data()
first = await trades_stream.__anext__()
- # startup msg
+ # startup msg expected as first from broker backend
assert first['local_trades'] == 'start'
task_status.started()
@@ -354,7 +640,19 @@ async def process_broker_trades(
# make response packet to EMS client(s)
oid = book._broker2ems_ids.get(reqid)
- resp = {'oid': oid}
+
+ if oid is None:
+ # paper engine race case: ``Client.submit_limit()`` hasn't
+ # returned yet and provided an output reqid to register
+ # locally, so we need to retreive the oid that was already
+ # packed at submission since we already know it ahead of
+ # time
+ oid = msg['paper_info']['oid']
+
+ resp = {
+ 'resp': None, # placeholder
+ 'oid': oid
+ }
if name in (
'error',
@@ -379,6 +677,9 @@ async def process_broker_trades(
# another stupid ib error to handle
# if 10147 in message: cancel
+ # don't relay message to order requester client
+ continue
+
elif name in (
'status',
):
@@ -398,12 +699,14 @@ async def process_broker_trades(
status = msg['status'].lower()
if status == 'filled':
+ # await tractor.breakpoint()
# conditional execution is fully complete, no more
# fills for the noted order
if not msg['remaining']:
- await ctx.send_yield(
- {'resp': 'broker_executed', 'oid': oid})
+
+ resp['resp'] = 'broker_executed'
+
log.info(f'Execution for {oid} is complete!')
# just log it
@@ -414,16 +717,17 @@ async def process_broker_trades(
# one of (submitted, cancelled)
resp['resp'] = 'broker_' + status
- await ctx.send_yield(resp)
-
elif name in (
'fill',
):
# proxy through the "fill" result(s)
resp['resp'] = 'broker_filled'
resp.update(msg)
- await ctx.send_yield(resp)
- log.info(f'Fill for {oid} cleared with\n{pformat(resp)}')
+
+ log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
+
+ # respond to requesting client
+ await ctx.send_yield(resp)
@tractor.stream
@@ -452,14 +756,14 @@ async def _ems_main(
- ``_ems_main()``:
accepts order cmds, registers execs with exec loop
- - ``exec_loop()``: run conditions on inputs and trigger executions
+ - ``exec_loop()``:
+ run (dark) conditions on inputs and trigger broker submissions
- ``process_broker_trades()``:
accept normalized trades responses, process and relay to ems client(s)
"""
- actor = tractor.current_actor()
- book = get_book(broker)
+ book = get_dark_book(broker)
# get a portal back to the client
async with tractor.wait_for_actor(client_actor_name) as portal:
@@ -485,10 +789,13 @@ async def _ems_main(
book,
)
- # connect back to the calling actor to receive order requests
+ # connect back to the calling actor (the one that is
+ # acting as an EMS client and will submit orders) to
+ # receive requests pushed over a tractor stream
+ # using (for now) an async generator.
async for cmd in await portal.run(send_order_cmds):
- log.info(f'{cmd} received in {actor.uid}')
+ log.info(f'Received order cmd:\n{pformat(cmd)}')
action = cmd['action']
oid = cmd['oid']
@@ -533,7 +840,7 @@ async def _ems_main(
oid=oid, # no ib support for this
symbol=sym,
action=action,
- price=round(trigger_price, 2),
+ price=trigger_price,
size=size,
)
book._broker2ems_ids[order_id] = oid
@@ -590,7 +897,7 @@ async def _ems_main(
abs_diff_away
)
- # ack-response that order is live here
+ # ack-response that order is live in EMS
await ctx.send_yield({
'resp': 'dark_submitted',
'oid': oid
@@ -608,10 +915,11 @@ class OrderBook:
hard/fast work of talking to brokers/exchanges to conduct
executions.
- Currently, mostly for keeping local state to match the EMS and use
- received events to trigger graphics updates.
+ Currently, this is mostly for keeping local state to match the EMS
+ and use received events to trigger graphics updates.
"""
+ # mem channels used to relay order requests to the EMS daemon
_to_ems: trio.abc.SendChannel
_from_order_book: trio.abc.ReceiveChannel
@@ -626,7 +934,7 @@ class OrderBook:
size: float,
action: str,
exec_mode: str,
- ) -> str:
+ ) -> dict:
cmd = {
'action': action,
'price': price,
@@ -638,6 +946,7 @@ class OrderBook:
}
self._sent_orders[uuid] = cmd
self._to_ems.send_nowait(cmd)
+ return cmd
async def modify(self, oid: str, price) -> bool:
...
@@ -658,8 +967,13 @@ class OrderBook:
_orders: OrderBook = None
-def get_orders(emsd_uid: Tuple[str, str] = None) -> OrderBook:
+def get_orders(
+ emsd_uid: Tuple[str, str] = None
+) -> OrderBook:
+ """"
+ OrderBook singleton factory per actor.
+ """
if emsd_uid is not None:
# TODO: read in target emsd's active book on startup
pass
@@ -669,7 +983,6 @@ def get_orders(emsd_uid: Tuple[str, str] = None) -> OrderBook:
if _orders is None:
# setup local ui event streaming channels for request/resp
# streamging with EMS daemon
- # _to_ems, _from_order_book = trio.open_memory_channel(100)
_orders = OrderBook(*trio.open_memory_channel(100))
return _orders
@@ -701,7 +1014,7 @@ async def send_order_cmds():
async for cmd in orders_stream:
# send msg over IPC / wire
- log.info(f'sending order cmd: {cmd}')
+ log.info(f'Send order cmd:\n{pformat(cmd)}')
yield cmd
@@ -737,7 +1050,6 @@ async def open_ems(
TODO: make some fancy diagrams using mermaid.io
-
the possible set of responses from the stream is currently:
- 'dark_submitted', 'broker_submitted'
- 'dark_cancelled', 'broker_cancelled'
@@ -766,7 +1078,7 @@ async def open_ems(
# ready for order commands
book = get_orders()
- with trio.fail_after(5):
+ with trio.fail_after(10):
await book._ready_to_receive.wait()
yield book, trades_stream