Flip to `open_order_mode()` as ctx mngr

We need a subtask to compute the current pp PnL in real-time but really
only if a pp exists - a spawnable subtask would be ideal for this. Stage
a tick streaming task using a stream bcaster; no actual pnl calc yet.

Since we're going to need subtasks anyway might as well stick the order
mode UI processing loop in a task as well and then just give the whole
thing a ctx mngr api. This'll probably be handy for when we have
auto-strats that need to dynamically use the mode's api as well.

Oh, and move the time -> index mapper to a chart method for now.
fsp_feeds
Tyler Goodlet 2021-08-27 14:53:30 -04:00
parent ee377e6d6b
commit dfe4ca948a
2 changed files with 161 additions and 124 deletions

View File

@ -71,7 +71,7 @@ from .. import brokers
from ..log import get_logger from ..log import get_logger
from ._exec import run_qtractor from ._exec import run_qtractor
from ._interaction import ChartView from ._interaction import ChartView
from .order_mode import run_order_mode from .order_mode import open_order_mode
from .. import fsp from .. import fsp
from ._forms import ( from ._forms import (
FieldsForm, FieldsForm,
@ -1084,6 +1084,22 @@ class ChartPlotWidget(pg.PlotWidget):
self.sig_mouse_leave.emit(self) self.sig_mouse_leave.emit(self)
self.scene().leaveEvent(ev) self.scene().leaveEvent(ev)
def get_index(self, time: float) -> int:
# TODO: this should go onto some sort of
# data-view strimg thinger..right?
ohlc = self._shm.array
# ohlc = chart._shm.array
# XXX: not sure why the time is so off here
# looks like we're gonna have to do some fixing..
indexes = ohlc['time'] >= time
if any(indexes):
return ohlc['index'][indexes][-1]
else:
return ohlc['index'][-1]
_clear_throttle_rate: int = 60 # Hz _clear_throttle_rate: int = 60 # Hz
_book_throttle_rate: int = 16 # Hz _book_throttle_rate: int = 16 # Hz
@ -1695,19 +1711,6 @@ async def display_symbol_data(
) as feed, ) as feed,
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
# async def print_quotes():
# async with feed.stream.subscribe() as bstream:
# last_tick = time.time()
# async for quotes in bstream:
# now = time.time()
# period = now - last_tick
# for sym, quote in quotes.items():
# ticks = quote.get('ticks', ())
# if ticks:
# # print(f'{1/period} Hz')
# last_tick = time.time()
# n.start_soon(print_quotes)
ohlcv: ShmArray = feed.shm ohlcv: ShmArray = feed.shm
bars = ohlcv.array bars = ohlcv.array
@ -1833,12 +1836,37 @@ async def display_symbol_data(
linkedsplits linkedsplits
) )
await run_order_mode( async with (
open_order_mode(
chart, chart,
symbol, symbol,
provider, provider,
order_mode_started order_mode_started
) ) as order_mode,
):
pp = order_mode.pp
live = pp.live_pp
# real-time update pnl on the order mode
async with feed.stream.subscribe() as bstream:
last_tick = time.time()
async for quotes in bstream:
now = time.time()
period = now - last_tick
for sym, quote in quotes.items():
ticks = quote.get('ticks', ())
if ticks:
print(f'{1/period} Hz')
if live.size:
# compute and display pnl
pass
last_tick = time.time()
async def load_provider_search( async def load_provider_search(

View File

@ -18,6 +18,7 @@
Chart trading, the only way to scalp. Chart trading, the only way to scalp.
""" """
from contextlib import asynccontextmanager
from dataclasses import dataclass, field from dataclasses import dataclass, field
from functools import partial from functools import partial
from pprint import pformat from pprint import pformat
@ -36,7 +37,7 @@ from ..data._source import Symbol
from ..log import get_logger from ..log import get_logger
from ._editors import LineEditor, ArrowEditor from ._editors import LineEditor, ArrowEditor
from ._lines import order_line, LevelLine from ._lines import order_line, LevelLine
from ._position import PositionTracker, OrderModePane, Allocator, _size_units from ._position import PositionTracker, SettingsPane, Allocator, _size_units
from ._window import MultiStatus from ._window import MultiStatus
from ..clearing._messages import Order from ..clearing._messages import Order
from ._forms import open_form_input_handling from ._forms import open_form_input_handling
@ -91,7 +92,7 @@ class OrderMode:
multistatus: MultiStatus multistatus: MultiStatus
pp: PositionTracker pp: PositionTracker
allocator: 'Allocator' # noqa allocator: 'Allocator' # noqa
pane: OrderModePane pane: SettingsPane
active: bool = False active: bool = False
@ -462,7 +463,8 @@ class OrderMode:
return ids return ids
async def run_order_mode( @asynccontextmanager
async def open_order_mode(
chart: 'ChartPlotWidget', # noqa chart: 'ChartPlotWidget', # noqa
symbol: Symbol, symbol: Symbol,
@ -493,6 +495,7 @@ async def run_order_mode(
trades_stream, trades_stream,
positions positions
), ),
trio.open_nursery() as n,
): ):
log.info(f'Opening order mode for {brokername}.{symbol.key}') log.info(f'Opening order mode for {brokername}.{symbol.key}')
@ -521,7 +524,7 @@ async def run_order_mode(
pp_tracker.hide() pp_tracker.hide()
# order pane widgets and allocation model # order pane widgets and allocation model
order_pane = OrderModePane( order_pane = SettingsPane(
tracker=pp_tracker, tracker=pp_tracker,
form=form, form=form,
alloc=alloc, alloc=alloc,
@ -574,21 +577,6 @@ async def run_order_mode(
# make fill bar and positioning snapshot # make fill bar and positioning snapshot
order_pane.init_status_ui() order_pane.init_status_ui()
# TODO: this should go onto some sort of
# data-view strimg thinger..right?
def get_index(time: float):
# XXX: not sure why the time is so off here
# looks like we're gonna have to do some fixing..
ohlc = chart._shm.array
indexes = ohlc['time'] >= time
if any(indexes):
return ohlc['index'][indexes][-1]
else:
return ohlc['index'][-1]
# Begin order-response streaming # Begin order-response streaming
done() done()
@ -611,6 +599,27 @@ async def run_order_mode(
# to handle input since the ems connection is ready # to handle input since the ems connection is ready
started.set() started.set()
n.start_soon(
process_trades_and_update_ui,
mode,
trades_stream,
book,
)
yield mode
# await trio.sleep_forever()
async def process_trades_and_update_ui(
mode: OrderMode,
trades_stream: tractor.MsgStream,
book: OrderBook,
) -> None:
get_index = mode.chart.get_index
tracker = mode.pp
# this is where we receive **back** messages # this is where we receive **back** messages
# about executions **from** the EMS actor # about executions **from** the EMS actor
async for msg in trades_stream: async for msg in trades_stream:
@ -626,7 +635,7 @@ async def run_order_mode(
sym = mode.chart.linked.symbol sym = mode.chart.linked.symbol
if msg['symbol'].lower() in sym.key: if msg['symbol'].lower() in sym.key:
pp_tracker.update(msg) tracker.update(msg)
# update order pane widgets # update order pane widgets
mode.pane.update_status_ui() mode.pane.update_status_ui()
@ -717,4 +726,4 @@ async def run_order_mode(
arrow_index=get_index(details['broker_time']), arrow_index=get_index(details['broker_time']),
) )
pp_tracker.live_pp.fills.append(msg) tracker.live_pp.fills.append(msg)