# Source code for apps.bazel_parser.cli

r"""Parse bazel query outputs

A larger system description:
- inputs:
  - bazel query //... --output proto > query_result.pb
    - the full dependency tree
  - bazel test //... --build_event_binary_file=test_all.pb
    - bazel run //utils:bep_reader < test_all.pb
    - the execution time related to each test target
  - git_utils.get_file_commit_map_from_log
    - how files have changed over time, can be used to generate
      probabilities of files changing in the future
- intermediates:
  - representation for source files and bazel together
- outputs:
  - test targets:
    - likelihood of executing
      - expected value of runtime
  - source files:
    - cost in execution time of modification
    - expected cost of file change (based on probability of change * cost)
  - graph with the values above, we could take any set of file inputs and
    describe cost
  - graph that we could identify overly depended upon things

- git log --since="10 years ago" --name-only --pretty=format: | sort \
        | uniq -c | sort -nr
  - this is much faster
  - could identify renames via:
    - git log --since="1 month ago" --name-status --pretty=format: \
            | grep -P 'R[0-9]*\t' | awk '{print $2, "->", $3}'
    - then correct
    - can get commit association via
      - git log --since="1 month ago" --name-status --pretty=format:"%H"
    - statuses are A,M,D,R\d\d\d
```
# Regex pattern to match the git log output
pattern = r"^([AMD])\s+(.+?)(\s*->\s*(.+))?$|^R(\d+)\s+(.+?)\s*->\s*(.+)$"
# Parse each line using the regex
for line in git_log_output.strip().split('\n'):
    match = re.match(pattern, line.strip())
    if match:
        if match.group(1):  # For A, M, D statuses
            change_type = match.group(1)
            old_file = match.group(2)
            new_file = match.group(4) if match.group(4) else None
            print(f"Change type: {change_type}, Old file: {old_file}, "
                  f"New file: {new_file}")
        elif match.group(5):  # For R status (renames)
            change_type = 'R'
            similarity_index = match.group(5)
            old_file = match.group(6)
            new_file = match.group(7)
            print(f"Change type: {change_type}, Similarity index:"
                  f" {similarity_index}, Old file: {old_file}, New file:"
                  f" {new_file}")
```

Example Script:

repo_dir=`pwd`
file_commit_pb=$repo_dir/file_commit.pb
query_pb=$repo_dir/query_result.pb
bep_pb=$repo_dir/test_all.pb
out_gml=$repo_dir/my.gml
out_csv=$repo_dir/my.csv
out_html=$repo_dir/my.html

# Prepare data
bazel query "//... - //docs/... - //third_party/bazel/..." --output proto \
        > $query_pb
bazel test //... --build_event_binary_file=$bep_pb
bazel run //apps/bazel_parser --output_groups=-mypy -- git-capture --repo-dir \
        $repo_dir --days-ago 400 --file-commit-pb $file_commit_pb
# Separate step if we want build timing data
bazel clean
bazel build --noremote_accept_cached \
    --experimental_execution_log_compact_file=exec_log.pb.zst \
    --generate_json_trace_profile --profile=example_profile_new.json \
    //...
# Would then need to process the exec_log.pb.zst file to get timing from it and
# then add to the other timing information

# Process and visualize the data
bazel run //apps/bazel_parser --output_groups=-mypy -- process \
        --file-commit-pb $file_commit_pb --query-pb $query_pb --bep-pb \
        $bep_pb --out-gml $out_gml --out-csv $out_csv
bazel run //apps/bazel_parser --output_groups=-mypy -- visualize \
        --gml $out_gml --out-html $out_html
"""

import datetime
import logging
import pathlib
import subprocess
import sys
import tempfile

import click
import networkx
import pandas
import pydantic
import yaml

from apps.bazel_parser import panel
from apps.bazel_parser import parsing
from apps.bazel_parser import refinement
from tools import bazel_utils
from tools import git_pb2
from tools import git_utils
from utils import bep_reader

# Module-level logger; configured in the __main__ block below.
logger = logging.getLogger(__name__)

# Click path types shared by all commands: input paths must already exist,
# output paths need not.
PATH_TYPE = click.Path(exists=True, path_type=pathlib.Path)
OUT_PATH_TYPE = click.Path(exists=False, path_type=pathlib.Path)
class Config(pydantic.BaseModel):  # Error if extra arguments
    """Pipeline configuration, loadable from YAML via load_config()."""

    model_config = pydantic.ConfigDict(extra="forbid")

    # Bazel target pattern passed to `bazel query`.
    query_target: str
    # Bazel target pattern passed to `bazel test`.
    test_target: str
    # How far back to look in git history for file-change statistics.
    days_ago: int
    # Graph-refinement settings (node filtering/grouping patterns).
    refinement: refinement.RefinementConfig
def load_config(config_yaml_path: pathlib.Path, overrides: dict) -> Config:
    """Load a Config from a YAML file, applying ``overrides`` on top.

    Args:
        config_yaml_path: Path to a YAML file whose keys match Config fields.
        overrides: Field values that take precedence over the file contents.

    Returns:
        A validated Config instance.

    Raises:
        pydantic.ValidationError: If the merged data does not match Config
            (including any extra keys, which Config forbids).
    """
    with open(config_yaml_path, "r") as f:
        # safe_load returns None for an empty file; fall back to {} so the
        # update below cannot raise AttributeError.
        raw_data = yaml.safe_load(f) or {}
    # Apply overrides
    raw_data.update(overrides)
    # Validates and parses
    return Config(**raw_data)
def get_config(
    config_file: pathlib.Path | None, days_ago: int | None
) -> Config:
    """Build a Config either from a YAML file or from built-in defaults.

    When ``config_file`` is given, it is loaded and ``days_ago`` (if not
    None) overrides the file's value. Otherwise ``days_ago`` is required
    and a default Config targeting ``//...`` with an empty refinement is
    returned.
    """
    if not config_file:
        assert days_ago is not None
        return Config(
            query_target="//...",
            test_target="//...",
            days_ago=days_ago,
            refinement=refinement.RefinementConfig(
                name_patterns=[],
                class_patterns=[],
                class_pattern_to_name_patterns={},
            ),
        )
    overrides = {} if days_ago is None else {"days_ago": days_ago}
    return load_config(config_file, overrides=overrides)
# Root command group; subcommands are attached via cli.add_command below.
# (No docstring on purpose: click would surface it in --help output.)
@click.group()
def cli() -> None:
    pass
# Capture per-file git commit history and serialize it to a proto file,
# for later use by `process` as change-probability input.
@click.command()
@click.option("--repo-dir", type=PATH_TYPE, required=True)
@click.option("--days-ago", type=int, required=True)
@click.option("--file-commit-pb", type=OUT_PATH_TYPE, required=True)
def git_capture(
    repo_dir: pathlib.Path,
    days_ago: int,
    file_commit_pb: pathlib.Path,
) -> None:
    # Only consider commits newer than this cutoff.
    cutoff = datetime.datetime.now() - datetime.timedelta(days=days_ago)
    commit_map = git_utils.get_file_commit_map_from_log(
        git_directory=repo_dir, after=cutoff
    )
    # Deterministic serialization keeps the output reproducible across runs.
    serialized = commit_map.to_proto().SerializeToString(deterministic=True)
    file_commit_pb.write_bytes(serialized)
# Turn previously-captured inputs (bazel query proto, BEP proto, git history
# proto) into graph outputs: a GML graph and a per-node CSV.
@click.command()
@click.option("--query-pb", type=PATH_TYPE, required=True)
@click.option("--bep-pb", type=PATH_TYPE, required=True)
@click.option("--file-commit-pb", type=PATH_TYPE, required=True)
@click.option("--out-gml", type=OUT_PATH_TYPE, required=True)
@click.option("--out-csv", type=OUT_PATH_TYPE, required=True)
@click.option("--config-file", type=PATH_TYPE, required=False)
def process(
    query_pb: pathlib.Path,
    bep_pb: pathlib.Path,
    file_commit_pb: pathlib.Path,
    out_gml: pathlib.Path,
    out_csv: pathlib.Path,
    config_file: pathlib.Path | None,
) -> None:
    # days_ago is unused here, but just placing to get a value
    config = get_config(config_file, days_ago=28)

    logger.info("Query...")
    query_result = bazel_utils.parse_build_output(query_pb.read_bytes())

    logger.info("Runtime...")
    with bep_pb.open("rb") as bep_stream:
        label_to_runtime = bep_reader.get_label_to_runtime(bep_stream)

    logger.info("Probability...")
    commit_proto = git_pb2.FileCommitMap()
    commit_proto.ParseFromString(file_commit_pb.read_bytes())
    file_commit_map = git_utils.FileCommitMap.from_proto(commit_proto)

    logger.info("Get graph data")
    repo = parsing.get_repo_graph_data(
        query_result=query_result,
        label_to_runtime=label_to_runtime,
        file_commit_map=file_commit_map,
    )

    logger.info("Refining...")
    refinement.full_refinement(
        repo=repo,
        refinement=config.refinement,
        verbosity=refinement.Verbosity.COUNT,
    )
    repo.refresh()

    graph_metrics = repo.get_graph_metrics()
    logger.info("Graph metrics: %s", graph_metrics)
    # TODO: serialize graph_metrics to output file after case studies
    repo.to_csv(out_csv)
    repo.to_gml(out_gml)
# Run the whole pipeline against a live repository in one step: bazel query,
# bazel test (for timing), git history capture, graph parsing, refinement,
# and optional GML/CSV output.
@click.command()
@click.option("--repo-dir", type=PATH_TYPE, required=True)
@click.option("--days-ago", type=int, required=False)
@click.option("--config-file", type=PATH_TYPE, required=False)
@click.option("--out-gml", type=OUT_PATH_TYPE, required=False)
@click.option("--out-csv", type=OUT_PATH_TYPE, required=False)
def full(
    repo_dir: pathlib.Path,
    days_ago: int | None,
    config_file: pathlib.Path | None,
    out_gml: pathlib.Path | None,
    out_csv: pathlib.Path | None,
) -> None:
    # NOTE: get_config asserts days_ago is not None when no config file is
    # given, so one of --days-ago / --config-file is effectively required.
    config = get_config(config_file, days_ago=days_ago)
    # Query for graph
    logger.info("Querying...")
    query_pb = subprocess.check_output(
        [
            "bazel",
            "query",
            "--notool_deps",
            "--output",
            "proto",
            config.query_target,
        ],
        cwd=repo_dir,
    )
    query_result = bazel_utils.parse_build_output(query_pb)
    # Test for timing
    logger.info("Testing...")
    # Use a temporary directory rather than NamedTemporaryFile: the file is
    # written by bazel (a separate process) and then reopened by name, and
    # reopening a NamedTemporaryFile while it is still open is not portable
    # (it fails on Windows).
    with tempfile.TemporaryDirectory() as tmpdir:
        bep_pb = pathlib.Path(tmpdir) / "bep.pb"
        subprocess.check_call(
            [
                "bazel",
                "test",
                f"--build_event_binary_file={bep_pb}",
                config.test_target,
            ],
            cwd=repo_dir,
        )
        with bep_pb.open("rb") as bep_buf:
            label_to_runtime = bep_reader.get_label_to_runtime(bep_buf)
    # Capture git information
    logger.info("History from git...")
    git_query_after = datetime.datetime.now() - datetime.timedelta(
        days=config.days_ago
    )
    file_commit_map = git_utils.get_file_commit_map_from_log(
        git_directory=repo_dir, after=git_query_after
    )
    logger.info("Parsing...")
    r = parsing.get_repo_graph_data(
        query_result=query_result,
        label_to_runtime=label_to_runtime,
        file_commit_map=file_commit_map,
    )
    logger.info("Refining...")
    refinement.full_refinement(
        repo=r,
        refinement=config.refinement,
        verbosity=refinement.Verbosity.COUNT,
    )
    logger.info("Outputting...")
    graph_metrics = r.get_graph_metrics()
    logger.info("Graph metrics: %s", graph_metrics)
    # TODO: serialize graph_metrics to output file after case studies
    if out_csv is not None:
        r.to_csv(out_csv)
    if out_gml is not None:
        r.to_gml(out_gml)
    logger.info("Done...")
def _emit_refinement_suggestions(df: pandas.DataFrame) -> None: total_by_class = df.groupby("node_class").size().rename("total") seen: set[str] = set() def _check( mask: pandas.Series, description: str, suggestions: list[tuple[str, str, int, int, float]], ) -> None: flagged = df[mask & ~df["node_class"].isin(seen)] if len(flagged) == 0: return counts = flagged.groupby("node_class").size().rename("count") stats: pandas.DataFrame = pandas.concat( [total_by_class, counts], axis=1 ).dropna() stats["pct"] = stats["count"] / stats["total"] * 100 mask = (stats["pct"] > 50) & (stats["count"] >= 10) candidates = pandas.DataFrame(stats.loc[mask]).sort_values( by="count", ascending=False ) for node_class, row in candidates.iterrows(): seen.add(str(node_class)) suggestions.append( ( str(node_class), description, int(row["count"]), int(row["total"]), row["pct"], ) ) suggestions: list[tuple[str, str, int, int, float]] = [] _check( (df["num_ancestors"] == 0) & (df["num_descendants"] == 0), "fully isolated (no edges)", suggestions, ) _check( (df["num_descendants"] == 0) & ~df["is_source"] & ~df["has_duration"], "non-source leaf (distorts ancestor scores of parents)", suggestions, ) _check( (df["num_ancestors"] + df["num_descendants"] <= 2) & (df["num_ancestors"] + df["num_descendants"] > 0) & ~df["is_source"] & ~df["has_duration"], "very low connectivity — likely in a small disconnected component", suggestions, ) if suggestions: click.echo("\n--- REFINEMENT SUGGESTIONS ---") click.echo( "Node classes that may distort analysis results.\n" "Consider adding to refinement.class_patterns in your config:" ) for node_class, reason, count, total, pct in suggestions: click.echo( f" {node_class}: {count} of {total} nodes — {reason} " f"({pct:.0f}%)" ) @click.command() @click.option("--csv", "csv_path", type=PATH_TYPE, required=True) @click.option("--top-n", type=int, default=10, show_default=True)
def report(csv_path: pathlib.Path, top_n: int) -> None:
    # Print a human-readable summary of the per-node CSV produced by
    # `process`/`full`: headline stats, then top-N rankings of structural
    # bottlenecks, costly bottlenecks, hot source files, and expensive
    # tests, followed by refinement suggestions.
    df = pandas.read_csv(csv_path)
    # Headline aggregates over the whole graph.
    num_nodes = len(df)
    num_sources = df["is_source"].sum()
    num_tests = df["has_duration"].sum()
    total_duration_s = df["node_duration_s"].sum()
    # Duration weighted by invalidation probability (1 - cache-hit prob).
    expected_duration_s = (
        df["node_duration_s"] * (1 - df["group_probability_cache_hit"])
    ).sum()
    avg_nodes_per_commit = (1 - df["group_probability_cache_hit"]).sum()
    click.echo("=== BUILD GRAPH REPORT ===")
    click.echo(f"Source: {csv_path}")
    click.echo(
        f"Nodes: {num_nodes} | "
        f"Source files: {num_sources} | "
        f"Tests: {num_tests}"
    )
    click.echo(
        f"Total test duration: {total_duration_s:.1f}s | "
        f"Expected cost/commit: {expected_duration_s:.1f}s | "
        f"Avg nodes invalidated/commit: {avg_nodes_per_commit:.0f}"
    )
    click.echo("\n--- STRUCTURAL BOTTLENECKS ---")
    click.echo(
        "Nodes with both dependents and dependencies, ranked by structural\n"
        "coupling (ancestors × descendants). Splitting reduces blast radius."
    )
    # "Middle" nodes: neither pure roots nor pure leaves.
    mid = pandas.DataFrame(
        df.loc[(df["num_ancestors"] > 0) & (df["num_descendants"] > 0)]
    )
    for _, row in mid.nlargest(top_n, "ancestors_by_descendants").iterrows():
        click.echo(
            f" {row['node_name']} [{row['node_class']}]\n"
            f" ancestors={row['num_ancestors']}, "
            f"descendants={row['num_descendants']}, "
            f"score={int(row['ancestors_by_descendants']):,}, "
            f"expected_duration={row['expected_duration_s']:.1f}s"
        )
    click.echo("\n--- COSTLY BOTTLENECKS ---")
    click.echo(
        "Same filter, ranked by expected duration: downstream test time\n"
        "weighted by invalidation probability. Prioritizes CI cost."
    )
    for _, row in mid.nlargest(top_n, "expected_duration_s").iterrows():
        cache_pct = row["group_probability_cache_hit"] * 100
        click.echo(
            f" {row['node_name']} [{row['node_class']}]\n"
            f" expected_duration={row['expected_duration_s']:.1f}s, "
            f"cache_hit={cache_pct:.1f}%, "
            f"score={int(row['ancestors_by_descendants']):,}"
        )
    click.echo("\n--- HOT SOURCE FILES ---")
    click.echo(
        "Source files that change and trigger many downstream rebuilds.\n"
        "Isolating into narrower targets reduces blast radius."
    )
    # Source files that actually change (cache-hit probability below 1).
    src = pandas.DataFrame(
        df.loc[df["is_source"] & (df["node_probability_cache_hit"] < 1.0)]
    )
    for _, row in src.nlargest(top_n, "ancestors_by_group_p").iterrows():
        cache_pct = row["node_probability_cache_hit"] * 100
        click.echo(
            f" {row['node_name']}\n"
            f" change_cost={row['ancestors_by_group_p']:.1f}, "
            f"cache_hit={cache_pct:.1f}%, "
            f"downstream_tests={row['group_duration_s']:.1f}s"
        )
    click.echo("\n--- EXPENSIVE TESTS ---")
    click.echo(
        "Test targets with high expected cost (slow and frequently "
        "invalidated).\nReducing their dependencies lowers CI cost per commit."
    )
    tests = df[df["has_duration"]]
    for _, row in tests.nlargest(top_n, "expected_duration_s").iterrows():
        cache_pct = row["group_probability_cache_hit"] * 100
        click.echo(
            f" {row['node_name']}\n"
            f" duration={row['node_duration_s']:.1f}s, "
            f"cache_hit={cache_pct:.1f}%, "
            f"expected_duration={row['expected_duration_s']:.1f}s"
        )
    _emit_refinement_suggestions(df)
# Load a previously-exported GML graph and display it in the interactive
# panel view (optionally also exporting HTML).
@click.command()
@click.option("--gml", type=PATH_TYPE, required=True)
@click.option("--out-html", type=OUT_PATH_TYPE, required=False)
def visualize(
    gml: pathlib.Path,
    out_html: pathlib.Path | None,
) -> None:
    loaded_graph = networkx.read_gml(gml)
    panel.run_panel(graph=loaded_graph, html_out=out_html)
# Register subcommands at import time so that `cli` exposes them even when
# this module is imported (e.g. by a console-script entry point) rather than
# executed directly. Previously registration happened only under the
# __main__ guard, leaving the group empty for importers.
cli.add_command(git_capture)
cli.add_command(process)
cli.add_command(report)
cli.add_command(visualize)
cli.add_command(full)

if __name__ == "__main__":
    # Verbose logging to stderr so stdout stays clean for command output.
    logging.basicConfig(
        stream=sys.stderr,
        format=(
            "%(asctime)s - %(levelname)s - %(name)s:%(lineno)d -"
            " %(message)s"
        ),
        datefmt="%Y-%m-%d %H:%M:%S",
        level=logging.DEBUG,
    )
    cli()