get basic order request loop receiving msgs

kraken_orders
Konstantine Tsafatinos 2022-02-07 19:18:00 -05:00
parent 1fe1f88806
commit b55debbe95
1 changed files with 53 additions and 51 deletions

View File

@ -438,7 +438,7 @@ def normalize_symbol(
async def handle_order_requests( async def handle_order_requests(
client: #kraken, client: 'test',#kraken,
ems_order_stream: tractor.MsgStream, ems_order_stream: tractor.MsgStream,
) -> None: ) -> None:
@ -447,61 +447,61 @@ async def handle_order_requests(
async for request_msg in ems_order_stream: async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}') log.info(f'Received order request {request_msg}')
action = request_msg['action'] # action = request_msg['action']
if action in {'buy', 'sell'}: # if action in {'buy', 'sell'}:
account = request_msg['account'] # account = request_msg['account']
if account != 'kraken.spot': # if account != 'kraken.spot':
log.error( # log.error(
'This is a kraken account, \ # 'This is a kraken account, \
only a `kraken.spot` selection is valid' # only a `kraken.spot` selection is valid'
) # )
await ems_order_stream.send(BrokerError( # await ems_order_stream.send(BrokerError(
oid=request_msg['oid'] # oid=request_msg['oid'],
symbol=request_msg['symbol'] # symbol=request_msg['symbol'],
reason=f'Kraken only, No account found: `{account}` ?', # reason=f'Kraken only, No account found: `{account}` ?',
).dict()) # ).dict())
continue # continue
# validate # # validate
order = BrokerdOrder(**request_msg) # order = BrokerdOrder(**request_msg)
# call our client api to submit the order # # call our client api to submit the order
## TODO: look into the submit_limit method, do it write my own? # ## TODO: look into the submit_limit method, do it write my own?
reqid = await client.submit_limit( # reqid = await client.submit_limit(
oid=order.oid, # oid=order.oid,
symbol=order.symbol, # symbol=order.symbol,
price=order.price, # price=order.price,
action=order.action, # action=order.action,
size=order.size, # size=order.size,
## XXX: how do I handle new orders # ## XXX: how do I handle new orders
reqid=order.reqid, # reqid=order.reqid,
) # )
# deliver ack that order has been submitted to broker routing # # deliver ack that order has been submitted to broker routing
await ems_order_stream.send( # await ems_order_stream.send(
BrokerdOrderAck( # BrokerdOrderAck(
# ems order request id # # ems order request id
oid=order.oid, # oid=order.oid,
# broker specific request id # # broker specific request id
reqid=reqid, # reqid=reqid,
).dict() # ).dict()
) # )
elif action == 'cancel': # elif action == 'cancel':
msg = BrokerdCancel(**request_msg) # msg = BrokerdCancel(**request_msg)
await client.submit_cancel( # await client.submit_cancel(
reqid=msg.reqid # reqid=msg.reqid
) # )
else: # else:
log.error(f'Unknown order command: {request_msg}') # log.error(f'Unknown order command: {request_msg}')
@tractor.context @tractor.context
@ -527,17 +527,19 @@ async def trades_dialogue(
all_positions.append(msg.dict()) all_positions.append(msg.dict())
open_orders = await client.get_user_data('OpenOrders', {}) open_orders = await client.get_user_data('OpenOrders', {})
await tractor.breakpoint() #await tractor.breakpoint()
await ctx.started((all_positions, (acc_name,))) await ctx.started((all_positions, (acc_name,)))
await trio.sleep_forever() #await trio.sleep_forever()
# async with (
# ctx.open_stream() as ems_stream,
# trio.open_nursery as n,
# ):
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
## TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream)
async def stream_messages(ws): async def stream_messages(ws):