Compare commits

..

No commits in common. "869aa8251a9d5f26477c75b8630b663aa1e397b3" and "861826dd7bdd2967b2fb0e75251759c43dbfb97a" have entirely different histories.

14 changed files with 162 additions and 231 deletions

View File

@ -22,13 +22,13 @@ from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager as acm
from collections import defaultdict
from msgspec import Struct
import tractor
import trio
from trio_typing import TaskStatus
from .log import get_logger, get_console_log
from .brokers import get_brokermod
from .data.types import Struct
log = get_logger(__name__)
@ -204,26 +204,23 @@ async def open_piker_runtime(
assert _services is None
# XXX: this may open a root actor as well
with tractor.msg.configure_native_msgs(
[Struct],
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
):
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
):
yield tractor.current_actor()
yield tractor.current_actor()
@acm
@ -263,31 +260,27 @@ async def maybe_open_pikerd(
if loglevel:
get_console_log(loglevel)
# XXX: this may open a root actor as well
with tractor.msg.configure_native_msgs(
[Struct],
):
# subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):
# subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):
async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None
if portal is not None:
yield portal
return
async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None
if portal is not None:
yield portal
return
# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(
# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
) as _:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
# we return no portal to self.
yield None
) as _:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
# we return no portal to self.
yield None
# brokerd enabled modules
@ -449,7 +442,7 @@ async def spawn_brokerd(
)
# non-blocking setup of brokerd service nursery
from .data.feed import _setup_persistent_brokerd
from .data import _setup_persistent_brokerd
await _services.start_service_task(
dname,

View File

@ -148,7 +148,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?',
))
).dict())
continue
client = _accounts2clients.get(account)
@ -161,7 +161,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?',
))
).dict())
continue
if action in {'buy', 'sell'}:
@ -188,7 +188,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason='Order already active?',
))
).dict())
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
@ -197,8 +197,9 @@ async def handle_order_requests(
oid=order.oid,
# broker specific request id
reqid=reqid,
time_ns=time.time_ns(),
account=account,
)
).dict()
)
elif action == 'cancel':
@ -558,7 +559,7 @@ async def trades_dialogue(
cids2pps,
validate=True,
)
all_positions.extend(msg for msg in msgs)
all_positions.extend(msg.dict() for msg in msgs)
if not all_positions and cids2pps:
raise RuntimeError(
@ -664,7 +665,7 @@ async def emit_pp_update(
msg = msgs[0]
break
await ems_stream.send(msg)
await ems_stream.send(msg.dict())
async def deliver_trade_events(
@ -742,7 +743,7 @@ async def deliver_trade_events(
broker_details={'name': 'ib'},
)
await ems_stream.send(msg)
await ems_stream.send(msg.dict())
case 'fill':
@ -802,7 +803,7 @@ async def deliver_trade_events(
broker_time=trade_entry['broker_time'],
)
await ems_stream.send(msg)
await ems_stream.send(msg.dict())
# 2 cases:
# - fill comes first or
@ -878,7 +879,7 @@ async def deliver_trade_events(
cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg)
# await ems_stream.send(msg.dict())
case 'event':
@ -890,7 +891,7 @@ async def deliver_trade_events(
# level...
# reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg)}")
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
# msg.reqid = 'tws-' + str(-1 * reqid)

View File

@ -184,7 +184,7 @@ async def handle_order_requests(
'Invalid request msg:\n{msg}'
),
)
).dict()
)
@ -282,7 +282,7 @@ async def trades_dialogue(
avg_price=p.be_price,
currency='',
)
position_msgs.append(msg)
position_msgs.append(msg.dict())
await ctx.started(
(position_msgs, [acc_name])
@ -405,7 +405,7 @@ async def handle_order_updates(
broker_details={'name': 'kraken'},
broker_time=broker_time
)
await ems_stream.send(fill_msg)
await ems_stream.send(fill_msg.dict())
filled_msg = BrokerdStatus(
reqid=reqid,
@ -429,7 +429,7 @@ async def handle_order_updates(
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg)
await ems_stream.send(filled_msg.dict())
# update ledger and position tracking
with open_ledger(acctid, trades) as trans:
@ -466,7 +466,7 @@ async def handle_order_updates(
# TODO
# currency=''
)
await ems_stream.send(pp_msg)
await ems_stream.send(pp_msg.dict())
# process and relay order state change events
# https://docs.kraken.com/websockets/#message-openOrders
@ -563,7 +563,7 @@ async def handle_order_updates(
),
)
msgs.append(resp)
await ems_stream.send(resp)
await ems_stream.send(resp.dict())
case _:
log.warning(
@ -603,7 +603,7 @@ async def handle_order_updates(
msgs.extend(resps)
for resp in resps:
await ems_stream.send(resp)
await ems_stream.send(resp.dict())
case _:
log.warning(f'Unhandled trades update msg: {msg}')

View File

@ -22,9 +22,10 @@ from enum import Enum
from typing import Optional
from bidict import bidict
from pydantic import BaseModel, validator
# from msgspec import Struct
from ..data._source import Symbol
from ..data.types import Struct
from ..pp import Position
@ -40,30 +41,33 @@ SizeUnit = Enum(
)
class Allocator(Struct):
class Allocator(BaseModel):
class Config:
validate_assignment = True
copy_on_model_validation = False
arbitrary_types_allowed = True
# required to get the account validator lookup working?
extra = 'allow'
underscore_attrs_are_private = False
symbol: Symbol
account: Optional[str] = 'paper'
_size_units: bidict[str, Optional[str]] = _size_units
# TODO: for enums this clearly doesn't fucking work, you can't set
# a default at startup by passing in a `dict` but yet you can set
# that value through assignment..for wtv cucked reason.. honestly, pure
# unintuitive garbage.
_size_unit: str = 'currency'
size_unit: str = 'currency'
_size_units: dict[str, Optional[str]] = _size_units
@property
def size_unit(self) -> str:
return self._size_unit
@size_unit.setter
def size_unit(self, v: str) -> Optional[str]:
@validator('size_unit', pre=True)
def maybe_lookup_key(cls, v):
# apply the corresponding enum key for the text "description" value
if v not in _size_units:
v = _size_units.inverse[v]
return _size_units.inverse[v]
assert v in _size_units
self._size_unit = v
return v
# TODO: if we ever want ot support non-uniform entry-slot-proportion
@ -258,7 +262,7 @@ def mk_allocator(
# default allocation settings
defaults: dict[str, float] = {
'account': None, # select paper by default
# 'size_unit': 'currency',
'size_unit': 'currency',
'units_limit': 400,
'currency_limit': 5e3,
'slots': 4,
@ -297,9 +301,6 @@ def mk_allocator(
# entry step 1.0
alloc.units_limit = alloc.slots
else:
alloc.size_unit = 'currency'
# if the current position is already greater then the limit
# settings, increase the limit to the current position
if alloc.size_unit == 'currency':

View File

@ -58,11 +58,11 @@ class OrderBook:
def send(
self,
msg: Order | dict,
msg: Order,
) -> dict:
self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg)
self._to_ems.send_nowait(msg.dict())
return msg
def update(
@ -73,8 +73,9 @@ class OrderBook:
) -> dict:
cmd = self._sent_orders[uuid]
msg = cmd.copy(update=data)
self._sent_orders[uuid] = msg
msg = cmd.dict()
msg.update(data)
self._sent_orders[uuid] = Order(**msg)
self._to_ems.send_nowait(msg)
return cmd
@ -87,7 +88,7 @@ class OrderBook:
oid=uuid,
symbol=cmd.symbol,
)
self._to_ems.send_nowait(msg)
self._to_ems.send_nowait(msg.dict())
_orders: OrderBook = None
@ -148,7 +149,7 @@ async def relay_order_cmds_from_sync_code(
book = get_orders()
async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream:
if cmd.symbol == symbol_key:
if cmd['symbol'] == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire
await to_ems_stream.send(cmd)

View File

@ -20,7 +20,6 @@ In da suit parlances: "Execution management systems"
"""
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from math import isnan
from pprint import pformat
import time
from typing import AsyncIterator, Callable
@ -232,7 +231,7 @@ async def clear_dark_triggers(
price=submit_price,
size=cmd['size'],
)
await brokerd_orders_stream.send(msg)
await brokerd_orders_stream.send(msg.dict())
# mark this entry as having sent an order
# request. the entry will be replaced once the
@ -248,11 +247,14 @@ async def clear_dark_triggers(
msg = Status(
oid=oid, # ems order id
time_ns=time.time_ns(),
resp=resp,
time_ns=time.time_ns(),
symbol=fqsn,
trigger_price=price,
brokerd_msg=cmd,
)
broker_details={'name': broker},
cmd=cmd, # original request message
).dict()
# remove exec-condition from set
log.info(f'removing pred for {oid}')
@ -575,11 +577,11 @@ async def translate_and_relay_brokerd_events(
if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg)
pos_msg = BrokerdPosition(**brokerd_msg).dict()
# XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem
sym, broker = pos_msg.symbol, pos_msg.broker
sym, broker = pos_msg['symbol'], pos_msg['broker']
relay.positions.setdefault(
# NOTE: translate to a FQSN!
@ -681,7 +683,7 @@ async def translate_and_relay_brokerd_events(
entry.reqid = reqid
# tell broker to cancel immediately
await brokerd_trades_stream.send(entry)
await brokerd_trades_stream.send(entry.dict())
# - the order is now active and will be mirrored in
# our book -> registered as live flow
@ -721,7 +723,7 @@ async def translate_and_relay_brokerd_events(
# if 10147 in message: cancel
resp = 'broker_errored'
broker_details = msg
broker_details = msg.dict()
# don't relay message to order requester client
# continue
@ -756,7 +758,7 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_' + msg.status
# pass the BrokerdStatus msg inside the broker details field
broker_details = msg
broker_details = msg.dict()
elif name in (
'fill',
@ -765,7 +767,7 @@ async def translate_and_relay_brokerd_events(
# proxy through the "fill" result(s)
resp = 'broker_filled'
broker_details = msg
broker_details = msg.dict()
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
@ -783,7 +785,7 @@ async def translate_and_relay_brokerd_events(
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=broker_details,
)
).dict()
)
except KeyError:
log.error(
@ -855,7 +857,7 @@ async def process_client_order_cmds(
f'Submitting cancel for live order {reqid}'
)
await brokerd_order_stream.send(msg)
await brokerd_order_stream.send(msg.dict())
else:
# this might be a cancel for an order that hasn't been
@ -877,7 +879,7 @@ async def process_client_order_cmds(
resp='dark_cancelled',
oid=oid,
time_ns=time.time_ns(),
)
).dict()
)
# de-register this client dialogue
router.dialogues.pop(oid)
@ -932,7 +934,7 @@ async def process_client_order_cmds(
# handle relaying the ems side responses back to
# the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg)
await brokerd_order_stream.send(msg.dict())
# an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()``
@ -957,12 +959,6 @@ async def process_client_order_cmds(
# like every other shitty tina platform that makes
# the user choose the predicate operator.
last = dark_book.lasts[fqsn]
# sometimes the real-time feed hasn't come up
# so just pull from the latest history.
if isnan(last):
last = feed.shm.array[-1]['close']
pred = mk_check(trigger_price, last, action)
spread_slap: float = 5
@ -1012,7 +1008,7 @@ async def process_client_order_cmds(
resp=resp,
oid=oid,
time_ns=time.time_ns(),
)
).dict()
)
@ -1150,14 +1146,8 @@ async def _emsd_main(
)
finally:
# try to remove client from "registry"
try:
_router.clients.remove(ems_client_order_stream)
except KeyError:
log.warning(
f'Stream {ems_client_order_stream._ctx.chan.uid}'
' was already dropped?'
)
# remove client from "registry"
_router.clients.remove(ems_client_order_stream)
dialogues = _router.dialogues

View File

@ -20,15 +20,16 @@ Clearing system messagingn types and protocols.
"""
from typing import Optional, Union
# TODO: try out just encoding/send direction for now?
# import msgspec
from pydantic import BaseModel
from ..data._source import Symbol
from ..data.types import Struct
# --------------
# Client -> emsd
# --------------
class Cancel(Struct):
class Cancel(BaseModel):
'''Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order.
@ -38,7 +39,7 @@ class Cancel(Struct):
symbol: str
class Order(Struct):
class Order(BaseModel):
action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id"
@ -58,14 +59,20 @@ class Order(Struct):
# the backend broker
exec_mode: str # {'dark', 'live', 'paper'}
class Config:
# just for pre-loading a ``Symbol`` when used
# in the order mode staging process
arbitrary_types_allowed = True
# don't copy this model instance when used in
# a recursive model
copy_on_model_validation = False
# --------------
# Client <- emsd
# --------------
# update msgs from ems which relay state change info
# from the active clearing engine.
class Status(Struct):
class Status(BaseModel):
name: str = 'status'
oid: str # uuid4
@ -88,6 +95,8 @@ class Status(Struct):
# }
resp: str # "response", see above
# symbol: str
# trigger info
trigger_price: Optional[float] = None
# price: float
@ -102,12 +111,10 @@ class Status(Struct):
brokerd_msg: dict = {}
# ---------------
# emsd -> brokerd
# ---------------
# requests *sent* from ems to respective backend broker daemon
class BrokerdCancel(Struct):
class BrokerdCancel(BaseModel):
action: str = 'cancel'
oid: str # piker emsd order id
@ -123,7 +130,7 @@ class BrokerdCancel(Struct):
reqid: Optional[Union[int, str]] = None
class BrokerdOrder(Struct):
class BrokerdOrder(BaseModel):
action: str # {buy, sell}
oid: str
@ -143,12 +150,11 @@ class BrokerdOrder(Struct):
size: float
# ---------------
# emsd <- brokerd
# ---------------
# requests *received* to ems from broker backend
class BrokerdOrderAck(Struct):
class BrokerdOrderAck(BaseModel):
'''
Immediate reponse to a brokerd order request providing the broker
specific unique order id so that the EMS can associate this
@ -166,7 +172,7 @@ class BrokerdOrderAck(Struct):
account: str = ''
class BrokerdStatus(Struct):
class BrokerdStatus(BaseModel):
name: str = 'status'
reqid: Union[int, str]
@ -199,7 +205,7 @@ class BrokerdStatus(Struct):
}
class BrokerdFill(Struct):
class BrokerdFill(BaseModel):
'''
A single message indicating a "fill-details" event from the broker
if avaiable.
@ -224,7 +230,7 @@ class BrokerdFill(Struct):
broker_time: float
class BrokerdError(Struct):
class BrokerdError(BaseModel):
'''
Optional error type that can be relayed to emsd for error handling.
@ -243,7 +249,7 @@ class BrokerdError(Struct):
broker_details: dict = {}
class BrokerdPosition(Struct):
class BrokerdPosition(BaseModel):
'''Position update event from brokerd.
'''

View File

@ -30,8 +30,8 @@ import trio
import tractor
from dataclasses import dataclass
from .. import data
from ..data._source import Symbol
from ..data.feed import open_feed
from ..pp import Position
from ..data._normalize import iterticks
from ..data._source import unpack_fqsn
@ -117,7 +117,7 @@ class PaperBoi:
reason='paper_trigger',
remaining=size,
)
await self.ems_trades_stream.send(msg)
await self.ems_trades_stream.send(msg.dict())
# if we're already a clearing price simulate an immediate fill
if (
@ -173,7 +173,7 @@ class PaperBoi:
broker=self.broker,
time_ns=time.time_ns(),
)
await self.ems_trades_stream.send(msg)
await self.ems_trades_stream.send(msg.dict())
async def fake_fill(
self,
@ -216,7 +216,7 @@ class PaperBoi:
'name': self.broker + '_paper',
},
)
await self.ems_trades_stream.send(msg)
await self.ems_trades_stream.send(msg.dict())
if order_complete:
@ -240,7 +240,7 @@ class PaperBoi:
'name': self.broker,
},
)
await self.ems_trades_stream.send(msg)
await self.ems_trades_stream.send(msg.dict())
# lookup any existing position
token = f'{symbol}.{self.broker}'
@ -268,7 +268,7 @@ class PaperBoi:
)
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
await self.ems_trades_stream.send(pp_msg)
await self.ems_trades_stream.send(pp_msg.dict())
async def simulate_fills(
@ -384,7 +384,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'Paper only. No account found: `{account}` ?',
))
).dict())
continue
# validate
@ -416,7 +416,7 @@ async def handle_order_requests(
# broker specific request id
reqid=reqid,
)
).dict()
)
elif action == 'cancel':
@ -441,11 +441,14 @@ async def trades_dialogue(
) -> None:
tractor.log.get_console_log(loglevel)
async with open_feed(
[fqsn],
loglevel=loglevel,
) as feed:
async with (
data.open_feed(
[fqsn],
loglevel=loglevel,
) as feed,
):
# TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]``
# await ctx.started(all_positions)

View File

@ -30,19 +30,19 @@ from ._sharedmem import (
get_shm_token,
ShmArray,
)
# from .feed import (
# # open_feed,
# _setup_persistent_brokerd,
# )
from .feed import (
open_feed,
_setup_persistent_brokerd,
)
__all__ = [
# 'open_feed',
'open_feed',
'ShmArray',
'iterticks',
'maybe_open_shm_array',
'attach_shm_array',
'open_shm_array',
'get_shm_token',
# '_setup_persistent_brokerd',
'_setup_persistent_brokerd',
]

View File

@ -51,6 +51,10 @@ class DockerNotStarted(Exception):
'Prolly you dint start da daemon bruh'
class ContainerError(RuntimeError):
'Error reported via app-container logging level'
@acm
async def open_docker(
url: Optional[str] = None,
@ -185,7 +189,7 @@ class Container:
def hard_kill(self, start: float) -> None:
delay = time.time() - start
log.error(
f'Failed to kill container {self.cntr.id} after {delay}s\n'
f'Failed to kill container {cid} after {delay}s\n'
'sending SIGKILL..'
)
# get out the big guns, bc apparently marketstore

View File

@ -128,14 +128,11 @@ class _Token(Struct, frozen=True):
@classmethod
def from_msg(cls, msg: dict) -> _Token:
# TODO: native struct decoding
# return _token_dec.decode(msg)
if isinstance(msg, _Token):
return msg
# assert 0
# TODO: native struct decoding
# return _token_dec.decode(msg)
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return _Token(**msg)

View File

@ -1,68 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Built-in (extension) types.
"""
from typing import Optional
from pprint import pformat
import msgspec
class Struct(
msgspec.Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def to_dict(self) -> dict:
return {
f: getattr(self, f)
for f in self.__struct_fields__
}
def __repr__(self):
return f'Struct({pformat(self.to_dict())})'
def copy(
self,
update: Optional[dict] = None,
) -> msgspec.Struct:
'''
Validate-typecast all self defined fields, return a copy of us
with all such fields.
This is kinda like the default behaviour in `pydantic.BaseModel`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# roundtrip serialize to validate
return msgspec.msgpack.Decoder(
type=type(self)
).decode(
msgspec.msgpack.Encoder().encode(self)
)

View File

@ -619,7 +619,7 @@ class FillStatusBar(QProgressBar):
# color: #19232D;
# width: 10px;
self.setRange(0, int(slots))
self.setRange(0, slots)
self.setValue(value)

View File

@ -264,8 +264,7 @@ class OrderMode:
self,
) -> OrderDialog:
'''
Send execution order to EMS return a level line to
'''Send execution order to EMS return a level line to
represent the order on a chart.
'''
@ -274,9 +273,13 @@ class OrderMode:
oid = str(uuid.uuid4())
# format order data for ems
order = staged.copy()
order.oid = oid
order.symbol = symbol.front_fqsn()
fqsn = symbol.front_fqsn()
order = staged.copy(
update={
'symbol': fqsn,
'oid': oid,
}
)
line = self.line_from_order(
order,