From b06cb5bb5aa3f939c308ba180fafdcabb12f4a00 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Jun 2021 08:24:10 -0400 Subject: [PATCH] Comments clean and improvments --- piker/_daemon.py | 1 - piker/clearing/_client.py | 3 +- piker/clearing/_ems.py | 92 +++++++++++++++++---------------- piker/clearing/_paper_engine.py | 8 --- piker/ui/order_mode.py | 4 -- 5 files changed, 49 insertions(+), 59 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 5e102e4c..009adad5 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -248,7 +248,6 @@ async def maybe_spawn_daemon( if pikerd_portal is None: # we are root so spawn brokerd directly in our tree # the root nursery is accessed through process global state - # await spawn_brokerd(brokername, loglevel=loglevel) await spawn_func(**spawn_args) else: diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 47c79636..fcceeaac 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -223,12 +223,11 @@ async def open_ems( # connect to emsd portal.open_context( + _emsd_main, broker=broker, symbol=symbol.key, - # TODO: ``first`` here should be the active orders/execs - # persistent on the ems so that loca UI's can be populated. ) as (ctx, positions), # open 2-way trade command stream diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index bfaaf82a..0fe63db2 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -124,15 +124,12 @@ _DEFAULT_SIZE: float = 1.0 async def clear_dark_triggers( - # ctx: tractor.Context, brokerd_orders_stream: tractor.MsgStream, ems_client_order_stream: tractor.MsgStream, quote_stream: tractor.ReceiveMsgStream, # noqa broker: str, symbol: str, - # client: 'Client', # noqa - # order_msg_stream: 'Client', # noqa book: _DarkBook, @@ -189,8 +186,7 @@ async def clear_dark_triggers( # message back to the requesting ems client resp = 'alert_triggered' - else: - # executable order submission + else: # executable order submission # submit_price = price + price*percent_away submit_price = price + abs_diff_away @@ -199,18 +195,17 @@ async def clear_dark_triggers( f'Dark order triggered for price {price}\n' f'Submitting order @ price {submit_price}') - # TODO: port to BrokerdOrder message sending msg = BrokerdOrder( action=cmd['action'], oid=oid, time_ns=time.time_ns(), - - # this is a brand new order request for the + # this **creates** new order request for the # underlying broker so we set a "broker - # request id" (brid) to "nothing" so that the - # broker client knows that we aren't trying - # to modify an existing order-request. + # request id" (``reqid`` kwarg) to ``None`` + # so that the broker client knows that we + # aren't trying to modify an existing + # order-request and instead create a new one. reqid=None, symbol=sym, @@ -218,13 +213,21 @@ async def clear_dark_triggers( size=cmd['size'], ) await brokerd_orders_stream.send(msg.dict()) - # mark this entry as having send an order request + + # mark this entry as having sent an order + # request. the entry will be replaced once the + # target broker replies back with + # a ``BrokerdOrderAck`` msg including the + # allocated unique ``BrokerdOrderAck.reqid`` key + # generated by the broker's own systems. book._ems_entries[oid] = msg + # our internal status value for client-side + # triggered "dark orders" resp = 'dark_triggered' msg = Status( - oid=oid, # piker order id + oid=oid, # ems order id resp=resp, time_ns=time.time_ns(), @@ -340,11 +343,11 @@ async def translate_and_relay_brokerd_events( # initial response to brokerd order request if name == 'ack': - # register the brokerd request id (that was likely - # generated internally) with our locall ems order id for - # reverse lookup later. a BrokerdOrderAck **must** be - # sent after an order request in order to establish this - # id mapping. + # register the brokerd request id (that was generated + # / created internally by the broker backend) with our + # local ems order id for reverse lookup later. + # a ``BrokerdOrderAck`` **must** be sent after an order + # request in order to establish this id mapping. book._ems2brokerd_ids[oid] = reqid # new order which has not yet be registered into the @@ -455,16 +458,17 @@ async def translate_and_relay_brokerd_events( else: raise ValueError(f'Brokerd message {brokerd_msg} is invalid') - # Create and relay EMS response status message - resp = Status( - oid=oid, - resp=resp, - time_ns=time.time_ns(), - broker_reqid=reqid, - brokerd_msg=broker_details, + # Create and relay response status message + # to requesting EMS client + await ems_client_order_stream.send( + Status( + oid=oid, + resp=resp, + time_ns=time.time_ns(), + broker_reqid=reqid, + brokerd_msg=broker_details, + ).dict() ) - # relay response to requesting EMS client - await ems_client_order_stream.send(resp.dict()) async def process_client_order_cmds( @@ -509,9 +513,9 @@ async def process_client_order_cmds( await brokerd_order_stream.send(msg.dict()) else: - # might be a cancel for order that hasn't been acked yet - # by brokerd so register a cancel for then the order - # does show up later + # this might be a cancel for an order that hasn't been + # acked yet by a brokerd, so register a cancel for when + # the order ack does show up later dark_book._ems_entries[oid] = msg # check for EMS active exec @@ -552,10 +556,9 @@ async def process_client_order_cmds( # if we already had a broker order id then # this is likely an order update commmand. - log.info(f"Modifying order: {live_entry.reqid}") + log.info( + f"Modifying live {broker} order: {live_entry.reqid}") - # TODO: port to BrokerdOrder message sending - # register broker id for ems id msg = BrokerdOrder( oid=oid, # no ib support for oids... time_ns=time.time_ns(), @@ -575,7 +578,7 @@ async def process_client_order_cmds( # (``translate_and_relay_brokerd_events()`` above) will # handle relaying the ems side responses back to # the client/cmd sender from this request - print(f'sending live order {msg}') + log.info(f'Sending live order to {broker}:\n{pformat(msg)}') await brokerd_order_stream.send(msg.dict()) # an immediate response should be brokerd ack with order @@ -653,14 +656,13 @@ async def process_client_order_cmds( async def _emsd_main( ctx: tractor.Context, - # client_actor_name: str, broker: str, symbol: str, _exec_mode: str = 'dark', # ('paper', 'dark', 'live') loglevel: str = 'info', ) -> None: - """EMS (sub)actor entrypoint providing the + '''EMS (sub)actor entrypoint providing the execution management (micro)service which conducts broker order control on behalf of clients. @@ -693,12 +695,13 @@ async def _emsd_main( accepts order cmds from requesting piker clients, registers execs with exec loop - """ - # from ._client import send_order_cmds - + ''' global _router dark_book = _router.get_dark_book(broker) + # TODO: would be nice if in tractor we can require either a ctx arg, + # or a named arg with ctx in it and a type annotation of + # tractor.Context instead of strictly requiring a ctx arg. ems_ctx = ctx cached_feed = _router.feeds.get((broker, symbol)) @@ -742,9 +745,9 @@ async def _emsd_main( _exec_mode = 'paper' log.warning(f'Entering paper trading mode for {broker}') - # load the paper trading engine inside the brokerd - # actor to simulate the real load it'll likely be under - # when also pulling data from feeds + # load the paper trading engine as a subactor of this emsd + # actor to simulate the real IPC load it'll have when also + # pulling data from feeds open_trades_endpoint = paper.open_paperboi( broker=broker, symbol=symbol, @@ -814,7 +817,7 @@ class _Router(BaseModel): ''' nursery: trio.Nursery - feeds: dict[str, tuple[trio.CancelScope, float]] = {} + feeds: dict[tuple[str, str], data.feed.Feed] = {} books: dict[str, _DarkBook] = {} class Config: @@ -842,8 +845,9 @@ async def _setup_persistent_emsd( global _router - # spawn one task per broker feed + # open a root "service nursery" for the ``emsd`` actor async with trio.open_nursery() as service_nursery: + _router = _Router(nursery=service_nursery) # TODO: send back the full set of persistent orders/execs persistent diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 788e7674..b71be312 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -216,7 +216,6 @@ class PaperBoi: time_ns=time.time_ns(), status='filled', - # broker=self.broker, filled=size, remaining=0 if order_complete else remaining, @@ -224,7 +223,6 @@ class PaperBoi: size=size, price=price, - # broker=self.broker, broker_details={ 'paper_info': { 'oid': oid, @@ -321,12 +319,6 @@ async def simulate_fills( break -# class MockBrokerdMsgStream: - - -# async def MockContext(*args, **kwargs): - - async def handle_order_requests( client: PaperBoi, diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index c3c4016a..da775821 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -127,10 +127,6 @@ class OrderMode: """ line = self.lines.commit_line(uuid) - # req_msg = self.book._sent_orders.get(uuid) - # if req_msg: - # req_msg.ack_time_ns = time.time_ns() - return line def on_fill(