Files
StorageSecurity/Server/webhook_app.py
2025-07-06 21:54:54 -06:00

222 lines
8.7 KiB
Python

from flask import Flask, request, jsonify
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os
import hashlib
import hmac
import logging
from datetime import datetime
import json
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
# Security configurations
app.config['SECRET_KEY'] = os.environ.get('FLASK_SECRET_KEY', 'your-secret-key-here')
WEBHOOK_SECRET = os.environ.get('WEBHOOK_SECRET', 'your-webhook-secret')
PARTICLE_WEBHOOK_SECRET = os.environ.get('PARTICLE_WEBHOOK_SECRET', 'Not-Loaded')
SMTP_EMAIL = os.environ.get('SMTP_EMAIL', 'your-email@gmail.com')
SMTP_PASSWORD = os.environ.get('SMTP_PASSWORD', 'your-app-password')
RECIPIENT_EMAIL = os.environ.get('RECIPIENT_EMAIL', 'your-phone@carrier.com')
def verify_webhook_signature(payload, signature):
"""Verify webhook signature for security"""
if not signature:
return False
# Create expected signature
expected_signature = hmac.new(
WEBHOOK_SECRET.encode('utf-8'),
payload,
hashlib.sha256
).hexdigest()
# Compare signatures securely
return hmac.compare_digest(f"sha256={expected_signature}", signature)
def validate_webhook_data(data):
"""Validate webhook data structure - handles both Particle and generic formats"""
# Particle.io webhook format
particle_fields = ['event', 'data', 'published_at', 'coreid']
# Generic webhook format
generic_fields = ['event', 'data', 'published_at']
if not isinstance(data, dict):
return False, "Invalid data format"
# Check if it's a Particle webhook (has coreid field)
if 'coreid' in data:
required_fields = particle_fields
else:
required_fields = generic_fields
for field in required_fields:
if field not in data:
return False, f"Missing required field: {field}"
return True, "Valid"
@app.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint for monitoring"""
return jsonify({"status": "healthy", "timestamp": datetime.utcnow().isoformat()}), 200
def verify_particle_auth(auth_header):
"""Verify Particle webhook authentication header"""
if not auth_header:
logger.warning("No authorization header provided for Particle webhook")
return False
if not PARTICLE_WEBHOOK_SECRET:
logger.error("PARTICLE_WEBHOOK_SECRET not configured")
return False
# Support both "Bearer token" and direct token formats
if auth_header.startswith('Bearer '):
token = auth_header[7:] # Remove "Bearer " prefix
else:
token = auth_header
# Use secure comparison to prevent timing attacks
is_valid = hmac.compare_digest(token, PARTICLE_WEBHOOK_SECRET)
if is_valid:
logger.info("Particle webhook authentication successful")
else:
logger.warning("Particle webhook authentication failed - invalid token")
return is_valid
@app.route('/webhook', methods=['POST'])
def webhook():
"""Secure webhook endpoint with validation and logging"""
try:
# Get raw payload for signature verification
payload = request.get_data()
signature = request.headers.get('X-Hub-Signature-256')
# Check if this is a Particle webhook
user_agent = request.headers.get('User-Agent', '')
is_particle_webhook = 'ParticleBot' in user_agent or 'Particle' in user_agent
if is_particle_webhook:
# Verify Particle webhook authentication
auth_header = request.headers.get('Authorization') or request.headers.get('X-Particle-Webhook-Secret')
logger.info(f"Particle webhook detected. Auth header present: {bool(auth_header)}")
if not verify_particle_auth(auth_header):
logger.warning(f"Invalid Particle webhook authentication from {request.remote_addr}")
logger.debug(f"Expected secret length: {len(PARTICLE_WEBHOOK_SECRET) if PARTICLE_WEBHOOK_SECRET else 0}")
logger.debug(f"Received auth header: {auth_header[:10] + '...' if auth_header and len(auth_header) > 10 else auth_header}")
return jsonify({"error": "Authentication failed"}), 403
logger.info(f"Particle webhook authenticated from {request.remote_addr}")
else:
# Verify webhook signature for non-Particle webhooks
if not verify_webhook_signature(payload, signature):
logger.warning(f"Invalid webhook signature from {request.remote_addr}")
return jsonify({"error": "Invalid signature"}), 403
# Parse JSON data
try:
data = request.get_json()
except Exception as e:
logger.error(f"Invalid JSON payload: {e}")
return jsonify({"error": "Invalid JSON"}), 400
# Validate data structure
is_valid, message = validate_webhook_data(data)
if not is_valid:
logger.error(f"Invalid webhook data: {message}")
return jsonify({"error": message}), 400
# Log the webhook event with device info if available
device_info = f" (Device: {data.get('coreid', 'unknown')})" if 'coreid' in data else ""
logger.info(f"Received webhook: {data['event']}{device_info} from {request.remote_addr}")
# Send notification with enhanced formatting for Particle devices
if 'coreid' in data:
# Particle.io webhook format
device_name = data.get('device_name', data.get('coreid', 'Unknown Device'))
notification_message = f"🔒 SECURITY ALERT\nDevice: {device_name}\nEvent: {data['event']}\nData: {data['data']}\nTime: {data['published_at']}"
else:
# Generic webhook format
notification_message = f"🔒 SECURITY ALERT\nEvent: {data['event']}\nData: {data['data']}\nTime: {data['published_at']}"
send_notification(notification_message, data)
# Return success response
response = {
"status": "success",
"message": "Webhook processed successfully",
"timestamp": datetime.utcnow().isoformat()
}
return jsonify(response), 200
except Exception as e:
logger.error(f"Webhook processing error: {e}")
return jsonify({"error": "Internal server error"}), 500
def send_notification(message, webhook_data=None):
"""Send notification via email/SMS with better error handling and device context"""
try:
# Create message
msg = MIMEMultipart()
msg['From'] = SMTP_EMAIL
msg['To'] = RECIPIENT_EMAIL
# Enhanced subject line for security alerts
if webhook_data and 'coreid' in webhook_data:
device_name = webhook_data.get('device_name', webhook_data.get('coreid', 'Device'))
msg['Subject'] = f'🔒 Security Alert - {device_name}'
else:
msg['Subject'] = '🔒 Security Alert'
# Add timestamp and formatting
formatted_message = f"{message}\n\nAlert processed: {datetime.utcnow().isoformat()}"
# Add device details if available
if webhook_data and 'coreid' in webhook_data:
device_details = f"\n\nDevice Details:\n"
device_details += f"Device ID: {webhook_data.get('coreid', 'N/A')}\n"
device_details += f"Device Name: {webhook_data.get('device_name', 'N/A')}\n"
device_details += f"Firmware Version: {webhook_data.get('fw_version', 'N/A')}\n"
formatted_message += device_details
msg.attach(MIMEText(formatted_message, 'plain'))
# Send email
with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
server.login(SMTP_EMAIL, SMTP_PASSWORD)
server.send_message(msg)
logger.info("Security alert notification sent successfully")
except Exception as e:
logger.error(f"Failed to send notification: {e}")
# Could add fallback notification methods here
@app.errorhandler(404)
def not_found(error):
"""Custom 404 handler"""
return jsonify({"error": "Endpoint not found"}), 404
@app.errorhandler(405)
def method_not_allowed(error):
"""Custom 405 handler"""
return jsonify({"error": "Method not allowed"}), 405
@app.errorhandler(500)
def internal_error(error):
"""Custom 500 handler"""
logger.error(f"Internal server error: {error}")
return jsonify({"error": "Internal server error"}), 500
if __name__ == '__main__':
# Only for development - production uses WSGI server
app.run(host='0.0.0.0', port=5000, debug=False)