cleaned up code and added loop to grab all trades for position calcs

kraken_orders
Konstantine Tsafatinos 2022-03-01 15:30:21 -05:00
parent ee0be13af1
commit a3345dbba2
1 changed files with 28 additions and 33 deletions

View File

@ -120,7 +120,7 @@ class Pair(BaseModel):
class Trade(BaseModel): class Trade(BaseModel):
"""Order class that helps parse and validate order stream""" """Trade class that helps parse and validate ownTrades stream"""
reqid: str # kraken order transaction id reqid: str # kraken order transaction id
action: str # buy or sell action: str # buy or sell
price: str # price of asset price: str # price of asset
@ -258,17 +258,37 @@ class Client:
self, self,
data: Dict[str, Any] = {} data: Dict[str, Any] = {}
) -> Dict[str, Any]: ) -> Dict[str, Any]:
resp = await self.kraken_endpoint('Balance', data) data['ofs'] = 0
balances = resp['result']
## TODO: grab all entries, not just first 50
resp = await self.kraken_endpoint('TradesHistory', data)
traders = resp['result']
positions = {} positions = {}
vols = {} vols = {}
# Grab all trade history
while True:
resp = await self.kraken_endpoint('TradesHistory', data)
# grab the first 50 trades
if data['ofs'] == 0:
trades = resp['result']['trades']
# load the next 50 trades using dict constructor
# for speed
elif data['ofs'] == 50:
trades = dict(trades, **resp['result']['trades'])
# catch the of the trades
elif resp['result']['trades'] == {}:
count = resp['result']['count']
break
# update existing dict if num trades exceeds 100
else:
trades.update(resp['result']['trades'])
# increment the offset counter
data['ofs'] += 50
# To avoid exceeding API rate limit in case of a lot of trades
time.sleep(1)
# make sure you grabbed all the trades
assert count == len(trades.values())
# positions # positions
## TODO: Make sure to add option to include fees in positions calc ## TODO: Make sure to add option to include fees in positions calc
for trade in traders['trades'].values(): for trade in trades.values():
sign = -1 if trade['type'] == 'sell' else 1 sign = -1 if trade['type'] == 'sell' else 1
try: try:
positions[trade['pair']] += sign * float(trade['cost']) positions[trade['pair']] += sign * float(trade['cost'])
@ -438,15 +458,10 @@ class Client:
async def get_client() -> Client: async def get_client() -> Client:
client = Client() client = Client()
## TODO: maybe add conditional based on section
section = get_config() section = get_config()
client._name = section['key_descr'] client._name = section['key_descr']
client._api_key = section['api_key'] client._api_key = section['api_key']
client._secret = section['secret'] client._secret = section['secret']
## TODO: Add a client attribute to hold this info
#data = {
# # add non-nonce and non-ofs vars
#}
# at startup, load all symbols locally for fast search # at startup, load all symbols locally for fast search
await client.cache_symbols() await client.cache_symbols()
@ -628,7 +643,6 @@ async def trades_dialogue(
@asynccontextmanager @asynccontextmanager
async def subscribe(ws: wsproto.WSConnection, token: str): async def subscribe(ws: wsproto.WSConnection, token: str):
## TODO: Fix docs and points to right urls
# XXX: setup subs # XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe # https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client: # specific logic for this in kraken's shitty sync client:
@ -643,15 +657,6 @@ async def trades_dialogue(
# locked. # locked.
await ws.send_msg(trades_sub) await ws.send_msg(trades_sub)
## trade data (aka L1)
#l1_sub = make_sub(
# list(ws_pairs.values()),
# {'name': 'spread'} # 'depth': 10}
#)
## pull a first quote and deliver
#await ws.send_msg(l1_sub)
yield yield
# unsub from all pairs on teardown # unsub from all pairs on teardown
@ -677,11 +682,6 @@ async def trades_dialogue(
msg = pack_position(acc_name, norm_sym, pos, vols[ticker]) msg = pack_position(acc_name, norm_sym, pos, vols[ticker])
all_positions.append(msg.dict()) all_positions.append(msg.dict())
## TODO: create a new ems message schema for open orders
open_orders = await client.kraken_endpoint('OpenOrders', {})
print(open_orders)
#await tractor.breakpoint()
await ctx.started((all_positions, (acc_name,))) await ctx.started((all_positions, (acc_name,)))
# Get websocket token for authenticated data stream # Get websocket token for authenticated data stream
@ -844,10 +844,6 @@ async def process_trade_msgs(
sequence_counter = 0 sequence_counter = 0
async for msg in stream_messages(ws): async for msg in stream_messages(ws):
# TODO: write your order event parser here!
# HINT: create a ``pydantic.BaseModel`` to parse and validate
# and then in the caller recast to our native ``BrokerdX`` msg types.
try: try:
# check that we are on the ownTrades stream and that msgs are # check that we are on the ownTrades stream and that msgs are
# arriving in sequence with kraken # arriving in sequence with kraken
@ -855,7 +851,6 @@ async def process_trade_msgs(
assert msg[2]['sequence'] > sequence_counter assert msg[2]['sequence'] > sequence_counter
sequence_counter += 1 sequence_counter += 1
raw_msgs = msg[0] raw_msgs = msg[0]
# TODO: get length and start list
trade_msgs = [] trade_msgs = []
# Check that we are only processing new trades # Check that we are only processing new trades