Fix the drunk fix
This should finally give correct fsp src-to-dst array syncing. There are a few edge cases, but mostly we need to make sure we sync back-filled history diffs and avoid current-step lags/leads. Use a polling routine and the more stringent task re-spawn system to get this right.
parent 086aaf1d16
commit 3dd82c8d31
branch fsp_drunken_alignment
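For context, below is a minimal, self-contained sketch, not the actual piker code, of the pattern the message describes: a resync helper that cancels the compute task, waits for its teardown flag, then re-spawns it, plus a polling routine that repeats the resync until the destination index is within one step of the source. The Counter, Tracker and compute names here are hypothetical stand-ins and only trio is assumed; in the real diff below the equivalent helpers (resync() and poll_and_sync_to_step()) live inside cascade() and are driven from the feed.index_stream() loop.

# hypothetical sketch of the poll-and-respawn sync pattern (trio only)
from dataclasses import dataclass

import trio


@dataclass
class Counter:
    index: int = 0


@dataclass
class Tracker:
    cs: trio.CancelScope
    complete: trio.Event


async def compute(
    src: Counter,
    dst: Counter,
    *,
    task_status=trio.TASK_STATUS_IGNORED,
) -> None:
    # fake fsp task: "backfill" dst up to src's current index, then tick
    dst.index = src.index
    with trio.CancelScope() as cs:
        tracker = Tracker(cs=cs, complete=trio.Event())
        task_status.started((tracker, dst.index))
        try:
            while True:
                await trio.sleep(0.1)
                dst.index += 1
        finally:
            # always flag teardown so a resync can safely re-spawn us
            tracker.complete.set()


async def main() -> None:
    src, dst = Counter(index=100), Counter()

    async with trio.open_nursery() as n:

        async def resync(tracker: Tracker) -> tuple[Tracker, int]:
            # the "more stringent task re-spawn": cancel, await full
            # teardown, then restart and hand back the new tracker
            tracker.cs.cancel()
            await tracker.complete.wait()
            return await n.start(compute, src, dst)

        async def poll_and_sync_to_step(tracker: Tracker) -> tuple[Tracker, int]:
            # the "polling routine": keep re-syncing until dst lags or
            # leads src by at most one step
            diff = src.index - dst.index
            while diff not in (0, 1):
                tracker, _index = await resync(tracker)
                diff = src.index - dst.index
            return tracker, diff

        tracker, _index = await n.start(compute, src, dst)
        src.index += 50  # simulate the source backfilling history
        tracker, diff = await poll_and_sync_to_step(tracker)
        print(f'aligned: src-dst diff == {diff}')
        n.cancel_scope.cancel()


if __name__ == '__main__':
    trio.run(main)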
@@ -148,24 +148,27 @@ async def fsp_compute(
         # import time
         # last = time.time()
 
-        # rt stream
-        async for processed in out_stream:
+        try:
+            # rt stream
+            async for processed in out_stream:
 
-            log.debug(f"{func_name}: {processed}")
-            index = src.index
-            dst.array[-1][func_name] = processed
+                log.debug(f"{func_name}: {processed}")
+                index = src.index
+                dst.array[-1][func_name] = processed
 
-            # NOTE: for now we aren't streaming this to the consumer
-            # stream latest array index entry which basically just acts
-            # as trigger msg to tell the consumer to read from shm
-            if attach_stream:
-                await stream.send(index)
+                # NOTE: for now we aren't streaming this to the consumer
+                # stream latest array index entry which basically just acts
+                # as trigger msg to tell the consumer to read from shm
+                if attach_stream:
+                    await stream.send(index)
 
-            # period = time.time() - last
-            # hz = 1/period if period else float('nan')
-            # if hz > 60:
-            #     log.info(f'FSP quote too fast: {hz}')
-            # last = time.time()
+                # period = time.time() - last
+                # hz = 1/period if period else float('nan')
+                # if hz > 60:
+                #     log.info(f'FSP quote too fast: {hz}')
+                # last = time.time()
+        finally:
+            tracker.complete.set()
 
 
 @tractor.context
@@ -217,7 +220,7 @@ async def cascade(
         profiler(f'{func_name}: feed up')
 
         assert src.token == feed.shm.token
-        last_len = new_len = len(src.array)
+        # last_len = new_len = len(src.array)
 
         async with (
             ctx.open_stream() as stream,
@@ -249,9 +252,16 @@ async def cascade(
             await ctx.started(index)
             profiler(f'{func_name}: fsp up')
 
+            async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]:
+                # TODO: adopt an incremental update engine/approach
+                # where possible here eventually!
+                log.warning(f're-syncing fsp {func_name} to source')
+                tracker.cs.cancel()
+                await tracker.complete.wait()
+                return await n.start(fsp_target)
 
             # Increment the underlying shared memory buffer on every
             # "increment" msg received from the underlying data feed.
             async with feed.index_stream() as stream:
 
                 profiler(f'{func_name}: sample stream up')
@@ -263,23 +273,44 @@ async def cascade(
                     # array has been updated such that we compute
                     # new history from the (prepended) source.
                     diff = src.index - dst.index
-                    new_len = len(src.array)
 
-                    # XXX: ok no idea why this works but "drunk fix"
-                    # says it don't matter.
+                    # new_len = len(src.array)
+
+                    async def poll_and_sync_to_step(tracker):
+                        diff = src.index - dst.index
+                        while True:
+                            if diff in (0, 1):
+                                break
+
+                            tracker, index = await resync(tracker)
+                            diff = src.index - dst.index
+                            # log.info(
+                            #     '\n'.join((
+                            #         f'history index after sync: {index}',
+                            #         f'diff after sync: {diff}',
+                            #     ))
+                            # )
+
+                        return tracker, diff
+
+                    # log.debug(f'diff {diff}')
+
                     if (
-                        new_len > last_len + 1 or
-                        abs(diff) > 1
-                    ):
-                        # TODO: adopt an incremental update engine/approach
-                        # where possible here eventually!
-                        log.warning(f're-syncing fsp {func_name} to source')
-                        tracker.cs.cancel()
-                        await tracker.complete.wait()
-                        tracker, index = await n.start(fsp_target)
+                        # the source is likely backfilling and we must
+                        # sync history calculations
+                        abs(len(src.array) - len(dst.array)) > 0 or
 
-                        # skip adding a new bar since we should be fully aligned.
-                        continue
+                        # we aren't step synced to the source and may be
+                        # leading/lagging by a step
+                        diff > 1 or
+                        diff < 0
+                    ):
+                        tracker, diff = await poll_and_sync_to_step(tracker)
+
+                        # skip adding a last bar since we should be
+                        # source alinged
+                        if diff == 0:
+                            continue
 
                     # read out last shm row, copy and write new row
                     array = dst.array
@@ -292,4 +323,3 @@ async def cascade(
                         last = array[-1:].copy()
 
                     dst.push(last)
-                    last_len = new_len