-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
1c98d05
commit 440582e
Showing
1 changed file
with
42 additions
and
44 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,26 +2,25 @@ | |
import logging | ||
from datetime import datetime | ||
from random import Random | ||
import requests | ||
import base64 | ||
|
||
from flask import Flask, render_template, request, jsonify | ||
from flask_recaptcha import ReCaptcha | ||
from flask_limiter import Limiter | ||
from flask_limiter.util import get_remote_address | ||
|
||
from sendgrid import SendGridAPIClient | ||
from sendgrid.helpers.mail import (Mail, Attachment, FileContent, FileName, FileType, Disposition) | ||
from sendgrid.helpers.mail import Mail, Attachment, FileContent, FileName, FileType, Disposition | ||
|
||
from dotenv import load_dotenv | ||
|
||
load_dotenv() | ||
|
||
class Config: | ||
MAX_CONTENT_LENGTH = 15 * 1024 * 1024 # 15 MB | ||
MAX_CONTENT_LENGTH = 15 * 1024 * 1024 # 15 MB | ||
EMAIL_DOMAIN = "@ethereum.org" | ||
DEFAULT_RECIPIENT_EMAIL = "[email protected]" | ||
NUMBER_OF_ATTACHMENTS = int(os.getenv('NUMBEROFATTACHMENTS', 10)) | ||
DEBUG_MODE = os.getenv('DEBUG', 'False').lower() == 'true' | ||
SECRET_KEY = os.getenv('SECRET_KEY', 'you-should-set-a-secret-key') | ||
|
||
def validate_env_vars(required_vars): | ||
|
@@ -81,7 +80,7 @@ def create_email(to_email, identifier, text, all_attachments, reference=''): | |
subject = f'Secure Form Submission {identifier}' | ||
if reference: | ||
subject = f'{reference} {subject}' | ||
|
||
message = Mail( | ||
from_email=FROMEMAIL, | ||
to_emails=to_email, | ||
|
@@ -102,26 +101,27 @@ def create_email(to_email, identifier, text, all_attachments, reference=''): | |
message.add_attachment(attachedFile) | ||
return message | ||
|
||
|
||
def validate_recaptcha(recaptcha_response): | ||
""" | ||
Validates the ReCaptcha response. | ||
Validates the ReCaptcha response using Google's API. | ||
""" | ||
try: | ||
if not recaptcha_response: | ||
logging.error('No ReCaptcha response provided.') | ||
raise ValueError('ReCaptcha verification failed: No response provided.') | ||
secret_key = os.getenv('RECAPTCHASECRETKEY') | ||
payload = { | ||
'secret': secret_key, | ||
'response': recaptcha_response | ||
} | ||
response = requests.post('https://www.google.com/recaptcha/api/siteverify', data=payload) | ||
result = response.json() | ||
|
||
# Perform the verification | ||
if not recaptcha.verify(response=recaptcha_response): | ||
logging.error('ReCaptcha verification failed for response: %s', recaptcha_response) | ||
raise ValueError('ReCaptcha verification failed.') | ||
# Log the validation result | ||
logging.info(f"ReCaptcha validation response: {result}") | ||
|
||
logging.info('ReCaptcha verification succeeded for response: %s', recaptcha_response) | ||
except Exception as e: | ||
logging.error('Error during ReCaptcha validation: %s', str(e)) | ||
raise | ||
if not result.get('success'): | ||
raise ValueError('ReCaptcha verification failed.') | ||
|
||
# Check action and score thresholds for additional security | ||
if result.get('score', 1.0) < 0.5: | ||
raise ValueError('ReCaptcha score is too low, indicating potential abuse.') | ||
|
||
def send_email(message): | ||
""" | ||
|
@@ -134,13 +134,10 @@ def send_email(message): | |
if response.status_code not in [200, 201, 202]: | ||
logging.error('SendGrid failed with status code: %s, response body: %s', response.status_code, response.body) | ||
raise ValueError(f"Error: Failed to send email. Status code: {response.status_code}, body: {response.body}") | ||
else: | ||
logging.info('Email sent successfully. Status code: %s, response body: %s', response.status_code, response.body) | ||
except Exception as e: | ||
logging.error('Error sending email via SendGrid: %s', str(e)) | ||
raise | ||
|
||
|
||
# Validate required environment variables | ||
required_env_vars = ['RECAPTCHASITEKEY', 'RECAPTCHASECRETKEY', 'SENDGRIDAPIKEY', 'SENDGRIDFROMEMAIL'] | ||
validate_env_vars(required_env_vars) | ||
|
@@ -152,40 +149,39 @@ def send_email(message): | |
|
||
app = Flask(__name__) | ||
app.config.from_object(Config) | ||
recaptcha = ReCaptcha(app) | ||
|
||
# Initialize rate limiting | ||
limiter = Limiter(get_remote_address, app=app, default_limits=["200 per day", "50 per hour"]) | ||
|
||
# Configure logging | ||
log_file = os.environ.get('LOG_FILE', '') | ||
|
||
if log_file: | ||
logging.basicConfig(filename=log_file, level=logging.INFO) | ||
else: | ||
logging.basicConfig(level=logging.INFO) | ||
|
||
@app.route('/', methods=['GET']) | ||
def index(): | ||
return render_template('index.html', notice='', hascaptcha=not Config.DEBUG_MODE, attachments_number=Config.NUMBER_OF_ATTACHMENTS, recaptcha_sitekey=RECAPTCHASITEKEY) | ||
|
||
return render_template('index.html', notice='', hascaptcha=True, attachments_number=Config.NUMBER_OF_ATTACHMENTS, recaptcha_sitekey=RECAPTCHASITEKEY) | ||
|
||
@app.route('/submit-encrypted-data', methods=['POST']) | ||
@limiter.limit("5 per minute") | ||
@limiter.limit("3 per minute") | ||
def submit(): | ||
try: | ||
# Parse JSON data from request | ||
data = request.get_json() | ||
|
||
# Validate ReCaptcha unless in debug mode | ||
if not Config.DEBUG_MODE: | ||
recaptcha_response = data.get('g-recaptcha-response', '') | ||
try: | ||
validate_recaptcha(recaptcha_response) | ||
except ValueError as e: | ||
return jsonify({'status': 'failure', 'message': str(e)}), 400 | ||
# Validate ReCaptcha | ||
recaptcha_response = data.get('g-recaptcha-response', '') | ||
if not recaptcha_response: | ||
logging.warning(f"Missing ReCaptcha response. Potential bypass attempt detected from IP: {request.remote_addr}") | ||
return jsonify({'status': 'failure', 'message': 'Missing ReCaptcha token'}), 400 | ||
|
||
try: | ||
validate_recaptcha(recaptcha_response) | ||
except ValueError as e: | ||
return jsonify({'status': 'failure', 'message': str(e)}), 400 | ||
|
||
# Extract fields from JSON data | ||
message = data['message'] | ||
recipient = data['recipient'] | ||
reference = data.get('reference', '') | ||
|
@@ -197,7 +193,6 @@ def submit(): | |
if not valid_recipient(recipient): | ||
raise ValueError('Error: Invalid recipient!') | ||
|
||
# Get submission statistics | ||
date = datetime.now().strftime('%Y-%m-%d %H:%M:%S') | ||
message_length = len(message) | ||
file_count = len(files) | ||
|
@@ -212,24 +207,27 @@ def submit(): | |
|
||
message = create_email(to_email, identifier, message, files, reference) | ||
|
||
if Config.DEBUG_MODE: | ||
print(f"Attempt to send email to {to_email}") | ||
print(message.get()) | ||
send_email(message) | ||
else: | ||
send_email(message) | ||
send_email(message) | ||
|
||
notice = f'Thank you! The relevant team was notified of your submission. You could use the following identifier to refer to it in correspondence: <b>{identifier}</b>' | ||
|
||
# Return success response | ||
return jsonify({'status': 'success', 'message': notice}) | ||
|
||
except Exception as e: | ||
# Log error message and return failure response | ||
error_message = "An unexpected error occurred. Please try again later." | ||
logging.error(f"Internal error: {str(e)}") | ||
return jsonify({'status': 'failure', 'message': error_message}) | ||
|
||
@app.errorhandler(429) | ||
def rate_limit_exceeded(e): | ||
""" | ||
Handles requests that exceed the rate limit. | ||
""" | ||
return jsonify({ | ||
'status': 'failure', | ||
'message': 'Rate limit exceeded. You can only submit once per minute. Please try again later.' | ||
}), 429 | ||
|
||
|
||
@app.errorhandler(413) | ||
def error413(e): | ||
|