Stop using as much closures

Use a custom tractor branch that fixes a `maybe_open_context` re entrant related bug
size_in_shm_token
Guillermo Rodriguez 2022-08-24 15:10:46 -03:00
parent 34fb497eb4
commit e97dd1cbdb
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
2 changed files with 101 additions and 97 deletions

View File

@ -557,69 +557,68 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
yield fh yield fh
async def aio_price_feed_relay(
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
'receipt': receipt_timestamp
}))
async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'bsize',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'ask',
'price': float(data.ask_price), 'size': float(data.ask_size)},
{'type': 'asize',
'price': float(data.ask_price), 'size': float(data.ask_size)}
]
}))
fh.add_feed(
DERIBIT,
channels=[TRADES, L1_BOOK],
symbols=[piker_sym_to_cb_sym(instrument)],
callbacks={
TRADES: _trade,
L1_BOOK: _l1
})
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
await asyncio.sleep(float('inf'))
@acm @acm
async def open_price_feed( async def open_price_feed(
instrument: str instrument: str) -> trio.abc.ReceiveStream:
) -> trio.abc.ReceiveStream:
# XXX: hangs when going into this ctx mngr
async with maybe_open_feed_handler() as fh: async with maybe_open_feed_handler() as fh:
async def relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
'receipt': receipt_timestamp
}))
async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'bsize',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'ask',
'price': float(data.ask_price), 'size': float(data.ask_size)},
{'type': 'asize',
'price': float(data.ask_price), 'size': float(data.ask_size)}
]
}))
fh.add_feed(
DERIBIT,
channels=[TRADES, L1_BOOK],
symbols=[instrument],
callbacks={
TRADES: _trade,
L1_BOOK: _l1
})
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
try:
await asyncio.sleep(float('inf'))
except asyncio.exceptions.CancelledError:
...
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
relay partial(
aio_price_feed_relay,
fh,
instrument
)
) as (first, chan): ) as (first, chan):
yield chan yield chan
@ -642,51 +641,55 @@ async def maybe_open_price_feed(
else: else:
yield feed yield feed
async def aio_order_feed_relay(
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _fill(data: dict, receipt_timestamp):
breakpoint()
async def _order_info(data: dict, receipt_timestamp):
breakpoint()
fh.add_feed(
DERIBIT,
channels=[FILLS, ORDER_INFO],
symbols=[instrument.upper()],
callbacks={
FILLS: _fill,
ORDER_INFO: _order_info,
})
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
await asyncio.sleep(float('inf'))
@acm @acm
async def open_order_feed( async def open_order_feed(
instrument: List[str] instrument: List[str]
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh: async with maybe_open_feed_handler() as fh:
async def relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _fill(data: dict, receipt_timestamp):
breakpoint()
async def _order_info(data: dict, receipt_timestamp):
breakpoint()
fh.add_feed(
DERIBIT,
channels=[FILLS, ORDER_INFO],
symbols=[instrument],
callbacks={
FILLS: _fill,
ORDER_INFO: _order_info,
})
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
try:
await asyncio.sleep(float('inf'))
except asyncio.exceptions.CancelledError:
...
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
relay partial(
aio_order_feed_relay,
fh,
instrument
)
) as (first, chan): ) as (first, chan):
yield chan yield chan
@acm @acm
async def maybe_open_order_feed( async def maybe_open_order_feed(
instrument: str instrument: str
@ -696,7 +699,8 @@ async def maybe_open_order_feed(
async with maybe_open_context( async with maybe_open_context(
acm_func=open_order_feed, acm_func=open_order_feed,
kwargs={ kwargs={
'instrument': instrument 'instrument': instrument,
'fh': fh
}, },
key=f'{instrument}-order', key=f'{instrument}-order',
) as (cache_hit, feed): ) as (cache_hit, feed):

View File

@ -1,7 +1,7 @@
# we require a pinned dev branch to get some edge features that # we require a pinned dev branch to get some edge features that
# are often untested in tractor's CI and/or being tested by us # are often untested in tractor's CI and/or being tested by us
# first before committing as core features in tractor's base. # first before committing as core features in tractor's base.
-e git+https://github.com/goodboy/tractor.git@master#egg=tractor -e git+https://github.com/goodboy/tractor.git@reentrant_moc#egg=tractor
# `pyqtgraph` peeps keep breaking, fixing, improving so might as well # `pyqtgraph` peeps keep breaking, fixing, improving so might as well
# pin this to a dev branch that we have more control over especially # pin this to a dev branch that we have more control over especially