mirror of https://github.com/torvalds/linux.git
496 lines
17 KiB
Python
Executable File
496 lines
17 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
|
|
"""Interactive perf list."""
|
|
|
|
from abc import ABC, abstractmethod
|
|
import argparse
|
|
from dataclasses import dataclass
|
|
import math
|
|
from typing import Any, Dict, Optional, Tuple
|
|
import perf
|
|
from textual import on
|
|
from textual.app import App, ComposeResult
|
|
from textual.binding import Binding
|
|
from textual.containers import Horizontal, HorizontalGroup, Vertical, VerticalScroll
|
|
from textual.css.query import NoMatches
|
|
from textual.command import SearchIcon
|
|
from textual.screen import ModalScreen
|
|
from textual.widgets import Button, Footer, Header, Input, Label, Sparkline, Static, Tree
|
|
from textual.widgets.tree import TreeNode
|
|
|
|
|
|
def get_info(info: Dict[str, str], key: str):
|
|
return (info[key] + "\n") if key in info else ""
|
|
|
|
|
|
class TreeValue(ABC):
|
|
"""Abstraction for the data of value in the tree."""
|
|
|
|
@abstractmethod
|
|
def name(self) -> str:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def description(self) -> str:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def matches(self, query: str) -> bool:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def parse(self) -> perf.evlist:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class Metric(TreeValue):
|
|
"""A metric in the tree."""
|
|
metric_name: str
|
|
|
|
def name(self) -> str:
|
|
return self.metric_name
|
|
|
|
def description(self) -> str:
|
|
"""Find and format metric description."""
|
|
for metric in perf.metrics():
|
|
if metric["MetricName"] != self.metric_name:
|
|
continue
|
|
desc = get_info(metric, "BriefDescription")
|
|
desc += get_info(metric, "PublicDescription")
|
|
desc += get_info(metric, "MetricExpr")
|
|
desc += get_info(metric, "MetricThreshold")
|
|
return desc
|
|
return "description"
|
|
|
|
def matches(self, query: str) -> bool:
|
|
return query in self.metric_name
|
|
|
|
def parse(self) -> perf.evlist:
|
|
return perf.parse_metrics(self.metric_name)
|
|
|
|
def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
|
|
val = evlist.compute_metric(self.metric_name, cpu, thread)
|
|
return 0 if math.isnan(val) else val
|
|
|
|
|
|
@dataclass
|
|
class PmuEvent(TreeValue):
|
|
"""A PMU and event within the tree."""
|
|
pmu: str
|
|
event: str
|
|
|
|
def name(self) -> str:
|
|
if self.event.startswith(self.pmu) or ':' in self.event:
|
|
return self.event
|
|
else:
|
|
return f"{self.pmu}/{self.event}/"
|
|
|
|
def description(self) -> str:
|
|
"""Find and format event description for {pmu}/{event}/."""
|
|
for p in perf.pmus():
|
|
if p.name() != self.pmu:
|
|
continue
|
|
for info in p.events():
|
|
if "name" not in info or info["name"] != self.event:
|
|
continue
|
|
|
|
desc = get_info(info, "topic")
|
|
desc += get_info(info, "event_type_desc")
|
|
desc += get_info(info, "desc")
|
|
desc += get_info(info, "long_desc")
|
|
desc += get_info(info, "encoding_desc")
|
|
return desc
|
|
return "description"
|
|
|
|
def matches(self, query: str) -> bool:
|
|
return query in self.pmu or query in self.event
|
|
|
|
def parse(self) -> perf.evlist:
|
|
return perf.parse_events(self.name())
|
|
|
|
def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
|
|
return evsel.read(cpu, thread).val
|
|
|
|
|
|
class ErrorScreen(ModalScreen[bool]):
|
|
"""Pop up dialog for errors."""
|
|
|
|
CSS = """
|
|
ErrorScreen {
|
|
align: center middle;
|
|
}
|
|
"""
|
|
|
|
def __init__(self, error: str):
|
|
self.error = error
|
|
super().__init__()
|
|
|
|
def compose(self) -> ComposeResult:
|
|
yield Button(f"Error: {self.error}", variant="primary", id="error")
|
|
|
|
def on_button_pressed(self, event: Button.Pressed) -> None:
|
|
self.dismiss(True)
|
|
|
|
|
|
class SearchScreen(ModalScreen[str]):
|
|
"""Pop up dialog for search."""
|
|
|
|
CSS = """
|
|
SearchScreen Horizontal {
|
|
align: center middle;
|
|
margin-top: 1;
|
|
}
|
|
SearchScreen Input {
|
|
width: 1fr;
|
|
}
|
|
"""
|
|
|
|
def compose(self) -> ComposeResult:
|
|
yield Horizontal(SearchIcon(), Input(placeholder="Event name"))
|
|
|
|
def on_input_submitted(self, event: Input.Submitted) -> None:
|
|
"""Handle the user pressing Enter in the input field."""
|
|
self.dismiss(event.value)
|
|
|
|
|
|
class Counter(HorizontalGroup):
|
|
"""Two labels for a CPU and its counter value."""
|
|
|
|
CSS = """
|
|
Label {
|
|
gutter: 1;
|
|
}
|
|
"""
|
|
|
|
def __init__(self, cpu: int) -> None:
|
|
self.cpu = cpu
|
|
super().__init__()
|
|
|
|
def compose(self) -> ComposeResult:
|
|
label = f"cpu{self.cpu}" if self.cpu >= 0 else "total"
|
|
yield Label(label + " ")
|
|
yield Label("0", id=f"counter_{label}")
|
|
|
|
|
|
class CounterSparkline(HorizontalGroup):
|
|
"""A Sparkline for a performance counter."""
|
|
|
|
def __init__(self, cpu: int) -> None:
|
|
self.cpu = cpu
|
|
super().__init__()
|
|
|
|
def compose(self) -> ComposeResult:
|
|
label = f"cpu{self.cpu}" if self.cpu >= 0 else "total"
|
|
yield Label(label)
|
|
yield Sparkline([], summary_function=max, id=f"sparkline_{label}")
|
|
|
|
|
|
class IListApp(App):
|
|
TITLE = "Interactive Perf List"
|
|
|
|
BINDINGS = [
|
|
Binding(key="s", action="search", description="Search",
|
|
tooltip="Search events and PMUs"),
|
|
Binding(key="n", action="next", description="Next",
|
|
tooltip="Next search result or item"),
|
|
Binding(key="p", action="prev", description="Previous",
|
|
tooltip="Previous search result or item"),
|
|
Binding(key="c", action="collapse", description="Collapse",
|
|
tooltip="Collapse the current PMU"),
|
|
Binding(key="^q", action="quit", description="Quit",
|
|
tooltip="Quit the app"),
|
|
]
|
|
|
|
CSS = """
|
|
/* Make the 'total' sparkline a different color. */
|
|
#sparkline_total > .sparkline--min-color {
|
|
color: $accent;
|
|
}
|
|
#sparkline_total > .sparkline--max-color {
|
|
color: $accent 30%;
|
|
}
|
|
/*
|
|
* Make the active_search initially not displayed with the text in
|
|
* the middle of the line.
|
|
*/
|
|
#active_search {
|
|
display: none;
|
|
width: 100%;
|
|
text-align: center;
|
|
}
|
|
"""
|
|
|
|
def __init__(self, interval: float) -> None:
|
|
self.interval = interval
|
|
self.evlist = None
|
|
self.selected: Optional[TreeValue] = None
|
|
self.search_results: list[TreeNode[TreeValue]] = []
|
|
self.cur_search_result: TreeNode[TreeValue] | None = None
|
|
super().__init__()
|
|
|
|
def expand_and_select(self, node: TreeNode[Any]) -> None:
|
|
"""Expand select a node in the tree."""
|
|
if node.parent:
|
|
node.parent.expand()
|
|
if node.parent.parent:
|
|
node.parent.parent.expand()
|
|
node.expand()
|
|
node.tree.select_node(node)
|
|
node.tree.scroll_to_node(node)
|
|
|
|
def set_searched_tree_node(self, previous: bool) -> None:
|
|
"""Set the cur_search_result node to either the next or previous."""
|
|
l = len(self.search_results)
|
|
|
|
if l < 1:
|
|
tree: Tree[TreeValue] = self.query_one("#root", Tree)
|
|
if previous:
|
|
tree.action_cursor_up()
|
|
else:
|
|
tree.action_cursor_down()
|
|
return
|
|
|
|
if self.cur_search_result:
|
|
idx = self.search_results.index(self.cur_search_result)
|
|
if previous:
|
|
idx = idx - 1 if idx > 0 else l - 1
|
|
else:
|
|
idx = idx + 1 if idx < l - 1 else 0
|
|
else:
|
|
idx = l - 1 if previous else 0
|
|
|
|
node = self.search_results[idx]
|
|
if node == self.cur_search_result:
|
|
return
|
|
|
|
self.cur_search_result = node
|
|
self.expand_and_select(node)
|
|
|
|
def action_search(self) -> None:
|
|
"""Search was chosen."""
|
|
def set_initial_focus(event: str | None) -> None:
|
|
"""Sets the focus after the SearchScreen is dismissed."""
|
|
|
|
search_label = self.query_one("#active_search", Label)
|
|
search_label.display = True if event else False
|
|
if not event:
|
|
return
|
|
event = event.lower()
|
|
search_label.update(f'Searching for events matching "{event}"')
|
|
|
|
tree: Tree[str] = self.query_one("#root", Tree)
|
|
|
|
def find_search_results(event: str, node: TreeNode[str],
|
|
cursor_seen: bool = False,
|
|
match_after_cursor: Optional[TreeNode[str]] = None
|
|
) -> Tuple[bool, Optional[TreeNode[str]]]:
|
|
"""Find nodes that match the search remembering the one after the cursor."""
|
|
if not cursor_seen and node == tree.cursor_node:
|
|
cursor_seen = True
|
|
if node.data and node.data.matches(event):
|
|
if cursor_seen and not match_after_cursor:
|
|
match_after_cursor = node
|
|
self.search_results.append(node)
|
|
|
|
if node.children:
|
|
for child in node.children:
|
|
(cursor_seen, match_after_cursor) = \
|
|
find_search_results(event, child, cursor_seen, match_after_cursor)
|
|
return (cursor_seen, match_after_cursor)
|
|
|
|
self.search_results.clear()
|
|
(_, self.cur_search_result) = find_search_results(event, tree.root)
|
|
if len(self.search_results) < 1:
|
|
self.push_screen(ErrorScreen(f"Failed to find pmu/event or metric {event}"))
|
|
search_label.display = False
|
|
elif self.cur_search_result:
|
|
self.expand_and_select(self.cur_search_result)
|
|
else:
|
|
self.set_searched_tree_node(previous=False)
|
|
|
|
self.push_screen(SearchScreen(), set_initial_focus)
|
|
|
|
def action_next(self) -> None:
|
|
"""Next was chosen."""
|
|
self.set_searched_tree_node(previous=False)
|
|
|
|
def action_prev(self) -> None:
|
|
"""Previous was chosen."""
|
|
self.set_searched_tree_node(previous=True)
|
|
|
|
def action_collapse(self) -> None:
|
|
"""Collapse the part of the tree currently on."""
|
|
tree: Tree[str] = self.query_one("#root", Tree)
|
|
node = tree.cursor_node
|
|
if node and node.parent:
|
|
node.parent.collapse_all()
|
|
node.tree.scroll_to_node(node.parent)
|
|
|
|
def update_counts(self) -> None:
|
|
"""Called every interval to update counts."""
|
|
if not self.selected or not self.evlist:
|
|
return
|
|
|
|
def update_count(cpu: int, count: int):
|
|
# Update the raw count display.
|
|
counter: Label = self.query(f"#counter_cpu{cpu}" if cpu >= 0 else "#counter_total")
|
|
if not counter:
|
|
return
|
|
counter = counter.first(Label)
|
|
counter.update(str(count))
|
|
|
|
# Update the sparkline.
|
|
line: Sparkline = self.query(f"#sparkline_cpu{cpu}" if cpu >= 0 else "#sparkline_total")
|
|
if not line:
|
|
return
|
|
line = line.first(Sparkline)
|
|
# If there are more events than the width, remove the front event.
|
|
if len(line.data) > line.size.width:
|
|
line.data.pop(0)
|
|
line.data.append(count)
|
|
line.mutate_reactive(Sparkline.data)
|
|
|
|
# Update the total and each CPU counts, assume there's just 1 evsel.
|
|
total = 0
|
|
self.evlist.disable()
|
|
for evsel in self.evlist:
|
|
for cpu in evsel.cpus():
|
|
aggr = 0
|
|
for thread in evsel.threads():
|
|
aggr += self.selected.value(self.evlist, evsel, cpu, thread)
|
|
update_count(cpu, aggr)
|
|
total += aggr
|
|
update_count(-1, total)
|
|
self.evlist.enable()
|
|
|
|
def on_mount(self) -> None:
|
|
"""When App starts set up periodic event updating."""
|
|
self.update_counts()
|
|
self.set_interval(self.interval, self.update_counts)
|
|
|
|
def set_selected(self, value: TreeValue) -> None:
|
|
"""Updates the event/description and starts the counters."""
|
|
try:
|
|
label_name = self.query_one("#event_name", Label)
|
|
event_description = self.query_one("#event_description", Static)
|
|
lines = self.query_one("#lines")
|
|
except NoMatches:
|
|
# A race with rendering, ignore the update as we can't
|
|
# mount the assumed output widgets.
|
|
return
|
|
|
|
self.selected = value
|
|
|
|
# Remove previous event information.
|
|
if self.evlist:
|
|
self.evlist.disable()
|
|
self.evlist.close()
|
|
old_lines = self.query(CounterSparkline)
|
|
for line in old_lines:
|
|
line.remove()
|
|
old_counters = self.query(Counter)
|
|
for counter in old_counters:
|
|
counter.remove()
|
|
|
|
# Update event/metric text and description.
|
|
label_name.update(value.name())
|
|
event_description.update(value.description())
|
|
|
|
# Open the event.
|
|
try:
|
|
self.evlist = value.parse()
|
|
if self.evlist:
|
|
self.evlist.open()
|
|
self.evlist.enable()
|
|
except:
|
|
self.evlist = None
|
|
|
|
if not self.evlist:
|
|
self.push_screen(ErrorScreen(f"Failed to open {value.name()}"))
|
|
return
|
|
|
|
# Add spark lines for all the CPUs. Note, must be done after
|
|
# open so that the evlist CPUs have been computed by propagate
|
|
# maps.
|
|
line = CounterSparkline(cpu=-1)
|
|
lines.mount(line)
|
|
for cpu in self.evlist.all_cpus():
|
|
line = CounterSparkline(cpu)
|
|
lines.mount(line)
|
|
line = Counter(cpu=-1)
|
|
lines.mount(line)
|
|
for cpu in self.evlist.all_cpus():
|
|
line = Counter(cpu)
|
|
lines.mount(line)
|
|
|
|
def compose(self) -> ComposeResult:
|
|
"""Draws the app."""
|
|
def metric_event_tree() -> Tree:
|
|
"""Create tree of PMUs and metricgroups with events or metrics under."""
|
|
tree: Tree[TreeValue] = Tree("Root", id="root")
|
|
pmus = tree.root.add("PMUs")
|
|
for pmu in perf.pmus():
|
|
pmu_name = pmu.name().lower()
|
|
pmu_node = pmus.add(pmu_name)
|
|
try:
|
|
for event in sorted(pmu.events(), key=lambda x: x["name"]):
|
|
if "name" in event:
|
|
e = event["name"].lower()
|
|
if "alias" in event:
|
|
pmu_node.add_leaf(f'{e} ({event["alias"]})',
|
|
data=PmuEvent(pmu_name, e))
|
|
else:
|
|
pmu_node.add_leaf(e, data=PmuEvent(pmu_name, e))
|
|
except:
|
|
# Reading events may fail with EPERM, ignore.
|
|
pass
|
|
metrics = tree.root.add("Metrics")
|
|
groups = set()
|
|
for metric in perf.metrics():
|
|
groups.update(metric["MetricGroup"])
|
|
|
|
def add_metrics_to_tree(node: TreeNode[TreeValue], parent: str):
|
|
for metric in sorted(perf.metrics(), key=lambda x: x["MetricName"]):
|
|
if parent in metric["MetricGroup"]:
|
|
name = metric["MetricName"]
|
|
node.add_leaf(name, data=Metric(name))
|
|
child_group_name = f'{name}_group'
|
|
if child_group_name in groups:
|
|
add_metrics_to_tree(node.add(child_group_name), child_group_name)
|
|
|
|
for group in sorted(groups):
|
|
if group.endswith('_group'):
|
|
continue
|
|
add_metrics_to_tree(metrics.add(group), group)
|
|
|
|
tree.root.expand()
|
|
return tree
|
|
|
|
yield Header(id="header")
|
|
yield Horizontal(Vertical(metric_event_tree(), id="events"),
|
|
Vertical(Label("event name", id="event_name"),
|
|
Static("description", markup=False, id="event_description"),
|
|
))
|
|
yield Label(id="active_search")
|
|
yield VerticalScroll(id="lines")
|
|
yield Footer(id="footer")
|
|
|
|
@on(Tree.NodeSelected)
|
|
def on_tree_node_selected(self, event: Tree.NodeSelected[TreeValue]) -> None:
|
|
"""Called when a tree node is selected, selecting the event."""
|
|
if event.node.data:
|
|
self.set_selected(event.node.data)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument('-I', '--interval', help="Counter update interval in seconds", default=0.1)
|
|
args = ap.parse_args()
|
|
app = IListApp(float(args.interval))
|
|
app.run()
|