Clear out old commented code
parent
8e8a005128
commit
a1f605bd52
|
@ -26,7 +26,6 @@ from typing import AsyncIterator, Callable
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
from .. import data
|
from .. import data
|
||||||
|
@ -224,44 +223,6 @@ async def clear_dark_triggers(
|
||||||
|
|
||||||
resp = 'dark_triggered'
|
resp = 'dark_triggered'
|
||||||
|
|
||||||
# an internal brokerd-broker specific
|
|
||||||
# order-request id is expected to be generated
|
|
||||||
|
|
||||||
# reqid = await client.submit_limit(
|
|
||||||
|
|
||||||
# oid=oid,
|
|
||||||
|
|
||||||
# # this is a brand new order request for the
|
|
||||||
# # underlying broker so we set a "broker
|
|
||||||
# # request id" (brid) to "nothing" so that the
|
|
||||||
# # broker client knows that we aren't trying
|
|
||||||
# # to modify an existing order-request.
|
|
||||||
# brid=None,
|
|
||||||
|
|
||||||
# symbol=sym,
|
|
||||||
# action=cmd['action'],
|
|
||||||
# price=submit_price,
|
|
||||||
# size=cmd['size'],
|
|
||||||
# )
|
|
||||||
|
|
||||||
# # register broker request id to ems id
|
|
||||||
|
|
||||||
# else:
|
|
||||||
# # alerts have no broker request id
|
|
||||||
# reqid = ''
|
|
||||||
|
|
||||||
# resp = {
|
|
||||||
# 'resp': 'dark_executed',
|
|
||||||
# 'cmd': cmd, # original request message
|
|
||||||
|
|
||||||
# 'time_ns': time.time_ns(),
|
|
||||||
# 'trigger_price': price,
|
|
||||||
|
|
||||||
# 'broker_reqid': reqid,
|
|
||||||
# 'broker': broker,
|
|
||||||
# 'oid': oid, # piker order id
|
|
||||||
|
|
||||||
# }
|
|
||||||
msg = Status(
|
msg = Status(
|
||||||
oid=oid, # piker order id
|
oid=oid, # piker order id
|
||||||
resp=resp,
|
resp=resp,
|
||||||
|
@ -270,7 +231,6 @@ async def clear_dark_triggers(
|
||||||
symbol=symbol,
|
symbol=symbol,
|
||||||
trigger_price=price,
|
trigger_price=price,
|
||||||
|
|
||||||
# broker_reqid=reqid,
|
|
||||||
broker_details={'name': broker},
|
broker_details={'name': broker},
|
||||||
|
|
||||||
cmd=cmd, # original request message
|
cmd=cmd, # original request message
|
||||||
|
@ -281,7 +241,6 @@ async def clear_dark_triggers(
|
||||||
log.info(f'removing pred for {oid}')
|
log.info(f'removing pred for {oid}')
|
||||||
execs.pop(oid)
|
execs.pop(oid)
|
||||||
|
|
||||||
# await ctx.send_yield(resp)
|
|
||||||
await ems_client_order_stream.send(msg)
|
await ems_client_order_stream.send(msg)
|
||||||
|
|
||||||
else: # condition scan loop complete
|
else: # condition scan loop complete
|
||||||
|
@ -292,51 +251,6 @@ async def clear_dark_triggers(
|
||||||
# print(f'execs scan took: {time.time() - start}')
|
# print(f'execs scan took: {time.time() - start}')
|
||||||
|
|
||||||
|
|
||||||
# async def start_clearing(
|
|
||||||
|
|
||||||
# # ctx: tractor.Context,
|
|
||||||
# brokerd_order_stream: tractor.MsgStream,
|
|
||||||
# quote_stream: tractor.MsgStream,
|
|
||||||
|
|
||||||
# # client: 'Client',
|
|
||||||
|
|
||||||
# # feed: 'Feed', # noqa
|
|
||||||
# broker: str,
|
|
||||||
# symbol: str,
|
|
||||||
# _exec_mode: str,
|
|
||||||
|
|
||||||
# book: _DarkBook,
|
|
||||||
|
|
||||||
# # task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED,
|
|
||||||
|
|
||||||
# ) -> AsyncIterator[dict]:
|
|
||||||
# """Main scan loop for order execution conditions and submission
|
|
||||||
# to brokers.
|
|
||||||
|
|
||||||
# """
|
|
||||||
# async with trio.open_nursery() as n:
|
|
||||||
|
|
||||||
# # trigger scan and exec loop
|
|
||||||
# n.start_soon(
|
|
||||||
# trigger_executions,
|
|
||||||
|
|
||||||
# brokerd_order_stream,
|
|
||||||
# quote_stream,
|
|
||||||
|
|
||||||
# broker,
|
|
||||||
# symbol,
|
|
||||||
# book
|
|
||||||
# # ctx,
|
|
||||||
# # client,
|
|
||||||
# )
|
|
||||||
|
|
||||||
# # # paper engine simulator task
|
|
||||||
# # if _exec_mode == 'paper':
|
|
||||||
# # # TODO: make this an actual broadcast channels as in:
|
|
||||||
# # # https://github.com/python-trio/trio/issues/987
|
|
||||||
# # n.start_soon(simulate_fills, quote_stream, client)
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: lots of cases still to handle
|
# TODO: lots of cases still to handle
|
||||||
# XXX: right now this is very very ad-hoc to IB
|
# XXX: right now this is very very ad-hoc to IB
|
||||||
# - short-sale but securities haven't been located, in this case we
|
# - short-sale but securities haven't been located, in this case we
|
||||||
|
@ -349,15 +263,11 @@ async def clear_dark_triggers(
|
||||||
|
|
||||||
async def translate_and_relay_brokerd_events(
|
async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
# ctx: tractor.Context,
|
|
||||||
broker: str,
|
broker: str,
|
||||||
ems_client_order_stream: tractor.MsgStream,
|
ems_client_order_stream: tractor.MsgStream,
|
||||||
brokerd_trades_stream: tractor.MsgStream,
|
brokerd_trades_stream: tractor.MsgStream,
|
||||||
book: _DarkBook,
|
book: _DarkBook,
|
||||||
|
|
||||||
# feed: 'Feed', # noqa
|
|
||||||
task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED,
|
|
||||||
|
|
||||||
) -> AsyncIterator[dict]:
|
) -> AsyncIterator[dict]:
|
||||||
"""Trades update loop - receive updates from broker, convert
|
"""Trades update loop - receive updates from broker, convert
|
||||||
to EMS responses, transmit to ordering client(s).
|
to EMS responses, transmit to ordering client(s).
|
||||||
|
@ -377,27 +287,13 @@ async def translate_and_relay_brokerd_events(
|
||||||
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
|
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
|
||||||
|
|
||||||
"""
|
"""
|
||||||
# broker = feed.mod.name
|
|
||||||
|
|
||||||
# TODO: make this a context
|
|
||||||
# in the paper engine case this is just a mem receive channel
|
|
||||||
# async with feed.receive_trades_data() as brokerd_trades_stream:
|
|
||||||
|
|
||||||
# first = await brokerd_trades_stream.__anext__()
|
|
||||||
|
|
||||||
# startup msg expected as first from broker backend
|
|
||||||
# assert first['local_trades'] == 'start'
|
|
||||||
# task_status.started()
|
|
||||||
|
|
||||||
async for brokerd_msg in brokerd_trades_stream:
|
async for brokerd_msg in brokerd_trades_stream:
|
||||||
|
|
||||||
# name, msg = event['local_trades']
|
|
||||||
name = brokerd_msg['name']
|
name = brokerd_msg['name']
|
||||||
|
|
||||||
log.info(f'Received broker trade event:\n{pformat(brokerd_msg)}')
|
log.info(f'Received broker trade event:\n{pformat(brokerd_msg)}')
|
||||||
|
|
||||||
if name == 'position':
|
if name == 'position':
|
||||||
# msg['resp'] = 'position'
|
|
||||||
|
|
||||||
# relay through position msgs immediately
|
# relay through position msgs immediately
|
||||||
await ems_client_order_stream.send(
|
await ems_client_order_stream.send(
|
||||||
|
@ -476,34 +372,6 @@ async def translate_and_relay_brokerd_events(
|
||||||
# a live flow now exists
|
# a live flow now exists
|
||||||
oid = entry.oid
|
oid = entry.oid
|
||||||
|
|
||||||
# make response packet to EMS client(s)
|
|
||||||
# reqid = book._ems_entries.get(oid)
|
|
||||||
|
|
||||||
# # msg is for unknown emsd order id
|
|
||||||
# if oid is None:
|
|
||||||
# oid = msg['oid']
|
|
||||||
|
|
||||||
# # XXX: paper clearing special cases
|
|
||||||
# # paper engine race case: ``Client.submit_limit()`` hasn't
|
|
||||||
# # returned yet and provided an output reqid to register
|
|
||||||
# # locally, so we need to retreive the oid that was already
|
|
||||||
# # packed at submission since we already know it ahead of
|
|
||||||
# # time
|
|
||||||
# paper = msg.get('paper_info')
|
|
||||||
# if paper:
|
|
||||||
# oid = paper['oid']
|
|
||||||
|
|
||||||
# else:
|
|
||||||
# msg.get('external')
|
|
||||||
# if not msg:
|
|
||||||
# log.error(f"Unknown trade event {event}")
|
|
||||||
|
|
||||||
# continue
|
|
||||||
|
|
||||||
# resp = {
|
|
||||||
# 'resp': None, # placeholder
|
|
||||||
# 'oid': oid
|
|
||||||
# }
|
|
||||||
resp = None
|
resp = None
|
||||||
broker_details = {}
|
broker_details = {}
|
||||||
|
|
||||||
|
@ -551,7 +419,6 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
# everyone doin camel case
|
# everyone doin camel case
|
||||||
msg = BrokerdStatus(**brokerd_msg)
|
msg = BrokerdStatus(**brokerd_msg)
|
||||||
# status = msg['status'].lower()
|
|
||||||
|
|
||||||
if msg.status == 'filled':
|
if msg.status == 'filled':
|
||||||
|
|
||||||
|
@ -563,12 +430,10 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
log.info(f'Execution for {oid} is complete!')
|
log.info(f'Execution for {oid} is complete!')
|
||||||
|
|
||||||
|
|
||||||
# just log it
|
# just log it
|
||||||
else:
|
else:
|
||||||
log.info(f'{broker} filled {msg}')
|
log.info(f'{broker} filled {msg}')
|
||||||
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# one of {submitted, cancelled}
|
# one of {submitted, cancelled}
|
||||||
resp = 'broker_' + msg.status
|
resp = 'broker_' + msg.status
|
||||||
|
@ -604,13 +469,11 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
async def process_client_order_cmds(
|
async def process_client_order_cmds(
|
||||||
|
|
||||||
# ctx: tractor.Context,
|
|
||||||
client_order_stream: tractor.MsgStream, # noqa
|
client_order_stream: tractor.MsgStream, # noqa
|
||||||
brokerd_order_stream: tractor.MsgStream,
|
brokerd_order_stream: tractor.MsgStream,
|
||||||
|
|
||||||
symbol: str,
|
symbol: str,
|
||||||
feed: 'Feed', # noqa
|
feed: 'Feed', # noqa
|
||||||
# client: 'Client', # noqa
|
|
||||||
dark_book: _DarkBook,
|
dark_book: _DarkBook,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -643,7 +506,6 @@ async def process_client_order_cmds(
|
||||||
|
|
||||||
# NOTE: cancel response will be relayed back in messages
|
# NOTE: cancel response will be relayed back in messages
|
||||||
# from corresponding broker
|
# from corresponding broker
|
||||||
# await client.submit_cancel(reqid=reqid)
|
|
||||||
await brokerd_order_stream.send(msg.dict())
|
await brokerd_order_stream.send(msg.dict())
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -675,12 +537,6 @@ async def process_client_order_cmds(
|
||||||
|
|
||||||
msg = Order(**cmd)
|
msg = Order(**cmd)
|
||||||
|
|
||||||
# sym = cmd['symbol']
|
|
||||||
# trigger_price = cmd['price']
|
|
||||||
# size = cmd['size']
|
|
||||||
# brokers = cmd['brokers']
|
|
||||||
# exec_mode = cmd['exec_mode']
|
|
||||||
|
|
||||||
sym = msg.symbol
|
sym = msg.symbol
|
||||||
trigger_price = msg.price
|
trigger_price = msg.price
|
||||||
size = msg.size
|
size = msg.size
|
||||||
|
@ -722,20 +578,6 @@ async def process_client_order_cmds(
|
||||||
print(f'sending live order {msg}')
|
print(f'sending live order {msg}')
|
||||||
await brokerd_order_stream.send(msg.dict())
|
await brokerd_order_stream.send(msg.dict())
|
||||||
|
|
||||||
# order_id = await client.submit_limit(
|
|
||||||
|
|
||||||
# oid=oid, # no ib support for oids...
|
|
||||||
|
|
||||||
# # if this is None, creates a new order
|
|
||||||
# # otherwise will modify any existing one
|
|
||||||
# brid=brid,
|
|
||||||
|
|
||||||
# symbol=sym,
|
|
||||||
# action=action,
|
|
||||||
# price=trigger_price,
|
|
||||||
# size=size,
|
|
||||||
# )
|
|
||||||
|
|
||||||
# an immediate response should be brokerd ack with order
|
# an immediate response should be brokerd ack with order
|
||||||
# id but we register our request as part of the flow
|
# id but we register our request as part of the flow
|
||||||
dark_book._ems_entries[oid] = msg
|
dark_book._ems_entries[oid] = msg
|
||||||
|
@ -793,14 +635,6 @@ async def process_client_order_cmds(
|
||||||
abs_diff_away
|
abs_diff_away
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: if the predicate resolves immediately send the
|
|
||||||
# execution to the broker asap? Or no?
|
|
||||||
|
|
||||||
# ack-response that order is live in EMS
|
|
||||||
# await ctx.send_yield(
|
|
||||||
# {'resp': 'dark_submitted',
|
|
||||||
# 'oid': oid}
|
|
||||||
# )
|
|
||||||
if action == 'alert':
|
if action == 'alert':
|
||||||
resp = 'alert_submitted'
|
resp = 'alert_submitted'
|
||||||
else:
|
else:
|
||||||
|
@ -846,8 +680,9 @@ async def _emsd_main(
|
||||||
sets up brokerd feed, order feed with ems client, trades dialogue with
|
sets up brokerd feed, order feed with ems client, trades dialogue with
|
||||||
brokderd trading api.
|
brokderd trading api.
|
||||||
|
|
|
|
||||||
- ``start_clearing()``:
|
- ``clear_dark_triggers()``:
|
||||||
run (dark) conditions on inputs and trigger broker submissions
|
run (dark order) conditions on inputs and trigger brokerd "live"
|
||||||
|
order submissions.
|
||||||
|
|
|
|
||||||
- ``translate_and_relay_brokerd_events()``:
|
- ``translate_and_relay_brokerd_events()``:
|
||||||
accept normalized trades responses from brokerd, process and
|
accept normalized trades responses from brokerd, process and
|
||||||
|
@ -899,6 +734,10 @@ async def _emsd_main(
|
||||||
|
|
||||||
if trades_endpoint is None or _exec_mode == 'paper':
|
if trades_endpoint is None or _exec_mode == 'paper':
|
||||||
|
|
||||||
|
# for paper mode we need to mock this trades response feed
|
||||||
|
# so we load bidir stream to a new sub-actor running a
|
||||||
|
# paper-simulator clearing engine.
|
||||||
|
|
||||||
# load the paper trading engine
|
# load the paper trading engine
|
||||||
_exec_mode = 'paper'
|
_exec_mode = 'paper'
|
||||||
log.warning(f'Entering paper trading mode for {broker}')
|
log.warning(f'Entering paper trading mode for {broker}')
|
||||||
|
@ -912,14 +751,6 @@ async def _emsd_main(
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
)
|
)
|
||||||
|
|
||||||
# for paper mode we need to mock this trades response feed
|
|
||||||
# so we pass a duck-typed feed-looking mem chan which is fed
|
|
||||||
# fill and submission events from the exec loop
|
|
||||||
# feed._trade_stream = client.trade_stream
|
|
||||||
|
|
||||||
# init the trades stream
|
|
||||||
# client._to_trade_stream.send_nowait({'local_trades': 'start'})
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# open live brokerd trades endpoint
|
# open live brokerd trades endpoint
|
||||||
open_trades_endpoint = portal.open_context(
|
open_trades_endpoint = portal.open_context(
|
||||||
|
@ -931,36 +762,9 @@ async def _emsd_main(
|
||||||
open_trades_endpoint as (brokerd_ctx, positions),
|
open_trades_endpoint as (brokerd_ctx, positions),
|
||||||
brokerd_ctx.open_stream() as brokerd_trades_stream,
|
brokerd_ctx.open_stream() as brokerd_trades_stream,
|
||||||
):
|
):
|
||||||
|
|
||||||
# if trades_endpoint is not None and _exec_mode != 'paper':
|
|
||||||
|
|
||||||
# # TODO: open a bidir stream here?
|
|
||||||
# # we have an order API for this broker
|
|
||||||
# client = client_factory(feed._brokerd_portal)
|
|
||||||
|
|
||||||
# else:
|
|
||||||
|
|
||||||
# return control to parent task
|
|
||||||
# task_status.started((first_quote, feed, client))
|
|
||||||
|
|
||||||
# stream = feed.stream
|
|
||||||
|
|
||||||
# start the real-time clearing condition scan loop and
|
|
||||||
# paper engine simulator.
|
|
||||||
|
|
||||||
# n.start_soon(
|
|
||||||
# start_clearing,
|
|
||||||
# brokerd_trades_stream,
|
|
||||||
# feed.stream, # quote stream
|
|
||||||
# # client,
|
|
||||||
# broker,
|
|
||||||
# symbol,
|
|
||||||
# _exec_mode,
|
|
||||||
# book,
|
|
||||||
# )
|
|
||||||
|
|
||||||
# signal to client that we're started
|
# signal to client that we're started
|
||||||
# TODO: we could send back **all** brokerd positions here?
|
# TODO: we could eventually send back **all** brokerd
|
||||||
|
# positions here?
|
||||||
await ems_ctx.started(positions)
|
await ems_ctx.started(positions)
|
||||||
|
|
||||||
# establish 2-way stream with requesting order-client and
|
# establish 2-way stream with requesting order-client and
|
||||||
|
@ -978,14 +782,15 @@ async def _emsd_main(
|
||||||
broker,
|
broker,
|
||||||
symbol,
|
symbol,
|
||||||
book
|
book
|
||||||
# ctx,
|
|
||||||
# client,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# begin processing order events from the target brokerd backend
|
# begin processing order events from the target brokerd backend
|
||||||
|
# by receiving order submission response messages,
|
||||||
|
# normalizing them to EMS messages and relaying back to
|
||||||
|
# the piker order client.
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
|
|
||||||
translate_and_relay_brokerd_events,
|
translate_and_relay_brokerd_events,
|
||||||
|
|
||||||
broker,
|
broker,
|
||||||
ems_client_order_stream,
|
ems_client_order_stream,
|
||||||
brokerd_trades_stream,
|
brokerd_trades_stream,
|
||||||
|
|
Loading…
Reference in New Issue