···
+
import argparse
import json
import multiprocessing as mp
import os
import time
from functools import partial
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from bs4 import BeautifulSoup
+
def extract_package_info(path: str, mode: str = 'full',
                         override_package_name: Optional[str] = None,
                         override_package_version: Optional[str] = None) -> Tuple[str, str]:
    """Extract package name and version from the path.

    Args:
        path: Path to the odoc output directory.
        mode: Operating mode - 'full' for full packages list, 'single' for a single package.
        override_package_name: Optional override for package name.
        override_package_version: Optional override for package version.

    Returns:
        Tuple of (package_name, package_version). Either element may be
        "unknown" when it cannot be determined from overrides or the path.
    """
    # Always prioritize explicit overrides if provided.
    package_name = override_package_name if override_package_name else "unknown"
    package_version = override_package_version if override_package_version else "unknown"

    # If we have both overrides, no need to analyze the path at all.
    if override_package_name and override_package_version:
        return package_name, package_version

    try:
        # Use Path for more reliable path parsing.
        p = Path(path).resolve()
        parts = p.parts

        if mode == 'single':
            # In single package mode, the package name is typically the
            # directory name itself.
            if not override_package_name and parts:
                package_name = parts[-1]
        else:
            # In full mode, look at the directory structure:
            # expected layout is .../package-name/package-version/...
            subdirs = [d for d in os.listdir(path)
                       if os.path.isdir(os.path.join(path, d))]

            # If we have subdirectories that might be package names, check
            # whether the first one contains version-like subdirectories.
            if subdirs and not override_package_name:
                subdir = subdirs[0]
                version_dirs = [d for d in os.listdir(os.path.join(path, subdir))
                                if os.path.isdir(os.path.join(path, subdir, d))]
                if version_dirs:
                    # This subdirectory contains potential version directories,
                    # so it's likely a package.  Files are processed at the
                    # per-file level elsewhere, so return a default version that
                    # gets overridden during actual file processing.
                    return subdir, "unknown"

            # Otherwise determine package/version from the path itself.
            # Structure may be test/package-name/version/... — guard the
            # negative index so short paths don't raise IndexError.
            if len(parts) >= 3 and (parts[-3] == "test" or "test" in str(p)):
                package_name = parts[-2] if not override_package_name else package_name
                package_version = parts[-1] if not override_package_version else package_version
            elif len(parts) >= 2:
                # Standard structure: .../package-name/package-version/
                package_name = parts[-2] if not override_package_name else package_name
                package_version = parts[-1] if not override_package_version else package_version
    except (FileNotFoundError, PermissionError) as e:
        # Handle cases where we can't access the directory: fall back to
        # whatever name/version we have determined so far.
        print(f"Error accessing directory {path}: {str(e)}")

    return package_name, package_version
def parse_html_content(content: str) -> List[Dict[str, Any]]:
···
List of dictionaries containing extracted information
+
# Extract package and version from file path if not already properly set
+
if package_version == "unknown" or package_name == "unknown":
+
# Check if this file is in a test directory structure
+
file_path_parts = Path(file_path).resolve().parts
+
# Look for test/package-name/version pattern in the path
+
for i, part in enumerate(file_path_parts):
+
if part == "test" and i + 2 < len(file_path_parts):
+
# We found a test directory, extract package name and version
+
package_name = file_path_parts[i + 1]
+
package_version = file_path_parts[i + 2]
+
with open(file_path, 'r', encoding='utf-8') as f:
+
except json.JSONDecodeError:
+
print(f"Error decoding JSON from {file_path}")
+
except UnicodeDecodeError:
+
# Try opening with a different encoding or with errors='ignore'
+
with open(file_path, 'r', encoding='latin-1') as f:
+
except json.JSONDecodeError:
+
print(f"Error decoding JSON from {file_path} with latin-1 encoding")
+
print(f"Error reading {file_path}: {str(e)}")
if 'content' not in data:
···
+
def worker_process_files(file_batch, package_name, package_version):
    """Worker function to process a batch of files in parallel.

    Args:
        file_batch: List of files to process.
        package_name: Name of the package.
        package_version: Version of the package.

    Returns:
        List of all extracted items from all files in the batch.
    """
    batch_items = []
    for file_path in file_batch:
        items = process_json_file(file_path, package_name, package_version)
        batch_items.extend(items)
    return batch_items
+
def collect_json_files(directory):
    """Collect all odoc ``.html.json`` files in a directory recursively.

    Args:
        directory: Path to the directory to search.

    Returns:
        List of absolute/relative file paths (joined with ``directory``).
    """
    json_files = []
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith('.html.json'):
                json_files.append(os.path.join(root, file))
    return json_files
+
def process_directory(directory: str, mode: str = 'full',
                      override_package_name: Optional[str] = None,
                      override_package_version: Optional[str] = None,
                      num_workers: int = 1) -> List[Dict[str, Any]]:
    """Process all JSON files in a directory recursively using multiple processes.

    Args:
        directory: Path to the directory containing odoc JSON files.
        mode: Operating mode - 'full' for full packages list, 'single' for a single package.
        override_package_name: Optional override for package name.
        override_package_version: Optional override for package version.
        num_workers: Number of worker processes to use.

    Returns:
        List of all extracted items from all files (empty when no
        ``.html.json`` files are found).
    """
    package_name, package_version = extract_package_info(
        directory, mode=mode,
        override_package_name=override_package_name,
        override_package_version=override_package_version,
    )

    # Collect all JSON files.
    json_files = collect_json_files(directory)
    total_files = len(json_files)
    if not json_files:
        print(f"No .html.json files found in {directory}")
        return []

    mode_str = "single package mode" if mode == 'single' else "full packages mode"
    print(f"Processing {total_files} files from {package_name} {package_version} in {mode_str}...")
    print(f"Using {num_workers} worker processes")

    # Create partial function with fixed package name and version.
    process_batch = partial(worker_process_files,
                            package_name=package_name,
                            package_version=package_version)

    start_time = time.time()
    all_items = []
    if num_workers > 1:
        # Split files into batches, one chunk per worker (at least 1 file each).
        batch_size = max(1, total_files // num_workers)
        batches = [json_files[i:i + batch_size]
                   for i in range(0, total_files, batch_size)]

        # Process batches in parallel with a multiprocessing Pool.
        with mp.Pool(processes=num_workers) as pool:
            results = pool.map(process_batch, batches)
        for batch_result in results:
            all_items.extend(batch_result)
    else:
        # Single worker: process everything in this process, no pool overhead.
        all_items = process_batch(json_files)

    elapsed_time = time.time() - start_time
    print(f"\nCompleted processing {total_files} files in {elapsed_time:.2f} seconds")
    print(f"Extracted {len(all_items)} items total")
    return all_items
+
def main():
    """Main entry point for the script.

    Examples:
        # Process in full mode (multiple packages)
        python odoc2json.py /path/to/odoc/output output.json

        # Process a single package with automatic detection
        python odoc2json.py /path/to/odoc/package output.json --mode single

        # Process with explicit package name and version
        python odoc2json.py /path/to/odoc/package output.json --mode single \\
            --package-name package-name --package-version 5.0.0

        # Process with multiple cores
        python odoc2json.py /path/to/odoc/output output.json --workers 8
    """
    parser = argparse.ArgumentParser(description='Convert odoc JSON to structured JSON records')
    parser.add_argument('input_dir', help='Directory containing odoc JSON output')
    parser.add_argument('output_file', help='Output JSON file path')
    parser.add_argument('--pretty', action='store_true', help='Pretty-print the JSON output')
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose output')
    parser.add_argument('--mode', choices=['full', 'single'], default='full',
                        help='Run mode: "full" for complete list of packages, "single" for a single package')
    parser.add_argument('--package-name', help='Override the package name (useful in single mode)')
    parser.add_argument('--package-version', help='Override the package version (useful in single mode)')
    parser.add_argument('--workers', type=int, default=mp.cpu_count(),
                        help=f'Number of worker processes (default: {mp.cpu_count()})')
    args = parser.parse_args()

    start_time = time.time()
    print(f"Starting extraction from {args.input_dir} in {args.mode} mode")

    # Process all files in the directory with multiple workers.
    items = process_directory(
        args.input_dir,
        mode=args.mode,
        override_package_name=args.package_name,
        override_package_version=args.package_version,
        num_workers=args.workers,
    )

    print(f"Writing {len(items)} items to {args.output_file}...")
    with open(args.output_file, 'w', encoding='utf-8') as f:
        # --pretty selects indented output; otherwise write compact JSON.
        if args.pretty:
            json.dump(items, f, indent=2, ensure_ascii=False)
        else:
            json.dump(items, f, ensure_ascii=False)

    elapsed_time = time.time() - start_time
    print(f"Processed {len(items)} items in {elapsed_time:.2f} seconds")
    print(f"Output saved to {args.output_file}")
if __name__ == "__main__":