Skip to main content

Integrity Checker Source Code

#!/usr/bin/env python3
"""
File Integrity Checker v2

Features
--------
- Create a baseline of file hashes for a file or directory
- Compare current state against the saved baseline
- Detect added, modified, and deleted files
- Recursive directory scanning
- Supports multiple hash algorithms
- Optional colored CLI output
- CSV report export
- HTML report export
- Include / exclude glob patterns
- Exclude common noise like .git, __pycache__, .DS_Store
- Optional watch mode with polling

Examples
--------
Create baseline:
python integrity_checker_v2.py baseline /path/to/folder --output baseline.json

Check integrity:
python integrity_checker_v2.py check /path/to/folder --baseline baseline.json

Check with HTML and CSV reports:
python integrity_checker_v2.py check /path/to/folder --baseline baseline.json --html-report report.html --csv-report report.csv

Use include / exclude patterns:
python integrity_checker_v2.py baseline /path/to/folder --include "*.py" --exclude "*.log" --exclude ".git/*"

Watch mode:
python integrity_checker_v2.py watch /path/to/folder --baseline baseline.json --interval 10
"""

from __future__ import annotations

import argparse
import csv
import fnmatch
import hashlib
import html
import json
import os
import sys
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple

# Hash algorithms accepted on the command line and in baseline files.
# Each name is handed to ``hashlib.new`` unchanged.
SUPPORTED_ALGORITHMS = {"md5", "sha1", "sha256", "sha512"}

# Glob patterns excluded when the user supplies no ``--exclude`` of their own.
# Patterns are matched with ``fnmatch`` against the "/"-separated path relative
# to the scan root.  ``*`` also matches "/", so suffix patterns such as
# ``*.pyc`` already apply at any depth, whereas the literal patterns
# (``.git/*``, ``.DS_Store``, ...) only match at the root.  The ``*/`` variants
# extend them to nested directories, e.g. ``docs/.DS_Store`` or
# ``vendor/lib/.git/config``.
DEFAULT_EXCLUDES = [
    ".git/*",
    ".git",
    "*/.git/*",
    "*/.git",
    "__pycache__/*",
    "__pycache__",
    "*/__pycache__/*",
    "*/__pycache__",
    ".DS_Store",
    "*/.DS_Store",
    "*.pyc",
    "*.pyo",
    "*.swp",
    "*.tmp",
    "*.temp",
]

# Colour output is optional: colorama is used when it can be imported and
# initialised; otherwise ``Fore`` and ``Style`` become stand-ins whose colour
# codes are all empty strings, so formatting code works either way.
try:
    from colorama import Fore, Style, init as colorama_init

    colorama_init(autoreset=True)
except Exception:
    COLOR_ENABLED = False

    class Dummy:
        # Same attribute names the rest of the file reads from Fore / Style.
        RED = ""
        GREEN = ""
        YELLOW = ""
        CYAN = ""
        MAGENTA = ""
        WHITE = ""
        RESET_ALL = ""

    # One shared instance serves as both the colour and the style namespace.
    Fore = Style = Dummy()
else:
    COLOR_ENABLED = True


@dataclass
class FileRecord:
    """Snapshot of a single scanned file, as stored under ``files`` in a baseline."""

    path: str  # record key: path relative to the scan root (file name for a single-file scan)
    hash: str  # hex digest of the file content
    size: int  # ``st_size`` reported by ``stat()`` at scan time
    modified_time: float  # ``st_mtime`` reported by ``stat()`` at scan time


@dataclass
class ScanResult:
    """Outcome of comparing the current scan against a saved baseline."""

    added: List[str]  # paths present now but absent from the baseline
    deleted: List[str]  # paths in the baseline but absent from the current scan
    modified: List[str]  # paths present in both whose hashes differ
    unchanged: List[str]  # paths present in both with equal hashes
    errors: List[str]  # "<file>: <exception>" messages collected while scanning
    algorithm: str  # hash algorithm name taken from the baseline data
    root_path: str  # root path string taken from the baseline data
    checked_at: str  # ISO 8601 UTC timestamp of the comparison


def color_text(text: str, color: str) -> str:
    """Return ``text`` wrapped in ``color`` plus a reset code.

    When colour support is disabled the text is returned unchanged.
    """
    return f"{color}{text}{Style.RESET_ALL}" if COLOR_ENABLED else text


def utc_now_iso() -> str:
    """Return the current time in UTC as a timezone-aware ISO 8601 string."""
    now = datetime.now(tz=timezone.utc)
    return now.isoformat()


def normalize_path_for_matching(path: str) -> str:
    """Return ``path`` with the platform separator replaced by "/".

    Glob patterns are written with forward slashes, so paths are converted
    to that form before matching.
    """
    segments = path.split(os.sep)
    return "/".join(segments)


def matches_any_pattern(relative_path: str, patterns: Sequence[str]) -> bool:
    """Return True when ``relative_path`` matches at least one glob in ``patterns``.

    The path is converted to "/" separators first; each pattern is tested
    with ``fnmatch.fnmatch``.  An empty pattern list never matches.
    """
    candidate = normalize_path_for_matching(relative_path)
    for glob in patterns:
        if fnmatch.fnmatch(candidate, glob):
            return True
    return False


def compute_file_hash(file_path: Path, algorithm: str = "sha256", chunk_size: int = 8192) -> str:
    """Return the hex digest of the content of ``file_path``.

    The file is read in binary mode, ``chunk_size`` bytes at a time, so its
    whole content is never held in memory at once.

    Raises:
        ValueError: if ``algorithm`` is not in ``SUPPORTED_ALGORITHMS``.
    """
    if algorithm not in SUPPORTED_ALGORITHMS:
        raise ValueError(f"Unsupported algorithm: {algorithm}")

    digest = hashlib.new(algorithm)
    with file_path.open("rb") as stream:
        # Keep reading until the stream hands back an empty bytes object.
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()


def collect_files(root_path: Path) -> List[Path]:
    """Return the files to scan under ``root_path``.

    A path that is itself a file yields a one-element list.  Otherwise the
    path is searched recursively and every entry for which ``is_file()`` is
    true is returned, in sorted order.

    Raises:
        FileNotFoundError: if ``root_path`` does not exist.
    """
    if not root_path.exists():
        raise FileNotFoundError(f"Path does not exist: {root_path}")

    if root_path.is_file():
        return [root_path]

    regular_files = [entry for entry in root_path.rglob("*") if entry.is_file()]
    regular_files.sort()
    return regular_files


def should_include_file(relative_path: str, include_patterns: Sequence[str], exclude_patterns: Sequence[str]) -> bool:
    """Decide whether ``relative_path`` takes part in the scan.

    Exclusion wins: a path matching any exclude pattern is rejected.  When
    include patterns are given the path must match one of them; with no
    include patterns every non-excluded path is accepted.
    """
    candidate = normalize_path_for_matching(relative_path)

    is_excluded = bool(exclude_patterns) and matches_any_pattern(candidate, exclude_patterns)
    if is_excluded:
        return False

    # An empty include list means "no restriction".
    return not include_patterns or matches_any_pattern(candidate, include_patterns)


def create_records(
    root_path: Path,
    algorithm: str,
    include_patterns: Sequence[str],
    exclude_patterns: Sequence[str],
) -> Tuple[Dict[str, FileRecord], List[str]]:
    """Hash every selected file under ``root_path``.

    Returns:
        A pair ``(records, errors)``.  ``records`` maps each record key (the
        path relative to ``root_path``, or the file name when ``root_path``
        is a single file) to its ``FileRecord``.  ``errors`` holds one
        "<file>: <exception>" message per file that could not be processed;
        such files are left out of ``records`` and the scan continues.

    Raises:
        FileNotFoundError: propagated from ``collect_files`` when
            ``root_path`` does not exist.
    """
    indexed: Dict[str, FileRecord] = {}
    failures: List[str] = []

    for candidate in collect_files(root_path):
        try:
            key = candidate.name if root_path.is_file() else str(candidate.relative_to(root_path))

            if should_include_file(key, include_patterns, exclude_patterns):
                info = candidate.stat()
                indexed[key] = FileRecord(
                    path=key,
                    hash=compute_file_hash(candidate, algorithm),
                    size=info.st_size,
                    modified_time=info.st_mtime,
                )
        except Exception as exc:
            # Best effort: one unreadable file must not abort the whole scan.
            failures.append(f"{candidate}: {exc}")

    return indexed, failures


def load_baseline(baseline_file: Path) -> dict:
    """Read and validate a baseline JSON file.

    Args:
        baseline_file: path of the baseline written by the ``baseline`` command.

    Returns:
        The decoded baseline as a dict.

    Raises:
        FileNotFoundError: if ``baseline_file`` does not exist.
        ValueError: if the file is not UTF-8 text, is not valid JSON, or does
            not have the expected shape (a JSON object whose optional
            ``files`` entry is itself an object).
    """
    if not baseline_file.exists():
        raise FileNotFoundError(f"Baseline file not found: {baseline_file}")

    try:
        raw_text = baseline_file.read_text(encoding="utf-8")
    except UnicodeDecodeError as exc:
        raise ValueError(f"Baseline file is not valid UTF-8 text: {exc}") from exc

    try:
        data = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid baseline JSON: {exc}") from exc

    # Callers use ``data.get(...)`` and iterate ``data["files"].items()``;
    # reject other shapes here so the failure names the real problem.
    if not isinstance(data, dict):
        raise ValueError(
            f"Invalid baseline JSON: expected an object at the top level, got {type(data).__name__}"
        )
    if not isinstance(data.get("files", {}), dict):
        raise ValueError("Invalid baseline JSON: 'files' must be an object")

    return data


def save_baseline(
    root_path: Path,
    output_file: Path,
    algorithm: str,
    include_patterns: Sequence[str],
    exclude_patterns: Sequence[str],
) -> None:
    """Scan ``root_path`` and write the resulting baseline to ``output_file``.

    The baseline is a JSON object holding the creation time, the algorithm,
    the resolved root path, the patterns used and one entry per indexed file.
    A summary (and any per-file errors) is printed to stdout.

    Args:
        root_path: file or directory to index.
        output_file: destination of the baseline JSON.
        algorithm: hash algorithm name.
        include_patterns: globs a file must match to be indexed (empty = all).
        exclude_patterns: globs that remove a file from the index.

    Raises:
        FileNotFoundError: propagated from the scan when ``root_path`` does
            not exist.
    """
    records, errors = create_records(root_path, algorithm, include_patterns, exclude_patterns)

    # If a previous baseline already sits inside the scanned directory it was
    # just hashed like any other file, but it is overwritten below, so its
    # recorded hash would be stale immediately.  Leave it out of the index.
    if root_path.is_dir():
        try:
            own_key = str(output_file.resolve().relative_to(root_path.resolve()))
        except ValueError:
            own_key = None  # the output file lives outside the scanned tree
        if own_key is not None:
            records.pop(own_key, None)

    baseline = {
        "created_at": utc_now_iso(),
        "algorithm": algorithm,
        "root_path": str(root_path.resolve()),
        "include_patterns": list(include_patterns),
        "exclude_patterns": list(exclude_patterns),
        "files": {path: asdict(record) for path, record in records.items()},
    }

    output_file.write_text(json.dumps(baseline, indent=2), encoding="utf-8")

    print(color_text(f"[+] Baseline created: {output_file}", Fore.GREEN))
    print(color_text(f"[+] Files indexed: {len(records)}", Fore.GREEN))

    if include_patterns:
        print(color_text(f"[i] Include patterns: {', '.join(include_patterns)}", Fore.CYAN))
    if exclude_patterns:
        print(color_text(f"[i] Exclude patterns: {', '.join(exclude_patterns)}", Fore.CYAN))

    if errors:
        print(color_text(f"[!] Files with errors: {len(errors)}", Fore.YELLOW))
        for err in errors:
            print(color_text(f" - {err}", Fore.YELLOW))


def compare_baseline(current_records: Dict[str, FileRecord], baseline_data: dict, errors: List[str]) -> ScanResult:
    """Classify every path as added, deleted, modified or unchanged.

    Args:
        current_records: records from the current scan, keyed by path.
        baseline_data: decoded baseline; its ``files`` entries are rebuilt
            into ``FileRecord`` objects.
        errors: scan errors, passed through to the result unchanged.

    Returns:
        A ``ScanResult`` with sorted path lists.  ``algorithm`` and
        ``root_path`` are taken from the baseline (defaulting to "sha256"
        and "" respectively); ``checked_at`` is the current UTC time.
    """
    baseline_files: Dict[str, FileRecord] = {}
    for path, record_data in baseline_data.get("files", {}).items():
        baseline_files[path] = FileRecord(**record_data)

    seen_now = set(current_records)
    seen_before = set(baseline_files)

    modified: List[str] = []
    unchanged: List[str] = []
    # Only the content hash decides whether a shared path counts as modified.
    for path in sorted(seen_now & seen_before):
        differs = current_records[path].hash != baseline_files[path].hash
        (modified if differs else unchanged).append(path)

    return ScanResult(
        added=sorted(seen_now - seen_before),
        deleted=sorted(seen_before - seen_now),
        modified=modified,
        unchanged=unchanged,
        errors=errors,
        algorithm=baseline_data.get("algorithm", "sha256"),
        root_path=baseline_data.get("root_path", ""),
        checked_at=utc_now_iso(),
    )


def print_report(results: ScanResult) -> None:
    """Print a summary of ``results`` followed by the paths in each non-empty category."""
    has_findings = any((results.added, results.deleted, results.modified, results.errors))

    print(color_text("\n=== Integrity Check Report ===", Fore.CYAN))
    print(f"Checked at : {results.checked_at}")
    print(f"Algorithm : {results.algorithm}")
    print(f"Root path : {results.root_path}")
    # Counters are highlighted only when their category is non-empty.
    print(color_text(f"Added files : {len(results.added)}", Fore.GREEN if results.added else Fore.WHITE))
    print(color_text(f"Deleted files : {len(results.deleted)}", Fore.YELLOW if results.deleted else Fore.WHITE))
    print(color_text(f"Modified files : {len(results.modified)}", Fore.RED if results.modified else Fore.WHITE))
    print(f"Unchanged files : {len(results.unchanged)}")
    print(color_text(f"Errors : {len(results.errors)}", Fore.MAGENTA if results.errors else Fore.WHITE))

    sections = (
        ("\n[+] Added", results.added, Fore.GREEN),
        ("\n[-] Deleted", results.deleted, Fore.YELLOW),
        ("\n[!] Modified", results.modified, Fore.RED),
        ("\n[!] Errors", results.errors, Fore.MAGENTA),
    )
    for heading, entries, color in sections:
        if not entries:
            continue
        print(color_text(heading, color))
        for entry in entries:
            print(color_text(f" - {entry}", color))

    if not has_findings:
        print(color_text("\n[+] Integrity check passed. No changes detected.", Fore.GREEN))


def export_csv_report(results: ScanResult, output_file: Path) -> None:
    """Write ``results`` to ``output_file`` as a two-column CSV (status, path).

    Rows are emitted category by category: added, deleted, modified,
    unchanged, then errors (whose message goes in the ``path`` column).
    A confirmation line is printed afterwards.
    """
    categories = (
        ("added", results.added),
        ("deleted", results.deleted),
        ("modified", results.modified),
        ("unchanged", results.unchanged),
        ("error", results.errors),
    )
    rows = [
        {"status": status, "path": entry}
        for status, entries in categories
        for entry in entries
    ]

    with output_file.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=["status", "path"])
        writer.writeheader()
        writer.writerows(rows)

    print(color_text(f"[+] CSV report saved: {output_file}", Fore.GREEN))


def render_list_items(items: Sequence[str], css_class: str) -> str:
    """Render ``items`` as newline-joined ``<li>`` elements.

    Each item is HTML-escaped and tagged with ``css_class``.  An empty
    sequence yields a single placeholder entry reading "None".
    """
    if not items:
        return '<li class="empty">None</li>'

    opening_tag = f'<li class="{css_class}">'
    return "\n".join(opening_tag + html.escape(item) + "</li>" for item in items)


def export_html_report(results: ScanResult, output_file: Path) -> None:
    """Write ``results`` to ``output_file`` as a single HTML page.

    The page carries its own inline stylesheet and shows the check metadata,
    one counter per category, and the added / deleted / modified / error
    lists (unchanged files are counted but not listed).  Metadata values are
    passed through ``html.escape``; list entries are rendered by
    ``render_list_items``.  A confirmation line is printed afterwards.
    """
    # Literal braces of the CSS are doubled because the template is an f-string.
    html_content = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>File Integrity Checker Report</title>
<style>
body {{
font-family: Arial, sans-serif;
margin: 2rem;
background: #0f172a;
color: #e2e8f0;
}}
h1, h2 {{
color: #f8fafc;
}}
.card {{
background: #1e293b;
border-radius: 12px;
padding: 1rem 1.25rem;
margin-bottom: 1rem;
box-shadow: 0 6px 24px rgba(0,0,0,0.25);
}}
.stats {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(160px, 1fr));
gap: 1rem;
}}
.stat {{
background: #334155;
border-radius: 10px;
padding: 1rem;
}}
.added {{ color: #22c55e; }}
.deleted {{ color: #facc15; }}
.modified {{ color: #ef4444; }}
.error {{ color: #d946ef; }}
.empty {{ color: #94a3b8; }}
ul {{
line-height: 1.7;
}}
code {{
background: #0b1220;
padding: 0.15rem 0.35rem;
border-radius: 6px;
}}
</style>
</head>
<body>
<h1>File Integrity Checker Report</h1>

<div class="card">
<p><strong>Checked at:</strong> {html.escape(results.checked_at)}</p>
<p><strong>Algorithm:</strong> <code>{html.escape(results.algorithm)}</code></p>
<p><strong>Root path:</strong> <code>{html.escape(results.root_path)}</code></p>
</div>

<div class="card stats">
<div class="stat"><strong>Added</strong><br>{len(results.added)}</div>
<div class="stat"><strong>Deleted</strong><br>{len(results.deleted)}</div>
<div class="stat"><strong>Modified</strong><br>{len(results.modified)}</div>
<div class="stat"><strong>Unchanged</strong><br>{len(results.unchanged)}</div>
<div class="stat"><strong>Errors</strong><br>{len(results.errors)}</div>
</div>

<div class="card">
<h2>Added Files</h2>
<ul>
{render_list_items(results.added, "added")}
</ul>
</div>

<div class="card">
<h2>Deleted Files</h2>
<ul>
{render_list_items(results.deleted, "deleted")}
</ul>
</div>

<div class="card">
<h2>Modified Files</h2>
<ul>
{render_list_items(results.modified, "modified")}
</ul>
</div>

<div class="card">
<h2>Errors</h2>
<ul>
{render_list_items(results.errors, "error")}
</ul>
</div>
</body>
</html>
"""
    output_file.write_text(html_content, encoding="utf-8")
    print(color_text(f"[+] HTML report saved: {output_file}", Fore.GREEN))


def generate_reports(results: ScanResult, html_report: Optional[Path], csv_report: Optional[Path]) -> None:
    """Write each report whose destination was supplied; HTML first, then CSV."""
    requested = (
        (export_html_report, html_report),
        (export_csv_report, csv_report),
    )
    for exporter, destination in requested:
        if destination:
            exporter(results, destination)


def _artifact_record_keys(root_path: Path, artifacts: Sequence[Optional[Path]]) -> set:
    """Return the record keys of tool-owned files located inside ``root_path``."""
    if not root_path.is_dir():
        return set()

    resolved_root = root_path.resolve()
    keys = set()
    for artifact in artifacts:
        if artifact is None:
            continue
        try:
            keys.add(str(artifact.resolve().relative_to(resolved_root)))
        except ValueError:
            # The artifact lives outside the scanned tree; nothing to ignore.
            continue
    return keys


def run_check(
    root_path: Path,
    baseline_file: Path,
    html_report: Optional[Path],
    csv_report: Optional[Path],
    include_patterns_override: Optional[Sequence[str]] = None,
    exclude_patterns_override: Optional[Sequence[str]] = None,
) -> Tuple[int, ScanResult]:
    """Compare ``root_path`` against a saved baseline and report the result.

    The scan uses the algorithm stored in the baseline.  Include / exclude
    patterns come from the overrides when given (an empty list is a valid
    override), otherwise from the baseline.  The console report is printed
    first, then any requested report files are written.

    The baseline file and the report files are produced by this tool, so
    when they sit inside the scanned directory they are left out of the
    comparison; otherwise they would be flagged as added or modified on
    every run.

    Returns:
        ``(exit_code, results)`` where ``exit_code`` is 1 if anything was
        added, deleted, modified or failed to scan, and 0 otherwise.

    Raises:
        FileNotFoundError: propagated when the baseline or ``root_path`` is
            missing.
        ValueError: if the baseline is invalid or names an unsupported
            algorithm.
    """
    baseline_data = load_baseline(baseline_file)
    algorithm = baseline_data.get("algorithm", "sha256")

    if algorithm not in SUPPORTED_ALGORITHMS:
        raise ValueError(f"Baseline uses unsupported algorithm: {algorithm}")

    if include_patterns_override is not None:
        include_patterns = list(include_patterns_override)
    else:
        include_patterns = list(baseline_data.get("include_patterns", []))

    if exclude_patterns_override is not None:
        exclude_patterns = list(exclude_patterns_override)
    else:
        exclude_patterns = list(baseline_data.get("exclude_patterns", DEFAULT_EXCLUDES))

    current_records, errors = create_records(root_path, algorithm, include_patterns, exclude_patterns)

    # Drop the tool's own files from BOTH sides so they show up neither as
    # "added"/"modified" (present now) nor as "deleted" (indexed earlier).
    ignored_keys = _artifact_record_keys(root_path, (baseline_file, html_report, csv_report))
    if ignored_keys:
        current_records = {
            key: record for key, record in current_records.items() if key not in ignored_keys
        }
        baseline_data = dict(baseline_data)
        baseline_data["files"] = {
            key: record
            for key, record in baseline_data.get("files", {}).items()
            if key not in ignored_keys
        }

    results = compare_baseline(current_records, baseline_data, errors)
    print_report(results)
    generate_reports(results, html_report, csv_report)

    exit_code = 1 if (results.added or results.deleted or results.modified or results.errors) else 0
    return exit_code, results


def run_watch(
    root_path: Path,
    baseline_file: Path,
    interval: int,
    html_report: Optional[Path],
    csv_report: Optional[Path],
    include_patterns_override: Optional[Sequence[str]] = None,
    exclude_patterns_override: Optional[Sequence[str]] = None,
) -> int:
    """Run the integrity check repeatedly, sleeping ``interval`` seconds between runs.

    Every iteration performs a full check (console report plus any requested
    report files).  An alert / all-clear line is printed only when the set of
    findings differs from the previous iteration.  The loop has no exit
    condition of its own; it ends when an exception such as
    ``KeyboardInterrupt`` propagates to the caller.

    Raises:
        ValueError: if ``interval`` is not positive.  A zero interval would
            rescan the tree in a busy loop and a negative one is rejected by
            ``time.sleep`` only after a full scan has already run.
    """
    if interval <= 0:
        raise ValueError(f"Polling interval must be a positive number of seconds, got {interval}")

    print(color_text(f"[i] Watch mode started. Checking every {interval} seconds. Press Ctrl+C to stop.", Fore.CYAN))

    previous_signature = None

    while True:
        _, results = run_check(
            root_path=root_path,
            baseline_file=baseline_file,
            html_report=html_report,
            csv_report=csv_report,
            include_patterns_override=include_patterns_override,
            exclude_patterns_override=exclude_patterns_override,
        )

        # Fingerprint of the findings; unchanged files are deliberately left out.
        signature = (
            tuple(results.added),
            tuple(results.deleted),
            tuple(results.modified),
            tuple(results.errors),
        )

        if signature != previous_signature:
            if any(signature):
                print(color_text("\n[ALERT] Integrity change detected.", Fore.RED))
            else:
                print(color_text("\n[OK] No changes detected.", Fore.GREEN))
            previous_signature = signature

        time.sleep(interval)


def parse_patterns(patterns: Optional[Sequence[str]]) -> List[str]:
    """Return ``patterns`` with surrounding whitespace stripped and blank entries dropped.

    ``None`` or an empty sequence yields an empty list.
    """
    if not patterns:
        return []
    trimmed = (pattern.strip() for pattern in patterns)
    return [pattern for pattern in trimmed if pattern]


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with the ``baseline``, ``check`` and ``watch`` sub-commands."""
    parser = argparse.ArgumentParser(description="File Integrity Checker v2")
    commands = parser.add_subparsers(dest="command", required=True)

    create_cmd = commands.add_parser("baseline", help="Create a new baseline")
    create_cmd.add_argument("path", type=Path, help="File or directory to scan")
    create_cmd.add_argument("--output", type=Path, default=Path("baseline.json"), help="Output baseline JSON file")
    create_cmd.add_argument("--algorithm", choices=sorted(SUPPORTED_ALGORITHMS), default="sha256", help="Hash algorithm")
    create_cmd.add_argument("--include", action="append", help="Include glob pattern (can be used multiple times)")
    create_cmd.add_argument("--exclude", action="append", help="Exclude glob pattern (can be used multiple times)")
    create_cmd.add_argument("--no-default-excludes", action="store_true", help="Disable default excludes like .git and __pycache__")

    # ``check`` and ``watch`` share every option; ``watch`` additionally takes
    # ``--interval``, registered right after ``--baseline``.
    comparing_commands = (
        ("check", "Check current files against baseline", False),
        ("watch", "Watch files continuously against baseline", True),
    )
    for command_name, summary, is_polling in comparing_commands:
        cmd = commands.add_parser(command_name, help=summary)
        cmd.add_argument("path", type=Path, help="File or directory to scan")
        cmd.add_argument("--baseline", type=Path, default=Path("baseline.json"), help="Baseline JSON file")
        if is_polling:
            cmd.add_argument("--interval", type=int, default=10, help="Polling interval in seconds")
        cmd.add_argument("--html-report", type=Path, help="Optional HTML report output file")
        cmd.add_argument("--csv-report", type=Path, help="Optional CSV report output file")
        cmd.add_argument("--include", action="append", help="Override include glob pattern (can be used multiple times)")
        cmd.add_argument("--exclude", action="append", help="Override exclude glob pattern (can be used multiple times)")
        cmd.add_argument("--no-default-excludes", action="store_true", help="Disable default excludes if no custom excludes are provided")

    return parser


def main() -> int:
    """Parse the command line, run the selected command and return the exit code.

    Exit codes: the command's own result (0 for a created baseline, 0/1 from
    a check), 130 when interrupted with Ctrl+C, and 2 for any other error.

    Exclude handling differs per command:
    * ``baseline``: user excludes, or the built-in defaults when none are
      given and ``--no-default-excludes`` is absent.
    * ``check`` / ``watch``: user excludes override the baseline;
      ``--no-default-excludes`` alone overrides with an empty list; with
      neither flag no override is passed, so the patterns saved in the
      baseline are used.  Substituting the defaults here would silently
      discard the baseline's own patterns.
    """
    parser = build_parser()
    args = parser.parse_args()

    try:
        include_patterns = parse_patterns(getattr(args, "include", None))
        user_excludes = parse_patterns(getattr(args, "exclude", None))
        skip_defaults = getattr(args, "no_default_excludes", False)

        if args.command == "baseline":
            exclude_patterns = user_excludes
            if not exclude_patterns and not skip_defaults:
                exclude_patterns = list(DEFAULT_EXCLUDES)

            save_baseline(
                root_path=args.path,
                output_file=args.output,
                algorithm=args.algorithm,
                include_patterns=include_patterns,
                exclude_patterns=exclude_patterns,
            )
            return 0

        # ``None`` means "no override: use what the baseline recorded".
        include_override = include_patterns if include_patterns else None
        exclude_override = user_excludes if (user_excludes or skip_defaults) else None

        if args.command == "check":
            exit_code, _ = run_check(
                root_path=args.path,
                baseline_file=args.baseline,
                html_report=args.html_report,
                csv_report=args.csv_report,
                include_patterns_override=include_override,
                exclude_patterns_override=exclude_override,
            )
            return exit_code

        if args.command == "watch":
            return run_watch(
                root_path=args.path,
                baseline_file=args.baseline,
                interval=args.interval,
                html_report=args.html_report,
                csv_report=args.csv_report,
                include_patterns_override=include_override,
                exclude_patterns_override=exclude_override,
            )

        parser.print_help()
        return 2

    except KeyboardInterrupt:
        print(color_text("\n[!] Operation cancelled by user.", Fore.YELLOW))
        return 130
    except Exception as exc:
        # Top-level boundary: report the failure and turn it into an exit code.
        print(color_text(f"[ERROR] {exc}", Fore.RED))
        return 2


# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())