Optionally load `MktPair` in `Flume`s

pre_overruns_ctxcancelled
Tyler Goodlet 2023-03-21 14:21:36 -04:00
parent 8b0aead72e
commit 52de60c7ee
1 changed files with 23 additions and 12 deletions

View File

@ -22,7 +22,7 @@ real-time data processing data-structures.
""" """
from __future__ import annotations from __future__ import annotations
from decimal import Decimal # from decimal import Decimal
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -193,19 +193,30 @@ class Flume(Struct):
return msg return msg
@classmethod @classmethod
def from_msg(cls, msg: dict) -> dict: def from_msg(
cls,
msg: dict,
) -> dict:
'''
Load from an IPC msg presumably in either `dict` or
`msgspec.Struct` form.
'''
sym_msg = msg.pop('symbol')
if 'dst' in sym_msg:
mkt = MktPair.from_msg(sym_msg)
else:
# XXX NOTE: ``msgspec`` can encode `Decimal` # XXX NOTE: ``msgspec`` can encode `Decimal`
# but it doesn't decide to it by default since # but it doesn't decide to it by default since
# we aren't spec-cing these msgs as structs... # we aren't spec-cing these msgs as structs, SO
symbol = Symbol(**msg.pop('symbol')) # we have to ensure we do a struct type case (which `.copy()`
symbol.tick_size = Decimal(symbol.tick_size) # does) to ensure we get the right type!
symbol.lot_tick_size = Decimal(symbol.lot_tick_size) mkt = Symbol(**sym_msg).copy()
return cls( return cls(symbol=mkt, **msg)
symbol=symbol,
**msg,
)
def get_index( def get_index(
self, self,