diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 7c75d735..74119e88 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -120,7 +120,7 @@ class Pair(BaseModel): class Trade(BaseModel): - """Order class that helps parse and validate order stream""" + """Trade class that helps parse and validate ownTrades stream""" reqid: str # kraken order transaction id action: str # buy or sell price: str # price of asset @@ -258,17 +258,37 @@ class Client: self, data: Dict[str, Any] = {} ) -> Dict[str, Any]: - resp = await self.kraken_endpoint('Balance', data) - balances = resp['result'] - ## TODO: grab all entries, not just first 50 - resp = await self.kraken_endpoint('TradesHistory', data) - traders = resp['result'] + data['ofs'] = 0 positions = {} vols = {} - + # Grab all trade history + while True: + resp = await self.kraken_endpoint('TradesHistory', data) + # grab the first 50 trades + if data['ofs'] == 0: + trades = resp['result']['trades'] + # load the next 50 trades using dict constructor + # for speed + elif data['ofs'] == 50: + trades = dict(trades, **resp['result']['trades']) + # catch the of the trades + elif resp['result']['trades'] == {}: + count = resp['result']['count'] + break + # update existing dict if num trades exceeds 100 + else: + trades.update(resp['result']['trades']) + # increment the offset counter + data['ofs'] += 50 + # To avoid exceeding API rate limit in case of a lot of trades + time.sleep(1) + + # make sure you grabbed all the trades + assert count == len(trades.values()) + # positions ## TODO: Make sure to add option to include fees in positions calc - for trade in traders['trades'].values(): + for trade in trades.values(): sign = -1 if trade['type'] == 'sell' else 1 try: positions[trade['pair']] += sign * float(trade['cost']) @@ -438,15 +458,10 @@ class Client: async def get_client() -> Client: client = Client() - ## TODO: maybe add conditional based on section section = get_config() client._name = section['key_descr'] client._api_key = section['api_key'] client._secret = section['secret'] - ## TODO: Add a client attribute to hold this info - #data = { - # # add non-nonce and non-ofs vars - #} # at startup, load all symbols locally for fast search await client.cache_symbols() @@ -628,7 +643,6 @@ async def trades_dialogue( @asynccontextmanager async def subscribe(ws: wsproto.WSConnection, token: str): - ## TODO: Fix docs and points to right urls # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe # specific logic for this in kraken's shitty sync client: @@ -643,15 +657,6 @@ async def trades_dialogue( # locked. await ws.send_msg(trades_sub) - ## trade data (aka L1) - #l1_sub = make_sub( - # list(ws_pairs.values()), - # {'name': 'spread'} # 'depth': 10} - #) - - ## pull a first quote and deliver - #await ws.send_msg(l1_sub) - yield # unsub from all pairs on teardown @@ -677,11 +682,6 @@ async def trades_dialogue( msg = pack_position(acc_name, norm_sym, pos, vols[ticker]) all_positions.append(msg.dict()) - ## TODO: create a new ems message schema for open orders - open_orders = await client.kraken_endpoint('OpenOrders', {}) - print(open_orders) - #await tractor.breakpoint() - await ctx.started((all_positions, (acc_name,))) # Get websocket token for authenticated data stream @@ -844,10 +844,6 @@ async def process_trade_msgs( sequence_counter = 0 async for msg in stream_messages(ws): - # TODO: write your order event parser here! - # HINT: create a ``pydantic.BaseModel`` to parse and validate - # and then in the caller recast to our native ``BrokerdX`` msg types. - try: # check that we are on the ownTrades stream and that msgs are # arriving in sequence with kraken @@ -855,7 +851,6 @@ async def process_trade_msgs( assert msg[2]['sequence'] > sequence_counter sequence_counter += 1 raw_msgs = msg[0] - # TODO: get length and start list trade_msgs = [] # Check that we are only processing new trades