Terraform modules to configure an AWS Lambda that connects Kafka to SQS.
The lambda is triggered by the AWS Kafka event source. It parses each Kafka record from Avro or string and produces a new message with the result to SQS. The parsed key and value are added to the original record received from the event source under the fields parsed_key and parsed_value. The parsed_key is always a string, while the parsed_value is Avro JSON if the source was Avro and a string otherwise.
Any processing error is also attached to the original event, under the field process_exception, before the event is sent to the DLQ.
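As a rough illustration of the message shape described above, the Python sketch below enriches a single record for the plain-string case. It is not the module's actual code: the function name enrich_record is hypothetical, Avro decoding (which needs the schema registry) is left out, and it assumes that key and value arrive base64-encoded, as they do in Lambda Kafka event source records.

```python
import base64
import json


def enrich_record(record: dict) -> dict:
    """Return a copy of a Kafka event source record with the parsed fields added.

    Hypothetical helper: only the string case is sketched, Avro is omitted.
    """
    enriched = dict(record)
    try:
        # Assumption: key and value are base64-encoded UTF-8 strings.
        enriched["parsed_key"] = base64.b64decode(record["key"]).decode("utf-8")
        enriched["parsed_value"] = base64.b64decode(record["value"]).decode("utf-8")
    except Exception as exc:
        # The error travels with the original record, as described for DLQ messages.
        enriched["process_exception"] = repr(exc)
    return enriched


# Example record with only the fields this sketch needs.
record = {
    "topic": "test",
    "partition": 0,
    "offset": 15,
    "key": base64.b64encode(b"user-1").decode("ascii"),
    "value": base64.b64encode(b"hello").decode("ascii"),
}
message = enrich_record(record)
print(json.dumps(message, indent=2))
```

A record whose value cannot be decoded keeps its original fields and gains process_exception instead of parsed_value, which is the shape a DLQ consumer would need to handle.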
- Serverless consumer
- Support for Avro and Schema Registry
- DLQ for poison pills and parsing errors
- Helper modules to easily configure the Kafka credentials
- Support for deploying the lambda in a VPC subnet
See module documentation here
module "lambda_to_sqs" {
  source                    = "https://github.com/andrewinci/kafka2sqs/releases/download/v2.3.0/module.zip//lambda"
  function_name             = "consumer"
  kafka_endpoints           = "kafka1.example.com:9092,kafka2.example.com:9092"
  kafka_subnet_ids          = ["subnet1"]
  kafka_sg_ids              = ["sg-example"]
  kafka_authentication_type = "mTLS"
  kafka_credentials_arn     = aws_secretsmanager_secret.kafka_user_certificate.arn
  kafka_ca_secret_arn       = aws_secretsmanager_secret.kafka_ca_certificate.arn
  kafka_topics              = [{ topic_name = "test", is_avro = true }]
}
See module documentation here
module "sasl_secrets" {
  source                   = "https://github.com/andrewinci/kafka2sqs/releases/download/v2.3.0/module.zip//sasl_secrets"
  kafka_username           = "kafka_username"
  kafka_password           = "kafka_password"
  schema_registry_username = "schema_registry_username"
  schema_registry_password = "schema_registry_password"
}

module "lambda_to_sqs" {
  source                          = "https://github.com/andrewinci/kafka2sqs/releases/download/v2.3.0/module.zip//lambda"
  function_name                   = "consumer"
  kafka_endpoints                 = "whatever.europe-west1.gcp.confluent.cloud:9092"
  kafka_authentication_type       = "SASL"
  kafka_credentials_arn           = module.sasl_secrets.kafka_credentials_arn
  schema_registry_endpoint        = "https://schema_registry.endpoint.com"
  schema_registry_credentials_arn = module.sasl_secrets.schema_registry_credentials_arn
  kafka_topics = [
    { topic_name = "test", is_avro = true },
    { topic_name = "test-2", is_avro = false }
  ]
}
See module documentation here
module "mtls_secrets" {
  source                   = "https://github.com/andrewinci/kafka2sqs/releases/download/v2.3.0/module.zip//mtls_secrets"
  user_certificate         = var.kafka_certificate
  private_key              = var.kafka_private_key
  ca_certificate           = var.kafka_ca_certificate
  schema_registry_username = var.schema_registry_username
  schema_registry_password = var.schema_registry_password
}

module "lambda_to_sqs" {
  source                          = "https://github.com/andrewinci/kafka2sqs/releases/download/v2.3.0/module.zip//lambda"
  function_name                   = "consumer"
  kafka_endpoints                 = var.kafka_endpoints
  kafka_authentication_type       = "mTLS"
  kafka_credentials_arn           = module.mtls_secrets.kafka_credentials_arn
  kafka_ca_secret_arn             = module.mtls_secrets.kafka_ca_secret_arn
  schema_registry_endpoint        = var.schema_registry_endpoint
  schema_registry_credentials_arn = module.mtls_secrets.schema_registry_credentials_arn
  kafka_topics                    = [{ topic_name = "my_topic", is_avro = true }]
}
Package the Terraform module with
make clean && make
Initialize the Python virtualenv with
make venv && source .kafka2sqs/bin/activate
Lint the code with
make lint
Run the tests and the lint checks with
make && make check
Clean up with
make clean
Generate the documentation with
make docs
The documentation is generated with terraform-docs.