lifecycle_to_owntracks.py
1#!/usr/bin/env python3
2"""
3LifeCycle to OwnTracks Upload Script
4Converts and uploads LifeCycle GPS data to OwnTracks MQTT broker with high fidelity
5"""
6
7import json
8import ssl
9import time
10import os
11import sys
12from typing import Dict, Any, List, Set, Optional
13from datetime import datetime, timezone
14import hashlib
15import paho.mqtt.client as mqtt
16
17# Configuration
18MQTT_HOST = ""
19MQTT_PORT = 8883
20MQTT_USER = ""
21MQTT_PASS = ""
22DEVICE_ID = ""
23TOPIC = f"owntracks/{MQTT_USER}/{DEVICE_ID}"
24
25# Data file paths
26DATA_DIR = "lifecycle_export"
27LOCATION_FILE = f"{DATA_DIR}/LocationEvent.json"
28TIMEZONE_FILE = f"{DATA_DIR}/TimeZone.json"
29STATE_FILE = "owntracks_upload_state.json"
30
31# Quality thresholds
32MAX_ACCURACY = 500.0 # Skip points with accuracy > 500m
33MIN_TIMESTAMP = 946684800 # Year 2000 epoch (sanity check)
34
35class OwnTracksUploader:
36 def __init__(self):
37 self.mqtt_client = None
38 self.connected = False
39 self.upload_count = 0
40 self.skip_count = 0
41 self.error_count = 0
42 self.uploaded_timestamps = set()
43 self.timezone_map = {}
44
45 # Load previous state if exists
46 self.load_state()
47
48 # Load timezone mapping
49 self.load_timezones()
50
51 def load_state(self):
52 """Load previously uploaded timestamps to avoid duplicates"""
53 if os.path.exists(STATE_FILE):
54 try:
55 with open(STATE_FILE, 'r') as f:
56 state = json.load(f)
57 self.uploaded_timestamps = set(state.get('uploaded_timestamps', []))
58 self.upload_count = state.get('upload_count', 0)
59 print(f"📊 Loaded state: {len(self.uploaded_timestamps)} previously uploaded records")
60 except Exception as e:
61 print(f"⚠️ Warning: Could not load state file: {e}")
62
63 def save_state(self):
64 """Save current upload state"""
65 try:
66 state = {
67 'uploaded_timestamps': list(self.uploaded_timestamps),
68 'upload_count': self.upload_count,
69 'last_update': datetime.now().isoformat()
70 }
71 with open(STATE_FILE, 'w') as f:
72 json.dump(state, f, indent=2)
73 except Exception as e:
74 print(f"⚠️ Warning: Could not save state: {e}")
75
76 def load_timezones(self):
77 """Load timezone mapping from LifeCycle data"""
78 try:
79 with open(TIMEZONE_FILE, 'r') as f:
80 timezones = json.load(f)
81 self.timezone_map = {tz['id']: tz['name'] for tz in timezones}
82 print(f"📍 Loaded {len(self.timezone_map)} timezones")
83 except Exception as e:
84 print(f"⚠️ Warning: Could not load timezones: {e}")
85 self.timezone_map = {0: "Europe/London"} # Default fallback
86
87 def setup_mqtt(self):
88 """Setup MQTT client with SSL connection"""
89 try:
90 # Create client with callback API version 2
91 self.mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
92 self.mqtt_client.username_pw_set(MQTT_USER, MQTT_PASS)
93
94 # Setup SSL
95 context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
96 context.check_hostname = False
97 context.verify_mode = ssl.CERT_NONE
98 self.mqtt_client.tls_set_context(context)
99
100 # Set callbacks
101 self.mqtt_client.on_connect = self.on_connect
102 self.mqtt_client.on_publish = self.on_publish
103 self.mqtt_client.on_disconnect = self.on_disconnect
104
105 # Connect
106 print(f"🔌 Connecting to {MQTT_HOST}:{MQTT_PORT}...")
107 self.mqtt_client.connect(MQTT_HOST, MQTT_PORT, 60)
108 self.mqtt_client.loop_start()
109
110 # Wait for connection
111 timeout = 10
112 while not self.connected and timeout > 0:
113 time.sleep(0.5)
114 timeout -= 0.5
115
116 if not self.connected:
117 raise Exception("Connection timeout")
118
119 return True
120
121 except Exception as e:
122 print(f"❌ MQTT setup failed: {e}")
123 return False
124
125 def on_connect(self, client, userdata, flags, reason_code, properties):
126 """MQTT connection callback"""
127 if reason_code == 0:
128 self.connected = True
129 print(f"✅ Connected to OwnTracks MQTT broker")
130 else:
131 print(f"❌ Connection failed with code {reason_code}")
132
133 def on_publish(self, client, userdata, mid, reason_code, properties):
134 """MQTT publish callback"""
135 if reason_code != 0:
136 self.error_count += 1
137 print(f"❌ Publish failed: {reason_code}")
138
139 def on_disconnect(self, client, userdata, reason_code, properties):
140 """MQTT disconnect callback"""
141 self.connected = False
142 if reason_code != 0:
143 print(f"⚠️ Unexpected disconnection: {reason_code}")
144
145 def convert_to_owntracks(self, location_event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
146 """Convert LifeCycle LocationEvent to OwnTracks format"""
147 try:
148 # Extract basic fields
149 timestamp = location_event.get('timestamp')
150 lat = location_event.get('latitude')
151 lon = location_event.get('longitude')
152
153 # Validation
154 if not all([timestamp, lat is not None, lon is not None]):
155 return None
156
157 if timestamp < MIN_TIMESTAMP:
158 return None
159
160 # Check accuracy threshold
161 h_accuracy = location_event.get('hAccuracy', 0)
162 if h_accuracy > MAX_ACCURACY:
163 return None
164
165 # Build OwnTracks message
166 owntracks_msg = {
167 "_type": "location",
168 "tst": int(timestamp),
169 "lat": float(lat),
170 "lon": float(lon),
171 "acc": int(h_accuracy),
172 "tid": "lc" # Tracker ID for LifeCycle
173 }
174
175 # Add optional fields if valid
176 altitude = location_event.get('altitude')
177 if altitude and altitude != -1.0:
178 owntracks_msg['alt'] = int(altitude)
179
180 v_accuracy = location_event.get('vAccuracy')
181 if v_accuracy and v_accuracy > 0:
182 owntracks_msg['vac'] = int(v_accuracy)
183
184 speed = location_event.get('speed')
185 if speed and speed >= 0:
186 owntracks_msg['vel'] = int(speed * 3.6) # Convert m/s to km/h
187
188 course = location_event.get('course')
189 if course and course >= 0:
190 owntracks_msg['cog'] = int(course)
191
192 # Add WiFi info if available
193 wifi = location_event.get('wifi')
194 if wifi:
195 owntracks_msg['SSID'] = wifi
196
197 wifi_id = location_event.get('wifiID')
198 if wifi_id:
199 owntracks_msg['BSSID'] = wifi_id
200
201 return owntracks_msg
202
203 except Exception as e:
204 print(f"❌ Conversion error: {e}")
205 return None
206
207 def load_location_data(self) -> List[Dict[str, Any]]:
208 """Load LocationEvent data from JSON file"""
209 try:
210 with open(LOCATION_FILE, 'r') as f:
211 data = json.load(f)
212 print(f"📍 Loaded {len(data)} location records")
213
214 # Filter out already uploaded records
215 new_data = [record for record in data
216 if record.get('timestamp') not in self.uploaded_timestamps]
217
218 print(f"📍 {len(new_data)} new records to upload")
219 return new_data
220
221 except Exception as e:
222 print(f"❌ Error loading location data: {e}")
223 return []
224
225 def upload_batch(self, batch: List[Dict[str, Any]], batch_num: int, total_batches: int):
226 """Upload a batch of location records"""
227 print(f"\n📤 Uploading batch {batch_num}/{total_batches} ({len(batch)} records)...")
228
229 batch_success = 0
230 batch_skip = 0
231
232 for i, location_event in enumerate(batch):
233 try:
234 # Convert to OwnTracks format
235 owntracks_msg = self.convert_to_owntracks(location_event)
236
237 if not owntracks_msg:
238 batch_skip += 1
239 continue
240
241 # Check if already uploaded
242 timestamp = location_event.get('timestamp')
243 if timestamp in self.uploaded_timestamps:
244 batch_skip += 1
245 continue
246
247 # Publish to MQTT
248 payload = json.dumps(owntracks_msg)
249 result = self.mqtt_client.publish(TOPIC, payload, qos=1, retain=False)
250
251 if result.rc == 0:
252 self.uploaded_timestamps.add(timestamp)
253 batch_success += 1
254 self.upload_count += 1
255
256 # Show progress
257 if batch_success % 10 == 0:
258 dt = datetime.fromtimestamp(timestamp)
259 print(f" 📍 {batch_success}/{len(batch)} - {dt.strftime('%Y-%m-%d %H:%M:%S')}")
260 else:
261 self.error_count += 1
262 print(f"❌ Publish failed for timestamp {timestamp}")
263
264 # Small delay to avoid overwhelming the broker
265 time.sleep(0.01)
266
267 except Exception as e:
268 self.error_count += 1
269 print(f"❌ Error uploading record: {e}")
270
271 self.skip_count += batch_skip
272 print(f"✅ Batch complete: {batch_success} uploaded, {batch_skip} skipped")
273
274 # Save state periodically
275 if batch_num % 10 == 0:
276 self.save_state()
277
278 def run_upload(self, batch_size: int = 100):
279 """Main upload process"""
280 print("🚀 Starting LifeCycle to OwnTracks upload...")
281
282 # Setup MQTT connection
283 if not self.setup_mqtt():
284 return False
285
286 # Load location data
287 location_data = self.load_location_data()
288 if not location_data:
289 print("❌ No data to upload")
290 return False
291
292 # Upload in batches
293 total_batches = (len(location_data) + batch_size - 1) // batch_size
294
295 try:
296 for i in range(0, len(location_data), batch_size):
297 batch = location_data[i:i + batch_size]
298 batch_num = (i // batch_size) + 1
299
300 self.upload_batch(batch, batch_num, total_batches)
301
302 # Check connection
303 if not self.connected:
304 print("❌ Lost connection, reconnecting...")
305 if not self.setup_mqtt():
306 break
307
308 except KeyboardInterrupt:
309 print("\n⚠️ Upload interrupted by user")
310
311 except Exception as e:
312 print(f"❌ Upload error: {e}")
313
314 finally:
315 # Final state save
316 self.save_state()
317
318 # Cleanup
319 if self.mqtt_client:
320 self.mqtt_client.loop_stop()
321 self.mqtt_client.disconnect()
322
323 # Summary
324 print(f"\n📊 Upload Summary:")
325 print(f" ✅ Uploaded: {self.upload_count}")
326 print(f" ⏭️ Skipped: {self.skip_count}")
327 print(f" ❌ Errors: {self.error_count}")
328 print(f" 📁 State saved to: {STATE_FILE}")
329
330 return True
331
332def main():
333 if not os.path.exists(LOCATION_FILE):
334 print(f"❌ Location data file not found: {LOCATION_FILE}")
335 print("Run ./export_lifecycle_data.sh first to export data")
336 return 1
337
338 uploader = OwnTracksUploader()
339 success = uploader.run_upload()
340
341 return 0 if success else 1
342
343if __name__ == "__main__":
344 sys.exit(main())