···
+
# requires-python = ">=3.8"
+
Enhanced LifeCycle to OwnTracks Upload Script
+
Adds motion activities and improved data handling with update capability
+
from typing import Dict, Any, List, Set, Optional, Tuple
+
from datetime import datetime, timezone
+
import paho.mqtt.client as mqtt
+
from collections import defaultdict
+
# Default configuration
+
'mqtt_host': 'your-mqtt-host.com',
+
'mqtt_user': 'your-username',
+
'mqtt_pass': 'your-password',
+
'device_id': 'lifecycle',
+
'data_dir': 'lifecycle_export',
+
'state_file': 'owntracks_upload_state.json',
+
'min_timestamp': 946684800,
+
'motion_time_window': 300
+
def load_config(config_file: str = None) -> Dict[str, Any]:
+
"""Load configuration from file or environment variables"""
+
config = DEFAULT_CONFIG.copy()
+
# Try to load from config file
+
if config_file and os.path.exists(config_file):
+
with open(config_file, 'r') as f:
+
file_config = json.load(f)
+
config.update(file_config)
+
print(f"📝 Loaded config from {config_file}")
+
print(f"⚠️ Warning: Could not load config file {config_file}: {e}")
+
# Override with environment variables if present
+
'MQTT_HOST': 'mqtt_host',
+
'MQTT_PORT': 'mqtt_port',
+
'MQTT_USER': 'mqtt_user',
+
'MQTT_PASS': 'mqtt_pass',
+
'DEVICE_ID': 'device_id',
+
'DATA_DIR': 'data_dir',
+
'STATE_FILE': 'state_file'
+
for env_var, config_key in env_mappings.items():
+
if env_var in os.environ:
+
value = os.environ[env_var]
+
if config_key == 'mqtt_port':
+
print(f"⚠️ Warning: Invalid port value in {env_var}, using default")
+
config[config_key] = value
+
print(f"🔧 Using {env_var} from environment")
+
class EnhancedOwnTracksUploader:
+
def __init__(self, config: Dict[str, Any], update_mode=False):
+
self.mqtt_client = None
+
self.uploaded_timestamps = set()
+
self.updated_timestamps = set()
+
self.update_mode = update_mode
+
# Set up file paths from config
+
self.data_dir = config['data_dir']
+
self.location_file = f"{self.data_dir}/LocationEvent.json"
+
self.motion_file = f"{self.data_dir}/Motion.json"
+
self.timezone_file = f"{self.data_dir}/TimeZone.json"
+
self.state_file = config['state_file']
+
self.topic = f"owntracks/{config['mqtt_user']}/{config['device_id']}"
+
# Load previous state if exists
+
# Load timezone mapping
+
# Load motion data for enhanced context
+
self.load_motion_data()
+
"""Load previously uploaded/updated timestamps"""
+
if os.path.exists(self.state_file):
+
with open(self.state_file, 'r') as f:
+
self.uploaded_timestamps = set(state.get('uploaded_timestamps', []))
+
self.updated_timestamps = set(state.get('updated_timestamps', []))
+
self.upload_count = state.get('upload_count', 0)
+
self.update_count = state.get('update_count', 0)
+
print(f"📊 Loaded state: {len(self.uploaded_timestamps)} uploaded, {len(self.updated_timestamps)} updated")
+
print(f"⚠️ Warning: Could not load state file: {e}")
+
"""Save current upload/update state"""
+
'uploaded_timestamps': list(self.uploaded_timestamps),
+
'updated_timestamps': list(self.updated_timestamps),
+
'upload_count': self.upload_count,
+
'update_count': self.update_count,
+
'last_update': datetime.now().isoformat()
+
with open(self.state_file, 'w') as f:
+
json.dump(state, f, indent=2)
+
print(f"⚠️ Warning: Could not save state: {e}")
+
def load_timezones(self):
+
"""Load timezone mapping from LifeCycle data"""
+
with open(self.timezone_file, 'r') as f:
+
timezones = json.load(f)
+
self.timezone_map = {tz['id']: tz['name'] for tz in timezones}
+
print(f"📍 Loaded {len(self.timezone_map)} timezones")
+
print(f"⚠️ Warning: Could not load timezones: {e}")
+
self.timezone_map = {0: "Europe/London"}
+
def load_motion_data(self):
+
"""Load motion classification data for enhanced context"""
+
with open(self.motion_file, 'r') as f:
+
motion_records = json.load(f)
+
# Index motion data by timestamp for fast lookup
+
for record in motion_records:
+
timestamp = int(record.get('timestamp', 0))
+
self.motion_data[timestamp] = record
+
print(f"🏃 Loaded {len(self.motion_data)} motion records")
+
print(f"⚠️ Warning: Could not load motion data: {e}")
+
def find_motion_context(self, location_timestamp: int) -> Optional[Dict[str, Any]]:
+
"""Find closest motion data for a location timestamp"""
+
# Try exact match first
+
if location_timestamp in self.motion_data:
+
return self.motion_data[location_timestamp]
+
# Find closest within time window
+
min_diff = self.config['motion_time_window']
+
for motion_timestamp in self.motion_data:
+
diff = abs(motion_timestamp - location_timestamp)
+
closest_time = motion_timestamp
+
return self.motion_data[closest_time]
+
def build_motion_activities(self, motion_record: Dict[str, Any]) -> List[Dict[str, Any]]:
+
"""Convert LifeCycle motion data to OwnTracks motionactivities format"""
+
confidence = motion_record.get('confidence', 0)
+
# Map LifeCycle motion types to OwnTracks activities
+
('stationary', motion_record.get('stationary', 0)),
+
('walking', motion_record.get('walking', 0)),
+
('running', motion_record.get('running', 0)),
+
('automotive', motion_record.get('automotive', 0)),
+
('cycling', motion_record.get('cycling', 0))
+
# Add activities with non-zero values
+
for activity_type, value in motion_types:
+
'confidence': confidence
+
# If unknown motion, add it
+
if motion_record.get('unknown', 0) > 0:
+
'confidence': confidence
+
"""Setup MQTT client with SSL connection"""
+
# Create client with callback API version 2
+
self.mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
+
self.mqtt_client.username_pw_set(self.config['mqtt_user'], self.config['mqtt_pass'])
+
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
+
context.check_hostname = False
+
context.verify_mode = ssl.CERT_NONE
+
self.mqtt_client.tls_set_context(context)
+
self.mqtt_client.on_connect = self.on_connect
+
self.mqtt_client.on_publish = self.on_publish
+
self.mqtt_client.on_disconnect = self.on_disconnect
+
print(f"🔌 Connecting to {self.config['mqtt_host']}:{self.config['mqtt_port']}...")
+
self.mqtt_client.connect(self.config['mqtt_host'], self.config['mqtt_port'], 60)
+
self.mqtt_client.loop_start()
+
while not self.connected and timeout > 0:
+
raise Exception("Connection timeout")
+
print(f"❌ MQTT setup failed: {e}")
+
def on_connect(self, client, userdata, flags, reason_code, properties):
+
"""MQTT connection callback"""
+
print(f"✅ Connected to OwnTracks MQTT broker")
+
print(f"❌ Connection failed with code {reason_code}")
+
def on_publish(self, client, userdata, mid, reason_code, properties):
+
"""MQTT publish callback"""
+
print(f"❌ Publish failed: {reason_code}")
+
def on_disconnect(self, client, userdata, reason_code, properties):
+
"""MQTT disconnect callback"""
+
print(f"⚠️ Unexpected disconnection: {reason_code}")
+
def convert_to_owntracks(self, location_event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
+
"""Convert LifeCycle LocationEvent to enhanced OwnTracks format"""
+
timestamp = location_event.get('timestamp')
+
lat = location_event.get('latitude')
+
lon = location_event.get('longitude')
+
if not all([timestamp, lat is not None, lon is not None]):
+
if timestamp < self.config['min_timestamp']:
+
# Check accuracy threshold
+
h_accuracy = location_event.get('hAccuracy', 0)
+
if h_accuracy > self.config['max_accuracy']:
+
# Build enhanced OwnTracks message
+
"acc": int(h_accuracy),
+
"tid": "lc" # Tracker ID for LifeCycle
+
# Enhanced altitude handling with better precision
+
altitude = location_event.get('altitude')
+
if altitude is not None and altitude != -1.0 and altitude > -1000: # Sanity check
+
owntracks_msg['alt'] = round(altitude, 1) # Keep decimal precision
+
v_accuracy = location_event.get('vAccuracy')
+
if v_accuracy and v_accuracy > 0:
+
owntracks_msg['vac'] = int(v_accuracy)
+
# Enhanced speed handling
+
speed = location_event.get('speed')
+
if speed is not None and speed >= 0:
+
# Convert m/s to km/h with better precision
+
owntracks_msg['vel'] = round(speed * 3.6, 1)
+
course = location_event.get('course')
+
if course is not None and course >= 0:
+
owntracks_msg['cog'] = int(course)
+
wifi = location_event.get('wifi')
+
owntracks_msg['SSID'] = wifi
+
wifi_id = location_event.get('wifiID')
+
owntracks_msg['BSSID'] = wifi_id
+
# Enhanced: Add motion activities
+
motion_context = self.find_motion_context(timestamp)
+
activities = self.build_motion_activities(motion_context)
+
owntracks_msg['motionactivities'] = activities
+
# Add step count if available
+
steps = motion_context.get('steps', 0)
+
owntracks_msg['steps'] = steps
+
# Enhanced: Add barometric pressure estimate from altitude
+
if altitude is not None and altitude > -1000:
+
# Standard atmospheric pressure calculation: P = P0 * (1 - 0.0065*h/T0)^(g*M/(R*0.0065))
+
# Simplified approximation: P ≈ 101.325 * (1 - altitude/44330)^5.255
+
pressure = 101.325 * pow((1 - altitude / 44330.0), 5.255)
+
if 50 <= pressure <= 110: # Sanity check for reasonable pressure range
+
owntracks_msg['p'] = round(pressure, 2)
+
pass # Skip if calculation fails
+
print(f"❌ Conversion error: {e}")
+
def load_location_data(self) -> List[Dict[str, Any]]:
+
"""Load LocationEvent data from JSON file"""
+
with open(self.location_file, 'r') as f:
+
print(f"📍 Loaded {len(data)} location records")
+
# In update mode, process already uploaded records to enhance them
+
update_data = [record for record in data
+
if (record.get('timestamp') in self.uploaded_timestamps and
+
record.get('timestamp') not in self.updated_timestamps)]
+
print(f"🔄 {len(update_data)} records to update with enhanced data")
+
# Normal mode - filter out already uploaded records
+
new_data = [record for record in data
+
if record.get('timestamp') not in self.uploaded_timestamps]
+
print(f"📍 {len(new_data)} new records to upload")
+
print(f"❌ Error loading location data: {e}")
+
def upload_batch(self, batch: List[Dict[str, Any]], batch_num: int, total_batches: int):
+
"""Upload or update a batch of location records"""
+
mode_label = "Updating" if self.update_mode else "Uploading"
+
print(f"\n📤 {mode_label} batch {batch_num}/{total_batches} ({len(batch)} records)...")
+
for i, location_event in enumerate(batch):
+
# Convert to enhanced OwnTracks format
+
owntracks_msg = self.convert_to_owntracks(location_event)
+
timestamp = location_event.get('timestamp')
+
payload = json.dumps(owntracks_msg)
+
result = self.mqtt_client.publish(self.topic, payload, qos=1, retain=False)
+
self.updated_timestamps.add(timestamp)
+
self.uploaded_timestamps.add(timestamp)
+
# Show progress with enhanced data indicators
+
if batch_success % 10 == 0:
+
dt = datetime.fromtimestamp(timestamp)
+
motion_indicator = "🏃" if 'motionactivities' in owntracks_msg else ""
+
pressure_indicator = "🌡️" if 'p' in owntracks_msg else ""
+
print(f" 📍 {batch_success}/{len(batch)} - {dt.strftime('%Y-%m-%d %H:%M:%S')} {motion_indicator}{pressure_indicator}")
+
print(f"❌ Publish failed for timestamp {timestamp}")
+
# Small delay to avoid overwhelming the broker
+
print(f"❌ Error processing record: {e}")
+
self.skip_count += batch_skip
+
print(f"✅ Batch complete: {batch_success} processed, {batch_skip} skipped")
+
# Save state periodically
+
def run_upload(self, batch_size: int = 100):
+
"""Main upload/update process"""
+
mode_label = "update" if self.update_mode else "upload"
+
print(f"🚀 Starting enhanced LifeCycle to OwnTracks {mode_label}...")
+
# Setup MQTT connection
+
if not self.setup_mqtt():
+
location_data = self.load_location_data()
+
print(f"❌ No data to {mode_label}")
+
# Upload/update in batches
+
total_batches = (len(location_data) + batch_size - 1) // batch_size
+
for i in range(0, len(location_data), batch_size):
+
batch = location_data[i:i + batch_size]
+
batch_num = (i // batch_size) + 1
+
self.upload_batch(batch, batch_num, total_batches)
+
print("❌ Lost connection, reconnecting...")
+
if not self.setup_mqtt():
+
except KeyboardInterrupt:
+
print(f"\n⚠️ {mode_label.capitalize()} interrupted by user")
+
print(f"❌ {mode_label.capitalize()} error: {e}")
+
self.mqtt_client.loop_stop()
+
self.mqtt_client.disconnect()
+
total_processed = self.upload_count + self.update_count
+
print(f"\n📊 Enhanced {mode_label.capitalize()} Summary:")
+
print(f" 🔄 Updated: {self.update_count}")
+
print(f" ✅ Uploaded: {self.upload_count}")
+
print(f" ⏭️ Skipped: {self.skip_count}")
+
print(f" ❌ Errors: {self.error_count}")
+
print(f" 🏃 With motion data: ~{len(self.motion_data)} available")
+
print(f" 📁 State saved to: {self.state_file}")
+
def create_example_config():
+
"""Create an example configuration file"""
+
"mqtt_host": "your-mqtt-host.com",
+
"mqtt_user": "your-username",
+
"mqtt_pass": "your-password",
+
"device_id": "lifecycle",
+
"data_dir": "lifecycle_export",
+
"state_file": "owntracks_upload_state.json",
+
"min_timestamp": 946684800,
+
"motion_time_window": 300
+
with open('config.json.example', 'w') as f:
+
json.dump(example_config, f, indent=2)
+
print("📝 Created config.json.example - copy to config.json and edit with your settings")
+
parser = argparse.ArgumentParser(
+
description='Enhanced LifeCycle to OwnTracks uploader',
+
epilog='Configuration can be provided via config.json file or environment variables'
+
parser.add_argument('--config', '-c', type=str, default='config.json',
+
help='Path to configuration file (default: config.json)')
+
parser.add_argument('--create-config', action='store_true',
+
help='Create example configuration file and exit')
+
parser.add_argument('--update', action='store_true',
+
help='Update existing records with enhanced data instead of uploading new ones')
+
parser.add_argument('--batch-size', type=int, default=50,
+
help='Batch size for uploads (default: 50)')
+
args = parser.parse_args()
+
create_example_config()
+
config = load_config(args.config)
+
# Validate required configuration
+
required_fields = ['mqtt_host', 'mqtt_user', 'mqtt_pass']
+
missing_fields = [field for field in required_fields
+
if not config.get(field) or config[field] in ['your-mqtt-host.com', 'your-username', 'your-password']]
+
print(f"❌ Missing or default configuration for: {', '.join(missing_fields)}")
+
print("Please update your config.json file or set environment variables:")
+
for field in missing_fields:
+
env_var = field.upper().replace('_', '_')
+
print(f" export {env_var}=your_value")
+
print("\nOr run with --create-config to create an example configuration file")
+
location_file = f"{config['data_dir']}/LocationEvent.json"
+
if not os.path.exists(location_file):
+
print(f"❌ Location data file not found: {location_file}")
+
print("Run ./export_lifecycle_data.sh first to export data")
+
uploader = EnhancedOwnTracksUploader(config, update_mode=args.update)
+
success = uploader.run_upload(batch_size=args.batch_size)
+
return 0 if success else 1
+
if __name__ == "__main__":