Timeseries Optimization: NumPy & Polars

Skill for high-performance timeseries processing using NumPy and Polars, with focus on patterns common in financial/trading applications.

Core Principle: Vectorization Over Iteration

Never write Python loops over large arrays. Always look for vectorized alternatives.

# BAD: Python loop (slow!)
results = []
for i in range(len(array)):
    if array['time'][i] == target_time:
        results.append(array[i])

# GOOD: vectorized boolean indexing (fast!)
results = array[array['time'] == target_time]
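
To see the difference on your own machine, a minimal timing sketch follows. The array size, field layout, and `target_time` are made up for illustration; absolute timings will vary by hardware.

```python
import time
import numpy as np

# hypothetical sample data: 100k one-second bars
dtype = [('time', 'f8'), ('close', 'f8')]
array = np.zeros(100_000, dtype=dtype)
array['time'] = np.arange(100_000, dtype='f8')
array['close'] = 100.0
target_time = 54_321.0

# Python loop: one interpreter round-trip per row
t0 = time.perf_counter()
loop_rows = [
    array[i]
    for i in range(len(array))
    if array['time'][i] == target_time
]
loop_secs = time.perf_counter() - t0

# vectorized: a single comparison executed in compiled code
t0 = time.perf_counter()
vec_rows = array[array['time'] == target_time]
vec_secs = time.perf_counter() - t0

print(f'loop: {loop_secs * 1e3:.2f}ms, vectorized: {vec_secs * 1e3:.2f}ms')
```

Both approaches select the same row; only the cost of getting there differs.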

NumPy Structured Arrays

Piker uses structured arrays for OHLCV data:

# typical piker array dtype
dtype = [
    ('index', 'i8'),   # absolute sequence index
    ('time', 'f8'),    # unix epoch timestamp
    ('open', 'f8'),
    ('high', 'f8'),
    ('low', 'f8'),
    ('close', 'f8'),
    ('volume', 'f8'),
]

arr = np.array([(0, 1234.0, 100, 101, 99, 100.5, 1000)],
               dtype=dtype)

# field access
times = arr['time']     # returns view, not copy
closes = arr['close']

Structured Array Performance Gotchas

1. Field access in loops is slow

# BAD: repeated struct field access per iteration
for i, row in enumerate(arr):
    x = row['index']    # struct access per iteration!
    y = row['close']
    process(x, y)

# GOOD: extract fields once, iterate plain arrays
indices = arr['index']  # extract once
closes = arr['close']
for i in range(len(arr)):
    x = indices[i]      # plain array indexing
    y = closes[i]
    process(x, y)

2. Dict comprehensions with struct arrays

# SLOW: field access per row in Python loop
time_to_row = {
    float(row['time']): {
        'index': float(row['index']),
        'close': float(row['close']),
    }
    for row in matched_rows  # struct field access!
}

# FAST: extract to plain arrays first
times = matched_rows['time'].astype(float)
indices = matched_rows['index'].astype(float)
closes = matched_rows['close'].astype(float)

time_to_row = {
    t: {'index': idx, 'close': cls}
    for t, idx, cls in zip(times, indices, closes)
}

Timestamp Lookup Patterns

Linear Scan (O(n)) - Avoid!

# BAD: O(n) scan through entire array
for target_ts in timestamps:  # m iterations
    matches = array[array['time'] == target_ts]  # O(n) scan
    # Total: O(m * n) - catastrophic for large datasets!

Performance:

  • 1000 lookups × 10k array = 10M comparisons
  • Timing: ~50-100ms for 1k lookups

Binary Search (O(log n)) - Good!

# GOOD: O(m log n) using searchsorted
import numpy as np

time_arr = array['time']  # extract once
ts_array = np.array(timestamps)

# binary search for all timestamps at once
indices = np.searchsorted(time_arr, ts_array)

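# clip first: targets past the last element map to len(array),
# which would raise IndexError in the equality check below
safe_indices = np.clip(indices, 0, len(time_arr) - 1)

# exact match verification
valid_mask = time_arr[safe_indices] == ts_array

valid_indices = safe_indices[valid_mask]
matched_rows = array[valid_indices]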

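Requirements for searchsorted():

  • Input array MUST be sorted ascending (or pass a sorter index array)
  • Works on any sortable dtype (floats, ints, etc)
  • Returns insertion indices, not matches: a missing value still gets an index (len(array) if it is larger than every element), so always verify equality afterwards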

Performance:

  • 1000 lookups × ~13 steps each (log2 of 10k) = ~13k comparisons
  • Timing: <1ms for 1k lookups
  • ~100-1000x faster than linear scan
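
The lookup steps above can be wrapped into one helper. This is a minimal sketch, not piker's actual implementation: the name `lookup_rows` and the sample data are hypothetical, and it assumes a non-empty array sorted ascending by `time`.

```python
import numpy as np

def lookup_rows(array, timestamps):
    '''
    Return (matched_rows, found_mask) for exact timestamp matches.

    Assumes `array['time']` is non-empty and sorted ascending.
    '''
    time_arr = array['time']
    ts_array = np.asarray(timestamps, dtype=time_arr.dtype)

    # leftmost insertion point for every target in one call
    indices = np.searchsorted(time_arr, ts_array)

    # targets beyond the last element map to len(array);
    # clip so the equality check below cannot index out of bounds
    safe = np.clip(indices, 0, len(time_arr) - 1)
    found = time_arr[safe] == ts_array
    return array[safe[found]], found

# hypothetical data: 10 one-minute bars
arr = np.zeros(10, dtype=[('time', 'f8'), ('close', 'f8')])
arr['time'] = np.arange(10) * 60.0
arr['close'] = np.arange(10) + 100.0

rows, found = lookup_rows(arr, [120.0, 125.0, 540.0, 9999.0])
print(found)          # which targets had an exact match
print(rows['close'])  # closes of the matched rows
```

Returning the `found` mask alongside the rows lets the caller map results back to the original request order and see which timestamps were missing.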

Hash Table (O(1)) - Best for Multiple Lookups!

If you'll do many lookups on the same array, build the dict once:

# build lookup once
time_to_idx = {
    float(array['time'][i]): i
    for i in range(len(array))
}

# O(1) lookups
for target_ts in timestamps:
    idx = time_to_idx.get(target_ts)
    if idx is not None:
        row = array[idx]

When to use:

  • Many repeated lookups on the same array
  • Array doesn't change between lookups
  • Can afford the upfront dict-building cost
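
The dict build shown above does one struct field access per element. Following the field-extraction advice from earlier in this document, one possible faster build is sketched below; the sample array is hypothetical.

```python
import numpy as np

# hypothetical data: 5 one-minute bars
arr = np.zeros(5, dtype=[('time', 'f8'), ('close', 'f8')])
arr['time'] = [0.0, 60.0, 120.0, 180.0, 240.0]

# .tolist() converts the whole field to Python floats in one call,
# so the dict keys are plain floats rather than np.float64 scalars
time_to_idx = dict(zip(arr['time'].tolist(), range(len(arr))))

idx = time_to_idx.get(120.0)      # index of the matching row
missing = time_to_idx.get(125.0)  # None: no bar at that timestamp
```

Note that keys are matched by exact float equality, and that duplicate timestamps would keep only the last index.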

Vectorized Boolean Operations

Basic Filtering

# single condition
recent = array[array['time'] > cutoff_time]

# multiple conditions with &, |
filtered = array[
    (array['time'] > start_time)
    &
    (array['time'] < end_time)
    &
    (array['volume'] > min_volume)
]

# IMPORTANT: parentheses required around each condition!
# (operator precedence: & binds tighter than >)

Fancy Indexing

# boolean mask
mask = array['close'] > array['open']  # up bars
up_bars = array[mask]

# integer indices
indices = np.array([0, 5, 10, 15])
selected = array[indices]

# combine boolean + fancy indexing
mask = array['volume'] > threshold
high_vol_indices = np.where(mask)[0]
subset = array[high_vol_indices[::2]]  # every other

Common Financial Patterns

Gap Detection

# assume sorted by time
time_diffs = np.diff(array['time'])
expected_step = 60.0  # 1-minute bars

# find gaps larger than expected
gap_mask = time_diffs > (expected_step * 1.5)
gap_indices = np.where(gap_mask)[0]

# get gap start/end times
gap_starts = array['time'][gap_indices]
gap_ends = array['time'][gap_indices + 1]
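
A self-contained run of the same pattern on synthetic data, extended with a count of missing bars per gap. The sample timestamps and the `n_missing` calculation are illustrative additions, not part of piker.

```python
import numpy as np

# hypothetical 1-minute bars with bars missing between t=120 and t=420
times = np.array([0., 60., 120., 420., 480., 540.])
array = np.zeros(len(times), dtype=[('time', 'f8'), ('close', 'f8')])
array['time'] = times

expected_step = 60.0
time_diffs = np.diff(array['time'])
gap_indices = np.where(time_diffs > expected_step * 1.5)[0]

gap_starts = array['time'][gap_indices]
gap_ends = array['time'][gap_indices + 1]

# number of bars missing inside each gap
n_missing = np.rint((gap_ends - gap_starts) / expected_step).astype(int) - 1

for start, end, n in zip(gap_starts, gap_ends, n_missing):
    print(f'gap {start:.0f} -> {end:.0f}: {n} missing bar(s)')
```

`gap_indices + 1` is always in bounds because `np.diff` returns one element fewer than its input.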

Rolling Window Operations

# simple moving average (close)
window = 20
sma = np.convolve(
    array['close'],
    np.ones(window) / window,
    mode='valid',
)

# alternatively, use a strided window view (no copy is made;
# also works for non-linear stats such as max/min/std)
from numpy.lib.stride_tricks import sliding_window_view
windows = sliding_window_view(array['close'], window)
sma = windows.mean(axis=1)
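
Both variants above do work proportional to n × window. For long windows, a running-sum formulation brings that down to O(n). The sketch below is one common way to do it; `sma_cumsum` is a hypothetical helper name, and cumulative sums can accumulate floating-point error on very long series.

```python
import numpy as np

def sma_cumsum(values, window):
    '''Simple moving average in O(n) via a running sum.'''
    # prepend 0 so that csum[i] is the sum of the first i values
    csum = np.cumsum(np.insert(values.astype('f8'), 0, 0.0))
    return (csum[window:] - csum[:-window]) / window

closes = np.arange(1.0, 11.0)  # hypothetical closes: 1..10
window = 3

sma_a = sma_cumsum(closes, window)
sma_b = np.convolve(closes, np.ones(window) / window, mode='valid')

print(np.allclose(sma_a, sma_b))
```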

OHLC Resampling (NumPy)

# resample 1m bars to 5m bars
def resample_ohlc(arr, old_step, new_step):
    n_bars = len(arr)
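    # round before truncating: float division such as 0.3 / 0.1
    # yields 2.999... and int() alone would give 2
    factor = int(round(new_step / old_step))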

    # truncate to multiple of factor
    n_complete = (n_bars // factor) * factor
    arr = arr[:n_complete]

    # reshape into chunks
    reshaped = arr.reshape(-1, factor)

    # aggregate OHLC
    opens = reshaped[:, 0]['open']
    highs = reshaped['high'].max(axis=1)
    lows = reshaped['low'].min(axis=1)
    closes = reshaped[:, -1]['close']
    volumes = reshaped['volume'].sum(axis=1)

    return np.rec.fromarrays(
        [opens, highs, lows, closes, volumes],
        names=['open', 'high', 'low', 'close', 'volume'],
    )
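
A usage sketch on synthetic data. The function is repeated in compact form so the block runs on its own, and the input bars are hypothetical. The reshape approach assumes gap-free input whose first bar is aligned to a bucket boundary; trailing bars that do not fill a complete bucket are dropped.

```python
import numpy as np

def resample_ohlc(arr, old_step, new_step):
    factor = int(round(new_step / old_step))
    n_complete = (len(arr) // factor) * factor
    chunks = arr[:n_complete].reshape(-1, factor)
    return np.rec.fromarrays(
        [
            chunks[:, 0]['open'],          # first open in each bucket
            chunks['high'].max(axis=1),    # highest high
            chunks['low'].min(axis=1),     # lowest low
            chunks[:, -1]['close'],        # last close
            chunks['volume'].sum(axis=1),  # total volume
        ],
        names=['open', 'high', 'low', 'close', 'volume'],
    )

# hypothetical data: 12 gap-free 1-minute bars
dtype = [('time', 'f8'), ('open', 'f8'), ('high', 'f8'),
         ('low', 'f8'), ('close', 'f8'), ('volume', 'f8')]
bars = np.zeros(12, dtype=dtype)
px = np.arange(12, dtype='f8') + 100.0
bars['time'] = np.arange(12) * 60.0
bars['open'] = px
bars['high'] = px + 1.0
bars['low'] = px - 1.0
bars['close'] = px + 0.5
bars['volume'] = 10.0

# 12 one-minute bars -> 2 complete five-minute bars (last 2 bars dropped)
out = resample_ohlc(bars, 60.0, 300.0)
print(out)
```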

Polars Integration

Piker is transitioning to Polars for some operations.

NumPy ↔︎ Polars Conversion

import polars as pl

# numpy to polars
df = pl.from_numpy(
    arr,
    schema=['index', 'time', 'open', 'high', 'low', 'close', 'volume'],
)

# polars to numpy
# (structured=True keeps the named fields; the default returns a plain 2D array)
arr = df.to_numpy(structured=True)

# piker convenience
from piker.tsp import np2pl, pl2np
df = np2pl(arr)
arr = pl2np(df)

Polars Performance Patterns

Lazy evaluation:

# build query lazily
lazy_df = (
    df.lazy()
    .filter(pl.col('volume') > 1000)
    .with_columns([
        (pl.col('close') - pl.col('open')).alias('change')
    ])
    .sort('time')
)

# execute once
result = lazy_df.collect()

Groupby aggregations:

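# resample to 5-minute bars
# NOTE: named groupby_dynamic before polars 0.19; the index column must
# be sorted and temporal (or integer), so convert the epoch floats first
resampled = (
    df.with_columns(
        pl.from_epoch(pl.col('time').cast(pl.Int64), time_unit='s').alias('dt')
    )
    .sort('dt')
    .group_by_dynamic(index_column='dt', every='5m')
    .agg([
        pl.col('open').first(),
        pl.col('high').max(),
        pl.col('low').min(),
        pl.col('close').last(),
        pl.col('volume').sum(),
    ])
)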

When to Use Polars vs NumPy

Use Polars when:

  • Complex queries with multiple filters/joins
  • Need SQL-like operations (groupby, window functions)
  • Working with heterogeneous column types
  • Want lazy evaluation optimization

Use NumPy when:

  • Simple array operations (indexing, slicing)
  • Direct memory access needed (e.g., SHM arrays)
  • Compatibility with Qt/pyqtgraph (expects NumPy)
  • Maximum performance for numerical computation

Memory Considerations

Views vs Copies

# VIEW: shares memory (fast, no copy)
times = array['time']         # field access
subset = array[10:20]         # slicing
reshaped = array.reshape(-1, 2)

# COPY: new memory allocation
filtered = array[array['time'] > cutoff]  # boolean indexing
sorted_arr = np.sort(array)               # sorting
casted = array['close'].astype(np.float32)  # type conversion (per field)

# force copy when needed
explicit_copy = array.copy()
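
When in doubt, `np.shares_memory()` tells you whether an operation returned a view or a copy. A short check on a hypothetical array:

```python
import numpy as np

array = np.zeros(100, dtype=[('time', 'f8'), ('close', 'f8')])
array['time'] = np.arange(100.0)

field = array['time']                # field access
sliced = array[10:20]                # basic slicing
masked = array[array['time'] > 50]   # boolean indexing

field_is_view = np.shares_memory(array, field)    # view
slice_is_view = np.shares_memory(array, sliced)   # view
mask_is_view = np.shares_memory(array, masked)    # copy

# writes through a view are visible in the parent array
sliced['close'][0] = 42.0
print(field_is_view, slice_is_view, mask_is_view, array['close'][10])
```

The flip side of views being cheap is that mutating one mutates the source array, which matters for shared-memory buffers.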

In-Place Operations

# modify in-place (no new allocation)
array['close'] *= 1.01  # scale prices
array['volume'][mask] = 0  # zero out specific rows

# careful: compound operations may create temporaries
array['close'] = array['close'] * 1.01  # creates temp!
array['close'] *= 1.01  # true in-place

Performance Checklist

When optimizing timeseries operations:

  • Is the array sorted? (enables binary search)
  • Are you doing repeated lookups? (build hash table)
  • Are struct fields accessed in loops? (extract to plain arrays)
  • Are you using boolean indexing? (vectorized vs loop)
  • Can operations be batched? (minimize round-trips)
  • Is memory being copied unnecessarily? (use views)
  • Are you using the right tool? (NumPy vs Polars)

Common Bottlenecks and Fixes

Bottleneck: Timestamp Lookups

# BEFORE: O(n*m) - 100ms for 1k lookups
for ts in timestamps:
    matches = array[array['time'] == ts]

# AFTER: O(m log n) - <1ms for 1k lookups
indices = np.searchsorted(array['time'], timestamps)

Bottleneck: Dict Building from Struct Array

# BEFORE: 100ms for 3k rows
result = {
    float(row['time']): {
        'index': float(row['index']),
        'close': float(row['close']),
    }
    for row in matched_rows
}

# AFTER: <5ms for 3k rows
times = matched_rows['time'].astype(float)
indices = matched_rows['index'].astype(float)
closes = matched_rows['close'].astype(float)

result = {
    t: {'index': idx, 'close': cls}
    for t, idx, cls in zip(times, indices, closes)
}

Bottleneck: Repeated Field Access

# BEFORE: 50ms for 1k iterations
for i, spec in enumerate(specs):
    start_row = array[array['time'] == spec['start_time']][0]
    end_row = array[array['time'] == spec['end_time']][0]
    process(start_row['index'], end_row['close'])

# AFTER: <5ms for 1k iterations
# 1. Build lookup once
time_to_row = {...}  # via searchsorted

# 2. Extract fields to plain arrays beforehand
indices_arr = array['index']
closes_arr = array['close']

# 3. Use lookup + plain array indexing
for spec in specs:
    start_idx = time_to_row[spec['start_time']]['array_idx']
    end_idx = time_to_row[spec['end_time']]['array_idx']
    process(indices_arr[start_idx], closes_arr[end_idx])
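
If `process()` can accept whole arrays, the per-spec loop can be dropped as well. The sketch below resolves all start and end timestamps with two `searchsorted` calls. The data and `specs` are hypothetical, and it assumes every requested timestamp exists in a time-sorted `array`; otherwise verify the matches as in the binary search section.

```python
import numpy as np

# hypothetical data: 6 one-minute bars
array = np.zeros(
    6,
    dtype=[('index', 'i8'), ('time', 'f8'), ('close', 'f8')],
)
array['index'] = np.arange(6) + 1000
array['time'] = np.arange(6) * 60.0
array['close'] = np.arange(6) + 100.0

specs = [
    {'start_time': 0.0, 'end_time': 120.0},
    {'start_time': 60.0, 'end_time': 300.0},
]

time_arr = array['time']
starts = np.array([s['start_time'] for s in specs])
ends = np.array([s['end_time'] for s in specs])

# one binary-search call per endpoint column instead of one scan per spec
start_idx = np.searchsorted(time_arr, starts)
end_idx = np.searchsorted(time_arr, ends)

# plain fancy indexing on the extracted fields
start_indices = array['index'][start_idx]
end_closes = array['close'][end_idx]
print(start_indices, end_closes)
```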

Skill Maintenance

Update when:

  • New vectorization patterns discovered
  • Performance bottlenecks identified
  • Polars migration patterns emerge
  • NumPy best practices evolve


Last updated: 2026-01-31
Session: Batch gap annotation optimization
Key win: 100ms → 5ms dict building via field extraction