Take OCaml odoc output into MCP
at main 18 kB view raw
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "bs4",
# ]
# ///
"""
odoc2json.py - Convert odoc JSON output to structured JSON records

This script parses the JSON output files from odoc-driver (an OCaml documentation
generator) and converts them into structured JSON records that include package name,
version, and each function signature with associated documentation.

The output is intended for further processing, analysis, and search over OCaml type
signatures, especially for loading into columnar formats like Parquet.
"""

import os
import json
import re
import time
import multiprocessing as mp
from typing import Dict, List, Any, Optional, Tuple
import argparse
from pathlib import Path
from functools import partial

try:
    from bs4 import BeautifulSoup
except ImportError:
    # Keep the stdlib-only helpers importable without bs4; the failure is
    # reported as soon as HTML actually has to be parsed (or when main() runs).
    BeautifulSoup = None


_BS4_MISSING = "The 'bs4' package is required to parse odoc HTML content"

# CSS classes on a ".spec" element that identify the kind of item it describes.
_SPEC_KINDS = ('type', 'value', 'module', 'class', 'exception', 'constructor')

# Patterns are compiled once because they are applied to every extracted item.
_ID_PREFIX_RE = re.compile(r'^(type|val|module|class|exception)-')
_VAL_NAME_RE = re.compile(r'val\s+(\w+)\s*:')
_WHITESPACE_RE = re.compile(r'\s+')
_BR_TAG_RE = re.compile(r'<br\s*/?\s*>')


def _list_subdirs(path: str) -> List[str]:
    """Return the names of the immediate subdirectories of *path*, sorted."""
    return sorted(
        entry for entry in os.listdir(path)
        if os.path.isdir(os.path.join(path, entry))
    )


def extract_package_info(path: str, mode: str = 'full',
                         override_package_name: Optional[str] = None,
                         override_package_version: Optional[str] = None) -> Tuple[str, str]:
    """
    Extract package name and version from the path.

    Explicit overrides always win; only the missing pieces are inferred from
    the directory layout. Directory access errors are reported on stdout and
    whatever has been determined so far is returned.

    Args:
        path: Path to the odoc output directory
        mode: Operating mode - 'full' for full packages list, 'single' for a single package
        override_package_name: Optional override for package name
        override_package_version: Optional override for package version

    Returns:
        Tuple of (package_name, package_version); a component that could not
        be determined is the string "unknown"
    """
    package_name = override_package_name or "unknown"
    package_version = override_package_version or "unknown"

    # If we have both overrides, no need to analyze path
    if override_package_name and override_package_version:
        return package_name, package_version

    parts = Path(path).resolve().parts

    try:
        if mode == 'single':
            if not override_package_name:
                # The package name is typically the directory name ...
                if parts:
                    package_name = parts[-1]
                # ... unless the directory contains a subdirectory, which is
                # then taken as the package name (first one in sorted order,
                # so the result does not depend on listing order).
                subdirs = _list_subdirs(path)
                if subdirs:
                    package_name = subdirs[0]

        elif mode == 'full':
            # Expected layout: <path>/package-name/package-version/...
            if not override_package_name:
                for subdir in _list_subdirs(path):
                    # A subdirectory that itself contains directories looks
                    # like a package holding version directories. The value
                    # returned here is only a default: process_json_file()
                    # re-derives name/version per file when they are unknown.
                    if _list_subdirs(os.path.join(path, subdir)):
                        return subdir, package_version

            # No package structure below us: assume we are already inside
            # .../package-name/package-version
            if len(parts) >= 3:
                if not override_package_name:
                    package_name = parts[-2]
                if not override_package_version:
                    package_version = parts[-1]
    except (FileNotFoundError, NotADirectoryError, PermissionError) as e:
        # Handle cases where we can't access the directory
        print(f"Error accessing directory {path}: {str(e)}")

    return package_name, package_version


def _name_from_anchor_id(item_id: str) -> str:
    """Derive an item name from an anchor id such as 'val-map' or 'type-t.A'."""
    paren = item_id.find('(')
    if paren != -1 and item_id.endswith(')'):
        # Operator ids like 'val-(+.)' may contain dots inside the
        # parentheses; keep the parenthesised operator intact.
        name = item_id[paren:]
    else:
        name = item_id.rsplit('.', 1)[-1]
    # Remove prefixes like 'type-', 'val-', etc.
    return _ID_PREFIX_RE.sub('', name)


def parse_html_content(content: str) -> List[Dict[str, Any]]:
    """
    Parse the HTML content from the odoc JSON to extract signatures and documentation.

    Args:
        content: HTML content from the odoc JSON file

    Returns:
        List of dictionaries, one per recognised ".odoc-spec" block, with the
        key 'kind' and, when available, 'name', 'signature' and 'documentation'

    Raises:
        ImportError: If the bs4 package is not installed
    """
    if BeautifulSoup is None:
        raise ImportError(_BS4_MISSING)

    soup = BeautifulSoup(content, 'html.parser')
    result = []

    # Process each specification block (function, type, module, etc.)
    for spec in soup.find_all(class_="odoc-spec"):
        # The spec element contains the signature
        spec_elem = spec.find(class_="spec")
        if not spec_elem:
            continue

        # The kind is the first CSS class that names a known item kind
        kind = next(
            (cls for cls in spec_elem.get('class', []) if cls in _SPEC_KINDS),
            None,
        )
        if not kind:
            continue

        item = {'kind': kind}

        # Extract the signature first: it doubles as a fallback name source
        code_elem = spec_elem.find('code')
        signature = code_elem.get_text() if code_elem else ""

        # First try to get the name from the anchor id.
        # NOTE(review): some odoc versions appear to put the id on the spec
        # element itself rather than on the anchor - confirm against real output.
        name = None
        anchor = spec_elem.find('a', class_="anchor")
        if anchor and anchor.get('id'):
            name = _name_from_anchor_id(anchor.get('id'))

        # For values (functions), fall back to the "val name :" pattern in
        # the signature when the anchor did not yield a name
        if kind == 'value' and not name and signature:
            val_match = _VAL_NAME_RE.search(signature)
            if val_match:
                name = val_match.group(1)

        if name:
            item['name'] = name

        if signature:
            # Replace newlines and multiple whitespace with a single space
            item['signature'] = _WHITESPACE_RE.sub(' ', signature).strip()

        # Extract documentation
        doc_elem = spec.find(class_="spec-doc")
        if doc_elem:
            # Turn <br> tags into spaces before stripping the markup so that
            # words on either side of a line break do not run together
            html_content = _BR_TAG_RE.sub(' ', str(doc_elem))
            doc = BeautifulSoup(html_content, 'html.parser').get_text()
            item['documentation'] = _WHITESPACE_RE.sub(' ', doc).strip()

        result.append(item)

    return result


def _load_json(file_path: str) -> Any:
    """
    Load a JSON file, retrying with latin-1 when it is not valid UTF-8.

    Returns the decoded value, or None (after printing a message) when the
    file cannot be read or does not contain valid JSON.
    """
    attempts = (('utf-8', ""), ('latin-1', " with latin-1 encoding"))
    for encoding, suffix in attempts:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                return json.load(f)
        except UnicodeDecodeError:
            # Not valid in this encoding: try the next one
            continue
        except json.JSONDecodeError:
            print(f"Error decoding JSON from {file_path}{suffix}")
            return None
        except OSError as e:
            print(f"Error reading {file_path}: {str(e)}")
            return None
    return None


def process_json_file(file_path: str, package_name: str, package_version: str) -> List[Dict[str, Any]]:
    """
    Process a single odoc JSON file and extract the relevant information.

    Unreadable files, invalid JSON, and JSON documents that are not objects
    with a 'content' key all yield an empty list.

    Args:
        file_path: Path to the JSON file
        package_name: Name of the package
        package_version: Version of the package

    Returns:
        List of dictionaries containing extracted information, each extended
        with 'package_name', 'package_version', 'module_name' and 'full_path'
    """
    # Extract package and version from file path if not already properly set
    if package_version == "unknown" or package_name == "unknown":
        file_path_parts = Path(file_path).resolve().parts

        # Look for a test/package-name/version pattern in the path. Both
        # values are replaced together: in full mode the caller only passes
        # a default package name, which must be corrected per file.
        for i, part in enumerate(file_path_parts):
            if part == "test" and i + 2 < len(file_path_parts):
                package_name = file_path_parts[i + 1]
                package_version = file_path_parts[i + 2]
                break

    data = _load_json(file_path)
    # Anything other than a JSON object carrying 'content' has nothing to extract
    if not isinstance(data, dict) or 'content' not in data:
        return []

    # Extract module path from breadcrumbs, ignoring malformed entries
    module_path = [
        crumb['name']
        for crumb in data.get('breadcrumbs') or []
        if isinstance(crumb, dict) and crumb.get('kind') == 'module' and crumb.get('name')
    ]

    if module_path:
        module_name = ".".join(module_path)
    else:
        # Without breadcrumbs, fall back to the containing directory's name
        module_name = os.path.basename(os.path.dirname(file_path))

    # Extract items from the content
    items = parse_html_content(data['content'])

    # Add package and module information to each item
    for item in items:
        item['package_name'] = package_name
        item['package_version'] = package_version
        item['module_name'] = module_name

        # - module_name: just the module hierarchy (e.g., "Math.Operations")
        # - full_path: complete path including item name (e.g., "Math.Operations.add")
        if 'name' in item:
            item['full_path'] = f"{module_name}.{item['name']}"
        else:
            item['full_path'] = module_name

    return items


def worker_process_files(file_batch, package_name, package_version):
    """
    Worker function to process a batch of files in parallel.

    Args:
        file_batch: List of files to process
        package_name: Name of the package
        package_version: Version of the package

    Returns:
        List of all extracted items from all files in the batch
    """
    batch_items = []
    for file_path in file_batch:
        batch_items.extend(process_json_file(file_path, package_name, package_version))
    return batch_items


def collect_json_files(directory):
    """
    Collect all odoc JSON files ('*.html.json') in a directory recursively.

    Args:
        directory: Path to the directory to search

    Returns:
        Sorted list of file paths, so that the output order is reproducible
    """
    return sorted(
        os.path.join(root, file)
        for root, _, files in os.walk(directory)
        for file in files
        if file.endswith('.html.json')
    )


def process_directory(directory: str, mode: str = 'full',
                      override_package_name: Optional[str] = None,
                      override_package_version: Optional[str] = None,
                      num_workers: int = 1) -> List[Dict[str, Any]]:
    """
    Process all JSON files in a directory recursively using multiple processes.

    Args:
        directory: Path to the directory containing odoc JSON files
        mode: Operating mode - 'full' for full packages list, 'single' for a single package
        override_package_name: Optional override for package name
        override_package_version: Optional override for package version
        num_workers: Number of worker processes to use; values below 1 are
            treated as 1

    Returns:
        List of all extracted items from all files
    """
    # A non-positive worker count would otherwise divide by zero below
    num_workers = max(1, num_workers)

    package_name, package_version = extract_package_info(
        directory,
        mode=mode,
        override_package_name=override_package_name,
        override_package_version=override_package_version
    )

    # Collect all JSON files
    json_files = collect_json_files(directory)
    total_files = len(json_files)

    if total_files == 0:
        print(f"No .html.json files found in {directory}")
        return []

    mode_str = "single package mode" if mode == 'single' else "full packages mode"
    print(f"Processing {total_files} files from {package_name} {package_version} in {mode_str}...")
    print(f"Using {num_workers} worker processes")

    # Split files into batches for workers
    batch_size = max(1, total_files // num_workers)
    batches = [
        json_files[start:start + batch_size]
        for start in range(0, total_files, batch_size)
    ]

    # Create partial function with fixed package name and version
    process_batch = partial(worker_process_files, package_name=package_name, package_version=package_version)

    start_time = time.time()
    all_items = []

    if num_workers > 1:
        # No point in starting more processes than there are batches
        with mp.Pool(processes=min(num_workers, len(batches))) as pool:
            for batch_result in pool.map(process_batch, batches):
                all_items.extend(batch_result)
    else:
        # Single process mode
        all_items = process_batch(json_files)

    elapsed_time = time.time() - start_time
    print(f"\nCompleted processing {total_files} files in {elapsed_time:.2f} seconds")
    print(f"Extracted {len(all_items)} items total")
    return all_items


def main():
    """
    Main entry point for the script.

    Usage examples:

    # Process in full mode (multiple packages)
    python odoc2json.py /path/to/odoc/output output.json

    # Process a single package with automatic detection
    python odoc2json.py /path/to/odoc/package output.json --mode single

    # Process with explicit package name and version
    python odoc2json.py /path/to/odoc/package output.json --mode single --package-name package-name --package-version 5.0.0

    # Process with multiple cores
    python odoc2json.py /path/to/odoc/output output.json --workers 8
    """
    parser = argparse.ArgumentParser(description='Convert odoc JSON to structured JSON records')
    parser.add_argument('input_dir', help='Directory containing odoc JSON output')
    parser.add_argument('output_file', help='Output JSON file path')
    parser.add_argument('--pretty', action='store_true', help='Pretty-print the JSON output')
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose output')
    parser.add_argument('--mode', choices=['full', 'single'], default='full',
                        help='Run mode: "full" for complete list of packages, "single" for a single package')
    parser.add_argument('--package-name', help='Override the package name (useful in single mode)')
    parser.add_argument('--package-version', help='Override the package version (useful in single mode)')
    parser.add_argument('--workers', type=int, default=mp.cpu_count(),
                        help=f'Number of worker processes (default: {mp.cpu_count()})')
    args = parser.parse_args()

    # Fail before scanning anything rather than inside a worker process
    if BeautifulSoup is None:
        parser.error(_BS4_MISSING)

    start_time = time.time()
    print(f"Starting extraction from {args.input_dir} in {args.mode} mode")

    # Process all files in the directory with multiple workers
    items = process_directory(
        args.input_dir,
        mode=args.mode,
        override_package_name=args.package_name,
        override_package_version=args.package_version,
        num_workers=args.workers
    )

    # Write the output
    print(f"Writing {len(items)} items to {args.output_file}...")
    with open(args.output_file, 'w', encoding='utf-8') as f:
        if args.pretty:
            json.dump(items, f, indent=2, ensure_ascii=False)
        else:
            json.dump(items, f, ensure_ascii=False)

    elapsed_time = time.time() - start_time
    print(f"Processed {len(items)} items in {elapsed_time:.2f} seconds")
    print(f"Output saved to {args.output_file}")


if __name__ == "__main__":
    main()