diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index add23b18..68c7238e 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -33,7 +33,6 @@ import asks from fuzzywuzzy import process as fuzzy import numpy as np import tractor -from pydantic.dataclasses import dataclass import wsproto from .._cacheables import open_cached_client @@ -106,14 +105,14 @@ class Pair(Struct, frozen=True): permissions: list[str] -@dataclass -class OHLC: - """Description of the flattened OHLC quote format. +class OHLC(Struct): + ''' + Description of the flattened OHLC quote format. For schema details see: https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-streams - """ + ''' time: int open: float @@ -262,6 +261,7 @@ class Client: for i, bar in enumerate(bars): bar = OHLC(*bar) + bar.typecast() row = [] for j, (name, ftype) in enumerate(_ohlc_dtype[1:]): diff --git a/piker/brokers/kraken/README.rst b/piker/brokers/kraken/README.rst new file mode 100644 index 00000000..80e56913 --- /dev/null +++ b/piker/brokers/kraken/README.rst @@ -0,0 +1,64 @@ +``kraken`` backend +------------------ +though they don't have the most liquidity of all the cexes they sure are +accommodating to those of us who appreciate a little ``xmr``. + +status +****** +current support is *production grade* and both real-time data and order +management should be correct and fast. this backend is used by core devs +for live trading. + + +config +****** +In order to get order mode support your ``brokers.toml`` +needs to have something like the following: + +.. code:: toml + + [kraken] + accounts.spot = 'spot' + key_descr = "spot" + api_key = "69696969696969696696969696969696969696969696969696969696" + secret = "BOOBSBOOBSBOOBSBOOBSBOOBSSMBZ69696969696969669969696969696" + + +If everything works correctly you should see any current positions +loaded in the pps pane on chart load and you should also be able to +check your trade records in the file:: + + /ledgers/trades_kraken_spot.toml + + +An example ledger file will have entries written verbatim from the +trade events schema: + +.. code:: toml + + [TFJBKK-SMBZS-VJ4UWS] + ordertxid = "SMBZSA-7CNQU-3HWLNJ" + postxid = "SMBZSE-M7IF5-CFI7LT" + pair = "XXMRZEUR" + time = 1655691993.4133966 + type = "buy" + ordertype = "limit" + price = "103.97000000" + cost = "499.99999977" + fee = "0.80000000" + vol = "4.80907954" + margin = "0.00000000" + misc = "" + + +your ``pps.toml`` file will have position entries like, + +.. code:: toml + + [kraken.spot."xmreur.kraken"] + size = 4.80907954 + ppu = 103.97000000 + bsuid = "XXMRZEUR" + clears = [ + { tid = "TFJBKK-SMBZS-VJ4UWS", cost = 0.8, price = 103.97, size = 4.80907954, dt = "2022-05-20T02:26:33.413397+00:00" }, + ] diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 3abf533e..80feab49 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -19,7 +19,6 @@ Kraken web API wrapping. ''' from contextlib import asynccontextmanager as acm -from dataclasses import field from datetime import datetime import itertools from typing import ( @@ -29,17 +28,16 @@ from typing import ( ) import time -# import trio -# import tractor +from bidict import bidict import pendulum import asks from fuzzywuzzy import process as fuzzy import numpy as np -from pydantic.dataclasses import dataclass import urllib.parse import hashlib import hmac import base64 +import trio from piker import config from piker.brokers._util import ( @@ -48,6 +46,7 @@ from piker.brokers._util import ( BrokerError, DataThrottle, ) +from piker.pp import Transaction from . import log # // @@ -77,31 +76,6 @@ _symbol_info_translation: dict[str, str] = { } -@dataclass -class OHLC: - ''' - Description of the flattened OHLC quote format. - - For schema details see: - https://docs.kraken.com/websockets/#message-ohlc - - ''' - chan_id: int # internal kraken id - chan_name: str # eg. ohlc-1 (name-interval) - pair: str # fx pair - time: float # Begin time of interval, in seconds since epoch - etime: float # End time of interval, in seconds since epoch - open: float # Open price of interval - high: float # High price within interval - low: float # Low price within interval - close: float # Close price of interval - vwap: float # Volume weighted average price within interval - volume: float # Accumulated volume **within interval** - count: int # Number of trades within interval - # (sampled) generated tick data - ticks: list[Any] = field(default_factory=list) - - def get_config() -> dict[str, Any]: conf, path = config.load() @@ -141,8 +115,13 @@ class InvalidKey(ValueError): class Client: + # global symbol normalization table + _ntable: dict[str, str] = {} + _atable: bidict[str, str] = bidict() + def __init__( self, + config: dict[str, str], name: str = '', api_key: str = '', secret: str = '' @@ -153,6 +132,7 @@ class Client: 'User-Agent': 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' }) + self.conf: dict[str, str] = config self._pairs: list[str] = [] self._name = name self._api_key = api_key @@ -212,8 +192,36 @@ class Client: data['nonce'] = str(int(1000*time.time())) return await self._private(method, data, uri_path) + async def get_balances( + self, + ) -> dict[str, float]: + ''' + Return the set of asset balances for this account + by symbol. + + ''' + resp = await self.endpoint( + 'Balance', + {}, + ) + by_bsuid = resp['result'] + return { + self._atable[sym].lower(): float(bal) + for sym, bal in by_bsuid.items() + } + + async def get_assets(self) -> dict[str, dict]: + resp = await self._public('Assets', {}) + return resp['result'] + + async def cache_assets(self) -> None: + assets = self.assets = await self.get_assets() + for bsuid, info in assets.items(): + self._atable[bsuid] = info['altname'] + async def get_trades( self, + fetch_limit: int = 10, ) -> dict[str, Any]: ''' @@ -225,6 +233,8 @@ class Client: trades_by_id: dict[str, Any] = {} for i in itertools.count(): + if i >= fetch_limit: + break # increment 'ofs' pagination offset ofs = i*50 @@ -254,6 +264,61 @@ class Client: assert count == len(trades_by_id.values()) return trades_by_id + async def get_xfers( + self, + asset: str, + src_asset: str = '', + + ) -> dict[str, Transaction]: + ''' + Get asset balance transfer transactions. + + Currently only withdrawals are supported. + + ''' + xfers: list[dict] = (await self.endpoint( + 'WithdrawStatus', + {'asset': asset}, + ))['result'] + + # eg. resp schema: + # 'result': [{'method': 'Bitcoin', 'aclass': 'currency', 'asset': + # 'XXBT', 'refid': 'AGBJRMB-JHD2M4-NDI3NR', 'txid': + # 'b95d66d3bb6fd76cbccb93f7639f99a505cb20752c62ea0acc093a0e46547c44', + # 'info': 'bc1qc8enqjekwppmw3g80p56z5ns7ze3wraqk5rl9z', + # 'amount': '0.00300726', 'fee': '0.00001000', 'time': + # 1658347714, 'status': 'Success'}]} + + trans: dict[str, Transaction] = {} + for entry in xfers: + # look up the normalized name + asset = self._atable[entry['asset']].lower() + + # XXX: this is in the asset units (likely) so it isn't + # quite the same as a commisions cost necessarily..) + cost = float(entry['fee']) + + tran = Transaction( + fqsn=asset + '.kraken', + tid=entry['txid'], + dt=pendulum.from_timestamp(entry['time']), + bsuid=f'{asset}{src_asset}', + size=-1*( + float(entry['amount']) + + + cost + ), + # since this will be treated as a "sell" it + # shouldn't be needed to compute the be price. + price='NaN', + + # XXX: see note above + cost=0, + ) + trans[tran.tid] = tran + + return trans + async def submit_limit( self, symbol: str, @@ -282,6 +347,7 @@ class Client: "volume": str(size), } return await self.endpoint('AddOrder', data) + else: # Edit order data for kraken api data["txid"] = reqid @@ -301,7 +367,9 @@ class Client: async def symbol_info( self, pair: Optional[str] = None, - ): + + ) -> dict[str, dict[str, str]]: + if pair is not None: pairs = {'pair': pair} else: @@ -327,6 +395,12 @@ class Client: if not self._pairs: self._pairs = await self.symbol_info() + ntable = {} + for restapikey, info in self._pairs.items(): + ntable[restapikey] = ntable[info['wsname']] = info['altname'] + + self._ntable.update(ntable) + return self._pairs async def search_symbols( @@ -424,45 +498,43 @@ class Client: else: raise BrokerError(errmsg) + @classmethod + def normalize_symbol( + cls, + ticker: str + ) -> str: + ''' + Normalize symbol names to to a 3x3 pair from the global + definition map which we build out from the data retreived from + the 'AssetPairs' endpoint, see methods above. + + ''' + ticker = cls._ntable[ticker] + symlen = len(ticker) + if symlen != 6: + raise ValueError(f'Unhandled symbol: {ticker}') + + return ticker.lower() + @acm async def get_client() -> Client: - section = get_config() - if section: + conf = get_config() + if conf: client = Client( - name=section['key_descr'], - api_key=section['api_key'], - secret=section['secret'] + conf, + name=conf['key_descr'], + api_key=conf['api_key'], + secret=conf['secret'] ) else: - client = Client() + client = Client({}) - # at startup, load all symbols locally for fast search - await client.cache_symbols() + # at startup, load all symbols, and asset info in + # batch requests. + async with trio.open_nursery() as nurse: + nurse.start_soon(client.cache_assets) + await client.cache_symbols() yield client - - -def normalize_symbol( - ticker: str -) -> str: - ''' - Normalize symbol names to to a 3x3 pair. - - ''' - remap = { - 'XXBTZEUR': 'XBTEUR', - 'XXMRZEUR': 'XMREUR', - - # ws versions? pretty weird.. - 'XBT/EUR': 'XBTEUR', - 'XMR/EUR': 'XMREUR', - } - symlen = len(ticker) - if symlen != 6: - ticker = remap[ticker] - else: - raise ValueError(f'Unhandled symbol: {ticker}') - - return ticker.lower() diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 588a0924..3641934a 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -18,25 +18,36 @@ Order api and machinery ''' -from contextlib import asynccontextmanager as acm +from collections import ChainMap, defaultdict +from contextlib import ( + asynccontextmanager as acm, + contextmanager as cm, +) from functools import partial -from itertools import chain +from itertools import count +import math from pprint import pformat import time from typing import ( Any, AsyncIterator, - # Callable, - # Optional, - # Union, + Union, ) +from async_generator import aclosing +from bidict import bidict import pendulum import trio import tractor import wsproto -from piker import pp +from piker.pp import ( + Position, + PpTable, + Transaction, + open_trade_ledger, + open_pps, +) from piker.clearing._messages import ( BrokerdCancel, BrokerdError, @@ -46,13 +57,11 @@ from piker.clearing._messages import ( BrokerdPosition, BrokerdStatus, ) -from piker.data.types import Struct from . import log from .api import ( Client, BrokerError, get_client, - normalize_symbol, ) from .feed import ( get_console_log, @@ -61,186 +70,199 @@ from .feed import ( stream_messages, ) +MsgUnion = Union[ + BrokerdCancel, + BrokerdError, + BrokerdFill, + BrokerdOrder, + BrokerdOrderAck, + BrokerdPosition, + BrokerdStatus, +] -class Trade(Struct): - ''' - Trade class that helps parse and validate ownTrades stream - ''' - reqid: str # kraken order transaction id - action: str # buy or sell - price: float # price of asset - size: float # vol of asset - broker_time: str # e.g GTC, GTD +class TooFastEdit(Exception): + 'Edit requests faster then api submissions' async def handle_order_requests( + ws: NoBsWs, client: Client, ems_order_stream: tractor.MsgStream, + token: str, + apiflows: dict[int, ChainMap[dict[str, dict]]], + ids: bidict[str, int], + reqids2txids: dict[int, str], ) -> None: + ''' + Process new order submission requests from the EMS + and deliver acks or errors. - request_msg: dict + ''' + # XXX: UGH, let's unify this.. with ``msgspec``. + msg: dict[str, Any] order: BrokerdOrder + counter = count(1) - async for request_msg in ems_order_stream: - log.info( - 'Received order request:\n' - f'{pformat(request_msg)}' - ) + async for msg in ems_order_stream: + log.info(f'Rx order msg:\n{pformat(msg)}') + match msg: + case { + 'action': 'cancel', + }: + cancel = BrokerdCancel(**msg) + reqid = ids[cancel.oid] - action = request_msg['action'] + try: + txid = reqids2txids[reqid] + except KeyError: + # XXX: not sure if this block ever gets hit now? + log.error('TOO FAST CANCEL/EDIT') + reqids2txids[reqid] = TooFastEdit(reqid) + await ems_order_stream.send( + BrokerdError( + oid=msg['oid'], + symbol=msg['symbol'], + reason=( + f'TooFastEdit reqid:{reqid}, could not cancelling..' + ), - if action in {'buy', 'sell'}: - - account = request_msg['account'] - if account != 'kraken.spot': - log.error( - 'This is a kraken account, \ - only a `kraken.spot` selection is valid' - ) - await ems_order_stream.send(BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - - # reason=f'Kraken only, No account found: `{account}` ?', - reason=( - 'Kraken only, order mode disabled due to ' - 'https://github.com/pikers/piker/issues/299' - ), - - )) - continue - - # validate - order = BrokerdOrder(**request_msg) - # call our client api to submit the order - resp = await client.submit_limit( - symbol=order.symbol, - price=order.price, - action=order.action, - size=order.size, - reqid=order.reqid, - ) - - err = resp['error'] - if err: - oid = order.oid - log.error(f'Failed to submit order: {oid}') - - await ems_order_stream.send( - BrokerdError( - oid=order.oid, - reqid=order.reqid, - symbol=order.symbol, - reason="Failed order submission", - broker_details=resp + ) ) - ) - else: - # TODO: handle multiple orders (cancels?) - # txid is an array of strings - if order.reqid is None: - reqid = resp['result']['txid'][0] else: - # update the internal pairing of oid to krakens - # txid with the new txid that is returned on edit - reqid = resp['result']['txid'] + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [txid], # should be txid from submission + }) - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( + case { + 'account': 'kraken.spot' as account, + 'action': action, + } if action in {'buy', 'sell'}: - # ems order request id - oid=order.oid, - - # broker specific request id - reqid=reqid, - - # account the made the order - account=order.account - - ) - ) - - elif action == 'cancel': - msg = BrokerdCancel(**request_msg) - - # Send order cancellation to kraken - resp = await client.submit_cancel( - reqid=msg.reqid - ) - - # Check to make sure there was no error returned by - # the kraken endpoint. Assert one order was cancelled. - try: - result = resp['result'] - count = result['count'] - - # check for 'error' key if we received no 'result' - except KeyError: - error = resp.get('error') - - await ems_order_stream.send( - BrokerdError( - oid=msg.oid, - reqid=msg.reqid, - symbol=msg.symbol, - reason="Failed order cancel", - broker_details=resp - ) - ) - - if not error: - raise BrokerError(f'Unknown order cancel response: {resp}') - - else: - if not count: # no orders were cancelled? - - # XXX: what exactly is this from and why would we care? - # there doesn't seem to be any docs here? - # https://docs.kraken.com/rest/#operation/cancelOrder - - # Check to make sure the cancellation is NOT pending, - # then send the confirmation to the ems order stream - pending = result.get('pending') - if pending: - log.error(f'Order {oid} cancel was not yet successful') + # validate + order = BrokerdOrder(**msg) + # logic from old `Client.submit_limit()` + if order.oid in ids: + ep = 'editOrder' + reqid = ids[order.oid] # integer not txid + try: + txid = reqids2txids[reqid] + except KeyError: + # XXX: not sure if this block ever gets hit now? + log.error('TOO FAST EDIT') + reqids2txids[reqid] = TooFastEdit(reqid) await ems_order_stream.send( BrokerdError( - oid=msg.oid, - reqid=msg.reqid, - symbol=msg.symbol, - # TODO: maybe figure out if pending - # cancels will eventually get cancelled - reason="Order cancel is still pending?", - broker_details=resp + oid=msg['oid'], + symbol=msg['symbol'], + reason=( + f'TooFastEdit reqid:{reqid}, cancelling..' + ), + ) ) + else: + extra = { + 'orderid': txid, # txid + 'newuserref': str(reqid), + } - else: # order cancel success case. - - await ems_order_stream.send( - BrokerdStatus( - reqid=msg.reqid, - account=msg.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Order cancelled', - broker_details={'name': 'kraken'} - ) + else: + ep = 'addOrder' + reqid = next(counter) + ids[order.oid] = reqid + log.debug( + f"Adding order {reqid}\n" + f'{ids}' ) - else: - log.error(f'Unknown order command: {request_msg}') + extra = { + 'ordertype': 'limit', + 'type': order.action, + } + + psym = order.symbol.upper() + pair = f'{psym[:3]}/{psym[3:]}' + + # XXX: ACK the request **immediately** before sending + # the api side request to ensure the ems maps the oid -> + # reqid correctly! + resp = BrokerdOrderAck( + oid=order.oid, # ems order request id + reqid=reqid, # our custom int mapping + account=account, # piker account + ) + await ems_order_stream.send(resp) + + # call ws api to submit the order: + # https://docs.kraken.com/websockets/#message-addOrder + req = { + 'event': ep, + 'token': token, + + 'reqid': reqid, # remapped-to-int uid from ems + # XXX: we set these to the same value since for us + # a request dialog and an order's state-liftime are + # treated the same. Also this used to not work, the + # values used to be mutex for some odd reason until + # we dealt with support about it, and then they + # fixed it and pretended like we were crazy and the + # issue was never there lmao... coorps bro. + # 'userref': str(reqid), + 'userref': str(reqid), + 'pair': pair, + 'price': str(order.price), + 'volume': str(order.size), + # validate: 'true', # validity check, nothing more + } | extra + + log.info(f'Submitting WS order request:\n{pformat(req)}') + await ws.send_msg(req) + + # placehold for sanity checking in relay loop + apiflows[reqid].maps.append(msg) + + case _: + account = msg.get('account') + if account != 'kraken.spot': + log.error( + 'This is a kraken account, \ + only a `kraken.spot` selection is valid' + ) + + await ems_order_stream.send( + BrokerdError( + oid=msg['oid'], + symbol=msg['symbol'], + reason=( + 'Invalid request msg:\n{msg}' + )) + ) @acm async def subscribe( ws: wsproto.WSConnection, token: str, - subs: list[str] = ['ownTrades', 'openOrders'], + subs: list[tuple[str, dict]] = [ + ('ownTrades', { + # don't send first 50 trades on startup, + # we already pull this manually from the rest endpoint. + 'snapshot': False, + },), + ('openOrders', { + # include rate limit counters + 'ratecounter': True, + },), + + ], ): ''' Setup ws api subscriptions: @@ -251,14 +273,16 @@ async def subscribe( ''' # more specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - assert token - for sub in subs: + subnames: set[str] = set() + + for name, sub_opts in subs: msg = { 'event': 'subscribe', 'subscription': { - 'name': sub, + 'name': name, 'token': token, + **sub_opts, } } @@ -267,7 +291,34 @@ async def subscribe( # since internally the ws methods appear to be FIFO # locked. await ws.send_msg(msg) + subnames.add(name) + # wait on subscriptionn acks + with trio.move_on_after(5): + while True: + match (msg := await ws.recv_msg()): + case { + 'event': 'subscriptionStatus', + 'status': 'subscribed', + 'subscription': sub_opts, + } as msg: + log.info( + f'Sucessful subscribe for {sub_opts}:\n' + f'{pformat(msg)}' + ) + subnames.remove(sub_opts['name']) + if not subnames: + break + + case { + 'event': 'subscriptionStatus', + 'status': 'error', + 'errorMessage': errmsg, + } as msg: + raise RuntimeError( + f'{errmsg}\n\n' + f'{pformat(msg)}' + ) yield for sub in subs: @@ -281,10 +332,56 @@ async def subscribe( # await ws.recv_msg() +def trades2pps( + table: PpTable, + acctid: str, + new_trans: dict[str, Transaction] = {}, + +) -> tuple[ + list[BrokerdPosition], + list[Transaction], +]: + if new_trans: + updated = table.update_from_trans( + new_trans, + ) + log.info(f'Updated pps:\n{pformat(updated)}') + + pp_entries, closed_pp_objs = table.dump_active() + pp_objs: dict[Union[str, int], Position] = table.pps + + pps: dict[int, Position] + position_msgs: list[dict] = [] + + for pps in [pp_objs, closed_pp_objs]: + for tid, p in pps.items(): + msg = BrokerdPosition( + broker='kraken', + # XXX: ok so this is annoying, we're + # relaying an account name with the + # backend suffix prefixed but when + # reading accounts from ledgers we + # don't need it and/or it's prefixed + # in the section table.. we should + # just strip this from the message + # right since `.broker` is already + # included? + account='kraken.' + acctid, + symbol=p.symbol.front_fqsn(), + size=p.size, + avg_price=p.ppu, + currency='', + ) + position_msgs.append(msg) + + return position_msgs + + @tractor.context async def trades_dialogue( ctx: tractor.Context, loglevel: str = None, + ) -> AsyncIterator[dict[str, Any]]: # XXX: required to propagate ``tractor`` loglevel to piker logging @@ -305,236 +402,707 @@ async def trades_dialogue( acctid = client._name acc_name = 'kraken.' + acctid - # pull and deliver trades ledger - trades = await client.get_trades() - log.info( - f'Loaded {len(trades)} trades from account `{acc_name}`' - ) - trans = await update_ledger(acctid, trades) - active, closed = pp.update_pps_conf( - 'kraken', - acctid, - trade_records=trans, - ledger_reload={}.fromkeys(t.bsuid for t in trans), - ) + # task local msg dialog tracking + apiflows: defaultdict[ + int, + ChainMap[dict[str, dict]], + ] = defaultdict(ChainMap) - position_msgs: list[dict] = [] - pps: dict[int, pp.Position] - for pps in [active, closed]: - for tid, p in pps.items(): - msg = BrokerdPosition( - broker='kraken', - account=acc_name, - symbol=p.symbol.front_fqsn(), - size=p.size, - avg_price=p.be_price, - currency='', - ) - position_msgs.append(msg) + # 2way map for ems ids to kraken int reqids.. + ids: bidict[str, int] = bidict() + reqids2txids: bidict[int, str] = bidict() - await ctx.started( - (position_msgs, [acc_name]) - ) + # NOTE: testing code for making sure the rt incremental update + # of positions, via newly generated msgs works. In order to test + # this, + # - delete the *ABSOLUTE LAST* entry from accont's corresponding + # trade ledgers file (NOTE this MUST be the last record + # delivered from the + # api ledger), + # - open you ``pps.toml`` and find that same tid and delete it + # from the pp's clears table, + # - set this flag to `True` + # + # You should see an update come in after the order mode + # boots up which shows your latest correct asset + # balance size after the "previously unknown simulating a live + # fill" update comes in on the relay loop even though the fill + # will be ignored by the ems (no known reqid) the pp msg should + # update things correctly. + simulate_pp_update: bool = False - # Get websocket token for authenticated data stream - # Assert that a token was actually received. - resp = await client.endpoint('GetWebSocketsToken', {}) + with ( + open_pps( + 'kraken', + acctid, + ) as table, - err = resp.get('error') - if err: - raise BrokerError(err) - - token = resp['result']['token'] - - ws: NoBsWs - async with ( - ctx.open_stream() as ems_stream, - open_autorecon_ws( - 'wss://ws-auth.kraken.com/', - fixture=partial( - subscribe, - token=token, - ), - ) as ws, - trio.open_nursery() as n, + open_trade_ledger( + 'kraken', + acctid, + ) as ledger_dict, ): - # task for processing inbound requests from ems - n.start_soon(handle_order_requests, client, ems_stream) + # transaction-ify the ledger entries + ledger_trans = norm_trade_records(ledger_dict) - count: int = 0 + # TODO: eventually probably only load + # as far back as it seems is not deliverd in the + # most recent 50 trades and assume that by ordering we + # already have those records in the ledger. + tids2trades = await client.get_trades() + ledger_dict.update(tids2trades) + api_trans = norm_trade_records(tids2trades) - # process and relay trades events to ems + # retrieve kraken reported balances + # and do diff with ledger to determine + # what amount of trades-transactions need + # to be reloaded. + sizes = await client.get_balances() + for dst, size in sizes.items(): + # we don't care about tracking positions + # in the user's source fiat currency. + if dst == client.conf['src_fiat']: + continue + + def has_pp(dst: str) -> Position | bool: + pps_dst_assets = {bsuid[:3]: bsuid for bsuid in table.pps} + pair = pps_dst_assets.get(dst) + pp = table.pps.get(pair) + + if ( + not pair or not pp + or not math.isclose(pp.size, size) + ): + return False + + return pp + + pos = has_pp(dst) + if not pos: + + # we have a balance for which there is no pp + # entry? so we have to likely update from the + # ledger. + updated = table.update_from_trans(ledger_trans) + log.info(f'Updated pps from ledger:\n{pformat(updated)}') + pos = has_pp(dst) + + if not pos and not simulate_pp_update: + # try reloading from API + table.update_from_trans(api_trans) + pos = has_pp(dst) + if not pos: + + # get transfers to make sense of abs balances. + # NOTE: we do this after ledger and API + # loading since we might not have an entry + # in the ``pps.toml`` for the necessary pair + # yet and thus this likely pair grabber will + # likely fail. + likely_pair = { + bsuid[:3]: bsuid + for bsuid in table.pps + }.get(dst) + if not likely_pair: + raise ValueError( + 'Could not find a position pair in ' + 'ledger for likely widthdrawal ' + f'candidate: {dst}' + ) + + if likely_pair: + # this was likely pp that had a withdrawal + # from the dst asset out of the account. + + xfer_trans = await client.get_xfers( + dst, + src_asset=likely_pair[3:], + ) + if xfer_trans: + updated = table.update_from_trans( + xfer_trans, + cost_scalar=1, + ) + log.info( + 'Updated {dst} from transfers:\n' + f'{pformat(updated)}' + ) + + if not has_pp(dst): + raise ValueError( + 'Could not reproduce balance:\n' + f'dst: {dst}, {size}\n' + ) + + # only for simulate-testing a "new fill" since + # otherwise we have to actually conduct a live clear. + if simulate_pp_update: + tid = list(tids2trades)[0] + last_trade_dict = tids2trades[tid] + # stage a first reqid of `0` + reqids2txids[0] = last_trade_dict['ordertxid'] + + ppmsgs = trades2pps( + table, + acctid, + ) + await ctx.started((ppmsgs, [acc_name])) + + # Get websocket token for authenticated data stream + # Assert that a token was actually received. + resp = await client.endpoint('GetWebSocketsToken', {}) + err = resp.get('error') + if err: + raise BrokerError(err) + + token = resp['result']['token'] + + ws: NoBsWs + async with ( + ctx.open_stream() as ems_stream, + open_autorecon_ws( + 'wss://ws-auth.kraken.com/', + fixture=partial( + subscribe, + token=token, + ), + ) as ws, + aclosing(stream_messages(ws)) as stream, + trio.open_nursery() as nurse, + ): + stream = stream_messages(ws) + + # task for processing inbound requests from ems + nurse.start_soon( + handle_order_requests, + ws, + client, + ems_stream, + token, + apiflows, + ids, + reqids2txids, + ) + + # enter relay loop + await handle_order_updates( + ws, + stream, + ems_stream, + apiflows, + ids, + reqids2txids, + table, + api_trans, + acctid, + acc_name, + token, + ) + + +async def handle_order_updates( + ws: NoBsWs, + ws_stream: AsyncIterator, + ems_stream: tractor.MsgStream, + apiflows: dict[int, ChainMap[dict[str, dict]]], + ids: bidict[str, int], + reqids2txids: bidict[int, str], + table: PpTable, + + # transaction records which will be updated + # on new trade clearing events (aka order "fills") + ledger_trans: dict[str, Transaction], + acctid: str, + acc_name: str, + token: str, + +) -> None: + ''' + Main msg handling loop for all things order management. + + This code is broken out to make the context explicit and state variables + defined in the signature clear to the reader. + + ''' + async for msg in ws_stream: + match msg: + + # TODO: turns out you get the fill events from the + # `openOrders` before you get this, so it might be better + # to do all fill/status/pp updates in that sub and just use + # this one for ledger syncs? + + # XXX: ASK SUPPORT ABOUT THIS! + + # For eg. we could take the "last 50 trades" and do a diff + # with the ledger and then only do a re-sync if something + # seems amiss? + + # process and relay clearing trade events to ems # https://docs.kraken.com/websockets/#message-ownTrades - async for msg in stream_messages(ws): - match msg: - case [ - trades_msgs, - 'ownTrades', - {'sequence': seq}, - ]: - # XXX: do we actually need this orrr? - # ensure that we are only processing new trades? - assert seq > count - count += 1 + # format as tid -> trade event map + # eg. received msg format, + # [{'TOKWHY-SMTUB-G5DOI6': { + # 'cost': '95.29047', + # 'fee': '0.24776', + # 'margin': '0.00000', + # 'ordertxid': 'OKSUXM-3OLSB-L7TN72', + # 'ordertype': 'limit', + # 'pair': 'XBT/EUR', + # 'postxid': 'TKH2SE-M7IF5-CFI7LT', + # 'price': '21268.20000', + # 'time': '1657990947.640891', + # 'type': 'buy', + # 'vol': '0.00448042' + # }}] + case [ + trades_msgs, + 'ownTrades', + {'sequence': seq}, + ]: + log.info( + f'ownTrades update_{seq}:\n' + f'{pformat(trades_msgs)}' + ) + trades = { + tid: trade + for entry in trades_msgs + for (tid, trade) in entry.items() - # flatten msgs for processing - trades = { - tid: trade - for entry in trades_msgs - for (tid, trade) in entry.items() + # don't re-process datums we've already seen + # if tid not in ledger_trans + } + for tid, trade in trades.items(): + assert tid not in ledger_trans + txid = trade['ordertxid'] + reqid = trade.get('userref') - # only emit entries which are already not-in-ledger - if tid not in {r.tid for r in trans} - } - for tid, trade in trades.items(): + if not reqid: + # NOTE: yet again, here we don't have any ref to the + # reqid that's generated by us (as the client) and + # sent in the order request, so we have to look it + # up from our own registry... + reqid = reqids2txids.inverse[txid] + if not reqid: + log.warning(f'Unknown trade dialog: {txid}') - # parse-cast - reqid = trade['ordertxid'] - action = trade['type'] - price = float(trade['price']) - size = float(trade['vol']) - broker_time = float(trade['time']) + action = trade['type'] + price = float(trade['price']) + size = float(trade['vol']) + broker_time = float(trade['time']) + + # TODO: we can emit this on the "closed" state in + # the `openOrders` sub-block below. + status_msg = BrokerdStatus( + reqid=reqid, + time_ns=time.time_ns(), + + account=acc_name, + status='filled', + filled=size, + reason='Order filled by kraken', + broker_details={ + 'name': 'kraken', + 'broker_time': broker_time + }, + + # TODO: figure out if kraken gives a count + # of how many units of underlying were + # filled. Alternatively we can decrement + # this value ourselves by associating and + # calcing from the diff with the original + # client-side request, see: + # https://github.com/pikers/piker/issues/296 + remaining=0, + ) + await ems_stream.send(status_msg) + + new_trans = norm_trade_records(trades) + ppmsgs = trades2pps( + table, + acctid, + new_trans, + ) + for pp_msg in ppmsgs: + await ems_stream.send(pp_msg) + + ledger_trans.update(new_trans) + + # process and relay order state change events + # https://docs.kraken.com/websockets/#message-openOrders + case [ + order_msgs, + 'openOrders', + {'sequence': seq}, + ]: + for order_msg in order_msgs: + log.info( + f'`openOrders` msg update_{seq}:\n' + f'{pformat(order_msg)}' + ) + txid, update_msg = list(order_msg.items())[0] + match update_msg: + + # XXX: eg. of full msg schema: + # {'avg_price': _, + # 'cost': _, + # 'descr': { + # 'close': None, + # 'leverage': None, + # 'order': descr, + # 'ordertype': 'limit', + # 'pair': 'XMR/EUR', + # 'price': '74.94000000', + # 'price2': '0.00000000', + # 'type': 'buy' + # }, + # 'expiretm': None, + # 'fee': '0.00000000', + # 'limitprice': '0.00000000', + # 'misc': '', + # 'oflags': 'fciq', + # 'opentm': '1656966131.337344', + # 'refid': None, + # 'starttm': None, + # 'stopprice': '0.00000000', + # 'timeinforce': 'GTC', + # 'vol': submit_vlm, # '13.34400854', + # 'vol_exec': exec_vlm} # 0.0000 + case { + 'userref': reqid, + + # during a fill this field is **not** + # provided! but, it is always avail on + # actual status updates.. see case above. + 'status': status, + **rest, + }: + # TODO: store this in a ChainMap instance + # per order dialog. + # submit_vlm = rest.get('vol', 0) + # fee = rest.get('fee', 0) + if status == 'closed': + vlm = 0 + else: + vlm = rest.get('vol_exec', 0) + + if status == 'canceled': + reqids2txids.pop(reqid) + + # we specially ignore internal order + # updates triggered by kraken's "edit" + # endpoint. + if rest['cancel_reason'] == 'Order replaced': + # TODO: + # - put the edit order status update + # code here? + # - send open order status msg. + log.info( + f'Order replaced: {txid}@reqid={reqid}' + ) + + # we don't do normal msg emission on + # a replacement cancel since it's + # the result of an "edited order" + # and thus we mask the kraken + # backend cancel then create details + # from the ems side. + continue + else: + # XXX: keep kraken engine's ``txid`` synced + # with the ems dialog's ``reqid``. + reqids2txids[reqid] = txid + + ourreqid = reqids2txids.inverse.get(txid) + if ourreqid is None: + log.info( + 'Mapping new txid to our reqid:\n' + f'{reqid} -> {txid}' + ) + + oid = ids.inverse.get(reqid) + + if ( + status == 'open' + and ( + # XXX: too fast edit handled by the + # request handler task: this + # scenario occurs when ems side + # requests are coming in too quickly + # such that there is no known txid + # yet established for the ems + # dialog's last reqid when the + # request handler task is already + # receceiving a new update for that + # reqid. In this case we simply mark + # the reqid as being "too fast" and + # then when we get the next txid + # update from kraken's backend, and + # thus the new txid, we simply + # cancel the order for now. + + # TODO: Ideally we eventually + # instead make the client side of + # the ems block until a submission + # is confirmed by the backend + # instead of this hacky throttle + # style approach and avoid requests + # coming in too quickly on the other + # side of the ems, aka the client + # <-> ems dialog. + (toofast := isinstance( + reqids2txids.get(reqid), + TooFastEdit + )) + + # pre-existing open order NOT from + # this EMS session. + or (noid := oid is None) + ) + ): + if toofast: + # TODO: don't even allow this case + # by not moving the client side line + # until an edit confirmation + # arrives... + log.cancel( + f'Received too fast edit {txid}:\n' + f'{update_msg}\n' + 'Cancelling order for now!..' + ) + + elif noid: # a non-ems-active order + # TODO: handle these and relay them + # through the EMS to the client / UI + # side! + log.cancel( + f'Rx unknown active order {txid}:\n' + f'{update_msg}\n' + 'Cancelling order for now!..' + ) + + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid or 0, + 'txid': [txid], + }) + continue + + # remap statuses to ems set. + ems_status = { + 'open': 'submitted', + 'closed': 'filled', + 'canceled': 'cancelled', + # do we even need to forward + # this state to the ems? + 'pending': 'pending', + }[status] + # TODO: i like the open / closed semantics + # more we should consider them for internals + + # send BrokerdStatus messages for all + # order state updates + resp = BrokerdStatus( - # send a fill msg for gui update - fill_msg = BrokerdFill( reqid=reqid, - time_ns=time.time_ns(), + time_ns=time.time_ns(), # cuz why not + account=f'kraken.{acctid}', - action=action, - size=size, + # everyone doin camel case.. + status=ems_status, # force lower case + + filled=vlm, + reason='', # why held? + remaining=vlm, + + # TODO: need to extract the submit vlm + # from a prior msg update.. + # ( + # float(submit_vlm) + # - + # float(exec_vlm) + # ), + + broker_details=dict( + {'name': 'kraken'}, **update_msg + ), + ) + + apiflows[reqid].maps.append(update_msg) + await ems_stream.send(resp) + + # fill msg. + # eg. contents (in total): + # { + # 'vol_exec': '0.84709869', + # 'cost': '101.25370642', + # 'fee': '0.26325964', + # 'avg_price': '119.53000001', + # 'userref': 0, + # } + # NOTE: there is no `status` field + case { + 'vol_exec': vlm, + 'avg_price': price, + 'userref': reqid, + **rest, + } as msg: + + ourreqid = reqids2txids.inverse[txid] + assert reqid == ourreqid + log.info( + f'openOrders vlm={vlm} Fill for {reqid}:\n' + f'{update_msg}' + ) + + fill_msg = BrokerdFill( + time_ns=time.time_ns(), + reqid=reqid, + + # action=action, # just use size value + # for now? + size=vlm, price=price, + # TODO: maybe capture more msg data # i.e fees? - broker_details={'name': 'kraken'}, + broker_details={'name': 'kraken'} | trade, broker_time=broker_time ) await ems_stream.send(fill_msg) - filled_msg = BrokerdStatus( - reqid=reqid, - time_ns=time.time_ns(), - - account=acc_name, - status='filled', - filled=size, - reason='Order filled by kraken', - broker_details={ - 'name': 'kraken', - 'broker_time': broker_time - }, - - # TODO: figure out if kraken gives a count - # of how many units of underlying were - # filled. Alternatively we can decrement - # this value ourselves by associating and - # calcing from the diff with the original - # client-side request, see: - # https://github.com/pikers/piker/issues/296 - remaining=0, + case _: + log.warning( + 'Unknown orders msg:\n' + f'{txid}:{order_msg}' ) - await ems_stream.send(filled_msg) - # update ledger and position tracking - trans = await update_ledger(acctid, trades) - active, closed = pp.update_pps_conf( - 'kraken', - acctid, - trade_records=trans, - ledger_reload={}.fromkeys( - t.bsuid for t in trans), - ) + # order request status updates + case { + 'event': etype, + 'status': status, + 'reqid': reqid, + **rest, + } as event if ( + etype in { + 'addOrderStatus', + 'editOrderStatus', + 'cancelOrderStatus', + } + ): + log.info( + f'{etype}:\n' + f'{pformat(msg)}' + ) - # emit pp msgs - for pos in filter( - bool, - chain(active.values(), closed.values()), - ): - pp_msg = BrokerdPosition( - broker='kraken', + txid = rest.get('txid') + lasttxid = reqids2txids.get(reqid) - # XXX: ok so this is annoying, we're - # relaying an account name with the - # backend suffix prefixed but when - # reading accounts from ledgers we - # don't need it and/or it's prefixed - # in the section table.. we should - # just strip this from the message - # right since `.broker` is already - # included? - account=f'kraken.{acctid}', - symbol=pos.symbol.front_fqsn(), - size=pos.size, - avg_price=pos.be_price, + # TODO: relay these to EMS once it supports + # open order loading. + oid = ids.inverse.get(reqid) + if not oid: + log.warning( + 'Unknown order status update?:\n' + f'{event}' + ) + continue - # TODO - # currency='' - ) - await ems_stream.send(pp_msg) + # update the msg chain + chain = apiflows[reqid] + chain.maps.append(event) - case [ - trades_msgs, - 'openOrders', - {'sequence': seq}, - ]: - # TODO: async order update handling which we - # should remove from `handle_order_requests()` - # above: - # https://github.com/pikers/piker/issues/293 - # https://github.com/pikers/piker/issues/310 - log.info(f'Order update {seq}:{trades_msgs}') + if status == 'error': + # any of ``{'add', 'edit', 'cancel'}`` + action = etype.removesuffix('OrderStatus') + errmsg = rest['errorMessage'] + log.error( + f'Failed to {action} order {reqid}:\n' + f'{errmsg}' + ) + await ems_stream.send(BrokerdError( + oid=oid, + # XXX: use old reqid in case it changed? + reqid=reqid, + symbol=chain.get('symbol', 'N/A'), - case _: - log.warning(f'Unhandled trades msg: {msg}') - await tractor.breakpoint() + reason=f'Failed {action}:\n{errmsg}', + broker_details=event + )) + + txid = txid or lasttxid + if ( + txid + + # we throttle too-fast-requests on the ems side + and not isinstance(txid, TooFastEdit) + ): + # client was editting too quickly + # so we instead cancel this order + log.cancel( + f'Cancelling {reqid}@{txid} due to:\n {event}') + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid or 0, + 'txid': [txid], + }) + case _: + + log.warning(f'Unhandled trades update msg: {msg}') def norm_trade_records( ledger: dict[str, Any], -) -> list[pp.Transaction]: +) -> dict[str, Transaction]: - records: list[pp.Transaction] = [] + records: dict[str, Transaction] = {} for tid, record in ledger.items(): - size = record.get('vol') * { + size = float(record.get('vol')) * { 'buy': 1, 'sell': -1, }[record['type']] - bsuid = record['pair'] - norm_sym = normalize_symbol(bsuid) - records.append( - pp.Transaction( - fqsn=f'{norm_sym}.kraken', - tid=tid, - size=float(size), - price=float(record['price']), - cost=float(record['fee']), - dt=pendulum.from_timestamp(float(record['time'])), - bsuid=bsuid, + # we normalize to kraken's `altname` always.. + bsuid = norm_sym = Client.normalize_symbol(record['pair']) - # XXX: there are no derivs on kraken right? - # expiry=expiry, - ) + records[tid] = Transaction( + fqsn=f'{norm_sym}.kraken', + tid=tid, + size=size, + price=float(record['price']), + cost=float(record['fee']), + dt=pendulum.from_timestamp(float(record['time'])), + bsuid=bsuid, + + # XXX: there are no derivs on kraken right? + # expiry=expiry, ) return records -async def update_ledger( +@cm +def open_ledger( acctid: str, trade_entries: list[dict[str, Any]], -) -> list[pp.Transaction]: +) -> set[Transaction]: + ''' + Write recent session's trades to the user's (local) ledger file. - # write recent session's trades to the user's (local) ledger file. - with pp.open_trade_ledger( + ''' + with open_trade_ledger( 'kraken', acctid, ) as ledger: - ledger.update(trade_entries) + yield ledger - # normalize to transaction form - records = norm_trade_records(trade_entries) - return records + # update on exit + ledger.update(trade_entries) diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index e52f49aa..7c589d85 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -19,7 +19,6 @@ Real-time and historical data feed endpoints. ''' from contextlib import asynccontextmanager as acm -from dataclasses import asdict from datetime import datetime from typing import ( Any, @@ -28,6 +27,7 @@ from typing import ( ) import time +from async_generator import aclosing from fuzzywuzzy import process as fuzzy import numpy as np import pendulum @@ -49,7 +49,6 @@ from piker.data._web_bs import open_autorecon_ws, NoBsWs from . import log from .api import ( Client, - OHLC, ) @@ -87,6 +86,30 @@ class Pair(Struct): ordermin: float # minimum order volume for pair +class OHLC(Struct): + ''' + Description of the flattened OHLC quote format. + + For schema details see: + https://docs.kraken.com/websockets/#message-ohlc + + ''' + chan_id: int # internal kraken id + chan_name: str # eg. ohlc-1 (name-interval) + pair: str # fx pair + time: float # Begin time of interval, in seconds since epoch + etime: float # End time of interval, in seconds since epoch + open: float # Open price of interval + high: float # High price within interval + low: float # Low price within interval + close: float # Close price of interval + vwap: float # Volume weighted average price within interval + volume: float # Accumulated volume **within interval** + count: int # Number of trades within interval + # (sampled) generated tick data + ticks: list[Any] = [] + + async def stream_messages( ws: NoBsWs, ): @@ -117,9 +140,8 @@ async def stream_messages( too_slow_count = 0 continue - if isinstance(msg, dict): - if msg.get('event') == 'heartbeat': - + match msg: + case {'event': 'heartbeat'}: now = time.time() delay = now - last_hb last_hb = now @@ -130,11 +152,9 @@ async def stream_messages( continue - err = msg.get('errorMessage') - if err: - raise BrokerError(err) - else: - yield msg + case _: + # passthrough sub msgs + yield msg async def process_data_feed_msgs( @@ -145,44 +165,69 @@ async def process_data_feed_msgs( ''' async for msg in stream_messages(ws): + match msg: + case { + 'errorMessage': errmsg + }: + raise BrokerError(errmsg) - chan_id, *payload_array, chan_name, pair = msg + case { + 'event': 'subscriptionStatus', + } as sub: + log.info( + 'WS subscription is active:\n' + f'{sub}' + ) + continue - if 'ohlc' in chan_name: + case [ + chan_id, + *payload_array, + chan_name, + pair + ]: + if 'ohlc' in chan_name: + ohlc = OHLC( + chan_id, + chan_name, + pair, + *payload_array[0] + ) + ohlc.typecast() + yield 'ohlc', ohlc - yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) + elif 'spread' in chan_name: - elif 'spread' in chan_name: + bid, ask, ts, bsize, asize = map( + float, payload_array[0]) - bid, ask, ts, bsize, asize = map(float, payload_array[0]) + # TODO: really makes you think IB has a horrible API... + quote = { + 'symbol': pair.replace('/', ''), + 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, - # TODO: really makes you think IB has a horrible API... - quote = { - 'symbol': pair.replace('/', ''), - 'ticks': [ - {'type': 'bid', 'price': bid, 'size': bsize}, - {'type': 'bsize', 'price': bid, 'size': bsize}, + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize}, + ], + } + yield 'l1', quote - {'type': 'ask', 'price': ask, 'size': asize}, - {'type': 'asize', 'price': ask, 'size': asize}, - ], - } - yield 'l1', quote + # elif 'book' in msg[-2]: + # chan_id, *payload_array, chan_name, pair = msg + # print(msg) - # elif 'book' in msg[-2]: - # chan_id, *payload_array, chan_name, pair = msg - # print(msg) - - else: - print(f'UNHANDLED MSG: {msg}') - yield msg + case _: + print(f'UNHANDLED MSG: {msg}') + # yield msg def normalize( ohlc: OHLC, ) -> dict: - quote = asdict(ohlc) + quote = ohlc.to_dict() quote['broker_ts'] = quote['time'] quote['brokerd_ts'] = time.time() quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') @@ -376,17 +421,15 @@ async def stream_quotes( # see the tips on reconnection logic: # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds ws: NoBsWs - async with open_autorecon_ws( - 'wss://ws.kraken.com/', - fixture=subscribe, - ) as ws: - + async with ( + open_autorecon_ws( + 'wss://ws.kraken.com/', + fixture=subscribe, + ) as ws, + aclosing(process_data_feed_msgs(ws)) as msg_gen, + ): # pull a first quote and deliver - msg_gen = process_data_feed_msgs(ws) - - # TODO: use ``anext()`` when it lands in 3.10! - typ, ohlc_last = await msg_gen.__anext__() - + typ, ohlc_last = await anext(msg_gen) topic, quote = normalize(ohlc_last) task_status.started((init_msgs, quote)) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 1a764812..5f08cfa5 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -88,7 +88,8 @@ def mk_check( @dataclass class _DarkBook: - '''EMS-trigger execution book. + ''' + EMS-trigger execution book. Contains conditions for executions (aka "orders" or "triggers") which are not exposed to brokers and thus the market; i.e. these are @@ -653,6 +654,13 @@ async def translate_and_relay_brokerd_events( else: # check for existing live flow entry entry = book._ems_entries.get(oid) + old_reqid = entry.reqid + + if old_reqid and old_reqid != reqid: + log.warning( + f'Brokerd order id change for {oid}:\n' + f'{old_reqid} -> {reqid}' + ) # initial response to brokerd order request if name == 'ack': @@ -663,6 +671,10 @@ async def translate_and_relay_brokerd_events( # a ``BrokerdOrderAck`` **must** be sent after an order # request in order to establish this id mapping. book._ems2brokerd_ids[oid] = reqid + log.info( + 'Rx ACK for order\n' + f'oid: {oid} -> reqid: {reqid}' + ) # new order which has not yet be registered into the # local ems book, insert it now and handle 2 cases: @@ -690,6 +702,9 @@ async def translate_and_relay_brokerd_events( # a live flow now exists oid = entry.oid + # TODO: instead this should be our status set. + # ack, open, fill, closed, cancelled' + resp = None broker_details = {} diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index e5813c78..c30ada54 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -186,6 +186,7 @@ class BrokerdStatus(Struct): # XXX: should be best effort set for every update account: str = '' + # TODO: instead (ack, pending, open, fill, clos(ed), cancelled) # { # 'submitted', # 'cancelled', diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 6910d206..218d46e0 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -39,7 +39,11 @@ from docker.errors import ( APIError, # ContainerError, ) -from requests.exceptions import ConnectionError, ReadTimeout +import requests +from requests.exceptions import ( + ConnectionError, + ReadTimeout, +) from ..log import get_logger, get_console_log from .. import config @@ -188,13 +192,12 @@ class Container: def hard_kill(self, start: float) -> None: delay = time.time() - start - log.error( - f'Failed to kill container {self.cntr.id} after {delay}s\n' - 'sending SIGKILL..' - ) # get out the big guns, bc apparently marketstore # doesn't actually know how to terminate gracefully # :eyeroll:... + log.error( + f'SIGKILL-ing: {self.cntr.id} after {delay}s\n' + ) self.try_signal('SIGKILL') self.cntr.wait( timeout=3, @@ -218,20 +221,25 @@ class Container: self.try_signal('SIGINT') start = time.time() - for _ in range(30): + for _ in range(6): with trio.move_on_after(0.5) as cs: - cs.shield = True log.cancel('polling for CNTR logs...') try: await self.process_logs_until(stop_msg) except ApplicationLogError: hard_kill = True + else: + # if we aren't cancelled on above checkpoint then we + # assume we read the expected stop msg and + # terminated. + break - # if we aren't cancelled on above checkpoint then we - # assume we read the expected stop msg and terminated. - break + if cs.cancelled_caught: + # on timeout just try a hard kill after + # a quick container sync-wait. + hard_kill = True try: log.info(f'Polling for container shutdown:\n{cid}') @@ -254,9 +262,16 @@ class Container: except ( docker.errors.APIError, ConnectionError, + requests.exceptions.ConnectionError, + trio.Cancelled, ): log.exception('Docker connection failure') self.hard_kill(start) + raise + + except trio.Cancelled: + log.exception('trio cancelled...') + self.hard_kill(start) else: hard_kill = True @@ -305,16 +320,13 @@ async def open_ahabd( )) try: - # TODO: we might eventually want a proxy-style msg-prot here # to allow remote control of containers without needing # callers to have root perms? await trio.sleep_forever() finally: - # needed? - with trio.CancelScope(shield=True): - await cntr.cancel(stop_msg) + await cntr.cancel(stop_msg) async def start_ahab( diff --git a/piker/data/types.py b/piker/data/types.py index c6cba61d..d8926610 100644 --- a/piker/data/types.py +++ b/piker/data/types.py @@ -66,3 +66,10 @@ class Struct( ).decode( msgspec.msgpack.Encoder().encode(self) ) + + def typecast( + self, + # fields: Optional[list[str]] = None, + ) -> None: + for fname, ftype in self.__annotations__.items(): + setattr(self, fname, ftype(getattr(self, fname))) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index f79c56ae..05603c63 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -63,7 +63,7 @@ from ..log import get_logger log = get_logger(__name__) # TODO: load this from a config.toml! -_quote_throttle_rate: int = 60 # Hz +_quote_throttle_rate: int = 22 # Hz # a working tick-type-classes template diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index ce08a64c..41078e05 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -794,15 +794,11 @@ async def process_trades_and_update_ui( pp_msg_symbol = msg['symbol'].lower() fqsn = sym.front_fqsn() broker, key = sym.front_feed() - # print( - # f'pp msg symbol: {pp_msg_symbol}\n', - # f'fqsn: {fqsn}\n', - # f'front key: {key}\n', - # ) - if ( - pp_msg_symbol == fqsn.replace(f'.{broker}', '') + pp_msg_symbol == fqsn + or pp_msg_symbol == fqsn.removesuffix(f'.{broker}') ): + log.info(f'{fqsn} matched pp msg: {fmsg}') tracker = mode.trackers[msg['account']] tracker.live_pp.update_from_msg(msg) # update order pane widgets @@ -843,16 +839,25 @@ async def process_trades_and_update_ui( # resp to 'cancel' request or error condition # for action request elif resp in ( - 'broker_cancelled', 'broker_inactive', 'broker_errored', + ): + # delete level line from view + mode.on_cancel(oid) + broker_msg = msg['brokerd_msg'] + log.error( + f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' + ) + + elif resp in ( + 'broker_cancelled', 'dark_cancelled' ): # delete level line from view mode.on_cancel(oid) broker_msg = msg['brokerd_msg'] - log.warning( - f'Order {oid} failed with:\n{pformat(broker_msg)}' + log.cancel( + f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' ) elif resp in (