Script to upload Life Cycle database GPS locations to OwnTracks
lifecycle_to_owntracks.py
344 lines 12 kB view raw
1#!/usr/bin/env python3 2""" 3LifeCycle to OwnTracks Upload Script 4Converts and uploads LifeCycle GPS data to OwnTracks MQTT broker with high fidelity 5""" 6 7import json 8import ssl 9import time 10import os 11import sys 12from typing import Dict, Any, List, Set, Optional 13from datetime import datetime, timezone 14import hashlib 15import paho.mqtt.client as mqtt 16 17# Configuration 18MQTT_HOST = "" 19MQTT_PORT = 8883 20MQTT_USER = "" 21MQTT_PASS = "" 22DEVICE_ID = "" 23TOPIC = f"owntracks/{MQTT_USER}/{DEVICE_ID}" 24 25# Data file paths 26DATA_DIR = "lifecycle_export" 27LOCATION_FILE = f"{DATA_DIR}/LocationEvent.json" 28TIMEZONE_FILE = f"{DATA_DIR}/TimeZone.json" 29STATE_FILE = "owntracks_upload_state.json" 30 31# Quality thresholds 32MAX_ACCURACY = 500.0 # Skip points with accuracy > 500m 33MIN_TIMESTAMP = 946684800 # Year 2000 epoch (sanity check) 34 35class OwnTracksUploader: 36 def __init__(self): 37 self.mqtt_client = None 38 self.connected = False 39 self.upload_count = 0 40 self.skip_count = 0 41 self.error_count = 0 42 self.uploaded_timestamps = set() 43 self.timezone_map = {} 44 45 # Load previous state if exists 46 self.load_state() 47 48 # Load timezone mapping 49 self.load_timezones() 50 51 def load_state(self): 52 """Load previously uploaded timestamps to avoid duplicates""" 53 if os.path.exists(STATE_FILE): 54 try: 55 with open(STATE_FILE, 'r') as f: 56 state = json.load(f) 57 self.uploaded_timestamps = set(state.get('uploaded_timestamps', [])) 58 self.upload_count = state.get('upload_count', 0) 59 print(f"📊 Loaded state: {len(self.uploaded_timestamps)} previously uploaded records") 60 except Exception as e: 61 print(f"⚠️ Warning: Could not load state file: {e}") 62 63 def save_state(self): 64 """Save current upload state""" 65 try: 66 state = { 67 'uploaded_timestamps': list(self.uploaded_timestamps), 68 'upload_count': self.upload_count, 69 'last_update': datetime.now().isoformat() 70 } 71 with open(STATE_FILE, 'w') as f: 72 json.dump(state, f, indent=2) 73 except Exception as e: 74 print(f"⚠️ Warning: Could not save state: {e}") 75 76 def load_timezones(self): 77 """Load timezone mapping from LifeCycle data""" 78 try: 79 with open(TIMEZONE_FILE, 'r') as f: 80 timezones = json.load(f) 81 self.timezone_map = {tz['id']: tz['name'] for tz in timezones} 82 print(f"📍 Loaded {len(self.timezone_map)} timezones") 83 except Exception as e: 84 print(f"⚠️ Warning: Could not load timezones: {e}") 85 self.timezone_map = {0: "Europe/London"} # Default fallback 86 87 def setup_mqtt(self): 88 """Setup MQTT client with SSL connection""" 89 try: 90 # Create client with callback API version 2 91 self.mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2) 92 self.mqtt_client.username_pw_set(MQTT_USER, MQTT_PASS) 93 94 # Setup SSL 95 context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) 96 context.check_hostname = False 97 context.verify_mode = ssl.CERT_NONE 98 self.mqtt_client.tls_set_context(context) 99 100 # Set callbacks 101 self.mqtt_client.on_connect = self.on_connect 102 self.mqtt_client.on_publish = self.on_publish 103 self.mqtt_client.on_disconnect = self.on_disconnect 104 105 # Connect 106 print(f"🔌 Connecting to {MQTT_HOST}:{MQTT_PORT}...") 107 self.mqtt_client.connect(MQTT_HOST, MQTT_PORT, 60) 108 self.mqtt_client.loop_start() 109 110 # Wait for connection 111 timeout = 10 112 while not self.connected and timeout > 0: 113 time.sleep(0.5) 114 timeout -= 0.5 115 116 if not self.connected: 117 raise Exception("Connection timeout") 118 119 return True 120 121 except Exception as e: 122 print(f"❌ MQTT setup failed: {e}") 123 return False 124 125 def on_connect(self, client, userdata, flags, reason_code, properties): 126 """MQTT connection callback""" 127 if reason_code == 0: 128 self.connected = True 129 print(f"✅ Connected to OwnTracks MQTT broker") 130 else: 131 print(f"❌ Connection failed with code {reason_code}") 132 133 def on_publish(self, client, userdata, mid, reason_code, properties): 134 """MQTT publish callback""" 135 if reason_code != 0: 136 self.error_count += 1 137 print(f"❌ Publish failed: {reason_code}") 138 139 def on_disconnect(self, client, userdata, reason_code, properties): 140 """MQTT disconnect callback""" 141 self.connected = False 142 if reason_code != 0: 143 print(f"⚠️ Unexpected disconnection: {reason_code}") 144 145 def convert_to_owntracks(self, location_event: Dict[str, Any]) -> Optional[Dict[str, Any]]: 146 """Convert LifeCycle LocationEvent to OwnTracks format""" 147 try: 148 # Extract basic fields 149 timestamp = location_event.get('timestamp') 150 lat = location_event.get('latitude') 151 lon = location_event.get('longitude') 152 153 # Validation 154 if not all([timestamp, lat is not None, lon is not None]): 155 return None 156 157 if timestamp < MIN_TIMESTAMP: 158 return None 159 160 # Check accuracy threshold 161 h_accuracy = location_event.get('hAccuracy', 0) 162 if h_accuracy > MAX_ACCURACY: 163 return None 164 165 # Build OwnTracks message 166 owntracks_msg = { 167 "_type": "location", 168 "tst": int(timestamp), 169 "lat": float(lat), 170 "lon": float(lon), 171 "acc": int(h_accuracy), 172 "tid": "lc" # Tracker ID for LifeCycle 173 } 174 175 # Add optional fields if valid 176 altitude = location_event.get('altitude') 177 if altitude and altitude != -1.0: 178 owntracks_msg['alt'] = int(altitude) 179 180 v_accuracy = location_event.get('vAccuracy') 181 if v_accuracy and v_accuracy > 0: 182 owntracks_msg['vac'] = int(v_accuracy) 183 184 speed = location_event.get('speed') 185 if speed and speed >= 0: 186 owntracks_msg['vel'] = int(speed * 3.6) # Convert m/s to km/h 187 188 course = location_event.get('course') 189 if course and course >= 0: 190 owntracks_msg['cog'] = int(course) 191 192 # Add WiFi info if available 193 wifi = location_event.get('wifi') 194 if wifi: 195 owntracks_msg['SSID'] = wifi 196 197 wifi_id = location_event.get('wifiID') 198 if wifi_id: 199 owntracks_msg['BSSID'] = wifi_id 200 201 return owntracks_msg 202 203 except Exception as e: 204 print(f"❌ Conversion error: {e}") 205 return None 206 207 def load_location_data(self) -> List[Dict[str, Any]]: 208 """Load LocationEvent data from JSON file""" 209 try: 210 with open(LOCATION_FILE, 'r') as f: 211 data = json.load(f) 212 print(f"📍 Loaded {len(data)} location records") 213 214 # Filter out already uploaded records 215 new_data = [record for record in data 216 if record.get('timestamp') not in self.uploaded_timestamps] 217 218 print(f"📍 {len(new_data)} new records to upload") 219 return new_data 220 221 except Exception as e: 222 print(f"❌ Error loading location data: {e}") 223 return [] 224 225 def upload_batch(self, batch: List[Dict[str, Any]], batch_num: int, total_batches: int): 226 """Upload a batch of location records""" 227 print(f"\n📤 Uploading batch {batch_num}/{total_batches} ({len(batch)} records)...") 228 229 batch_success = 0 230 batch_skip = 0 231 232 for i, location_event in enumerate(batch): 233 try: 234 # Convert to OwnTracks format 235 owntracks_msg = self.convert_to_owntracks(location_event) 236 237 if not owntracks_msg: 238 batch_skip += 1 239 continue 240 241 # Check if already uploaded 242 timestamp = location_event.get('timestamp') 243 if timestamp in self.uploaded_timestamps: 244 batch_skip += 1 245 continue 246 247 # Publish to MQTT 248 payload = json.dumps(owntracks_msg) 249 result = self.mqtt_client.publish(TOPIC, payload, qos=1, retain=False) 250 251 if result.rc == 0: 252 self.uploaded_timestamps.add(timestamp) 253 batch_success += 1 254 self.upload_count += 1 255 256 # Show progress 257 if batch_success % 10 == 0: 258 dt = datetime.fromtimestamp(timestamp) 259 print(f" 📍 {batch_success}/{len(batch)} - {dt.strftime('%Y-%m-%d %H:%M:%S')}") 260 else: 261 self.error_count += 1 262 print(f"❌ Publish failed for timestamp {timestamp}") 263 264 # Small delay to avoid overwhelming the broker 265 time.sleep(0.01) 266 267 except Exception as e: 268 self.error_count += 1 269 print(f"❌ Error uploading record: {e}") 270 271 self.skip_count += batch_skip 272 print(f"✅ Batch complete: {batch_success} uploaded, {batch_skip} skipped") 273 274 # Save state periodically 275 if batch_num % 10 == 0: 276 self.save_state() 277 278 def run_upload(self, batch_size: int = 100): 279 """Main upload process""" 280 print("🚀 Starting LifeCycle to OwnTracks upload...") 281 282 # Setup MQTT connection 283 if not self.setup_mqtt(): 284 return False 285 286 # Load location data 287 location_data = self.load_location_data() 288 if not location_data: 289 print("❌ No data to upload") 290 return False 291 292 # Upload in batches 293 total_batches = (len(location_data) + batch_size - 1) // batch_size 294 295 try: 296 for i in range(0, len(location_data), batch_size): 297 batch = location_data[i:i + batch_size] 298 batch_num = (i // batch_size) + 1 299 300 self.upload_batch(batch, batch_num, total_batches) 301 302 # Check connection 303 if not self.connected: 304 print("❌ Lost connection, reconnecting...") 305 if not self.setup_mqtt(): 306 break 307 308 except KeyboardInterrupt: 309 print("\n⚠️ Upload interrupted by user") 310 311 except Exception as e: 312 print(f"❌ Upload error: {e}") 313 314 finally: 315 # Final state save 316 self.save_state() 317 318 # Cleanup 319 if self.mqtt_client: 320 self.mqtt_client.loop_stop() 321 self.mqtt_client.disconnect() 322 323 # Summary 324 print(f"\n📊 Upload Summary:") 325 print(f" ✅ Uploaded: {self.upload_count}") 326 print(f" ⏭️ Skipped: {self.skip_count}") 327 print(f" ❌ Errors: {self.error_count}") 328 print(f" 📁 State saved to: {STATE_FILE}") 329 330 return True 331 332def main(): 333 if not os.path.exists(LOCATION_FILE): 334 print(f"❌ Location data file not found: {LOCATION_FILE}") 335 print("Run ./export_lifecycle_data.sh first to export data") 336 return 1 337 338 uploader = OwnTracksUploader() 339 success = uploader.run_upload() 340 341 return 0 if success else 1 342 343if __name__ == "__main__": 344 sys.exit(main())