-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Ralf Ueberfuhr
committed
Dec 20, 2024
1 parent
6342072
commit 7b2297a
Showing
12 changed files
with
1,039 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,8 @@ | ||
root = true | ||
|
||
[*] | ||
indent_style = space | ||
indent_size = 2 | ||
|
||
[*.md] | ||
indent_style = tab |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
# Wofür Kafka? | ||
|
||
Apache Kafka ist eine verteilte Streaming-Plattform, die häufig für das Verarbeiten und Übertragen von Datenströmen in | ||
Echtzeit verwendet wird. | ||
|
||
Es wird in verschiedenen Szenarien eingesetzt, darunter: | ||
|
||
## Echtzeit-Datenverarbeitung | ||
|
||
Kafka wird genutzt, um Datenströme in Echtzeit zu sammeln, zu speichern und zu verarbeiten. Beispiele: | ||
|
||
- Finanztransaktionen überwachen: Banken nutzen Kafka, um verdächtige Aktivitäten sofort zu erkennen. | ||
- IoT-Geräte überwachen: Sensoren können kontinuierlich Daten an Kafka senden, die dann verarbeitet werden. | ||
|
||
## Messaging-System | ||
|
||
Kafka fungiert als Message-Broker zwischen verschiedenen Anwendungen oder Diensten. Es ist skalierbar und hoch | ||
performant und wird oft anstelle traditioneller Message Queues wie RabbitMQ eingesetzt. Beispiele: | ||
|
||
- Log-Verarbeitung: Anwendungen senden Logs an Kafka, wo sie gesammelt und analysiert werden können. | ||
- Ereignisbasierte Kommunikation: Microservices kommunizieren über Kafka, indem sie Events veröffentlichen und | ||
abonnieren. | ||
|
||
## Datenintegration | ||
|
||
Kafka dient als Daten-Hub, der Daten aus unterschiedlichen Quellen zentralisiert und an Zielsysteme verteilt. Beispiele: | ||
|
||
- Datenbankänderungen verfolgen: Änderungen in einer Datenbank können mit Kafka an andere Systeme weitergeleitet werden. | ||
- Daten in Data Warehouses laden: Datenströme von verschiedenen Systemen können in Echtzeit in analytische Systeme wie | ||
Snowflake oder Apache Hive integriert werden. | ||
|
||
## Speicherung von Datenströmen | ||
|
||
Kafka speichert Datenströme in sogenannten Topics und erlaubt deren Verarbeitung später. Diese Persistenz ermöglicht: | ||
|
||
- Wiederholte Verarbeitung: Daten können mehrmals verarbeitet werden, falls ein Fehler auftritt. | ||
- Langfristige Speicherung: Kafka kann als Datenarchiv für Streams fungieren. | ||
|
||
## Typische Anwendungsfälle | ||
|
||
- E-Commerce-Plattformen: Echtzeit-Verarbeitung von Bestellungen, Lagerbeständen und Benutzeraktivitäten. | ||
- Social Media: Verarbeitung von Likes, Kommentaren und Posts in Echtzeit. | ||
- Medien-Streaming: Übertragung von Datenströmen, z. B. für Video- oder Audiostreaming. | ||
- Big Data: Daten aus verschiedenen Quellen in eine zentrale Plattform wie Hadoop oder Spark integrieren. | ||
|
||
Kafka ist besonders geeignet, wenn es ankommt auf: | ||
|
||
- Hohe Durchsatzrate | ||
- Niedrige Latenz | ||
- Skalierbarkeit | ||
- Verlässliche Datenübertragung |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
# CloudEvents | ||
|
||
CloudEvents ist ein standardisiertes, plattformunabhängiges Datenformat, das für die Beschreibung von Ereignissen | ||
entwickelt wurde. Es wird von der Cloud Native Computing Foundation (CNCF) verwaltet und dient dazu, die | ||
Interoperabilität zwischen verschiedenen Systemen und Diensten zu erleichtern. | ||
|
||
## Hauptziele von CloudEvents | ||
|
||
1. Standardisierung von Ereignissen: | ||
- CloudEvents definiert ein einheitliches Format für Ereignisse, das plattformübergreifend verwendet werden kann. | ||
- Dadurch können verschiedene Anwendungen, Tools und Cloud-Plattformen leichter zusammenarbeiten. | ||
2. Interoperabilität: | ||
- Ermöglicht die einfache Integration zwischen verschiedenen Diensten und Event-Quellen, unabhängig davon, welche | ||
Programmiersprache, Plattform oder Cloud verwendet wird. | ||
3. Portabilität: | ||
- Ereignisse, die dem CloudEvents-Standard entsprechen, können problemlos zwischen verschiedenen Systemen | ||
transportiert und verarbeitet werden. | ||
|
||
## Grundstruktur eines CloudEvents | ||
|
||
Ein CloudEvent ist in JSON, Avro oder Protobuf formatiert und enthält eine Reihe standardisierter Attribute. Ein | ||
typisches Beispiel in JSON: | ||
|
||
```json | ||
{ | ||
"specversion": "1.0", | ||
"type": "com.example.someevent", | ||
"source": "/mycontext", | ||
"id": "1234-1234-1234", | ||
"time": "2024-12-19T12:34:56Z", | ||
"datacontenttype": "application/json", | ||
"data": { | ||
"message": "Hello, World!" | ||
} | ||
} | ||
``` | ||
|
||
**Wichtige Attribute:** | ||
|
||
- `specversion`: Gibt die Version des CloudEvents-Standards an (z. B. "1.0"). | ||
- `type`: Der Typ des Ereignisses, z. B. "com.example.someevent". | ||
- `source`: Die Quelle des Ereignisses (z. B. ein URI, der den Ursprungsdienst beschreibt). | ||
- `id`: Eine eindeutige Kennung für das Ereignis. | ||
- `time`: Der Zeitpunkt, an dem das Ereignis erstellt wurde (ISO-8601-Format). | ||
- `datacontenttype`: Der Medientyp des Dateninhalts (z. B. "application/json"). | ||
- `data`: Der eigentliche Nutzdateninhalt des Ereignisses. | ||
|
||
## Vorteile von CloudEvents | ||
|
||
- **Standardisierte Kommunikation**: Dienste können Ereignisse auf eine einheitliche Weise generieren und empfangen. | ||
- **Einfache Integration**: Minimiert den Aufwand für die Anpassung von Ereignisformaten zwischen Systemen. | ||
- **Unterstützung durch Ökosystem**: Viele Event-Driven-Frameworks (z. B. Knative, Apache Kafka, AWS EventBridge) | ||
unterstützen CloudEvents. | ||
- **Flexibilität**: Unterstützt unterschiedliche Transportprotokolle wie HTTP, AMQP, MQTT oder Kafka. | ||
|
||
## Typische Anwendungsfälle | ||
|
||
1. Event-getriebene Architekturen: | ||
- Microservices, die auf Ereignisse reagieren, können CloudEvents verwenden, um standardisiert zu kommunizieren. | ||
2. Serverless Computing: | ||
- CloudEvents werden oft in serverlosen Architekturen eingesetzt, um Ereignisse zwischen Triggern und Funktionen zu | ||
übertragen. | ||
3. Integration von Drittanbieter-Diensten: | ||
- CloudEvents erleichtern die Integration zwischen Cloud-Diensten verschiedener Anbieter. | ||
4. IoT-Systeme: | ||
- Sensoren oder Geräte senden Ereignisse (z. B. Messwerte), die in CloudEvents strukturiert werden können. | ||
|
||
## Transportprotokolle | ||
|
||
CloudEvents ist unabhängig vom Transportprotokoll, unterstützt aber mehrere Protokollbindungen, darunter: | ||
|
||
- HTTP (RESTful APIs) | ||
- Kafka (für Event-Streams) | ||
- AMQP (Advanced Message Queuing Protocol) | ||
- MQTT (für IoT) | ||
- gRPC | ||
|
||
## Example: Handling CloudEvents with Spring | ||
|
||
### Consuming CloudEvents via HTTP | ||
|
||
Using Spring Boot with WebFlux: | ||
|
||
```java | ||
|
||
@RestController | ||
public class CloudEventController { | ||
@PostMapping("/event") | ||
public ResponseEntity<String> handleCloudEvent(@RequestBody CloudEvent event) { | ||
// Access CloudEvent attributes | ||
String type = event.getType(); | ||
Object data = event.getData(); | ||
System.out.println("Received event of type: " + type); | ||
System.out.println("Event data: " + data); | ||
return ResponseEntity.ok("Event processed"); | ||
} | ||
} | ||
``` | ||
|
||
### Producing CloudEvents via Kafka | ||
|
||
Using Spring Kafka to produce CloudEvents: | ||
|
||
```java | ||
|
||
@Autowired | ||
private KafkaTemplate<String, CloudEvent> kafkaTemplate; | ||
|
||
public void sendCloudEvent() { | ||
CloudEvent event = CloudEventBuilder.v1() | ||
.withId("1234-1234-1234") | ||
.withSource(URI.create("/example")) | ||
.withType("com.example.event") | ||
.withTime(OffsetDateTime.now()) | ||
.withDataContentType("application/json") | ||
.withData("{\"message\": \"Hello, CloudEvents!\"}".getBytes(StandardCharsets.UTF_8)) | ||
.build(); | ||
kafkaTemplate.send("my-topic", event); | ||
} | ||
``` | ||
|
||
### Function-Based Approach | ||
|
||
With Spring Cloud Function: | ||
|
||
```java | ||
|
||
@Bean | ||
public Function<CloudEvent, String> processEvent() { | ||
return event -> { | ||
System.out.println("Processing event: " + event.getId()); | ||
return "Processed event of type: " + event.getType(); | ||
}; | ||
} | ||
``` | ||
|
||
This function can be deployed to various platforms (e.g., AWS Lambda, Azure Functions) and will automatically process | ||
CloudEvents. | ||
|
||
### Key Dependencies | ||
|
||
To work with CloudEvents in Spring, you may need these dependencies in your `pom.xml`: | ||
|
||
```xml | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>org.springframework.cloud</groupId> | ||
<artifactId>spring-cloud-function-adapter</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.cloudevents</groupId> | ||
<artifactId>cloudevents-api</artifactId> | ||
<version>2.4.0</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.cloudevents</groupId> | ||
<artifactId>cloudevents-spring</artifactId> | ||
<version>2.4.0</version> | ||
</dependency> | ||
</dependencies> | ||
``` | ||
|
||
## Fazit | ||
|
||
CloudEvents bietet eine einheitliche Möglichkeit, Ereignisse in verteilten Systemen zu strukturieren und auszutauschen. | ||
Durch die Standardisierung erleichtert es die Entwicklung von event-driven Architekturen, die Portabilität zwischen | ||
Cloud-Plattformen und die Integration zwischen Diensten. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
# JSON-Schema-Validierung | ||
|
||
Um JSON-Nachrichten in Apache Kafka gegen ein JSON Schema zu validieren, wird üblicherweise der Confluent Schema | ||
Registry verwendet. Dieses Tool bietet eine zentrale Speicherung und Validierung von Schemas für Nachrichten, die in | ||
Kafka-Topics veröffentlicht werden. | ||
|
||
Hier ist eine schrittweise Anleitung, wie die Validierung funktioniert: | ||
|
||
## Einrichtung der Schema Registry | ||
|
||
- Installiere die Confluent Schema Registry und starte sie. Sie wird als separater Service bereitgestellt. | ||
- Konfiguriere Kafka-Broker und Producer/Consumer, um mit der Schema Registry zu kommunizieren. | ||
|
||
## Definition des JSON Schemas | ||
|
||
Ein JSON Schema definiert die Struktur, Typen und Validierungsregeln für die Nachrichten. Beispiel für ein Schema: | ||
|
||
```json | ||
{ | ||
"$schema": "http://json-schema.org/draft-07/schema#", | ||
"title": "User", | ||
"type": "object", | ||
"properties": { | ||
"id": { | ||
"type": "integer" | ||
}, | ||
"name": { | ||
"type": "string" | ||
}, | ||
"email": { | ||
"type": "string", | ||
"format": "email" | ||
} | ||
}, | ||
"required": [ | ||
"id", | ||
"name" | ||
] | ||
} | ||
``` | ||
|
||
Dieses Schema wird bei der Schema Registry registriert. | ||
|
||
## Registrierung des Schemas | ||
|
||
Das JSON Schema wird in der Schema Registry registriert. Dies erfolgt entweder über die REST API oder Tools wie | ||
`ccloud`. Beispiel mit REST API: | ||
|
||
```bash | ||
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \ | ||
--data '{ | ||
"schema": "{ \"$schema\": \"http://json-schema.org/draft-07/schema#\", \"type\": \"object\", \"properties\": {\"id\": {\"type\": \"integer\"}, \"name\": {\"type\": \"string\"}} }" | ||
}' \ | ||
http://localhost:8081/subjects/<topic-name>-value/versions | ||
``` | ||
|
||
## Produzieren von Nachrichten mit Validierung | ||
|
||
Kafka-Producer, die mit der Schema Registry arbeiten, verwenden spezielle Serializer: | ||
|
||
- **Avro**: Für Avro-Daten. | ||
- **Protobuf**: Für Protobuf-Daten. | ||
- **JSON Schema**: Für JSON-Daten. | ||
|
||
Um JSON-Daten zu serialisieren und gegen ein Schema zu validieren, wird die Bibliothek `kafka-json-schema-serializer` | ||
verwendet. Beispiel in Java: | ||
|
||
```java | ||
public void sendWithValidation() { | ||
Properties props = new Properties(); | ||
props.put("bootstrap.servers", "localhost:9092"); | ||
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | ||
props.put("value.serializer", "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer"); | ||
props.put("schema.registry.url", "http://localhost:8081"); | ||
try (KafkaProducer<String, User> producer = new KafkaProducer<>(props)) { | ||
User user = new User(1, "Alice", "[email protected]"); | ||
ProducerRecord<String, User> record = new ProducerRecord<>("user-topic", user); | ||
producer.send(record); | ||
} | ||
} | ||
``` | ||
|
||
Hier wird das JSON Schema automatisch verwendet, um Nachrichten vor dem Versenden zu validieren. Wenn die Nachricht | ||
nicht dem Schema entspricht, wird eine Exception ausgelöst. | ||
|
||
## Konsumieren mit Validierung | ||
|
||
Consumer nutzen ebenfalls den entsprechenden Deserializer. Beispiel in Java: | ||
|
||
```java | ||
public void consumeWithValidation() { | ||
props.put("value.deserializer", "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer"); | ||
try (KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props)) { | ||
consumer.subscribe(Collections.singletonList("user-topic")); | ||
while (true) { | ||
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(100)); | ||
for (ConsumerRecord<String, User> record : records) { | ||
System.out.println(record.value()); | ||
} | ||
} | ||
} | ||
} | ||
``` | ||
|
||
## Vorteile der JSON Schema Validierung | ||
|
||
- **Verlässlichkeit**: Sicherstellung, dass nur gültige Nachrichten in Topics gelangen. | ||
- **Kompatibilität**: Kontrolle über Schema-Evolution (z. B. Backward- oder Forward-Kompatibilität). | ||
- **Zentralisierung**: Ein zentraler Speicherort für alle Schemas, leicht zugänglich. | ||
|
||
## Zusammenfassung | ||
|
||
1. Schema Registry bereitstellen. | ||
2. JSON Schema definieren und registrieren. | ||
3. Kafka-Producer und -Consumer mit passenden Serializern konfigurieren. | ||
4. Nachrichten werden beim Senden (Producer) und Empfangen (Consumer) validiert. | ||
|
||
Dieses Setup gewährleistet, dass alle JSON-Nachrichten in Kafka-Topics dem definierten Schema entsprechen. |
Oops, something went wrong.