Just warn on double-remove of a sub

async_hist_loading
Tyler Goodlet 2022-02-22 15:28:54 -05:00
parent 81f8b4e145
commit 11d4ebd0b5
1 changed files with 12 additions and 6 deletions

View File

@ -15,7 +15,9 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
Data buffers for fast shared humpy. Sampling and broadcast machinery for (soft) real-time delivery of
financial data flows.
""" """
import time import time
from typing import Dict, List from typing import Dict, List
@ -48,7 +50,8 @@ async def increment_ohlc_buffer(
delay_s: int, delay_s: int,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
): ):
"""Task which inserts new bars into the provide shared memory array '''
Task which inserts new bars into the provide shared memory array
every ``delay_s`` seconds. every ``delay_s`` seconds.
This task fulfills 2 purposes: This task fulfills 2 purposes:
@ -59,8 +62,8 @@ async def increment_ohlc_buffer(
Note that if **no** actor has initiated this task then **none** of Note that if **no** actor has initiated this task then **none** of
the underlying buffers will actually be incremented. the underlying buffers will actually be incremented.
"""
'''
# # wait for brokerd to signal we should start sampling # # wait for brokerd to signal we should start sampling
# await shm_incrementing(shm_token['shm_name']).wait() # await shm_incrementing(shm_token['shm_name']).wait()
@ -137,12 +140,12 @@ async def iter_ohlc_periods(
delay_s: int, delay_s: int,
) -> None: ) -> None:
""" '''
Subscribe to OHLC sampling "step" events: when the time Subscribe to OHLC sampling "step" events: when the time
aggregation period increments, this event stream emits an index aggregation period increments, this event stream emits an index
event. event.
""" '''
# add our subscription # add our subscription
global _subscribers global _subscribers
subs = _subscribers.setdefault(delay_s, []) subs = _subscribers.setdefault(delay_s, [])
@ -290,7 +293,10 @@ async def sample_and_broadcast(
# so far seems like no since this should all # so far seems like no since this should all
# be single-threaded. Doing it anyway though # be single-threaded. Doing it anyway though
# since there seems to be some kinda race.. # since there seems to be some kinda race..
subs.remove((stream, tick_throttle)) try:
subs.remove((stream, tick_throttle))
except ValueError:
log.error(f'{stream} was already removed from subs!?')
# TODO: a less naive throttler, here's some snippets: # TODO: a less naive throttler, here's some snippets: