Compare commits

..

No commits in common. "b9af6176c5e0929595fba4112e4eb605505ac1d3" and "29ce8de46266fe9f545a080673734bcf18f84f0f" have entirely different histories.

15 changed files with 385 additions and 766 deletions

View File

@ -117,57 +117,9 @@ SecondFactorDevice=
# If you use the IBKR Mobile app for second factor authentication, # If you use the IBKR Mobile app for second factor authentication,
# and you fail to complete the process before the time limit imposed # and you fail to complete the process before the time limit imposed
# by IBKR, this setting tells IBC whether to automatically restart # by IBKR, you can use this setting to tell IBC to exit: arrangements
# the login sequence, giving you another opportunity to complete # can then be made to automatically restart IBC in order to initiate
# second factor authentication. # the login sequence afresh. Otherwise, manual intervention at TWS's
#
# Permitted values are 'yes' and 'no'.
#
# If this setting is not present or has no value, then the value
# of the deprecated ExitAfterSecondFactorAuthenticationTimeout is
# used instead. If this also has no value, then this setting defaults
# to 'no'.
#
# NB: you must be using IBC v3.14.0 or later to use this setting:
# earlier versions ignore it.
ReloginAfterSecondFactorAuthenticationTimeout=
# This setting is only relevant if
# ReloginAfterSecondFactorAuthenticationTimeout is set to 'yes',
# or if ExitAfterSecondFactorAuthenticationTimeout is set to 'yes'.
#
# It controls how long (in seconds) IBC waits for login to complete
# after the user acknowledges the second factor authentication
# alert at the IBKR Mobile app. If login has not completed after
# this time, IBC terminates.
# The default value is 60.
SecondFactorAuthenticationExitInterval=
# This setting specifies the timeout for second factor authentication
# imposed by IB. The value is in seconds. You should not change this
# setting unless you have reason to believe that IB has changed the
# timeout. The default value is 180.
SecondFactorAuthenticationTimeout=180
# DEPRECATED SETTING
# ------------------
#
# ExitAfterSecondFactorAuthenticationTimeout - THIS SETTING WILL BE
# REMOVED IN A FUTURE RELEASE. For IBC version 3.14.0 and later, see
# the notes for ReloginAfterSecondFactorAuthenticationTimeout above.
#
# For IBC versions earlier than 3.14.0: If you use the IBKR Mobile
# app for second factor authentication, and you fail to complete the
# process before the time limit imposed by IBKR, you can use this
# setting to tell IBC to exit: arrangements can then be made to
# automatically restart IBC in order to initiate the login sequence
# afresh. Otherwise, manual intervention at TWS's
# Second Factor Authentication dialog is needed to complete the # Second Factor Authentication dialog is needed to complete the
# login. # login.
# #
@ -180,18 +132,29 @@ SecondFactorAuthenticationTimeout=180
ExitAfterSecondFactorAuthenticationTimeout=no ExitAfterSecondFactorAuthenticationTimeout=no
# This setting is only relevant if
# ExitAfterSecondFactorAuthenticationTimeout is set to 'yes'.
#
# It controls how long (in seconds) IBC waits for login to complete
# after the user acknowledges the second factor authentication
# alert at the IBKR Mobile app. If login has not completed after
# this time, IBC terminates.
# The default value is 40.
SecondFactorAuthenticationExitInterval=
# Trading Mode # Trading Mode
# ------------ # ------------
# #
# This indicates whether the live account or the paper trading # TWS 955 introduced a new Trading Mode combo box on its login
# account corresponding to the supplied credentials is to be used. # dialog. This indicates whether the live account or the paper
# The allowed values are 'live' (the default) and 'paper'. # trading account corresponding to the supplied credentials is
# # to be used. The allowed values are 'live' (the default) and
# If this is set to 'live', then the credentials for the live # 'paper'. For earlier versions of TWS this setting has no
# account must be supplied. If it is set to 'paper', then either # effect.
# the live or the paper-trading credentials may be supplied.
TradingMode=paper TradingMode=
# Paper-trading Account Warning # Paper-trading Account Warning
@ -225,7 +188,7 @@ AcceptNonBrokerageAccountWarning=yes
# #
# The default value is 60. # The default value is 60.
LoginDialogDisplayTimeout=60 LoginDialogDisplayTimeout=20
@ -254,15 +217,7 @@ LoginDialogDisplayTimeout=60
# but they are acceptable. # but they are acceptable.
# #
# The default is the current working directory when IBC is # The default is the current working directory when IBC is
# started, unless the TWS_SETTINGS_PATH setting in the relevant # started.
# start script is set.
#
# If both this setting and TWS_SETTINGS_PATH are set, then this
# setting takes priority. Note that if they have different values,
# auto-restart will not work.
#
# NB: this setting is now DEPRECATED. You should use the
# TWS_SETTINGS_PATH setting in the relevant start script.
IbDir=/root/Jts IbDir=/root/Jts
@ -331,30 +286,13 @@ ExistingSessionDetectedAction=primary
# #
# If OverrideTwsApiPort is set to an integer, IBC changes the # If OverrideTwsApiPort is set to an integer, IBC changes the
# 'Socket port' in TWS's API configuration to that number shortly # 'Socket port' in TWS's API configuration to that number shortly
# after startup (but note that for the FIX Gateway, this setting is # after startup. Leaving the setting blank will make no change to
# actually stored in jts.ini rather than the Gateway's settings
# file). Leaving the setting blank will make no change to
# the current setting. This setting is only intended for use in # the current setting. This setting is only intended for use in
# certain specialized situations where the port number needs to # certain specialized situations where the port number needs to
# be set dynamically at run-time, and for the FIX Gateway: most
# non-FIX users will never need it, so don't use it unless you know
# you need it.
OverrideTwsApiPort=4000
# Override TWS Master Client ID
# -----------------------------
#
# If OverrideTwsMasterClientID is set to an integer, IBC changes the
# 'Master Client ID' value in TWS's API configuration to that
# value shortly after startup. Leaving the setting blank will make
# no change to the current setting. This setting is only intended
# for use in certain specialized situations where the value needs to
# be set dynamically at run-time: most users will never need it, # be set dynamically at run-time: most users will never need it,
# so don't use it unless you know you need it. # so don't use it unless you know you need it.
OverrideTwsMasterClientID= ; OverrideTwsApiPort=4002
# Read-only Login # Read-only Login
@ -364,13 +302,11 @@ OverrideTwsMasterClientID=
# account security programme, the user will not be asked to perform # account security programme, the user will not be asked to perform
# the second factor authentication action, and login to TWS will # the second factor authentication action, and login to TWS will
# occur automatically in read-only mode: in this mode, placing or # occur automatically in read-only mode: in this mode, placing or
# managing orders is not allowed. # managing orders is not allowed. If set to 'no', and the user is
# # enrolled in IB's account security programme, the user must perform
# If set to 'no', and the user is enrolled in IB's account security # the relevant second factor authentication action to complete the
# programme, the second factor authentication process is handled # login.
# according to the Second Factor Authentication Settings described
# elsewhere in this file.
#
# If the user is not enrolled in IB's account security programme, # If the user is not enrolled in IB's account security programme,
# this setting is ignored. The default is 'no'. # this setting is ignored. The default is 'no'.
@ -390,44 +326,7 @@ ReadOnlyLogin=no
# set the relevant checkbox (this only needs to be done once) and # set the relevant checkbox (this only needs to be done once) and
# not provide a value for this setting. # not provide a value for this setting.
ReadOnlyApi= ReadOnlyApi=no
# API Precautions
# ---------------
#
# These settings relate to the corresponding 'Precautions' checkboxes in the
# API section of the Global Configuration dialog.
#
# For all of these, the accepted values are:
# - 'yes' sets the checkbox
# - 'no' clears the checkbox
# - if not set, the existing TWS/Gateway configuration is unchanged
#
# NB: thess settings are really only supplied for the benefit of new TWS
# or Gateway instances that are being automatically installed and
# started without user intervention, or where user settings are not preserved
# between sessions (eg some Docker containers). Where a user is involved, they
# should use the Global Configuration to set the relevant checkboxes and not
# provide values for these settings.
BypassOrderPrecautions=
BypassBondWarning=
BypassNegativeYieldToWorstConfirmation=
BypassCalledBondWarning=
BypassSameActionPairTradeWarning=
BypassPriceBasedVolatilityRiskWarning=
BypassUSStocksMarketDataInSharesWarning=
BypassRedirectOrderWarning=
BypassNoOverfillProtectionPrecaution=
# Market data size for US stocks - lots or shares # Market data size for US stocks - lots or shares
@ -482,145 +381,54 @@ AcceptBidAskLastSizeDisplayUpdateNotification=accept
SendMarketDataInLotsForUSstocks= SendMarketDataInLotsForUSstocks=
# Trusted API Client IPs
# ----------------------
#
# NB: THIS SETTING IS ONLY RELEVANT FOR THE GATEWAY, AND ONLY WHEN FIX=yes.
# In all other cases it is ignored.
#
# This is a list of IP addresses separated by commas. API clients with IP
# addresses in this list are able to connect to the API without Gateway
# generating the 'Incoming connection' popup.
#
# Note that 127.0.0.1 is always permitted to connect, so do not include it
# in this setting.
TrustedTwsApiClientIPs=
# Reset Order ID Sequence
# -----------------------
#
# The setting resets the order id sequence for orders submitted via the API, so
# that the next invocation of the `NextValidId` API callback will return the
# value 1. The reset occurs when TWS starts.
#
# Note that order ids are reset for all API clients, except those that have
# outstanding (ie incomplete) orders: their order id sequence carries on as
# before.
#
# Valid values are 'yes', 'true', 'false' and 'no'. The default is 'no'.
ResetOrderIdsAtStart=
# This setting specifies IBC's action when TWS displays the dialog asking for
# confirmation of a request to reset the API order id sequence.
#
# Note that the Gateway never displays this dialog, so this setting is ignored
# for a Gateway session.
#
# Valid values consist of two strings separated by a solidus '/'. The first
# value specifies the action to take when the order id reset request resulted
# from setting ResetOrderIdsAtStart=yes. The second specifies the action to
# take when the order id reset request is a result of the user clicking the
# 'Reset API order ID sequence' button in the API configuration. Each value
# must be one of the following:
#
# 'confirm'
# order ids will be reset
#
# 'reject'
# order ids will not be reset
#
# 'ignore'
# IBC will ignore the dialog. The user must take action.
#
# The default setting is ignore/ignore
# Examples:
#
# 'confirm/reject' - confirm order id reset only if ResetOrderIdsAtStart=yes
# and reject any user-initiated requests
#
# 'ignore/confirm' - user must decide what to do if ResetOrderIdsAtStart=yes
# and confirm user-initiated requests
#
# 'reject/ignore' - reject order id reset if ResetOrderIdsAtStart=yes but
# allow user to handle user-initiated requests
ConfirmOrderIdReset=
# ============================================================================= # =============================================================================
# 4. TWS Auto-Logoff and Auto-Restart # 4. TWS Auto-Closedown
# ============================================================================= # =============================================================================
# #
# TWS and Gateway insist on being restarted every day. Two alternative # IMPORTANT NOTE: Starting with TWS 974, this setting no longer
# automatic options are offered: # works properly, because IB have changed the way TWS handles its
# autologoff mechanism.
# #
# - Auto-Logoff: at a specified time, TWS shuts down tidily, without # You should now configure the TWS autologoff time to something
# restarting. # convenient for you, and restart IBC each day.
# #
# - Auto-Restart: at a specified time, TWS shuts down and then restarts # Alternatively, discontinue use of IBC and use the auto-relogin
# without the user having to re-autheticate. # mechanism within TWS 974 and later versions (note that the
# # auto-relogin mechanism provided by IB is not available if you
# The normal way to configure the time at which this happens is via the Lock # use IBC).
# and Exit section of the Configuration dialog. Once this time has been
# configured in this way, the setting persists until the user changes it again.
#
# However, there are situations where there is no user available to do this
# configuration, or where there is no persistent storage (for example some
# Docker images). In such cases, the auto-restart or auto-logoff time can be
# set whenever IBC starts with the settings below.
#
# The value, if specified, must be a time in HH:MM AM/PM format, for example
# 08:00 AM or 10:00 PM. Note that there must be a single space between the
# two parts of this value; also that midnight is "12:00 AM" and midday is
# "12:00 PM".
#
# If no value is specified for either setting, the currently configured
# settings will apply. If a value is supplied for one setting, the other
# setting is cleared. If values are supplied for both settings, only the
# auto-restart time is set, and the auto-logoff time is cleared.
#
# Note that for a normal TWS/Gateway installation with persistent storage
# (for example on a desktop computer) the value will be persisted as if the
# user had set it via the configuration dialog.
#
# If you choose to auto-restart, you should take note of the considerations
# described at the link below. Note that where this information mentions
# 'manual authentication', restarting IBC will do the job (IBKR does not
# recognise the existence of IBC in its docuemntation).
#
# https://www.interactivebrokers.com/en/software/tws/twsguide.htm#usersguidebook/configuretws/auto_restart_info.htm
#
# If you use the "RESTART" command via the IBC command server, and IBC is
# running any version of the Gateway (or a version of TWS earlier than 1018),
# note that this will set the Auto-Restart time in Gateway/TWS's configuration
# dialog to the time at which the restart actually happens (which may be up to
# a minute after the RESTART command is issued). To prevent future auto-
# restarts at this time, you must make sure you have set AutoLogoffTime or
# AutoRestartTime to your desired value before running IBC. NB: this does not
# apply to TWS from version 1018 onwards.
AutoLogoffTime= # Set to yes or no (lower case).
#
# yes means allow TWS to shut down automatically at its
# specified shutdown time, which is set via the TWS
# configuration menu.
#
# no means TWS never shuts down automatically.
#
# NB: IB recommends that you do not keep TWS running
# continuously. If you set this setting to 'no', you may
# experience incorrect TWS operation.
#
# NB: the default for this setting is 'no'. Since this will
# only work properly with TWS versions earlier than 974, you
# should explicitly set this to 'yes' for version 974 and later.
IbAutoClosedown=yes
AutoRestartTime=
# ============================================================================= # =============================================================================
# 5. TWS Tidy Closedown Time # 5. TWS Tidy Closedown Time
# ============================================================================= # =============================================================================
# #
# Specifies a time at which TWS will close down tidily, with no restart. # NB: starting with TWS 974 this is no longer a useful option
# because both TWS and Gateway now have the same auto-logoff
# mechanism, and IBC can no longer avoid this.
# #
# There is little reason to use this setting. It is similar to AutoLogoffTime, # Note that giving this setting a value does not change TWS's
# but can include a day-of-the-week, whereas AutoLogoffTime and AutoRestartTime # auto-logoff in any way: any setting will be additional to the
# apply every day. So for example you could use ClosedownAt in conjunction with # TWS auto-logoff.
# AutoRestartTime to shut down TWS on Friday evenings after the markets
# close, without it running on Saturday as well.
# #
# To tell IBC to tidily close TWS at a specified time every # To tell IBC to tidily close TWS at a specified time every
# day, set this value to <hh:mm>, for example: # day, set this value to <hh:mm>, for example:
@ -679,7 +487,7 @@ AcceptIncomingConnectionAction=reject
# no means the dialog remains on display and must be # no means the dialog remains on display and must be
# handled by the user. # handled by the user.
AllowBlindTrading=no AllowBlindTrading=yes
# Save Settings on a Schedule # Save Settings on a Schedule
@ -722,26 +530,6 @@ AllowBlindTrading=no
SaveTwsSettingsAt= SaveTwsSettingsAt=
# Confirm Crypto Currency Orders Automatically
# --------------------------------------------
#
# When you place an order for a cryptocurrency contract, a dialog is displayed
# asking you to confirm that you want to place the order, and notifying you
# that you are placing an order to trade cryptocurrency with Paxos, a New York
# limited trust company, and not at Interactive Brokers.
#
# transmit means that the order will be placed automatically, and the
# dialog will then be closed
#
# cancel means that the order will not be placed, and the dialog will
# then be closed
#
# manual means that IBC will take no action and the user must deal
# with the dialog
ConfirmCryptoCurrencyOrders=transmit
# ============================================================================= # =============================================================================
# 7. Settings Specific to Indian Versions of TWS # 7. Settings Specific to Indian Versions of TWS
@ -778,17 +566,13 @@ DismissNSEComplianceNotice=yes
# #
# The port number that IBC listens on for commands # The port number that IBC listens on for commands
# such as "STOP". DO NOT set this to the port number # such as "STOP". DO NOT set this to the port number
# used for TWS API connections. # used for TWS API connections. There is no good reason
# # to change this setting unless the port is used by
# The convention is to use 7462 for this port, # some other application (typically another instance of
# but it must be set to a different value from any other # IBC). The default value is 0, which tells IBC not to
# IBC instance that might run at the same time. # start the command server
#
# The default value is 0, which tells IBC not to start
# the command server
#CommandServerPort=7462 #CommandServerPort=7462
CommandServerPort=0
# Permitted Command Sources # Permitted Command Sources
@ -799,19 +583,19 @@ CommandServerPort=0
# IBC. Commands can always be sent from the # IBC. Commands can always be sent from the
# same host as IBC is running on. # same host as IBC is running on.
ControlFrom= ControlFrom=127.0.0.1
# Address for Receiving Commands # Address for Receiving Commands
# ------------------------------ # ------------------------------
# #
# Specifies the IP address on which the Command Server # Specifies the IP address on which the Command Server
# is to listen. For a multi-homed host, this can be used # is so listen. For a multi-homed host, this can be used
# to specify that connection requests are only to be # to specify that connection requests are only to be
# accepted on the specified address. The default is to # accepted on the specified address. The default is to
# accept connection requests on all local addresses. # accept connection requests on all local addresses.
BindAddress= BindAddress=127.0.0.1
# Command Prompt # Command Prompt
@ -837,7 +621,7 @@ CommandPrompt=
# information is sent. The default is that such information # information is sent. The default is that such information
# is not sent. # is not sent.
SuppressInfoMessages=yes SuppressInfoMessages=no
@ -867,10 +651,10 @@ SuppressInfoMessages=yes
# The LogStructureScope setting indicates which windows are # The LogStructureScope setting indicates which windows are
# eligible for structure logging: # eligible for structure logging:
# #
# - (default value) if set to 'known', only windows that # - if set to 'known', only windows that IBC recognizes
# IBC recognizes are eligible - these are windows that # are eligible - these are windows that IBC has some
# IBC has some interest in monitoring, usually to take # interest in monitoring, usually to take some action
# some action on the user's behalf; # on the user's behalf;
# #
# - if set to 'unknown', only windows that IBC does not # - if set to 'unknown', only windows that IBC does not
# recognize are eligible. Most windows displayed by # recognize are eligible. Most windows displayed by
@ -883,8 +667,9 @@ SuppressInfoMessages=yes
# - if set to 'all', then every window displayed by TWS # - if set to 'all', then every window displayed by TWS
# is eligible. # is eligible.
# #
# The default value is 'known'.
LogStructureScope=known LogStructureScope=all
# When to Log Window Structure # When to Log Window Structure
@ -897,15 +682,13 @@ LogStructureScope=known
# structure of an eligible window the first time it # structure of an eligible window the first time it
# is encountered; # is encountered;
# #
# - if set to 'openclose', the structure is logged every
# time an eligible window is opened or closed;
#
# - if set to 'activate', the structure is logged every # - if set to 'activate', the structure is logged every
# time an eligible window is made active; # time an eligible window is made active;
# #
# - (default value) if set to 'never' or 'no' or 'false', # - if set to 'never' or 'no' or 'false', structure
# structure information is never logged. # information is never logged.
# #
# The default value is 'never'.
LogStructureWhen=never LogStructureWhen=never
@ -925,3 +708,4 @@ LogStructureWhen=never
#LogComponents= #LogComponents=

View File

@ -327,11 +327,7 @@ class MktPair(Struct, frozen=True):
) -> dict: ) -> dict:
d = super().to_dict(**kwargs) d = super().to_dict(**kwargs)
d['src'] = self.src.to_dict(**kwargs) d['src'] = self.src.to_dict(**kwargs)
d['dst'] = self.dst.to_dict(**kwargs)
if not isinstance(self.dst, str):
d['dst'] = self.dst.to_dict(**kwargs)
else:
d['dst'] = str(self.dst)
d['price_tick'] = str(self.price_tick) d['price_tick'] = str(self.price_tick)
d['size_tick'] = str(self.size_tick) d['size_tick'] = str(self.size_tick)
@ -353,16 +349,11 @@ class MktPair(Struct, frozen=True):
Constructor for a received msg-dict normally received over IPC. Constructor for a received msg-dict normally received over IPC.
''' '''
if not isinstance( dst_asset_msg = msg.pop('dst')
dst_asset_msg := msg.pop('dst'), dst = Asset.from_msg(dst_asset_msg) # .copy()
str,
):
dst: Asset = Asset.from_msg(dst_asset_msg) # .copy()
else:
dst: str = dst_asset_msg
src_asset_msg: dict = msg.pop('src') src_asset_msg = msg.pop('src')
src: Asset = Asset.from_msg(src_asset_msg) # .copy() src = Asset.from_msg(src_asset_msg) # .copy()
# XXX NOTE: ``msgspec`` can encode `Decimal` but it doesn't # XXX NOTE: ``msgspec`` can encode `Decimal` but it doesn't
# decide to it by default since we aren't spec-cing these # decide to it by default since we aren't spec-cing these

View File

@ -229,8 +229,8 @@ _samplings: dict[int, tuple[str, str]] = {
# throughput can be made faster during backfilling. # throughput can be made faster during backfilling.
60: ( 60: (
'1 min', '1 min',
'2 D', '1 D',
pendulum.duration(days=2), pendulum.duration(days=1),
), ),
} }

View File

@ -818,7 +818,7 @@ async def stream_quotes(
details: ibis.ContractDetails details: ibis.ContractDetails
async with ( async with (
open_data_client() as proxy, open_data_client() as proxy,
# trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
mkt, details = await get_mkt_info( mkt, details = await get_mkt_info(
sym, sym,

View File

@ -214,7 +214,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
last = time.time() last = time.time()
async for pattern in stream: async for pattern in stream:
log.info(f'received {pattern}') log.info(f'received {pattern}')
now: float = time.time() now = time.time()
# this causes tractor hang... # this causes tractor hang...
# assert 0 # assert 0
@ -261,9 +261,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
# defined adhoc symbol set. # defined adhoc symbol set.
stock_results = [] stock_results = []
async def extend_results( async def stash_results(target: Awaitable[list]):
target: Awaitable[list]
) -> None:
try: try:
results = await target results = await target
except tractor.trionics.Lagged: except tractor.trionics.Lagged:
@ -276,7 +274,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
async with trio.open_nursery() as sn: async with trio.open_nursery() as sn:
sn.start_soon( sn.start_soon(
extend_results, stash_results,
proxy.search_symbols( proxy.search_symbols(
pattern=pattern, pattern=pattern,
upto=5, upto=5,
@ -291,10 +289,8 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
f'Search timeout? {proxy._aio_ns.ib.client}' f'Search timeout? {proxy._aio_ns.ib.client}'
) )
continue continue
elif stock_results: else:
break break
# else:
await tractor.pause()
# # match against our ad-hoc set immediately # # match against our ad-hoc set immediately
# adhoc_matches = fuzzy.extract( # adhoc_matches = fuzzy.extract(

View File

@ -42,15 +42,35 @@ if TYPE_CHECKING:
from .feed import Feed from .feed import Feed
# TODO: ideas for further abstractions as per
# https://github.com/pikers/piker/issues/216 and
# https://github.com/pikers/piker/issues/270:
# - a ``Cascade`` would be the minimal "connection" of 2 ``Flumes``
# as per circuit parlance:
# https://en.wikipedia.org/wiki/Two-port_network#Cascade_connection
# - could cover the combination of our `FspAdmin` and the
# backend `.fsp._engine` related machinery to "connect" one flume
# to another?
# - a (financial signal) ``Flow`` would be the a "collection" of such
# minmial cascades. Some engineering based jargon concepts:
# - https://en.wikipedia.org/wiki/Signal_chain
# - https://en.wikipedia.org/wiki/Daisy_chain_(electrical_engineering)
# - https://en.wikipedia.org/wiki/Audio_signal_flow
# - https://en.wikipedia.org/wiki/Digital_signal_processing#Implementation
# - https://en.wikipedia.org/wiki/Dataflow_programming
# - https://en.wikipedia.org/wiki/Signal_programming
# - https://en.wikipedia.org/wiki/Incremental_computing
class Flume(Struct): class Flume(Struct):
''' '''
Composite reference type which points to all the addressing Composite reference type which points to all the addressing handles
handles and other meta-data necessary for the read, measure and and other meta-data necessary for the read, measure and management
management of a set of real-time updated data flows. of a set of real-time updated data flows.
Can be thought of as a "flow descriptor" or "flow frame" which Can be thought of as a "flow descriptor" or "flow frame" which
describes the high level properties of a set of data flows that describes the high level properties of a set of data flows that can
can be used seamlessly across process-memory boundaries. be used seamlessly across process-memory boundaries.
Each instance's sub-components normally includes: Each instance's sub-components normally includes:
- a msg oriented quote stream provided via an IPC transport - a msg oriented quote stream provided via an IPC transport
@ -73,7 +93,6 @@ class Flume(Struct):
# private shm refs loaded dynamically from tokens # private shm refs loaded dynamically from tokens
_hist_shm: ShmArray | None = None _hist_shm: ShmArray | None = None
_rt_shm: ShmArray | None = None _rt_shm: ShmArray | None = None
_readonly: bool = True
stream: tractor.MsgStream | None = None stream: tractor.MsgStream | None = None
izero_hist: int = 0 izero_hist: int = 0
@ -90,7 +109,7 @@ class Flume(Struct):
if self._rt_shm is None: if self._rt_shm is None:
self._rt_shm = attach_shm_array( self._rt_shm = attach_shm_array(
token=self._rt_shm_token, token=self._rt_shm_token,
readonly=self._readonly, readonly=True,
) )
return self._rt_shm return self._rt_shm
@ -103,10 +122,12 @@ class Flume(Struct):
'No shm token has been set for the history buffer?' 'No shm token has been set for the history buffer?'
) )
if self._hist_shm is None: if (
self._hist_shm is None
):
self._hist_shm = attach_shm_array( self._hist_shm = attach_shm_array(
token=self._hist_shm_token, token=self._hist_shm_token,
readonly=self._readonly, readonly=True,
) )
return self._hist_shm return self._hist_shm
@ -125,10 +146,10 @@ class Flume(Struct):
period and ratio between them. period and ratio between them.
''' '''
times: np.ndarray = self.hist_shm.array['time'] times = self.hist_shm.array['time']
end: float | int = pendulum.from_timestamp(times[-1]) end = pendulum.from_timestamp(times[-1])
start: float | int = pendulum.from_timestamp(times[times != times[-1]][-1]) start = pendulum.from_timestamp(times[times != times[-1]][-1])
hist_step_size_s: float = (end - start).seconds hist_step_size_s = (end - start).seconds
times = self.rt_shm.array['time'] times = self.rt_shm.array['time']
end = pendulum.from_timestamp(times[-1]) end = pendulum.from_timestamp(times[-1])
@ -148,25 +169,17 @@ class Flume(Struct):
msg = self.to_dict() msg = self.to_dict()
msg['mkt'] = self.mkt.to_dict() msg['mkt'] = self.mkt.to_dict()
# NOTE: pop all un-msg-serializable fields: # can't serialize the stream or feed objects, it's expected
# - `tractor.MsgStream` # you'll have a ref to it since this msg should be rxed on
# - `Feed` # a stream on whatever far end IPC..
# - `Shmarray`
# it's expected the `.from_msg()` on the other side
# will get instead some kind of msg-compat version
# that it can load.
msg.pop('stream') msg.pop('stream')
msg.pop('feed') msg.pop('feed')
msg.pop('_rt_shm')
msg.pop('_hist_shm')
return msg return msg
@classmethod @classmethod
def from_msg( def from_msg(
cls, cls,
msg: dict, msg: dict,
readonly: bool = True,
) -> dict: ) -> dict:
''' '''
@ -177,11 +190,7 @@ class Flume(Struct):
mkt_msg = msg.pop('mkt') mkt_msg = msg.pop('mkt')
from ..accounting import MktPair # cycle otherwise.. from ..accounting import MktPair # cycle otherwise..
mkt = MktPair.from_msg(mkt_msg) mkt = MktPair.from_msg(mkt_msg)
msg |= {'_readonly': readonly} return cls(mkt=mkt, **msg)
return cls(
mkt=mkt,
**msg,
)
def get_index( def get_index(
self, self,

View File

@ -57,7 +57,6 @@ from ._sampling import (
from ..brokers._util import ( from ..brokers._util import (
DataUnavailable, DataUnavailable,
) )
from ..storage import TimeseriesNotFound
if TYPE_CHECKING: if TYPE_CHECKING:
from bidict import bidict from bidict import bidict
@ -691,18 +690,13 @@ async def tsdb_backfill(
# but if not then below the remaining history can be lazy # but if not then below the remaining history can be lazy
# loaded? # loaded?
fqme: str = mkt.fqme fqme: str = mkt.fqme
last_tsdb_dt: datetime | None = None tsdb_entry: tuple | None = await storage.load(
try: fqme,
tsdb_entry: tuple | None = await storage.load( timeframe=timeframe,
fqme, )
timeframe=timeframe,
)
except TimeseriesNotFound:
log.warning(
f'No timeseries yet for {fqme}'
)
else: last_tsdb_dt: datetime | None = None
if tsdb_entry:
( (
tsdb_history, tsdb_history,
first_tsdb_dt, first_tsdb_dt,
@ -969,8 +963,7 @@ async def manage_history(
sub_for_broadcasts=False, sub_for_broadcasts=False,
) as sample_stream: ) as sample_stream:
# register 1s and 1m buffers with the global # register 1s and 1m buffers with the global incrementer task
# incrementer task
log.info(f'Connected to sampler stream: {sample_stream}') log.info(f'Connected to sampler stream: {sample_stream}')
for timeframe in [60, 1]: for timeframe in [60, 1]:

View File

@ -26,10 +26,7 @@ from ._api import (
maybe_mk_fsp_shm, maybe_mk_fsp_shm,
Fsp, Fsp,
) )
from ._engine import ( from ._engine import cascade
cascade,
Cascade,
)
from ._volume import ( from ._volume import (
dolla_vlm, dolla_vlm,
flow_rates, flow_rates,
@ -38,7 +35,6 @@ from ._volume import (
__all__: list[str] = [ __all__: list[str] = [
'cascade', 'cascade',
'Cascade',
'maybe_mk_fsp_shm', 'maybe_mk_fsp_shm',
'Fsp', 'Fsp',
'dolla_vlm', 'dolla_vlm',
@ -50,12 +46,9 @@ __all__: list[str] = [
async def latency( async def latency(
source: 'TickStream[Dict[str, float]]', # noqa source: 'TickStream[Dict[str, float]]', # noqa
ohlcv: np.ndarray ohlcv: np.ndarray
) -> AsyncIterator[np.ndarray]: ) -> AsyncIterator[np.ndarray]:
''' """Latency measurements, broker to piker.
Latency measurements, broker to piker. """
'''
# TODO: do we want to offer yielding this async # TODO: do we want to offer yielding this async
# before the rt data connection comes up? # before the rt data connection comes up?

View File

@ -18,12 +18,13 @@
core task logic for processing chains core task logic for processing chains
''' '''
from __future__ import annotations from dataclasses import dataclass
from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
from typing import ( from typing import (
AsyncIterator, AsyncIterator,
Callable, Callable,
Optional,
Union,
) )
import numpy as np import numpy as np
@ -32,9 +33,9 @@ from trio_typing import TaskStatus
import tractor import tractor
from tractor.msg import NamespacePath from tractor.msg import NamespacePath
from piker.types import Struct
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from .. import data from .. import data
from ..data import attach_shm_array
from ..data.feed import ( from ..data.feed import (
Flume, Flume,
Feed, Feed,
@ -55,6 +56,12 @@ from ..toolz import Profiler
log = get_logger(__name__) log = get_logger(__name__)
@dataclass
class TaskTracker:
complete: trio.Event
cs: trio.CancelScope
async def filter_quotes_by_sym( async def filter_quotes_by_sym(
sym: str, sym: str,
@ -75,168 +82,30 @@ async def filter_quotes_by_sym(
if quote: if quote:
yield quote yield quote
# TODO: unifying the abstractions in this FSP subsys/layer:
# -[ ] move the `.data.flows.Flume` type into this
# module/subsys/pkg?
# -[ ] ideas for further abstractions as per
# - https://github.com/pikers/piker/issues/216,
# - https://github.com/pikers/piker/issues/270:
# - a (financial signal) ``Flow`` would be the a "collection" of such
# minmial cascades. Some engineering based jargon concepts:
# - https://en.wikipedia.org/wiki/Signal_chain
# - https://en.wikipedia.org/wiki/Daisy_chain_(electrical_engineering)
# - https://en.wikipedia.org/wiki/Audio_signal_flow
# - https://en.wikipedia.org/wiki/Digital_signal_processing#Implementation
# - https://en.wikipedia.org/wiki/Dataflow_programming
# - https://en.wikipedia.org/wiki/Signal_programming
# - https://en.wikipedia.org/wiki/Incremental_computing
# - https://en.wikipedia.org/wiki/Signal-flow_graph
# - https://en.wikipedia.org/wiki/Signal-flow_graph#Basic_components
# -[ ] we probably want to eval THE BELOW design and unify with the async def fsp_compute(
# proto `TaskManager` in the `tractor` dev branch as well as with
# our below idea for `Cascade`:
# - https://github.com/goodboy/tractor/pull/363
class Cascade(Struct):
'''
As per sig-proc engineering parlance, this is a chaining of
`Flume`s, which are themselves collections of "Streams"
implemented currently via `ShmArray`s.
A `Cascade` is be the minimal "connection" of 2 `Flumes`
as per circuit parlance:
https://en.wikipedia.org/wiki/Two-port_network#Cascade_connection
TODO:
-[ ] could cover the combination of our `FspAdmin` and the
backend `.fsp._engine` related machinery to "connect" one flume
to another?
'''
# TODO: make these `Flume`s
src: Flume
dst: Flume
tn: trio.Nursery
fsp: Fsp # UI-side middleware ctl API
# filled during cascade/.bind_func() (fsp_compute) init phases
bind_func: Callable | None = None
complete: trio.Event | None = None
cs: trio.CancelScope | None = None
client_stream: tractor.MsgStream | None = None
async def resync(self) -> int:
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
log.info(f're-syncing fsp {self.fsp.name} to source')
self.cs.cancel()
await self.complete.wait()
index: int = await self.tn.start(self.bind_func)
# always trigger UI refresh after history update,
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
# ``piker.ui._display.trigger_update()``.
dst_shm: ShmArray = self.dst.rt_shm
await self.client_stream.send({
'fsp_update': {
'key': dst_shm.token,
'first': dst_shm._first.value,
'last': dst_shm._last.value,
}
})
return index
def is_synced(self) -> tuple[bool, int, int]:
'''
Predicate to dertmine if a destination FSP
output array is aligned to its source array.
'''
src_shm: ShmArray = self.src.rt_shm
dst_shm: ShmArray = self.dst.rt_shm
step_diff = src_shm.index - dst_shm.index
len_diff = abs(len(src_shm.array) - len(dst_shm.array))
synced: bool = not (
# the source is likely backfilling and we must
# sync history calculations
len_diff > 2
# we aren't step synced to the source and may be
# leading/lagging by a step
or step_diff > 1
or step_diff < 0
)
if not synced:
fsp: Fsp = self.fsp
log.warning(
'***DESYNCED FSP***\n'
f'{fsp.ns_path}@{src_shm.token}\n'
f'step_diff: {step_diff}\n'
f'len_diff: {len_diff}\n'
)
return (
synced,
step_diff,
len_diff,
)
async def poll_and_sync_to_step(self) -> int:
synced, step_diff, _ = self.is_synced()
while not synced:
await self.resync()
synced, step_diff, _ = self.is_synced()
return step_diff
@acm
async def open_edge(
self,
bind_func: Callable,
) -> int:
self.bind_func = bind_func
index = await self.tn.start(bind_func)
yield index
# TODO: what do we want on teardown/error?
# -[ ] dynamic reconnection after update?
async def connect_streams(
casc: Cascade,
mkt: MktPair, mkt: MktPair,
flume: Flume,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
src: Flume,
dst: Flume,
edge_func: Callable, src: ShmArray,
dst: ShmArray,
func: Callable,
# attach_stream: bool = False, # attach_stream: bool = False,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
'''
Stream and per-sample compute and write the cascade of
2 `Flumes`/streams given some operating `func`.
https://en.wikipedia.org/wiki/Signal-flow_graph#Basic_components
Not literally, but something like:
edge_func(Flume_in) -> Flume_out
'''
profiler = Profiler( profiler = Profiler(
delayed=False, delayed=False,
disabled=True disabled=True
) )
# TODO: just pull it from src.mkt.fqme no? fqme = mkt.fqme
# fqme: str = mkt.fqme out_stream = func(
fqme: str = src.mkt.fqme
# TODO: dynamic introspection of what the underlying (vertex)
# function actually requires from input node (flumes) then
# deliver those inputs as part of a graph "compilation" step?
out_stream = edge_func(
# TODO: do we even need this if we do the feed api right? # TODO: do we even need this if we do the feed api right?
# shouldn't a local stream do this before we get a handle # shouldn't a local stream do this before we get a handle
@ -244,21 +113,20 @@ async def connect_streams(
# async itertools style? # async itertools style?
filter_quotes_by_sym(fqme, quote_stream), filter_quotes_by_sym(fqme, quote_stream),
# XXX: currently the ``ohlcv`` arg, but we should allow # XXX: currently the ``ohlcv`` arg
# (dynamic) requests for src flume (node) streams? flume.rt_shm,
src.rt_shm,
) )
# HISTORY COMPUTE PHASE # HISTORY COMPUTE PHASE
# conduct a single iteration of fsp with historical bars input # conduct a single iteration of fsp with historical bars input
# and get historical output. # and get historical output.
history_output: ( history_output: Union[
dict[str, np.ndarray] # multi-output case dict[str, np.ndarray], # multi-output case
| np.ndarray, # single output case np.ndarray, # single output case
) ]
history_output = await anext(out_stream) history_output = await anext(out_stream)
func_name = edge_func.__name__ func_name = func.__name__
profiler(f'{func_name} generated history') profiler(f'{func_name} generated history')
# build struct array with an 'index' field to push as history # build struct array with an 'index' field to push as history
@ -266,12 +134,10 @@ async def connect_streams(
# TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no? # TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no?
# if the output array is multi-field then push # if the output array is multi-field then push
# each respective field. # each respective field.
dst_shm: ShmArray = dst.rt_shm fields = getattr(dst.array.dtype, 'fields', None).copy()
fields = getattr(dst_shm.array.dtype, 'fields', None).copy()
fields.pop('index') fields.pop('index')
history_by_field: np.ndarray | None = None history_by_field: Optional[np.ndarray] = None
src_shm: ShmArray = src.rt_shm src_time = src.array['time']
src_time = src_shm.array['time']
if ( if (
fields and fields and
@ -290,7 +156,7 @@ async def connect_streams(
if history_by_field is None: if history_by_field is None:
if output is None: if output is None:
length = len(src_shm.array) length = len(src.array)
else: else:
length = len(output) length = len(output)
@ -299,7 +165,7 @@ async def connect_streams(
# will be pushed to shm. # will be pushed to shm.
history_by_field = np.zeros( history_by_field = np.zeros(
length, length,
dtype=dst_shm.array.dtype dtype=dst.array.dtype
) )
if output is None: if output is None:
@ -316,13 +182,13 @@ async def connect_streams(
) )
history_by_field = np.zeros( history_by_field = np.zeros(
len(history_output), len(history_output),
dtype=dst_shm.array.dtype dtype=dst.array.dtype
) )
history_by_field[func_name] = history_output history_by_field[func_name] = history_output
history_by_field['time'] = src_time[-len(history_by_field):] history_by_field['time'] = src_time[-len(history_by_field):]
history_output['time'] = src_shm.array['time'] history_output['time'] = src.array['time']
# TODO: XXX: # TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're # THERE'S A BIG BUG HERE WITH THE `index` field since we're
@ -335,11 +201,11 @@ async def connect_streams(
# is `index` aware such that historical data can be indexed # is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane # relative to the true first datum? Not sure if this is sane
# for incremental compuations. # for incremental compuations.
first = dst_shm._first.value = src_shm._first.value first = dst._first.value = src._first.value
# TODO: can we use this `start` flag instead of the manual # TODO: can we use this `start` flag instead of the manual
# setting above? # setting above?
index = dst_shm.push( index = dst.push(
history_by_field, history_by_field,
start=first, start=first,
) )
@ -350,9 +216,12 @@ async def connect_streams(
# setup a respawn handle # setup a respawn handle
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
casc.cs = cs # TODO: might be better to just make a "restart" method where
casc.complete = trio.Event() # the target task is spawned implicitly and then the event is
task_status.started(index) # set via some higher level api? At that poing we might as well
# be writing a one-cancels-one nursery though right?
tracker = TaskTracker(trio.Event(), cs)
task_status.started((tracker, index))
profiler(f'{func_name} yield last index') profiler(f'{func_name} yield last index')
@ -366,12 +235,12 @@ async def connect_streams(
log.debug(f"{func_name}: {processed}") log.debug(f"{func_name}: {processed}")
key, output = processed key, output = processed
# dst.array[-1][key] = output # dst.array[-1][key] = output
dst_shm.array[[key, 'time']][-1] = ( dst.array[[key, 'time']][-1] = (
output, output,
# TODO: what about pushing ``time.time_ns()`` # TODO: what about pushing ``time.time_ns()``
# in which case we'll need to round at the graphics # in which case we'll need to round at the graphics
# processing / sampling layer? # processing / sampling layer?
src_shm.array[-1]['time'] src.array[-1]['time']
) )
# NOTE: for now we aren't streaming this to the consumer # NOTE: for now we aren't streaming this to the consumer
@ -383,7 +252,7 @@ async def connect_streams(
# N-consumers who subscribe for the real-time output, # N-consumers who subscribe for the real-time output,
# which we'll likely want to implement using local-mem # which we'll likely want to implement using local-mem
# chans for the fan out? # chans for the fan out?
# index = src_shm.index # index = src.index
# if attach_stream: # if attach_stream:
# await client_stream.send(index) # await client_stream.send(index)
@ -393,7 +262,7 @@ async def connect_streams(
# log.info(f'FSP quote too fast: {hz}') # log.info(f'FSP quote too fast: {hz}')
# last = time.time() # last = time.time()
finally: finally:
casc.complete.set() tracker.complete.set()
@tractor.context @tractor.context
@ -404,15 +273,15 @@ async def cascade(
# data feed key # data feed key
fqme: str, fqme: str,
# flume pair cascaded using an "edge function" src_shm_token: dict,
src_flume_addr: dict, dst_shm_token: tuple[str, np.dtype],
dst_flume_addr: dict,
ns_path: NamespacePath, ns_path: NamespacePath,
shm_registry: dict[str, _Token], shm_registry: dict[str, _Token],
zero_on_step: bool = False, zero_on_step: bool = False,
loglevel: str | None = None, loglevel: Optional[str] = None,
) -> None: ) -> None:
''' '''
@ -428,14 +297,8 @@ async def cascade(
if loglevel: if loglevel:
get_console_log(loglevel) get_console_log(loglevel)
src: Flume = Flume.from_msg(src_flume_addr) src = attach_shm_array(token=src_shm_token)
dst: Flume = Flume.from_msg( dst = attach_shm_array(readonly=False, token=dst_shm_token)
dst_flume_addr,
readonly=False,
)
# src: ShmArray = attach_shm_array(token=src_shm_token)
# dst: ShmArray = attach_shm_array(readonly=False, token=dst_shm_token)
reg = _load_builtins() reg = _load_builtins()
lines = '\n'.join([f'{key.rpartition(":")[2]} => {key}' for key in reg]) lines = '\n'.join([f'{key.rpartition(":")[2]} => {key}' for key in reg])
@ -443,11 +306,11 @@ async def cascade(
f'Registered FSP set:\n{lines}' f'Registered FSP set:\n{lines}'
) )
# NOTE XXX: update actorlocal flows table which registers # update actorlocal flows table which registers
# readonly "instances" of this fsp for symbol/source so that # readonly "instances" of this fsp for symbol/source
# consumer fsps can look it up by source + fsp. # so that consumer fsps can look it up by source + fsp.
# TODO: ugh i hate this wind/unwind to list over the wire but # TODO: ugh i hate this wind/unwind to list over the wire
# not sure how else to do it. # but not sure how else to do it.
for (token, fsp_name, dst_token) in shm_registry: for (token, fsp_name, dst_token) in shm_registry:
Fsp._flow_registry[( Fsp._flow_registry[(
_Token.from_msg(token), _Token.from_msg(token),
@ -457,15 +320,12 @@ async def cascade(
fsp: Fsp = reg.get( fsp: Fsp = reg.get(
NamespacePath(ns_path) NamespacePath(ns_path)
) )
func: Callable = fsp.func func = fsp.func
if not func: if not func:
# TODO: assume it's a func target path # TODO: assume it's a func target path
raise ValueError(f'Unknown fsp target: {ns_path}') raise ValueError(f'Unknown fsp target: {ns_path}')
_fqme: str = src.mkt.fqme
assert _fqme == fqme
# open a data feed stream with requested broker # open a data feed stream with requested broker
feed: Feed feed: Feed
async with data.feed.maybe_open_feed( async with data.feed.maybe_open_feed(
@ -479,142 +339,177 @@ async def cascade(
) as feed: ) as feed:
flume: Flume = feed.flumes[fqme] flume = feed.flumes[fqme]
# XXX: can't do this since flume.feed will be set XD mkt = flume.mkt
# assert flume == src assert src.token == flume.rt_shm.token
assert flume.mkt == src.mkt
mkt: MktPair = flume.mkt
# NOTE: FOR NOW, sanity checks around the feed as being
# always the src flume (until we get to fancier/lengthier
# chains/graphs.
assert src.rt_shm.token == flume.rt_shm.token
# XXX: won't work bc the _hist_shm_token value will be
# list[list] after IPC..
# assert flume.to_msg() == src_flume_addr
profiler(f'{func}: feed up') profiler(f'{func}: feed up')
func_name: str = func.__name__ func_name = func.__name__
async with ( async with (
trio.open_nursery() as tn, trio.open_nursery() as n,
): ):
# TODO: might be better to just make a "restart" method where
# the target task is spawned implicitly and then the event is
# set via some higher level api? At that poing we might as well
# be writing a one-cancels-one nursery though right?
casc = Cascade(
src,
dst,
tn,
fsp,
)
# TODO: this seems like it should be wrapped somewhere?
fsp_target = partial( fsp_target = partial(
connect_streams,
casc=casc, fsp_compute,
mkt=mkt, mkt=mkt,
flume=flume,
quote_stream=flume.stream, quote_stream=flume.stream,
# flumes and shm passthrough # shm
src=src, src=src,
dst=dst, dst=dst,
# chain function which takes src flume input(s) # target
# and renders dst flume output(s) func=func
edge_func=func
) )
async with casc.open_edge(
bind_func=fsp_target,
) as index:
# casc.bind_func = fsp_target
# index = await tn.start(fsp_target)
dst_shm: ShmArray = dst.rt_shm
src_shm: ShmArray = src.rt_shm
if zero_on_step: tracker, index = await n.start(fsp_target)
last = dst.rt_shm.array[-1:]
zeroed = np.zeros(last.shape, dtype=last.dtype)
profiler(f'{func_name}: fsp up') if zero_on_step:
last = dst.array[-1:]
zeroed = np.zeros(last.shape, dtype=last.dtype)
# sync to client-side actor profiler(f'{func_name}: fsp up')
await ctx.started(index)
# XXX: rt stream with client which we MUST # sync client
# open here (and keep it open) in order to make await ctx.started(index)
# incremental "updates" as history prepends take
# place.
async with ctx.open_stream() as client_stream:
casc.client_stream: tractor.MsgStream = client_stream
s, step, ld = casc.is_synced() # XXX: rt stream with client which we MUST
# open here (and keep it open) in order to make
# incremental "updates" as history prepends take
# place.
async with ctx.open_stream() as client_stream:
# detect sample period step for subscription to increment # TODO: these likely should all become
# signal # methods of this ``TaskLifetime`` or wtv
times = src.rt_shm.array['time'] # abstraction..
if len(times) > 1: async def resync(
last_ts = times[-1] tracker: TaskTracker,
delay_s: float = float(last_ts - times[times != last_ts][-1])
else:
# our default "HFT" sample rate.
delay_s: float = _default_delay_s
# sub and increment the underlying shared memory buffer ) -> tuple[TaskTracker, int]:
# on every step msg received from the global `samplerd` # TODO: adopt an incremental update engine/approach
# service. # where possible here eventually!
async with open_sample_stream( log.info(f're-syncing fsp {func_name} to source')
float(delay_s) tracker.cs.cancel()
) as istream: await tracker.complete.wait()
tracker, index = await n.start(fsp_target)
profiler(f'{func_name}: sample stream up') # always trigger UI refresh after history update,
profiler.finish() # see ``piker.ui._fsp.FspAdmin.open_chain()`` and
# ``piker.ui._display.trigger_update()``.
await client_stream.send({
'fsp_update': {
'key': dst_shm_token,
'first': dst._first.value,
'last': dst._last.value,
}
})
return tracker, index
async for i in istream: def is_synced(
# print(f'FSP incrementing {i}') src: ShmArray,
dst: ShmArray
) -> tuple[bool, int, int]:
'''
Predicate to dertmine if a destination FSP
output array is aligned to its source array.
# respawn the compute task if the source '''
# array has been updated such that we compute step_diff = src.index - dst.index
# new history from the (prepended) source. len_diff = abs(len(src.array) - len(dst.array))
synced, step_diff, _ = casc.is_synced() return not (
if not synced: # the source is likely backfilling and we must
step_diff: int = await casc.poll_and_sync_to_step() # sync history calculations
len_diff > 2
# skip adding a last bar since we should already # we aren't step synced to the source and may be
# be step alinged # leading/lagging by a step
if step_diff == 0: or step_diff > 1
continue or step_diff < 0
), step_diff, len_diff
# read out last shm row, copy and write new row async def poll_and_sync_to_step(
array = dst_shm.array tracker: TaskTracker,
src: ShmArray,
dst: ShmArray,
# some metrics like vlm should be reset ) -> tuple[TaskTracker, int]:
# to zero every step.
if zero_on_step:
last = zeroed
else:
last = array[-1:].copy()
dst.rt_shm.push(last) synced, step_diff, _ = is_synced(src, dst)
while not synced:
tracker, index = await resync(tracker)
synced, step_diff, _ = is_synced(src, dst)
# sync with source buffer's time step return tracker, step_diff
src_l2 = src_shm.array[-2:]
src_li, src_lt = src_l2[-1][['index', 'time']]
src_2li, src_2lt = src_l2[-2][['index', 'time']]
dst_shm._array['time'][src_li] = src_lt
dst_shm._array['time'][src_2li] = src_2lt
# last2 = dst.array[-2:] s, step, ld = is_synced(src, dst)
# if (
# last2[-1]['index'] != src_li # detect sample period step for subscription to increment
# or last2[-2]['index'] != src_2li # signal
# ): times = src.array['time']
# dstl2 = list(last2) if len(times) > 1:
# srcl2 = list(src_l2) last_ts = times[-1]
# print( delay_s = float(last_ts - times[times != last_ts][-1])
# # f'{dst.token}\n' else:
# f'src: {srcl2}\n' # our default "HFT" sample rate.
# f'dst: {dstl2}\n' delay_s = _default_delay_s
# )
# sub and increment the underlying shared memory buffer
# on every step msg received from the global `samplerd`
# service.
async with open_sample_stream(float(delay_s)) as istream:
profiler(f'{func_name}: sample stream up')
profiler.finish()
async for i in istream:
# print(f'FSP incrementing {i}')
# respawn the compute task if the source
# array has been updated such that we compute
# new history from the (prepended) source.
synced, step_diff, _ = is_synced(src, dst)
if not synced:
tracker, step_diff = await poll_and_sync_to_step(
tracker,
src,
dst,
)
# skip adding a last bar since we should already
# be step alinged
if step_diff == 0:
continue
# read out last shm row, copy and write new row
array = dst.array
# some metrics like vlm should be reset
# to zero every step.
if zero_on_step:
last = zeroed
else:
last = array[-1:].copy()
dst.push(last)
# sync with source buffer's time step
src_l2 = src.array[-2:]
src_li, src_lt = src_l2[-1][['index', 'time']]
src_2li, src_2lt = src_l2[-2][['index', 'time']]
dst._array['time'][src_li] = src_lt
dst._array['time'][src_2li] = src_2lt
# last2 = dst.array[-2:]
# if (
# last2[-1]['index'] != src_li
# or last2[-2]['index'] != src_2li
# ):
# dstl2 = list(last2)
# srcl2 = list(src_l2)
# print(
# # f'{dst.token}\n'
# f'src: {srcl2}\n'
# f'dst: {dstl2}\n'
# )

View File

@ -84,10 +84,10 @@ async def open_piker_runtime(
a root actor. a root actor.
''' '''
# check for existing runtime, boot it
# if not already running.
try: try:
actor = tractor.current_actor() # check for existing runtime
actor = tractor.current_actor().uid
except tractor._exceptions.NoRuntime: except tractor._exceptions.NoRuntime:
tractor._state._runtime_vars[ tractor._state._runtime_vars[
'piker_vars' 'piker_vars'
@ -116,16 +116,15 @@ async def open_piker_runtime(
enable_modules=enable_modules, enable_modules=enable_modules,
**tractor_kwargs, **tractor_kwargs,
) as actor, ) as _,
open_registry( open_registry(
registry_addrs, registry_addrs,
ensure_exists=False, ensure_exists=False,
) as addrs, ) as addrs,
): ):
assert actor is tractor.current_actor()
yield ( yield (
actor, tractor.current_actor(),
addrs, addrs,
) )
else: else:
@ -269,39 +268,28 @@ async def maybe_open_pikerd(
# async with open_portal(chan) as arb_portal: # async with open_portal(chan) as arb_portal:
# yield arb_portal # yield arb_portal
registry_addrs: list[tuple[str, int]] = ( registry_addrs = registry_addrs or [_default_reg_addr]
registry_addrs
or [_default_reg_addr]
)
pikerd_portal: tractor.Portal | None
async with ( async with (
open_piker_runtime( open_piker_runtime(
name=query_name, name=query_name,
registry_addrs=registry_addrs, registry_addrs=registry_addrs,
loglevel=loglevel, loglevel=loglevel,
**kwargs, **kwargs,
) as (actor, addrs), ) as _,
# try to attach to any existing (host-local) `pikerd`
tractor.find_actor( tractor.find_actor(
_root_dname, _root_dname,
registry_addrs=registry_addrs, registry_addrs=registry_addrs,
only_first=True, only_first=True,
# raise_on_none=True, ) as portal
) as pikerd_portal,
): ):
# connect to any existing remote daemon presuming its # connect to any existing daemon presuming
# registry socket was selected. # its registry socket was selected.
if pikerd_portal is not None: if (
portal is not None
# sanity check that we are actually connecting to ):
# a remote process and not ourselves. yield portal
assert actor.uid != pikerd_portal.channel.uid
assert registry_addrs
yield pikerd_portal
return return
# presume pikerd role since no daemon could be found at # presume pikerd role since no daemon could be found at

View File

@ -102,7 +102,7 @@ async def open_registry(
not tractor.is_root_process() not tractor.is_root_process()
and not Registry.addrs and not Registry.addrs
): ):
Registry.addrs.extend(actor.reg_addrs) Registry.addrs.extend(actor._reg_addrs)
if ( if (
ensure_exists ensure_exists

View File

@ -139,13 +139,6 @@ class StorageClient(
... ...
class TimeseriesNotFound(Exception):
'''
No timeseries entry can be found for this backend.
'''
class StorageConnectionError(ConnectionError): class StorageConnectionError(ConnectionError):
''' '''
Can't connect to the desired tsdb subsys/service. Can't connect to the desired tsdb subsys/service.

View File

@ -140,27 +140,19 @@ def delete(
def anal( def anal(
fqme: str, fqme: str,
period: int = 60, period: int = 60,
pdb: bool = False,
) -> np.ndarray: ) -> np.ndarray:
'''
Anal-ysis is when you take the data do stuff to it, i think.
'''
async def main(): async def main():
async with ( async with (
open_piker_runtime( open_piker_runtime(
# are you a bear or boi?
'tsdb_polars_anal', 'tsdb_polars_anal',
debug_mode=pdb, debug_mode=True,
),
open_storage_client() as (
mod,
client,
), ),
open_storage_client() as (mod, client),
): ):
syms: list[str] = await client.list_keys() syms: list[str] = await client.list_keys()
log.info(f'{len(syms)} FOUND for {mod.name}') print(f'{len(syms)} FOUND for {mod.name}')
( (
history, history,

View File

@ -19,8 +19,7 @@
call a poor man's tsdb). call a poor man's tsdb).
AKA a `piker`-native file-system native "time series database" AKA a `piker`-native file-system native "time series database"
without needing an extra process and no standard TSDB features, without needing an extra process and no standard TSDB features, YET!
YET!
''' '''
# TODO: like there's soo much.. # TODO: like there's soo much..
@ -68,7 +67,6 @@ from piker import config
from piker.data import def_iohlcv_fields from piker.data import def_iohlcv_fields
from piker.data import ShmArray from piker.data import ShmArray
from piker.log import get_logger from piker.log import get_logger
from . import TimeseriesNotFound
log = get_logger('storage.nativedb') log = get_logger('storage.nativedb')
@ -230,21 +228,8 @@ class NativeStorageClient:
fqme, fqme,
timeframe, timeframe,
) )
except FileNotFoundError as fnfe: except FileNotFoundError:
return None
bs_fqme, _, *_ = fqme.rpartition('.')
possible_matches: list[str] = []
for tskey in self._index:
if bs_fqme in tskey:
possible_matches.append(tskey)
match_str: str = '\n'.join(sorted(possible_matches))
raise TimeseriesNotFound(
f'No entry for `{fqme}`?\n'
f'Maybe you need a more specific fqme-key like:\n\n'
f'{match_str}'
) from fnfe
times = array['time'] times = array['time']
return ( return (
@ -391,8 +376,6 @@ class NativeStorageClient:
# ... # ...
# TODO: does this need to be async on average?
# I guess for any IPC connected backend yes?
@acm @acm
async def get_client( async def get_client(
@ -410,7 +393,7 @@ async def get_client(
''' '''
datadir: Path = config.get_conf_dir() / 'nativedb' datadir: Path = config.get_conf_dir() / 'nativedb'
if not datadir.is_dir(): if not datadir.is_dir():
log.info(f'Creating `nativedb` dir: {datadir}') log.info(f'Creating `nativedb` director: {datadir}')
datadir.mkdir() datadir.mkdir()
client = NativeStorageClient(datadir) client = NativeStorageClient(datadir)

View File

@ -390,7 +390,7 @@ class FspAdmin:
complete: trio.Event, complete: trio.Event,
started: trio.Event, started: trio.Event,
fqme: str, fqme: str,
dst_flume: Flume, dst_fsp_flume: Flume,
conf: dict, conf: dict,
target: Fsp, target: Fsp,
loglevel: str, loglevel: str,
@ -408,14 +408,16 @@ class FspAdmin:
# chaining entrypoint # chaining entrypoint
cascade, cascade,
# TODO: can't we just drop this and expect
# far end to read the src flume's .mkt.fqme?
# data feed key # data feed key
fqme=fqme, fqme=fqme,
src_flume_addr=self.flume.to_msg(), # TODO: pass `Flume.to_msg()`s here?
dst_flume_addr=dst_flume.to_msg(), # mems
ns_path=ns_path, # edge-bind-func src_shm_token=self.flume.rt_shm.token,
dst_shm_token=dst_fsp_flume.rt_shm.token,
# target
ns_path=ns_path,
loglevel=loglevel, loglevel=loglevel,
zero_on_step=conf.get('zero_on_step', False), zero_on_step=conf.get('zero_on_step', False),
@ -429,14 +431,14 @@ class FspAdmin:
ctx.open_stream() as stream, ctx.open_stream() as stream,
): ):
dst_flume.stream: tractor.MsgStream = stream dst_fsp_flume.stream: tractor.MsgStream = stream
# register output data # register output data
self._registry[ self._registry[
(fqme, ns_path) (fqme, ns_path)
] = ( ] = (
stream, stream,
dst_flume.rt_shm, dst_fsp_flume.rt_shm,
complete complete
) )
@ -513,7 +515,7 @@ class FspAdmin:
broker='piker', broker='piker',
_atype='fsp', _atype='fsp',
) )
dst_flume = Flume( dst_fsp_flume = Flume(
mkt=mkt, mkt=mkt,
_rt_shm_token=dst_shm.token, _rt_shm_token=dst_shm.token,
first_quote={}, first_quote={},
@ -541,13 +543,13 @@ class FspAdmin:
complete, complete,
started, started,
fqme, fqme,
dst_flume, dst_fsp_flume,
conf, conf,
target, target,
loglevel, loglevel,
) )
return dst_flume, started return dst_fsp_flume, started
async def open_fsp_chart( async def open_fsp_chart(
self, self,