Compare commits
12 Commits
29ce8de462
...
b9af6176c5
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | b9af6176c5 | |
Tyler Goodlet | dd0167b9a5 | |
Tyler Goodlet | 9e71e0768f | |
Tyler Goodlet | 6029f39a3f | |
Tyler Goodlet | 656e2c6a88 | |
Tyler Goodlet | b8065a413b | |
Tyler Goodlet | 9245d24b47 | |
Tyler Goodlet | 22bd83943b | |
Tyler Goodlet | b94931bbdd | |
Tyler Goodlet | 239c1c457e | |
Tyler Goodlet | 24a54a7085 | |
Tyler Goodlet | ebd1eb114e |
|
@ -117,9 +117,57 @@ SecondFactorDevice=
|
|||
|
||||
# If you use the IBKR Mobile app for second factor authentication,
|
||||
# and you fail to complete the process before the time limit imposed
|
||||
# by IBKR, you can use this setting to tell IBC to exit: arrangements
|
||||
# can then be made to automatically restart IBC in order to initiate
|
||||
# the login sequence afresh. Otherwise, manual intervention at TWS's
|
||||
# by IBKR, this setting tells IBC whether to automatically restart
|
||||
# the login sequence, giving you another opportunity to complete
|
||||
# second factor authentication.
|
||||
#
|
||||
# Permitted values are 'yes' and 'no'.
|
||||
#
|
||||
# If this setting is not present or has no value, then the value
|
||||
# of the deprecated ExitAfterSecondFactorAuthenticationTimeout is
|
||||
# used instead. If this also has no value, then this setting defaults
|
||||
# to 'no'.
|
||||
#
|
||||
# NB: you must be using IBC v3.14.0 or later to use this setting:
|
||||
# earlier versions ignore it.
|
||||
|
||||
ReloginAfterSecondFactorAuthenticationTimeout=
|
||||
|
||||
|
||||
# This setting is only relevant if
|
||||
# ReloginAfterSecondFactorAuthenticationTimeout is set to 'yes',
|
||||
# or if ExitAfterSecondFactorAuthenticationTimeout is set to 'yes'.
|
||||
#
|
||||
# It controls how long (in seconds) IBC waits for login to complete
|
||||
# after the user acknowledges the second factor authentication
|
||||
# alert at the IBKR Mobile app. If login has not completed after
|
||||
# this time, IBC terminates.
|
||||
# The default value is 60.
|
||||
|
||||
SecondFactorAuthenticationExitInterval=
|
||||
|
||||
|
||||
# This setting specifies the timeout for second factor authentication
|
||||
# imposed by IB. The value is in seconds. You should not change this
|
||||
# setting unless you have reason to believe that IB has changed the
|
||||
# timeout. The default value is 180.
|
||||
|
||||
SecondFactorAuthenticationTimeout=180
|
||||
|
||||
|
||||
# DEPRECATED SETTING
|
||||
# ------------------
|
||||
#
|
||||
# ExitAfterSecondFactorAuthenticationTimeout - THIS SETTING WILL BE
|
||||
# REMOVED IN A FUTURE RELEASE. For IBC version 3.14.0 and later, see
|
||||
# the notes for ReloginAfterSecondFactorAuthenticationTimeout above.
|
||||
#
|
||||
# For IBC versions earlier than 3.14.0: If you use the IBKR Mobile
|
||||
# app for second factor authentication, and you fail to complete the
|
||||
# process before the time limit imposed by IBKR, you can use this
|
||||
# setting to tell IBC to exit: arrangements can then be made to
|
||||
# automatically restart IBC in order to initiate the login sequence
|
||||
# afresh. Otherwise, manual intervention at TWS's
|
||||
# Second Factor Authentication dialog is needed to complete the
|
||||
# login.
|
||||
#
|
||||
|
@ -132,29 +180,18 @@ SecondFactorDevice=
|
|||
ExitAfterSecondFactorAuthenticationTimeout=no
|
||||
|
||||
|
||||
# This setting is only relevant if
|
||||
# ExitAfterSecondFactorAuthenticationTimeout is set to 'yes'.
|
||||
#
|
||||
# It controls how long (in seconds) IBC waits for login to complete
|
||||
# after the user acknowledges the second factor authentication
|
||||
# alert at the IBKR Mobile app. If login has not completed after
|
||||
# this time, IBC terminates.
|
||||
# The default value is 40.
|
||||
|
||||
SecondFactorAuthenticationExitInterval=
|
||||
|
||||
|
||||
# Trading Mode
|
||||
# ------------
|
||||
#
|
||||
# TWS 955 introduced a new Trading Mode combo box on its login
|
||||
# dialog. This indicates whether the live account or the paper
|
||||
# trading account corresponding to the supplied credentials is
|
||||
# to be used. The allowed values are 'live' (the default) and
|
||||
# 'paper'. For earlier versions of TWS this setting has no
|
||||
# effect.
|
||||
# This indicates whether the live account or the paper trading
|
||||
# account corresponding to the supplied credentials is to be used.
|
||||
# The allowed values are 'live' (the default) and 'paper'.
|
||||
#
|
||||
# If this is set to 'live', then the credentials for the live
|
||||
# account must be supplied. If it is set to 'paper', then either
|
||||
# the live or the paper-trading credentials may be supplied.
|
||||
|
||||
TradingMode=
|
||||
TradingMode=paper
|
||||
|
||||
|
||||
# Paper-trading Account Warning
|
||||
|
@ -188,7 +225,7 @@ AcceptNonBrokerageAccountWarning=yes
|
|||
#
|
||||
# The default value is 60.
|
||||
|
||||
LoginDialogDisplayTimeout=20
|
||||
LoginDialogDisplayTimeout=60
|
||||
|
||||
|
||||
|
||||
|
@ -217,7 +254,15 @@ LoginDialogDisplayTimeout=20
|
|||
# but they are acceptable.
|
||||
#
|
||||
# The default is the current working directory when IBC is
|
||||
# started.
|
||||
# started, unless the TWS_SETTINGS_PATH setting in the relevant
|
||||
# start script is set.
|
||||
#
|
||||
# If both this setting and TWS_SETTINGS_PATH are set, then this
|
||||
# setting takes priority. Note that if they have different values,
|
||||
# auto-restart will not work.
|
||||
#
|
||||
# NB: this setting is now DEPRECATED. You should use the
|
||||
# TWS_SETTINGS_PATH setting in the relevant start script.
|
||||
|
||||
IbDir=/root/Jts
|
||||
|
||||
|
@ -284,15 +329,32 @@ ExistingSessionDetectedAction=primary
|
|||
# Override TWS API Port Number
|
||||
# ----------------------------
|
||||
#
|
||||
# If OverrideTwsApiPort is set to an integer, IBC changes the
|
||||
# 'Socket port' in TWS's API configuration to that number shortly
|
||||
# after startup. Leaving the setting blank will make no change to
|
||||
# the current setting. This setting is only intended for use in
|
||||
# certain specialized situations where the port number needs to
|
||||
# If OverrideTwsApiPort is set to an integer, IBC changes the
|
||||
# 'Socket port' in TWS's API configuration to that number shortly
|
||||
# after startup (but note that for the FIX Gateway, this setting is
|
||||
# actually stored in jts.ini rather than the Gateway's settings
|
||||
# file). Leaving the setting blank will make no change to
|
||||
# the current setting. This setting is only intended for use in
|
||||
# certain specialized situations where the port number needs to
|
||||
# be set dynamically at run-time, and for the FIX Gateway: most
|
||||
# non-FIX users will never need it, so don't use it unless you know
|
||||
# you need it.
|
||||
|
||||
OverrideTwsApiPort=4000
|
||||
|
||||
|
||||
# Override TWS Master Client ID
|
||||
# -----------------------------
|
||||
#
|
||||
# If OverrideTwsMasterClientID is set to an integer, IBC changes the
|
||||
# 'Master Client ID' value in TWS's API configuration to that
|
||||
# value shortly after startup. Leaving the setting blank will make
|
||||
# no change to the current setting. This setting is only intended
|
||||
# for use in certain specialized situations where the value needs to
|
||||
# be set dynamically at run-time: most users will never need it,
|
||||
# so don't use it unless you know you need it.
|
||||
|
||||
; OverrideTwsApiPort=4002
|
||||
OverrideTwsMasterClientID=
|
||||
|
||||
|
||||
# Read-only Login
|
||||
|
@ -302,11 +364,13 @@ ExistingSessionDetectedAction=primary
|
|||
# account security programme, the user will not be asked to perform
|
||||
# the second factor authentication action, and login to TWS will
|
||||
# occur automatically in read-only mode: in this mode, placing or
|
||||
# managing orders is not allowed. If set to 'no', and the user is
|
||||
# enrolled in IB's account security programme, the user must perform
|
||||
# the relevant second factor authentication action to complete the
|
||||
# login.
|
||||
|
||||
# managing orders is not allowed.
|
||||
#
|
||||
# If set to 'no', and the user is enrolled in IB's account security
|
||||
# programme, the second factor authentication process is handled
|
||||
# according to the Second Factor Authentication Settings described
|
||||
# elsewhere in this file.
|
||||
#
|
||||
# If the user is not enrolled in IB's account security programme,
|
||||
# this setting is ignored. The default is 'no'.
|
||||
|
||||
|
@ -326,7 +390,44 @@ ReadOnlyLogin=no
|
|||
# set the relevant checkbox (this only needs to be done once) and
|
||||
# not provide a value for this setting.
|
||||
|
||||
ReadOnlyApi=no
|
||||
ReadOnlyApi=
|
||||
|
||||
|
||||
# API Precautions
|
||||
# ---------------
|
||||
#
|
||||
# These settings relate to the corresponding 'Precautions' checkboxes in the
|
||||
# API section of the Global Configuration dialog.
|
||||
#
|
||||
# For all of these, the accepted values are:
|
||||
# - 'yes' sets the checkbox
|
||||
# - 'no' clears the checkbox
|
||||
# - if not set, the existing TWS/Gateway configuration is unchanged
|
||||
#
|
||||
# NB: thess settings are really only supplied for the benefit of new TWS
|
||||
# or Gateway instances that are being automatically installed and
|
||||
# started without user intervention, or where user settings are not preserved
|
||||
# between sessions (eg some Docker containers). Where a user is involved, they
|
||||
# should use the Global Configuration to set the relevant checkboxes and not
|
||||
# provide values for these settings.
|
||||
|
||||
BypassOrderPrecautions=
|
||||
|
||||
BypassBondWarning=
|
||||
|
||||
BypassNegativeYieldToWorstConfirmation=
|
||||
|
||||
BypassCalledBondWarning=
|
||||
|
||||
BypassSameActionPairTradeWarning=
|
||||
|
||||
BypassPriceBasedVolatilityRiskWarning=
|
||||
|
||||
BypassUSStocksMarketDataInSharesWarning=
|
||||
|
||||
BypassRedirectOrderWarning=
|
||||
|
||||
BypassNoOverfillProtectionPrecaution=
|
||||
|
||||
|
||||
# Market data size for US stocks - lots or shares
|
||||
|
@ -381,54 +482,145 @@ AcceptBidAskLastSizeDisplayUpdateNotification=accept
|
|||
SendMarketDataInLotsForUSstocks=
|
||||
|
||||
|
||||
# Trusted API Client IPs
|
||||
# ----------------------
|
||||
#
|
||||
# NB: THIS SETTING IS ONLY RELEVANT FOR THE GATEWAY, AND ONLY WHEN FIX=yes.
|
||||
# In all other cases it is ignored.
|
||||
#
|
||||
# This is a list of IP addresses separated by commas. API clients with IP
|
||||
# addresses in this list are able to connect to the API without Gateway
|
||||
# generating the 'Incoming connection' popup.
|
||||
#
|
||||
# Note that 127.0.0.1 is always permitted to connect, so do not include it
|
||||
# in this setting.
|
||||
|
||||
TrustedTwsApiClientIPs=
|
||||
|
||||
|
||||
# Reset Order ID Sequence
|
||||
# -----------------------
|
||||
#
|
||||
# The setting resets the order id sequence for orders submitted via the API, so
|
||||
# that the next invocation of the `NextValidId` API callback will return the
|
||||
# value 1. The reset occurs when TWS starts.
|
||||
#
|
||||
# Note that order ids are reset for all API clients, except those that have
|
||||
# outstanding (ie incomplete) orders: their order id sequence carries on as
|
||||
# before.
|
||||
#
|
||||
# Valid values are 'yes', 'true', 'false' and 'no'. The default is 'no'.
|
||||
|
||||
ResetOrderIdsAtStart=
|
||||
|
||||
|
||||
# This setting specifies IBC's action when TWS displays the dialog asking for
|
||||
# confirmation of a request to reset the API order id sequence.
|
||||
#
|
||||
# Note that the Gateway never displays this dialog, so this setting is ignored
|
||||
# for a Gateway session.
|
||||
#
|
||||
# Valid values consist of two strings separated by a solidus '/'. The first
|
||||
# value specifies the action to take when the order id reset request resulted
|
||||
# from setting ResetOrderIdsAtStart=yes. The second specifies the action to
|
||||
# take when the order id reset request is a result of the user clicking the
|
||||
# 'Reset API order ID sequence' button in the API configuration. Each value
|
||||
# must be one of the following:
|
||||
#
|
||||
# 'confirm'
|
||||
# order ids will be reset
|
||||
#
|
||||
# 'reject'
|
||||
# order ids will not be reset
|
||||
#
|
||||
# 'ignore'
|
||||
# IBC will ignore the dialog. The user must take action.
|
||||
#
|
||||
# The default setting is ignore/ignore
|
||||
|
||||
# Examples:
|
||||
#
|
||||
# 'confirm/reject' - confirm order id reset only if ResetOrderIdsAtStart=yes
|
||||
# and reject any user-initiated requests
|
||||
#
|
||||
# 'ignore/confirm' - user must decide what to do if ResetOrderIdsAtStart=yes
|
||||
# and confirm user-initiated requests
|
||||
#
|
||||
# 'reject/ignore' - reject order id reset if ResetOrderIdsAtStart=yes but
|
||||
# allow user to handle user-initiated requests
|
||||
|
||||
ConfirmOrderIdReset=
|
||||
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 4. TWS Auto-Closedown
|
||||
# 4. TWS Auto-Logoff and Auto-Restart
|
||||
# =============================================================================
|
||||
#
|
||||
# IMPORTANT NOTE: Starting with TWS 974, this setting no longer
|
||||
# works properly, because IB have changed the way TWS handles its
|
||||
# autologoff mechanism.
|
||||
# TWS and Gateway insist on being restarted every day. Two alternative
|
||||
# automatic options are offered:
|
||||
#
|
||||
# You should now configure the TWS autologoff time to something
|
||||
# convenient for you, and restart IBC each day.
|
||||
# - Auto-Logoff: at a specified time, TWS shuts down tidily, without
|
||||
# restarting.
|
||||
#
|
||||
# Alternatively, discontinue use of IBC and use the auto-relogin
|
||||
# mechanism within TWS 974 and later versions (note that the
|
||||
# auto-relogin mechanism provided by IB is not available if you
|
||||
# use IBC).
|
||||
# - Auto-Restart: at a specified time, TWS shuts down and then restarts
|
||||
# without the user having to re-autheticate.
|
||||
#
|
||||
# The normal way to configure the time at which this happens is via the Lock
|
||||
# and Exit section of the Configuration dialog. Once this time has been
|
||||
# configured in this way, the setting persists until the user changes it again.
|
||||
#
|
||||
# However, there are situations where there is no user available to do this
|
||||
# configuration, or where there is no persistent storage (for example some
|
||||
# Docker images). In such cases, the auto-restart or auto-logoff time can be
|
||||
# set whenever IBC starts with the settings below.
|
||||
#
|
||||
# The value, if specified, must be a time in HH:MM AM/PM format, for example
|
||||
# 08:00 AM or 10:00 PM. Note that there must be a single space between the
|
||||
# two parts of this value; also that midnight is "12:00 AM" and midday is
|
||||
# "12:00 PM".
|
||||
#
|
||||
# If no value is specified for either setting, the currently configured
|
||||
# settings will apply. If a value is supplied for one setting, the other
|
||||
# setting is cleared. If values are supplied for both settings, only the
|
||||
# auto-restart time is set, and the auto-logoff time is cleared.
|
||||
#
|
||||
# Note that for a normal TWS/Gateway installation with persistent storage
|
||||
# (for example on a desktop computer) the value will be persisted as if the
|
||||
# user had set it via the configuration dialog.
|
||||
#
|
||||
# If you choose to auto-restart, you should take note of the considerations
|
||||
# described at the link below. Note that where this information mentions
|
||||
# 'manual authentication', restarting IBC will do the job (IBKR does not
|
||||
# recognise the existence of IBC in its docuemntation).
|
||||
#
|
||||
# https://www.interactivebrokers.com/en/software/tws/twsguide.htm#usersguidebook/configuretws/auto_restart_info.htm
|
||||
#
|
||||
# If you use the "RESTART" command via the IBC command server, and IBC is
|
||||
# running any version of the Gateway (or a version of TWS earlier than 1018),
|
||||
# note that this will set the Auto-Restart time in Gateway/TWS's configuration
|
||||
# dialog to the time at which the restart actually happens (which may be up to
|
||||
# a minute after the RESTART command is issued). To prevent future auto-
|
||||
# restarts at this time, you must make sure you have set AutoLogoffTime or
|
||||
# AutoRestartTime to your desired value before running IBC. NB: this does not
|
||||
# apply to TWS from version 1018 onwards.
|
||||
|
||||
# Set to yes or no (lower case).
|
||||
#
|
||||
# yes means allow TWS to shut down automatically at its
|
||||
# specified shutdown time, which is set via the TWS
|
||||
# configuration menu.
|
||||
#
|
||||
# no means TWS never shuts down automatically.
|
||||
#
|
||||
# NB: IB recommends that you do not keep TWS running
|
||||
# continuously. If you set this setting to 'no', you may
|
||||
# experience incorrect TWS operation.
|
||||
#
|
||||
# NB: the default for this setting is 'no'. Since this will
|
||||
# only work properly with TWS versions earlier than 974, you
|
||||
# should explicitly set this to 'yes' for version 974 and later.
|
||||
|
||||
IbAutoClosedown=yes
|
||||
AutoLogoffTime=
|
||||
|
||||
AutoRestartTime=
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 5. TWS Tidy Closedown Time
|
||||
# =============================================================================
|
||||
#
|
||||
# NB: starting with TWS 974 this is no longer a useful option
|
||||
# because both TWS and Gateway now have the same auto-logoff
|
||||
# mechanism, and IBC can no longer avoid this.
|
||||
# Specifies a time at which TWS will close down tidily, with no restart.
|
||||
#
|
||||
# Note that giving this setting a value does not change TWS's
|
||||
# auto-logoff in any way: any setting will be additional to the
|
||||
# TWS auto-logoff.
|
||||
# There is little reason to use this setting. It is similar to AutoLogoffTime,
|
||||
# but can include a day-of-the-week, whereas AutoLogoffTime and AutoRestartTime
|
||||
# apply every day. So for example you could use ClosedownAt in conjunction with
|
||||
# AutoRestartTime to shut down TWS on Friday evenings after the markets
|
||||
# close, without it running on Saturday as well.
|
||||
#
|
||||
# To tell IBC to tidily close TWS at a specified time every
|
||||
# day, set this value to <hh:mm>, for example:
|
||||
|
@ -487,7 +679,7 @@ AcceptIncomingConnectionAction=reject
|
|||
# no means the dialog remains on display and must be
|
||||
# handled by the user.
|
||||
|
||||
AllowBlindTrading=yes
|
||||
AllowBlindTrading=no
|
||||
|
||||
|
||||
# Save Settings on a Schedule
|
||||
|
@ -530,6 +722,26 @@ AllowBlindTrading=yes
|
|||
SaveTwsSettingsAt=
|
||||
|
||||
|
||||
# Confirm Crypto Currency Orders Automatically
|
||||
# --------------------------------------------
|
||||
#
|
||||
# When you place an order for a cryptocurrency contract, a dialog is displayed
|
||||
# asking you to confirm that you want to place the order, and notifying you
|
||||
# that you are placing an order to trade cryptocurrency with Paxos, a New York
|
||||
# limited trust company, and not at Interactive Brokers.
|
||||
#
|
||||
# transmit means that the order will be placed automatically, and the
|
||||
# dialog will then be closed
|
||||
#
|
||||
# cancel means that the order will not be placed, and the dialog will
|
||||
# then be closed
|
||||
#
|
||||
# manual means that IBC will take no action and the user must deal
|
||||
# with the dialog
|
||||
|
||||
ConfirmCryptoCurrencyOrders=transmit
|
||||
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 7. Settings Specific to Indian Versions of TWS
|
||||
|
@ -566,13 +778,17 @@ DismissNSEComplianceNotice=yes
|
|||
#
|
||||
# The port number that IBC listens on for commands
|
||||
# such as "STOP". DO NOT set this to the port number
|
||||
# used for TWS API connections. There is no good reason
|
||||
# to change this setting unless the port is used by
|
||||
# some other application (typically another instance of
|
||||
# IBC). The default value is 0, which tells IBC not to
|
||||
# start the command server
|
||||
# used for TWS API connections.
|
||||
#
|
||||
# The convention is to use 7462 for this port,
|
||||
# but it must be set to a different value from any other
|
||||
# IBC instance that might run at the same time.
|
||||
#
|
||||
# The default value is 0, which tells IBC not to start
|
||||
# the command server
|
||||
|
||||
#CommandServerPort=7462
|
||||
CommandServerPort=0
|
||||
|
||||
|
||||
# Permitted Command Sources
|
||||
|
@ -583,19 +799,19 @@ DismissNSEComplianceNotice=yes
|
|||
# IBC. Commands can always be sent from the
|
||||
# same host as IBC is running on.
|
||||
|
||||
ControlFrom=127.0.0.1
|
||||
ControlFrom=
|
||||
|
||||
|
||||
# Address for Receiving Commands
|
||||
# ------------------------------
|
||||
#
|
||||
# Specifies the IP address on which the Command Server
|
||||
# is so listen. For a multi-homed host, this can be used
|
||||
# is to listen. For a multi-homed host, this can be used
|
||||
# to specify that connection requests are only to be
|
||||
# accepted on the specified address. The default is to
|
||||
# accept connection requests on all local addresses.
|
||||
|
||||
BindAddress=127.0.0.1
|
||||
BindAddress=
|
||||
|
||||
|
||||
# Command Prompt
|
||||
|
@ -621,7 +837,7 @@ CommandPrompt=
|
|||
# information is sent. The default is that such information
|
||||
# is not sent.
|
||||
|
||||
SuppressInfoMessages=no
|
||||
SuppressInfoMessages=yes
|
||||
|
||||
|
||||
|
||||
|
@ -651,10 +867,10 @@ SuppressInfoMessages=no
|
|||
# The LogStructureScope setting indicates which windows are
|
||||
# eligible for structure logging:
|
||||
#
|
||||
# - if set to 'known', only windows that IBC recognizes
|
||||
# are eligible - these are windows that IBC has some
|
||||
# interest in monitoring, usually to take some action
|
||||
# on the user's behalf;
|
||||
# - (default value) if set to 'known', only windows that
|
||||
# IBC recognizes are eligible - these are windows that
|
||||
# IBC has some interest in monitoring, usually to take
|
||||
# some action on the user's behalf;
|
||||
#
|
||||
# - if set to 'unknown', only windows that IBC does not
|
||||
# recognize are eligible. Most windows displayed by
|
||||
|
@ -667,9 +883,8 @@ SuppressInfoMessages=no
|
|||
# - if set to 'all', then every window displayed by TWS
|
||||
# is eligible.
|
||||
#
|
||||
# The default value is 'known'.
|
||||
|
||||
LogStructureScope=all
|
||||
LogStructureScope=known
|
||||
|
||||
|
||||
# When to Log Window Structure
|
||||
|
@ -682,13 +897,15 @@ LogStructureScope=all
|
|||
# structure of an eligible window the first time it
|
||||
# is encountered;
|
||||
#
|
||||
# - if set to 'openclose', the structure is logged every
|
||||
# time an eligible window is opened or closed;
|
||||
#
|
||||
# - if set to 'activate', the structure is logged every
|
||||
# time an eligible window is made active;
|
||||
#
|
||||
# - if set to 'never' or 'no' or 'false', structure
|
||||
# information is never logged.
|
||||
# - (default value) if set to 'never' or 'no' or 'false',
|
||||
# structure information is never logged.
|
||||
#
|
||||
# The default value is 'never'.
|
||||
|
||||
LogStructureWhen=never
|
||||
|
||||
|
@ -708,4 +925,3 @@ LogStructureWhen=never
|
|||
#LogComponents=
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -327,7 +327,11 @@ class MktPair(Struct, frozen=True):
|
|||
) -> dict:
|
||||
d = super().to_dict(**kwargs)
|
||||
d['src'] = self.src.to_dict(**kwargs)
|
||||
d['dst'] = self.dst.to_dict(**kwargs)
|
||||
|
||||
if not isinstance(self.dst, str):
|
||||
d['dst'] = self.dst.to_dict(**kwargs)
|
||||
else:
|
||||
d['dst'] = str(self.dst)
|
||||
|
||||
d['price_tick'] = str(self.price_tick)
|
||||
d['size_tick'] = str(self.size_tick)
|
||||
|
@ -349,11 +353,16 @@ class MktPair(Struct, frozen=True):
|
|||
Constructor for a received msg-dict normally received over IPC.
|
||||
|
||||
'''
|
||||
dst_asset_msg = msg.pop('dst')
|
||||
dst = Asset.from_msg(dst_asset_msg) # .copy()
|
||||
if not isinstance(
|
||||
dst_asset_msg := msg.pop('dst'),
|
||||
str,
|
||||
):
|
||||
dst: Asset = Asset.from_msg(dst_asset_msg) # .copy()
|
||||
else:
|
||||
dst: str = dst_asset_msg
|
||||
|
||||
src_asset_msg = msg.pop('src')
|
||||
src = Asset.from_msg(src_asset_msg) # .copy()
|
||||
src_asset_msg: dict = msg.pop('src')
|
||||
src: Asset = Asset.from_msg(src_asset_msg) # .copy()
|
||||
|
||||
# XXX NOTE: ``msgspec`` can encode `Decimal` but it doesn't
|
||||
# decide to it by default since we aren't spec-cing these
|
||||
|
|
|
@ -229,8 +229,8 @@ _samplings: dict[int, tuple[str, str]] = {
|
|||
# throughput can be made faster during backfilling.
|
||||
60: (
|
||||
'1 min',
|
||||
'1 D',
|
||||
pendulum.duration(days=1),
|
||||
'2 D',
|
||||
pendulum.duration(days=2),
|
||||
),
|
||||
}
|
||||
|
||||
|
|
|
@ -818,7 +818,7 @@ async def stream_quotes(
|
|||
details: ibis.ContractDetails
|
||||
async with (
|
||||
open_data_client() as proxy,
|
||||
trio.open_nursery() as tn,
|
||||
# trio.open_nursery() as tn,
|
||||
):
|
||||
mkt, details = await get_mkt_info(
|
||||
sym,
|
||||
|
|
|
@ -214,7 +214,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
|
|||
last = time.time()
|
||||
async for pattern in stream:
|
||||
log.info(f'received {pattern}')
|
||||
now = time.time()
|
||||
now: float = time.time()
|
||||
|
||||
# this causes tractor hang...
|
||||
# assert 0
|
||||
|
@ -261,7 +261,9 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
|
|||
# defined adhoc symbol set.
|
||||
stock_results = []
|
||||
|
||||
async def stash_results(target: Awaitable[list]):
|
||||
async def extend_results(
|
||||
target: Awaitable[list]
|
||||
) -> None:
|
||||
try:
|
||||
results = await target
|
||||
except tractor.trionics.Lagged:
|
||||
|
@ -274,7 +276,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
|
|||
with trio.move_on_after(3) as cs:
|
||||
async with trio.open_nursery() as sn:
|
||||
sn.start_soon(
|
||||
stash_results,
|
||||
extend_results,
|
||||
proxy.search_symbols(
|
||||
pattern=pattern,
|
||||
upto=5,
|
||||
|
@ -289,8 +291,10 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
|
|||
f'Search timeout? {proxy._aio_ns.ib.client}'
|
||||
)
|
||||
continue
|
||||
else:
|
||||
elif stock_results:
|
||||
break
|
||||
# else:
|
||||
await tractor.pause()
|
||||
|
||||
# # match against our ad-hoc set immediately
|
||||
# adhoc_matches = fuzzy.extract(
|
||||
|
|
|
@ -42,35 +42,15 @@ if TYPE_CHECKING:
|
|||
from .feed import Feed
|
||||
|
||||
|
||||
# TODO: ideas for further abstractions as per
|
||||
# https://github.com/pikers/piker/issues/216 and
|
||||
# https://github.com/pikers/piker/issues/270:
|
||||
# - a ``Cascade`` would be the minimal "connection" of 2 ``Flumes``
|
||||
# as per circuit parlance:
|
||||
# https://en.wikipedia.org/wiki/Two-port_network#Cascade_connection
|
||||
# - could cover the combination of our `FspAdmin` and the
|
||||
# backend `.fsp._engine` related machinery to "connect" one flume
|
||||
# to another?
|
||||
# - a (financial signal) ``Flow`` would be the a "collection" of such
|
||||
# minmial cascades. Some engineering based jargon concepts:
|
||||
# - https://en.wikipedia.org/wiki/Signal_chain
|
||||
# - https://en.wikipedia.org/wiki/Daisy_chain_(electrical_engineering)
|
||||
# - https://en.wikipedia.org/wiki/Audio_signal_flow
|
||||
# - https://en.wikipedia.org/wiki/Digital_signal_processing#Implementation
|
||||
# - https://en.wikipedia.org/wiki/Dataflow_programming
|
||||
# - https://en.wikipedia.org/wiki/Signal_programming
|
||||
# - https://en.wikipedia.org/wiki/Incremental_computing
|
||||
|
||||
|
||||
class Flume(Struct):
|
||||
'''
|
||||
Composite reference type which points to all the addressing handles
|
||||
and other meta-data necessary for the read, measure and management
|
||||
of a set of real-time updated data flows.
|
||||
Composite reference type which points to all the addressing
|
||||
handles and other meta-data necessary for the read, measure and
|
||||
management of a set of real-time updated data flows.
|
||||
|
||||
Can be thought of as a "flow descriptor" or "flow frame" which
|
||||
describes the high level properties of a set of data flows that can
|
||||
be used seamlessly across process-memory boundaries.
|
||||
describes the high level properties of a set of data flows that
|
||||
can be used seamlessly across process-memory boundaries.
|
||||
|
||||
Each instance's sub-components normally includes:
|
||||
- a msg oriented quote stream provided via an IPC transport
|
||||
|
@ -93,6 +73,7 @@ class Flume(Struct):
|
|||
# private shm refs loaded dynamically from tokens
|
||||
_hist_shm: ShmArray | None = None
|
||||
_rt_shm: ShmArray | None = None
|
||||
_readonly: bool = True
|
||||
|
||||
stream: tractor.MsgStream | None = None
|
||||
izero_hist: int = 0
|
||||
|
@ -109,7 +90,7 @@ class Flume(Struct):
|
|||
if self._rt_shm is None:
|
||||
self._rt_shm = attach_shm_array(
|
||||
token=self._rt_shm_token,
|
||||
readonly=True,
|
||||
readonly=self._readonly,
|
||||
)
|
||||
|
||||
return self._rt_shm
|
||||
|
@ -122,12 +103,10 @@ class Flume(Struct):
|
|||
'No shm token has been set for the history buffer?'
|
||||
)
|
||||
|
||||
if (
|
||||
self._hist_shm is None
|
||||
):
|
||||
if self._hist_shm is None:
|
||||
self._hist_shm = attach_shm_array(
|
||||
token=self._hist_shm_token,
|
||||
readonly=True,
|
||||
readonly=self._readonly,
|
||||
)
|
||||
|
||||
return self._hist_shm
|
||||
|
@ -146,10 +125,10 @@ class Flume(Struct):
|
|||
period and ratio between them.
|
||||
|
||||
'''
|
||||
times = self.hist_shm.array['time']
|
||||
end = pendulum.from_timestamp(times[-1])
|
||||
start = pendulum.from_timestamp(times[times != times[-1]][-1])
|
||||
hist_step_size_s = (end - start).seconds
|
||||
times: np.ndarray = self.hist_shm.array['time']
|
||||
end: float | int = pendulum.from_timestamp(times[-1])
|
||||
start: float | int = pendulum.from_timestamp(times[times != times[-1]][-1])
|
||||
hist_step_size_s: float = (end - start).seconds
|
||||
|
||||
times = self.rt_shm.array['time']
|
||||
end = pendulum.from_timestamp(times[-1])
|
||||
|
@ -169,17 +148,25 @@ class Flume(Struct):
|
|||
msg = self.to_dict()
|
||||
msg['mkt'] = self.mkt.to_dict()
|
||||
|
||||
# can't serialize the stream or feed objects, it's expected
|
||||
# you'll have a ref to it since this msg should be rxed on
|
||||
# a stream on whatever far end IPC..
|
||||
# NOTE: pop all un-msg-serializable fields:
|
||||
# - `tractor.MsgStream`
|
||||
# - `Feed`
|
||||
# - `Shmarray`
|
||||
# it's expected the `.from_msg()` on the other side
|
||||
# will get instead some kind of msg-compat version
|
||||
# that it can load.
|
||||
msg.pop('stream')
|
||||
msg.pop('feed')
|
||||
msg.pop('_rt_shm')
|
||||
msg.pop('_hist_shm')
|
||||
|
||||
return msg
|
||||
|
||||
@classmethod
|
||||
def from_msg(
|
||||
cls,
|
||||
msg: dict,
|
||||
readonly: bool = True,
|
||||
|
||||
) -> dict:
|
||||
'''
|
||||
|
@ -190,7 +177,11 @@ class Flume(Struct):
|
|||
mkt_msg = msg.pop('mkt')
|
||||
from ..accounting import MktPair # cycle otherwise..
|
||||
mkt = MktPair.from_msg(mkt_msg)
|
||||
return cls(mkt=mkt, **msg)
|
||||
msg |= {'_readonly': readonly}
|
||||
return cls(
|
||||
mkt=mkt,
|
||||
**msg,
|
||||
)
|
||||
|
||||
def get_index(
|
||||
self,
|
||||
|
|
|
@ -57,6 +57,7 @@ from ._sampling import (
|
|||
from ..brokers._util import (
|
||||
DataUnavailable,
|
||||
)
|
||||
from ..storage import TimeseriesNotFound
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from bidict import bidict
|
||||
|
@ -690,13 +691,18 @@ async def tsdb_backfill(
|
|||
# but if not then below the remaining history can be lazy
|
||||
# loaded?
|
||||
fqme: str = mkt.fqme
|
||||
tsdb_entry: tuple | None = await storage.load(
|
||||
fqme,
|
||||
timeframe=timeframe,
|
||||
)
|
||||
|
||||
last_tsdb_dt: datetime | None = None
|
||||
if tsdb_entry:
|
||||
try:
|
||||
tsdb_entry: tuple | None = await storage.load(
|
||||
fqme,
|
||||
timeframe=timeframe,
|
||||
)
|
||||
except TimeseriesNotFound:
|
||||
log.warning(
|
||||
f'No timeseries yet for {fqme}'
|
||||
)
|
||||
|
||||
else:
|
||||
(
|
||||
tsdb_history,
|
||||
first_tsdb_dt,
|
||||
|
@ -963,7 +969,8 @@ async def manage_history(
|
|||
sub_for_broadcasts=False,
|
||||
|
||||
) as sample_stream:
|
||||
# register 1s and 1m buffers with the global incrementer task
|
||||
# register 1s and 1m buffers with the global
|
||||
# incrementer task
|
||||
log.info(f'Connected to sampler stream: {sample_stream}')
|
||||
|
||||
for timeframe in [60, 1]:
|
||||
|
|
|
@ -26,7 +26,10 @@ from ._api import (
|
|||
maybe_mk_fsp_shm,
|
||||
Fsp,
|
||||
)
|
||||
from ._engine import cascade
|
||||
from ._engine import (
|
||||
cascade,
|
||||
Cascade,
|
||||
)
|
||||
from ._volume import (
|
||||
dolla_vlm,
|
||||
flow_rates,
|
||||
|
@ -35,6 +38,7 @@ from ._volume import (
|
|||
|
||||
__all__: list[str] = [
|
||||
'cascade',
|
||||
'Cascade',
|
||||
'maybe_mk_fsp_shm',
|
||||
'Fsp',
|
||||
'dolla_vlm',
|
||||
|
@ -46,9 +50,12 @@ __all__: list[str] = [
|
|||
async def latency(
|
||||
source: 'TickStream[Dict[str, float]]', # noqa
|
||||
ohlcv: np.ndarray
|
||||
|
||||
) -> AsyncIterator[np.ndarray]:
|
||||
"""Latency measurements, broker to piker.
|
||||
"""
|
||||
'''
|
||||
Latency measurements, broker to piker.
|
||||
|
||||
'''
|
||||
# TODO: do we want to offer yielding this async
|
||||
# before the rt data connection comes up?
|
||||
|
||||
|
|
|
@ -18,13 +18,12 @@
|
|||
core task logic for processing chains
|
||||
|
||||
'''
|
||||
from dataclasses import dataclass
|
||||
from __future__ import annotations
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from functools import partial
|
||||
from typing import (
|
||||
AsyncIterator,
|
||||
Callable,
|
||||
Optional,
|
||||
Union,
|
||||
)
|
||||
|
||||
import numpy as np
|
||||
|
@ -33,9 +32,9 @@ from trio_typing import TaskStatus
|
|||
import tractor
|
||||
from tractor.msg import NamespacePath
|
||||
|
||||
from piker.types import Struct
|
||||
from ..log import get_logger, get_console_log
|
||||
from .. import data
|
||||
from ..data import attach_shm_array
|
||||
from ..data.feed import (
|
||||
Flume,
|
||||
Feed,
|
||||
|
@ -56,12 +55,6 @@ from ..toolz import Profiler
|
|||
log = get_logger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TaskTracker:
|
||||
complete: trio.Event
|
||||
cs: trio.CancelScope
|
||||
|
||||
|
||||
async def filter_quotes_by_sym(
|
||||
|
||||
sym: str,
|
||||
|
@ -82,30 +75,168 @@ async def filter_quotes_by_sym(
|
|||
if quote:
|
||||
yield quote
|
||||
|
||||
# TODO: unifying the abstractions in this FSP subsys/layer:
|
||||
# -[ ] move the `.data.flows.Flume` type into this
|
||||
# module/subsys/pkg?
|
||||
# -[ ] ideas for further abstractions as per
|
||||
# - https://github.com/pikers/piker/issues/216,
|
||||
# - https://github.com/pikers/piker/issues/270:
|
||||
# - a (financial signal) ``Flow`` would be the a "collection" of such
|
||||
# minmial cascades. Some engineering based jargon concepts:
|
||||
# - https://en.wikipedia.org/wiki/Signal_chain
|
||||
# - https://en.wikipedia.org/wiki/Daisy_chain_(electrical_engineering)
|
||||
# - https://en.wikipedia.org/wiki/Audio_signal_flow
|
||||
# - https://en.wikipedia.org/wiki/Digital_signal_processing#Implementation
|
||||
# - https://en.wikipedia.org/wiki/Dataflow_programming
|
||||
# - https://en.wikipedia.org/wiki/Signal_programming
|
||||
# - https://en.wikipedia.org/wiki/Incremental_computing
|
||||
# - https://en.wikipedia.org/wiki/Signal-flow_graph
|
||||
# - https://en.wikipedia.org/wiki/Signal-flow_graph#Basic_components
|
||||
|
||||
async def fsp_compute(
|
||||
# -[ ] we probably want to eval THE BELOW design and unify with the
|
||||
# proto `TaskManager` in the `tractor` dev branch as well as with
|
||||
# our below idea for `Cascade`:
|
||||
# - https://github.com/goodboy/tractor/pull/363
|
||||
class Cascade(Struct):
|
||||
'''
|
||||
As per sig-proc engineering parlance, this is a chaining of
|
||||
`Flume`s, which are themselves collections of "Streams"
|
||||
implemented currently via `ShmArray`s.
|
||||
|
||||
A `Cascade` is be the minimal "connection" of 2 `Flumes`
|
||||
as per circuit parlance:
|
||||
https://en.wikipedia.org/wiki/Two-port_network#Cascade_connection
|
||||
|
||||
TODO:
|
||||
-[ ] could cover the combination of our `FspAdmin` and the
|
||||
backend `.fsp._engine` related machinery to "connect" one flume
|
||||
to another?
|
||||
|
||||
'''
|
||||
# TODO: make these `Flume`s
|
||||
src: Flume
|
||||
dst: Flume
|
||||
tn: trio.Nursery
|
||||
fsp: Fsp # UI-side middleware ctl API
|
||||
|
||||
# filled during cascade/.bind_func() (fsp_compute) init phases
|
||||
bind_func: Callable | None = None
|
||||
complete: trio.Event | None = None
|
||||
cs: trio.CancelScope | None = None
|
||||
client_stream: tractor.MsgStream | None = None
|
||||
|
||||
async def resync(self) -> int:
|
||||
# TODO: adopt an incremental update engine/approach
|
||||
# where possible here eventually!
|
||||
log.info(f're-syncing fsp {self.fsp.name} to source')
|
||||
self.cs.cancel()
|
||||
await self.complete.wait()
|
||||
index: int = await self.tn.start(self.bind_func)
|
||||
|
||||
# always trigger UI refresh after history update,
|
||||
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
|
||||
# ``piker.ui._display.trigger_update()``.
|
||||
dst_shm: ShmArray = self.dst.rt_shm
|
||||
await self.client_stream.send({
|
||||
'fsp_update': {
|
||||
'key': dst_shm.token,
|
||||
'first': dst_shm._first.value,
|
||||
'last': dst_shm._last.value,
|
||||
}
|
||||
})
|
||||
return index
|
||||
|
||||
def is_synced(self) -> tuple[bool, int, int]:
|
||||
'''
|
||||
Predicate to dertmine if a destination FSP
|
||||
output array is aligned to its source array.
|
||||
|
||||
'''
|
||||
src_shm: ShmArray = self.src.rt_shm
|
||||
dst_shm: ShmArray = self.dst.rt_shm
|
||||
step_diff = src_shm.index - dst_shm.index
|
||||
len_diff = abs(len(src_shm.array) - len(dst_shm.array))
|
||||
synced: bool = not (
|
||||
# the source is likely backfilling and we must
|
||||
# sync history calculations
|
||||
len_diff > 2
|
||||
|
||||
# we aren't step synced to the source and may be
|
||||
# leading/lagging by a step
|
||||
or step_diff > 1
|
||||
or step_diff < 0
|
||||
)
|
||||
if not synced:
|
||||
fsp: Fsp = self.fsp
|
||||
log.warning(
|
||||
'***DESYNCED FSP***\n'
|
||||
f'{fsp.ns_path}@{src_shm.token}\n'
|
||||
f'step_diff: {step_diff}\n'
|
||||
f'len_diff: {len_diff}\n'
|
||||
)
|
||||
return (
|
||||
synced,
|
||||
step_diff,
|
||||
len_diff,
|
||||
)
|
||||
|
||||
async def poll_and_sync_to_step(self) -> int:
|
||||
synced, step_diff, _ = self.is_synced()
|
||||
while not synced:
|
||||
await self.resync()
|
||||
synced, step_diff, _ = self.is_synced()
|
||||
|
||||
return step_diff
|
||||
|
||||
@acm
|
||||
async def open_edge(
|
||||
self,
|
||||
bind_func: Callable,
|
||||
) -> int:
|
||||
self.bind_func = bind_func
|
||||
index = await self.tn.start(bind_func)
|
||||
yield index
|
||||
# TODO: what do we want on teardown/error?
|
||||
# -[ ] dynamic reconnection after update?
|
||||
|
||||
|
||||
async def connect_streams(
|
||||
casc: Cascade,
|
||||
mkt: MktPair,
|
||||
flume: Flume,
|
||||
quote_stream: trio.abc.ReceiveChannel,
|
||||
src: Flume,
|
||||
dst: Flume,
|
||||
|
||||
src: ShmArray,
|
||||
dst: ShmArray,
|
||||
|
||||
func: Callable,
|
||||
edge_func: Callable,
|
||||
|
||||
# attach_stream: bool = False,
|
||||
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Stream and per-sample compute and write the cascade of
|
||||
2 `Flumes`/streams given some operating `func`.
|
||||
|
||||
https://en.wikipedia.org/wiki/Signal-flow_graph#Basic_components
|
||||
|
||||
Not literally, but something like:
|
||||
|
||||
edge_func(Flume_in) -> Flume_out
|
||||
|
||||
'''
|
||||
profiler = Profiler(
|
||||
delayed=False,
|
||||
disabled=True
|
||||
)
|
||||
|
||||
fqme = mkt.fqme
|
||||
out_stream = func(
|
||||
# TODO: just pull it from src.mkt.fqme no?
|
||||
# fqme: str = mkt.fqme
|
||||
fqme: str = src.mkt.fqme
|
||||
|
||||
# TODO: dynamic introspection of what the underlying (vertex)
|
||||
# function actually requires from input node (flumes) then
|
||||
# deliver those inputs as part of a graph "compilation" step?
|
||||
out_stream = edge_func(
|
||||
|
||||
# TODO: do we even need this if we do the feed api right?
|
||||
# shouldn't a local stream do this before we get a handle
|
||||
|
@ -113,20 +244,21 @@ async def fsp_compute(
|
|||
# async itertools style?
|
||||
filter_quotes_by_sym(fqme, quote_stream),
|
||||
|
||||
# XXX: currently the ``ohlcv`` arg
|
||||
flume.rt_shm,
|
||||
# XXX: currently the ``ohlcv`` arg, but we should allow
|
||||
# (dynamic) requests for src flume (node) streams?
|
||||
src.rt_shm,
|
||||
)
|
||||
|
||||
# HISTORY COMPUTE PHASE
|
||||
# conduct a single iteration of fsp with historical bars input
|
||||
# and get historical output.
|
||||
history_output: Union[
|
||||
dict[str, np.ndarray], # multi-output case
|
||||
np.ndarray, # single output case
|
||||
]
|
||||
history_output: (
|
||||
dict[str, np.ndarray] # multi-output case
|
||||
| np.ndarray, # single output case
|
||||
)
|
||||
history_output = await anext(out_stream)
|
||||
|
||||
func_name = func.__name__
|
||||
func_name = edge_func.__name__
|
||||
profiler(f'{func_name} generated history')
|
||||
|
||||
# build struct array with an 'index' field to push as history
|
||||
|
@ -134,10 +266,12 @@ async def fsp_compute(
|
|||
# TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no?
|
||||
# if the output array is multi-field then push
|
||||
# each respective field.
|
||||
fields = getattr(dst.array.dtype, 'fields', None).copy()
|
||||
dst_shm: ShmArray = dst.rt_shm
|
||||
fields = getattr(dst_shm.array.dtype, 'fields', None).copy()
|
||||
fields.pop('index')
|
||||
history_by_field: Optional[np.ndarray] = None
|
||||
src_time = src.array['time']
|
||||
history_by_field: np.ndarray | None = None
|
||||
src_shm: ShmArray = src.rt_shm
|
||||
src_time = src_shm.array['time']
|
||||
|
||||
if (
|
||||
fields and
|
||||
|
@ -156,7 +290,7 @@ async def fsp_compute(
|
|||
if history_by_field is None:
|
||||
|
||||
if output is None:
|
||||
length = len(src.array)
|
||||
length = len(src_shm.array)
|
||||
else:
|
||||
length = len(output)
|
||||
|
||||
|
@ -165,7 +299,7 @@ async def fsp_compute(
|
|||
# will be pushed to shm.
|
||||
history_by_field = np.zeros(
|
||||
length,
|
||||
dtype=dst.array.dtype
|
||||
dtype=dst_shm.array.dtype
|
||||
)
|
||||
|
||||
if output is None:
|
||||
|
@ -182,13 +316,13 @@ async def fsp_compute(
|
|||
)
|
||||
history_by_field = np.zeros(
|
||||
len(history_output),
|
||||
dtype=dst.array.dtype
|
||||
dtype=dst_shm.array.dtype
|
||||
)
|
||||
history_by_field[func_name] = history_output
|
||||
|
||||
history_by_field['time'] = src_time[-len(history_by_field):]
|
||||
|
||||
history_output['time'] = src.array['time']
|
||||
history_output['time'] = src_shm.array['time']
|
||||
|
||||
# TODO: XXX:
|
||||
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
|
||||
|
@ -201,11 +335,11 @@ async def fsp_compute(
|
|||
# is `index` aware such that historical data can be indexed
|
||||
# relative to the true first datum? Not sure if this is sane
|
||||
# for incremental compuations.
|
||||
first = dst._first.value = src._first.value
|
||||
first = dst_shm._first.value = src_shm._first.value
|
||||
|
||||
# TODO: can we use this `start` flag instead of the manual
|
||||
# setting above?
|
||||
index = dst.push(
|
||||
index = dst_shm.push(
|
||||
history_by_field,
|
||||
start=first,
|
||||
)
|
||||
|
@ -216,12 +350,9 @@ async def fsp_compute(
|
|||
# setup a respawn handle
|
||||
with trio.CancelScope() as cs:
|
||||
|
||||
# TODO: might be better to just make a "restart" method where
|
||||
# the target task is spawned implicitly and then the event is
|
||||
# set via some higher level api? At that poing we might as well
|
||||
# be writing a one-cancels-one nursery though right?
|
||||
tracker = TaskTracker(trio.Event(), cs)
|
||||
task_status.started((tracker, index))
|
||||
casc.cs = cs
|
||||
casc.complete = trio.Event()
|
||||
task_status.started(index)
|
||||
|
||||
profiler(f'{func_name} yield last index')
|
||||
|
||||
|
@ -235,12 +366,12 @@ async def fsp_compute(
|
|||
log.debug(f"{func_name}: {processed}")
|
||||
key, output = processed
|
||||
# dst.array[-1][key] = output
|
||||
dst.array[[key, 'time']][-1] = (
|
||||
dst_shm.array[[key, 'time']][-1] = (
|
||||
output,
|
||||
# TODO: what about pushing ``time.time_ns()``
|
||||
# in which case we'll need to round at the graphics
|
||||
# processing / sampling layer?
|
||||
src.array[-1]['time']
|
||||
src_shm.array[-1]['time']
|
||||
)
|
||||
|
||||
# NOTE: for now we aren't streaming this to the consumer
|
||||
|
@ -252,7 +383,7 @@ async def fsp_compute(
|
|||
# N-consumers who subscribe for the real-time output,
|
||||
# which we'll likely want to implement using local-mem
|
||||
# chans for the fan out?
|
||||
# index = src.index
|
||||
# index = src_shm.index
|
||||
# if attach_stream:
|
||||
# await client_stream.send(index)
|
||||
|
||||
|
@ -262,7 +393,7 @@ async def fsp_compute(
|
|||
# log.info(f'FSP quote too fast: {hz}')
|
||||
# last = time.time()
|
||||
finally:
|
||||
tracker.complete.set()
|
||||
casc.complete.set()
|
||||
|
||||
|
||||
@tractor.context
|
||||
|
@ -273,15 +404,15 @@ async def cascade(
|
|||
# data feed key
|
||||
fqme: str,
|
||||
|
||||
src_shm_token: dict,
|
||||
dst_shm_token: tuple[str, np.dtype],
|
||||
|
||||
# flume pair cascaded using an "edge function"
|
||||
src_flume_addr: dict,
|
||||
dst_flume_addr: dict,
|
||||
ns_path: NamespacePath,
|
||||
|
||||
shm_registry: dict[str, _Token],
|
||||
|
||||
zero_on_step: bool = False,
|
||||
loglevel: Optional[str] = None,
|
||||
loglevel: str | None = None,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -297,8 +428,14 @@ async def cascade(
|
|||
if loglevel:
|
||||
get_console_log(loglevel)
|
||||
|
||||
src = attach_shm_array(token=src_shm_token)
|
||||
dst = attach_shm_array(readonly=False, token=dst_shm_token)
|
||||
src: Flume = Flume.from_msg(src_flume_addr)
|
||||
dst: Flume = Flume.from_msg(
|
||||
dst_flume_addr,
|
||||
readonly=False,
|
||||
)
|
||||
|
||||
# src: ShmArray = attach_shm_array(token=src_shm_token)
|
||||
# dst: ShmArray = attach_shm_array(readonly=False, token=dst_shm_token)
|
||||
|
||||
reg = _load_builtins()
|
||||
lines = '\n'.join([f'{key.rpartition(":")[2]} => {key}' for key in reg])
|
||||
|
@ -306,11 +443,11 @@ async def cascade(
|
|||
f'Registered FSP set:\n{lines}'
|
||||
)
|
||||
|
||||
# update actorlocal flows table which registers
|
||||
# readonly "instances" of this fsp for symbol/source
|
||||
# so that consumer fsps can look it up by source + fsp.
|
||||
# TODO: ugh i hate this wind/unwind to list over the wire
|
||||
# but not sure how else to do it.
|
||||
# NOTE XXX: update actorlocal flows table which registers
|
||||
# readonly "instances" of this fsp for symbol/source so that
|
||||
# consumer fsps can look it up by source + fsp.
|
||||
# TODO: ugh i hate this wind/unwind to list over the wire but
|
||||
# not sure how else to do it.
|
||||
for (token, fsp_name, dst_token) in shm_registry:
|
||||
Fsp._flow_registry[(
|
||||
_Token.from_msg(token),
|
||||
|
@ -320,12 +457,15 @@ async def cascade(
|
|||
fsp: Fsp = reg.get(
|
||||
NamespacePath(ns_path)
|
||||
)
|
||||
func = fsp.func
|
||||
func: Callable = fsp.func
|
||||
|
||||
if not func:
|
||||
# TODO: assume it's a func target path
|
||||
raise ValueError(f'Unknown fsp target: {ns_path}')
|
||||
|
||||
_fqme: str = src.mkt.fqme
|
||||
assert _fqme == fqme
|
||||
|
||||
# open a data feed stream with requested broker
|
||||
feed: Feed
|
||||
async with data.feed.maybe_open_feed(
|
||||
|
@ -339,177 +479,142 @@ async def cascade(
|
|||
|
||||
) as feed:
|
||||
|
||||
flume = feed.flumes[fqme]
|
||||
mkt = flume.mkt
|
||||
assert src.token == flume.rt_shm.token
|
||||
flume: Flume = feed.flumes[fqme]
|
||||
# XXX: can't do this since flume.feed will be set XD
|
||||
# assert flume == src
|
||||
assert flume.mkt == src.mkt
|
||||
mkt: MktPair = flume.mkt
|
||||
|
||||
# NOTE: FOR NOW, sanity checks around the feed as being
|
||||
# always the src flume (until we get to fancier/lengthier
|
||||
# chains/graphs.
|
||||
assert src.rt_shm.token == flume.rt_shm.token
|
||||
|
||||
# XXX: won't work bc the _hist_shm_token value will be
|
||||
# list[list] after IPC..
|
||||
# assert flume.to_msg() == src_flume_addr
|
||||
|
||||
profiler(f'{func}: feed up')
|
||||
|
||||
func_name = func.__name__
|
||||
func_name: str = func.__name__
|
||||
async with (
|
||||
trio.open_nursery() as n,
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
# TODO: might be better to just make a "restart" method where
|
||||
# the target task is spawned implicitly and then the event is
|
||||
# set via some higher level api? At that poing we might as well
|
||||
# be writing a one-cancels-one nursery though right?
|
||||
casc = Cascade(
|
||||
src,
|
||||
dst,
|
||||
tn,
|
||||
fsp,
|
||||
)
|
||||
|
||||
# TODO: this seems like it should be wrapped somewhere?
|
||||
fsp_target = partial(
|
||||
|
||||
fsp_compute,
|
||||
connect_streams,
|
||||
casc=casc,
|
||||
mkt=mkt,
|
||||
flume=flume,
|
||||
quote_stream=flume.stream,
|
||||
|
||||
# shm
|
||||
# flumes and shm passthrough
|
||||
src=src,
|
||||
dst=dst,
|
||||
|
||||
# target
|
||||
func=func
|
||||
# chain function which takes src flume input(s)
|
||||
# and renders dst flume output(s)
|
||||
edge_func=func
|
||||
)
|
||||
async with casc.open_edge(
|
||||
bind_func=fsp_target,
|
||||
) as index:
|
||||
# casc.bind_func = fsp_target
|
||||
# index = await tn.start(fsp_target)
|
||||
dst_shm: ShmArray = dst.rt_shm
|
||||
src_shm: ShmArray = src.rt_shm
|
||||
|
||||
tracker, index = await n.start(fsp_target)
|
||||
if zero_on_step:
|
||||
last = dst.rt_shm.array[-1:]
|
||||
zeroed = np.zeros(last.shape, dtype=last.dtype)
|
||||
|
||||
if zero_on_step:
|
||||
last = dst.array[-1:]
|
||||
zeroed = np.zeros(last.shape, dtype=last.dtype)
|
||||
profiler(f'{func_name}: fsp up')
|
||||
|
||||
profiler(f'{func_name}: fsp up')
|
||||
# sync to client-side actor
|
||||
await ctx.started(index)
|
||||
|
||||
# sync client
|
||||
await ctx.started(index)
|
||||
# XXX: rt stream with client which we MUST
|
||||
# open here (and keep it open) in order to make
|
||||
# incremental "updates" as history prepends take
|
||||
# place.
|
||||
async with ctx.open_stream() as client_stream:
|
||||
casc.client_stream: tractor.MsgStream = client_stream
|
||||
|
||||
# XXX: rt stream with client which we MUST
|
||||
# open here (and keep it open) in order to make
|
||||
# incremental "updates" as history prepends take
|
||||
# place.
|
||||
async with ctx.open_stream() as client_stream:
|
||||
s, step, ld = casc.is_synced()
|
||||
|
||||
# TODO: these likely should all become
|
||||
# methods of this ``TaskLifetime`` or wtv
|
||||
# abstraction..
|
||||
async def resync(
|
||||
tracker: TaskTracker,
|
||||
# detect sample period step for subscription to increment
|
||||
# signal
|
||||
times = src.rt_shm.array['time']
|
||||
if len(times) > 1:
|
||||
last_ts = times[-1]
|
||||
delay_s: float = float(last_ts - times[times != last_ts][-1])
|
||||
else:
|
||||
# our default "HFT" sample rate.
|
||||
delay_s: float = _default_delay_s
|
||||
|
||||
) -> tuple[TaskTracker, int]:
|
||||
# TODO: adopt an incremental update engine/approach
|
||||
# where possible here eventually!
|
||||
log.info(f're-syncing fsp {func_name} to source')
|
||||
tracker.cs.cancel()
|
||||
await tracker.complete.wait()
|
||||
tracker, index = await n.start(fsp_target)
|
||||
# sub and increment the underlying shared memory buffer
|
||||
# on every step msg received from the global `samplerd`
|
||||
# service.
|
||||
async with open_sample_stream(
|
||||
float(delay_s)
|
||||
) as istream:
|
||||
|
||||
# always trigger UI refresh after history update,
|
||||
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
|
||||
# ``piker.ui._display.trigger_update()``.
|
||||
await client_stream.send({
|
||||
'fsp_update': {
|
||||
'key': dst_shm_token,
|
||||
'first': dst._first.value,
|
||||
'last': dst._last.value,
|
||||
}
|
||||
})
|
||||
return tracker, index
|
||||
profiler(f'{func_name}: sample stream up')
|
||||
profiler.finish()
|
||||
|
||||
def is_synced(
|
||||
src: ShmArray,
|
||||
dst: ShmArray
|
||||
) -> tuple[bool, int, int]:
|
||||
'''
|
||||
Predicate to dertmine if a destination FSP
|
||||
output array is aligned to its source array.
|
||||
async for i in istream:
|
||||
# print(f'FSP incrementing {i}')
|
||||
|
||||
'''
|
||||
step_diff = src.index - dst.index
|
||||
len_diff = abs(len(src.array) - len(dst.array))
|
||||
return not (
|
||||
# the source is likely backfilling and we must
|
||||
# sync history calculations
|
||||
len_diff > 2
|
||||
# respawn the compute task if the source
|
||||
# array has been updated such that we compute
|
||||
# new history from the (prepended) source.
|
||||
synced, step_diff, _ = casc.is_synced()
|
||||
if not synced:
|
||||
step_diff: int = await casc.poll_and_sync_to_step()
|
||||
|
||||
# we aren't step synced to the source and may be
|
||||
# leading/lagging by a step
|
||||
or step_diff > 1
|
||||
or step_diff < 0
|
||||
), step_diff, len_diff
|
||||
# skip adding a last bar since we should already
|
||||
# be step alinged
|
||||
if step_diff == 0:
|
||||
continue
|
||||
|
||||
async def poll_and_sync_to_step(
|
||||
tracker: TaskTracker,
|
||||
src: ShmArray,
|
||||
dst: ShmArray,
|
||||
# read out last shm row, copy and write new row
|
||||
array = dst_shm.array
|
||||
|
||||
) -> tuple[TaskTracker, int]:
|
||||
# some metrics like vlm should be reset
|
||||
# to zero every step.
|
||||
if zero_on_step:
|
||||
last = zeroed
|
||||
else:
|
||||
last = array[-1:].copy()
|
||||
|
||||
synced, step_diff, _ = is_synced(src, dst)
|
||||
while not synced:
|
||||
tracker, index = await resync(tracker)
|
||||
synced, step_diff, _ = is_synced(src, dst)
|
||||
dst.rt_shm.push(last)
|
||||
|
||||
return tracker, step_diff
|
||||
# sync with source buffer's time step
|
||||
src_l2 = src_shm.array[-2:]
|
||||
src_li, src_lt = src_l2[-1][['index', 'time']]
|
||||
src_2li, src_2lt = src_l2[-2][['index', 'time']]
|
||||
dst_shm._array['time'][src_li] = src_lt
|
||||
dst_shm._array['time'][src_2li] = src_2lt
|
||||
|
||||
s, step, ld = is_synced(src, dst)
|
||||
|
||||
# detect sample period step for subscription to increment
|
||||
# signal
|
||||
times = src.array['time']
|
||||
if len(times) > 1:
|
||||
last_ts = times[-1]
|
||||
delay_s = float(last_ts - times[times != last_ts][-1])
|
||||
else:
|
||||
# our default "HFT" sample rate.
|
||||
delay_s = _default_delay_s
|
||||
|
||||
# sub and increment the underlying shared memory buffer
|
||||
# on every step msg received from the global `samplerd`
|
||||
# service.
|
||||
async with open_sample_stream(float(delay_s)) as istream:
|
||||
|
||||
profiler(f'{func_name}: sample stream up')
|
||||
profiler.finish()
|
||||
|
||||
async for i in istream:
|
||||
# print(f'FSP incrementing {i}')
|
||||
|
||||
# respawn the compute task if the source
|
||||
# array has been updated such that we compute
|
||||
# new history from the (prepended) source.
|
||||
synced, step_diff, _ = is_synced(src, dst)
|
||||
if not synced:
|
||||
tracker, step_diff = await poll_and_sync_to_step(
|
||||
tracker,
|
||||
src,
|
||||
dst,
|
||||
)
|
||||
|
||||
# skip adding a last bar since we should already
|
||||
# be step alinged
|
||||
if step_diff == 0:
|
||||
continue
|
||||
|
||||
# read out last shm row, copy and write new row
|
||||
array = dst.array
|
||||
|
||||
# some metrics like vlm should be reset
|
||||
# to zero every step.
|
||||
if zero_on_step:
|
||||
last = zeroed
|
||||
else:
|
||||
last = array[-1:].copy()
|
||||
|
||||
dst.push(last)
|
||||
|
||||
# sync with source buffer's time step
|
||||
src_l2 = src.array[-2:]
|
||||
src_li, src_lt = src_l2[-1][['index', 'time']]
|
||||
src_2li, src_2lt = src_l2[-2][['index', 'time']]
|
||||
dst._array['time'][src_li] = src_lt
|
||||
dst._array['time'][src_2li] = src_2lt
|
||||
|
||||
# last2 = dst.array[-2:]
|
||||
# if (
|
||||
# last2[-1]['index'] != src_li
|
||||
# or last2[-2]['index'] != src_2li
|
||||
# ):
|
||||
# dstl2 = list(last2)
|
||||
# srcl2 = list(src_l2)
|
||||
# print(
|
||||
# # f'{dst.token}\n'
|
||||
# f'src: {srcl2}\n'
|
||||
# f'dst: {dstl2}\n'
|
||||
# )
|
||||
# last2 = dst.array[-2:]
|
||||
# if (
|
||||
# last2[-1]['index'] != src_li
|
||||
# or last2[-2]['index'] != src_2li
|
||||
# ):
|
||||
# dstl2 = list(last2)
|
||||
# srcl2 = list(src_l2)
|
||||
# print(
|
||||
# # f'{dst.token}\n'
|
||||
# f'src: {srcl2}\n'
|
||||
# f'dst: {dstl2}\n'
|
||||
# )
|
||||
|
|
|
@ -84,10 +84,10 @@ async def open_piker_runtime(
|
|||
a root actor.
|
||||
|
||||
'''
|
||||
# check for existing runtime, boot it
|
||||
# if not already running.
|
||||
try:
|
||||
# check for existing runtime
|
||||
actor = tractor.current_actor().uid
|
||||
|
||||
actor = tractor.current_actor()
|
||||
except tractor._exceptions.NoRuntime:
|
||||
tractor._state._runtime_vars[
|
||||
'piker_vars'
|
||||
|
@ -116,15 +116,16 @@ async def open_piker_runtime(
|
|||
enable_modules=enable_modules,
|
||||
|
||||
**tractor_kwargs,
|
||||
) as _,
|
||||
) as actor,
|
||||
|
||||
open_registry(
|
||||
registry_addrs,
|
||||
ensure_exists=False,
|
||||
) as addrs,
|
||||
):
|
||||
assert actor is tractor.current_actor()
|
||||
yield (
|
||||
tractor.current_actor(),
|
||||
actor,
|
||||
addrs,
|
||||
)
|
||||
else:
|
||||
|
@ -268,28 +269,39 @@ async def maybe_open_pikerd(
|
|||
# async with open_portal(chan) as arb_portal:
|
||||
# yield arb_portal
|
||||
|
||||
registry_addrs = registry_addrs or [_default_reg_addr]
|
||||
registry_addrs: list[tuple[str, int]] = (
|
||||
registry_addrs
|
||||
or [_default_reg_addr]
|
||||
)
|
||||
|
||||
pikerd_portal: tractor.Portal | None
|
||||
async with (
|
||||
open_piker_runtime(
|
||||
name=query_name,
|
||||
registry_addrs=registry_addrs,
|
||||
loglevel=loglevel,
|
||||
**kwargs,
|
||||
) as _,
|
||||
) as (actor, addrs),
|
||||
|
||||
# try to attach to any existing (host-local) `pikerd`
|
||||
tractor.find_actor(
|
||||
_root_dname,
|
||||
registry_addrs=registry_addrs,
|
||||
only_first=True,
|
||||
) as portal
|
||||
# raise_on_none=True,
|
||||
) as pikerd_portal,
|
||||
|
||||
):
|
||||
# connect to any existing daemon presuming
|
||||
# its registry socket was selected.
|
||||
if (
|
||||
portal is not None
|
||||
):
|
||||
yield portal
|
||||
# connect to any existing remote daemon presuming its
|
||||
# registry socket was selected.
|
||||
if pikerd_portal is not None:
|
||||
|
||||
# sanity check that we are actually connecting to
|
||||
# a remote process and not ourselves.
|
||||
assert actor.uid != pikerd_portal.channel.uid
|
||||
assert registry_addrs
|
||||
|
||||
yield pikerd_portal
|
||||
return
|
||||
|
||||
# presume pikerd role since no daemon could be found at
|
||||
|
|
|
@ -102,7 +102,7 @@ async def open_registry(
|
|||
not tractor.is_root_process()
|
||||
and not Registry.addrs
|
||||
):
|
||||
Registry.addrs.extend(actor._reg_addrs)
|
||||
Registry.addrs.extend(actor.reg_addrs)
|
||||
|
||||
if (
|
||||
ensure_exists
|
||||
|
|
|
@ -139,6 +139,13 @@ class StorageClient(
|
|||
...
|
||||
|
||||
|
||||
class TimeseriesNotFound(Exception):
|
||||
'''
|
||||
No timeseries entry can be found for this backend.
|
||||
|
||||
'''
|
||||
|
||||
|
||||
class StorageConnectionError(ConnectionError):
|
||||
'''
|
||||
Can't connect to the desired tsdb subsys/service.
|
||||
|
|
|
@ -140,19 +140,27 @@ def delete(
|
|||
def anal(
|
||||
fqme: str,
|
||||
period: int = 60,
|
||||
pdb: bool = False,
|
||||
|
||||
) -> np.ndarray:
|
||||
'''
|
||||
Anal-ysis is when you take the data do stuff to it, i think.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
async with (
|
||||
open_piker_runtime(
|
||||
# are you a bear or boi?
|
||||
'tsdb_polars_anal',
|
||||
debug_mode=True,
|
||||
debug_mode=pdb,
|
||||
),
|
||||
open_storage_client() as (
|
||||
mod,
|
||||
client,
|
||||
),
|
||||
open_storage_client() as (mod, client),
|
||||
):
|
||||
syms: list[str] = await client.list_keys()
|
||||
print(f'{len(syms)} FOUND for {mod.name}')
|
||||
log.info(f'{len(syms)} FOUND for {mod.name}')
|
||||
|
||||
(
|
||||
history,
|
||||
|
|
|
@ -19,7 +19,8 @@
|
|||
call a poor man's tsdb).
|
||||
|
||||
AKA a `piker`-native file-system native "time series database"
|
||||
without needing an extra process and no standard TSDB features, YET!
|
||||
without needing an extra process and no standard TSDB features,
|
||||
YET!
|
||||
|
||||
'''
|
||||
# TODO: like there's soo much..
|
||||
|
@ -67,6 +68,7 @@ from piker import config
|
|||
from piker.data import def_iohlcv_fields
|
||||
from piker.data import ShmArray
|
||||
from piker.log import get_logger
|
||||
from . import TimeseriesNotFound
|
||||
|
||||
|
||||
log = get_logger('storage.nativedb')
|
||||
|
@ -228,8 +230,21 @@ class NativeStorageClient:
|
|||
fqme,
|
||||
timeframe,
|
||||
)
|
||||
except FileNotFoundError:
|
||||
return None
|
||||
except FileNotFoundError as fnfe:
|
||||
|
||||
bs_fqme, _, *_ = fqme.rpartition('.')
|
||||
|
||||
possible_matches: list[str] = []
|
||||
for tskey in self._index:
|
||||
if bs_fqme in tskey:
|
||||
possible_matches.append(tskey)
|
||||
|
||||
match_str: str = '\n'.join(sorted(possible_matches))
|
||||
raise TimeseriesNotFound(
|
||||
f'No entry for `{fqme}`?\n'
|
||||
f'Maybe you need a more specific fqme-key like:\n\n'
|
||||
f'{match_str}'
|
||||
) from fnfe
|
||||
|
||||
times = array['time']
|
||||
return (
|
||||
|
@ -376,6 +391,8 @@ class NativeStorageClient:
|
|||
# ...
|
||||
|
||||
|
||||
# TODO: does this need to be async on average?
|
||||
# I guess for any IPC connected backend yes?
|
||||
@acm
|
||||
async def get_client(
|
||||
|
||||
|
@ -393,7 +410,7 @@ async def get_client(
|
|||
'''
|
||||
datadir: Path = config.get_conf_dir() / 'nativedb'
|
||||
if not datadir.is_dir():
|
||||
log.info(f'Creating `nativedb` director: {datadir}')
|
||||
log.info(f'Creating `nativedb` dir: {datadir}')
|
||||
datadir.mkdir()
|
||||
|
||||
client = NativeStorageClient(datadir)
|
||||
|
|
|
@ -390,7 +390,7 @@ class FspAdmin:
|
|||
complete: trio.Event,
|
||||
started: trio.Event,
|
||||
fqme: str,
|
||||
dst_fsp_flume: Flume,
|
||||
dst_flume: Flume,
|
||||
conf: dict,
|
||||
target: Fsp,
|
||||
loglevel: str,
|
||||
|
@ -408,16 +408,14 @@ class FspAdmin:
|
|||
# chaining entrypoint
|
||||
cascade,
|
||||
|
||||
# TODO: can't we just drop this and expect
|
||||
# far end to read the src flume's .mkt.fqme?
|
||||
# data feed key
|
||||
fqme=fqme,
|
||||
|
||||
# TODO: pass `Flume.to_msg()`s here?
|
||||
# mems
|
||||
src_shm_token=self.flume.rt_shm.token,
|
||||
dst_shm_token=dst_fsp_flume.rt_shm.token,
|
||||
|
||||
# target
|
||||
ns_path=ns_path,
|
||||
src_flume_addr=self.flume.to_msg(),
|
||||
dst_flume_addr=dst_flume.to_msg(),
|
||||
ns_path=ns_path, # edge-bind-func
|
||||
|
||||
loglevel=loglevel,
|
||||
zero_on_step=conf.get('zero_on_step', False),
|
||||
|
@ -431,14 +429,14 @@ class FspAdmin:
|
|||
ctx.open_stream() as stream,
|
||||
):
|
||||
|
||||
dst_fsp_flume.stream: tractor.MsgStream = stream
|
||||
dst_flume.stream: tractor.MsgStream = stream
|
||||
|
||||
# register output data
|
||||
self._registry[
|
||||
(fqme, ns_path)
|
||||
] = (
|
||||
stream,
|
||||
dst_fsp_flume.rt_shm,
|
||||
dst_flume.rt_shm,
|
||||
complete
|
||||
)
|
||||
|
||||
|
@ -515,7 +513,7 @@ class FspAdmin:
|
|||
broker='piker',
|
||||
_atype='fsp',
|
||||
)
|
||||
dst_fsp_flume = Flume(
|
||||
dst_flume = Flume(
|
||||
mkt=mkt,
|
||||
_rt_shm_token=dst_shm.token,
|
||||
first_quote={},
|
||||
|
@ -543,13 +541,13 @@ class FspAdmin:
|
|||
complete,
|
||||
started,
|
||||
fqme,
|
||||
dst_fsp_flume,
|
||||
dst_flume,
|
||||
conf,
|
||||
target,
|
||||
loglevel,
|
||||
)
|
||||
|
||||
return dst_fsp_flume, started
|
||||
return dst_flume, started
|
||||
|
||||
async def open_fsp_chart(
|
||||
self,
|
||||
|
|
Loading…
Reference in New Issue