Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 869aa8251a Import adjustments to allow msg codec overriding in `tractor` 2022-07-08 10:56:50 -04:00
Tyler Goodlet d31c38ef51 Mucking with custom `msgspec.Struct` codecs
Syncs with https://github.com/goodboy/tractor/pull/311
which is nowhere near ready and this approach didn't end up being
as straight forward as hoped. We're going to need a top level
`Msg`-boxing type/protocol in `tractor` first...
2022-07-08 10:56:50 -04:00
Tyler Goodlet de91c2196d Drop remaining `BaseModel` api usage from rest of codebase 2022-07-08 10:55:02 -04:00
Tyler Goodlet 583fa79e5e Add `Struct.copy()` which does a rountrip validate 2022-07-08 10:54:04 -04:00
Tyler Goodlet 6887d4d1b0 Change all clearing msgs over to `msgspec` 2022-07-08 10:53:33 -04:00
Tyler Goodlet c87704e593 Cast slots to `int` before range set 2022-07-07 21:08:46 -04:00
Tyler Goodlet cfc08a5814 Drop pydantic from allocator 2022-07-07 21:04:53 -04:00
Tyler Goodlet c10a85a8f3 Add a custom `msgspec.Struct` with some humanizing 2022-07-07 19:36:39 -04:00
Tyler Goodlet 4e9ff65465 Finally solve the last-price-is-`nan` issue..
Not sure why I put this off for so long but the check is in now such
that if the market isn't open or no rt quote comes in from the first
query, we just pull from the last shm history 'close' value.
Includes another fix to avoid raising when a double remove on the client
side stream from the registry sometimes happens.
2022-07-07 19:15:01 -04:00
Tyler Goodlet bf5fcfe896 Fix missing container id, drop custom exception 2022-07-07 17:10:06 -04:00
14 changed files with 231 additions and 162 deletions

View File

@ -22,13 +22,13 @@ from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from collections import defaultdict from collections import defaultdict
from msgspec import Struct
import tractor import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from .log import get_logger, get_console_log from .log import get_logger, get_console_log
from .brokers import get_brokermod from .brokers import get_brokermod
from .data.types import Struct
log = get_logger(__name__) log = get_logger(__name__)
@ -204,23 +204,26 @@ async def open_piker_runtime(
assert _services is None assert _services is None
# XXX: this may open a root actor as well # XXX: this may open a root actor as well
async with ( with tractor.msg.configure_native_msgs(
tractor.open_root_actor( [Struct],
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
): ):
yield tractor.current_actor() async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
):
yield tractor.current_actor()
@acm @acm
@ -260,27 +263,31 @@ async def maybe_open_pikerd(
if loglevel: if loglevel:
get_console_log(loglevel) get_console_log(loglevel)
# subtle, we must have the runtime up here or portal lookup will fail # XXX: this may open a root actor as well
async with maybe_open_runtime(loglevel, **kwargs): with tractor.msg.configure_native_msgs(
[Struct],
):
# subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):
async with tractor.find_actor(_root_dname) as portal: async with tractor.find_actor(_root_dname) as portal:
# assert portal is not None # assert portal is not None
if portal is not None: if portal is not None:
yield portal yield portal
return return
# presume pikerd role since no daemon could be found at # presume pikerd role since no daemon could be found at
# configured address # configured address
async with open_pikerd( async with open_pikerd(
loglevel=loglevel, loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False), debug_mode=kwargs.get('debug_mode', False),
) as _: ) as _:
# in the case where we're starting up the # in the case where we're starting up the
# tractor-piker runtime stack in **this** process # tractor-piker runtime stack in **this** process
# we return no portal to self. # we return no portal to self.
yield None yield None
# brokerd enabled modules # brokerd enabled modules
@ -442,7 +449,7 @@ async def spawn_brokerd(
) )
# non-blocking setup of brokerd service nursery # non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd from .data.feed import _setup_persistent_brokerd
await _services.start_service_task( await _services.start_service_task(
dname, dname,

View File

@ -148,7 +148,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?', reason=f'No account found: `{account}` ?',
).dict()) ))
continue continue
client = _accounts2clients.get(account) client = _accounts2clients.get(account)
@ -161,7 +161,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?', reason=f'No api client loaded for account: `{account}` ?',
).dict()) ))
continue continue
if action in {'buy', 'sell'}: if action in {'buy', 'sell'}:
@ -188,7 +188,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason='Order already active?', reason='Order already active?',
).dict()) ))
# deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
await ems_order_stream.send( await ems_order_stream.send(
@ -197,9 +197,8 @@ async def handle_order_requests(
oid=order.oid, oid=order.oid,
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(),
account=account, account=account,
).dict() )
) )
elif action == 'cancel': elif action == 'cancel':
@ -559,7 +558,7 @@ async def trades_dialogue(
cids2pps, cids2pps,
validate=True, validate=True,
) )
all_positions.extend(msg.dict() for msg in msgs) all_positions.extend(msg for msg in msgs)
if not all_positions and cids2pps: if not all_positions and cids2pps:
raise RuntimeError( raise RuntimeError(
@ -665,7 +664,7 @@ async def emit_pp_update(
msg = msgs[0] msg = msgs[0]
break break
await ems_stream.send(msg.dict()) await ems_stream.send(msg)
async def deliver_trade_events( async def deliver_trade_events(
@ -743,7 +742,7 @@ async def deliver_trade_events(
broker_details={'name': 'ib'}, broker_details={'name': 'ib'},
) )
await ems_stream.send(msg.dict()) await ems_stream.send(msg)
case 'fill': case 'fill':
@ -803,7 +802,7 @@ async def deliver_trade_events(
broker_time=trade_entry['broker_time'], broker_time=trade_entry['broker_time'],
) )
await ems_stream.send(msg.dict()) await ems_stream.send(msg)
# 2 cases: # 2 cases:
# - fill comes first or # - fill comes first or
@ -879,7 +878,7 @@ async def deliver_trade_events(
cid, msg = pack_position(item) cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account] # acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps! # cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg.dict()) # await ems_stream.send(msg)
case 'event': case 'event':
@ -891,7 +890,7 @@ async def deliver_trade_events(
# level... # level...
# reqid = item.get('reqid', 0) # reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1: # if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}") # log.info(f"TWS triggered trade\n{pformat(msg)}")
# msg.reqid = 'tws-' + str(-1 * reqid) # msg.reqid = 'tws-' + str(-1 * reqid)

View File

@ -184,7 +184,7 @@ async def handle_order_requests(
'Invalid request msg:\n{msg}' 'Invalid request msg:\n{msg}'
), ),
).dict() )
) )
@ -282,7 +282,7 @@ async def trades_dialogue(
avg_price=p.be_price, avg_price=p.be_price,
currency='', currency='',
) )
position_msgs.append(msg.dict()) position_msgs.append(msg)
await ctx.started( await ctx.started(
(position_msgs, [acc_name]) (position_msgs, [acc_name])
@ -405,7 +405,7 @@ async def handle_order_updates(
broker_details={'name': 'kraken'}, broker_details={'name': 'kraken'},
broker_time=broker_time broker_time=broker_time
) )
await ems_stream.send(fill_msg.dict()) await ems_stream.send(fill_msg)
filled_msg = BrokerdStatus( filled_msg = BrokerdStatus(
reqid=reqid, reqid=reqid,
@ -429,7 +429,7 @@ async def handle_order_updates(
# https://github.com/pikers/piker/issues/296 # https://github.com/pikers/piker/issues/296
remaining=0, remaining=0,
) )
await ems_stream.send(filled_msg.dict()) await ems_stream.send(filled_msg)
# update ledger and position tracking # update ledger and position tracking
with open_ledger(acctid, trades) as trans: with open_ledger(acctid, trades) as trans:
@ -466,7 +466,7 @@ async def handle_order_updates(
# TODO # TODO
# currency='' # currency=''
) )
await ems_stream.send(pp_msg.dict()) await ems_stream.send(pp_msg)
# process and relay order state change events # process and relay order state change events
# https://docs.kraken.com/websockets/#message-openOrders # https://docs.kraken.com/websockets/#message-openOrders
@ -563,7 +563,7 @@ async def handle_order_updates(
), ),
) )
msgs.append(resp) msgs.append(resp)
await ems_stream.send(resp.dict()) await ems_stream.send(resp)
case _: case _:
log.warning( log.warning(
@ -603,7 +603,7 @@ async def handle_order_updates(
msgs.extend(resps) msgs.extend(resps)
for resp in resps: for resp in resps:
await ems_stream.send(resp.dict()) await ems_stream.send(resp)
case _: case _:
log.warning(f'Unhandled trades update msg: {msg}') log.warning(f'Unhandled trades update msg: {msg}')

View File

@ -22,10 +22,9 @@ from enum import Enum
from typing import Optional from typing import Optional
from bidict import bidict from bidict import bidict
from pydantic import BaseModel, validator
# from msgspec import Struct
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct
from ..pp import Position from ..pp import Position
@ -41,33 +40,30 @@ SizeUnit = Enum(
) )
class Allocator(BaseModel): class Allocator(Struct):
class Config:
validate_assignment = True
copy_on_model_validation = False
arbitrary_types_allowed = True
# required to get the account validator lookup working?
extra = 'allow'
underscore_attrs_are_private = False
symbol: Symbol symbol: Symbol
account: Optional[str] = 'paper' account: Optional[str] = 'paper'
_size_units: bidict[str, Optional[str]] = _size_units
# TODO: for enums this clearly doesn't fucking work, you can't set # TODO: for enums this clearly doesn't fucking work, you can't set
# a default at startup by passing in a `dict` but yet you can set # a default at startup by passing in a `dict` but yet you can set
# that value through assignment..for wtv cucked reason.. honestly, pure # that value through assignment..for wtv cucked reason.. honestly, pure
# unintuitive garbage. # unintuitive garbage.
size_unit: str = 'currency' _size_unit: str = 'currency'
_size_units: dict[str, Optional[str]] = _size_units
@validator('size_unit', pre=True) @property
def maybe_lookup_key(cls, v): def size_unit(self) -> str:
# apply the corresponding enum key for the text "description" value return self._size_unit
@size_unit.setter
def size_unit(self, v: str) -> Optional[str]:
if v not in _size_units: if v not in _size_units:
return _size_units.inverse[v] v = _size_units.inverse[v]
assert v in _size_units assert v in _size_units
self._size_unit = v
return v return v
# TODO: if we ever want ot support non-uniform entry-slot-proportion # TODO: if we ever want ot support non-uniform entry-slot-proportion
@ -262,7 +258,7 @@ def mk_allocator(
# default allocation settings # default allocation settings
defaults: dict[str, float] = { defaults: dict[str, float] = {
'account': None, # select paper by default 'account': None, # select paper by default
'size_unit': 'currency', # 'size_unit': 'currency',
'units_limit': 400, 'units_limit': 400,
'currency_limit': 5e3, 'currency_limit': 5e3,
'slots': 4, 'slots': 4,
@ -301,6 +297,9 @@ def mk_allocator(
# entry step 1.0 # entry step 1.0
alloc.units_limit = alloc.slots alloc.units_limit = alloc.slots
else:
alloc.size_unit = 'currency'
# if the current position is already greater then the limit # if the current position is already greater then the limit
# settings, increase the limit to the current position # settings, increase the limit to the current position
if alloc.size_unit == 'currency': if alloc.size_unit == 'currency':

View File

@ -58,11 +58,11 @@ class OrderBook:
def send( def send(
self, self,
msg: Order, msg: Order | dict,
) -> dict: ) -> dict:
self._sent_orders[msg.oid] = msg self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg.dict()) self._to_ems.send_nowait(msg)
return msg return msg
def update( def update(
@ -73,9 +73,8 @@ class OrderBook:
) -> dict: ) -> dict:
cmd = self._sent_orders[uuid] cmd = self._sent_orders[uuid]
msg = cmd.dict() msg = cmd.copy(update=data)
msg.update(data) self._sent_orders[uuid] = msg
self._sent_orders[uuid] = Order(**msg)
self._to_ems.send_nowait(msg) self._to_ems.send_nowait(msg)
return cmd return cmd
@ -88,7 +87,7 @@ class OrderBook:
oid=uuid, oid=uuid,
symbol=cmd.symbol, symbol=cmd.symbol,
) )
self._to_ems.send_nowait(msg.dict()) self._to_ems.send_nowait(msg)
_orders: OrderBook = None _orders: OrderBook = None
@ -149,7 +148,7 @@ async def relay_order_cmds_from_sync_code(
book = get_orders() book = get_orders()
async with book._from_order_book.subscribe() as orders_stream: async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream: async for cmd in orders_stream:
if cmd['symbol'] == symbol_key: if cmd.symbol == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}') log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire # send msg over IPC / wire
await to_ems_stream.send(cmd) await to_ems_stream.send(cmd)

View File

@ -20,6 +20,7 @@ In da suit parlances: "Execution management systems"
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, field from dataclasses import dataclass, field
from math import isnan
from pprint import pformat from pprint import pformat
import time import time
from typing import AsyncIterator, Callable from typing import AsyncIterator, Callable
@ -231,7 +232,7 @@ async def clear_dark_triggers(
price=submit_price, price=submit_price,
size=cmd['size'], size=cmd['size'],
) )
await brokerd_orders_stream.send(msg.dict()) await brokerd_orders_stream.send(msg)
# mark this entry as having sent an order # mark this entry as having sent an order
# request. the entry will be replaced once the # request. the entry will be replaced once the
@ -247,14 +248,11 @@ async def clear_dark_triggers(
msg = Status( msg = Status(
oid=oid, # ems order id oid=oid, # ems order id
resp=resp,
time_ns=time.time_ns(), time_ns=time.time_ns(),
symbol=fqsn, resp=resp,
trigger_price=price, trigger_price=price,
broker_details={'name': broker}, brokerd_msg=cmd,
cmd=cmd, # original request message )
).dict()
# remove exec-condition from set # remove exec-condition from set
log.info(f'removing pred for {oid}') log.info(f'removing pred for {oid}')
@ -577,11 +575,11 @@ async def translate_and_relay_brokerd_events(
if name == 'position': if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg).dict() pos_msg = BrokerdPosition(**brokerd_msg)
# XXX: this will be useful for automatic strats yah? # XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem # keep pps per account up to date locally in ``emsd`` mem
sym, broker = pos_msg['symbol'], pos_msg['broker'] sym, broker = pos_msg.symbol, pos_msg.broker
relay.positions.setdefault( relay.positions.setdefault(
# NOTE: translate to a FQSN! # NOTE: translate to a FQSN!
@ -683,7 +681,7 @@ async def translate_and_relay_brokerd_events(
entry.reqid = reqid entry.reqid = reqid
# tell broker to cancel immediately # tell broker to cancel immediately
await brokerd_trades_stream.send(entry.dict()) await brokerd_trades_stream.send(entry)
# - the order is now active and will be mirrored in # - the order is now active and will be mirrored in
# our book -> registered as live flow # our book -> registered as live flow
@ -723,7 +721,7 @@ async def translate_and_relay_brokerd_events(
# if 10147 in message: cancel # if 10147 in message: cancel
resp = 'broker_errored' resp = 'broker_errored'
broker_details = msg.dict() broker_details = msg
# don't relay message to order requester client # don't relay message to order requester client
# continue # continue
@ -758,7 +756,7 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_' + msg.status resp = 'broker_' + msg.status
# pass the BrokerdStatus msg inside the broker details field # pass the BrokerdStatus msg inside the broker details field
broker_details = msg.dict() broker_details = msg
elif name in ( elif name in (
'fill', 'fill',
@ -767,7 +765,7 @@ async def translate_and_relay_brokerd_events(
# proxy through the "fill" result(s) # proxy through the "fill" result(s)
resp = 'broker_filled' resp = 'broker_filled'
broker_details = msg.dict() broker_details = msg
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
@ -785,7 +783,7 @@ async def translate_and_relay_brokerd_events(
time_ns=time.time_ns(), time_ns=time.time_ns(),
broker_reqid=reqid, broker_reqid=reqid,
brokerd_msg=broker_details, brokerd_msg=broker_details,
).dict() )
) )
except KeyError: except KeyError:
log.error( log.error(
@ -857,7 +855,7 @@ async def process_client_order_cmds(
f'Submitting cancel for live order {reqid}' f'Submitting cancel for live order {reqid}'
) )
await brokerd_order_stream.send(msg.dict()) await brokerd_order_stream.send(msg)
else: else:
# this might be a cancel for an order that hasn't been # this might be a cancel for an order that hasn't been
@ -879,7 +877,7 @@ async def process_client_order_cmds(
resp='dark_cancelled', resp='dark_cancelled',
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
).dict() )
) )
# de-register this client dialogue # de-register this client dialogue
router.dialogues.pop(oid) router.dialogues.pop(oid)
@ -934,7 +932,7 @@ async def process_client_order_cmds(
# handle relaying the ems side responses back to # handle relaying the ems side responses back to
# the client/cmd sender from this request # the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}') log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg.dict()) await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck`` # an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()`` # with ems order id from the ``trades_dialogue()``
@ -959,6 +957,12 @@ async def process_client_order_cmds(
# like every other shitty tina platform that makes # like every other shitty tina platform that makes
# the user choose the predicate operator. # the user choose the predicate operator.
last = dark_book.lasts[fqsn] last = dark_book.lasts[fqsn]
# sometimes the real-time feed hasn't come up
# so just pull from the latest history.
if isnan(last):
last = feed.shm.array[-1]['close']
pred = mk_check(trigger_price, last, action) pred = mk_check(trigger_price, last, action)
spread_slap: float = 5 spread_slap: float = 5
@ -1008,7 +1012,7 @@ async def process_client_order_cmds(
resp=resp, resp=resp,
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
).dict() )
) )
@ -1146,8 +1150,14 @@ async def _emsd_main(
) )
finally: finally:
# remove client from "registry" # try to remove client from "registry"
_router.clients.remove(ems_client_order_stream) try:
_router.clients.remove(ems_client_order_stream)
except KeyError:
log.warning(
f'Stream {ems_client_order_stream._ctx.chan.uid}'
' was already dropped?'
)
dialogues = _router.dialogues dialogues = _router.dialogues

View File

@ -20,16 +20,15 @@ Clearing system messagingn types and protocols.
""" """
from typing import Optional, Union from typing import Optional, Union
# TODO: try out just encoding/send direction for now?
# import msgspec
from pydantic import BaseModel
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct
# --------------
# Client -> emsd # Client -> emsd
# --------------
class Cancel(Struct):
class Cancel(BaseModel):
'''Cancel msg for removing a dark (ems triggered) or '''Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order. broker-submitted (live) trigger/order.
@ -39,7 +38,7 @@ class Cancel(BaseModel):
symbol: str symbol: str
class Order(BaseModel): class Order(Struct):
action: str # {'buy', 'sell', 'alert'} action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id" # internal ``emdsd`` unique "order id"
@ -59,20 +58,14 @@ class Order(BaseModel):
# the backend broker # the backend broker
exec_mode: str # {'dark', 'live', 'paper'} exec_mode: str # {'dark', 'live', 'paper'}
class Config:
# just for pre-loading a ``Symbol`` when used
# in the order mode staging process
arbitrary_types_allowed = True
# don't copy this model instance when used in
# a recursive model
copy_on_model_validation = False
# --------------
# Client <- emsd # Client <- emsd
# --------------
# update msgs from ems which relay state change info # update msgs from ems which relay state change info
# from the active clearing engine. # from the active clearing engine.
class Status(Struct):
class Status(BaseModel):
name: str = 'status' name: str = 'status'
oid: str # uuid4 oid: str # uuid4
@ -95,8 +88,6 @@ class Status(BaseModel):
# } # }
resp: str # "response", see above resp: str # "response", see above
# symbol: str
# trigger info # trigger info
trigger_price: Optional[float] = None trigger_price: Optional[float] = None
# price: float # price: float
@ -111,10 +102,12 @@ class Status(BaseModel):
brokerd_msg: dict = {} brokerd_msg: dict = {}
# ---------------
# emsd -> brokerd # emsd -> brokerd
# ---------------
# requests *sent* from ems to respective backend broker daemon # requests *sent* from ems to respective backend broker daemon
class BrokerdCancel(BaseModel): class BrokerdCancel(Struct):
action: str = 'cancel' action: str = 'cancel'
oid: str # piker emsd order id oid: str # piker emsd order id
@ -130,7 +123,7 @@ class BrokerdCancel(BaseModel):
reqid: Optional[Union[int, str]] = None reqid: Optional[Union[int, str]] = None
class BrokerdOrder(BaseModel): class BrokerdOrder(Struct):
action: str # {buy, sell} action: str # {buy, sell}
oid: str oid: str
@ -150,11 +143,12 @@ class BrokerdOrder(BaseModel):
size: float size: float
# ---------------
# emsd <- brokerd # emsd <- brokerd
# ---------------
# requests *received* to ems from broker backend # requests *received* to ems from broker backend
class BrokerdOrderAck(Struct):
class BrokerdOrderAck(BaseModel):
''' '''
Immediate reponse to a brokerd order request providing the broker Immediate reponse to a brokerd order request providing the broker
specific unique order id so that the EMS can associate this specific unique order id so that the EMS can associate this
@ -172,7 +166,7 @@ class BrokerdOrderAck(BaseModel):
account: str = '' account: str = ''
class BrokerdStatus(BaseModel): class BrokerdStatus(Struct):
name: str = 'status' name: str = 'status'
reqid: Union[int, str] reqid: Union[int, str]
@ -205,7 +199,7 @@ class BrokerdStatus(BaseModel):
} }
class BrokerdFill(BaseModel): class BrokerdFill(Struct):
''' '''
A single message indicating a "fill-details" event from the broker A single message indicating a "fill-details" event from the broker
if avaiable. if avaiable.
@ -230,7 +224,7 @@ class BrokerdFill(BaseModel):
broker_time: float broker_time: float
class BrokerdError(BaseModel): class BrokerdError(Struct):
''' '''
Optional error type that can be relayed to emsd for error handling. Optional error type that can be relayed to emsd for error handling.
@ -249,7 +243,7 @@ class BrokerdError(BaseModel):
broker_details: dict = {} broker_details: dict = {}
class BrokerdPosition(BaseModel): class BrokerdPosition(Struct):
'''Position update event from brokerd. '''Position update event from brokerd.
''' '''

View File

@ -30,8 +30,8 @@ import trio
import tractor import tractor
from dataclasses import dataclass from dataclasses import dataclass
from .. import data
from ..data._source import Symbol from ..data._source import Symbol
from ..data.feed import open_feed
from ..pp import Position from ..pp import Position
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data._source import unpack_fqsn from ..data._source import unpack_fqsn
@ -117,7 +117,7 @@ class PaperBoi:
reason='paper_trigger', reason='paper_trigger',
remaining=size, remaining=size,
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
# if we're already a clearing price simulate an immediate fill # if we're already a clearing price simulate an immediate fill
if ( if (
@ -173,7 +173,7 @@ class PaperBoi:
broker=self.broker, broker=self.broker,
time_ns=time.time_ns(), time_ns=time.time_ns(),
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
async def fake_fill( async def fake_fill(
self, self,
@ -216,7 +216,7 @@ class PaperBoi:
'name': self.broker + '_paper', 'name': self.broker + '_paper',
}, },
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
if order_complete: if order_complete:
@ -240,7 +240,7 @@ class PaperBoi:
'name': self.broker, 'name': self.broker,
}, },
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
# lookup any existing position # lookup any existing position
token = f'{symbol}.{self.broker}' token = f'{symbol}.{self.broker}'
@ -268,7 +268,7 @@ class PaperBoi:
) )
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price) pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
await self.ems_trades_stream.send(pp_msg.dict()) await self.ems_trades_stream.send(pp_msg)
async def simulate_fills( async def simulate_fills(
@ -384,7 +384,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'Paper only. No account found: `{account}` ?', reason=f'Paper only. No account found: `{account}` ?',
).dict()) ))
continue continue
# validate # validate
@ -416,7 +416,7 @@ async def handle_order_requests(
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
).dict() )
) )
elif action == 'cancel': elif action == 'cancel':
@ -441,14 +441,11 @@ async def trades_dialogue(
) -> None: ) -> None:
tractor.log.get_console_log(loglevel) tractor.log.get_console_log(loglevel)
async with ( async with open_feed(
[fqsn],
loglevel=loglevel,
) as feed:
data.open_feed(
[fqsn],
loglevel=loglevel,
) as feed,
):
# TODO: load paper positions per broker from .toml config file # TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]`` # and pass as symbol to position data mapping: ``dict[str, dict]``
# await ctx.started(all_positions) # await ctx.started(all_positions)

View File

@ -30,19 +30,19 @@ from ._sharedmem import (
get_shm_token, get_shm_token,
ShmArray, ShmArray,
) )
from .feed import ( # from .feed import (
open_feed, # # open_feed,
_setup_persistent_brokerd, # _setup_persistent_brokerd,
) # )
__all__ = [ __all__ = [
'open_feed', # 'open_feed',
'ShmArray', 'ShmArray',
'iterticks', 'iterticks',
'maybe_open_shm_array', 'maybe_open_shm_array',
'attach_shm_array', 'attach_shm_array',
'open_shm_array', 'open_shm_array',
'get_shm_token', 'get_shm_token',
'_setup_persistent_brokerd', # '_setup_persistent_brokerd',
] ]

View File

@ -51,10 +51,6 @@ class DockerNotStarted(Exception):
'Prolly you dint start da daemon bruh' 'Prolly you dint start da daemon bruh'
class ContainerError(RuntimeError):
'Error reported via app-container logging level'
@acm @acm
async def open_docker( async def open_docker(
url: Optional[str] = None, url: Optional[str] = None,
@ -189,7 +185,7 @@ class Container:
def hard_kill(self, start: float) -> None: def hard_kill(self, start: float) -> None:
delay = time.time() - start delay = time.time() - start
log.error( log.error(
f'Failed to kill container {cid} after {delay}s\n' f'Failed to kill container {self.cntr.id} after {delay}s\n'
'sending SIGKILL..' 'sending SIGKILL..'
) )
# get out the big guns, bc apparently marketstore # get out the big guns, bc apparently marketstore

View File

@ -128,12 +128,15 @@ class _Token(Struct, frozen=True):
@classmethod @classmethod
def from_msg(cls, msg: dict) -> _Token: def from_msg(cls, msg: dict) -> _Token:
if isinstance(msg, _Token):
return msg
# TODO: native struct decoding # TODO: native struct decoding
# return _token_dec.decode(msg) # return _token_dec.decode(msg)
if isinstance(msg, _Token):
return msg
# assert 0
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return _Token(**msg) return _Token(**msg)

View File

@ -0,0 +1,68 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Built-in (extension) types.
"""
from typing import Optional
from pprint import pformat
import msgspec
class Struct(
msgspec.Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def to_dict(self) -> dict:
return {
f: getattr(self, f)
for f in self.__struct_fields__
}
def __repr__(self):
return f'Struct({pformat(self.to_dict())})'
def copy(
self,
update: Optional[dict] = None,
) -> msgspec.Struct:
'''
Validate-typecast all self defined fields, return a copy of us
with all such fields.
This is kinda like the default behaviour in `pydantic.BaseModel`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# roundtrip serialize to validate
return msgspec.msgpack.Decoder(
type=type(self)
).decode(
msgspec.msgpack.Encoder().encode(self)
)

View File

@ -619,7 +619,7 @@ class FillStatusBar(QProgressBar):
# color: #19232D; # color: #19232D;
# width: 10px; # width: 10px;
self.setRange(0, slots) self.setRange(0, int(slots))
self.setValue(value) self.setValue(value)

View File

@ -264,7 +264,8 @@ class OrderMode:
self, self,
) -> OrderDialog: ) -> OrderDialog:
'''Send execution order to EMS return a level line to '''
Send execution order to EMS return a level line to
represent the order on a chart. represent the order on a chart.
''' '''
@ -273,13 +274,9 @@ class OrderMode:
oid = str(uuid.uuid4()) oid = str(uuid.uuid4())
# format order data for ems # format order data for ems
fqsn = symbol.front_fqsn() order = staged.copy()
order = staged.copy( order.oid = oid
update={ order.symbol = symbol.front_fqsn()
'symbol': fqsn,
'oid': oid,
}
)
line = self.line_from_order( line = self.line_from_order(
order, order,