change logic order for handling no config case

kraken_orders
Konstantine Tsafatinos 2022-04-07 13:03:53 -04:00
parent 2baa1b4605
commit c2e654aae2
1 changed files with 78 additions and 65 deletions

View File

@ -45,6 +45,7 @@ from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws, NoBsWs from ..data._web_bs import open_autorecon_ws, NoBsWs
from ..clearing._paper_engine import PaperBoi
from ..clearing._messages import ( from ..clearing._messages import (
BrokerdPosition, BrokerdOrder, BrokerdStatus, BrokerdPosition, BrokerdOrder, BrokerdStatus,
BrokerdOrderAck, BrokerdError, BrokerdCancel, BrokerdOrderAck, BrokerdError, BrokerdCancel,
@ -691,71 +692,7 @@ async def trades_dialogue(
# Authenticated block # Authenticated block
async with get_client() as client: async with get_client() as client:
if client._api_key: if not client._api_key:
acc_name = 'kraken.' + client._name
trades = await client.get_trades()
position_msgs = pack_positions(acc_name, trades)
await ctx.started((position_msgs, (acc_name,)))
# Get websocket token for authenticated data stream
# Assert that a token was actually received
resp = await client.endpoint('GetWebSocketsToken', {})
assert resp['error'] == []
token = resp['result']['token']
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
## TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream)
# Process trades msg stream of ws
async with open_autorecon_ws(
'wss://ws-auth.kraken.com/',
fixture=subscribe,
token=token,
) as ws:
async for msg in process_trade_msgs(ws):
for trade in msg:
# check the type of packaged message
assert type(trade) == Trade
# prepare and send a status update for line update
trade_msg = BrokerdStatus(
reqid=trade.reqid,
time_ns=time.time_ns(),
account='kraken.spot',
status='executed',
filled=float(trade.size),
reason='Order filled by kraken',
# remaining='' # TODO: not sure what to do here.
broker_details={
'name': 'kraken',
'broker_time': trade.broker_time
}
)
await ems_stream.send(trade_msg.dict())
# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=trade.reqid,
time_ns=time.time_ns(),
action=trade.action,
size=float(trade.size),
price=float(trade.price),
# TODO: maybe capture more msg data i.e fees?
broker_details={'name': 'kraken'},
broker_time=float(trade.broker_time)
)
await ems_stream.send(fill_msg.dict())
else:
log.error('Missing Kraken API key: Trades WS connection failed') log.error('Missing Kraken API key: Trades WS connection failed')
await ctx.started(({}, {'paper',})) await ctx.started(({}, {'paper',}))
@ -763,9 +700,85 @@ async def trades_dialogue(
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
client = PaperBoi(
'kraken',
ems_stream,
_buys={},
_sells={},
_reqids={},
# TODO: load paper positions from ``positions.toml``
_positions={},
)
## TODO: maybe add multiple accounts ## TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream) n.start_soon(handle_order_requests, client, ems_stream)
acc_name = 'kraken.' + client._name
trades = await client.get_trades()
position_msgs = pack_positions(acc_name, trades)
await ctx.started((position_msgs, (acc_name,)))
# Get websocket token for authenticated data stream
# Assert that a token was actually received
resp = await client.endpoint('GetWebSocketsToken', {})
assert resp['error'] == []
token = resp['result']['token']
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
## TODO: maybe add multiple accounts
n.start_soon(handle_order_requests, client, ems_stream)
# Process trades msg stream of ws
async with open_autorecon_ws(
'wss://ws-auth.kraken.com/',
fixture=subscribe,
token=token,
) as ws:
async for msg in process_trade_msgs(ws):
for trade in msg:
# check the type of packaged message
assert type(trade) == Trade
# prepare and send a status update for line update
trade_msg = BrokerdStatus(
reqid=trade.reqid,
time_ns=time.time_ns(),
account='kraken.spot',
status='executed',
filled=float(trade.size),
reason='Order filled by kraken',
# remaining='' # TODO: not sure what to do here.
broker_details={
'name': 'kraken',
'broker_time': trade.broker_time
}
)
await ems_stream.send(trade_msg.dict())
# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=trade.reqid,
time_ns=time.time_ns(),
action=trade.action,
size=float(trade.size),
price=float(trade.price),
# TODO: maybe capture more msg data i.e fees?
broker_details={'name': 'kraken'},
broker_time=float(trade.broker_time)
)
await ems_stream.send(fill_msg.dict())
async def stream_messages( async def stream_messages(
ws: NoBsWs, ws: NoBsWs,