first commit

This commit is contained in:
2025-07-06 21:54:54 -06:00
commit 0fdf023af1
5 changed files with 677 additions and 0 deletions

222
Server/webhook_app.py Normal file
View File

@ -0,0 +1,222 @@
from flask import Flask, request, jsonify
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os
import hashlib
import hmac
import logging
from datetime import datetime
import json
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
# Security configurations
app.config['SECRET_KEY'] = os.environ.get('FLASK_SECRET_KEY', 'your-secret-key-here')
WEBHOOK_SECRET = os.environ.get('WEBHOOK_SECRET', 'your-webhook-secret')
PARTICLE_WEBHOOK_SECRET = os.environ.get('PARTICLE_WEBHOOK_SECRET', 'Not-Loaded')
SMTP_EMAIL = os.environ.get('SMTP_EMAIL', 'your-email@gmail.com')
SMTP_PASSWORD = os.environ.get('SMTP_PASSWORD', 'your-app-password')
RECIPIENT_EMAIL = os.environ.get('RECIPIENT_EMAIL', 'your-phone@carrier.com')
def verify_webhook_signature(payload, signature):
"""Verify webhook signature for security"""
if not signature:
return False
# Create expected signature
expected_signature = hmac.new(
WEBHOOK_SECRET.encode('utf-8'),
payload,
hashlib.sha256
).hexdigest()
# Compare signatures securely
return hmac.compare_digest(f"sha256={expected_signature}", signature)
def validate_webhook_data(data):
"""Validate webhook data structure - handles both Particle and generic formats"""
# Particle.io webhook format
particle_fields = ['event', 'data', 'published_at', 'coreid']
# Generic webhook format
generic_fields = ['event', 'data', 'published_at']
if not isinstance(data, dict):
return False, "Invalid data format"
# Check if it's a Particle webhook (has coreid field)
if 'coreid' in data:
required_fields = particle_fields
else:
required_fields = generic_fields
for field in required_fields:
if field not in data:
return False, f"Missing required field: {field}"
return True, "Valid"
@app.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint for monitoring"""
return jsonify({"status": "healthy", "timestamp": datetime.utcnow().isoformat()}), 200
def verify_particle_auth(auth_header):
"""Verify Particle webhook authentication header"""
if not auth_header:
logger.warning("No authorization header provided for Particle webhook")
return False
if not PARTICLE_WEBHOOK_SECRET:
logger.error("PARTICLE_WEBHOOK_SECRET not configured")
return False
# Support both "Bearer token" and direct token formats
if auth_header.startswith('Bearer '):
token = auth_header[7:] # Remove "Bearer " prefix
else:
token = auth_header
# Use secure comparison to prevent timing attacks
is_valid = hmac.compare_digest(token, PARTICLE_WEBHOOK_SECRET)
if is_valid:
logger.info("Particle webhook authentication successful")
else:
logger.warning("Particle webhook authentication failed - invalid token")
return is_valid
@app.route('/webhook', methods=['POST'])
def webhook():
"""Secure webhook endpoint with validation and logging"""
try:
# Get raw payload for signature verification
payload = request.get_data()
signature = request.headers.get('X-Hub-Signature-256')
# Check if this is a Particle webhook
user_agent = request.headers.get('User-Agent', '')
is_particle_webhook = 'ParticleBot' in user_agent or 'Particle' in user_agent
if is_particle_webhook:
# Verify Particle webhook authentication
auth_header = request.headers.get('Authorization') or request.headers.get('X-Particle-Webhook-Secret')
logger.info(f"Particle webhook detected. Auth header present: {bool(auth_header)}")
if not verify_particle_auth(auth_header):
logger.warning(f"Invalid Particle webhook authentication from {request.remote_addr}")
logger.debug(f"Expected secret length: {len(PARTICLE_WEBHOOK_SECRET) if PARTICLE_WEBHOOK_SECRET else 0}")
logger.debug(f"Received auth header: {auth_header[:10] + '...' if auth_header and len(auth_header) > 10 else auth_header}")
return jsonify({"error": "Authentication failed"}), 403
logger.info(f"Particle webhook authenticated from {request.remote_addr}")
else:
# Verify webhook signature for non-Particle webhooks
if not verify_webhook_signature(payload, signature):
logger.warning(f"Invalid webhook signature from {request.remote_addr}")
return jsonify({"error": "Invalid signature"}), 403
# Parse JSON data
try:
data = request.get_json()
except Exception as e:
logger.error(f"Invalid JSON payload: {e}")
return jsonify({"error": "Invalid JSON"}), 400
# Validate data structure
is_valid, message = validate_webhook_data(data)
if not is_valid:
logger.error(f"Invalid webhook data: {message}")
return jsonify({"error": message}), 400
# Log the webhook event with device info if available
device_info = f" (Device: {data.get('coreid', 'unknown')})" if 'coreid' in data else ""
logger.info(f"Received webhook: {data['event']}{device_info} from {request.remote_addr}")
# Send notification with enhanced formatting for Particle devices
if 'coreid' in data:
# Particle.io webhook format
device_name = data.get('device_name', data.get('coreid', 'Unknown Device'))
notification_message = f"🔒 SECURITY ALERT\nDevice: {device_name}\nEvent: {data['event']}\nData: {data['data']}\nTime: {data['published_at']}"
else:
# Generic webhook format
notification_message = f"🔒 SECURITY ALERT\nEvent: {data['event']}\nData: {data['data']}\nTime: {data['published_at']}"
send_notification(notification_message, data)
# Return success response
response = {
"status": "success",
"message": "Webhook processed successfully",
"timestamp": datetime.utcnow().isoformat()
}
return jsonify(response), 200
except Exception as e:
logger.error(f"Webhook processing error: {e}")
return jsonify({"error": "Internal server error"}), 500
def send_notification(message, webhook_data=None):
"""Send notification via email/SMS with better error handling and device context"""
try:
# Create message
msg = MIMEMultipart()
msg['From'] = SMTP_EMAIL
msg['To'] = RECIPIENT_EMAIL
# Enhanced subject line for security alerts
if webhook_data and 'coreid' in webhook_data:
device_name = webhook_data.get('device_name', webhook_data.get('coreid', 'Device'))
msg['Subject'] = f'🔒 Security Alert - {device_name}'
else:
msg['Subject'] = '🔒 Security Alert'
# Add timestamp and formatting
formatted_message = f"{message}\n\nAlert processed: {datetime.utcnow().isoformat()}"
# Add device details if available
if webhook_data and 'coreid' in webhook_data:
device_details = f"\n\nDevice Details:\n"
device_details += f"Device ID: {webhook_data.get('coreid', 'N/A')}\n"
device_details += f"Device Name: {webhook_data.get('device_name', 'N/A')}\n"
device_details += f"Firmware Version: {webhook_data.get('fw_version', 'N/A')}\n"
formatted_message += device_details
msg.attach(MIMEText(formatted_message, 'plain'))
# Send email
with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
server.login(SMTP_EMAIL, SMTP_PASSWORD)
server.send_message(msg)
logger.info("Security alert notification sent successfully")
except Exception as e:
logger.error(f"Failed to send notification: {e}")
# Could add fallback notification methods here
@app.errorhandler(404)
def not_found(error):
"""Custom 404 handler"""
return jsonify({"error": "Endpoint not found"}), 404
@app.errorhandler(405)
def method_not_allowed(error):
"""Custom 405 handler"""
return jsonify({"error": "Method not allowed"}), 405
@app.errorhandler(500)
def internal_error(error):
"""Custom 500 handler"""
logger.error(f"Internal server error: {error}")
return jsonify({"error": "Internal server error"}), 500
if __name__ == '__main__':
# Only for development - production uses WSGI server
app.run(host='0.0.0.0', port=5000, debug=False)