From d3b50b99205b21f52ceee4969d27afc1db42f027 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 29 Apr 2021 08:40:16 -0400
Subject: [PATCH] Port clearing engine to new tractor stream api

---
 piker/clearing/_client.py |  27 ++---
 piker/clearing/_ems.py    | 237 +++++++++++++++++++-------------------
 2 files changed, 133 insertions(+), 131 deletions(-)

diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py
index 6138086c..b643b952 100644
--- a/piker/clearing/_client.py
+++ b/piker/clearing/_client.py
@@ -246,23 +246,24 @@ async def open_ems(
 
     async with maybe_open_emsd(broker) as portal:
 
-        trades_stream = await portal.run(
+        async with portal.open_stream_from(
+            _emsd_main,
             client_actor_name=actor.name,
             broker=broker,
             symbol=symbol.key,
 
-        )
-        with trio.fail_after(10):
-            await book._ready_to_receive.wait()
+        ) as trades_stream:
+            with trio.fail_after(10):
+                await book._ready_to_receive.wait()
 
-        try:
-            yield book, trades_stream
+            try:
+                yield book, trades_stream
 
-        finally:
-            # TODO: we want to eventually keep this up (by having
-            # the exec loop keep running in the pikerd tree) but for
-            # now we have to kill the context to avoid backpressure
-            # build-up on the shm write loop.
-            with trio.CancelScope(shield=True):
-                await trades_stream.aclose()
+            finally:
+                # TODO: we want to eventually keep this up (by having
+                # the exec loop keep running in the pikerd tree) but for
+                # now we have to kill the context to avoid backpressure
+                # build-up on the shm write loop.
+                with trio.CancelScope(shield=True):
+                    await trades_stream.aclose()
 
diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py
index 52fab921..73ca9ee1 100644
--- a/piker/clearing/_ems.py
+++ b/piker/clearing/_ems.py
@@ -339,130 +339,131 @@ async def process_broker_trades(
     """
     broker = feed.mod.name
 
-    with trio.fail_after(5):
-        # in the paper engine case this is just a mem receive channel
-        trades_stream = await feed.recv_trades_data()
+    # TODO: make this a context
+    # in the paper engine case this is just a mem receive channel
+    async with feed.receive_trades_data() as trades_stream:
 
         first = await trades_stream.__anext__()
 
-    # startup msg expected as first from broker backend
-    assert first['local_trades'] == 'start'
-    task_status.started()
+        # startup msg expected as first from broker backend
+        assert first['local_trades'] == 'start'
+        task_status.started()
 
-    async for event in trades_stream:
+        async for event in trades_stream:
 
-        name, msg = event['local_trades']
+            name, msg = event['local_trades']
 
-        log.info(f'Received broker trade event:\n{pformat(msg)}')
+            log.info(f'Received broker trade event:\n{pformat(msg)}')
 
-        if name == 'position':
-            msg['resp'] = 'position'
-
-            # relay through
-            await ctx.send_yield(msg)
-            continue
-
-        # Get the broker (order) request id, this **must** be normalized
-        # into messaging provided by the broker backend
-        reqid = msg['reqid']
-
-        # make response packet to EMS client(s)
-        oid = book._broker2ems_ids.get(reqid)
-
-        if oid is None:
-            # paper engine race case: ``Client.submit_limit()`` hasn't
-            # returned yet and provided an output reqid to register
-            # locally, so we need to retreive the oid that was already
-            # packed at submission since we already know it ahead of
-            # time
-            paper = msg.get('paper_info')
-            if paper:
-                oid = paper['oid']
-
-            else:
-                msg.get('external')
-                if not msg:
-                    log.error(f"Unknown trade event {event}")
+            if name == 'position':
+                msg['resp'] = 'position'
+                # relay through
+                await ctx.send_yield(msg)
                 continue
 
-        resp = {
-            'resp': None,  # placeholder
-            'oid': oid
-        }
+            # Get the broker (order) request id, this **must** be normalized
+            # into messaging provided by the broker backend
+            reqid = msg['reqid']
 
-        if name in (
-            'error',
-        ):
-            # TODO: figure out how this will interact with EMS clients
-            # for ex. on an error do we react with a dark orders
-            # management response, like cancelling all dark orders?
+            # make response packet to EMS client(s)
+            oid = book._broker2ems_ids.get(reqid)
 
-            # This looks like a supervision policy for pending orders on
-            # some unexpected failure - something we need to think more
-            # about. In most default situations, with composed orders
-            # (ex. brackets), most brokers seem to use a oca policy.
+            if oid is None:
+                # paper engine race case: ``Client.submit_limit()`` hasn't
+                # returned yet and provided an output reqid to register
+                # locally, so we need to retrieve the oid that was already
+                # packed at submission since we already know it ahead of
+                # time
+                paper = msg.get('paper_info')
+                if paper:
+                    oid = paper['oid']
 
-            message = msg['message']
-
-            # XXX should we make one when it's blank?
-            log.error(pformat(message))
-
-            # TODO: getting this bs, prolly need to handle status messages
-            # 'Market data farm connection is OK:usfarm.nj'
-
-            # another stupid ib error to handle
-            # if 10147 in message: cancel
-
-            # don't relay message to order requester client
-            continue
-
-        elif name in (
-            'status',
-        ):
-            # TODO: templating the ib statuses in comparison with other
-            # brokers is likely the way to go:
-            # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
-            # short list:
-            # - PendingSubmit
-            # - PendingCancel
-            # - PreSubmitted (simulated orders)
-            # - ApiCancelled (cancelled by client before submission to routing)
-            # - Cancelled
-            # - Filled
-            # - Inactive (reject or cancelled but not by trader)
-
-            # everyone doin camel case
-            status = msg['status'].lower()
-
-            if status == 'filled':
-
-                # conditional execution is fully complete, no more
-                # fills for the noted order
-                if not msg['remaining']:
-
-                    resp['resp'] = 'broker_executed'
-
-                    log.info(f'Execution for {oid} is complete!')
-
-                # just log it
                 else:
-                    log.info(f'{broker} filled {msg}')
+                    msg.get('external')
+                    if not msg:
+                        log.error(f"Unknown trade event {event}")
 
-            else:
-                # one of (submitted, cancelled)
-                resp['resp'] = 'broker_' + status
+                    continue
 
-        elif name in (
-            'fill',
-        ):
-            # proxy through the "fill" result(s)
-            resp['resp'] = 'broker_filled'
-            resp.update(msg)
+            resp = {
+                'resp': None,  # placeholder
+                'oid': oid
+            }
 
-            log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
+            if name in (
+                'error',
+            ):
+                # TODO: figure out how this will interact with EMS clients
+                # for ex. on an error do we react with a dark orders
+                # management response, like cancelling all dark orders?
 
-            # respond to requesting client
-            await ctx.send_yield(resp)
+                # This looks like a supervision policy for pending orders on
+                # some unexpected failure - something we need to think more
+                # about. In most default situations, with composed orders
+                # (ex. brackets), most brokers seem to use an oca policy.
+
+                message = msg['message']
+
+                # XXX should we make one when it's blank?
+                log.error(pformat(message))
+
+                # TODO: getting this bs, prolly need to handle status messages
+                # 'Market data farm connection is OK:usfarm.nj'
+
+                # another stupid ib error to handle
+                # if 10147 in message: cancel
+
+                # don't relay message to order requester client
+                continue
+
+            elif name in (
+                'status',
+            ):
+                # TODO: templating the ib statuses in comparison with other
+                # brokers is likely the way to go:
+                # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
+                # short list:
+                # - PendingSubmit
+                # - PendingCancel
+                # - PreSubmitted (simulated orders)
+                # - ApiCancelled (cancelled by client before submission
+                #   to routing)
+                # - Cancelled
+                # - Filled
+                # - Inactive (reject or cancelled but not by trader)
+
+                # everyone doin camel case
+                status = msg['status'].lower()
+
+                if status == 'filled':
+
+                    # conditional execution is fully complete, no more
+                    # fills for the noted order
+                    if not msg['remaining']:
+
+                        resp['resp'] = 'broker_executed'
+
+                        log.info(f'Execution for {oid} is complete!')
+
+                    # just log it
+                    else:
+                        log.info(f'{broker} filled {msg}')
+
+                else:
+                    # one of (submitted, cancelled)
+                    resp['resp'] = 'broker_' + status
+
+            elif name in (
+                'fill',
+            ):
+                # proxy through the "fill" result(s)
+                resp['resp'] = 'broker_filled'
+                resp.update(msg)
+
+                log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
+
+            # respond to requesting client
+            await ctx.send_yield(resp)
 
 
 async def process_order_cmds(
@@ -675,17 +676,17 @@ async def _emsd_main(
         # acting as an EMS client and will submit orders) to
         # receive requests pushed over a tractor stream
        # using (for now) an async generator.
-        order_stream = await portal.run(
+        async with portal.open_stream_from(
             send_order_cmds,
             symbol_key=symbol,
-        )
+        ) as order_stream:
 
-        # start inbound order request processing
-        await process_order_cmds(
-            ctx,
-            order_stream,
-            symbol,
-            feed,
-            client,
-            dark_book,
-        )
+            # start inbound order request processing
+            await process_order_cmds(
+                ctx,
+                order_stream,
+                symbol,
+                feed,
+                client,
+                dark_book,
+            )
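
A note on the tractor API change ported above: the old call sites got a
stream object back from ``portal.run()`` and had to iterate it and
eventually ``aclose()`` it by hand (see the removed ``finally`` block in
``open_ems()``), while the new ``portal.open_stream_from()`` is an async
context manager that ties the stream's lifetime to its ``with`` block.
Below is a rough before/after sketch of just that calling convention; it
assumes ``portal`` is an already-connected tractor portal, and
``some_streaming_task`` is a placeholder standing in for the real targets
(``_emsd_main``, ``send_order_cmds``), not a piker name:

    async def some_streaming_task(symbol):
        # stand-in for a real remote streaming task
        yield {'symbol': symbol}

    # old style, as removed above: the caller owns stream teardown
    async def consume_old(portal):
        stream = await portal.run(some_streaming_task, symbol='XYZ')
        try:
            async for msg in stream:
                print(msg)
        finally:
            # explicit close, mirroring the removed ``finally`` block
            await stream.aclose()

    # new style, as added above: teardown happens when the block exits
    async def consume_new(portal):
        async with portal.open_stream_from(
            some_streaming_task,
            symbol='XYZ',
        ) as stream:
            async for msg in stream:
                print(msg)

The extra indentation level in every ported call site falls out of this:
the consuming code now has to live inside the ``async with`` block so the
stream is closed when that block exits.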