Comments clean and improvments

ems_to_bidir_streaming
Tyler Goodlet 2021-06-10 08:24:10 -04:00
parent a1f605bd52
commit b06cb5bb5a
5 changed files with 49 additions and 59 deletions

View File

@ -248,7 +248,6 @@ async def maybe_spawn_daemon(
if pikerd_portal is None:
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
# await spawn_brokerd(brokername, loglevel=loglevel)
await spawn_func(**spawn_args)
else:

View File

@ -223,12 +223,11 @@ async def open_ems(
# connect to emsd
portal.open_context(
_emsd_main,
broker=broker,
symbol=symbol.key,
# TODO: ``first`` here should be the active orders/execs
# persistent on the ems so that loca UI's can be populated.
) as (ctx, positions),
# open 2-way trade command stream

View File

@ -124,15 +124,12 @@ _DEFAULT_SIZE: float = 1.0
async def clear_dark_triggers(
# ctx: tractor.Context,
brokerd_orders_stream: tractor.MsgStream,
ems_client_order_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa
broker: str,
symbol: str,
# client: 'Client', # noqa
# order_msg_stream: 'Client', # noqa
book: _DarkBook,
@ -189,8 +186,7 @@ async def clear_dark_triggers(
# message back to the requesting ems client
resp = 'alert_triggered'
else:
# executable order submission
else: # executable order submission
# submit_price = price + price*percent_away
submit_price = price + abs_diff_away
@ -199,18 +195,17 @@ async def clear_dark_triggers(
f'Dark order triggered for price {price}\n'
f'Submitting order @ price {submit_price}')
# TODO: port to BrokerdOrder message sending
msg = BrokerdOrder(
action=cmd['action'],
oid=oid,
time_ns=time.time_ns(),
# this is a brand new order request for the
# this **creates** new order request for the
# underlying broker so we set a "broker
# request id" (brid) to "nothing" so that the
# broker client knows that we aren't trying
# to modify an existing order-request.
# request id" (``reqid`` kwarg) to ``None``
# so that the broker client knows that we
# aren't trying to modify an existing
# order-request and instead create a new one.
reqid=None,
symbol=sym,
@ -218,13 +213,21 @@ async def clear_dark_triggers(
size=cmd['size'],
)
await brokerd_orders_stream.send(msg.dict())
# mark this entry as having send an order request
# mark this entry as having sent an order
# request. the entry will be replaced once the
# target broker replies back with
# a ``BrokerdOrderAck`` msg including the
# allocated unique ``BrokerdOrderAck.reqid`` key
# generated by the broker's own systems.
book._ems_entries[oid] = msg
# our internal status value for client-side
# triggered "dark orders"
resp = 'dark_triggered'
msg = Status(
oid=oid, # piker order id
oid=oid, # ems order id
resp=resp,
time_ns=time.time_ns(),
@ -340,11 +343,11 @@ async def translate_and_relay_brokerd_events(
# initial response to brokerd order request
if name == 'ack':
# register the brokerd request id (that was likely
# generated internally) with our locall ems order id for
# reverse lookup later. a BrokerdOrderAck **must** be
# sent after an order request in order to establish this
# id mapping.
# register the brokerd request id (that was generated
# / created internally by the broker backend) with our
# local ems order id for reverse lookup later.
# a ``BrokerdOrderAck`` **must** be sent after an order
# request in order to establish this id mapping.
book._ems2brokerd_ids[oid] = reqid
# new order which has not yet be registered into the
@ -455,16 +458,17 @@ async def translate_and_relay_brokerd_events(
else:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# Create and relay EMS response status message
resp = Status(
# Create and relay response status message
# to requesting EMS client
await ems_client_order_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=broker_details,
).dict()
)
# relay response to requesting EMS client
await ems_client_order_stream.send(resp.dict())
async def process_client_order_cmds(
@ -509,9 +513,9 @@ async def process_client_order_cmds(
await brokerd_order_stream.send(msg.dict())
else:
# might be a cancel for order that hasn't been acked yet
# by brokerd so register a cancel for then the order
# does show up later
# this might be a cancel for an order that hasn't been
# acked yet by a brokerd, so register a cancel for when
# the order ack does show up later
dark_book._ems_entries[oid] = msg
# check for EMS active exec
@ -552,10 +556,9 @@ async def process_client_order_cmds(
# if we already had a broker order id then
# this is likely an order update commmand.
log.info(f"Modifying order: {live_entry.reqid}")
log.info(
f"Modifying live {broker} order: {live_entry.reqid}")
# TODO: port to BrokerdOrder message sending
# register broker id for ems id
msg = BrokerdOrder(
oid=oid, # no ib support for oids...
time_ns=time.time_ns(),
@ -575,7 +578,7 @@ async def process_client_order_cmds(
# (``translate_and_relay_brokerd_events()`` above) will
# handle relaying the ems side responses back to
# the client/cmd sender from this request
print(f'sending live order {msg}')
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg.dict())
# an immediate response should be brokerd ack with order
@ -653,14 +656,13 @@ async def process_client_order_cmds(
async def _emsd_main(
ctx: tractor.Context,
# client_actor_name: str,
broker: str,
symbol: str,
_exec_mode: str = 'dark', # ('paper', 'dark', 'live')
loglevel: str = 'info',
) -> None:
"""EMS (sub)actor entrypoint providing the
'''EMS (sub)actor entrypoint providing the
execution management (micro)service which conducts broker
order control on behalf of clients.
@ -693,12 +695,13 @@ async def _emsd_main(
accepts order cmds from requesting piker clients, registers
execs with exec loop
"""
# from ._client import send_order_cmds
'''
global _router
dark_book = _router.get_dark_book(broker)
# TODO: would be nice if in tractor we can require either a ctx arg,
# or a named arg with ctx in it and a type annotation of
# tractor.Context instead of strictly requiring a ctx arg.
ems_ctx = ctx
cached_feed = _router.feeds.get((broker, symbol))
@ -742,9 +745,9 @@ async def _emsd_main(
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine inside the brokerd
# actor to simulate the real load it'll likely be under
# when also pulling data from feeds
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
@ -814,7 +817,7 @@ class _Router(BaseModel):
'''
nursery: trio.Nursery
feeds: dict[str, tuple[trio.CancelScope, float]] = {}
feeds: dict[tuple[str, str], data.feed.Feed] = {}
books: dict[str, _DarkBook] = {}
class Config:
@ -842,8 +845,9 @@ async def _setup_persistent_emsd(
global _router
# spawn one task per broker feed
# open a root "service nursery" for the ``emsd`` actor
async with trio.open_nursery() as service_nursery:
_router = _Router(nursery=service_nursery)
# TODO: send back the full set of persistent orders/execs persistent

View File

@ -216,7 +216,6 @@ class PaperBoi:
time_ns=time.time_ns(),
status='filled',
# broker=self.broker,
filled=size,
remaining=0 if order_complete else remaining,
@ -224,7 +223,6 @@ class PaperBoi:
size=size,
price=price,
# broker=self.broker,
broker_details={
'paper_info': {
'oid': oid,
@ -321,12 +319,6 @@ async def simulate_fills(
break
# class MockBrokerdMsgStream:
# async def MockContext(*args, **kwargs):
async def handle_order_requests(
client: PaperBoi,

View File

@ -127,10 +127,6 @@ class OrderMode:
"""
line = self.lines.commit_line(uuid)
# req_msg = self.book._sent_orders.get(uuid)
# if req_msg:
# req_msg.ack_time_ns = time.time_ns()
return line
def on_fill(