aio_open_interest_feed_relay
							parent
							
								
									7d18fe6a54
								
							
						
					
					
						commit
						81a2d35d42
					
				|  | @ -769,6 +769,68 @@ async def maybe_open_price_feed( | ||||||
|             yield feed |             yield feed | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | async def aio_open_interest_feed_relay( | ||||||
|  |     fh: FeedHandler, | ||||||
|  |     instruments: list, | ||||||
|  |     from_trio: asyncio.Queue, | ||||||
|  |     to_trio: trio.abc.SendChannel, | ||||||
|  | ) -> None: | ||||||
|  |     async def _trade( | ||||||
|  |         trade: Trade,  # cryptofeed, NOT ours from `.venues`! | ||||||
|  |         receipt_timestamp: int, | ||||||
|  |     ) -> None: | ||||||
|  |         ''' | ||||||
|  |         Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         # Get timestamp and convert it to isoformat | ||||||
|  |         date = (datetime.utcfromtimestamp(trade.timestamp)).isoformat() | ||||||
|  |         print('Trade...') | ||||||
|  |         print(date) | ||||||
|  |         print(trade) | ||||||
|  |         print('=======================') | ||||||
|  |         to_trio.send_nowait(('trade', trade)) | ||||||
|  | 
 | ||||||
|  | 	# trade and oi are user defined functions that | ||||||
|  | 	# will be called when trade and open interest updates are received | ||||||
|  | 	# data type is not dict, is an object: cryptofeed.types.OpenINterest | ||||||
|  |     async def _oi( | ||||||
|  |         oi: OpenInterest, | ||||||
|  |         receipt_timestamp: int, | ||||||
|  |     ) -> None: | ||||||
|  |         ''' | ||||||
|  |         Proxy-thru `cryptofeed.FeedHandler` "oi" to `piker`-side. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         # Get timestamp and convert it to isoformat | ||||||
|  |         date = (datetime.utcfromtimestamp(oi.timestamp)).isoformat() | ||||||
|  |         print('>>>> Open Interest...') | ||||||
|  |         print(date) | ||||||
|  |         print(oi) | ||||||
|  |         print('==========================') | ||||||
|  |         to_trio.send_nowait(('oi', oi)) | ||||||
|  | 
 | ||||||
|  |     callbacks = {TRADES: _trade, OPEN_INTEREST: _oi} | ||||||
|  |     fh.add_feed( | ||||||
|  |         DERIBIT, | ||||||
|  |         channels=[TRADES, OPEN_INTEREST], | ||||||
|  |         symbols=instruments, | ||||||
|  |         callbacks=callbacks | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  |     if not fh.running: | ||||||
|  |         fh.run( | ||||||
|  |             start_loop=False, | ||||||
|  |             install_signal_handlers=False | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     # sync with trio | ||||||
|  |     to_trio.send_nowait(None) | ||||||
|  | 
 | ||||||
|  |     # run until cancelled | ||||||
|  |     await asyncio.sleep(float('inf')) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| # TODO, move all to `.broker` submod! | # TODO, move all to `.broker` submod! | ||||||
| # async def aio_order_feed_relay( | # async def aio_order_feed_relay( | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue