WIP single brokerd dialogue
parent
2465c8fc78
commit
d6d7c24320
|
@ -27,12 +27,14 @@ from typing import AsyncIterator, Callable, Any
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
import trio
|
import trio
|
||||||
|
from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
from .. import data
|
from .. import data
|
||||||
from ..log import get_logger
|
from ..log import get_logger
|
||||||
from ..data._normalize import iterticks
|
from ..data._normalize import iterticks
|
||||||
from ..data.feed import Feed
|
from ..data.feed import Feed
|
||||||
|
from .._daemon import maybe_spawn_brokerd
|
||||||
from . import _paper_engine as paper
|
from . import _paper_engine as paper
|
||||||
from ._messages import (
|
from ._messages import (
|
||||||
Status, Order,
|
Status, Order,
|
||||||
|
@ -82,15 +84,16 @@ def mk_check(
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class _DarkBook:
|
class _DarkBook:
|
||||||
"""Client-side execution book.
|
'''EMS-trigger execution book.
|
||||||
|
|
||||||
Contains conditions for executions (aka "orders") which are not
|
Contains conditions for executions (aka "orders" or "triggers")
|
||||||
exposed to brokers and thus the market; i.e. these are privacy
|
which are not exposed to brokers and thus the market; i.e. these are
|
||||||
focussed "client side" orders.
|
privacy focussed "client side" orders which are submitted in real-time
|
||||||
|
based on specified trigger conditions.
|
||||||
|
|
||||||
A singleton instance is created per EMS actor (for now).
|
A an instance per `brokerd` is created per EMS actor (for now).
|
||||||
|
|
||||||
"""
|
'''
|
||||||
broker: str
|
broker: str
|
||||||
|
|
||||||
# levels which have an executable action (eg. alert, order, signal)
|
# levels which have an executable action (eg. alert, order, signal)
|
||||||
|
@ -256,17 +259,33 @@ async def clear_dark_triggers(
|
||||||
# print(f'execs scan took: {time.time() - start}')
|
# print(f'execs scan took: {time.time() - start}')
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TradesRelay:
|
||||||
|
brokerd_dialogue: tractor.MsgStream
|
||||||
|
positions: dict[str, float]
|
||||||
|
consumers: int = 0
|
||||||
|
|
||||||
|
|
||||||
class _Router(BaseModel):
|
class _Router(BaseModel):
|
||||||
'''Order router which manages per-broker dark books, alerts,
|
'''Order router which manages and tracks per-broker dark book,
|
||||||
and clearing related data feed management.
|
alerts, clearing and related data feed management.
|
||||||
|
|
||||||
|
A singleton per ``emsd`` actor.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# setup at actor spawn time
|
||||||
nursery: trio.Nursery
|
nursery: trio.Nursery
|
||||||
|
|
||||||
feeds: dict[tuple[str, str], Any] = {}
|
feeds: dict[tuple[str, str], Any] = {}
|
||||||
|
|
||||||
|
# broker to book map
|
||||||
books: dict[str, _DarkBook] = {}
|
books: dict[str, _DarkBook] = {}
|
||||||
|
|
||||||
|
# order id to client stream map
|
||||||
dialogues: dict[str, list[tractor.MsgStream]] = {}
|
dialogues: dict[str, list[tractor.MsgStream]] = {}
|
||||||
relays: dict[str, tuple[dict, tractor.MsgStream]] = {}
|
|
||||||
|
# brokername to trades-dialogues streams with ``brokerd`` actors
|
||||||
|
relays: dict[str, TradesRelay] = {}
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
arbitrary_types_allowed = True
|
arbitrary_types_allowed = True
|
||||||
|
@ -280,10 +299,166 @@ class _Router(BaseModel):
|
||||||
|
|
||||||
return self.books.setdefault(brokername, _DarkBook(brokername))
|
return self.books.setdefault(brokername, _DarkBook(brokername))
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def maybe_open_brokerd_trades_dialogue(
|
||||||
|
|
||||||
|
self,
|
||||||
|
feed: Feed,
|
||||||
|
symbol: str,
|
||||||
|
dark_book: _DarkBook,
|
||||||
|
_exec_mode: str,
|
||||||
|
loglevel: str,
|
||||||
|
|
||||||
|
) -> tuple[dict, tractor.MsgStream]:
|
||||||
|
'''Open and yield ``brokerd`` trades dialogue context-stream if none
|
||||||
|
already exists.
|
||||||
|
|
||||||
|
'''
|
||||||
|
relay = self.relays.get(feed.mod.name)
|
||||||
|
|
||||||
|
if relay is None:
|
||||||
|
|
||||||
|
relay = await self.nursery.start(
|
||||||
|
open_brokerd_trades_dialogue,
|
||||||
|
self,
|
||||||
|
feed,
|
||||||
|
symbol,
|
||||||
|
_exec_mode,
|
||||||
|
loglevel,
|
||||||
|
)
|
||||||
|
|
||||||
|
relay.consumers += 1
|
||||||
|
|
||||||
|
# TODO: get updated positions here?
|
||||||
|
assert relay.brokerd_dialogue
|
||||||
|
try:
|
||||||
|
yield relay
|
||||||
|
|
||||||
|
finally:
|
||||||
|
|
||||||
|
# TODO: what exactly needs to be torn down here or
|
||||||
|
# are we just consumer tracking?
|
||||||
|
|
||||||
|
relay.consumers -= 1
|
||||||
|
|
||||||
|
|
||||||
_router: _Router = None
|
_router: _Router = None
|
||||||
|
|
||||||
|
|
||||||
|
async def open_brokerd_trades_dialogue(
|
||||||
|
|
||||||
|
router: _Router,
|
||||||
|
feed: Feed,
|
||||||
|
symbol: str,
|
||||||
|
_exec_mode: str,
|
||||||
|
loglevel: str,
|
||||||
|
|
||||||
|
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
|
) -> tuple[dict, tractor.MsgStream]:
|
||||||
|
'''Open and yield ``brokerd`` trades dialogue context-stream if none
|
||||||
|
already exists.
|
||||||
|
|
||||||
|
'''
|
||||||
|
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
|
||||||
|
|
||||||
|
broker = feed.mod.name
|
||||||
|
|
||||||
|
# TODO: make a `tractor` bug about this!
|
||||||
|
# portal = feed._brokerd_portal
|
||||||
|
|
||||||
|
# XXX: we must have our own portal + channel otherwise
|
||||||
|
# when the data feed closes it may result in a half-closed/fucked
|
||||||
|
# channel that the brokerd side thinks is still open somehow!?
|
||||||
|
async with maybe_spawn_brokerd(
|
||||||
|
|
||||||
|
broker,
|
||||||
|
loglevel=loglevel,
|
||||||
|
|
||||||
|
) as portal:
|
||||||
|
|
||||||
|
if trades_endpoint is None or _exec_mode == 'paper':
|
||||||
|
|
||||||
|
# for paper mode we need to mock this trades response feed
|
||||||
|
# so we load bidir stream to a new sub-actor running a
|
||||||
|
# paper-simulator clearing engine.
|
||||||
|
|
||||||
|
# load the paper trading engine
|
||||||
|
_exec_mode = 'paper'
|
||||||
|
log.warning(f'Entering paper trading mode for {broker}')
|
||||||
|
|
||||||
|
# load the paper trading engine as a subactor of this emsd
|
||||||
|
# actor to simulate the real IPC load it'll have when also
|
||||||
|
# pulling data from feeds
|
||||||
|
open_trades_endpoint = paper.open_paperboi(
|
||||||
|
broker=broker,
|
||||||
|
symbol=symbol,
|
||||||
|
loglevel=loglevel,
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# open live brokerd trades endpoint
|
||||||
|
open_trades_endpoint = portal.open_context(
|
||||||
|
trades_endpoint,
|
||||||
|
loglevel=loglevel,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with (
|
||||||
|
|
||||||
|
open_trades_endpoint as (brokerd_ctx, positions),
|
||||||
|
brokerd_ctx.open_stream() as brokerd_trades_stream,
|
||||||
|
|
||||||
|
):
|
||||||
|
# XXX: really we only want one stream per `emsd` actor
|
||||||
|
# to relay global `brokerd` order events unless we're
|
||||||
|
# doing to expect each backend to relay only orders
|
||||||
|
# affiliated with a particular ``trades_dialogue()``
|
||||||
|
# session (seems annoying for implementers). So, here
|
||||||
|
# we cache the relay task and instead of running multiple
|
||||||
|
# tasks (which will result in multiples of the same msg being
|
||||||
|
# relayed for each EMS client) we just register each client
|
||||||
|
# stream to this single relay loop using _router.dialogues
|
||||||
|
|
||||||
|
# begin processing order events from the target brokerd backend
|
||||||
|
# by receiving order submission response messages,
|
||||||
|
# normalizing them to EMS messages and relaying back to
|
||||||
|
# the piker order client set.
|
||||||
|
|
||||||
|
# with brokerd_trades_stream.shield():
|
||||||
|
|
||||||
|
relay = TradesRelay(
|
||||||
|
brokerd_dialogue=brokerd_trades_stream,
|
||||||
|
positions=positions,
|
||||||
|
consumers=1
|
||||||
|
)
|
||||||
|
|
||||||
|
_router.relays[broker] = relay
|
||||||
|
|
||||||
|
# the ems scan loop may be cancelled by the client but we
|
||||||
|
# want to keep the ``brokerd`` dialogue up regardless
|
||||||
|
|
||||||
|
task_status.started(relay)
|
||||||
|
|
||||||
|
await translate_and_relay_brokerd_events(
|
||||||
|
broker,
|
||||||
|
brokerd_trades_stream,
|
||||||
|
_router,
|
||||||
|
)
|
||||||
|
|
||||||
|
# this context should block here indefinitely until
|
||||||
|
# the ``brokerd`` task either dies or is cancelled
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# context must have been closed
|
||||||
|
# remove from cache so next client will respawn if needed
|
||||||
|
# print('BROKERD DIALOGUE KILLED!!?!?!')
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# await tractor.breakpoint()
|
||||||
|
# raise
|
||||||
|
_router.relays.pop(broker)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def _setup_persistent_emsd(
|
async def _setup_persistent_emsd(
|
||||||
|
|
||||||
|
@ -301,17 +476,13 @@ async def _setup_persistent_emsd(
|
||||||
# TODO: send back the full set of persistent orders/execs persistent
|
# TODO: send back the full set of persistent orders/execs persistent
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
|
|
||||||
# we pin this task to keep the feeds manager active until the
|
|
||||||
# parent actor decides to tear it down
|
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
async def translate_and_relay_brokerd_events(
|
async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
broker: str,
|
broker: str,
|
||||||
# ems_client_order_stream: tractor.MsgStream,
|
|
||||||
brokerd_trades_stream: tractor.MsgStream,
|
brokerd_trades_stream: tractor.MsgStream,
|
||||||
book: _DarkBook,
|
|
||||||
router: _Router,
|
router: _Router,
|
||||||
|
|
||||||
) -> AsyncIterator[dict]:
|
) -> AsyncIterator[dict]:
|
||||||
|
@ -334,6 +505,11 @@ async def translate_and_relay_brokerd_events(
|
||||||
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
|
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
book = router.get_dark_book(broker)
|
||||||
|
relay = router.relays[broker]
|
||||||
|
|
||||||
|
assert relay.brokerd_dialogue == brokerd_trades_stream
|
||||||
|
|
||||||
async for brokerd_msg in brokerd_trades_stream:
|
async for brokerd_msg in brokerd_trades_stream:
|
||||||
|
|
||||||
name = brokerd_msg['name']
|
name = brokerd_msg['name']
|
||||||
|
@ -342,13 +518,16 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
if name == 'position':
|
if name == 'position':
|
||||||
|
|
||||||
|
pos_msg = BrokerdPosition(**brokerd_msg).dict()
|
||||||
|
|
||||||
|
# keep up to date locally in ``emsd``
|
||||||
|
relay.positions.update(pos_msg)
|
||||||
|
|
||||||
# relay through position msgs immediately by
|
# relay through position msgs immediately by
|
||||||
# broadcasting updates on all client streams
|
# broadcasting updates on all client streams
|
||||||
for oid, ems_client_order_stream in router.dialogues.items():
|
for oid, ems_client_order_stream in router.dialogues.items():
|
||||||
|
|
||||||
await ems_client_order_stream.send(
|
await ems_client_order_stream.send(pos_msg)
|
||||||
BrokerdPosition(**brokerd_msg).dict()
|
|
||||||
)
|
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -425,7 +604,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
resp = None
|
resp = None
|
||||||
broker_details = {}
|
broker_details = {}
|
||||||
client_flow_complete: bool = False
|
# client_flow_complete: bool = False
|
||||||
|
|
||||||
if name in (
|
if name in (
|
||||||
'error',
|
'error',
|
||||||
|
@ -460,7 +639,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
if msg.status == 'cancelled':
|
if msg.status == 'cancelled':
|
||||||
|
|
||||||
client_flow_complete = True
|
# client_flow_complete = True
|
||||||
log.info(f'Cancellation for {oid} is complete!')
|
log.info(f'Cancellation for {oid} is complete!')
|
||||||
|
|
||||||
if msg.status == 'filled':
|
if msg.status == 'filled':
|
||||||
|
@ -473,7 +652,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
# be sure to pop this stream from our dialogue set
|
# be sure to pop this stream from our dialogue set
|
||||||
# since the order dialogue should be done.
|
# since the order dialogue should be done.
|
||||||
client_flow_complete = True
|
# client_flow_complete = True
|
||||||
log.info(f'Execution for {oid} is complete!')
|
log.info(f'Execution for {oid} is complete!')
|
||||||
|
|
||||||
# just log it
|
# just log it
|
||||||
|
@ -514,11 +693,11 @@ async def translate_and_relay_brokerd_events(
|
||||||
).dict()
|
).dict()
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: do we want this to keep things cleaned up?
|
# TODO: do we want this to keep things cleaned up?
|
||||||
# it might require a special status from brokerd to affirm the
|
# it might require a special status from brokerd to affirm the
|
||||||
# flow is complete?
|
# flow is complete?
|
||||||
# if client_flow_complete:
|
# if client_flow_complete:
|
||||||
# router.dialogues.pop(oid)
|
# router.dialogues.pop(oid)
|
||||||
|
|
||||||
|
|
||||||
async def process_client_order_cmds(
|
async def process_client_order_cmds(
|
||||||
|
@ -655,7 +834,7 @@ async def process_client_order_cmds(
|
||||||
# flow so that if a cancel comes from the requesting
|
# flow so that if a cancel comes from the requesting
|
||||||
# client, before that ack, when the ack does arrive we
|
# client, before that ack, when the ack does arrive we
|
||||||
# immediately take the reqid from the broker and cancel
|
# immediately take the reqid from the broker and cancel
|
||||||
# that order with them immediately.
|
# that live order asap.
|
||||||
dark_book._ems_entries[oid] = msg
|
dark_book._ems_entries[oid] = msg
|
||||||
|
|
||||||
# "DARK" triggers
|
# "DARK" triggers
|
||||||
|
@ -725,102 +904,6 @@ async def process_client_order_cmds(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def maybe_open_brokerd_trades_dialogue(
|
|
||||||
|
|
||||||
router: _Router,
|
|
||||||
feed: Feed,
|
|
||||||
broker: str,
|
|
||||||
symbol: str,
|
|
||||||
dark_book: _DarkBook,
|
|
||||||
_exec_mode: str,
|
|
||||||
loglevel: str,
|
|
||||||
|
|
||||||
) -> tuple[dict, tractor.MsgStream]:
|
|
||||||
'''Open and yield ``brokerd`` trades dialogue context-stream if none
|
|
||||||
already exists.
|
|
||||||
|
|
||||||
'''
|
|
||||||
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
|
|
||||||
portal = feed._brokerd_portal
|
|
||||||
|
|
||||||
if broker in _router.relays:
|
|
||||||
|
|
||||||
positions, brokerd_trades_stream = _router.relays[broker]
|
|
||||||
|
|
||||||
# TODO: get updated positions here?
|
|
||||||
yield positions, brokerd_trades_stream
|
|
||||||
return
|
|
||||||
|
|
||||||
if trades_endpoint is None or _exec_mode == 'paper':
|
|
||||||
|
|
||||||
# for paper mode we need to mock this trades response feed
|
|
||||||
# so we load bidir stream to a new sub-actor running a
|
|
||||||
# paper-simulator clearing engine.
|
|
||||||
|
|
||||||
# load the paper trading engine
|
|
||||||
_exec_mode = 'paper'
|
|
||||||
log.warning(f'Entering paper trading mode for {broker}')
|
|
||||||
|
|
||||||
# load the paper trading engine as a subactor of this emsd
|
|
||||||
# actor to simulate the real IPC load it'll have when also
|
|
||||||
# pulling data from feeds
|
|
||||||
open_trades_endpoint = paper.open_paperboi(
|
|
||||||
broker=broker,
|
|
||||||
symbol=symbol,
|
|
||||||
loglevel=loglevel,
|
|
||||||
)
|
|
||||||
|
|
||||||
else:
|
|
||||||
# open live brokerd trades endpoint
|
|
||||||
open_trades_endpoint = portal.open_context(
|
|
||||||
trades_endpoint,
|
|
||||||
loglevel=loglevel,
|
|
||||||
)
|
|
||||||
|
|
||||||
async with (
|
|
||||||
|
|
||||||
open_trades_endpoint as (brokerd_ctx, positions),
|
|
||||||
brokerd_ctx.open_stream() as brokerd_trades_stream,
|
|
||||||
trio.open_nursery() as n,
|
|
||||||
|
|
||||||
):
|
|
||||||
# XXX: really we only want one stream per `emsd` actor
|
|
||||||
# to relay global `brokerd` order events unless we're
|
|
||||||
# doing to expect each backend to relay only orders
|
|
||||||
# affiliated with a particular ``trades_dialogue()``
|
|
||||||
# session (seems annoying for implementers). So, here
|
|
||||||
# we cache the relay task and instead of running multiple
|
|
||||||
# tasks (which will result in multiples of the same msg being
|
|
||||||
# relayed for each EMS client) we just register each client
|
|
||||||
# stream to this single relay loop using _router.dialogues
|
|
||||||
|
|
||||||
# begin processing order events from the target brokerd backend
|
|
||||||
# by receiving order submission response messages,
|
|
||||||
# normalizing them to EMS messages and relaying back to
|
|
||||||
# the piker order client set.
|
|
||||||
|
|
||||||
n.start_soon(
|
|
||||||
|
|
||||||
translate_and_relay_brokerd_events,
|
|
||||||
|
|
||||||
broker,
|
|
||||||
# ems_client_order_stream,
|
|
||||||
brokerd_trades_stream,
|
|
||||||
dark_book,
|
|
||||||
_router,
|
|
||||||
)
|
|
||||||
|
|
||||||
_router.relays[broker] = (positions, brokerd_trades_stream)
|
|
||||||
|
|
||||||
try:
|
|
||||||
yield positions, brokerd_trades_stream
|
|
||||||
|
|
||||||
finally:
|
|
||||||
# remove from cache so next client will respawn if needed
|
|
||||||
_router.relays.pop(broker)
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def _emsd_main(
|
async def _emsd_main(
|
||||||
|
|
||||||
|
@ -855,7 +938,7 @@ async def _emsd_main(
|
||||||
run (dark order) conditions on inputs and trigger brokerd "live"
|
run (dark order) conditions on inputs and trigger brokerd "live"
|
||||||
order submissions.
|
order submissions.
|
||||||
|
|
|
|
||||||
- ``translate_and_relay_brokerd_events()``:
|
- (maybe) ``translate_and_relay_brokerd_events()``:
|
||||||
accept normalized trades responses from brokerd, process and
|
accept normalized trades responses from brokerd, process and
|
||||||
relay to ems client(s); this is a effectively a "trade event
|
relay to ems client(s); this is a effectively a "trade event
|
||||||
reponse" proxy-broker.
|
reponse" proxy-broker.
|
||||||
|
@ -905,24 +988,24 @@ async def _emsd_main(
|
||||||
|
|
||||||
# only open if one isn't already up: we try to keep
|
# only open if one isn't already up: we try to keep
|
||||||
# as few duplicate streams as necessary
|
# as few duplicate streams as necessary
|
||||||
maybe_open_brokerd_trades_dialogue(
|
_router.maybe_open_brokerd_trades_dialogue(
|
||||||
_router,
|
|
||||||
feed,
|
feed,
|
||||||
broker,
|
|
||||||
symbol,
|
symbol,
|
||||||
dark_book,
|
dark_book,
|
||||||
_exec_mode,
|
_exec_mode,
|
||||||
loglevel,
|
loglevel,
|
||||||
|
|
||||||
) as (positions, brokerd_trades_stream),
|
) as relay,
|
||||||
|
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
):
|
):
|
||||||
|
|
||||||
|
brokerd_stream = relay.brokerd_dialogue #.clone()
|
||||||
|
|
||||||
# signal to client that we're started
|
# signal to client that we're started
|
||||||
# TODO: we could eventually send back **all** brokerd
|
# TODO: we could eventually send back **all** brokerd
|
||||||
# positions here?
|
# positions here?
|
||||||
await ems_ctx.started(positions)
|
await ems_ctx.started(relay.positions)
|
||||||
|
|
||||||
# establish 2-way stream with requesting order-client and
|
# establish 2-way stream with requesting order-client and
|
||||||
# begin handling inbound order requests and updates
|
# begin handling inbound order requests and updates
|
||||||
|
@ -932,7 +1015,8 @@ async def _emsd_main(
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
clear_dark_triggers,
|
clear_dark_triggers,
|
||||||
|
|
||||||
brokerd_trades_stream,
|
# relay.brokerd_dialogue,
|
||||||
|
brokerd_stream,
|
||||||
ems_client_order_stream,
|
ems_client_order_stream,
|
||||||
feed.stream,
|
feed.stream,
|
||||||
|
|
||||||
|
@ -942,12 +1026,25 @@ async def _emsd_main(
|
||||||
)
|
)
|
||||||
|
|
||||||
# start inbound (from attached client) order request processing
|
# start inbound (from attached client) order request processing
|
||||||
await process_client_order_cmds(
|
try:
|
||||||
|
await process_client_order_cmds(
|
||||||
|
|
||||||
ems_client_order_stream,
|
ems_client_order_stream,
|
||||||
brokerd_trades_stream,
|
|
||||||
symbol,
|
# relay.brokerd_dialogue,
|
||||||
feed,
|
brokerd_stream,
|
||||||
dark_book,
|
|
||||||
_router,
|
symbol,
|
||||||
)
|
feed,
|
||||||
|
dark_book,
|
||||||
|
_router,
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
pass
|
||||||
|
# for oid, client_stream in _router.dialogs.copy().items():
|
||||||
|
# if client_stream is ems_client_order_stream:
|
||||||
|
# # TODO: we need a placeholder for sending
|
||||||
|
# # the updates to an alert system inside
|
||||||
|
# # ``emsd`` ??
|
||||||
|
# print(f'popping order for stream {oid}')
|
||||||
|
# _router.dialogs.pop(oid)
|
||||||
|
|
Loading…
Reference in New Issue