Skip to content

Commit

Permalink
Inbound email: download from S3 + convert HTML to plaintext (#5348)
Browse files Browse the repository at this point in the history
# What this PR does

* Make `AmazonSESValidatedInboundWebhookView` able to download emails
from S3 by providing AWS credentials via env variables
* Convert HTML to plaintext when there's only `text/html` available

## Which issue(s) this PR closes

Related to grafana/oncall-private#2905

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
  • Loading branch information
vadimkerr authored Dec 18, 2024
1 parent 0694fe5 commit c36761e
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 31 deletions.
129 changes: 115 additions & 14 deletions engine/apps/email/inbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from anymail.inbound import AnymailInboundMessage
from anymail.signals import AnymailInboundEvent
from anymail.webhooks import amazon_ses, mailgun, mailjet, mandrill, postal, postmark, sendgrid, sparkpost
from bs4 import BeautifulSoup
from django.conf import settings
from django.http import HttpResponse, HttpResponseNotAllowed
from django.utils import timezone
from rest_framework import status
Expand All @@ -25,6 +27,15 @@ class AmazonSESValidatedInboundWebhookView(amazon_ses.AmazonSESInboundWebhookVie
# disable "Your Anymail webhooks are insecure and open to anyone on the web." warning
warn_if_no_basic_auth = False

def __init__(self):
super().__init__(
session_params={
"aws_access_key_id": settings.INBOUND_EMAIL_AWS_ACCESS_KEY_ID,
"aws_secret_access_key": settings.INBOUND_EMAIL_AWS_SECRET_ACCESS_KEY,
"region_name": settings.INBOUND_EMAIL_AWS_REGION,
},
)

def validate_request(self, request):
"""Add SNS message validation to Amazon SES inbound webhook view, which is not implemented in Anymail."""
if not validate_amazon_sns_message(self._parse_sns_message(request)):
Expand Down Expand Up @@ -74,11 +85,10 @@ def dispatch(self, request):
if request.method.lower() == "head":
return HttpResponse(status=status.HTTP_200_OK)

integration_token = self.get_integration_token_from_request(request)
if integration_token is None:
if self.integration_token is None:
return HttpResponse(status=status.HTTP_400_BAD_REQUEST)
request.inbound_email_integration_token = integration_token # used in RequestTimeLoggingMiddleware
return super().dispatch(request, alert_channel_key=integration_token)
request.inbound_email_integration_token = self.integration_token # used in RequestTimeLoggingMiddleware
return super().dispatch(request, alert_channel_key=self.integration_token)

def post(self, request):
payload = self.get_alert_payload_from_email_message(self.message)
Expand All @@ -94,7 +104,8 @@ def post(self, request):
)
return Response("OK", status=status.HTTP_200_OK)

def get_integration_token_from_request(self, request) -> Optional[str]:
@cached_property
def integration_token(self) -> Optional[str]:
if not self.message:
return None
# First try envelope_recipient field.
Expand Down Expand Up @@ -151,7 +162,8 @@ def message(self) -> AnymailInboundMessage | None:
logger.error("Failed to parse inbound email message")
return None

def check_inbound_email_settings_set(self):
@staticmethod
def check_inbound_email_settings_set():
"""
Guard method to checks if INBOUND_EMAIL settings present.
Returns InternalServerError if not.
Expand All @@ -167,16 +179,105 @@ def check_inbound_email_settings_set(self):
logger.error("InboundEmailWebhookView: INBOUND_EMAIL_DOMAIN env variable must be set.")
return HttpResponse(status=status.HTTP_500_INTERNAL_SERVER_ERROR)

def get_alert_payload_from_email_message(self, email: AnymailInboundMessage) -> EmailAlertPayload:
subject = email.subject or ""
subject = subject.strip()
message = email.text or ""
message = message.strip()
sender = self.get_sender_from_email_message(email)
@classmethod
def get_alert_payload_from_email_message(cls, email: AnymailInboundMessage) -> EmailAlertPayload:
if email.text:
message = email.text.strip()
elif email.html:
message = cls.html_to_plaintext(email.html)
else:
message = ""

return {
"subject": email.subject.strip() if email.subject else "",
"message": message,
"sender": cls.get_sender_from_email_message(email),
}

@staticmethod
def html_to_plaintext(html: str) -> str:
"""
Converts HTML to plain text. Renders links as "text (href)" and removes any empty lines.
Converting HTML to plaintext is a non-trivial task, so this method may not work perfectly for all cases.
"""
soup = BeautifulSoup(html, "html.parser")

# Browsers typically render these elements on their own line.
# There is no single official HTML5 list for this, so we go with HTML tags that render as
# display: block, display: list-item, display: table, display: table-row by default according to the HTML standard:
# https://html.spec.whatwg.org/multipage/rendering.html
newline_tags = [
"address",
"article",
"aside",
"blockquote",
"body",
"center",
"dd",
"details",
"dialog",
"dir",
"div",
"dl",
"dt",
"fieldset",
"figcaption",
"figure",
"footer",
"form",
"h1",
"h2",
"h3",
"h4",
"h5",
"h6",
"header",
"hgroup",
"hr",
"html",
"legend",
"li",
"listing",
"main",
"menu",
"nav",
"ol",
"p",
"plaintext",
"pre",
"search",
"section",
"summary",
"table",
"tr",
"ul",
"xmp",
]
# Insert a newline after each block-level element
for tag in soup.find_all(newline_tags):
tag.insert_before("\n")
tag.insert_after("\n")

# <br> tags are also typically rendered as newlines
for br in soup.find_all("br"):
br.replace_with("\n")

# example: "<a href="https://example.com">example</a>" -> "example (https://example.com)"
for a in soup.find_all("a"):
if href := a.get("href"):
a.append(f" ({href})")

for li in soup.find_all("li"):
li.insert_before("* ")

for hr in soup.find_all("hr"):
hr.replace_with("-" * 32)

return {"subject": subject, "message": message, "sender": sender}
# remove empty lines
return "\n".join(line.strip() for line in soup.get_text().splitlines() if line.strip())

def get_sender_from_email_message(self, email: AnymailInboundMessage) -> str:
@staticmethod
def get_sender_from_email_message(email: AnymailInboundMessage) -> str:
try:
if isinstance(email.from_email, list):
sender = email.from_email[0].addr_spec
Expand Down
Loading

0 comments on commit c36761e

Please sign in to comment.