Compare commits

...

28 Commits

Author SHA1 Message Date
Tyler Goodlet 72072b5737 Support `0` value `reqid`s 🤦 2022-07-08 23:16:29 -04:00
Tyler Goodlet 381b3121d6 Cancel any live orders found on connect
More or less just to avoid orders the user wasn't aware of from
persisting until we get "open order relaying" through the ems working.

Some further fixes which required a new `reqids2txids` map which keeps
track of which `kraken` "txid" is mapped to our `reqid: int`; mainly
this was needed for cancel requests which require knowing the underlying
`txid`s (since apparently kraken doesn't keep track of the "reqid"  we
pass it). Pass the ws instance into `handle_order_updates()` to enable
the cancelling orders on startup. Don't key error on unknown `reqid`
values (for eg. when receiving historical trade events on startup).
Handle cancel requests first in the ems side loop.
2022-07-08 23:10:25 -04:00
Tyler Goodlet 8984c1b60b Use `aclosing()` around ws async gen 2022-07-08 19:14:45 -04:00
Tyler Goodlet 2e3cac1407 Add some todo-reminders for ``msgspec`` stuff 2022-07-08 19:14:45 -04:00
Tyler Goodlet b250a48b8d Drop remaining `BaseModel` api usage from rest of codebase 2022-07-08 19:14:43 -04:00
Tyler Goodlet 3dfe7ef8dd Add `Struct.copy()` which does a rountrip validate 2022-07-08 19:14:11 -04:00
Tyler Goodlet 2774a7e6ec Change all clearing msgs over to `msgspec` 2022-07-08 19:14:11 -04:00
Tyler Goodlet 7b6318f025 Cast slots to `int` before range set 2022-07-08 19:14:11 -04:00
Tyler Goodlet b0ee764423 Drop pydantic from allocator 2022-07-08 19:14:11 -04:00
Tyler Goodlet a63f0ee1c0 Add a custom `msgspec.Struct` with some humanizing 2022-07-08 19:14:11 -04:00
Tyler Goodlet 7d49335f8b Lol, gotta `float()` that vlm before `*` XD 2022-07-08 19:14:11 -04:00
Tyler Goodlet 9d176c2dda Remove `BaseModel` use from all dataclass-like uses 2022-07-08 19:14:11 -04:00
Tyler Goodlet 6cc02bd8f5 Use struct for shm tokens 2022-07-08 19:14:11 -04:00
Tyler Goodlet 693c7ce12a Use our struct for kraken `Pair` type 2022-07-08 19:14:11 -04:00
Tyler Goodlet 5e60c79664 Drop `pydantic` from service mngr 2022-07-08 19:14:11 -04:00
Tyler Goodlet 7f779dda19 Use our struct in binance backend 2022-07-08 19:14:11 -04:00
Tyler Goodlet 5100036e10 Pass our manually mapped `reqid: int` to EMS
Since we seem to always be able to get back the `reqid`/`userref` value
we send to kraken ws endpoints, we can use this as our brokerd side
order id and avoid all race cases with getting the true `txid` value
that `kraken` assigns (and which changes when you do "edits"
:eyeroll:). This simplifies status updates by allowing our relay loop
just to pass back our generated `.reqid` verbatim and allows responding
with a `BrokerdOrderAck` immediately in the request handler task which
should guarantee there are no further race conditions with the relay
loop and mapping `txid`s from kraken.. and figuring out wtf to do when
they change, etc.
2022-07-08 19:13:32 -04:00
Tyler Goodlet 78b9d90202 Add ledger and `pps.toml` snippets 2022-07-08 19:13:32 -04:00
Tyler Goodlet 9300b3d6db Try out a backend readme 2022-07-08 19:13:32 -04:00
Tyler Goodlet 6d13c8255f Don't require an ems msg symbol on error statuses 2022-07-08 19:13:32 -04:00
Tyler Goodlet 3765c61f2d Update ledger *after* pps updates from new trades
Addressing same issue as in #350 where we need to compute position
updates using the *first read* from the ledger **before** we update it
to make sure `Position.lifo_update()` gets called and **not skipped**
because new trades were read as clears entries but haven't actually been
included in update calcs yet.. aka we call `Position.lifo_update()`.

Main change here is to convert `update_ledger()` into a context mngr so
that the ledger write is committed after pps updates using
`pp.update_pps_conf()`..

This is basically a hotfix to #346 as well.
2022-07-08 19:13:32 -04:00
Tyler Goodlet cb7a9b9449 Factor status handling into a new `process_status()` helper 2022-07-08 19:13:32 -04:00
Tyler Goodlet f1192dff09 Factor msg loop into new func: `handle_order_updates()` 2022-07-08 19:13:32 -04:00
Tyler Goodlet 9e8d32cdff Drop uneeded count-sequencec verification 2022-07-08 19:13:32 -04:00
Tyler Goodlet c74741228f Get order "editing" working fully
Turns out the EMS can support this as originally expected: you can
update a `brokerd`-side `.reqid` through a `BrokerdAck` msg and the ems
which update its cross-dialog (leg) tracking correctly! The issue was
a bug in the `editOrderStatus` msg handling and appropriate tracking
of the correct `.oid` (ems uid) on the kraken side. This unfortunately
required adding a `emsflow: dict[str, list[BrokerdOrder]]` msg flow
tracing table which means the broker daemon is tracking all the msg flow
with the ems, though I'm wondering now if this is just good practise
anyway and maybe we should offer a small primitive type from our msging
utils to aid with this? I've used such constructs in event handling
systems prior.

There's a lot more factoring that can be done after these changes as
well but the quick detailed summary is,
- rework the `handle_order_requests()` loop to use `match:` syntax and
  update the new `emsflow` table on every new request from the ems.
- fix the `editOrderStatus` case pattern to not include an error msg and
  thus actually be triggered to respond to the ems with a `BrokerdAck`
  containing the new `.reqid`, the new kraken side `txid`.
- skip any `openOrders` msgs which are detected as being kraken's
  internal order "edits" by matching on the `cancel_reason` field.
- update the `emsflow` table in all ws-stream msg handling blocks
  with responses sent to the ems.

Relates to #290
2022-07-08 19:13:32 -04:00
Tyler Goodlet f38eef2bf4 Make ems relay loop report on brokerd `.reqid` changes 2022-07-08 19:13:32 -04:00
Tyler Goodlet e757e1f277 Use `match:` syntax in data feed subs processing 2022-07-08 19:13:32 -04:00
Tyler Goodlet 4823f87422 First draft, working WS based order management
Move to using the websocket API for all order control ops and dropping
the sync rest api approach which resulted in a bunch of buggy races.
Further this gets us must faster (batch) order cancellation for free
and a simpler ems request handler loop. We now heavily leverage the new
py3.10 `match:` syntax for all kraken-side API msg parsing and
processing and handle both the `openOrders` and `ownTrades` subscription
streams.

We also block "order editing" (by immediate cancellation) for now since
the EMS isn't entirely yet equipped to handle brokerd side `.reqid`
changes (which is how kraken implements so called order "updates" or
"edits") for a given order-request dialog and we may want to even
consider just implementing "updates" ourselves via independent cancel
and submit requests? Definitely something to ponder. Alternatively we
can "masquerade" such updates behind the count-style `.oid` remapping we
had to implement anyway (kraken's limitation) and maybe everything will
just work?

Further details in this patch:
- create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s
  that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable
  local lookup of ems uids to piker-backend-client-side request ids and
  received order messages.
- add `openOrders` sub support which more or less directly relays to
  equivalent `BrokerdStatus` updates and calc the `.filled` and
  `.remaining` values based on cleared vlm updates.
- add handler blocks for `[add/edit/cancel]OrderStatus` events including
  error msg cases.
- don't do any order request response processing in
  `handle_order_requests()` since responses are always received via one
  (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus
  such msgs are now handled in the response relay loop.

Relates to #290
Resolves #310, #296
2022-07-08 19:13:32 -04:00
19 changed files with 901 additions and 472 deletions

View File

@ -22,10 +22,10 @@ from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from collections import defaultdict from collections import defaultdict
from pydantic import BaseModel from msgspec import Struct
import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor
from .log import get_logger, get_console_log from .log import get_logger, get_console_log
from .brokers import get_brokermod from .brokers import get_brokermod
@ -47,16 +47,13 @@ _root_modules = [
] ]
class Services(BaseModel): class Services(Struct):
actor_n: tractor._supervise.ActorNursery actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {} service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
class Config:
arbitrary_types_allowed = True
async def start_service_task( async def start_service_task(
self, self,
name: str, name: str,

View File

@ -34,13 +34,13 @@ from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from pydantic import BaseModel
import wsproto import wsproto
from .._cacheables import open_cached_client from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data.types import Struct
from ..data._web_bs import open_autorecon_ws, NoBsWs from ..data._web_bs import open_autorecon_ws, NoBsWs
log = get_logger(__name__) log = get_logger(__name__)
@ -79,12 +79,14 @@ _show_wap_in_history = False
# https://binance-docs.github.io/apidocs/spot/en/#exchange-information # https://binance-docs.github.io/apidocs/spot/en/#exchange-information
class Pair(BaseModel): class Pair(Struct, frozen=True):
symbol: str symbol: str
status: str status: str
baseAsset: str baseAsset: str
baseAssetPrecision: int baseAssetPrecision: int
cancelReplaceAllowed: bool
allowTrailingStop: bool
quoteAsset: str quoteAsset: str
quotePrecision: int quotePrecision: int
quoteAssetPrecision: int quoteAssetPrecision: int
@ -287,7 +289,7 @@ async def get_client() -> Client:
# validation type # validation type
class AggTrade(BaseModel): class AggTrade(Struct):
e: str # Event type e: str # Event type
E: int # Event time E: int # Event time
s: str # Symbol s: str # Symbol
@ -341,7 +343,9 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
elif msg.get('e') == 'aggTrade': elif msg.get('e') == 'aggTrade':
# validate # NOTE: this is purely for a definition, ``msgspec.Struct``
# does not runtime-validate until you decode/encode.
# see: https://jcristharif.com/msgspec/structs.html#type-validation
msg = AggTrade(**msg) msg = AggTrade(**msg)
# TODO: type out and require this quote format # TODO: type out and require this quote format
@ -352,8 +356,8 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
'brokerd_ts': time.time(), 'brokerd_ts': time.time(),
'ticks': [{ 'ticks': [{
'type': 'trade', 'type': 'trade',
'price': msg.p, 'price': float(msg.p),
'size': msg.q, 'size': float(msg.q),
'broker_ts': msg.T, 'broker_ts': msg.T,
}], }],
} }
@ -448,7 +452,7 @@ async def stream_quotes(
d = cache[sym.upper()] d = cache[sym.upper()]
syminfo = Pair(**d) # validation syminfo = Pair(**d) # validation
si = sym_infos[sym] = syminfo.dict() si = sym_infos[sym] = syminfo.to_dict()
# XXX: after manually inspecting the response format we # XXX: after manually inspecting the response format we
# just directly pick out the info we need # just directly pick out the info we need

View File

@ -148,7 +148,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?', reason=f'No account found: `{account}` ?',
).dict()) ))
continue continue
client = _accounts2clients.get(account) client = _accounts2clients.get(account)
@ -161,7 +161,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?', reason=f'No api client loaded for account: `{account}` ?',
).dict()) ))
continue continue
if action in {'buy', 'sell'}: if action in {'buy', 'sell'}:
@ -188,7 +188,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason='Order already active?', reason='Order already active?',
).dict()) ))
# deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
await ems_order_stream.send( await ems_order_stream.send(
@ -197,9 +197,8 @@ async def handle_order_requests(
oid=order.oid, oid=order.oid,
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(),
account=account, account=account,
).dict() )
) )
elif action == 'cancel': elif action == 'cancel':
@ -559,7 +558,7 @@ async def trades_dialogue(
cids2pps, cids2pps,
validate=True, validate=True,
) )
all_positions.extend(msg.dict() for msg in msgs) all_positions.extend(msg for msg in msgs)
if not all_positions and cids2pps: if not all_positions and cids2pps:
raise RuntimeError( raise RuntimeError(
@ -665,7 +664,7 @@ async def emit_pp_update(
msg = msgs[0] msg = msgs[0]
break break
await ems_stream.send(msg.dict()) await ems_stream.send(msg)
async def deliver_trade_events( async def deliver_trade_events(
@ -743,7 +742,7 @@ async def deliver_trade_events(
broker_details={'name': 'ib'}, broker_details={'name': 'ib'},
) )
await ems_stream.send(msg.dict()) await ems_stream.send(msg)
case 'fill': case 'fill':
@ -803,7 +802,7 @@ async def deliver_trade_events(
broker_time=trade_entry['broker_time'], broker_time=trade_entry['broker_time'],
) )
await ems_stream.send(msg.dict()) await ems_stream.send(msg)
# 2 cases: # 2 cases:
# - fill comes first or # - fill comes first or
@ -879,7 +878,7 @@ async def deliver_trade_events(
cid, msg = pack_position(item) cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account] # acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps! # cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg.dict()) # await ems_stream.send(msg)
case 'event': case 'event':
@ -891,7 +890,7 @@ async def deliver_trade_events(
# level... # level...
# reqid = item.get('reqid', 0) # reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1: # if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}") # log.info(f"TWS triggered trade\n{pformat(msg)}")
# msg.reqid = 'tws-' + str(-1 * reqid) # msg.reqid = 'tws-' + str(-1 * reqid)

View File

@ -0,0 +1,64 @@
``kraken`` backend
------------------
though they don't have the most liquidity of all the cexes they sure are
accommodating to those of us who appreciate a little ``xmr``.
status
******
current support is *production grade* and both real-time data and order
management should be correct and fast. this backend is used by core devs
for live trading.
config
******
In order to get order mode support your ``brokers.toml``
needs to have something like the following:
.. code:: toml
[kraken]
accounts.spot = 'spot'
key_descr = "spot"
api_key = "69696969696969696696969696969696969696969696969696969696"
secret = "BOOBSBOOBSBOOBSBOOBSBOOBSSMBZ69696969696969669969696969696"
If everything works correctly you should see any current positions
loaded in the pps pane on chart load and you should also be able to
check your trade records in the file::
<pikerk_conf_dir>/ledgers/trades_kraken_spot.toml
An example ledger file will have entries written verbatim from the
trade events schema:
.. code:: toml
[TFJBKK-SMBZS-VJ4UWS]
ordertxid = "SMBZSA-7CNQU-3HWLNJ"
postxid = "SMBZSE-M7IF5-CFI7LT"
pair = "XXMRZEUR"
time = 1655691993.4133966
type = "buy"
ordertype = "limit"
price = "103.97000000"
cost = "499.99999977"
fee = "0.80000000"
vol = "4.80907954"
margin = "0.00000000"
misc = ""
your ``pps.toml`` file will have position entries like,
.. code:: toml
[kraken.spot."xmreur.kraken"]
size = 4.80907954
be_price = 103.97000000
bsuid = "XXMRZEUR"
clears = [
{ tid = "TFJBKK-SMBZS-VJ4UWS", cost = 0.8, price = 103.97, size = 4.80907954, dt = "2022-05-20T02:26:33.413397+00:00" },
]

View File

@ -282,6 +282,7 @@ class Client:
"volume": str(size), "volume": str(size),
} }
return await self.endpoint('AddOrder', data) return await self.endpoint('AddOrder', data)
else: else:
# Edit order data for kraken api # Edit order data for kraken api
data["txid"] = reqid data["txid"] = reqid

View File

@ -18,21 +18,23 @@
Order api and machinery Order api and machinery
''' '''
from contextlib import asynccontextmanager as acm from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
from functools import partial from functools import partial
from itertools import chain from itertools import chain, count
from pprint import pformat from pprint import pformat
import time import time
from typing import ( from typing import (
Any, Any,
AsyncIterator, AsyncIterator,
# Callable, Union,
# Optional,
# Union,
) )
from async_generator import aclosing
from bidict import bidict
import pendulum import pendulum
from pydantic import BaseModel
import trio import trio
import tractor import tractor
import wsproto import wsproto
@ -61,179 +63,137 @@ from .feed import (
stream_messages, stream_messages,
) )
MsgUnion = Union[
class Trade(BaseModel): BrokerdCancel,
''' BrokerdError,
Trade class that helps parse and validate ownTrades stream BrokerdFill,
BrokerdOrder,
''' BrokerdOrderAck,
reqid: str # kraken order transaction id BrokerdPosition,
action: str # buy or sell BrokerdStatus,
price: float # price of asset ]
size: float # vol of asset
broker_time: str # e.g GTC, GTD
async def handle_order_requests( async def handle_order_requests(
ws: NoBsWs,
client: Client, client: Client,
ems_order_stream: tractor.MsgStream, ems_order_stream: tractor.MsgStream,
token: str,
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
reqids2txids: dict[int, str],
) -> None: ) -> None:
'''
Process new order submission requests from the EMS
and deliver acks or errors.
request_msg: dict '''
# XXX: UGH, let's unify this.. with ``msgspec``.
msg: dict[str, Any]
order: BrokerdOrder order: BrokerdOrder
counter = count()
async for request_msg in ems_order_stream: async for msg in ems_order_stream:
log.info( log.info(f'Rx order msg:\n{pformat(msg)}')
'Received order request:\n' match msg:
f'{pformat(request_msg)}' case {
) 'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
last = emsflow[cancel.oid]
reqid = ids[cancel.oid]
txid = reqids2txids[reqid]
action = request_msg['action'] # call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid], # should be txid from submission
})
if action in {'buy', 'sell'}: case {
'account': 'kraken.spot' as account,
'action': action,
} if action in {'buy', 'sell'}:
account = request_msg['account'] # validate
if account != 'kraken.spot': order = BrokerdOrder(**msg)
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
# reason=f'Kraken only, No account found: `{account}` ?', # logic from old `Client.submit_limit()`
reason=( if order.oid in ids:
'Kraken only, order mode disabled due to ' ep = 'editOrder'
'https://github.com/pikers/piker/issues/299' reqid = ids[order.oid] # integer not txid
), txid = reqids2txids[reqid]
extra = {
'orderid': txid, # txid
}
).dict())
continue
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
resp = await client.submit_limit(
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
reqid=order.reqid,
)
err = resp['error']
if err:
oid = order.oid
log.error(f'Failed to submit order: {oid}')
await ems_order_stream.send(
BrokerdError(
oid=order.oid,
reqid=order.reqid,
symbol=order.symbol,
reason="Failed order submission",
broker_details=resp
).dict()
)
else:
# TODO: handle multiple orders (cancels?)
# txid is an array of strings
if order.reqid is None:
reqid = resp['result']['txid'][0]
else: else:
# update the internal pairing of oid to krakens ep = 'addOrder'
# txid with the new txid that is returned on edit reqid = next(counter)
reqid = resp['result']['txid'] ids[order.oid] = reqid
log.debug(
f"Adding order {reqid}\n"
f'{ids}'
)
extra = {
'ordertype': 'limit',
'type': order.action,
}
# deliver ack that order has been submitted to broker routing psym = order.symbol.upper()
await ems_order_stream.send( pair = f'{psym[:3]}/{psym[3:]}'
BrokerdOrderAck(
# ems order request id # call ws api to submit the order:
oid=order.oid, # https://docs.kraken.com/websockets/#message-addOrder
req = {
'event': ep,
'token': token,
# broker specific request id 'reqid': reqid, # remapped-to-int uid from ems
reqid=reqid, 'pair': pair,
'price': str(order.price),
'volume': str(order.size),
# account the made the order # only ensures request is valid, nothing more
account=order.account # validate: 'true',
).dict() } | extra
log.info(f'Submitting WS order request:\n{pformat(req)}')
await ws.send_msg(req)
resp = BrokerdOrderAck(
oid=order.oid, # ems order request id
reqid=reqid, # our custom int mapping
account=account, # piker account
) )
await ems_order_stream.send(resp)
elif action == 'cancel': # placehold for sanity checking in relay loop
msg = BrokerdCancel(**request_msg) emsflow.setdefault(order.oid, []).append(order)
# Send order cancellation to kraken case _:
resp = await client.submit_cancel( account = msg.get('account')
reqid=msg.reqid if account != 'kraken.spot':
) log.error(
'This is a kraken account, \
# Check to make sure there was no error returned by only a `kraken.spot` selection is valid'
# the kraken endpoint. Assert one order was cancelled. )
try:
result = resp['result']
count = result['count']
# check for 'error' key if we received no 'result'
except KeyError:
error = resp.get('error')
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=msg.oid, oid=msg['oid'],
reqid=msg.reqid, symbol=msg['symbol'],
symbol=msg.symbol, reason=(
reason="Failed order cancel", 'Invalid request msg:\n{msg}'
broker_details=resp ),
).dict()
)
if not error:
raise BrokerError(f'Unknown order cancel response: {resp}')
else:
if not count: # no orders were cancelled?
# XXX: what exactly is this from and why would we care?
# there doesn't seem to be any docs here?
# https://docs.kraken.com/rest/#operation/cancelOrder
# Check to make sure the cancellation is NOT pending,
# then send the confirmation to the ems order stream
pending = result.get('pending')
if pending:
log.error(f'Order {oid} cancel was not yet successful')
await ems_order_stream.send(
BrokerdError(
oid=msg.oid,
reqid=msg.reqid,
symbol=msg.symbol,
# TODO: maybe figure out if pending
# cancels will eventually get cancelled
reason="Order cancel is still pending?",
broker_details=resp
).dict()
)
else: # order cancel success case.
await ems_order_stream.send(
BrokerdStatus(
reqid=msg.reqid,
account=msg.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
) )
else: )
log.error(f'Unknown order command: {request_msg}')
@acm @acm
@ -310,13 +270,13 @@ async def trades_dialogue(
log.info( log.info(
f'Loaded {len(trades)} trades from account `{acc_name}`' f'Loaded {len(trades)} trades from account `{acc_name}`'
) )
trans = await update_ledger(acctid, trades) with open_ledger(acctid, trades) as trans:
active, closed = pp.update_pps_conf( active, closed = pp.update_pps_conf(
'kraken', 'kraken',
acctid, acctid,
trade_records=trans, trade_records=trans,
ledger_reload={}.fromkeys(t.bsuid for t in trans), ledger_reload={}.fromkeys(t.bsuid for t in trans),
) )
position_msgs: list[dict] = [] position_msgs: list[dict] = []
pps: dict[int, pp.Position] pps: dict[int, pp.Position]
@ -330,7 +290,7 @@ async def trades_dialogue(
avg_price=p.be_price, avg_price=p.be_price,
currency='', currency='',
) )
position_msgs.append(msg.dict()) position_msgs.append(msg)
await ctx.started( await ctx.started(
(position_msgs, [acc_name]) (position_msgs, [acc_name])
@ -357,135 +317,441 @@ async def trades_dialogue(
), ),
) as ws, ) as ws,
trio.open_nursery() as n, trio.open_nursery() as n,
aclosing(stream_messages(ws)) as stream,
): ):
# task local msg dialog tracking
emsflow: dict[
str,
list[MsgUnion],
] = {}
# 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict()
reqids2txids: dict[int, str] = {}
# task for processing inbound requests from ems # task for processing inbound requests from ems
n.start_soon(handle_order_requests, client, ems_stream) n.start_soon(
handle_order_requests,
ws,
client,
ems_stream,
token,
emsflow,
ids,
reqids2txids,
)
count: int = 0 # enter relay loop
await handle_order_updates(
ws,
stream,
ems_stream,
emsflow,
ids,
reqids2txids,
trans,
acctid,
acc_name,
token,
)
# process and relay trades events to ems
async def handle_order_updates(
ws: NoBsWs,
ws_stream: AsyncIterator,
ems_stream: tractor.MsgStream,
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
reqids2txids: dict[int, str],
trans: list[pp.Transaction],
acctid: str,
acc_name: str,
token: str,
) -> None:
'''
Main msg handling loop for all things order management.
This code is broken out to make the context explicit and state variables
defined in the signature clear to the reader.
'''
# transaction records which will be updated
# on new trade clearing events (aka order "fills")
trans: list[pp.Transaction]
async for msg in ws_stream:
match msg:
# process and relay clearing trade events to ems
# https://docs.kraken.com/websockets/#message-ownTrades # https://docs.kraken.com/websockets/#message-ownTrades
async for msg in stream_messages(ws): case [
match msg: trades_msgs,
case [ 'ownTrades',
trades_msgs, # won't exist for historical values?
'ownTrades', # 'userref': reqid,
{'sequence': seq}, {'sequence': seq},
]: ]:
# XXX: do we actually need this orrr? # flatten msgs to an {id -> data} table for processing
# ensure that we are only processing new trades? trades = {
assert seq > count tid: trade
count += 1 for entry in trades_msgs
for (tid, trade) in entry.items()
# flatten msgs for processing # only emit entries which are already not-in-ledger
trades = { if tid not in {r.tid for r in trans}
tid: trade }
for entry in trades_msgs for tid, trade in trades.items():
for (tid, trade) in entry.items()
# only emit entries which are already not-in-ledger # NOTE: try to get the requid sent in the order
if tid not in {r.tid for r in trans} # request message if posssible; it may not be
} # provided since this sub also returns generic
for tid, trade in trades.items(): # historical trade events.
reqid = trade.get('userref', trade['ordertxid'])
# parse-cast action = trade['type']
reqid = trade['ordertxid'] price = float(trade['price'])
action = trade['type'] size = float(trade['vol'])
price = float(trade['price']) broker_time = float(trade['time'])
size = float(trade['vol'])
broker_time = float(trade['time']) # send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=reqid,
time_ns=time.time_ns(),
action=action,
size=size,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'},
broker_time=broker_time
)
await ems_stream.send(fill_msg)
filled_msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
account=acc_name,
status='filled',
filled=size,
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': broker_time
},
# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg)
# update ledger and position tracking
with open_ledger(acctid, trades) as trans:
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(
t.bsuid for t in trans),
)
# emit any new pp msgs to ems
for pos in filter(
bool,
chain(active.values(), closed.values()),
):
pp_msg = BrokerdPosition(
broker='kraken',
# XXX: ok so this is annoying, we're
# relaying an account name with the
# backend suffix prefixed but when
# reading accounts from ledgers we
# don't need it and/or it's prefixed
# in the section table.. we should
# just strip this from the message
# right since `.broker` is already
# included?
account=f'kraken.{acctid}',
symbol=pos.symbol.front_fqsn(),
size=pos.size,
avg_price=pos.be_price,
# TODO
# currency=''
)
await ems_stream.send(pp_msg)
# process and relay order state change events
# https://docs.kraken.com/websockets/#message-openOrders
case [
order_msgs,
'openOrders',
{'sequence': seq},
]:
for order_msg in order_msgs:
log.info(
f'Order msg update_{seq}:\n'
f'{pformat(order_msg)}'
)
txid, update_msg = list(order_msg.items())[0]
match update_msg:
# we ignore internal order updates triggered by
# kraken's "edit" endpoint.
case {
'cancel_reason': 'Order replaced',
'status': status,
'userref': reqid,
**rest,
}:
continue
case {
'status': status,
'userref': reqid,
**rest,
# XXX: eg. of remaining msg schema:
# 'avg_price': _,
# 'cost': _,
# 'descr': {
# 'close': None,
# 'leverage': None,
# 'order': descr,
# 'ordertype': 'limit',
# 'pair': 'XMR/EUR',
# 'price': '74.94000000',
# 'price2': '0.00000000',
# 'type': 'buy'
# },
# 'expiretm': None,
# 'fee': '0.00000000',
# 'limitprice': '0.00000000',
# 'misc': '',
# 'oflags': 'fciq',
# 'opentm': '1656966131.337344',
# 'refid': None,
# 'starttm': None,
# 'stopprice': '0.00000000',
# 'timeinforce': 'GTC',
# 'vol': submit_vlm, # '13.34400854',
# 'vol_exec': exec_vlm, # 0.0000
}:
ems_status = {
'open': 'submitted',
'closed': 'cancelled',
'canceled': 'cancelled',
# do we even need to forward
# this state to the ems?
'pending': 'pending',
}[status]
submit_vlm = rest.get('vol', 0)
exec_vlm = rest.get('vol_exec', 0)
reqids2txids[reqid] = txid
oid = ids.inverse.get(reqid)
if not oid:
# TODO: handle these and relay them
# through the EMS to the client / UI
# side!
log.warning(
f'Received active order {txid}:\n'
f'{update_msg}\n'
'Cancelling order for now!..'
)
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid],
})
continue
msgs = emsflow[oid]
# send BrokerdStatus messages for all
# order state updates
resp = BrokerdStatus(
# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(), # cuz why not
action=action,
size=size,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'},
broker_time=broker_time
)
await ems_stream.send(fill_msg.dict())
filled_msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
account=acc_name,
status='filled',
filled=size,
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': broker_time
},
# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg.dict())
# update ledger and position tracking
trans = await update_ledger(acctid, trades)
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(
t.bsuid for t in trans),
)
# emit pp msgs
for pos in filter(
bool,
chain(active.values(), closed.values()),
):
pp_msg = BrokerdPosition(
broker='kraken',
# XXX: ok so this is annoying, we're
# relaying an account name with the
# backend suffix prefixed but when
# reading accounts from ledgers we
# don't need it and/or it's prefixed
# in the section table.. we should
# just strip this from the message
# right since `.broker` is already
# included?
account=f'kraken.{acctid}', account=f'kraken.{acctid}',
symbol=pos.symbol.front_fqsn(),
size=pos.size,
avg_price=pos.be_price,
# TODO # everyone doin camel case..
# currency='' status=ems_status, # force lower case
filled=exec_vlm,
reason='', # why held?
remaining=(
float(submit_vlm)
-
float(exec_vlm)
),
broker_details=dict(
{'name': 'kraken'}, **update_msg
),
) )
await ems_stream.send(pp_msg.dict()) msgs.append(resp)
await ems_stream.send(resp)
case [ case _:
trades_msgs, log.warning(
'openOrders', 'Unknown orders msg:\n'
{'sequence': seq}, f'{txid}:{order_msg}'
]: )
# TODO: async order update handling which we
# should remove from `handle_order_requests()`
# above:
# https://github.com/pikers/piker/issues/293
# https://github.com/pikers/piker/issues/310
log.info(f'Order update {seq}:{trades_msgs}')
case _: case {
log.warning(f'Unhandled trades msg: {msg}') 'event': etype,
await tractor.breakpoint() 'status': status,
'reqid': reqid,
**rest,
} as event if (
etype in {
'addOrderStatus',
'editOrderStatus',
'cancelOrderStatus',
}
):
oid = ids.inverse.get(reqid)
if not oid:
log.warning(
'Unknown order status update?:\n'
f'{event}'
)
continue
txid = rest.get('txid')
if txid:
reqids2txids[reqid] = txid
msgs = emsflow[oid]
last = msgs[-1]
resps, errored = process_status(
event,
oid,
token,
msgs,
last,
)
if resps:
msgs.extend(resps)
for resp in resps:
await ems_stream.send(resp)
case _:
log.warning(f'Unhandled trades update msg: {msg}')
def process_status(
event: dict[str, str],
oid: str,
token: str,
msgs: list[MsgUnion],
last: MsgUnion,
) -> tuple[list[MsgUnion], bool]:
'''
Process `'[add/edit/cancel]OrderStatus'` events by translating to
and returning the equivalent EMS-msg responses.
'''
match event:
case {
'event': etype,
'status': 'error',
'reqid': reqid,
'errorMessage': errmsg,
}:
# any of ``{'add', 'edit', 'cancel'}``
action = etype.rstrip('OrderStatus')
log.error(
f'Failed to {action} order {reqid}:\n'
f'{errmsg}'
)
resp = BrokerdError(
oid=oid,
# XXX: use old reqid in case it changed?
reqid=reqid,
symbol=getattr(last, 'symbol', 'N/A'),
reason=f'Failed {action}:\n{errmsg}',
broker_details=event
)
return [resp], True
# successful request cases
case {
'event': 'addOrderStatus',
'status': "ok",
'reqid': reqid, # oid from ems side
'txid': txid,
'descr': descr, # only on success?
}:
log.info(
f'Submitting order: {descr}\n'
f'ems oid: {oid}\n'
f're-mapped reqid: {reqid}\n'
f'txid: {txid}\n'
)
return [], False
case {
'event': 'editOrderStatus',
'status': "ok",
'reqid': reqid, # oid from ems side
'descr': descr,
# NOTE: for edit request this is a new value
'txid': txid,
'originaltxid': origtxid,
}:
log.info(
f'Editting order {oid}[requid={reqid}]:\n'
f'txid: {origtxid} -> {txid}\n'
f'{descr}'
)
# deliver another ack to update the ems-side `.reqid`.
return [], False
case {
"event": "cancelOrderStatus",
"status": "ok",
'reqid': reqid,
# XXX: sometimes this isn't provided!?
# 'txid': txids,
**rest,
}:
# TODO: should we support "batch" acking of
# multiple cancels thus avoiding the below loop?
resps: list[MsgUnion] = []
for txid in rest.get('txid', [last.reqid]):
resp = BrokerdStatus(
reqid=reqid,
account=last.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Cancel success: {oid}@{txid}',
broker_details=event,
)
resps.append(resp)
return resps, False
def norm_trade_records( def norm_trade_records(
@ -494,10 +760,9 @@ def norm_trade_records(
) -> list[pp.Transaction]: ) -> list[pp.Transaction]:
records: list[pp.Transaction] = [] records: list[pp.Transaction] = []
for tid, record in ledger.items(): for tid, record in ledger.items():
size = record.get('vol') * { size = float(record.get('vol')) * {
'buy': 1, 'buy': 1,
'sell': -1, 'sell': -1,
}[record['type']] }[record['type']]
@ -508,7 +773,7 @@ def norm_trade_records(
pp.Transaction( pp.Transaction(
fqsn=f'{norm_sym}.kraken', fqsn=f'{norm_sym}.kraken',
tid=tid, tid=tid,
size=float(size), size=size,
price=float(record['price']), price=float(record['price']),
cost=float(record['fee']), cost=float(record['fee']),
dt=pendulum.from_timestamp(float(record['time'])), dt=pendulum.from_timestamp(float(record['time'])),
@ -522,19 +787,24 @@ def norm_trade_records(
return records return records
async def update_ledger( @cm
def open_ledger(
acctid: str, acctid: str,
trade_entries: list[dict[str, Any]], trade_entries: list[dict[str, Any]],
) -> list[pp.Transaction]: ) -> list[pp.Transaction]:
'''
Write recent session's trades to the user's (local) ledger file.
# write recent session's trades to the user's (local) ledger file. '''
with pp.open_trade_ledger( with pp.open_trade_ledger(
'kraken', 'kraken',
acctid, acctid,
) as ledger: ) as ledger:
ledger.update(trade_entries)
# normalize to transaction form # normalize to transaction form
records = norm_trade_records(trade_entries) records = norm_trade_records(trade_entries)
return records yield records
# update on exit
ledger.update(trade_entries)

View File

@ -31,7 +31,6 @@ import time
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import pendulum import pendulum
from pydantic import BaseModel
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
import trio import trio
@ -45,6 +44,7 @@ from piker.brokers._util import (
) )
from piker.log import get_console_log from piker.log import get_console_log
from piker.data import ShmArray from piker.data import ShmArray
from piker.data.types import Struct
from piker.data._web_bs import open_autorecon_ws, NoBsWs from piker.data._web_bs import open_autorecon_ws, NoBsWs
from . import log from . import log
from .api import ( from .api import (
@ -54,7 +54,7 @@ from .api import (
# https://www.kraken.com/features/api#get-tradable-pairs # https://www.kraken.com/features/api#get-tradable-pairs
class Pair(BaseModel): class Pair(Struct):
altname: str # alternate pair name altname: str # alternate pair name
wsname: str # WebSocket pair name (if available) wsname: str # WebSocket pair name (if available)
aclass_base: str # asset class of base component aclass_base: str # asset class of base component
@ -117,9 +117,8 @@ async def stream_messages(
too_slow_count = 0 too_slow_count = 0
continue continue
if isinstance(msg, dict): match msg:
if msg.get('event') == 'heartbeat': case {'event': 'heartbeat'}:
now = time.time() now = time.time()
delay = now - last_hb delay = now - last_hb
last_hb = now last_hb = now
@ -130,11 +129,20 @@ async def stream_messages(
continue continue
err = msg.get('errorMessage') case {
if err: 'connectionID': _,
raise BrokerError(err) 'event': 'systemStatus',
else: 'status': 'online',
yield msg 'version': _,
} as msg:
log.info(
'WS connection is up:\n'
f'{msg}'
)
continue
case _:
yield msg
async def process_data_feed_msgs( async def process_data_feed_msgs(
@ -145,37 +153,60 @@ async def process_data_feed_msgs(
''' '''
async for msg in stream_messages(ws): async for msg in stream_messages(ws):
match msg:
case {
'errorMessage': errmsg
}:
raise BrokerError(errmsg)
chan_id, *payload_array, chan_name, pair = msg case {
'event': 'subscriptionStatus',
} as sub:
log.info(
'WS subscription is active:\n'
f'{sub}'
)
continue
if 'ohlc' in chan_name: case [
chan_id,
*payload_array,
chan_name,
pair
]:
if 'ohlc' in chan_name:
yield 'ohlc', OHLC(
chan_id,
chan_name,
pair,
*payload_array[0]
)
yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) elif 'spread' in chan_name:
elif 'spread' in chan_name: bid, ask, ts, bsize, asize = map(
float, payload_array[0])
bid, ask, ts, bsize, asize = map(float, payload_array[0]) # TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
# TODO: really makes you think IB has a horrible API... {'type': 'ask', 'price': ask, 'size': asize},
quote = { {'type': 'asize', 'price': ask, 'size': asize},
'symbol': pair.replace('/', ''), ],
'ticks': [ }
{'type': 'bid', 'price': bid, 'size': bsize}, yield 'l1', quote
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize}, # elif 'book' in msg[-2]:
{'type': 'asize', 'price': ask, 'size': asize}, # chan_id, *payload_array, chan_name, pair = msg
], # print(msg)
}
yield 'l1', quote
# elif 'book' in msg[-2]: case _:
# chan_id, *payload_array, chan_name, pair = msg print(f'UNHANDLED MSG: {msg}')
# print(msg) # yield msg
else:
print(f'UNHANDLED MSG: {msg}')
yield msg
def normalize( def normalize(
@ -316,7 +347,7 @@ async def stream_quotes(
sym = sym.upper() sym = sym.upper()
si = Pair(**await client.symbol_info(sym)) # validation si = Pair(**await client.symbol_info(sym)) # validation
syminfo = si.dict() syminfo = si.to_dict()
syminfo['price_tick_size'] = 1 / 10**si.pair_decimals syminfo['price_tick_size'] = 1 / 10**si.pair_decimals
syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals
syminfo['asset_type'] = 'crypto' syminfo['asset_type'] = 'crypto'
@ -385,7 +416,7 @@ async def stream_quotes(
msg_gen = process_data_feed_msgs(ws) msg_gen = process_data_feed_msgs(ws)
# TODO: use ``anext()`` when it lands in 3.10! # TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__() typ, ohlc_last = await anext(msg_gen)
topic, quote = normalize(ohlc_last) topic, quote = normalize(ohlc_last)

View File

@ -22,10 +22,9 @@ from enum import Enum
from typing import Optional from typing import Optional
from bidict import bidict from bidict import bidict
from pydantic import BaseModel, validator
# from msgspec import Struct
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct
from ..pp import Position from ..pp import Position
@ -41,33 +40,30 @@ SizeUnit = Enum(
) )
class Allocator(BaseModel): class Allocator(Struct):
class Config:
validate_assignment = True
copy_on_model_validation = False
arbitrary_types_allowed = True
# required to get the account validator lookup working?
extra = 'allow'
underscore_attrs_are_private = False
symbol: Symbol symbol: Symbol
account: Optional[str] = 'paper' account: Optional[str] = 'paper'
_size_units: bidict[str, Optional[str]] = _size_units
# TODO: for enums this clearly doesn't fucking work, you can't set # TODO: for enums this clearly doesn't fucking work, you can't set
# a default at startup by passing in a `dict` but yet you can set # a default at startup by passing in a `dict` but yet you can set
# that value through assignment..for wtv cucked reason.. honestly, pure # that value through assignment..for wtv cucked reason.. honestly, pure
# unintuitive garbage. # unintuitive garbage.
size_unit: str = 'currency' _size_unit: str = 'currency'
_size_units: dict[str, Optional[str]] = _size_units
@validator('size_unit', pre=True) @property
def maybe_lookup_key(cls, v): def size_unit(self) -> str:
# apply the corresponding enum key for the text "description" value return self._size_unit
@size_unit.setter
def size_unit(self, v: str) -> Optional[str]:
if v not in _size_units: if v not in _size_units:
return _size_units.inverse[v] v = _size_units.inverse[v]
assert v in _size_units assert v in _size_units
self._size_unit = v
return v return v
# TODO: if we ever want ot support non-uniform entry-slot-proportion # TODO: if we ever want ot support non-uniform entry-slot-proportion
@ -262,7 +258,7 @@ def mk_allocator(
# default allocation settings # default allocation settings
defaults: dict[str, float] = { defaults: dict[str, float] = {
'account': None, # select paper by default 'account': None, # select paper by default
'size_unit': 'currency', # 'size_unit': 'currency',
'units_limit': 400, 'units_limit': 400,
'currency_limit': 5e3, 'currency_limit': 5e3,
'slots': 4, 'slots': 4,
@ -301,6 +297,9 @@ def mk_allocator(
# entry step 1.0 # entry step 1.0
alloc.units_limit = alloc.slots alloc.units_limit = alloc.slots
else:
alloc.size_unit = 'currency'
# if the current position is already greater then the limit # if the current position is already greater then the limit
# settings, increase the limit to the current position # settings, increase the limit to the current position
if alloc.size_unit == 'currency': if alloc.size_unit == 'currency':

View File

@ -58,11 +58,11 @@ class OrderBook:
def send( def send(
self, self,
msg: Order, msg: Order | dict,
) -> dict: ) -> dict:
self._sent_orders[msg.oid] = msg self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg.dict()) self._to_ems.send_nowait(msg)
return msg return msg
def update( def update(
@ -73,9 +73,8 @@ class OrderBook:
) -> dict: ) -> dict:
cmd = self._sent_orders[uuid] cmd = self._sent_orders[uuid]
msg = cmd.dict() msg = cmd.copy(update=data)
msg.update(data) self._sent_orders[uuid] = msg
self._sent_orders[uuid] = Order(**msg)
self._to_ems.send_nowait(msg) self._to_ems.send_nowait(msg)
return cmd return cmd
@ -88,7 +87,7 @@ class OrderBook:
oid=uuid, oid=uuid,
symbol=cmd.symbol, symbol=cmd.symbol,
) )
self._to_ems.send_nowait(msg.dict()) self._to_ems.send_nowait(msg)
_orders: OrderBook = None _orders: OrderBook = None
@ -149,7 +148,7 @@ async def relay_order_cmds_from_sync_code(
book = get_orders() book = get_orders()
async with book._from_order_book.subscribe() as orders_stream: async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream: async for cmd in orders_stream:
if cmd['symbol'] == symbol_key: if cmd.symbol == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}') log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire # send msg over IPC / wire
await to_ems_stream.send(cmd) await to_ems_stream.send(cmd)

View File

@ -26,7 +26,6 @@ import time
from typing import AsyncIterator, Callable from typing import AsyncIterator, Callable
from bidict import bidict from bidict import bidict
from pydantic import BaseModel
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
@ -34,6 +33,7 @@ import tractor
from ..log import get_logger from ..log import get_logger
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data.feed import Feed, maybe_open_feed from ..data.feed import Feed, maybe_open_feed
from ..data.types import Struct
from .._daemon import maybe_spawn_brokerd from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper from . import _paper_engine as paper
from ._messages import ( from ._messages import (
@ -88,7 +88,8 @@ def mk_check(
@dataclass @dataclass
class _DarkBook: class _DarkBook:
'''EMS-trigger execution book. '''
EMS-trigger execution book.
Contains conditions for executions (aka "orders" or "triggers") Contains conditions for executions (aka "orders" or "triggers")
which are not exposed to brokers and thus the market; i.e. these are which are not exposed to brokers and thus the market; i.e. these are
@ -231,7 +232,7 @@ async def clear_dark_triggers(
price=submit_price, price=submit_price,
size=cmd['size'], size=cmd['size'],
) )
await brokerd_orders_stream.send(msg.dict()) await brokerd_orders_stream.send(msg)
# mark this entry as having sent an order # mark this entry as having sent an order
# request. the entry will be replaced once the # request. the entry will be replaced once the
@ -247,14 +248,11 @@ async def clear_dark_triggers(
msg = Status( msg = Status(
oid=oid, # ems order id oid=oid, # ems order id
resp=resp,
time_ns=time.time_ns(), time_ns=time.time_ns(),
symbol=fqsn, resp=resp,
trigger_price=price, trigger_price=price,
broker_details={'name': broker}, brokerd_msg=cmd,
cmd=cmd, # original request message )
).dict()
# remove exec-condition from set # remove exec-condition from set
log.info(f'removing pred for {oid}') log.info(f'removing pred for {oid}')
@ -303,7 +301,7 @@ class TradesRelay:
consumers: int = 0 consumers: int = 0
class Router(BaseModel): class Router(Struct):
''' '''
Order router which manages and tracks per-broker dark book, Order router which manages and tracks per-broker dark book,
alerts, clearing and related data feed management. alerts, clearing and related data feed management.
@ -324,10 +322,6 @@ class Router(BaseModel):
# brokername to trades-dialogues streams with ``brokerd`` actors # brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {} relays: dict[str, TradesRelay] = {}
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
def get_dark_book( def get_dark_book(
self, self,
brokername: str, brokername: str,
@ -581,11 +575,11 @@ async def translate_and_relay_brokerd_events(
if name == 'position': if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg).dict() pos_msg = BrokerdPosition(**brokerd_msg)
# XXX: this will be useful for automatic strats yah? # XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem # keep pps per account up to date locally in ``emsd`` mem
sym, broker = pos_msg['symbol'], pos_msg['broker'] sym, broker = pos_msg.symbol, pos_msg.broker
relay.positions.setdefault( relay.positions.setdefault(
# NOTE: translate to a FQSN! # NOTE: translate to a FQSN!
@ -652,6 +646,13 @@ async def translate_and_relay_brokerd_events(
else: else:
# check for existing live flow entry # check for existing live flow entry
entry = book._ems_entries.get(oid) entry = book._ems_entries.get(oid)
old_reqid = entry.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
# initial response to brokerd order request # initial response to brokerd order request
if name == 'ack': if name == 'ack':
@ -662,6 +663,10 @@ async def translate_and_relay_brokerd_events(
# a ``BrokerdOrderAck`` **must** be sent after an order # a ``BrokerdOrderAck`` **must** be sent after an order
# request in order to establish this id mapping. # request in order to establish this id mapping.
book._ems2brokerd_ids[oid] = reqid book._ems2brokerd_ids[oid] = reqid
log.info(
'Rx ACK for order\n'
f'oid: {oid} -> reqid: {reqid}'
)
# new order which has not yet be registered into the # new order which has not yet be registered into the
# local ems book, insert it now and handle 2 cases: # local ems book, insert it now and handle 2 cases:
@ -676,7 +681,7 @@ async def translate_and_relay_brokerd_events(
entry.reqid = reqid entry.reqid = reqid
# tell broker to cancel immediately # tell broker to cancel immediately
await brokerd_trades_stream.send(entry.dict()) await brokerd_trades_stream.send(entry)
# - the order is now active and will be mirrored in # - the order is now active and will be mirrored in
# our book -> registered as live flow # our book -> registered as live flow
@ -716,7 +721,7 @@ async def translate_and_relay_brokerd_events(
# if 10147 in message: cancel # if 10147 in message: cancel
resp = 'broker_errored' resp = 'broker_errored'
broker_details = msg.dict() broker_details = msg
# don't relay message to order requester client # don't relay message to order requester client
# continue # continue
@ -751,7 +756,7 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_' + msg.status resp = 'broker_' + msg.status
# pass the BrokerdStatus msg inside the broker details field # pass the BrokerdStatus msg inside the broker details field
broker_details = msg.dict() broker_details = msg
elif name in ( elif name in (
'fill', 'fill',
@ -760,7 +765,7 @@ async def translate_and_relay_brokerd_events(
# proxy through the "fill" result(s) # proxy through the "fill" result(s)
resp = 'broker_filled' resp = 'broker_filled'
broker_details = msg.dict() broker_details = msg
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
@ -778,7 +783,7 @@ async def translate_and_relay_brokerd_events(
time_ns=time.time_ns(), time_ns=time.time_ns(),
broker_reqid=reqid, broker_reqid=reqid,
brokerd_msg=broker_details, brokerd_msg=broker_details,
).dict() )
) )
except KeyError: except KeyError:
log.error( log.error(
@ -843,14 +848,14 @@ async def process_client_order_cmds(
# NOTE: cancel response will be relayed back in messages # NOTE: cancel response will be relayed back in messages
# from corresponding broker # from corresponding broker
if reqid: if reqid is not None:
# send cancel to brokerd immediately! # send cancel to brokerd immediately!
log.info( log.info(
f'Submitting cancel for live order {reqid}' f'Submitting cancel for live order {reqid}'
) )
await brokerd_order_stream.send(msg.dict()) await brokerd_order_stream.send(msg)
else: else:
# this might be a cancel for an order that hasn't been # this might be a cancel for an order that hasn't been
@ -872,7 +877,7 @@ async def process_client_order_cmds(
resp='dark_cancelled', resp='dark_cancelled',
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
).dict() )
) )
# de-register this client dialogue # de-register this client dialogue
router.dialogues.pop(oid) router.dialogues.pop(oid)
@ -927,7 +932,7 @@ async def process_client_order_cmds(
# handle relaying the ems side responses back to # handle relaying the ems side responses back to
# the client/cmd sender from this request # the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}') log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg.dict()) await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck`` # an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()`` # with ems order id from the ``trades_dialogue()``
@ -1007,7 +1012,7 @@ async def process_client_order_cmds(
resp=resp, resp=resp,
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
).dict() )
) )

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0) # Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -15,21 +15,26 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
Clearing system messagingn types and protocols. Clearing sub-system message and protocols.
""" """
from typing import Optional, Union from typing import Optional, Union
# TODO: try out just encoding/send direction for now?
# import msgspec
from pydantic import BaseModel
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct
# TODO: ``msgspec`` stuff worth paying attention to:
# - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution
# - use literals for a common msg determined by diff keys?
# - https://jcristharif.com/msgspec/usage.html#literal
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# --------------
# Client -> emsd # Client -> emsd
# --------------
class Cancel(Struct):
class Cancel(BaseModel):
'''Cancel msg for removing a dark (ems triggered) or '''Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order. broker-submitted (live) trigger/order.
@ -39,8 +44,10 @@ class Cancel(BaseModel):
symbol: str symbol: str
class Order(BaseModel): class Order(Struct):
# TODO: use ``msgspec.Literal``
# https://jcristharif.com/msgspec/usage.html#literal
action: str # {'buy', 'sell', 'alert'} action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id" # internal ``emdsd`` unique "order id"
oid: str # uuid4 oid: str # uuid4
@ -48,6 +55,9 @@ class Order(BaseModel):
account: str # should we set a default as '' ? account: str # should we set a default as '' ?
price: float price: float
# TODO: could we drop the ``.action`` field above and instead just
# use +/- values here? Would make the msg smaller at the sake of a
# teensie fp precision?
size: float size: float
brokers: list[str] brokers: list[str]
@ -59,20 +69,14 @@ class Order(BaseModel):
# the backend broker # the backend broker
exec_mode: str # {'dark', 'live', 'paper'} exec_mode: str # {'dark', 'live', 'paper'}
class Config:
# just for pre-loading a ``Symbol`` when used
# in the order mode staging process
arbitrary_types_allowed = True
# don't copy this model instance when used in
# a recursive model
copy_on_model_validation = False
# --------------
# Client <- emsd # Client <- emsd
# --------------
# update msgs from ems which relay state change info # update msgs from ems which relay state change info
# from the active clearing engine. # from the active clearing engine.
class Status(Struct):
class Status(BaseModel):
name: str = 'status' name: str = 'status'
oid: str # uuid4 oid: str # uuid4
@ -95,8 +99,6 @@ class Status(BaseModel):
# } # }
resp: str # "response", see above resp: str # "response", see above
# symbol: str
# trigger info # trigger info
trigger_price: Optional[float] = None trigger_price: Optional[float] = None
# price: float # price: float
@ -111,10 +113,12 @@ class Status(BaseModel):
brokerd_msg: dict = {} brokerd_msg: dict = {}
# ---------------
# emsd -> brokerd # emsd -> brokerd
# ---------------
# requests *sent* from ems to respective backend broker daemon # requests *sent* from ems to respective backend broker daemon
class BrokerdCancel(BaseModel): class BrokerdCancel(Struct):
action: str = 'cancel' action: str = 'cancel'
oid: str # piker emsd order id oid: str # piker emsd order id
@ -130,7 +134,7 @@ class BrokerdCancel(BaseModel):
reqid: Optional[Union[int, str]] = None reqid: Optional[Union[int, str]] = None
class BrokerdOrder(BaseModel): class BrokerdOrder(Struct):
action: str # {buy, sell} action: str # {buy, sell}
oid: str oid: str
@ -150,11 +154,12 @@ class BrokerdOrder(BaseModel):
size: float size: float
# ---------------
# emsd <- brokerd # emsd <- brokerd
# ---------------
# requests *received* to ems from broker backend # requests *received* to ems from broker backend
class BrokerdOrderAck(Struct):
class BrokerdOrderAck(BaseModel):
''' '''
Immediate reponse to a brokerd order request providing the broker Immediate reponse to a brokerd order request providing the broker
specific unique order id so that the EMS can associate this specific unique order id so that the EMS can associate this
@ -172,7 +177,7 @@ class BrokerdOrderAck(BaseModel):
account: str = '' account: str = ''
class BrokerdStatus(BaseModel): class BrokerdStatus(Struct):
name: str = 'status' name: str = 'status'
reqid: Union[int, str] reqid: Union[int, str]
@ -205,7 +210,7 @@ class BrokerdStatus(BaseModel):
} }
class BrokerdFill(BaseModel): class BrokerdFill(Struct):
''' '''
A single message indicating a "fill-details" event from the broker A single message indicating a "fill-details" event from the broker
if avaiable. if avaiable.
@ -230,7 +235,7 @@ class BrokerdFill(BaseModel):
broker_time: float broker_time: float
class BrokerdError(BaseModel): class BrokerdError(Struct):
''' '''
Optional error type that can be relayed to emsd for error handling. Optional error type that can be relayed to emsd for error handling.
@ -249,7 +254,7 @@ class BrokerdError(BaseModel):
broker_details: dict = {} broker_details: dict = {}
class BrokerdPosition(BaseModel): class BrokerdPosition(Struct):
'''Position update event from brokerd. '''Position update event from brokerd.
''' '''

View File

@ -117,7 +117,7 @@ class PaperBoi:
reason='paper_trigger', reason='paper_trigger',
remaining=size, remaining=size,
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
# if we're already a clearing price simulate an immediate fill # if we're already a clearing price simulate an immediate fill
if ( if (
@ -173,7 +173,7 @@ class PaperBoi:
broker=self.broker, broker=self.broker,
time_ns=time.time_ns(), time_ns=time.time_ns(),
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
async def fake_fill( async def fake_fill(
self, self,
@ -216,7 +216,7 @@ class PaperBoi:
'name': self.broker + '_paper', 'name': self.broker + '_paper',
}, },
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
if order_complete: if order_complete:
@ -240,7 +240,7 @@ class PaperBoi:
'name': self.broker, 'name': self.broker,
}, },
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
# lookup any existing position # lookup any existing position
token = f'{symbol}.{self.broker}' token = f'{symbol}.{self.broker}'
@ -268,7 +268,7 @@ class PaperBoi:
) )
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price) pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
await self.ems_trades_stream.send(pp_msg.dict()) await self.ems_trades_stream.send(pp_msg)
async def simulate_fills( async def simulate_fills(
@ -384,7 +384,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'Paper only. No account found: `{account}` ?', reason=f'Paper only. No account found: `{account}` ?',
).dict()) ))
continue continue
# validate # validate
@ -416,7 +416,7 @@ async def handle_order_requests(
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
).dict() )
) )
elif action == 'cancel': elif action == 'cancel':

View File

@ -27,13 +27,14 @@ from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
if _USE_POSIX: if _USE_POSIX:
from _posixshmem import shm_unlink from _posixshmem import shm_unlink
import tractor # import msgspec
import numpy as np import numpy as np
from pydantic import BaseModel
from numpy.lib import recfunctions as rfn from numpy.lib import recfunctions as rfn
import tractor
from ..log import get_logger from ..log import get_logger
from ._source import base_iohlc_dtype from ._source import base_iohlc_dtype
from .types import Struct
log = get_logger(__name__) log = get_logger(__name__)
@ -107,15 +108,12 @@ class SharedInt:
log.warning(f'Shm for {name} already unlinked?') log.warning(f'Shm for {name} already unlinked?')
class _Token(BaseModel): class _Token(Struct, frozen=True):
''' '''
Internal represenation of a shared memory "token" Internal represenation of a shared memory "token"
which can be used to key a system wide post shm entry. which can be used to key a system wide post shm entry.
''' '''
class Config:
frozen = True
shm_name: str # this servers as a "key" value shm_name: str # this servers as a "key" value
shm_first_index_name: str shm_first_index_name: str
shm_last_index_name: str shm_last_index_name: str
@ -126,17 +124,22 @@ class _Token(BaseModel):
return np.dtype(list(map(tuple, self.dtype_descr))).descr return np.dtype(list(map(tuple, self.dtype_descr))).descr
def as_msg(self): def as_msg(self):
return self.dict() return self.to_dict()
@classmethod @classmethod
def from_msg(cls, msg: dict) -> _Token: def from_msg(cls, msg: dict) -> _Token:
if isinstance(msg, _Token): if isinstance(msg, _Token):
return msg return msg
# TODO: native struct decoding
# return _token_dec.decode(msg)
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return _Token(**msg) return _Token(**msg)
# _token_dec = msgspec.msgpack.Decoder(_Token)
# TODO: this api? # TODO: this api?
# _known_tokens = tractor.ActorVar('_shm_tokens', {}) # _known_tokens = tractor.ActorVar('_shm_tokens', {})
# _known_tokens = tractor.ContextStack('_known_tokens', ) # _known_tokens = tractor.ContextStack('_known_tokens', )
@ -167,7 +170,7 @@ def _make_token(
shm_name=key, shm_name=key,
shm_first_index_name=key + "_first", shm_first_index_name=key + "_first",
shm_last_index_name=key + "_last", shm_last_index_name=key + "_last",
dtype_descr=np.dtype(dtype).descr dtype_descr=tuple(np.dtype(dtype).descr)
) )

View File

@ -42,7 +42,6 @@ from trio_typing import TaskStatus
import trimeter import trimeter
import tractor import tractor
from tractor.trionics import maybe_open_context from tractor.trionics import maybe_open_context
from pydantic import BaseModel
import pendulum import pendulum
import numpy as np import numpy as np
@ -59,6 +58,7 @@ from ._sharedmem import (
ShmArray, ShmArray,
) )
from .ingest import get_ingestormod from .ingest import get_ingestormod
from .types import Struct
from ._source import ( from ._source import (
base_iohlc_dtype, base_iohlc_dtype,
Symbol, Symbol,
@ -84,7 +84,7 @@ if TYPE_CHECKING:
log = get_logger(__name__) log = get_logger(__name__)
class _FeedsBus(BaseModel): class _FeedsBus(Struct):
''' '''
Data feeds broadcaster and persistence management. Data feeds broadcaster and persistence management.
@ -100,10 +100,6 @@ class _FeedsBus(BaseModel):
a dedicated cancel scope. a dedicated cancel scope.
''' '''
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
brokername: str brokername: str
nursery: trio.Nursery nursery: trio.Nursery
feeds: dict[str, tuple[dict, dict]] = {} feeds: dict[str, tuple[dict, dict]] = {}

View File

@ -0,0 +1,68 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Built-in (extension) types.
"""
from typing import Optional
from pprint import pformat
import msgspec
class Struct(
msgspec.Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def to_dict(self) -> dict:
return {
f: getattr(self, f)
for f in self.__struct_fields__
}
def __repr__(self):
return f'Struct({pformat(self.to_dict())})'
def copy(
self,
update: Optional[dict] = None,
) -> msgspec.Struct:
'''
Validate-typecast all self defined fields, return a copy of us
with all such fields.
This is kinda like the default behaviour in `pydantic.BaseModel`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# roundtrip serialize to validate
return msgspec.msgpack.Decoder(
type=type(self)
).decode(
msgspec.msgpack.Encoder().encode(self)
)

View File

@ -21,7 +21,6 @@ Qt event proxying and processing using ``trio`` mem chans.
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager, AsyncExitStack
from typing import Callable from typing import Callable
from pydantic import BaseModel
import trio import trio
from PyQt5 import QtCore from PyQt5 import QtCore
from PyQt5.QtCore import QEvent, pyqtBoundSignal from PyQt5.QtCore import QEvent, pyqtBoundSignal
@ -30,6 +29,8 @@ from PyQt5.QtWidgets import (
QGraphicsSceneMouseEvent as gs_mouse, QGraphicsSceneMouseEvent as gs_mouse,
) )
from ..data.types import Struct
MOUSE_EVENTS = { MOUSE_EVENTS = {
gs_mouse.GraphicsSceneMousePress, gs_mouse.GraphicsSceneMousePress,
@ -43,13 +44,10 @@ MOUSE_EVENTS = {
# TODO: maybe consider some constrained ints down the road? # TODO: maybe consider some constrained ints down the road?
# https://pydantic-docs.helpmanual.io/usage/types/#constrained-types # https://pydantic-docs.helpmanual.io/usage/types/#constrained-types
class KeyboardMsg(BaseModel): class KeyboardMsg(Struct):
'''Unpacked Qt keyboard event data. '''Unpacked Qt keyboard event data.
''' '''
class Config:
arbitrary_types_allowed = True
event: QEvent event: QEvent
etype: int etype: int
key: int key: int
@ -57,16 +55,13 @@ class KeyboardMsg(BaseModel):
txt: str txt: str
def to_tuple(self) -> tuple: def to_tuple(self) -> tuple:
return tuple(self.dict().values()) return tuple(self.to_dict().values())
class MouseMsg(BaseModel): class MouseMsg(Struct):
'''Unpacked Qt keyboard event data. '''Unpacked Qt keyboard event data.
''' '''
class Config:
arbitrary_types_allowed = True
event: QEvent event: QEvent
etype: int etype: int
button: int button: int

View File

@ -619,7 +619,7 @@ class FillStatusBar(QProgressBar):
# color: #19232D; # color: #19232D;
# width: 10px; # width: 10px;
self.setRange(0, slots) self.setRange(0, int(slots))
self.setValue(value) self.setValue(value)

View File

@ -22,12 +22,9 @@ from __future__ import annotations
from typing import ( from typing import (
Optional, Generic, Optional, Generic,
TypeVar, Callable, TypeVar, Callable,
Literal,
) )
import enum
import sys
from pydantic import BaseModel, validator # from pydantic import BaseModel, validator
from pydantic.generics import GenericModel from pydantic.generics import GenericModel
from PyQt5.QtWidgets import ( from PyQt5.QtWidgets import (
QWidget, QWidget,
@ -38,6 +35,7 @@ from ._forms import (
# FontScaledDelegate, # FontScaledDelegate,
Edit, Edit,
) )
from ..data.types import Struct
DataType = TypeVar('DataType') DataType = TypeVar('DataType')
@ -62,7 +60,7 @@ class Selection(Field[DataType], Generic[DataType]):
options: dict[str, DataType] options: dict[str, DataType]
# value: DataType = None # value: DataType = None
@validator('value') # , always=True) # @validator('value') # , always=True)
def set_value_first( def set_value_first(
cls, cls,
@ -100,7 +98,7 @@ class Edit(Field[DataType], Generic[DataType]):
widget_factory = Edit widget_factory = Edit
class AllocatorPane(BaseModel): class AllocatorPane(Struct):
account = Selection[str]( account = Selection[str](
options=dict.fromkeys( options=dict.fromkeys(

View File

@ -27,7 +27,6 @@ import time
from typing import Optional, Dict, Callable, Any from typing import Optional, Dict, Callable, Any
import uuid import uuid
from pydantic import BaseModel
import tractor import tractor
import trio import trio
from PyQt5.QtCore import Qt from PyQt5.QtCore import Qt
@ -41,6 +40,7 @@ from ..clearing._allocate import (
from ._style import _font from ._style import _font
from ..data._source import Symbol from ..data._source import Symbol
from ..data.feed import Feed from ..data.feed import Feed
from ..data.types import Struct
from ..log import get_logger from ..log import get_logger
from ._editors import LineEditor, ArrowEditor from ._editors import LineEditor, ArrowEditor
from ._lines import order_line, LevelLine from ._lines import order_line, LevelLine
@ -58,7 +58,7 @@ from ._forms import open_form_input_handling
log = get_logger(__name__) log = get_logger(__name__)
class OrderDialog(BaseModel): class OrderDialog(Struct):
''' '''
Trade dialogue meta-data describing the lifetime Trade dialogue meta-data describing the lifetime
of an order submission to ``emsd`` from a chart. of an order submission to ``emsd`` from a chart.
@ -73,10 +73,6 @@ class OrderDialog(BaseModel):
msgs: dict[str, dict] = {} msgs: dict[str, dict] = {}
fills: Dict[str, Any] = {} fills: Dict[str, Any] = {}
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
def on_level_change_update_next_order_info( def on_level_change_update_next_order_info(
@ -268,7 +264,8 @@ class OrderMode:
self, self,
) -> OrderDialog: ) -> OrderDialog:
'''Send execution order to EMS return a level line to '''
Send execution order to EMS return a level line to
represent the order on a chart. represent the order on a chart.
''' '''
@ -277,13 +274,9 @@ class OrderMode:
oid = str(uuid.uuid4()) oid = str(uuid.uuid4())
# format order data for ems # format order data for ems
fqsn = symbol.front_fqsn() order = staged.copy()
order = staged.copy( order.oid = oid
update={ order.symbol = symbol.front_fqsn()
'symbol': fqsn,
'oid': oid,
}
)
line = self.line_from_order( line = self.line_from_order(
order, order,
@ -858,7 +851,9 @@ async def process_trades_and_update_ui(
# delete level line from view # delete level line from view
mode.on_cancel(oid) mode.on_cancel(oid)
broker_msg = msg['brokerd_msg'] broker_msg = msg['brokerd_msg']
log.warning(f'Order {oid} failed with:\n{pformat(broker_msg)}') log.warning(
f'Order {oid} failed with:\n{pformat(broker_msg)}'
)
elif resp in ( elif resp in (
'dark_triggered' 'dark_triggered'