Source code for src.interfaces
"""Collection of all interfaces"""
import json
import logging
import re
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional
import jsonschema
from git.repo import Repo
from gitlab import Gitlab
from gitlab.v4.objects import Project
from jsonschema.exceptions import SchemaError, ValidationError
from src.config import context
from src.utils import camel_to_snake
logger: logging.Logger = logging.getLogger(__name__)
[docs]
class Named:
@classmethod
@property
def name(cls) -> str:
return cls.__name__
[docs]
class CheckInterface(Named):
"""
Represents a check that can be applied to a repository
@schema: JSON schema that results of this check must adhere to
"""
# regex to match filenames to skip
exclude: Optional[re.Pattern[str]] = None
[docs]
def __init__(self, proj: Project, repo: Repo, api: Gitlab):
self.schema: Dict[str, Any] = self._load_results_schema()
self.proj: Project = proj
self.repo: Repo = repo
self.api: Gitlab = api
[docs]
def _get_resource_dir(self) -> Path:
"""
:return: The root of this check's personal resource directory.
"""
# DEFAULT section gets thrown into the following section's name space
# so this will break if I change the section order in the config file
return (
context.settings["local_repo_db_resources_dir"]
/ "checks"
/ camel_to_snake(self.name)
)
[docs]
def _load_results_schema(self) -> Dict[str, Any]:
schema_path: Path = Path(
context.settings["local_repo_db_resources_dir"]
) / Path(f"schemas/check_{self.name}_output_format.json")
logger.info(
f"Loading results schema for {self.name} from {schema_path}"
)
with schema_path.open(mode="r") as f:
return json.load(f)
[docs]
def _gen_file_list(self) -> Generator[Path, None, None]:
"""
Helper to generate a list of all _relevant_ files in a project.
Skips all files whose
- name matches the `self.exclude` pattern
- path relative to the project root matches `self.exclude_path`
pattern
:return: Iterator over all files in the project
"""
try:
logger.debug(f"Assuming fs repo base at {self.repo.working_dir}")
trees: Optional[List[Path]] = [Path(str(self.repo.working_dir))]
except ValueError as e:
logger.error(
f"Unable to obtain tree for {self.proj.id} at "
f"{self.repo}: {e}"
)
trees: Optional[List[Path]] = None
while trees:
tree: Path = trees.pop()
for f in tree.iterdir():
if self.exclude and re.search(self.exclude, f.name):
continue
if f.is_dir():
trees.append(f)
continue
logger.debug(f"{f.as_posix()}")
yield f
# pylint: disable=unused-argument
[docs]
def run(
self,
args_dict: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
logger.info(f"Running check {self.name} on {self.proj.id}")
return self.description
@property
def description(self) -> Dict[str, Any]:
return {"id": self.proj.id, "check": self.name}
[docs]
def results_valid(self, results: Dict[str, Any]) -> bool:
"""
Validates the tool-specific results of a run against
the corresponding JSON schema
returns: true iff the results match the schema
"""
logger.info("Validating check-specific results against schema")
try:
jsonschema.validate(results, self.schema)
return True
except ValidationError:
logger.error(
"Check-specific results do not conform to expected schema"
)
except SchemaError:
logger.error("Check results schema is not valid")
return False