Add actor wide client ignore set, increase history retreival to 24 requests

vlm_plotz_backup
Tyler Goodlet 2021-09-23 09:41:53 -04:00
parent eca9b14cd6
commit 1416d4e6ac
1 changed files with 95 additions and 61 deletions

View File

@ -34,6 +34,7 @@ import asyncio
from pprint import pformat from pprint import pformat
import inspect import inspect
import logging import logging
import platform
from random import randint from random import randint
import time import time
@ -684,7 +685,8 @@ _try_ports = [
# TODO: remove the randint stuff and use proper error checking in client # TODO: remove the randint stuff and use proper error checking in client
# factor below.. # factor below..
_client_ids = itertools.count(randint(1, 100)) _client_ids = itertools.count(randint(1, 100))
_client_cache = {} _client_cache: dict[tuple[str, int], Client] = {}
_scan_ignore: set[tuple[str, int]] = set()
def get_config() -> dict[str, Any]: def get_config() -> dict[str, Any]:
@ -718,8 +720,7 @@ async def load_aio_clients(
TODO: consider doing this with a ctx mngr eventually? TODO: consider doing this with a ctx mngr eventually?
''' '''
global _accounts2clients global _accounts2clients, _client_cache, _scan_ignore
global _client_cache
conf = get_config() conf = get_config()
ib = None ib = None
@ -750,12 +751,14 @@ async def load_aio_clients(
) )
order = ports.pop('order', None) order = ports.pop('order', None)
if order: if order:
log.warning(f'`ports.order` section in `brokers.toml` is deprecated') log.warning('`ports.order` section in `brokers.toml` is deprecated')
accounts_def = config.load_accounts(['ib']) accounts_def = config.load_accounts(['ib'])
try_ports = list(ports.values()) try_ports = list(ports.values())
ports = try_ports if port is None else [port] ports = try_ports if port is None else [port]
we_connected = [] we_connected = []
connect_timeout = 0.5 if platform.system() != 'Windows' else 1
combos = list(itertools.product(hosts, ports))
# allocate new and/or reload disconnected but cached clients # allocate new and/or reload disconnected but cached clients
try: try:
@ -767,72 +770,86 @@ async def load_aio_clients(
# (re)load any and all clients that can be found # (re)load any and all clients that can be found
# from connection details in ``brokers.toml``. # from connection details in ``brokers.toml``.
combos = list(itertools.product(hosts, ports))
for host, port in combos: for host, port in combos:
client = _client_cache.get((host, port))
sockaddr = (host, port)
client = _client_cache.get(sockaddr)
accounts_found: dict[str, Client] = {} accounts_found: dict[str, Client] = {}
if not client or not client.ib.isConnected():
try:
ib = NonShittyIB()
# if this is a persistent brokerd, try to allocate if (
# a new id for each client client and client.ib.isConnected() or
client_id = next(_client_ids) sockaddr in _scan_ignore
):
continue
log.info(f"Connecting to the EYEBEE on port {port}!") try:
await ib.connectAsync( ib = NonShittyIB()
host,
port,
clientId=client_id,
timeout=0.5,
)
# create and cache client # if this is a persistent brokerd, try to allocate
client = Client(ib) # a new id for each client
client_id = next(_client_ids)
# Pre-collect all accounts available for this await ib.connectAsync(
# connection and map account names to this client host,
# instance. port,
pps = ib.positions() clientId=client_id,
if pps:
for pp in pps:
accounts_found[
accounts_def.inverse[pp.account]
] = client
# if there are no positions or accounts # this timeout is sensative on windows and will
# without positions we should still register # fail without a good "timeout error" so be
# them for this client # careful.
for value in ib.accountValues(): timeout=connect_timeout,
acct = value.account )
if acct not in accounts_found:
accounts_found[
accounts_def.inverse[acct]
] = client
log.info( # create and cache client
f'Loaded accounts for client @ {host}:{port}\n' client = Client(ib)
f'{pformat(accounts_found)}'
)
# update all actor-global caches # Pre-collect all accounts available for this
log.info(f"Caching client for {(host, port)}") # connection and map account names to this client
_client_cache[(host, port)] = client # instance.
we_connected.append(client) pps = ib.positions()
_accounts2clients.update(accounts_found) if pps:
for pp in pps:
accounts_found[
accounts_def.inverse[pp.account]
] = client
except ( # if there are no positions or accounts
ConnectionRefusedError, # without positions we should still register
# them for this client
for value in ib.accountValues():
acct = value.account
if acct not in accounts_found:
accounts_found[
accounts_def.inverse[acct]
] = client
# TODO: if trying to scan for remote api clients log.info(
# pretty sure we need to catch this, though it f'Loaded accounts for client @ {host}:{port}\n'
# definitely needs a shorter timeout since it hangs f'{pformat(accounts_found)}'
# for like 5s.. )
asyncio.exceptions.TimeoutError,
OSError, # update all actor-global caches
) as ce: log.info(f"Caching client for {(host, port)}")
_err = ce _client_cache[(host, port)] = client
log.warning(f'Failed to connect on {port}') we_connected.append(client)
_accounts2clients.update(accounts_found)
except (
ConnectionRefusedError,
# TODO: if trying to scan for remote api clients
# pretty sure we need to catch this, though it
# definitely needs a shorter timeout since it hangs
# for like 5s..
asyncio.exceptions.TimeoutError,
OSError,
) as ce:
_err = ce
log.warning(f'Failed to connect on {port}')
# cache logic to avoid rescanning if we already have all
# clients loaded.
_scan_ignore.add(sockaddr)
else: else:
if not _client_cache: if not _client_cache:
raise ConnectionError( raise ConnectionError(
@ -1082,10 +1099,16 @@ async def get_bars(
async def backfill_bars( async def backfill_bars(
sym: str, sym: str,
shm: ShmArray, # type: ignore # noqa shm: ShmArray, # type: ignore # noqa
# count: int = 20, # NOTE: any more and we'll overrun underlying buffer
count: int = 6, # NOTE: any more and we'll overrun the underlying buffer # TODO: we want to avoid overrunning the underlying shm array buffer
# and we should probably calc the number of calls to make depending
# on that until we have the `marketstore` daemon in place in which
# case the shm size will be driven by user config and available sys
# memory.
count: int = 24,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -1121,6 +1144,17 @@ async def backfill_bars(
continue continue
bars, bars_array, next_dt = out bars, bars_array, next_dt = out
# volume cleaning since there's -ve entries,
# wood luv to know what crookery that is..
vlm = bars_array['volume']
vlm[vlm < 0] = 0
# TODO we should probably dig into forums to see what peeps
# think this data "means" and then use it as an indicator of
# sorts? dinkus has mentioned that $vlms for the day dont'
# match other platforms nor the summary stat tws shows in
# the monitor - it's probably worth investigating.
shm.push(bars_array, prepend=True) shm.push(bars_array, prepend=True)
i += 1 i += 1