Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet 869aa8251a Import adjustments to allow msg codec overriding in `tractor` 2022-07-08 10:56:50 -04:00
Tyler Goodlet d31c38ef51 Mucking with custom `msgspec.Struct` codecs
Syncs with https://github.com/goodboy/tractor/pull/311
which is nowhere near ready and this approach didn't end up being
as straight forward as hoped. We're going to need a top level
`Msg`-boxing type/protocol in `tractor` first...
2022-07-08 10:56:50 -04:00
Tyler Goodlet de91c2196d Drop remaining `BaseModel` api usage from rest of codebase 2022-07-08 10:55:02 -04:00
Tyler Goodlet 583fa79e5e Add `Struct.copy()` which does a rountrip validate 2022-07-08 10:54:04 -04:00
Tyler Goodlet 6887d4d1b0 Change all clearing msgs over to `msgspec` 2022-07-08 10:53:33 -04:00
Tyler Goodlet c87704e593 Cast slots to `int` before range set 2022-07-07 21:08:46 -04:00
Tyler Goodlet cfc08a5814 Drop pydantic from allocator 2022-07-07 21:04:53 -04:00
Tyler Goodlet c10a85a8f3 Add a custom `msgspec.Struct` with some humanizing 2022-07-07 19:36:39 -04:00
Tyler Goodlet 4e9ff65465 Finally solve the last-price-is-`nan` issue..
Not sure why I put this off for so long but the check is in now such
that if the market isn't open or no rt quote comes in from the first
query, we just pull from the last shm history 'close' value.
Includes another fix to avoid raising when a double remove on the client
side stream from the registry sometimes happens.
2022-07-07 19:15:01 -04:00
Tyler Goodlet bf5fcfe896 Fix missing container id, drop custom exception 2022-07-07 17:10:06 -04:00
14 changed files with 231 additions and 162 deletions

View File

@ -22,13 +22,13 @@ from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager as acm
from collections import defaultdict
from msgspec import Struct
import tractor
import trio
from trio_typing import TaskStatus
from .log import get_logger, get_console_log
from .brokers import get_brokermod
from .data.types import Struct
log = get_logger(__name__)
@ -204,6 +204,9 @@ async def open_piker_runtime(
assert _services is None
# XXX: this may open a root actor as well
with tractor.msg.configure_native_msgs(
[Struct],
):
async with (
tractor.open_root_actor(
@ -260,6 +263,10 @@ async def maybe_open_pikerd(
if loglevel:
get_console_log(loglevel)
# XXX: this may open a root actor as well
with tractor.msg.configure_native_msgs(
[Struct],
):
# subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):
@ -442,7 +449,7 @@ async def spawn_brokerd(
)
# non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd
from .data.feed import _setup_persistent_brokerd
await _services.start_service_task(
dname,

View File

@ -148,7 +148,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?',
).dict())
))
continue
client = _accounts2clients.get(account)
@ -161,7 +161,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?',
).dict())
))
continue
if action in {'buy', 'sell'}:
@ -188,7 +188,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason='Order already active?',
).dict())
))
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
@ -197,9 +197,8 @@ async def handle_order_requests(
oid=order.oid,
# broker specific request id
reqid=reqid,
time_ns=time.time_ns(),
account=account,
).dict()
)
)
elif action == 'cancel':
@ -559,7 +558,7 @@ async def trades_dialogue(
cids2pps,
validate=True,
)
all_positions.extend(msg.dict() for msg in msgs)
all_positions.extend(msg for msg in msgs)
if not all_positions and cids2pps:
raise RuntimeError(
@ -665,7 +664,7 @@ async def emit_pp_update(
msg = msgs[0]
break
await ems_stream.send(msg.dict())
await ems_stream.send(msg)
async def deliver_trade_events(
@ -743,7 +742,7 @@ async def deliver_trade_events(
broker_details={'name': 'ib'},
)
await ems_stream.send(msg.dict())
await ems_stream.send(msg)
case 'fill':
@ -803,7 +802,7 @@ async def deliver_trade_events(
broker_time=trade_entry['broker_time'],
)
await ems_stream.send(msg.dict())
await ems_stream.send(msg)
# 2 cases:
# - fill comes first or
@ -879,7 +878,7 @@ async def deliver_trade_events(
cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg.dict())
# await ems_stream.send(msg)
case 'event':
@ -891,7 +890,7 @@ async def deliver_trade_events(
# level...
# reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
# log.info(f"TWS triggered trade\n{pformat(msg)}")
# msg.reqid = 'tws-' + str(-1 * reqid)

View File

@ -184,7 +184,7 @@ async def handle_order_requests(
'Invalid request msg:\n{msg}'
),
).dict()
)
)
@ -282,7 +282,7 @@ async def trades_dialogue(
avg_price=p.be_price,
currency='',
)
position_msgs.append(msg.dict())
position_msgs.append(msg)
await ctx.started(
(position_msgs, [acc_name])
@ -405,7 +405,7 @@ async def handle_order_updates(
broker_details={'name': 'kraken'},
broker_time=broker_time
)
await ems_stream.send(fill_msg.dict())
await ems_stream.send(fill_msg)
filled_msg = BrokerdStatus(
reqid=reqid,
@ -429,7 +429,7 @@ async def handle_order_updates(
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg.dict())
await ems_stream.send(filled_msg)
# update ledger and position tracking
with open_ledger(acctid, trades) as trans:
@ -466,7 +466,7 @@ async def handle_order_updates(
# TODO
# currency=''
)
await ems_stream.send(pp_msg.dict())
await ems_stream.send(pp_msg)
# process and relay order state change events
# https://docs.kraken.com/websockets/#message-openOrders
@ -563,7 +563,7 @@ async def handle_order_updates(
),
)
msgs.append(resp)
await ems_stream.send(resp.dict())
await ems_stream.send(resp)
case _:
log.warning(
@ -603,7 +603,7 @@ async def handle_order_updates(
msgs.extend(resps)
for resp in resps:
await ems_stream.send(resp.dict())
await ems_stream.send(resp)
case _:
log.warning(f'Unhandled trades update msg: {msg}')

View File

@ -22,10 +22,9 @@ from enum import Enum
from typing import Optional
from bidict import bidict
from pydantic import BaseModel, validator
# from msgspec import Struct
from ..data._source import Symbol
from ..data.types import Struct
from ..pp import Position
@ -41,33 +40,30 @@ SizeUnit = Enum(
)
class Allocator(BaseModel):
class Config:
validate_assignment = True
copy_on_model_validation = False
arbitrary_types_allowed = True
# required to get the account validator lookup working?
extra = 'allow'
underscore_attrs_are_private = False
class Allocator(Struct):
symbol: Symbol
account: Optional[str] = 'paper'
_size_units: bidict[str, Optional[str]] = _size_units
# TODO: for enums this clearly doesn't fucking work, you can't set
# a default at startup by passing in a `dict` but yet you can set
# that value through assignment..for wtv cucked reason.. honestly, pure
# unintuitive garbage.
size_unit: str = 'currency'
_size_units: dict[str, Optional[str]] = _size_units
_size_unit: str = 'currency'
@validator('size_unit', pre=True)
def maybe_lookup_key(cls, v):
# apply the corresponding enum key for the text "description" value
@property
def size_unit(self) -> str:
return self._size_unit
@size_unit.setter
def size_unit(self, v: str) -> Optional[str]:
if v not in _size_units:
return _size_units.inverse[v]
v = _size_units.inverse[v]
assert v in _size_units
self._size_unit = v
return v
# TODO: if we ever want ot support non-uniform entry-slot-proportion
@ -262,7 +258,7 @@ def mk_allocator(
# default allocation settings
defaults: dict[str, float] = {
'account': None, # select paper by default
'size_unit': 'currency',
# 'size_unit': 'currency',
'units_limit': 400,
'currency_limit': 5e3,
'slots': 4,
@ -301,6 +297,9 @@ def mk_allocator(
# entry step 1.0
alloc.units_limit = alloc.slots
else:
alloc.size_unit = 'currency'
# if the current position is already greater then the limit
# settings, increase the limit to the current position
if alloc.size_unit == 'currency':

View File

@ -58,11 +58,11 @@ class OrderBook:
def send(
self,
msg: Order,
msg: Order | dict,
) -> dict:
self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg.dict())
self._to_ems.send_nowait(msg)
return msg
def update(
@ -73,9 +73,8 @@ class OrderBook:
) -> dict:
cmd = self._sent_orders[uuid]
msg = cmd.dict()
msg.update(data)
self._sent_orders[uuid] = Order(**msg)
msg = cmd.copy(update=data)
self._sent_orders[uuid] = msg
self._to_ems.send_nowait(msg)
return cmd
@ -88,7 +87,7 @@ class OrderBook:
oid=uuid,
symbol=cmd.symbol,
)
self._to_ems.send_nowait(msg.dict())
self._to_ems.send_nowait(msg)
_orders: OrderBook = None
@ -149,7 +148,7 @@ async def relay_order_cmds_from_sync_code(
book = get_orders()
async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream:
if cmd['symbol'] == symbol_key:
if cmd.symbol == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire
await to_ems_stream.send(cmd)

View File

@ -20,6 +20,7 @@ In da suit parlances: "Execution management systems"
"""
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from math import isnan
from pprint import pformat
import time
from typing import AsyncIterator, Callable
@ -231,7 +232,7 @@ async def clear_dark_triggers(
price=submit_price,
size=cmd['size'],
)
await brokerd_orders_stream.send(msg.dict())
await brokerd_orders_stream.send(msg)
# mark this entry as having sent an order
# request. the entry will be replaced once the
@ -247,14 +248,11 @@ async def clear_dark_triggers(
msg = Status(
oid=oid, # ems order id
resp=resp,
time_ns=time.time_ns(),
symbol=fqsn,
resp=resp,
trigger_price=price,
broker_details={'name': broker},
cmd=cmd, # original request message
).dict()
brokerd_msg=cmd,
)
# remove exec-condition from set
log.info(f'removing pred for {oid}')
@ -577,11 +575,11 @@ async def translate_and_relay_brokerd_events(
if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg).dict()
pos_msg = BrokerdPosition(**brokerd_msg)
# XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem
sym, broker = pos_msg['symbol'], pos_msg['broker']
sym, broker = pos_msg.symbol, pos_msg.broker
relay.positions.setdefault(
# NOTE: translate to a FQSN!
@ -683,7 +681,7 @@ async def translate_and_relay_brokerd_events(
entry.reqid = reqid
# tell broker to cancel immediately
await brokerd_trades_stream.send(entry.dict())
await brokerd_trades_stream.send(entry)
# - the order is now active and will be mirrored in
# our book -> registered as live flow
@ -723,7 +721,7 @@ async def translate_and_relay_brokerd_events(
# if 10147 in message: cancel
resp = 'broker_errored'
broker_details = msg.dict()
broker_details = msg
# don't relay message to order requester client
# continue
@ -758,7 +756,7 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_' + msg.status
# pass the BrokerdStatus msg inside the broker details field
broker_details = msg.dict()
broker_details = msg
elif name in (
'fill',
@ -767,7 +765,7 @@ async def translate_and_relay_brokerd_events(
# proxy through the "fill" result(s)
resp = 'broker_filled'
broker_details = msg.dict()
broker_details = msg
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
@ -785,7 +783,7 @@ async def translate_and_relay_brokerd_events(
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=broker_details,
).dict()
)
)
except KeyError:
log.error(
@ -857,7 +855,7 @@ async def process_client_order_cmds(
f'Submitting cancel for live order {reqid}'
)
await brokerd_order_stream.send(msg.dict())
await brokerd_order_stream.send(msg)
else:
# this might be a cancel for an order that hasn't been
@ -879,7 +877,7 @@ async def process_client_order_cmds(
resp='dark_cancelled',
oid=oid,
time_ns=time.time_ns(),
).dict()
)
)
# de-register this client dialogue
router.dialogues.pop(oid)
@ -934,7 +932,7 @@ async def process_client_order_cmds(
# handle relaying the ems side responses back to
# the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg.dict())
await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()``
@ -959,6 +957,12 @@ async def process_client_order_cmds(
# like every other shitty tina platform that makes
# the user choose the predicate operator.
last = dark_book.lasts[fqsn]
# sometimes the real-time feed hasn't come up
# so just pull from the latest history.
if isnan(last):
last = feed.shm.array[-1]['close']
pred = mk_check(trigger_price, last, action)
spread_slap: float = 5
@ -1008,7 +1012,7 @@ async def process_client_order_cmds(
resp=resp,
oid=oid,
time_ns=time.time_ns(),
).dict()
)
)
@ -1146,8 +1150,14 @@ async def _emsd_main(
)
finally:
# remove client from "registry"
# try to remove client from "registry"
try:
_router.clients.remove(ems_client_order_stream)
except KeyError:
log.warning(
f'Stream {ems_client_order_stream._ctx.chan.uid}'
' was already dropped?'
)
dialogues = _router.dialogues

View File

@ -20,16 +20,15 @@ Clearing system messagingn types and protocols.
"""
from typing import Optional, Union
# TODO: try out just encoding/send direction for now?
# import msgspec
from pydantic import BaseModel
from ..data._source import Symbol
from ..data.types import Struct
# --------------
# Client -> emsd
# --------------
class Cancel(BaseModel):
class Cancel(Struct):
'''Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order.
@ -39,7 +38,7 @@ class Cancel(BaseModel):
symbol: str
class Order(BaseModel):
class Order(Struct):
action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id"
@ -59,20 +58,14 @@ class Order(BaseModel):
# the backend broker
exec_mode: str # {'dark', 'live', 'paper'}
class Config:
# just for pre-loading a ``Symbol`` when used
# in the order mode staging process
arbitrary_types_allowed = True
# don't copy this model instance when used in
# a recursive model
copy_on_model_validation = False
# --------------
# Client <- emsd
# --------------
# update msgs from ems which relay state change info
# from the active clearing engine.
class Status(BaseModel):
class Status(Struct):
name: str = 'status'
oid: str # uuid4
@ -95,8 +88,6 @@ class Status(BaseModel):
# }
resp: str # "response", see above
# symbol: str
# trigger info
trigger_price: Optional[float] = None
# price: float
@ -111,10 +102,12 @@ class Status(BaseModel):
brokerd_msg: dict = {}
# ---------------
# emsd -> brokerd
# ---------------
# requests *sent* from ems to respective backend broker daemon
class BrokerdCancel(BaseModel):
class BrokerdCancel(Struct):
action: str = 'cancel'
oid: str # piker emsd order id
@ -130,7 +123,7 @@ class BrokerdCancel(BaseModel):
reqid: Optional[Union[int, str]] = None
class BrokerdOrder(BaseModel):
class BrokerdOrder(Struct):
action: str # {buy, sell}
oid: str
@ -150,11 +143,12 @@ class BrokerdOrder(BaseModel):
size: float
# ---------------
# emsd <- brokerd
# ---------------
# requests *received* to ems from broker backend
class BrokerdOrderAck(BaseModel):
class BrokerdOrderAck(Struct):
'''
Immediate reponse to a brokerd order request providing the broker
specific unique order id so that the EMS can associate this
@ -172,7 +166,7 @@ class BrokerdOrderAck(BaseModel):
account: str = ''
class BrokerdStatus(BaseModel):
class BrokerdStatus(Struct):
name: str = 'status'
reqid: Union[int, str]
@ -205,7 +199,7 @@ class BrokerdStatus(BaseModel):
}
class BrokerdFill(BaseModel):
class BrokerdFill(Struct):
'''
A single message indicating a "fill-details" event from the broker
if avaiable.
@ -230,7 +224,7 @@ class BrokerdFill(BaseModel):
broker_time: float
class BrokerdError(BaseModel):
class BrokerdError(Struct):
'''
Optional error type that can be relayed to emsd for error handling.
@ -249,7 +243,7 @@ class BrokerdError(BaseModel):
broker_details: dict = {}
class BrokerdPosition(BaseModel):
class BrokerdPosition(Struct):
'''Position update event from brokerd.
'''

View File

@ -30,8 +30,8 @@ import trio
import tractor
from dataclasses import dataclass
from .. import data
from ..data._source import Symbol
from ..data.feed import open_feed
from ..pp import Position
from ..data._normalize import iterticks
from ..data._source import unpack_fqsn
@ -117,7 +117,7 @@ class PaperBoi:
reason='paper_trigger',
remaining=size,
)
await self.ems_trades_stream.send(msg.dict())
await self.ems_trades_stream.send(msg)
# if we're already a clearing price simulate an immediate fill
if (
@ -173,7 +173,7 @@ class PaperBoi:
broker=self.broker,
time_ns=time.time_ns(),
)
await self.ems_trades_stream.send(msg.dict())
await self.ems_trades_stream.send(msg)
async def fake_fill(
self,
@ -216,7 +216,7 @@ class PaperBoi:
'name': self.broker + '_paper',
},
)
await self.ems_trades_stream.send(msg.dict())
await self.ems_trades_stream.send(msg)
if order_complete:
@ -240,7 +240,7 @@ class PaperBoi:
'name': self.broker,
},
)
await self.ems_trades_stream.send(msg.dict())
await self.ems_trades_stream.send(msg)
# lookup any existing position
token = f'{symbol}.{self.broker}'
@ -268,7 +268,7 @@ class PaperBoi:
)
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
await self.ems_trades_stream.send(pp_msg.dict())
await self.ems_trades_stream.send(pp_msg)
async def simulate_fills(
@ -384,7 +384,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'Paper only. No account found: `{account}` ?',
).dict())
))
continue
# validate
@ -416,7 +416,7 @@ async def handle_order_requests(
# broker specific request id
reqid=reqid,
).dict()
)
)
elif action == 'cancel':
@ -441,14 +441,11 @@ async def trades_dialogue(
) -> None:
tractor.log.get_console_log(loglevel)
async with (
data.open_feed(
async with open_feed(
[fqsn],
loglevel=loglevel,
) as feed,
) as feed:
):
# TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]``
# await ctx.started(all_positions)

View File

@ -30,19 +30,19 @@ from ._sharedmem import (
get_shm_token,
ShmArray,
)
from .feed import (
open_feed,
_setup_persistent_brokerd,
)
# from .feed import (
# # open_feed,
# _setup_persistent_brokerd,
# )
__all__ = [
'open_feed',
# 'open_feed',
'ShmArray',
'iterticks',
'maybe_open_shm_array',
'attach_shm_array',
'open_shm_array',
'get_shm_token',
'_setup_persistent_brokerd',
# '_setup_persistent_brokerd',
]

View File

@ -51,10 +51,6 @@ class DockerNotStarted(Exception):
'Prolly you dint start da daemon bruh'
class ContainerError(RuntimeError):
'Error reported via app-container logging level'
@acm
async def open_docker(
url: Optional[str] = None,
@ -189,7 +185,7 @@ class Container:
def hard_kill(self, start: float) -> None:
delay = time.time() - start
log.error(
f'Failed to kill container {cid} after {delay}s\n'
f'Failed to kill container {self.cntr.id} after {delay}s\n'
'sending SIGKILL..'
)
# get out the big guns, bc apparently marketstore

View File

@ -128,12 +128,15 @@ class _Token(Struct, frozen=True):
@classmethod
def from_msg(cls, msg: dict) -> _Token:
if isinstance(msg, _Token):
return msg
# TODO: native struct decoding
# return _token_dec.decode(msg)
if isinstance(msg, _Token):
return msg
# assert 0
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return _Token(**msg)

View File

@ -0,0 +1,68 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Built-in (extension) types.
"""
from typing import Optional
from pprint import pformat
import msgspec
class Struct(
msgspec.Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def to_dict(self) -> dict:
return {
f: getattr(self, f)
for f in self.__struct_fields__
}
def __repr__(self):
return f'Struct({pformat(self.to_dict())})'
def copy(
self,
update: Optional[dict] = None,
) -> msgspec.Struct:
'''
Validate-typecast all self defined fields, return a copy of us
with all such fields.
This is kinda like the default behaviour in `pydantic.BaseModel`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# roundtrip serialize to validate
return msgspec.msgpack.Decoder(
type=type(self)
).decode(
msgspec.msgpack.Encoder().encode(self)
)

View File

@ -619,7 +619,7 @@ class FillStatusBar(QProgressBar):
# color: #19232D;
# width: 10px;
self.setRange(0, slots)
self.setRange(0, int(slots))
self.setValue(value)

View File

@ -264,7 +264,8 @@ class OrderMode:
self,
) -> OrderDialog:
'''Send execution order to EMS return a level line to
'''
Send execution order to EMS return a level line to
represent the order on a chart.
'''
@ -273,13 +274,9 @@ class OrderMode:
oid = str(uuid.uuid4())
# format order data for ems
fqsn = symbol.front_fqsn()
order = staged.copy(
update={
'symbol': fqsn,
'oid': oid,
}
)
order = staged.copy()
order.oid = oid
order.symbol = symbol.front_fqsn()
line = self.line_from_order(
order,