Merge pull request #408 from pikers/offline_dark_clearing

Offline dark clearing
ci_fix_tractor_testing
goodboy 2022-10-10 09:25:59 -04:00 committed by GitHub
commit 5144299f4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 460 additions and 348 deletions

View File

@ -19,15 +19,14 @@ Orders and execution client API.
""" """
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from typing import Dict
from pprint import pformat from pprint import pformat
from dataclasses import dataclass, field
import trio import trio
import tractor import tractor
from tractor.trionics import broadcast_receiver from tractor.trionics import broadcast_receiver
from ..log import get_logger from ..log import get_logger
from ..data.types import Struct
from ._ems import _emsd_main from ._ems import _emsd_main
from .._daemon import maybe_open_emsd from .._daemon import maybe_open_emsd
from ._messages import Order, Cancel from ._messages import Order, Cancel
@ -37,8 +36,7 @@ from ..brokers import get_brokermod
log = get_logger(__name__) log = get_logger(__name__)
@dataclass class OrderBook(Struct):
class OrderBook:
'''EMS-client-side order book ctl and tracking. '''EMS-client-side order book ctl and tracking.
A style similar to "model-view" is used here where this api is A style similar to "model-view" is used here where this api is
@ -53,9 +51,7 @@ class OrderBook:
# mem channels used to relay order requests to the EMS daemon # mem channels used to relay order requests to the EMS daemon
_to_ems: trio.abc.SendChannel _to_ems: trio.abc.SendChannel
_from_order_book: trio.abc.ReceiveChannel _from_order_book: trio.abc.ReceiveChannel
_sent_orders: dict[str, Order] = {}
_sent_orders: Dict[str, Order] = field(default_factory=dict)
_ready_to_receive: trio.Event = trio.Event()
def send( def send(
self, self,
@ -66,7 +62,7 @@ class OrderBook:
self._to_ems.send_nowait(msg) self._to_ems.send_nowait(msg)
return msg return msg
def update( def send_update(
self, self,
uuid: str, uuid: str,

View File

@ -23,7 +23,7 @@ from collections import (
defaultdict, defaultdict,
# ChainMap, # ChainMap,
) )
from contextlib import asynccontextmanager from contextlib import asynccontextmanager as acm
from math import isnan from math import isnan
from pprint import pformat from pprint import pformat
import time import time
@ -31,6 +31,7 @@ from typing import (
AsyncIterator, AsyncIterator,
Any, Any,
Callable, Callable,
Hashable,
Optional, Optional,
) )
@ -41,9 +42,16 @@ import tractor
from ..log import get_logger from ..log import get_logger
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data.feed import Feed, maybe_open_feed from ..data._source import (
unpack_fqsn,
mk_fqsn,
)
from ..data.feed import (
Feed,
maybe_open_feed,
)
from ..ui._notify import notify_from_ems_status_msg
from ..data.types import Struct from ..data.types import Struct
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper from . import _paper_engine as paper
from ._messages import ( from ._messages import (
Order, Order,
@ -102,7 +110,7 @@ def mk_check(
) )
class _DarkBook(Struct): class DarkBook(Struct):
''' '''
EMS-trigger execution book. EMS-trigger execution book.
@ -117,7 +125,7 @@ class _DarkBook(Struct):
broker: str broker: str
# levels which have an executable action (eg. alert, order, signal) # levels which have an executable action (eg. alert, order, signal)
orders: dict[ triggers: dict[
str, # symbol str, # symbol
dict[ dict[
str, # uuid str, # uuid
@ -129,15 +137,8 @@ class _DarkBook(Struct):
] ]
] = {} ] = {}
# tracks most recent values per symbol each from data feed lasts: dict[str, float] = {} # quote prices
lasts: dict[ _active: dict[str, Status] = {} # active order dialogs
str,
float,
] = {}
# _ems_entries: dict[str, str] = {}
_active: dict = {}
_ems2brokerd_ids: dict[str, str] = bidict() _ems2brokerd_ids: dict[str, str] = bidict()
@ -156,7 +157,7 @@ async def clear_dark_triggers(
broker: str, broker: str,
fqsn: str, fqsn: str,
book: _DarkBook, book: DarkBook,
) -> None: ) -> None:
''' '''
@ -173,7 +174,7 @@ async def clear_dark_triggers(
async for quotes in quote_stream: async for quotes in quote_stream:
# start = time.time() # start = time.time()
for sym, quote in quotes.items(): for sym, quote in quotes.items():
execs = book.orders.get(sym, {}) execs = book.triggers.get(sym, {})
for tick in iterticks( for tick in iterticks(
quote, quote,
# dark order price filter(s) # dark order price filter(s)
@ -215,6 +216,7 @@ async def clear_dark_triggers(
brokerd_msg: Optional[BrokerdOrder] = None brokerd_msg: Optional[BrokerdOrder] = None
match cmd: match cmd:
# alert: nothing to do but relay a status # alert: nothing to do but relay a status
# back to the requesting ems client # back to the requesting ems client
case Order(action='alert'): case Order(action='alert'):
@ -244,12 +246,8 @@ async def clear_dark_triggers(
price=submit_price, price=submit_price,
size=size, size=size,
) )
await brokerd_orders_stream.send(brokerd_msg) await brokerd_orders_stream.send(brokerd_msg)
# book._ems_entries[oid] = live_req
# book._msgflows[oid].maps.insert(0, live_req)
case _: case _:
raise ValueError(f'Invalid dark book entry: {cmd}') raise ValueError(f'Invalid dark book entry: {cmd}')
@ -294,7 +292,7 @@ async def clear_dark_triggers(
else: # condition scan loop complete else: # condition scan loop complete
log.debug(f'execs are {execs}') log.debug(f'execs are {execs}')
if execs: if execs:
book.orders[fqsn] = execs book.triggers[fqsn] = execs
# print(f'execs scan took: {time.time() - start}') # print(f'execs scan took: {time.time() - start}')
@ -331,10 +329,7 @@ class Router(Struct):
nursery: trio.Nursery nursery: trio.Nursery
# broker to book map # broker to book map
books: dict[str, _DarkBook] = {} books: dict[str, DarkBook] = {}
# order id to client stream map
clients: set[tractor.MsgStream] = set()
# sets of clients mapped from subscription keys # sets of clients mapped from subscription keys
subscribers: defaultdict[ subscribers: defaultdict[
@ -365,9 +360,9 @@ class Router(Struct):
self, self,
brokername: str, brokername: str,
) -> _DarkBook: ) -> DarkBook:
return self.books.setdefault(brokername, _DarkBook(brokername)) return self.books.setdefault(brokername, DarkBook(brokername))
def get_subs( def get_subs(
self, self,
@ -383,123 +378,33 @@ class Router(Struct):
if not stream._closed if not stream._closed
) )
@asynccontextmanager @acm
async def maybe_open_brokerd_trades_dialogue( async def maybe_open_brokerd_dialog(
self, self,
feed: Feed, feed: Feed,
symbol: str,
dark_book: _DarkBook,
exec_mode: str, exec_mode: str,
symbol: str,
loglevel: str, loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''
Open and yield ``brokerd`` trades dialogue context-stream if
none already exists.
'''
broker = feed.mod.name
relay: TradesRelay = self.relays.get(broker)
if (
relay is None
# We always want to spawn a new relay for the paper engine
# per symbol since we need a new tractor context to be
# opened for every every symbol such that a new data feed
# and ``PaperBoi`` client will be created and then used to
# simulate clearing events.
or exec_mode == 'paper'
):
relay = await self.nursery.start(
open_brokerd_trades_dialog,
self,
feed,
symbol,
exec_mode,
loglevel,
)
self.nursery.start_soon(
translate_and_relay_brokerd_events,
broker,
relay.brokerd_stream,
self,
)
relay.consumers += 1
# TODO: get updated positions here?
assert relay.brokerd_stream
try:
yield relay
finally:
# TODO: what exactly needs to be torn down here or
# are we just consumer tracking?
relay.consumers -= 1
async def client_broadcast(
self,
sub_key: str,
msg: dict,
) -> None: ) -> None:
to_remove: set[tractor.MsgStream] = set() brokermod = feed.mod
subs = self.subscribers[sub_key] broker = brokermod.name
for client_stream in subs: relay: TradesRelay = self.relays.get(broker)
try: if (
await client_stream.send(msg) relay
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
):
to_remove.add(client_stream)
self.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
if to_remove: # We always want to spawn a new relay for the paper
subs.difference_update(to_remove) # engine per symbol since we need a new tractor context
# to be opened for every every symbol such that a new
# data feed and ``PaperBoi`` client will be created and
# then used to simulate clearing events.
and exec_mode != 'paper'
):
# deliver already cached instance
yield relay
return
trades_endpoint = getattr(brokermod, 'trades_dialogue', None)
_router: Router = None
async def open_brokerd_trades_dialog(
router: Router,
feed: Feed,
symbol: str,
exec_mode: str,
loglevel: str,
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
) -> tuple[dict, tractor.MsgStream]:
'''
Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
broker = feed.mod.name
# TODO: make a `tractor` bug/test for this!
# if only i could member what the problem was..
# probably some GC of the portal thing?
# portal = feed.portal
# XXX: we must have our own portal + channel otherwise
# when the data feed closes it may result in a half-closed
# channel that the brokerd side thinks is still open somehow!?
async with maybe_spawn_brokerd(
broker,
loglevel=loglevel,
) as portal:
if ( if (
trades_endpoint is None trades_endpoint is None
or exec_mode == 'paper' or exec_mode == 'paper'
@ -522,77 +427,201 @@ async def open_brokerd_trades_dialog(
else: else:
# open live brokerd trades endpoint # open live brokerd trades endpoint
open_trades_endpoint = portal.open_context( open_trades_endpoint = feed.portal.open_context(
trades_endpoint, trades_endpoint,
loglevel=loglevel, loglevel=loglevel,
) )
try: # open trades-dialog endpoint with backend broker
positions: list[BrokerdPosition] positions: list[BrokerdPosition]
accounts: tuple[str] accounts: tuple[str]
async with ( async with (
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)), open_trades_endpoint as (
brokerd_ctx.open_stream() as brokerd_trades_stream, brokerd_ctx,
): (positions, accounts,),
# XXX: really we only want one stream per `emsd` actor ),
# to relay global `brokerd` order events unless we're brokerd_ctx.open_stream() as brokerd_trades_stream,
# going to expect each backend to relay only orders ):
# affiliated with a particular ``trades_dialogue()`` # XXX: really we only want one stream per `emsd`
# session (seems annoying for implementers). So, here # actor to relay global `brokerd` order events
# we cache the relay task and instead of running multiple # unless we're going to expect each backend to
# tasks (which will result in multiples of the same msg being # relay only orders affiliated with a particular
# relayed for each EMS client) we just register each client # ``trades_dialogue()`` session (seems annoying
# stream to this single relay loop in the dialog table. # for implementers). So, here we cache the relay
# task and instead of running multiple tasks
# (which will result in multiples of the same
# msg being relayed for each EMS client) we just
# register each client stream to this single
# relay loop in the dialog table.
# begin processing order events from the target brokerd backend # begin processing order events from the target
# by receiving order submission response messages, # brokerd backend by receiving order submission
# normalizing them to EMS messages and relaying back to # response messages, normalizing them to EMS
# the piker order client set. # messages and relaying back to the piker order
# client set.
# locally cache and track positions per account with # locally cache and track positions per account with
# a table of (brokername, acctid) -> `BrokerdPosition` # a table of (brokername, acctid) -> `BrokerdPosition`
# msgs. # msgs.
pps = {} pps = {}
for msg in positions: for msg in positions:
log.info(f'loading pp: {msg}') log.info(f'loading pp: {msg}')
account = msg['account'] account = msg['account']
# TODO: better value error for this which # TODO: better value error for this which
# dumps the account and message and states the # dumps the account and message and states the
# mismatch.. # mismatch..
assert account in accounts assert account in accounts
pps.setdefault( pps.setdefault(
(broker, account), (broker, account),
[], [],
).append(msg) ).append(msg)
relay = TradesRelay( relay = TradesRelay(
brokerd_stream=brokerd_trades_stream, brokerd_stream=brokerd_trades_stream,
positions=pps, positions=pps,
accounts=accounts, accounts=accounts,
consumers=1, consumers=1,
)
self.relays[broker] = relay
# this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled
try:
yield relay
finally:
# parent context must have been closed remove from cache so
# next client will respawn if needed
relay = self.relays.pop(broker, None)
if not relay:
log.warning(
f'Relay for {broker} was already removed!?')
async def open_trade_relays(
self,
fqsn: str,
exec_mode: str,
loglevel: str,
task_status: TaskStatus[
tuple[TradesRelay, Feed]
] = trio.TASK_STATUS_IGNORED,
) -> tuple[TradesRelay, Feed]:
'''
Open and yield ``brokerd`` trades dialogue context-stream if
none already exists.
'''
from ..data._source import unpack_fqsn
broker, symbol, suffix = unpack_fqsn(fqsn)
async with (
maybe_open_feed(
[fqsn],
loglevel=loglevel,
) as (feed, quote_stream),
):
brokermod = feed.mod
broker = brokermod.name
# XXX: this should be initial price quote from target provider
first_quote: dict = feed.first_quotes[fqsn]
book: DarkBook = self.get_dark_book(broker)
book.lasts[fqsn]: float = first_quote['last']
async with self.maybe_open_brokerd_dialog(
feed=feed,
exec_mode=exec_mode,
symbol=symbol,
loglevel=loglevel,
) as relay:
# dark book clearing loop, also lives with parent
# daemon to allow dark order clearing while no
# client is connected.
self.nursery.start_soon(
clear_dark_triggers,
self,
relay.brokerd_stream,
quote_stream,
broker,
fqsn, # form: <name>.<venue>.<suffix>.<broker>
book
) )
router.relays[broker] = relay client_ready = trio.Event()
task_status.started((relay, feed, client_ready))
# the ems scan loop may be cancelled by the client but we # sync to the client side by waiting for the stream
# want to keep the ``brokerd`` dialogue up regardless # connection setup before relaying any existing live
# orders from the brokerd.
await client_ready.wait()
assert self.subscribers
task_status.started(relay) # spawn a ``brokerd`` order control dialog stream
# that syncs lifetime with the parent `emsd` daemon.
self.nursery.start_soon(
translate_and_relay_brokerd_events,
broker,
relay.brokerd_stream,
self,
)
# this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled
await trio.sleep_forever() await trio.sleep_forever()
finally: async def client_broadcast(
# parent context must have been closed remove from cache so self,
# next client will respawn if needed sub_key: str,
relay = router.relays.pop(broker, None) msg: dict,
if not relay: notify_on_headless: bool = True,
log.warning(f'Relay for {broker} was already removed!?')
) -> bool:
to_remove: set[tractor.MsgStream] = set()
if sub_key == 'all':
subs = set()
for s in self.subscribers.values():
subs |= s
else:
subs = self.subscribers[sub_key]
sent_some: bool = False
for client_stream in subs:
try:
await client_stream.send(msg)
sent_some = True
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
):
to_remove.add(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
if to_remove:
subs.difference_update(to_remove)
if (
not sent_some
and notify_on_headless
):
log.info(
'No clients attached, firing notification for {sub_key} msg:\n'
f'{msg}'
)
await notify_from_ems_status_msg(
msg,
is_subproc=True,
)
return sent_some
_router: Router = None
@tractor.context @tractor.context
@ -643,7 +672,7 @@ async def translate_and_relay_brokerd_events(
{'presubmitted', 'submitted', 'cancelled', 'inactive'} {'presubmitted', 'submitted', 'cancelled', 'inactive'}
''' '''
book: _DarkBook = router.get_dark_book(broker) book: DarkBook = router.get_dark_book(broker)
relay: TradesRelay = router.relays[broker] relay: TradesRelay = router.relays[broker]
assert relay.brokerd_stream == brokerd_trades_stream assert relay.brokerd_stream == brokerd_trades_stream
@ -677,7 +706,9 @@ async def translate_and_relay_brokerd_events(
# fan-out-relay position msgs immediately by # fan-out-relay position msgs immediately by
# broadcasting updates on all client streams # broadcasting updates on all client streams
await router.client_broadcast(sym, pos_msg) # TODO: this should be subscription based for privacy
# eventually!
await router.client_broadcast('all', pos_msg)
continue continue
# BrokerdOrderAck # BrokerdOrderAck
@ -769,13 +800,8 @@ async def translate_and_relay_brokerd_events(
# TODO: maybe pack this into a composite type that # TODO: maybe pack this into a composite type that
# contains both the IPC stream as well the # contains both the IPC stream as well the
# msg-chain/dialog. # msg-chain/dialog.
ems_client_order_streams = router.get_subs(oid)
status_msg = book._active.get(oid) status_msg = book._active.get(oid)
if not status_msg:
if (
not ems_client_order_streams
or not status_msg
):
log.warning( log.warning(
f'Received status for untracked dialog {oid}:\n' f'Received status for untracked dialog {oid}:\n'
f'{fmsg}' f'{fmsg}'
@ -827,7 +853,7 @@ async def translate_and_relay_brokerd_events(
# ``.oid`` in the msg since we're planning to # ``.oid`` in the msg since we're planning to
# maybe-kinda offer that via using ``Status`` # maybe-kinda offer that via using ``Status``
# in the longer run anyway? # in the longer run anyway?
log.warning(f'Unkown fill for {fmsg}') log.warning(f'Unknown fill for {fmsg}')
continue continue
# proxy through the "fill" result(s) # proxy through the "fill" result(s)
@ -843,6 +869,11 @@ async def translate_and_relay_brokerd_events(
status_msg.reqid = reqid status_msg.reqid = reqid
status_msg.brokerd_msg = msg status_msg.brokerd_msg = msg
# TODO: if no client is connected (aka we're
# headless) we should record the fill in the
# ``.msg_flow`` chain and re-transmit on client
# connect so that fills can be displayed in a
# chart?
await router.client_broadcast( await router.client_broadcast(
status_msg.req.symbol, status_msg.req.symbol,
status_msg, status_msg,
@ -881,7 +912,11 @@ async def translate_and_relay_brokerd_events(
# use backend request id as our ems id though this # use backend request id as our ems id though this
# may end up with collisions? # may end up with collisions?
status_msg = Status(**brokerd_msg) status_msg = Status(**brokerd_msg)
# NOTE: be sure to pack an fqsn for the client side!
order = Order(**status_msg.req) order = Order(**status_msg.req)
order.symbol = mk_fqsn(broker, order.symbol)
assert order.price and order.size assert order.price and order.size
status_msg.req = order status_msg.req = order
@ -957,7 +992,7 @@ async def process_client_order_cmds(
fqsn: str, fqsn: str,
feed: Feed, feed: Feed,
dark_book: _DarkBook, dark_book: DarkBook,
router: Router, router: Router,
) -> None: ) -> None:
@ -1026,7 +1061,6 @@ async def process_client_order_cmds(
# acked yet by a brokerd, so register a cancel for when # acked yet by a brokerd, so register a cancel for when
# the order ack does show up later such that the brokerd # the order ack does show up later such that the brokerd
# order request can be cancelled at that time. # order request can be cancelled at that time.
# dark_book._ems_entries[oid] = msg
# special case for now.. # special case for now..
status.req = to_brokerd_msg status.req = to_brokerd_msg
@ -1039,7 +1073,7 @@ async def process_client_order_cmds(
and status.resp == 'dark_open' and status.resp == 'dark_open'
): ):
# remove from dark book clearing # remove from dark book clearing
entry = dark_book.orders[fqsn].pop(oid, None) entry = dark_book.triggers[fqsn].pop(oid, None)
if entry: if entry:
( (
pred, pred,
@ -1201,7 +1235,7 @@ async def process_client_order_cmds(
# submit execution/order to EMS scan loop # submit execution/order to EMS scan loop
# NOTE: this may result in an override of an existing # NOTE: this may result in an override of an existing
# dark book entry if the order id already exists # dark book entry if the order id already exists
dark_book.orders.setdefault( dark_book.triggers.setdefault(
fqsn, {} fqsn, {}
)[oid] = ( )[oid] = (
pred, pred,
@ -1232,6 +1266,59 @@ async def process_client_order_cmds(
) )
@acm
async def maybe_open_trade_relays(
router: Router,
fqsn: str,
exec_mode: str, # ('paper', 'live')
loglevel: str = 'info',
) -> tuple:
def cache_on_fqsn_unless_paper(
router: Router,
fqsn: str,
exec_mode: str, # ('paper', 'live')
loglevel: str = 'info',
) -> Hashable:
if exec_mode == 'paper':
return f'paper_{fqsn}'
else:
return fqsn
# XXX: closure to enable below use of
# ``tractor.trionics.maybe_open_context()``
@acm
async def cached_mngr(
router: Router,
fqsn: str,
exec_mode: str, # ('paper', 'live')
loglevel: str = 'info',
):
relay, feed, client_ready = await _router.nursery.start(
_router.open_trade_relays,
fqsn,
exec_mode,
loglevel,
)
yield relay, feed, client_ready
async with tractor.trionics.maybe_open_context(
acm_func=cached_mngr,
kwargs={
'router': _router,
'fqsn': fqsn,
'exec_mode': exec_mode,
'loglevel': loglevel,
},
key=cache_on_fqsn_unless_paper,
) as (
cache_hit,
(relay, feed, client_ready)
):
yield relay, feed, client_ready
@tractor.context @tractor.context
async def _emsd_main( async def _emsd_main(
ctx: tractor.Context, ctx: tractor.Context,
@ -1284,9 +1371,7 @@ async def _emsd_main(
global _router global _router
assert _router assert _router
from ..data._source import unpack_fqsn
broker, symbol, suffix = unpack_fqsn(fqsn) broker, symbol, suffix = unpack_fqsn(fqsn)
dark_book = _router.get_dark_book(broker)
# TODO: would be nice if in tractor we can require either a ctx arg, # TODO: would be nice if in tractor we can require either a ctx arg,
# or a named arg with ctx in it and a type annotation of # or a named arg with ctx in it and a type annotation of
@ -1294,108 +1379,71 @@ async def _emsd_main(
ems_ctx = ctx ems_ctx = ctx
# spawn one task per broker feed # spawn one task per broker feed
relay: TradesRelay
feed: Feed feed: Feed
async with ( client_ready: trio.Event
maybe_open_feed(
[fqsn],
loglevel=loglevel,
) as (feed, quote_stream),
):
# XXX: this should be initial price quote from target provider # NOTE: open a stream with the brokerd backend for order flow
first_quote: dict = feed.first_quotes[fqsn] # dialogue and dark clearing but only open one: we try to keep as
book: _DarkBook = _router.get_dark_book(broker) # few duplicate streams as necessary per ems actor.
book.lasts[fqsn]: float = first_quote['last'] async with maybe_open_trade_relays(
_router,
fqsn,
exec_mode,
loglevel,
) as (relay, feed, client_ready):
# open a stream with the brokerd backend for order brokerd_stream = relay.brokerd_stream
# flow dialogue dark_book = _router.get_dark_book(broker)
async with (
# only open if one isn't already up: we try to keep # signal to client that we're started and deliver
# as few duplicate streams as necessary # all known pps and accounts for this ``brokerd``.
_router.maybe_open_brokerd_trades_dialogue( await ems_ctx.started((
feed, relay.positions,
symbol, list(relay.accounts),
dark_book, dark_book._active,
exec_mode, ))
loglevel,
) as relay, # establish 2-way stream with requesting order-client and
trio.open_nursery() as n, # begin handling inbound order requests and updates
): async with ems_ctx.open_stream() as client_stream:
brokerd_stream = relay.brokerd_stream # register the client side before starting the
# brokerd-side relay task to ensure the client is
# delivered all exisiting open orders on startup.
# TODO: instead of by fqsn we need a subscription
# system/schema here to limit what each new client is
# allowed to see in terms of broadcasted order flow
# updates per dialog.
_router.subscribers[fqsn].add(client_stream)
client_ready.set()
# signal to client that we're started and deliver # start inbound (from attached client) order request processing
# all known pps and accounts for this ``brokerd``. # main entrypoint, run here until cancelled.
await ems_ctx.started(( try:
relay.positions, await process_client_order_cmds(
list(relay.accounts), client_stream,
book._active,
))
# establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates
async with ems_ctx.open_stream() as client_stream:
# register the client side before starting the
# brokerd-side relay task to ensure the client is
# delivered all exisiting open orders on startup.
_router.clients.add(client_stream)
# TODO: instead of by fqsn we need a subscription
# system/schema here to limit what each new client is
# allowed to see in terms of broadcasted order flow
# updates per dialog.
_router.subscribers[fqsn].add(client_stream)
# trigger scan and exec loop
n.start_soon(
clear_dark_triggers,
_router,
brokerd_stream, brokerd_stream,
quote_stream, fqsn,
broker, feed,
fqsn, # form: <name>.<venue>.<suffix>.<broker> dark_book,
book _router,
) )
finally:
# try to remove client from subscription registry
_router.subscribers[fqsn].remove(client_stream)
# start inbound (from attached client) order request processing for oid, client_streams in _router.dialogs.items():
# main entrypoint, run here until cancelled. client_streams.discard(client_stream)
try:
await process_client_order_cmds(
client_stream,
brokerd_stream,
fqsn,
feed,
dark_book,
_router,
)
finally: # TODO: for order dialogs left "alive" in
# try to remove client from "registry" # the ems this is where we should allow some
try: # system to take over management. Likely we
_router.clients.remove(client_stream) # want to allow the user to choose what kind
except KeyError: # of policy to use (eg. cancel all orders
# from client, run some algo, etc.)
if not client_streams:
log.warning( log.warning(
f'Stream {client_stream._ctx.chan.uid}' f'Order dialog is not being monitored:\n'
' was already dropped?' f'{oid} ->\n{client_stream._ctx.chan.uid}'
) )
_router.subscribers[fqsn].remove(client_stream)
dialogs = _router.dialogs
for oid, client_streams in dialogs.items():
if client_stream in client_streams:
client_streams.remove(client_stream)
# TODO: for order dialogs left "alive" in
# the ems this is where we should allow some
# system to take over management. Likely we
# want to allow the user to choose what kind
# of policy to use (eg. cancel all orders
# from client, run some algo, etc.)
if not client_streams:
log.warning(
f'Order dialog is being unmonitored:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
)

View File

@ -49,10 +49,13 @@ class Struct(
hasattr(sys, 'ps1') hasattr(sys, 'ps1')
# TODO: check if we're in pdb # TODO: check if we're in pdb
): ):
return f'Struct({pformat(self.to_dict())})' return self.pformat()
return super().__repr__() return super().__repr__()
def pformat(self) -> str:
return f'Struct({pformat(self.to_dict())})'
def copy( def copy(
self, self,
update: Optional[dict] = None, update: Optional[dict] = None,

View File

@ -0,0 +1,95 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Notifications utils.
"""
import os
import platform
import subprocess
from typing import Optional
import trio
from ..log import get_logger
from ..clearing._messages import (
Status,
)
log = get_logger(__name__)
_dbus_uid: Optional[str] = ''
async def notify_from_ems_status_msg(
msg: Status,
duration: int = 3000,
is_subproc: bool = False,
) -> None:
'''
Send a linux desktop notification.
Handle subprocesses by discovering the dbus user id
on first call.
'''
if platform.system() != "Linux":
return
# TODO: this in another task?
# not sure if this will ever be a bottleneck,
# we probably could do graphics stuff first tho?
if is_subproc:
global _dbus_uid
if not _dbus_uid:
su = os.environ['SUDO_USER']
# TODO: use `trio` but we need to use nursery.start()
# to use pipes?
# result = await trio.run_process(
result = subprocess.run(
[
'id',
'-u',
su,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
# check=True
)
_dbus_uid = result.stdout.decode("utf-8").replace('\n', '')
os.environ['DBUS_SESSION_BUS_ADDRESS'] = (
f'unix:path=/run/user/{_dbus_uid}/bus'
)
result = await trio.run_process(
[
'notify-send',
'-u', 'normal',
'-t', f'{duration}',
'piker',
# TODO: add in standard fill/exec info that maybe we
# pack in a broker independent way?
f"'{msg.pformat()}'",
],
)
log.runtime(result)

View File

@ -42,7 +42,8 @@ from ._anchors import (
gpath_pin, gpath_pin,
) )
from ..calc import humanize, pnl, puterize from ..calc import humanize, pnl, puterize
from ..clearing._allocate import Allocator, Position from ..clearing._allocate import Allocator
from ..pp import Position
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data.feed import Feed from ..data.feed import Feed
from ..data.types import Struct from ..data.types import Struct

View File

@ -23,7 +23,6 @@ from contextlib import asynccontextmanager
from dataclasses import dataclass, field from dataclasses import dataclass, field
from functools import partial from functools import partial
from pprint import pformat from pprint import pformat
import platform
import time import time
from typing import ( from typing import (
Optional, Optional,
@ -64,6 +63,7 @@ from ..clearing._messages import (
BrokerdPosition, BrokerdPosition,
) )
from ._forms import open_form_input_handling from ._forms import open_form_input_handling
from ._notify import notify_from_ems_status_msg
if TYPE_CHECKING: if TYPE_CHECKING:
@ -433,7 +433,7 @@ class OrderMode:
size = dialog.order.size size = dialog.order.size
# NOTE: sends modified order msg to EMS # NOTE: sends modified order msg to EMS
self.book.update( self.book.send_update(
uuid=line.dialog.uuid, uuid=line.dialog.uuid,
price=level, price=level,
size=size, size=size,
@ -530,39 +530,6 @@ class OrderMode:
else: else:
log.warn("No line(s) for order {uuid}!?") log.warn("No line(s) for order {uuid}!?")
async def on_exec(
self,
uuid: str,
msg: Status,
) -> None:
# DESKTOP NOTIFICATIONS
#
# TODO: this in another task?
# not sure if this will ever be a bottleneck,
# we probably could do graphics stuff first tho?
# TODO: make this not trash.
# XXX: linux only for now
if platform.system() == "Windows":
return
result = await trio.run_process(
[
'notify-send',
'-u', 'normal',
'-t', '1616',
'piker',
# TODO: add in standard fill/exec info that maybe we
# pack in a broker independent way?
f'{msg.resp}: {msg.req.price}',
],
)
log.runtime(result)
def on_cancel( def on_cancel(
self, self,
uuid: str uuid: str
@ -1064,7 +1031,7 @@ async def process_trade_msg(
) )
mode.lines.remove_line(uuid=oid) mode.lines.remove_line(uuid=oid)
msg.req = req msg.req = req
await mode.on_exec(oid, msg) await notify_from_ems_status_msg(msg)
# response to completed 'dialog' for order request # response to completed 'dialog' for order request
case Status( case Status(
@ -1073,19 +1040,19 @@ async def process_trade_msg(
req=req, req=req,
): ):
msg.req = Order(**req) msg.req = Order(**req)
await mode.on_exec(oid, msg) await notify_from_ems_status_msg(msg)
mode.lines.remove_line(uuid=oid) mode.lines.remove_line(uuid=oid)
# each clearing tick is responded individually # each clearing tick is responded individually
case Status(resp='fill'): case Status(resp='fill'):
# handle out-of-piker fills reporting? # handle out-of-piker fills reporting?
known_order = book._sent_orders.get(oid) order: Order = book._sent_orders.get(oid)
if not known_order: if not order:
log.warning(f'order {oid} is unknown') log.warning(f'order {oid} is unknown')
return order = msg.req
action = known_order.action action = order.action
details = msg.brokerd_msg details = msg.brokerd_msg
# TODO: some kinda progress system # TODO: some kinda progress system
@ -1110,7 +1077,9 @@ async def process_trade_msg(
), ),
) )
# TODO: how should we look this up? # TODO: append these fill events to the position's clear
# table?
# tracker = mode.trackers[msg['account']] # tracker = mode.trackers[msg['account']]
# tracker.live_pp.fills.append(msg) # tracker.live_pp.fills.append(msg)