Programmering

Bygget til realtid: Big data messaging med Apache Kafka, del 1

Da big data-bevægelsen startede, var det mest fokuseret på batchbehandling. Distribuerede datalagrings- og forespørgselsværktøjer som MapReduce, Hive og Pig var alle designet til at behandle data i batches snarere end kontinuerligt. Virksomheder vil køre flere job hver nat for at udtrække data fra en database, derefter analysere, transformere og til sidst gemme dataene. For nylig har virksomheder opdaget styrken ved at analysere og behandle data og begivenheder som de sker, ikke kun en gang hvert par timer. De fleste traditionelle messaging-systemer skaleres dog ikke op til håndtering af store data i realtid. Så ingeniører hos LinkedIn byggede og open source Apache Kafka: en distribueret messaging-ramme, der opfylder kravene fra big data ved at skalere på råvarehardware.

I løbet af de sidste par år er Apache Kafka opstået for at løse en række brugssager. I det enkleste tilfælde kan det være en simpel buffer til lagring af applikationslogfiler. Kombineret med en teknologi som Spark Streaming kan den bruges til at spore dataændringer og tage handling på disse data, før den gemmes til en endelig destination. Kafkas forudsigelige tilstand gør det til et kraftfuldt værktøj til at opdage svindel, såsom at kontrollere gyldigheden af ​​en kreditkorttransaktion, når det sker, og ikke vente på batchbehandling timer senere.

Denne tutorial i to dele introducerer Kafka, startende med, hvordan du installerer og kører det i dit udviklingsmiljø. Du får et overblik over Kafkas arkitektur efterfulgt af en introduktion til at udvikle et out-of-the-box Apache Kafka messaging-system. Endelig bygger du en brugerdefineret producent / forbrugerapplikation, der sender og forbruger meddelelser via en Kafka-server. I anden halvdel af vejledningen lærer du, hvordan du partitionerer og grupperer meddelelser, og hvordan du styrer, hvilke meddelelser en Kafka-forbruger vil forbruge.

Hvad er Apache Kafka?

Apache Kafka er et messaging-system, der er designet til at skalere til store data. Svarende til Apache ActiveMQ eller RabbitMq gør Kafka det muligt for applikationer, der er bygget på forskellige platforme, at kommunikere via asynkron meddelelsesoverførsel. Men Kafka adskiller sig fra disse mere traditionelle messaging-systemer på nøglemåder:

  • Det er designet til at skalere vandret ved at tilføje flere handelsservere.
  • Det giver meget højere gennemløb for både producent- og forbrugerprocesser.
  • Det kan bruges til at understøtte både batch- og realtidsbrugssager.
  • Det understøtter ikke JMS, Java's meddelelsesorienterede middleware API.

Apache Kafkas arkitektur

Før vi udforsker Kafkas arkitektur, skal du kende dens grundlæggende terminologi:

  • EN producent er en proces, der kan offentliggøre en besked til et emne.
  • -en forbruger er en proces, der kan abonnere på et eller flere emner og forbruge meddelelser, der er offentliggjort til emner.
  • EN emnekategori er navnet på det feed, som meddelelser offentliggøres til.
  • EN mægler er en proces, der kører på en enkelt maskine.
  • EN klynge er en gruppe mæglere, der arbejder sammen.

Apache Kafkas arkitektur er meget enkel, hvilket kan resultere i bedre ydeevne og gennemstrømning i nogle systemer. Hvert emne i Kafka er som en simpel logfil. Når en producent offentliggør en besked, tilføjer Kafka-serveren den til slutningen af ​​logfilen for det givne emne. Serveren tildeler også en forskudt, som er et nummer, der bruges til permanent at identificere hver besked. Efterhånden som antallet af beskeder vokser, øges værdien af ​​hver forskydning; for eksempel hvis producenten offentliggør tre meddelelser, kan den første muligvis få en forskydning på 1, den anden en forskydning på 2 og den tredje en forskydning på 3.

Når Kafka-forbrugeren først starter, sender den en pull-anmodning til serveren og beder om at hente eventuelle meddelelser for et bestemt emne med en offsetværdi højere end 0. Serveren kontrollerer logfilen for dette emne og returnerer de tre nye meddelelser . Forbrugeren behandler meddelelserne og sender derefter en anmodning om beskeder med en forskydning højere end 3 osv.

I Kafka er klienten ansvarlig for at huske antallet af forskydninger og hente meddelelser. Kafka-serveren sporer ikke eller styrer beskedforbrug. Som standard holder en Kafka-server en besked i syv dage. En baggrundstråd på serveren kontrollerer og sletter meddelelser, der er syv dage eller ældre. En forbruger kan få adgang til meddelelser, så længe de er på serveren. Det kan læse en besked flere gange og endda læse beskeder i modsat rækkefølge af modtagelsen. Men hvis forbrugeren ikke henter beskeden, inden de syv dage er gået, vil den savne denne besked.

Kafka-benchmarks

Produktionsbrug af LinkedIn og andre virksomheder har vist, at Apache Kafka med korrekt konfiguration er i stand til at behandle hundreder af gigabyte data dagligt. I 2011 brugte tre LinkedIn-ingeniører benchmarktest for at demonstrere, at Kafka kunne opnå meget højere gennemstrømning end ActiveMQ og RabbitMQ.

Apache Kafka hurtig opsætning og demo

Vi bygger en brugerdefineret applikation i denne vejledning, men lad os starte med at installere og teste en Kafka-forekomst hos en out-of-the-box producent og forbruger.

  1. Besøg Kafka-download-siden for at installere den nyeste version (0,9 i skrivende stund).
  2. Uddrag binærfiler i en software / kafka folder. For den aktuelle version er det software / kafka_2.11-0.9.0.0.
  3. Skift dit aktuelle bibliotek for at pege på den nye mappe.
  4. Start Zookeeper-serveren ved at udføre kommandoen: bin / zookeeper-server-start.sh config / zookeeper.properties.
  5. Start Kafka-serveren ved at udføre: bin / kafka-server-start.sh config / server.properties.
  6. Opret et testemne, som du kan bruge til testning: bin / kafka-topics.sh --create - zookeeper localhost: 2181 - replikeringsfaktor 1 - partitioner 1 --topic javaworld.
  7. Start en simpel konsolforbruger, der kan forbruge meddelelser, der er offentliggjort til et givet emne, f.eks javaworld: bin / kafka-console-consumer.sh - zookeeper localhost: 2181 --topic javaworld - fra start.
  8. Start en simpel producentkonsol, der kan offentliggøre meddelelser til testemnet: bin / kafka-console-producer.sh - mæglerliste localhost: 9092 --topic javaworld.
  9. Prøv at skrive en eller to meddelelser i producentkonsollen. Dine meddelelser skal vises i forbrugerkonsollen.

Eksempel på applikation med Apache Kafka

Du har set, hvordan Apache Kafka fungerer ud af kassen. Lad os derefter udvikle en brugerdefineret producent / forbrugerapplikation. Producenten henter brugerinput fra konsollen og sender hver ny linje som en besked til en Kafka-server. Forbrugeren henter meddelelser for et givet emne og udskriver dem til konsollen. Producent- og forbrugerkomponenterne i dette tilfælde er dine egne implementeringer af kafka-console-producer.sh og kafka-console-consumer.sh.

Lad os starte med at oprette en Producent.java klasse. Denne klientklasse indeholder logik til at læse brugerinput fra konsollen og sende denne input som en besked til Kafka-serveren.

Vi konfigurerer producenten ved at oprette et objekt fra java.util.Ejendomme klasse og indstilling af dens egenskaber. Klassen ProducerConfig definerer alle de forskellige tilgængelige egenskaber, men Kafkas standardværdier er tilstrækkelige til de fleste anvendelser. For standardkonfigurationen behøver vi kun at indstille tre obligatoriske egenskaber:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) angiver en liste over værts: portpar, der bruges til at etablere de oprindelige forbindelser til Kakfa-klyngen i vært1: port1, vært2: port2, ... format. Selvom vi har mere end en mægler i vores Kafka-klynge, behøver vi kun at specificere værdien af ​​den første mægler vært: port. Kafka-klienten bruger denne værdi til at foretage et opdagelsesopkald hos mægleren, som returnerer en liste over alle mæglere i klyngen. Det er en god ide at angive mere end en mægler i BOOTSTRAP_SERVERS_CONFIG, så hvis den første mægler er nede, kan klienten prøve andre mæglere.

Kafka-serveren forventer beskeder i byte [] -tast, byte [] -værdi format. I stedet for at konvertere hver nøgle og værdi tillader Kafka's klientsidesbibliotek os at bruge venligere typer som Snor og int til afsendelse af beskeder. Biblioteket konverterer disse til den passende type. Eksempel-appen har f.eks. Ikke en meddelelsesspecifik nøgle, så vi bruger det nul for nøglen. For værdien bruger vi en Snor, som er de data, der er indtastet af brugeren på konsollen.

For at konfigurere beskednøgle, vi indstiller en værdi på KEY_SERIALIZER_CLASS_CONFIG på den org.apache.kafka.common.serialization.ByteArraySerializer. Dette fungerer fordi nul behøver ikke konverteres til byte []. Til beskedværdi, vi satte os VALUE_SERIALIZER_CLASS_CONFIG på den org.apache.kafka.common.serialization.StringSerializer, fordi den klasse ved, hvordan man konverterer en Snor ind i en byte [].

Brugerdefinerede nøgle / værdi objekter

Svarende til StringSerializer, Kafka leverer serialisatorer til andre primitiver såsom int og lang. For at kunne bruge et brugerdefineret objekt til vores nøgle eller værdi skal vi oprette en klasseimplementering org.apache.kafka.common.serialization.Serializer. Vi kunne derefter tilføje logik for at serieisere klassen til byte []. Vi bliver også nødt til at bruge en tilsvarende deserializer i vores forbrugerkode.

Kafka-producenten

Efter påfyldning af Ejendomme klasse med de nødvendige konfigurationsegenskaber, kan vi bruge den til at oprette et objekt af KafkaProducer. Hver gang vi vil sende en besked til Kafka-serveren efter det, opretter vi et objekt af Producentoptagelse og ring til KafkaProducer's sende() metode med den post for at sende beskeden. Det Producentoptagelse tager to parametre: navnet på det emne, som meddelelsen skal offentliggøres til, og den aktuelle meddelelse. Glem ikke at ringe til Producer.close () metode, når du er færdig med at bruge producenten:

Notering 1. KafkaProducer

 offentlig klasse Producer {privat statisk scanner i; public static void main (String [] argv) throw Exception {if (argv.length! = 1) {System.err.println ("Angiv 1 parametre"); System.exit (-1); } Streng topicName = argv [0]; i = ny scanner (System.in); System.out.println ("Indtast besked (skriv exit for at afslutte)"); // Konfigurer producentegenskaberne configProperties = nye egenskaber (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer producer = ny KafkaProducer (configProperties); String line = in.nextLine (); mens (! line.equals ("exit")) {ProducerRecord rec = new ProducerRecord (topicName, line); producer.send (rec); linje = in.nextLine (); } in.close (); producer.close (); }} 

Konfiguration af beskedforbrugeren

Dernæst opretter vi en simpel forbruger, der abonnerer på et emne. Hver gang en ny besked offentliggøres til emnet, læser den beskeden og udskriver den til konsollen. Forbrugerkoden svarer meget til producentkoden. Vi starter med at skabe et objekt af java.util.Ejendomme, indstille dens forbrugerspecifikke egenskaber og derefter bruge den til at oprette et nyt objekt af KafkaConsumer. Klassen ConsumerConfig definerer alle de egenskaber, vi kan indstille. Der er kun fire obligatoriske egenskaber:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (værdi. Deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Ligesom vi gjorde for producentklassen, bruger vi det BOOTSTRAP_SERVERS_CONFIG for at konfigurere værts- / portparene til forbrugerklassen. Denne konfiguration lader os etablere de oprindelige forbindelser til Kakfa-klyngen i vært1: port1, vært2: port2, ... format.

Som jeg tidligere har bemærket, forventer Kafka-serveren beskeder i byte [] nøgle og byte [] værdi formater, og har sin egen implementering til serieering af forskellige typer til byte []. Ligesom vi gjorde med producenten, skal vi på forbrugersiden bruge en brugerdefineret deserializer til at konvertere byte [] tilbage til den relevante type.

I tilfældet med eksempelapplikationen ved vi, at producenten bruger ByteArraySerializer for nøglen og StringSerializer for værdien. På klientsiden skal vi derfor bruge org.apache.kafka.common.serialization.ByteArrayDeserializer for nøglen og org.apache.kafka.common.serialization.StringDeserializer for værdien. Indstilling af disse klasser som værdier for KEY_DESERIALIZER_CLASS_CONFIG og VALUE_DESERIALIZER_CLASS_CONFIG vil gøre det muligt for forbrugeren at deserialisere byte [] kodede typer sendt af producenten.

Endelig er vi nødt til at indstille værdien af GROUP_ID_CONFIG. Dette skal være et gruppenavn i strengformat. Jeg forklarer mere om denne konfiguration om et øjeblik. For øjeblikket skal du bare se på Kafka-forbrugeren med de fire obligatoriske egenskaber:

$config[zx-auto] not found$config[zx-overlay] not found