#!/usr/bin/env python3
# /// script
# requires-python = ">=3.8"
# dependencies = [
#     "paho-mqtt>=2.0.0",
# ]
# ///
"""
Enhanced LifeCycle to OwnTracks Upload Script
Adds motion activities and improved data handling with update capability
"""

import bisect
import hashlib
import json
import os
import ssl
import sys
import time
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, Any, List, Set, Optional, Tuple

# NOTE: the code below uses mqtt.CallbackAPIVersion, which only exists in
# paho-mqtt 2.0 and later (hence the dependency pin in the header above).
import paho.mqtt.client as mqtt

# Default configuration
DEFAULT_CONFIG = {
    'mqtt_host': 'your-mqtt-host.com',
    'mqtt_port': 8883,
    'mqtt_user': 'your-username',
    'mqtt_pass': 'your-password',
    'device_id': 'lifecycle',
    'data_dir': 'lifecycle_export',
    'state_file': 'owntracks_upload_state.json',
    # Records whose hAccuracy exceeds this value are skipped.
    'max_accuracy': 500.0,
    # Records older than this are skipped (946684800 == 2000-01-01T00:00:00Z).
    'min_timestamp': 946684800,
    # Maximum distance (same unit as the record timestamps) between a location
    # and the motion record attached to it.
    'motion_time_window': 300,
    # SECURITY: certificate/hostname verification is off by default to keep
    # existing set-ups (e.g. self-signed brokers) working. Set to true to
    # verify the broker certificate.
    'tls_verify': False,
    # Seconds to wait for the broker to acknowledge each published message.
    'publish_timeout': 10.0,
}


def load_config(config_file: Optional[str] = None) -> Dict[str, Any]:
    """Build the effective configuration.

    Precedence (lowest to highest): ``DEFAULT_CONFIG``, the JSON config file
    (if it exists), then the environment variables MQTT_HOST, MQTT_PORT,
    MQTT_USER, MQTT_PASS, DEVICE_ID, DATA_DIR and STATE_FILE.

    Args:
        config_file: Path to a JSON file holding configuration overrides.
            A missing or unreadable file only produces a warning.

    Returns:
        A new dict; ``DEFAULT_CONFIG`` itself is never modified.
    """
    config = DEFAULT_CONFIG.copy()

    # Try to load from config file
    if config_file and os.path.exists(config_file):
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                file_config = json.load(f)
            config.update(file_config)
            print(f"šŸ“ Loaded config from {config_file}")
        except Exception as e:
            print(f"āš ļø Warning: Could not load config file {config_file}: {e}")

    # Override with environment variables if present
    env_mappings = {
        'MQTT_HOST': 'mqtt_host',
        'MQTT_PORT': 'mqtt_port',
        'MQTT_USER': 'mqtt_user',
        'MQTT_PASS': 'mqtt_pass',
        'DEVICE_ID': 'device_id',
        'DATA_DIR': 'data_dir',
        'STATE_FILE': 'state_file',
    }

    for env_var, config_key in env_mappings.items():
        if env_var not in os.environ:
            continue
        value = os.environ[env_var]
        # The port is the only numeric setting that can come from the environment.
        if config_key == 'mqtt_port':
            try:
                value = int(value)
            except ValueError:
                print(f"āš ļø Warning: Invalid port value in {env_var}, using default")
                continue
        config[config_key] = value
        print(f"šŸ”§ Using {env_var} from environment")

    return config


class EnhancedOwnTracksUploader:
    """Publish LifeCycle location exports to an OwnTracks MQTT topic.

    Location records are read from ``LocationEvent.json`` in the data
    directory, enriched with the nearest record from ``Motion.json`` and
    published with QoS 1. The timestamps that were delivered are persisted in
    a JSON state file so that later runs skip them (normal mode) or re-send
    them once (update mode).
    """

    def __init__(self, config: Dict[str, Any], update_mode: bool = False):
        """Initialise counters and load state, timezones and motion data.

        Args:
            config: Configuration dict as produced by ``load_config``.
            update_mode: When true, re-publish records that were already
                uploaded instead of uploading new ones.
        """
        self.config = config
        self.mqtt_client = None
        self.connected = False
        self.upload_count = 0
        self.update_count = 0
        self.skip_count = 0
        self.error_count = 0
        self.uploaded_timestamps = set()
        self.updated_timestamps = set()
        self.timezone_map = {}
        self.motion_data = {}
        # Sorted copy of the motion_data keys, used for nearest-neighbour lookup.
        self._motion_times = []
        self.update_mode = update_mode

        # Set up file paths from config
        self.data_dir = config['data_dir']
        self.location_file = os.path.join(self.data_dir, 'LocationEvent.json')
        self.motion_file = os.path.join(self.data_dir, 'Motion.json')
        self.timezone_file = os.path.join(self.data_dir, 'TimeZone.json')
        self.state_file = config['state_file']
        self.topic = f"owntracks/{config['mqtt_user']}/{config['device_id']}"

        # Load previous state if exists
        self.load_state()

        # Load timezone mapping
        self.load_timezones()

        # Load motion data for enhanced context
        self.load_motion_data()

    def load_state(self):
        """Load previously uploaded/updated timestamps from the state file.

        A missing file is not an error; an unreadable one only prints a warning.
        """
        if not os.path.exists(self.state_file):
            return
        try:
            with open(self.state_file, 'r', encoding='utf-8') as f:
                state = json.load(f)
            self.uploaded_timestamps = set(state.get('uploaded_timestamps', []))
            self.updated_timestamps = set(state.get('updated_timestamps', []))
            self.upload_count = state.get('upload_count', 0)
            self.update_count = state.get('update_count', 0)
            print(f"šŸ“Š Loaded state: {len(self.uploaded_timestamps)} uploaded, {len(self.updated_timestamps)} updated")
        except Exception as e:
            print(f"āš ļø Warning: Could not load state file: {e}")

    def save_state(self):
        """Persist the current upload/update state.

        The state is written to a temporary file first and then moved into
        place with ``os.replace`` so that an interrupted run cannot leave a
        truncated state file behind. Failures only print a warning.
        """
        try:
            state = {
                'uploaded_timestamps': list(self.uploaded_timestamps),
                'updated_timestamps': list(self.updated_timestamps),
                'upload_count': self.upload_count,
                'update_count': self.update_count,
                'last_update': datetime.now().isoformat(),
            }
            tmp_path = f"{self.state_file}.tmp"
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(state, f, indent=2)
            os.replace(tmp_path, self.state_file)
        except Exception as e:
            print(f"āš ļø Warning: Could not save state: {e}")

    def load_timezones(self):
        """Load the id -> name timezone mapping from the LifeCycle export.

        Falls back to ``{0: "Europe/London"}`` when the file cannot be read.
        """
        try:
            with open(self.timezone_file, 'r', encoding='utf-8') as f:
                timezones = json.load(f)
            self.timezone_map = {tz['id']: tz['name'] for tz in timezones}
            print(f"šŸ“ Loaded {len(self.timezone_map)} timezones")
        except Exception as e:
            print(f"āš ļø Warning: Could not load timezones: {e}")
            self.timezone_map = {0: "Europe/London"}

    def load_motion_data(self):
        """Load motion records and index them by integer timestamp.

        Records without a usable positive timestamp are ignored individually
        instead of aborting the whole load.
        """
        try:
            with open(self.motion_file, 'r', encoding='utf-8') as f:
                motion_records = json.load(f)

            # Index motion data by timestamp for fast lookup
            for record in motion_records:
                try:
                    timestamp = int(record.get('timestamp') or 0)
                except (AttributeError, TypeError, ValueError):
                    continue  # malformed record: skip it, keep the rest
                if timestamp > 0:
                    self.motion_data[timestamp] = record

            self._motion_times = sorted(self.motion_data)
            print(f"šŸƒ Loaded {len(self.motion_data)} motion records")
        except Exception as e:
            print(f"āš ļø Warning: Could not load motion data: {e}")

    def _sorted_motion_times(self) -> List[int]:
        """Return the sorted motion timestamps, rebuilding the index if stale."""
        times = getattr(self, '_motion_times', None)
        if times is None or len(times) != len(self.motion_data):
            times = sorted(self.motion_data)
            self._motion_times = times
        return times

    def find_motion_context(self, location_timestamp: int) -> Optional[Dict[str, Any]]:
        """Find the motion record closest in time to a location timestamp.

        An exact timestamp match wins. Otherwise the nearest record is
        returned, provided it is strictly closer than the configured
        ``motion_time_window``; on an exact tie the earlier record is used.

        Args:
            location_timestamp: Timestamp of the location record.

        Returns:
            The matching motion record, or ``None`` if none is close enough.
        """
        # Try exact match first
        if location_timestamp in self.motion_data:
            return self.motion_data[location_timestamp]

        # Only the direct neighbours in sorted order can be the closest record,
        # so a binary search replaces a scan over every motion record.
        times = self._sorted_motion_times()
        pos = bisect.bisect_left(times, location_timestamp)

        closest_time = None
        min_diff = self.config['motion_time_window']
        for candidate in times[max(pos - 1, 0):pos + 1]:
            diff = abs(candidate - location_timestamp)
            if diff < min_diff:
                min_diff = diff
                closest_time = candidate

        if closest_time is None:
            return None
        return self.motion_data.get(closest_time)

    def build_motion_activities(self, motion_record: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Convert a LifeCycle motion record to a ``motionactivities`` list.

        Every motion type whose value in the record is greater than zero
        yields one ``{'type': ..., 'confidence': ...}`` entry; all entries
        share the record's ``confidence`` value (0 if absent). Missing or
        ``None`` values count as zero.

        Args:
            motion_record: One record from ``Motion.json``.

        Returns:
            The list of detected activities, possibly empty.
        """
        confidence = motion_record.get('confidence', 0)

        # 'unknown' stays last so the output order matches the historic format.
        activity_types = ('stationary', 'walking', 'running',
                          'automotive', 'cycling', 'unknown')

        return [
            {'type': activity_type, 'confidence': confidence}
            for activity_type in activity_types
            if (motion_record.get(activity_type) or 0) > 0
        ]

    def _teardown_mqtt(self):
        """Disconnect and stop the network loop of the current client, if any."""
        client, self.mqtt_client = self.mqtt_client, None
        if client is None:
            return
        try:
            # Disconnect first so the loop thread can still flush the
            # DISCONNECT packet, then stop (join) the loop thread.
            client.disconnect()
            client.loop_stop()
        except Exception as e:
            print(f"Warning: MQTT cleanup failed: {e}")
        self.connected = False

    def setup_mqtt(self):
        """Create the MQTT client, connect over TLS and wait for the CONNACK.

        Any previously created client is shut down first so that reconnecting
        does not leak its network thread.

        Returns:
            True when connected within 10 seconds, False otherwise.
        """
        try:
            self._teardown_mqtt()
            self.connected = False

            # Create client with callback API version 2
            self.mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
            self.mqtt_client.username_pw_set(self.config['mqtt_user'], self.config['mqtt_pass'])

            # Setup SSL
            context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
            if not self.config.get('tls_verify', False):
                # SECURITY: without verification the broker is not
                # authenticated and credentials can be intercepted. Kept as
                # the default for backward compatibility only.
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
            self.mqtt_client.tls_set_context(context)

            # Set callbacks
            self.mqtt_client.on_connect = self.on_connect
            self.mqtt_client.on_publish = self.on_publish
            self.mqtt_client.on_disconnect = self.on_disconnect

            # Connect
            print(f"šŸ”Œ Connecting to {self.config['mqtt_host']}:{self.config['mqtt_port']}...")
            self.mqtt_client.connect(self.config['mqtt_host'], self.config['mqtt_port'], 60)
            self.mqtt_client.loop_start()

            # Wait for connection (on_connect flips self.connected)
            remaining = 10.0
            while not self.connected and remaining > 0:
                time.sleep(0.5)
                remaining -= 0.5

            if not self.connected:
                raise TimeoutError("Connection timeout")

            return True

        except Exception as e:
            print(f"āŒ MQTT setup failed: {e}")
            self._teardown_mqtt()
            return False

    def on_connect(self, client, userdata, flags, reason_code, properties):
        """MQTT connection callback (paho callback API version 2)."""
        if reason_code == 0:
            self.connected = True
            print(f"āœ… Connected to OwnTracks MQTT broker")
        else:
            print(f"āŒ Connection failed with code {reason_code}")

    def on_publish(self, client, userdata, mid, reason_code, properties):
        """MQTT publish callback (paho callback API version 2)."""
        if reason_code != 0:
            self.error_count += 1
            print(f"āŒ Publish failed: {reason_code}")

    def on_disconnect(self, client, userdata, *args):
        """MQTT disconnect callback.

        Callback API version 2 invokes this with
        ``(client, userdata, disconnect_flags, reason_code, properties)``.
        The trailing arguments are accepted generically so that the older
        ``(reason_code, properties)`` and ``(rc,)`` call shapes keep working;
        in every shape with two or more extras the reason code is the
        second-to-last one.
        """
        if len(args) >= 2:
            reason_code = args[-2]
        elif args:
            reason_code = args[0]
        else:
            reason_code = 0

        self.connected = False
        if reason_code != 0:
            print(f"āš ļø Unexpected disconnection: {reason_code}")

    def convert_to_owntracks(self, location_event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Convert a LifeCycle LocationEvent to an enhanced OwnTracks message.

        Args:
            location_event: One record from ``LocationEvent.json``.

        Returns:
            The OwnTracks ``location`` payload as a dict, or ``None`` when the
            record is incomplete, older than ``min_timestamp``, less accurate
            than ``max_accuracy``, or cannot be converted.
        """
        try:
            # Extract basic fields
            timestamp = location_event.get('timestamp')
            lat = location_event.get('latitude')
            lon = location_event.get('longitude')

            # Validation
            if not timestamp or lat is None or lon is None:
                return None

            if timestamp < self.config['min_timestamp']:
                return None

            # Check accuracy threshold (a missing/None accuracy counts as 0)
            h_accuracy = location_event.get('hAccuracy') or 0
            if h_accuracy > self.config['max_accuracy']:
                return None

            # Build enhanced OwnTracks message
            owntracks_msg = {
                "_type": "location",
                "tst": int(timestamp),
                "lat": float(lat),
                "lon": float(lon),
                "acc": int(h_accuracy),
                "tid": "lc",  # Tracker ID for LifeCycle
            }

            # Altitude: -1.0 and anything at or below -1000 are treated as "no value"
            altitude = location_event.get('altitude')
            has_altitude = altitude is not None and altitude > -1000
            if has_altitude and altitude != -1.0:
                owntracks_msg['alt'] = round(altitude, 1)  # Keep decimal precision

            # Vertical accuracy
            v_accuracy = location_event.get('vAccuracy')
            if v_accuracy and v_accuracy > 0:
                owntracks_msg['vac'] = int(v_accuracy)

            # Speed: source value multiplied by 3.6 (m/s -> km/h)
            speed = location_event.get('speed')
            if speed is not None and speed >= 0:
                owntracks_msg['vel'] = round(speed * 3.6, 1)

            # Course over ground
            course = location_event.get('course')
            if course is not None and course >= 0:
                owntracks_msg['cog'] = int(course)

            # WiFi context
            wifi = location_event.get('wifi')
            if wifi:
                owntracks_msg['SSID'] = wifi

            wifi_id = location_event.get('wifiID')
            if wifi_id:
                owntracks_msg['BSSID'] = wifi_id

            # Enhanced: Add motion activities
            motion_context = self.find_motion_context(timestamp)
            if motion_context:
                activities = self.build_motion_activities(motion_context)
                if activities:
                    owntracks_msg['motionactivities'] = activities

                # Add step count if available (None counts as 0)
                steps = motion_context.get('steps') or 0
                if steps > 0:
                    owntracks_msg['steps'] = steps

            # Enhanced: Add barometric pressure estimate from altitude
            if has_altitude:
                # Standard atmospheric pressure calculation:
                #   P = P0 * (1 - 0.0065*h/T0)^(g*M/(R*0.0065))
                # Simplified approximation:
                #   P ≈ 101.325 * (1 - altitude/44330)^5.255
                ratio = 1 - altitude / 44330.0
                # A non-positive base has no real-valued result; skip it
                # explicitly instead of relying on a swallowed exception.
                if ratio > 0:
                    pressure = 101.325 * pow(ratio, 5.255)
                    if 50 <= pressure <= 110:  # Sanity check for reasonable pressure range
                        owntracks_msg['p'] = round(pressure, 2)

            return owntracks_msg

        except Exception as e:
            print(f"āŒ Conversion error: {e}")
            return None

    def load_location_data(self) -> List[Dict[str, Any]]:
        """Load the LocationEvent records that still need processing.

        In update mode these are the records already uploaded but not yet
        updated; otherwise the records not uploaded so far.

        Returns:
            The filtered list of records, or an empty list if the file cannot
            be read.
        """
        try:
            with open(self.location_file, 'r', encoding='utf-8') as f:
                data = json.load(f)

            print(f"šŸ“ Loaded {len(data)} location records")

            if self.update_mode:
                # In update mode, process already uploaded records to enhance them
                update_data = [
                    record for record in data
                    if record.get('timestamp') in self.uploaded_timestamps
                    and record.get('timestamp') not in self.updated_timestamps
                ]
                print(f"šŸ”„ {len(update_data)} records to update with enhanced data")
                return update_data

            # Normal mode - filter out already uploaded records
            new_data = [
                record for record in data
                if record.get('timestamp') not in self.uploaded_timestamps
            ]
            print(f"šŸ“ {len(new_data)} new records to upload")
            return new_data

        except Exception as e:
            print(f"āŒ Error loading location data: {e}")
            return []

    def _confirm_delivery(self, info) -> bool:
        """Return True once the broker has acknowledged a queued message.

        ``publish()`` returning rc 0 only means the message was queued. While
        connected, wait up to ``publish_timeout`` seconds for the
        acknowledgement; when the connection is already gone, do not wait.
        """
        try:
            if self.connected:
                info.wait_for_publish(timeout=self.config.get('publish_timeout', 10.0))
            return info.is_published()
        except (RuntimeError, ValueError):
            # Raised by paho when the message was never queued successfully.
            return False

    def upload_batch(self, batch: List[Dict[str, Any]], batch_num: int, total_batches: int):
        """Upload or update one batch of location records.

        All records of the batch are queued first, then each one is confirmed
        as delivered before its timestamp is recorded in the state. This keeps
        the state file from claiming records the broker never received.

        Args:
            batch: The records to publish.
            batch_num: 1-based number of this batch (used for progress output
                and to decide when to save state).
            total_batches: Total number of batches in this run.
        """
        mode_label = "Updating" if self.update_mode else "Uploading"
        print(f"\nšŸ“¤ {mode_label} batch {batch_num}/{total_batches} ({len(batch)} records)...")

        batch_success = 0
        batch_skip = 0
        pending = []  # (timestamp, publish info, message) awaiting acknowledgement

        for location_event in batch:
            try:
                # Convert to enhanced OwnTracks format
                owntracks_msg = self.convert_to_owntracks(location_event)
                if not owntracks_msg:
                    batch_skip += 1
                    continue

                timestamp = location_event.get('timestamp')

                # Publish to MQTT
                payload = json.dumps(owntracks_msg)
                result = self.mqtt_client.publish(self.topic, payload, qos=1, retain=False)

                if result.rc == 0:
                    pending.append((timestamp, result, owntracks_msg))
                else:
                    self.error_count += 1
                    print(f"āŒ Publish failed for timestamp {timestamp}")

                # Small delay to avoid overwhelming the broker
                time.sleep(0.01)

            except Exception as e:
                self.error_count += 1
                print(f"āŒ Error processing record: {e}")

        for timestamp, info, owntracks_msg in pending:
            try:
                if not self._confirm_delivery(info):
                    self.error_count += 1
                    print(f"āŒ Publish failed for timestamp {timestamp}")
                    continue

                if self.update_mode:
                    self.updated_timestamps.add(timestamp)
                    self.update_count += 1
                else:
                    self.uploaded_timestamps.add(timestamp)
                    self.upload_count += 1

                batch_success += 1

                # Show progress with enhanced data indicators
                if batch_success % 10 == 0:
                    dt = datetime.fromtimestamp(timestamp)
                    motion_indicator = "šŸƒ" if 'motionactivities' in owntracks_msg else ""
                    pressure_indicator = "šŸŒ”ļø" if 'p' in owntracks_msg else ""
                    print(f" šŸ“ {batch_success}/{len(batch)} - {dt.strftime('%Y-%m-%d %H:%M:%S')} {motion_indicator}{pressure_indicator}")

            except Exception as e:
                self.error_count += 1
                print(f"āŒ Error processing record: {e}")

        self.skip_count += batch_skip
        print(f"āœ… Batch complete: {batch_success} processed, {batch_skip} skipped")

        # Save state periodically
        if batch_num % 5 == 0:
            self.save_state()

    def _process_batches(self, location_data: List[Dict[str, Any]], batch_size: int, mode_label: str):
        """Publish all records batch by batch; always saves state at the end."""
        total_batches = (len(location_data) + batch_size - 1) // batch_size

        try:
            for start in range(0, len(location_data), batch_size):
                batch = location_data[start:start + batch_size]
                batch_num = (start // batch_size) + 1

                self.upload_batch(batch, batch_num, total_batches)

                # Check connection
                if not self.connected:
                    print("āŒ Lost connection, reconnecting...")
                    if not self.setup_mqtt():
                        break

        except KeyboardInterrupt:
            print(f"\nāš ļø {mode_label.capitalize()} interrupted by user")
        except Exception as e:
            print(f"āŒ {mode_label.capitalize()} error: {e}")
        finally:
            # Final state save
            self.save_state()

    def _print_summary(self, mode_label: str):
        """Print the end-of-run counters."""
        print(f"\nšŸ“Š Enhanced {mode_label.capitalize()} Summary:")
        if self.update_mode:
            print(f" šŸ”„ Updated: {self.update_count}")
        else:
            print(f" āœ… Uploaded: {self.upload_count}")
        print(f" ā­ļø Skipped: {self.skip_count}")
        print(f" āŒ Errors: {self.error_count}")
        print(f" šŸƒ With motion data: ~{len(self.motion_data)} available")
        print(f" šŸ“ State saved to: {self.state_file}")

    def run_upload(self, batch_size: int = 100):
        """Run the complete upload/update process.

        Args:
            batch_size: Number of records per batch; must be at least 1.

        Returns:
            True when batches were processed (even if individual records
            failed); False when the batch size is invalid, the broker could
            not be reached, or there was nothing to process.
        """
        mode_label = "update" if self.update_mode else "upload"

        if batch_size < 1:
            print(f"Invalid batch size {batch_size}: must be at least 1")
            return False

        print(f"šŸš€ Starting enhanced LifeCycle to OwnTracks {mode_label}...")

        # Setup MQTT connection
        if not self.setup_mqtt():
            return False

        try:
            # Load location data
            location_data = self.load_location_data()
            if not location_data:
                print(f"āŒ No data to {mode_label}")
                return False

            # Upload/update in batches
            self._process_batches(location_data, batch_size, mode_label)
        finally:
            # Cleanup on every exit path, including the early return above
            self._teardown_mqtt()

        # Enhanced summary
        self._print_summary(mode_label)
        return True


def create_example_config():
    """Write ``config.json.example`` containing the default settings."""
    # Written from DEFAULT_CONFIG so the example can never drift from the
    # defaults the script actually uses.
    with open('config.json.example', 'w', encoding='utf-8') as f:
        json.dump(DEFAULT_CONFIG, f, indent=2)

    print("šŸ“ Created config.json.example - copy to config.json and edit with your settings")


def main():
    """Command-line entry point.

    Returns:
        Process exit code: 0 on success, 1 on configuration or upload failure.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='Enhanced LifeCycle to OwnTracks uploader',
        epilog='Configuration can be provided via config.json file or environment variables'
    )
    parser.add_argument('--config', '-c', type=str, default='config.json',
                        help='Path to configuration file (default: config.json)')
    parser.add_argument('--create-config', action='store_true',
                        help='Create example configuration file and exit')
    parser.add_argument('--update', action='store_true',
                        help='Update existing records with enhanced data instead of uploading new ones')
    parser.add_argument('--batch-size', type=int, default=50,
                        help='Batch size for uploads (default: 50)')

    args = parser.parse_args()

    if args.batch_size < 1:
        # Zero or negative sizes cannot be used to slice the data into batches.
        parser.error('--batch-size must be a positive integer')

    if args.create_config:
        create_example_config()
        return 0

    # Load configuration
    config = load_config(args.config)

    # Validate required configuration: reject empty values and the
    # placeholder values shipped in the defaults.
    required_fields = ['mqtt_host', 'mqtt_user', 'mqtt_pass']
    placeholders = ['your-mqtt-host.com', 'your-username', 'your-password']
    missing_fields = [
        field for field in required_fields
        if not config.get(field) or config[field] in placeholders
    ]

    if missing_fields:
        print(f"āŒ Missing or default configuration for: {', '.join(missing_fields)}")
        print("Please update your config.json file or set environment variables:")
        for field in missing_fields:
            # Environment variable names are the upper-cased config keys.
            env_var = field.upper()
            print(f" export {env_var}=your_value")
        print("\nOr run with --create-config to create an example configuration file")
        return 1

    location_file = os.path.join(config['data_dir'], 'LocationEvent.json')
    if not os.path.exists(location_file):
        print(f"āŒ Location data file not found: {location_file}")
        print("Run ./export_lifecycle_data.sh first to export data")
        return 1

    uploader = EnhancedOwnTracksUploader(config, update_mode=args.update)
    success = uploader.run_upload(batch_size=args.batch_size)

    return 0 if success else 1


if __name__ == "__main__":
    sys.exit(main())