Drop invalid status msg, linting cleanups

kraken_cleaning
Tyler Goodlet 2022-04-09 16:56:05 -04:00
parent c034ea742f
commit 88306a6c1e
1 changed files with 59 additions and 67 deletions

View File

@ -124,11 +124,11 @@ class Trade(BaseModel):
Trade class that helps parse and validate ownTrades stream Trade class that helps parse and validate ownTrades stream
''' '''
reqid: str # kraken order transaction id reqid: str # kraken order transaction id
action: str # buy or sell action: str # buy or sell
price: str # price of asset price: str # price of asset
size: str # vol of asset size: str # vol of asset
broker_time: str # e.g GTC, GTD broker_time: str # e.g GTC, GTD
@dataclass @dataclass
@ -242,7 +242,7 @@ class Client:
uri_path: str uri_path: str
) -> Dict[str, Any]: ) -> Dict[str, Any]:
headers = { headers = {
'Content-Type': 'Content-Type':
'application/x-www-form-urlencoded', 'application/x-www-form-urlencoded',
'API-Key': 'API-Key':
self._api_key, self._api_key,
@ -288,7 +288,7 @@ class Client:
count = resp['result']['count'] count = resp['result']['count']
break break
# update existing dict if num trades exceeds 100 # update existing dict if num trades exceeds 100
else: else:
trades.update(resp['result']['trades']) trades.update(resp['result']['trades'])
# increment the offset counter # increment the offset counter
data['ofs'] += 50 data['ofs'] += 50
@ -446,12 +446,11 @@ class Client:
raise SymbolNotFound(json['error'][0] + f': {symbol}') raise SymbolNotFound(json['error'][0] + f': {symbol}')
@asynccontextmanager @asynccontextmanager
async def get_client() -> Client: async def get_client() -> Client:
section = get_config() section = get_config()
if section: if section:
client = Client( client = Client(
name=section['key_descr'], name=section['key_descr'],
api_key=section['api_key'], api_key=section['api_key'],
@ -565,7 +564,7 @@ async def handle_order_requests(
# validate # validate
temp_id = next(userref_counter) temp_id = next(userref_counter)
order = BrokerdOrder(**request_msg) order = BrokerdOrder(**request_msg)
# call our client api to submit the order # call our client api to submit the order
resp = await client.submit_limit( resp = await client.submit_limit(
oid=order.oid, oid=order.oid,
@ -578,7 +577,9 @@ async def handle_order_requests(
err = resp['error'] err = resp['error']
if err: if err:
log.error(f'Failed to submit order') oid = order.oid
log.error(f'Failed to submit order: {oid}')
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=order.oid, oid=order.oid,
@ -616,42 +617,41 @@ async def handle_order_requests(
reqid=msg.reqid reqid=msg.reqid
) )
try: # Check to make sure there was no error returned by
# Check to make sure there was no error returned by # the kraken endpoint. Assert one order was cancelled
# the kraken endpoint. Assert one order was cancelled assert resp['error'] == []
assert resp['error'] == [] assert resp['result']['count'] == 1
assert resp['result']['count'] == 1
# Check to make sure the cancellation is NOT pending,
# then send the confirmation to the ems order stream
pending = resp['result'].get('pending')
if pending:
oid = order.oid
log.error(f'Order {oid} cancel was not successful')
# TODO: Change this code using .get
try:
pending = resp['result']['pending']
# Check to make sure the cancellation is NOT pending,
# then send the confirmation to the ems order stream
except KeyError:
await ems_order_stream.send(
BrokerdStatus(
reqid=msg.reqid,
account=msg.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
except AssertionError:
log.error(f'Order cancel was not successful')
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=order.oid, oid=oid,
reqid=temp_id, reqid=temp_id,
symbol=order.symbol, symbol=order.symbol,
reason="Failed order cancel", reason="Failed order cancel",
broker_details=resp broker_details=resp
).dict() ).dict()
) )
else:
else: await ems_order_stream.send(
log.error(f'Unknown order command: {request_msg}') BrokerdStatus(
reqid=msg.reqid,
account=msg.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
else:
log.error(f'Unknown order command: {request_msg}')
@tractor.context @tractor.context
@ -694,7 +694,7 @@ async def trades_dialogue(
async with get_client() as client: async with get_client() as client:
if not client._api_key: if not client._api_key:
log.error('Missing Kraken API key: Trades WS connection failed') log.error('Missing Kraken API key: Trades WS connection failed')
await ctx.started(({}, {'paper',})) await ctx.started(({}, ['paper']))
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
@ -713,7 +713,7 @@ async def trades_dialogue(
_positions={}, _positions={},
) )
## TODO: maybe add multiple accounts # TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream) n.start_soon(handle_order_requests, client, ems_stream)
acc_name = 'kraken.' + client._name acc_name = 'kraken.' + client._name
@ -724,7 +724,7 @@ async def trades_dialogue(
await ctx.started((position_msgs, (acc_name,))) await ctx.started((position_msgs, (acc_name,)))
# Get websocket token for authenticated data stream # Get websocket token for authenticated data stream
# Assert that a token was actually received # Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {}) resp = await client.endpoint('GetWebSocketsToken', {})
assert resp['error'] == [] assert resp['error'] == []
token = resp['result']['token'] token = resp['result']['token']
@ -733,7 +733,7 @@ async def trades_dialogue(
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
## TODO: maybe add multiple accounts # TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream) n.start_soon(handle_order_requests, client, ems_stream)
# Process trades msg stream of ws # Process trades msg stream of ws
@ -746,24 +746,8 @@ async def trades_dialogue(
for trade in msg: for trade in msg:
# check the type of packaged message # check the type of packaged message
assert type(trade) == Trade assert type(trade) == Trade
# prepare and send a status update for line update
trade_msg = BrokerdStatus(
reqid=trade.reqid,
time_ns=time.time_ns(),
account='kraken.spot',
status='executed',
filled=float(trade.size),
reason='Order filled by kraken',
# remaining='' # TODO: not sure what to do here.
broker_details={
'name': 'kraken',
'broker_time': trade.broker_time
}
)
await ems_stream.send(trade_msg.dict())
# prepare and send a filled status update
filled_msg = BrokerdStatus( filled_msg = BrokerdStatus(
reqid=trade.reqid, reqid=trade.reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
@ -772,13 +756,21 @@ async def trades_dialogue(
status='filled', status='filled',
filled=float(trade.size), filled=float(trade.size),
reason='Order filled by kraken', reason='Order filled by kraken',
# remaining='' # TODO: not sure what to do here.
broker_details={ broker_details={
'name': 'kraken', 'name': 'kraken',
'broker_time': trade.broker_time 'broker_time': trade.broker_time
} },
# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
) )
await ems_stream.send(filled_msg.dict()) await ems_stream.send(filled_msg.dict())
# send a fill msg for gui update # send a fill msg for gui update
@ -793,7 +785,7 @@ async def trades_dialogue(
broker_details={'name': 'kraken'}, broker_details={'name': 'kraken'},
broker_time=float(trade.broker_time) broker_time=float(trade.broker_time)
) )
await ems_stream.send(fill_msg.dict()) await ems_stream.send(fill_msg.dict())
@ -899,10 +891,10 @@ async def process_trade_msgs(
async for msg in stream_messages(ws): async for msg in stream_messages(ws):
try: try:
# check that we are on the ownTrades stream and that msgs are # check that we are on the ownTrades stream and that msgs
# arriving in sequence with kraken # are arriving in sequence with kraken For clarification the
# For clarification the kraken ws api docs for this stream: # kraken ws api docs for this stream:
# https://docs.kraken.com/websockets/#message-ownTrades # https://docs.kraken.com/websockets/#message-ownTrades
assert msg[1] == 'ownTrades' assert msg[1] == 'ownTrades'
assert msg[2]['sequence'] > sequence_counter assert msg[2]['sequence'] > sequence_counter
sequence_counter += 1 sequence_counter += 1