Create and "EMS" module for order execution/routing actor(s)

basic_alerts
Tyler Goodlet 2021-01-01 17:48:22 -05:00
parent 80d48e5ece
commit a55d72f8d6
3 changed files with 104 additions and 79 deletions

91
piker/_ems.py 100644
View File

@ -0,0 +1,91 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
In suit parlance: "Execution management systems"
"""
import trio
from trio_typing import TaskStatus
import tractor
_to_router: trio.abc.SendChannel = None
_from_ui: trio.abc.ReceiveChannel = None
# TODO: make this a ``tractor.msg.pub``
async def stream_orders():
"""Order streaming task: deliver orders transmitted from UI
to downstream consumers.
This is run in the UI actor (usually the one running Qt).
The UI simply delivers order messages to the above ``_to_router``
send channel (from sync code using ``.send_nowait()``), these values
are pulled from the channel here and send to any consumer(s).
"""
global _from_ui
async for order in _from_ui:
yield order
async def stream_and_route(ui_name):
"""Order router (sub)actor entrypoint.
"""
actor = tractor.current_actor()
# new router entry point
async with tractor.wait_for_actor(ui_name) as portal:
async for order in await portal.run(stream_orders):
print(f'order {order} received in {actor.uid}')
# push order back to parent as an "alert"
# (mocking for eg. a "fill")
yield order
async def spawn_router_stream_alerts(
ident: str,
task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED,
) -> None:
# setup local ui event streaming channels
global _from_ui, _to_router
_to_router, _from_ui = trio.open_memory_channel(100)
actor = tractor.current_actor()
subactor_name = ident + '.router'
async with tractor.open_nursery() as n:
portal = await n.start_actor(
subactor_name,
rpc_module_paths=[__name__],
)
stream = await portal.run(
stream_and_route,
ui_name=actor.name
)
async with tractor.wait_for_actor(subactor_name):
# let parent task continue
task_status.started(_to_router)
async for alert in stream:
print(f'alert {alert} received in {actor.uid}')

View File

@ -16,6 +16,7 @@
""" """
High level Qt chart widgets. High level Qt chart widgets.
""" """
from typing import Tuple, Dict, Any, Optional, Callable from typing import Tuple, Dict, Any, Optional, Callable
from functools import partial from functools import partial
@ -25,7 +26,6 @@ import numpy as np
import pyqtgraph as pg import pyqtgraph as pg
import tractor import tractor
import trio import trio
from trio_typing import TaskStatus
from ._axes import ( from ._axes import (
DynamicDateAxis, DynamicDateAxis,
@ -59,6 +59,7 @@ from ..log import get_logger
from ._exec import run_qtractor, current_screen from ._exec import run_qtractor, current_screen
from ._interaction import ChartView from ._interaction import ChartView
from .. import fsp from .. import fsp
from .._ems import spawn_router_stream_alerts, _to_router
log = get_logger(__name__) log = get_logger(__name__)
@ -818,73 +819,6 @@ class ChartPlotWidget(pg.PlotWidget):
self.scene().leaveEvent(ev) self.scene().leaveEvent(ev)
_to_router: trio.abc.SendChannel = None
_from_ui: trio.abc.ReceiveChannel = None
# TODO: make this a ``tractor.msg.pub``
async def stream_orders():
"""Order streaming task: deliver orders transmitted from UI
to downstream consumers.
This is run in the UI actor (usually the one running Qt).
The UI simply delivers order messages to the above ``_to_router``
send channel (from sync code using ``.send_nowait()``), these values
are pulled from the channel here and send to any consumer(s).
"""
global _from_ui
async for order in _from_ui:
yield order
async def stream_and_route(ui_name):
"""Order router actor entrypoint.
"""
actor = tractor.current_actor()
# new router entry point
async with tractor.wait_for_actor(ui_name) as portal:
async for order in await portal.run(stream_orders):
print(f'order {order} received in {actor.uid}')
# push order back to parent as an "alert"
# (mocking for eg. a "fill")
yield order
async def spawn_router_stream_alerts(
ident: str,
task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED,
) -> None:
# setup local ui event streaming channels
global _from_ui, _to_router
_to_router, _from_ui = trio.open_memory_channel(100)
actor = tractor.current_actor()
subactor_name = ident + '.router'
async with tractor.open_nursery() as n:
portal = await n.start_actor(
subactor_name,
rpc_module_paths=[__name__],
)
stream = await portal.run(
stream_and_route,
ui_name=actor.name
)
# let parent task continue
task_status.started(subactor_name)
async for alert in stream:
print(f'alert {alert} received in {actor.uid}')
async def _async_main( async def _async_main(
sym: str, sym: str,
brokername: str, brokername: str,
@ -970,16 +904,6 @@ async def _async_main(
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
router_name = await n.start(
spawn_router_stream_alerts,
sym,
)
# wait for router to come up before setting
# enabling send channel on chart
async with tractor.wait_for_actor(router_name):
global _to_router
linked_charts._to_router = _to_router
# load initial fsp chain (otherwise known as "indicators") # load initial fsp chain (otherwise known as "indicators")
n.start_soon( n.start_soon(
@ -1001,8 +925,18 @@ async def _async_main(
wap_in_history, wap_in_history,
) )
router_send_chan = await n.start(
spawn_router_stream_alerts,
sym,
)
# wait for router to come up before setting
# enabling send channel on chart
linked_charts._to_router = router_send_chan
# wait for a first quote before we start any update tasks # wait for a first quote before we start any update tasks
quote = await feed.receive() quote = await feed.receive()
log.info(f'Received first quote {quote}') log.info(f'Received first quote {quote}')
n.start_soon( n.start_soon(

View File

@ -150,6 +150,6 @@ def chart(config, symbol, date, rate, test, profile):
tractor_kwargs={ tractor_kwargs={
'debug_mode': True, 'debug_mode': True,
'loglevel': tractorloglevel, 'loglevel': tractorloglevel,
'rpc_module_paths': ['piker.ui._chart'], 'rpc_module_paths': ['piker._ems'],
}, },
) )