#!/usr/bin/env python3
"""Pre-install static analysis of pip package contents.

Examines downloaded sdist/wheel for suspicious patterns commonly found in
supply chain attacks (setup.py code execution, network calls, credential
harvesting, encoded payloads, etc.)
"""

import json
import os
import re
import sys
import tarfile
import zipfile
from pathlib import Path

# Patterns that indicate potentially malicious behavior in setup.py / build scripts.
#
# Maps a rule name to a dict with:
#   "pattern"     - regular expression source; _scan_content() searches file
#                   contents with it using re.IGNORECASE
#   "severity"    - CRITICAL / HIGH / MEDIUM / LOW, converted to a score in main()
#   "description" - human-readable text copied into every finding for the rule
SUSPICIOUS_PATTERNS = {
    "base64_decode": {
        "pattern": r"base64\.(b64decode|decodebytes|decodestring)",
        "severity": "HIGH",
        "description": "Base64 decoding - common obfuscation technique in malicious packages",
    },
    "exec_eval": {
        # The lookbehind rejects attribute calls such as re.compile(...) or
        # model.eval(), which are not the dynamic-execution builtins. Calls
        # qualified with builtins. / __builtins__. are still matched.
        "pattern": r"(?<![\w.])(?:(?:__builtins__|builtins)\.)?(exec|eval|compile)\s*\(",
        "severity": "HIGH",
        "description": "Dynamic code execution - can hide arbitrary payloads",
    },
    "subprocess_spawn": {
        "pattern": r"(subprocess\.(run|Popen|call|check_call|check_output)|os\.(system|popen|exec[lv]p?e?))\s*\(",
        "severity": "HIGH",
        "description": "Subprocess execution during install",
    },
    "network_import": {
        "pattern": r"\b(import\s+(urllib|requests|httplib|http\.client|socket|aiohttp)|from\s+(urllib|requests|httplib|http\.client|socket|aiohttp)\s+import)",
        "severity": "HIGH",
        "description": "Network library import - package may exfiltrate data during install",
    },
    "network_call": {
        "pattern": r"(urlopen|requests\.(get|post|put)|http\.client\.HTTP|socket\.socket)\s*\(",
        "severity": "CRITICAL",
        "description": "Active network call during install",
    },
    "credential_paths": {
        "pattern": r"(\.ssh/|\.aws/|\.gnupg/|\.kube/config|\.docker/config|\.npmrc|\.pypirc|/etc/shadow|/etc/passwd)",
        "severity": "CRITICAL",
        "description": "References to credential/secret file paths",
    },
    "env_credential_harvest": {
        "pattern": r"os\.environ\.(get|items|\[).*?(TOKEN|SECRET|KEY|PASSWORD|CREDENTIAL|API_KEY|AWS_|GITHUB_|SSH_)",
        "severity": "CRITICAL",
        "description": "Environment variable credential harvesting",
    },
    "powershell": {
        "pattern": r"(powershell|pwsh|cmd\.exe|/bin/sh\s+-c|/bin/bash\s+-c)",
        "severity": "MEDIUM",
        "description": "Shell invocation from Python",
    },
    "dns_exfil": {
        "pattern": r"(socket\.getaddrinfo|socket\.gethostbyname|dns\.resolver)",
        "severity": "HIGH",
        "description": "DNS resolution - possible DNS-based data exfiltration",
    },
    "file_write_system": {
        "pattern": r"open\s*\(\s*['\"]/(usr|etc|tmp|var|home)",
        "severity": "MEDIUM",
        "description": "Writing to system directories during install",
    },
    "pth_file": {
        "pattern": r"\.pth['\"]",
        "severity": "HIGH",
        "description": ".pth file manipulation - can execute code on interpreter startup",
    },
    "encoded_payload": {
        "pattern": r"(\\x[0-9a-fA-F]{2}){8,}",
        "severity": "HIGH",
        "description": "Long hex-encoded payload",
    },
    "install_cmdclass": {
        "pattern": r"cmdclass\s*=\s*\{",
        "severity": "MEDIUM",
        "description": "Custom install command class - runs arbitrary code during install",
    },
    "platform_detect": {
        "pattern": r"(platform\.(system|machine|node)|sys\.platform|os\.name)",
        "severity": "LOW",
        "description": "Platform detection - may be targeting specific OS",
    },
}

# Files of interest for analysis.
# Basenames that are always scanned, even when they do not end in ".py".
# _analyze_tarball() and _analyze_zipfile() compare each archive member's
# basename against this set and additionally list matching members under
# result["install_scripts"].
# NOTE(review): __init__.py and conftest.py are not install scripts; presumably
# they are listed because they run implicitly on import / test collection -
# confirm the intent.
INSTALL_SCRIPT_FILES = {
    "setup.py",
    "setup.cfg",
    "pyproject.toml",
    "__init__.py",
    "conftest.py",
}


def analyze_archive(archive_path: str) -> dict:
    """Analyze one downloaded package archive and return its report.

    The archive kind is chosen from the file-name suffix, compared
    case-insensitively: ``.tar.gz`` / ``.tgz`` are scanned as an sdist
    tarball, ``.whl`` as a wheel and ``.zip`` as a zipped sdist. Any other
    suffix is reported as ``"unknown"`` with a single INFO finding and is
    not opened.

    Args:
        archive_path: Location of the archive. A ``str`` or any
            ``os.PathLike`` (for example ``pathlib.Path``) is accepted.

    Returns:
        A dict with the keys ``file``, ``type``, ``findings``,
        ``files_analyzed``, ``install_scripts`` and ``metadata``.
    """
    # Normalise up front: Path objects have no .endswith(), so the suffix
    # checks below would otherwise raise AttributeError for them.
    path = os.fsdecode(archive_path)
    # Lower-cased copy used only for suffix matching; the original spelling
    # is kept for opening the file and for the report.
    suffix_key = path.lower()

    result = {
        "file": path,
        "type": None,
        "findings": [],
        "files_analyzed": [],
        "install_scripts": [],
        "metadata": {},
    }

    if suffix_key.endswith((".tar.gz", ".tgz")):
        result["type"] = "sdist"
        result = _analyze_tarball(path, result)
    elif suffix_key.endswith((".whl", ".zip")):
        result["type"] = "wheel" if suffix_key.endswith(".whl") else "sdist-zip"
        result = _analyze_zipfile(path, result)
    else:
        result["type"] = "unknown"
        result["findings"].append({
            "severity": "INFO",
            "message": f"Unknown archive format: {path}",
        })

    return result


def _analyze_tarball(path: str, result: dict) -> dict:
    """Scan the relevant members of a gzip-compressed tarball.

    A member is scanned when its basename is listed in
    ``INSTALL_SCRIPT_FILES`` or its name ends in ``.py``. Contents are
    decoded as UTF-8 with undecodable bytes replaced, then passed to
    ``_scan_content``.

    Args:
        path: Path of the ``.tar.gz`` / ``.tgz`` archive.
        result: Report dict to update in place; its ``findings``,
            ``files_analyzed`` and ``install_scripts`` lists are appended to.

    Returns:
        The same ``result`` dict. Failures never propagate: a member that
        cannot be read adds a WARN finding, and an archive that cannot be
        opened adds an ERROR finding.
    """
    try:
        with tarfile.open(path, "r:gz") as tf:
            for member in tf.getmembers():
                basename = os.path.basename(member.name)
                is_install_script = basename in INSTALL_SCRIPT_FILES
                if not (is_install_script or member.name.endswith(".py")):
                    continue

                try:
                    extracted = tf.extractfile(member)
                    if extracted is None:
                        # Not a regular file or link (e.g. a directory):
                        # there is no data to scan.
                        continue
                    # Close the member handle as soon as it has been read
                    # instead of leaving it open until garbage collection.
                    with extracted:
                        content = extracted.read().decode("utf-8", errors="replace")
                    result["files_analyzed"].append(member.name)

                    if is_install_script:
                        result["install_scripts"].append(member.name)

                    result["findings"].extend(_scan_content(content, member.name))
                except Exception as e:
                    # Best effort: one unreadable member must not abort the
                    # scan of the remaining members.
                    result["findings"].append({
                        "severity": "WARN",
                        "message": f"Could not read {member.name}: {e}",
                    })
    except Exception as e:
        result["findings"].append({
            "severity": "ERROR",
            "message": f"Could not open tarball: {e}",
        })
    return result


def _analyze_zipfile(path: str, result: dict) -> dict:
    """Scan the relevant entries of a zip archive (wheel or zipped sdist).

    An entry is scanned when its basename is listed in
    ``INSTALL_SCRIPT_FILES`` or its name ends in ``.py``. Contents are
    decoded as UTF-8 with undecodable bytes replaced, then passed to
    ``_scan_content``.

    Args:
        path: Path of the ``.whl`` / ``.zip`` archive.
        result: Report dict to update in place.

    Returns:
        The same ``result`` dict. An unreadable entry adds a WARN finding;
        an archive that cannot be opened adds an ERROR finding.
    """
    try:
        with zipfile.ZipFile(path, "r") as archive:
            for entry in archive.infolist():
                entry_name = entry.filename
                is_install_script = os.path.basename(entry_name) in INSTALL_SCRIPT_FILES
                if not is_install_script and not entry_name.endswith(".py"):
                    continue

                try:
                    raw = archive.read(entry_name)
                    text = raw.decode("utf-8", errors="replace")
                    result["files_analyzed"].append(entry_name)

                    if is_install_script:
                        result["install_scripts"].append(entry_name)

                    result["findings"].extend(_scan_content(text, entry_name))
                except Exception as exc:
                    # Keep going: a single bad entry only produces a warning.
                    warning = {
                        "severity": "WARN",
                        "message": f"Could not read {entry_name}: {exc}",
                    }
                    result["findings"].append(warning)
    except Exception as exc:
        failure = {
            "severity": "ERROR",
            "message": f"Could not open zip: {exc}",
        }
        result["findings"].append(failure)
    return result


def _scan_content(content: str, filename: str) -> list:
    """Apply every rule in ``SUSPICIOUS_PATTERNS`` to one file's text.

    Args:
        content: Decoded text of the file.
        filename: Name of the file inside the archive; copied into each
            finding.

    Returns:
        One finding dict per rule that matched at least once, with the keys
        ``rule``, ``severity``, ``description``, ``file``, ``match_count``
        (total number of matches) and ``matches`` (line number and stripped
        line text, truncated to 200 characters, for the first 5 matches).
    """
    findings = []
    # Split once for all rules; the text does not change between them.
    lines = content.split("\n")

    for name, rule in SUSPICIOUS_PATTERNS.items():
        matches = list(re.finditer(rule["pattern"], content, re.IGNORECASE))
        if not matches:
            continue

        # Extract matching lines for context
        match_lines = []
        for m in matches[:5]:  # cap at 5 matches per rule
            # 1-based line number = newlines before the match + 1. Counting
            # within bounds avoids copying the prefix of the text. The result
            # can never exceed len(lines), so no range check is needed.
            line_num = content.count("\n", 0, m.start()) + 1
            match_lines.append({
                "line": line_num,
                "content": lines[line_num - 1].strip()[:200],
            })

        findings.append({
            "rule": name,
            "severity": rule["severity"],
            "description": rule["description"],
            "file": filename,
            "match_count": len(matches),
            "matches": match_lines,
        })
    return findings


def main():
    """Command-line entry point: scan a download directory and print a report.

    Expects ``sys.argv[1]`` to be the directory holding the downloaded
    archives and ``sys.argv[2]`` the package name. Every regular file in the
    directory is passed to ``analyze_archive``; finding severities are summed
    into ``risk_score`` and mapped to ``risk_level``. The report is printed
    to stdout as JSON.

    With fewer than two arguments a JSON usage error is printed and the
    process exits with status 1.
    """
    if len(sys.argv) < 3:
        print(json.dumps({"error": "Usage: analyze.py <download-dir> <package-name>"}))
        sys.exit(1)

    download_dir = sys.argv[1]
    package_name = sys.argv[2]

    analysis = {
        "package": package_name,
        "archives": [],
        "risk_score": 0,
        "risk_level": "UNKNOWN",
        "summary": [],
    }

    # Find downloaded archives. os.listdir() order is arbitrary, so sort to
    # keep the report reproducible between runs.
    archives = []
    if os.path.isdir(download_dir):
        for entry in sorted(os.listdir(download_dir)):
            fp = os.path.join(download_dir, entry)
            if os.path.isfile(fp):
                archives.append(fp)

    if not archives:
        analysis["summary"].append("No archives found to analyze")
        print(json.dumps(analysis, indent=2))
        return

    # Analyze each archive
    severity_scores = {"CRITICAL": 10, "HIGH": 5, "MEDIUM": 2, "LOW": 1, "INFO": 0, "WARN": 0, "ERROR": 0}
    total_score = 0
    files_analyzed = 0

    for archive in archives:
        result = analyze_archive(archive)
        analysis["archives"].append(result)
        files_analyzed += len(result.get("files_analyzed", []))

        for finding in result["findings"]:
            sev = finding.get("severity", "INFO")
            total_score += severity_scores.get(sev, 0)

            if sev in ("CRITICAL", "HIGH"):
                # Fall back to "message" so a finding without a description
                # cannot abort the whole report with a KeyError.
                detail = finding.get("description", finding.get("message", ""))
                analysis["summary"].append(
                    f"[{sev}] {detail} in {finding.get('file', 'unknown')}"
                )

    analysis["risk_score"] = total_score

    if total_score >= 20:
        analysis["risk_level"] = "CRITICAL"
    elif total_score >= 10:
        analysis["risk_level"] = "HIGH"
    elif total_score >= 5:
        analysis["risk_level"] = "MEDIUM"
    elif total_score >= 1:
        analysis["risk_level"] = "LOW"
    elif files_analyzed == 0:
        # Nothing was actually inspected (unsupported format, unreadable
        # archive, or no scannable files). A zero score is then not evidence
        # of safety, so do not report the package as clean.
        analysis["risk_level"] = "UNKNOWN"
        analysis["summary"].append("No files could be analyzed in the downloaded archives")
    else:
        analysis["risk_level"] = "CLEAN"

    print(json.dumps(analysis, indent=2))


# Run the analysis only when this file is executed as a script, so importing
# the module has no side effects.
if __name__ == "__main__":
    main()
