Compare commits

..

No commits in common. "72072b5737a58d01c2b39787950a3d10ce189dae" and "7d49335f8b8149f3d8eb2f750119490d82792736" have entirely different histories.

10 changed files with 139 additions and 236 deletions

View File

@ -148,7 +148,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?', reason=f'No account found: `{account}` ?',
)) ).dict())
continue continue
client = _accounts2clients.get(account) client = _accounts2clients.get(account)
@ -161,7 +161,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?', reason=f'No api client loaded for account: `{account}` ?',
)) ).dict())
continue continue
if action in {'buy', 'sell'}: if action in {'buy', 'sell'}:
@ -188,7 +188,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason='Order already active?', reason='Order already active?',
)) ).dict())
# deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
await ems_order_stream.send( await ems_order_stream.send(
@ -197,8 +197,9 @@ async def handle_order_requests(
oid=order.oid, oid=order.oid,
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(),
account=account, account=account,
) ).dict()
) )
elif action == 'cancel': elif action == 'cancel':
@ -558,7 +559,7 @@ async def trades_dialogue(
cids2pps, cids2pps,
validate=True, validate=True,
) )
all_positions.extend(msg for msg in msgs) all_positions.extend(msg.dict() for msg in msgs)
if not all_positions and cids2pps: if not all_positions and cids2pps:
raise RuntimeError( raise RuntimeError(
@ -664,7 +665,7 @@ async def emit_pp_update(
msg = msgs[0] msg = msgs[0]
break break
await ems_stream.send(msg) await ems_stream.send(msg.dict())
async def deliver_trade_events( async def deliver_trade_events(
@ -742,7 +743,7 @@ async def deliver_trade_events(
broker_details={'name': 'ib'}, broker_details={'name': 'ib'},
) )
await ems_stream.send(msg) await ems_stream.send(msg.dict())
case 'fill': case 'fill':
@ -802,7 +803,7 @@ async def deliver_trade_events(
broker_time=trade_entry['broker_time'], broker_time=trade_entry['broker_time'],
) )
await ems_stream.send(msg) await ems_stream.send(msg.dict())
# 2 cases: # 2 cases:
# - fill comes first or # - fill comes first or
@ -878,7 +879,7 @@ async def deliver_trade_events(
cid, msg = pack_position(item) cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account] # acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps! # cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg) # await ems_stream.send(msg.dict())
case 'event': case 'event':
@ -890,7 +891,7 @@ async def deliver_trade_events(
# level... # level...
# reqid = item.get('reqid', 0) # reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1: # if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg)}") # log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
# msg.reqid = 'tws-' + str(-1 * reqid) # msg.reqid = 'tws-' + str(-1 * reqid)

View File

@ -32,7 +32,6 @@ from typing import (
Union, Union,
) )
from async_generator import aclosing
from bidict import bidict from bidict import bidict
import pendulum import pendulum
import trio import trio
@ -82,7 +81,6 @@ async def handle_order_requests(
token: str, token: str,
emsflow: dict[str, list[MsgUnion]], emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: dict[int, str],
) -> None: ) -> None:
''' '''
@ -98,23 +96,6 @@ async def handle_order_requests(
async for msg in ems_order_stream: async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}') log.info(f'Rx order msg:\n{pformat(msg)}')
match msg: match msg:
case {
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
last = emsflow[cancel.oid]
reqid = ids[cancel.oid]
txid = reqids2txids[reqid]
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid], # should be txid from submission
})
case { case {
'account': 'kraken.spot' as account, 'account': 'kraken.spot' as account,
'action': action, 'action': action,
@ -127,9 +108,10 @@ async def handle_order_requests(
if order.oid in ids: if order.oid in ids:
ep = 'editOrder' ep = 'editOrder'
reqid = ids[order.oid] # integer not txid reqid = ids[order.oid] # integer not txid
txid = reqids2txids[reqid] last = emsflow[order.oid][-1]
assert last.reqid == order.reqid
extra = { extra = {
'orderid': txid, # txid 'orderid': last.reqid, # txid
} }
else: else:
@ -176,6 +158,23 @@ async def handle_order_requests(
# placehold for sanity checking in relay loop # placehold for sanity checking in relay loop
emsflow.setdefault(order.oid, []).append(order) emsflow.setdefault(order.oid, []).append(order)
case {
'account': 'kraken.spot' as account,
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
assert cancel.oid in emsflow
reqid = ids[cancel.oid]
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [cancel.reqid], # should be txid from submission
})
case _: case _:
account = msg.get('account') account = msg.get('account')
if account != 'kraken.spot': if account != 'kraken.spot':
@ -192,7 +191,7 @@ async def handle_order_requests(
'Invalid request msg:\n{msg}' 'Invalid request msg:\n{msg}'
), ),
) ).dict()
) )
@ -290,7 +289,7 @@ async def trades_dialogue(
avg_price=p.be_price, avg_price=p.be_price,
currency='', currency='',
) )
position_msgs.append(msg) position_msgs.append(msg.dict())
await ctx.started( await ctx.started(
(position_msgs, [acc_name]) (position_msgs, [acc_name])
@ -317,7 +316,6 @@ async def trades_dialogue(
), ),
) as ws, ) as ws,
trio.open_nursery() as n, trio.open_nursery() as n,
aclosing(stream_messages(ws)) as stream,
): ):
# task local msg dialog tracking # task local msg dialog tracking
emsflow: dict[ emsflow: dict[
@ -327,7 +325,6 @@ async def trades_dialogue(
# 2way map for ems ids to kraken int reqids.. # 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict() ids: bidict[str, int] = bidict()
reqids2txids: dict[int, str] = {}
# task for processing inbound requests from ems # task for processing inbound requests from ems
n.start_soon( n.start_soon(
@ -338,17 +335,14 @@ async def trades_dialogue(
token, token,
emsflow, emsflow,
ids, ids,
reqids2txids,
) )
# enter relay loop # enter relay loop
await handle_order_updates( await handle_order_updates(
ws, ws,
stream,
ems_stream, ems_stream,
emsflow, emsflow,
ids, ids,
reqids2txids,
trans, trans,
acctid, acctid,
acc_name, acc_name,
@ -358,11 +352,9 @@ async def trades_dialogue(
async def handle_order_updates( async def handle_order_updates(
ws: NoBsWs, ws: NoBsWs,
ws_stream: AsyncIterator,
ems_stream: tractor.MsgStream, ems_stream: tractor.MsgStream,
emsflow: dict[str, list[MsgUnion]], emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: dict[int, str],
trans: list[pp.Transaction], trans: list[pp.Transaction],
acctid: str, acctid: str,
acc_name: str, acc_name: str,
@ -380,7 +372,7 @@ async def handle_order_updates(
# on new trade clearing events (aka order "fills") # on new trade clearing events (aka order "fills")
trans: list[pp.Transaction] trans: list[pp.Transaction]
async for msg in ws_stream: async for msg in stream_messages(ws):
match msg: match msg:
# process and relay clearing trade events to ems # process and relay clearing trade events to ems
# https://docs.kraken.com/websockets/#message-ownTrades # https://docs.kraken.com/websockets/#message-ownTrades
@ -427,7 +419,7 @@ async def handle_order_updates(
broker_details={'name': 'kraken'}, broker_details={'name': 'kraken'},
broker_time=broker_time broker_time=broker_time
) )
await ems_stream.send(fill_msg) await ems_stream.send(fill_msg.dict())
filled_msg = BrokerdStatus( filled_msg = BrokerdStatus(
reqid=reqid, reqid=reqid,
@ -451,7 +443,7 @@ async def handle_order_updates(
# https://github.com/pikers/piker/issues/296 # https://github.com/pikers/piker/issues/296
remaining=0, remaining=0,
) )
await ems_stream.send(filled_msg) await ems_stream.send(filled_msg.dict())
# update ledger and position tracking # update ledger and position tracking
with open_ledger(acctid, trades) as trans: with open_ledger(acctid, trades) as trans:
@ -488,7 +480,7 @@ async def handle_order_updates(
# TODO # TODO
# currency='' # currency=''
) )
await ems_stream.send(pp_msg) await ems_stream.send(pp_msg.dict())
# process and relay order state change events # process and relay order state change events
# https://docs.kraken.com/websockets/#message-openOrders # https://docs.kraken.com/websockets/#message-openOrders
@ -558,29 +550,7 @@ async def handle_order_updates(
submit_vlm = rest.get('vol', 0) submit_vlm = rest.get('vol', 0)
exec_vlm = rest.get('vol_exec', 0) exec_vlm = rest.get('vol_exec', 0)
reqids2txids[reqid] = txid oid = ids.inverse[reqid]
oid = ids.inverse.get(reqid)
if not oid:
# TODO: handle these and relay them
# through the EMS to the client / UI
# side!
log.warning(
f'Received active order {txid}:\n'
f'{update_msg}\n'
'Cancelling order for now!..'
)
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid],
})
continue
msgs = emsflow[oid] msgs = emsflow[oid]
# send BrokerdStatus messages for all # send BrokerdStatus messages for all
@ -607,7 +577,7 @@ async def handle_order_updates(
), ),
) )
msgs.append(resp) msgs.append(resp)
await ems_stream.send(resp) await ems_stream.send(resp.dict())
case _: case _:
log.warning( log.warning(
@ -619,7 +589,6 @@ async def handle_order_updates(
'event': etype, 'event': etype,
'status': status, 'status': status,
'reqid': reqid, 'reqid': reqid,
**rest,
} as event if ( } as event if (
etype in { etype in {
'addOrderStatus', 'addOrderStatus',
@ -627,18 +596,7 @@ async def handle_order_updates(
'cancelOrderStatus', 'cancelOrderStatus',
} }
): ):
oid = ids.inverse.get(reqid) oid = ids.inverse[reqid]
if not oid:
log.warning(
'Unknown order status update?:\n'
f'{event}'
)
continue
txid = rest.get('txid')
if txid:
reqids2txids[reqid] = txid
msgs = emsflow[oid] msgs = emsflow[oid]
last = msgs[-1] last = msgs[-1]
resps, errored = process_status( resps, errored = process_status(
@ -648,10 +606,19 @@ async def handle_order_updates(
msgs, msgs,
last, last,
) )
# if errored:
# if we rx any error cancel the order again
# await ws.send_msg({
# 'event': 'cancelOrder',
# 'token': token,
# 'reqid': reqid,
# 'txid': [last.reqid], # txid from submission
# })
if resps: if resps:
msgs.extend(resps) msgs.extend(resps)
for resp in resps: for resp in resps:
await ems_stream.send(resp) await ems_stream.send(resp.dict())
case _: case _:
log.warning(f'Unhandled trades update msg: {msg}') log.warning(f'Unhandled trades update msg: {msg}')

View File

@ -22,9 +22,10 @@ from enum import Enum
from typing import Optional from typing import Optional
from bidict import bidict from bidict import bidict
from pydantic import BaseModel, validator
# from msgspec import Struct
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct
from ..pp import Position from ..pp import Position
@ -40,30 +41,33 @@ SizeUnit = Enum(
) )
class Allocator(Struct): class Allocator(BaseModel):
class Config:
validate_assignment = True
copy_on_model_validation = False
arbitrary_types_allowed = True
# required to get the account validator lookup working?
extra = 'allow'
underscore_attrs_are_private = False
symbol: Symbol symbol: Symbol
account: Optional[str] = 'paper' account: Optional[str] = 'paper'
_size_units: bidict[str, Optional[str]] = _size_units
# TODO: for enums this clearly doesn't fucking work, you can't set # TODO: for enums this clearly doesn't fucking work, you can't set
# a default at startup by passing in a `dict` but yet you can set # a default at startup by passing in a `dict` but yet you can set
# that value through assignment..for wtv cucked reason.. honestly, pure # that value through assignment..for wtv cucked reason.. honestly, pure
# unintuitive garbage. # unintuitive garbage.
_size_unit: str = 'currency' size_unit: str = 'currency'
_size_units: dict[str, Optional[str]] = _size_units
@property @validator('size_unit', pre=True)
def size_unit(self) -> str: def maybe_lookup_key(cls, v):
return self._size_unit # apply the corresponding enum key for the text "description" value
@size_unit.setter
def size_unit(self, v: str) -> Optional[str]:
if v not in _size_units: if v not in _size_units:
v = _size_units.inverse[v] return _size_units.inverse[v]
assert v in _size_units assert v in _size_units
self._size_unit = v
return v return v
# TODO: if we ever want ot support non-uniform entry-slot-proportion # TODO: if we ever want ot support non-uniform entry-slot-proportion
@ -258,7 +262,7 @@ def mk_allocator(
# default allocation settings # default allocation settings
defaults: dict[str, float] = { defaults: dict[str, float] = {
'account': None, # select paper by default 'account': None, # select paper by default
# 'size_unit': 'currency', 'size_unit': 'currency',
'units_limit': 400, 'units_limit': 400,
'currency_limit': 5e3, 'currency_limit': 5e3,
'slots': 4, 'slots': 4,
@ -297,9 +301,6 @@ def mk_allocator(
# entry step 1.0 # entry step 1.0
alloc.units_limit = alloc.slots alloc.units_limit = alloc.slots
else:
alloc.size_unit = 'currency'
# if the current position is already greater then the limit # if the current position is already greater then the limit
# settings, increase the limit to the current position # settings, increase the limit to the current position
if alloc.size_unit == 'currency': if alloc.size_unit == 'currency':

View File

@ -58,11 +58,11 @@ class OrderBook:
def send( def send(
self, self,
msg: Order | dict, msg: Order,
) -> dict: ) -> dict:
self._sent_orders[msg.oid] = msg self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg) self._to_ems.send_nowait(msg.dict())
return msg return msg
def update( def update(
@ -73,8 +73,9 @@ class OrderBook:
) -> dict: ) -> dict:
cmd = self._sent_orders[uuid] cmd = self._sent_orders[uuid]
msg = cmd.copy(update=data) msg = cmd.dict()
self._sent_orders[uuid] = msg msg.update(data)
self._sent_orders[uuid] = Order(**msg)
self._to_ems.send_nowait(msg) self._to_ems.send_nowait(msg)
return cmd return cmd
@ -87,7 +88,7 @@ class OrderBook:
oid=uuid, oid=uuid,
symbol=cmd.symbol, symbol=cmd.symbol,
) )
self._to_ems.send_nowait(msg) self._to_ems.send_nowait(msg.dict())
_orders: OrderBook = None _orders: OrderBook = None
@ -148,7 +149,7 @@ async def relay_order_cmds_from_sync_code(
book = get_orders() book = get_orders()
async with book._from_order_book.subscribe() as orders_stream: async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream: async for cmd in orders_stream:
if cmd.symbol == symbol_key: if cmd['symbol'] == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}') log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire # send msg over IPC / wire
await to_ems_stream.send(cmd) await to_ems_stream.send(cmd)

View File

@ -232,7 +232,7 @@ async def clear_dark_triggers(
price=submit_price, price=submit_price,
size=cmd['size'], size=cmd['size'],
) )
await brokerd_orders_stream.send(msg) await brokerd_orders_stream.send(msg.dict())
# mark this entry as having sent an order # mark this entry as having sent an order
# request. the entry will be replaced once the # request. the entry will be replaced once the
@ -248,11 +248,14 @@ async def clear_dark_triggers(
msg = Status( msg = Status(
oid=oid, # ems order id oid=oid, # ems order id
time_ns=time.time_ns(),
resp=resp, resp=resp,
time_ns=time.time_ns(),
symbol=fqsn,
trigger_price=price, trigger_price=price,
brokerd_msg=cmd, broker_details={'name': broker},
) cmd=cmd, # original request message
).dict()
# remove exec-condition from set # remove exec-condition from set
log.info(f'removing pred for {oid}') log.info(f'removing pred for {oid}')
@ -575,11 +578,11 @@ async def translate_and_relay_brokerd_events(
if name == 'position': if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg) pos_msg = BrokerdPosition(**brokerd_msg).dict()
# XXX: this will be useful for automatic strats yah? # XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem # keep pps per account up to date locally in ``emsd`` mem
sym, broker = pos_msg.symbol, pos_msg.broker sym, broker = pos_msg['symbol'], pos_msg['broker']
relay.positions.setdefault( relay.positions.setdefault(
# NOTE: translate to a FQSN! # NOTE: translate to a FQSN!
@ -681,7 +684,7 @@ async def translate_and_relay_brokerd_events(
entry.reqid = reqid entry.reqid = reqid
# tell broker to cancel immediately # tell broker to cancel immediately
await brokerd_trades_stream.send(entry) await brokerd_trades_stream.send(entry.dict())
# - the order is now active and will be mirrored in # - the order is now active and will be mirrored in
# our book -> registered as live flow # our book -> registered as live flow
@ -721,7 +724,7 @@ async def translate_and_relay_brokerd_events(
# if 10147 in message: cancel # if 10147 in message: cancel
resp = 'broker_errored' resp = 'broker_errored'
broker_details = msg broker_details = msg.dict()
# don't relay message to order requester client # don't relay message to order requester client
# continue # continue
@ -756,7 +759,7 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_' + msg.status resp = 'broker_' + msg.status
# pass the BrokerdStatus msg inside the broker details field # pass the BrokerdStatus msg inside the broker details field
broker_details = msg broker_details = msg.dict()
elif name in ( elif name in (
'fill', 'fill',
@ -765,7 +768,7 @@ async def translate_and_relay_brokerd_events(
# proxy through the "fill" result(s) # proxy through the "fill" result(s)
resp = 'broker_filled' resp = 'broker_filled'
broker_details = msg broker_details = msg.dict()
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
@ -783,7 +786,7 @@ async def translate_and_relay_brokerd_events(
time_ns=time.time_ns(), time_ns=time.time_ns(),
broker_reqid=reqid, broker_reqid=reqid,
brokerd_msg=broker_details, brokerd_msg=broker_details,
) ).dict()
) )
except KeyError: except KeyError:
log.error( log.error(
@ -848,14 +851,14 @@ async def process_client_order_cmds(
# NOTE: cancel response will be relayed back in messages # NOTE: cancel response will be relayed back in messages
# from corresponding broker # from corresponding broker
if reqid is not None: if reqid:
# send cancel to brokerd immediately! # send cancel to brokerd immediately!
log.info( log.info(
f'Submitting cancel for live order {reqid}' f'Submitting cancel for live order {reqid}'
) )
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(msg.dict())
else: else:
# this might be a cancel for an order that hasn't been # this might be a cancel for an order that hasn't been
@ -877,7 +880,7 @@ async def process_client_order_cmds(
resp='dark_cancelled', resp='dark_cancelled',
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
) ).dict()
) )
# de-register this client dialogue # de-register this client dialogue
router.dialogues.pop(oid) router.dialogues.pop(oid)
@ -932,7 +935,7 @@ async def process_client_order_cmds(
# handle relaying the ems side responses back to # handle relaying the ems side responses back to
# the client/cmd sender from this request # the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}') log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(msg.dict())
# an immediate response should be ``BrokerdOrderAck`` # an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()`` # with ems order id from the ``trades_dialogue()``
@ -1012,7 +1015,7 @@ async def process_client_order_cmds(
resp=resp, resp=resp,
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
) ).dict()
) )

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers) # Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -15,26 +15,21 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
Clearing sub-system message and protocols. Clearing system messagingn types and protocols.
""" """
from typing import Optional, Union from typing import Optional, Union
# TODO: try out just encoding/send direction for now?
# import msgspec
from pydantic import BaseModel
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct
# TODO: ``msgspec`` stuff worth paying attention to:
# - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution
# - use literals for a common msg determined by diff keys?
# - https://jcristharif.com/msgspec/usage.html#literal
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# --------------
# Client -> emsd # Client -> emsd
# --------------
class Cancel(Struct):
class Cancel(BaseModel):
'''Cancel msg for removing a dark (ems triggered) or '''Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order. broker-submitted (live) trigger/order.
@ -44,10 +39,8 @@ class Cancel(Struct):
symbol: str symbol: str
class Order(Struct): class Order(BaseModel):
# TODO: use ``msgspec.Literal``
# https://jcristharif.com/msgspec/usage.html#literal
action: str # {'buy', 'sell', 'alert'} action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id" # internal ``emdsd`` unique "order id"
oid: str # uuid4 oid: str # uuid4
@ -55,9 +48,6 @@ class Order(Struct):
account: str # should we set a default as '' ? account: str # should we set a default as '' ?
price: float price: float
# TODO: could we drop the ``.action`` field above and instead just
# use +/- values here? Would make the msg smaller at the sake of a
# teensie fp precision?
size: float size: float
brokers: list[str] brokers: list[str]
@ -69,14 +59,20 @@ class Order(Struct):
# the backend broker # the backend broker
exec_mode: str # {'dark', 'live', 'paper'} exec_mode: str # {'dark', 'live', 'paper'}
class Config:
# just for pre-loading a ``Symbol`` when used
# in the order mode staging process
arbitrary_types_allowed = True
# don't copy this model instance when used in
# a recursive model
copy_on_model_validation = False
# --------------
# Client <- emsd # Client <- emsd
# --------------
# update msgs from ems which relay state change info # update msgs from ems which relay state change info
# from the active clearing engine. # from the active clearing engine.
class Status(Struct):
class Status(BaseModel):
name: str = 'status' name: str = 'status'
oid: str # uuid4 oid: str # uuid4
@ -99,6 +95,8 @@ class Status(Struct):
# } # }
resp: str # "response", see above resp: str # "response", see above
# symbol: str
# trigger info # trigger info
trigger_price: Optional[float] = None trigger_price: Optional[float] = None
# price: float # price: float
@ -113,12 +111,10 @@ class Status(Struct):
brokerd_msg: dict = {} brokerd_msg: dict = {}
# ---------------
# emsd -> brokerd # emsd -> brokerd
# ---------------
# requests *sent* from ems to respective backend broker daemon # requests *sent* from ems to respective backend broker daemon
class BrokerdCancel(Struct): class BrokerdCancel(BaseModel):
action: str = 'cancel' action: str = 'cancel'
oid: str # piker emsd order id oid: str # piker emsd order id
@ -134,7 +130,7 @@ class BrokerdCancel(Struct):
reqid: Optional[Union[int, str]] = None reqid: Optional[Union[int, str]] = None
class BrokerdOrder(Struct): class BrokerdOrder(BaseModel):
action: str # {buy, sell} action: str # {buy, sell}
oid: str oid: str
@ -154,12 +150,11 @@ class BrokerdOrder(Struct):
size: float size: float
# ---------------
# emsd <- brokerd # emsd <- brokerd
# ---------------
# requests *received* to ems from broker backend # requests *received* to ems from broker backend
class BrokerdOrderAck(Struct):
class BrokerdOrderAck(BaseModel):
''' '''
Immediate reponse to a brokerd order request providing the broker Immediate reponse to a brokerd order request providing the broker
specific unique order id so that the EMS can associate this specific unique order id so that the EMS can associate this
@ -177,7 +172,7 @@ class BrokerdOrderAck(Struct):
account: str = '' account: str = ''
class BrokerdStatus(Struct): class BrokerdStatus(BaseModel):
name: str = 'status' name: str = 'status'
reqid: Union[int, str] reqid: Union[int, str]
@ -210,7 +205,7 @@ class BrokerdStatus(Struct):
} }
class BrokerdFill(Struct): class BrokerdFill(BaseModel):
''' '''
A single message indicating a "fill-details" event from the broker A single message indicating a "fill-details" event from the broker
if avaiable. if avaiable.
@ -235,7 +230,7 @@ class BrokerdFill(Struct):
broker_time: float broker_time: float
class BrokerdError(Struct): class BrokerdError(BaseModel):
''' '''
Optional error type that can be relayed to emsd for error handling. Optional error type that can be relayed to emsd for error handling.
@ -254,7 +249,7 @@ class BrokerdError(Struct):
broker_details: dict = {} broker_details: dict = {}
class BrokerdPosition(Struct): class BrokerdPosition(BaseModel):
'''Position update event from brokerd. '''Position update event from brokerd.
''' '''

View File

@ -117,7 +117,7 @@ class PaperBoi:
reason='paper_trigger', reason='paper_trigger',
remaining=size, remaining=size,
) )
await self.ems_trades_stream.send(msg) await self.ems_trades_stream.send(msg.dict())
# if we're already a clearing price simulate an immediate fill # if we're already a clearing price simulate an immediate fill
if ( if (
@ -173,7 +173,7 @@ class PaperBoi:
broker=self.broker, broker=self.broker,
time_ns=time.time_ns(), time_ns=time.time_ns(),
) )
await self.ems_trades_stream.send(msg) await self.ems_trades_stream.send(msg.dict())
async def fake_fill( async def fake_fill(
self, self,
@ -216,7 +216,7 @@ class PaperBoi:
'name': self.broker + '_paper', 'name': self.broker + '_paper',
}, },
) )
await self.ems_trades_stream.send(msg) await self.ems_trades_stream.send(msg.dict())
if order_complete: if order_complete:
@ -240,7 +240,7 @@ class PaperBoi:
'name': self.broker, 'name': self.broker,
}, },
) )
await self.ems_trades_stream.send(msg) await self.ems_trades_stream.send(msg.dict())
# lookup any existing position # lookup any existing position
token = f'{symbol}.{self.broker}' token = f'{symbol}.{self.broker}'
@ -268,7 +268,7 @@ class PaperBoi:
) )
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price) pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
await self.ems_trades_stream.send(pp_msg) await self.ems_trades_stream.send(pp_msg.dict())
async def simulate_fills( async def simulate_fills(
@ -384,7 +384,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'Paper only. No account found: `{account}` ?', reason=f'Paper only. No account found: `{account}` ?',
)) ).dict())
continue continue
# validate # validate
@ -416,7 +416,7 @@ async def handle_order_requests(
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
) ).dict()
) )
elif action == 'cancel': elif action == 'cancel':

View File

@ -1,68 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Built-in (extension) types.
"""
from typing import Optional
from pprint import pformat
import msgspec
class Struct(
msgspec.Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def to_dict(self) -> dict:
return {
f: getattr(self, f)
for f in self.__struct_fields__
}
def __repr__(self):
return f'Struct({pformat(self.to_dict())})'
def copy(
self,
update: Optional[dict] = None,
) -> msgspec.Struct:
'''
Validate-typecast all self defined fields, return a copy of us
with all such fields.
This is kinda like the default behaviour in `pydantic.BaseModel`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# roundtrip serialize to validate
return msgspec.msgpack.Decoder(
type=type(self)
).decode(
msgspec.msgpack.Encoder().encode(self)
)

View File

@ -619,7 +619,7 @@ class FillStatusBar(QProgressBar):
# color: #19232D; # color: #19232D;
# width: 10px; # width: 10px;
self.setRange(0, int(slots)) self.setRange(0, slots)
self.setValue(value) self.setValue(value)

View File

@ -264,8 +264,7 @@ class OrderMode:
self, self,
) -> OrderDialog: ) -> OrderDialog:
''' '''Send execution order to EMS return a level line to
Send execution order to EMS return a level line to
represent the order on a chart. represent the order on a chart.
''' '''
@ -274,9 +273,13 @@ class OrderMode:
oid = str(uuid.uuid4()) oid = str(uuid.uuid4())
# format order data for ems # format order data for ems
order = staged.copy() fqsn = symbol.front_fqsn()
order.oid = oid order = staged.copy(
order.symbol = symbol.front_fqsn() update={
'symbol': fqsn,
'oid': oid,
}
)
line = self.line_from_order( line = self.line_from_order(
order, order,