Export Lifecycle app GPS locations to Owntracks
1#!/usr/bin/env python3
2# /// script
3# requires-python = ">=3.8"
4# dependencies = [
5# "paho-mqtt>=1.6.0",
6# ]
7# ///
8"""
9Enhanced LifeCycle to OwnTracks Upload Script
10Adds motion activities and improved data handling with update capability
11"""
12
13import json
14import ssl
15import time
16import os
17import sys
18from typing import Dict, Any, List, Set, Optional, Tuple
19from datetime import datetime, timezone
20import hashlib
21import paho.mqtt.client as mqtt
22from collections import defaultdict
23
24# Default configuration
25DEFAULT_CONFIG = {
26 'mqtt_host': 'your-mqtt-host.com',
27 'mqtt_port': 8883,
28 'mqtt_user': 'your-username',
29 'mqtt_pass': 'your-password',
30 'device_id': 'lifecycle',
31 'data_dir': 'lifecycle_export',
32 'state_file': 'owntracks_upload_state.json',
33 'max_accuracy': 500.0,
34 'min_timestamp': 946684800,
35 'motion_time_window': 300
36}
37
38def load_config(config_file: str = None) -> Dict[str, Any]:
39 """Load configuration from file or environment variables"""
40 config = DEFAULT_CONFIG.copy()
41
42 # Try to load from config file
43 if config_file and os.path.exists(config_file):
44 try:
45 with open(config_file, 'r') as f:
46 file_config = json.load(f)
47 config.update(file_config)
48 print(f"📝 Loaded config from {config_file}")
49 except Exception as e:
50 print(f"⚠️ Warning: Could not load config file {config_file}: {e}")
51
52 # Override with environment variables if present
53 env_mappings = {
54 'MQTT_HOST': 'mqtt_host',
55 'MQTT_PORT': 'mqtt_port',
56 'MQTT_USER': 'mqtt_user',
57 'MQTT_PASS': 'mqtt_pass',
58 'DEVICE_ID': 'device_id',
59 'DATA_DIR': 'data_dir',
60 'STATE_FILE': 'state_file'
61 }
62
63 for env_var, config_key in env_mappings.items():
64 if env_var in os.environ:
65 value = os.environ[env_var]
66 # Convert port to int
67 if config_key == 'mqtt_port':
68 try:
69 value = int(value)
70 except ValueError:
71 print(f"⚠️ Warning: Invalid port value in {env_var}, using default")
72 continue
73 config[config_key] = value
74 print(f"🔧 Using {env_var} from environment")
75
76 return config
77
78class EnhancedOwnTracksUploader:
79 def __init__(self, config: Dict[str, Any], update_mode=False):
80 self.config = config
81 self.mqtt_client = None
82 self.connected = False
83 self.upload_count = 0
84 self.update_count = 0
85 self.skip_count = 0
86 self.error_count = 0
87 self.uploaded_timestamps = set()
88 self.updated_timestamps = set()
89 self.timezone_map = {}
90 self.motion_data = {}
91 self.update_mode = update_mode
92
93 # Set up file paths from config
94 self.data_dir = config['data_dir']
95 self.location_file = f"{self.data_dir}/LocationEvent.json"
96 self.motion_file = f"{self.data_dir}/Motion.json"
97 self.timezone_file = f"{self.data_dir}/TimeZone.json"
98 self.state_file = config['state_file']
99 self.topic = f"owntracks/{config['mqtt_user']}/{config['device_id']}"
100
101 # Load previous state if exists
102 self.load_state()
103
104 # Load timezone mapping
105 self.load_timezones()
106
107 # Load motion data for enhanced context
108 self.load_motion_data()
109
110 def load_state(self):
111 """Load previously uploaded/updated timestamps"""
112 if os.path.exists(self.state_file):
113 try:
114 with open(self.state_file, 'r') as f:
115 state = json.load(f)
116 self.uploaded_timestamps = set(state.get('uploaded_timestamps', []))
117 self.updated_timestamps = set(state.get('updated_timestamps', []))
118 self.upload_count = state.get('upload_count', 0)
119 self.update_count = state.get('update_count', 0)
120 print(f"📊 Loaded state: {len(self.uploaded_timestamps)} uploaded, {len(self.updated_timestamps)} updated")
121 except Exception as e:
122 print(f"⚠️ Warning: Could not load state file: {e}")
123
124 def save_state(self):
125 """Save current upload/update state"""
126 try:
127 state = {
128 'uploaded_timestamps': list(self.uploaded_timestamps),
129 'updated_timestamps': list(self.updated_timestamps),
130 'upload_count': self.upload_count,
131 'update_count': self.update_count,
132 'last_update': datetime.now().isoformat()
133 }
134 with open(self.state_file, 'w') as f:
135 json.dump(state, f, indent=2)
136 except Exception as e:
137 print(f"⚠️ Warning: Could not save state: {e}")
138
139 def load_timezones(self):
140 """Load timezone mapping from LifeCycle data"""
141 try:
142 with open(self.timezone_file, 'r') as f:
143 timezones = json.load(f)
144 self.timezone_map = {tz['id']: tz['name'] for tz in timezones}
145 print(f"📍 Loaded {len(self.timezone_map)} timezones")
146 except Exception as e:
147 print(f"⚠️ Warning: Could not load timezones: {e}")
148 self.timezone_map = {0: "Europe/London"}
149
150 def load_motion_data(self):
151 """Load motion classification data for enhanced context"""
152 try:
153 with open(self.motion_file, 'r') as f:
154 motion_records = json.load(f)
155
156 # Index motion data by timestamp for fast lookup
157 for record in motion_records:
158 timestamp = int(record.get('timestamp', 0))
159 if timestamp > 0:
160 self.motion_data[timestamp] = record
161
162 print(f"🏃 Loaded {len(self.motion_data)} motion records")
163 except Exception as e:
164 print(f"⚠️ Warning: Could not load motion data: {e}")
165
166 def find_motion_context(self, location_timestamp: int) -> Optional[Dict[str, Any]]:
167 """Find closest motion data for a location timestamp"""
168 # Try exact match first
169 if location_timestamp in self.motion_data:
170 return self.motion_data[location_timestamp]
171
172 # Find closest within time window
173 closest_time = None
174 min_diff = self.config['motion_time_window']
175
176 for motion_timestamp in self.motion_data:
177 diff = abs(motion_timestamp - location_timestamp)
178 if diff < min_diff:
179 min_diff = diff
180 closest_time = motion_timestamp
181
182 if closest_time:
183 return self.motion_data[closest_time]
184
185 return None
186
187 def build_motion_activities(self, motion_record: Dict[str, Any]) -> List[Dict[str, Any]]:
188 """Convert LifeCycle motion data to OwnTracks motionactivities format"""
189 activities = []
190 confidence = motion_record.get('confidence', 0)
191
192 # Map LifeCycle motion types to OwnTracks activities
193 motion_types = [
194 ('stationary', motion_record.get('stationary', 0)),
195 ('walking', motion_record.get('walking', 0)),
196 ('running', motion_record.get('running', 0)),
197 ('automotive', motion_record.get('automotive', 0)),
198 ('cycling', motion_record.get('cycling', 0))
199 ]
200
201 # Add activities with non-zero values
202 for activity_type, value in motion_types:
203 if value > 0:
204 activities.append({
205 'type': activity_type,
206 'confidence': confidence
207 })
208
209 # If unknown motion, add it
210 if motion_record.get('unknown', 0) > 0:
211 activities.append({
212 'type': 'unknown',
213 'confidence': confidence
214 })
215
216 return activities
217
218 def setup_mqtt(self):
219 """Setup MQTT client with SSL connection"""
220 try:
221 # Create client with callback API version 2
222 self.mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
223 self.mqtt_client.username_pw_set(self.config['mqtt_user'], self.config['mqtt_pass'])
224
225 # Setup SSL
226 context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
227 context.check_hostname = False
228 context.verify_mode = ssl.CERT_NONE
229 self.mqtt_client.tls_set_context(context)
230
231 # Set callbacks
232 self.mqtt_client.on_connect = self.on_connect
233 self.mqtt_client.on_publish = self.on_publish
234 self.mqtt_client.on_disconnect = self.on_disconnect
235
236 # Connect
237 print(f"🔌 Connecting to {self.config['mqtt_host']}:{self.config['mqtt_port']}...")
238 self.mqtt_client.connect(self.config['mqtt_host'], self.config['mqtt_port'], 60)
239 self.mqtt_client.loop_start()
240
241 # Wait for connection
242 timeout = 10
243 while not self.connected and timeout > 0:
244 time.sleep(0.5)
245 timeout -= 0.5
246
247 if not self.connected:
248 raise Exception("Connection timeout")
249
250 return True
251
252 except Exception as e:
253 print(f"❌ MQTT setup failed: {e}")
254 return False
255
256 def on_connect(self, client, userdata, flags, reason_code, properties):
257 """MQTT connection callback"""
258 if reason_code == 0:
259 self.connected = True
260 print(f"✅ Connected to OwnTracks MQTT broker")
261 else:
262 print(f"❌ Connection failed with code {reason_code}")
263
264 def on_publish(self, client, userdata, mid, reason_code, properties):
265 """MQTT publish callback"""
266 if reason_code != 0:
267 self.error_count += 1
268 print(f"❌ Publish failed: {reason_code}")
269
270 def on_disconnect(self, client, userdata, reason_code, properties):
271 """MQTT disconnect callback"""
272 self.connected = False
273 if reason_code != 0:
274 print(f"⚠️ Unexpected disconnection: {reason_code}")
275
276 def convert_to_owntracks(self, location_event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
277 """Convert LifeCycle LocationEvent to enhanced OwnTracks format"""
278 try:
279 # Extract basic fields
280 timestamp = location_event.get('timestamp')
281 lat = location_event.get('latitude')
282 lon = location_event.get('longitude')
283
284 # Validation
285 if not all([timestamp, lat is not None, lon is not None]):
286 return None
287
288 if timestamp < self.config['min_timestamp']:
289 return None
290
291 # Check accuracy threshold
292 h_accuracy = location_event.get('hAccuracy', 0)
293 if h_accuracy > self.config['max_accuracy']:
294 return None
295
296 # Build enhanced OwnTracks message
297 owntracks_msg = {
298 "_type": "location",
299 "tst": int(timestamp),
300 "lat": float(lat),
301 "lon": float(lon),
302 "acc": int(h_accuracy),
303 "tid": "lc" # Tracker ID for LifeCycle
304 }
305
306 # Enhanced altitude handling with better precision
307 altitude = location_event.get('altitude')
308 if altitude is not None and altitude != -1.0 and altitude > -1000: # Sanity check
309 owntracks_msg['alt'] = round(altitude, 1) # Keep decimal precision
310
311 # Vertical accuracy
312 v_accuracy = location_event.get('vAccuracy')
313 if v_accuracy and v_accuracy > 0:
314 owntracks_msg['vac'] = int(v_accuracy)
315
316 # Enhanced speed handling
317 speed = location_event.get('speed')
318 if speed is not None and speed >= 0:
319 # Convert m/s to km/h with better precision
320 owntracks_msg['vel'] = round(speed * 3.6, 1)
321
322 # Course over ground
323 course = location_event.get('course')
324 if course is not None and course >= 0:
325 owntracks_msg['cog'] = int(course)
326
327 # WiFi context
328 wifi = location_event.get('wifi')
329 if wifi:
330 owntracks_msg['SSID'] = wifi
331
332 wifi_id = location_event.get('wifiID')
333 if wifi_id:
334 owntracks_msg['BSSID'] = wifi_id
335
336 # Enhanced: Add motion activities
337 motion_context = self.find_motion_context(timestamp)
338 if motion_context:
339 activities = self.build_motion_activities(motion_context)
340 if activities:
341 owntracks_msg['motionactivities'] = activities
342
343 # Add step count if available
344 steps = motion_context.get('steps', 0)
345 if steps > 0:
346 owntracks_msg['steps'] = steps
347
348 # Enhanced: Add barometric pressure estimate from altitude
349 if altitude is not None and altitude > -1000:
350 # Standard atmospheric pressure calculation: P = P0 * (1 - 0.0065*h/T0)^(g*M/(R*0.0065))
351 # Simplified approximation: P ≈ 101.325 * (1 - altitude/44330)^5.255
352 try:
353 pressure = 101.325 * pow((1 - altitude / 44330.0), 5.255)
354 if 50 <= pressure <= 110: # Sanity check for reasonable pressure range
355 owntracks_msg['p'] = round(pressure, 2)
356 except:
357 pass # Skip if calculation fails
358
359 return owntracks_msg
360
361 except Exception as e:
362 print(f"❌ Conversion error: {e}")
363 return None
364
365 def load_location_data(self) -> List[Dict[str, Any]]:
366 """Load LocationEvent data from JSON file"""
367 try:
368 with open(self.location_file, 'r') as f:
369 data = json.load(f)
370 print(f"📍 Loaded {len(data)} location records")
371
372 if self.update_mode:
373 # In update mode, process already uploaded records to enhance them
374 update_data = [record for record in data
375 if (record.get('timestamp') in self.uploaded_timestamps and
376 record.get('timestamp') not in self.updated_timestamps)]
377 print(f"🔄 {len(update_data)} records to update with enhanced data")
378 return update_data
379 else:
380 # Normal mode - filter out already uploaded records
381 new_data = [record for record in data
382 if record.get('timestamp') not in self.uploaded_timestamps]
383 print(f"📍 {len(new_data)} new records to upload")
384 return new_data
385
386 except Exception as e:
387 print(f"❌ Error loading location data: {e}")
388 return []
389
390 def upload_batch(self, batch: List[Dict[str, Any]], batch_num: int, total_batches: int):
391 """Upload or update a batch of location records"""
392 mode_label = "Updating" if self.update_mode else "Uploading"
393 print(f"\n📤 {mode_label} batch {batch_num}/{total_batches} ({len(batch)} records)...")
394
395 batch_success = 0
396 batch_skip = 0
397
398 for i, location_event in enumerate(batch):
399 try:
400 # Convert to enhanced OwnTracks format
401 owntracks_msg = self.convert_to_owntracks(location_event)
402
403 if not owntracks_msg:
404 batch_skip += 1
405 continue
406
407 timestamp = location_event.get('timestamp')
408
409 # Publish to MQTT
410 payload = json.dumps(owntracks_msg)
411 result = self.mqtt_client.publish(self.topic, payload, qos=1, retain=False)
412
413 if result.rc == 0:
414 if self.update_mode:
415 self.updated_timestamps.add(timestamp)
416 self.update_count += 1
417 else:
418 self.uploaded_timestamps.add(timestamp)
419 self.upload_count += 1
420
421 batch_success += 1
422
423 # Show progress with enhanced data indicators
424 if batch_success % 10 == 0:
425 dt = datetime.fromtimestamp(timestamp)
426 motion_indicator = "🏃" if 'motionactivities' in owntracks_msg else ""
427 pressure_indicator = "🌡️" if 'p' in owntracks_msg else ""
428 print(f" 📍 {batch_success}/{len(batch)} - {dt.strftime('%Y-%m-%d %H:%M:%S')} {motion_indicator}{pressure_indicator}")
429 else:
430 self.error_count += 1
431 print(f"❌ Publish failed for timestamp {timestamp}")
432
433 # Small delay to avoid overwhelming the broker
434 time.sleep(0.01)
435
436 except Exception as e:
437 self.error_count += 1
438 print(f"❌ Error processing record: {e}")
439
440 self.skip_count += batch_skip
441 print(f"✅ Batch complete: {batch_success} processed, {batch_skip} skipped")
442
443 # Save state periodically
444 if batch_num % 5 == 0:
445 self.save_state()
446
447 def run_upload(self, batch_size: int = 100):
448 """Main upload/update process"""
449 mode_label = "update" if self.update_mode else "upload"
450 print(f"🚀 Starting enhanced LifeCycle to OwnTracks {mode_label}...")
451
452 # Setup MQTT connection
453 if not self.setup_mqtt():
454 return False
455
456 # Load location data
457 location_data = self.load_location_data()
458 if not location_data:
459 print(f"❌ No data to {mode_label}")
460 return False
461
462 # Upload/update in batches
463 total_batches = (len(location_data) + batch_size - 1) // batch_size
464
465 try:
466 for i in range(0, len(location_data), batch_size):
467 batch = location_data[i:i + batch_size]
468 batch_num = (i // batch_size) + 1
469
470 self.upload_batch(batch, batch_num, total_batches)
471
472 # Check connection
473 if not self.connected:
474 print("❌ Lost connection, reconnecting...")
475 if not self.setup_mqtt():
476 break
477
478 except KeyboardInterrupt:
479 print(f"\n⚠️ {mode_label.capitalize()} interrupted by user")
480
481 except Exception as e:
482 print(f"❌ {mode_label.capitalize()} error: {e}")
483
484 finally:
485 # Final state save
486 self.save_state()
487
488 # Cleanup
489 if self.mqtt_client:
490 self.mqtt_client.loop_stop()
491 self.mqtt_client.disconnect()
492
493 # Enhanced summary
494 total_processed = self.upload_count + self.update_count
495 print(f"\n📊 Enhanced {mode_label.capitalize()} Summary:")
496 if self.update_mode:
497 print(f" 🔄 Updated: {self.update_count}")
498 else:
499 print(f" ✅ Uploaded: {self.upload_count}")
500 print(f" ⏭️ Skipped: {self.skip_count}")
501 print(f" ❌ Errors: {self.error_count}")
502 print(f" 🏃 With motion data: ~{len(self.motion_data)} available")
503 print(f" 📁 State saved to: {self.state_file}")
504
505 return True
506
507def create_example_config():
508 """Create an example configuration file"""
509 example_config = {
510 "mqtt_host": "your-mqtt-host.com",
511 "mqtt_port": 8883,
512 "mqtt_user": "your-username",
513 "mqtt_pass": "your-password",
514 "device_id": "lifecycle",
515 "data_dir": "lifecycle_export",
516 "state_file": "owntracks_upload_state.json",
517 "max_accuracy": 500.0,
518 "min_timestamp": 946684800,
519 "motion_time_window": 300
520 }
521
522 with open('config.json.example', 'w') as f:
523 json.dump(example_config, f, indent=2)
524
525 print("📝 Created config.json.example - copy to config.json and edit with your settings")
526
527def main():
528 import argparse
529
530 parser = argparse.ArgumentParser(
531 description='Enhanced LifeCycle to OwnTracks uploader',
532 epilog='Configuration can be provided via config.json file or environment variables'
533 )
534 parser.add_argument('--config', '-c', type=str, default='config.json',
535 help='Path to configuration file (default: config.json)')
536 parser.add_argument('--create-config', action='store_true',
537 help='Create example configuration file and exit')
538 parser.add_argument('--update', action='store_true',
539 help='Update existing records with enhanced data instead of uploading new ones')
540 parser.add_argument('--batch-size', type=int, default=50,
541 help='Batch size for uploads (default: 50)')
542
543 args = parser.parse_args()
544
545 if args.create_config:
546 create_example_config()
547 return 0
548
549 # Load configuration
550 config = load_config(args.config)
551
552 # Validate required configuration
553 required_fields = ['mqtt_host', 'mqtt_user', 'mqtt_pass']
554 missing_fields = [field for field in required_fields
555 if not config.get(field) or config[field] in ['your-mqtt-host.com', 'your-username', 'your-password']]
556
557 if missing_fields:
558 print(f"❌ Missing or default configuration for: {', '.join(missing_fields)}")
559 print("Please update your config.json file or set environment variables:")
560 for field in missing_fields:
561 env_var = field.upper().replace('_', '_')
562 print(f" export {env_var}=your_value")
563 print("\nOr run with --create-config to create an example configuration file")
564 return 1
565
566 location_file = f"{config['data_dir']}/LocationEvent.json"
567 if not os.path.exists(location_file):
568 print(f"❌ Location data file not found: {location_file}")
569 print("Run ./export_lifecycle_data.sh first to export data")
570 return 1
571
572 uploader = EnhancedOwnTracksUploader(config, update_mode=args.update)
573 success = uploader.run_upload(batch_size=args.batch_size)
574
575 return 0 if success else 1
576
577if __name__ == "__main__":
578 sys.exit(main())