#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "bs4",
# ]
# ///
"""
odoc2json.py - Convert odoc JSON output to structured JSON records
This script parses the JSON output files from odoc-driver (an OCaml documentation
generator) and converts them into structured JSON records that include package name,
version, and each function signature with associated documentation.
The output is intended for further processing, analysis, and search over OCaml type
signatures, especially for loading into columnar formats like Parquet.
"""
import os
import json
import re
import time
import multiprocessing as mp
from bs4 import BeautifulSoup
from typing import Dict, List, Any, Optional, Tuple
import argparse
from pathlib import Path
from functools import partial
def _list_subdirs(directory: str) -> List[str]:
    """Return the names of the immediate subdirectories of ``directory``, sorted.

    Sorting makes "the first subdirectory" reproducible: ``os.listdir`` returns
    entries in arbitrary order.
    """
    return sorted(
        entry for entry in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, entry))
    )


def extract_package_info(path: str, mode: str = 'full',
                         override_package_name: Optional[str] = None,
                         override_package_version: Optional[str] = None) -> Tuple[str, str]:
    """
    Extract package name and version from the path.

    Explicit overrides always take priority; any value that cannot be
    determined is reported as the string "unknown".

    Args:
        path: Path to the odoc output directory
        mode: Operating mode - 'full' for full packages list, 'single' for a single package
        override_package_name: Optional override for package name
        override_package_version: Optional override for package version

    Returns:
        Tuple of (package_name, package_version)
    """
    package_name = override_package_name or "unknown"
    package_version = override_package_version or "unknown"

    # With both overrides there is nothing left to infer from the path.
    if override_package_name and override_package_version:
        return package_name, package_version

    parts = Path(path).resolve().parts

    try:
        if mode == 'single':
            if not override_package_name and parts:
                # Default to the directory name itself ...
                package_name = parts[-1]
                # ... but prefer the first subdirectory when there is one.
                subdirs = _list_subdirs(path)
                if subdirs:
                    package_name = subdirs[0]
        elif mode == 'full':
            subdirs = _list_subdirs(path)
            if not override_package_name:
                for subdir in subdirs:
                    # A subdirectory that itself contains directories looks like
                    # package-name/package-version/.
                    if _list_subdirs(os.path.join(path, subdir)):
                        # The version is deliberately left as "unknown":
                        # process_json_file treats that value as the signal to
                        # derive package and version from each file's own path.
                        return subdir, "unknown"
            # No package layout below this directory: assume the path itself
            # ends in .../package-name/package-version.
            if len(parts) >= 3:
                if not override_package_name:
                    package_name = parts[-2]
                if not override_package_version:
                    package_version = parts[-1]
    except OSError as e:
        # Missing/unreadable directory (in either mode): report it and fall
        # back to whatever has been determined so far.
        print(f"Error accessing directory {path}: {str(e)}")

    return package_name, package_version
def parse_html_content(content: str) -> List[Dict[str, Any]]:
    """
    Parse the HTML content from the odoc JSON to extract signatures and documentation.

    Every element with class "odoc-spec" that contains a "spec" element of a
    recognised kind yields one record. Each record always has a 'kind' key and,
    when available, 'name', 'signature' and 'documentation' keys (whitespace in
    the latter two is collapsed to single spaces).

    Args:
        content: HTML content from the odoc JSON file

    Returns:
        List of dictionaries containing extracted information
    """
    recognised_kinds = ('type', 'value', 'module', 'class', 'exception', 'constructor')

    soup = BeautifulSoup(content, 'html.parser')
    result = []

    # Process each specification block (function, type, module, etc.)
    for spec in soup.find_all(class_="odoc-spec"):
        # The spec element contains the signature
        spec_elem = spec.find(class_="spec")
        if not spec_elem:
            continue

        # The kind is the first CSS class of the spec element that we recognise
        kind = next(
            (cls for cls in spec_elem.get('class', []) if cls in recognised_kinds),
            None,
        )
        if not kind:
            continue
        item = {'kind': kind}

        # Raw signature text; also used below as a fallback source for the name
        code_elem = spec_elem.find('code')
        signature = code_elem.get_text() if code_elem else ""

        # First try to get the name from the anchor ID.
        # NOTE(review): only an `id` on the <a class="anchor"> element itself is
        # consulted; if odoc places the id on the spec element instead, only
        # values get a name (via the fallback below) - confirm against real output.
        name = None
        anchor = spec_elem.find('a', class_="anchor")
        if anchor and anchor.get('id'):
            # Keep the last dotted component and drop prefixes like 'type-', 'val-'
            name = anchor.get('id').split('.')[-1]
            name = re.sub(r'^(type|val|module|class|exception)-', '', name)

        # For values (functions), fall back to the "val name :" pattern in the signature
        if kind == 'value' and not name and signature:
            val_match = re.search(r'val\s+(\w+)\s*:', signature)
            if val_match:
                name = val_match.group(1)

        if name:
            item['name'] = name

        if signature:
            # Replace newlines and multiple whitespace with a single space
            item['signature'] = re.sub(r'\s+', ' ', signature).strip()

        # Extract documentation
        doc_elem = spec.find(class_="spec-doc")
        if doc_elem:
            # Convert <br> tags to spaces first so that the text on either side
            # of a line break is not glued together once the tags are removed.
            html_content = re.sub(r'<br\s*/?>', ' ', str(doc_elem), flags=re.IGNORECASE)
            # Re-parse the modified HTML and strip all remaining tags
            doc = BeautifulSoup(html_content, 'html.parser').get_text()
            # Replace all newlines and multiple spaces with a single space
            item['documentation'] = re.sub(r'\s+', ' ', doc).strip()

        result.append(item)

    return result
def _read_odoc_json(file_path: str) -> Any:
    """Read and decode one JSON file; return None if it cannot be read or parsed.

    The file is decoded as UTF-8, falling back to latin-1 when the bytes are
    not valid UTF-8. Failures are reported on stdout rather than raised.
    """
    try:
        raw = Path(file_path).read_bytes()
    except OSError as e:
        print(f"Error reading {file_path}: {str(e)}")
        return None

    try:
        text = raw.decode('utf-8')
        encoding_note = ""
    except UnicodeDecodeError:
        # latin-1 maps every byte to a character, so this decode cannot fail
        text = raw.decode('latin-1')
        encoding_note = " with latin-1 encoding"

    try:
        return json.loads(text)
    except json.JSONDecodeError:
        print(f"Error decoding JSON from {file_path}{encoding_note}")
        return None


def process_json_file(file_path: str, package_name: str, package_version: str) -> List[Dict[str, Any]]:
    """
    Process a single odoc JSON file and extract the relevant information.

    Unreadable files, invalid JSON, and documents that are not an object with a
    string 'content' field all yield an empty list instead of raising, so that
    one bad file does not abort a whole batch.

    Args:
        file_path: Path to the JSON file
        package_name: Name of the package
        package_version: Version of the package

    Returns:
        List of dictionaries containing extracted information
    """
    # Extract package and version from file path if not already properly set
    if package_version == "unknown" or package_name == "unknown":
        file_path_parts = Path(file_path).resolve().parts
        # Look for a test/package-name/version pattern in the path. When found,
        # BOTH values are replaced: in full mode the directory-level name is
        # only a placeholder and the per-file path is the real source.
        for i, part in enumerate(file_path_parts):
            if part == "test" and i + 2 < len(file_path_parts):
                package_name = file_path_parts[i + 1]
                package_version = file_path_parts[i + 2]
                break

    data = _read_odoc_json(file_path)
    # Anything other than a JSON object cannot carry a 'content' field
    if not isinstance(data, dict) or 'content' not in data:
        return []
    content = data['content']
    if not isinstance(content, str):
        return []

    # Extract module path from breadcrumbs, ignoring malformed entries
    breadcrumbs = data.get('breadcrumbs')
    if not isinstance(breadcrumbs, list):
        breadcrumbs = []
    module_path = [
        crumb['name'] for crumb in breadcrumbs
        if isinstance(crumb, dict)
        and crumb.get('kind') == 'module'
        and isinstance(crumb.get('name'), str)
    ]
    # Without module breadcrumbs, fall back to the name of the containing directory
    module_name = ".".join(module_path) if module_path else os.path.basename(os.path.dirname(file_path))

    # Extract items from the content
    items = parse_html_content(content)

    # Add package and module information to each item
    for item in items:
        item['package_name'] = package_name
        item['package_version'] = package_version
        item['module_name'] = module_name
        # - module_name: just the module hierarchy (e.g., "Math.Operations")
        # - full_path: complete path including item name (e.g., "Math.Operations.add")
        if 'name' in item:
            item['full_path'] = f"{module_name}.{item['name']}"
        else:
            item['full_path'] = module_name

    return items
def worker_process_files(file_batch, package_name, package_version):
    """
    Run process_json_file over every file of a batch and merge the results.

    Intended as the unit of work handed to a worker process.

    Args:
        file_batch: List of files to process
        package_name: Name of the package
        package_version: Version of the package

    Returns:
        One flat list holding the items extracted from each file, in batch order
    """
    return [
        extracted
        for path in file_batch
        for extracted in process_json_file(path, package_name, package_version)
    ]
def collect_json_files(directory):
    """
    Walk a directory tree and gather every file whose name ends in '.html.json'.

    Args:
        directory: Path to the directory to search

    Returns:
        List of file paths (each joined onto the directory it was found in)
    """
    return [
        os.path.join(current_dir, filename)
        for current_dir, _subdirs, filenames in os.walk(directory)
        for filename in filenames
        if filename.endswith('.html.json')
    ]
def process_directory(directory: str, mode: str = 'full',
                      override_package_name: Optional[str] = None,
                      override_package_version: Optional[str] = None,
                      num_workers: int = 1) -> List[Dict[str, Any]]:
    """
    Process all JSON files in a directory recursively using multiple processes.

    Args:
        directory: Path to the directory containing odoc JSON files
        mode: Operating mode - 'full' for full packages list, 'single' for a single package
        override_package_name: Optional override for package name
        override_package_version: Optional override for package version
        num_workers: Number of worker processes to use; values below 1 are
            treated as 1 (single-process mode)

    Returns:
        List of all extracted items from all files
    """
    package_name, package_version = extract_package_info(
        directory,
        mode=mode,
        override_package_name=override_package_name,
        override_package_version=override_package_version
    )

    # Collect all JSON files
    json_files = collect_json_files(directory)
    total_files = len(json_files)
    if total_files == 0:
        print(f"No .html.json files found in {directory}")
        return []

    # A worker count of 0 (or less) would otherwise divide by zero below
    num_workers = max(1, num_workers)

    mode_str = "single package mode" if mode == 'single' else "full packages mode"
    print(f"Processing {total_files} files from {package_name} {package_version} in {mode_str}...")
    print(f"Using {num_workers} worker processes")

    # Split files into at most num_workers batches. Ceiling division keeps the
    # batch count within the worker count, so no worker is left to pick up a
    # trailing remainder batch after the others have finished.
    batch_size = -(-total_files // num_workers)
    batches = [json_files[i:i + batch_size] for i in range(0, total_files, batch_size)]

    # Create partial function with fixed package name and version
    process_batch = partial(worker_process_files, package_name=package_name, package_version=package_version)

    start_time = time.time()
    if num_workers > 1:
        # Never start more processes than there are batches to hand out
        with mp.Pool(processes=min(num_workers, len(batches))) as pool:
            # pool.map returns results in batch order, so item order is stable
            results = pool.map(process_batch, batches)
        all_items = [item for batch_result in results for item in batch_result]
    else:
        # Single process mode
        all_items = process_batch(json_files)

    elapsed_time = time.time() - start_time
    print(f"\nCompleted processing {total_files} files in {elapsed_time:.2f} seconds")
    print(f"Extracted {len(all_items)} items total")
    return all_items
def main():
    """
    Command-line entry point: parse arguments, run the extraction, write the JSON.

    The --verbose flag is accepted but is not consulted by this function.

    Usage examples:
        # Process in full mode (multiple packages)
        python odoc2json.py /path/to/odoc/output output.json

        # Process a single package with automatic detection
        python odoc2json.py /path/to/odoc/package output.json --mode single

        # Process with explicit package name and version
        python odoc2json.py /path/to/odoc/package output.json --mode single --package-name package-name --package-version 5.0.0

        # Process with multiple cores
        python odoc2json.py /path/to/odoc/output output.json --workers 8
    """
    cpu_total = mp.cpu_count()

    cli = argparse.ArgumentParser(description='Convert odoc JSON to structured JSON records')
    cli.add_argument('input_dir', help='Directory containing odoc JSON output')
    cli.add_argument('output_file', help='Output JSON file path')
    cli.add_argument('--pretty', action='store_true', help='Pretty-print the JSON output')
    cli.add_argument('--verbose', '-v', action='store_true', help='Enable verbose output')
    cli.add_argument('--mode', choices=['full', 'single'], default='full',
                     help='Run mode: "full" for complete list of packages, "single" for a single package')
    cli.add_argument('--package-name', help='Override the package name (useful in single mode)')
    cli.add_argument('--package-version', help='Override the package version (useful in single mode)')
    cli.add_argument('--workers', type=int, default=cpu_total,
                     help=f'Number of worker processes (default: {cpu_total})')
    opts = cli.parse_args()

    began = time.time()
    print(f"Starting extraction from {opts.input_dir} in {opts.mode} mode")

    # Run the extraction over the whole input tree
    records = process_directory(
        opts.input_dir,
        mode=opts.mode,
        override_package_name=opts.package_name,
        override_package_version=opts.package_version,
        num_workers=opts.workers
    )

    # Serialise the records; indentation is only requested for --pretty
    print(f"Writing {len(records)} items to {opts.output_file}...")
    dump_options = {'ensure_ascii': False}
    if opts.pretty:
        dump_options['indent'] = 2
    with open(opts.output_file, 'w', encoding='utf-8') as out:
        json.dump(records, out, **dump_options)

    took = time.time() - began
    print(f"Processed {len(records)} items in {took:.2f} seconds")
    print(f"Output saved to {opts.output_file}")
# Run the command-line entry point only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()