Compare commits
12 Commits
29ce8de462
...
b9af6176c5
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | b9af6176c5 | |
Tyler Goodlet | dd0167b9a5 | |
Tyler Goodlet | 9e71e0768f | |
Tyler Goodlet | 6029f39a3f | |
Tyler Goodlet | 656e2c6a88 | |
Tyler Goodlet | b8065a413b | |
Tyler Goodlet | 9245d24b47 | |
Tyler Goodlet | 22bd83943b | |
Tyler Goodlet | b94931bbdd | |
Tyler Goodlet | 239c1c457e | |
Tyler Goodlet | 24a54a7085 | |
Tyler Goodlet | ebd1eb114e |
|
@ -117,9 +117,57 @@ SecondFactorDevice=
|
||||||
|
|
||||||
# If you use the IBKR Mobile app for second factor authentication,
|
# If you use the IBKR Mobile app for second factor authentication,
|
||||||
# and you fail to complete the process before the time limit imposed
|
# and you fail to complete the process before the time limit imposed
|
||||||
# by IBKR, you can use this setting to tell IBC to exit: arrangements
|
# by IBKR, this setting tells IBC whether to automatically restart
|
||||||
# can then be made to automatically restart IBC in order to initiate
|
# the login sequence, giving you another opportunity to complete
|
||||||
# the login sequence afresh. Otherwise, manual intervention at TWS's
|
# second factor authentication.
|
||||||
|
#
|
||||||
|
# Permitted values are 'yes' and 'no'.
|
||||||
|
#
|
||||||
|
# If this setting is not present or has no value, then the value
|
||||||
|
# of the deprecated ExitAfterSecondFactorAuthenticationTimeout is
|
||||||
|
# used instead. If this also has no value, then this setting defaults
|
||||||
|
# to 'no'.
|
||||||
|
#
|
||||||
|
# NB: you must be using IBC v3.14.0 or later to use this setting:
|
||||||
|
# earlier versions ignore it.
|
||||||
|
|
||||||
|
ReloginAfterSecondFactorAuthenticationTimeout=
|
||||||
|
|
||||||
|
|
||||||
|
# This setting is only relevant if
|
||||||
|
# ReloginAfterSecondFactorAuthenticationTimeout is set to 'yes',
|
||||||
|
# or if ExitAfterSecondFactorAuthenticationTimeout is set to 'yes'.
|
||||||
|
#
|
||||||
|
# It controls how long (in seconds) IBC waits for login to complete
|
||||||
|
# after the user acknowledges the second factor authentication
|
||||||
|
# alert at the IBKR Mobile app. If login has not completed after
|
||||||
|
# this time, IBC terminates.
|
||||||
|
# The default value is 60.
|
||||||
|
|
||||||
|
SecondFactorAuthenticationExitInterval=
|
||||||
|
|
||||||
|
|
||||||
|
# This setting specifies the timeout for second factor authentication
|
||||||
|
# imposed by IB. The value is in seconds. You should not change this
|
||||||
|
# setting unless you have reason to believe that IB has changed the
|
||||||
|
# timeout. The default value is 180.
|
||||||
|
|
||||||
|
SecondFactorAuthenticationTimeout=180
|
||||||
|
|
||||||
|
|
||||||
|
# DEPRECATED SETTING
|
||||||
|
# ------------------
|
||||||
|
#
|
||||||
|
# ExitAfterSecondFactorAuthenticationTimeout - THIS SETTING WILL BE
|
||||||
|
# REMOVED IN A FUTURE RELEASE. For IBC version 3.14.0 and later, see
|
||||||
|
# the notes for ReloginAfterSecondFactorAuthenticationTimeout above.
|
||||||
|
#
|
||||||
|
# For IBC versions earlier than 3.14.0: If you use the IBKR Mobile
|
||||||
|
# app for second factor authentication, and you fail to complete the
|
||||||
|
# process before the time limit imposed by IBKR, you can use this
|
||||||
|
# setting to tell IBC to exit: arrangements can then be made to
|
||||||
|
# automatically restart IBC in order to initiate the login sequence
|
||||||
|
# afresh. Otherwise, manual intervention at TWS's
|
||||||
# Second Factor Authentication dialog is needed to complete the
|
# Second Factor Authentication dialog is needed to complete the
|
||||||
# login.
|
# login.
|
||||||
#
|
#
|
||||||
|
@ -132,29 +180,18 @@ SecondFactorDevice=
|
||||||
ExitAfterSecondFactorAuthenticationTimeout=no
|
ExitAfterSecondFactorAuthenticationTimeout=no
|
||||||
|
|
||||||
|
|
||||||
# This setting is only relevant if
|
|
||||||
# ExitAfterSecondFactorAuthenticationTimeout is set to 'yes'.
|
|
||||||
#
|
|
||||||
# It controls how long (in seconds) IBC waits for login to complete
|
|
||||||
# after the user acknowledges the second factor authentication
|
|
||||||
# alert at the IBKR Mobile app. If login has not completed after
|
|
||||||
# this time, IBC terminates.
|
|
||||||
# The default value is 40.
|
|
||||||
|
|
||||||
SecondFactorAuthenticationExitInterval=
|
|
||||||
|
|
||||||
|
|
||||||
# Trading Mode
|
# Trading Mode
|
||||||
# ------------
|
# ------------
|
||||||
#
|
#
|
||||||
# TWS 955 introduced a new Trading Mode combo box on its login
|
# This indicates whether the live account or the paper trading
|
||||||
# dialog. This indicates whether the live account or the paper
|
# account corresponding to the supplied credentials is to be used.
|
||||||
# trading account corresponding to the supplied credentials is
|
# The allowed values are 'live' (the default) and 'paper'.
|
||||||
# to be used. The allowed values are 'live' (the default) and
|
#
|
||||||
# 'paper'. For earlier versions of TWS this setting has no
|
# If this is set to 'live', then the credentials for the live
|
||||||
# effect.
|
# account must be supplied. If it is set to 'paper', then either
|
||||||
|
# the live or the paper-trading credentials may be supplied.
|
||||||
|
|
||||||
TradingMode=
|
TradingMode=paper
|
||||||
|
|
||||||
|
|
||||||
# Paper-trading Account Warning
|
# Paper-trading Account Warning
|
||||||
|
@ -188,7 +225,7 @@ AcceptNonBrokerageAccountWarning=yes
|
||||||
#
|
#
|
||||||
# The default value is 60.
|
# The default value is 60.
|
||||||
|
|
||||||
LoginDialogDisplayTimeout=20
|
LoginDialogDisplayTimeout=60
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -217,7 +254,15 @@ LoginDialogDisplayTimeout=20
|
||||||
# but they are acceptable.
|
# but they are acceptable.
|
||||||
#
|
#
|
||||||
# The default is the current working directory when IBC is
|
# The default is the current working directory when IBC is
|
||||||
# started.
|
# started, unless the TWS_SETTINGS_PATH setting in the relevant
|
||||||
|
# start script is set.
|
||||||
|
#
|
||||||
|
# If both this setting and TWS_SETTINGS_PATH are set, then this
|
||||||
|
# setting takes priority. Note that if they have different values,
|
||||||
|
# auto-restart will not work.
|
||||||
|
#
|
||||||
|
# NB: this setting is now DEPRECATED. You should use the
|
||||||
|
# TWS_SETTINGS_PATH setting in the relevant start script.
|
||||||
|
|
||||||
IbDir=/root/Jts
|
IbDir=/root/Jts
|
||||||
|
|
||||||
|
@ -284,15 +329,32 @@ ExistingSessionDetectedAction=primary
|
||||||
# Override TWS API Port Number
|
# Override TWS API Port Number
|
||||||
# ----------------------------
|
# ----------------------------
|
||||||
#
|
#
|
||||||
# If OverrideTwsApiPort is set to an integer, IBC changes the
|
# If OverrideTwsApiPort is set to an integer, IBC changes the
|
||||||
# 'Socket port' in TWS's API configuration to that number shortly
|
# 'Socket port' in TWS's API configuration to that number shortly
|
||||||
# after startup. Leaving the setting blank will make no change to
|
# after startup (but note that for the FIX Gateway, this setting is
|
||||||
# the current setting. This setting is only intended for use in
|
# actually stored in jts.ini rather than the Gateway's settings
|
||||||
# certain specialized situations where the port number needs to
|
# file). Leaving the setting blank will make no change to
|
||||||
|
# the current setting. This setting is only intended for use in
|
||||||
|
# certain specialized situations where the port number needs to
|
||||||
|
# be set dynamically at run-time, and for the FIX Gateway: most
|
||||||
|
# non-FIX users will never need it, so don't use it unless you know
|
||||||
|
# you need it.
|
||||||
|
|
||||||
|
OverrideTwsApiPort=4000
|
||||||
|
|
||||||
|
|
||||||
|
# Override TWS Master Client ID
|
||||||
|
# -----------------------------
|
||||||
|
#
|
||||||
|
# If OverrideTwsMasterClientID is set to an integer, IBC changes the
|
||||||
|
# 'Master Client ID' value in TWS's API configuration to that
|
||||||
|
# value shortly after startup. Leaving the setting blank will make
|
||||||
|
# no change to the current setting. This setting is only intended
|
||||||
|
# for use in certain specialized situations where the value needs to
|
||||||
# be set dynamically at run-time: most users will never need it,
|
# be set dynamically at run-time: most users will never need it,
|
||||||
# so don't use it unless you know you need it.
|
# so don't use it unless you know you need it.
|
||||||
|
|
||||||
; OverrideTwsApiPort=4002
|
OverrideTwsMasterClientID=
|
||||||
|
|
||||||
|
|
||||||
# Read-only Login
|
# Read-only Login
|
||||||
|
@ -302,11 +364,13 @@ ExistingSessionDetectedAction=primary
|
||||||
# account security programme, the user will not be asked to perform
|
# account security programme, the user will not be asked to perform
|
||||||
# the second factor authentication action, and login to TWS will
|
# the second factor authentication action, and login to TWS will
|
||||||
# occur automatically in read-only mode: in this mode, placing or
|
# occur automatically in read-only mode: in this mode, placing or
|
||||||
# managing orders is not allowed. If set to 'no', and the user is
|
# managing orders is not allowed.
|
||||||
# enrolled in IB's account security programme, the user must perform
|
#
|
||||||
# the relevant second factor authentication action to complete the
|
# If set to 'no', and the user is enrolled in IB's account security
|
||||||
# login.
|
# programme, the second factor authentication process is handled
|
||||||
|
# according to the Second Factor Authentication Settings described
|
||||||
|
# elsewhere in this file.
|
||||||
|
#
|
||||||
# If the user is not enrolled in IB's account security programme,
|
# If the user is not enrolled in IB's account security programme,
|
||||||
# this setting is ignored. The default is 'no'.
|
# this setting is ignored. The default is 'no'.
|
||||||
|
|
||||||
|
@ -326,7 +390,44 @@ ReadOnlyLogin=no
|
||||||
# set the relevant checkbox (this only needs to be done once) and
|
# set the relevant checkbox (this only needs to be done once) and
|
||||||
# not provide a value for this setting.
|
# not provide a value for this setting.
|
||||||
|
|
||||||
ReadOnlyApi=no
|
ReadOnlyApi=
|
||||||
|
|
||||||
|
|
||||||
|
# API Precautions
|
||||||
|
# ---------------
|
||||||
|
#
|
||||||
|
# These settings relate to the corresponding 'Precautions' checkboxes in the
|
||||||
|
# API section of the Global Configuration dialog.
|
||||||
|
#
|
||||||
|
# For all of these, the accepted values are:
|
||||||
|
# - 'yes' sets the checkbox
|
||||||
|
# - 'no' clears the checkbox
|
||||||
|
# - if not set, the existing TWS/Gateway configuration is unchanged
|
||||||
|
#
|
||||||
|
# NB: thess settings are really only supplied for the benefit of new TWS
|
||||||
|
# or Gateway instances that are being automatically installed and
|
||||||
|
# started without user intervention, or where user settings are not preserved
|
||||||
|
# between sessions (eg some Docker containers). Where a user is involved, they
|
||||||
|
# should use the Global Configuration to set the relevant checkboxes and not
|
||||||
|
# provide values for these settings.
|
||||||
|
|
||||||
|
BypassOrderPrecautions=
|
||||||
|
|
||||||
|
BypassBondWarning=
|
||||||
|
|
||||||
|
BypassNegativeYieldToWorstConfirmation=
|
||||||
|
|
||||||
|
BypassCalledBondWarning=
|
||||||
|
|
||||||
|
BypassSameActionPairTradeWarning=
|
||||||
|
|
||||||
|
BypassPriceBasedVolatilityRiskWarning=
|
||||||
|
|
||||||
|
BypassUSStocksMarketDataInSharesWarning=
|
||||||
|
|
||||||
|
BypassRedirectOrderWarning=
|
||||||
|
|
||||||
|
BypassNoOverfillProtectionPrecaution=
|
||||||
|
|
||||||
|
|
||||||
# Market data size for US stocks - lots or shares
|
# Market data size for US stocks - lots or shares
|
||||||
|
@ -381,54 +482,145 @@ AcceptBidAskLastSizeDisplayUpdateNotification=accept
|
||||||
SendMarketDataInLotsForUSstocks=
|
SendMarketDataInLotsForUSstocks=
|
||||||
|
|
||||||
|
|
||||||
|
# Trusted API Client IPs
|
||||||
|
# ----------------------
|
||||||
|
#
|
||||||
|
# NB: THIS SETTING IS ONLY RELEVANT FOR THE GATEWAY, AND ONLY WHEN FIX=yes.
|
||||||
|
# In all other cases it is ignored.
|
||||||
|
#
|
||||||
|
# This is a list of IP addresses separated by commas. API clients with IP
|
||||||
|
# addresses in this list are able to connect to the API without Gateway
|
||||||
|
# generating the 'Incoming connection' popup.
|
||||||
|
#
|
||||||
|
# Note that 127.0.0.1 is always permitted to connect, so do not include it
|
||||||
|
# in this setting.
|
||||||
|
|
||||||
|
TrustedTwsApiClientIPs=
|
||||||
|
|
||||||
|
|
||||||
|
# Reset Order ID Sequence
|
||||||
|
# -----------------------
|
||||||
|
#
|
||||||
|
# The setting resets the order id sequence for orders submitted via the API, so
|
||||||
|
# that the next invocation of the `NextValidId` API callback will return the
|
||||||
|
# value 1. The reset occurs when TWS starts.
|
||||||
|
#
|
||||||
|
# Note that order ids are reset for all API clients, except those that have
|
||||||
|
# outstanding (ie incomplete) orders: their order id sequence carries on as
|
||||||
|
# before.
|
||||||
|
#
|
||||||
|
# Valid values are 'yes', 'true', 'false' and 'no'. The default is 'no'.
|
||||||
|
|
||||||
|
ResetOrderIdsAtStart=
|
||||||
|
|
||||||
|
|
||||||
|
# This setting specifies IBC's action when TWS displays the dialog asking for
|
||||||
|
# confirmation of a request to reset the API order id sequence.
|
||||||
|
#
|
||||||
|
# Note that the Gateway never displays this dialog, so this setting is ignored
|
||||||
|
# for a Gateway session.
|
||||||
|
#
|
||||||
|
# Valid values consist of two strings separated by a solidus '/'. The first
|
||||||
|
# value specifies the action to take when the order id reset request resulted
|
||||||
|
# from setting ResetOrderIdsAtStart=yes. The second specifies the action to
|
||||||
|
# take when the order id reset request is a result of the user clicking the
|
||||||
|
# 'Reset API order ID sequence' button in the API configuration. Each value
|
||||||
|
# must be one of the following:
|
||||||
|
#
|
||||||
|
# 'confirm'
|
||||||
|
# order ids will be reset
|
||||||
|
#
|
||||||
|
# 'reject'
|
||||||
|
# order ids will not be reset
|
||||||
|
#
|
||||||
|
# 'ignore'
|
||||||
|
# IBC will ignore the dialog. The user must take action.
|
||||||
|
#
|
||||||
|
# The default setting is ignore/ignore
|
||||||
|
|
||||||
|
# Examples:
|
||||||
|
#
|
||||||
|
# 'confirm/reject' - confirm order id reset only if ResetOrderIdsAtStart=yes
|
||||||
|
# and reject any user-initiated requests
|
||||||
|
#
|
||||||
|
# 'ignore/confirm' - user must decide what to do if ResetOrderIdsAtStart=yes
|
||||||
|
# and confirm user-initiated requests
|
||||||
|
#
|
||||||
|
# 'reject/ignore' - reject order id reset if ResetOrderIdsAtStart=yes but
|
||||||
|
# allow user to handle user-initiated requests
|
||||||
|
|
||||||
|
ConfirmOrderIdReset=
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# 4. TWS Auto-Closedown
|
# 4. TWS Auto-Logoff and Auto-Restart
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
#
|
#
|
||||||
# IMPORTANT NOTE: Starting with TWS 974, this setting no longer
|
# TWS and Gateway insist on being restarted every day. Two alternative
|
||||||
# works properly, because IB have changed the way TWS handles its
|
# automatic options are offered:
|
||||||
# autologoff mechanism.
|
|
||||||
#
|
#
|
||||||
# You should now configure the TWS autologoff time to something
|
# - Auto-Logoff: at a specified time, TWS shuts down tidily, without
|
||||||
# convenient for you, and restart IBC each day.
|
# restarting.
|
||||||
#
|
#
|
||||||
# Alternatively, discontinue use of IBC and use the auto-relogin
|
# - Auto-Restart: at a specified time, TWS shuts down and then restarts
|
||||||
# mechanism within TWS 974 and later versions (note that the
|
# without the user having to re-autheticate.
|
||||||
# auto-relogin mechanism provided by IB is not available if you
|
#
|
||||||
# use IBC).
|
# The normal way to configure the time at which this happens is via the Lock
|
||||||
|
# and Exit section of the Configuration dialog. Once this time has been
|
||||||
|
# configured in this way, the setting persists until the user changes it again.
|
||||||
|
#
|
||||||
|
# However, there are situations where there is no user available to do this
|
||||||
|
# configuration, or where there is no persistent storage (for example some
|
||||||
|
# Docker images). In such cases, the auto-restart or auto-logoff time can be
|
||||||
|
# set whenever IBC starts with the settings below.
|
||||||
|
#
|
||||||
|
# The value, if specified, must be a time in HH:MM AM/PM format, for example
|
||||||
|
# 08:00 AM or 10:00 PM. Note that there must be a single space between the
|
||||||
|
# two parts of this value; also that midnight is "12:00 AM" and midday is
|
||||||
|
# "12:00 PM".
|
||||||
|
#
|
||||||
|
# If no value is specified for either setting, the currently configured
|
||||||
|
# settings will apply. If a value is supplied for one setting, the other
|
||||||
|
# setting is cleared. If values are supplied for both settings, only the
|
||||||
|
# auto-restart time is set, and the auto-logoff time is cleared.
|
||||||
|
#
|
||||||
|
# Note that for a normal TWS/Gateway installation with persistent storage
|
||||||
|
# (for example on a desktop computer) the value will be persisted as if the
|
||||||
|
# user had set it via the configuration dialog.
|
||||||
|
#
|
||||||
|
# If you choose to auto-restart, you should take note of the considerations
|
||||||
|
# described at the link below. Note that where this information mentions
|
||||||
|
# 'manual authentication', restarting IBC will do the job (IBKR does not
|
||||||
|
# recognise the existence of IBC in its docuemntation).
|
||||||
|
#
|
||||||
|
# https://www.interactivebrokers.com/en/software/tws/twsguide.htm#usersguidebook/configuretws/auto_restart_info.htm
|
||||||
|
#
|
||||||
|
# If you use the "RESTART" command via the IBC command server, and IBC is
|
||||||
|
# running any version of the Gateway (or a version of TWS earlier than 1018),
|
||||||
|
# note that this will set the Auto-Restart time in Gateway/TWS's configuration
|
||||||
|
# dialog to the time at which the restart actually happens (which may be up to
|
||||||
|
# a minute after the RESTART command is issued). To prevent future auto-
|
||||||
|
# restarts at this time, you must make sure you have set AutoLogoffTime or
|
||||||
|
# AutoRestartTime to your desired value before running IBC. NB: this does not
|
||||||
|
# apply to TWS from version 1018 onwards.
|
||||||
|
|
||||||
# Set to yes or no (lower case).
|
AutoLogoffTime=
|
||||||
#
|
|
||||||
# yes means allow TWS to shut down automatically at its
|
|
||||||
# specified shutdown time, which is set via the TWS
|
|
||||||
# configuration menu.
|
|
||||||
#
|
|
||||||
# no means TWS never shuts down automatically.
|
|
||||||
#
|
|
||||||
# NB: IB recommends that you do not keep TWS running
|
|
||||||
# continuously. If you set this setting to 'no', you may
|
|
||||||
# experience incorrect TWS operation.
|
|
||||||
#
|
|
||||||
# NB: the default for this setting is 'no'. Since this will
|
|
||||||
# only work properly with TWS versions earlier than 974, you
|
|
||||||
# should explicitly set this to 'yes' for version 974 and later.
|
|
||||||
|
|
||||||
IbAutoClosedown=yes
|
|
||||||
|
|
||||||
|
AutoRestartTime=
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# 5. TWS Tidy Closedown Time
|
# 5. TWS Tidy Closedown Time
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
#
|
#
|
||||||
# NB: starting with TWS 974 this is no longer a useful option
|
# Specifies a time at which TWS will close down tidily, with no restart.
|
||||||
# because both TWS and Gateway now have the same auto-logoff
|
|
||||||
# mechanism, and IBC can no longer avoid this.
|
|
||||||
#
|
#
|
||||||
# Note that giving this setting a value does not change TWS's
|
# There is little reason to use this setting. It is similar to AutoLogoffTime,
|
||||||
# auto-logoff in any way: any setting will be additional to the
|
# but can include a day-of-the-week, whereas AutoLogoffTime and AutoRestartTime
|
||||||
# TWS auto-logoff.
|
# apply every day. So for example you could use ClosedownAt in conjunction with
|
||||||
|
# AutoRestartTime to shut down TWS on Friday evenings after the markets
|
||||||
|
# close, without it running on Saturday as well.
|
||||||
#
|
#
|
||||||
# To tell IBC to tidily close TWS at a specified time every
|
# To tell IBC to tidily close TWS at a specified time every
|
||||||
# day, set this value to <hh:mm>, for example:
|
# day, set this value to <hh:mm>, for example:
|
||||||
|
@ -487,7 +679,7 @@ AcceptIncomingConnectionAction=reject
|
||||||
# no means the dialog remains on display and must be
|
# no means the dialog remains on display and must be
|
||||||
# handled by the user.
|
# handled by the user.
|
||||||
|
|
||||||
AllowBlindTrading=yes
|
AllowBlindTrading=no
|
||||||
|
|
||||||
|
|
||||||
# Save Settings on a Schedule
|
# Save Settings on a Schedule
|
||||||
|
@ -530,6 +722,26 @@ AllowBlindTrading=yes
|
||||||
SaveTwsSettingsAt=
|
SaveTwsSettingsAt=
|
||||||
|
|
||||||
|
|
||||||
|
# Confirm Crypto Currency Orders Automatically
|
||||||
|
# --------------------------------------------
|
||||||
|
#
|
||||||
|
# When you place an order for a cryptocurrency contract, a dialog is displayed
|
||||||
|
# asking you to confirm that you want to place the order, and notifying you
|
||||||
|
# that you are placing an order to trade cryptocurrency with Paxos, a New York
|
||||||
|
# limited trust company, and not at Interactive Brokers.
|
||||||
|
#
|
||||||
|
# transmit means that the order will be placed automatically, and the
|
||||||
|
# dialog will then be closed
|
||||||
|
#
|
||||||
|
# cancel means that the order will not be placed, and the dialog will
|
||||||
|
# then be closed
|
||||||
|
#
|
||||||
|
# manual means that IBC will take no action and the user must deal
|
||||||
|
# with the dialog
|
||||||
|
|
||||||
|
ConfirmCryptoCurrencyOrders=transmit
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# 7. Settings Specific to Indian Versions of TWS
|
# 7. Settings Specific to Indian Versions of TWS
|
||||||
|
@ -566,13 +778,17 @@ DismissNSEComplianceNotice=yes
|
||||||
#
|
#
|
||||||
# The port number that IBC listens on for commands
|
# The port number that IBC listens on for commands
|
||||||
# such as "STOP". DO NOT set this to the port number
|
# such as "STOP". DO NOT set this to the port number
|
||||||
# used for TWS API connections. There is no good reason
|
# used for TWS API connections.
|
||||||
# to change this setting unless the port is used by
|
#
|
||||||
# some other application (typically another instance of
|
# The convention is to use 7462 for this port,
|
||||||
# IBC). The default value is 0, which tells IBC not to
|
# but it must be set to a different value from any other
|
||||||
# start the command server
|
# IBC instance that might run at the same time.
|
||||||
|
#
|
||||||
|
# The default value is 0, which tells IBC not to start
|
||||||
|
# the command server
|
||||||
|
|
||||||
#CommandServerPort=7462
|
#CommandServerPort=7462
|
||||||
|
CommandServerPort=0
|
||||||
|
|
||||||
|
|
||||||
# Permitted Command Sources
|
# Permitted Command Sources
|
||||||
|
@ -583,19 +799,19 @@ DismissNSEComplianceNotice=yes
|
||||||
# IBC. Commands can always be sent from the
|
# IBC. Commands can always be sent from the
|
||||||
# same host as IBC is running on.
|
# same host as IBC is running on.
|
||||||
|
|
||||||
ControlFrom=127.0.0.1
|
ControlFrom=
|
||||||
|
|
||||||
|
|
||||||
# Address for Receiving Commands
|
# Address for Receiving Commands
|
||||||
# ------------------------------
|
# ------------------------------
|
||||||
#
|
#
|
||||||
# Specifies the IP address on which the Command Server
|
# Specifies the IP address on which the Command Server
|
||||||
# is so listen. For a multi-homed host, this can be used
|
# is to listen. For a multi-homed host, this can be used
|
||||||
# to specify that connection requests are only to be
|
# to specify that connection requests are only to be
|
||||||
# accepted on the specified address. The default is to
|
# accepted on the specified address. The default is to
|
||||||
# accept connection requests on all local addresses.
|
# accept connection requests on all local addresses.
|
||||||
|
|
||||||
BindAddress=127.0.0.1
|
BindAddress=
|
||||||
|
|
||||||
|
|
||||||
# Command Prompt
|
# Command Prompt
|
||||||
|
@ -621,7 +837,7 @@ CommandPrompt=
|
||||||
# information is sent. The default is that such information
|
# information is sent. The default is that such information
|
||||||
# is not sent.
|
# is not sent.
|
||||||
|
|
||||||
SuppressInfoMessages=no
|
SuppressInfoMessages=yes
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -651,10 +867,10 @@ SuppressInfoMessages=no
|
||||||
# The LogStructureScope setting indicates which windows are
|
# The LogStructureScope setting indicates which windows are
|
||||||
# eligible for structure logging:
|
# eligible for structure logging:
|
||||||
#
|
#
|
||||||
# - if set to 'known', only windows that IBC recognizes
|
# - (default value) if set to 'known', only windows that
|
||||||
# are eligible - these are windows that IBC has some
|
# IBC recognizes are eligible - these are windows that
|
||||||
# interest in monitoring, usually to take some action
|
# IBC has some interest in monitoring, usually to take
|
||||||
# on the user's behalf;
|
# some action on the user's behalf;
|
||||||
#
|
#
|
||||||
# - if set to 'unknown', only windows that IBC does not
|
# - if set to 'unknown', only windows that IBC does not
|
||||||
# recognize are eligible. Most windows displayed by
|
# recognize are eligible. Most windows displayed by
|
||||||
|
@ -667,9 +883,8 @@ SuppressInfoMessages=no
|
||||||
# - if set to 'all', then every window displayed by TWS
|
# - if set to 'all', then every window displayed by TWS
|
||||||
# is eligible.
|
# is eligible.
|
||||||
#
|
#
|
||||||
# The default value is 'known'.
|
|
||||||
|
|
||||||
LogStructureScope=all
|
LogStructureScope=known
|
||||||
|
|
||||||
|
|
||||||
# When to Log Window Structure
|
# When to Log Window Structure
|
||||||
|
@ -682,13 +897,15 @@ LogStructureScope=all
|
||||||
# structure of an eligible window the first time it
|
# structure of an eligible window the first time it
|
||||||
# is encountered;
|
# is encountered;
|
||||||
#
|
#
|
||||||
|
# - if set to 'openclose', the structure is logged every
|
||||||
|
# time an eligible window is opened or closed;
|
||||||
|
#
|
||||||
# - if set to 'activate', the structure is logged every
|
# - if set to 'activate', the structure is logged every
|
||||||
# time an eligible window is made active;
|
# time an eligible window is made active;
|
||||||
#
|
#
|
||||||
# - if set to 'never' or 'no' or 'false', structure
|
# - (default value) if set to 'never' or 'no' or 'false',
|
||||||
# information is never logged.
|
# structure information is never logged.
|
||||||
#
|
#
|
||||||
# The default value is 'never'.
|
|
||||||
|
|
||||||
LogStructureWhen=never
|
LogStructureWhen=never
|
||||||
|
|
||||||
|
@ -708,4 +925,3 @@ LogStructureWhen=never
|
||||||
#LogComponents=
|
#LogComponents=
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -327,7 +327,11 @@ class MktPair(Struct, frozen=True):
|
||||||
) -> dict:
|
) -> dict:
|
||||||
d = super().to_dict(**kwargs)
|
d = super().to_dict(**kwargs)
|
||||||
d['src'] = self.src.to_dict(**kwargs)
|
d['src'] = self.src.to_dict(**kwargs)
|
||||||
d['dst'] = self.dst.to_dict(**kwargs)
|
|
||||||
|
if not isinstance(self.dst, str):
|
||||||
|
d['dst'] = self.dst.to_dict(**kwargs)
|
||||||
|
else:
|
||||||
|
d['dst'] = str(self.dst)
|
||||||
|
|
||||||
d['price_tick'] = str(self.price_tick)
|
d['price_tick'] = str(self.price_tick)
|
||||||
d['size_tick'] = str(self.size_tick)
|
d['size_tick'] = str(self.size_tick)
|
||||||
|
@ -349,11 +353,16 @@ class MktPair(Struct, frozen=True):
|
||||||
Constructor for a received msg-dict normally received over IPC.
|
Constructor for a received msg-dict normally received over IPC.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
dst_asset_msg = msg.pop('dst')
|
if not isinstance(
|
||||||
dst = Asset.from_msg(dst_asset_msg) # .copy()
|
dst_asset_msg := msg.pop('dst'),
|
||||||
|
str,
|
||||||
|
):
|
||||||
|
dst: Asset = Asset.from_msg(dst_asset_msg) # .copy()
|
||||||
|
else:
|
||||||
|
dst: str = dst_asset_msg
|
||||||
|
|
||||||
src_asset_msg = msg.pop('src')
|
src_asset_msg: dict = msg.pop('src')
|
||||||
src = Asset.from_msg(src_asset_msg) # .copy()
|
src: Asset = Asset.from_msg(src_asset_msg) # .copy()
|
||||||
|
|
||||||
# XXX NOTE: ``msgspec`` can encode `Decimal` but it doesn't
|
# XXX NOTE: ``msgspec`` can encode `Decimal` but it doesn't
|
||||||
# decide to it by default since we aren't spec-cing these
|
# decide to it by default since we aren't spec-cing these
|
||||||
|
|
|
@ -229,8 +229,8 @@ _samplings: dict[int, tuple[str, str]] = {
|
||||||
# throughput can be made faster during backfilling.
|
# throughput can be made faster during backfilling.
|
||||||
60: (
|
60: (
|
||||||
'1 min',
|
'1 min',
|
||||||
'1 D',
|
'2 D',
|
||||||
pendulum.duration(days=1),
|
pendulum.duration(days=2),
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -818,7 +818,7 @@ async def stream_quotes(
|
||||||
details: ibis.ContractDetails
|
details: ibis.ContractDetails
|
||||||
async with (
|
async with (
|
||||||
open_data_client() as proxy,
|
open_data_client() as proxy,
|
||||||
trio.open_nursery() as tn,
|
# trio.open_nursery() as tn,
|
||||||
):
|
):
|
||||||
mkt, details = await get_mkt_info(
|
mkt, details = await get_mkt_info(
|
||||||
sym,
|
sym,
|
||||||
|
|
|
@ -214,7 +214,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
|
||||||
last = time.time()
|
last = time.time()
|
||||||
async for pattern in stream:
|
async for pattern in stream:
|
||||||
log.info(f'received {pattern}')
|
log.info(f'received {pattern}')
|
||||||
now = time.time()
|
now: float = time.time()
|
||||||
|
|
||||||
# this causes tractor hang...
|
# this causes tractor hang...
|
||||||
# assert 0
|
# assert 0
|
||||||
|
@ -261,7 +261,9 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
|
||||||
# defined adhoc symbol set.
|
# defined adhoc symbol set.
|
||||||
stock_results = []
|
stock_results = []
|
||||||
|
|
||||||
async def stash_results(target: Awaitable[list]):
|
async def extend_results(
|
||||||
|
target: Awaitable[list]
|
||||||
|
) -> None:
|
||||||
try:
|
try:
|
||||||
results = await target
|
results = await target
|
||||||
except tractor.trionics.Lagged:
|
except tractor.trionics.Lagged:
|
||||||
|
@ -274,7 +276,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
|
||||||
with trio.move_on_after(3) as cs:
|
with trio.move_on_after(3) as cs:
|
||||||
async with trio.open_nursery() as sn:
|
async with trio.open_nursery() as sn:
|
||||||
sn.start_soon(
|
sn.start_soon(
|
||||||
stash_results,
|
extend_results,
|
||||||
proxy.search_symbols(
|
proxy.search_symbols(
|
||||||
pattern=pattern,
|
pattern=pattern,
|
||||||
upto=5,
|
upto=5,
|
||||||
|
@ -289,8 +291,10 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
|
||||||
f'Search timeout? {proxy._aio_ns.ib.client}'
|
f'Search timeout? {proxy._aio_ns.ib.client}'
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
else:
|
elif stock_results:
|
||||||
break
|
break
|
||||||
|
# else:
|
||||||
|
await tractor.pause()
|
||||||
|
|
||||||
# # match against our ad-hoc set immediately
|
# # match against our ad-hoc set immediately
|
||||||
# adhoc_matches = fuzzy.extract(
|
# adhoc_matches = fuzzy.extract(
|
||||||
|
|
|
@ -42,35 +42,15 @@ if TYPE_CHECKING:
|
||||||
from .feed import Feed
|
from .feed import Feed
|
||||||
|
|
||||||
|
|
||||||
# TODO: ideas for further abstractions as per
|
|
||||||
# https://github.com/pikers/piker/issues/216 and
|
|
||||||
# https://github.com/pikers/piker/issues/270:
|
|
||||||
# - a ``Cascade`` would be the minimal "connection" of 2 ``Flumes``
|
|
||||||
# as per circuit parlance:
|
|
||||||
# https://en.wikipedia.org/wiki/Two-port_network#Cascade_connection
|
|
||||||
# - could cover the combination of our `FspAdmin` and the
|
|
||||||
# backend `.fsp._engine` related machinery to "connect" one flume
|
|
||||||
# to another?
|
|
||||||
# - a (financial signal) ``Flow`` would be the a "collection" of such
|
|
||||||
# minmial cascades. Some engineering based jargon concepts:
|
|
||||||
# - https://en.wikipedia.org/wiki/Signal_chain
|
|
||||||
# - https://en.wikipedia.org/wiki/Daisy_chain_(electrical_engineering)
|
|
||||||
# - https://en.wikipedia.org/wiki/Audio_signal_flow
|
|
||||||
# - https://en.wikipedia.org/wiki/Digital_signal_processing#Implementation
|
|
||||||
# - https://en.wikipedia.org/wiki/Dataflow_programming
|
|
||||||
# - https://en.wikipedia.org/wiki/Signal_programming
|
|
||||||
# - https://en.wikipedia.org/wiki/Incremental_computing
|
|
||||||
|
|
||||||
|
|
||||||
class Flume(Struct):
|
class Flume(Struct):
|
||||||
'''
|
'''
|
||||||
Composite reference type which points to all the addressing handles
|
Composite reference type which points to all the addressing
|
||||||
and other meta-data necessary for the read, measure and management
|
handles and other meta-data necessary for the read, measure and
|
||||||
of a set of real-time updated data flows.
|
management of a set of real-time updated data flows.
|
||||||
|
|
||||||
Can be thought of as a "flow descriptor" or "flow frame" which
|
Can be thought of as a "flow descriptor" or "flow frame" which
|
||||||
describes the high level properties of a set of data flows that can
|
describes the high level properties of a set of data flows that
|
||||||
be used seamlessly across process-memory boundaries.
|
can be used seamlessly across process-memory boundaries.
|
||||||
|
|
||||||
Each instance's sub-components normally includes:
|
Each instance's sub-components normally includes:
|
||||||
- a msg oriented quote stream provided via an IPC transport
|
- a msg oriented quote stream provided via an IPC transport
|
||||||
|
@ -93,6 +73,7 @@ class Flume(Struct):
|
||||||
# private shm refs loaded dynamically from tokens
|
# private shm refs loaded dynamically from tokens
|
||||||
_hist_shm: ShmArray | None = None
|
_hist_shm: ShmArray | None = None
|
||||||
_rt_shm: ShmArray | None = None
|
_rt_shm: ShmArray | None = None
|
||||||
|
_readonly: bool = True
|
||||||
|
|
||||||
stream: tractor.MsgStream | None = None
|
stream: tractor.MsgStream | None = None
|
||||||
izero_hist: int = 0
|
izero_hist: int = 0
|
||||||
|
@ -109,7 +90,7 @@ class Flume(Struct):
|
||||||
if self._rt_shm is None:
|
if self._rt_shm is None:
|
||||||
self._rt_shm = attach_shm_array(
|
self._rt_shm = attach_shm_array(
|
||||||
token=self._rt_shm_token,
|
token=self._rt_shm_token,
|
||||||
readonly=True,
|
readonly=self._readonly,
|
||||||
)
|
)
|
||||||
|
|
||||||
return self._rt_shm
|
return self._rt_shm
|
||||||
|
@ -122,12 +103,10 @@ class Flume(Struct):
|
||||||
'No shm token has been set for the history buffer?'
|
'No shm token has been set for the history buffer?'
|
||||||
)
|
)
|
||||||
|
|
||||||
if (
|
if self._hist_shm is None:
|
||||||
self._hist_shm is None
|
|
||||||
):
|
|
||||||
self._hist_shm = attach_shm_array(
|
self._hist_shm = attach_shm_array(
|
||||||
token=self._hist_shm_token,
|
token=self._hist_shm_token,
|
||||||
readonly=True,
|
readonly=self._readonly,
|
||||||
)
|
)
|
||||||
|
|
||||||
return self._hist_shm
|
return self._hist_shm
|
||||||
|
@ -146,10 +125,10 @@ class Flume(Struct):
|
||||||
period and ratio between them.
|
period and ratio between them.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
times = self.hist_shm.array['time']
|
times: np.ndarray = self.hist_shm.array['time']
|
||||||
end = pendulum.from_timestamp(times[-1])
|
end: float | int = pendulum.from_timestamp(times[-1])
|
||||||
start = pendulum.from_timestamp(times[times != times[-1]][-1])
|
start: float | int = pendulum.from_timestamp(times[times != times[-1]][-1])
|
||||||
hist_step_size_s = (end - start).seconds
|
hist_step_size_s: float = (end - start).seconds
|
||||||
|
|
||||||
times = self.rt_shm.array['time']
|
times = self.rt_shm.array['time']
|
||||||
end = pendulum.from_timestamp(times[-1])
|
end = pendulum.from_timestamp(times[-1])
|
||||||
|
@ -169,17 +148,25 @@ class Flume(Struct):
|
||||||
msg = self.to_dict()
|
msg = self.to_dict()
|
||||||
msg['mkt'] = self.mkt.to_dict()
|
msg['mkt'] = self.mkt.to_dict()
|
||||||
|
|
||||||
# can't serialize the stream or feed objects, it's expected
|
# NOTE: pop all un-msg-serializable fields:
|
||||||
# you'll have a ref to it since this msg should be rxed on
|
# - `tractor.MsgStream`
|
||||||
# a stream on whatever far end IPC..
|
# - `Feed`
|
||||||
|
# - `Shmarray`
|
||||||
|
# it's expected the `.from_msg()` on the other side
|
||||||
|
# will get instead some kind of msg-compat version
|
||||||
|
# that it can load.
|
||||||
msg.pop('stream')
|
msg.pop('stream')
|
||||||
msg.pop('feed')
|
msg.pop('feed')
|
||||||
|
msg.pop('_rt_shm')
|
||||||
|
msg.pop('_hist_shm')
|
||||||
|
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_msg(
|
def from_msg(
|
||||||
cls,
|
cls,
|
||||||
msg: dict,
|
msg: dict,
|
||||||
|
readonly: bool = True,
|
||||||
|
|
||||||
) -> dict:
|
) -> dict:
|
||||||
'''
|
'''
|
||||||
|
@ -190,7 +177,11 @@ class Flume(Struct):
|
||||||
mkt_msg = msg.pop('mkt')
|
mkt_msg = msg.pop('mkt')
|
||||||
from ..accounting import MktPair # cycle otherwise..
|
from ..accounting import MktPair # cycle otherwise..
|
||||||
mkt = MktPair.from_msg(mkt_msg)
|
mkt = MktPair.from_msg(mkt_msg)
|
||||||
return cls(mkt=mkt, **msg)
|
msg |= {'_readonly': readonly}
|
||||||
|
return cls(
|
||||||
|
mkt=mkt,
|
||||||
|
**msg,
|
||||||
|
)
|
||||||
|
|
||||||
def get_index(
|
def get_index(
|
||||||
self,
|
self,
|
||||||
|
|
|
@ -57,6 +57,7 @@ from ._sampling import (
|
||||||
from ..brokers._util import (
|
from ..brokers._util import (
|
||||||
DataUnavailable,
|
DataUnavailable,
|
||||||
)
|
)
|
||||||
|
from ..storage import TimeseriesNotFound
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
|
@ -690,13 +691,18 @@ async def tsdb_backfill(
|
||||||
# but if not then below the remaining history can be lazy
|
# but if not then below the remaining history can be lazy
|
||||||
# loaded?
|
# loaded?
|
||||||
fqme: str = mkt.fqme
|
fqme: str = mkt.fqme
|
||||||
tsdb_entry: tuple | None = await storage.load(
|
|
||||||
fqme,
|
|
||||||
timeframe=timeframe,
|
|
||||||
)
|
|
||||||
|
|
||||||
last_tsdb_dt: datetime | None = None
|
last_tsdb_dt: datetime | None = None
|
||||||
if tsdb_entry:
|
try:
|
||||||
|
tsdb_entry: tuple | None = await storage.load(
|
||||||
|
fqme,
|
||||||
|
timeframe=timeframe,
|
||||||
|
)
|
||||||
|
except TimeseriesNotFound:
|
||||||
|
log.warning(
|
||||||
|
f'No timeseries yet for {fqme}'
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
(
|
(
|
||||||
tsdb_history,
|
tsdb_history,
|
||||||
first_tsdb_dt,
|
first_tsdb_dt,
|
||||||
|
@ -963,7 +969,8 @@ async def manage_history(
|
||||||
sub_for_broadcasts=False,
|
sub_for_broadcasts=False,
|
||||||
|
|
||||||
) as sample_stream:
|
) as sample_stream:
|
||||||
# register 1s and 1m buffers with the global incrementer task
|
# register 1s and 1m buffers with the global
|
||||||
|
# incrementer task
|
||||||
log.info(f'Connected to sampler stream: {sample_stream}')
|
log.info(f'Connected to sampler stream: {sample_stream}')
|
||||||
|
|
||||||
for timeframe in [60, 1]:
|
for timeframe in [60, 1]:
|
||||||
|
|
|
@ -26,7 +26,10 @@ from ._api import (
|
||||||
maybe_mk_fsp_shm,
|
maybe_mk_fsp_shm,
|
||||||
Fsp,
|
Fsp,
|
||||||
)
|
)
|
||||||
from ._engine import cascade
|
from ._engine import (
|
||||||
|
cascade,
|
||||||
|
Cascade,
|
||||||
|
)
|
||||||
from ._volume import (
|
from ._volume import (
|
||||||
dolla_vlm,
|
dolla_vlm,
|
||||||
flow_rates,
|
flow_rates,
|
||||||
|
@ -35,6 +38,7 @@ from ._volume import (
|
||||||
|
|
||||||
__all__: list[str] = [
|
__all__: list[str] = [
|
||||||
'cascade',
|
'cascade',
|
||||||
|
'Cascade',
|
||||||
'maybe_mk_fsp_shm',
|
'maybe_mk_fsp_shm',
|
||||||
'Fsp',
|
'Fsp',
|
||||||
'dolla_vlm',
|
'dolla_vlm',
|
||||||
|
@ -46,9 +50,12 @@ __all__: list[str] = [
|
||||||
async def latency(
|
async def latency(
|
||||||
source: 'TickStream[Dict[str, float]]', # noqa
|
source: 'TickStream[Dict[str, float]]', # noqa
|
||||||
ohlcv: np.ndarray
|
ohlcv: np.ndarray
|
||||||
|
|
||||||
) -> AsyncIterator[np.ndarray]:
|
) -> AsyncIterator[np.ndarray]:
|
||||||
"""Latency measurements, broker to piker.
|
'''
|
||||||
"""
|
Latency measurements, broker to piker.
|
||||||
|
|
||||||
|
'''
|
||||||
# TODO: do we want to offer yielding this async
|
# TODO: do we want to offer yielding this async
|
||||||
# before the rt data connection comes up?
|
# before the rt data connection comes up?
|
||||||
|
|
||||||
|
|
|
@ -18,13 +18,12 @@
|
||||||
core task logic for processing chains
|
core task logic for processing chains
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from dataclasses import dataclass
|
from __future__ import annotations
|
||||||
|
from contextlib import asynccontextmanager as acm
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from typing import (
|
from typing import (
|
||||||
AsyncIterator,
|
AsyncIterator,
|
||||||
Callable,
|
Callable,
|
||||||
Optional,
|
|
||||||
Union,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
@ -33,9 +32,9 @@ from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
from tractor.msg import NamespacePath
|
from tractor.msg import NamespacePath
|
||||||
|
|
||||||
|
from piker.types import Struct
|
||||||
from ..log import get_logger, get_console_log
|
from ..log import get_logger, get_console_log
|
||||||
from .. import data
|
from .. import data
|
||||||
from ..data import attach_shm_array
|
|
||||||
from ..data.feed import (
|
from ..data.feed import (
|
||||||
Flume,
|
Flume,
|
||||||
Feed,
|
Feed,
|
||||||
|
@ -56,12 +55,6 @@ from ..toolz import Profiler
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class TaskTracker:
|
|
||||||
complete: trio.Event
|
|
||||||
cs: trio.CancelScope
|
|
||||||
|
|
||||||
|
|
||||||
async def filter_quotes_by_sym(
|
async def filter_quotes_by_sym(
|
||||||
|
|
||||||
sym: str,
|
sym: str,
|
||||||
|
@ -82,30 +75,168 @@ async def filter_quotes_by_sym(
|
||||||
if quote:
|
if quote:
|
||||||
yield quote
|
yield quote
|
||||||
|
|
||||||
|
# TODO: unifying the abstractions in this FSP subsys/layer:
|
||||||
|
# -[ ] move the `.data.flows.Flume` type into this
|
||||||
|
# module/subsys/pkg?
|
||||||
|
# -[ ] ideas for further abstractions as per
|
||||||
|
# - https://github.com/pikers/piker/issues/216,
|
||||||
|
# - https://github.com/pikers/piker/issues/270:
|
||||||
|
# - a (financial signal) ``Flow`` would be the a "collection" of such
|
||||||
|
# minmial cascades. Some engineering based jargon concepts:
|
||||||
|
# - https://en.wikipedia.org/wiki/Signal_chain
|
||||||
|
# - https://en.wikipedia.org/wiki/Daisy_chain_(electrical_engineering)
|
||||||
|
# - https://en.wikipedia.org/wiki/Audio_signal_flow
|
||||||
|
# - https://en.wikipedia.org/wiki/Digital_signal_processing#Implementation
|
||||||
|
# - https://en.wikipedia.org/wiki/Dataflow_programming
|
||||||
|
# - https://en.wikipedia.org/wiki/Signal_programming
|
||||||
|
# - https://en.wikipedia.org/wiki/Incremental_computing
|
||||||
|
# - https://en.wikipedia.org/wiki/Signal-flow_graph
|
||||||
|
# - https://en.wikipedia.org/wiki/Signal-flow_graph#Basic_components
|
||||||
|
|
||||||
async def fsp_compute(
|
# -[ ] we probably want to eval THE BELOW design and unify with the
|
||||||
|
# proto `TaskManager` in the `tractor` dev branch as well as with
|
||||||
|
# our below idea for `Cascade`:
|
||||||
|
# - https://github.com/goodboy/tractor/pull/363
|
||||||
|
class Cascade(Struct):
|
||||||
|
'''
|
||||||
|
As per sig-proc engineering parlance, this is a chaining of
|
||||||
|
`Flume`s, which are themselves collections of "Streams"
|
||||||
|
implemented currently via `ShmArray`s.
|
||||||
|
|
||||||
|
A `Cascade` is be the minimal "connection" of 2 `Flumes`
|
||||||
|
as per circuit parlance:
|
||||||
|
https://en.wikipedia.org/wiki/Two-port_network#Cascade_connection
|
||||||
|
|
||||||
|
TODO:
|
||||||
|
-[ ] could cover the combination of our `FspAdmin` and the
|
||||||
|
backend `.fsp._engine` related machinery to "connect" one flume
|
||||||
|
to another?
|
||||||
|
|
||||||
|
'''
|
||||||
|
# TODO: make these `Flume`s
|
||||||
|
src: Flume
|
||||||
|
dst: Flume
|
||||||
|
tn: trio.Nursery
|
||||||
|
fsp: Fsp # UI-side middleware ctl API
|
||||||
|
|
||||||
|
# filled during cascade/.bind_func() (fsp_compute) init phases
|
||||||
|
bind_func: Callable | None = None
|
||||||
|
complete: trio.Event | None = None
|
||||||
|
cs: trio.CancelScope | None = None
|
||||||
|
client_stream: tractor.MsgStream | None = None
|
||||||
|
|
||||||
|
async def resync(self) -> int:
|
||||||
|
# TODO: adopt an incremental update engine/approach
|
||||||
|
# where possible here eventually!
|
||||||
|
log.info(f're-syncing fsp {self.fsp.name} to source')
|
||||||
|
self.cs.cancel()
|
||||||
|
await self.complete.wait()
|
||||||
|
index: int = await self.tn.start(self.bind_func)
|
||||||
|
|
||||||
|
# always trigger UI refresh after history update,
|
||||||
|
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
|
||||||
|
# ``piker.ui._display.trigger_update()``.
|
||||||
|
dst_shm: ShmArray = self.dst.rt_shm
|
||||||
|
await self.client_stream.send({
|
||||||
|
'fsp_update': {
|
||||||
|
'key': dst_shm.token,
|
||||||
|
'first': dst_shm._first.value,
|
||||||
|
'last': dst_shm._last.value,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return index
|
||||||
|
|
||||||
|
def is_synced(self) -> tuple[bool, int, int]:
|
||||||
|
'''
|
||||||
|
Predicate to dertmine if a destination FSP
|
||||||
|
output array is aligned to its source array.
|
||||||
|
|
||||||
|
'''
|
||||||
|
src_shm: ShmArray = self.src.rt_shm
|
||||||
|
dst_shm: ShmArray = self.dst.rt_shm
|
||||||
|
step_diff = src_shm.index - dst_shm.index
|
||||||
|
len_diff = abs(len(src_shm.array) - len(dst_shm.array))
|
||||||
|
synced: bool = not (
|
||||||
|
# the source is likely backfilling and we must
|
||||||
|
# sync history calculations
|
||||||
|
len_diff > 2
|
||||||
|
|
||||||
|
# we aren't step synced to the source and may be
|
||||||
|
# leading/lagging by a step
|
||||||
|
or step_diff > 1
|
||||||
|
or step_diff < 0
|
||||||
|
)
|
||||||
|
if not synced:
|
||||||
|
fsp: Fsp = self.fsp
|
||||||
|
log.warning(
|
||||||
|
'***DESYNCED FSP***\n'
|
||||||
|
f'{fsp.ns_path}@{src_shm.token}\n'
|
||||||
|
f'step_diff: {step_diff}\n'
|
||||||
|
f'len_diff: {len_diff}\n'
|
||||||
|
)
|
||||||
|
return (
|
||||||
|
synced,
|
||||||
|
step_diff,
|
||||||
|
len_diff,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def poll_and_sync_to_step(self) -> int:
|
||||||
|
synced, step_diff, _ = self.is_synced()
|
||||||
|
while not synced:
|
||||||
|
await self.resync()
|
||||||
|
synced, step_diff, _ = self.is_synced()
|
||||||
|
|
||||||
|
return step_diff
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def open_edge(
|
||||||
|
self,
|
||||||
|
bind_func: Callable,
|
||||||
|
) -> int:
|
||||||
|
self.bind_func = bind_func
|
||||||
|
index = await self.tn.start(bind_func)
|
||||||
|
yield index
|
||||||
|
# TODO: what do we want on teardown/error?
|
||||||
|
# -[ ] dynamic reconnection after update?
|
||||||
|
|
||||||
|
|
||||||
|
async def connect_streams(
|
||||||
|
casc: Cascade,
|
||||||
mkt: MktPair,
|
mkt: MktPair,
|
||||||
flume: Flume,
|
|
||||||
quote_stream: trio.abc.ReceiveChannel,
|
quote_stream: trio.abc.ReceiveChannel,
|
||||||
|
src: Flume,
|
||||||
|
dst: Flume,
|
||||||
|
|
||||||
src: ShmArray,
|
edge_func: Callable,
|
||||||
dst: ShmArray,
|
|
||||||
|
|
||||||
func: Callable,
|
|
||||||
|
|
||||||
# attach_stream: bool = False,
|
# attach_stream: bool = False,
|
||||||
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
'''
|
||||||
|
Stream and per-sample compute and write the cascade of
|
||||||
|
2 `Flumes`/streams given some operating `func`.
|
||||||
|
|
||||||
|
https://en.wikipedia.org/wiki/Signal-flow_graph#Basic_components
|
||||||
|
|
||||||
|
Not literally, but something like:
|
||||||
|
|
||||||
|
edge_func(Flume_in) -> Flume_out
|
||||||
|
|
||||||
|
'''
|
||||||
profiler = Profiler(
|
profiler = Profiler(
|
||||||
delayed=False,
|
delayed=False,
|
||||||
disabled=True
|
disabled=True
|
||||||
)
|
)
|
||||||
|
|
||||||
fqme = mkt.fqme
|
# TODO: just pull it from src.mkt.fqme no?
|
||||||
out_stream = func(
|
# fqme: str = mkt.fqme
|
||||||
|
fqme: str = src.mkt.fqme
|
||||||
|
|
||||||
|
# TODO: dynamic introspection of what the underlying (vertex)
|
||||||
|
# function actually requires from input node (flumes) then
|
||||||
|
# deliver those inputs as part of a graph "compilation" step?
|
||||||
|
out_stream = edge_func(
|
||||||
|
|
||||||
# TODO: do we even need this if we do the feed api right?
|
# TODO: do we even need this if we do the feed api right?
|
||||||
# shouldn't a local stream do this before we get a handle
|
# shouldn't a local stream do this before we get a handle
|
||||||
|
@ -113,20 +244,21 @@ async def fsp_compute(
|
||||||
# async itertools style?
|
# async itertools style?
|
||||||
filter_quotes_by_sym(fqme, quote_stream),
|
filter_quotes_by_sym(fqme, quote_stream),
|
||||||
|
|
||||||
# XXX: currently the ``ohlcv`` arg
|
# XXX: currently the ``ohlcv`` arg, but we should allow
|
||||||
flume.rt_shm,
|
# (dynamic) requests for src flume (node) streams?
|
||||||
|
src.rt_shm,
|
||||||
)
|
)
|
||||||
|
|
||||||
# HISTORY COMPUTE PHASE
|
# HISTORY COMPUTE PHASE
|
||||||
# conduct a single iteration of fsp with historical bars input
|
# conduct a single iteration of fsp with historical bars input
|
||||||
# and get historical output.
|
# and get historical output.
|
||||||
history_output: Union[
|
history_output: (
|
||||||
dict[str, np.ndarray], # multi-output case
|
dict[str, np.ndarray] # multi-output case
|
||||||
np.ndarray, # single output case
|
| np.ndarray, # single output case
|
||||||
]
|
)
|
||||||
history_output = await anext(out_stream)
|
history_output = await anext(out_stream)
|
||||||
|
|
||||||
func_name = func.__name__
|
func_name = edge_func.__name__
|
||||||
profiler(f'{func_name} generated history')
|
profiler(f'{func_name} generated history')
|
||||||
|
|
||||||
# build struct array with an 'index' field to push as history
|
# build struct array with an 'index' field to push as history
|
||||||
|
@ -134,10 +266,12 @@ async def fsp_compute(
|
||||||
# TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no?
|
# TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no?
|
||||||
# if the output array is multi-field then push
|
# if the output array is multi-field then push
|
||||||
# each respective field.
|
# each respective field.
|
||||||
fields = getattr(dst.array.dtype, 'fields', None).copy()
|
dst_shm: ShmArray = dst.rt_shm
|
||||||
|
fields = getattr(dst_shm.array.dtype, 'fields', None).copy()
|
||||||
fields.pop('index')
|
fields.pop('index')
|
||||||
history_by_field: Optional[np.ndarray] = None
|
history_by_field: np.ndarray | None = None
|
||||||
src_time = src.array['time']
|
src_shm: ShmArray = src.rt_shm
|
||||||
|
src_time = src_shm.array['time']
|
||||||
|
|
||||||
if (
|
if (
|
||||||
fields and
|
fields and
|
||||||
|
@ -156,7 +290,7 @@ async def fsp_compute(
|
||||||
if history_by_field is None:
|
if history_by_field is None:
|
||||||
|
|
||||||
if output is None:
|
if output is None:
|
||||||
length = len(src.array)
|
length = len(src_shm.array)
|
||||||
else:
|
else:
|
||||||
length = len(output)
|
length = len(output)
|
||||||
|
|
||||||
|
@ -165,7 +299,7 @@ async def fsp_compute(
|
||||||
# will be pushed to shm.
|
# will be pushed to shm.
|
||||||
history_by_field = np.zeros(
|
history_by_field = np.zeros(
|
||||||
length,
|
length,
|
||||||
dtype=dst.array.dtype
|
dtype=dst_shm.array.dtype
|
||||||
)
|
)
|
||||||
|
|
||||||
if output is None:
|
if output is None:
|
||||||
|
@ -182,13 +316,13 @@ async def fsp_compute(
|
||||||
)
|
)
|
||||||
history_by_field = np.zeros(
|
history_by_field = np.zeros(
|
||||||
len(history_output),
|
len(history_output),
|
||||||
dtype=dst.array.dtype
|
dtype=dst_shm.array.dtype
|
||||||
)
|
)
|
||||||
history_by_field[func_name] = history_output
|
history_by_field[func_name] = history_output
|
||||||
|
|
||||||
history_by_field['time'] = src_time[-len(history_by_field):]
|
history_by_field['time'] = src_time[-len(history_by_field):]
|
||||||
|
|
||||||
history_output['time'] = src.array['time']
|
history_output['time'] = src_shm.array['time']
|
||||||
|
|
||||||
# TODO: XXX:
|
# TODO: XXX:
|
||||||
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
|
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
|
||||||
|
@ -201,11 +335,11 @@ async def fsp_compute(
|
||||||
# is `index` aware such that historical data can be indexed
|
# is `index` aware such that historical data can be indexed
|
||||||
# relative to the true first datum? Not sure if this is sane
|
# relative to the true first datum? Not sure if this is sane
|
||||||
# for incremental compuations.
|
# for incremental compuations.
|
||||||
first = dst._first.value = src._first.value
|
first = dst_shm._first.value = src_shm._first.value
|
||||||
|
|
||||||
# TODO: can we use this `start` flag instead of the manual
|
# TODO: can we use this `start` flag instead of the manual
|
||||||
# setting above?
|
# setting above?
|
||||||
index = dst.push(
|
index = dst_shm.push(
|
||||||
history_by_field,
|
history_by_field,
|
||||||
start=first,
|
start=first,
|
||||||
)
|
)
|
||||||
|
@ -216,12 +350,9 @@ async def fsp_compute(
|
||||||
# setup a respawn handle
|
# setup a respawn handle
|
||||||
with trio.CancelScope() as cs:
|
with trio.CancelScope() as cs:
|
||||||
|
|
||||||
# TODO: might be better to just make a "restart" method where
|
casc.cs = cs
|
||||||
# the target task is spawned implicitly and then the event is
|
casc.complete = trio.Event()
|
||||||
# set via some higher level api? At that poing we might as well
|
task_status.started(index)
|
||||||
# be writing a one-cancels-one nursery though right?
|
|
||||||
tracker = TaskTracker(trio.Event(), cs)
|
|
||||||
task_status.started((tracker, index))
|
|
||||||
|
|
||||||
profiler(f'{func_name} yield last index')
|
profiler(f'{func_name} yield last index')
|
||||||
|
|
||||||
|
@ -235,12 +366,12 @@ async def fsp_compute(
|
||||||
log.debug(f"{func_name}: {processed}")
|
log.debug(f"{func_name}: {processed}")
|
||||||
key, output = processed
|
key, output = processed
|
||||||
# dst.array[-1][key] = output
|
# dst.array[-1][key] = output
|
||||||
dst.array[[key, 'time']][-1] = (
|
dst_shm.array[[key, 'time']][-1] = (
|
||||||
output,
|
output,
|
||||||
# TODO: what about pushing ``time.time_ns()``
|
# TODO: what about pushing ``time.time_ns()``
|
||||||
# in which case we'll need to round at the graphics
|
# in which case we'll need to round at the graphics
|
||||||
# processing / sampling layer?
|
# processing / sampling layer?
|
||||||
src.array[-1]['time']
|
src_shm.array[-1]['time']
|
||||||
)
|
)
|
||||||
|
|
||||||
# NOTE: for now we aren't streaming this to the consumer
|
# NOTE: for now we aren't streaming this to the consumer
|
||||||
|
@ -252,7 +383,7 @@ async def fsp_compute(
|
||||||
# N-consumers who subscribe for the real-time output,
|
# N-consumers who subscribe for the real-time output,
|
||||||
# which we'll likely want to implement using local-mem
|
# which we'll likely want to implement using local-mem
|
||||||
# chans for the fan out?
|
# chans for the fan out?
|
||||||
# index = src.index
|
# index = src_shm.index
|
||||||
# if attach_stream:
|
# if attach_stream:
|
||||||
# await client_stream.send(index)
|
# await client_stream.send(index)
|
||||||
|
|
||||||
|
@ -262,7 +393,7 @@ async def fsp_compute(
|
||||||
# log.info(f'FSP quote too fast: {hz}')
|
# log.info(f'FSP quote too fast: {hz}')
|
||||||
# last = time.time()
|
# last = time.time()
|
||||||
finally:
|
finally:
|
||||||
tracker.complete.set()
|
casc.complete.set()
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
@ -273,15 +404,15 @@ async def cascade(
|
||||||
# data feed key
|
# data feed key
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
|
||||||
src_shm_token: dict,
|
# flume pair cascaded using an "edge function"
|
||||||
dst_shm_token: tuple[str, np.dtype],
|
src_flume_addr: dict,
|
||||||
|
dst_flume_addr: dict,
|
||||||
ns_path: NamespacePath,
|
ns_path: NamespacePath,
|
||||||
|
|
||||||
shm_registry: dict[str, _Token],
|
shm_registry: dict[str, _Token],
|
||||||
|
|
||||||
zero_on_step: bool = False,
|
zero_on_step: bool = False,
|
||||||
loglevel: Optional[str] = None,
|
loglevel: str | None = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -297,8 +428,14 @@ async def cascade(
|
||||||
if loglevel:
|
if loglevel:
|
||||||
get_console_log(loglevel)
|
get_console_log(loglevel)
|
||||||
|
|
||||||
src = attach_shm_array(token=src_shm_token)
|
src: Flume = Flume.from_msg(src_flume_addr)
|
||||||
dst = attach_shm_array(readonly=False, token=dst_shm_token)
|
dst: Flume = Flume.from_msg(
|
||||||
|
dst_flume_addr,
|
||||||
|
readonly=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
# src: ShmArray = attach_shm_array(token=src_shm_token)
|
||||||
|
# dst: ShmArray = attach_shm_array(readonly=False, token=dst_shm_token)
|
||||||
|
|
||||||
reg = _load_builtins()
|
reg = _load_builtins()
|
||||||
lines = '\n'.join([f'{key.rpartition(":")[2]} => {key}' for key in reg])
|
lines = '\n'.join([f'{key.rpartition(":")[2]} => {key}' for key in reg])
|
||||||
|
@ -306,11 +443,11 @@ async def cascade(
|
||||||
f'Registered FSP set:\n{lines}'
|
f'Registered FSP set:\n{lines}'
|
||||||
)
|
)
|
||||||
|
|
||||||
# update actorlocal flows table which registers
|
# NOTE XXX: update actorlocal flows table which registers
|
||||||
# readonly "instances" of this fsp for symbol/source
|
# readonly "instances" of this fsp for symbol/source so that
|
||||||
# so that consumer fsps can look it up by source + fsp.
|
# consumer fsps can look it up by source + fsp.
|
||||||
# TODO: ugh i hate this wind/unwind to list over the wire
|
# TODO: ugh i hate this wind/unwind to list over the wire but
|
||||||
# but not sure how else to do it.
|
# not sure how else to do it.
|
||||||
for (token, fsp_name, dst_token) in shm_registry:
|
for (token, fsp_name, dst_token) in shm_registry:
|
||||||
Fsp._flow_registry[(
|
Fsp._flow_registry[(
|
||||||
_Token.from_msg(token),
|
_Token.from_msg(token),
|
||||||
|
@ -320,12 +457,15 @@ async def cascade(
|
||||||
fsp: Fsp = reg.get(
|
fsp: Fsp = reg.get(
|
||||||
NamespacePath(ns_path)
|
NamespacePath(ns_path)
|
||||||
)
|
)
|
||||||
func = fsp.func
|
func: Callable = fsp.func
|
||||||
|
|
||||||
if not func:
|
if not func:
|
||||||
# TODO: assume it's a func target path
|
# TODO: assume it's a func target path
|
||||||
raise ValueError(f'Unknown fsp target: {ns_path}')
|
raise ValueError(f'Unknown fsp target: {ns_path}')
|
||||||
|
|
||||||
|
_fqme: str = src.mkt.fqme
|
||||||
|
assert _fqme == fqme
|
||||||
|
|
||||||
# open a data feed stream with requested broker
|
# open a data feed stream with requested broker
|
||||||
feed: Feed
|
feed: Feed
|
||||||
async with data.feed.maybe_open_feed(
|
async with data.feed.maybe_open_feed(
|
||||||
|
@ -339,177 +479,142 @@ async def cascade(
|
||||||
|
|
||||||
) as feed:
|
) as feed:
|
||||||
|
|
||||||
flume = feed.flumes[fqme]
|
flume: Flume = feed.flumes[fqme]
|
||||||
mkt = flume.mkt
|
# XXX: can't do this since flume.feed will be set XD
|
||||||
assert src.token == flume.rt_shm.token
|
# assert flume == src
|
||||||
|
assert flume.mkt == src.mkt
|
||||||
|
mkt: MktPair = flume.mkt
|
||||||
|
|
||||||
|
# NOTE: FOR NOW, sanity checks around the feed as being
|
||||||
|
# always the src flume (until we get to fancier/lengthier
|
||||||
|
# chains/graphs.
|
||||||
|
assert src.rt_shm.token == flume.rt_shm.token
|
||||||
|
|
||||||
|
# XXX: won't work bc the _hist_shm_token value will be
|
||||||
|
# list[list] after IPC..
|
||||||
|
# assert flume.to_msg() == src_flume_addr
|
||||||
|
|
||||||
profiler(f'{func}: feed up')
|
profiler(f'{func}: feed up')
|
||||||
|
|
||||||
func_name = func.__name__
|
func_name: str = func.__name__
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as tn,
|
||||||
):
|
):
|
||||||
|
# TODO: might be better to just make a "restart" method where
|
||||||
|
# the target task is spawned implicitly and then the event is
|
||||||
|
# set via some higher level api? At that poing we might as well
|
||||||
|
# be writing a one-cancels-one nursery though right?
|
||||||
|
casc = Cascade(
|
||||||
|
src,
|
||||||
|
dst,
|
||||||
|
tn,
|
||||||
|
fsp,
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO: this seems like it should be wrapped somewhere?
|
||||||
fsp_target = partial(
|
fsp_target = partial(
|
||||||
|
connect_streams,
|
||||||
fsp_compute,
|
casc=casc,
|
||||||
mkt=mkt,
|
mkt=mkt,
|
||||||
flume=flume,
|
|
||||||
quote_stream=flume.stream,
|
quote_stream=flume.stream,
|
||||||
|
|
||||||
# shm
|
# flumes and shm passthrough
|
||||||
src=src,
|
src=src,
|
||||||
dst=dst,
|
dst=dst,
|
||||||
|
|
||||||
# target
|
# chain function which takes src flume input(s)
|
||||||
func=func
|
# and renders dst flume output(s)
|
||||||
|
edge_func=func
|
||||||
)
|
)
|
||||||
|
async with casc.open_edge(
|
||||||
|
bind_func=fsp_target,
|
||||||
|
) as index:
|
||||||
|
# casc.bind_func = fsp_target
|
||||||
|
# index = await tn.start(fsp_target)
|
||||||
|
dst_shm: ShmArray = dst.rt_shm
|
||||||
|
src_shm: ShmArray = src.rt_shm
|
||||||
|
|
||||||
tracker, index = await n.start(fsp_target)
|
if zero_on_step:
|
||||||
|
last = dst.rt_shm.array[-1:]
|
||||||
|
zeroed = np.zeros(last.shape, dtype=last.dtype)
|
||||||
|
|
||||||
if zero_on_step:
|
profiler(f'{func_name}: fsp up')
|
||||||
last = dst.array[-1:]
|
|
||||||
zeroed = np.zeros(last.shape, dtype=last.dtype)
|
|
||||||
|
|
||||||
profiler(f'{func_name}: fsp up')
|
# sync to client-side actor
|
||||||
|
await ctx.started(index)
|
||||||
|
|
||||||
# sync client
|
# XXX: rt stream with client which we MUST
|
||||||
await ctx.started(index)
|
# open here (and keep it open) in order to make
|
||||||
|
# incremental "updates" as history prepends take
|
||||||
|
# place.
|
||||||
|
async with ctx.open_stream() as client_stream:
|
||||||
|
casc.client_stream: tractor.MsgStream = client_stream
|
||||||
|
|
||||||
# XXX: rt stream with client which we MUST
|
s, step, ld = casc.is_synced()
|
||||||
# open here (and keep it open) in order to make
|
|
||||||
# incremental "updates" as history prepends take
|
|
||||||
# place.
|
|
||||||
async with ctx.open_stream() as client_stream:
|
|
||||||
|
|
||||||
# TODO: these likely should all become
|
# detect sample period step for subscription to increment
|
||||||
# methods of this ``TaskLifetime`` or wtv
|
# signal
|
||||||
# abstraction..
|
times = src.rt_shm.array['time']
|
||||||
async def resync(
|
if len(times) > 1:
|
||||||
tracker: TaskTracker,
|
last_ts = times[-1]
|
||||||
|
delay_s: float = float(last_ts - times[times != last_ts][-1])
|
||||||
|
else:
|
||||||
|
# our default "HFT" sample rate.
|
||||||
|
delay_s: float = _default_delay_s
|
||||||
|
|
||||||
) -> tuple[TaskTracker, int]:
|
# sub and increment the underlying shared memory buffer
|
||||||
# TODO: adopt an incremental update engine/approach
|
# on every step msg received from the global `samplerd`
|
||||||
# where possible here eventually!
|
# service.
|
||||||
log.info(f're-syncing fsp {func_name} to source')
|
async with open_sample_stream(
|
||||||
tracker.cs.cancel()
|
float(delay_s)
|
||||||
await tracker.complete.wait()
|
) as istream:
|
||||||
tracker, index = await n.start(fsp_target)
|
|
||||||
|
|
||||||
# always trigger UI refresh after history update,
|
profiler(f'{func_name}: sample stream up')
|
||||||
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
|
profiler.finish()
|
||||||
# ``piker.ui._display.trigger_update()``.
|
|
||||||
await client_stream.send({
|
|
||||||
'fsp_update': {
|
|
||||||
'key': dst_shm_token,
|
|
||||||
'first': dst._first.value,
|
|
||||||
'last': dst._last.value,
|
|
||||||
}
|
|
||||||
})
|
|
||||||
return tracker, index
|
|
||||||
|
|
||||||
def is_synced(
|
async for i in istream:
|
||||||
src: ShmArray,
|
# print(f'FSP incrementing {i}')
|
||||||
dst: ShmArray
|
|
||||||
) -> tuple[bool, int, int]:
|
|
||||||
'''
|
|
||||||
Predicate to dertmine if a destination FSP
|
|
||||||
output array is aligned to its source array.
|
|
||||||
|
|
||||||
'''
|
# respawn the compute task if the source
|
||||||
step_diff = src.index - dst.index
|
# array has been updated such that we compute
|
||||||
len_diff = abs(len(src.array) - len(dst.array))
|
# new history from the (prepended) source.
|
||||||
return not (
|
synced, step_diff, _ = casc.is_synced()
|
||||||
# the source is likely backfilling and we must
|
if not synced:
|
||||||
# sync history calculations
|
step_diff: int = await casc.poll_and_sync_to_step()
|
||||||
len_diff > 2
|
|
||||||
|
|
||||||
# we aren't step synced to the source and may be
|
# skip adding a last bar since we should already
|
||||||
# leading/lagging by a step
|
# be step alinged
|
||||||
or step_diff > 1
|
if step_diff == 0:
|
||||||
or step_diff < 0
|
continue
|
||||||
), step_diff, len_diff
|
|
||||||
|
|
||||||
async def poll_and_sync_to_step(
|
# read out last shm row, copy and write new row
|
||||||
tracker: TaskTracker,
|
array = dst_shm.array
|
||||||
src: ShmArray,
|
|
||||||
dst: ShmArray,
|
|
||||||
|
|
||||||
) -> tuple[TaskTracker, int]:
|
# some metrics like vlm should be reset
|
||||||
|
# to zero every step.
|
||||||
|
if zero_on_step:
|
||||||
|
last = zeroed
|
||||||
|
else:
|
||||||
|
last = array[-1:].copy()
|
||||||
|
|
||||||
synced, step_diff, _ = is_synced(src, dst)
|
dst.rt_shm.push(last)
|
||||||
while not synced:
|
|
||||||
tracker, index = await resync(tracker)
|
|
||||||
synced, step_diff, _ = is_synced(src, dst)
|
|
||||||
|
|
||||||
return tracker, step_diff
|
# sync with source buffer's time step
|
||||||
|
src_l2 = src_shm.array[-2:]
|
||||||
|
src_li, src_lt = src_l2[-1][['index', 'time']]
|
||||||
|
src_2li, src_2lt = src_l2[-2][['index', 'time']]
|
||||||
|
dst_shm._array['time'][src_li] = src_lt
|
||||||
|
dst_shm._array['time'][src_2li] = src_2lt
|
||||||
|
|
||||||
s, step, ld = is_synced(src, dst)
|
# last2 = dst.array[-2:]
|
||||||
|
# if (
|
||||||
# detect sample period step for subscription to increment
|
# last2[-1]['index'] != src_li
|
||||||
# signal
|
# or last2[-2]['index'] != src_2li
|
||||||
times = src.array['time']
|
# ):
|
||||||
if len(times) > 1:
|
# dstl2 = list(last2)
|
||||||
last_ts = times[-1]
|
# srcl2 = list(src_l2)
|
||||||
delay_s = float(last_ts - times[times != last_ts][-1])
|
# print(
|
||||||
else:
|
# # f'{dst.token}\n'
|
||||||
# our default "HFT" sample rate.
|
# f'src: {srcl2}\n'
|
||||||
delay_s = _default_delay_s
|
# f'dst: {dstl2}\n'
|
||||||
|
# )
|
||||||
# sub and increment the underlying shared memory buffer
|
|
||||||
# on every step msg received from the global `samplerd`
|
|
||||||
# service.
|
|
||||||
async with open_sample_stream(float(delay_s)) as istream:
|
|
||||||
|
|
||||||
profiler(f'{func_name}: sample stream up')
|
|
||||||
profiler.finish()
|
|
||||||
|
|
||||||
async for i in istream:
|
|
||||||
# print(f'FSP incrementing {i}')
|
|
||||||
|
|
||||||
# respawn the compute task if the source
|
|
||||||
# array has been updated such that we compute
|
|
||||||
# new history from the (prepended) source.
|
|
||||||
synced, step_diff, _ = is_synced(src, dst)
|
|
||||||
if not synced:
|
|
||||||
tracker, step_diff = await poll_and_sync_to_step(
|
|
||||||
tracker,
|
|
||||||
src,
|
|
||||||
dst,
|
|
||||||
)
|
|
||||||
|
|
||||||
# skip adding a last bar since we should already
|
|
||||||
# be step alinged
|
|
||||||
if step_diff == 0:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# read out last shm row, copy and write new row
|
|
||||||
array = dst.array
|
|
||||||
|
|
||||||
# some metrics like vlm should be reset
|
|
||||||
# to zero every step.
|
|
||||||
if zero_on_step:
|
|
||||||
last = zeroed
|
|
||||||
else:
|
|
||||||
last = array[-1:].copy()
|
|
||||||
|
|
||||||
dst.push(last)
|
|
||||||
|
|
||||||
# sync with source buffer's time step
|
|
||||||
src_l2 = src.array[-2:]
|
|
||||||
src_li, src_lt = src_l2[-1][['index', 'time']]
|
|
||||||
src_2li, src_2lt = src_l2[-2][['index', 'time']]
|
|
||||||
dst._array['time'][src_li] = src_lt
|
|
||||||
dst._array['time'][src_2li] = src_2lt
|
|
||||||
|
|
||||||
# last2 = dst.array[-2:]
|
|
||||||
# if (
|
|
||||||
# last2[-1]['index'] != src_li
|
|
||||||
# or last2[-2]['index'] != src_2li
|
|
||||||
# ):
|
|
||||||
# dstl2 = list(last2)
|
|
||||||
# srcl2 = list(src_l2)
|
|
||||||
# print(
|
|
||||||
# # f'{dst.token}\n'
|
|
||||||
# f'src: {srcl2}\n'
|
|
||||||
# f'dst: {dstl2}\n'
|
|
||||||
# )
|
|
||||||
|
|
|
@ -84,10 +84,10 @@ async def open_piker_runtime(
|
||||||
a root actor.
|
a root actor.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# check for existing runtime, boot it
|
||||||
|
# if not already running.
|
||||||
try:
|
try:
|
||||||
# check for existing runtime
|
actor = tractor.current_actor()
|
||||||
actor = tractor.current_actor().uid
|
|
||||||
|
|
||||||
except tractor._exceptions.NoRuntime:
|
except tractor._exceptions.NoRuntime:
|
||||||
tractor._state._runtime_vars[
|
tractor._state._runtime_vars[
|
||||||
'piker_vars'
|
'piker_vars'
|
||||||
|
@ -116,15 +116,16 @@ async def open_piker_runtime(
|
||||||
enable_modules=enable_modules,
|
enable_modules=enable_modules,
|
||||||
|
|
||||||
**tractor_kwargs,
|
**tractor_kwargs,
|
||||||
) as _,
|
) as actor,
|
||||||
|
|
||||||
open_registry(
|
open_registry(
|
||||||
registry_addrs,
|
registry_addrs,
|
||||||
ensure_exists=False,
|
ensure_exists=False,
|
||||||
) as addrs,
|
) as addrs,
|
||||||
):
|
):
|
||||||
|
assert actor is tractor.current_actor()
|
||||||
yield (
|
yield (
|
||||||
tractor.current_actor(),
|
actor,
|
||||||
addrs,
|
addrs,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
@ -268,28 +269,39 @@ async def maybe_open_pikerd(
|
||||||
# async with open_portal(chan) as arb_portal:
|
# async with open_portal(chan) as arb_portal:
|
||||||
# yield arb_portal
|
# yield arb_portal
|
||||||
|
|
||||||
registry_addrs = registry_addrs or [_default_reg_addr]
|
registry_addrs: list[tuple[str, int]] = (
|
||||||
|
registry_addrs
|
||||||
|
or [_default_reg_addr]
|
||||||
|
)
|
||||||
|
|
||||||
|
pikerd_portal: tractor.Portal | None
|
||||||
async with (
|
async with (
|
||||||
open_piker_runtime(
|
open_piker_runtime(
|
||||||
name=query_name,
|
name=query_name,
|
||||||
registry_addrs=registry_addrs,
|
registry_addrs=registry_addrs,
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) as _,
|
) as (actor, addrs),
|
||||||
|
|
||||||
|
# try to attach to any existing (host-local) `pikerd`
|
||||||
tractor.find_actor(
|
tractor.find_actor(
|
||||||
_root_dname,
|
_root_dname,
|
||||||
registry_addrs=registry_addrs,
|
registry_addrs=registry_addrs,
|
||||||
only_first=True,
|
only_first=True,
|
||||||
) as portal
|
# raise_on_none=True,
|
||||||
|
) as pikerd_portal,
|
||||||
|
|
||||||
):
|
):
|
||||||
# connect to any existing daemon presuming
|
# connect to any existing remote daemon presuming its
|
||||||
# its registry socket was selected.
|
# registry socket was selected.
|
||||||
if (
|
if pikerd_portal is not None:
|
||||||
portal is not None
|
|
||||||
):
|
# sanity check that we are actually connecting to
|
||||||
yield portal
|
# a remote process and not ourselves.
|
||||||
|
assert actor.uid != pikerd_portal.channel.uid
|
||||||
|
assert registry_addrs
|
||||||
|
|
||||||
|
yield pikerd_portal
|
||||||
return
|
return
|
||||||
|
|
||||||
# presume pikerd role since no daemon could be found at
|
# presume pikerd role since no daemon could be found at
|
||||||
|
|
|
@ -102,7 +102,7 @@ async def open_registry(
|
||||||
not tractor.is_root_process()
|
not tractor.is_root_process()
|
||||||
and not Registry.addrs
|
and not Registry.addrs
|
||||||
):
|
):
|
||||||
Registry.addrs.extend(actor._reg_addrs)
|
Registry.addrs.extend(actor.reg_addrs)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
ensure_exists
|
ensure_exists
|
||||||
|
|
|
@ -139,6 +139,13 @@ class StorageClient(
|
||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
|
class TimeseriesNotFound(Exception):
|
||||||
|
'''
|
||||||
|
No timeseries entry can be found for this backend.
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
class StorageConnectionError(ConnectionError):
|
class StorageConnectionError(ConnectionError):
|
||||||
'''
|
'''
|
||||||
Can't connect to the desired tsdb subsys/service.
|
Can't connect to the desired tsdb subsys/service.
|
||||||
|
|
|
@ -140,19 +140,27 @@ def delete(
|
||||||
def anal(
|
def anal(
|
||||||
fqme: str,
|
fqme: str,
|
||||||
period: int = 60,
|
period: int = 60,
|
||||||
|
pdb: bool = False,
|
||||||
|
|
||||||
) -> np.ndarray:
|
) -> np.ndarray:
|
||||||
|
'''
|
||||||
|
Anal-ysis is when you take the data do stuff to it, i think.
|
||||||
|
|
||||||
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
async with (
|
async with (
|
||||||
open_piker_runtime(
|
open_piker_runtime(
|
||||||
|
# are you a bear or boi?
|
||||||
'tsdb_polars_anal',
|
'tsdb_polars_anal',
|
||||||
debug_mode=True,
|
debug_mode=pdb,
|
||||||
|
),
|
||||||
|
open_storage_client() as (
|
||||||
|
mod,
|
||||||
|
client,
|
||||||
),
|
),
|
||||||
open_storage_client() as (mod, client),
|
|
||||||
):
|
):
|
||||||
syms: list[str] = await client.list_keys()
|
syms: list[str] = await client.list_keys()
|
||||||
print(f'{len(syms)} FOUND for {mod.name}')
|
log.info(f'{len(syms)} FOUND for {mod.name}')
|
||||||
|
|
||||||
(
|
(
|
||||||
history,
|
history,
|
||||||
|
|
|
@ -19,7 +19,8 @@
|
||||||
call a poor man's tsdb).
|
call a poor man's tsdb).
|
||||||
|
|
||||||
AKA a `piker`-native file-system native "time series database"
|
AKA a `piker`-native file-system native "time series database"
|
||||||
without needing an extra process and no standard TSDB features, YET!
|
without needing an extra process and no standard TSDB features,
|
||||||
|
YET!
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# TODO: like there's soo much..
|
# TODO: like there's soo much..
|
||||||
|
@ -67,6 +68,7 @@ from piker import config
|
||||||
from piker.data import def_iohlcv_fields
|
from piker.data import def_iohlcv_fields
|
||||||
from piker.data import ShmArray
|
from piker.data import ShmArray
|
||||||
from piker.log import get_logger
|
from piker.log import get_logger
|
||||||
|
from . import TimeseriesNotFound
|
||||||
|
|
||||||
|
|
||||||
log = get_logger('storage.nativedb')
|
log = get_logger('storage.nativedb')
|
||||||
|
@ -228,8 +230,21 @@ class NativeStorageClient:
|
||||||
fqme,
|
fqme,
|
||||||
timeframe,
|
timeframe,
|
||||||
)
|
)
|
||||||
except FileNotFoundError:
|
except FileNotFoundError as fnfe:
|
||||||
return None
|
|
||||||
|
bs_fqme, _, *_ = fqme.rpartition('.')
|
||||||
|
|
||||||
|
possible_matches: list[str] = []
|
||||||
|
for tskey in self._index:
|
||||||
|
if bs_fqme in tskey:
|
||||||
|
possible_matches.append(tskey)
|
||||||
|
|
||||||
|
match_str: str = '\n'.join(sorted(possible_matches))
|
||||||
|
raise TimeseriesNotFound(
|
||||||
|
f'No entry for `{fqme}`?\n'
|
||||||
|
f'Maybe you need a more specific fqme-key like:\n\n'
|
||||||
|
f'{match_str}'
|
||||||
|
) from fnfe
|
||||||
|
|
||||||
times = array['time']
|
times = array['time']
|
||||||
return (
|
return (
|
||||||
|
@ -376,6 +391,8 @@ class NativeStorageClient:
|
||||||
# ...
|
# ...
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: does this need to be async on average?
|
||||||
|
# I guess for any IPC connected backend yes?
|
||||||
@acm
|
@acm
|
||||||
async def get_client(
|
async def get_client(
|
||||||
|
|
||||||
|
@ -393,7 +410,7 @@ async def get_client(
|
||||||
'''
|
'''
|
||||||
datadir: Path = config.get_conf_dir() / 'nativedb'
|
datadir: Path = config.get_conf_dir() / 'nativedb'
|
||||||
if not datadir.is_dir():
|
if not datadir.is_dir():
|
||||||
log.info(f'Creating `nativedb` director: {datadir}')
|
log.info(f'Creating `nativedb` dir: {datadir}')
|
||||||
datadir.mkdir()
|
datadir.mkdir()
|
||||||
|
|
||||||
client = NativeStorageClient(datadir)
|
client = NativeStorageClient(datadir)
|
||||||
|
|
|
@ -390,7 +390,7 @@ class FspAdmin:
|
||||||
complete: trio.Event,
|
complete: trio.Event,
|
||||||
started: trio.Event,
|
started: trio.Event,
|
||||||
fqme: str,
|
fqme: str,
|
||||||
dst_fsp_flume: Flume,
|
dst_flume: Flume,
|
||||||
conf: dict,
|
conf: dict,
|
||||||
target: Fsp,
|
target: Fsp,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
|
@ -408,16 +408,14 @@ class FspAdmin:
|
||||||
# chaining entrypoint
|
# chaining entrypoint
|
||||||
cascade,
|
cascade,
|
||||||
|
|
||||||
|
# TODO: can't we just drop this and expect
|
||||||
|
# far end to read the src flume's .mkt.fqme?
|
||||||
# data feed key
|
# data feed key
|
||||||
fqme=fqme,
|
fqme=fqme,
|
||||||
|
|
||||||
# TODO: pass `Flume.to_msg()`s here?
|
src_flume_addr=self.flume.to_msg(),
|
||||||
# mems
|
dst_flume_addr=dst_flume.to_msg(),
|
||||||
src_shm_token=self.flume.rt_shm.token,
|
ns_path=ns_path, # edge-bind-func
|
||||||
dst_shm_token=dst_fsp_flume.rt_shm.token,
|
|
||||||
|
|
||||||
# target
|
|
||||||
ns_path=ns_path,
|
|
||||||
|
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
zero_on_step=conf.get('zero_on_step', False),
|
zero_on_step=conf.get('zero_on_step', False),
|
||||||
|
@ -431,14 +429,14 @@ class FspAdmin:
|
||||||
ctx.open_stream() as stream,
|
ctx.open_stream() as stream,
|
||||||
):
|
):
|
||||||
|
|
||||||
dst_fsp_flume.stream: tractor.MsgStream = stream
|
dst_flume.stream: tractor.MsgStream = stream
|
||||||
|
|
||||||
# register output data
|
# register output data
|
||||||
self._registry[
|
self._registry[
|
||||||
(fqme, ns_path)
|
(fqme, ns_path)
|
||||||
] = (
|
] = (
|
||||||
stream,
|
stream,
|
||||||
dst_fsp_flume.rt_shm,
|
dst_flume.rt_shm,
|
||||||
complete
|
complete
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -515,7 +513,7 @@ class FspAdmin:
|
||||||
broker='piker',
|
broker='piker',
|
||||||
_atype='fsp',
|
_atype='fsp',
|
||||||
)
|
)
|
||||||
dst_fsp_flume = Flume(
|
dst_flume = Flume(
|
||||||
mkt=mkt,
|
mkt=mkt,
|
||||||
_rt_shm_token=dst_shm.token,
|
_rt_shm_token=dst_shm.token,
|
||||||
first_quote={},
|
first_quote={},
|
||||||
|
@ -543,13 +541,13 @@ class FspAdmin:
|
||||||
complete,
|
complete,
|
||||||
started,
|
started,
|
||||||
fqme,
|
fqme,
|
||||||
dst_fsp_flume,
|
dst_flume,
|
||||||
conf,
|
conf,
|
||||||
target,
|
target,
|
||||||
loglevel,
|
loglevel,
|
||||||
)
|
)
|
||||||
|
|
||||||
return dst_fsp_flume, started
|
return dst_flume, started
|
||||||
|
|
||||||
async def open_fsp_chart(
|
async def open_fsp_chart(
|
||||||
self,
|
self,
|
||||||
|
|
Loading…
Reference in New Issue