Add example mtr prober from `mtrpacket`
Started rejigging example code from this example to use more modern `asyncio` APIs: https://github.com/matt-kimball/mtr-packet-python/blob/master/examples/trace-concurrent.py Relates to #330account_tests
parent
f66a1f8b23
commit
78178c2fb7
|
@ -0,0 +1,196 @@
|
||||||
|
import asyncio
|
||||||
|
import curses
|
||||||
|
import itertools
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import mtrpacket
|
||||||
|
|
||||||
|
#
|
||||||
|
# ProbeRecord keeps a record of round-trip times of probes and repsonder
|
||||||
|
# IP addresses, for a particular time-to-live (TTL) value.
|
||||||
|
#
|
||||||
|
# There may be multiple IP addresses for one particular TTL value,
|
||||||
|
# because some networks have multiple equally weighted routes.
|
||||||
|
#
|
||||||
|
class ProbeRecord:
|
||||||
|
def __init__(self, ttl):
|
||||||
|
self.ttl = ttl
|
||||||
|
self.success = False
|
||||||
|
self.ip_addrs = []
|
||||||
|
self.probe_times = []
|
||||||
|
|
||||||
|
# Format the information about this line for display
|
||||||
|
def print(self, screen):
|
||||||
|
line = '{:>2}. '.format(self.ttl)
|
||||||
|
|
||||||
|
if self.ip_addrs:
|
||||||
|
line += '{:42}'.format(self.ip_addrs[0])
|
||||||
|
else:
|
||||||
|
line += '{:42}'.format(' ???')
|
||||||
|
|
||||||
|
for time in self.probe_times:
|
||||||
|
if time is None:
|
||||||
|
line += ' *'
|
||||||
|
else:
|
||||||
|
line += ' {:>7.3f}ms'.format(time)
|
||||||
|
|
||||||
|
# Use curses to display the line
|
||||||
|
screen.addstr(line + '\n')
|
||||||
|
|
||||||
|
# List IP addresses beyond the first
|
||||||
|
for addr in self.ip_addrs[1:]:
|
||||||
|
screen.addstr(' ' + addr + '\n')
|
||||||
|
|
||||||
|
|
||||||
|
# When we've got a result for one of our probes, we'll regenerate
|
||||||
|
# the screen output, and allow curses to refresh it.
|
||||||
|
def redraw(screen, hostname, all_records):
|
||||||
|
screen.erase()
|
||||||
|
|
||||||
|
screen.addstr('Tracing to "{}"\n\n'.format(hostname))
|
||||||
|
|
||||||
|
for record in all_records:
|
||||||
|
record.print(screen)
|
||||||
|
|
||||||
|
# If one of our probes has arrived at the destination IP,
|
||||||
|
# we don't need to display further hops
|
||||||
|
if record.success:
|
||||||
|
break
|
||||||
|
|
||||||
|
# screen.addstr('\n(press SPACEBAR to exit)\n')
|
||||||
|
|
||||||
|
screen.refresh()
|
||||||
|
|
||||||
|
|
||||||
|
# Perform multiple probes with a specific time to live (TTL) value
|
||||||
|
async def probe_ttl(
|
||||||
|
mtr,
|
||||||
|
hostname,
|
||||||
|
ttl,
|
||||||
|
record,
|
||||||
|
redraw_callback,
|
||||||
|
):
|
||||||
|
for _ in itertools.count():
|
||||||
|
result = await mtr.probe(hostname, ttl=ttl, timeout=6)
|
||||||
|
|
||||||
|
if result.success:
|
||||||
|
record.success = True
|
||||||
|
|
||||||
|
# Record the time of the latest probe
|
||||||
|
record.probe_times.append(result.time_ms)
|
||||||
|
|
||||||
|
addr = result.responder
|
||||||
|
# If the address of the responder isn't already in the list
|
||||||
|
# of addresses responding at this TTL, add it
|
||||||
|
if addr and addr not in record.ip_addrs:
|
||||||
|
record.ip_addrs.append(addr)
|
||||||
|
|
||||||
|
# Redraw the display, which will include this probe
|
||||||
|
redraw_callback()
|
||||||
|
|
||||||
|
# Wait a small amount of time before sending the next probe
|
||||||
|
# to get an independent sample of network conditions
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
|
|
||||||
|
# Launch all the probes for the trace.
|
||||||
|
# We'll use a separate coroutine (probe_ttl) for each ttl value,
|
||||||
|
# and those coroutines will run concurrently.
|
||||||
|
async def launch_probes(screen, hostname):
|
||||||
|
all_records = []
|
||||||
|
|
||||||
|
# When one of the probes has a result to display, we'll use
|
||||||
|
# this callback to display it
|
||||||
|
def redraw_hops():
|
||||||
|
redraw(screen, hostname, all_records)
|
||||||
|
|
||||||
|
async with mtrpacket.MtrPacket() as mtr:
|
||||||
|
probe_tasks = []
|
||||||
|
|
||||||
|
try:
|
||||||
|
for ttl in range(1, 32):
|
||||||
|
# We need a new ProbeRecord for each ttl value
|
||||||
|
record = ProbeRecord(ttl)
|
||||||
|
all_records.append(record)
|
||||||
|
|
||||||
|
# Start a new asyncio task for this probe
|
||||||
|
probe_coro = probe_ttl(
|
||||||
|
mtr,
|
||||||
|
hostname,
|
||||||
|
ttl,
|
||||||
|
record,
|
||||||
|
redraw_hops,
|
||||||
|
)
|
||||||
|
probe_tasks.append(asyncio.ensure_future(probe_coro))
|
||||||
|
|
||||||
|
# Give each probe a slight delay to avoid flooding
|
||||||
|
# the network interface, which might perturb the
|
||||||
|
# results
|
||||||
|
await asyncio.sleep(0.05)
|
||||||
|
|
||||||
|
await asyncio.gather(*probe_tasks)
|
||||||
|
finally:
|
||||||
|
# We may have been cancelled, so we should cancel
|
||||||
|
# the probe tasks we started to clean up
|
||||||
|
for task in probe_tasks:
|
||||||
|
task.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
# Wait until a SPACE character to be read on stdin.
|
||||||
|
# Afterward, cancel the probe task so we can exit
|
||||||
|
# async def wait_for_spacebar(probe_task):
|
||||||
|
# exit_event = asyncio.Event()
|
||||||
|
|
||||||
|
# def read_callback():
|
||||||
|
# # Read a single character
|
||||||
|
# # If we tried to read more, we may block other tasks
|
||||||
|
# key = sys.stdin.read(1)
|
||||||
|
# if key == ' ':
|
||||||
|
# exit_event.set()
|
||||||
|
|
||||||
|
# loop = asyncio.get_event_loop()
|
||||||
|
# loop.add_reader(sys.stdin, read_callback)
|
||||||
|
# await exit_event.wait()
|
||||||
|
# loop.remove_reader(sys.stdin)
|
||||||
|
|
||||||
|
# # After spacebar is pressed, stop sending probes
|
||||||
|
# probe_task.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
# The main asynchronous routine, running within the asyncio event loop
|
||||||
|
async def main_task(hostname):
|
||||||
|
screen = curses.initscr()
|
||||||
|
try:
|
||||||
|
probe_task = asyncio.ensure_future(
|
||||||
|
launch_probes(screen, hostname)
|
||||||
|
)
|
||||||
|
# spacebar_task = asyncio.ensure_future(wait_for_spacebar(probe_task))
|
||||||
|
|
||||||
|
try:
|
||||||
|
# await probe_task
|
||||||
|
await asyncio.gather(probe_task)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
# It is normal for probe_task to be cancelled by
|
||||||
|
# the spacebar task
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
# We need to clean up by cancelling if gather has returned
|
||||||
|
# early, perhaps due to an exception raised in one of
|
||||||
|
# our tasks.
|
||||||
|
probe_task.cancel()
|
||||||
|
# spacebar_task.cancel()
|
||||||
|
finally:
|
||||||
|
curses.endwin()
|
||||||
|
|
||||||
|
|
||||||
|
# Get the hostname to trace to on the commandline
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
host: str = '192.168.1.68'
|
||||||
|
if not len(sys.argv) > 1:
|
||||||
|
print('Usage: python3 trace-concurrent.py <hostname>')
|
||||||
|
sys.exit(1)
|
||||||
|
else:
|
||||||
|
host: str = sys.argv[1]
|
||||||
|
|
||||||
|
out = asyncio.run(main_task(host))
|
Loading…
Reference in New Issue