Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature : ACL integration features #287

Merged
merged 26 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
All notable changes to this project will be documented in this file. This change log follows the conventions
of [keepachangelog.com](http://keepachangelog.com/).

## 4.12.0
- Adds support for ACL-based authentication for Kafka Streams.

## 4.11.1
- Fix retry-count returning nil if empty. Returns 0 by default now.

Expand Down
110 changes: 85 additions & 25 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,40 +1,100 @@
# Topics the test suite expects to exist on the cluster.
KAFKA_TOPICS = topic another-test-topic
# SASL_PLAINTEXT listener addresses of the three brokers (see docker-compose-cluster.yml).
KAFKA_BROKERS = kafka1:9095 kafka2:9096 kafka3:9097
# Admin client properties (SCRAM-SHA-256 credentials) as mounted inside the containers.
ADMIN_CONFIG = /etc/kafka/secrets/config-admin.properties
# Default container used for one-off admin commands (broker 1).
KAFKA_CONTAINER = ziggurat_kafka1_1

# Declare every command target as phony so a file with the same name
# can never mask it and the recipes always run.
.PHONY: all setup-cluster down up wait-for-kafka restart \
        create-scram-credentials create-topics setup-acls \
        clean-topics logs test-cluster coverage proto
all: test

topic="topic"
another_test_topic="another-test-topic"
# Main target to set up the entire cluster: recreate the containers,
# wait for Kafka, then provision SCRAM credentials, topics, and ACLs.
setup-cluster: down up wait-for-kafka create-scram-credentials create-topics setup-acls

setup:
docker-compose down
lein deps
docker-compose up -d
sleep 10
docker exec ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper
docker exec ziggurat_kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic $(another_test_topic) --partitions 3 --replication-factor 1 --zookeeper ziggurat_zookeeper
# Bring down all containers and clean volumes.
# -v also removes the volumes so no broker/ZooKeeper state leaks between runs.
down:
	@echo "Bringing down all containers..."
	docker-compose -f docker-compose-cluster.yml down -v

test: setup
TESTING_TYPE=local lein test
docker-compose down
# Start all containers in detached mode; readiness is handled
# separately by the wait-for-kafka target.
up:
	@echo "Starting all containers..."
	docker-compose -f docker-compose-cluster.yml up -d

setup-cluster:
rm -rf /tmp/ziggurat_kafka_cluster_data
docker-compose -f docker-compose-cluster.yml -p ziggurat down
lein deps
docker-compose -f docker-compose-cluster.yml -p ziggurat up -d
sleep 30
# Sleeping for 30s to allow the cluster to come up
docker exec ziggurat_kafka1_1 kafka-topics --create --topic $(topic) --partitions 3 --replication-factor 3 --if-not-exists --zookeeper ziggurat_zookeeper_1
docker exec ziggurat_kafka1_1 kafka-topics --create --topic $(another_test_topic) --partitions 3 --replication-factor 3 --if-not-exists --zookeeper ziggurat_zookeeper_1
# Wait for Kafka to be ready.
# NOTE(review): a fixed 30s sleep is a heuristic — on slow machines the
# brokers may still be registering; consider polling the broker port instead.
wait-for-kafka:
	@echo "Waiting for Kafka to be ready..."
	@sleep 30

# Restart everything (tear down, bring up, wait for readiness).
restart: down up wait-for-kafka

# Create SCRAM credentials for the admin user.
# The admin/admin credentials must match kafka_server_jaas.conf and
# config-admin.properties; they are registered in ZooKeeper so all
# three brokers can authenticate the principal.
create-scram-credentials:
	@echo "Creating SCRAM credentials for admin user..."
	@docker exec $(KAFKA_CONTAINER) kafka-configs \
		--alter \
		--zookeeper zookeeper:2181 \
		--add-config 'SCRAM-SHA-256=[password=admin]' \
		--entity-type users \
		--entity-name admin

# Create all required topics.
# Goes through ZooKeeper (not the SASL listener) so it works before ACLs
# are in place; --if-not-exists makes the target idempotent.
create-topics:
	@for topic in $(KAFKA_TOPICS); do \
		echo "Creating topic: $$topic"; \
		docker exec $(KAFKA_CONTAINER) kafka-topics \
			--create \
			--zookeeper zookeeper:2181 \
			--if-not-exists \
			--topic $$topic \
			--partitions 3 \
			--replication-factor 3; \
	done

# Setup ACLs for the admin user on all brokers.
# Maps each advertised SASL address to its container name, then grants
# User:admin all operations on every test topic via that broker,
# authenticating with the mounted admin client config.
setup-acls:
	@for broker in $(KAFKA_BROKERS); do \
		case $$broker in \
			kafka1:9095) container="ziggurat_kafka1_1" ;; \
			kafka2:9096) container="ziggurat_kafka2_1" ;; \
			kafka3:9097) container="ziggurat_kafka3_1" ;; \
			*) echo "Unknown broker: $$broker" >&2; exit 1 ;; \
		esac; \
		for topic in $(KAFKA_TOPICS); do \
			echo "Setting up ACLs for topic: $$topic on broker: $$broker using container: $$container"; \
			docker exec $$container kafka-acls \
				--bootstrap-server $$broker \
				--command-config $(ADMIN_CONFIG) \
				--add \
				--allow-principal User:admin \
				--operation All \
				--topic $$topic; \
		done; \
	done

# Clean up topics (can be used during development).
# Uses the SASL bootstrap address of broker 1, so it requires the
# cluster to be up with ACLs/credentials already provisioned.
clean-topics:
	@for topic in $(KAFKA_TOPICS); do \
		echo "Deleting topic: $$topic"; \
		docker exec $(KAFKA_CONTAINER) kafka-topics --bootstrap-server kafka1:9095 \
			--delete \
			--topic $$topic; \
	done

# Show logs from all cluster containers and follow them (Ctrl-C to stop).
logs:
	docker-compose -f docker-compose-cluster.yml logs -f

# Run the test suite against the full ACL-enabled cluster, then tear
# everything down and remove the on-disk broker/ZooKeeper state.
test-cluster: setup-cluster
	TESTING_TYPE=cluster lein test
	docker-compose -f docker-compose-cluster.yml down
	rm -rf /tmp/ziggurat_kafka_cluster_data

# Run code coverage against the full ACL-enabled cluster, then tear it down.
# (Collapses the duplicate `coverage` rule left over from the old
# single-broker setup, which also referenced the wrong compose file.)
coverage: setup-cluster
	lein code-coverage
	docker-compose -f docker-compose-cluster.yml down


# Generate Java protobuf classes used by the test suite.
# (Removes the duplicated person.proto invocation — it ran the same
# code generation twice.)
proto:
	protoc -I=resources --java_out=test/ resources/proto/example.proto
	protoc -I=resources --java_out=test/ resources/proto/person.proto
5 changes: 5 additions & 0 deletions config-admin.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Admin client configuration: authenticate to the brokers' SASL_PLAINTEXT
# listeners using SCRAM-SHA-256 as the admin principal. Mounted into the
# broker containers at /etc/kafka/secrets/config-admin.properties and
# passed to kafka-acls via --command-config.
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin";
71 changes: 57 additions & 14 deletions docker-compose-cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ services:
container_name: 'ziggurat_rabbitmq'

zookeeper:
image: zookeeper:3.4.9
image: confluentinc/cp-zookeeper:5.5.0
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888
ZOO_TICK_TIME: 2000
ZOOKEEPER_CLIENT_PORT: 2181
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/zookeeper_server_jaas.conf
-Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
-Dzookeeper.allowSaslFailedClients=true
-Dzookeeper.requireClientAuthScheme=sasl"
volumes:
- ./zookeeper_server_jaas.conf:/etc/kafka/zookeeper_server_jaas.conf
- /tmp/ziggurat_kafka_cluster_data/zookeeper/data:/data
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have we removed these?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seemed redundant to mount this volume; will add it back.

- /tmp/ziggurat_kafka_cluster_data/zookeeper/datalog:/datalog

Expand All @@ -28,17 +30,32 @@ services:
- SYS_ADMIN
hostname: kafka1
ports:
- "9091:9091"
- "9094:9094"
- "9095:9095"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,SASL_PLAINTEXT://${DOCKER_HOST_IP:-127.0.0.1}:9095
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 60000
KAFKA_BROKER_ID: 1
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256
KAFKA_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required \
username=\"client\" \
password=\"client-secret\";"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
KAFKA_SUPER_USERS: User:ANONYMOUS;User:admin
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf -Dzookeeper.sasl.client=true -Dzookeeper.sasl.clientconfig=Client"
KAFKA_ZOOKEEPER_SET_ACL: "true"
KAFKA_ZOOKEEPER_SASL_ENABLED: "true"
volumes:
- /tmp/ziggurat_kafka_cluster_data/kafka1/data:/var/lib/kafka/data
- ./kafka_server_jaas.conf:/etc/kafka/secrets/kafka_server_jaas.conf
- ./config-admin.properties:/etc/kafka/secrets/config-admin.properties
depends_on:
- zookeeper

Expand All @@ -50,16 +67,29 @@ services:
hostname: kafka2
ports:
- "9092:9092"
- "9096:9096"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,SASL_PLAINTEXT://${DOCKER_HOST_IP:-127.0.0.1}:9096
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 60000
KAFKA_BROKER_ID: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256
KAFKA_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required \
username=\"client\" \
password=\"client-secret\";"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
KAFKA_SUPER_USERS: User:ANONYMOUS;User:admin
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf"
volumes:
- /tmp/ziggurat_kafka_cluster_data/kafka2/data:/var/lib/kafka/data
- ./kafka_server_jaas.conf:/etc/kafka/secrets/kafka_server_jaas.conf
- ./config-admin.properties:/etc/kafka/secrets/config-admin.properties
depends_on:
- zookeeper

Expand All @@ -71,15 +101,28 @@ services:
hostname: kafka3
ports:
- "9093:9093"
- "9097:9097"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,SASL_PLAINTEXT://${DOCKER_HOST_IP:-127.0.0.1}:9097
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 60000
KAFKA_BROKER_ID: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3
KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256
KAFKA_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.scram.ScramLoginModule required \
username=\"client\" \
password=\"client-secret\";"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
KAFKA_SUPER_USERS: User:ANONYMOUS;User:admin
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf"
volumes:
- /tmp/ziggurat_kafka_cluster_data/kafka3/data:/var/lib/kafka/data
- ./kafka_server_jaas.conf:/etc/kafka/secrets/kafka_server_jaas.conf
- ./config-admin.properties:/etc/kafka/secrets/config-admin.properties
depends_on:
- zookeeper
17 changes: 17 additions & 0 deletions kafka_server_jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// JAAS login contexts for the Kafka brokers (mounted at
// /etc/kafka/secrets/kafka_server_jaas.conf).

// Server-side SCRAM credentials for the brokers' SASL listener; must
// match the credentials registered by `make create-scram-credentials`.
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin";
};

// Credentials the broker uses to authenticate to ZooKeeper
// (selected via -Dzookeeper.sasl.clientconfig=Client in KAFKA_OPTS).
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="admin";
};

// Default client-side SCRAM credentials; match the KAFKA_SASL_JAAS_CONFIG
// values in docker-compose-cluster.yml.
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="client"
    password="client-secret";
};
11 changes: 5 additions & 6 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(cemerick.pomegranate.aether/register-wagon-factory!
"http" #(org.apache.maven.wagon.providers.http.HttpWagon.))

(defproject tech.gojek/ziggurat "4.11.1"
(defproject tech.gojek/ziggurat "4.12.0"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand All @@ -15,7 +15,7 @@
[com.cemerick/url "0.1.1"]
[com.datadoghq/java-dogstatsd-client "2.4"]
[com.fasterxml.jackson.core/jackson-databind "2.9.9"]
[com.novemberain/langohr "5.2.0" :exclusions [org.clojure/clojure]]
[com.novemberain/langohr "5.2.0" :exclusions [org.clojure/clojure org.slf4j/slf4j-api]]
[com.taoensso/nippy "3.1.1"]
[io.dropwizard.metrics5/metrics-core "5.0.0" :scope "compile"]
[medley "1.3.0" :exclusions [org.clojure/clojure]]
Expand All @@ -41,10 +41,7 @@
[com.newrelic.agent.java/newrelic-api "6.5.0"]
[yleisradio/new-reliquary "1.1.0" :exclusions [org.clojure/clojure]]
[metosin/ring-swagger "0.26.2"
:exclusions [cheshire
com.fasterxml.jackson.core/jackson-core
com.fasterxml.jackson.dataformat/jackson-dataformat-smile
com.fasterxml.jackson.dataformat/jackson-dataformat-cbor]]
:exclusions [org.mozilla/rhino com.fasterxml.jackson.dataformat/jackson-dataformat-smile com.fasterxml.jackson.dataformat/jackson-dataformat-cbor cheshire com.google.code.findbugs/jsr305 com.fasterxml.jackson.core/jackson-core]]
[metosin/ring-swagger-ui "3.46.0"]
[cambium/cambium.core "1.1.0"]
[cambium/cambium.codec-cheshire "1.0.0"]
Expand Down Expand Up @@ -72,8 +69,10 @@
:dependencies [[com.google.protobuf/protobuf-java "3.17.0"]
[junit/junit "4.13.2"]
[org.hamcrest/hamcrest-core "2.2"]
[org.apache.kafka/kafka_2.12 "2.8.0"]
[org.apache.kafka/kafka-streams "2.8.2" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "2.8.2" :classifier "test"]
[org.apache.kafka/kafka-streams-test-utils "2.8.2" :classifier "test"]
[org.clojure/test.check "1.1.0"]]
:plugins [[lein-cloverage "1.2.2" :exclusions [org.clojure/clojure]]]
:cloverage {:exclude-call ['cambium.core/info
Expand Down
1 change: 1 addition & 0 deletions resources/config.test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,5 @@
:new-relic {:report-errors false}
:prometheus {:port 8002
:enabled false}
:ssl {:enabled false}
:log-format "text"}}
Loading