import os
import io
import random
import shutil
import subprocess
import time
import zipfile

import requests
from dotenv import load_dotenv

# === LOAD CONFIGURATION FROM .env ===
if not os.path.exists(".env"):
    print("Warning: .env file not found!")
load_dotenv()

GITHUB_API_URL = os.getenv("GITHUB_API_URL", "https://api.github.com")
SONAR_URL = os.getenv("SONAR_URL", "http://localhost:9000")
SONAR_TOKEN = os.getenv("SONAR_TOKEN")
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
HEADERS = {"Authorization": f"token {GITHUB_TOKEN}"} if GITHUB_TOKEN else {}
EXTRACT_DIR = "project_repo"  # Directory to store the extracted repo

# === SUPPORTED LANGUAGES ===
LANGUAGES = ["PHP", "JavaScript", "Python"]


def check_github_rate_limit():
    """Exit early if the GitHub API rate limit has been exceeded."""
    response = requests.get(f"{GITHUB_API_URL}/rate_limit", headers=HEADERS)
    if response.status_code == 403:
        print("GitHub API rate limit exceeded. Try again later.")
        exit(1)


def get_random_repo(language, min_stars=0, max_stars=100000):
    """Pick a random repository in the given language within the star range."""
    search_url = (
        f"{GITHUB_API_URL}/search/repositories"
        f"?q=language:{language}+stars:{min_stars}..{max_stars}&sort=stars&order=desc"
    )
    response = requests.get(search_url, headers=HEADERS).json()
    if "items" not in response:
        print("Error fetching repositories:", response)
        return None
    repo_list = response["items"]
    if not repo_list:
        print("No repositories found for the given criteria.")
        return None
    return random.choice(repo_list)["full_name"]


def get_default_branch(repo):
    """Return the repository's default branch, falling back to 'main'."""
    url = f"{GITHUB_API_URL}/repos/{repo}"
    response = requests.get(url, headers=HEADERS).json()
    return response.get("default_branch", "main")


def download_and_extract_repo(repo):
    """Download the repo's default-branch archive and extract it locally."""
    branch = get_default_branch(repo)
    zip_url = f"https://github.com/{repo}/archive/refs/heads/{branch}.zip"
    response = requests.get(zip_url, headers=HEADERS)
    if response.status_code != 200:
        print(f"Error downloading repository {repo}. HTTP {response.status_code}: {response.text}")
        return None
    if os.path.exists(EXTRACT_DIR):
        shutil.rmtree(EXTRACT_DIR)
    with zipfile.ZipFile(io.BytesIO(response.content), "r") as zip_ref:
        zip_ref.extractall(EXTRACT_DIR)
    # The archive unpacks into a single top-level directory named <repo>-<branch>.
    return os.path.join(EXTRACT_DIR, os.listdir(EXTRACT_DIR)[0])


def get_source_files(project_path, extensions):
    """Collect all files under project_path matching the given extensions."""
    source_files = []
    for root, _, files in os.walk(project_path):
        for file in files:
            if file.endswith(tuple(extensions)):
                source_files.append(os.path.join(root, file))
    return source_files


def run_sonar_scanner(project_path, project_key):
    """Run sonar-scanner on the extracted project."""
    if shutil.which("sonar-scanner") is None:
        print("Error: SonarScanner is not installed.")
        return
    sonar_cmd = [
        "sonar-scanner",
        f"-Dsonar.projectKey={project_key}",
        f"-Dsonar.sources={project_path}",
        f"-Dsonar.host.url={SONAR_URL}",
        f"-Dsonar.login={SONAR_TOKEN}",
        "-Dsonar.scm.disabled=true",
        "-Dsonar.exclusions=**/*.md",
    ]
    print("Running SonarScanner...")
    try:
        subprocess.run(sonar_cmd, check=True)
    except subprocess.CalledProcessError as e:
        print(f"SonarScanner failed, continuing anyway: {e}")


def get_latest_task_id(project_key):
    """Return the ID of the current background (compute engine) task for the project."""
    url = f"{SONAR_URL}/api/ce/component?component={project_key}"
    headers = {"Authorization": f"Bearer {SONAR_TOKEN}"}
    response = requests.get(url, headers=headers).json()
    return response.get("current", {}).get("id")


def wait_for_sonar_analysis(task_id, max_retries=15, delay=10):
    """Poll the compute engine task until it succeeds or the retries run out."""
    url = f"{SONAR_URL}/api/ce/task?id={task_id}"
    headers = {"Authorization": f"Bearer {SONAR_TOKEN}"}
    for _ in range(max_retries):
        response = requests.get(url, headers=headers).json()
        status = response.get("task", {}).get("status", "PENDING")
        if status == "SUCCESS":
            print("SonarQube analysis completed successfully.")
            return True
        time.sleep(delay)
    print("SonarQube analysis did not complete in time.")
    return False


def get_sonar_metrics(project_key):
    """Fetch analysis metrics and return (code_smells, ncloc)."""
    task_id = get_latest_task_id(project_key)
    if not task_id or not wait_for_sonar_analysis(task_id):
        return 0, 0
    url = f"{SONAR_URL}/api/measures/component"
    metric_keys = "code_smells,bugs,vulnerabilities,security_hotspots,duplicated_lines_density,sqale_index,ncloc"
    params = {"component": project_key, "metricKeys": metric_keys}
    headers = {"Authorization": f"Bearer {SONAR_TOKEN}"}
    response = requests.get(url, params=params, headers=headers)
    data = response.json()
    if "component" in data and "measures" in data["component"]:
        metrics = {m["metric"]: m.get("value", 0) for m in data["component"]["measures"]}
        print("SonarQube Metrics:")
        for key, value in metrics.items():
            print(f"  {key}: {value}")
        return metrics.get("code_smells", 0), metrics.get("ncloc", 0)
    print("Failed to fetch SonarQube metrics.")
    return 0, 0


if __name__ == "__main__":
    check_github_rate_limit()
    selected_language = random.choice(LANGUAGES)
    repo_name = get_random_repo(selected_language, min_stars=50, max_stars=10000)
    if repo_name:
        print(f"Selected {selected_language} Repository: {repo_name}")
        project_path = download_and_extract_repo(repo_name)
        if project_path:
            print(f"Extracted to: {project_path}")
            extensions = {"PHP": [".php"], "JavaScript": [".js"], "Python": [".py"]}
            source_files = get_source_files(project_path, extensions[selected_language])
            print(f"Found {len(source_files)} {selected_language} files")
            project_key = repo_name.replace("/", "_")
            run_sonar_scanner(project_path, project_key)
            bad_smells, total_lines = get_sonar_metrics(project_key)
            print(f"Total Bad Smells Detected: {bad_smells}")
            print(f"Total Lines of Code: {total_lines}")
            shutil.rmtree(EXTRACT_DIR, ignore_errors=True)
            print("Cleanup completed!")
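
# --- Usage notes (illustrative) ---
# A minimal .env for this script could look like the following; the two token
# values are placeholders, not real credentials:
#
#   GITHUB_API_URL=https://api.github.com
#   SONAR_URL=http://localhost:9000
#   SONAR_TOKEN=<sonarqube-user-token>
#   GITHUB_TOKEN=<github-personal-access-token>
#
# GITHUB_API_URL and SONAR_URL fall back to the defaults above when omitted.
# GITHUB_TOKEN is optional but raises the GitHub API rate limit, while
# SONAR_TOKEN is used both by sonar-scanner and by the SonarQube Web API calls.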