from flask import Flask, request, jsonify import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart import os import hashlib import hmac import logging from datetime import datetime import json # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__) # Security configurations app.config['SECRET_KEY'] = os.environ.get('FLASK_SECRET_KEY', 'your-secret-key-here') WEBHOOK_SECRET = os.environ.get('WEBHOOK_SECRET', 'your-webhook-secret') PARTICLE_WEBHOOK_SECRET = os.environ.get('PARTICLE_WEBHOOK_SECRET', 'Not-Loaded') SMTP_EMAIL = os.environ.get('SMTP_EMAIL', 'your-email@gmail.com') SMTP_PASSWORD = os.environ.get('SMTP_PASSWORD', 'your-app-password') RECIPIENT_EMAIL = os.environ.get('RECIPIENT_EMAIL', 'your-phone@carrier.com') def verify_webhook_signature(payload, signature): """Verify webhook signature for security""" if not signature: return False # Create expected signature expected_signature = hmac.new( WEBHOOK_SECRET.encode('utf-8'), payload, hashlib.sha256 ).hexdigest() # Compare signatures securely return hmac.compare_digest(f"sha256={expected_signature}", signature) def validate_webhook_data(data): """Validate webhook data structure - handles both Particle and generic formats""" # Particle.io webhook format particle_fields = ['event', 'data', 'published_at', 'coreid'] # Generic webhook format generic_fields = ['event', 'data', 'published_at'] if not isinstance(data, dict): return False, "Invalid data format" # Check if it's a Particle webhook (has coreid field) if 'coreid' in data: required_fields = particle_fields else: required_fields = generic_fields for field in required_fields: if field not in data: return False, f"Missing required field: {field}" return True, "Valid" @app.route('/health', methods=['GET']) def health_check(): """Health check endpoint for monitoring""" return jsonify({"status": "healthy", "timestamp": datetime.utcnow().isoformat()}), 200 def verify_particle_auth(auth_header): """Verify Particle webhook authentication header""" if not auth_header: logger.warning("No authorization header provided for Particle webhook") return False if not PARTICLE_WEBHOOK_SECRET: logger.error("PARTICLE_WEBHOOK_SECRET not configured") return False # Support both "Bearer token" and direct token formats if auth_header.startswith('Bearer '): token = auth_header[7:] # Remove "Bearer " prefix else: token = auth_header # Use secure comparison to prevent timing attacks is_valid = hmac.compare_digest(token, PARTICLE_WEBHOOK_SECRET) if is_valid: logger.info("Particle webhook authentication successful") else: logger.warning("Particle webhook authentication failed - invalid token") return is_valid @app.route('/webhook', methods=['POST']) def webhook(): """Secure webhook endpoint with validation and logging""" try: # Get raw payload for signature verification payload = request.get_data() signature = request.headers.get('X-Hub-Signature-256') # Check if this is a Particle webhook user_agent = request.headers.get('User-Agent', '') is_particle_webhook = 'ParticleBot' in user_agent or 'Particle' in user_agent if is_particle_webhook: # Verify Particle webhook authentication auth_header = request.headers.get('Authorization') or request.headers.get('X-Particle-Webhook-Secret') logger.info(f"Particle webhook detected. Auth header present: {bool(auth_header)}") if not verify_particle_auth(auth_header): logger.warning(f"Invalid Particle webhook authentication from {request.remote_addr}") logger.debug(f"Expected secret length: {len(PARTICLE_WEBHOOK_SECRET) if PARTICLE_WEBHOOK_SECRET else 0}") logger.debug(f"Received auth header: {auth_header[:10] + '...' if auth_header and len(auth_header) > 10 else auth_header}") return jsonify({"error": "Authentication failed"}), 403 logger.info(f"Particle webhook authenticated from {request.remote_addr}") else: # Verify webhook signature for non-Particle webhooks if not verify_webhook_signature(payload, signature): logger.warning(f"Invalid webhook signature from {request.remote_addr}") return jsonify({"error": "Invalid signature"}), 403 # Parse JSON data try: data = request.get_json() except Exception as e: logger.error(f"Invalid JSON payload: {e}") return jsonify({"error": "Invalid JSON"}), 400 # Validate data structure is_valid, message = validate_webhook_data(data) if not is_valid: logger.error(f"Invalid webhook data: {message}") return jsonify({"error": message}), 400 # Log the webhook event with device info if available device_info = f" (Device: {data.get('coreid', 'unknown')})" if 'coreid' in data else "" logger.info(f"Received webhook: {data['event']}{device_info} from {request.remote_addr}") # Send notification with enhanced formatting for Particle devices if 'coreid' in data: # Particle.io webhook format device_name = data.get('device_name', data.get('coreid', 'Unknown Device')) notification_message = f"🔒 SECURITY ALERT\nDevice: {device_name}\nEvent: {data['event']}\nData: {data['data']}\nTime: {data['published_at']}" else: # Generic webhook format notification_message = f"🔒 SECURITY ALERT\nEvent: {data['event']}\nData: {data['data']}\nTime: {data['published_at']}" send_notification(notification_message, data) # Return success response response = { "status": "success", "message": "Webhook processed successfully", "timestamp": datetime.utcnow().isoformat() } return jsonify(response), 200 except Exception as e: logger.error(f"Webhook processing error: {e}") return jsonify({"error": "Internal server error"}), 500 def send_notification(message, webhook_data=None): """Send notification via email/SMS with better error handling and device context""" try: # Create message msg = MIMEMultipart() msg['From'] = SMTP_EMAIL msg['To'] = RECIPIENT_EMAIL # Enhanced subject line for security alerts if webhook_data and 'coreid' in webhook_data: device_name = webhook_data.get('device_name', webhook_data.get('coreid', 'Device')) msg['Subject'] = f'🔒 Security Alert - {device_name}' else: msg['Subject'] = '🔒 Security Alert' # Add timestamp and formatting formatted_message = f"{message}\n\nAlert processed: {datetime.utcnow().isoformat()}" # Add device details if available if webhook_data and 'coreid' in webhook_data: device_details = f"\n\nDevice Details:\n" device_details += f"Device ID: {webhook_data.get('coreid', 'N/A')}\n" device_details += f"Device Name: {webhook_data.get('device_name', 'N/A')}\n" device_details += f"Firmware Version: {webhook_data.get('fw_version', 'N/A')}\n" formatted_message += device_details msg.attach(MIMEText(formatted_message, 'plain')) # Send email with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server: server.login(SMTP_EMAIL, SMTP_PASSWORD) server.send_message(msg) logger.info("Security alert notification sent successfully") except Exception as e: logger.error(f"Failed to send notification: {e}") # Could add fallback notification methods here @app.errorhandler(404) def not_found(error): """Custom 404 handler""" return jsonify({"error": "Endpoint not found"}), 404 @app.errorhandler(405) def method_not_allowed(error): """Custom 405 handler""" return jsonify({"error": "Method not allowed"}), 405 @app.errorhandler(500) def internal_error(error): """Custom 500 handler""" logger.error(f"Internal server error: {error}") return jsonify({"error": "Internal server error"}), 500 if __name__ == '__main__': # Only for development - production uses WSGI server app.run(host='0.0.0.0', port=5000, debug=False)