from __future__ import annotations
import csv
import logging
import json
import subprocess as sp
from pathlib import Path
from ast import literal_eval
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Tuple,
Type,
Iterable,
NamedTuple,
Set,
)
from collections import defaultdict
from enum import Enum
from itertools import chain
from src.config import context
if TYPE_CHECKING:
from src.opencode import OpenCode
logger: logging.Logger = logging.getLogger(__name__)
class DashboardColumns(Enum):
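    """Column indices of the dashboard CSV, in the order the columns are written."""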
ID = 0
NAME = 1
COMMITS = 2
LAST_UPDATE = 3
BRANCHES = 4
ISSUES = 5
CONTRIBUTORS = 6
USERS = 7
STARS = 8
LANGUAGES = 9
LOC = 10
class Dashboard:
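    """Build a CSV dashboard of per-project statistics and derive
    programming-language distributions (per repository and per LOC) from it."""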
dashboard_file_path: Path = context.settings["Dashboard_file"]
pl_whitelist_path: Path = context.settings["Dashboard_pl_whitelist"]
pl_whitelist_wiki_path: Path = context.settings[
"Dashboard_pl_whitelist_wiki"
]
    pl_whitelist_manual: Set[str] = {
        "bourne shell",
        "c# generated",
        "objective-c++",
        "zsh",
        "visual basic",
        "bourne again shell",
    }
    pl_blacklist_manual: Set[str] = {"HTML"}
    # 702 = Glibc
    repo_blacklist_manual: Set[int] = {702}
dashboard_columns: List[Tuple[str, Type]] = [
("id", int),
("Name", str),
("Commits", int),
("LastUpdate", str),
("Branches", int),
("Issues", int),
("Contributors", int),
("Users", int),
("Stars", int),
("Languages", str),
("LOC", str),
]
dashboardColumns = NamedTuple("dashboardColumns", dashboard_columns)
def __init__(self, oc: "OpenCode") -> None:
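        """Make sure the PL whitelist exists, then load it."""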
self.oc: OpenCode = oc
self._maybe_create_pl_whitelist()
self.pl_whitelist: Set[str] = self._load_pl_whitelist()
def _load_pl_whitelist(self) -> Set[str]:
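        """Merge the generated, wiki and manual whitelists into one set and
        remove every entry that is on the manual blacklist."""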
pl_whitelist: Set[str] = set()
assert self.pl_whitelist_path.exists()
assert self.pl_whitelist_wiki_path.exists()
with self.pl_whitelist_path.open(
mode="r", newline="", encoding="utf-8"
) as pl_whitelist_file, self.pl_whitelist_wiki_path.open(
mode="r", newline="", encoding="utf-8"
) as pl_whitelist_wiki_file:
for programming_language in chain(
pl_whitelist_file,
pl_whitelist_wiki_file,
self.pl_whitelist_manual,
):
pl_whitelist.add(programming_language.strip())
for not_a_programming_language in self.pl_blacklist_manual:
if not_a_programming_language.lower() in pl_whitelist:
pl_whitelist.remove(not_a_programming_language.lower())
return pl_whitelist
def _maybe_create_pl_whitelist(self) -> bool:
"""`cloc` also counts stuff that's not a PL, use a whitelist to
filter its output"""
if self.pl_whitelist_path.exists():
return False
logger.info("Creating a fresh PL Whitelist")
if not self.dashboard_file_path.exists():
self.create_dashboard()
pl_whitelist: Set[str] = set()
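        # Seed the whitelist from the Languages column of the existing dashboard.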
for project in self._iter_dashboard():
            # The Languages column stores a repr()'d dict; literal_eval parses
            # it back without executing arbitrary code.
pl_dict: Dict[str, float] = literal_eval(project.Languages)
for programming_language in pl_dict.keys():
pl_whitelist.add(programming_language.lower())
with self.pl_whitelist_path.open(
mode="w", newline="", encoding="utf-8"
) as pl_whitelist_file:
for programming_language in pl_whitelist:
pl_whitelist_file.write(programming_language + "\n")
return True
# pylint: disable=too-complex
def create_dashboard(self) -> None:
# pylint: disable=too-many-locals
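        """Collect statistics for every project via the API and the local
        clone, then write them to the dashboard CSV, overwriting any
        existing file."""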
logger.info("Creating a fresh dashboard")
if self.dashboard_file_path.exists():
logger.info("Deleting old dashboard")
self.dashboard_file_path.unlink()
with self.dashboard_file_path.open(
mode="w", newline="", encoding="utf-8"
) as f:
num = 1
writer = csv.writer(f)
writer.writerow(
[name for name, _ in self.dashboard_columns]
)
for proj, repo in self.oc.iter_projects():
logger.info(
f"Processing repo {num} of {len(self.oc.projects)}"
)
num += 1
# id
_id: int = int(proj.id)
# Name
name: str = str(proj.name_with_namespace)
logger.info(f"{_id} {name}")
# Commits
try:
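                    # Count all commits reachable from any ref in the local clone.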
num_commits: int = int(
sp.run(
["git", "rev-list", "--all", "--count"],
check=True,
capture_output=True,
cwd=repo.working_tree_dir,
).stdout.decode()
)
                except sp.CalledProcessError as E:
                    logger.error(
                        f"Failed to count commits: {E.cmd}, {E.stdout}, "
                        f"{E.stderr}, {E.output}"
                    )
                    num_commits = -1
# Last commit
try:
commit_time: str = str(
next(repo.iter_commits()).authored_datetime
)
except ValueError as E:
logger.error(
f"Failed to determine time of last commit: {E}"
)
commit_time: str = "n.a."
# Branches
num_branches: int = len(repo.branches)
# Issues
num_issues: int = len(proj.issues.list(get_all=True))
# Contributors
num_contrib: int = len(
proj.repository_contributors(get_all=True)
)
# Users
num_users: int = len(proj.users.list(get_all=True))
# Stars
num_stars: int = int(proj.star_count)
# Languages
langs: str = str(dict(proj.languages()))
# LOC
try:
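                    # Run cloc in a Docker container, bounded by the external
                    # `timeout` command so a hung run cannot stall the build.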
loc: str = str(
sp.run(
[
"timeout",
"--kill-after="
f"{context.settings['Dashboard_pl_loc_timeout']}",
f"{context.settings['Dashboard_pl_loc_timeout']}",
"docker",
"run",
"--rm",
"-v",
str(repo.working_tree_dir) + ":/tmp",
"aldanial/cloc",
"--processes="
f"{context.settings['Dashboard_pl_loc_ncpu']}",
"--vcs=git",
"--json",
],
check=True,
capture_output=True,
cwd=repo.working_tree_dir,
).stdout.decode()
)
                except sp.CalledProcessError as E:
                    logger.error(
                        f"Failed to count loc: {E.cmd}, {E.stdout}, "
                        f"{E.stderr}, {E.output}"
                    )
                    loc = "{}"
writer.writerow(
[
_id,
name,
num_commits,
commit_time,
num_branches,
num_issues,
num_contrib,
num_users,
num_stars,
langs,
loc,
]
)
def _iter_dashboard(self) -> Iterable[dashboardColumns]:
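        """Yield dashboard rows as ``dashboardColumns`` tuples, skipping the
        CSV header and every project on the repository blacklist."""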
with self.dashboard_file_path.open(
mode="r", encoding="utf-8"
) as f:
reader = csv.reader(f)
try:
next(reader)
except StopIteration:
return
for line in reader:
tmp = self.dashboardColumns(*line)
if int(tmp.id) in self.repo_blacklist_manual:
logger.info(
f"Skipping project {tmp.id} {tmp.Name} "
"since it is on blacklist"
)
else:
yield tmp
def pl_repo(self) -> None:
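        """Print the language distribution per repository: each project's
        language fractions are averaged over all projects that contain code."""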
if not self.dashboard_file_path.exists():
self.create_dashboard()
proj_w_code: int = 0
        pl_sums: Dict[str, float] = defaultdict(float)
for project in self._iter_dashboard():
            # The Languages column stores a repr()'d dict; literal_eval parses
            # it back without executing arbitrary code.
pl_dict: Dict[str, float] = literal_eval(project.Languages)
            if pl_dict:
proj_w_code += 1
for programming_language, fraction in pl_dict.items():
pl_sums[programming_language] += fraction
result: List[Tuple[str, float]] = sorted(
[(k, v / (100 * proj_w_code)) for k, v in pl_sums.items()],
key=lambda x: x[1],
reverse=True,
)
        # A little loss is always expected, but please not too much.
assert sum(v for _, v in result) > 0.99
self._pl_print_cvs(result)
def pl_loc(self) -> None:
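        """Print the language distribution weighted by lines of code,
        counting only languages on the whitelist."""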
total_pl_whitelist_loc: int = 0
        pl_sums: Dict[str, int] = defaultdict(int)
for project in self._iter_dashboard():
not_pl_whitelist_loc: int = 0
cloc: Dict[str, Any] = json.loads(project.LOC)
            if not cloc:
logger.warning(f"No cLOC info for {project.Name}")
continue
for lang, stats in cloc.items():
if lang in {"header", "SUM"}:
continue
if lang.lower() not in self.pl_whitelist:
logger.debug(
f"Skipping language {lang} as it is not in "
"whitelist"
)
not_pl_whitelist_loc += int(stats["code"])
continue
pl_sums[lang] += int(stats["code"])
total_pl_whitelist_loc += (
int(cloc["SUM"]["code"]) - not_pl_whitelist_loc
)
result: List[Tuple[str, float]] = sorted(
[
(lang, loc / total_pl_whitelist_loc)
for lang, loc in pl_sums.items()
],
key=lambda x: x[1],
reverse=True,
)
assert sum(v for _, v in result) > 0.99
self._pl_print_cvs(result)
def _pl_print_cvs(self, result: List[Tuple[str, float]]) -> None:
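        """Print the ``(language, fraction)`` pairs as CSV on stdout."""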
print("Language, Fraction")
        for lang, fraction in result:
            print(f"{lang}, {fraction}")
def run(self, args_dict: Dict[str, Any]) -> None:
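        """Dispatch to the analysis named in ``args_dict["analysis"]``;
        rebuild the dashboard if no analysis is given."""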
        analysis = args_dict.get("analysis")
        if analysis:
            getattr(self, analysis)()
else:
self.create_dashboard()