#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "bs4",
# ]
# ///
"""
odoc2json.py - Convert odoc JSON output to structured JSON records

This script parses the JSON output files from odoc-driver (an OCaml
documentation generator) and converts them into structured JSON records that
include package name, version, and each function signature with associated
documentation.

The output is intended for further processing, analysis, and search over
OCaml type signatures, especially for loading into columnar formats like
Parquet.
"""

import os
import json
import re
import time
import multiprocessing as mp
from bs4 import BeautifulSoup
from typing import Dict, List, Any, Optional, Tuple
import argparse
from pathlib import Path
from functools import partial


def extract_package_info(path: str, mode: str = 'full',
                         override_package_name: Optional[str] = None,
                         override_package_version: Optional[str] = None) -> Tuple[str, str]:
    """
    Extract package name and version from the path.

    Args:
        path: Path to the odoc output directory
        mode: Operating mode - 'full' for full packages list, 'single' for a single package
        override_package_name: Optional override for package name
        override_package_version: Optional override for package version

    Returns:
        Tuple of (package_name, package_version)
    """
    # Always prioritize explicit overrides if provided
    if override_package_name:
        package_name = override_package_name
    else:
        package_name = "unknown"

    if override_package_version:
        package_version = override_package_version
    else:
        package_version = "unknown"

    # If we have both overrides, no need to analyze path
    if override_package_name and override_package_version:
        return package_name, package_version

    # Use Path for more reliable path parsing
    p = Path(path).resolve()
    parts = list(p.parts)

    if mode == 'single':
        # In single package mode, the package name is typically the directory name
        if not override_package_name and parts:
            # Extract package name from the last part of the path
            package_name = parts[-1]
            # Check if there's a subdirectory in the path that seems like a package name
            subdir = next((d for d in os.listdir(path) if os.path.isdir(os.path.join(path, d))), None)
            if subdir:
                package_name = subdir
    elif mode == 'full':
        # In full mode, we need to look at the directory structure more carefully.
        # For the test/ directory, the structure is test/package-name/package-version/.
        # First, check if the directory structure matches the expected pattern
        # by looking for subdirectories in the current path.
        try:
            subdirs = [d for d in os.listdir(path) if os.path.isdir(os.path.join(path, d))]
            # If we have subdirectories that might be package names
            if subdirs and not override_package_name:
                # For each subdirectory (potential package name), check if it contains version subdirectories
                for subdir in subdirs:
                    version_dirs = [d for d in os.listdir(os.path.join(path, subdir))
                                    if os.path.isdir(os.path.join(path, subdir, d))]
                    # If this subdirectory contains potential version directories, it's likely a package
                    if version_dirs:
                        # We'll use the current file's path to determine which package and version it
                        # belongs to. We're processing files at the specific file level elsewhere, so
                        # here we just return default values which will be overridden during actual
                        # file processing.
                        return subdir, "unknown"
            # If we found no package structure, or we're processing a file already in a package
            # context, we'll determine package/version from the path of the file being processed.
            if len(parts) >= 3:
                # Path structure might be test/package-name/version/...
                # Check if the first part is "test"
                if parts[-3] == "test" or "test" in str(p):
                    package_name = parts[-2] if not override_package_name else package_name
                    package_version = parts[-1] if not override_package_version else package_version
                else:
                    # Standard structure: .../package-name/package-version/...
                    package_name = parts[-2] if not override_package_name else package_name
                    package_version = parts[-1] if not override_package_version else package_version
        except (FileNotFoundError, PermissionError) as e:
            # Handle cases where we can't access the directory
            print(f"Error accessing directory {path}: {str(e)}")

    return package_name, package_version


def parse_html_content(content: str) -> List[Dict[str, Any]]:
    """
    Parse the HTML content from the odoc JSON to extract signatures and documentation.

    Args:
        content: HTML content from the odoc JSON file

    Returns:
        List of dictionaries containing extracted information
    """
    soup = BeautifulSoup(content, 'html.parser')
    result = []

    # Process each specification block (function, type, module, etc.)
    for spec in soup.find_all(class_="odoc-spec"):
        item = {}

        # Get the spec element (contains the signature)
        spec_elem = spec.find(class_="spec")
        if not spec_elem:
            continue

        # Determine the kind of element
        kind = None
        for cls in spec_elem.get('class', []):
            if cls in ['type', 'value', 'module', 'class', 'exception', 'constructor']:
                kind = cls
                break
        if not kind:
            continue
        item['kind'] = kind

        # Extract the signature first to use for name extraction if needed
        code_elem = spec_elem.find('code')
        signature = ""
        if code_elem:
            # Get the full signature text; newlines and whitespace are normalized below
            signature = code_elem.get_text()

        # Extract the name
        name = None
        # First try to get the name from the anchor ID
        anchor = spec_elem.find('a', class_="anchor")
        if anchor and anchor.get('id'):
            item_id = anchor.get('id')
            # Clean up the ID to get the name
            name = item_id.split('.')[-1] if '.' in item_id else item_id
            # Remove prefixes like 'type-', 'val-', etc.
            name = re.sub(r'^(type|val|module|class|exception)-', '', name)

        # For values (functions), extract the name from the signature as a fallback.
        # This handles cases where the anchor doesn't contain the function name.
        if kind == 'value' and not name and signature:
            # Look for a "val name :" pattern in the signature
            val_match = re.search(r'val\s+(\w+)\s*:', signature)
            if val_match:
                name = val_match.group(1)

        if name:
            item['name'] = name

        # Add the processed signature
        if signature:
            # Replace newlines and multiple whitespace with a single space
            signature = re.sub(r'\s+', ' ', signature)
            item['signature'] = signature.strip()

        # Extract documentation
        doc_elem = spec.find(class_="spec-doc")
        if doc_elem:
            # Get the raw HTML content and remove all HTML tags
            html_content = str(doc_elem)
            # First, convert <br> tags to spaces
            html_content = re.sub(r'<br\s*/?>', ' ', html_content)
            # Parse the modified HTML
            soup_doc = BeautifulSoup(html_content, 'html.parser')
            # Get text with all whitespace normalized
            doc = soup_doc.get_text()
            # Replace all newlines and multiple spaces with a single space
            doc = re.sub(r'\s+', ' ', doc)
            item['documentation'] = doc.strip()

        # Add the item to our results
        result.append(item)

    return result


def process_json_file(file_path: str, package_name: str, package_version: str) -> List[Dict[str, Any]]:
    """
    Process a single odoc JSON file and extract the relevant information.

    Args:
        file_path: Path to the JSON file
        package_name: Name of the package
        package_version: Version of the package

    Returns:
        List of dictionaries containing extracted information
    """
    # Extract package and version from the file path if not already properly set
    if package_version == "unknown" or package_name == "unknown":
        # Check if this file is in a test directory structure
        file_path_parts = Path(file_path).resolve().parts
        # Look for a test/package-name/version pattern in the path
        for i, part in enumerate(file_path_parts):
            if part == "test" and i + 2 < len(file_path_parts):
                # We found a test directory; extract package name and version
                package_name = file_path_parts[i + 1]
                package_version = file_path_parts[i + 2]
                break

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            try:
                data = json.load(f)
            except json.JSONDecodeError:
                print(f"Error decoding JSON from {file_path}")
                return []
    except UnicodeDecodeError:
        # Try opening with a different encoding
        try:
            with open(file_path, 'r', encoding='latin-1') as f:
                try:
                    data = json.load(f)
                except json.JSONDecodeError:
                    print(f"Error decoding JSON from {file_path} with latin-1 encoding")
                    return []
        except Exception as e:
            print(f"Error reading {file_path}: {str(e)}")
            return []

    if 'content' not in data:
        return []

    # Extract the module path from breadcrumbs
    module_path = []
    if 'breadcrumbs' in data:
        for crumb in data['breadcrumbs']:
            if crumb.get('kind') == 'module':
                module_path.append(crumb.get('name'))

    module_name = ".".join(module_path) if module_path else os.path.basename(os.path.dirname(file_path))

    # Extract items from the content
    items = parse_html_content(data['content'])

    # Add package and module information to each item
    for item in items:
        item['package_name'] = package_name
        item['package_version'] = package_version
        item['module_name'] = module_name
        # Create a full path for the item that includes the item name:
        # - module_name: just the module hierarchy (e.g., "Math.Operations")
        # - full_path: complete path including the item name (e.g., "Math.Operations.add")
        if 'name' in item:
            item['full_path'] = f"{module_name}.{item['name']}"
        else:
            item['full_path'] = module_name

    return items


def worker_process_files(file_batch, package_name, package_version):
    """
    Worker function to process a batch of files in parallel.

    Args:
        file_batch: List of files to process
        package_name: Name of the package
        package_version: Version of the package

    Returns:
        List of all extracted items from all files in the batch
    """
    batch_items = []
    for file_path in file_batch:
        items = process_json_file(file_path, package_name, package_version)
        batch_items.extend(items)
    return batch_items


def collect_json_files(directory):
    """
    Collect all JSON files in a directory recursively.
    Args:
        directory: Path to the directory to search

    Returns:
        List of file paths
    """
    json_files = []
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith('.html.json'):
                json_files.append(os.path.join(root, file))
    return json_files


def process_directory(directory: str, mode: str = 'full',
                      override_package_name: Optional[str] = None,
                      override_package_version: Optional[str] = None,
                      num_workers: int = 1) -> List[Dict[str, Any]]:
    """
    Process all JSON files in a directory recursively using multiple processes.

    Args:
        directory: Path to the directory containing odoc JSON files
        mode: Operating mode - 'full' for full packages list, 'single' for a single package
        override_package_name: Optional override for package name
        override_package_version: Optional override for package version
        num_workers: Number of worker processes to use

    Returns:
        List of all extracted items from all files
    """
    package_name, package_version = extract_package_info(
        directory,
        mode=mode,
        override_package_name=override_package_name,
        override_package_version=override_package_version
    )

    # Collect all JSON files
    json_files = collect_json_files(directory)
    total_files = len(json_files)

    if total_files == 0:
        print(f"No .html.json files found in {directory}")
        return []

    mode_str = "single package mode" if mode == 'single' else "full packages mode"
    print(f"Processing {total_files} files from {package_name} {package_version} in {mode_str}...")
    print(f"Using {num_workers} worker processes")

    # Split files into batches for workers
    batches = []
    batch_size = max(1, total_files // num_workers)
    for i in range(0, total_files, batch_size):
        batches.append(json_files[i:i + batch_size])

    # Create a partial function with fixed package name and version
    process_batch = partial(worker_process_files, package_name=package_name, package_version=package_version)

    # Process batches in parallel
    start_time = time.time()
    all_items = []

    if num_workers > 1:
        # Use a multiprocessing Pool
        with mp.Pool(processes=num_workers) as pool:
            # Submit all batches to the pool
            results = pool.map(process_batch, batches)
            # Collect all results
            for batch_result in results:
                all_items.extend(batch_result)
    else:
        # Single process mode
        all_items = process_batch(json_files)

    elapsed_time = time.time() - start_time
    print(f"\nCompleted processing {total_files} files in {elapsed_time:.2f} seconds")
    print(f"Extracted {len(all_items)} items total")

    return all_items


def main():
    """
    Main entry point for the script.
    Usage examples:
        # Process in full mode (multiple packages)
        python odoc2json.py /path/to/odoc/output output.json

        # Process a single package with automatic detection
        python odoc2json.py /path/to/odoc/package output.json --mode single

        # Process with explicit package name and version
        python odoc2json.py /path/to/odoc/package output.json --mode single --package-name package-name --package-version 5.0.0

        # Process with multiple cores
        python odoc2json.py /path/to/odoc/output output.json --workers 8
    """
    parser = argparse.ArgumentParser(description='Convert odoc JSON to structured JSON records')
    parser.add_argument('input_dir', help='Directory containing odoc JSON output')
    parser.add_argument('output_file', help='Output JSON file path')
    parser.add_argument('--pretty', action='store_true', help='Pretty-print the JSON output')
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose output')
    parser.add_argument('--mode', choices=['full', 'single'], default='full',
                        help='Run mode: "full" for complete list of packages, "single" for a single package')
    parser.add_argument('--package-name', help='Override the package name (useful in single mode)')
    parser.add_argument('--package-version', help='Override the package version (useful in single mode)')
    parser.add_argument('--workers', type=int, default=mp.cpu_count(),
                        help=f'Number of worker processes (default: {mp.cpu_count()})')

    args = parser.parse_args()

    start_time = time.time()
    print(f"Starting extraction from {args.input_dir} in {args.mode} mode")

    # Process all files in the directory with multiple workers
    items = process_directory(
        args.input_dir,
        mode=args.mode,
        override_package_name=args.package_name,
        override_package_version=args.package_version,
        num_workers=args.workers
    )

    # Write the output
    print(f"Writing {len(items)} items to {args.output_file}...")
    with open(args.output_file, 'w', encoding='utf-8') as f:
        if args.pretty:
            json.dump(items, f, indent=2, ensure_ascii=False)
        else:
            json.dump(items, f, ensure_ascii=False)

    elapsed_time = time.time() - start_time
    print(f"Processed {len(items)} items in {elapsed_time:.2f} seconds")
    print(f"Output saved to {args.output_file}")


if __name__ == "__main__":
    main()
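
# Downstream usage note (a minimal sketch, not part of this script's pipeline):
# the module docstring mentions loading the output into columnar formats such
# as Parquet. Assuming pandas and pyarrow are available (neither is declared in
# the dependency block above) and that "output.json" is a file produced by this
# script, the flat records convert directly:
#
#     import pandas as pd
#
#     df = pd.read_json("output.json")                   # JSON array of flat dicts
#     df.to_parquet("signatures.parquet", index=False)   # writing requires pyarrow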