diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index cd0795b3..bfaaf82a 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -26,7 +26,6 @@ from typing import AsyncIterator, Callable from bidict import bidict from pydantic import BaseModel import trio -from trio_typing import TaskStatus import tractor from .. import data @@ -224,44 +223,6 @@ async def clear_dark_triggers( resp = 'dark_triggered' - # an internal brokerd-broker specific - # order-request id is expected to be generated - - # reqid = await client.submit_limit( - - # oid=oid, - - # # this is a brand new order request for the - # # underlying broker so we set a "broker - # # request id" (brid) to "nothing" so that the - # # broker client knows that we aren't trying - # # to modify an existing order-request. - # brid=None, - - # symbol=sym, - # action=cmd['action'], - # price=submit_price, - # size=cmd['size'], - # ) - - # # register broker request id to ems id - - # else: - # # alerts have no broker request id - # reqid = '' - - # resp = { - # 'resp': 'dark_executed', - # 'cmd': cmd, # original request message - - # 'time_ns': time.time_ns(), - # 'trigger_price': price, - - # 'broker_reqid': reqid, - # 'broker': broker, - # 'oid': oid, # piker order id - - # } msg = Status( oid=oid, # piker order id resp=resp, @@ -270,7 +231,6 @@ async def clear_dark_triggers( symbol=symbol, trigger_price=price, - # broker_reqid=reqid, broker_details={'name': broker}, cmd=cmd, # original request message @@ -281,7 +241,6 @@ async def clear_dark_triggers( log.info(f'removing pred for {oid}') execs.pop(oid) - # await ctx.send_yield(resp) await ems_client_order_stream.send(msg) else: # condition scan loop complete @@ -292,51 +251,6 @@ async def clear_dark_triggers( # print(f'execs scan took: {time.time() - start}') -# async def start_clearing( - -# # ctx: tractor.Context, -# brokerd_order_stream: tractor.MsgStream, -# quote_stream: tractor.MsgStream, - -# # client: 'Client', - -# # feed: 'Feed', # noqa -# broker: str, -# symbol: str, -# _exec_mode: str, - -# book: _DarkBook, - -# # task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, - -# ) -> AsyncIterator[dict]: -# """Main scan loop for order execution conditions and submission -# to brokers. - -# """ -# async with trio.open_nursery() as n: - -# # trigger scan and exec loop -# n.start_soon( -# trigger_executions, - -# brokerd_order_stream, -# quote_stream, - -# broker, -# symbol, -# book -# # ctx, -# # client, -# ) - -# # # paper engine simulator task -# # if _exec_mode == 'paper': -# # # TODO: make this an actual broadcast channels as in: -# # # https://github.com/python-trio/trio/issues/987 -# # n.start_soon(simulate_fills, quote_stream, client) - - # TODO: lots of cases still to handle # XXX: right now this is very very ad-hoc to IB # - short-sale but securities haven't been located, in this case we @@ -349,15 +263,11 @@ async def clear_dark_triggers( async def translate_and_relay_brokerd_events( - # ctx: tractor.Context, broker: str, ems_client_order_stream: tractor.MsgStream, brokerd_trades_stream: tractor.MsgStream, book: _DarkBook, - # feed: 'Feed', # noqa - task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, - ) -> AsyncIterator[dict]: """Trades update loop - receive updates from broker, convert to EMS responses, transmit to ordering client(s). @@ -377,27 +287,13 @@ async def translate_and_relay_brokerd_events( {'presubmitted', 'submitted', 'cancelled', 'inactive'} """ - # broker = feed.mod.name - - # TODO: make this a context - # in the paper engine case this is just a mem receive channel - # async with feed.receive_trades_data() as brokerd_trades_stream: - - # first = await brokerd_trades_stream.__anext__() - - # startup msg expected as first from broker backend - # assert first['local_trades'] == 'start' - # task_status.started() - async for brokerd_msg in brokerd_trades_stream: - # name, msg = event['local_trades'] name = brokerd_msg['name'] log.info(f'Received broker trade event:\n{pformat(brokerd_msg)}') if name == 'position': - # msg['resp'] = 'position' # relay through position msgs immediately await ems_client_order_stream.send( @@ -476,34 +372,6 @@ async def translate_and_relay_brokerd_events( # a live flow now exists oid = entry.oid - # make response packet to EMS client(s) - # reqid = book._ems_entries.get(oid) - - # # msg is for unknown emsd order id - # if oid is None: - # oid = msg['oid'] - - # # XXX: paper clearing special cases - # # paper engine race case: ``Client.submit_limit()`` hasn't - # # returned yet and provided an output reqid to register - # # locally, so we need to retreive the oid that was already - # # packed at submission since we already know it ahead of - # # time - # paper = msg.get('paper_info') - # if paper: - # oid = paper['oid'] - - # else: - # msg.get('external') - # if not msg: - # log.error(f"Unknown trade event {event}") - - # continue - - # resp = { - # 'resp': None, # placeholder - # 'oid': oid - # } resp = None broker_details = {} @@ -551,7 +419,6 @@ async def translate_and_relay_brokerd_events( # everyone doin camel case msg = BrokerdStatus(**brokerd_msg) - # status = msg['status'].lower() if msg.status == 'filled': @@ -563,12 +430,10 @@ async def translate_and_relay_brokerd_events( log.info(f'Execution for {oid} is complete!') - # just log it else: log.info(f'{broker} filled {msg}') - else: # one of {submitted, cancelled} resp = 'broker_' + msg.status @@ -604,13 +469,11 @@ async def translate_and_relay_brokerd_events( async def process_client_order_cmds( - # ctx: tractor.Context, client_order_stream: tractor.MsgStream, # noqa brokerd_order_stream: tractor.MsgStream, symbol: str, feed: 'Feed', # noqa - # client: 'Client', # noqa dark_book: _DarkBook, ) -> None: @@ -643,7 +506,6 @@ async def process_client_order_cmds( # NOTE: cancel response will be relayed back in messages # from corresponding broker - # await client.submit_cancel(reqid=reqid) await brokerd_order_stream.send(msg.dict()) else: @@ -675,12 +537,6 @@ async def process_client_order_cmds( msg = Order(**cmd) - # sym = cmd['symbol'] - # trigger_price = cmd['price'] - # size = cmd['size'] - # brokers = cmd['brokers'] - # exec_mode = cmd['exec_mode'] - sym = msg.symbol trigger_price = msg.price size = msg.size @@ -722,20 +578,6 @@ async def process_client_order_cmds( print(f'sending live order {msg}') await brokerd_order_stream.send(msg.dict()) - # order_id = await client.submit_limit( - - # oid=oid, # no ib support for oids... - - # # if this is None, creates a new order - # # otherwise will modify any existing one - # brid=brid, - - # symbol=sym, - # action=action, - # price=trigger_price, - # size=size, - # ) - # an immediate response should be brokerd ack with order # id but we register our request as part of the flow dark_book._ems_entries[oid] = msg @@ -793,14 +635,6 @@ async def process_client_order_cmds( abs_diff_away ) - # TODO: if the predicate resolves immediately send the - # execution to the broker asap? Or no? - - # ack-response that order is live in EMS - # await ctx.send_yield( - # {'resp': 'dark_submitted', - # 'oid': oid} - # ) if action == 'alert': resp = 'alert_submitted' else: @@ -846,8 +680,9 @@ async def _emsd_main( sets up brokerd feed, order feed with ems client, trades dialogue with brokderd trading api. | - - ``start_clearing()``: - run (dark) conditions on inputs and trigger broker submissions + - ``clear_dark_triggers()``: + run (dark order) conditions on inputs and trigger brokerd "live" + order submissions. | - ``translate_and_relay_brokerd_events()``: accept normalized trades responses from brokerd, process and @@ -899,6 +734,10 @@ async def _emsd_main( if trades_endpoint is None or _exec_mode == 'paper': + # for paper mode we need to mock this trades response feed + # so we load bidir stream to a new sub-actor running a + # paper-simulator clearing engine. + # load the paper trading engine _exec_mode = 'paper' log.warning(f'Entering paper trading mode for {broker}') @@ -912,14 +751,6 @@ async def _emsd_main( loglevel=loglevel, ) - # for paper mode we need to mock this trades response feed - # so we pass a duck-typed feed-looking mem chan which is fed - # fill and submission events from the exec loop - # feed._trade_stream = client.trade_stream - - # init the trades stream - # client._to_trade_stream.send_nowait({'local_trades': 'start'}) - else: # open live brokerd trades endpoint open_trades_endpoint = portal.open_context( @@ -931,36 +762,9 @@ async def _emsd_main( open_trades_endpoint as (brokerd_ctx, positions), brokerd_ctx.open_stream() as brokerd_trades_stream, ): - - # if trades_endpoint is not None and _exec_mode != 'paper': - - # # TODO: open a bidir stream here? - # # we have an order API for this broker - # client = client_factory(feed._brokerd_portal) - - # else: - - # return control to parent task - # task_status.started((first_quote, feed, client)) - - # stream = feed.stream - - # start the real-time clearing condition scan loop and - # paper engine simulator. - - # n.start_soon( - # start_clearing, - # brokerd_trades_stream, - # feed.stream, # quote stream - # # client, - # broker, - # symbol, - # _exec_mode, - # book, - # ) - # signal to client that we're started - # TODO: we could send back **all** brokerd positions here? + # TODO: we could eventually send back **all** brokerd + # positions here? await ems_ctx.started(positions) # establish 2-way stream with requesting order-client and @@ -978,14 +782,15 @@ async def _emsd_main( broker, symbol, book - # ctx, - # client, ) # begin processing order events from the target brokerd backend + # by receiving order submission response messages, + # normalizing them to EMS messages and relaying back to + # the piker order client. n.start_soon( - translate_and_relay_brokerd_events, + broker, ems_client_order_stream, brokerd_trades_stream,