Softwaretechnik-II/project/main.py

import os
import sys
import requests
import random
import zipfile
import io
import subprocess
import time
import shutil
from dotenv import load_dotenv

# === LOAD CONFIGURATION FROM .env ===
if not os.path.exists(".env"):
    print("Warning: .env file not found!")
load_dotenv()
GITHUB_API_URL = os.getenv("GITHUB_API_URL", "https://api.github.com")
SONAR_URL = os.getenv("SONAR_URL", "http://localhost:9000")
SONAR_TOKEN = os.getenv("SONAR_TOKEN")
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
HEADERS = {"Authorization": f"token {GITHUB_TOKEN}"} if GITHUB_TOKEN else {}
EXTRACT_DIR = "project_repo" # Directory to store the extracted repo
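
# Example .env for the variables read above (values are illustrative
# placeholders, not real credentials; the two URLs match the code defaults):
#   GITHUB_API_URL=https://api.github.com
#   SONAR_URL=http://localhost:9000
#   SONAR_TOKEN=<SonarQube user token>
#   GITHUB_TOKEN=<GitHub personal access token>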
# === SUPPORTED LANGUAGES ===
LANGUAGES = ["PHP", "JavaScript", "Python"]

def check_github_rate_limit():
    """Check GitHub API rate limits."""
    response = requests.get(f"{GITHUB_API_URL}/rate_limit", headers=HEADERS)
    if response.status_code == 403:
        print("GitHub API rate limit exceeded. Try again later.")
        sys.exit(1)
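
# Picks a random repository via the GitHub search API. Note: without a
# per_page/page parameter the search endpoint returns only the first page
# (30 results by default), so the choice is random among the top-starred
# matches only.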
def get_random_repo(language, min_stars=0, max_stars=100000):
    search_url = (
        f"{GITHUB_API_URL}/search/repositories"
        f"?q=language:{language}+stars:{min_stars}..{max_stars}&sort=stars&order=desc"
    )
    response = requests.get(search_url, headers=HEADERS).json()
    if "items" not in response or not response["items"]:
        print("Error fetching repositories:", response)
        return None
    return random.choice(response["items"])["full_name"]
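
# Looks up the repository's default branch; falls back to "main" if the
# repos endpoint does not report one.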
def get_default_branch(repo):
    url = f"{GITHUB_API_URL}/repos/{repo}"
    response = requests.get(url, headers=HEADERS).json()
    return response.get("default_branch", "main")
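
# Downloads the default branch as a ZIP archive and extracts it into
# EXTRACT_DIR. GitHub archives contain a single top-level folder named
# <repo>-<branch>, which is why os.listdir(EXTRACT_DIR)[0] yields the
# project root.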
def download_and_extract_repo(repo):
    branch = get_default_branch(repo)
    zip_url = f"https://github.com/{repo}/archive/refs/heads/{branch}.zip"
    response = requests.get(zip_url, headers=HEADERS)
    if response.status_code != 200:
        print(f"Error downloading repository {repo}. HTTP {response.status_code}: {response.text}")
        return None
    if os.path.exists(EXTRACT_DIR):
        shutil.rmtree(EXTRACT_DIR)
    with zipfile.ZipFile(io.BytesIO(response.content), "r") as zip_ref:
        zip_ref.extractall(EXTRACT_DIR)
    return os.path.join(EXTRACT_DIR, os.listdir(EXTRACT_DIR)[0])
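
# Recursively collects all files under project_path whose names end with one
# of the given extensions (e.g. [".py"]).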
def get_source_files(project_path, extensions):
    source_files = []
    for root, _, files in os.walk(project_path):
        for file in files:
            if file.endswith(tuple(extensions)):
                source_files.append(os.path.join(root, file))
    return source_files
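
# Invokes the sonar-scanner CLI against the extracted sources. Note that
# newer SonarQube versions deprecate sonar.login in favor of sonar.token;
# the flag is kept here as written for compatibility with the configured
# server.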
def run_sonar_scanner(project_path, project_key):
    if shutil.which("sonar-scanner") is None:
        print("Error: sonar-scanner is not installed or not on PATH.")
        return
    sonar_cmd = [
        "sonar-scanner",
        f"-Dsonar.projectKey={project_key}",
        f"-Dsonar.sources={project_path}",
        f"-Dsonar.host.url={SONAR_URL}",
        f"-Dsonar.login={SONAR_TOKEN}",
        "-Dsonar.scm.disabled=true",
        "-Dsonar.exclusions=**/*.md"
    ]
    print("Running SonarScanner...")
    try:
        subprocess.run(sonar_cmd, check=True)
    except subprocess.CalledProcessError as e:
        print(f"SonarScanner failed; continuing anyway: {e}")
def get_latest_task_id(project_key):
    url = f"{SONAR_URL}/api/ce/component?component={project_key}"
    headers = {"Authorization": f"Bearer {SONAR_TOKEN}"}
    response = requests.get(url, headers=headers).json()
    return response.get("current", {}).get("id")
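
# Polls /api/ce/task until the background task reports SUCCESS (or fails),
# waiting up to max_retries * delay seconds in total.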
def wait_for_sonar_analysis(task_id, max_retries=15, delay=10):
    url = f"{SONAR_URL}/api/ce/task?id={task_id}"
    headers = {"Authorization": f"Bearer {SONAR_TOKEN}"}
    for _ in range(max_retries):
        response = requests.get(url, headers=headers).json()
        status = response.get("task", {}).get("status", "PENDING")
        if status == "SUCCESS":
            print("SonarQube analysis completed successfully.")
            return True
        if status in ("FAILED", "CANCELED"):
            print(f"SonarQube analysis ended with status {status}.")
            return False
        time.sleep(delay)
    print("SonarQube analysis did not complete in time.")
    return False
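
# Waits for the analysis to finish, then reads project measures from
# /api/measures/component. The web API returns measure values as strings,
# hence the int() casts on the returned counts.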
def get_sonar_metrics(project_key):
    task_id = get_latest_task_id(project_key)
    if not task_id or not wait_for_sonar_analysis(task_id):
        return 0, 0
    url = f"{SONAR_URL}/api/measures/component"
    metric_keys = "code_smells,bugs,vulnerabilities,security_hotspots,duplicated_lines_density,sqale_index,ncloc"
    params = {"component": project_key, "metricKeys": metric_keys}
    headers = {"Authorization": f"Bearer {SONAR_TOKEN}"}
    response = requests.get(url, params=params, headers=headers)
    data = response.json()
    if "component" in data and "measures" in data["component"]:
        metrics = {m["metric"]: m.get("value", 0) for m in data["component"]["measures"]}
        print("SonarQube Metrics:")
        for key, value in metrics.items():
            print(f"  {key}: {value}")
        return int(metrics.get("code_smells", 0)), int(metrics.get("ncloc", 0))
    print("Failed to fetch SonarQube metrics.")
    return 0, 0
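
# Pipeline: pick a random repo, download and extract it, run SonarScanner,
# pull the resulting metrics, and clean up the working directory.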
if __name__ == "__main__":
    check_github_rate_limit()
    selected_language = random.choice(LANGUAGES)
    repo_name = get_random_repo(selected_language, min_stars=50, max_stars=10000)
    if repo_name:
        print(f"Selected {selected_language} Repository: {repo_name}")
        project_path = download_and_extract_repo(repo_name)
        if project_path:
            print(f"Extracted to: {project_path}")
            extensions = {"PHP": [".php"], "JavaScript": [".js"], "Python": [".py"]}
            source_files = get_source_files(project_path, extensions[selected_language])
            print(f"Found {len(source_files)} {selected_language} files")
            project_key = repo_name.replace("/", "_")
            run_sonar_scanner(project_path, project_key)
            bad_smells, total_lines = get_sonar_metrics(project_key)
            print(f"Total Bad Smells Detected: {bad_smells}")
            print(f"Total Lines of Code: {total_lines}")
            shutil.rmtree(EXTRACT_DIR, ignore_errors=True)
            print("Cleanup completed!")