diff --git a/piker/exchange/_client.py b/piker/exchange/_client.py
new file mode 100644
index 00000000..4bf84828
--- /dev/null
+++ b/piker/exchange/_client.py
@@ -0,0 +1,212 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for piker0)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+"""
+Orders and execution client API.
+
+"""
+from contextlib import asynccontextmanager
+from typing import Dict, Tuple
+from pprint import pformat
+from dataclasses import dataclass, field
+
+import trio
+import tractor
+
+from ..data._source import Symbol
+from ..log import get_logger
+from ._ems import _ems_main
+
+
+log = get_logger(__name__)
+
+
+@dataclass
+class OrderBook:
+ """Buy-side (client-side ?) order book ctl and tracking.
+
+ A style similar to "model-view" is used here where this api is
+ provided as a supervised control for an EMS actor which does all the
+ hard/fast work of talking to brokers/exchanges to conduct
+ executions.
+
+ Currently, this is mostly for keeping local state to match the EMS
+ and use received events to trigger graphics updates.
+
+ """
+ # mem channels used to relay order requests to the EMS daemon
+ _to_ems: trio.abc.SendChannel
+ _from_order_book: trio.abc.ReceiveChannel
+
+ _sent_orders: Dict[str, dict] = field(default_factory=dict)
+ _ready_to_receive: trio.Event = trio.Event()
+
+ def send(
+ self,
+ uuid: str,
+ symbol: 'Symbol',
+ price: float,
+ size: float,
+ action: str,
+ exec_mode: str,
+ ) -> dict:
+ cmd = {
+ 'action': action,
+ 'price': price,
+ 'size': size,
+ 'symbol': symbol.key,
+ 'brokers': symbol.brokers,
+ 'oid': uuid,
+ 'exec_mode': exec_mode, # dark or live
+ }
+ self._sent_orders[uuid] = cmd
+ self._to_ems.send_nowait(cmd)
+ return cmd
+
+ async def modify(self, oid: str, price) -> bool:
+ ...
+
+ def cancel(self, uuid: str) -> bool:
+ """Cancel an order (or alert) from the EMS.
+
+ """
+ cmd = self._sent_orders[uuid]
+ msg = {
+ 'action': 'cancel',
+ 'oid': uuid,
+ 'symbol': cmd['symbol'],
+ }
+ self._to_ems.send_nowait(msg)
+
+
+_orders: OrderBook = None
+
+
+def get_orders(
+ emsd_uid: Tuple[str, str] = None
+) -> OrderBook:
+ """"
+ OrderBook singleton factory per actor.
+
+ """
+ if emsd_uid is not None:
+ # TODO: read in target emsd's active book on startup
+ pass
+
+ global _orders
+
+ if _orders is None:
+ # setup local ui event streaming channels for request/resp
+ # streamging with EMS daemon
+ _orders = OrderBook(*trio.open_memory_channel(100))
+
+ return _orders
+
+
+# TODO: make this a ``tractor.msg.pub``
+async def send_order_cmds():
+ """Order streaming task: deliver orders transmitted from UI
+ to downstream consumers.
+
+ This is run in the UI actor (usually the one running Qt but could be
+ any other client service code). This process simply delivers order
+ messages to the above ``_to_ems`` send channel (from sync code using
+ ``.send_nowait()``), these values are pulled from the channel here
+ and relayed to any consumer(s) that called this function using
+ a ``tractor`` portal.
+
+ This effectively makes order messages look like they're being
+ "pushed" from the parent to the EMS where local sync code is likely
+ doing the pushing from some UI.
+
+ """
+ book = get_orders()
+ orders_stream = book._from_order_book
+
+ # signal that ems connection is up and ready
+ book._ready_to_receive.set()
+
+ async for cmd in orders_stream:
+
+ # send msg over IPC / wire
+ log.info(f'Send order cmd:\n{pformat(cmd)}')
+ yield cmd
+
+
+@asynccontextmanager
+async def open_ems(
+ broker: str,
+ symbol: Symbol,
+) -> None:
+ """Spawn an EMS daemon and begin sending orders and receiving
+ alerts.
+
+
+ This EMS tries to reduce most broker's terrible order entry apis to
+ a very simple protocol built on a few easy to grok and/or
+ "rantsy" premises:
+
+ - most users will prefer "dark mode" where orders are not submitted
+ to a broker until and execution condition is triggered
+ (aka client-side "hidden orders")
+
+ - Brokers over-complicate their apis and generally speaking hire
+ poor designers to create them. We're better off using creating a super
+ minimal, schema-simple, request-event-stream protocol to unify all the
+ existing piles of shit (and shocker, it'll probably just end up
+ looking like a decent crypto exchange's api)
+
+ - all order types can be implemented with client-side limit orders
+
+ - we aren't reinventing a wheel in this case since none of these
+ brokers are exposing FIX protocol; it is they doing the re-invention.
+
+
+ TODO: make some fancy diagrams using mermaid.io
+
+ the possible set of responses from the stream is currently:
+ - 'dark_submitted', 'broker_submitted'
+ - 'dark_cancelled', 'broker_cancelled'
+ - 'dark_executed', 'broker_executed'
+ - 'broker_filled'
+ """
+ actor = tractor.current_actor()
+
+ # TODO: add ``maybe_spawn_emsd()`` for this
+ async with tractor.open_nursery() as n:
+
+ portal = await n.start_actor(
+ 'emsd',
+ enable_modules=[
+ 'piker.exchange._ems',
+ ],
+ )
+ trades_stream = await portal.run(
+ _ems_main,
+ client_actor_name=actor.name,
+ broker=broker,
+ symbol=symbol.key,
+
+ )
+
+ # wait for service to connect back to us signalling
+ # ready for order commands
+ book = get_orders()
+
+ with trio.fail_after(10):
+ await book._ready_to_receive.wait()
+
+ yield book, trades_stream
diff --git a/piker/exchange/_ems.py b/piker/exchange/_ems.py
index f594b4ac..465b9d15 100644
--- a/piker/exchange/_ems.py
+++ b/piker/exchange/_ems.py
@@ -20,13 +20,10 @@ In da suit parlances: "Execution management systems"
"""
from pprint import pformat
import time
-from datetime import datetime
-from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import (
AsyncIterator, Dict, Callable, Tuple,
)
-import uuid
from bidict import bidict
import trio
@@ -35,8 +32,8 @@ import tractor
from .. import data
from ..log import get_logger
-from ..data._source import Symbol
from ..data._normalize import iterticks
+from ._paper_engine import PaperBoi, simulate_fills
log = get_logger(__name__)
@@ -126,293 +123,6 @@ def get_dark_book(broker: str) -> _DarkBook:
_DEFAULT_SIZE: float = 1.0
-@dataclass
-class PaperBoi:
- """Emulates a broker order client providing the same API and
- order-event response event stream format but with methods for
- triggering desired events based on forward testing engine
- requirements.
-
- """
- broker: str
- _to_trade_stream: trio.abc.SendChannel
- trade_stream: trio.abc.ReceiveChannel
-
- # map of paper "live" orders which be used
- # to simulate fills based on paper engine settings
- _buys: bidict
- _sells: bidict
- _reqids: bidict
-
- # init edge case L1 spread
- last_ask: Tuple[float, float] = (float('inf'), 0) # price, size
- last_bid: Tuple[float, float] = (0, 0)
-
- async def submit_limit(
- self,
- oid: str, # XXX: see return value
- symbol: str,
- price: float,
- action: str,
- size: float,
- ) -> int:
- """Place an order and return integer request id provided by client.
-
- """
- # the trades stream expects events in the form
- # {'local_trades': (event_name, msg)}
- reqid = str(uuid.uuid4())
-
- # TODO: net latency model
- # we checkpoint here quickly particulalry
- # for dark orders since we want the dark_executed
- # to trigger first thus creating a lookup entry
- # in the broker trades event processing loop
- await trio.sleep(0.05)
-
- await self._to_trade_stream.send({
-
- 'local_trades': ('status', {
-
- 'time_ns': time.time_ns(),
- 'reqid': reqid,
-
- 'status': 'submitted',
- 'broker': self.broker,
- # 'cmd': cmd, # original request message
-
- 'paper_info': {
- 'oid': oid,
- },
- }),
- })
-
- # register order internally
- self._reqids[reqid] = (oid, symbol, action, price)
-
- # if we're already a clearing price simulate an immediate fill
- if (
- action == 'buy' and (clear_price := self.last_ask[0]) <= price
- ) or (
- action == 'sell' and (clear_price := self.last_bid[0]) >= price
- ):
- await self.fake_fill(clear_price, size, action, reqid, oid)
-
- else: # register this submissions as a paper live order
-
- # submit order to book simulation fill loop
- if action == 'buy':
- orders = self._buys
-
- elif action == 'sell':
- orders = self._sells
-
- # buys/sells: (symbol -> (price -> order))
- orders.setdefault(symbol, {})[price] = (size, oid, reqid, action)
-
- return reqid
-
- async def submit_cancel(
- self,
- reqid: str,
- ) -> None:
-
- # TODO: fake market simulation effects
- # await self._to_trade_stream.send(
- oid, symbol, action, price = self._reqids[reqid]
-
- if action == 'buy':
- self._buys[symbol].pop(price)
- elif action == 'sell':
- self._sells[symbol].pop(price)
-
- # TODO: net latency model
- await trio.sleep(0.05)
-
- await self._to_trade_stream.send({
-
- 'local_trades': ('status', {
-
- 'time_ns': time.time_ns(),
- 'oid': oid,
- 'reqid': reqid,
-
- 'status': 'cancelled',
- 'broker': self.broker,
- # 'cmd': cmd, # original request message
-
- 'paper': True,
- }),
- })
-
- async def fake_fill(
- self,
- price: float,
- size: float,
- action: str, # one of {'buy', 'sell'}
-
- reqid: str,
- oid: str,
-
- # determine whether to send a filled status that has zero
- # remaining lots to fill
- order_complete: bool = True,
- remaining: float = 0,
- ) -> None:
- """Pretend to fill a broker order @ price and size.
-
- """
- # TODO: net latency model
- await trio.sleep(0.05)
-
- await self._to_trade_stream.send({
-
- 'local_trades': ('fill', {
-
- 'status': 'filled',
- 'broker': self.broker,
- # converted to float by us in ib backend
- 'broker_time': datetime.now().timestamp(),
-
- 'action': action,
- 'size': size,
- 'price': price,
- 'remaining': 0 if order_complete else remaining,
-
- # normally filled by real `brokerd` daemon
- 'time': time.time_ns(),
- 'time_ns': time.time_ns(), # cuz why not
-
- # fake ids
- 'reqid': reqid,
-
- 'paper_info': {
- 'oid': oid,
- },
-
- # XXX: fields we might not need to emulate?
- # execution id from broker
- # 'execid': execu.execId,
- # 'cmd': cmd, # original request message?
- }),
- })
- if order_complete:
- await self._to_trade_stream.send({
-
- 'local_trades': ('status', {
- 'reqid': reqid,
- 'status': 'filled',
- 'broker': self.broker,
- 'filled': size,
- 'remaining': 0 if order_complete else remaining,
-
- # converted to float by us in ib backend
- 'broker_time': datetime.now().timestamp(),
- 'paper_info': {
- 'oid': oid,
- },
- }),
- })
-
-
-async def simulate_fills(
- quote_stream: 'tractor.ReceiveStream', # noqa
- client: PaperBoi,
-) -> None:
-
- # TODO: more machinery to better simulate real-world market things:
-
- # - slippage models, check what quantopian has:
- # https://github.com/quantopian/zipline/blob/master/zipline/finance/slippage.py
- # * this should help with simulating partial fills in a fast moving mkt
- # afaiu
-
- # - commisions models, also quantopian has em:
- # https://github.com/quantopian/zipline/blob/master/zipline/finance/commission.py
-
- # - network latency models ??
-
- # - position tracking:
- # https://github.com/quantopian/zipline/blob/master/zipline/finance/ledger.py
-
- # this stream may eventually contain multiple symbols
- async for quotes in quote_stream:
- for sym, quote in quotes.items():
-
- for tick in iterticks(
- quote,
- # dark order price filter(s)
- types=('ask', 'bid', 'trade', 'last')
- ):
- # print(tick)
- tick_price = tick.get('price')
- ttype = tick['type']
-
- if ttype in ('ask',):
-
- client.last_ask = (
- tick_price,
- tick.get('size', client.last_ask[1]),
- )
-
- buys = client._buys.get(sym, {})
-
- # iterate book prices descending
- for our_bid in reversed(sorted(buys.keys())):
- if tick_price < our_bid:
-
- # retreive order info
- (size, oid, reqid, action) = buys.pop(our_bid)
-
- # clearing price would have filled entirely
- await client.fake_fill(
- # todo slippage to determine fill price
- tick_price,
- size,
- action,
- reqid,
- oid,
- )
- else:
- # prices are interated in sorted order so
- # we're done
- break
-
- if ttype in ('bid',):
-
- client.last_bid = (
- tick_price,
- tick.get('size', client.last_bid[1]),
- )
-
- sells = client._sells.get(sym, {})
-
- # iterate book prices ascending
- for our_ask in sorted(sells.keys()):
- if tick_price > our_ask:
-
- # retreive order info
- (size, oid, reqid, action) = sells.pop(our_ask)
-
- # clearing price would have filled entirely
- await client.fake_fill(
- tick_price,
- size,
- action,
- reqid,
- oid,
- )
- else:
- # prices are interated in sorted order so
- # we're done
- break
-
- if ttype in ('trade', 'last'):
- # TODO: simulate actual book queues and our orders
- # place in it, might require full L2 data?
- pass
-
-
async def execute_triggers(
broker: str,
symbol: str,
@@ -772,6 +482,8 @@ async def _ems_main(
accept normalized trades responses, process and relay to ems client(s)
"""
+ from ._client import send_order_cmds
+
book = get_dark_book(broker)
# get a portal back to the client
@@ -913,181 +625,3 @@ async def _ems_main(
})
# continue and wait on next order cmd
-
-
-@dataclass
-class OrderBook:
- """Buy-side (client-side ?) order book ctl and tracking.
-
- A style similar to "model-view" is used here where this api is
- provided as a supervised control for an EMS actor which does all the
- hard/fast work of talking to brokers/exchanges to conduct
- executions.
-
- Currently, this is mostly for keeping local state to match the EMS
- and use received events to trigger graphics updates.
-
- """
- # mem channels used to relay order requests to the EMS daemon
- _to_ems: trio.abc.SendChannel
- _from_order_book: trio.abc.ReceiveChannel
-
- _sent_orders: Dict[str, dict] = field(default_factory=dict)
- _ready_to_receive: trio.Event = trio.Event()
-
- def send(
- self,
- uuid: str,
- symbol: 'Symbol',
- price: float,
- size: float,
- action: str,
- exec_mode: str,
- ) -> dict:
- cmd = {
- 'action': action,
- 'price': price,
- 'size': size,
- 'symbol': symbol.key,
- 'brokers': symbol.brokers,
- 'oid': uuid,
- 'exec_mode': exec_mode, # dark or live
- }
- self._sent_orders[uuid] = cmd
- self._to_ems.send_nowait(cmd)
- return cmd
-
- async def modify(self, oid: str, price) -> bool:
- ...
-
- def cancel(self, uuid: str) -> bool:
- """Cancel an order (or alert) from the EMS.
-
- """
- cmd = self._sent_orders[uuid]
- msg = {
- 'action': 'cancel',
- 'oid': uuid,
- 'symbol': cmd['symbol'],
- }
- self._to_ems.send_nowait(msg)
-
-
-_orders: OrderBook = None
-
-
-def get_orders(
- emsd_uid: Tuple[str, str] = None
-) -> OrderBook:
- """"
- OrderBook singleton factory per actor.
-
- """
- if emsd_uid is not None:
- # TODO: read in target emsd's active book on startup
- pass
-
- global _orders
-
- if _orders is None:
- # setup local ui event streaming channels for request/resp
- # streamging with EMS daemon
- _orders = OrderBook(*trio.open_memory_channel(100))
-
- return _orders
-
-
-# TODO: make this a ``tractor.msg.pub``
-async def send_order_cmds():
- """Order streaming task: deliver orders transmitted from UI
- to downstream consumers.
-
- This is run in the UI actor (usually the one running Qt but could be
- any other client service code). This process simply delivers order
- messages to the above ``_to_ems`` send channel (from sync code using
- ``.send_nowait()``), these values are pulled from the channel here
- and relayed to any consumer(s) that called this function using
- a ``tractor`` portal.
-
- This effectively makes order messages look like they're being
- "pushed" from the parent to the EMS where local sync code is likely
- doing the pushing from some UI.
-
- """
- book = get_orders()
- orders_stream = book._from_order_book
-
- # signal that ems connection is up and ready
- book._ready_to_receive.set()
-
- async for cmd in orders_stream:
-
- # send msg over IPC / wire
- log.info(f'Send order cmd:\n{pformat(cmd)}')
- yield cmd
-
-
-@asynccontextmanager
-async def open_ems(
- broker: str,
- symbol: Symbol,
- # task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED,
-) -> None:
- """Spawn an EMS daemon and begin sending orders and receiving
- alerts.
-
-
- This EMS tries to reduce most broker's terrible order entry apis to
- a very simple protocol built on a few easy to grok and/or
- "rantsy" premises:
-
- - most users will prefer "dark mode" where orders are not submitted
- to a broker until and execution condition is triggered
- (aka client-side "hidden orders")
-
- - Brokers over-complicate their apis and generally speaking hire
- poor designers to create them. We're better off using creating a super
- minimal, schema-simple, request-event-stream protocol to unify all the
- existing piles of shit (and shocker, it'll probably just end up
- looking like a decent crypto exchange's api)
-
- - all order types can be implemented with client-side limit orders
-
- - we aren't reinventing a wheel in this case since none of these
- brokers are exposing FIX protocol; it is they doing the re-invention.
-
-
- TODO: make some fancy diagrams using mermaid.io
-
- the possible set of responses from the stream is currently:
- - 'dark_submitted', 'broker_submitted'
- - 'dark_cancelled', 'broker_cancelled'
- - 'dark_executed', 'broker_executed'
- - 'broker_filled'
- """
- actor = tractor.current_actor()
- subactor_name = 'emsd'
-
- # TODO: add ``maybe_spawn_emsd()`` for this
- async with tractor.open_nursery() as n:
-
- portal = await n.start_actor(
- subactor_name,
- enable_modules=[__name__],
- )
- trades_stream = await portal.run(
- _ems_main,
- client_actor_name=actor.name,
- broker=broker,
- symbol=symbol.key,
-
- )
-
- # wait for service to connect back to us signalling
- # ready for order commands
- book = get_orders()
-
- with trio.fail_after(10):
- await book._ready_to_receive.wait()
-
- yield book, trades_stream
diff --git a/piker/exchange/_paper_engine.py b/piker/exchange/_paper_engine.py
new file mode 100644
index 00000000..35f24f98
--- /dev/null
+++ b/piker/exchange/_paper_engine.py
@@ -0,0 +1,317 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for piker0)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+"""
+Fake trading for forward testing.
+
+"""
+from datetime import datetime
+import time
+from typing import Tuple
+import uuid
+
+from bidict import bidict
+import trio
+from dataclasses import dataclass
+
+from ..data._normalize import iterticks
+
+
+@dataclass
+class PaperBoi:
+ """Emulates a broker order client providing the same API and
+ order-event response event stream format but with methods for
+ triggering desired events based on forward testing engine
+ requirements.
+
+ """
+ broker: str
+ _to_trade_stream: trio.abc.SendChannel
+ trade_stream: trio.abc.ReceiveChannel
+
+ # map of paper "live" orders which be used
+ # to simulate fills based on paper engine settings
+ _buys: bidict
+ _sells: bidict
+ _reqids: bidict
+
+ # init edge case L1 spread
+ last_ask: Tuple[float, float] = (float('inf'), 0) # price, size
+ last_bid: Tuple[float, float] = (0, 0)
+
+ async def submit_limit(
+ self,
+ oid: str, # XXX: see return value
+ symbol: str,
+ price: float,
+ action: str,
+ size: float,
+ ) -> int:
+ """Place an order and return integer request id provided by client.
+
+ """
+ # the trades stream expects events in the form
+ # {'local_trades': (event_name, msg)}
+ reqid = str(uuid.uuid4())
+
+ # TODO: net latency model
+ # we checkpoint here quickly particulalry
+ # for dark orders since we want the dark_executed
+ # to trigger first thus creating a lookup entry
+ # in the broker trades event processing loop
+ await trio.sleep(0.05)
+
+ await self._to_trade_stream.send({
+
+ 'local_trades': ('status', {
+
+ 'time_ns': time.time_ns(),
+ 'reqid': reqid,
+
+ 'status': 'submitted',
+ 'broker': self.broker,
+ # 'cmd': cmd, # original request message
+
+ 'paper_info': {
+ 'oid': oid,
+ },
+ }),
+ })
+
+ # register order internally
+ self._reqids[reqid] = (oid, symbol, action, price)
+
+ # if we're already a clearing price simulate an immediate fill
+ if (
+ action == 'buy' and (clear_price := self.last_ask[0]) <= price
+ ) or (
+ action == 'sell' and (clear_price := self.last_bid[0]) >= price
+ ):
+ await self.fake_fill(clear_price, size, action, reqid, oid)
+
+ else: # register this submissions as a paper live order
+
+ # submit order to book simulation fill loop
+ if action == 'buy':
+ orders = self._buys
+
+ elif action == 'sell':
+ orders = self._sells
+
+ # buys/sells: (symbol -> (price -> order))
+ orders.setdefault(symbol, {})[price] = (size, oid, reqid, action)
+
+ return reqid
+
+ async def submit_cancel(
+ self,
+ reqid: str,
+ ) -> None:
+
+ # TODO: fake market simulation effects
+ # await self._to_trade_stream.send(
+ oid, symbol, action, price = self._reqids[reqid]
+
+ if action == 'buy':
+ self._buys[symbol].pop(price)
+ elif action == 'sell':
+ self._sells[symbol].pop(price)
+
+ # TODO: net latency model
+ await trio.sleep(0.05)
+
+ await self._to_trade_stream.send({
+
+ 'local_trades': ('status', {
+
+ 'time_ns': time.time_ns(),
+ 'oid': oid,
+ 'reqid': reqid,
+
+ 'status': 'cancelled',
+ 'broker': self.broker,
+ # 'cmd': cmd, # original request message
+
+ 'paper': True,
+ }),
+ })
+
+ async def fake_fill(
+ self,
+ price: float,
+ size: float,
+ action: str, # one of {'buy', 'sell'}
+
+ reqid: str,
+ oid: str,
+
+ # determine whether to send a filled status that has zero
+ # remaining lots to fill
+ order_complete: bool = True,
+ remaining: float = 0,
+ ) -> None:
+ """Pretend to fill a broker order @ price and size.
+
+ """
+ # TODO: net latency model
+ await trio.sleep(0.05)
+
+ await self._to_trade_stream.send({
+
+ 'local_trades': ('fill', {
+
+ 'status': 'filled',
+ 'broker': self.broker,
+ # converted to float by us in ib backend
+ 'broker_time': datetime.now().timestamp(),
+
+ 'action': action,
+ 'size': size,
+ 'price': price,
+ 'remaining': 0 if order_complete else remaining,
+
+ # normally filled by real `brokerd` daemon
+ 'time': time.time_ns(),
+ 'time_ns': time.time_ns(), # cuz why not
+
+ # fake ids
+ 'reqid': reqid,
+
+ 'paper_info': {
+ 'oid': oid,
+ },
+
+ # XXX: fields we might not need to emulate?
+ # execution id from broker
+ # 'execid': execu.execId,
+ # 'cmd': cmd, # original request message?
+ }),
+ })
+ if order_complete:
+ await self._to_trade_stream.send({
+
+ 'local_trades': ('status', {
+ 'reqid': reqid,
+ 'status': 'filled',
+ 'broker': self.broker,
+ 'filled': size,
+ 'remaining': 0 if order_complete else remaining,
+
+ # converted to float by us in ib backend
+ 'broker_time': datetime.now().timestamp(),
+ 'paper_info': {
+ 'oid': oid,
+ },
+ }),
+ })
+
+
+async def simulate_fills(
+ quote_stream: 'tractor.ReceiveStream', # noqa
+ client: PaperBoi,
+) -> None:
+
+ # TODO: more machinery to better simulate real-world market things:
+
+ # - slippage models, check what quantopian has:
+ # https://github.com/quantopian/zipline/blob/master/zipline/finance/slippage.py
+ # * this should help with simulating partial fills in a fast moving mkt
+ # afaiu
+
+ # - commisions models, also quantopian has em:
+ # https://github.com/quantopian/zipline/blob/master/zipline/finance/commission.py
+
+ # - network latency models ??
+
+ # - position tracking:
+ # https://github.com/quantopian/zipline/blob/master/zipline/finance/ledger.py
+
+ # this stream may eventually contain multiple symbols
+ async for quotes in quote_stream:
+ for sym, quote in quotes.items():
+
+ for tick in iterticks(
+ quote,
+ # dark order price filter(s)
+ types=('ask', 'bid', 'trade', 'last')
+ ):
+ # print(tick)
+ tick_price = tick.get('price')
+ ttype = tick['type']
+
+ if ttype in ('ask',):
+
+ client.last_ask = (
+ tick_price,
+ tick.get('size', client.last_ask[1]),
+ )
+
+ buys = client._buys.get(sym, {})
+
+ # iterate book prices descending
+ for our_bid in reversed(sorted(buys.keys())):
+ if tick_price < our_bid:
+
+ # retreive order info
+ (size, oid, reqid, action) = buys.pop(our_bid)
+
+ # clearing price would have filled entirely
+ await client.fake_fill(
+ # todo slippage to determine fill price
+ tick_price,
+ size,
+ action,
+ reqid,
+ oid,
+ )
+ else:
+ # prices are interated in sorted order so
+ # we're done
+ break
+
+ if ttype in ('bid',):
+
+ client.last_bid = (
+ tick_price,
+ tick.get('size', client.last_bid[1]),
+ )
+
+ sells = client._sells.get(sym, {})
+
+ # iterate book prices ascending
+ for our_ask in sorted(sells.keys()):
+ if tick_price > our_ask:
+
+ # retreive order info
+ (size, oid, reqid, action) = sells.pop(our_ask)
+
+ # clearing price would have filled entirely
+ await client.fake_fill(
+ tick_price,
+ size,
+ action,
+ reqid,
+ oid,
+ )
+ else:
+ # prices are interated in sorted order so
+ # we're done
+ break
+
+ if ttype in ('trade', 'last'):
+ # TODO: simulate actual book queues and our orders
+ # place in it, might require full L2 data?
+ pass
diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py
index f11ad0f0..aed8c991 100644
--- a/piker/ui/_chart.py
+++ b/piker/ui/_chart.py
@@ -62,7 +62,7 @@ from ..log import get_logger
from ._exec import run_qtractor, current_screen
from ._interaction import ChartView, open_order_mode
from .. import fsp
-from ..exchange._ems import open_ems
+from ..exchange._client import open_ems
log = get_logger(__name__)
diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py
index 4f6ee19e..e497a677 100644
--- a/piker/ui/_interaction.py
+++ b/piker/ui/_interaction.py
@@ -33,7 +33,7 @@ import numpy as np
from ..log import get_logger
from ._style import _min_points_to_show, hcolor, _font
from ._graphics._lines import order_line, LevelLine
-from ..exchange._ems import OrderBook
+from ..exchange._client import OrderBook
from ..data._source import Symbol
diff --git a/piker/ui/cli.py b/piker/ui/cli.py
index 78100523..fcb9b854 100644
--- a/piker/ui/cli.py
+++ b/piker/ui/cli.py
@@ -16,6 +16,7 @@
"""
Console interface to UI components.
+
"""
from functools import partial
import os
@@ -149,6 +150,8 @@ def chart(config, symbol, profile):
tractor_kwargs={
'debug_mode': True,
'loglevel': tractorloglevel,
- 'enable_modules': ['piker.exchange._ems'],
+ 'enable_modules': [
+ 'piker.exchange._client'
+ ],
},
)