diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 957bacb8..3f7b1eb0 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -40,7 +40,10 @@ from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray from ..data._web_bs import open_autorecon_ws -from ..clearing._messages import BrokerdPosition, BrokerdOrder, BrokerdStatus +from ..clearing._messages import ( + BrokerdPosition, BrokerdOrder, BrokerdStatus, + BrokerdOrderAck, BrokerdError, BrokerdCancel, BrokerdFill +) import urllib.parse import hashlib @@ -433,6 +436,74 @@ def normalize_symbol( return ticker.lower() +async def handle_order_requests( + + client: #kraken, + ems_order_stream: tractor.MsgStream, + +) -> None: + + # order_request: dict + async for request_msg in ems_order_stream: + log.info(f'Received order request {request_msg}') + + action = request_msg['action'] + + if action in {'buy', 'sell'}: + + account = request_msg['account'] + if account != 'kraken.spot': + log.error( + 'This is a kraken account, \ + only a `kraken.spot` selection is valid' + ) + await ems_order_stream.send(BrokerError( + oid=request_msg['oid'] + symbol=request_msg['symbol'] + reason=f'Kraken only, No account found: `{account}` ?', + ).dict()) + continue + + # validate + order = BrokerdOrder(**request_msg) + + # call our client api to submit the order + ## TODO: look into the submit_limit method, do it write my own? + reqid = await client.submit_limit( + + oid=order.oid, + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + ## XXX: how do I handle new orders + reqid=order.reqid, + ) + + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( + + # ems order request id + oid=order.oid, + + # broker specific request id + reqid=reqid, + + ).dict() + ) + + elif action == 'cancel': + msg = BrokerdCancel(**request_msg) + + await client.submit_cancel( + reqid=msg.reqid + ) + + else: + log.error(f'Unknown order command: {request_msg}') + + @tractor.context async def trades_dialogue( ctx: tractor.Context, @@ -462,9 +533,10 @@ async def trades_dialogue( await trio.sleep_forever() - # async with ( - # ctx.open_stream() as ems_stream, - # + # async with ( + # ctx.open_stream() as ems_stream, + # trio.open_nursery as n, + # ): async def stream_messages(ws):