Take OCaml odoc output into MCP
1#!/usr/bin/env python3
2# /// script
3# requires-python = ">=3.11"
4# dependencies = [
5# "bs4",
6# ]
7# ///
8"""
9odoc2json.py - Convert odoc JSON output to structured JSON records
10
11This script parses the JSON output files from odoc-driver (an OCaml documentation
12generator) and converts them into structured JSON records that include package name,
13version, and each function signature with associated documentation.
14
15The output is intended for further processing, analysis, and search over OCaml type
16signatures, especially for loading into columnar formats like Parquet.
17"""
18
19import os
20import json
21import re
22import time
23import multiprocessing as mp
24from bs4 import BeautifulSoup
25from typing import Dict, List, Any, Optional, Tuple
26import argparse
27from pathlib import Path
28from functools import partial
29
30
def _list_subdirs(directory: str) -> List[str]:
    """Return the names of the immediate subdirectories of *directory*, sorted.

    Sorting makes every choice based on "the first subdirectory" reproducible;
    ``os.listdir`` itself returns entries in arbitrary order.
    """
    return sorted(
        entry for entry in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, entry))
    )


def extract_package_info(path: str, mode: str = 'full',
                         override_package_name: Optional[str] = None,
                         override_package_version: Optional[str] = None) -> Tuple[str, str]:
    """
    Extract package name and version from the path.

    Explicit overrides always win. Anything not overridden is guessed from the
    directory layout and defaults to "unknown" when no guess is possible.

    Args:
        path: Path to the odoc output directory
        mode: Operating mode - 'full' for full packages list, 'single' for a single package
        override_package_name: Optional override for package name
        override_package_version: Optional override for package version

    Returns:
        Tuple of (package_name, package_version)
    """
    package_name = override_package_name or "unknown"
    package_version = override_package_version or "unknown"

    # With both overrides there is nothing left to infer from the path.
    if override_package_name and override_package_version:
        return package_name, package_version

    parts = Path(path).resolve().parts

    try:
        if mode == 'single':
            # In single package mode the directory name is the package name,
            # unless the directory contains a subdirectory, in which case the
            # (alphabetically) first subdirectory is used instead.
            if not override_package_name and parts:
                package_name = parts[-1]
                subdirs = _list_subdirs(path)
                if subdirs:
                    package_name = subdirs[0]

        elif mode == 'full':
            # Expected layout: <path>/package-name/package-version/...
            subdirs = _list_subdirs(path)

            if not override_package_name:
                for subdir in subdirs:
                    # A subdirectory that itself contains directories looks
                    # like a package holding version directories.
                    if _list_subdirs(os.path.join(path, subdir)):
                        # The version is deliberately reported as "unknown":
                        # process_json_file treats that as the signal to
                        # derive name and version from each file's own path.
                        return subdir, "unknown"

            # No package structure below `path`: assume `path` itself is
            # .../package-name/package-version.
            if len(parts) >= 3:
                if not override_package_name:
                    package_name = parts[-2]
                if not override_package_version:
                    package_version = parts[-1]
    except (FileNotFoundError, NotADirectoryError, PermissionError) as e:
        # Best effort: report the problem and fall back to what we have so far.
        print(f"Error accessing directory {path}: {str(e)}")

    return package_name, package_version
116
117
# CSS classes on a ``.spec`` element that identify an item kind we extract.
_SPEC_KINDS = ('type', 'value', 'module', 'class', 'exception', 'constructor')
# Kind prefix of an odoc element id, e.g. the "val-" in "val-map".
_ID_KIND_PREFIX_RE = re.compile(r'^(type|val|module|class|exception)-')
# A path-separating dot in an id: the dot in "type-t.A" but not the one inside
# a parenthesised operator name such as "val-(+.)".
_ID_PATH_DOT_RE = re.compile(r'\.(?![^()]*\))')
# "val name :" in a signature; accepts primed identifiers (map') and
# parenthesised operators ("(+)", "( * )").
_VAL_NAME_RE = re.compile(r"val\s+(?:\(\s*([^()\s]+)\s*\)|([\w']+))\s*:")
_BR_TAG_RE = re.compile(r'<br\s*/?\s*>')
_WHITESPACE_RE = re.compile(r'\s+')


def _name_from_element_id(item_id: str) -> str:
    """Reduce an element id such as "type-t.A" or "val-map" to the bare name."""
    last_component = _ID_PATH_DOT_RE.split(item_id)[-1]
    return _ID_KIND_PREFIX_RE.sub('', last_component)


def parse_html_content(content: str) -> List[Dict[str, Any]]:
    """
    Parse the HTML content from the odoc JSON to extract signatures and documentation.

    Every ``.odoc-spec`` block whose ``.spec`` child carries one of the known
    kind classes yields one record with the keys ``kind`` and, when available,
    ``name``, ``signature`` and ``documentation``.

    Args:
        content: HTML content from the odoc JSON file

    Returns:
        List of dictionaries containing extracted information
    """
    soup = BeautifulSoup(content, 'html.parser')
    result = []

    for spec in soup.find_all(class_="odoc-spec"):
        # The ``.spec`` element holds the signature; blocks without it are skipped.
        spec_elem = spec.find(class_="spec")
        if spec_elem is None:
            continue

        kind = next((cls for cls in spec_elem.get('class', []) if cls in _SPEC_KINDS), None)
        if not kind:
            continue

        item = {'kind': kind}

        # Raw signature text; whitespace is normalised further below.
        code_elem = spec_elem.find('code')
        signature = code_elem.get_text() if code_elem is not None else ""

        # Name lookup order: id on the anchor link, id on the spec element
        # itself, then (values only) the "val name :" pattern in the signature.
        # NOTE(review): odoc appears to put the id on the spec element rather
        # than on the anchor link - confirm against real odoc output.
        name = None
        anchor = spec_elem.find('a', class_="anchor")
        item_id = anchor.get('id') if anchor is not None else None
        if not item_id:
            item_id = spec_elem.get('id')
        if item_id:
            name = _name_from_element_id(item_id)

        if kind == 'value' and not name and signature:
            val_match = _VAL_NAME_RE.search(signature)
            if val_match:
                operator, identifier = val_match.groups()
                name = f"({operator})" if operator else identifier

        if name:
            item['name'] = name

        if signature:
            # Collapse newlines and runs of whitespace into single spaces.
            item['signature'] = _WHITESPACE_RE.sub(' ', signature).strip()

        doc_elem = spec.find(class_="spec-doc")
        if doc_elem is not None:
            # Turn <br> into a space before stripping tags so that the text on
            # either side of a line break does not get glued together.
            html_content = _BR_TAG_RE.sub(' ', str(doc_elem))
            doc = BeautifulSoup(html_content, 'html.parser').get_text()
            item['documentation'] = _WHITESPACE_RE.sub(' ', doc).strip()

        result.append(item)

    return result
207
208
def _load_odoc_json(file_path: str) -> Any:
    """Load *file_path* as JSON; report the problem and return None on failure."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except json.JSONDecodeError:
        print(f"Error decoding JSON from {file_path}")
    except UnicodeDecodeError:
        # Not valid UTF-8: retry as latin-1, which can decode any byte sequence.
        try:
            with open(file_path, 'r', encoding='latin-1') as f:
                return json.load(f)
        except json.JSONDecodeError:
            print(f"Error decoding JSON from {file_path} with latin-1 encoding")
        except Exception as e:  # best effort: one bad file must not stop the batch
            print(f"Error reading {file_path}: {str(e)}")
    except OSError as e:
        # Missing or unreadable file: report it and let the batch continue.
        print(f"Error reading {file_path}: {str(e)}")
    return None


def process_json_file(file_path: str, package_name: str, package_version: str) -> List[Dict[str, Any]]:
    """
    Process a single odoc JSON file and extract the relevant information.

    Files that cannot be read or parsed, that do not contain a JSON object, or
    that have no string ``content`` field yield an empty list.

    Args:
        file_path: Path to the JSON file
        package_name: Name of the package
        package_version: Version of the package

    Returns:
        List of dictionaries containing extracted information; each record is
        the output of parse_html_content extended with ``package_name``,
        ``package_version``, ``module_name`` and ``full_path``.
    """
    if package_version == "unknown" or package_name == "unknown":
        # Look for a .../test/package-name/version/... pattern in the path.
        # Both values are replaced together, even when only one of them was
        # unknown: extract_package_info returns a placeholder name with an
        # "unknown" version and expects both to be overridden here.
        file_path_parts = Path(file_path).resolve().parts
        for i, part in enumerate(file_path_parts):
            if part == "test" and i + 2 < len(file_path_parts):
                package_name = file_path_parts[i + 1]
                package_version = file_path_parts[i + 2]
                break

    data = _load_odoc_json(file_path)

    # Only a JSON object with string HTML content can be processed.
    if not isinstance(data, dict):
        return []
    content = data.get('content')
    if not isinstance(content, str):
        return []

    # Module path = names of all breadcrumbs of kind "module", in order.
    breadcrumbs = data.get('breadcrumbs')
    module_path = []
    if isinstance(breadcrumbs, list):
        for crumb in breadcrumbs:
            if isinstance(crumb, dict) and crumb.get('kind') == 'module':
                crumb_name = crumb.get('name')
                if crumb_name:
                    module_path.append(str(crumb_name))

    # Without module breadcrumbs, fall back to the containing directory name.
    module_name = ".".join(module_path) if module_path else os.path.basename(os.path.dirname(file_path))

    items = parse_html_content(content)

    for item in items:
        item['package_name'] = package_name
        item['package_version'] = package_version
        item['module_name'] = module_name

        # - module_name: just the module hierarchy (e.g., "Math.Operations")
        # - full_path: complete path including item name (e.g., "Math.Operations.add")
        if 'name' in item:
            item['full_path'] = f"{module_name}.{item['name']}"
        else:
            item['full_path'] = module_name

    return items
284
285
def worker_process_files(file_batch, package_name, package_version):
    """
    Process one batch of odoc JSON files (used as the unit of parallel work).

    Args:
        file_batch: List of files to process
        package_name: Name of the package
        package_version: Version of the package

    Returns:
        A single flat list with the items of every file, in batch order
    """
    per_file_records = (
        process_json_file(path, package_name, package_version)
        for path in file_batch
    )
    return [record for records in per_file_records for record in records]
303
304
def collect_json_files(directory):
    """
    Walk a directory tree and gather every odoc JSON file in it.

    Only files whose name ends in ``.html.json`` are returned.

    Args:
        directory: Path to the directory to search

    Returns:
        List of file paths, in the order os.walk visits them
    """
    return [
        os.path.join(dirpath, filename)
        for dirpath, _dirnames, filenames in os.walk(directory)
        for filename in filenames
        if filename.endswith('.html.json')
    ]
321
322
def process_directory(directory: str, mode: str = 'full',
                      override_package_name: Optional[str] = None,
                      override_package_version: Optional[str] = None,
                      num_workers: int = 1) -> List[Dict[str, Any]]:
    """
    Process all JSON files in a directory recursively using multiple processes.

    Args:
        directory: Path to the directory containing odoc JSON files
        mode: Operating mode - 'full' for full packages list, 'single' for a single package
        override_package_name: Optional override for package name
        override_package_version: Optional override for package version
        num_workers: Number of worker processes to use; values below 1 are
            treated as 1 and the count is capped at the number of files

    Returns:
        List of all extracted items from all files
    """
    # NOTE(review): name/version are determined once for the whole directory
    # and handed to every file; process_json_file only re-derives them per file
    # when one of them is "unknown".
    package_name, package_version = extract_package_info(
        directory,
        mode=mode,
        override_package_name=override_package_name,
        override_package_version=override_package_version
    )

    json_files = collect_json_files(directory)
    total_files = len(json_files)

    if total_files == 0:
        print(f"No .html.json files found in {directory}")
        return []

    # At least one worker (avoids dividing by zero below) and never more
    # workers than there are files to hand out.
    workers = max(1, min(num_workers, total_files))

    mode_str = "single package mode" if mode == 'single' else "full packages mode"
    print(f"Processing {total_files} files from {package_name} {package_version} in {mode_str}...")
    print(f"Using {workers} worker processes")

    # Worker callable with the package name and version already bound.
    process_batch = partial(worker_process_files, package_name=package_name, package_version=package_version)

    start_time = time.time()

    if workers > 1:
        # Ceiling division gives at most `workers` batches of near-equal size,
        # so no worker has to pick up a leftover batch after the others finish.
        batch_size = -(-total_files // workers)
        batches = [json_files[i:i + batch_size] for i in range(0, total_files, batch_size)]

        with mp.Pool(processes=workers) as pool:
            # pool.map returns results in batch order, so item order matches
            # the single-process path.
            all_items = [item
                         for batch_result in pool.map(process_batch, batches)
                         for item in batch_result]
    else:
        # Single process mode: no pool, no batching.
        all_items = process_batch(json_files)

    elapsed_time = time.time() - start_time
    print(f"\nCompleted processing {total_files} files in {elapsed_time:.2f} seconds")
    print(f"Extracted {len(all_items)} items total")
    return all_items
388
389
def main():
    """
    Command-line entry point: parse arguments, extract all items, write them as JSON.

    Usage examples:

    # Full mode (directory holding several packages)
    python odoc2json.py /path/to/odoc/output output.json

    # Single package, name detected automatically
    python odoc2json.py /path/to/odoc/package output.json --mode single

    # Single package with explicit name and version
    python odoc2json.py /path/to/odoc/package output.json --mode single --package-name package-name --package-version 5.0.0

    # Choose the number of worker processes
    python odoc2json.py /path/to/odoc/output output.json --workers 8
    """
    cpu_total = mp.cpu_count()

    cli = argparse.ArgumentParser(description='Convert odoc JSON to structured JSON records')
    cli.add_argument('input_dir', help='Directory containing odoc JSON output')
    cli.add_argument('output_file', help='Output JSON file path')
    cli.add_argument('--pretty', action='store_true', help='Pretty-print the JSON output')
    cli.add_argument('--verbose', '-v', action='store_true', help='Enable verbose output')
    cli.add_argument('--mode', choices=['full', 'single'], default='full',
                     help='Run mode: "full" for complete list of packages, "single" for a single package')
    cli.add_argument('--package-name', help='Override the package name (useful in single mode)')
    cli.add_argument('--package-version', help='Override the package version (useful in single mode)')
    cli.add_argument('--workers', type=int, default=cpu_total,
                     help=f'Number of worker processes (default: {cpu_total})')
    options = cli.parse_args()

    began = time.time()
    print(f"Starting extraction from {options.input_dir} in {options.mode} mode")

    # Extract the records from every file below the input directory.
    records = process_directory(
        options.input_dir,
        mode=options.mode,
        override_package_name=options.package_name,
        override_package_version=options.package_version,
        num_workers=options.workers
    )

    # indent=None is json.dump's compact default; 2 is the pretty-printed form.
    indent = 2 if options.pretty else None
    print(f"Writing {len(records)} items to {options.output_file}...")
    with open(options.output_file, 'w', encoding='utf-8') as out:
        json.dump(records, out, indent=indent, ensure_ascii=False)

    took = time.time() - began
    print(f"Processed {len(records)} items in {took:.2f} seconds")
    print(f"Output saved to {options.output_file}")
444
445
# Run the command-line interface only when this file is executed as a script,
# not when it is imported (e.g. by multiprocessing worker processes).
if __name__ == "__main__":
    main()