From f3ae8db04b7707b9c42b054c0055271325b99a63 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 18 Jan 2021 19:55:50 -0500 Subject: [PATCH] Big refactor; start paper client --- piker/_ems.py | 561 ++++++++++++++++++++++++----------------- piker/data/__init__.py | 27 +- 2 files changed, 342 insertions(+), 246 deletions(-) diff --git a/piker/_ems.py b/piker/_ems.py index f7aa2792..dec7989c 100644 --- a/piker/_ems.py +++ b/piker/_ems.py @@ -20,6 +20,7 @@ In suit parlance: "Execution management systems" """ from pprint import pformat import time +from contextlib import asynccontextmanager from dataclasses import dataclass, field from typing import ( AsyncIterator, Dict, Callable, Tuple, @@ -37,105 +38,6 @@ from .data._source import Symbol log = get_logger(__name__) -# setup local ui event streaming channels for request/resp -# streamging with EMS daemon -_to_ems, _from_order_book = trio.open_memory_channel(100) - - -@dataclass -class OrderBook: - """Buy-side (client-side ?) order book ctl and tracking. - - A style similar to "model-view" is used here where this api is - provided as a supervised control for an EMS actor which does all the - hard/fast work of talking to brokers/exchanges to conduct - executions. - - Currently, mostly for keeping local state to match the EMS and use - received events to trigger graphics updates. - - """ - _sent_orders: Dict[str, dict] = field(default_factory=dict) - # _confirmed_orders: Dict[str, dict] = field(default_factory=dict) - - _to_ems: trio.abc.SendChannel = _to_ems - _from_order_book: trio.abc.ReceiveChannel = _from_order_book - - def send( - self, - uuid: str, - symbol: 'Symbol', - price: float, - action: str, - ) -> str: - cmd = { - 'action': action, - 'price': price, - 'symbol': symbol.key, - 'brokers': symbol.brokers, - 'oid': uuid, - } - self._sent_orders[uuid] = cmd - self._to_ems.send_nowait(cmd) - - async def modify(self, oid: str, price) -> bool: - ... - - def cancel(self, uuid: str) -> bool: - """Cancel an order (or alert) from the EMS. - - """ - cmd = self._sent_orders[uuid] - msg = { - 'action': 'cancel', - 'oid': uuid, - 'symbol': cmd['symbol'], - } - self._to_ems.send_nowait(msg) - - -_orders: OrderBook = None - - -def get_orders(emsd_uid: Tuple[str, str] = None) -> OrderBook: - - if emsd_uid is not None: - # TODO: read in target emsd's active book on startup - pass - - global _orders - - if _orders is None: - _orders = OrderBook() - - return _orders - - -# TODO: make this a ``tractor.msg.pub`` -async def send_order_cmds(): - """Order streaming task: deliver orders transmitted from UI - to downstream consumers. - - This is run in the UI actor (usually the one running Qt but could be - any other client service code). This process simply delivers order - messages to the above ``_to_ems`` send channel (from sync code using - ``.send_nowait()``), these values are pulled from the channel here - and relayed to any consumer(s) that called this function using - a ``tractor`` portal. - - This effectively makes order messages look like they're being - "pushed" from the parent to the EMS where local sync code is likely - doing the pushing from some UI. - - """ - global _from_order_book - - async for cmd in _from_order_book: - - # send msg over IPC / wire - log.info(f'sending order cmd: {cmd}') - yield cmd - # TODO: numba all of this def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]: @@ -181,7 +83,6 @@ class _ExecBook: # levels which have an executable action (eg. alert, order, signal) orders: Dict[ - # Tuple[str, str], str, # symbol Dict[ str, # uuid @@ -212,10 +113,48 @@ def get_book(broker: str) -> _ExecBook: return _books.setdefault(broker, _ExecBook(broker)) +@dataclass +class PaperBoi: + """Emulates a broker order client providing the same API and + order-event response event stream format but with methods for + triggering desired events based on forward testing engine + requirements. + + """ + _to_trade_stream: trio.abc.SendChannel + trade_stream: trio.abc.ReceiveChannel + + async def submit_limit( + self, + oid: str, # XXX: see return value + symbol: str, + price: float, + action: str, + size: int = 100, + ) -> int: + """Place an order and return integer request id provided by client. + + """ + + async def submit_cancel( + self, + reqid: str, + ) -> None: + + # TODO: fake market simulation effects + self._to_trade_stream() + + def emulate_fill( + self + ) -> None: + ... + + async def exec_loop( ctx: tractor.Context, broker: str, symbol: str, + _exec_mode: str, task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, ) -> AsyncIterator[dict]: @@ -231,7 +170,27 @@ async def exec_loop( book.lasts[(broker, symbol)] = first_quote[symbol]['last'] # TODO: wrap this in a more re-usable general api - client = feed.mod.get_client_proxy(feed._brokerd_portal) + client_factory = getattr(feed.mod, 'get_client_proxy', None) + + # we have an order API for this broker + if client_factory is not None and _exec_mode != 'paper': + client = client_factory(feed._brokerd_portal) + + # force paper mode + else: + log.warning( + f'No order client is yet supported for {broker}, ' + 'entering paper mode') + + client = PaperBoi(*trio.open_memory_channel(100)) + + # for paper mode we need to mock this trades response feed + # so we pass a duck-typed feed-looking mem chan which is fed + # fill and submission events from the exec loop + feed._set_fake_trades_stream(client.trade_stream) + + # init the trades stream + client._to_trade_stream.send_nowait({'local_trades': 'start'}) # return control to parent task task_status.started((first_quote, feed, client)) @@ -245,8 +204,7 @@ async def exec_loop( stream = feed.stream with stream.shield(): - # this stream may eventually contain multiple - # symbols + # this stream may eventually contain multiple symbols async for quotes in stream: # TODO: numba all this! @@ -269,37 +227,39 @@ async def exec_loop( for oid, (pred, name, cmd) in tuple(execs.items()): - # push trigger msg back to parent as an "alert" - # (mocking for eg. a "fill") - if pred(price): + # majority of iterations will be non-matches + if not pred(price): + continue - # register broker id for ems id - reqid = await client.submit_limit( - # oid=oid, - symbol=sym, - action=cmd['action'], - price=round(price, 2), - ) - book._broker2ems_ids[reqid] = oid + reqid = await client.submit_limit( + oid=oid, + symbol=sym, + action=cmd['action'], + price=round(price, 2), + size=1, + ) + # register broker request id to ems id + book._broker2ems_ids[reqid] = oid - resp = { - 'resp': 'dark_exec', - 'name': name, - 'time_ns': time.time_ns(), - 'trigger_price': price, - 'broker_reqid': reqid, - 'broker': broker, - # 'condition': True, + resp = { + 'resp': 'dark_executed', + 'name': name, + 'time_ns': time.time_ns(), + 'trigger_price': price, + 'broker_reqid': reqid, + 'broker': broker, + 'oid': oid, + 'cmd': cmd, # original request message - # current shm array index - this needed? - 'ohlc_index': feed.shm._last.value - 1, - } + # current shm array index - this needed? + # 'ohlc_index': feed.shm._last.value - 1, + } - # remove exec-condition from set - log.info(f'removing pred for {oid}') - pred, name, cmd = execs.pop(oid) + # remove exec-condition from set + log.info(f'removing pred for {oid}') + pred, name, cmd = execs.pop(oid) - await ctx.send_yield(resp) + await ctx.send_yield(resp) else: # condition scan loop complete log.debug(f'execs are {execs}') @@ -310,8 +270,8 @@ async def exec_loop( # feed teardown -# XXX: right now this is very very ad-hoc to IB # TODO: lots of cases still to handle +# XXX: right now this is very very ad-hoc to IB # - short-sale but securities haven't been located, in this case we # should probably keep the order in some kind of weird state or cancel # it outright? @@ -333,24 +293,38 @@ async def process_broker_trades( and appropriate responses relayed back to the original EMS client actor. There is a messaging translation layer throughout. + Expected message translation(s): + + broker ems + 'error' -> log it locally (for now) + 'status' -> relabel as 'broker_', if complete send 'executed' + 'fill' -> 'broker_filled' + + Currently accepted status values from IB + {'presubmitted', 'submitted', 'cancelled'} + """ - trades_stream = await feed.recv_trades_data() - first = await trades_stream.__anext__() + broker = feed.mod.name + + with trio.fail_after(3): + trades_stream = await feed.recv_trades_data() + first = await trades_stream.__anext__() # startup msg assert first['local_trades'] == 'start' task_status.started() - async for msg in trades_stream: - name, ev = msg['local_trades'] - log.info(f'Received broker trade event:\n{pformat(ev)}') + async for event in trades_stream: - # broker request id - must be normalized - # into error transmission by broker backend. - reqid = ev['reqid'] - oid = book._broker2ems_ids.get(reqid) + name, msg = event['local_trades'] + log.info(f'Received broker trade event:\n{pformat(msg)}') + + # Get the broker (order) request id, this **must** be normalized + # into messaging provided by the broker backend + reqid = msg['reqid'] # make response packet to EMS client(s) + oid = book._broker2ems_ids.get(reqid) resp = {'oid': oid} if name in ('error',): @@ -358,18 +332,37 @@ async def process_broker_trades( # for ex. on an error do we react with a dark orders # management response, like cancelling all dark orders? + # This looks like a supervision policy for pending orders on + # some unexpected failure - something we need to think more + # about. In most default situations, with composed orders + # (ex. brackets), most brokers seem to use a oca policy. + + message = msg['message'] + # XXX should we make one when it's blank? - log.error(pformat(ev['message'])) + log.error(pformat(message)) + + # another stupid ib error to handle + # if 10147 in message: cancel elif name in ('status',): - status = ev['status'].lower() + # everyone doin camel case + status = msg['status'].lower() if status == 'filled': - # conditional execution is fully complete - if not ev['remaining']: + + # conditional execution is fully complete, no more + # fills for the noted order + if not msg['remaining']: + await ctx.send_yield( + {'resp': 'broker_executed', 'oid': oid}) log.info(f'Execution for {oid} is complete!') - await ctx.send_yield({'resp': 'executed', 'oid': oid}) + + # just log it + else: + log.info(f'{broker} filled {msg}') + else: # one of (submitted, cancelled) resp['resp'] = 'broker_' + status @@ -379,10 +372,9 @@ async def process_broker_trades( elif name in ('fill',): # proxy through the "fill" result(s) resp['resp'] = 'broker_filled' - resp.update(ev) - - log.info(f'Fill for {oid} cleared with\n{pformat(resp)}') + resp.update(msg) await ctx.send_yield(resp) + log.info(f'Fill for {oid} cleared with\n{pformat(resp)}') @tractor.stream @@ -393,31 +385,50 @@ async def _ems_main( symbol: str, mode: str = 'live', # ('paper', 'dark', 'live') ) -> None: - """EMS (sub)actor entrypoint. + """EMS (sub)actor entrypoint providing the + execution management (micro)service which conducts broker + order control on behalf of clients. - This is the daemon (child) side routine which starts an EMS - runtime per broker/feed and and begins streaming back alerts - from executions to order clients. + This is the daemon (child) side routine which starts an EMS runtime + (one per broker-feed) and and begins streaming back alerts from + broker executions/fills. + + ``send_order_cmds()`` is called here to execute in a task back in + the actor which started this service (spawned this actor), presuming + capabilities allow it, such that requests for EMS executions are + received in a stream from that client actor and then responses are + streamed back up to the original calling task in the same client. + + The task tree is: + - ``_ems_main()``: + accepts order cmds, registers execs with exec loop + + - ``exec_loop()``: run conditions on inputs and trigger executions + + - ``process_broker_trades()``: + accept normalized trades responses, process and relay to ems client(s) """ actor = tractor.current_actor() book = get_book(broker) - # new router entry point + # get a portal back to the client async with tractor.wait_for_actor(client_actor_name) as portal: # spawn one task per broker feed async with trio.open_nursery() as n: # TODO: eventually support N-brokers + + # start the condition scan loop quote, feed, client = await n.start( exec_loop, ctx, broker, symbol, + mode, ) - # for paper mode we need to mock this trades response feed await n.start( process_broker_trades, ctx, @@ -425,6 +436,7 @@ async def _ems_main( book, ) + # connect back to the calling actor to receive order requests async for cmd in await portal.run(send_order_cmds): log.info(f'{cmd} received in {actor.uid}') @@ -435,7 +447,7 @@ async def _ems_main( if action in ('cancel',): # check for live-broker order - brid = book._broker2ems_ids.inverse[oid] + brid = book._broker2ems_ids.inverse.get(oid) if brid: log.info("Submitting cancel for live order") await client.submit_cancel(reqid=brid) @@ -443,10 +455,11 @@ async def _ems_main( # check for EMS active exec else: book.orders[symbol].pop(oid, None) - await ctx.send_yield( - {'action': 'dark_cancelled', - 'oid': oid} - ) + + await ctx.send_yield({ + 'resp': 'dark_cancelled', + 'oid': oid + }) elif action in ('alert', 'buy', 'sell',): @@ -457,62 +470,163 @@ async def _ems_main( last = book.lasts[(broker, sym)] - if action in ('buy', 'sell',): + if mode == 'live' and action in ('buy', 'sell',): + + # register broker id for ems id + order_id = await client.submit_limit( + oid=oid, # no ib support for this + symbol=sym, + action=action, + price=round(trigger_price, 2), + size=1, + ) + book._broker2ems_ids[order_id] = oid + + # XXX: the trades data broker response loop + # (``process_broker_trades()`` above) will + # handle sending the ems side acks back to + # the cmd sender from here + + elif mode in ('dark', 'paper') or action in ('alert'): # if the predicate resolves immediately send the # execution to the broker asap # if pred(last): - if mode == 'live': - # send order - log.warning("ORDER FILLED IMMEDIATELY!?!?!?!") - # IF SEND ORDER RIGHT AWAY CONDITION + # send order - # register broker id for ems id - order_id = await client.submit_limit( - oid=oid, # no ib support for this - symbol=sym, - action=action, - price=round(trigger_price, 2), - size=1, - ) - book._broker2ems_ids[order_id] = oid + # IF SEND ORDER RIGHT AWAY CONDITION - # book.orders[symbol][oid] = None + # submit order to local EMS - # XXX: the trades data broker response loop - # (``process_broker_trades()`` above) will - # handle sending the ems side acks back to - # the cmd sender from here + # Auto-gen scanner predicate: + # we automatically figure out what the alert check + # condition should be based on the current first + # price received from the feed, instead of being + # like every other shitty tina platform that makes + # the user choose the predicate operator. + pred, name = mk_check(trigger_price, last) - elif mode in {'dark', 'paper'}: + # submit execution/order to EMS scanner loop + book.orders.setdefault( + sym, {} + )[oid] = (pred, name, cmd) - # Auto-gen scanner predicate: - # we automatically figure out what the alert check - # condition should be based on the current first - # price received from the feed, instead of being - # like every other shitty tina platform that makes - # the user choose the predicate operator. - pred, name = mk_check(trigger_price, last) - - # submit execution/order to EMS scanner loop - book.orders.setdefault( - (broker, sym), {} - )[oid] = (pred, name, cmd) - - # ack-response that order is live here - await ctx.send_yield({ - 'resp': 'dark_submitted', - 'oid': oid - }) + # ack-response that order is live here + await ctx.send_yield({ + 'resp': 'dark_submitted', + 'oid': oid + }) # continue and wait on next order cmd +@dataclass +class OrderBook: + """Buy-side (client-side ?) order book ctl and tracking. + + A style similar to "model-view" is used here where this api is + provided as a supervised control for an EMS actor which does all the + hard/fast work of talking to brokers/exchanges to conduct + executions. + + Currently, mostly for keeping local state to match the EMS and use + received events to trigger graphics updates. + + """ + _to_ems: trio.abc.SendChannel + _from_order_book: trio.abc.ReceiveChannel + + _sent_orders: Dict[str, dict] = field(default_factory=dict) + _ready_to_receive: trio.Event = trio.Event() + + def send( + self, + uuid: str, + symbol: 'Symbol', + price: float, + action: str, + ) -> str: + cmd = { + 'action': action, + 'price': price, + 'symbol': symbol.key, + 'brokers': symbol.brokers, + 'oid': uuid, + } + self._sent_orders[uuid] = cmd + self._to_ems.send_nowait(cmd) + + async def modify(self, oid: str, price) -> bool: + ... + + def cancel(self, uuid: str) -> bool: + """Cancel an order (or alert) from the EMS. + + """ + cmd = self._sent_orders[uuid] + msg = { + 'action': 'cancel', + 'oid': uuid, + 'symbol': cmd['symbol'], + } + self._to_ems.send_nowait(msg) + + +_orders: OrderBook = None + + +def get_orders(emsd_uid: Tuple[str, str] = None) -> OrderBook: + + if emsd_uid is not None: + # TODO: read in target emsd's active book on startup + pass + + global _orders + + if _orders is None: + # setup local ui event streaming channels for request/resp + # streamging with EMS daemon + # _to_ems, _from_order_book = trio.open_memory_channel(100) + _orders = OrderBook(*trio.open_memory_channel(100)) + + return _orders + + +# TODO: make this a ``tractor.msg.pub`` +async def send_order_cmds(): + """Order streaming task: deliver orders transmitted from UI + to downstream consumers. + + This is run in the UI actor (usually the one running Qt but could be + any other client service code). This process simply delivers order + messages to the above ``_to_ems`` send channel (from sync code using + ``.send_nowait()``), these values are pulled from the channel here + and relayed to any consumer(s) that called this function using + a ``tractor`` portal. + + This effectively makes order messages look like they're being + "pushed" from the parent to the EMS where local sync code is likely + doing the pushing from some UI. + + """ + book = get_orders() + orders_stream = book._from_order_book + + # signal that ems connection is up and ready + book._ready_to_receive.set() + + async for cmd in orders_stream: + + # send msg over IPC / wire + log.info(f'sending order cmd: {cmd}') + yield cmd + + +@asynccontextmanager async def open_ems( - order_mode, broker: str, symbol: Symbol, - task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED, + # task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED, ) -> None: """Spawn an EMS daemon and begin sending orders and receiving alerts. @@ -538,11 +652,15 @@ async def open_ems( brokers are exposing FIX protocol; it is they doing the re-invention. - TODO: make some fancy diagrams using this: + TODO: make some fancy diagrams using mermaid.io + the possible set of responses from the stream is currently: + - 'dark_submitted', 'broker_submitted' + - 'dark_cancelled', 'broker_cancelled' + - 'dark_executed', 'broker_executed' + - 'broker_filled' """ - actor = tractor.current_actor() subactor_name = 'emsd' @@ -553,7 +671,7 @@ async def open_ems( subactor_name, enable_modules=[__name__], ) - stream = await portal.run( + trades_stream = await portal.run( _ems_main, client_actor_name=actor.name, broker=broker, @@ -561,40 +679,11 @@ async def open_ems( ) - async with tractor.wait_for_actor(subactor_name): - # let parent task continue - task_status.started(_to_ems) + # wait for service to connect back to us signalling + # ready for order commands + book = get_orders() - # Begin order-response streaming + with trio.fail_after(3): + await book._ready_to_receive.wait() - # this is where we receive **back** messages - # about executions **from** the EMS actor - async for msg in stream: - log.info(f'Received order msg: {pformat(msg)}') - - # delete the line from view - oid = msg['oid'] - resp = msg['resp'] - - # response to 'action' request (buy/sell) - if resp in ('dark_submitted', 'broker_submitted'): - log.info(f"order accepted: {msg}") - - # show line label once order is live - order_mode.on_submit(oid) - - # resp to 'cancel' request or error condition for action request - elif resp in ('broker_cancelled', 'dark_cancelled'): - - # delete level from view - order_mode.on_cancel(oid) - log.info(f'deleting line with oid: {oid}') - - # response to completed 'action' request for buy/sell - elif resp in ('executed',): - await order_mode.on_exec(oid, msg) - - # each clearing tick is responded individually - elif resp in ('broker_filled',): - # TODO: some kinda progress system - order_mode.on_fill(oid, msg) + yield book, trades_stream diff --git a/piker/data/__init__.py b/piker/data/__init__.py index f2551993..fe50eda6 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) +# Copyright (C) Tyler Goodlet (in stewardship for piker0) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -26,10 +26,10 @@ from contextlib import asynccontextmanager from importlib import import_module from types import ModuleType from typing import ( - Dict, List, Any, - Sequence, AsyncIterator, Optional + Dict, Any, Sequence, AsyncIterator, Optional ) +import trio import tractor from ..brokers import get_brokermod @@ -165,19 +165,26 @@ class Feed: return self._index_stream + def _set_fake_trades_stream( + self, + recv_chan: trio.abc.ReceiveChannel, + ) -> None: + self._trade_stream = recv_chan + async def recv_trades_data(self) -> AsyncIterator[dict]: if not getattr(self.mod, 'stream_trades', False): - log.warning(f"{self.mod.name} doesn't have trade data support yet :(") + log.warning( + f"{self.mod.name} doesn't have trade data support yet :(") - # yah this is bullshitty but it worx - async def nuttin(): - yield - return - - return nuttin() + if not self._trade_stream: + raise RuntimeError( + f'Can not stream trade data from {self.mod.name}') + # NOTE: this can be faked by setting a rx chan + # using the ``_.set_fake_trades_stream()`` method if not self._trade_stream: + self._trade_stream = await self._brokerd_portal.run( self.mod.stream_trades,