Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet 72072b5737 Support `0` value `reqid`s 🤦 2022-07-08 23:16:29 -04:00
Tyler Goodlet 381b3121d6 Cancel any live orders found on connect
More or less just to avoid orders the user wasn't aware of from
persisting until we get "open order relaying" through the ems working.

Some further fixes which required a new `reqids2txids` map which keeps
track of which `kraken` "txid" is mapped to our `reqid: int`; mainly
this was needed for cancel requests which require knowing the underlying
`txid`s (since apparently kraken doesn't keep track of the "reqid"  we
pass it). Pass the ws instance into `handle_order_updates()` to enable
the cancelling orders on startup. Don't key error on unknown `reqid`
values (for eg. when receiving historical trade events on startup).
Handle cancel requests first in the ems side loop.
2022-07-08 23:10:25 -04:00
Tyler Goodlet 8984c1b60b Use `aclosing()` around ws async gen 2022-07-08 19:14:45 -04:00
Tyler Goodlet 2e3cac1407 Add some todo-reminders for ``msgspec`` stuff 2022-07-08 19:14:45 -04:00
Tyler Goodlet b250a48b8d Drop remaining `BaseModel` api usage from rest of codebase 2022-07-08 19:14:43 -04:00
Tyler Goodlet 3dfe7ef8dd Add `Struct.copy()` which does a rountrip validate 2022-07-08 19:14:11 -04:00
Tyler Goodlet 2774a7e6ec Change all clearing msgs over to `msgspec` 2022-07-08 19:14:11 -04:00
Tyler Goodlet 7b6318f025 Cast slots to `int` before range set 2022-07-08 19:14:11 -04:00
Tyler Goodlet b0ee764423 Drop pydantic from allocator 2022-07-08 19:14:11 -04:00
Tyler Goodlet a63f0ee1c0 Add a custom `msgspec.Struct` with some humanizing 2022-07-08 19:14:11 -04:00
10 changed files with 236 additions and 139 deletions

View File

@ -148,7 +148,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?', reason=f'No account found: `{account}` ?',
).dict()) ))
continue continue
client = _accounts2clients.get(account) client = _accounts2clients.get(account)
@ -161,7 +161,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?', reason=f'No api client loaded for account: `{account}` ?',
).dict()) ))
continue continue
if action in {'buy', 'sell'}: if action in {'buy', 'sell'}:
@ -188,7 +188,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason='Order already active?', reason='Order already active?',
).dict()) ))
# deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
await ems_order_stream.send( await ems_order_stream.send(
@ -197,9 +197,8 @@ async def handle_order_requests(
oid=order.oid, oid=order.oid,
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(),
account=account, account=account,
).dict() )
) )
elif action == 'cancel': elif action == 'cancel':
@ -559,7 +558,7 @@ async def trades_dialogue(
cids2pps, cids2pps,
validate=True, validate=True,
) )
all_positions.extend(msg.dict() for msg in msgs) all_positions.extend(msg for msg in msgs)
if not all_positions and cids2pps: if not all_positions and cids2pps:
raise RuntimeError( raise RuntimeError(
@ -665,7 +664,7 @@ async def emit_pp_update(
msg = msgs[0] msg = msgs[0]
break break
await ems_stream.send(msg.dict()) await ems_stream.send(msg)
async def deliver_trade_events( async def deliver_trade_events(
@ -743,7 +742,7 @@ async def deliver_trade_events(
broker_details={'name': 'ib'}, broker_details={'name': 'ib'},
) )
await ems_stream.send(msg.dict()) await ems_stream.send(msg)
case 'fill': case 'fill':
@ -803,7 +802,7 @@ async def deliver_trade_events(
broker_time=trade_entry['broker_time'], broker_time=trade_entry['broker_time'],
) )
await ems_stream.send(msg.dict()) await ems_stream.send(msg)
# 2 cases: # 2 cases:
# - fill comes first or # - fill comes first or
@ -879,7 +878,7 @@ async def deliver_trade_events(
cid, msg = pack_position(item) cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account] # acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps! # cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg.dict()) # await ems_stream.send(msg)
case 'event': case 'event':
@ -891,7 +890,7 @@ async def deliver_trade_events(
# level... # level...
# reqid = item.get('reqid', 0) # reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1: # if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}") # log.info(f"TWS triggered trade\n{pformat(msg)}")
# msg.reqid = 'tws-' + str(-1 * reqid) # msg.reqid = 'tws-' + str(-1 * reqid)

View File

@ -32,6 +32,7 @@ from typing import (
Union, Union,
) )
from async_generator import aclosing
from bidict import bidict from bidict import bidict
import pendulum import pendulum
import trio import trio
@ -81,6 +82,7 @@ async def handle_order_requests(
token: str, token: str,
emsflow: dict[str, list[MsgUnion]], emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: dict[int, str],
) -> None: ) -> None:
''' '''
@ -96,6 +98,23 @@ async def handle_order_requests(
async for msg in ems_order_stream: async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}') log.info(f'Rx order msg:\n{pformat(msg)}')
match msg: match msg:
case {
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
last = emsflow[cancel.oid]
reqid = ids[cancel.oid]
txid = reqids2txids[reqid]
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid], # should be txid from submission
})
case { case {
'account': 'kraken.spot' as account, 'account': 'kraken.spot' as account,
'action': action, 'action': action,
@ -108,10 +127,9 @@ async def handle_order_requests(
if order.oid in ids: if order.oid in ids:
ep = 'editOrder' ep = 'editOrder'
reqid = ids[order.oid] # integer not txid reqid = ids[order.oid] # integer not txid
last = emsflow[order.oid][-1] txid = reqids2txids[reqid]
assert last.reqid == order.reqid
extra = { extra = {
'orderid': last.reqid, # txid 'orderid': txid, # txid
} }
else: else:
@ -158,23 +176,6 @@ async def handle_order_requests(
# placehold for sanity checking in relay loop # placehold for sanity checking in relay loop
emsflow.setdefault(order.oid, []).append(order) emsflow.setdefault(order.oid, []).append(order)
case {
'account': 'kraken.spot' as account,
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
assert cancel.oid in emsflow
reqid = ids[cancel.oid]
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [cancel.reqid], # should be txid from submission
})
case _: case _:
account = msg.get('account') account = msg.get('account')
if account != 'kraken.spot': if account != 'kraken.spot':
@ -191,7 +192,7 @@ async def handle_order_requests(
'Invalid request msg:\n{msg}' 'Invalid request msg:\n{msg}'
), ),
).dict() )
) )
@ -289,7 +290,7 @@ async def trades_dialogue(
avg_price=p.be_price, avg_price=p.be_price,
currency='', currency='',
) )
position_msgs.append(msg.dict()) position_msgs.append(msg)
await ctx.started( await ctx.started(
(position_msgs, [acc_name]) (position_msgs, [acc_name])
@ -316,6 +317,7 @@ async def trades_dialogue(
), ),
) as ws, ) as ws,
trio.open_nursery() as n, trio.open_nursery() as n,
aclosing(stream_messages(ws)) as stream,
): ):
# task local msg dialog tracking # task local msg dialog tracking
emsflow: dict[ emsflow: dict[
@ -325,6 +327,7 @@ async def trades_dialogue(
# 2way map for ems ids to kraken int reqids.. # 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict() ids: bidict[str, int] = bidict()
reqids2txids: dict[int, str] = {}
# task for processing inbound requests from ems # task for processing inbound requests from ems
n.start_soon( n.start_soon(
@ -335,14 +338,17 @@ async def trades_dialogue(
token, token,
emsflow, emsflow,
ids, ids,
reqids2txids,
) )
# enter relay loop # enter relay loop
await handle_order_updates( await handle_order_updates(
ws, ws,
stream,
ems_stream, ems_stream,
emsflow, emsflow,
ids, ids,
reqids2txids,
trans, trans,
acctid, acctid,
acc_name, acc_name,
@ -352,9 +358,11 @@ async def trades_dialogue(
async def handle_order_updates( async def handle_order_updates(
ws: NoBsWs, ws: NoBsWs,
ws_stream: AsyncIterator,
ems_stream: tractor.MsgStream, ems_stream: tractor.MsgStream,
emsflow: dict[str, list[MsgUnion]], emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: dict[int, str],
trans: list[pp.Transaction], trans: list[pp.Transaction],
acctid: str, acctid: str,
acc_name: str, acc_name: str,
@ -372,7 +380,7 @@ async def handle_order_updates(
# on new trade clearing events (aka order "fills") # on new trade clearing events (aka order "fills")
trans: list[pp.Transaction] trans: list[pp.Transaction]
async for msg in stream_messages(ws): async for msg in ws_stream:
match msg: match msg:
# process and relay clearing trade events to ems # process and relay clearing trade events to ems
# https://docs.kraken.com/websockets/#message-ownTrades # https://docs.kraken.com/websockets/#message-ownTrades
@ -419,7 +427,7 @@ async def handle_order_updates(
broker_details={'name': 'kraken'}, broker_details={'name': 'kraken'},
broker_time=broker_time broker_time=broker_time
) )
await ems_stream.send(fill_msg.dict()) await ems_stream.send(fill_msg)
filled_msg = BrokerdStatus( filled_msg = BrokerdStatus(
reqid=reqid, reqid=reqid,
@ -443,7 +451,7 @@ async def handle_order_updates(
# https://github.com/pikers/piker/issues/296 # https://github.com/pikers/piker/issues/296
remaining=0, remaining=0,
) )
await ems_stream.send(filled_msg.dict()) await ems_stream.send(filled_msg)
# update ledger and position tracking # update ledger and position tracking
with open_ledger(acctid, trades) as trans: with open_ledger(acctid, trades) as trans:
@ -480,7 +488,7 @@ async def handle_order_updates(
# TODO # TODO
# currency='' # currency=''
) )
await ems_stream.send(pp_msg.dict()) await ems_stream.send(pp_msg)
# process and relay order state change events # process and relay order state change events
# https://docs.kraken.com/websockets/#message-openOrders # https://docs.kraken.com/websockets/#message-openOrders
@ -550,7 +558,29 @@ async def handle_order_updates(
submit_vlm = rest.get('vol', 0) submit_vlm = rest.get('vol', 0)
exec_vlm = rest.get('vol_exec', 0) exec_vlm = rest.get('vol_exec', 0)
oid = ids.inverse[reqid] reqids2txids[reqid] = txid
oid = ids.inverse.get(reqid)
if not oid:
# TODO: handle these and relay them
# through the EMS to the client / UI
# side!
log.warning(
f'Received active order {txid}:\n'
f'{update_msg}\n'
'Cancelling order for now!..'
)
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid],
})
continue
msgs = emsflow[oid] msgs = emsflow[oid]
# send BrokerdStatus messages for all # send BrokerdStatus messages for all
@ -577,7 +607,7 @@ async def handle_order_updates(
), ),
) )
msgs.append(resp) msgs.append(resp)
await ems_stream.send(resp.dict()) await ems_stream.send(resp)
case _: case _:
log.warning( log.warning(
@ -589,6 +619,7 @@ async def handle_order_updates(
'event': etype, 'event': etype,
'status': status, 'status': status,
'reqid': reqid, 'reqid': reqid,
**rest,
} as event if ( } as event if (
etype in { etype in {
'addOrderStatus', 'addOrderStatus',
@ -596,7 +627,18 @@ async def handle_order_updates(
'cancelOrderStatus', 'cancelOrderStatus',
} }
): ):
oid = ids.inverse[reqid] oid = ids.inverse.get(reqid)
if not oid:
log.warning(
'Unknown order status update?:\n'
f'{event}'
)
continue
txid = rest.get('txid')
if txid:
reqids2txids[reqid] = txid
msgs = emsflow[oid] msgs = emsflow[oid]
last = msgs[-1] last = msgs[-1]
resps, errored = process_status( resps, errored = process_status(
@ -606,19 +648,10 @@ async def handle_order_updates(
msgs, msgs,
last, last,
) )
# if errored:
# if we rx any error cancel the order again
# await ws.send_msg({
# 'event': 'cancelOrder',
# 'token': token,
# 'reqid': reqid,
# 'txid': [last.reqid], # txid from submission
# })
if resps: if resps:
msgs.extend(resps) msgs.extend(resps)
for resp in resps: for resp in resps:
await ems_stream.send(resp.dict()) await ems_stream.send(resp)
case _: case _:
log.warning(f'Unhandled trades update msg: {msg}') log.warning(f'Unhandled trades update msg: {msg}')

View File

@ -22,10 +22,9 @@ from enum import Enum
from typing import Optional from typing import Optional
from bidict import bidict from bidict import bidict
from pydantic import BaseModel, validator
# from msgspec import Struct
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct
from ..pp import Position from ..pp import Position
@ -41,33 +40,30 @@ SizeUnit = Enum(
) )
class Allocator(BaseModel): class Allocator(Struct):
class Config:
validate_assignment = True
copy_on_model_validation = False
arbitrary_types_allowed = True
# required to get the account validator lookup working?
extra = 'allow'
underscore_attrs_are_private = False
symbol: Symbol symbol: Symbol
account: Optional[str] = 'paper' account: Optional[str] = 'paper'
_size_units: bidict[str, Optional[str]] = _size_units
# TODO: for enums this clearly doesn't fucking work, you can't set # TODO: for enums this clearly doesn't fucking work, you can't set
# a default at startup by passing in a `dict` but yet you can set # a default at startup by passing in a `dict` but yet you can set
# that value through assignment..for wtv cucked reason.. honestly, pure # that value through assignment..for wtv cucked reason.. honestly, pure
# unintuitive garbage. # unintuitive garbage.
size_unit: str = 'currency' _size_unit: str = 'currency'
_size_units: dict[str, Optional[str]] = _size_units
@validator('size_unit', pre=True) @property
def maybe_lookup_key(cls, v): def size_unit(self) -> str:
# apply the corresponding enum key for the text "description" value return self._size_unit
@size_unit.setter
def size_unit(self, v: str) -> Optional[str]:
if v not in _size_units: if v not in _size_units:
return _size_units.inverse[v] v = _size_units.inverse[v]
assert v in _size_units assert v in _size_units
self._size_unit = v
return v return v
# TODO: if we ever want ot support non-uniform entry-slot-proportion # TODO: if we ever want ot support non-uniform entry-slot-proportion
@ -262,7 +258,7 @@ def mk_allocator(
# default allocation settings # default allocation settings
defaults: dict[str, float] = { defaults: dict[str, float] = {
'account': None, # select paper by default 'account': None, # select paper by default
'size_unit': 'currency', # 'size_unit': 'currency',
'units_limit': 400, 'units_limit': 400,
'currency_limit': 5e3, 'currency_limit': 5e3,
'slots': 4, 'slots': 4,
@ -301,6 +297,9 @@ def mk_allocator(
# entry step 1.0 # entry step 1.0
alloc.units_limit = alloc.slots alloc.units_limit = alloc.slots
else:
alloc.size_unit = 'currency'
# if the current position is already greater then the limit # if the current position is already greater then the limit
# settings, increase the limit to the current position # settings, increase the limit to the current position
if alloc.size_unit == 'currency': if alloc.size_unit == 'currency':

View File

@ -58,11 +58,11 @@ class OrderBook:
def send( def send(
self, self,
msg: Order, msg: Order | dict,
) -> dict: ) -> dict:
self._sent_orders[msg.oid] = msg self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg.dict()) self._to_ems.send_nowait(msg)
return msg return msg
def update( def update(
@ -73,9 +73,8 @@ class OrderBook:
) -> dict: ) -> dict:
cmd = self._sent_orders[uuid] cmd = self._sent_orders[uuid]
msg = cmd.dict() msg = cmd.copy(update=data)
msg.update(data) self._sent_orders[uuid] = msg
self._sent_orders[uuid] = Order(**msg)
self._to_ems.send_nowait(msg) self._to_ems.send_nowait(msg)
return cmd return cmd
@ -88,7 +87,7 @@ class OrderBook:
oid=uuid, oid=uuid,
symbol=cmd.symbol, symbol=cmd.symbol,
) )
self._to_ems.send_nowait(msg.dict()) self._to_ems.send_nowait(msg)
_orders: OrderBook = None _orders: OrderBook = None
@ -149,7 +148,7 @@ async def relay_order_cmds_from_sync_code(
book = get_orders() book = get_orders()
async with book._from_order_book.subscribe() as orders_stream: async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream: async for cmd in orders_stream:
if cmd['symbol'] == symbol_key: if cmd.symbol == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}') log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire # send msg over IPC / wire
await to_ems_stream.send(cmd) await to_ems_stream.send(cmd)

View File

@ -232,7 +232,7 @@ async def clear_dark_triggers(
price=submit_price, price=submit_price,
size=cmd['size'], size=cmd['size'],
) )
await brokerd_orders_stream.send(msg.dict()) await brokerd_orders_stream.send(msg)
# mark this entry as having sent an order # mark this entry as having sent an order
# request. the entry will be replaced once the # request. the entry will be replaced once the
@ -248,14 +248,11 @@ async def clear_dark_triggers(
msg = Status( msg = Status(
oid=oid, # ems order id oid=oid, # ems order id
resp=resp,
time_ns=time.time_ns(), time_ns=time.time_ns(),
symbol=fqsn, resp=resp,
trigger_price=price, trigger_price=price,
broker_details={'name': broker}, brokerd_msg=cmd,
cmd=cmd, # original request message )
).dict()
# remove exec-condition from set # remove exec-condition from set
log.info(f'removing pred for {oid}') log.info(f'removing pred for {oid}')
@ -578,11 +575,11 @@ async def translate_and_relay_brokerd_events(
if name == 'position': if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg).dict() pos_msg = BrokerdPosition(**brokerd_msg)
# XXX: this will be useful for automatic strats yah? # XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem # keep pps per account up to date locally in ``emsd`` mem
sym, broker = pos_msg['symbol'], pos_msg['broker'] sym, broker = pos_msg.symbol, pos_msg.broker
relay.positions.setdefault( relay.positions.setdefault(
# NOTE: translate to a FQSN! # NOTE: translate to a FQSN!
@ -684,7 +681,7 @@ async def translate_and_relay_brokerd_events(
entry.reqid = reqid entry.reqid = reqid
# tell broker to cancel immediately # tell broker to cancel immediately
await brokerd_trades_stream.send(entry.dict()) await brokerd_trades_stream.send(entry)
# - the order is now active and will be mirrored in # - the order is now active and will be mirrored in
# our book -> registered as live flow # our book -> registered as live flow
@ -724,7 +721,7 @@ async def translate_and_relay_brokerd_events(
# if 10147 in message: cancel # if 10147 in message: cancel
resp = 'broker_errored' resp = 'broker_errored'
broker_details = msg.dict() broker_details = msg
# don't relay message to order requester client # don't relay message to order requester client
# continue # continue
@ -759,7 +756,7 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_' + msg.status resp = 'broker_' + msg.status
# pass the BrokerdStatus msg inside the broker details field # pass the BrokerdStatus msg inside the broker details field
broker_details = msg.dict() broker_details = msg
elif name in ( elif name in (
'fill', 'fill',
@ -768,7 +765,7 @@ async def translate_and_relay_brokerd_events(
# proxy through the "fill" result(s) # proxy through the "fill" result(s)
resp = 'broker_filled' resp = 'broker_filled'
broker_details = msg.dict() broker_details = msg
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
@ -786,7 +783,7 @@ async def translate_and_relay_brokerd_events(
time_ns=time.time_ns(), time_ns=time.time_ns(),
broker_reqid=reqid, broker_reqid=reqid,
brokerd_msg=broker_details, brokerd_msg=broker_details,
).dict() )
) )
except KeyError: except KeyError:
log.error( log.error(
@ -851,14 +848,14 @@ async def process_client_order_cmds(
# NOTE: cancel response will be relayed back in messages # NOTE: cancel response will be relayed back in messages
# from corresponding broker # from corresponding broker
if reqid: if reqid is not None:
# send cancel to brokerd immediately! # send cancel to brokerd immediately!
log.info( log.info(
f'Submitting cancel for live order {reqid}' f'Submitting cancel for live order {reqid}'
) )
await brokerd_order_stream.send(msg.dict()) await brokerd_order_stream.send(msg)
else: else:
# this might be a cancel for an order that hasn't been # this might be a cancel for an order that hasn't been
@ -880,7 +877,7 @@ async def process_client_order_cmds(
resp='dark_cancelled', resp='dark_cancelled',
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
).dict() )
) )
# de-register this client dialogue # de-register this client dialogue
router.dialogues.pop(oid) router.dialogues.pop(oid)
@ -935,7 +932,7 @@ async def process_client_order_cmds(
# handle relaying the ems side responses back to # handle relaying the ems side responses back to
# the client/cmd sender from this request # the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}') log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg.dict()) await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck`` # an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()`` # with ems order id from the ``trades_dialogue()``
@ -1015,7 +1012,7 @@ async def process_client_order_cmds(
resp=resp, resp=resp,
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
).dict() )
) )

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0) # Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -15,21 +15,26 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
Clearing system messagingn types and protocols. Clearing sub-system message and protocols.
""" """
from typing import Optional, Union from typing import Optional, Union
# TODO: try out just encoding/send direction for now?
# import msgspec
from pydantic import BaseModel
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct
# TODO: ``msgspec`` stuff worth paying attention to:
# - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution
# - use literals for a common msg determined by diff keys?
# - https://jcristharif.com/msgspec/usage.html#literal
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# --------------
# Client -> emsd # Client -> emsd
# --------------
class Cancel(Struct):
class Cancel(BaseModel):
'''Cancel msg for removing a dark (ems triggered) or '''Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order. broker-submitted (live) trigger/order.
@ -39,8 +44,10 @@ class Cancel(BaseModel):
symbol: str symbol: str
class Order(BaseModel): class Order(Struct):
# TODO: use ``msgspec.Literal``
# https://jcristharif.com/msgspec/usage.html#literal
action: str # {'buy', 'sell', 'alert'} action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id" # internal ``emdsd`` unique "order id"
oid: str # uuid4 oid: str # uuid4
@ -48,6 +55,9 @@ class Order(BaseModel):
account: str # should we set a default as '' ? account: str # should we set a default as '' ?
price: float price: float
# TODO: could we drop the ``.action`` field above and instead just
# use +/- values here? Would make the msg smaller at the sake of a
# teensie fp precision?
size: float size: float
brokers: list[str] brokers: list[str]
@ -59,20 +69,14 @@ class Order(BaseModel):
# the backend broker # the backend broker
exec_mode: str # {'dark', 'live', 'paper'} exec_mode: str # {'dark', 'live', 'paper'}
class Config:
# just for pre-loading a ``Symbol`` when used
# in the order mode staging process
arbitrary_types_allowed = True
# don't copy this model instance when used in
# a recursive model
copy_on_model_validation = False
# --------------
# Client <- emsd # Client <- emsd
# --------------
# update msgs from ems which relay state change info # update msgs from ems which relay state change info
# from the active clearing engine. # from the active clearing engine.
class Status(Struct):
class Status(BaseModel):
name: str = 'status' name: str = 'status'
oid: str # uuid4 oid: str # uuid4
@ -95,8 +99,6 @@ class Status(BaseModel):
# } # }
resp: str # "response", see above resp: str # "response", see above
# symbol: str
# trigger info # trigger info
trigger_price: Optional[float] = None trigger_price: Optional[float] = None
# price: float # price: float
@ -111,10 +113,12 @@ class Status(BaseModel):
brokerd_msg: dict = {} brokerd_msg: dict = {}
# ---------------
# emsd -> brokerd # emsd -> brokerd
# ---------------
# requests *sent* from ems to respective backend broker daemon # requests *sent* from ems to respective backend broker daemon
class BrokerdCancel(BaseModel): class BrokerdCancel(Struct):
action: str = 'cancel' action: str = 'cancel'
oid: str # piker emsd order id oid: str # piker emsd order id
@ -130,7 +134,7 @@ class BrokerdCancel(BaseModel):
reqid: Optional[Union[int, str]] = None reqid: Optional[Union[int, str]] = None
class BrokerdOrder(BaseModel): class BrokerdOrder(Struct):
action: str # {buy, sell} action: str # {buy, sell}
oid: str oid: str
@ -150,11 +154,12 @@ class BrokerdOrder(BaseModel):
size: float size: float
# ---------------
# emsd <- brokerd # emsd <- brokerd
# ---------------
# requests *received* to ems from broker backend # requests *received* to ems from broker backend
class BrokerdOrderAck(Struct):
class BrokerdOrderAck(BaseModel):
''' '''
Immediate reponse to a brokerd order request providing the broker Immediate reponse to a brokerd order request providing the broker
specific unique order id so that the EMS can associate this specific unique order id so that the EMS can associate this
@ -172,7 +177,7 @@ class BrokerdOrderAck(BaseModel):
account: str = '' account: str = ''
class BrokerdStatus(BaseModel): class BrokerdStatus(Struct):
name: str = 'status' name: str = 'status'
reqid: Union[int, str] reqid: Union[int, str]
@ -205,7 +210,7 @@ class BrokerdStatus(BaseModel):
} }
class BrokerdFill(BaseModel): class BrokerdFill(Struct):
''' '''
A single message indicating a "fill-details" event from the broker A single message indicating a "fill-details" event from the broker
if avaiable. if avaiable.
@ -230,7 +235,7 @@ class BrokerdFill(BaseModel):
broker_time: float broker_time: float
class BrokerdError(BaseModel): class BrokerdError(Struct):
''' '''
Optional error type that can be relayed to emsd for error handling. Optional error type that can be relayed to emsd for error handling.
@ -249,7 +254,7 @@ class BrokerdError(BaseModel):
broker_details: dict = {} broker_details: dict = {}
class BrokerdPosition(BaseModel): class BrokerdPosition(Struct):
'''Position update event from brokerd. '''Position update event from brokerd.
''' '''

View File

@ -117,7 +117,7 @@ class PaperBoi:
reason='paper_trigger', reason='paper_trigger',
remaining=size, remaining=size,
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
# if we're already a clearing price simulate an immediate fill # if we're already a clearing price simulate an immediate fill
if ( if (
@ -173,7 +173,7 @@ class PaperBoi:
broker=self.broker, broker=self.broker,
time_ns=time.time_ns(), time_ns=time.time_ns(),
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
async def fake_fill( async def fake_fill(
self, self,
@ -216,7 +216,7 @@ class PaperBoi:
'name': self.broker + '_paper', 'name': self.broker + '_paper',
}, },
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
if order_complete: if order_complete:
@ -240,7 +240,7 @@ class PaperBoi:
'name': self.broker, 'name': self.broker,
}, },
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
# lookup any existing position # lookup any existing position
token = f'{symbol}.{self.broker}' token = f'{symbol}.{self.broker}'
@ -268,7 +268,7 @@ class PaperBoi:
) )
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price) pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
await self.ems_trades_stream.send(pp_msg.dict()) await self.ems_trades_stream.send(pp_msg)
async def simulate_fills( async def simulate_fills(
@ -384,7 +384,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'Paper only. No account found: `{account}` ?', reason=f'Paper only. No account found: `{account}` ?',
).dict()) ))
continue continue
# validate # validate
@ -416,7 +416,7 @@ async def handle_order_requests(
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
).dict() )
) )
elif action == 'cancel': elif action == 'cancel':

View File

@ -0,0 +1,68 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Built-in (extension) types.
"""
from typing import Optional
from pprint import pformat
import msgspec
class Struct(
msgspec.Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def to_dict(self) -> dict:
return {
f: getattr(self, f)
for f in self.__struct_fields__
}
def __repr__(self):
return f'Struct({pformat(self.to_dict())})'
def copy(
self,
update: Optional[dict] = None,
) -> msgspec.Struct:
'''
Validate-typecast all self defined fields, return a copy of us
with all such fields.
This is kinda like the default behaviour in `pydantic.BaseModel`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# roundtrip serialize to validate
return msgspec.msgpack.Decoder(
type=type(self)
).decode(
msgspec.msgpack.Encoder().encode(self)
)

View File

@ -619,7 +619,7 @@ class FillStatusBar(QProgressBar):
# color: #19232D; # color: #19232D;
# width: 10px; # width: 10px;
self.setRange(0, slots) self.setRange(0, int(slots))
self.setValue(value) self.setValue(value)

View File

@ -264,7 +264,8 @@ class OrderMode:
self, self,
) -> OrderDialog: ) -> OrderDialog:
'''Send execution order to EMS return a level line to '''
Send execution order to EMS return a level line to
represent the order on a chart. represent the order on a chart.
''' '''
@ -273,13 +274,9 @@ class OrderMode:
oid = str(uuid.uuid4()) oid = str(uuid.uuid4())
# format order data for ems # format order data for ems
fqsn = symbol.front_fqsn() order = staged.copy()
order = staged.copy( order.oid = oid
update={ order.symbol = symbol.front_fqsn()
'symbol': fqsn,
'oid': oid,
}
)
line = self.line_from_order( line = self.line_from_order(
order, order,