#!/usr/bin/env python3
"""
LifeCycle to OwnTracks Upload Script

Converts and uploads LifeCycle GPS data to OwnTracks MQTT broker with high fidelity
"""

# NOTE(review): the status glyphs inside the log strings below look
# mis-encoded (presumably emoji that went through a wrong codec at some
# point). They are runtime text and are kept byte-for-byte here; confirm the
# intended characters before normalising them.

import json
import ssl
import time
import os
import sys
from typing import Dict, Any, List, Set, Optional
from datetime import datetime, timezone
import hashlib

import paho.mqtt.client as mqtt

# Configuration
MQTT_HOST = ""
MQTT_PORT = 8883
MQTT_USER = ""
MQTT_PASS = ""
DEVICE_ID = ""
TOPIC = f"owntracks/{MQTT_USER}/{DEVICE_ID}"

# SECURITY: False keeps the historical behaviour of accepting ANY broker
# certificate (no hostname check, no chain validation). Set to True whenever
# the broker presents a certificate the system trust store can validate.
MQTT_TLS_VERIFY = False
CONNECT_TIMEOUT = 10.0  # Seconds to wait for the broker to accept the connection
PUBLISH_ACK_TIMEOUT = 30.0  # Seconds to wait, per batch, for delivery confirmation

# Data file paths
DATA_DIR = "lifecycle_export"
LOCATION_FILE = f"{DATA_DIR}/LocationEvent.json"
TIMEZONE_FILE = f"{DATA_DIR}/TimeZone.json"
STATE_FILE = "owntracks_upload_state.json"

# Quality thresholds
MAX_ACCURACY = 500.0  # Skip points with accuracy > 500m
MIN_TIMESTAMP = 946684800  # Year 2000 epoch (sanity check)


class OwnTracksUploader:
    """Publish LifeCycle location records to an OwnTracks MQTT topic.

    Progress (the set of timestamps already delivered plus a running count)
    is persisted in STATE_FILE so that an interrupted run can be resumed
    without publishing the same record twice.
    """

    def __init__(self) -> None:
        self.mqtt_client = None
        self.connected = False
        self.upload_count = 0
        self.skip_count = 0
        self.error_count = 0
        self.uploaded_timestamps = set()
        self.timezone_map = {}

        # Load previous state if exists
        self.load_state()

        # Load timezone mapping
        self.load_timezones()

    def load_state(self) -> None:
        """Load previously uploaded timestamps to avoid duplicates.

        A missing state file is not an error. An unreadable or malformed one
        only produces a warning and leaves the in-memory state untouched.
        """
        if not os.path.exists(STATE_FILE):
            return

        try:
            with open(STATE_FILE, 'r', encoding='utf-8') as f:
                state = json.load(f)
            uploaded = set(state.get('uploaded_timestamps', []))
            count = state.get('upload_count', 0)
        except (OSError, ValueError, TypeError, AttributeError) as e:
            print(f"āš ļø Warning: Could not load state file: {e}")
            return

        # Assign only after everything parsed, so a bad file never leaves a
        # half-applied state behind.
        self.uploaded_timestamps = uploaded
        self.upload_count = count
        print(f"šŸ“Š Loaded state: {len(self.uploaded_timestamps)} previously uploaded records")

    def save_state(self) -> None:
        """Save current upload state.

        The state is written to a temporary file first and then moved over
        STATE_FILE, so an interruption in the middle of the write cannot
        leave a truncated state file behind. Failures are reported as a
        warning and otherwise ignored (best effort).
        """
        state = {
            'uploaded_timestamps': list(self.uploaded_timestamps),
            'upload_count': self.upload_count,
            'last_update': datetime.now().isoformat(),
        }
        tmp_path = f"{STATE_FILE}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(state, f, indent=2)
            os.replace(tmp_path, STATE_FILE)
        except (OSError, TypeError, ValueError) as e:
            print(f"āš ļø Warning: Could not save state: {e}")

    def load_timezones(self) -> None:
        """Load timezone mapping from LifeCycle data.

        Builds ``timezone_map`` as ``{id: name}`` from TIMEZONE_FILE. On any
        read or format problem a single default entry is installed instead.
        """
        try:
            with open(TIMEZONE_FILE, 'r', encoding='utf-8') as f:
                timezones = json.load(f)
            self.timezone_map = {tz['id']: tz['name'] for tz in timezones}
            print(f"šŸ“ Loaded {len(self.timezone_map)} timezones")
        except (OSError, ValueError, KeyError, TypeError) as e:
            print(f"āš ļø Warning: Could not load timezones: {e}")
            self.timezone_map = {0: "Europe/London"}  # Default fallback

    def _teardown_mqtt(self) -> None:
        """Stop the network loop of the current client, if any, and drop it."""
        client = self.mqtt_client
        self.mqtt_client = None
        if client is not None:
            client.loop_stop()
            client.disconnect()
        self.connected = False

    def setup_mqtt(self) -> bool:
        """Setup MQTT client with SSL connection.

        Any client left over from an earlier call is shut down first, so a
        reconnect does not leave a second network thread running.

        Returns:
            True once the broker has accepted the connection, False if the
            setup failed or the connection was not confirmed within
            CONNECT_TIMEOUT seconds.
        """
        self._teardown_mqtt()

        try:
            # Create client with callback API version 2
            client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
            client.username_pw_set(MQTT_USER, MQTT_PASS)

            # Setup SSL
            context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
            if not MQTT_TLS_VERIFY:
                # SECURITY: certificate and hostname verification disabled,
                # see the note on MQTT_TLS_VERIFY.
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
            client.tls_set_context(context)

            # Set callbacks
            client.on_connect = self.on_connect
            client.on_publish = self.on_publish
            client.on_disconnect = self.on_disconnect

            # Register the client before connecting so the failure path below
            # can always clean it up.
            self.mqtt_client = client

            # Connect
            print(f"šŸ”Œ Connecting to {MQTT_HOST}:{MQTT_PORT}...")
            client.connect(MQTT_HOST, MQTT_PORT, 60)
            client.loop_start()

            # Wait for on_connect to flip self.connected
            deadline = time.monotonic() + CONNECT_TIMEOUT
            while not self.connected and time.monotonic() < deadline:
                time.sleep(0.5)

            if not self.connected:
                raise TimeoutError("Connection timeout")

            return True

        except Exception as e:
            # Boundary handler: whatever went wrong, report it and signal
            # failure to the caller instead of crashing the upload.
            print(f"āŒ MQTT setup failed: {e}")
            self._teardown_mqtt()
            return False

    def on_connect(self, client, userdata, flags, reason_code, properties):
        """MQTT connection callback"""
        if reason_code == 0:
            self.connected = True
            print(f"āœ… Connected to OwnTracks MQTT broker")
        else:
            print(f"āŒ Connection failed with code {reason_code}")

    def on_publish(self, client, userdata, mid, reason_code, properties):
        """MQTT publish callback"""
        if reason_code != 0:
            self.error_count += 1
            print(f"āŒ Publish failed: {reason_code}")

    def on_disconnect(self, client, userdata, *args):
        """MQTT disconnect callback.

        With callback API version 2 the library passes
        ``(disconnect_flags, reason_code, properties)`` after ``userdata``.
        The shorter ``(reason_code, properties)`` and ``(reason_code,)``
        forms are accepted as well, so older call shapes keep working.
        """
        self.connected = False
        if len(args) >= 2:
            # reason_code is second-to-last in both the 3- and 2-argument forms
            reason_code = args[-2]
        else:
            reason_code = args[0] if args else 0
        if reason_code != 0:
            print(f"āš ļø Unexpected disconnection: {reason_code}")

    def convert_to_owntracks(self, location_event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Convert LifeCycle LocationEvent to OwnTracks format.

        Args:
            location_event: One record; the keys read are ``timestamp``,
                ``latitude``, ``longitude``, ``hAccuracy``, ``altitude``,
                ``vAccuracy``, ``speed``, ``course``, ``wifi`` and ``wifiID``.

        Returns:
            The OwnTracks ``location`` message as a dict, or None when the
            record is rejected: missing timestamp or coordinates, timestamp
            before MIN_TIMESTAMP, horizontal accuracy above MAX_ACCURACY, or
            values that cannot be converted to numbers.
        """
        try:
            # Extract basic fields
            timestamp = location_event.get('timestamp')
            lat = location_event.get('latitude')
            lon = location_event.get('longitude')

            # Validation
            if not timestamp or lat is None or lon is None:
                return None

            if timestamp < MIN_TIMESTAMP:
                return None

            # Check accuracy threshold
            h_accuracy = location_event.get('hAccuracy', 0)
            if h_accuracy > MAX_ACCURACY:
                return None

            # Build OwnTracks message
            owntracks_msg = {
                "_type": "location",
                "tst": int(timestamp),
                "lat": float(lat),
                "lon": float(lon),
                "acc": int(h_accuracy),
                "tid": "lc",  # Tracker ID for LifeCycle
            }

            # Add optional fields if valid. The checks are "is not None"
            # rather than truthiness so that a legitimate 0 (sea level,
            # standing still, heading 0) is kept instead of dropped.
            altitude = location_event.get('altitude')
            if altitude is not None and altitude != -1.0:
                owntracks_msg['alt'] = int(altitude)

            v_accuracy = location_event.get('vAccuracy')
            if v_accuracy is not None and v_accuracy > 0:
                owntracks_msg['vac'] = int(v_accuracy)

            speed = location_event.get('speed')
            if speed is not None and speed >= 0:
                owntracks_msg['vel'] = int(speed * 3.6)  # Convert m/s to km/h

            course = location_event.get('course')
            if course is not None and course >= 0:
                owntracks_msg['cog'] = int(course)

            # Add WiFi info if available
            wifi = location_event.get('wifi')
            if wifi:
                owntracks_msg['SSID'] = wifi

            wifi_id = location_event.get('wifiID')
            if wifi_id:
                owntracks_msg['BSSID'] = wifi_id

            return owntracks_msg

        except (TypeError, ValueError, OverflowError) as e:
            # Non-numeric or non-finite field values end up here.
            print(f"āŒ Conversion error: {e}")
            return None

    def load_location_data(self) -> List[Dict[str, Any]]:
        """Load LocationEvent data from JSON file.

        Returns:
            The records of LOCATION_FILE whose ``timestamp`` is not already
            in ``uploaded_timestamps``; an empty list if the file cannot be
            read or parsed.
        """
        try:
            with open(LOCATION_FILE, 'r', encoding='utf-8') as f:
                data = json.load(f)

            print(f"šŸ“ Loaded {len(data)} location records")

            # Filter out already uploaded records
            new_data = [
                record for record in data
                if record.get('timestamp') not in self.uploaded_timestamps
            ]

            print(f"šŸ“ {len(new_data)} new records to upload")
            return new_data

        except (OSError, ValueError, TypeError, AttributeError) as e:
            print(f"āŒ Error loading location data: {e}")
            return []

    def upload_batch(self, batch: List[Dict[str, Any]], batch_num: int, total_batches: int):
        """Upload a batch of location records.

        Works in two phases: every convertible, not-yet-uploaded record is
        handed to the MQTT client first; afterwards the method waits (at most
        PUBLISH_ACK_TIMEOUT seconds for the whole batch) for each message to
        be reported as published. Only confirmed messages are added to
        ``uploaded_timestamps``, so records that never left the local queue
        are retried on the next run instead of being lost.

        Args:
            batch: Records to upload.
            batch_num: 1-based index of this batch, used for progress output
                and for the periodic state save.
            total_batches: Total number of batches, for progress output.
        """
        print(f"\nšŸ“¤ Uploading batch {batch_num}/{total_batches} ({len(batch)} records)...")

        batch_success = 0
        batch_skip = 0
        # timestamp -> publish handle of messages queued but not yet confirmed
        pending: Dict[Any, Any] = {}

        for location_event in batch:
            try:
                # Convert to OwnTracks format
                owntracks_msg = self.convert_to_owntracks(location_event)
                if not owntracks_msg:
                    batch_skip += 1
                    continue

                # Check if already uploaded (or already queued in this batch)
                timestamp = location_event.get('timestamp')
                if timestamp in self.uploaded_timestamps or timestamp in pending:
                    batch_skip += 1
                    continue

                # Publish to MQTT
                payload = json.dumps(owntracks_msg)
                result = self.mqtt_client.publish(TOPIC, payload, qos=1, retain=False)

                if result.rc == 0:
                    # rc == 0 only means "accepted by the client"; delivery
                    # is confirmed in the second phase below.
                    pending[timestamp] = result
                else:
                    self.error_count += 1
                    print(f"āŒ Publish failed for timestamp {timestamp}")

                # Small delay to avoid overwhelming the broker
                time.sleep(0.01)

            except Exception as e:
                # One bad record must not abort the rest of the batch.
                self.error_count += 1
                print(f"āŒ Error uploading record: {e}")

        # Second phase: confirm delivery. One shared deadline keeps a dead
        # connection from costing a full timeout per message.
        deadline = time.monotonic() + PUBLISH_ACK_TIMEOUT
        for timestamp, info in pending.items():
            try:
                info.wait_for_publish(timeout=max(0.0, deadline - time.monotonic()))
                if not info.is_published():
                    self.error_count += 1
                    print(f"āŒ Publish failed for timestamp {timestamp}")
                    continue

                self.uploaded_timestamps.add(timestamp)
                batch_success += 1
                self.upload_count += 1

                # Show progress
                if batch_success % 10 == 0:
                    dt = datetime.fromtimestamp(timestamp)
                    print(f" šŸ“ {batch_success}/{len(batch)} - {dt.strftime('%Y-%m-%d %H:%M:%S')}")

            except Exception as e:
                self.error_count += 1
                print(f"āŒ Error uploading record: {e}")

        self.skip_count += batch_skip
        print(f"āœ… Batch complete: {batch_success} uploaded, {batch_skip} skipped")

        # Save state periodically
        if batch_num % 10 == 0:
            self.save_state()

    def run_upload(self, batch_size: int = 100):
        """Main upload process.

        Connects to the broker, loads the pending records, uploads them in
        batches of ``batch_size`` and finally saves the state and prints a
        summary.

        Args:
            batch_size: Number of records per batch; must be at least 1.

        Returns:
            False if the broker could not be reached or there was nothing to
            upload, True otherwise (including runs that were interrupted or
            that recorded errors; see the printed summary).

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        print("šŸš€ Starting LifeCycle to OwnTracks upload...")

        # Setup MQTT connection
        if not self.setup_mqtt():
            return False

        # Load location data
        location_data = self.load_location_data()
        if not location_data:
            print("āŒ No data to upload")
            # Do not leave the freshly opened connection behind.
            self._teardown_mqtt()
            return False

        # Upload in batches
        total_batches = (len(location_data) + batch_size - 1) // batch_size

        try:
            for i in range(0, len(location_data), batch_size):
                batch = location_data[i:i + batch_size]
                batch_num = (i // batch_size) + 1

                self.upload_batch(batch, batch_num, total_batches)

                # Check connection
                if not self.connected:
                    print("āŒ Lost connection, reconnecting...")
                    if not self.setup_mqtt():
                        break

        except KeyboardInterrupt:
            print("\nāš ļø Upload interrupted by user")
        except Exception as e:
            print(f"āŒ Upload error: {e}")
        finally:
            # Final state save
            self.save_state()

            # Cleanup
            self._teardown_mqtt()

        # Summary
        print(f"\nšŸ“Š Upload Summary:")
        print(f" āœ… Uploaded: {self.upload_count}")
        print(f" ā­ļø Skipped: {self.skip_count}")
        print(f" āŒ Errors: {self.error_count}")
        print(f" šŸ“ State saved to: {STATE_FILE}")

        return True


def main():
    """Run the upload from the command line and return the process exit code."""
    if not os.path.exists(LOCATION_FILE):
        print(f"āŒ Location data file not found: {LOCATION_FILE}")
        print("Run ./export_lifecycle_data.sh first to export data")
        return 1

    uploader = OwnTracksUploader()
    success = uploader.run_upload()

    return 0 if success else 1


if __name__ == "__main__":
    sys.exit(main())