Factor msg loop into new func: `handle_order_updates()`
							parent
							
								
									9e8d32cdff
								
							
						
					
					
						commit
						f1192dff09
					
				| 
						 | 
					@ -329,355 +329,383 @@ async def trades_dialogue(
 | 
				
			||||||
                ids,
 | 
					                ids,
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # process and relay trades events to ems
 | 
					            # enter relay loop
 | 
				
			||||||
 | 
					            await handle_order_updates(
 | 
				
			||||||
 | 
					                ws,
 | 
				
			||||||
 | 
					                ems_stream,
 | 
				
			||||||
 | 
					                emsflow,
 | 
				
			||||||
 | 
					                ids,
 | 
				
			||||||
 | 
					                trans,
 | 
				
			||||||
 | 
					                acctid,
 | 
				
			||||||
 | 
					                acc_name,
 | 
				
			||||||
 | 
					                token,
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					async def handle_order_updates(
 | 
				
			||||||
 | 
					    ws: NoBsWs,
 | 
				
			||||||
 | 
					    ems_stream: tractor.MsgStream,
 | 
				
			||||||
 | 
					    emsflow: dict[str, list[MsgUnion]],
 | 
				
			||||||
 | 
					    ids: bidict[str, int],
 | 
				
			||||||
 | 
					    trans: list[pp.Transaction],
 | 
				
			||||||
 | 
					    acctid: str,
 | 
				
			||||||
 | 
					    acc_name: str,
 | 
				
			||||||
 | 
					    token: str,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					) -> None:
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    Main msg handling loop for all things order management.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    This code is broken out to make the context explicit and state variables
 | 
				
			||||||
 | 
					    defined in the signature clear to the reader.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    async for msg in stream_messages(ws):
 | 
				
			||||||
 | 
					        match msg:
 | 
				
			||||||
 | 
					            # process and relay clearing trade events to ems
 | 
				
			||||||
            # https://docs.kraken.com/websockets/#message-ownTrades
 | 
					            # https://docs.kraken.com/websockets/#message-ownTrades
 | 
				
			||||||
            async for msg in stream_messages(ws):
 | 
					            case [
 | 
				
			||||||
                match msg:
 | 
					                trades_msgs,
 | 
				
			||||||
                    case [
 | 
					                'ownTrades',
 | 
				
			||||||
                        trades_msgs,
 | 
					                {'sequence': seq},
 | 
				
			||||||
                        'ownTrades',
 | 
					            ]:
 | 
				
			||||||
                        {'sequence': seq},
 | 
					                # flatten msgs for processing
 | 
				
			||||||
                    ]:
 | 
					                trades = {
 | 
				
			||||||
                        # flatten msgs for processing
 | 
					                    tid: trade
 | 
				
			||||||
                        trades = {
 | 
					                    for entry in trades_msgs
 | 
				
			||||||
                            tid: trade
 | 
					                    for (tid, trade) in entry.items()
 | 
				
			||||||
                            for entry in trades_msgs
 | 
					 | 
				
			||||||
                            for (tid, trade) in entry.items()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            # only emit entries which are already not-in-ledger
 | 
					                    # only emit entries which are already not-in-ledger
 | 
				
			||||||
                            if tid not in {r.tid for r in trans}
 | 
					                    if tid not in {r.tid for r in trans}
 | 
				
			||||||
                        }
 | 
					                }
 | 
				
			||||||
                        for tid, trade in trades.items():
 | 
					                for tid, trade in trades.items():
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            # parse-cast
 | 
					                    # parse-cast
 | 
				
			||||||
                            reqid = trade['ordertxid']
 | 
					                    reqid = trade['ordertxid']
 | 
				
			||||||
                            action = trade['type']
 | 
					                    action = trade['type']
 | 
				
			||||||
                            price = float(trade['price'])
 | 
					                    price = float(trade['price'])
 | 
				
			||||||
                            size = float(trade['vol'])
 | 
					                    size = float(trade['vol'])
 | 
				
			||||||
                            broker_time = float(trade['time'])
 | 
					                    broker_time = float(trade['time'])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            # send a fill msg for gui update
 | 
					                    # send a fill msg for gui update
 | 
				
			||||||
                            fill_msg = BrokerdFill(
 | 
					                    fill_msg = BrokerdFill(
 | 
				
			||||||
                                reqid=reqid,
 | 
					                        reqid=reqid,
 | 
				
			||||||
                                time_ns=time.time_ns(),
 | 
					                        time_ns=time.time_ns(),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                                action=action,
 | 
					                        action=action,
 | 
				
			||||||
                                size=size,
 | 
					                        size=size,
 | 
				
			||||||
                                price=price,
 | 
					                        price=price,
 | 
				
			||||||
                                # TODO: maybe capture more msg data
 | 
					                        # TODO: maybe capture more msg data
 | 
				
			||||||
                                # i.e fees?
 | 
					                        # i.e fees?
 | 
				
			||||||
                                broker_details={'name': 'kraken'},
 | 
					                        broker_details={'name': 'kraken'},
 | 
				
			||||||
                                broker_time=broker_time
 | 
					                        broker_time=broker_time
 | 
				
			||||||
                            )
 | 
					                    )
 | 
				
			||||||
                            await ems_stream.send(fill_msg.dict())
 | 
					                    await ems_stream.send(fill_msg.dict())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            filled_msg = BrokerdStatus(
 | 
					                    filled_msg = BrokerdStatus(
 | 
				
			||||||
                                reqid=reqid,
 | 
					                        reqid=reqid,
 | 
				
			||||||
                                time_ns=time.time_ns(),
 | 
					                        time_ns=time.time_ns(),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                                account=acc_name,
 | 
					                        account=acc_name,
 | 
				
			||||||
                                status='filled',
 | 
					                        status='filled',
 | 
				
			||||||
                                filled=size,
 | 
					                        filled=size,
 | 
				
			||||||
                                reason='Order filled by kraken',
 | 
					                        reason='Order filled by kraken',
 | 
				
			||||||
                                broker_details={
 | 
					                        broker_details={
 | 
				
			||||||
                                    'name': 'kraken',
 | 
					                            'name': 'kraken',
 | 
				
			||||||
                                    'broker_time': broker_time
 | 
					                            'broker_time': broker_time
 | 
				
			||||||
                                },
 | 
					                        },
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                                # TODO: figure out if kraken gives a count
 | 
					                        # TODO: figure out if kraken gives a count
 | 
				
			||||||
                                # of how many units of underlying were
 | 
					                        # of how many units of underlying were
 | 
				
			||||||
                                # filled. Alternatively we can decrement
 | 
					                        # filled. Alternatively we can decrement
 | 
				
			||||||
                                # this value ourselves by associating and
 | 
					                        # this value ourselves by associating and
 | 
				
			||||||
                                # calcing from the diff with the original
 | 
					                        # calcing from the diff with the original
 | 
				
			||||||
                                # client-side request, see:
 | 
					                        # client-side request, see:
 | 
				
			||||||
                                # https://github.com/pikers/piker/issues/296
 | 
					                        # https://github.com/pikers/piker/issues/296
 | 
				
			||||||
                                remaining=0,
 | 
					                        remaining=0,
 | 
				
			||||||
                            )
 | 
					                    )
 | 
				
			||||||
                            await ems_stream.send(filled_msg.dict())
 | 
					                    await ems_stream.send(filled_msg.dict())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        # update ledger and position tracking
 | 
					                # update ledger and position tracking
 | 
				
			||||||
                        trans = await update_ledger(acctid, trades)
 | 
					                trans = await update_ledger(acctid, trades)
 | 
				
			||||||
                        active, closed = pp.update_pps_conf(
 | 
					                active, closed = pp.update_pps_conf(
 | 
				
			||||||
                            'kraken',
 | 
					                    'kraken',
 | 
				
			||||||
                            acctid,
 | 
					                    acctid,
 | 
				
			||||||
                            trade_records=trans,
 | 
					                    trade_records=trans,
 | 
				
			||||||
                            ledger_reload={}.fromkeys(
 | 
					                    ledger_reload={}.fromkeys(
 | 
				
			||||||
                                t.bsuid for t in trans),
 | 
					                        t.bsuid for t in trans),
 | 
				
			||||||
                        )
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        # emit pp msgs
 | 
					                # emit any new pp msgs to ems
 | 
				
			||||||
                        for pos in filter(
 | 
					                for pos in filter(
 | 
				
			||||||
                            bool,
 | 
					                    bool,
 | 
				
			||||||
                            chain(active.values(), closed.values()),
 | 
					                    chain(active.values(), closed.values()),
 | 
				
			||||||
                        ):
 | 
					                ):
 | 
				
			||||||
                            pp_msg = BrokerdPosition(
 | 
					                    pp_msg = BrokerdPosition(
 | 
				
			||||||
                                broker='kraken',
 | 
					                        broker='kraken',
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                                # XXX: ok so this is annoying, we're
 | 
					                        # XXX: ok so this is annoying, we're
 | 
				
			||||||
                                # relaying an account name with the
 | 
					                        # relaying an account name with the
 | 
				
			||||||
                                # backend suffix prefixed but when
 | 
					                        # backend suffix prefixed but when
 | 
				
			||||||
                                # reading accounts from ledgers we
 | 
					                        # reading accounts from ledgers we
 | 
				
			||||||
                                # don't need it and/or it's prefixed
 | 
					                        # don't need it and/or it's prefixed
 | 
				
			||||||
                                # in the section table.. we should
 | 
					                        # in the section table.. we should
 | 
				
			||||||
                                # just strip this from the message
 | 
					                        # just strip this from the message
 | 
				
			||||||
                                # right since `.broker` is already
 | 
					                        # right since `.broker` is already
 | 
				
			||||||
                                # included?
 | 
					                        # included?
 | 
				
			||||||
                                account=f'kraken.{acctid}',
 | 
					                        account=f'kraken.{acctid}',
 | 
				
			||||||
                                symbol=pos.symbol.front_fqsn(),
 | 
					                        symbol=pos.symbol.front_fqsn(),
 | 
				
			||||||
                                size=pos.size,
 | 
					                        size=pos.size,
 | 
				
			||||||
                                avg_price=pos.be_price,
 | 
					                        avg_price=pos.be_price,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                                # TODO
 | 
					                        # TODO
 | 
				
			||||||
                                # currency=''
 | 
					                        # currency=''
 | 
				
			||||||
                            )
 | 
					                    )
 | 
				
			||||||
                            await ems_stream.send(pp_msg.dict())
 | 
					                    await ems_stream.send(pp_msg.dict())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    case [
 | 
					            # process and relay order state change events
 | 
				
			||||||
                        order_msgs,
 | 
					            # https://docs.kraken.com/websockets/#message-openOrders
 | 
				
			||||||
                        'openOrders',
 | 
					            case [
 | 
				
			||||||
                        {'sequence': seq},
 | 
					                order_msgs,
 | 
				
			||||||
                    ]:
 | 
					                'openOrders',
 | 
				
			||||||
                        # TODO: async order update handling which we
 | 
					                {'sequence': seq},
 | 
				
			||||||
                        # should remove from `handle_order_requests()`
 | 
					            ]:
 | 
				
			||||||
                        # above:
 | 
					                for order_msg in order_msgs:
 | 
				
			||||||
                        # https://github.com/pikers/piker/issues/293
 | 
					                    log.info(
 | 
				
			||||||
                        # https://github.com/pikers/piker/issues/310
 | 
					                        f'Order msg update_{seq}:\n'
 | 
				
			||||||
                        for order_msg in order_msgs:
 | 
					                        f'{pformat(order_msg)}'
 | 
				
			||||||
                            log.info(
 | 
					                    )
 | 
				
			||||||
                                'Order msg update_{seq}:\n'
 | 
					                    txid, update_msg = list(order_msg.items())[0]
 | 
				
			||||||
                                f'{pformat(order_msg)}'
 | 
					                    match update_msg:
 | 
				
			||||||
                            )
 | 
					                        case {
 | 
				
			||||||
                            txid, update_msg = list(order_msg.items())[0]
 | 
					                            'cancel_reason': 'Order replaced',
 | 
				
			||||||
                            match update_msg:
 | 
					                            'status': status,
 | 
				
			||||||
                                case {
 | 
					                            'userref': reqid,
 | 
				
			||||||
                                    'cancel_reason': 'Order replaced',
 | 
					                            **rest,
 | 
				
			||||||
                                    'status': status,
 | 
					                        }:
 | 
				
			||||||
                                    'userref': reqid,
 | 
					                            # we ignore internal order updates
 | 
				
			||||||
                                    **rest,
 | 
					                            # triggered by kraken's "edit"
 | 
				
			||||||
                                }:
 | 
					                            # endpoint.
 | 
				
			||||||
                                    # we ignore internal order updates
 | 
					                            continue
 | 
				
			||||||
                                    # triggered by kraken's "edit"
 | 
					 | 
				
			||||||
                                    # endpoint.
 | 
					 | 
				
			||||||
                                    continue
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                                case {
 | 
					                        case {
 | 
				
			||||||
                                    'status': status,
 | 
					                            'status': status,
 | 
				
			||||||
                                    'userref': reqid,
 | 
					                            'userref': reqid,
 | 
				
			||||||
                                    **rest,
 | 
					                            **rest,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                                    # XXX: eg. of remaining msg schema:
 | 
					                            # XXX: eg. of remaining msg schema:
 | 
				
			||||||
                                    # 'avg_price': _,
 | 
					                            # 'avg_price': _,
 | 
				
			||||||
                                    # 'cost': _,
 | 
					                            # 'cost': _,
 | 
				
			||||||
                                    # 'descr': {
 | 
					                            # 'descr': {
 | 
				
			||||||
                                        # 'close': None,
 | 
					                                # 'close': None,
 | 
				
			||||||
                                        # 'leverage': None,
 | 
					                                # 'leverage': None,
 | 
				
			||||||
                                        # 'order': descr,
 | 
					                                # 'order': descr,
 | 
				
			||||||
                                        # 'ordertype': 'limit',
 | 
					                                # 'ordertype': 'limit',
 | 
				
			||||||
                                        # 'pair': 'XMR/EUR',
 | 
					                                # 'pair': 'XMR/EUR',
 | 
				
			||||||
                                        # 'price': '74.94000000',
 | 
					                                # 'price': '74.94000000',
 | 
				
			||||||
                                        # 'price2': '0.00000000',
 | 
					                                # 'price2': '0.00000000',
 | 
				
			||||||
                                        # 'type': 'buy'
 | 
					                                # 'type': 'buy'
 | 
				
			||||||
                                    # },
 | 
					                            # },
 | 
				
			||||||
                                    # 'expiretm': None,
 | 
					                            # 'expiretm': None,
 | 
				
			||||||
                                    # 'fee': '0.00000000',
 | 
					                            # 'fee': '0.00000000',
 | 
				
			||||||
                                    # 'limitprice': '0.00000000',
 | 
					                            # 'limitprice': '0.00000000',
 | 
				
			||||||
                                    # 'misc': '',
 | 
					                            # 'misc': '',
 | 
				
			||||||
                                    # 'oflags': 'fciq',
 | 
					                            # 'oflags': 'fciq',
 | 
				
			||||||
                                    # 'opentm': '1656966131.337344',
 | 
					                            # 'opentm': '1656966131.337344',
 | 
				
			||||||
                                    # 'refid': None,
 | 
					                            # 'refid': None,
 | 
				
			||||||
                                    # 'starttm': None,
 | 
					                            # 'starttm': None,
 | 
				
			||||||
                                    # 'stopprice': '0.00000000',
 | 
					                            # 'stopprice': '0.00000000',
 | 
				
			||||||
                                    # 'timeinforce': 'GTC',
 | 
					                            # 'timeinforce': 'GTC',
 | 
				
			||||||
                                    # 'vol': submit_vlm,  # '13.34400854',
 | 
					                            # 'vol': submit_vlm,  # '13.34400854',
 | 
				
			||||||
                                    # 'vol_exec': exec_vlm,  # 0.0000
 | 
					                            # 'vol_exec': exec_vlm,  # 0.0000
 | 
				
			||||||
                                }:
 | 
					                        }:
 | 
				
			||||||
                                    ems_status = {
 | 
					                            ems_status = {
 | 
				
			||||||
                                        'open': 'submitted',
 | 
					                                'open': 'submitted',
 | 
				
			||||||
                                        'closed': 'cancelled',
 | 
					                                'closed': 'cancelled',
 | 
				
			||||||
                                        'canceled': 'cancelled',
 | 
					                                'canceled': 'cancelled',
 | 
				
			||||||
                                        # do we even need to forward
 | 
					                                # do we even need to forward
 | 
				
			||||||
                                        # this state to the ems?
 | 
					                                # this state to the ems?
 | 
				
			||||||
                                        'pending': 'pending',
 | 
					                                'pending': 'pending',
 | 
				
			||||||
                                    }[status]
 | 
					                            }[status]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                                    submit_vlm = rest.get('vol', 0)
 | 
					                            submit_vlm = rest.get('vol', 0)
 | 
				
			||||||
                                    exec_vlm = rest.get('vol_exec', 0)
 | 
					                            exec_vlm = rest.get('vol_exec', 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                                    oid = ids.inverse[reqid]
 | 
					                            oid = ids.inverse[reqid]
 | 
				
			||||||
                                    msgs = emsflow[oid]
 | 
					                            msgs = emsflow[oid]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                                    # send BrokerdStatus messages for all
 | 
					                            # send BrokerdStatus messages for all
 | 
				
			||||||
                                    # order state updates
 | 
					                            # order state updates
 | 
				
			||||||
                                    resp = BrokerdStatus(
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                                        reqid=txid,
 | 
					 | 
				
			||||||
                                        time_ns=time.time_ns(),  # cuz why not
 | 
					 | 
				
			||||||
                                        account=f'kraken.{acctid}',
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                                        # everyone doin camel case..
 | 
					 | 
				
			||||||
                                        status=ems_status,  # force lower case
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                                        filled=exec_vlm,
 | 
					 | 
				
			||||||
                                        reason='',  # why held?
 | 
					 | 
				
			||||||
                                        remaining=(
 | 
					 | 
				
			||||||
                                            float(submit_vlm)
 | 
					 | 
				
			||||||
                                            -
 | 
					 | 
				
			||||||
                                            float(exec_vlm)
 | 
					 | 
				
			||||||
                                        ),
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                                        broker_details=dict(
 | 
					 | 
				
			||||||
                                            {'name': 'kraken'}, **update_msg
 | 
					 | 
				
			||||||
                                        ),
 | 
					 | 
				
			||||||
                                    )
 | 
					 | 
				
			||||||
                                    msgs.append(resp)
 | 
					 | 
				
			||||||
                                    await ems_stream.send(resp.dict())
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                                case _:
 | 
					 | 
				
			||||||
                                    log.warning(
 | 
					 | 
				
			||||||
                                        'Unknown orders msg:\n'
 | 
					 | 
				
			||||||
                                        f'{txid}:{order_msg}'
 | 
					 | 
				
			||||||
                                    )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    case {
 | 
					 | 
				
			||||||
                        'event': etype,
 | 
					 | 
				
			||||||
                        'status': status,
 | 
					 | 
				
			||||||
                        'errorMessage': errmsg,
 | 
					 | 
				
			||||||
                        'reqid': reqid,
 | 
					 | 
				
			||||||
                    } if (
 | 
					 | 
				
			||||||
                        etype in {'addOrderStatus', 'editOrderStatus'}
 | 
					 | 
				
			||||||
                        and status == 'error'
 | 
					 | 
				
			||||||
                    ):
 | 
					 | 
				
			||||||
                        log.error(
 | 
					 | 
				
			||||||
                            f'Failed to submit/edit order {reqid}:\n'
 | 
					 | 
				
			||||||
                            f'{errmsg}'
 | 
					 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
                        oid = ids.inverse[reqid]
 | 
					 | 
				
			||||||
                        msgs = emsflow[oid]
 | 
					 | 
				
			||||||
                        last = msgs[-1]
 | 
					 | 
				
			||||||
                        resp = BrokerdError(
 | 
					 | 
				
			||||||
                            oid=oid,
 | 
					 | 
				
			||||||
                            # use old reqid in case it changed?
 | 
					 | 
				
			||||||
                            reqid=last.reqid,
 | 
					 | 
				
			||||||
                            symbol=last.symbol,
 | 
					 | 
				
			||||||
                            reason=f'Failed submit:\n{errmsg}',
 | 
					 | 
				
			||||||
                            broker_details=resp
 | 
					 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
                        msgs.append(resp)
 | 
					 | 
				
			||||||
                        await ems_stream.send(resp.dict())
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                        # if we rx any error cancel the order again
 | 
					 | 
				
			||||||
                        await ws.send_msg({
 | 
					 | 
				
			||||||
                            'event': 'cancelOrder',
 | 
					 | 
				
			||||||
                            'token': token,
 | 
					 | 
				
			||||||
                            'reqid': reqid,
 | 
					 | 
				
			||||||
                            'txid': [last.reqid],  # txid from submission
 | 
					 | 
				
			||||||
                        })
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    case {
 | 
					 | 
				
			||||||
                        'event': 'addOrderStatus',
 | 
					 | 
				
			||||||
                        'status': status,
 | 
					 | 
				
			||||||
                        'reqid': reqid,  # oid from ems side
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                        # NOTE: in the case of an edit request this is
 | 
					 | 
				
			||||||
                        # a new value!
 | 
					 | 
				
			||||||
                        'txid': txid,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                        'descr': descr,  # only on success?
 | 
					 | 
				
			||||||
                        # 'originaltxid': txid,  # only on edits
 | 
					 | 
				
			||||||
                        # **rest,
 | 
					 | 
				
			||||||
                    }:
 | 
					 | 
				
			||||||
                        oid = ids.inverse[reqid]
 | 
					 | 
				
			||||||
                        msgs = emsflow[oid]
 | 
					 | 
				
			||||||
                        last = msgs[-1]
 | 
					 | 
				
			||||||
                        log.info(
 | 
					 | 
				
			||||||
                            f'Submitting order: {descr}\n'
 | 
					 | 
				
			||||||
                            f'ems oid: {oid}\n'
 | 
					 | 
				
			||||||
                            f're-mapped reqid: {reqid}\n'
 | 
					 | 
				
			||||||
                            f'txid: {txid}\n'
 | 
					 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
                        resp = BrokerdOrderAck(
 | 
					 | 
				
			||||||
                            oid=oid,  # ems order request id
 | 
					 | 
				
			||||||
                            reqid=txid,  # kraken unique order id
 | 
					 | 
				
			||||||
                            account=last.account,  # piker account
 | 
					 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
                        msgs.append(resp)
 | 
					 | 
				
			||||||
                        await ems_stream.send(resp.dict())
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    case {
 | 
					 | 
				
			||||||
                        'event': 'editOrderStatus',
 | 
					 | 
				
			||||||
                        'status': status,
 | 
					 | 
				
			||||||
                        'reqid': reqid,  # oid from ems side
 | 
					 | 
				
			||||||
                        'descr': descr,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                        # NOTE: for edit request this is a new value
 | 
					 | 
				
			||||||
                        'txid': txid,
 | 
					 | 
				
			||||||
                        'originaltxid': origtxid,
 | 
					 | 
				
			||||||
                    }:
 | 
					 | 
				
			||||||
                        log.info(
 | 
					 | 
				
			||||||
                            f'Editting order {oid}[requid={reqid}]:\n'
 | 
					 | 
				
			||||||
                            f'txid: {origtxid} -> {txid}\n'
 | 
					 | 
				
			||||||
                            f'{descr}'
 | 
					 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
                        # deliver another ack to update the ems-side `.reqid`.
 | 
					 | 
				
			||||||
                        oid = ids.inverse[reqid]
 | 
					 | 
				
			||||||
                        msgs = emsflow[oid]
 | 
					 | 
				
			||||||
                        last = msgs[-1]
 | 
					 | 
				
			||||||
                        resp = BrokerdOrderAck(
 | 
					 | 
				
			||||||
                            oid=oid,  # ems order request id
 | 
					 | 
				
			||||||
                            reqid=txid,  # kraken unique order id
 | 
					 | 
				
			||||||
                            account=last.account,  # piker account
 | 
					 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
                        msgs.append(resp)
 | 
					 | 
				
			||||||
                        await ems_stream.send(resp.dict())
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    # successful cancellation
 | 
					 | 
				
			||||||
                    case {
 | 
					 | 
				
			||||||
                        "event": "cancelOrderStatus",
 | 
					 | 
				
			||||||
                        "status": "ok",
 | 
					 | 
				
			||||||
                        'txid': txids,
 | 
					 | 
				
			||||||
                        'reqid': reqid,
 | 
					 | 
				
			||||||
                    }:
 | 
					 | 
				
			||||||
                        # TODO: should we support "batch" acking of
 | 
					 | 
				
			||||||
                        # multiple cancels thus avoiding the below loop?
 | 
					 | 
				
			||||||
                        oid = ids.inverse[reqid]
 | 
					 | 
				
			||||||
                        msgs = emsflow[oid]
 | 
					 | 
				
			||||||
                        last = msgs[-1]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                        for txid in txids:
 | 
					 | 
				
			||||||
                            resp = BrokerdStatus(
 | 
					                            resp = BrokerdStatus(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                                reqid=txid,
 | 
					                                reqid=txid,
 | 
				
			||||||
                                account=last.account,
 | 
					                                time_ns=time.time_ns(),  # cuz why not
 | 
				
			||||||
                                time_ns=time.time_ns(),
 | 
					                                account=f'kraken.{acctid}',
 | 
				
			||||||
                                status='cancelled',
 | 
					
 | 
				
			||||||
                                reason='Cancel success: {oid}@{txid}',
 | 
					                                # everyone doin camel case..
 | 
				
			||||||
                                broker_details=resp,
 | 
					                                status=ems_status,  # force lower case
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                                filled=exec_vlm,
 | 
				
			||||||
 | 
					                                reason='',  # why held?
 | 
				
			||||||
 | 
					                                remaining=(
 | 
				
			||||||
 | 
					                                    float(submit_vlm)
 | 
				
			||||||
 | 
					                                    -
 | 
				
			||||||
 | 
					                                    float(exec_vlm)
 | 
				
			||||||
 | 
					                                ),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                                broker_details=dict(
 | 
				
			||||||
 | 
					                                    {'name': 'kraken'}, **update_msg
 | 
				
			||||||
 | 
					                                ),
 | 
				
			||||||
                            )
 | 
					                            )
 | 
				
			||||||
                            msgs.append(resp)
 | 
					                            msgs.append(resp)
 | 
				
			||||||
                            await ems_stream.send(resp.dict())
 | 
					                            await ems_stream.send(resp.dict())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # failed cancel
 | 
					                        case _:
 | 
				
			||||||
                    case {
 | 
					                            log.warning(
 | 
				
			||||||
                       "event": "cancelOrderStatus",
 | 
					                                'Unknown orders msg:\n'
 | 
				
			||||||
                       "status": "error",
 | 
					                                f'{txid}:{order_msg}'
 | 
				
			||||||
                       "errorMessage": errmsg,
 | 
					                            )
 | 
				
			||||||
                       'reqid': reqid,
 | 
					 | 
				
			||||||
                    }:
 | 
					 | 
				
			||||||
                        oid = ids.inverse[reqid]
 | 
					 | 
				
			||||||
                        msgs = emsflow[oid]
 | 
					 | 
				
			||||||
                        last = msgs[-1]
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        resp = BrokerdError(
 | 
					            case {
 | 
				
			||||||
                            oid=oid,
 | 
					                'event': etype,
 | 
				
			||||||
                            reqid=last.reqid,
 | 
					                'status': status,
 | 
				
			||||||
                            symbol=last.symbol,
 | 
					                'errorMessage': errmsg,
 | 
				
			||||||
                            reason=f'Failed order cancel {errmsg}',
 | 
					                'reqid': reqid,
 | 
				
			||||||
                            broker_details=resp
 | 
					            } if (
 | 
				
			||||||
                        )
 | 
					                etype in {'addOrderStatus', 'editOrderStatus'}
 | 
				
			||||||
                        msgs.append(resp)
 | 
					                and status == 'error'
 | 
				
			||||||
                        await ems_stream.send(resp.dict())
 | 
					            ):
 | 
				
			||||||
 | 
					                log.error(
 | 
				
			||||||
 | 
					                    f'Failed to submit/edit order {reqid}:\n'
 | 
				
			||||||
 | 
					                    f'{errmsg}'
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                oid = ids.inverse[reqid]
 | 
				
			||||||
 | 
					                msgs = emsflow[oid]
 | 
				
			||||||
 | 
					                last = msgs[-1]
 | 
				
			||||||
 | 
					                resp = BrokerdError(
 | 
				
			||||||
 | 
					                    oid=oid,
 | 
				
			||||||
 | 
					                    # use old reqid in case it changed?
 | 
				
			||||||
 | 
					                    reqid=last.reqid,
 | 
				
			||||||
 | 
					                    symbol=last.symbol,
 | 
				
			||||||
 | 
					                    reason=f'Failed submit:\n{errmsg}',
 | 
				
			||||||
 | 
					                    broker_details=resp
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                msgs.append(resp)
 | 
				
			||||||
 | 
					                await ems_stream.send(resp.dict())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    case _:
 | 
					                # if we rx any error cancel the order again
 | 
				
			||||||
                        log.warning(f'Unhandled trades msg: {msg}')
 | 
					                await ws.send_msg({
 | 
				
			||||||
 | 
					                    'event': 'cancelOrder',
 | 
				
			||||||
 | 
					                    'token': token,
 | 
				
			||||||
 | 
					                    'reqid': reqid,
 | 
				
			||||||
 | 
					                    'txid': [last.reqid],  # txid from submission
 | 
				
			||||||
 | 
					                })
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            case {
 | 
				
			||||||
 | 
					                'event': 'addOrderStatus',
 | 
				
			||||||
 | 
					                'status': status,
 | 
				
			||||||
 | 
					                'reqid': reqid,  # oid from ems side
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # NOTE: in the case of an edit request this is
 | 
				
			||||||
 | 
					                # a new value!
 | 
				
			||||||
 | 
					                'txid': txid,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                'descr': descr,  # only on success?
 | 
				
			||||||
 | 
					                # 'originaltxid': txid,  # only on edits
 | 
				
			||||||
 | 
					                # **rest,
 | 
				
			||||||
 | 
					            }:
 | 
				
			||||||
 | 
					                oid = ids.inverse[reqid]
 | 
				
			||||||
 | 
					                msgs = emsflow[oid]
 | 
				
			||||||
 | 
					                last = msgs[-1]
 | 
				
			||||||
 | 
					                log.info(
 | 
				
			||||||
 | 
					                    f'Submitting order: {descr}\n'
 | 
				
			||||||
 | 
					                    f'ems oid: {oid}\n'
 | 
				
			||||||
 | 
					                    f're-mapped reqid: {reqid}\n'
 | 
				
			||||||
 | 
					                    f'txid: {txid}\n'
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                resp = BrokerdOrderAck(
 | 
				
			||||||
 | 
					                    oid=oid,  # ems order request id
 | 
				
			||||||
 | 
					                    reqid=txid,  # kraken unique order id
 | 
				
			||||||
 | 
					                    account=last.account,  # piker account
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                msgs.append(resp)
 | 
				
			||||||
 | 
					                await ems_stream.send(resp.dict())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            case {
 | 
				
			||||||
 | 
					                'event': 'editOrderStatus',
 | 
				
			||||||
 | 
					                'status': status,
 | 
				
			||||||
 | 
					                'reqid': reqid,  # oid from ems side
 | 
				
			||||||
 | 
					                'descr': descr,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # NOTE: for edit request this is a new value
 | 
				
			||||||
 | 
					                'txid': txid,
 | 
				
			||||||
 | 
					                'originaltxid': origtxid,
 | 
				
			||||||
 | 
					            }:
 | 
				
			||||||
 | 
					                log.info(
 | 
				
			||||||
 | 
					                    f'Editting order {oid}[requid={reqid}]:\n'
 | 
				
			||||||
 | 
					                    f'txid: {origtxid} -> {txid}\n'
 | 
				
			||||||
 | 
					                    f'{descr}'
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                # deliver another ack to update the ems-side `.reqid`.
 | 
				
			||||||
 | 
					                oid = ids.inverse[reqid]
 | 
				
			||||||
 | 
					                msgs = emsflow[oid]
 | 
				
			||||||
 | 
					                last = msgs[-1]
 | 
				
			||||||
 | 
					                resp = BrokerdOrderAck(
 | 
				
			||||||
 | 
					                    oid=oid,  # ems order request id
 | 
				
			||||||
 | 
					                    reqid=txid,  # kraken unique order id
 | 
				
			||||||
 | 
					                    account=last.account,  # piker account
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                msgs.append(resp)
 | 
				
			||||||
 | 
					                await ems_stream.send(resp.dict())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # successful cancellation
 | 
				
			||||||
 | 
					            case {
 | 
				
			||||||
 | 
					                "event": "cancelOrderStatus",
 | 
				
			||||||
 | 
					                "status": "ok",
 | 
				
			||||||
 | 
					                'txid': txids,
 | 
				
			||||||
 | 
					                'reqid': reqid,
 | 
				
			||||||
 | 
					            }:
 | 
				
			||||||
 | 
					                # TODO: should we support "batch" acking of
 | 
				
			||||||
 | 
					                # multiple cancels thus avoiding the below loop?
 | 
				
			||||||
 | 
					                oid = ids.inverse[reqid]
 | 
				
			||||||
 | 
					                msgs = emsflow[oid]
 | 
				
			||||||
 | 
					                last = msgs[-1]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                for txid in txids:
 | 
				
			||||||
 | 
					                    resp = BrokerdStatus(
 | 
				
			||||||
 | 
					                        reqid=txid,
 | 
				
			||||||
 | 
					                        account=last.account,
 | 
				
			||||||
 | 
					                        time_ns=time.time_ns(),
 | 
				
			||||||
 | 
					                        status='cancelled',
 | 
				
			||||||
 | 
					                        reason='Cancel success: {oid}@{txid}',
 | 
				
			||||||
 | 
					                        broker_details=resp,
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					                    msgs.append(resp)
 | 
				
			||||||
 | 
					                    await ems_stream.send(resp.dict())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # failed cancel
 | 
				
			||||||
 | 
					            case {
 | 
				
			||||||
 | 
					               "event": "cancelOrderStatus",
 | 
				
			||||||
 | 
					               "status": "error",
 | 
				
			||||||
 | 
					               "errorMessage": errmsg,
 | 
				
			||||||
 | 
					               'reqid': reqid,
 | 
				
			||||||
 | 
					            }:
 | 
				
			||||||
 | 
					                oid = ids.inverse[reqid]
 | 
				
			||||||
 | 
					                msgs = emsflow[oid]
 | 
				
			||||||
 | 
					                last = msgs[-1]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                resp = BrokerdError(
 | 
				
			||||||
 | 
					                    oid=oid,
 | 
				
			||||||
 | 
					                    reqid=last.reqid,
 | 
				
			||||||
 | 
					                    symbol=last.symbol,
 | 
				
			||||||
 | 
					                    reason=f'Failed order cancel {errmsg}',
 | 
				
			||||||
 | 
					                    broker_details=resp
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                msgs.append(resp)
 | 
				
			||||||
 | 
					                await ems_stream.send(resp.dict())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            case _:
 | 
				
			||||||
 | 
					                log.warning(f'Unhandled trades msg: {msg}')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def norm_trade_records(
 | 
					def norm_trade_records(
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue