Add charting components from `Quantdom`

Hand select necessary components to get real-time charting with
`pyqtgraph` from the `Quantdom` projects:
https://github.com/constverum/Quantdom

We've offered to collaborate with the author but have received no
response and the project has not been updated in over a year.
Given this, we are moving forward with taking the required components to
make further improvements upon especially since the `pyqtgraph` project
is now being actively maintained again.

If the author comes back we will be more then happy to contribute
modified components upstream:
https://github.com/constverum/Quantdom/issues/18

Relates to #80
its_happening
Tyler Goodlet 2020-06-10 12:50:09 -04:00
parent b7f306b715
commit 42aa2bce5b
8 changed files with 1991 additions and 0 deletions

View File

@ -0,0 +1,10 @@
"""
Curated set of components from ``Quantdom`` used as a starting
draft for real-time charting with ``pyqtgraph``.
Much thanks to the author:
https://github.com/constverum/Quantdom
Note this code is licensed Apache 2.0:
https://github.com/constverum/Quantdom/blob/master/LICENSE
"""

View File

@ -0,0 +1,132 @@
"""Base classes."""
from enum import Enum, auto
import numpy as np
import pandas as pd
from .const import ChartType, TimeFrame
__all__ = ('Indicator', 'Symbol', 'Quotes')
class BaseQuotes(np.recarray):
def __new__(cls, shape=None, dtype=None, order='C'):
dt = np.dtype(
[
('id', int),
('time', float),
('open', float),
('high', float),
('low', float),
('close', float),
('volume', int),
]
)
shape = shape or (1,)
return np.ndarray.__new__(cls, shape, (np.record, dt), order=order)
def _nan_to_closest_num(self):
"""Return interpolated values instead of NaN."""
for col in ['open', 'high', 'low', 'close']:
mask = np.isnan(self[col])
if not mask.size:
continue
self[col][mask] = np.interp(
np.flatnonzero(mask), np.flatnonzero(~mask), self[col][~mask]
)
def _set_time_frame(self, default_tf):
tf = {
1: TimeFrame.M1,
5: TimeFrame.M5,
15: TimeFrame.M15,
30: TimeFrame.M30,
60: TimeFrame.H1,
240: TimeFrame.H4,
1440: TimeFrame.D1,
}
minutes = int(np.diff(self.time[-10:]).min() / 60)
self.timeframe = tf.get(minutes) or tf[default_tf]
def new(self, data, source=None, default_tf=None):
shape = (len(data),)
self.resize(shape, refcheck=False)
if isinstance(data, pd.DataFrame):
data.reset_index(inplace=True)
data.insert(0, 'id', data.index)
data.Date = self.convert_dates(data.Date)
data = data.rename(
columns={
'Date': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
}
)
for name in self.dtype.names:
self[name] = data[name]
elif isinstance(data, (np.recarray, BaseQuotes)):
self[:] = data[:]
self._nan_to_closest_num()
self._set_time_frame(default_tf)
return self
def convert_dates(self, dates):
return np.array([d.timestamp() for d in dates])
class SymbolType(Enum):
FOREX = auto()
CFD = auto()
FUTURES = auto()
SHARES = auto()
class Symbol:
FOREX = SymbolType.FOREX
CFD = SymbolType.CFD
FUTURES = SymbolType.FUTURES
SHARES = SymbolType.SHARES
def __init__(self, ticker, mode, tick_size=0, tick_value=None):
self.ticker = ticker
self.mode = mode
if self.mode in [self.FOREX, self.CFD]:
# number of units of the commodity, currency
# or financial asset in one lot
self.contract_size = 100_000 # (100000 == 1 Lot)
elif self.mode == self.FUTURES:
# cost of a single price change point ($10) /
# one minimum price movement
self.tick_value = tick_value
# minimum price change step (0.0001)
self.tick_size = tick_size
if isinstance(tick_size, float):
self.digits = len(str(tick_size).split('.')[1])
else:
self.digits = 0
def __repr__(self):
return 'Symbol (%s | %s)' % (self.ticker, self.mode)
class Indicator:
def __init__(
self, label=None, window=None, data=None, tp=None, base=None, **kwargs
):
self.label = label
self.window = window
self.data = data or [0]
self.type = tp or ChartType.LINE
self.base = base or {'linewidth': 0.5, 'color': 'black'}
self.lineStyle = {'linestyle': '-', 'linewidth': 0.5, 'color': 'blue'}
self.lineStyle.update(kwargs)
Quotes = BaseQuotes()

View File

@ -0,0 +1,799 @@
"""Chart."""
import numpy as np
import pyqtgraph as pg
from PyQt5 import QtCore, QtGui
from .base import Quotes
from .const import ChartType
from .portfolio import Order, Portfolio
from .utils import fromtimestamp, timeit
__all__ = ('QuotesChart', 'EquityChart')
# pg.setConfigOption('background', 'w')
CHART_MARGINS = (0, 0, 20, 5)
class SampleLegendItem(pg.graphicsItems.LegendItem.ItemSample):
def paint(self, p, *args):
p.setRenderHint(p.Antialiasing)
if isinstance(self.item, tuple):
positive = self.item[0].opts
negative = self.item[1].opts
p.setPen(pg.mkPen(positive['pen']))
p.setBrush(pg.mkBrush(positive['brush']))
p.drawPolygon(
QtGui.QPolygonF(
[
QtCore.QPointF(0, 0),
QtCore.QPointF(18, 0),
QtCore.QPointF(18, 18),
]
)
)
p.setPen(pg.mkPen(negative['pen']))
p.setBrush(pg.mkBrush(negative['brush']))
p.drawPolygon(
QtGui.QPolygonF(
[
QtCore.QPointF(0, 0),
QtCore.QPointF(0, 18),
QtCore.QPointF(18, 18),
]
)
)
else:
opts = self.item.opts
p.setPen(pg.mkPen(opts['pen']))
p.drawRect(0, 10, 18, 0.5)
class PriceAxis(pg.AxisItem):
def __init__(self):
super().__init__(orientation='right')
self.style.update({'textFillLimits': [(0, 0.8)]})
def tickStrings(self, vals, scale, spacing):
digts = max(0, np.ceil(-np.log10(spacing * scale)))
return [
('{:<8,.%df}' % digts).format(v).replace(',', ' ') for v in vals
]
class DateAxis(pg.AxisItem):
tick_tpl = {'D1': '%d %b\n%Y'}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.quotes_count = len(Quotes) - 1
def tickStrings(self, values, scale, spacing):
s_period = 'D1'
strings = []
for ibar in values:
if ibar > self.quotes_count:
return strings
dt_tick = fromtimestamp(Quotes[int(ibar)].time)
strings.append(dt_tick.strftime(self.tick_tpl[s_period]))
return strings
class CenteredTextItem(QtGui.QGraphicsTextItem):
def __init__(
self,
text='',
parent=None,
pos=(0, 0),
pen=None,
brush=None,
valign=None,
opacity=0.1,
):
super().__init__(text, parent)
self.pen = pen
self.brush = brush
self.opacity = opacity
self.valign = valign
self.text_flags = QtCore.Qt.AlignCenter
self.setPos(*pos)
self.setFlag(self.ItemIgnoresTransformations)
def boundingRect(self): # noqa
r = super().boundingRect()
if self.valign == QtCore.Qt.AlignTop:
return QtCore.QRectF(-r.width() / 2, -37, r.width(), r.height())
elif self.valign == QtCore.Qt.AlignBottom:
return QtCore.QRectF(-r.width() / 2, 15, r.width(), r.height())
def paint(self, p, option, widget):
p.setRenderHint(p.Antialiasing, False)
p.setRenderHint(p.TextAntialiasing, True)
p.setPen(self.pen)
if self.brush.style() != QtCore.Qt.NoBrush:
p.setOpacity(self.opacity)
p.fillRect(option.rect, self.brush)
p.setOpacity(1)
p.drawText(option.rect, self.text_flags, self.toPlainText())
class AxisLabel(pg.GraphicsObject):
bg_color = pg.mkColor('#dbdbdb')
fg_color = pg.mkColor('#000000')
def __init__(self, parent=None, digits=0, color=None, opacity=1, **kwargs):
super().__init__(parent)
self.parent = parent
self.opacity = opacity
self.label_str = ''
self.digits = digits
self.quotes_count = len(Quotes) - 1
if isinstance(color, QtGui.QPen):
self.bg_color = color.color()
self.fg_color = pg.mkColor('#ffffff')
elif isinstance(color, list):
self.bg_color = {'>0': color[0].color(), '<0': color[1].color()}
self.fg_color = pg.mkColor('#ffffff')
self.setFlag(self.ItemIgnoresTransformations)
def tick_to_string(self, tick_pos):
raise NotImplementedError()
def boundingRect(self): # noqa
raise NotImplementedError()
def update_label(self, evt_post, point_view):
raise NotImplementedError()
def update_label_test(self, ypos=0, ydata=0):
self.label_str = self.tick_to_string(ydata)
height = self.boundingRect().height()
offset = 0 # if have margins
new_pos = QtCore.QPointF(0, ypos - height / 2 - offset)
self.setPos(new_pos)
def paint(self, p, option, widget):
p.setRenderHint(p.TextAntialiasing, True)
p.setPen(self.fg_color)
if self.label_str:
if not isinstance(self.bg_color, dict):
bg_color = self.bg_color
else:
if int(self.label_str.replace(' ', '')) > 0:
bg_color = self.bg_color['>0']
else:
bg_color = self.bg_color['<0']
p.setOpacity(self.opacity)
p.fillRect(option.rect, bg_color)
p.setOpacity(1)
p.drawText(option.rect, self.text_flags, self.label_str)
class XAxisLabel(AxisLabel):
text_flags = (
QtCore.Qt.TextDontClip | QtCore.Qt.AlignCenter | QtCore.Qt.AlignTop
)
def tick_to_string(self, tick_pos):
# TODO: change to actual period
tpl = self.parent.tick_tpl['D1']
return fromtimestamp(Quotes[round(tick_pos)].time).strftime(tpl)
def boundingRect(self): # noqa
return QtCore.QRectF(0, 0, 60, 38)
def update_label(self, evt_post, point_view):
ibar = point_view.x()
if ibar > self.quotes_count:
return
self.label_str = self.tick_to_string(ibar)
width = self.boundingRect().width()
offset = 0 # if have margins
new_pos = QtCore.QPointF(evt_post.x() - width / 2 - offset, 0)
self.setPos(new_pos)
class YAxisLabel(AxisLabel):
text_flags = (
QtCore.Qt.TextDontClip | QtCore.Qt.AlignLeft | QtCore.Qt.AlignVCenter
)
def tick_to_string(self, tick_pos):
return ('{: ,.%df}' % self.digits).format(tick_pos).replace(',', ' ')
def boundingRect(self): # noqa
return QtCore.QRectF(0, 0, 74, 24)
def update_label(self, evt_post, point_view):
self.label_str = self.tick_to_string(point_view.y())
height = self.boundingRect().height()
offset = 0 # if have margins
new_pos = QtCore.QPointF(0, evt_post.y() - height / 2 - offset)
self.setPos(new_pos)
class CustomPlotWidget(pg.PlotWidget):
sig_mouse_leave = QtCore.Signal(object)
sig_mouse_enter = QtCore.Signal(object)
def enterEvent(self, ev): # noqa
self.sig_mouse_enter.emit(self)
def leaveEvent(self, ev): # noqa
self.sig_mouse_leave.emit(self)
self.scene().leaveEvent(ev)
_rate_limit = 30
class CrossHairItem(pg.GraphicsObject):
def __init__(self, parent, indicators=None, digits=0):
super().__init__()
# self.pen = pg.mkPen('#000000')
self.pen = pg.mkPen('#a9a9a9')
self.parent = parent
self.indicators = {}
self.activeIndicator = None
self.xaxis = self.parent.getAxis('bottom')
self.yaxis = self.parent.getAxis('right')
self.vline = self.parent.addLine(x=0, pen=self.pen, movable=False)
self.hline = self.parent.addLine(y=0, pen=self.pen, movable=False)
self.proxy_moved = pg.SignalProxy(
self.parent.scene().sigMouseMoved,
rateLimit=_rate_limit,
slot=self.mouseMoved,
)
self.yaxis_label = YAxisLabel(
parent=self.yaxis, digits=digits, opacity=1
)
indicators = indicators or []
if indicators:
last_ind = indicators[-1]
self.xaxis_label = XAxisLabel(
parent=last_ind.getAxis('bottom'), opacity=1
)
self.proxy_enter = pg.SignalProxy(
self.parent.sig_mouse_enter,
rateLimit=_rate_limit,
slot=lambda: self.mouseAction('Enter', False),
)
self.proxy_leave = pg.SignalProxy(
self.parent.sig_mouse_leave,
rateLimit=_rate_limit,
slot=lambda: self.mouseAction('Leave', False),
)
else:
self.xaxis_label = XAxisLabel(parent=self.xaxis, opacity=1)
for i in indicators:
vl = i.addLine(x=0, pen=self.pen, movable=False)
hl = i.addLine(y=0, pen=self.pen, movable=False)
yl = YAxisLabel(parent=i.getAxis('right'), opacity=1)
px_moved = pg.SignalProxy(
i.scene().sigMouseMoved, rateLimit=_rate_limit, slot=self.mouseMoved
)
px_enter = pg.SignalProxy(
i.sig_mouse_enter,
rateLimit=_rate_limit,
slot=lambda: self.mouseAction('Enter', i),
)
px_leave = pg.SignalProxy(
i.sig_mouse_leave,
rateLimit=_rate_limit,
slot=lambda: self.mouseAction('Leave', i),
)
self.indicators[i] = {
'vl': vl,
'hl': hl,
'yl': yl,
'px': (px_moved, px_enter, px_leave),
}
def mouseAction(self, action, ind=False): # noqa
if action == 'Enter':
if ind:
self.indicators[ind]['hl'].show()
self.indicators[ind]['yl'].show()
self.activeIndicator = ind
else:
self.yaxis_label.show()
self.hline.show()
else: # Leave
if ind:
self.indicators[ind]['hl'].hide()
self.indicators[ind]['yl'].hide()
self.activeIndicator = None
else:
self.yaxis_label.hide()
self.hline.hide()
def mouseMoved(self, evt): # noqa
pos = evt[0]
if self.parent.sceneBoundingRect().contains(pos):
# mouse_point = self.vb.mapSceneToView(pos)
mouse_point = self.parent.mapToView(pos)
self.vline.setX(mouse_point.x())
self.xaxis_label.update_label(evt_post=pos, point_view=mouse_point)
for opts in self.indicators.values():
opts['vl'].setX(mouse_point.x())
if self.activeIndicator:
mouse_point_ind = self.activeIndicator.mapToView(pos)
self.indicators[self.activeIndicator]['hl'].setY(
mouse_point_ind.y()
)
self.indicators[self.activeIndicator]['yl'].update_label(
evt_post=pos, point_view=mouse_point_ind
)
else:
self.hline.setY(mouse_point.y())
self.yaxis_label.update_label(
evt_post=pos, point_view=mouse_point
)
def paint(self, p, *args):
pass
def boundingRect(self):
return self.parent.boundingRect()
class BarItem(pg.GraphicsObject):
w = 0.35
bull_brush = pg.mkPen('#00cc00')
bear_brush = pg.mkPen('#fa0000')
def __init__(self):
super().__init__()
self.generatePicture()
def _generate(self, p):
hl = np.array(
[QtCore.QLineF(q.id, q.low, q.id, q.high) for q in Quotes]
)
op = np.array(
[QtCore.QLineF(q.id - self.w, q.open, q.id, q.open) for q in Quotes]
)
cl = np.array(
[
QtCore.QLineF(q.id + self.w, q.close, q.id, q.close)
for q in Quotes
]
)
lines = np.concatenate([hl, op, cl])
long_bars = np.resize(Quotes.close > Quotes.open, len(lines))
short_bars = np.resize(Quotes.close < Quotes.open, len(lines))
p.setPen(self.bull_brush)
p.drawLines(*lines[long_bars])
p.setPen(self.bear_brush)
p.drawLines(*lines[short_bars])
@timeit
def generatePicture(self):
self.picture = QtGui.QPicture()
p = QtGui.QPainter(self.picture)
self._generate(p)
p.end()
def paint(self, p, *args):
p.drawPicture(0, 0, self.picture)
def boundingRect(self):
return QtCore.QRectF(self.picture.boundingRect())
class CandlestickItem(BarItem):
w2 = 0.7
line_pen = pg.mkPen('#000000')
bull_brush = pg.mkBrush('#00ff00')
bear_brush = pg.mkBrush('#ff0000')
def _generate(self, p):
rects = np.array(
[
QtCore.QRectF(q.id - self.w, q.open, self.w2, q.close - q.open)
for q in Quotes
]
)
p.setPen(self.line_pen)
p.drawLines([QtCore.QLineF(q.id, q.low, q.id, q.high) for q in Quotes])
p.setBrush(self.bull_brush)
p.drawRects(*rects[Quotes.close > Quotes.open])
p.setBrush(self.bear_brush)
p.drawRects(*rects[Quotes.close < Quotes.open])
class QuotesChart(QtGui.QWidget):
long_pen = pg.mkPen('#006000')
long_brush = pg.mkBrush('#00ff00')
short_pen = pg.mkPen('#600000')
short_brush = pg.mkBrush('#ff0000')
zoomIsDisabled = QtCore.pyqtSignal(bool)
def __init__(self):
super().__init__()
self.signals_visible = False
self.style = ChartType.BAR
self.indicators = []
self.xaxis = DateAxis(orientation='bottom')
self.xaxis.setStyle(
tickTextOffset=7, textFillLimits=[(0, 0.80)], showValues=False
)
self.xaxis_ind = DateAxis(orientation='bottom')
self.xaxis_ind.setStyle(tickTextOffset=7, textFillLimits=[(0, 0.80)])
self.layout = QtGui.QVBoxLayout(self)
self.layout.setContentsMargins(0, 0, 0, 0)
self.splitter = QtGui.QSplitter(QtCore.Qt.Vertical)
self.splitter.setHandleWidth(4)
self.layout.addWidget(self.splitter)
def _show_text_signals(self, lbar, rbar):
signals = [
sig
for sig in self.signals_text_items[lbar:rbar]
if isinstance(sig, CenteredTextItem)
]
if len(signals) <= 50:
for sig in signals:
sig.show()
else:
for sig in signals:
sig.hide()
def _remove_signals(self):
self.chart.removeItem(self.signals_group_arrow)
self.chart.removeItem(self.signals_group_text)
del self.signals_text_items
del self.signals_group_arrow
del self.signals_group_text
self.signals_visible = False
def _update_quotes_chart(self):
self.chart.hideAxis('left')
self.chart.showAxis('right')
self.chart.addItem(_get_chart_points(self.style))
self.chart.setLimits(
xMin=Quotes[0].id,
xMax=Quotes[-1].id,
minXRange=60,
yMin=Quotes.low.min() * 0.98,
yMax=Quotes.high.max() * 1.02,
)
self.chart.showGrid(x=True, y=True)
self.chart.setCursor(QtCore.Qt.BlankCursor)
self.chart.sigXRangeChanged.connect(self._update_yrange_limits)
def _update_ind_charts(self):
for ind, d in self.indicators:
curve = pg.PlotDataItem(d, pen='b', antialias=True)
ind.addItem(curve)
ind.hideAxis('left')
ind.showAxis('right')
# ind.setAspectLocked(1)
ind.setXLink(self.chart)
ind.setLimits(
xMin=Quotes[0].id,
xMax=Quotes[-1].id,
minXRange=60,
yMin=Quotes.open.min() * 0.98,
yMax=Quotes.open.max() * 1.02,
)
ind.showGrid(x=True, y=True)
ind.setCursor(QtCore.Qt.BlankCursor)
def _update_sizes(self):
min_h_ind = int(self.height() * 0.3 / len(self.indicators))
sizes = [int(self.height() * 0.7)]
sizes.extend([min_h_ind] * len(self.indicators))
self.splitter.setSizes(sizes) # , int(self.height()*0.2)
def _update_yrange_limits(self):
vr = self.chart.viewRect()
lbar, rbar = int(vr.left()), int(vr.right())
if self.signals_visible:
self._show_text_signals(lbar, rbar)
bars = Quotes[lbar:rbar]
ylow = bars.low.min() * 0.98
yhigh = bars.high.max() * 1.02
std = np.std(bars.close)
self.chart.setLimits(yMin=ylow, yMax=yhigh, minYRange=std)
self.chart.setYRange(ylow, yhigh)
for i, d in self.indicators:
# ydata = i.plotItem.items[0].getData()[1]
ydata = d[lbar:rbar]
ylow = ydata.min() * 0.98
yhigh = ydata.max() * 1.02
std = np.std(ydata)
i.setLimits(yMin=ylow, yMax=yhigh, minYRange=std)
i.setYRange(ylow, yhigh)
def plot(self, symbol):
self.digits = symbol.digits
self.chart = CustomPlotWidget(
parent=self.splitter,
axisItems={'bottom': self.xaxis, 'right': PriceAxis()},
enableMenu=False,
)
# self.chart.getPlotItem().setContentsMargins(*CHART_MARGINS)
self.chart.setFrameStyle(QtGui.QFrame.StyledPanel | QtGui.QFrame.Plain)
inds = [Quotes.open]
for d in inds:
ind = CustomPlotWidget(
parent=self.splitter,
axisItems={'bottom': self.xaxis_ind, 'right': PriceAxis()},
enableMenu=False,
)
ind.setFrameStyle(QtGui.QFrame.StyledPanel | QtGui.QFrame.Plain)
ind.getPlotItem().setContentsMargins(*CHART_MARGINS)
# self.splitter.addWidget(ind)
self.indicators.append((ind, d))
self._update_quotes_chart()
self._update_ind_charts()
self._update_sizes()
ch = CrossHairItem(
self.chart, [_ind for _ind, d in self.indicators], self.digits
)
self.chart.addItem(ch)
def add_signals(self):
self.signals_group_text = QtGui.QGraphicsItemGroup()
self.signals_group_arrow = QtGui.QGraphicsItemGroup()
self.signals_text_items = np.empty(len(Quotes), dtype=object)
for p in Portfolio.positions:
x, price = p.id_bar_open, p.open_price
if p.type == Order.BUY:
y = Quotes[x].low * 0.99
pg.ArrowItem(
parent=self.signals_group_arrow,
pos=(x, y),
pen=self.long_pen,
brush=self.long_brush,
angle=90,
headLen=12,
tipAngle=50,
)
text_sig = CenteredTextItem(
parent=self.signals_group_text,
pos=(x, y),
pen=self.long_pen,
brush=self.long_brush,
text=('Buy at {:.%df}' % self.digits).format(price),
valign=QtCore.Qt.AlignBottom,
)
text_sig.hide()
else:
y = Quotes[x].high * 1.01
pg.ArrowItem(
parent=self.signals_group_arrow,
pos=(x, y),
pen=self.short_pen,
brush=self.short_brush,
angle=-90,
headLen=12,
tipAngle=50,
)
text_sig = CenteredTextItem(
parent=self.signals_group_text,
pos=(x, y),
pen=self.short_pen,
brush=self.short_brush,
text=('Sell at {:.%df}' % self.digits).format(price),
valign=QtCore.Qt.AlignTop,
)
text_sig.hide()
self.signals_text_items[x] = text_sig
self.chart.addItem(self.signals_group_arrow)
self.chart.addItem(self.signals_group_text)
self.signals_visible = True
class EquityChart(QtGui.QWidget):
eq_pen_pos_color = pg.mkColor('#00cc00')
eq_pen_neg_color = pg.mkColor('#cc0000')
eq_brush_pos_color = pg.mkColor('#40ee40')
eq_brush_neg_color = pg.mkColor('#ee4040')
long_pen_color = pg.mkColor('#008000')
short_pen_color = pg.mkColor('#800000')
buy_and_hold_pen_color = pg.mkColor('#4444ff')
def __init__(self):
super().__init__()
self.xaxis = DateAxis(orientation='bottom')
self.xaxis.setStyle(tickTextOffset=7, textFillLimits=[(0, 0.80)])
self.yaxis = PriceAxis()
self.layout = QtGui.QVBoxLayout(self)
self.layout.setContentsMargins(0, 0, 0, 0)
self.chart = pg.PlotWidget(
axisItems={'bottom': self.xaxis, 'right': self.yaxis},
enableMenu=False,
)
self.chart.setFrameStyle(QtGui.QFrame.StyledPanel | QtGui.QFrame.Plain)
self.chart.getPlotItem().setContentsMargins(*CHART_MARGINS)
self.chart.showGrid(x=True, y=True)
self.chart.hideAxis('left')
self.chart.showAxis('right')
self.chart.setCursor(QtCore.Qt.BlankCursor)
self.chart.sigXRangeChanged.connect(self._update_yrange_limits)
self.layout.addWidget(self.chart)
def _add_legend(self):
legend = pg.LegendItem((140, 100), offset=(10, 10))
legend.setParentItem(self.chart.getPlotItem())
for arr, item in self.curves:
legend.addItem(
SampleLegendItem(item),
item.opts['name']
if not isinstance(item, tuple)
else item[0].opts['name'],
)
def _add_ylabels(self):
self.ylabels = []
for arr, item in self.curves:
color = (
item.opts['pen']
if not isinstance(item, tuple)
else [i.opts['pen'] for i in item]
)
label = YAxisLabel(parent=self.yaxis, color=color)
self.ylabels.append(label)
def _update_ylabels(self, vb, rbar):
for i, curve in enumerate(self.curves):
arr, item = curve
ylast = arr[rbar]
ypos = vb.mapFromView(QtCore.QPointF(0, ylast)).y()
axlabel = self.ylabels[i]
axlabel.update_label_test(ypos=ypos, ydata=ylast)
def _update_yrange_limits(self, vb=None):
if not hasattr(self, 'min_curve'):
return
vr = self.chart.viewRect()
lbar, rbar = int(vr.left()), int(vr.right())
ylow = self.min_curve[lbar:rbar].min() * 1.1
yhigh = self.max_curve[lbar:rbar].max() * 1.1
std = np.std(self.max_curve[lbar:rbar]) * 4
self.chart.setLimits(yMin=ylow, yMax=yhigh, minYRange=std)
self.chart.setYRange(ylow, yhigh)
self._update_ylabels(vb, rbar)
@timeit
def plot(self):
equity_curve = Portfolio.equity_curve
eq_pos = np.zeros_like(equity_curve)
eq_neg = np.zeros_like(equity_curve)
eq_pos[equity_curve >= 0] = equity_curve[equity_curve >= 0]
eq_neg[equity_curve <= 0] = equity_curve[equity_curve <= 0]
# Equity
self.eq_pos_curve = pg.PlotCurveItem(
eq_pos,
name='Equity',
fillLevel=0,
antialias=True,
pen=self.eq_pen_pos_color,
brush=self.eq_brush_pos_color,
)
self.eq_neg_curve = pg.PlotCurveItem(
eq_neg,
name='Equity',
fillLevel=0,
antialias=True,
pen=self.eq_pen_neg_color,
brush=self.eq_brush_neg_color,
)
self.chart.addItem(self.eq_pos_curve)
self.chart.addItem(self.eq_neg_curve)
# Only Long
self.long_curve = pg.PlotCurveItem(
Portfolio.long_curve,
name='Only Long',
pen=self.long_pen_color,
antialias=True,
)
self.chart.addItem(self.long_curve)
# Only Short
self.short_curve = pg.PlotCurveItem(
Portfolio.short_curve,
name='Only Short',
pen=self.short_pen_color,
antialias=True,
)
self.chart.addItem(self.short_curve)
# Buy and Hold
self.buy_and_hold_curve = pg.PlotCurveItem(
Portfolio.buy_and_hold_curve,
name='Buy and Hold',
pen=self.buy_and_hold_pen_color,
antialias=True,
)
self.chart.addItem(self.buy_and_hold_curve)
self.curves = [
(Portfolio.equity_curve, (self.eq_pos_curve, self.eq_neg_curve)),
(Portfolio.long_curve, self.long_curve),
(Portfolio.short_curve, self.short_curve),
(Portfolio.buy_and_hold_curve, self.buy_and_hold_curve),
]
self._add_legend()
self._add_ylabels()
ch = CrossHairItem(self.chart)
self.chart.addItem(ch)
arrs = (
Portfolio.equity_curve,
Portfolio.buy_and_hold_curve,
Portfolio.long_curve,
Portfolio.short_curve,
)
np_arrs = np.concatenate(arrs)
_min = abs(np_arrs.min()) * -1.1
_max = np_arrs.max() * 1.1
self.chart.setLimits(
xMin=Quotes[0].id,
xMax=Quotes[-1].id,
yMin=_min,
yMax=_max,
minXRange=60,
)
self.min_curve = arrs[0].copy()
self.max_curve = arrs[0].copy()
for arr in arrs[1:]:
self.min_curve = np.minimum(self.min_curve, arr)
self.max_curve = np.maximum(self.max_curve, arr)
def _get_chart_points(style):
if style == ChartType.CANDLESTICK:
return CandlestickItem()
elif style == ChartType.BAR:
return BarItem()
return pg.PlotDataItem(Quotes.close, pen='b')

View File

@ -0,0 +1,36 @@
"""Constants."""
from enum import Enum, auto
__all__ = ('ChartType', 'TimeFrame')
class ChartType(Enum):
BAR = auto()
CANDLESTICK = auto()
LINE = auto()
class TimeFrame(Enum):
M1 = auto()
M5 = auto()
M15 = auto()
M30 = auto()
H1 = auto()
H4 = auto()
D1 = auto()
W1 = auto()
MN = auto()
ANNUAL_PERIOD = 252 # number of trading days in a year
# # TODO: 6.5 - US trading hours (trading session); fix it for fx
# ANNUALIZATION_FACTORS = {
# TimeFrame.M1: int(252 * 6.5 * 60),
# TimeFrame.M5: int(252 * 6.5 * 12),
# TimeFrame.M15: int(252 * 6.5 * 4),
# TimeFrame.M30: int(252 * 6.5 * 2),
# TimeFrame.H1: int(252 * 6.5),
# TimeFrame.D1: 252,
# }

View File

@ -0,0 +1,172 @@
"""Parser."""
import logging
import os.path
import pickle
import pandas as pd
import pandas_datareader.data as web
from pandas_datareader._utils import RemoteDataError
from pandas_datareader.data import (
get_data_quandl,
get_data_yahoo,
get_data_alphavantage,
)
from pandas_datareader.nasdaq_trader import get_nasdaq_symbols
from pandas_datareader.exceptions import ImmediateDeprecationError
from .base import Quotes
from .utils import get_data_path, timeit
__all__ = (
'YahooQuotesLoader',
'QuandleQuotesLoader',
'get_symbols',
'get_quotes',
)
logger = logging.getLogger(__name__)
class QuotesLoader:
source = None
timeframe = '1D'
sort_index = False
default_tf = None
name_format = '%(symbol)s_%(tf)s_%(date_from)s_%(date_to)s.%(ext)s'
@classmethod
def _get(cls, symbol, date_from, date_to):
quotes = web.DataReader(
symbol, cls.source, start=date_from, end=date_to
)
if cls.sort_index:
quotes.sort_index(inplace=True)
return quotes
@classmethod
def _get_file_path(cls, symbol, tf, date_from, date_to):
fname = cls.name_format % {
'symbol': symbol,
'tf': tf,
'date_from': date_from.isoformat(),
'date_to': date_to.isoformat(),
'ext': 'qdom',
}
return os.path.join(get_data_path('stock_data'), fname)
@classmethod
def _save_to_disk(cls, fpath, data):
logger.debug('Saving quotes to a file: %s', fpath)
with open(fpath, 'wb') as f:
pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
@classmethod
def _load_from_disk(cls, fpath):
logger.debug('Loading quotes from a file: %s', fpath)
with open(fpath, 'rb') as f:
return pickle.load(f)
@classmethod
@timeit
def get_quotes(cls, symbol, date_from, date_to):
quotes = None
fpath = cls._get_file_path(symbol, cls.timeframe, date_from, date_to)
if os.path.exists(fpath):
quotes = Quotes.new(cls._load_from_disk(fpath))
else:
quotes_raw = cls._get(symbol, date_from, date_to)
quotes = Quotes.new(
quotes_raw, source=cls.source, default_tf=cls.default_tf
)
cls._save_to_disk(fpath, quotes)
return quotes
class YahooQuotesLoader(QuotesLoader):
source = 'yahoo'
@classmethod
def _get(cls, symbol, date_from, date_to):
return get_data_yahoo(symbol, date_from, date_to)
class QuandleQuotesLoader(QuotesLoader):
source = 'quandle'
@classmethod
def _get(cls, symbol, date_from, date_to):
quotes = get_data_quandl(symbol, date_from, date_to)
quotes.sort_index(inplace=True)
return quotes
class AlphaVantageQuotesLoader(QuotesLoader):
source = 'alphavantage'
api_key = 'demo'
@classmethod
def _get(cls, symbol, date_from, date_to):
quotes = get_data_alphavantage(
symbol, date_from, date_to, api_key=cls.api_key
)
return quotes
class StooqQuotesLoader(QuotesLoader):
source = 'stooq'
sort_index = True
default_tf = 1440
class IEXQuotesLoader(QuotesLoader):
source = 'iex'
@classmethod
def _get(cls, symbol, date_from, date_to):
quotes = web.DataReader(
symbol, cls.source, start=date_from, end=date_to
)
quotes['Date'] = pd.to_datetime(quotes.index)
return quotes
class RobinhoodQuotesLoader(QuotesLoader):
source = 'robinhood'
def get_symbols():
fpath = os.path.join(get_data_path('stock_data'), 'symbols.qdom')
if os.path.exists(fpath):
with open(fpath, 'rb') as f:
symbols = pickle.load(f)
else:
symbols = get_nasdaq_symbols()
symbols.reset_index(inplace=True)
with open(fpath, 'wb') as f:
pickle.dump(symbols, f, pickle.HIGHEST_PROTOCOL)
return symbols
def get_quotes(*args, **kwargs):
quotes = []
# don't work:
# GoogleQuotesLoader, QuandleQuotesLoader,
# AlphaVantageQuotesLoader, RobinhoodQuotesLoader
loaders = [YahooQuotesLoader, IEXQuotesLoader, StooqQuotesLoader]
while loaders:
loader = loaders.pop(0)
try:
quotes = loader.get_quotes(*args, **kwargs)
break
except (RemoteDataError, ImmediateDeprecationError) as e:
logger.error('get_quotes => error: %r', e)
return quotes

View File

@ -0,0 +1,350 @@
"""Performance."""
import codecs
import json
from collections import OrderedDict, defaultdict
import numpy as np
from .base import Quotes
from .const import ANNUAL_PERIOD
from .utils import fromtimestamp, get_resource_path
__all__ = (
'BriefPerformance',
'Performance',
'Stats',
'REPORT_COLUMNS',
'REPORT_ROWS',
)
REPORT_COLUMNS = ('All', 'Long', 'Short', 'Market')
with codecs.open(
get_resource_path('report_rows.json'), mode='r', encoding='utf-8'
) as f:
REPORT_ROWS = OrderedDict(json.load(f))
class Stats(np.recarray):
def __new__(cls, positions, shape=None, dtype=None, order='C'):
shape = shape or (len(positions['All']),)
dtype = np.dtype(
[
('type', object),
('symbol', object),
('volume', float),
('open_time', float),
('close_time', float),
('open_price', float),
('close_price', float),
('total_profit', float),
('entry_name', object),
('exit_name', object),
('status', object),
('comment', object),
('abs', float),
('perc', float),
('bars', float),
('on_bar', float),
('mae', float),
('mfe', float),
]
)
dt = [(col, dtype) for col in REPORT_COLUMNS]
return np.ndarray.__new__(cls, shape, (np.record, dt), order=order)
def __init__(self, positions, **kwargs):
for col, _positions in positions.items():
for i, p in enumerate(_positions):
self._add_position(p, col, i)
def _add_position(self, p, col, i):
self[col][i].type = p.type
self[col][i].symbol = p.symbol
self[col][i].volume = p.volume
self[col][i].open_time = p.open_time
self[col][i].close_time = p.close_time
self[col][i].open_price = p.open_price
self[col][i].close_price = p.close_price
self[col][i].total_profit = p.total_profit
self[col][i].entry_name = p.entry_name
self[col][i].exit_name = p.exit_name
self[col][i].status = p.status
self[col][i].comment = p.comment
self[col][i].abs = p.profit
self[col][i].perc = p.profit_perc
quotes_on_trade = Quotes[p.id_bar_open : p.id_bar_close]
if not quotes_on_trade.size:
# if position was opened and closed on the last bar
quotes_on_trade = Quotes[p.id_bar_open : p.id_bar_close + 1]
kwargs = {
'low': quotes_on_trade.low.min(),
'high': quotes_on_trade.high.max(),
}
self[col][i].mae = p.calc_mae(**kwargs)
self[col][i].mfe = p.calc_mfe(**kwargs)
bars = p.id_bar_close - p.id_bar_open
self[col][i].bars = bars
self[col][i].on_bar = p.profit_perc / bars
class BriefPerformance(np.recarray):
def __new__(cls, shape=None, dtype=None, order='C'):
dt = np.dtype(
[
('kwargs', object),
('net_profit_abs', float),
('net_profit_perc', float),
('year_profit', float),
('win_average_profit_perc', float),
('loss_average_profit_perc', float),
('max_drawdown_abs', float),
('total_trades', int),
('win_trades_abs', int),
('win_trades_perc', float),
('profit_factor', float),
('recovery_factor', float),
('payoff_ratio', float),
]
)
shape = shape or (1,)
return np.ndarray.__new__(cls, shape, (np.record, dt), order=order)
def _days_count(self, positions):
if hasattr(self, 'days'):
return self.days
self.days = (
(
fromtimestamp(positions[-1].close_time)
- fromtimestamp(positions[0].open_time)
).days
if positions
else 1
)
return self.days
def add(self, initial_balance, positions, i, kwargs):
position_count = len(positions)
profit = np.recarray(
(position_count,), dtype=[('abs', float), ('perc', float)]
)
for n, position in enumerate(positions):
profit[n].abs = position.profit
profit[n].perc = position.profit_perc
s = self[i]
s.kwargs = kwargs
s.net_profit_abs = np.sum(profit.abs)
s.net_profit_perc = np.sum(profit.perc)
days = self._days_count(positions)
gain_factor = (s.net_profit_abs + initial_balance) / initial_balance
s.year_profit = (gain_factor ** (365 / days) - 1) * 100
s.win_average_profit_perc = np.mean(profit.perc[profit.perc > 0])
s.loss_average_profit_perc = np.mean(profit.perc[profit.perc < 0])
s.max_drawdown_abs = profit.abs.min()
s.total_trades = position_count
wins = profit.abs[profit.abs > 0]
loss = profit.abs[profit.abs < 0]
s.win_trades_abs = len(wins)
s.win_trades_perc = round(s.win_trades_abs / s.total_trades * 100, 2)
s.profit_factor = abs(np.sum(wins) / np.sum(loss))
s.recovery_factor = abs(s.net_profit_abs / s.max_drawdown_abs)
s.payoff_ratio = abs(np.mean(wins) / np.mean(loss))
class Performance:
"""Performance Metrics."""
rows = REPORT_ROWS
columns = REPORT_COLUMNS
def __init__(self, initial_balance, stats, positions):
self._data = {}
for col in self.columns:
column = type('Column', (object,), dict.fromkeys(self.rows, 0))
column.initial_balance = initial_balance
self._data[col] = column
self.calculate(column, stats[col], positions[col])
def __getitem__(self, col):
return self._data[col]
def _calc_trade_series(self, col, positions):
win_in_series, loss_in_series = 0, 0
for i, p in enumerate(positions):
if p.profit >= 0:
win_in_series += 1
loss_in_series = 0
if win_in_series > col.win_in_series:
col.win_in_series = win_in_series
else:
win_in_series = 0
loss_in_series += 1
if loss_in_series > col.loss_in_series:
col.loss_in_series = loss_in_series
def calculate(self, col, stats, positions):
self._calc_trade_series(col, positions)
col.total_trades = len(positions)
profit_abs = stats[np.flatnonzero(stats.abs)].abs
profit_perc = stats[np.flatnonzero(stats.perc)].perc
bars = stats[np.flatnonzero(stats.bars)].bars
on_bar = stats[np.flatnonzero(stats.on_bar)].on_bar
gt_zero_abs = stats[stats.abs > 0].abs
gt_zero_perc = stats[stats.perc > 0].perc
win_bars = stats[stats.perc > 0].bars
lt_zero_abs = stats[stats.abs < 0].abs
lt_zero_perc = stats[stats.perc < 0].perc
los_bars = stats[stats.perc < 0].bars
col.average_profit_abs = np.mean(profit_abs) if profit_abs.size else 0
col.average_profit_perc = (
np.mean(profit_perc) if profit_perc.size else 0
)
col.bars_on_trade = np.mean(bars) if bars.size else 0
col.bar_profit = np.mean(on_bar) if on_bar.size else 0
col.win_average_profit_abs = (
np.mean(gt_zero_abs) if gt_zero_abs.size else 0
)
col.win_average_profit_perc = (
np.mean(gt_zero_perc) if gt_zero_perc.size else 0
)
col.win_bars_on_trade = np.mean(win_bars) if win_bars.size else 0
col.loss_average_profit_abs = (
np.mean(lt_zero_abs) if lt_zero_abs.size else 0
)
col.loss_average_profit_perc = (
np.mean(lt_zero_perc) if lt_zero_perc.size else 0
)
col.loss_bars_on_trade = np.mean(los_bars) if los_bars.size else 0
col.win_trades_abs = len(gt_zero_abs)
col.win_trades_perc = (
round(col.win_trades_abs / col.total_trades * 100, 2)
if col.total_trades
else 0
)
col.loss_trades_abs = len(lt_zero_abs)
col.loss_trades_perc = (
round(col.loss_trades_abs / col.total_trades * 100, 2)
if col.total_trades
else 0
)
col.total_profit = np.sum(gt_zero_abs)
col.total_loss = np.sum(lt_zero_abs)
col.net_profit_abs = np.sum(stats.abs)
col.net_profit_perc = np.sum(stats.perc)
col.total_mae = np.sum(stats.mae)
col.total_mfe = np.sum(stats.mfe)
# https://financial-calculators.com/roi-calculator
days = (
(
fromtimestamp(positions[-1].close_time)
- fromtimestamp(positions[0].open_time)
).days
if positions
else 1
)
gain_factor = (
col.net_profit_abs + col.initial_balance
) / col.initial_balance
col.year_profit = (gain_factor ** (365 / days) - 1) * 100
col.month_profit = (gain_factor ** (365 / days / 12) - 1) * 100
col.max_profit_abs = stats.abs.max()
col.max_profit_perc = stats.perc.max()
col.max_profit_abs_day = fromtimestamp(
stats.close_time[stats.abs == col.max_profit_abs][0]
)
col.max_profit_perc_day = fromtimestamp(
stats.close_time[stats.perc == col.max_profit_perc][0]
)
col.max_drawdown_abs = stats.abs.min()
col.max_drawdown_perc = stats.perc.min()
col.max_drawdown_abs_day = fromtimestamp(
stats.close_time[stats.abs == col.max_drawdown_abs][0]
)
col.max_drawdown_perc_day = fromtimestamp(
stats.close_time[stats.perc == col.max_drawdown_perc][0]
)
col.profit_factor = (
abs(col.total_profit / col.total_loss) if col.total_loss else 0
)
col.recovery_factor = (
abs(col.net_profit_abs / col.max_drawdown_abs)
if col.max_drawdown_abs
else 0
)
col.payoff_ratio = (
abs(col.win_average_profit_abs / col.loss_average_profit_abs)
if col.loss_average_profit_abs
else 0
)
col.sharpe_ratio = annualized_sharpe_ratio(stats)
col.sortino_ratio = annualized_sortino_ratio(stats)
# TODO:
col.alpha_ratio = np.nan
col.beta_ratio = np.nan
def day_percentage_returns(stats):
days = defaultdict(float)
trade_count = np.count_nonzero(stats)
if trade_count == 1:
# market position, so returns should based on quotes
# calculate percentage changes on a list of quotes
changes = np.diff(Quotes.close) / Quotes[:-1].close * 100
data = np.column_stack((Quotes[1:].time, changes)) # np.c_
else:
# slice `:trade_count` to exclude zero values in long/short columns
data = stats[['close_time', 'perc']][:trade_count]
# FIXME: [FutureWarning] https://github.com/numpy/numpy/issues/8383
for close_time, perc in data:
days[fromtimestamp(close_time).date()] += perc
returns = np.array(list(days.values()))
# if np.count_nonzero(stats) == 1:
# import pudb; pudb.set_trace()
if len(returns) >= ANNUAL_PERIOD:
return returns
_returns = np.zeros(ANNUAL_PERIOD)
_returns[: len(returns)] = returns
return _returns
def annualized_sharpe_ratio(stats):
# risk_free = 0
returns = day_percentage_returns(stats)
return np.sqrt(ANNUAL_PERIOD) * np.mean(returns) / np.std(returns)
def annualized_sortino_ratio(stats):
# http://www.cmegroup.com/education/files/sortino-a-sharper-ratio.pdf
required_return = 0
returns = day_percentage_returns(stats)
mask = [returns < required_return]
tdd = np.zeros(len(returns))
tdd[mask] = returns[mask] # keep only negative values and zeros
# "or 1" to prevent division by zero, if we don't have negative returns
tdd = np.sqrt(np.mean(np.square(tdd))) or 1
return np.sqrt(ANNUAL_PERIOD) * np.mean(returns) / tdd

View File

@ -0,0 +1,410 @@
"""Portfolio."""
import itertools
from contextlib import contextmanager
from enum import Enum, auto
import numpy as np
from .base import Quotes
from .performance import BriefPerformance, Performance, Stats
from .utils import fromtimestamp, timeit
__all__ = ('Portfolio', 'Position', 'Order')
class BasePortfolio:
def __init__(self, balance=100_000, leverage=5):
self._initial_balance = balance
self.balance = balance
self.equity = None
# TODO:
# self.cash
# self.currency
self.leverage = leverage
self.positions = []
self.balance_curve = None
self.equity_curve = None
self.long_curve = None
self.short_curve = None
self.mae_curve = None
self.mfe_curve = None
self.stats = None
self.performance = None
self.brief_performance = None
def clear(self):
self.positions.clear()
self.balance = self._initial_balance
@property
def initial_balance(self):
return self._initial_balance
@initial_balance.setter
def initial_balance(self, value):
self._initial_balance = value
def add_position(self, position):
position.ticket = len(self.positions) + 1
self.positions.append(position)
def position_count(self, tp=None):
if tp == Order.BUY:
return len([p for p in self.positions if p.type == Order.BUY])
elif tp == Order.SELL:
return len([p for p in self.positions if p.type == Order.SELL])
return len(self.positions)
def _close_open_positions(self):
for p in self.positions:
if p.status == Position.OPEN:
p.close(
price=Quotes[-1].open, volume=p.volume, time=Quotes[-1].time
)
def _get_market_position(self):
p = self.positions[0] # real postions
p = Position(
symbol=p.symbol,
ptype=Order.BUY,
volume=p.volume,
price=Quotes[0].open,
open_time=Quotes[0].time,
close_price=Quotes[-1].close,
close_time=Quotes[-1].time,
id_bar_close=len(Quotes) - 1,
status=Position.CLOSED,
)
p.profit = p.calc_profit(close_price=Quotes[-1].close)
p.profit_perc = p.profit / self._initial_balance * 100
return p
def _calc_equity_curve(self):
"""Equity curve."""
self.equity_curve = np.zeros_like(Quotes.time)
for i, p in enumerate(self.positions):
balance = np.sum(self.stats['All'][:i].abs)
for ibar in range(p.id_bar_open, p.id_bar_close):
profit = p.calc_profit(close_price=Quotes[ibar].close)
self.equity_curve[ibar] = balance + profit
# taking into account the real balance after the last trade
self.equity_curve[-1] = self.balance_curve[-1]
def _calc_buy_and_hold_curve(self):
"""Buy and Hold."""
p = self._get_market_position()
self.buy_and_hold_curve = np.array(
[p.calc_profit(close_price=price) for price in Quotes.close]
)
def _calc_long_short_curves(self):
"""Only Long/Short positions curve."""
self.long_curve = np.zeros_like(Quotes.time)
self.short_curve = np.zeros_like(Quotes.time)
for i, p in enumerate(self.positions):
if p.type == Order.BUY:
name = 'Long'
curve = self.long_curve
else:
name = 'Short'
curve = self.short_curve
balance = np.sum(self.stats[name][:i].abs)
# Calculate equity for this position
for ibar in range(p.id_bar_open, p.id_bar_close):
profit = p.calc_profit(close_price=Quotes[ibar].close)
curve[ibar] = balance + profit
for name, curve in [
('Long', self.long_curve),
('Short', self.short_curve),
]:
curve[:] = fill_zeros_with_last(curve)
# taking into account the real balance after the last trade
curve[-1] = np.sum(self.stats[name].abs)
def _calc_curves(self):
self.mae_curve = np.cumsum(self.stats['All'].mae)
self.mfe_curve = np.cumsum(self.stats['All'].mfe)
self.balance_curve = np.cumsum(self.stats['All'].abs)
self._calc_equity_curve()
self._calc_buy_and_hold_curve()
self._calc_long_short_curves()
@contextmanager
def optimization_mode(self):
"""Backup and restore current balance and positions."""
# mode='general',
self.backup_balance = self.balance
self.backup_positions = self.positions.copy()
self.balance = self._initial_balance
self.positions.clear()
yield
self.balance = self.backup_balance
self.positions = self.backup_positions.copy()
self.backup_positions.clear()
@timeit
def run_optimization(self, strategy, params):
keys = list(params.keys())
vals = list(params.values())
variants = list(itertools.product(*vals))
self.brief_performance = BriefPerformance(shape=(len(variants),))
with self.optimization_mode():
for i, vals in enumerate(variants):
kwargs = {keys[n]: val for n, val in enumerate(vals)}
strategy.start(**kwargs)
self._close_open_positions()
self.brief_performance.add(
self._initial_balance, self.positions, i, kwargs
)
self.clear()
@timeit
def summarize(self):
self._close_open_positions()
positions = {
'All': self.positions,
'Long': [p for p in self.positions if p.type == Order.BUY],
'Short': [p for p in self.positions if p.type == Order.SELL],
'Market': [self._get_market_position()],
}
self.stats = Stats(positions)
self.performance = Performance(
self._initial_balance, self.stats, positions
)
self._calc_curves()
Portfolio = BasePortfolio()
class PositionStatus(Enum):
OPEN = auto()
CLOSED = auto()
CANCELED = auto()
class Position:
OPEN = PositionStatus.OPEN
CLOSED = PositionStatus.CLOSED
CANCELED = PositionStatus.CANCELED
__slots__ = (
'type',
'symbol',
'ticket',
'open_price',
'close_price',
'open_time',
'close_time',
'volume',
'sl',
'tp',
'status',
'profit',
'profit_perc',
'commis',
'id_bar_open',
'id_bar_close',
'entry_name',
'exit_name',
'total_profit',
'comment',
)
def __init__(
self,
symbol,
ptype,
price,
volume,
open_time,
sl=None,
tp=None,
status=OPEN,
entry_name='',
exit_name='',
comment='',
**kwargs,
):
self.type = ptype
self.symbol = symbol
self.ticket = None
self.open_price = price
self.close_price = None
self.open_time = open_time
self.close_time = None
self.volume = volume
self.sl = sl
self.tp = tp
self.status = status
self.profit = None
self.profit_perc = None
self.commis = None
self.id_bar_open = np.where(Quotes.time == self.open_time)[0][0]
self.id_bar_close = None
self.entry_name = entry_name
self.exit_name = exit_name
self.total_profit = 0
self.comment = comment
# self.bars_on_trade = None
# self.is_profitable = False
for k, v in kwargs.items():
setattr(self, k, v)
def __repr__(self):
_type = 'LONG' if self.type == Order.BUY else 'SHORT'
time = fromtimestamp(self.open_time).strftime('%d.%m.%y %H:%M')
return '%s/%s/[%s - %.4f]' % (
self.status.name,
_type,
time,
self.open_price,
)
def close(self, price, time, volume=None):
# TODO: allow closing only part of the volume
self.close_price = price
self.close_time = time
self.id_bar_close = np.where(Quotes.time == self.close_time)[0][0]
self.profit = self.calc_profit(volume=volume or self.volume)
self.profit_perc = self.profit / Portfolio.balance * 100
Portfolio.balance += self.profit
self.total_profit = Portfolio.balance - Portfolio.initial_balance
self.status = self.CLOSED
def calc_profit(self, volume=None, close_price=None):
# TODO: rewrite it
close_price = close_price or self.close_price
volume = volume or self.volume
factor = 1 if self.type == Order.BUY else -1
price_delta = (close_price - self.open_price) * factor
if self.symbol.mode in [self.symbol.FOREX, self.symbol.CFD]:
# Margin: Lots*Contract_Size/Leverage
if (
self.symbol.mode == self.symbol.FOREX
and self.symbol.ticker[:3] == 'USD'
):
# Example: 'USD/JPY'
# Прибыль Размер Объем Текущий
# в пунктах пункта позиции курс
# 1 * 0.0001 * 100000 / 1.00770
# USD/CHF: 1*0.0001*100000/1.00770 = $9.92
# 0.01
# USD/JPY: 1*0.01*100000/121.35 = $8.24
# (1.00770-1.00595)/0.0001 = 17.5 пунктов
# (1.00770-1.00595)/0.0001*0.0001*100000*1/1.00770*1
_points = price_delta / self.symbol.tick_size
_profit = (
_points
* self.symbol.tick_size
* self.symbol.contract_size
/ close_price
* volume
)
elif (
self.symbol.mode == self.symbol.FOREX
and self.symbol.ticker[-3:] == 'USD'
):
# Example: 'EUR/USD'
# Profit: (close_price-open_price)*Contract_Size*Lots
# EUR/USD BUY: (1.05875-1.05850)*100000*1 = +$25 (без комиссии)
_profit = price_delta * self.symbol.contract_size * volume
else:
# Cross rates. Example: 'GBP/CHF'
# Цена пункта =
# объем поз.*размер п.*тек.курс баз.вал. к USD/тек. кросс-курс
# GBP/CHF: 100000*0.0001*1.48140/1.48985 = $9.94
# TODO: temporary patch (same as the previous choice) -
# in the future connect to some quotes provider and get rates
_profit = price_delta * self.symbol.contract_size * volume
elif self.symbol.mode == self.symbol.FUTURES:
# Margin: Lots *InitialMargin*Percentage/100
# Profit: (close_price-open_price)*TickPrice/TickSize*Lots
# CL BUY: (46.35-46.30)*10/0.01*1 = $50 (без учета комиссии!)
# EuroFX(6E) BUY:(1.05875-1.05850)*12.50/0.0001*1 =$31.25 (без ком)
# RTS (RIH5) BUY:(84510-84500)*12.26506/10*1 = @12.26506 (без ком)
# E-miniSP500 BUY:(2065.95-2065.25)*12.50/0.25 = $35 (без ком)
# http://americanclearing.ru/specifications.php
# http://www.moex.com/ru/contract.aspx?code=RTS-3.18
# http://www.cmegroup.com/trading/equity-index/us-index/e-mini-sandp500_contract_specifications.html
_profit = (
price_delta
* self.symbol.tick_value
/ self.symbol.tick_size
* volume
)
else:
# shares
_profit = price_delta * volume
return _profit
def calc_mae(self, low, high):
"""Return [MAE] Maximum Adverse Excursion."""
if self.type == Order.BUY:
return self.calc_profit(close_price=low)
return self.calc_profit(close_price=high)
def calc_mfe(self, low, high):
"""Return [MFE] Maximum Favorable Excursion."""
if self.type == Order.BUY:
return self.calc_profit(close_price=high)
return self.calc_profit(close_price=low)
class OrderType(Enum):
BUY = auto()
SELL = auto()
BUY_LIMIT = auto()
SELL_LIMIT = auto()
BUY_STOP = auto()
SELL_STOP = auto()
class Order:
BUY = OrderType.BUY
SELL = OrderType.SELL
BUY_LIMIT = OrderType.BUY_LIMIT
SELL_LIMIT = OrderType.SELL_LIMIT
BUY_STOP = OrderType.BUY_STOP
SELL_STOP = OrderType.SELL_STOP
@staticmethod
def open(symbol, otype, price, volume, time, sl=None, tp=None):
# TODO: add margin calculation
# and if the margin is not enough - do not open the position
position = Position(
symbol=symbol,
ptype=otype,
price=price,
volume=volume,
open_time=time,
sl=sl,
tp=tp,
)
Portfolio.add_position(position)
return position
@staticmethod
def close(position, price, time, volume=None):
# FIXME: may be closed not the whole volume, but
# the position status will be changed to CLOSED
position.close(price=price, time=time, volume=volume)
def fill_zeros_with_last(arr):
"""Fill empty(zero) elements (between positions)."""
index = np.arange(len(arr))
index[arr == 0] = 0
index = np.maximum.accumulate(index)
return arr[index]

View File

@ -0,0 +1,82 @@
"""Utils."""
import importlib.util
import inspect
import logging
import os
import os.path
import sys
import time
from datetime import datetime
from functools import wraps
from PyQt5 import QtCore
__all__ = (
'BASE_DIR',
'Settings',
'timeit',
'fromtimestamp',
'get_data_path',
'get_resource_path',
'strategies_from_file',
)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def get_data_path(path=''):
data_path = QtCore.QStandardPaths.writableLocation(
QtCore.QStandardPaths.AppDataLocation
)
data_path = os.path.join(data_path, path)
os.makedirs(data_path, mode=0o755, exist_ok=True)
return data_path
def get_resource_path(relative_path):
# PyInstaller creates a temp folder and stores path in _MEIPASS
base_path = getattr(sys, '_MEIPASS', BASE_DIR)
return os.path.join(base_path, relative_path)
config_path = os.path.join(get_data_path(), 'Quantdom', 'config.ini')
Settings = QtCore.QSettings(config_path, QtCore.QSettings.IniFormat)
def timeit(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
t = time.time()
res = fn(*args, **kwargs)
logger = logging.getLogger('runtime')
logger.debug(
'%s.%s: %.4f sec'
% (fn.__module__, fn.__qualname__, time.time() - t)
)
return res
return wrapper
def fromtimestamp(timestamp):
if timestamp == 0:
# on Win zero timestamp cause error
return datetime(1970, 1, 1)
return datetime.fromtimestamp(timestamp)
def strategies_from_file(filepath):
from .strategy import AbstractStrategy
spec = importlib.util.spec_from_file_location('Strategy', filepath)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
is_strategy = lambda _class: ( # noqa:E731
inspect.isclass(_class)
and issubclass(_class, AbstractStrategy)
and _class.__name__ != 'AbstractStrategy'
)
return [_class for _, _class in inspect.getmembers(module, is_strategy)]