Export Lifecycle app GPS locations to Owntracks
at main 23 kB view raw
1#!/usr/bin/env python3 2# /// script 3# requires-python = ">=3.8" 4# dependencies = [ 5# "paho-mqtt>=1.6.0", 6# ] 7# /// 8""" 9Enhanced LifeCycle to OwnTracks Upload Script 10Adds motion activities and improved data handling with update capability 11""" 12 13import json 14import ssl 15import time 16import os 17import sys 18from typing import Dict, Any, List, Set, Optional, Tuple 19from datetime import datetime, timezone 20import hashlib 21import paho.mqtt.client as mqtt 22from collections import defaultdict 23 24# Default configuration 25DEFAULT_CONFIG = { 26 'mqtt_host': 'your-mqtt-host.com', 27 'mqtt_port': 8883, 28 'mqtt_user': 'your-username', 29 'mqtt_pass': 'your-password', 30 'device_id': 'lifecycle', 31 'data_dir': 'lifecycle_export', 32 'state_file': 'owntracks_upload_state.json', 33 'max_accuracy': 500.0, 34 'min_timestamp': 946684800, 35 'motion_time_window': 300 36} 37 38def load_config(config_file: str = None) -> Dict[str, Any]: 39 """Load configuration from file or environment variables""" 40 config = DEFAULT_CONFIG.copy() 41 42 # Try to load from config file 43 if config_file and os.path.exists(config_file): 44 try: 45 with open(config_file, 'r') as f: 46 file_config = json.load(f) 47 config.update(file_config) 48 print(f"📝 Loaded config from {config_file}") 49 except Exception as e: 50 print(f"⚠️ Warning: Could not load config file {config_file}: {e}") 51 52 # Override with environment variables if present 53 env_mappings = { 54 'MQTT_HOST': 'mqtt_host', 55 'MQTT_PORT': 'mqtt_port', 56 'MQTT_USER': 'mqtt_user', 57 'MQTT_PASS': 'mqtt_pass', 58 'DEVICE_ID': 'device_id', 59 'DATA_DIR': 'data_dir', 60 'STATE_FILE': 'state_file' 61 } 62 63 for env_var, config_key in env_mappings.items(): 64 if env_var in os.environ: 65 value = os.environ[env_var] 66 # Convert port to int 67 if config_key == 'mqtt_port': 68 try: 69 value = int(value) 70 except ValueError: 71 print(f"⚠️ Warning: Invalid port value in {env_var}, using default") 72 continue 73 config[config_key] = value 74 print(f"🔧 Using {env_var} from environment") 75 76 return config 77 78class EnhancedOwnTracksUploader: 79 def __init__(self, config: Dict[str, Any], update_mode=False): 80 self.config = config 81 self.mqtt_client = None 82 self.connected = False 83 self.upload_count = 0 84 self.update_count = 0 85 self.skip_count = 0 86 self.error_count = 0 87 self.uploaded_timestamps = set() 88 self.updated_timestamps = set() 89 self.timezone_map = {} 90 self.motion_data = {} 91 self.update_mode = update_mode 92 93 # Set up file paths from config 94 self.data_dir = config['data_dir'] 95 self.location_file = f"{self.data_dir}/LocationEvent.json" 96 self.motion_file = f"{self.data_dir}/Motion.json" 97 self.timezone_file = f"{self.data_dir}/TimeZone.json" 98 self.state_file = config['state_file'] 99 self.topic = f"owntracks/{config['mqtt_user']}/{config['device_id']}" 100 101 # Load previous state if exists 102 self.load_state() 103 104 # Load timezone mapping 105 self.load_timezones() 106 107 # Load motion data for enhanced context 108 self.load_motion_data() 109 110 def load_state(self): 111 """Load previously uploaded/updated timestamps""" 112 if os.path.exists(self.state_file): 113 try: 114 with open(self.state_file, 'r') as f: 115 state = json.load(f) 116 self.uploaded_timestamps = set(state.get('uploaded_timestamps', [])) 117 self.updated_timestamps = set(state.get('updated_timestamps', [])) 118 self.upload_count = state.get('upload_count', 0) 119 self.update_count = state.get('update_count', 0) 120 print(f"📊 Loaded state: {len(self.uploaded_timestamps)} uploaded, {len(self.updated_timestamps)} updated") 121 except Exception as e: 122 print(f"⚠️ Warning: Could not load state file: {e}") 123 124 def save_state(self): 125 """Save current upload/update state""" 126 try: 127 state = { 128 'uploaded_timestamps': list(self.uploaded_timestamps), 129 'updated_timestamps': list(self.updated_timestamps), 130 'upload_count': self.upload_count, 131 'update_count': self.update_count, 132 'last_update': datetime.now().isoformat() 133 } 134 with open(self.state_file, 'w') as f: 135 json.dump(state, f, indent=2) 136 except Exception as e: 137 print(f"⚠️ Warning: Could not save state: {e}") 138 139 def load_timezones(self): 140 """Load timezone mapping from LifeCycle data""" 141 try: 142 with open(self.timezone_file, 'r') as f: 143 timezones = json.load(f) 144 self.timezone_map = {tz['id']: tz['name'] for tz in timezones} 145 print(f"📍 Loaded {len(self.timezone_map)} timezones") 146 except Exception as e: 147 print(f"⚠️ Warning: Could not load timezones: {e}") 148 self.timezone_map = {0: "Europe/London"} 149 150 def load_motion_data(self): 151 """Load motion classification data for enhanced context""" 152 try: 153 with open(self.motion_file, 'r') as f: 154 motion_records = json.load(f) 155 156 # Index motion data by timestamp for fast lookup 157 for record in motion_records: 158 timestamp = int(record.get('timestamp', 0)) 159 if timestamp > 0: 160 self.motion_data[timestamp] = record 161 162 print(f"🏃 Loaded {len(self.motion_data)} motion records") 163 except Exception as e: 164 print(f"⚠️ Warning: Could not load motion data: {e}") 165 166 def find_motion_context(self, location_timestamp: int) -> Optional[Dict[str, Any]]: 167 """Find closest motion data for a location timestamp""" 168 # Try exact match first 169 if location_timestamp in self.motion_data: 170 return self.motion_data[location_timestamp] 171 172 # Find closest within time window 173 closest_time = None 174 min_diff = self.config['motion_time_window'] 175 176 for motion_timestamp in self.motion_data: 177 diff = abs(motion_timestamp - location_timestamp) 178 if diff < min_diff: 179 min_diff = diff 180 closest_time = motion_timestamp 181 182 if closest_time: 183 return self.motion_data[closest_time] 184 185 return None 186 187 def build_motion_activities(self, motion_record: Dict[str, Any]) -> List[Dict[str, Any]]: 188 """Convert LifeCycle motion data to OwnTracks motionactivities format""" 189 activities = [] 190 confidence = motion_record.get('confidence', 0) 191 192 # Map LifeCycle motion types to OwnTracks activities 193 motion_types = [ 194 ('stationary', motion_record.get('stationary', 0)), 195 ('walking', motion_record.get('walking', 0)), 196 ('running', motion_record.get('running', 0)), 197 ('automotive', motion_record.get('automotive', 0)), 198 ('cycling', motion_record.get('cycling', 0)) 199 ] 200 201 # Add activities with non-zero values 202 for activity_type, value in motion_types: 203 if value > 0: 204 activities.append({ 205 'type': activity_type, 206 'confidence': confidence 207 }) 208 209 # If unknown motion, add it 210 if motion_record.get('unknown', 0) > 0: 211 activities.append({ 212 'type': 'unknown', 213 'confidence': confidence 214 }) 215 216 return activities 217 218 def setup_mqtt(self): 219 """Setup MQTT client with SSL connection""" 220 try: 221 # Create client with callback API version 2 222 self.mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2) 223 self.mqtt_client.username_pw_set(self.config['mqtt_user'], self.config['mqtt_pass']) 224 225 # Setup SSL 226 context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) 227 context.check_hostname = False 228 context.verify_mode = ssl.CERT_NONE 229 self.mqtt_client.tls_set_context(context) 230 231 # Set callbacks 232 self.mqtt_client.on_connect = self.on_connect 233 self.mqtt_client.on_publish = self.on_publish 234 self.mqtt_client.on_disconnect = self.on_disconnect 235 236 # Connect 237 print(f"🔌 Connecting to {self.config['mqtt_host']}:{self.config['mqtt_port']}...") 238 self.mqtt_client.connect(self.config['mqtt_host'], self.config['mqtt_port'], 60) 239 self.mqtt_client.loop_start() 240 241 # Wait for connection 242 timeout = 10 243 while not self.connected and timeout > 0: 244 time.sleep(0.5) 245 timeout -= 0.5 246 247 if not self.connected: 248 raise Exception("Connection timeout") 249 250 return True 251 252 except Exception as e: 253 print(f"❌ MQTT setup failed: {e}") 254 return False 255 256 def on_connect(self, client, userdata, flags, reason_code, properties): 257 """MQTT connection callback""" 258 if reason_code == 0: 259 self.connected = True 260 print(f"✅ Connected to OwnTracks MQTT broker") 261 else: 262 print(f"❌ Connection failed with code {reason_code}") 263 264 def on_publish(self, client, userdata, mid, reason_code, properties): 265 """MQTT publish callback""" 266 if reason_code != 0: 267 self.error_count += 1 268 print(f"❌ Publish failed: {reason_code}") 269 270 def on_disconnect(self, client, userdata, reason_code, properties): 271 """MQTT disconnect callback""" 272 self.connected = False 273 if reason_code != 0: 274 print(f"⚠️ Unexpected disconnection: {reason_code}") 275 276 def convert_to_owntracks(self, location_event: Dict[str, Any]) -> Optional[Dict[str, Any]]: 277 """Convert LifeCycle LocationEvent to enhanced OwnTracks format""" 278 try: 279 # Extract basic fields 280 timestamp = location_event.get('timestamp') 281 lat = location_event.get('latitude') 282 lon = location_event.get('longitude') 283 284 # Validation 285 if not all([timestamp, lat is not None, lon is not None]): 286 return None 287 288 if timestamp < self.config['min_timestamp']: 289 return None 290 291 # Check accuracy threshold 292 h_accuracy = location_event.get('hAccuracy', 0) 293 if h_accuracy > self.config['max_accuracy']: 294 return None 295 296 # Build enhanced OwnTracks message 297 owntracks_msg = { 298 "_type": "location", 299 "tst": int(timestamp), 300 "lat": float(lat), 301 "lon": float(lon), 302 "acc": int(h_accuracy), 303 "tid": "lc" # Tracker ID for LifeCycle 304 } 305 306 # Enhanced altitude handling with better precision 307 altitude = location_event.get('altitude') 308 if altitude is not None and altitude != -1.0 and altitude > -1000: # Sanity check 309 owntracks_msg['alt'] = round(altitude, 1) # Keep decimal precision 310 311 # Vertical accuracy 312 v_accuracy = location_event.get('vAccuracy') 313 if v_accuracy and v_accuracy > 0: 314 owntracks_msg['vac'] = int(v_accuracy) 315 316 # Enhanced speed handling 317 speed = location_event.get('speed') 318 if speed is not None and speed >= 0: 319 # Convert m/s to km/h with better precision 320 owntracks_msg['vel'] = round(speed * 3.6, 1) 321 322 # Course over ground 323 course = location_event.get('course') 324 if course is not None and course >= 0: 325 owntracks_msg['cog'] = int(course) 326 327 # WiFi context 328 wifi = location_event.get('wifi') 329 if wifi: 330 owntracks_msg['SSID'] = wifi 331 332 wifi_id = location_event.get('wifiID') 333 if wifi_id: 334 owntracks_msg['BSSID'] = wifi_id 335 336 # Enhanced: Add motion activities 337 motion_context = self.find_motion_context(timestamp) 338 if motion_context: 339 activities = self.build_motion_activities(motion_context) 340 if activities: 341 owntracks_msg['motionactivities'] = activities 342 343 # Add step count if available 344 steps = motion_context.get('steps', 0) 345 if steps > 0: 346 owntracks_msg['steps'] = steps 347 348 # Enhanced: Add barometric pressure estimate from altitude 349 if altitude is not None and altitude > -1000: 350 # Standard atmospheric pressure calculation: P = P0 * (1 - 0.0065*h/T0)^(g*M/(R*0.0065)) 351 # Simplified approximation: P ≈ 101.325 * (1 - altitude/44330)^5.255 352 try: 353 pressure = 101.325 * pow((1 - altitude / 44330.0), 5.255) 354 if 50 <= pressure <= 110: # Sanity check for reasonable pressure range 355 owntracks_msg['p'] = round(pressure, 2) 356 except: 357 pass # Skip if calculation fails 358 359 return owntracks_msg 360 361 except Exception as e: 362 print(f"❌ Conversion error: {e}") 363 return None 364 365 def load_location_data(self) -> List[Dict[str, Any]]: 366 """Load LocationEvent data from JSON file""" 367 try: 368 with open(self.location_file, 'r') as f: 369 data = json.load(f) 370 print(f"📍 Loaded {len(data)} location records") 371 372 if self.update_mode: 373 # In update mode, process already uploaded records to enhance them 374 update_data = [record for record in data 375 if (record.get('timestamp') in self.uploaded_timestamps and 376 record.get('timestamp') not in self.updated_timestamps)] 377 print(f"🔄 {len(update_data)} records to update with enhanced data") 378 return update_data 379 else: 380 # Normal mode - filter out already uploaded records 381 new_data = [record for record in data 382 if record.get('timestamp') not in self.uploaded_timestamps] 383 print(f"📍 {len(new_data)} new records to upload") 384 return new_data 385 386 except Exception as e: 387 print(f"❌ Error loading location data: {e}") 388 return [] 389 390 def upload_batch(self, batch: List[Dict[str, Any]], batch_num: int, total_batches: int): 391 """Upload or update a batch of location records""" 392 mode_label = "Updating" if self.update_mode else "Uploading" 393 print(f"\n📤 {mode_label} batch {batch_num}/{total_batches} ({len(batch)} records)...") 394 395 batch_success = 0 396 batch_skip = 0 397 398 for i, location_event in enumerate(batch): 399 try: 400 # Convert to enhanced OwnTracks format 401 owntracks_msg = self.convert_to_owntracks(location_event) 402 403 if not owntracks_msg: 404 batch_skip += 1 405 continue 406 407 timestamp = location_event.get('timestamp') 408 409 # Publish to MQTT 410 payload = json.dumps(owntracks_msg) 411 result = self.mqtt_client.publish(self.topic, payload, qos=1, retain=False) 412 413 if result.rc == 0: 414 if self.update_mode: 415 self.updated_timestamps.add(timestamp) 416 self.update_count += 1 417 else: 418 self.uploaded_timestamps.add(timestamp) 419 self.upload_count += 1 420 421 batch_success += 1 422 423 # Show progress with enhanced data indicators 424 if batch_success % 10 == 0: 425 dt = datetime.fromtimestamp(timestamp) 426 motion_indicator = "🏃" if 'motionactivities' in owntracks_msg else "" 427 pressure_indicator = "🌡️" if 'p' in owntracks_msg else "" 428 print(f" 📍 {batch_success}/{len(batch)} - {dt.strftime('%Y-%m-%d %H:%M:%S')} {motion_indicator}{pressure_indicator}") 429 else: 430 self.error_count += 1 431 print(f"❌ Publish failed for timestamp {timestamp}") 432 433 # Small delay to avoid overwhelming the broker 434 time.sleep(0.01) 435 436 except Exception as e: 437 self.error_count += 1 438 print(f"❌ Error processing record: {e}") 439 440 self.skip_count += batch_skip 441 print(f"✅ Batch complete: {batch_success} processed, {batch_skip} skipped") 442 443 # Save state periodically 444 if batch_num % 5 == 0: 445 self.save_state() 446 447 def run_upload(self, batch_size: int = 100): 448 """Main upload/update process""" 449 mode_label = "update" if self.update_mode else "upload" 450 print(f"🚀 Starting enhanced LifeCycle to OwnTracks {mode_label}...") 451 452 # Setup MQTT connection 453 if not self.setup_mqtt(): 454 return False 455 456 # Load location data 457 location_data = self.load_location_data() 458 if not location_data: 459 print(f"❌ No data to {mode_label}") 460 return False 461 462 # Upload/update in batches 463 total_batches = (len(location_data) + batch_size - 1) // batch_size 464 465 try: 466 for i in range(0, len(location_data), batch_size): 467 batch = location_data[i:i + batch_size] 468 batch_num = (i // batch_size) + 1 469 470 self.upload_batch(batch, batch_num, total_batches) 471 472 # Check connection 473 if not self.connected: 474 print("❌ Lost connection, reconnecting...") 475 if not self.setup_mqtt(): 476 break 477 478 except KeyboardInterrupt: 479 print(f"\n⚠️ {mode_label.capitalize()} interrupted by user") 480 481 except Exception as e: 482 print(f"{mode_label.capitalize()} error: {e}") 483 484 finally: 485 # Final state save 486 self.save_state() 487 488 # Cleanup 489 if self.mqtt_client: 490 self.mqtt_client.loop_stop() 491 self.mqtt_client.disconnect() 492 493 # Enhanced summary 494 total_processed = self.upload_count + self.update_count 495 print(f"\n📊 Enhanced {mode_label.capitalize()} Summary:") 496 if self.update_mode: 497 print(f" 🔄 Updated: {self.update_count}") 498 else: 499 print(f" ✅ Uploaded: {self.upload_count}") 500 print(f" ⏭️ Skipped: {self.skip_count}") 501 print(f" ❌ Errors: {self.error_count}") 502 print(f" 🏃 With motion data: ~{len(self.motion_data)} available") 503 print(f" 📁 State saved to: {self.state_file}") 504 505 return True 506 507def create_example_config(): 508 """Create an example configuration file""" 509 example_config = { 510 "mqtt_host": "your-mqtt-host.com", 511 "mqtt_port": 8883, 512 "mqtt_user": "your-username", 513 "mqtt_pass": "your-password", 514 "device_id": "lifecycle", 515 "data_dir": "lifecycle_export", 516 "state_file": "owntracks_upload_state.json", 517 "max_accuracy": 500.0, 518 "min_timestamp": 946684800, 519 "motion_time_window": 300 520 } 521 522 with open('config.json.example', 'w') as f: 523 json.dump(example_config, f, indent=2) 524 525 print("📝 Created config.json.example - copy to config.json and edit with your settings") 526 527def main(): 528 import argparse 529 530 parser = argparse.ArgumentParser( 531 description='Enhanced LifeCycle to OwnTracks uploader', 532 epilog='Configuration can be provided via config.json file or environment variables' 533 ) 534 parser.add_argument('--config', '-c', type=str, default='config.json', 535 help='Path to configuration file (default: config.json)') 536 parser.add_argument('--create-config', action='store_true', 537 help='Create example configuration file and exit') 538 parser.add_argument('--update', action='store_true', 539 help='Update existing records with enhanced data instead of uploading new ones') 540 parser.add_argument('--batch-size', type=int, default=50, 541 help='Batch size for uploads (default: 50)') 542 543 args = parser.parse_args() 544 545 if args.create_config: 546 create_example_config() 547 return 0 548 549 # Load configuration 550 config = load_config(args.config) 551 552 # Validate required configuration 553 required_fields = ['mqtt_host', 'mqtt_user', 'mqtt_pass'] 554 missing_fields = [field for field in required_fields 555 if not config.get(field) or config[field] in ['your-mqtt-host.com', 'your-username', 'your-password']] 556 557 if missing_fields: 558 print(f"❌ Missing or default configuration for: {', '.join(missing_fields)}") 559 print("Please update your config.json file or set environment variables:") 560 for field in missing_fields: 561 env_var = field.upper().replace('_', '_') 562 print(f" export {env_var}=your_value") 563 print("\nOr run with --create-config to create an example configuration file") 564 return 1 565 566 location_file = f"{config['data_dir']}/LocationEvent.json" 567 if not os.path.exists(location_file): 568 print(f"❌ Location data file not found: {location_file}") 569 print("Run ./export_lifecycle_data.sh first to export data") 570 return 1 571 572 uploader = EnhancedOwnTracksUploader(config, update_mode=args.update) 573 success = uploader.run_upload(batch_size=args.batch_size) 574 575 return 0 if success else 1 576 577if __name__ == "__main__": 578 sys.exit(main())