Burst-Nachrichten sind der Erzfeind vieler Streaming-Anwendungen. Sie sind wie der Freund, der unangekündigt mit 50 Leuten zum Abendessen auftaucht. Du bist nicht vorbereitet, überfordert und hast definitiv keinen Spaß.

Kafka Streams und Quarkus treten auf den Plan

Warum also Kafka Streams und Quarkus für diese Herkulesaufgabe? Es ist, als würde man fragen, warum man einen Ferrari für ein Rennen wählt. Kafka Streams ist für die Verarbeitung von Ereignissen mit hohem Durchsatz gebaut, während Quarkus seine superschnellen, subatomaren Java-Fähigkeiten ins Spiel bringt.

  • Kafka Streams: Verteilt, skalierbar und fehlertolerant. Perfekt für die Verarbeitung massiver Datenströme.
  • Quarkus: Leichtgewichtig, schnelle Startzeiten und geringer Speicherbedarf. Ideal für cloud-native Umgebungen.

Zusammen sind sie das Batman und Robin der Burst-Nachrichtenverarbeitung. Schauen wir uns an, wie wir ihre Kräfte nutzen können.

Architektur für den Burst

Bevor wir in den Code eintauchen, wollen wir verstehen, wie Kafka Streams Daten verarbeitet. Es dreht sich alles um die Topologie!


StreamsBuilder builder = new StreamsBuilder();
KStream inputStream = builder.stream("input-topic");

KStream processedStream = inputStream
    .filter((key, value) -> value != null)
    .mapValues(value -> value.toUpperCase());

processedStream.to("output-topic");

Topology topology = builder.build();

Diese einfache Topologie liest von einem Eingabethema, filtert Nullwerte heraus, konvertiert Nachrichten in Großbuchstaben und schreibt in ein Ausgabethema. Aber wie machen wir sie burst-resistent?

Paralleles Universum: Konfiguration der Parallelität

Der Schlüssel zur Verarbeitung von Burst-Nachrichten ist Parallelität. Lassen Sie uns unsere Quarkus-Konfiguration anpassen, um die volle Leistung von Kafka Streams freizusetzen:


# application.properties
kafka-streams.num.stream.threads=4
kafka-streams.max.poll.records=500
quarkus.kafka-streams.topics=input-topic,output-topic

Das passiert hier:

  • num.stream.threads: Wir sagen Kafka Streams, dass es 4 Threads für die Verarbeitung verwenden soll. Passen Sie dies an Ihre CPU-Kerne an.
  • max.poll.records: Dies begrenzt die Anzahl der Datensätze, die in einem einzigen Abfragezyklus verarbeitet werden, und verhindert, dass unsere Anwendung mehr übernimmt, als sie bewältigen kann.

Pufferüberlauf: Datenfluss verwalten

Beim Umgang mit Burst-Nachrichten ist Pufferung Ihr bester Freund. Es ist wie ein Wartezimmer für Ihre Nachrichten. Lassen Sie uns einige pufferbezogene Eigenschaften konfigurieren:


kafka-streams.buffer.memory=67108864
kafka-streams.batch.size=16384
kafka-streams.linger.ms=100

Diese Einstellungen helfen, den Datenfluss zu verwalten:

  • buffer.memory: Gesamtspeicher in Bytes, den der Produzent zum Puffern von Datensätzen verwenden kann.
  • batch.size: Maximale Größe einer Anfrage in Bytes.
  • linger.ms: Wie lange gewartet wird, bevor ein Batch gesendet wird, wenn er nicht voll ist.

Backpressure: Die Kunst, "Langsamer" zu sagen

Backpressure ist entscheidend beim Umgang mit Burst-Nachrichten. Es ist, als würde man seinem gesprächigen Freund sagen: "Warte mal, ich brauche eine Minute, um zu verarbeiten, was du gerade gesagt hast." In Kafka Streams können wir Backpressure mit der Produced-Klasse implementieren:


processedStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String())
    .withStreamPartitioner((topic, key, value, numPartitions) -> {
        // Benutzerdefinierte Partitionierungslogik zur Lastverteilung
        return Math.abs(key.hashCode()) % numPartitions;
    }));

Dieser benutzerdefinierte Partitioner hilft, die Last über Partitionen zu verteilen und verhindert, dass eine einzelne Partition zum Engpass wird.

Geisteszustand: Optimierung der Statusspeicher

Statusspeicher in Kafka Streams können bei der Burst-Verarbeitung zu einem Leistungsengpass werden. Lassen Sie uns sie optimieren:


kafka-streams.state.dir=/path/to/state/dir
kafka-streams.commit.interval.ms=100
kafka-streams.cache.max.bytes.buffering=10485760

Diese Einstellungen helfen, den Status effizienter zu verwalten:

  • state.dir: Wo der Status gespeichert wird. Verwenden Sie eine schnelle SSD für die beste Leistung.
  • commit.interval.ms: Wie oft der Verarbeitungsfortschritt gespeichert wird.
  • cache.max.bytes.buffering: Maximaler Speicher für das Puffern von Datensätzen vor dem Commit.

Komprimieren, um zu beeindrucken: Nachrichtenkompression

Beim Umgang mit Burst-Nachrichten zählt jedes Byte. Lassen Sie uns die Kompression aktivieren:


kafka-streams.compression.type=lz4

LZ4 bietet ein gutes Gleichgewicht zwischen Kompressionsrate und Geschwindigkeit, perfekt für die Verarbeitung von Bursts.

Vertrauen, aber überprüfen: Testen und Überwachen

Jetzt, da wir unsere Anwendung optimiert haben, wie wissen wir, dass sie den Burst bewältigen kann? Hier kommen Stresstests und Überwachung ins Spiel.

Stresstests mit JMeter

Erstellen Sie einen JMeter-Testplan, um einen Burst von 50.000 Nachrichten zu simulieren:


<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2" properties="5.0" jmeter="5.4.1">
  <hashTree>
    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Kafka Burst Test" enabled="true">
      <stringProp name="TestPlan.comments"></stringProp>
      <boolProp name="TestPlan.functional_mode">false</boolProp>
      <boolProp name="TestPlan.tearDown_on_shutdown">true</boolProp>
      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
      <elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
        <collectionProp name="Arguments.arguments"/>
      </elementProp>
      <stringProp name="TestPlan.user_define_classpath"></stringProp>
    </TestPlan>
    <hashTree>
      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Kafka Producers" enabled="true">
        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
          <boolProp name="LoopController.continue_forever">false</boolProp>
          <stringProp name="LoopController.loops">50000</stringProp>
        </elementProp>
        <stringProp name="ThreadGroup.num_threads">10</stringProp>
        <stringProp name="ThreadGroup.ramp_time">1</stringProp>
        <boolProp name="ThreadGroup.scheduler">false</boolProp>
        <stringProp name="ThreadGroup.duration"></stringProp>
        <stringProp name="ThreadGroup.delay"></stringProp>
        <boolProp name="ThreadGroup.same_user_on_next_iteration">true</boolProp>
      </ThreadGroup>
      <hashTree>
        <JavaSampler guiclass="JavaTestSamplerGui" testclass="JavaSampler" testname="Java Request" enabled="true">
          <elementProp name="arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" enabled="true">
            <collectionProp name="Arguments.arguments">
              <elementProp name="kafka.topic" elementType="Argument">
                <stringProp name="Argument.name">kafka.topic</stringProp>
                <stringProp name="Argument.value">input-topic</stringProp>
                <stringProp name="Argument.metadata">=</stringProp>
              </elementProp>
              <elementProp name="kafka.key" elementType="Argument">
                <stringProp name="Argument.name">kafka.key</stringProp>
                <stringProp name="Argument.value">${__UUID()}</stringProp>
                <stringProp name="Argument.metadata">=</stringProp>
              </elementProp>
              <elementProp name="kafka.message" elementType="Argument">
                <stringProp name="Argument.name">kafka.message</stringProp>
                <stringProp name="Argument.value">Test message ${__threadNum}</stringProp>
                <stringProp name="Argument.metadata">=</stringProp>
              </elementProp>
            </collectionProp>
          </elementProp>
          <stringProp name="classname">com.example.KafkaProducerSampler</stringProp>
        </JavaSampler>
        <hashTree/>
      </hashTree>
    </hashTree>
  </hashTree>
</jmeterTestPlan>

Dieser Testplan simuliert 10 Threads, die jeweils 5.000 Nachrichten senden, insgesamt also 50.000 Burst-Nachrichten.

Überwachung mit Prometheus und Grafana

Richten Sie Prometheus und Grafana ein, um Ihre Quarkus-Anwendung zu überwachen. Fügen Sie Folgendes zu Ihrer application.properties hinzu:


quarkus.micrometer.export.prometheus.enabled=true
quarkus.micrometer.binder.kafka.enabled=true

Erstellen Sie ein Grafana-Dashboard, um Metriken wie Nachrichtendurchsatz, Verarbeitungszeit und Ressourcennutzung zu visualisieren.

Das große Finale: Alles zusammenfügen

Jetzt, da wir unsere Kafka Streams-Anwendung auf Quarkus optimiert, konfiguriert und getestet haben, lassen Sie uns sehen, wie sie in Aktion tritt:


@ApplicationScoped
public class BurstMessageProcessor {

    @Inject
    StreamsBuilder streamsBuilder;

    @Produces
    @ApplicationScoped
    public Topology buildTopology() {
        KStream inputStream = streamsBuilder.stream("input-topic");

        KStream processedStream = inputStream
            .filter((key, value) -> value != null)
            .mapValues(value -> value.toUpperCase())
            .peek((key, value) -> System.out.println("Processing: " + value));

        processedStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String())
            .withStreamPartitioner((topic, key, value, numPartitions) -> {
                return Math.abs(key.hashCode()) % numPartitions;
            }));

        return streamsBuilder.build();
    }
}

Diese von Quarkus unterstützte Kafka Streams-Anwendung ist jetzt bereit, die 50.000 Burst-Nachrichten wie ein Champion zu verarbeiten!

Zusammenfassung: Gelerntes

Die Verarbeitung von Burst-Nachrichten in Kafka Streams auf Quarkus ist keine leichte Aufgabe, aber mit den richtigen Techniken ist sie durchaus machbar. Hier ist, was wir gelernt haben:

  • Parallelität ist der Schlüssel: Verwenden Sie mehrere Threads und Partitionen, um die Last zu verteilen.
  • Puffer klug: Konfigurieren Sie Ihre Puffer, um den Burst auszugleichen.
  • Implementieren Sie Backpressure: Lassen Sie Ihre Anwendung nicht mehr übernehmen, als sie bewältigen kann.
  • Optimieren Sie Statusspeicher: Schnelles, effizientes Statusmanagement ist entscheidend für die Verarbeitung mit hohem Durchsatz.
  • Nachrichten komprimieren: Sparen Sie Bandbreite und Rechenleistung mit intelligenter Kompression.
  • Testen und überwachen: Überprüfen Sie immer Ihre Optimierungen und behalten Sie die Leistung im Auge.

Denken Sie daran, dass die Verarbeitung von Burst-Nachrichten ebenso eine Kunst wie eine Wissenschaft ist. Experimentieren, testen und optimieren Sie weiter. Ihre Kafka Streams-Anwendung wird es Ihnen danken, und ebenso Ihre Benutzer, wenn sie blitzschnelle Verarbeitung auch in den geschäftigsten Zeiten erleben.

Gehen Sie nun hinaus und zähmen Sie diese Nachrichten-Bursts wie der Streaming-Superheld, der Sie sind!

"In der Welt der Stream-Verarbeitung geht es nicht darum, wie hart man zuschlagen kann. Es geht darum, wie hart man getroffen werden kann und trotzdem weitermacht." - Rocky Balboa (wenn er ein Dateningenieur wäre)

Viel Spaß beim Streamen!