Source code for src.dashboard

from __future__ import annotations
import csv
import logging
import json
import subprocess as sp
from pathlib import Path
from ast import literal_eval
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Tuple,
    Type,
    Iterable,
    NamedTuple,
    Set,
)
from collections import defaultdict
from enum import Enum
from itertools import chain

from src.config import context

if TYPE_CHECKING:
    from src.opencode import OpenCode

logger: logging.Logger = logging.getLogger(__name__)


class DashboardColumns(Enum):
    """Zero-based positions of the columns in a dashboard row.

    The member order follows the column order used for the dashboard CSV
    (id, name, commits, last update, branches, issues, contributors,
    users, stars, languages, lines of code).
    """

    # Project identification
    ID = 0
    NAME = 1

    # Repository activity
    COMMITS = 2
    LAST_UPDATE = 3
    BRANCHES = 4

    # Community figures
    ISSUES = 5
    CONTRIBUTORS = 6
    USERS = 7
    STARS = 8

    # Source code statistics
    LANGUAGES = 9
    LOC = 10
class Dashboard:
    """Build a CSV "dashboard" of per-repository metrics and analyse it.

    ``create_dashboard`` writes one CSV row per project yielded by
    ``OpenCode.iter_projects``. ``pl_repo`` and ``pl_loc`` read that file
    back and print programming-language shares as ``Language, Fraction``
    lines on stdout.
    """

    # File locations, read from the project settings at import time.
    dashboard_file_path: Path = context.settings["Dashboard_file"]
    pl_whitelist_path: Path = context.settings["Dashboard_pl_whitelist"]
    pl_whitelist_wiki_path: Path = context.settings[
        "Dashboard_pl_whitelist_wiki"
    ]
    # Lower-case language names that are always treated as programming
    # languages, in addition to the two whitelist files.
    pl_whitelist_manual: Set[str] = {
        "bourne shell",
        "c# generated",
        "objective-c++",
        "zsh",
        "visual basic",
        "bourne again shell",
    }
    # Names removed from the whitelist again (compared lower-cased).
    pl_blacklist_manual: Set[str] = {"HTML"}
    # Project ids skipped when reading the dashboard. 702 = Glibc
    repo_blacklist_manual: Set[int] = {702}
    # Column header and nominal type of every dashboard column, in file
    # order. Rows read back from the CSV carry strings in every field.
    dashboard_columns: List[Tuple[str, Type]] = [
        ("id", int),
        ("Name", str),
        ("Commits", int),
        ("LastUpdate", str),
        ("Branches", int),
        ("Issues", int),
        ("Contributors", int),
        ("Users", int),
        ("Stars", int),
        ("Languages", str),
        ("LOC", str),
    ]
    dashboardColumns = NamedTuple("dashboardColumns", dashboard_columns)

    def __init__(self, oc: "OpenCode") -> None:
        """Store the ``OpenCode`` handle and load the language whitelist.

        If the whitelist file does not exist yet it is generated first,
        which may in turn create the dashboard file.
        """
        self.oc: OpenCode = oc
        self._maybe_create_pl_whitelist()
        self.pl_whitelist: Set[str] = self._load_pl_whitelist()

    def _load_pl_whitelist(self) -> Set[str]:
        """Return the set of lower-case programming-language names.

        The set is the union of the generated whitelist file, the wiki
        whitelist file and ``pl_whitelist_manual``, minus the lower-cased
        entries of ``pl_blacklist_manual``.

        Raises:
            FileNotFoundError: if one of the two whitelist files is missing.
        """
        # Explicit check instead of ``assert``: assertions vanish under -O.
        for path in (self.pl_whitelist_path, self.pl_whitelist_wiki_path):
            if not path.exists():
                raise FileNotFoundError(
                    f"Programming language whitelist not found: {path}"
                )

        pl_whitelist: Set[str] = set()
        with self.pl_whitelist_path.open(
            mode="r", newline="", encoding="utf-8"
        ) as pl_whitelist_file, self.pl_whitelist_wiki_path.open(
            mode="r", newline="", encoding="utf-8"
        ) as pl_whitelist_wiki_file:
            for programming_language in chain(
                pl_whitelist_file,
                pl_whitelist_wiki_file,
                self.pl_whitelist_manual,
            ):
                # Lookups (see ``pl_loc``) and the blacklist removal below
                # always compare lower-cased names, so normalise here too;
                # otherwise mixed-case entries could never match.
                entry = programming_language.strip().lower()
                if entry:  # ignore blank lines
                    pl_whitelist.add(entry)

        for not_a_programming_language in self.pl_blacklist_manual:
            pl_whitelist.discard(not_a_programming_language.lower())
        return pl_whitelist

    def _maybe_create_pl_whitelist(self) -> bool:
        """Create the whitelist file if it does not exist yet.

        `cloc` also counts stuff that's not a PL, so a whitelist is used
        to filter its output. The whitelist is seeded with the lower-cased
        keys of the ``Languages`` column of every dashboard row; the
        dashboard itself is created first when it is missing.

        Returns:
            ``True`` if a new whitelist file was written, ``False`` if one
            already existed.
        """
        if self.pl_whitelist_path.exists():
            return False

        logger.info("Creating a fresh PL Whitelist")
        if not self.dashboard_file_path.exists():
            self.create_dashboard()

        pl_whitelist: Set[str] = set()
        for project in self._iter_dashboard():
            # The column was written as ``str(dict(...))`` by
            # ``create_dashboard``; ``literal_eval`` parses literals only.
            pl_dict: Dict[str, float] = literal_eval(project.Languages)
            pl_whitelist.update(name.lower() for name in pl_dict)

        with self.pl_whitelist_path.open(
            mode="w", newline="", encoding="utf-8"
        ) as pl_whitelist_file:
            # Sorted so that the generated file is reproducible.
            pl_whitelist_file.writelines(
                f"{name}\n" for name in sorted(pl_whitelist)
            )
        return True

    @staticmethod
    def _count_commits(repo: Any) -> int:
        """Return the commit count over all refs of ``repo``, -1 on failure."""
        try:
            completed = sp.run(
                ["git", "rev-list", "--all", "--count"],
                check=True,
                capture_output=True,
                cwd=repo.working_tree_dir,
            )
        except sp.CalledProcessError as E:
            logger.error(
                f"Failed to count commits: {E.cmd}, {E.stdout},"
                f"{E.stderr}, {E.output}"
            )
            return -1
        return int(completed.stdout.decode())

    @staticmethod
    def _last_commit_time(repo: Any) -> str:
        """Return the authored time of the first commit ``repo`` yields.

        Falls back to ``"n.a."`` when iterating the commits raises
        ``ValueError`` (presumably an empty repository -- TODO confirm).
        """
        try:
            return str(next(repo.iter_commits()).authored_datetime)
        except ValueError as E:
            logger.error(f"Failed to determine time of last commit: {E}")
            return "n.a."

    @staticmethod
    def _count_loc(repo: Any) -> str:
        """Return the JSON report of `cloc` for ``repo``, ``"{}"`` on failure.

        `cloc` runs inside the ``aldanial/cloc`` docker image with the
        working tree mounted at ``/tmp``; the whole call is wrapped in
        ``timeout`` using the configured limit.
        """
        timeout: str = f"{context.settings['Dashboard_pl_loc_timeout']}"
        try:
            completed = sp.run(
                [
                    "timeout",
                    "--kill-after=" + timeout,
                    timeout,
                    "docker",
                    "run",
                    "--rm",
                    "-v",
                    str(repo.working_tree_dir) + ":/tmp",
                    "aldanial/cloc",
                    "--processes="
                    f"{context.settings['Dashboard_pl_loc_ncpu']}",
                    "--vcs=git",
                    "--json",
                ],
                check=True,
                capture_output=True,
                cwd=repo.working_tree_dir,
            )
        except sp.CalledProcessError as E:
            logger.error(
                f"Failed to count loc: {E.cmd}, {E.stdout},"
                f"{E.stderr}, {E.output}"
            )
            return "{}"
        return str(completed.stdout.decode())

    def create_dashboard(self) -> None:
        """(Re)create the dashboard CSV with one row per project.

        An existing dashboard file is deleted first. Each row follows
        ``dashboard_columns``; failures to count commits, to read the last
        commit time or to run `cloc` are logged and replaced by ``-1``,
        ``"n.a."`` and ``"{}"`` respectively.
        """
        logger.info("Creating a fresh dashboard")
        if self.dashboard_file_path.exists():
            logger.info("Deleting old dashboard")
            self.dashboard_file_path.unlink()

        with self.dashboard_file_path.open(
            mode="w", newline="", encoding="utf-8"
        ) as f:
            writer = csv.writer(f)
            writer.writerow([name for name, _ in self.dashboard_columns])
            for num, (proj, repo) in enumerate(
                self.oc.iter_projects(), start=1
            ):
                logger.info(
                    f"Processing repo {num} of {len(self.oc.projects)}"
                )
                project_id: int = int(proj.id)
                name: str = str(proj.name_with_namespace)
                logger.info(f"{project_id} {name}")
                # List elements are evaluated left to right, i.e. in
                # column order, exactly like the individual lookups before.
                writer.writerow(
                    [
                        project_id,
                        name,
                        self._count_commits(repo),
                        self._last_commit_time(repo),
                        len(repo.branches),
                        len(proj.issues.list(get_all=True)),
                        len(proj.repository_contributors(get_all=True)),
                        len(proj.users.list(get_all=True)),
                        int(proj.star_count),
                        str(dict(proj.languages())),
                        self._count_loc(repo),
                    ]
                )

    def _iter_dashboard(self) -> Iterable[dashboardColumns]:
        """Yield the dashboard rows as ``dashboardColumns`` tuples.

        The header row is skipped, as are projects whose id is listed in
        ``repo_blacklist_manual``. All fields are strings as read from the
        CSV file. An empty file yields nothing.
        """
        # ``newline=""`` is required by the csv module so that quoted
        # fields with embedded newlines (the LOC column holds the raw
        # `cloc --json` output) are read back exactly as written.
        with self.dashboard_file_path.open(
            mode="r", newline="", encoding="utf-8"
        ) as f:
            reader = csv.reader(f)
            try:
                next(reader)
            except StopIteration:
                return
            for line in reader:
                row = self.dashboardColumns(*line)
                if int(row.id) in self.repo_blacklist_manual:
                    logger.info(
                        f"Skipping project {row.id} {row.Name} "
                        "since it is on blacklist"
                    )
                    continue
                yield row

    def pl_repo(self) -> None:
        """Print the average language share per repository.

        The ``Languages`` column of every project holds percentages per
        language. They are summed over all projects and divided by
        ``100 * <number of projects with language data>``, so the printed
        fractions add up to roughly 1. The dashboard is created first when
        it does not exist.
        """
        if not self.dashboard_file_path.exists():
            self.create_dashboard()

        proj_w_code: int = 0
        pl_sums: Dict[str, float] = defaultdict(float)
        for project in self._iter_dashboard():
            # Written as ``str(dict(...))`` by ``create_dashboard``;
            # ``literal_eval`` parses literals only.
            pl_dict: Dict[str, float] = literal_eval(project.Languages)
            if not pl_dict:
                continue
            proj_w_code += 1
            for programming_language, fraction in pl_dict.items():
                pl_sums[programming_language] += fraction

        if proj_w_code == 0:
            # Nothing to average; report an empty table instead of
            # tripping over the sanity check below.
            logger.warning("No project with language information found")
            self._pl_print_cvs([])
            return

        result: List[Tuple[str, float]] = sorted(
            [(k, v / (100 * proj_w_code)) for k, v in pl_sums.items()],
            key=lambda x: x[1],
            reverse=True,
        )
        # There is always a bit of loss, but please not too much.
        assert (
            sum(v for _, v in result) > 0.99
        ), "Language fractions do not add up to (almost) 1"
        self._pl_print_cvs(result)

    def pl_loc(self) -> None:
        """Print the language share by lines of code over all projects.

        Uses the `cloc` JSON stored in the ``LOC`` column. Languages whose
        lower-cased name is not in the whitelist are ignored, both in the
        per-language sums and in the total. Projects without usable `cloc`
        data are skipped with a warning. The dashboard is created first
        when it does not exist, mirroring ``pl_repo``.
        """
        if not self.dashboard_file_path.exists():
            self.create_dashboard()

        total_pl_whitelist_loc: int = 0
        pl_sums: Dict[str, int] = defaultdict(int)
        for project in self._iter_dashboard():
            try:
                cloc: Dict[str, Any] = json.loads(project.LOC)
            except json.JSONDecodeError:
                # E.g. an empty cell: the command succeeded but printed
                # no JSON report.
                logger.warning(f"Unparsable cLOC info for {project.Name}")
                continue
            if not cloc:
                logger.warning(f"No cLOC info for {project.Name}")
                continue

            not_pl_whitelist_loc: int = 0
            for lang, stats in cloc.items():
                # Bookkeeping entries of the report, not languages.
                if lang in {"header", "SUM"}:
                    continue
                if lang.lower() not in self.pl_whitelist:
                    logger.debug(
                        f"Skipping language {lang} as it is not in "
                        "whitelist"
                    )
                    not_pl_whitelist_loc += int(stats["code"])
                    continue
                pl_sums[lang] += int(stats["code"])
            total_pl_whitelist_loc += (
                int(cloc["SUM"]["code"]) - not_pl_whitelist_loc
            )

        if total_pl_whitelist_loc == 0:
            # Avoid a division by zero when no whitelisted code was found.
            logger.warning("No whitelisted lines of code found")
            self._pl_print_cvs([])
            return

        result: List[Tuple[str, float]] = sorted(
            [
                (lang, loc / total_pl_whitelist_loc)
                for lang, loc in pl_sums.items()
            ],
            key=lambda x: x[1],
            reverse=True,
        )
        assert (
            sum(v for _, v in result) > 0.99
        ), "LOC fractions do not add up to (almost) 1"
        self._pl_print_cvs(result)

    def _pl_print_cvs(self, result: List[Tuple[str, float]]) -> None:
        """Print ``result`` as ``Language, Fraction`` lines on stdout."""
        print("Language, Fraction")
        for language, fraction in result:
            print(f"{language}, {fraction}")

    def run(self, args_dict: Dict[str, Any]) -> None:
        """Run the analysis named in ``args_dict["analysis"]``.

        The value is looked up as a method name on this object and called
        without arguments. Without a (truthy) ``"analysis"`` entry the
        dashboard is (re)created instead.

        Raises:
            AttributeError: if no attribute with the given name exists.
        """
        analysis = args_dict.get("analysis")
        if analysis:
            getattr(self, analysis)()
        else:
            self.create_dashboard()