Merge pull request #298 from pikers/kraken_cleaning

Kraken cleaning, disable order support due to #299!
kraken_editorder
goodboy 2022-04-10 17:28:20 -04:00 committed by GitHub
commit 039d06cc48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 99 additions and 69 deletions

View File

@ -33,7 +33,7 @@ import tractor
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from pydantic import BaseModel from pydantic import BaseModel
import wsproto import wsproto
from itertools import count import itertools
import urllib.parse import urllib.parse
import hashlib import hashlib
import hmac import hmac
@ -124,11 +124,11 @@ class Trade(BaseModel):
Trade class that helps parse and validate ownTrades stream Trade class that helps parse and validate ownTrades stream
''' '''
reqid: str # kraken order transaction id reqid: str # kraken order transaction id
action: str # buy or sell action: str # buy or sell
price: str # price of asset price: str # price of asset
size: str # vol of asset size: str # vol of asset
broker_time: str # e.g GTC, GTD broker_time: str # e.g GTC, GTD
@dataclass @dataclass
@ -159,9 +159,15 @@ class OHLC:
def get_config() -> dict[str, Any]: def get_config() -> dict[str, Any]:
conf, path = config.load() conf, path = config.load()
section = conf.get('kraken') section = conf.get('kraken')
if section:
log.warning(
'Kraken order mode is currently disabled due to bug!\n'
'See https://github.com/pikers/piker/issues/299'
)
return {}
if section is None: if section is None:
log.warning(f'No config section found for kraken in {path}') log.warning(f'No config section found for kraken in {path}')
return {} return {}
@ -242,7 +248,7 @@ class Client:
uri_path: str uri_path: str
) -> Dict[str, Any]: ) -> Dict[str, Any]:
headers = { headers = {
'Content-Type': 'Content-Type':
'application/x-www-form-urlencoded', 'application/x-www-form-urlencoded',
'API-Key': 'API-Key':
self._api_key, self._api_key,
@ -288,7 +294,7 @@ class Client:
count = resp['result']['count'] count = resp['result']['count']
break break
# update existing dict if num trades exceeds 100 # update existing dict if num trades exceeds 100
else: else:
trades.update(resp['result']['trades']) trades.update(resp['result']['trades'])
# increment the offset counter # increment the offset counter
data['ofs'] += 50 data['ofs'] += 50
@ -446,12 +452,11 @@ class Client:
raise SymbolNotFound(json['error'][0] + f': {symbol}') raise SymbolNotFound(json['error'][0] + f': {symbol}')
@asynccontextmanager @asynccontextmanager
async def get_client() -> Client: async def get_client() -> Client:
section = get_config() section = get_config()
if section: if section:
client = Client( client = Client(
name=section['key_descr'], name=section['key_descr'],
api_key=section['api_key'], api_key=section['api_key'],
@ -541,7 +546,8 @@ async def handle_order_requests(
request_msg: dict request_msg: dict
order: BrokerdOrder order: BrokerdOrder
userref_counter = count() userref_counter = itertools.count()
async for request_msg in ems_order_stream: async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}') log.info(f'Received order request {request_msg}')
@ -558,14 +564,20 @@ async def handle_order_requests(
await ems_order_stream.send(BrokerdError( await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'Kraken only, No account found: `{account}` ?',
# reason=f'Kraken only, No account found: `{account}` ?',
reason=(
'Kraken only, order mode disabled due to '
'https://github.com/pikers/piker/issues/299'
),
).dict()) ).dict())
continue continue
# validate # validate
temp_id = next(userref_counter) temp_id = next(userref_counter)
order = BrokerdOrder(**request_msg) order = BrokerdOrder(**request_msg)
# call our client api to submit the order # call our client api to submit the order
resp = await client.submit_limit( resp = await client.submit_limit(
oid=order.oid, oid=order.oid,
@ -578,7 +590,9 @@ async def handle_order_requests(
err = resp['error'] err = resp['error']
if err: if err:
log.error(f'Failed to submit order') oid = order.oid
log.error(f'Failed to submit order: {oid}')
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=order.oid, oid=order.oid,
@ -616,30 +630,16 @@ async def handle_order_requests(
reqid=msg.reqid reqid=msg.reqid
) )
# Check to make sure there was no error returned by
# the kraken endpoint. Assert one order was cancelled.
try: try:
# Check to make sure there was no error returned by result = resp['result']
# the kraken endpoint. Assert one order was cancelled count = result['count']
assert resp['error'] == []
assert resp['result']['count'] == 1 # check for 'error' key if we received no 'result'
except KeyError:
error = resp.get('error')
# TODO: Change this code using .get
try:
pending = resp['result']['pending']
# Check to make sure the cancellation is NOT pending,
# then send the confirmation to the ems order stream
except KeyError:
await ems_order_stream.send(
BrokerdStatus(
reqid=msg.reqid,
account=msg.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
except AssertionError:
log.error(f'Order cancel was not successful')
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=order.oid, oid=order.oid,
@ -650,8 +650,46 @@ async def handle_order_requests(
).dict() ).dict()
) )
else: if not error:
log.error(f'Unknown order command: {request_msg}') raise BrokerError(f'Unknown order cancel response: {resp}')
else:
if not count: # no orders were cancelled?
# XXX: what exactly is this from and why would we care?
# there doesn't seem to be any docs here?
# https://docs.kraken.com/rest/#operation/cancelOrder
# Check to make sure the cancellation is NOT pending,
# then send the confirmation to the ems order stream
pending = result.get('pending')
if pending:
log.error(f'Order {oid} cancel was not yet successful')
await ems_order_stream.send(
BrokerdError(
oid=order.oid,
reqid=temp_id,
symbol=order.symbol,
reason="Order cancel is still pending?",
broker_details=resp
).dict()
)
else: # order cancel success case.
await ems_order_stream.send(
BrokerdStatus(
reqid=msg.reqid,
account=msg.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
else:
log.error(f'Unknown order command: {request_msg}')
@tractor.context @tractor.context
@ -694,7 +732,7 @@ async def trades_dialogue(
async with get_client() as client: async with get_client() as client:
if not client._api_key: if not client._api_key:
log.error('Missing Kraken API key: Trades WS connection failed') log.error('Missing Kraken API key: Trades WS connection failed')
await ctx.started(({}, {'paper',})) await ctx.started(({}, ['paper']))
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
@ -713,7 +751,7 @@ async def trades_dialogue(
_positions={}, _positions={},
) )
## TODO: maybe add multiple accounts # TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream) n.start_soon(handle_order_requests, client, ems_stream)
acc_name = 'kraken.' + client._name acc_name = 'kraken.' + client._name
@ -724,7 +762,7 @@ async def trades_dialogue(
await ctx.started((position_msgs, (acc_name,))) await ctx.started((position_msgs, (acc_name,)))
# Get websocket token for authenticated data stream # Get websocket token for authenticated data stream
# Assert that a token was actually received # Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {}) resp = await client.endpoint('GetWebSocketsToken', {})
assert resp['error'] == [] assert resp['error'] == []
token = resp['result']['token'] token = resp['result']['token']
@ -733,7 +771,7 @@ async def trades_dialogue(
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
## TODO: maybe add multiple accounts # TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream) n.start_soon(handle_order_requests, client, ems_stream)
# Process trades msg stream of ws # Process trades msg stream of ws
@ -746,24 +784,8 @@ async def trades_dialogue(
for trade in msg: for trade in msg:
# check the type of packaged message # check the type of packaged message
assert type(trade) == Trade assert type(trade) == Trade
# prepare and send a status update for line update
trade_msg = BrokerdStatus(
reqid=trade.reqid,
time_ns=time.time_ns(),
account='kraken.spot',
status='executed',
filled=float(trade.size),
reason='Order filled by kraken',
# remaining='' # TODO: not sure what to do here.
broker_details={
'name': 'kraken',
'broker_time': trade.broker_time
}
)
await ems_stream.send(trade_msg.dict())
# prepare and send a filled status update
filled_msg = BrokerdStatus( filled_msg = BrokerdStatus(
reqid=trade.reqid, reqid=trade.reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
@ -772,13 +794,21 @@ async def trades_dialogue(
status='filled', status='filled',
filled=float(trade.size), filled=float(trade.size),
reason='Order filled by kraken', reason='Order filled by kraken',
# remaining='' # TODO: not sure what to do here.
broker_details={ broker_details={
'name': 'kraken', 'name': 'kraken',
'broker_time': trade.broker_time 'broker_time': trade.broker_time
} },
# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
) )
await ems_stream.send(filled_msg.dict()) await ems_stream.send(filled_msg.dict())
# send a fill msg for gui update # send a fill msg for gui update
@ -793,7 +823,7 @@ async def trades_dialogue(
broker_details={'name': 'kraken'}, broker_details={'name': 'kraken'},
broker_time=float(trade.broker_time) broker_time=float(trade.broker_time)
) )
await ems_stream.send(fill_msg.dict()) await ems_stream.send(fill_msg.dict())
@ -899,10 +929,10 @@ async def process_trade_msgs(
async for msg in stream_messages(ws): async for msg in stream_messages(ws):
try: try:
# check that we are on the ownTrades stream and that msgs are # check that we are on the ownTrades stream and that msgs
# arriving in sequence with kraken # are arriving in sequence with kraken For clarification the
# For clarification the kraken ws api docs for this stream: # kraken ws api docs for this stream:
# https://docs.kraken.com/websockets/#message-ownTrades # https://docs.kraken.com/websockets/#message-ownTrades
assert msg[1] == 'ownTrades' assert msg[1] == 'ownTrades'
assert msg[2]['sequence'] > sequence_counter assert msg[2]['sequence'] > sequence_counter
sequence_counter += 1 sequence_counter += 1

View File

@ -184,7 +184,7 @@ class BrokerdStatus(BaseModel):
# { # {
# 'submitted', # 'submitted',
# 'cancelled', # 'cancelled',
# 'executed', # 'filled',
# } # }
status: str status: str