Take OCaml odoc output into MCP
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "bs4",
# ]
# ///
"""
odoc2json.py - Convert odoc JSON output to structured JSON records

This script parses the JSON output files from odoc-driver (an OCaml documentation
generator) and converts them into structured JSON records that include package name,
version, and each function signature with associated documentation.

The output is intended for further processing, analysis, and search over OCaml type
signatures, especially for loading into columnar formats like Parquet.
"""

import os
import json
import re
import time
from bs4 import BeautifulSoup
from typing import Dict, List, Any, Optional, Tuple
import argparse
from pathlib import Path


# CSS classes on a "spec" element that are turned into records; any
# odoc-spec block whose spec element carries none of these is skipped.
_SPEC_KINDS = ('type', 'value', 'module', 'class', 'exception', 'constructor')

# Kind prefix that is stripped from an anchor id to obtain the bare name.
_ANCHOR_PREFIX_RE = re.compile(r'^(type|val|module|class|exception)-')

# Fallback name extraction from a "val name : ..." signature. Accepts plain
# identifiers (including primes, e.g. x') and parenthesised operators.
_VAL_NAME_RE = re.compile(r"val\s+(\([^)]*\)|[\w']+)\s*:")

_WHITESPACE_RE = re.compile(r'\s+')
_BR_TAG_RE = re.compile(r'<br\s*/?\s*>')


def _list_subdirs(path: str) -> List[str]:
    """Return the names of the immediate subdirectories of ``path``, sorted.

    Sorting makes any "first subdirectory" choice deterministic, since
    ``os.listdir`` returns entries in arbitrary order.

    Raises:
        OSError: if ``path`` cannot be listed.
    """
    return sorted(d for d in os.listdir(path) if os.path.isdir(os.path.join(path, d)))


def extract_package_info(path: str, mode: str = 'full',
                         override_package_name: Optional[str] = None,
                         override_package_version: Optional[str] = None) -> Tuple[str, str]:
    """
    Extract package name and version from the path.

    Args:
        path: Path to the odoc output directory
        mode: Operating mode - 'full' for full packages list, 'single' for a single package
        override_package_name: Optional override for package name
        override_package_version: Optional override for package version

    Returns:
        Tuple of (package_name, package_version). Either element is the string
        "unknown" when it could not be determined. If the directory cannot be
        listed, an error is printed and the values found so far are returned.
    """
    # Always prioritize explicit overrides if provided
    package_name = override_package_name or "unknown"
    package_version = override_package_version or "unknown"

    # If we have both overrides, no need to analyze path
    if override_package_name and override_package_version:
        return package_name, package_version

    # Use Path for more reliable path parsing
    parts = Path(path).resolve().parts

    try:
        if mode == 'single':
            # In single package mode, the package name is typically the directory name,
            # unless the directory contains a subdirectory, in which case the first
            # (sorted) subdirectory is taken as the package name.
            if not override_package_name and parts:
                package_name = parts[-1]
                subdirs = _list_subdirs(path)
                if subdirs:
                    package_name = subdirs[0]

        elif mode == 'full':
            # Expected layout: <path>/package-name/package-version/...
            subdirs = _list_subdirs(path)

            if not override_package_name:
                for subdir in subdirs:
                    # A subdirectory that itself contains directories looks like
                    # package-name/package-version.
                    if _list_subdirs(os.path.join(path, subdir)):
                        # The version is deliberately left as "unknown": that is
                        # what makes process_json_file derive the package name
                        # and version per file from a test/<name>/<version> path.
                        return subdir, "unknown"

            # No package structure below this directory: assume the directory
            # itself is .../package-name/package-version.
            if len(parts) >= 3:
                if not override_package_name:
                    package_name = parts[-2]
                if not override_package_version:
                    package_version = parts[-1]
    except OSError as e:
        # Handle cases where we can't access the directory (missing, not a
        # directory, permission denied, ...)
        print(f"Error accessing directory {path}: {str(e)}")

    return package_name, package_version


def parse_html_content(content: str) -> List[Dict[str, Any]]:
    """
    Parse the HTML content from the odoc JSON to extract signatures and documentation.

    Args:
        content: HTML content from the odoc JSON file

    Returns:
        List of dictionaries, one per recognised "odoc-spec" block. Each has a
        'kind' key and, when available, 'name', 'signature' and 'documentation'.
    """
    soup = BeautifulSoup(content, 'html.parser')
    result = []

    # Process each specification block (function, type, module, etc.)
    for spec in soup.find_all(class_="odoc-spec"):
        # Get the spec element (contains the signature)
        spec_elem = spec.find(class_="spec")
        if spec_elem is None:
            continue

        # Determine the kind of element from its CSS classes
        kind = next((cls for cls in spec_elem.get('class', []) if cls in _SPEC_KINDS), None)
        if not kind:
            continue

        item = {'kind': kind}

        # Extract the signature first to use for name extraction if needed
        code_elem = spec_elem.find('code')
        signature = code_elem.get_text() if code_elem is not None else ""

        # First try to get the name from the anchor ID
        name = None
        anchor = spec_elem.find('a', class_="anchor")
        if anchor is not None and anchor.get('id'):
            # Keep the part after the last '.', then remove prefixes like
            # 'type-', 'val-', etc.
            name = _ANCHOR_PREFIX_RE.sub('', anchor.get('id').split('.')[-1])

        # For values (functions), extract the name from the signature as a fallback.
        # This handles cases where the anchor doesn't contain the function name.
        if kind == 'value' and not name and signature:
            val_match = _VAL_NAME_RE.search(signature)
            if val_match:
                name = val_match.group(1)

        if name:
            item['name'] = name

        if signature:
            # Replace newlines and multiple whitespace with a single space
            item['signature'] = _WHITESPACE_RE.sub(' ', signature).strip()

        # Extract documentation
        doc_elem = spec.find(class_="spec-doc")
        if doc_elem is not None:
            # Convert <br> tags to spaces before dropping the markup, so that
            # words on either side of a line break are not glued together.
            html_content = _BR_TAG_RE.sub(' ', str(doc_elem))
            doc = BeautifulSoup(html_content, 'html.parser').get_text()
            item['documentation'] = _WHITESPACE_RE.sub(' ', doc).strip()

        result.append(item)

    return result


def _read_json(file_path: str) -> Any:
    """Load ``file_path`` as JSON, trying UTF-8 first and latin-1 second.

    Returns the decoded document, or None after printing the reason when the
    file cannot be read or decoded.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except json.JSONDecodeError:
        print(f"Error decoding JSON from {file_path}")
        return None
    except UnicodeDecodeError:
        # Not valid UTF-8: fall through and retry with latin-1 below.
        pass
    except OSError as e:
        print(f"Error reading {file_path}: {str(e)}")
        return None

    try:
        with open(file_path, 'r', encoding='latin-1') as f:
            return json.load(f)
    except json.JSONDecodeError:
        print(f"Error decoding JSON from {file_path} with latin-1 encoding")
    except Exception as e:
        # Best effort: a single bad file must not abort the whole run.
        print(f"Error reading {file_path}: {str(e)}")
    return None


def process_json_file(file_path: str, package_name: str, package_version: str) -> List[Dict[str, Any]]:
    """
    Process a single odoc JSON file and extract the relevant information.

    Args:
        file_path: Path to the JSON file
        package_name: Name of the package
        package_version: Version of the package

    Returns:
        List of dictionaries containing extracted information. Empty when the
        file cannot be read, is not a JSON object, or has no 'content' key.
    """
    # Extract package and version from file path if not already properly set.
    # Both values are taken from the path together, because in full mode the
    # name passed in is only a placeholder for the whole directory tree.
    if package_version == "unknown" or package_name == "unknown":
        file_path_parts = Path(file_path).resolve().parts

        # Look for test/package-name/version pattern in the path
        for i, part in enumerate(file_path_parts):
            if part == "test" and i + 2 < len(file_path_parts):
                package_name = file_path_parts[i + 1]
                package_version = file_path_parts[i + 2]
                break

    data = _read_json(file_path)

    # Anything other than a JSON object with a 'content' entry has nothing to extract.
    if not isinstance(data, dict) or 'content' not in data:
        return []

    # Extract module path from breadcrumbs, ignoring malformed or unnamed entries
    module_path = [
        crumb['name']
        for crumb in (data.get('breadcrumbs') or [])
        if isinstance(crumb, dict)
        and crumb.get('kind') == 'module'
        and isinstance(crumb.get('name'), str)
    ]

    # Without module breadcrumbs, fall back to the name of the containing directory
    module_name = ".".join(module_path) if module_path else os.path.basename(os.path.dirname(file_path))

    # Extract items from the content
    items = parse_html_content(data['content'])

    # Add package and module information to each item
    for item in items:
        item['package_name'] = package_name
        item['package_version'] = package_version
        item['module_name'] = module_name

        # Create a full path for the item that includes the item name
        # - module_name: just the module hierarchy (e.g., "Math.Operations")
        # - full_path: complete path including item name (e.g., "Math.Operations.add")
        if 'name' in item:
            item['full_path'] = f"{module_name}.{item['name']}"
        else:
            item['full_path'] = module_name

    return items


def process_directory(directory: str, mode: str = 'full',
                      override_package_name: Optional[str] = None,
                      override_package_version: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Process all JSON files in a directory recursively.

    Args:
        directory: Path to the directory containing odoc JSON files
        mode: Operating mode - 'full' for full packages list, 'single' for a single package
        override_package_name: Optional override for package name
        override_package_version: Optional override for package version

    Returns:
        List of all extracted items from all files
    """
    all_items = []
    package_name, package_version = extract_package_info(
        directory,
        mode=mode,
        override_package_name=override_package_name,
        override_package_version=override_package_version
    )

    # Collect the files once, in a stable order, so the output is reproducible
    # and the tree is not walked a second time just to count files.
    json_files = []
    for root, dirs, files in os.walk(directory):
        dirs.sort()  # in-place sort controls the order os.walk descends in
        for file in sorted(files):
            if file.endswith('.html.json'):
                json_files.append(os.path.join(root, file))

    total_files = len(json_files)
    if total_files == 0:
        print(f"No .html.json files found in {directory}")
        return all_items

    mode_str = "single package mode" if mode == 'single' else "full packages mode"
    print(f"Processing {total_files} files from {package_name} {package_version} in {mode_str}...")

    # Process each file with progress indicator
    processed_files = 0
    extracted_items = 0

    for file_path in json_files:
        items = process_json_file(file_path, package_name, package_version)

        # process_json_file may re-derive name and version from the file path;
        # explicit overrides must still win over anything derived.
        for item in items:
            if override_package_name:
                item['package_name'] = override_package_name
            if override_package_version:
                item['package_version'] = override_package_version

        all_items.extend(items)

        # Update progress
        processed_files += 1
        extracted_items += len(items)

        # Print progress every 100 files or on the last file
        if processed_files % 100 == 0 or processed_files == total_files:
            percent = (processed_files / total_files) * 100
            print(f"Progress: {processed_files}/{total_files} files ({percent:.1f}%) - {extracted_items} items extracted",
                  end="\r", flush=True)

    print(f"\nCompleted processing {processed_files} files - extracted {extracted_items} items total.")
    return all_items


def main():
    """
    Main entry point for the script.

    Usage examples:

    # Process in full mode (multiple packages)
    python odoc2json.py /path/to/odoc/output output.json

    # Process a single package with automatic detection
    python odoc2json.py /path/to/odoc/package output.json --mode single

    # Process with explicit package name and version
    python odoc2json.py /path/to/odoc/package output.json --mode single --package-name package-name --package-version 5.0.0
    """
    parser = argparse.ArgumentParser(description='Convert odoc JSON to structured JSON records')
    parser.add_argument('input_dir', help='Directory containing odoc JSON output')
    parser.add_argument('output_file', help='Output JSON file path')
    parser.add_argument('--pretty', action='store_true', help='Pretty-print the JSON output')
    # NOTE: --verbose is accepted but not read anywhere in this script.
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose output')
    parser.add_argument('--mode', choices=['full', 'single'], default='full',
                        help='Run mode: "full" for complete list of packages, "single" for a single package')
    parser.add_argument('--package-name', help='Override the package name (useful in single mode)')
    parser.add_argument('--package-version', help='Override the package version (useful in single mode)')
    args = parser.parse_args()

    start_time = time.time()
    print(f"Starting extraction from {args.input_dir} in {args.mode} mode")

    # Process all files in the directory
    items = process_directory(
        args.input_dir,
        mode=args.mode,
        override_package_name=args.package_name,
        override_package_version=args.package_version
    )

    # Write the output
    print(f"Writing {len(items)} items to {args.output_file}...")
    with open(args.output_file, 'w', encoding='utf-8') as f:
        if args.pretty:
            json.dump(items, f, indent=2, ensure_ascii=False)
        else:
            json.dump(items, f, ensure_ascii=False)

    elapsed_time = time.time() - start_time
    print(f"Processed {len(items)} items in {elapsed_time:.2f} seconds")
    print(f"Output saved to {args.output_file}")


if __name__ == "__main__":
    main()