Compare commits

...

28 Commits

Author SHA1 Message Date
Tyler Goodlet 72072b5737 Support `0` value `reqid`s 🤦 2022-07-08 23:16:29 -04:00
Tyler Goodlet 381b3121d6 Cancel any live orders found on connect
More or less just to avoid orders the user wasn't aware of from
persisting until we get "open order relaying" through the ems working.

Some further fixes which required a new `reqids2txids` map which keeps
track of which `kraken` "txid" is mapped to our `reqid: int`; mainly
this was needed for cancel requests which require knowing the underlying
`txid`s (since apparently kraken doesn't keep track of the "reqid"  we
pass it). Pass the ws instance into `handle_order_updates()` to enable
the cancelling orders on startup. Don't key error on unknown `reqid`
values (for eg. when receiving historical trade events on startup).
Handle cancel requests first in the ems side loop.
2022-07-08 23:10:25 -04:00
Tyler Goodlet 8984c1b60b Use `aclosing()` around ws async gen 2022-07-08 19:14:45 -04:00
Tyler Goodlet 2e3cac1407 Add some todo-reminders for ``msgspec`` stuff 2022-07-08 19:14:45 -04:00
Tyler Goodlet b250a48b8d Drop remaining `BaseModel` api usage from rest of codebase 2022-07-08 19:14:43 -04:00
Tyler Goodlet 3dfe7ef8dd Add `Struct.copy()` which does a rountrip validate 2022-07-08 19:14:11 -04:00
Tyler Goodlet 2774a7e6ec Change all clearing msgs over to `msgspec` 2022-07-08 19:14:11 -04:00
Tyler Goodlet 7b6318f025 Cast slots to `int` before range set 2022-07-08 19:14:11 -04:00
Tyler Goodlet b0ee764423 Drop pydantic from allocator 2022-07-08 19:14:11 -04:00
Tyler Goodlet a63f0ee1c0 Add a custom `msgspec.Struct` with some humanizing 2022-07-08 19:14:11 -04:00
Tyler Goodlet 7d49335f8b Lol, gotta `float()` that vlm before `*` XD 2022-07-08 19:14:11 -04:00
Tyler Goodlet 9d176c2dda Remove `BaseModel` use from all dataclass-like uses 2022-07-08 19:14:11 -04:00
Tyler Goodlet 6cc02bd8f5 Use struct for shm tokens 2022-07-08 19:14:11 -04:00
Tyler Goodlet 693c7ce12a Use our struct for kraken `Pair` type 2022-07-08 19:14:11 -04:00
Tyler Goodlet 5e60c79664 Drop `pydantic` from service mngr 2022-07-08 19:14:11 -04:00
Tyler Goodlet 7f779dda19 Use our struct in binance backend 2022-07-08 19:14:11 -04:00
Tyler Goodlet 5100036e10 Pass our manually mapped `reqid: int` to EMS
Since we seem to always be able to get back the `reqid`/`userref` value
we send to kraken ws endpoints, we can use this as our brokerd side
order id and avoid all race cases with getting the true `txid` value
that `kraken` assigns (and which changes when you do "edits"
:eyeroll:). This simplifies status updates by allowing our relay loop
just to pass back our generated `.reqid` verbatim and allows responding
with a `BrokerdOrderAck` immediately in the request handler task which
should guarantee there are no further race conditions with the relay
loop and mapping `txid`s from kraken.. and figuring out wtf to do when
they change, etc.
2022-07-08 19:13:32 -04:00
Tyler Goodlet 78b9d90202 Add ledger and `pps.toml` snippets 2022-07-08 19:13:32 -04:00
Tyler Goodlet 9300b3d6db Try out a backend readme 2022-07-08 19:13:32 -04:00
Tyler Goodlet 6d13c8255f Don't require an ems msg symbol on error statuses 2022-07-08 19:13:32 -04:00
Tyler Goodlet 3765c61f2d Update ledger *after* pps updates from new trades
Addressing same issue as in #350 where we need to compute position
updates using the *first read* from the ledger **before** we update it
to make sure `Position.lifo_update()` gets called and **not skipped**
because new trades were read as clears entries but haven't actually been
included in update calcs yet.. aka we call `Position.lifo_update()`.

Main change here is to convert `update_ledger()` into a context mngr so
that the ledger write is committed after pps updates using
`pp.update_pps_conf()`..

This is basically a hotfix to #346 as well.
2022-07-08 19:13:32 -04:00
Tyler Goodlet cb7a9b9449 Factor status handling into a new `process_status()` helper 2022-07-08 19:13:32 -04:00
Tyler Goodlet f1192dff09 Factor msg loop into new func: `handle_order_updates()` 2022-07-08 19:13:32 -04:00
Tyler Goodlet 9e8d32cdff Drop uneeded count-sequencec verification 2022-07-08 19:13:32 -04:00
Tyler Goodlet c74741228f Get order "editing" working fully
Turns out the EMS can support this as originally expected: you can
update a `brokerd`-side `.reqid` through a `BrokerdAck` msg and the ems
which update its cross-dialog (leg) tracking correctly! The issue was
a bug in the `editOrderStatus` msg handling and appropriate tracking
of the correct `.oid` (ems uid) on the kraken side. This unfortunately
required adding a `emsflow: dict[str, list[BrokerdOrder]]` msg flow
tracing table which means the broker daemon is tracking all the msg flow
with the ems, though I'm wondering now if this is just good practise
anyway and maybe we should offer a small primitive type from our msging
utils to aid with this? I've used such constructs in event handling
systems prior.

There's a lot more factoring that can be done after these changes as
well but the quick detailed summary is,
- rework the `handle_order_requests()` loop to use `match:` syntax and
  update the new `emsflow` table on every new request from the ems.
- fix the `editOrderStatus` case pattern to not include an error msg and
  thus actually be triggered to respond to the ems with a `BrokerdAck`
  containing the new `.reqid`, the new kraken side `txid`.
- skip any `openOrders` msgs which are detected as being kraken's
  internal order "edits" by matching on the `cancel_reason` field.
- update the `emsflow` table in all ws-stream msg handling blocks
  with responses sent to the ems.

Relates to #290
2022-07-08 19:13:32 -04:00
Tyler Goodlet f38eef2bf4 Make ems relay loop report on brokerd `.reqid` changes 2022-07-08 19:13:32 -04:00
Tyler Goodlet e757e1f277 Use `match:` syntax in data feed subs processing 2022-07-08 19:13:32 -04:00
Tyler Goodlet 4823f87422 First draft, working WS based order management
Move to using the websocket API for all order control ops and dropping
the sync rest api approach which resulted in a bunch of buggy races.
Further this gets us must faster (batch) order cancellation for free
and a simpler ems request handler loop. We now heavily leverage the new
py3.10 `match:` syntax for all kraken-side API msg parsing and
processing and handle both the `openOrders` and `ownTrades` subscription
streams.

We also block "order editing" (by immediate cancellation) for now since
the EMS isn't entirely yet equipped to handle brokerd side `.reqid`
changes (which is how kraken implements so called order "updates" or
"edits") for a given order-request dialog and we may want to even
consider just implementing "updates" ourselves via independent cancel
and submit requests? Definitely something to ponder. Alternatively we
can "masquerade" such updates behind the count-style `.oid` remapping we
had to implement anyway (kraken's limitation) and maybe everything will
just work?

Further details in this patch:
- create 2 tables for tracking the EMS's `.oid` (uui4) value to `int`s
  that kraken expects (for `reqid`s): `ids` and `reqmsgs` which enable
  local lookup of ems uids to piker-backend-client-side request ids and
  received order messages.
- add `openOrders` sub support which more or less directly relays to
  equivalent `BrokerdStatus` updates and calc the `.filled` and
  `.remaining` values based on cleared vlm updates.
- add handler blocks for `[add/edit/cancel]OrderStatus` events including
  error msg cases.
- don't do any order request response processing in
  `handle_order_requests()` since responses are always received via one
  (or both?) of the new ws subs: `ownTrades` and `openOrders` and thus
  such msgs are now handled in the response relay loop.

Relates to #290
Resolves #310, #296
2022-07-08 19:13:32 -04:00
19 changed files with 901 additions and 472 deletions

View File

@ -22,10 +22,10 @@ from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager as acm
from collections import defaultdict
from pydantic import BaseModel
from msgspec import Struct
import tractor
import trio
from trio_typing import TaskStatus
import tractor
from .log import get_logger, get_console_log
from .brokers import get_brokermod
@ -47,16 +47,13 @@ _root_modules = [
]
class Services(BaseModel):
class Services(Struct):
actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
class Config:
arbitrary_types_allowed = True
async def start_service_task(
self,
name: str,

View File

@ -34,13 +34,13 @@ from fuzzywuzzy import process as fuzzy
import numpy as np
import tractor
from pydantic.dataclasses import dataclass
from pydantic import BaseModel
import wsproto
from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data.types import Struct
from ..data._web_bs import open_autorecon_ws, NoBsWs
log = get_logger(__name__)
@ -79,12 +79,14 @@ _show_wap_in_history = False
# https://binance-docs.github.io/apidocs/spot/en/#exchange-information
class Pair(BaseModel):
class Pair(Struct, frozen=True):
symbol: str
status: str
baseAsset: str
baseAssetPrecision: int
cancelReplaceAllowed: bool
allowTrailingStop: bool
quoteAsset: str
quotePrecision: int
quoteAssetPrecision: int
@ -287,7 +289,7 @@ async def get_client() -> Client:
# validation type
class AggTrade(BaseModel):
class AggTrade(Struct):
e: str # Event type
E: int # Event time
s: str # Symbol
@ -341,7 +343,9 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
elif msg.get('e') == 'aggTrade':
# validate
# NOTE: this is purely for a definition, ``msgspec.Struct``
# does not runtime-validate until you decode/encode.
# see: https://jcristharif.com/msgspec/structs.html#type-validation
msg = AggTrade(**msg)
# TODO: type out and require this quote format
@ -352,8 +356,8 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
'brokerd_ts': time.time(),
'ticks': [{
'type': 'trade',
'price': msg.p,
'size': msg.q,
'price': float(msg.p),
'size': float(msg.q),
'broker_ts': msg.T,
}],
}
@ -448,7 +452,7 @@ async def stream_quotes(
d = cache[sym.upper()]
syminfo = Pair(**d) # validation
si = sym_infos[sym] = syminfo.dict()
si = sym_infos[sym] = syminfo.to_dict()
# XXX: after manually inspecting the response format we
# just directly pick out the info we need

View File

@ -148,7 +148,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?',
).dict())
))
continue
client = _accounts2clients.get(account)
@ -161,7 +161,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?',
).dict())
))
continue
if action in {'buy', 'sell'}:
@ -188,7 +188,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason='Order already active?',
).dict())
))
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
@ -197,9 +197,8 @@ async def handle_order_requests(
oid=order.oid,
# broker specific request id
reqid=reqid,
time_ns=time.time_ns(),
account=account,
).dict()
)
)
elif action == 'cancel':
@ -559,7 +558,7 @@ async def trades_dialogue(
cids2pps,
validate=True,
)
all_positions.extend(msg.dict() for msg in msgs)
all_positions.extend(msg for msg in msgs)
if not all_positions and cids2pps:
raise RuntimeError(
@ -665,7 +664,7 @@ async def emit_pp_update(
msg = msgs[0]
break
await ems_stream.send(msg.dict())
await ems_stream.send(msg)
async def deliver_trade_events(
@ -743,7 +742,7 @@ async def deliver_trade_events(
broker_details={'name': 'ib'},
)
await ems_stream.send(msg.dict())
await ems_stream.send(msg)
case 'fill':
@ -803,7 +802,7 @@ async def deliver_trade_events(
broker_time=trade_entry['broker_time'],
)
await ems_stream.send(msg.dict())
await ems_stream.send(msg)
# 2 cases:
# - fill comes first or
@ -879,7 +878,7 @@ async def deliver_trade_events(
cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg.dict())
# await ems_stream.send(msg)
case 'event':
@ -891,7 +890,7 @@ async def deliver_trade_events(
# level...
# reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
# log.info(f"TWS triggered trade\n{pformat(msg)}")
# msg.reqid = 'tws-' + str(-1 * reqid)

View File

@ -0,0 +1,64 @@
``kraken`` backend
------------------
though they don't have the most liquidity of all the cexes they sure are
accommodating to those of us who appreciate a little ``xmr``.
status
******
current support is *production grade* and both real-time data and order
management should be correct and fast. this backend is used by core devs
for live trading.
config
******
In order to get order mode support your ``brokers.toml``
needs to have something like the following:
.. code:: toml
[kraken]
accounts.spot = 'spot'
key_descr = "spot"
api_key = "69696969696969696696969696969696969696969696969696969696"
secret = "BOOBSBOOBSBOOBSBOOBSBOOBSSMBZ69696969696969669969696969696"
If everything works correctly you should see any current positions
loaded in the pps pane on chart load and you should also be able to
check your trade records in the file::
<pikerk_conf_dir>/ledgers/trades_kraken_spot.toml
An example ledger file will have entries written verbatim from the
trade events schema:
.. code:: toml
[TFJBKK-SMBZS-VJ4UWS]
ordertxid = "SMBZSA-7CNQU-3HWLNJ"
postxid = "SMBZSE-M7IF5-CFI7LT"
pair = "XXMRZEUR"
time = 1655691993.4133966
type = "buy"
ordertype = "limit"
price = "103.97000000"
cost = "499.99999977"
fee = "0.80000000"
vol = "4.80907954"
margin = "0.00000000"
misc = ""
your ``pps.toml`` file will have position entries like,
.. code:: toml
[kraken.spot."xmreur.kraken"]
size = 4.80907954
be_price = 103.97000000
bsuid = "XXMRZEUR"
clears = [
{ tid = "TFJBKK-SMBZS-VJ4UWS", cost = 0.8, price = 103.97, size = 4.80907954, dt = "2022-05-20T02:26:33.413397+00:00" },
]

View File

@ -282,6 +282,7 @@ class Client:
"volume": str(size),
}
return await self.endpoint('AddOrder', data)
else:
# Edit order data for kraken api
data["txid"] = reqid

View File

@ -18,21 +18,23 @@
Order api and machinery
'''
from contextlib import asynccontextmanager as acm
from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
from functools import partial
from itertools import chain
from itertools import chain, count
from pprint import pformat
import time
from typing import (
Any,
AsyncIterator,
# Callable,
# Optional,
# Union,
Union,
)
from async_generator import aclosing
from bidict import bidict
import pendulum
from pydantic import BaseModel
import trio
import tractor
import wsproto
@ -61,179 +63,137 @@ from .feed import (
stream_messages,
)
class Trade(BaseModel):
'''
Trade class that helps parse and validate ownTrades stream
'''
reqid: str # kraken order transaction id
action: str # buy or sell
price: float # price of asset
size: float # vol of asset
broker_time: str # e.g GTC, GTD
MsgUnion = Union[
BrokerdCancel,
BrokerdError,
BrokerdFill,
BrokerdOrder,
BrokerdOrderAck,
BrokerdPosition,
BrokerdStatus,
]
async def handle_order_requests(
ws: NoBsWs,
client: Client,
ems_order_stream: tractor.MsgStream,
token: str,
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
reqids2txids: dict[int, str],
) -> None:
'''
Process new order submission requests from the EMS
and deliver acks or errors.
request_msg: dict
'''
# XXX: UGH, let's unify this.. with ``msgspec``.
msg: dict[str, Any]
order: BrokerdOrder
counter = count()
async for request_msg in ems_order_stream:
log.info(
'Received order request:\n'
f'{pformat(request_msg)}'
)
async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}')
match msg:
case {
'action': 'cancel',
}:
cancel = BrokerdCancel(**msg)
last = emsflow[cancel.oid]
reqid = ids[cancel.oid]
txid = reqids2txids[reqid]
action = request_msg['action']
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid], # should be txid from submission
})
if action in {'buy', 'sell'}:
case {
'account': 'kraken.spot' as account,
'action': action,
} if action in {'buy', 'sell'}:
account = request_msg['account']
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
# validate
order = BrokerdOrder(**msg)
# reason=f'Kraken only, No account found: `{account}` ?',
reason=(
'Kraken only, order mode disabled due to '
'https://github.com/pikers/piker/issues/299'
),
# logic from old `Client.submit_limit()`
if order.oid in ids:
ep = 'editOrder'
reqid = ids[order.oid] # integer not txid
txid = reqids2txids[reqid]
extra = {
'orderid': txid, # txid
}
).dict())
continue
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
resp = await client.submit_limit(
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
reqid=order.reqid,
)
err = resp['error']
if err:
oid = order.oid
log.error(f'Failed to submit order: {oid}')
await ems_order_stream.send(
BrokerdError(
oid=order.oid,
reqid=order.reqid,
symbol=order.symbol,
reason="Failed order submission",
broker_details=resp
).dict()
)
else:
# TODO: handle multiple orders (cancels?)
# txid is an array of strings
if order.reqid is None:
reqid = resp['result']['txid'][0]
else:
# update the internal pairing of oid to krakens
# txid with the new txid that is returned on edit
reqid = resp['result']['txid']
ep = 'addOrder'
reqid = next(counter)
ids[order.oid] = reqid
log.debug(
f"Adding order {reqid}\n"
f'{ids}'
)
extra = {
'ordertype': 'limit',
'type': order.action,
}
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
psym = order.symbol.upper()
pair = f'{psym[:3]}/{psym[3:]}'
# ems order request id
oid=order.oid,
# call ws api to submit the order:
# https://docs.kraken.com/websockets/#message-addOrder
req = {
'event': ep,
'token': token,
# broker specific request id
reqid=reqid,
'reqid': reqid, # remapped-to-int uid from ems
'pair': pair,
'price': str(order.price),
'volume': str(order.size),
# account the made the order
account=order.account
# only ensures request is valid, nothing more
# validate: 'true',
).dict()
} | extra
log.info(f'Submitting WS order request:\n{pformat(req)}')
await ws.send_msg(req)
resp = BrokerdOrderAck(
oid=order.oid, # ems order request id
reqid=reqid, # our custom int mapping
account=account, # piker account
)
await ems_order_stream.send(resp)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
# placehold for sanity checking in relay loop
emsflow.setdefault(order.oid, []).append(order)
# Send order cancellation to kraken
resp = await client.submit_cancel(
reqid=msg.reqid
)
# Check to make sure there was no error returned by
# the kraken endpoint. Assert one order was cancelled.
try:
result = resp['result']
count = result['count']
# check for 'error' key if we received no 'result'
except KeyError:
error = resp.get('error')
case _:
account = msg.get('account')
if account != 'kraken.spot':
log.error(
'This is a kraken account, \
only a `kraken.spot` selection is valid'
)
await ems_order_stream.send(
BrokerdError(
oid=msg.oid,
reqid=msg.reqid,
symbol=msg.symbol,
reason="Failed order cancel",
broker_details=resp
).dict()
)
oid=msg['oid'],
symbol=msg['symbol'],
reason=(
'Invalid request msg:\n{msg}'
),
if not error:
raise BrokerError(f'Unknown order cancel response: {resp}')
else:
if not count: # no orders were cancelled?
# XXX: what exactly is this from and why would we care?
# there doesn't seem to be any docs here?
# https://docs.kraken.com/rest/#operation/cancelOrder
# Check to make sure the cancellation is NOT pending,
# then send the confirmation to the ems order stream
pending = result.get('pending')
if pending:
log.error(f'Order {oid} cancel was not yet successful')
await ems_order_stream.send(
BrokerdError(
oid=msg.oid,
reqid=msg.reqid,
symbol=msg.symbol,
# TODO: maybe figure out if pending
# cancels will eventually get cancelled
reason="Order cancel is still pending?",
broker_details=resp
).dict()
)
else: # order cancel success case.
await ems_order_stream.send(
BrokerdStatus(
reqid=msg.reqid,
account=msg.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
else:
log.error(f'Unknown order command: {request_msg}')
)
@acm
@ -310,13 +270,13 @@ async def trades_dialogue(
log.info(
f'Loaded {len(trades)} trades from account `{acc_name}`'
)
trans = await update_ledger(acctid, trades)
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(t.bsuid for t in trans),
)
with open_ledger(acctid, trades) as trans:
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(t.bsuid for t in trans),
)
position_msgs: list[dict] = []
pps: dict[int, pp.Position]
@ -330,7 +290,7 @@ async def trades_dialogue(
avg_price=p.be_price,
currency='',
)
position_msgs.append(msg.dict())
position_msgs.append(msg)
await ctx.started(
(position_msgs, [acc_name])
@ -357,135 +317,441 @@ async def trades_dialogue(
),
) as ws,
trio.open_nursery() as n,
aclosing(stream_messages(ws)) as stream,
):
# task local msg dialog tracking
emsflow: dict[
str,
list[MsgUnion],
] = {}
# 2way map for ems ids to kraken int reqids..
ids: bidict[str, int] = bidict()
reqids2txids: dict[int, str] = {}
# task for processing inbound requests from ems
n.start_soon(handle_order_requests, client, ems_stream)
n.start_soon(
handle_order_requests,
ws,
client,
ems_stream,
token,
emsflow,
ids,
reqids2txids,
)
count: int = 0
# enter relay loop
await handle_order_updates(
ws,
stream,
ems_stream,
emsflow,
ids,
reqids2txids,
trans,
acctid,
acc_name,
token,
)
# process and relay trades events to ems
async def handle_order_updates(
ws: NoBsWs,
ws_stream: AsyncIterator,
ems_stream: tractor.MsgStream,
emsflow: dict[str, list[MsgUnion]],
ids: bidict[str, int],
reqids2txids: dict[int, str],
trans: list[pp.Transaction],
acctid: str,
acc_name: str,
token: str,
) -> None:
'''
Main msg handling loop for all things order management.
This code is broken out to make the context explicit and state variables
defined in the signature clear to the reader.
'''
# transaction records which will be updated
# on new trade clearing events (aka order "fills")
trans: list[pp.Transaction]
async for msg in ws_stream:
match msg:
# process and relay clearing trade events to ems
# https://docs.kraken.com/websockets/#message-ownTrades
async for msg in stream_messages(ws):
match msg:
case [
trades_msgs,
'ownTrades',
{'sequence': seq},
]:
# XXX: do we actually need this orrr?
# ensure that we are only processing new trades?
assert seq > count
count += 1
case [
trades_msgs,
'ownTrades',
# won't exist for historical values?
# 'userref': reqid,
{'sequence': seq},
]:
# flatten msgs to an {id -> data} table for processing
trades = {
tid: trade
for entry in trades_msgs
for (tid, trade) in entry.items()
# flatten msgs for processing
trades = {
tid: trade
for entry in trades_msgs
for (tid, trade) in entry.items()
# only emit entries which are already not-in-ledger
if tid not in {r.tid for r in trans}
}
for tid, trade in trades.items():
# only emit entries which are already not-in-ledger
if tid not in {r.tid for r in trans}
}
for tid, trade in trades.items():
# NOTE: try to get the requid sent in the order
# request message if posssible; it may not be
# provided since this sub also returns generic
# historical trade events.
reqid = trade.get('userref', trade['ordertxid'])
# parse-cast
reqid = trade['ordertxid']
action = trade['type']
price = float(trade['price'])
size = float(trade['vol'])
broker_time = float(trade['time'])
action = trade['type']
price = float(trade['price'])
size = float(trade['vol'])
broker_time = float(trade['time'])
# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=reqid,
time_ns=time.time_ns(),
action=action,
size=size,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'},
broker_time=broker_time
)
await ems_stream.send(fill_msg)
filled_msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
account=acc_name,
status='filled',
filled=size,
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': broker_time
},
# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg)
# update ledger and position tracking
with open_ledger(acctid, trades) as trans:
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(
t.bsuid for t in trans),
)
# emit any new pp msgs to ems
for pos in filter(
bool,
chain(active.values(), closed.values()),
):
pp_msg = BrokerdPosition(
broker='kraken',
# XXX: ok so this is annoying, we're
# relaying an account name with the
# backend suffix prefixed but when
# reading accounts from ledgers we
# don't need it and/or it's prefixed
# in the section table.. we should
# just strip this from the message
# right since `.broker` is already
# included?
account=f'kraken.{acctid}',
symbol=pos.symbol.front_fqsn(),
size=pos.size,
avg_price=pos.be_price,
# TODO
# currency=''
)
await ems_stream.send(pp_msg)
# process and relay order state change events
# https://docs.kraken.com/websockets/#message-openOrders
case [
order_msgs,
'openOrders',
{'sequence': seq},
]:
for order_msg in order_msgs:
log.info(
f'Order msg update_{seq}:\n'
f'{pformat(order_msg)}'
)
txid, update_msg = list(order_msg.items())[0]
match update_msg:
# we ignore internal order updates triggered by
# kraken's "edit" endpoint.
case {
'cancel_reason': 'Order replaced',
'status': status,
'userref': reqid,
**rest,
}:
continue
case {
'status': status,
'userref': reqid,
**rest,
# XXX: eg. of remaining msg schema:
# 'avg_price': _,
# 'cost': _,
# 'descr': {
# 'close': None,
# 'leverage': None,
# 'order': descr,
# 'ordertype': 'limit',
# 'pair': 'XMR/EUR',
# 'price': '74.94000000',
# 'price2': '0.00000000',
# 'type': 'buy'
# },
# 'expiretm': None,
# 'fee': '0.00000000',
# 'limitprice': '0.00000000',
# 'misc': '',
# 'oflags': 'fciq',
# 'opentm': '1656966131.337344',
# 'refid': None,
# 'starttm': None,
# 'stopprice': '0.00000000',
# 'timeinforce': 'GTC',
# 'vol': submit_vlm, # '13.34400854',
# 'vol_exec': exec_vlm, # 0.0000
}:
ems_status = {
'open': 'submitted',
'closed': 'cancelled',
'canceled': 'cancelled',
# do we even need to forward
# this state to the ems?
'pending': 'pending',
}[status]
submit_vlm = rest.get('vol', 0)
exec_vlm = rest.get('vol_exec', 0)
reqids2txids[reqid] = txid
oid = ids.inverse.get(reqid)
if not oid:
# TODO: handle these and relay them
# through the EMS to the client / UI
# side!
log.warning(
f'Received active order {txid}:\n'
f'{update_msg}\n'
'Cancelling order for now!..'
)
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'reqid': reqid,
'txid': [txid],
})
continue
msgs = emsflow[oid]
# send BrokerdStatus messages for all
# order state updates
resp = BrokerdStatus(
# send a fill msg for gui update
fill_msg = BrokerdFill(
reqid=reqid,
time_ns=time.time_ns(),
action=action,
size=size,
price=price,
# TODO: maybe capture more msg data
# i.e fees?
broker_details={'name': 'kraken'},
broker_time=broker_time
)
await ems_stream.send(fill_msg.dict())
filled_msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
account=acc_name,
status='filled',
filled=size,
reason='Order filled by kraken',
broker_details={
'name': 'kraken',
'broker_time': broker_time
},
# TODO: figure out if kraken gives a count
# of how many units of underlying were
# filled. Alternatively we can decrement
# this value ourselves by associating and
# calcing from the diff with the original
# client-side request, see:
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg.dict())
# update ledger and position tracking
trans = await update_ledger(acctid, trades)
active, closed = pp.update_pps_conf(
'kraken',
acctid,
trade_records=trans,
ledger_reload={}.fromkeys(
t.bsuid for t in trans),
)
# emit pp msgs
for pos in filter(
bool,
chain(active.values(), closed.values()),
):
pp_msg = BrokerdPosition(
broker='kraken',
# XXX: ok so this is annoying, we're
# relaying an account name with the
# backend suffix prefixed but when
# reading accounts from ledgers we
# don't need it and/or it's prefixed
# in the section table.. we should
# just strip this from the message
# right since `.broker` is already
# included?
time_ns=time.time_ns(), # cuz why not
account=f'kraken.{acctid}',
symbol=pos.symbol.front_fqsn(),
size=pos.size,
avg_price=pos.be_price,
# TODO
# currency=''
# everyone doin camel case..
status=ems_status, # force lower case
filled=exec_vlm,
reason='', # why held?
remaining=(
float(submit_vlm)
-
float(exec_vlm)
),
broker_details=dict(
{'name': 'kraken'}, **update_msg
),
)
await ems_stream.send(pp_msg.dict())
msgs.append(resp)
await ems_stream.send(resp)
case [
trades_msgs,
'openOrders',
{'sequence': seq},
]:
# TODO: async order update handling which we
# should remove from `handle_order_requests()`
# above:
# https://github.com/pikers/piker/issues/293
# https://github.com/pikers/piker/issues/310
log.info(f'Order update {seq}:{trades_msgs}')
case _:
log.warning(
'Unknown orders msg:\n'
f'{txid}:{order_msg}'
)
case _:
log.warning(f'Unhandled trades msg: {msg}')
await tractor.breakpoint()
case {
'event': etype,
'status': status,
'reqid': reqid,
**rest,
} as event if (
etype in {
'addOrderStatus',
'editOrderStatus',
'cancelOrderStatus',
}
):
oid = ids.inverse.get(reqid)
if not oid:
log.warning(
'Unknown order status update?:\n'
f'{event}'
)
continue
txid = rest.get('txid')
if txid:
reqids2txids[reqid] = txid
msgs = emsflow[oid]
last = msgs[-1]
resps, errored = process_status(
event,
oid,
token,
msgs,
last,
)
if resps:
msgs.extend(resps)
for resp in resps:
await ems_stream.send(resp)
case _:
log.warning(f'Unhandled trades update msg: {msg}')
def process_status(
event: dict[str, str],
oid: str,
token: str,
msgs: list[MsgUnion],
last: MsgUnion,
) -> tuple[list[MsgUnion], bool]:
'''
Process `'[add/edit/cancel]OrderStatus'` events by translating to
and returning the equivalent EMS-msg responses.
'''
match event:
case {
'event': etype,
'status': 'error',
'reqid': reqid,
'errorMessage': errmsg,
}:
# any of ``{'add', 'edit', 'cancel'}``
action = etype.rstrip('OrderStatus')
log.error(
f'Failed to {action} order {reqid}:\n'
f'{errmsg}'
)
resp = BrokerdError(
oid=oid,
# XXX: use old reqid in case it changed?
reqid=reqid,
symbol=getattr(last, 'symbol', 'N/A'),
reason=f'Failed {action}:\n{errmsg}',
broker_details=event
)
return [resp], True
# successful request cases
case {
'event': 'addOrderStatus',
'status': "ok",
'reqid': reqid, # oid from ems side
'txid': txid,
'descr': descr, # only on success?
}:
log.info(
f'Submitting order: {descr}\n'
f'ems oid: {oid}\n'
f're-mapped reqid: {reqid}\n'
f'txid: {txid}\n'
)
return [], False
case {
'event': 'editOrderStatus',
'status': "ok",
'reqid': reqid, # oid from ems side
'descr': descr,
# NOTE: for edit request this is a new value
'txid': txid,
'originaltxid': origtxid,
}:
log.info(
f'Editting order {oid}[requid={reqid}]:\n'
f'txid: {origtxid} -> {txid}\n'
f'{descr}'
)
# deliver another ack to update the ems-side `.reqid`.
return [], False
case {
"event": "cancelOrderStatus",
"status": "ok",
'reqid': reqid,
# XXX: sometimes this isn't provided!?
# 'txid': txids,
**rest,
}:
# TODO: should we support "batch" acking of
# multiple cancels thus avoiding the below loop?
resps: list[MsgUnion] = []
for txid in rest.get('txid', [last.reqid]):
resp = BrokerdStatus(
reqid=reqid,
account=last.account,
time_ns=time.time_ns(),
status='cancelled',
reason='Cancel success: {oid}@{txid}',
broker_details=event,
)
resps.append(resp)
return resps, False
def norm_trade_records(
@ -494,10 +760,9 @@ def norm_trade_records(
) -> list[pp.Transaction]:
records: list[pp.Transaction] = []
for tid, record in ledger.items():
size = record.get('vol') * {
size = float(record.get('vol')) * {
'buy': 1,
'sell': -1,
}[record['type']]
@ -508,7 +773,7 @@ def norm_trade_records(
pp.Transaction(
fqsn=f'{norm_sym}.kraken',
tid=tid,
size=float(size),
size=size,
price=float(record['price']),
cost=float(record['fee']),
dt=pendulum.from_timestamp(float(record['time'])),
@ -522,19 +787,24 @@ def norm_trade_records(
return records
async def update_ledger(
@cm
def open_ledger(
acctid: str,
trade_entries: list[dict[str, Any]],
) -> list[pp.Transaction]:
'''
Write recent session's trades to the user's (local) ledger file.
# write recent session's trades to the user's (local) ledger file.
'''
with pp.open_trade_ledger(
'kraken',
acctid,
) as ledger:
ledger.update(trade_entries)
# normalize to transaction form
records = norm_trade_records(trade_entries)
return records
# normalize to transaction form
records = norm_trade_records(trade_entries)
yield records
# update on exit
ledger.update(trade_entries)

View File

@ -31,7 +31,6 @@ import time
from fuzzywuzzy import process as fuzzy
import numpy as np
import pendulum
from pydantic import BaseModel
from trio_typing import TaskStatus
import tractor
import trio
@ -45,6 +44,7 @@ from piker.brokers._util import (
)
from piker.log import get_console_log
from piker.data import ShmArray
from piker.data.types import Struct
from piker.data._web_bs import open_autorecon_ws, NoBsWs
from . import log
from .api import (
@ -54,7 +54,7 @@ from .api import (
# https://www.kraken.com/features/api#get-tradable-pairs
class Pair(BaseModel):
class Pair(Struct):
altname: str # alternate pair name
wsname: str # WebSocket pair name (if available)
aclass_base: str # asset class of base component
@ -117,9 +117,8 @@ async def stream_messages(
too_slow_count = 0
continue
if isinstance(msg, dict):
if msg.get('event') == 'heartbeat':
match msg:
case {'event': 'heartbeat'}:
now = time.time()
delay = now - last_hb
last_hb = now
@ -130,11 +129,20 @@ async def stream_messages(
continue
err = msg.get('errorMessage')
if err:
raise BrokerError(err)
else:
yield msg
case {
'connectionID': _,
'event': 'systemStatus',
'status': 'online',
'version': _,
} as msg:
log.info(
'WS connection is up:\n'
f'{msg}'
)
continue
case _:
yield msg
async def process_data_feed_msgs(
@ -145,37 +153,60 @@ async def process_data_feed_msgs(
'''
async for msg in stream_messages(ws):
match msg:
case {
'errorMessage': errmsg
}:
raise BrokerError(errmsg)
chan_id, *payload_array, chan_name, pair = msg
case {
'event': 'subscriptionStatus',
} as sub:
log.info(
'WS subscription is active:\n'
f'{sub}'
)
continue
if 'ohlc' in chan_name:
case [
chan_id,
*payload_array,
chan_name,
pair
]:
if 'ohlc' in chan_name:
yield 'ohlc', OHLC(
chan_id,
chan_name,
pair,
*payload_array[0]
)
yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0])
elif 'spread' in chan_name:
elif 'spread' in chan_name:
bid, ask, ts, bsize, asize = map(
float, payload_array[0])
bid, ask, ts, bsize, asize = map(float, payload_array[0])
# TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
# TODO: really makes you think IB has a horrible API...
quote = {
'symbol': pair.replace('/', ''),
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize},
],
}
yield 'l1', quote
# elif 'book' in msg[-2]:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
# elif 'book' in msg[-2]:
# chan_id, *payload_array, chan_name, pair = msg
# print(msg)
else:
print(f'UNHANDLED MSG: {msg}')
yield msg
case _:
print(f'UNHANDLED MSG: {msg}')
# yield msg
def normalize(
@ -316,7 +347,7 @@ async def stream_quotes(
sym = sym.upper()
si = Pair(**await client.symbol_info(sym)) # validation
syminfo = si.dict()
syminfo = si.to_dict()
syminfo['price_tick_size'] = 1 / 10**si.pair_decimals
syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals
syminfo['asset_type'] = 'crypto'
@ -385,7 +416,7 @@ async def stream_quotes(
msg_gen = process_data_feed_msgs(ws)
# TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__()
typ, ohlc_last = await anext(msg_gen)
topic, quote = normalize(ohlc_last)

View File

@ -22,10 +22,9 @@ from enum import Enum
from typing import Optional
from bidict import bidict
from pydantic import BaseModel, validator
# from msgspec import Struct
from ..data._source import Symbol
from ..data.types import Struct
from ..pp import Position
@ -41,33 +40,30 @@ SizeUnit = Enum(
)
class Allocator(BaseModel):
class Config:
validate_assignment = True
copy_on_model_validation = False
arbitrary_types_allowed = True
# required to get the account validator lookup working?
extra = 'allow'
underscore_attrs_are_private = False
class Allocator(Struct):
symbol: Symbol
account: Optional[str] = 'paper'
_size_units: bidict[str, Optional[str]] = _size_units
# TODO: for enums this clearly doesn't fucking work, you can't set
# a default at startup by passing in a `dict` but yet you can set
# that value through assignment..for wtv cucked reason.. honestly, pure
# unintuitive garbage.
size_unit: str = 'currency'
_size_units: dict[str, Optional[str]] = _size_units
_size_unit: str = 'currency'
@validator('size_unit', pre=True)
def maybe_lookup_key(cls, v):
# apply the corresponding enum key for the text "description" value
@property
def size_unit(self) -> str:
return self._size_unit
@size_unit.setter
def size_unit(self, v: str) -> Optional[str]:
if v not in _size_units:
return _size_units.inverse[v]
v = _size_units.inverse[v]
assert v in _size_units
self._size_unit = v
return v
# TODO: if we ever want ot support non-uniform entry-slot-proportion
@ -262,7 +258,7 @@ def mk_allocator(
# default allocation settings
defaults: dict[str, float] = {
'account': None, # select paper by default
'size_unit': 'currency',
# 'size_unit': 'currency',
'units_limit': 400,
'currency_limit': 5e3,
'slots': 4,
@ -301,6 +297,9 @@ def mk_allocator(
# entry step 1.0
alloc.units_limit = alloc.slots
else:
alloc.size_unit = 'currency'
# if the current position is already greater then the limit
# settings, increase the limit to the current position
if alloc.size_unit == 'currency':

View File

@ -58,11 +58,11 @@ class OrderBook:
def send(
self,
msg: Order,
msg: Order | dict,
) -> dict:
self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg.dict())
self._to_ems.send_nowait(msg)
return msg
def update(
@ -73,9 +73,8 @@ class OrderBook:
) -> dict:
cmd = self._sent_orders[uuid]
msg = cmd.dict()
msg.update(data)
self._sent_orders[uuid] = Order(**msg)
msg = cmd.copy(update=data)
self._sent_orders[uuid] = msg
self._to_ems.send_nowait(msg)
return cmd
@ -88,7 +87,7 @@ class OrderBook:
oid=uuid,
symbol=cmd.symbol,
)
self._to_ems.send_nowait(msg.dict())
self._to_ems.send_nowait(msg)
_orders: OrderBook = None
@ -149,7 +148,7 @@ async def relay_order_cmds_from_sync_code(
book = get_orders()
async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream:
if cmd['symbol'] == symbol_key:
if cmd.symbol == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire
await to_ems_stream.send(cmd)

View File

@ -26,7 +26,6 @@ import time
from typing import AsyncIterator, Callable
from bidict import bidict
from pydantic import BaseModel
import trio
from trio_typing import TaskStatus
import tractor
@ -34,6 +33,7 @@ import tractor
from ..log import get_logger
from ..data._normalize import iterticks
from ..data.feed import Feed, maybe_open_feed
from ..data.types import Struct
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper
from ._messages import (
@ -88,7 +88,8 @@ def mk_check(
@dataclass
class _DarkBook:
'''EMS-trigger execution book.
'''
EMS-trigger execution book.
Contains conditions for executions (aka "orders" or "triggers")
which are not exposed to brokers and thus the market; i.e. these are
@ -231,7 +232,7 @@ async def clear_dark_triggers(
price=submit_price,
size=cmd['size'],
)
await brokerd_orders_stream.send(msg.dict())
await brokerd_orders_stream.send(msg)
# mark this entry as having sent an order
# request. the entry will be replaced once the
@ -247,14 +248,11 @@ async def clear_dark_triggers(
msg = Status(
oid=oid, # ems order id
resp=resp,
time_ns=time.time_ns(),
symbol=fqsn,
resp=resp,
trigger_price=price,
broker_details={'name': broker},
cmd=cmd, # original request message
).dict()
brokerd_msg=cmd,
)
# remove exec-condition from set
log.info(f'removing pred for {oid}')
@ -303,7 +301,7 @@ class TradesRelay:
consumers: int = 0
class Router(BaseModel):
class Router(Struct):
'''
Order router which manages and tracks per-broker dark book,
alerts, clearing and related data feed management.
@ -324,10 +322,6 @@ class Router(BaseModel):
# brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {}
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
def get_dark_book(
self,
brokername: str,
@ -581,11 +575,11 @@ async def translate_and_relay_brokerd_events(
if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg).dict()
pos_msg = BrokerdPosition(**brokerd_msg)
# XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem
sym, broker = pos_msg['symbol'], pos_msg['broker']
sym, broker = pos_msg.symbol, pos_msg.broker
relay.positions.setdefault(
# NOTE: translate to a FQSN!
@ -652,6 +646,13 @@ async def translate_and_relay_brokerd_events(
else:
# check for existing live flow entry
entry = book._ems_entries.get(oid)
old_reqid = entry.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
# initial response to brokerd order request
if name == 'ack':
@ -662,6 +663,10 @@ async def translate_and_relay_brokerd_events(
# a ``BrokerdOrderAck`` **must** be sent after an order
# request in order to establish this id mapping.
book._ems2brokerd_ids[oid] = reqid
log.info(
'Rx ACK for order\n'
f'oid: {oid} -> reqid: {reqid}'
)
# new order which has not yet be registered into the
# local ems book, insert it now and handle 2 cases:
@ -676,7 +681,7 @@ async def translate_and_relay_brokerd_events(
entry.reqid = reqid
# tell broker to cancel immediately
await brokerd_trades_stream.send(entry.dict())
await brokerd_trades_stream.send(entry)
# - the order is now active and will be mirrored in
# our book -> registered as live flow
@ -716,7 +721,7 @@ async def translate_and_relay_brokerd_events(
# if 10147 in message: cancel
resp = 'broker_errored'
broker_details = msg.dict()
broker_details = msg
# don't relay message to order requester client
# continue
@ -751,7 +756,7 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_' + msg.status
# pass the BrokerdStatus msg inside the broker details field
broker_details = msg.dict()
broker_details = msg
elif name in (
'fill',
@ -760,7 +765,7 @@ async def translate_and_relay_brokerd_events(
# proxy through the "fill" result(s)
resp = 'broker_filled'
broker_details = msg.dict()
broker_details = msg
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
@ -778,7 +783,7 @@ async def translate_and_relay_brokerd_events(
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=broker_details,
).dict()
)
)
except KeyError:
log.error(
@ -843,14 +848,14 @@ async def process_client_order_cmds(
# NOTE: cancel response will be relayed back in messages
# from corresponding broker
if reqid:
if reqid is not None:
# send cancel to brokerd immediately!
log.info(
f'Submitting cancel for live order {reqid}'
)
await brokerd_order_stream.send(msg.dict())
await brokerd_order_stream.send(msg)
else:
# this might be a cancel for an order that hasn't been
@ -872,7 +877,7 @@ async def process_client_order_cmds(
resp='dark_cancelled',
oid=oid,
time_ns=time.time_ns(),
).dict()
)
)
# de-register this client dialogue
router.dialogues.pop(oid)
@ -927,7 +932,7 @@ async def process_client_order_cmds(
# handle relaying the ems side responses back to
# the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg.dict())
await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()``
@ -1007,7 +1012,7 @@ async def process_client_order_cmds(
resp=resp,
oid=oid,
time_ns=time.time_ns(),
).dict()
)
)

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -15,21 +15,26 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Clearing system messagingn types and protocols.
Clearing sub-system message and protocols.
"""
from typing import Optional, Union
# TODO: try out just encoding/send direction for now?
# import msgspec
from pydantic import BaseModel
from ..data._source import Symbol
from ..data.types import Struct
# TODO: ``msgspec`` stuff worth paying attention to:
# - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution
# - use literals for a common msg determined by diff keys?
# - https://jcristharif.com/msgspec/usage.html#literal
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# --------------
# Client -> emsd
# --------------
class Cancel(BaseModel):
class Cancel(Struct):
'''Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order.
@ -39,8 +44,10 @@ class Cancel(BaseModel):
symbol: str
class Order(BaseModel):
class Order(Struct):
# TODO: use ``msgspec.Literal``
# https://jcristharif.com/msgspec/usage.html#literal
action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id"
oid: str # uuid4
@ -48,6 +55,9 @@ class Order(BaseModel):
account: str # should we set a default as '' ?
price: float
# TODO: could we drop the ``.action`` field above and instead just
# use +/- values here? Would make the msg smaller at the sake of a
# teensie fp precision?
size: float
brokers: list[str]
@ -59,20 +69,14 @@ class Order(BaseModel):
# the backend broker
exec_mode: str # {'dark', 'live', 'paper'}
class Config:
# just for pre-loading a ``Symbol`` when used
# in the order mode staging process
arbitrary_types_allowed = True
# don't copy this model instance when used in
# a recursive model
copy_on_model_validation = False
# --------------
# Client <- emsd
# --------------
# update msgs from ems which relay state change info
# from the active clearing engine.
class Status(BaseModel):
class Status(Struct):
name: str = 'status'
oid: str # uuid4
@ -95,8 +99,6 @@ class Status(BaseModel):
# }
resp: str # "response", see above
# symbol: str
# trigger info
trigger_price: Optional[float] = None
# price: float
@ -111,10 +113,12 @@ class Status(BaseModel):
brokerd_msg: dict = {}
# ---------------
# emsd -> brokerd
# ---------------
# requests *sent* from ems to respective backend broker daemon
class BrokerdCancel(BaseModel):
class BrokerdCancel(Struct):
action: str = 'cancel'
oid: str # piker emsd order id
@ -130,7 +134,7 @@ class BrokerdCancel(BaseModel):
reqid: Optional[Union[int, str]] = None
class BrokerdOrder(BaseModel):
class BrokerdOrder(Struct):
action: str # {buy, sell}
oid: str
@ -150,11 +154,12 @@ class BrokerdOrder(BaseModel):
size: float
# ---------------
# emsd <- brokerd
# ---------------
# requests *received* to ems from broker backend
class BrokerdOrderAck(BaseModel):
class BrokerdOrderAck(Struct):
'''
Immediate reponse to a brokerd order request providing the broker
specific unique order id so that the EMS can associate this
@ -172,7 +177,7 @@ class BrokerdOrderAck(BaseModel):
account: str = ''
class BrokerdStatus(BaseModel):
class BrokerdStatus(Struct):
name: str = 'status'
reqid: Union[int, str]
@ -205,7 +210,7 @@ class BrokerdStatus(BaseModel):
}
class BrokerdFill(BaseModel):
class BrokerdFill(Struct):
'''
A single message indicating a "fill-details" event from the broker
if avaiable.
@ -230,7 +235,7 @@ class BrokerdFill(BaseModel):
broker_time: float
class BrokerdError(BaseModel):
class BrokerdError(Struct):
'''
Optional error type that can be relayed to emsd for error handling.
@ -249,7 +254,7 @@ class BrokerdError(BaseModel):
broker_details: dict = {}
class BrokerdPosition(BaseModel):
class BrokerdPosition(Struct):
'''Position update event from brokerd.
'''

View File

@ -117,7 +117,7 @@ class PaperBoi:
reason='paper_trigger',
remaining=size,
)
await self.ems_trades_stream.send(msg.dict())
await self.ems_trades_stream.send(msg)
# if we're already a clearing price simulate an immediate fill
if (
@ -173,7 +173,7 @@ class PaperBoi:
broker=self.broker,
time_ns=time.time_ns(),
)
await self.ems_trades_stream.send(msg.dict())
await self.ems_trades_stream.send(msg)
async def fake_fill(
self,
@ -216,7 +216,7 @@ class PaperBoi:
'name': self.broker + '_paper',
},
)
await self.ems_trades_stream.send(msg.dict())
await self.ems_trades_stream.send(msg)
if order_complete:
@ -240,7 +240,7 @@ class PaperBoi:
'name': self.broker,
},
)
await self.ems_trades_stream.send(msg.dict())
await self.ems_trades_stream.send(msg)
# lookup any existing position
token = f'{symbol}.{self.broker}'
@ -268,7 +268,7 @@ class PaperBoi:
)
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
await self.ems_trades_stream.send(pp_msg.dict())
await self.ems_trades_stream.send(pp_msg)
async def simulate_fills(
@ -384,7 +384,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'Paper only. No account found: `{account}` ?',
).dict())
))
continue
# validate
@ -416,7 +416,7 @@ async def handle_order_requests(
# broker specific request id
reqid=reqid,
).dict()
)
)
elif action == 'cancel':

View File

@ -27,13 +27,14 @@ from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
if _USE_POSIX:
from _posixshmem import shm_unlink
import tractor
# import msgspec
import numpy as np
from pydantic import BaseModel
from numpy.lib import recfunctions as rfn
import tractor
from ..log import get_logger
from ._source import base_iohlc_dtype
from .types import Struct
log = get_logger(__name__)
@ -107,15 +108,12 @@ class SharedInt:
log.warning(f'Shm for {name} already unlinked?')
class _Token(BaseModel):
class _Token(Struct, frozen=True):
'''
Internal represenation of a shared memory "token"
which can be used to key a system wide post shm entry.
'''
class Config:
frozen = True
shm_name: str # this servers as a "key" value
shm_first_index_name: str
shm_last_index_name: str
@ -126,17 +124,22 @@ class _Token(BaseModel):
return np.dtype(list(map(tuple, self.dtype_descr))).descr
def as_msg(self):
return self.dict()
return self.to_dict()
@classmethod
def from_msg(cls, msg: dict) -> _Token:
if isinstance(msg, _Token):
return msg
# TODO: native struct decoding
# return _token_dec.decode(msg)
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return _Token(**msg)
# _token_dec = msgspec.msgpack.Decoder(_Token)
# TODO: this api?
# _known_tokens = tractor.ActorVar('_shm_tokens', {})
# _known_tokens = tractor.ContextStack('_known_tokens', )
@ -167,7 +170,7 @@ def _make_token(
shm_name=key,
shm_first_index_name=key + "_first",
shm_last_index_name=key + "_last",
dtype_descr=np.dtype(dtype).descr
dtype_descr=tuple(np.dtype(dtype).descr)
)

View File

@ -42,7 +42,6 @@ from trio_typing import TaskStatus
import trimeter
import tractor
from tractor.trionics import maybe_open_context
from pydantic import BaseModel
import pendulum
import numpy as np
@ -59,6 +58,7 @@ from ._sharedmem import (
ShmArray,
)
from .ingest import get_ingestormod
from .types import Struct
from ._source import (
base_iohlc_dtype,
Symbol,
@ -84,7 +84,7 @@ if TYPE_CHECKING:
log = get_logger(__name__)
class _FeedsBus(BaseModel):
class _FeedsBus(Struct):
'''
Data feeds broadcaster and persistence management.
@ -100,10 +100,6 @@ class _FeedsBus(BaseModel):
a dedicated cancel scope.
'''
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
brokername: str
nursery: trio.Nursery
feeds: dict[str, tuple[dict, dict]] = {}

View File

@ -0,0 +1,68 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Built-in (extension) types.
"""
from typing import Optional
from pprint import pformat
import msgspec
class Struct(
msgspec.Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def to_dict(self) -> dict:
return {
f: getattr(self, f)
for f in self.__struct_fields__
}
def __repr__(self):
return f'Struct({pformat(self.to_dict())})'
def copy(
self,
update: Optional[dict] = None,
) -> msgspec.Struct:
'''
Validate-typecast all self defined fields, return a copy of us
with all such fields.
This is kinda like the default behaviour in `pydantic.BaseModel`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# roundtrip serialize to validate
return msgspec.msgpack.Decoder(
type=type(self)
).decode(
msgspec.msgpack.Encoder().encode(self)
)

View File

@ -21,7 +21,6 @@ Qt event proxying and processing using ``trio`` mem chans.
from contextlib import asynccontextmanager, AsyncExitStack
from typing import Callable
from pydantic import BaseModel
import trio
from PyQt5 import QtCore
from PyQt5.QtCore import QEvent, pyqtBoundSignal
@ -30,6 +29,8 @@ from PyQt5.QtWidgets import (
QGraphicsSceneMouseEvent as gs_mouse,
)
from ..data.types import Struct
MOUSE_EVENTS = {
gs_mouse.GraphicsSceneMousePress,
@ -43,13 +44,10 @@ MOUSE_EVENTS = {
# TODO: maybe consider some constrained ints down the road?
# https://pydantic-docs.helpmanual.io/usage/types/#constrained-types
class KeyboardMsg(BaseModel):
class KeyboardMsg(Struct):
'''Unpacked Qt keyboard event data.
'''
class Config:
arbitrary_types_allowed = True
event: QEvent
etype: int
key: int
@ -57,16 +55,13 @@ class KeyboardMsg(BaseModel):
txt: str
def to_tuple(self) -> tuple:
return tuple(self.dict().values())
return tuple(self.to_dict().values())
class MouseMsg(BaseModel):
class MouseMsg(Struct):
'''Unpacked Qt keyboard event data.
'''
class Config:
arbitrary_types_allowed = True
event: QEvent
etype: int
button: int

View File

@ -619,7 +619,7 @@ class FillStatusBar(QProgressBar):
# color: #19232D;
# width: 10px;
self.setRange(0, slots)
self.setRange(0, int(slots))
self.setValue(value)

View File

@ -22,12 +22,9 @@ from __future__ import annotations
from typing import (
Optional, Generic,
TypeVar, Callable,
Literal,
)
import enum
import sys
from pydantic import BaseModel, validator
# from pydantic import BaseModel, validator
from pydantic.generics import GenericModel
from PyQt5.QtWidgets import (
QWidget,
@ -38,6 +35,7 @@ from ._forms import (
# FontScaledDelegate,
Edit,
)
from ..data.types import Struct
DataType = TypeVar('DataType')
@ -62,7 +60,7 @@ class Selection(Field[DataType], Generic[DataType]):
options: dict[str, DataType]
# value: DataType = None
@validator('value') # , always=True)
# @validator('value') # , always=True)
def set_value_first(
cls,
@ -100,7 +98,7 @@ class Edit(Field[DataType], Generic[DataType]):
widget_factory = Edit
class AllocatorPane(BaseModel):
class AllocatorPane(Struct):
account = Selection[str](
options=dict.fromkeys(

View File

@ -27,7 +27,6 @@ import time
from typing import Optional, Dict, Callable, Any
import uuid
from pydantic import BaseModel
import tractor
import trio
from PyQt5.QtCore import Qt
@ -41,6 +40,7 @@ from ..clearing._allocate import (
from ._style import _font
from ..data._source import Symbol
from ..data.feed import Feed
from ..data.types import Struct
from ..log import get_logger
from ._editors import LineEditor, ArrowEditor
from ._lines import order_line, LevelLine
@ -58,7 +58,7 @@ from ._forms import open_form_input_handling
log = get_logger(__name__)
class OrderDialog(BaseModel):
class OrderDialog(Struct):
'''
Trade dialogue meta-data describing the lifetime
of an order submission to ``emsd`` from a chart.
@ -73,10 +73,6 @@ class OrderDialog(BaseModel):
msgs: dict[str, dict] = {}
fills: Dict[str, Any] = {}
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
def on_level_change_update_next_order_info(
@ -268,7 +264,8 @@ class OrderMode:
self,
) -> OrderDialog:
'''Send execution order to EMS return a level line to
'''
Send execution order to EMS return a level line to
represent the order on a chart.
'''
@ -277,13 +274,9 @@ class OrderMode:
oid = str(uuid.uuid4())
# format order data for ems
fqsn = symbol.front_fqsn()
order = staged.copy(
update={
'symbol': fqsn,
'oid': oid,
}
)
order = staged.copy()
order.oid = oid
order.symbol = symbol.front_fqsn()
line = self.line_from_order(
order,
@ -858,7 +851,9 @@ async def process_trades_and_update_ui(
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
log.warning(f'Order {oid} failed with:\n{pformat(broker_msg)}')
log.warning(
f'Order {oid} failed with:\n{pformat(broker_msg)}'
)
elif resp in (
'dark_triggered'