Take OCaml odoc output into MCP

Parallelise processing of odoc JSON files
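The change follows a standard batch-and-fan-out pattern: collect all input files up front, split them into one batch per worker, and hand the batches to a `multiprocessing.Pool`. A minimal, self-contained sketch of that pattern (the names here are illustrative stand-ins, not this PR's helpers):

import multiprocessing as mp
from functools import partial

def process_batch(batch, tag):
    # Stand-in for the real per-file work (process_json_file in the diff below).
    return [f"{tag}:{path}" for path in batch]

def run_parallel(paths, num_workers):
    # One batch per worker, mirroring the batching logic in the diff.
    batch_size = max(1, len(paths) // num_workers)
    batches = [paths[i:i + batch_size] for i in range(0, len(paths), batch_size)]
    worker = partial(process_batch, tag="demo")
    with mp.Pool(processes=num_workers) as pool:
        results = pool.map(worker, batches)
    # Flatten the per-batch lists back into one result list.
    return [item for batch in results for item in batch]

if __name__ == "__main__":
    print(run_parallel(["a.json", "b.json", "c.json"], num_workers=2))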

Changed files
+84 -34
odoc2json.py
···
 import json
 import re
 import time
+import multiprocessing as mp
 from bs4 import BeautifulSoup
 from typing import Dict, List, Any, Optional, Tuple
 import argparse
 from pathlib import Path
+from functools import partial
 def extract_package_info(path: str, mode: str = 'full',
···
     return items

+def worker_process_files(file_batch, package_name, package_version):
+    """
+    Worker function to process a batch of files in parallel.
+
+    Args:
+        file_batch: List of files to process
+        package_name: Name of the package
+        package_version: Version of the package
+
+    Returns:
+        List of all extracted items from all files in the batch
+    """
+    batch_items = []
+    for file_path in file_batch:
+        items = process_json_file(file_path, package_name, package_version)
+        batch_items.extend(items)
+    return batch_items
+
+
+def collect_json_files(directory):
+    """
+    Collect all JSON files in a directory recursively.
+
+    Args:
+        directory: Path to the directory to search
+
+    Returns:
+        List of file paths
+    """
+    json_files = []
+    for root, _, files in os.walk(directory):
+        for file in files:
+            if file.endswith('.html.json'):
+                json_files.append(os.path.join(root, file))
+    return json_files
+
+
 def process_directory(directory: str, mode: str = 'full',
                       override_package_name: Optional[str] = None,
-                      override_package_version: Optional[str] = None) -> List[Dict[str, Any]]:
+                      override_package_version: Optional[str] = None,
+                      num_workers: int = 1) -> List[Dict[str, Any]]:
"""
-
Process all JSON files in a directory recursively.
+
Process all JSON files in a directory recursively using multiple processes.
Args:
directory: Path to the directory containing odoc JSON files
mode: Operating mode - 'full' for full packages list, 'single' for a single package
override_package_name: Optional override for package name
override_package_version: Optional override for package version
+
num_workers: Number of worker processes to use
Returns:
List of all extracted items from all files
"""
-    all_items = []
     package_name, package_version = extract_package_info(
         directory,
         mode=mode,
···
         override_package_version=override_package_version
     )
-    # First count total files to process for progress tracking
-    total_files = 0
-    for root, _, files in os.walk(directory):
-        for file in files:
-            if file.endswith('.html.json'):
-                total_files += 1
+    # Collect all JSON files
+    json_files = collect_json_files(directory)
+    total_files = len(json_files)

     if total_files == 0:
         print(f"No .html.json files found in {directory}")
-        return all_items
+        return []
-    mode_str = f"single package mode" if mode == 'single' else "full packages mode"
+    mode_str = "single package mode" if mode == 'single' else "full packages mode"
     print(f"Processing {total_files} files from {package_name} {package_version} in {mode_str}...")
+    print(f"Using {num_workers} worker processes")
-    # Process each file with progress indicator
-    processed_files = 0
-    extracted_items = 0
+    # Split files into batches for workers
+    batches = []
+    batch_size = max(1, total_files // num_workers)
+    for i in range(0, total_files, batch_size):
+        batches.append(json_files[i:i + batch_size])

-    for root, _, files in os.walk(directory):
-        for file in files:
-            if file.endswith('.html.json'):
-                file_path = os.path.join(root, file)
-                items = process_json_file(file_path, package_name, package_version)
-                all_items.extend(items)
-
-                # Update progress
-                processed_files += 1
-                extracted_items += len(items)
-
-                # Print progress every 100 files or on the last file
-                if processed_files % 100 == 0 or processed_files == total_files:
-                    percent = (processed_files / total_files) * 100
-                    print(f"Progress: {processed_files}/{total_files} files ({percent:.1f}%) - {extracted_items} items extracted",
-                          end="\r", flush=True)
+    # Create partial function with fixed package name and version
+    process_batch = partial(worker_process_files, package_name=package_name, package_version=package_version)

-    print(f"\nCompleted processing {processed_files} files - extracted {extracted_items} items total.")
+    # Process batches in parallel
+    start_time = time.time()
+    all_items = []
+
+    if num_workers > 1:
+        # Use multiprocessing Pool
+        with mp.Pool(processes=num_workers) as pool:
+            # Submit all batches to the pool
+            results = pool.map(process_batch, batches)
+            # Collect all results
+            for batch_result in results:
+                all_items.extend(batch_result)
+    else:
+        # Single process mode
+        all_items = process_batch(json_files)
+
+    elapsed_time = time.time() - start_time
+    print(f"\nCompleted processing {total_files} files in {elapsed_time:.2f} seconds")
+    print(f"Extracted {len(all_items)} items total")
     return all_items
···
     # Process with explicit package name and version
     python odoc2json.py /path/to/odoc/package output.json --mode single --package-name package-name --package-version 5.0.0
+
+    # Process with multiple cores
+    python odoc2json.py /path/to/odoc/output output.json --workers 8
     """
     parser = argparse.ArgumentParser(description='Convert odoc JSON to structured JSON records')
     parser.add_argument('input_dir', help='Directory containing odoc JSON output')
···
                         help='Run mode: "full" for complete list of packages, "single" for a single package')
     parser.add_argument('--package-name', help='Override the package name (useful in single mode)')
     parser.add_argument('--package-version', help='Override the package version (useful in single mode)')
+    parser.add_argument('--workers', type=int, default=mp.cpu_count(),
+                        help=f'Number of worker processes (default: {mp.cpu_count()})')
     args = parser.parse_args()

     start_time = time.time()
     print(f"Starting extraction from {args.input_dir} in {args.mode} mode")
-    # Process all files in the directory
+    # Process all files in the directory with multiple workers
     items = process_directory(
         args.input_dir,
         mode=args.mode,
         override_package_name=args.package_name,
-        override_package_version=args.package_version
+        override_package_version=args.package_version,
+        num_workers=args.workers
     )
     # Write the output
···
 if __name__ == "__main__":
-    main()
+    main()
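For reference, `multiprocessing.Pool.map` can also do the batching itself via its `chunksize` parameter, so the manual batch list above is one of two equivalent designs. A hedged sketch of the alternative, mapping over individual files instead of hand-built batches (`process_one_file` is a hypothetical stand-in for this script's `process_json_file`):

import multiprocessing as mp
from functools import partial

def process_one_file(path, package_name, package_version):
    # Hypothetical per-file worker; returns a list of extracted items.
    return [{"path": path, "package": package_name, "version": package_version}]

def run(json_files, num_workers, package_name, package_version):
    worker = partial(process_one_file,
                     package_name=package_name, package_version=package_version)
    with mp.Pool(processes=num_workers) as pool:
        # chunksize groups the inputs internally, much like the manual split above.
        per_file = pool.map(worker, json_files,
                            chunksize=max(1, len(json_files) // num_workers))
    return [item for items in per_file for item in items]

if __name__ == "__main__":
    print(run(["a.html.json", "b.html.json"], 2, "pkg", "1.0"))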