Take OCaml odoc output into MCP
1#!/usr/bin/env python3
2# /// script
3# requires-python = ">=3.11"
4# dependencies = [
5# "bs4",
6# ]
7# ///
8"""
9odoc2json.py - Convert odoc JSON output to structured JSON records
10
11This script parses the JSON output files from odoc-driver (an OCaml documentation
12generator) and converts them into structured JSON records that include package name,
13version, and each function signature with associated documentation.
14
15The output is intended for further processing, analysis, and search over OCaml type
16signatures, especially for loading into columnar formats like Parquet.
17"""
18
19import os
20import json
21import re
22import time
23from bs4 import BeautifulSoup
24from typing import Dict, List, Any, Optional, Tuple
25import argparse
26from pathlib import Path
27
28
def extract_package_info(path: str, mode: str = 'full',
                         override_package_name: Optional[str] = None,
                         override_package_version: Optional[str] = None) -> Tuple[str, str]:
    """
    Work out a package name and version for an odoc output directory.

    Explicit overrides always take precedence; whichever of the two values is
    not overridden is guessed from the directory layout and falls back to
    "unknown" when nothing can be inferred.

    Args:
        path: Path to the odoc output directory
        mode: Operating mode - 'full' for full packages list, 'single' for a single package
        override_package_name: Optional override for package name
        override_package_version: Optional override for package version

    Returns:
        Tuple of (package_name, package_version)
    """
    package_name = override_package_name or "unknown"
    package_version = override_package_version or "unknown"

    # With both values supplied there is nothing left to infer from the path.
    if override_package_name and override_package_version:
        return package_name, package_version

    parts = Path(path).resolve().parts

    def child_dirs(parent: str) -> List[str]:
        # Sorted so the chosen package does not depend on the arbitrary
        # ordering returned by os.listdir.
        return sorted(d for d in os.listdir(parent)
                      if os.path.isdir(os.path.join(parent, d)))

    try:
        if mode == 'single':
            if not override_package_name and parts:
                # Default to the directory's own name ...
                package_name = parts[-1]
                # ... but prefer a contained subdirectory when there is one.
                subdirs = child_dirs(path)
                if subdirs:
                    package_name = subdirs[0]

        elif mode == 'full':
            # Listed even when the name is overridden so that an unreadable
            # directory is still reported below.
            subdirs = child_dirs(path)

            if not override_package_name:
                for subdir in subdirs:
                    # A subdirectory that itself contains directories looks
                    # like package-name/package-version/. The version is left
                    # as "unknown" (unless overridden) so that per-file
                    # processing can refine it from each file's own path.
                    if child_dirs(os.path.join(path, subdir)):
                        return subdir, package_version

            # No package layout below this directory: assume the path itself
            # ends in .../package-name/package-version.
            if len(parts) >= 3:
                if not override_package_name:
                    package_name = parts[-2]
                if not override_package_version:
                    package_version = parts[-1]
    except OSError as e:
        # Missing, unreadable, or non-directory path: report it and return
        # whatever has been determined so far.
        print(f"Error accessing directory {path}: {str(e)}")

    return package_name, package_version
114
115
def _name_from_anchor_id(item_id: str) -> str:
    """Reduce an anchor id such as ``val-map`` or ``type-t.A`` to a bare name."""
    open_paren = item_id.rfind('(')
    if open_paren != -1 and item_id.endswith(')'):
        # Parenthesised operator, e.g. "val-(+.)": the dot belongs to the
        # operator, so it must not be treated as a path separator.
        name = item_id[open_paren:]
    else:
        # Keep only the last dotted component ("type-t.A" -> "A").
        name = item_id.rsplit('.', 1)[-1]
    # Remove prefixes like 'type-', 'val-', etc.
    return re.sub(r'^(type|val|module|class|exception)-', '', name)


def parse_html_content(content: str) -> List[Dict[str, Any]]:
    """
    Parse the HTML content from the odoc JSON to extract signatures and documentation.

    Every element with class "odoc-spec" that contains a "spec" element of a
    recognised kind produces one record with the keys 'kind', and, when
    available, 'name', 'signature' and 'documentation'.

    Args:
        content: HTML content from the odoc JSON file

    Returns:
        List of dictionaries containing extracted information
    """
    recognised_kinds = ('type', 'value', 'module', 'class', 'exception', 'constructor')

    soup = BeautifulSoup(content, 'html.parser')
    result = []

    # Process each specification block (function, type, module, etc.)
    for spec in soup.find_all(class_="odoc-spec"):
        # The "spec" element carries the kind classes and the signature.
        spec_elem = spec.find(class_="spec")
        if spec_elem is None:
            continue

        # The first recognised CSS class decides the kind; skip anything else.
        kind = next((cls for cls in spec_elem.get('class', [])
                     if cls in recognised_kinds), None)
        if not kind:
            continue

        item = {'kind': kind}

        # Raw signature text; whitespace is normalised further down.
        code_elem = spec_elem.find('code')
        signature = code_elem.get_text() if code_elem else ""

        # Prefer an id on the anchor link; otherwise fall back to an id on the
        # spec element itself.
        # NOTE(review): odoc appears to put the id on the spec element and
        # only an href on the anchor - confirm against real odoc output.
        anchor = spec_elem.find('a', class_="anchor")
        item_id = (anchor.get('id') if anchor else None) or spec_elem.get('id')
        name = _name_from_anchor_id(item_id) if item_id else None

        # For values, fall back to the "val name :" pattern in the signature.
        # Accepts primed identifiers (f') and parenthesised operators ((+)).
        if kind == 'value' and not name and signature:
            val_match = re.search(r"val\s+(\([^)]+\)|[\w']+)\s*:", signature)
            if val_match:
                name = val_match.group(1)

        if name:
            item['name'] = name

        if signature:
            # Replace newlines and multiple whitespace with a single space
            item['signature'] = re.sub(r'\s+', ' ', signature).strip()

        # Extract documentation as plain text.
        doc_elem = spec.find(class_="spec-doc")
        if doc_elem:
            # Turn <br> into a space first so the text on either side of a
            # line break does not run together once tags are stripped.
            html_content = re.sub(r'<br\s*/?\s*>', ' ', str(doc_elem))
            doc = BeautifulSoup(html_content, 'html.parser').get_text()
            item['documentation'] = re.sub(r'\s+', ' ', doc).strip()

        result.append(item)

    return result
205
206
def _read_odoc_json(file_path: str) -> Any:
    """Load a JSON file, decoding as UTF-8 with a latin-1 fallback; None on failure."""
    try:
        raw = Path(file_path).read_bytes()
    except OSError as e:
        print(f"Error reading {file_path}: {str(e)}")
        return None

    try:
        text = raw.decode('utf-8')
        encoding_note = ""
    except UnicodeDecodeError:
        # latin-1 maps every byte to a character, so this cannot fail.
        text = raw.decode('latin-1')
        encoding_note = " with latin-1 encoding"

    try:
        return json.loads(text)
    except json.JSONDecodeError:
        print(f"Error decoding JSON from {file_path}{encoding_note}")
        return None


def process_json_file(file_path: str, package_name: str, package_version: str) -> List[Dict[str, Any]]:
    """
    Process a single odoc JSON file and extract the relevant information.

    Unreadable files, invalid JSON, JSON whose top level is not an object, and
    objects without a 'content' key all yield an empty list.

    Args:
        file_path: Path to the JSON file
        package_name: Name of the package
        package_version: Version of the package

    Returns:
        List of dictionaries containing extracted information
    """
    # When either value is still "unknown", look for a test/<name>/<version>
    # pattern in the file's path. Both values are replaced together, so a
    # known name is overwritten too whenever the version is unknown.
    if package_version == "unknown" or package_name == "unknown":
        file_path_parts = Path(file_path).resolve().parts
        for i, part in enumerate(file_path_parts):
            if part == "test" and i + 2 < len(file_path_parts):
                package_name = file_path_parts[i + 1]
                package_version = file_path_parts[i + 2]
                break

    data = _read_odoc_json(file_path)

    # Only a JSON object carrying 'content' can be processed.
    if not isinstance(data, dict) or 'content' not in data:
        return []

    # Build the module path from the breadcrumbs of kind 'module', skipping
    # malformed entries that would otherwise break the join below.
    module_path = []
    for crumb in data.get('breadcrumbs') or []:
        if isinstance(crumb, dict) and crumb.get('kind') == 'module':
            crumb_name = crumb.get('name')
            if isinstance(crumb_name, str):
                module_path.append(crumb_name)

    # Without module breadcrumbs, fall back to the containing directory name.
    module_name = ".".join(module_path) if module_path else os.path.basename(os.path.dirname(file_path))

    # Extract items from the content
    items = parse_html_content(data['content'])

    # Add package and module information to each item
    for item in items:
        item['package_name'] = package_name
        item['package_version'] = package_version
        item['module_name'] = module_name

        # - module_name: just the module hierarchy (e.g., "Math.Operations")
        # - full_path: complete path including item name (e.g., "Math.Operations.add")
        if 'name' in item:
            item['full_path'] = f"{module_name}.{item['name']}"
        else:
            item['full_path'] = module_name

    return items
282
283
def process_directory(directory: str, mode: str = 'full',
                      override_package_name: Optional[str] = None,
                      override_package_version: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Recursively convert every '.html.json' file below a directory.

    The tree is walked once to count the matching files (for progress
    reporting) and a second time to process them.

    Args:
        directory: Path to the directory containing odoc JSON files
        mode: Operating mode - 'full' for full packages list, 'single' for a single package
        override_package_name: Optional override for package name
        override_package_version: Optional override for package version

    Returns:
        List of all extracted items from all files
    """
    def odoc_json_paths():
        # Yield the path of each odoc JSON file in os.walk order.
        for root, _, files in os.walk(directory):
            for file in files:
                if file.endswith('.html.json'):
                    yield os.path.join(root, file)

    # Directory-level guess; individual files may refine it while processed.
    package_name, package_version = extract_package_info(
        directory,
        mode=mode,
        override_package_name=override_package_name,
        override_package_version=override_package_version
    )

    # Count up front so progress can be shown as a percentage.
    total_files = sum(1 for _ in odoc_json_paths())
    if not total_files:
        print(f"No .html.json files found in {directory}")
        return []

    if mode == 'single':
        mode_str = "single package mode"
    else:
        mode_str = "full packages mode"
    print(f"Processing {total_files} files from {package_name} {package_version} in {mode_str}...")

    all_items = []
    processed_files = 0
    extracted_items = 0

    for file_path in odoc_json_paths():
        file_items = process_json_file(file_path, package_name, package_version)
        all_items.extend(file_items)

        processed_files += 1
        extracted_items += len(file_items)

        # Report every 100 files and once more on the final file; the
        # carriage return keeps the progress on a single terminal line.
        is_last = processed_files == total_files
        if is_last or processed_files % 100 == 0:
            percent = (processed_files / total_files) * 100
            print(f"Progress: {processed_files}/{total_files} files ({percent:.1f}%) - {extracted_items} items extracted",
                  end="\r", flush=True)

    print(f"\nCompleted processing {processed_files} files - extracted {extracted_items} items total.")
    return all_items
344
345
def main():
    """
    Command-line entry point: parse arguments, convert, and write the result.

    Usage examples:

    # Process in full mode (multiple packages)
    python odoc2json.py /path/to/odoc/output output.json

    # Process a single package with automatic detection
    python odoc2json.py /path/to/odoc/package output.json --mode single

    # Process with explicit package name and version
    python odoc2json.py /path/to/odoc/package output.json --mode single --package-name package-name --package-version 5.0.0
    """
    # (flags, keyword options) for each argument, in declaration order.
    argument_specs = (
        (('input_dir',), {'help': 'Directory containing odoc JSON output'}),
        (('output_file',), {'help': 'Output JSON file path'}),
        (('--pretty',), {'action': 'store_true', 'help': 'Pretty-print the JSON output'}),
        (('--verbose', '-v'), {'action': 'store_true', 'help': 'Enable verbose output'}),
        (('--mode',), {'choices': ['full', 'single'], 'default': 'full',
                       'help': 'Run mode: "full" for complete list of packages, "single" for a single package'}),
        (('--package-name',), {'help': 'Override the package name (useful in single mode)'}),
        (('--package-version',), {'help': 'Override the package version (useful in single mode)'}),
    )

    parser = argparse.ArgumentParser(description='Convert odoc JSON to structured JSON records')
    for flags, options in argument_specs:
        parser.add_argument(*flags, **options)
    args = parser.parse_args()

    started_at = time.time()
    print(f"Starting extraction from {args.input_dir} in {args.mode} mode")

    # Gather the records from every odoc JSON file under the input directory.
    items = process_directory(
        args.input_dir,
        mode=args.mode,
        override_package_name=args.package_name,
        override_package_version=args.package_version
    )

    print(f"Writing {len(items)} items to {args.output_file}...")
    # indent=None is json.dump's default, i.e. the compact form.
    indent = 2 if args.pretty else None
    with open(args.output_file, 'w', encoding='utf-8') as f:
        json.dump(items, f, indent=indent, ensure_ascii=False)

    elapsed_time = time.time() - started_at
    print(f"Processed {len(items)} items in {elapsed_time:.2f} seconds")
    print(f"Output saved to {args.output_file}")


# Run the converter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()