added the bones for the handle_order_requests func

kraken_orders
Konstantine Tsafatinos 2022-01-26 18:39:28 -05:00
parent 3d2be3674e
commit 1fe1f88806
1 changed files with 76 additions and 4 deletions

View File

@ -40,7 +40,10 @@ from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws
from ..clearing._messages import BrokerdPosition, BrokerdOrder, BrokerdStatus
from ..clearing._messages import (
BrokerdPosition, BrokerdOrder, BrokerdStatus,
BrokerdOrderAck, BrokerdError, BrokerdCancel, BrokerdFill
)
import urllib.parse
import hashlib
@ -433,6 +436,74 @@ def normalize_symbol(
return ticker.lower()
async def handle_order_requests(
client: #kraken,
ems_order_stream: tractor.MsgStream,
) -> None:
# order_request: dict
async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
action = request_msg['action']
if action in {'buy', 'sell'}:
account = request_msg['account']
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(BrokerError(
oid=request_msg['oid']
symbol=request_msg['symbol']
reason=f'Kraken only, No account found: `{account}` ?',
).dict())
continue
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
## TODO: look into the submit_limit method, do it write my own?
reqid = await client.submit_limit(
oid=order.oid,
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
## XXX: how do I handle new orders
reqid=order.reqid,
)
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
await client.submit_cancel(
reqid=msg.reqid
)
else:
log.error(f'Unknown order command: {request_msg}')
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
@ -462,9 +533,10 @@ async def trades_dialogue(
await trio.sleep_forever()
# async with (
# ctx.open_stream() as ems_stream,
#
# async with (
# ctx.open_stream() as ems_stream,
# trio.open_nursery as n,
# ):
async def stream_messages(ws):