r"""Parse bazel query outputs
A larger description of the system:
- inputs:
  - bazel query //... --output proto > query_result.pb
    - the full dependency tree
  - bazel test //... --build_event_binary_file=test_all.pb
  - bazel run //utils:bep_reader < test_all.pb
    - the execution time related to each test target
  - git_utils.get_file_commit_map_from_log
    - how files have changed over time, which can be used to estimate the
      probability of each file changing in the future
- intermediates:
  - a shared representation of source files and bazel targets
- outputs:
  - test targets:
    - likelihood of executing
    - expected value of runtime
  - source files:
    - cost in execution time of modification
    - expected cost of a file change (probability of change * cost; see the
      cost sketch near the end of this docstring)
  - a graph with the values above, so that for any set of changed files we
    can describe the cost
  - a graph from which we can identify overly depended-upon targets
- git log --since="10 years ago" --name-only --pretty=format: | sort \
  | uniq -c | sort -nr
  - this is much faster (a single git invocation plus shell post-processing)
  - could identify renames via:
    - git log --since="1 month ago" --name-status --pretty=format: \
      | grep -P 'R[0-9]*\t' | awk '{print $2, "->", $3}'
    - then correct the counts for renamed paths
  - can get commit association via:
    - git log --since="1 month ago" --name-status --pretty=format:"%H"
    - statuses are A, M, D, and R\d\d\d (rename with similarity score)
```
import re

# Regex pattern to match the git log output
pattern = r"^([AMD])\s+(.+?)(\s*->\s*(.+))?$|^R(\d+)\s+(.+?)\s*->\s*(.+)$"

# Parse each line using the regex
for line in git_log_output.strip().split('\n'):
    match = re.match(pattern, line.strip())
    if match:
        if match.group(1):  # For A, M, D statuses
            change_type = match.group(1)
            old_file = match.group(2)
            new_file = match.group(4) if match.group(4) else None
            print(f"Change type: {change_type}, Old file: {old_file}, "
                  f"New file: {new_file}")
        elif match.group(5):  # For R status (renames)
            change_type = 'R'
            similarity_index = match.group(5)
            old_file = match.group(6)
            new_file = match.group(7)
            print(f"Change type: {change_type}, Similarity index:"
                  f" {similarity_index}, Old file: {old_file}, New file:"
                  f" {new_file}")
```
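The snippet above assumes `git_log_output` already holds the raw log text; a
minimal sketch of capturing it with the command from the outline (hypothetical
glue code; the real pipeline uses git_utils.get_file_commit_map_from_log):
```
import subprocess

# Capture the raw --name-status log; %H prints each commit hash so the file
# entries that follow it can be associated with their commit.
git_log_output = subprocess.check_output(
    ["git", "log", "--since=1 month ago", "--name-status", "--pretty=format:%H"],
    text=True,
)
```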
Example Script:
repo_dir=`pwd`
file_commit_pb=$repo_dir/file_commit.pb
query_pb=$repo_dir/s_result.pb
bep_pb=$repo_dir/test_all.pb
out_gml=$repo_dir/my.gml
out_csv=$repo_dir/my.csv
out_html=$repo_dir/my.html
# Prepare data
bazel query "//... - //docs/... - //third_party/bazel/..." --output proto \
> $query_pb
bazel test //... --build_event_binary_file=$bep_pb
bazel run //apps/bazel_parser --output_groups=-mypy -- git-capture --repo-dir \
$repo_dir --days-ago 400 --file-commit-pb $file_commit_pb
# Separate step if we want build timing data
bazel clean
bazel build --noremote_accept_cached \
--experimental_execution_log_compact_file=exec_log.pb.zst \
--generate_json_trace_profile --profile=example_profile_new.json \
//...
# Would then need to process the exec_log.pb.zst file to get timing from it and
# then add to the other timing information
# Process and visualize the data
bazel run //apps/bazel_parser --output_groups=-mypy -- process \
--file-commit-pb $file_commit_pb --query-pb $query_pb --bep-pb \
$bep_pb --out-gml $out_gml --out-csv $out_csv
bazel run //apps/bazel_parser --output_groups=-mypy -- visualize \
--gml $out_gml --out-html $out_html
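
For the "expected cost of file change" output described at the top, a minimal
sketch of the arithmetic (the numbers are made up; the real values come from
the git history and test runtimes gathered above):
```
p_change = 0.2          # probability the file changes in a given commit
downstream_test_s = 45  # runtime of the tests invalidated when it changes
expected_cost_s = p_change * downstream_test_s  # 9.0s expected per commit
```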
"""
import datetime
import logging
import pathlib
import subprocess
import sys
import tempfile
import click
import networkx
import pandas
import pydantic
import yaml
from apps.bazel_parser import panel
from apps.bazel_parser import parsing
from apps.bazel_parser import refinement
from tools import bazel_utils
from tools import git_pb2
from tools import git_utils
from utils import bep_reader
logger = logging.getLogger(__name__)
PATH_TYPE = click.Path(exists=True, path_type=pathlib.Path)
OUT_PATH_TYPE = click.Path(exists=False, path_type=pathlib.Path)
class Config(pydantic.BaseModel):
    # Error if extra arguments
    model_config = pydantic.ConfigDict(extra="forbid")

    query_target: str
    test_target: str
    days_ago: int
    refinement: refinement.RefinementConfig


def load_config(config_yaml_path: pathlib.Path, overrides: dict) -> Config:
    with open(config_yaml_path, "r") as f:
        raw_data = yaml.safe_load(f)
    # Apply overrides
    raw_data.update(overrides)
    # Validates and parses
    return Config(**raw_data)

def get_config(
    config_file: pathlib.Path | None, days_ago: int | None
) -> Config:
    if config_file:
        overrides = {}
        if days_ago is not None:
            overrides["days_ago"] = days_ago
        return load_config(config_file, overrides=overrides)
    else:
        assert days_ago is not None
        return Config(
            query_target="//...",
            test_target="//...",
            days_ago=days_ago,
            refinement=refinement.RefinementConfig(
                name_patterns=[],
                class_patterns=[],
                class_pattern_to_name_patterns={},
            ),
        )
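

# A sketch of the YAML shape load_config() expects: top-level keys mirror the
# Config fields above and `refinement` mirrors RefinementConfig. The values
# here are hypothetical placeholders, not recommendations.
#
#   query_target: "//..."
#   test_target: "//..."
#   days_ago: 180
#   refinement:
#     name_patterns: []
#     class_patterns: []
#     class_pattern_to_name_patterns: {}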
@click.group()
def cli() -> None:
    """Top-level command group; subcommands are registered in __main__."""


@click.command()
@click.option("--repo-dir", type=PATH_TYPE, required=True)
@click.option("--days-ago", type=int, required=True)
@click.option("--file-commit-pb", type=OUT_PATH_TYPE, required=True)
def git_capture(
    repo_dir: pathlib.Path,
    days_ago: int,
    file_commit_pb: pathlib.Path,
) -> None:
    """Capture git change history and write it as a serialized FileCommitMap."""
    git_query_after = datetime.datetime.now() - datetime.timedelta(
        days=days_ago
    )
    file_commit_map = git_utils.get_file_commit_map_from_log(
        git_directory=repo_dir, after=git_query_after
    )
    file_commit_pb.write_bytes(
        file_commit_map.to_proto().SerializeToString(deterministic=True)
    )

@click.command()
@click.option("--query-pb", type=PATH_TYPE, required=True)
@click.option("--bep-pb", type=PATH_TYPE, required=True)
@click.option("--file-commit-pb", type=PATH_TYPE, required=True)
@click.option("--out-gml", type=OUT_PATH_TYPE, required=True)
@click.option("--out-csv", type=OUT_PATH_TYPE, required=True)
@click.option("--config-file", type=PATH_TYPE, required=False)
def process(
    query_pb: pathlib.Path,
    bep_pb: pathlib.Path,
    file_commit_pb: pathlib.Path,
    out_gml: pathlib.Path,
    out_csv: pathlib.Path,
    config_file: pathlib.Path | None,
) -> None:
    """Combine captured query, BEP, and git data into GML and CSV outputs."""
    # days_ago is unused by this command; get_config just needs some value.
    config = get_config(config_file, days_ago=28)
    logger.info("Query...")
    query_result = bazel_utils.parse_build_output(query_pb.read_bytes())
    logger.info("Runtime...")
    with bep_pb.open("rb") as bep_buf:
        label_to_runtime = bep_reader.get_label_to_runtime(bep_buf)
    logger.info("Probability...")
    file_commit_proto = git_pb2.FileCommitMap()
    file_commit_proto.ParseFromString(file_commit_pb.read_bytes())
    file_commit_map = git_utils.FileCommitMap.from_proto(file_commit_proto)
    logger.info("Get graph data")
    r = parsing.get_repo_graph_data(
        query_result=query_result,
        label_to_runtime=label_to_runtime,
        file_commit_map=file_commit_map,
    )
    logger.info("Refining...")
    refinement.full_refinement(
        repo=r,
        refinement=config.refinement,
        verbosity=refinement.Verbosity.COUNT,
    )
    r.refresh()
    graph_metrics = r.get_graph_metrics()
    logger.info("Graph metrics: %s", graph_metrics)
    # TODO: serialize graph_metrics to output file after case studies
    r.to_csv(out_csv)
    r.to_gml(out_gml)

@click.command()
@click.option("--repo-dir", type=PATH_TYPE, required=True)
@click.option("--days-ago", type=int, required=False)
@click.option("--config-file", type=PATH_TYPE, required=False)
@click.option("--out-gml", type=OUT_PATH_TYPE, required=False)
@click.option("--out-csv", type=OUT_PATH_TYPE, required=False)
def full(
    repo_dir: pathlib.Path,
    days_ago: int | None,
    config_file: pathlib.Path | None,
    out_gml: pathlib.Path | None,
    out_csv: pathlib.Path | None,
) -> None:
    """Run the whole pipeline in one shot: bazel query, bazel test, git history, process."""
    config = get_config(config_file, days_ago=days_ago)
    # Query for graph
    logger.info("Querying...")
    query_pb = subprocess.check_output(
        [
            "bazel",
            "query",
            "--notool_deps",
            "--output",
            "proto",
            config.query_target,
        ],
        cwd=repo_dir,
    )
    query_result = bazel_utils.parse_build_output(query_pb)
    # Test for timing
    logger.info("Testing...")
    with tempfile.NamedTemporaryFile() as tmpfile:
        bep_pb = tmpfile.name
        subprocess.check_call(
            [
                "bazel",
                "test",
                f"--build_event_binary_file={bep_pb}",
                config.test_target,
            ],
            cwd=repo_dir,
        )
        with open(bep_pb, "rb") as bep_buf:
            label_to_runtime = bep_reader.get_label_to_runtime(bep_buf)
    # Capture git information
    logger.info("History from git...")
    git_query_after = datetime.datetime.now() - datetime.timedelta(
        days=config.days_ago
    )
    file_commit_map = git_utils.get_file_commit_map_from_log(
        git_directory=repo_dir, after=git_query_after
    )
    logger.info("Parsing...")
    r = parsing.get_repo_graph_data(
        query_result=query_result,
        label_to_runtime=label_to_runtime,
        file_commit_map=file_commit_map,
    )
    logger.info("Refining...")
    refinement.full_refinement(
        repo=r,
        refinement=config.refinement,
        verbosity=refinement.Verbosity.COUNT,
    )
    logger.info("Outputting...")
    graph_metrics = r.get_graph_metrics()
    logger.info("Graph metrics: %s", graph_metrics)
    # TODO: serialize graph_metrics to output file after case studies
    if out_csv is not None:
        r.to_csv(out_csv)
    if out_gml is not None:
        r.to_gml(out_gml)
    logger.info("Done...")

def _emit_refinement_suggestions(df: pandas.DataFrame) -> None:
    """Flag node classes that may distort analysis and suggest class_patterns entries."""
    total_by_class = df.groupby("node_class").size().rename("total")
    seen: set[str] = set()

    def _check(
        mask: pandas.Series,
        description: str,
        suggestions: list[tuple[str, str, int, int, float]],
    ) -> None:
        flagged = df[mask & ~df["node_class"].isin(seen)]
        if len(flagged) == 0:
            return
        counts = flagged.groupby("node_class").size().rename("count")
        stats: pandas.DataFrame = pandas.concat(
            [total_by_class, counts], axis=1
        ).dropna()
        stats["pct"] = stats["count"] / stats["total"] * 100
        mask = (stats["pct"] > 50) & (stats["count"] >= 10)
        candidates = pandas.DataFrame(stats.loc[mask]).sort_values(
            by="count", ascending=False
        )
        for node_class, row in candidates.iterrows():
            seen.add(str(node_class))
            suggestions.append(
                (
                    str(node_class),
                    description,
                    int(row["count"]),
                    int(row["total"]),
                    row["pct"],
                )
            )

    suggestions: list[tuple[str, str, int, int, float]] = []
    _check(
        (df["num_ancestors"] == 0) & (df["num_descendants"] == 0),
        "fully isolated (no edges)",
        suggestions,
    )
    _check(
        (df["num_descendants"] == 0) & ~df["is_source"] & ~df["has_duration"],
        "non-source leaf (distorts ancestor scores of parents)",
        suggestions,
    )
    _check(
        (df["num_ancestors"] + df["num_descendants"] <= 2)
        & (df["num_ancestors"] + df["num_descendants"] > 0)
        & ~df["is_source"]
        & ~df["has_duration"],
        "very low connectivity — likely in a small disconnected component",
        suggestions,
    )
    if suggestions:
        click.echo("\n--- REFINEMENT SUGGESTIONS ---")
        click.echo(
            "Node classes that may distort analysis results.\n"
            "Consider adding to refinement.class_patterns in your config:"
        )
        for node_class, reason, count, total, pct in suggestions:
            click.echo(
                f" {node_class}: {count} of {total} nodes — {reason} "
                f"({pct:.0f}%)"
            )

@click.command()
@click.option("--csv", "csv_path", type=PATH_TYPE, required=True)
@click.option("--top-n", type=int, default=10, show_default=True)
def report(csv_path: pathlib.Path, top_n: int) -> None:
    """Print a text report of bottlenecks, hot source files, and expensive tests."""
    df = pandas.read_csv(csv_path)
    num_nodes = len(df)
    num_sources = df["is_source"].sum()
    num_tests = df["has_duration"].sum()
    total_duration_s = df["node_duration_s"].sum()
    expected_duration_s = (
        df["node_duration_s"] * (1 - df["group_probability_cache_hit"])
    ).sum()
    avg_nodes_per_commit = (1 - df["group_probability_cache_hit"]).sum()
    click.echo("=== BUILD GRAPH REPORT ===")
    click.echo(f"Source: {csv_path}")
    click.echo(
        f"Nodes: {num_nodes} | "
        f"Source files: {num_sources} | "
        f"Tests: {num_tests}"
    )
    click.echo(
        f"Total test duration: {total_duration_s:.1f}s | "
        f"Expected cost/commit: {expected_duration_s:.1f}s | "
        f"Avg nodes invalidated/commit: {avg_nodes_per_commit:.0f}"
    )
    click.echo("\n--- STRUCTURAL BOTTLENECKS ---")
    click.echo(
        "Nodes with both dependents and dependencies, ranked by structural\n"
        "coupling (ancestors × descendants). Splitting reduces blast radius."
    )
    mid = pandas.DataFrame(
        df.loc[(df["num_ancestors"] > 0) & (df["num_descendants"] > 0)]
    )
    for _, row in mid.nlargest(top_n, "ancestors_by_descendants").iterrows():
        click.echo(
            f" {row['node_name']} [{row['node_class']}]\n"
            f" ancestors={row['num_ancestors']}, "
            f"descendants={row['num_descendants']}, "
            f"score={int(row['ancestors_by_descendants']):,}, "
            f"expected_duration={row['expected_duration_s']:.1f}s"
        )
    click.echo("\n--- COSTLY BOTTLENECKS ---")
    click.echo(
        "Same filter, ranked by expected duration: downstream test time\n"
        "weighted by invalidation probability. Prioritizes CI cost."
    )
    for _, row in mid.nlargest(top_n, "expected_duration_s").iterrows():
        cache_pct = row["group_probability_cache_hit"] * 100
        click.echo(
            f" {row['node_name']} [{row['node_class']}]\n"
            f" expected_duration={row['expected_duration_s']:.1f}s, "
            f"cache_hit={cache_pct:.1f}%, "
            f"score={int(row['ancestors_by_descendants']):,}"
        )
    click.echo("\n--- HOT SOURCE FILES ---")
    click.echo(
        "Source files that change and trigger many downstream rebuilds.\n"
        "Isolating into narrower targets reduces blast radius."
    )
    src = pandas.DataFrame(
        df.loc[df["is_source"] & (df["node_probability_cache_hit"] < 1.0)]
    )
    for _, row in src.nlargest(top_n, "ancestors_by_group_p").iterrows():
        cache_pct = row["node_probability_cache_hit"] * 100
        click.echo(
            f" {row['node_name']}\n"
            f" change_cost={row['ancestors_by_group_p']:.1f}, "
            f"cache_hit={cache_pct:.1f}%, "
            f"downstream_tests={row['group_duration_s']:.1f}s"
        )
    click.echo("\n--- EXPENSIVE TESTS ---")
    click.echo(
        "Test targets with high expected cost (slow and frequently "
        "invalidated).\nReducing their dependencies lowers CI cost per commit."
    )
    tests = df[df["has_duration"]]
    for _, row in tests.nlargest(top_n, "expected_duration_s").iterrows():
        cache_pct = row["group_probability_cache_hit"] * 100
        click.echo(
            f" {row['node_name']}\n"
            f" duration={row['node_duration_s']:.1f}s, "
            f"cache_hit={cache_pct:.1f}%, "
            f"expected_duration={row['expected_duration_s']:.1f}s"
        )
    _emit_refinement_suggestions(df)

@click.command()
@click.option("--gml", type=PATH_TYPE, required=True)
@click.option("--out-html", type=OUT_PATH_TYPE, required=False)
def visualize(
    gml: pathlib.Path,
    out_html: pathlib.Path | None,
) -> None:
    """Load the GML graph and render the panel view, optionally saving HTML."""
    graph = networkx.read_gml(gml)
    panel.run_panel(graph=graph, html_out=out_html)


if __name__ == "__main__":
    logging.basicConfig(
        stream=sys.stderr,
        format=(
            "%(asctime)s - %(levelname)s - %(name)s:%(lineno)d -"
            " %(message)s"
        ),
        datefmt="%Y-%m-%d %H:%M:%S",
        level=logging.DEBUG,
    )
    cli.add_command(git_capture)
    cli.add_command(process)
    cli.add_command(report)
    cli.add_command(visualize)
    cli.add_command(full)
    cli()