Compare commits

...

12 Commits

Author SHA1 Message Date
Tyler Goodlet b9af6176c5 Factor `TimeseriesNotFound` to top level
TO CHERRY into #486
2023-12-07 12:31:14 -05:00
Tyler Goodlet dd0167b9a5 Make `fsp.cascade()` expect src/dst `Flume`s
Been meaning to this for a while, and there's still a few design
/ interface kinks (like `.mkt: MktPair` which should be better
generalized?) but this flips over all of the fsp chaining engine
to operate on the higher level `Flume` APIs via the newly cobbled
`Cascade` thinger..
2023-12-06 17:53:35 -05:00
Tyler Goodlet 9e71e0768f Define and pass a default `Flume._readonly: bool`
Allows opening with `.from_msg(readonly=False)` for write permissions
making underlyig shm arrays readonly. Also, make sure to pop the
`ShmArray` field entries prior to msg-ization, not sure how that worked
with the `Feed.flumes` equivalent..but?
2023-12-06 17:25:49 -05:00
Tyler Goodlet 6029f39a3f Allow `MktPair.from/to_msg()` to still do `.dst: str` for fsp flumes 2023-12-06 17:09:52 -05:00
Tyler Goodlet 656e2c6a88 fsp: intro a `Cascade` type that connects `Flume`s of streams 2023-12-05 16:59:07 -05:00
Tyler Goodlet b8065a413b ib: update ibc.ini from latest upstream template 2023-12-05 16:57:38 -05:00
Tyler Goodlet 9245d24b47 ib: add `.pause()` on symbol query overruns to aid in fixing the issue 2023-12-04 13:10:15 -05:00
Tyler Goodlet 22bd83943b .storage: support `store anal --pdb` flag 2023-12-04 13:00:33 -05:00
Tyler Goodlet b94931bbdd Fix `Portal.channel: Channel` attr name error 2023-12-04 13:00:04 -05:00
Tyler Goodlet 239c1c457e Sort fqme suggestions pre-print 2023-12-04 11:34:39 -05:00
Tyler Goodlet 24a54a7085 Add `TimeseriesNotFound` for fqme lookup failures
A common usage error is to run `piker anal mnq.cme.ib` where the CLI
passed fqme is not actually fully-qualified (in this case missing an
expiry token) and we get an underlying `FileNotFoundError` from the
`StorageClient.read_ohlcv()` call. In such key misses, scan the existing
`StorageClient._index` for possible matches and report in a `raise from`
the new error.

CHERRY into #486
2023-12-04 11:22:55 -05:00
Tyler Goodlet ebd1eb114e Port runtime init to new `tractor.Actor.reg_addrs` related changes 2023-11-21 15:18:52 -05:00
15 changed files with 766 additions and 385 deletions

View File

@ -117,9 +117,57 @@ SecondFactorDevice=
# If you use the IBKR Mobile app for second factor authentication, # If you use the IBKR Mobile app for second factor authentication,
# and you fail to complete the process before the time limit imposed # and you fail to complete the process before the time limit imposed
# by IBKR, you can use this setting to tell IBC to exit: arrangements # by IBKR, this setting tells IBC whether to automatically restart
# can then be made to automatically restart IBC in order to initiate # the login sequence, giving you another opportunity to complete
# the login sequence afresh. Otherwise, manual intervention at TWS's # second factor authentication.
#
# Permitted values are 'yes' and 'no'.
#
# If this setting is not present or has no value, then the value
# of the deprecated ExitAfterSecondFactorAuthenticationTimeout is
# used instead. If this also has no value, then this setting defaults
# to 'no'.
#
# NB: you must be using IBC v3.14.0 or later to use this setting:
# earlier versions ignore it.
ReloginAfterSecondFactorAuthenticationTimeout=
# This setting is only relevant if
# ReloginAfterSecondFactorAuthenticationTimeout is set to 'yes',
# or if ExitAfterSecondFactorAuthenticationTimeout is set to 'yes'.
#
# It controls how long (in seconds) IBC waits for login to complete
# after the user acknowledges the second factor authentication
# alert at the IBKR Mobile app. If login has not completed after
# this time, IBC terminates.
# The default value is 60.
SecondFactorAuthenticationExitInterval=
# This setting specifies the timeout for second factor authentication
# imposed by IB. The value is in seconds. You should not change this
# setting unless you have reason to believe that IB has changed the
# timeout. The default value is 180.
SecondFactorAuthenticationTimeout=180
# DEPRECATED SETTING
# ------------------
#
# ExitAfterSecondFactorAuthenticationTimeout - THIS SETTING WILL BE
# REMOVED IN A FUTURE RELEASE. For IBC version 3.14.0 and later, see
# the notes for ReloginAfterSecondFactorAuthenticationTimeout above.
#
# For IBC versions earlier than 3.14.0: If you use the IBKR Mobile
# app for second factor authentication, and you fail to complete the
# process before the time limit imposed by IBKR, you can use this
# setting to tell IBC to exit: arrangements can then be made to
# automatically restart IBC in order to initiate the login sequence
# afresh. Otherwise, manual intervention at TWS's
# Second Factor Authentication dialog is needed to complete the # Second Factor Authentication dialog is needed to complete the
# login. # login.
# #
@ -132,29 +180,18 @@ SecondFactorDevice=
ExitAfterSecondFactorAuthenticationTimeout=no ExitAfterSecondFactorAuthenticationTimeout=no
# This setting is only relevant if
# ExitAfterSecondFactorAuthenticationTimeout is set to 'yes'.
#
# It controls how long (in seconds) IBC waits for login to complete
# after the user acknowledges the second factor authentication
# alert at the IBKR Mobile app. If login has not completed after
# this time, IBC terminates.
# The default value is 40.
SecondFactorAuthenticationExitInterval=
# Trading Mode # Trading Mode
# ------------ # ------------
# #
# TWS 955 introduced a new Trading Mode combo box on its login # This indicates whether the live account or the paper trading
# dialog. This indicates whether the live account or the paper # account corresponding to the supplied credentials is to be used.
# trading account corresponding to the supplied credentials is # The allowed values are 'live' (the default) and 'paper'.
# to be used. The allowed values are 'live' (the default) and #
# 'paper'. For earlier versions of TWS this setting has no # If this is set to 'live', then the credentials for the live
# effect. # account must be supplied. If it is set to 'paper', then either
# the live or the paper-trading credentials may be supplied.
TradingMode= TradingMode=paper
# Paper-trading Account Warning # Paper-trading Account Warning
@ -188,7 +225,7 @@ AcceptNonBrokerageAccountWarning=yes
# #
# The default value is 60. # The default value is 60.
LoginDialogDisplayTimeout=20 LoginDialogDisplayTimeout=60
@ -217,7 +254,15 @@ LoginDialogDisplayTimeout=20
# but they are acceptable. # but they are acceptable.
# #
# The default is the current working directory when IBC is # The default is the current working directory when IBC is
# started. # started, unless the TWS_SETTINGS_PATH setting in the relevant
# start script is set.
#
# If both this setting and TWS_SETTINGS_PATH are set, then this
# setting takes priority. Note that if they have different values,
# auto-restart will not work.
#
# NB: this setting is now DEPRECATED. You should use the
# TWS_SETTINGS_PATH setting in the relevant start script.
IbDir=/root/Jts IbDir=/root/Jts
@ -284,15 +329,32 @@ ExistingSessionDetectedAction=primary
# Override TWS API Port Number # Override TWS API Port Number
# ---------------------------- # ----------------------------
# #
# If OverrideTwsApiPort is set to an integer, IBC changes the # If OverrideTwsApiPort is set to an integer, IBC changes the
# 'Socket port' in TWS's API configuration to that number shortly # 'Socket port' in TWS's API configuration to that number shortly
# after startup. Leaving the setting blank will make no change to # after startup (but note that for the FIX Gateway, this setting is
# the current setting. This setting is only intended for use in # actually stored in jts.ini rather than the Gateway's settings
# certain specialized situations where the port number needs to # file). Leaving the setting blank will make no change to
# the current setting. This setting is only intended for use in
# certain specialized situations where the port number needs to
# be set dynamically at run-time, and for the FIX Gateway: most
# non-FIX users will never need it, so don't use it unless you know
# you need it.
OverrideTwsApiPort=4000
# Override TWS Master Client ID
# -----------------------------
#
# If OverrideTwsMasterClientID is set to an integer, IBC changes the
# 'Master Client ID' value in TWS's API configuration to that
# value shortly after startup. Leaving the setting blank will make
# no change to the current setting. This setting is only intended
# for use in certain specialized situations where the value needs to
# be set dynamically at run-time: most users will never need it, # be set dynamically at run-time: most users will never need it,
# so don't use it unless you know you need it. # so don't use it unless you know you need it.
; OverrideTwsApiPort=4002 OverrideTwsMasterClientID=
# Read-only Login # Read-only Login
@ -302,11 +364,13 @@ ExistingSessionDetectedAction=primary
# account security programme, the user will not be asked to perform # account security programme, the user will not be asked to perform
# the second factor authentication action, and login to TWS will # the second factor authentication action, and login to TWS will
# occur automatically in read-only mode: in this mode, placing or # occur automatically in read-only mode: in this mode, placing or
# managing orders is not allowed. If set to 'no', and the user is # managing orders is not allowed.
# enrolled in IB's account security programme, the user must perform #
# the relevant second factor authentication action to complete the # If set to 'no', and the user is enrolled in IB's account security
# login. # programme, the second factor authentication process is handled
# according to the Second Factor Authentication Settings described
# elsewhere in this file.
#
# If the user is not enrolled in IB's account security programme, # If the user is not enrolled in IB's account security programme,
# this setting is ignored. The default is 'no'. # this setting is ignored. The default is 'no'.
@ -326,7 +390,44 @@ ReadOnlyLogin=no
# set the relevant checkbox (this only needs to be done once) and # set the relevant checkbox (this only needs to be done once) and
# not provide a value for this setting. # not provide a value for this setting.
ReadOnlyApi=no ReadOnlyApi=
# API Precautions
# ---------------
#
# These settings relate to the corresponding 'Precautions' checkboxes in the
# API section of the Global Configuration dialog.
#
# For all of these, the accepted values are:
# - 'yes' sets the checkbox
# - 'no' clears the checkbox
# - if not set, the existing TWS/Gateway configuration is unchanged
#
# NB: thess settings are really only supplied for the benefit of new TWS
# or Gateway instances that are being automatically installed and
# started without user intervention, or where user settings are not preserved
# between sessions (eg some Docker containers). Where a user is involved, they
# should use the Global Configuration to set the relevant checkboxes and not
# provide values for these settings.
BypassOrderPrecautions=
BypassBondWarning=
BypassNegativeYieldToWorstConfirmation=
BypassCalledBondWarning=
BypassSameActionPairTradeWarning=
BypassPriceBasedVolatilityRiskWarning=
BypassUSStocksMarketDataInSharesWarning=
BypassRedirectOrderWarning=
BypassNoOverfillProtectionPrecaution=
# Market data size for US stocks - lots or shares # Market data size for US stocks - lots or shares
@ -381,54 +482,145 @@ AcceptBidAskLastSizeDisplayUpdateNotification=accept
SendMarketDataInLotsForUSstocks= SendMarketDataInLotsForUSstocks=
# Trusted API Client IPs
# ----------------------
#
# NB: THIS SETTING IS ONLY RELEVANT FOR THE GATEWAY, AND ONLY WHEN FIX=yes.
# In all other cases it is ignored.
#
# This is a list of IP addresses separated by commas. API clients with IP
# addresses in this list are able to connect to the API without Gateway
# generating the 'Incoming connection' popup.
#
# Note that 127.0.0.1 is always permitted to connect, so do not include it
# in this setting.
TrustedTwsApiClientIPs=
# Reset Order ID Sequence
# -----------------------
#
# The setting resets the order id sequence for orders submitted via the API, so
# that the next invocation of the `NextValidId` API callback will return the
# value 1. The reset occurs when TWS starts.
#
# Note that order ids are reset for all API clients, except those that have
# outstanding (ie incomplete) orders: their order id sequence carries on as
# before.
#
# Valid values are 'yes', 'true', 'false' and 'no'. The default is 'no'.
ResetOrderIdsAtStart=
# This setting specifies IBC's action when TWS displays the dialog asking for
# confirmation of a request to reset the API order id sequence.
#
# Note that the Gateway never displays this dialog, so this setting is ignored
# for a Gateway session.
#
# Valid values consist of two strings separated by a solidus '/'. The first
# value specifies the action to take when the order id reset request resulted
# from setting ResetOrderIdsAtStart=yes. The second specifies the action to
# take when the order id reset request is a result of the user clicking the
# 'Reset API order ID sequence' button in the API configuration. Each value
# must be one of the following:
#
# 'confirm'
# order ids will be reset
#
# 'reject'
# order ids will not be reset
#
# 'ignore'
# IBC will ignore the dialog. The user must take action.
#
# The default setting is ignore/ignore
# Examples:
#
# 'confirm/reject' - confirm order id reset only if ResetOrderIdsAtStart=yes
# and reject any user-initiated requests
#
# 'ignore/confirm' - user must decide what to do if ResetOrderIdsAtStart=yes
# and confirm user-initiated requests
#
# 'reject/ignore' - reject order id reset if ResetOrderIdsAtStart=yes but
# allow user to handle user-initiated requests
ConfirmOrderIdReset=
# ============================================================================= # =============================================================================
# 4. TWS Auto-Closedown # 4. TWS Auto-Logoff and Auto-Restart
# ============================================================================= # =============================================================================
# #
# IMPORTANT NOTE: Starting with TWS 974, this setting no longer # TWS and Gateway insist on being restarted every day. Two alternative
# works properly, because IB have changed the way TWS handles its # automatic options are offered:
# autologoff mechanism.
# #
# You should now configure the TWS autologoff time to something # - Auto-Logoff: at a specified time, TWS shuts down tidily, without
# convenient for you, and restart IBC each day. # restarting.
# #
# Alternatively, discontinue use of IBC and use the auto-relogin # - Auto-Restart: at a specified time, TWS shuts down and then restarts
# mechanism within TWS 974 and later versions (note that the # without the user having to re-autheticate.
# auto-relogin mechanism provided by IB is not available if you #
# use IBC). # The normal way to configure the time at which this happens is via the Lock
# and Exit section of the Configuration dialog. Once this time has been
# configured in this way, the setting persists until the user changes it again.
#
# However, there are situations where there is no user available to do this
# configuration, or where there is no persistent storage (for example some
# Docker images). In such cases, the auto-restart or auto-logoff time can be
# set whenever IBC starts with the settings below.
#
# The value, if specified, must be a time in HH:MM AM/PM format, for example
# 08:00 AM or 10:00 PM. Note that there must be a single space between the
# two parts of this value; also that midnight is "12:00 AM" and midday is
# "12:00 PM".
#
# If no value is specified for either setting, the currently configured
# settings will apply. If a value is supplied for one setting, the other
# setting is cleared. If values are supplied for both settings, only the
# auto-restart time is set, and the auto-logoff time is cleared.
#
# Note that for a normal TWS/Gateway installation with persistent storage
# (for example on a desktop computer) the value will be persisted as if the
# user had set it via the configuration dialog.
#
# If you choose to auto-restart, you should take note of the considerations
# described at the link below. Note that where this information mentions
# 'manual authentication', restarting IBC will do the job (IBKR does not
# recognise the existence of IBC in its docuemntation).
#
# https://www.interactivebrokers.com/en/software/tws/twsguide.htm#usersguidebook/configuretws/auto_restart_info.htm
#
# If you use the "RESTART" command via the IBC command server, and IBC is
# running any version of the Gateway (or a version of TWS earlier than 1018),
# note that this will set the Auto-Restart time in Gateway/TWS's configuration
# dialog to the time at which the restart actually happens (which may be up to
# a minute after the RESTART command is issued). To prevent future auto-
# restarts at this time, you must make sure you have set AutoLogoffTime or
# AutoRestartTime to your desired value before running IBC. NB: this does not
# apply to TWS from version 1018 onwards.
# Set to yes or no (lower case). AutoLogoffTime=
#
# yes means allow TWS to shut down automatically at its
# specified shutdown time, which is set via the TWS
# configuration menu.
#
# no means TWS never shuts down automatically.
#
# NB: IB recommends that you do not keep TWS running
# continuously. If you set this setting to 'no', you may
# experience incorrect TWS operation.
#
# NB: the default for this setting is 'no'. Since this will
# only work properly with TWS versions earlier than 974, you
# should explicitly set this to 'yes' for version 974 and later.
IbAutoClosedown=yes
AutoRestartTime=
# ============================================================================= # =============================================================================
# 5. TWS Tidy Closedown Time # 5. TWS Tidy Closedown Time
# ============================================================================= # =============================================================================
# #
# NB: starting with TWS 974 this is no longer a useful option # Specifies a time at which TWS will close down tidily, with no restart.
# because both TWS and Gateway now have the same auto-logoff
# mechanism, and IBC can no longer avoid this.
# #
# Note that giving this setting a value does not change TWS's # There is little reason to use this setting. It is similar to AutoLogoffTime,
# auto-logoff in any way: any setting will be additional to the # but can include a day-of-the-week, whereas AutoLogoffTime and AutoRestartTime
# TWS auto-logoff. # apply every day. So for example you could use ClosedownAt in conjunction with
# AutoRestartTime to shut down TWS on Friday evenings after the markets
# close, without it running on Saturday as well.
# #
# To tell IBC to tidily close TWS at a specified time every # To tell IBC to tidily close TWS at a specified time every
# day, set this value to <hh:mm>, for example: # day, set this value to <hh:mm>, for example:
@ -487,7 +679,7 @@ AcceptIncomingConnectionAction=reject
# no means the dialog remains on display and must be # no means the dialog remains on display and must be
# handled by the user. # handled by the user.
AllowBlindTrading=yes AllowBlindTrading=no
# Save Settings on a Schedule # Save Settings on a Schedule
@ -530,6 +722,26 @@ AllowBlindTrading=yes
SaveTwsSettingsAt= SaveTwsSettingsAt=
# Confirm Crypto Currency Orders Automatically
# --------------------------------------------
#
# When you place an order for a cryptocurrency contract, a dialog is displayed
# asking you to confirm that you want to place the order, and notifying you
# that you are placing an order to trade cryptocurrency with Paxos, a New York
# limited trust company, and not at Interactive Brokers.
#
# transmit means that the order will be placed automatically, and the
# dialog will then be closed
#
# cancel means that the order will not be placed, and the dialog will
# then be closed
#
# manual means that IBC will take no action and the user must deal
# with the dialog
ConfirmCryptoCurrencyOrders=transmit
# ============================================================================= # =============================================================================
# 7. Settings Specific to Indian Versions of TWS # 7. Settings Specific to Indian Versions of TWS
@ -566,13 +778,17 @@ DismissNSEComplianceNotice=yes
# #
# The port number that IBC listens on for commands # The port number that IBC listens on for commands
# such as "STOP". DO NOT set this to the port number # such as "STOP". DO NOT set this to the port number
# used for TWS API connections. There is no good reason # used for TWS API connections.
# to change this setting unless the port is used by #
# some other application (typically another instance of # The convention is to use 7462 for this port,
# IBC). The default value is 0, which tells IBC not to # but it must be set to a different value from any other
# start the command server # IBC instance that might run at the same time.
#
# The default value is 0, which tells IBC not to start
# the command server
#CommandServerPort=7462 #CommandServerPort=7462
CommandServerPort=0
# Permitted Command Sources # Permitted Command Sources
@ -583,19 +799,19 @@ DismissNSEComplianceNotice=yes
# IBC. Commands can always be sent from the # IBC. Commands can always be sent from the
# same host as IBC is running on. # same host as IBC is running on.
ControlFrom=127.0.0.1 ControlFrom=
# Address for Receiving Commands # Address for Receiving Commands
# ------------------------------ # ------------------------------
# #
# Specifies the IP address on which the Command Server # Specifies the IP address on which the Command Server
# is so listen. For a multi-homed host, this can be used # is to listen. For a multi-homed host, this can be used
# to specify that connection requests are only to be # to specify that connection requests are only to be
# accepted on the specified address. The default is to # accepted on the specified address. The default is to
# accept connection requests on all local addresses. # accept connection requests on all local addresses.
BindAddress=127.0.0.1 BindAddress=
# Command Prompt # Command Prompt
@ -621,7 +837,7 @@ CommandPrompt=
# information is sent. The default is that such information # information is sent. The default is that such information
# is not sent. # is not sent.
SuppressInfoMessages=no SuppressInfoMessages=yes
@ -651,10 +867,10 @@ SuppressInfoMessages=no
# The LogStructureScope setting indicates which windows are # The LogStructureScope setting indicates which windows are
# eligible for structure logging: # eligible for structure logging:
# #
# - if set to 'known', only windows that IBC recognizes # - (default value) if set to 'known', only windows that
# are eligible - these are windows that IBC has some # IBC recognizes are eligible - these are windows that
# interest in monitoring, usually to take some action # IBC has some interest in monitoring, usually to take
# on the user's behalf; # some action on the user's behalf;
# #
# - if set to 'unknown', only windows that IBC does not # - if set to 'unknown', only windows that IBC does not
# recognize are eligible. Most windows displayed by # recognize are eligible. Most windows displayed by
@ -667,9 +883,8 @@ SuppressInfoMessages=no
# - if set to 'all', then every window displayed by TWS # - if set to 'all', then every window displayed by TWS
# is eligible. # is eligible.
# #
# The default value is 'known'.
LogStructureScope=all LogStructureScope=known
# When to Log Window Structure # When to Log Window Structure
@ -682,13 +897,15 @@ LogStructureScope=all
# structure of an eligible window the first time it # structure of an eligible window the first time it
# is encountered; # is encountered;
# #
# - if set to 'openclose', the structure is logged every
# time an eligible window is opened or closed;
#
# - if set to 'activate', the structure is logged every # - if set to 'activate', the structure is logged every
# time an eligible window is made active; # time an eligible window is made active;
# #
# - if set to 'never' or 'no' or 'false', structure # - (default value) if set to 'never' or 'no' or 'false',
# information is never logged. # structure information is never logged.
# #
# The default value is 'never'.
LogStructureWhen=never LogStructureWhen=never
@ -708,4 +925,3 @@ LogStructureWhen=never
#LogComponents= #LogComponents=

View File

@ -327,7 +327,11 @@ class MktPair(Struct, frozen=True):
) -> dict: ) -> dict:
d = super().to_dict(**kwargs) d = super().to_dict(**kwargs)
d['src'] = self.src.to_dict(**kwargs) d['src'] = self.src.to_dict(**kwargs)
d['dst'] = self.dst.to_dict(**kwargs)
if not isinstance(self.dst, str):
d['dst'] = self.dst.to_dict(**kwargs)
else:
d['dst'] = str(self.dst)
d['price_tick'] = str(self.price_tick) d['price_tick'] = str(self.price_tick)
d['size_tick'] = str(self.size_tick) d['size_tick'] = str(self.size_tick)
@ -349,11 +353,16 @@ class MktPair(Struct, frozen=True):
Constructor for a received msg-dict normally received over IPC. Constructor for a received msg-dict normally received over IPC.
''' '''
dst_asset_msg = msg.pop('dst') if not isinstance(
dst = Asset.from_msg(dst_asset_msg) # .copy() dst_asset_msg := msg.pop('dst'),
str,
):
dst: Asset = Asset.from_msg(dst_asset_msg) # .copy()
else:
dst: str = dst_asset_msg
src_asset_msg = msg.pop('src') src_asset_msg: dict = msg.pop('src')
src = Asset.from_msg(src_asset_msg) # .copy() src: Asset = Asset.from_msg(src_asset_msg) # .copy()
# XXX NOTE: ``msgspec`` can encode `Decimal` but it doesn't # XXX NOTE: ``msgspec`` can encode `Decimal` but it doesn't
# decide to it by default since we aren't spec-cing these # decide to it by default since we aren't spec-cing these

View File

@ -229,8 +229,8 @@ _samplings: dict[int, tuple[str, str]] = {
# throughput can be made faster during backfilling. # throughput can be made faster during backfilling.
60: ( 60: (
'1 min', '1 min',
'1 D', '2 D',
pendulum.duration(days=1), pendulum.duration(days=2),
), ),
} }

View File

@ -818,7 +818,7 @@ async def stream_quotes(
details: ibis.ContractDetails details: ibis.ContractDetails
async with ( async with (
open_data_client() as proxy, open_data_client() as proxy,
trio.open_nursery() as tn, # trio.open_nursery() as tn,
): ):
mkt, details = await get_mkt_info( mkt, details = await get_mkt_info(
sym, sym,

View File

@ -214,7 +214,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
last = time.time() last = time.time()
async for pattern in stream: async for pattern in stream:
log.info(f'received {pattern}') log.info(f'received {pattern}')
now = time.time() now: float = time.time()
# this causes tractor hang... # this causes tractor hang...
# assert 0 # assert 0
@ -261,7 +261,9 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
# defined adhoc symbol set. # defined adhoc symbol set.
stock_results = [] stock_results = []
async def stash_results(target: Awaitable[list]): async def extend_results(
target: Awaitable[list]
) -> None:
try: try:
results = await target results = await target
except tractor.trionics.Lagged: except tractor.trionics.Lagged:
@ -274,7 +276,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
async with trio.open_nursery() as sn: async with trio.open_nursery() as sn:
sn.start_soon( sn.start_soon(
stash_results, extend_results,
proxy.search_symbols( proxy.search_symbols(
pattern=pattern, pattern=pattern,
upto=5, upto=5,
@ -289,8 +291,10 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
f'Search timeout? {proxy._aio_ns.ib.client}' f'Search timeout? {proxy._aio_ns.ib.client}'
) )
continue continue
else: elif stock_results:
break break
# else:
await tractor.pause()
# # match against our ad-hoc set immediately # # match against our ad-hoc set immediately
# adhoc_matches = fuzzy.extract( # adhoc_matches = fuzzy.extract(

View File

@ -42,35 +42,15 @@ if TYPE_CHECKING:
from .feed import Feed from .feed import Feed
# TODO: ideas for further abstractions as per
# https://github.com/pikers/piker/issues/216 and
# https://github.com/pikers/piker/issues/270:
# - a ``Cascade`` would be the minimal "connection" of 2 ``Flumes``
# as per circuit parlance:
# https://en.wikipedia.org/wiki/Two-port_network#Cascade_connection
# - could cover the combination of our `FspAdmin` and the
# backend `.fsp._engine` related machinery to "connect" one flume
# to another?
# - a (financial signal) ``Flow`` would be the a "collection" of such
# minmial cascades. Some engineering based jargon concepts:
# - https://en.wikipedia.org/wiki/Signal_chain
# - https://en.wikipedia.org/wiki/Daisy_chain_(electrical_engineering)
# - https://en.wikipedia.org/wiki/Audio_signal_flow
# - https://en.wikipedia.org/wiki/Digital_signal_processing#Implementation
# - https://en.wikipedia.org/wiki/Dataflow_programming
# - https://en.wikipedia.org/wiki/Signal_programming
# - https://en.wikipedia.org/wiki/Incremental_computing
class Flume(Struct): class Flume(Struct):
''' '''
Composite reference type which points to all the addressing handles Composite reference type which points to all the addressing
and other meta-data necessary for the read, measure and management handles and other meta-data necessary for the read, measure and
of a set of real-time updated data flows. management of a set of real-time updated data flows.
Can be thought of as a "flow descriptor" or "flow frame" which Can be thought of as a "flow descriptor" or "flow frame" which
describes the high level properties of a set of data flows that can describes the high level properties of a set of data flows that
be used seamlessly across process-memory boundaries. can be used seamlessly across process-memory boundaries.
Each instance's sub-components normally includes: Each instance's sub-components normally includes:
- a msg oriented quote stream provided via an IPC transport - a msg oriented quote stream provided via an IPC transport
@ -93,6 +73,7 @@ class Flume(Struct):
# private shm refs loaded dynamically from tokens # private shm refs loaded dynamically from tokens
_hist_shm: ShmArray | None = None _hist_shm: ShmArray | None = None
_rt_shm: ShmArray | None = None _rt_shm: ShmArray | None = None
_readonly: bool = True
stream: tractor.MsgStream | None = None stream: tractor.MsgStream | None = None
izero_hist: int = 0 izero_hist: int = 0
@ -109,7 +90,7 @@ class Flume(Struct):
if self._rt_shm is None: if self._rt_shm is None:
self._rt_shm = attach_shm_array( self._rt_shm = attach_shm_array(
token=self._rt_shm_token, token=self._rt_shm_token,
readonly=True, readonly=self._readonly,
) )
return self._rt_shm return self._rt_shm
@ -122,12 +103,10 @@ class Flume(Struct):
'No shm token has been set for the history buffer?' 'No shm token has been set for the history buffer?'
) )
if ( if self._hist_shm is None:
self._hist_shm is None
):
self._hist_shm = attach_shm_array( self._hist_shm = attach_shm_array(
token=self._hist_shm_token, token=self._hist_shm_token,
readonly=True, readonly=self._readonly,
) )
return self._hist_shm return self._hist_shm
@ -146,10 +125,10 @@ class Flume(Struct):
period and ratio between them. period and ratio between them.
''' '''
times = self.hist_shm.array['time'] times: np.ndarray = self.hist_shm.array['time']
end = pendulum.from_timestamp(times[-1]) end: float | int = pendulum.from_timestamp(times[-1])
start = pendulum.from_timestamp(times[times != times[-1]][-1]) start: float | int = pendulum.from_timestamp(times[times != times[-1]][-1])
hist_step_size_s = (end - start).seconds hist_step_size_s: float = (end - start).seconds
times = self.rt_shm.array['time'] times = self.rt_shm.array['time']
end = pendulum.from_timestamp(times[-1]) end = pendulum.from_timestamp(times[-1])
@ -169,17 +148,25 @@ class Flume(Struct):
msg = self.to_dict() msg = self.to_dict()
msg['mkt'] = self.mkt.to_dict() msg['mkt'] = self.mkt.to_dict()
# can't serialize the stream or feed objects, it's expected # NOTE: pop all un-msg-serializable fields:
# you'll have a ref to it since this msg should be rxed on # - `tractor.MsgStream`
# a stream on whatever far end IPC.. # - `Feed`
# - `Shmarray`
# it's expected the `.from_msg()` on the other side
# will get instead some kind of msg-compat version
# that it can load.
msg.pop('stream') msg.pop('stream')
msg.pop('feed') msg.pop('feed')
msg.pop('_rt_shm')
msg.pop('_hist_shm')
return msg return msg
@classmethod @classmethod
def from_msg( def from_msg(
cls, cls,
msg: dict, msg: dict,
readonly: bool = True,
) -> dict: ) -> dict:
''' '''
@ -190,7 +177,11 @@ class Flume(Struct):
mkt_msg = msg.pop('mkt') mkt_msg = msg.pop('mkt')
from ..accounting import MktPair # cycle otherwise.. from ..accounting import MktPair # cycle otherwise..
mkt = MktPair.from_msg(mkt_msg) mkt = MktPair.from_msg(mkt_msg)
return cls(mkt=mkt, **msg) msg |= {'_readonly': readonly}
return cls(
mkt=mkt,
**msg,
)
def get_index( def get_index(
self, self,

View File

@ -57,6 +57,7 @@ from ._sampling import (
from ..brokers._util import ( from ..brokers._util import (
DataUnavailable, DataUnavailable,
) )
from ..storage import TimeseriesNotFound
if TYPE_CHECKING: if TYPE_CHECKING:
from bidict import bidict from bidict import bidict
@ -690,13 +691,18 @@ async def tsdb_backfill(
# but if not then below the remaining history can be lazy # but if not then below the remaining history can be lazy
# loaded? # loaded?
fqme: str = mkt.fqme fqme: str = mkt.fqme
tsdb_entry: tuple | None = await storage.load(
fqme,
timeframe=timeframe,
)
last_tsdb_dt: datetime | None = None last_tsdb_dt: datetime | None = None
if tsdb_entry: try:
tsdb_entry: tuple | None = await storage.load(
fqme,
timeframe=timeframe,
)
except TimeseriesNotFound:
log.warning(
f'No timeseries yet for {fqme}'
)
else:
( (
tsdb_history, tsdb_history,
first_tsdb_dt, first_tsdb_dt,
@ -963,7 +969,8 @@ async def manage_history(
sub_for_broadcasts=False, sub_for_broadcasts=False,
) as sample_stream: ) as sample_stream:
# register 1s and 1m buffers with the global incrementer task # register 1s and 1m buffers with the global
# incrementer task
log.info(f'Connected to sampler stream: {sample_stream}') log.info(f'Connected to sampler stream: {sample_stream}')
for timeframe in [60, 1]: for timeframe in [60, 1]:

View File

@ -26,7 +26,10 @@ from ._api import (
maybe_mk_fsp_shm, maybe_mk_fsp_shm,
Fsp, Fsp,
) )
from ._engine import cascade from ._engine import (
cascade,
Cascade,
)
from ._volume import ( from ._volume import (
dolla_vlm, dolla_vlm,
flow_rates, flow_rates,
@ -35,6 +38,7 @@ from ._volume import (
__all__: list[str] = [ __all__: list[str] = [
'cascade', 'cascade',
'Cascade',
'maybe_mk_fsp_shm', 'maybe_mk_fsp_shm',
'Fsp', 'Fsp',
'dolla_vlm', 'dolla_vlm',
@ -46,9 +50,12 @@ __all__: list[str] = [
async def latency( async def latency(
source: 'TickStream[Dict[str, float]]', # noqa source: 'TickStream[Dict[str, float]]', # noqa
ohlcv: np.ndarray ohlcv: np.ndarray
) -> AsyncIterator[np.ndarray]: ) -> AsyncIterator[np.ndarray]:
"""Latency measurements, broker to piker. '''
""" Latency measurements, broker to piker.
'''
# TODO: do we want to offer yielding this async # TODO: do we want to offer yielding this async
# before the rt data connection comes up? # before the rt data connection comes up?

View File

@ -18,13 +18,12 @@
core task logic for processing chains core task logic for processing chains
''' '''
from dataclasses import dataclass from __future__ import annotations
from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
from typing import ( from typing import (
AsyncIterator, AsyncIterator,
Callable, Callable,
Optional,
Union,
) )
import numpy as np import numpy as np
@ -33,9 +32,9 @@ from trio_typing import TaskStatus
import tractor import tractor
from tractor.msg import NamespacePath from tractor.msg import NamespacePath
from piker.types import Struct
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from .. import data from .. import data
from ..data import attach_shm_array
from ..data.feed import ( from ..data.feed import (
Flume, Flume,
Feed, Feed,
@ -56,12 +55,6 @@ from ..toolz import Profiler
log = get_logger(__name__) log = get_logger(__name__)
@dataclass
class TaskTracker:
complete: trio.Event
cs: trio.CancelScope
async def filter_quotes_by_sym( async def filter_quotes_by_sym(
sym: str, sym: str,
@ -82,30 +75,168 @@ async def filter_quotes_by_sym(
if quote: if quote:
yield quote yield quote
# TODO: unifying the abstractions in this FSP subsys/layer:
# -[ ] move the `.data.flows.Flume` type into this
# module/subsys/pkg?
# -[ ] ideas for further abstractions as per
# - https://github.com/pikers/piker/issues/216,
# - https://github.com/pikers/piker/issues/270:
# - a (financial signal) ``Flow`` would be the a "collection" of such
# minmial cascades. Some engineering based jargon concepts:
# - https://en.wikipedia.org/wiki/Signal_chain
# - https://en.wikipedia.org/wiki/Daisy_chain_(electrical_engineering)
# - https://en.wikipedia.org/wiki/Audio_signal_flow
# - https://en.wikipedia.org/wiki/Digital_signal_processing#Implementation
# - https://en.wikipedia.org/wiki/Dataflow_programming
# - https://en.wikipedia.org/wiki/Signal_programming
# - https://en.wikipedia.org/wiki/Incremental_computing
# - https://en.wikipedia.org/wiki/Signal-flow_graph
# - https://en.wikipedia.org/wiki/Signal-flow_graph#Basic_components
async def fsp_compute( # -[ ] we probably want to eval THE BELOW design and unify with the
# proto `TaskManager` in the `tractor` dev branch as well as with
# our below idea for `Cascade`:
# - https://github.com/goodboy/tractor/pull/363
class Cascade(Struct):
'''
As per sig-proc engineering parlance, this is a chaining of
`Flume`s, which are themselves collections of "Streams"
implemented currently via `ShmArray`s.
A `Cascade` is be the minimal "connection" of 2 `Flumes`
as per circuit parlance:
https://en.wikipedia.org/wiki/Two-port_network#Cascade_connection
TODO:
-[ ] could cover the combination of our `FspAdmin` and the
backend `.fsp._engine` related machinery to "connect" one flume
to another?
'''
# TODO: make these `Flume`s
src: Flume
dst: Flume
tn: trio.Nursery
fsp: Fsp # UI-side middleware ctl API
# filled during cascade/.bind_func() (fsp_compute) init phases
bind_func: Callable | None = None
complete: trio.Event | None = None
cs: trio.CancelScope | None = None
client_stream: tractor.MsgStream | None = None
async def resync(self) -> int:
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
log.info(f're-syncing fsp {self.fsp.name} to source')
self.cs.cancel()
await self.complete.wait()
index: int = await self.tn.start(self.bind_func)
# always trigger UI refresh after history update,
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
# ``piker.ui._display.trigger_update()``.
dst_shm: ShmArray = self.dst.rt_shm
await self.client_stream.send({
'fsp_update': {
'key': dst_shm.token,
'first': dst_shm._first.value,
'last': dst_shm._last.value,
}
})
return index
def is_synced(self) -> tuple[bool, int, int]:
'''
Predicate to dertmine if a destination FSP
output array is aligned to its source array.
'''
src_shm: ShmArray = self.src.rt_shm
dst_shm: ShmArray = self.dst.rt_shm
step_diff = src_shm.index - dst_shm.index
len_diff = abs(len(src_shm.array) - len(dst_shm.array))
synced: bool = not (
# the source is likely backfilling and we must
# sync history calculations
len_diff > 2
# we aren't step synced to the source and may be
# leading/lagging by a step
or step_diff > 1
or step_diff < 0
)
if not synced:
fsp: Fsp = self.fsp
log.warning(
'***DESYNCED FSP***\n'
f'{fsp.ns_path}@{src_shm.token}\n'
f'step_diff: {step_diff}\n'
f'len_diff: {len_diff}\n'
)
return (
synced,
step_diff,
len_diff,
)
async def poll_and_sync_to_step(self) -> int:
synced, step_diff, _ = self.is_synced()
while not synced:
await self.resync()
synced, step_diff, _ = self.is_synced()
return step_diff
@acm
async def open_edge(
self,
bind_func: Callable,
) -> int:
self.bind_func = bind_func
index = await self.tn.start(bind_func)
yield index
# TODO: what do we want on teardown/error?
# -[ ] dynamic reconnection after update?
async def connect_streams(
casc: Cascade,
mkt: MktPair, mkt: MktPair,
flume: Flume,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
src: Flume,
dst: Flume,
src: ShmArray, edge_func: Callable,
dst: ShmArray,
func: Callable,
# attach_stream: bool = False, # attach_stream: bool = False,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
'''
Stream and per-sample compute and write the cascade of
2 `Flumes`/streams given some operating `func`.
https://en.wikipedia.org/wiki/Signal-flow_graph#Basic_components
Not literally, but something like:
edge_func(Flume_in) -> Flume_out
'''
profiler = Profiler( profiler = Profiler(
delayed=False, delayed=False,
disabled=True disabled=True
) )
fqme = mkt.fqme # TODO: just pull it from src.mkt.fqme no?
out_stream = func( # fqme: str = mkt.fqme
fqme: str = src.mkt.fqme
# TODO: dynamic introspection of what the underlying (vertex)
# function actually requires from input node (flumes) then
# deliver those inputs as part of a graph "compilation" step?
out_stream = edge_func(
# TODO: do we even need this if we do the feed api right? # TODO: do we even need this if we do the feed api right?
# shouldn't a local stream do this before we get a handle # shouldn't a local stream do this before we get a handle
@ -113,20 +244,21 @@ async def fsp_compute(
# async itertools style? # async itertools style?
filter_quotes_by_sym(fqme, quote_stream), filter_quotes_by_sym(fqme, quote_stream),
# XXX: currently the ``ohlcv`` arg # XXX: currently the ``ohlcv`` arg, but we should allow
flume.rt_shm, # (dynamic) requests for src flume (node) streams?
src.rt_shm,
) )
# HISTORY COMPUTE PHASE # HISTORY COMPUTE PHASE
# conduct a single iteration of fsp with historical bars input # conduct a single iteration of fsp with historical bars input
# and get historical output. # and get historical output.
history_output: Union[ history_output: (
dict[str, np.ndarray], # multi-output case dict[str, np.ndarray] # multi-output case
np.ndarray, # single output case | np.ndarray, # single output case
] )
history_output = await anext(out_stream) history_output = await anext(out_stream)
func_name = func.__name__ func_name = edge_func.__name__
profiler(f'{func_name} generated history') profiler(f'{func_name} generated history')
# build struct array with an 'index' field to push as history # build struct array with an 'index' field to push as history
@ -134,10 +266,12 @@ async def fsp_compute(
# TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no? # TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no?
# if the output array is multi-field then push # if the output array is multi-field then push
# each respective field. # each respective field.
fields = getattr(dst.array.dtype, 'fields', None).copy() dst_shm: ShmArray = dst.rt_shm
fields = getattr(dst_shm.array.dtype, 'fields', None).copy()
fields.pop('index') fields.pop('index')
history_by_field: Optional[np.ndarray] = None history_by_field: np.ndarray | None = None
src_time = src.array['time'] src_shm: ShmArray = src.rt_shm
src_time = src_shm.array['time']
if ( if (
fields and fields and
@ -156,7 +290,7 @@ async def fsp_compute(
if history_by_field is None: if history_by_field is None:
if output is None: if output is None:
length = len(src.array) length = len(src_shm.array)
else: else:
length = len(output) length = len(output)
@ -165,7 +299,7 @@ async def fsp_compute(
# will be pushed to shm. # will be pushed to shm.
history_by_field = np.zeros( history_by_field = np.zeros(
length, length,
dtype=dst.array.dtype dtype=dst_shm.array.dtype
) )
if output is None: if output is None:
@ -182,13 +316,13 @@ async def fsp_compute(
) )
history_by_field = np.zeros( history_by_field = np.zeros(
len(history_output), len(history_output),
dtype=dst.array.dtype dtype=dst_shm.array.dtype
) )
history_by_field[func_name] = history_output history_by_field[func_name] = history_output
history_by_field['time'] = src_time[-len(history_by_field):] history_by_field['time'] = src_time[-len(history_by_field):]
history_output['time'] = src.array['time'] history_output['time'] = src_shm.array['time']
# TODO: XXX: # TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're # THERE'S A BIG BUG HERE WITH THE `index` field since we're
@ -201,11 +335,11 @@ async def fsp_compute(
# is `index` aware such that historical data can be indexed # is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane # relative to the true first datum? Not sure if this is sane
# for incremental compuations. # for incremental compuations.
first = dst._first.value = src._first.value first = dst_shm._first.value = src_shm._first.value
# TODO: can we use this `start` flag instead of the manual # TODO: can we use this `start` flag instead of the manual
# setting above? # setting above?
index = dst.push( index = dst_shm.push(
history_by_field, history_by_field,
start=first, start=first,
) )
@ -216,12 +350,9 @@ async def fsp_compute(
# setup a respawn handle # setup a respawn handle
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
# TODO: might be better to just make a "restart" method where casc.cs = cs
# the target task is spawned implicitly and then the event is casc.complete = trio.Event()
# set via some higher level api? At that poing we might as well task_status.started(index)
# be writing a one-cancels-one nursery though right?
tracker = TaskTracker(trio.Event(), cs)
task_status.started((tracker, index))
profiler(f'{func_name} yield last index') profiler(f'{func_name} yield last index')
@ -235,12 +366,12 @@ async def fsp_compute(
log.debug(f"{func_name}: {processed}") log.debug(f"{func_name}: {processed}")
key, output = processed key, output = processed
# dst.array[-1][key] = output # dst.array[-1][key] = output
dst.array[[key, 'time']][-1] = ( dst_shm.array[[key, 'time']][-1] = (
output, output,
# TODO: what about pushing ``time.time_ns()`` # TODO: what about pushing ``time.time_ns()``
# in which case we'll need to round at the graphics # in which case we'll need to round at the graphics
# processing / sampling layer? # processing / sampling layer?
src.array[-1]['time'] src_shm.array[-1]['time']
) )
# NOTE: for now we aren't streaming this to the consumer # NOTE: for now we aren't streaming this to the consumer
@ -252,7 +383,7 @@ async def fsp_compute(
# N-consumers who subscribe for the real-time output, # N-consumers who subscribe for the real-time output,
# which we'll likely want to implement using local-mem # which we'll likely want to implement using local-mem
# chans for the fan out? # chans for the fan out?
# index = src.index # index = src_shm.index
# if attach_stream: # if attach_stream:
# await client_stream.send(index) # await client_stream.send(index)
@ -262,7 +393,7 @@ async def fsp_compute(
# log.info(f'FSP quote too fast: {hz}') # log.info(f'FSP quote too fast: {hz}')
# last = time.time() # last = time.time()
finally: finally:
tracker.complete.set() casc.complete.set()
@tractor.context @tractor.context
@ -273,15 +404,15 @@ async def cascade(
# data feed key # data feed key
fqme: str, fqme: str,
src_shm_token: dict, # flume pair cascaded using an "edge function"
dst_shm_token: tuple[str, np.dtype], src_flume_addr: dict,
dst_flume_addr: dict,
ns_path: NamespacePath, ns_path: NamespacePath,
shm_registry: dict[str, _Token], shm_registry: dict[str, _Token],
zero_on_step: bool = False, zero_on_step: bool = False,
loglevel: Optional[str] = None, loglevel: str | None = None,
) -> None: ) -> None:
''' '''
@ -297,8 +428,14 @@ async def cascade(
if loglevel: if loglevel:
get_console_log(loglevel) get_console_log(loglevel)
src = attach_shm_array(token=src_shm_token) src: Flume = Flume.from_msg(src_flume_addr)
dst = attach_shm_array(readonly=False, token=dst_shm_token) dst: Flume = Flume.from_msg(
dst_flume_addr,
readonly=False,
)
# src: ShmArray = attach_shm_array(token=src_shm_token)
# dst: ShmArray = attach_shm_array(readonly=False, token=dst_shm_token)
reg = _load_builtins() reg = _load_builtins()
lines = '\n'.join([f'{key.rpartition(":")[2]} => {key}' for key in reg]) lines = '\n'.join([f'{key.rpartition(":")[2]} => {key}' for key in reg])
@ -306,11 +443,11 @@ async def cascade(
f'Registered FSP set:\n{lines}' f'Registered FSP set:\n{lines}'
) )
# update actorlocal flows table which registers # NOTE XXX: update actorlocal flows table which registers
# readonly "instances" of this fsp for symbol/source # readonly "instances" of this fsp for symbol/source so that
# so that consumer fsps can look it up by source + fsp. # consumer fsps can look it up by source + fsp.
# TODO: ugh i hate this wind/unwind to list over the wire # TODO: ugh i hate this wind/unwind to list over the wire but
# but not sure how else to do it. # not sure how else to do it.
for (token, fsp_name, dst_token) in shm_registry: for (token, fsp_name, dst_token) in shm_registry:
Fsp._flow_registry[( Fsp._flow_registry[(
_Token.from_msg(token), _Token.from_msg(token),
@ -320,12 +457,15 @@ async def cascade(
fsp: Fsp = reg.get( fsp: Fsp = reg.get(
NamespacePath(ns_path) NamespacePath(ns_path)
) )
func = fsp.func func: Callable = fsp.func
if not func: if not func:
# TODO: assume it's a func target path # TODO: assume it's a func target path
raise ValueError(f'Unknown fsp target: {ns_path}') raise ValueError(f'Unknown fsp target: {ns_path}')
_fqme: str = src.mkt.fqme
assert _fqme == fqme
# open a data feed stream with requested broker # open a data feed stream with requested broker
feed: Feed feed: Feed
async with data.feed.maybe_open_feed( async with data.feed.maybe_open_feed(
@ -339,177 +479,142 @@ async def cascade(
) as feed: ) as feed:
flume = feed.flumes[fqme] flume: Flume = feed.flumes[fqme]
mkt = flume.mkt # XXX: can't do this since flume.feed will be set XD
assert src.token == flume.rt_shm.token # assert flume == src
assert flume.mkt == src.mkt
mkt: MktPair = flume.mkt
# NOTE: FOR NOW, sanity checks around the feed as being
# always the src flume (until we get to fancier/lengthier
# chains/graphs.
assert src.rt_shm.token == flume.rt_shm.token
# XXX: won't work bc the _hist_shm_token value will be
# list[list] after IPC..
# assert flume.to_msg() == src_flume_addr
profiler(f'{func}: feed up') profiler(f'{func}: feed up')
func_name = func.__name__ func_name: str = func.__name__
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as tn,
): ):
# TODO: might be better to just make a "restart" method where
# the target task is spawned implicitly and then the event is
# set via some higher level api? At that poing we might as well
# be writing a one-cancels-one nursery though right?
casc = Cascade(
src,
dst,
tn,
fsp,
)
# TODO: this seems like it should be wrapped somewhere?
fsp_target = partial( fsp_target = partial(
connect_streams,
fsp_compute, casc=casc,
mkt=mkt, mkt=mkt,
flume=flume,
quote_stream=flume.stream, quote_stream=flume.stream,
# shm # flumes and shm passthrough
src=src, src=src,
dst=dst, dst=dst,
# target # chain function which takes src flume input(s)
func=func # and renders dst flume output(s)
edge_func=func
) )
async with casc.open_edge(
bind_func=fsp_target,
) as index:
# casc.bind_func = fsp_target
# index = await tn.start(fsp_target)
dst_shm: ShmArray = dst.rt_shm
src_shm: ShmArray = src.rt_shm
tracker, index = await n.start(fsp_target) if zero_on_step:
last = dst.rt_shm.array[-1:]
zeroed = np.zeros(last.shape, dtype=last.dtype)
if zero_on_step: profiler(f'{func_name}: fsp up')
last = dst.array[-1:]
zeroed = np.zeros(last.shape, dtype=last.dtype)
profiler(f'{func_name}: fsp up') # sync to client-side actor
await ctx.started(index)
# sync client # XXX: rt stream with client which we MUST
await ctx.started(index) # open here (and keep it open) in order to make
# incremental "updates" as history prepends take
# place.
async with ctx.open_stream() as client_stream:
casc.client_stream: tractor.MsgStream = client_stream
# XXX: rt stream with client which we MUST s, step, ld = casc.is_synced()
# open here (and keep it open) in order to make
# incremental "updates" as history prepends take
# place.
async with ctx.open_stream() as client_stream:
# TODO: these likely should all become # detect sample period step for subscription to increment
# methods of this ``TaskLifetime`` or wtv # signal
# abstraction.. times = src.rt_shm.array['time']
async def resync( if len(times) > 1:
tracker: TaskTracker, last_ts = times[-1]
delay_s: float = float(last_ts - times[times != last_ts][-1])
else:
# our default "HFT" sample rate.
delay_s: float = _default_delay_s
) -> tuple[TaskTracker, int]: # sub and increment the underlying shared memory buffer
# TODO: adopt an incremental update engine/approach # on every step msg received from the global `samplerd`
# where possible here eventually! # service.
log.info(f're-syncing fsp {func_name} to source') async with open_sample_stream(
tracker.cs.cancel() float(delay_s)
await tracker.complete.wait() ) as istream:
tracker, index = await n.start(fsp_target)
# always trigger UI refresh after history update, profiler(f'{func_name}: sample stream up')
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and profiler.finish()
# ``piker.ui._display.trigger_update()``.
await client_stream.send({
'fsp_update': {
'key': dst_shm_token,
'first': dst._first.value,
'last': dst._last.value,
}
})
return tracker, index
def is_synced( async for i in istream:
src: ShmArray, # print(f'FSP incrementing {i}')
dst: ShmArray
) -> tuple[bool, int, int]:
'''
Predicate to dertmine if a destination FSP
output array is aligned to its source array.
''' # respawn the compute task if the source
step_diff = src.index - dst.index # array has been updated such that we compute
len_diff = abs(len(src.array) - len(dst.array)) # new history from the (prepended) source.
return not ( synced, step_diff, _ = casc.is_synced()
# the source is likely backfilling and we must if not synced:
# sync history calculations step_diff: int = await casc.poll_and_sync_to_step()
len_diff > 2
# we aren't step synced to the source and may be # skip adding a last bar since we should already
# leading/lagging by a step # be step alinged
or step_diff > 1 if step_diff == 0:
or step_diff < 0 continue
), step_diff, len_diff
async def poll_and_sync_to_step( # read out last shm row, copy and write new row
tracker: TaskTracker, array = dst_shm.array
src: ShmArray,
dst: ShmArray,
) -> tuple[TaskTracker, int]: # some metrics like vlm should be reset
# to zero every step.
if zero_on_step:
last = zeroed
else:
last = array[-1:].copy()
synced, step_diff, _ = is_synced(src, dst) dst.rt_shm.push(last)
while not synced:
tracker, index = await resync(tracker)
synced, step_diff, _ = is_synced(src, dst)
return tracker, step_diff # sync with source buffer's time step
src_l2 = src_shm.array[-2:]
src_li, src_lt = src_l2[-1][['index', 'time']]
src_2li, src_2lt = src_l2[-2][['index', 'time']]
dst_shm._array['time'][src_li] = src_lt
dst_shm._array['time'][src_2li] = src_2lt
s, step, ld = is_synced(src, dst) # last2 = dst.array[-2:]
# if (
# detect sample period step for subscription to increment # last2[-1]['index'] != src_li
# signal # or last2[-2]['index'] != src_2li
times = src.array['time'] # ):
if len(times) > 1: # dstl2 = list(last2)
last_ts = times[-1] # srcl2 = list(src_l2)
delay_s = float(last_ts - times[times != last_ts][-1]) # print(
else: # # f'{dst.token}\n'
# our default "HFT" sample rate. # f'src: {srcl2}\n'
delay_s = _default_delay_s # f'dst: {dstl2}\n'
# )
# sub and increment the underlying shared memory buffer
# on every step msg received from the global `samplerd`
# service.
async with open_sample_stream(float(delay_s)) as istream:
profiler(f'{func_name}: sample stream up')
profiler.finish()
async for i in istream:
# print(f'FSP incrementing {i}')
# respawn the compute task if the source
# array has been updated such that we compute
# new history from the (prepended) source.
synced, step_diff, _ = is_synced(src, dst)
if not synced:
tracker, step_diff = await poll_and_sync_to_step(
tracker,
src,
dst,
)
# skip adding a last bar since we should already
# be step alinged
if step_diff == 0:
continue
# read out last shm row, copy and write new row
array = dst.array
# some metrics like vlm should be reset
# to zero every step.
if zero_on_step:
last = zeroed
else:
last = array[-1:].copy()
dst.push(last)
# sync with source buffer's time step
src_l2 = src.array[-2:]
src_li, src_lt = src_l2[-1][['index', 'time']]
src_2li, src_2lt = src_l2[-2][['index', 'time']]
dst._array['time'][src_li] = src_lt
dst._array['time'][src_2li] = src_2lt
# last2 = dst.array[-2:]
# if (
# last2[-1]['index'] != src_li
# or last2[-2]['index'] != src_2li
# ):
# dstl2 = list(last2)
# srcl2 = list(src_l2)
# print(
# # f'{dst.token}\n'
# f'src: {srcl2}\n'
# f'dst: {dstl2}\n'
# )

View File

@ -84,10 +84,10 @@ async def open_piker_runtime(
a root actor. a root actor.
''' '''
# check for existing runtime, boot it
# if not already running.
try: try:
# check for existing runtime actor = tractor.current_actor()
actor = tractor.current_actor().uid
except tractor._exceptions.NoRuntime: except tractor._exceptions.NoRuntime:
tractor._state._runtime_vars[ tractor._state._runtime_vars[
'piker_vars' 'piker_vars'
@ -116,15 +116,16 @@ async def open_piker_runtime(
enable_modules=enable_modules, enable_modules=enable_modules,
**tractor_kwargs, **tractor_kwargs,
) as _, ) as actor,
open_registry( open_registry(
registry_addrs, registry_addrs,
ensure_exists=False, ensure_exists=False,
) as addrs, ) as addrs,
): ):
assert actor is tractor.current_actor()
yield ( yield (
tractor.current_actor(), actor,
addrs, addrs,
) )
else: else:
@ -268,28 +269,39 @@ async def maybe_open_pikerd(
# async with open_portal(chan) as arb_portal: # async with open_portal(chan) as arb_portal:
# yield arb_portal # yield arb_portal
registry_addrs = registry_addrs or [_default_reg_addr] registry_addrs: list[tuple[str, int]] = (
registry_addrs
or [_default_reg_addr]
)
pikerd_portal: tractor.Portal | None
async with ( async with (
open_piker_runtime( open_piker_runtime(
name=query_name, name=query_name,
registry_addrs=registry_addrs, registry_addrs=registry_addrs,
loglevel=loglevel, loglevel=loglevel,
**kwargs, **kwargs,
) as _, ) as (actor, addrs),
# try to attach to any existing (host-local) `pikerd`
tractor.find_actor( tractor.find_actor(
_root_dname, _root_dname,
registry_addrs=registry_addrs, registry_addrs=registry_addrs,
only_first=True, only_first=True,
) as portal # raise_on_none=True,
) as pikerd_portal,
): ):
# connect to any existing daemon presuming # connect to any existing remote daemon presuming its
# its registry socket was selected. # registry socket was selected.
if ( if pikerd_portal is not None:
portal is not None
): # sanity check that we are actually connecting to
yield portal # a remote process and not ourselves.
assert actor.uid != pikerd_portal.channel.uid
assert registry_addrs
yield pikerd_portal
return return
# presume pikerd role since no daemon could be found at # presume pikerd role since no daemon could be found at

View File

@ -102,7 +102,7 @@ async def open_registry(
not tractor.is_root_process() not tractor.is_root_process()
and not Registry.addrs and not Registry.addrs
): ):
Registry.addrs.extend(actor._reg_addrs) Registry.addrs.extend(actor.reg_addrs)
if ( if (
ensure_exists ensure_exists

View File

@ -139,6 +139,13 @@ class StorageClient(
... ...
class TimeseriesNotFound(Exception):
'''
No timeseries entry can be found for this backend.
'''
class StorageConnectionError(ConnectionError): class StorageConnectionError(ConnectionError):
''' '''
Can't connect to the desired tsdb subsys/service. Can't connect to the desired tsdb subsys/service.

View File

@ -140,19 +140,27 @@ def delete(
def anal( def anal(
fqme: str, fqme: str,
period: int = 60, period: int = 60,
pdb: bool = False,
) -> np.ndarray: ) -> np.ndarray:
'''
Anal-ysis is when you take the data do stuff to it, i think.
'''
async def main(): async def main():
async with ( async with (
open_piker_runtime( open_piker_runtime(
# are you a bear or boi?
'tsdb_polars_anal', 'tsdb_polars_anal',
debug_mode=True, debug_mode=pdb,
),
open_storage_client() as (
mod,
client,
), ),
open_storage_client() as (mod, client),
): ):
syms: list[str] = await client.list_keys() syms: list[str] = await client.list_keys()
print(f'{len(syms)} FOUND for {mod.name}') log.info(f'{len(syms)} FOUND for {mod.name}')
( (
history, history,

View File

@ -19,7 +19,8 @@
call a poor man's tsdb). call a poor man's tsdb).
AKA a `piker`-native file-system native "time series database" AKA a `piker`-native file-system native "time series database"
without needing an extra process and no standard TSDB features, YET! without needing an extra process and no standard TSDB features,
YET!
''' '''
# TODO: like there's soo much.. # TODO: like there's soo much..
@ -67,6 +68,7 @@ from piker import config
from piker.data import def_iohlcv_fields from piker.data import def_iohlcv_fields
from piker.data import ShmArray from piker.data import ShmArray
from piker.log import get_logger from piker.log import get_logger
from . import TimeseriesNotFound
log = get_logger('storage.nativedb') log = get_logger('storage.nativedb')
@ -228,8 +230,21 @@ class NativeStorageClient:
fqme, fqme,
timeframe, timeframe,
) )
except FileNotFoundError: except FileNotFoundError as fnfe:
return None
bs_fqme, _, *_ = fqme.rpartition('.')
possible_matches: list[str] = []
for tskey in self._index:
if bs_fqme in tskey:
possible_matches.append(tskey)
match_str: str = '\n'.join(sorted(possible_matches))
raise TimeseriesNotFound(
f'No entry for `{fqme}`?\n'
f'Maybe you need a more specific fqme-key like:\n\n'
f'{match_str}'
) from fnfe
times = array['time'] times = array['time']
return ( return (
@ -376,6 +391,8 @@ class NativeStorageClient:
# ... # ...
# TODO: does this need to be async on average?
# I guess for any IPC connected backend yes?
@acm @acm
async def get_client( async def get_client(
@ -393,7 +410,7 @@ async def get_client(
''' '''
datadir: Path = config.get_conf_dir() / 'nativedb' datadir: Path = config.get_conf_dir() / 'nativedb'
if not datadir.is_dir(): if not datadir.is_dir():
log.info(f'Creating `nativedb` director: {datadir}') log.info(f'Creating `nativedb` dir: {datadir}')
datadir.mkdir() datadir.mkdir()
client = NativeStorageClient(datadir) client = NativeStorageClient(datadir)

View File

@ -390,7 +390,7 @@ class FspAdmin:
complete: trio.Event, complete: trio.Event,
started: trio.Event, started: trio.Event,
fqme: str, fqme: str,
dst_fsp_flume: Flume, dst_flume: Flume,
conf: dict, conf: dict,
target: Fsp, target: Fsp,
loglevel: str, loglevel: str,
@ -408,16 +408,14 @@ class FspAdmin:
# chaining entrypoint # chaining entrypoint
cascade, cascade,
# TODO: can't we just drop this and expect
# far end to read the src flume's .mkt.fqme?
# data feed key # data feed key
fqme=fqme, fqme=fqme,
# TODO: pass `Flume.to_msg()`s here? src_flume_addr=self.flume.to_msg(),
# mems dst_flume_addr=dst_flume.to_msg(),
src_shm_token=self.flume.rt_shm.token, ns_path=ns_path, # edge-bind-func
dst_shm_token=dst_fsp_flume.rt_shm.token,
# target
ns_path=ns_path,
loglevel=loglevel, loglevel=loglevel,
zero_on_step=conf.get('zero_on_step', False), zero_on_step=conf.get('zero_on_step', False),
@ -431,14 +429,14 @@ class FspAdmin:
ctx.open_stream() as stream, ctx.open_stream() as stream,
): ):
dst_fsp_flume.stream: tractor.MsgStream = stream dst_flume.stream: tractor.MsgStream = stream
# register output data # register output data
self._registry[ self._registry[
(fqme, ns_path) (fqme, ns_path)
] = ( ] = (
stream, stream,
dst_fsp_flume.rt_shm, dst_flume.rt_shm,
complete complete
) )
@ -515,7 +513,7 @@ class FspAdmin:
broker='piker', broker='piker',
_atype='fsp', _atype='fsp',
) )
dst_fsp_flume = Flume( dst_flume = Flume(
mkt=mkt, mkt=mkt,
_rt_shm_token=dst_shm.token, _rt_shm_token=dst_shm.token,
first_quote={}, first_quote={},
@ -543,13 +541,13 @@ class FspAdmin:
complete, complete,
started, started,
fqme, fqme,
dst_fsp_flume, dst_flume,
conf, conf,
target, target,
loglevel, loglevel,
) )
return dst_fsp_flume, started return dst_flume, started
async def open_fsp_chart( async def open_fsp_chart(
self, self,