Programmering

Bygget til realtid: Big data messaging med Apache Kafka, del 2

I første halvdel af denne JavaWorld-introduktion til Apache Kafka udviklede du et par små producent- / forbrugerapplikationer ved hjælp af Kafka. Fra disse øvelser skal du være fortrolig med det grundlæggende i Apache Kafka messaging-systemet. I denne anden halvdel lærer du, hvordan du bruger partitioner til at distribuere belastning og skalere din applikation vandret, og håndtere op til millioner af beskeder om dagen. Du lærer også, hvordan Kafka bruger beskedforskydninger til at spore og administrere kompleks beskedbehandling, og hvordan du beskytter dit Apache Kafka-beskedsystem mod fejl, hvis en forbruger skulle gå ned. Vi udvikler eksempelapplikationen fra del 1 til både cases om public-subscribe og point-to-point use.

Skillevægge i Apache Kafka

Emner i Kafka kan opdeles i partitioner. For eksempel, mens du opretter et emne ved navn Demo, kan du konfigurere det til at have tre partitioner. Serveren opretter tre logfiler, en til hver af demopartitionerne. Når en producent offentliggjorde en besked til emnet, tildelte den et partitions-id til den besked. Serveren tilføjede derefter kun meddelelsen til logfilen for den partition.

Hvis du derefter startede to forbrugere, tildelte serveren muligvis partitioner 1 og 2 til den første forbruger og partition 3 til den anden forbruger. Hver forbruger læste kun fra sine tildelte partitioner. Du kan se demo-emnet konfigureret til tre partitioner i figur 1.

For at udvide scenariet kan du forestille dig en Kafka-klynge med to mæglere, der ligger i to maskiner. Når du partitionerede demo-emnet, ville du konfigurere det til at have to partitioner og to replikaer. For denne type konfiguration tildelte Kafka-serveren de to partitioner til de to mæglere i din klynge. Hver mægler ville være leder for en af ​​partitionerne.

Når en producent offentliggjorde en besked, ville den gå til partitionslederen. Lederen ville tage beskeden og tilføje den til logfilen på den lokale maskine. Den anden mægler ville passivt replikere den forpligtende log til sin egen maskine. Hvis partitionslederen gik ned, ville den anden mægler blive den nye leder og begynde at betjene kundeanmodninger. På samme måde, når en forbruger sendte en anmodning til en partition, ville denne anmodning først gå til partitionslederen, som ville returnere de anmodede meddelelser.

Fordele ved partitionering

Overvej fordelene ved at opdele et Kafka-baseret messaging-system:

  1. Skalerbarhed: I et system med kun en partition lagres meddelelser, der er offentliggjort til et emne, i en logfil, der findes på en enkelt maskine. Antallet af meddelelser for et emne skal passe ind i en enkelt logfil, og størrelsen på de gemte meddelelser kan aldrig være mere end maskinens diskplads. Ved at opdele et emne kan du skalere dit system ved at gemme meddelelser på forskellige maskiner i en klynge. Hvis du f.eks. Vil gemme 30 gigabyte (GB) meddelelser til demo-emnet, kan du opbygge en Kafka-klynge på tre maskiner, hver med 10 GB diskplads. Derefter ville du konfigurere emnet til at have tre partitioner.
  2. Balance mellem serverbelastning: At have flere partitioner giver dig mulighed for at sprede meddelelsesanmodninger på tværs af mæglere. For eksempel, hvis du havde et emne, der behandlede 1 million meddelelser pr. Sekund, kunne du opdele det i 100 partitioner og tilføje 100 mæglere til din klynge. Hver mægler ville være leder for en enkelt partition, der er ansvarlig for at svare på kun 10.000 kundeanmodninger i sekundet.
  3. Balance mellem forbruger og belastning: På samme måde som serverbelastningsbalancering kan hosting af flere forbrugere på forskellige maskiner dig sprede forbrugerbelastningen. Lad os sige, at du ville forbruge 1 million meddelelser pr. Sekund fra et emne med 100 partitioner. Du kan oprette 100 forbrugere og køre dem parallelt. Kafka-serveren tildelte en enkelt partition til hver af forbrugerne, og hver forbruger ville behandle 10.000 meddelelser parallelt. Da Kafka kun tildeler hver partition til en forbruger, vil hver meddelelse inden for partitionen blive fortæret i rækkefølge.

To måder at opdele på

Producenten er ansvarlig for at beslutte, hvilken partition en meddelelse skal gå til. Producenten har to muligheder for at kontrollere denne opgave:

  • Brugerdefineret partitioner: Du kan oprette en klasse, der implementerer org.apache.kafka.clients.producer.Partitioner interface. Denne skik Partitioner implementerer forretningslogikken for at afgøre, hvor beskeder sendes.
  • Standardpartitioner: Hvis du ikke opretter en brugerdefineret partitionerklasse, skal du som standard org.apache.kafka.clients.producer.internals.DefaultPartitioner klasse vil blive brugt. Standardpartitioneren er god nok i de fleste tilfælde og giver tre muligheder:
    1. brugervejledning: Når du opretter en Producentoptagelse, brug den overbelastede konstruktør ny ProducerRecord (topicName, partitionId, messageKey, message) for at specificere et partitions-id.
    2. Hashing (lokalitetsfølsom): Når du opretter en Producentoptagelse, angiv en beskednøgleved at ringe nyt ProducerRecord (topicName, messageKey, message). Standardpartitioner vil bruge hash for nøglen til at sikre, at alle meddelelser for den samme nøgle går til samme producent. Dette er den nemmeste og mest almindelige tilgang.
    3. Sprøjtning (tilfældig belastningsafbalancering): Hvis du ikke vil kontrollere, hvilke partitionsbeskeder der går til, skal du blot ringe nyt ProducerRecord (topicName, besked) at oprette din Producentoptagelse. I dette tilfælde vil partitioneren sende meddelelser til alle partitionerne på rund-robin måde, hvilket sikrer en afbalanceret serverbelastning.

Partitionering af et Apache Kafka-program

Til det simple producent / forbrugereksempel i del 1 brugte vi a Standardpartitioner. Nu prøver vi at oprette en brugerdefineret partitioner i stedet. Lad os i dette eksempel antage, at vi har et detailwebsted, som forbrugerne kan bruge til at bestille produkter overalt i verden. Baseret på brug ved vi, at de fleste forbrugere er i enten USA eller Indien. Vi vil opdele vores ansøgning om at sende ordrer fra USA eller Indien til deres egne respektive forbrugere, mens ordrer fra andre steder vil gå til en tredje forbruger.

For at starte opretter vi en CountryPartitioner der implementerer org.apache.kafka.clients.producer.Partitioner interface. Vi skal implementere følgende metoder:

  1. Kafka ringer konfigurer () når vi initialiserer Partitioner klasse med en Kort af konfigurationsegenskaber. Denne metode initialiserer funktioner, der er specifikke for applikationens forretningslogik, såsom at oprette forbindelse til en database. I dette tilfælde vil vi have en ret generisk partitioner, der tager landenavn som en ejendom. Vi kan derefter bruge configProperties.put ("partitions.0", "USA") for at kortlægge strømmen af ​​meddelelser til partitioner. I fremtiden kan vi bruge dette format til at ændre, hvilke lande der får deres egen partition.
  2. Det Producent API-opkald skillevæg() en gang for hver besked. I dette tilfælde bruger vi den til at læse beskeden og analysere landets navn fra meddelelsen. Hvis navnet på landet er i countryToPartitionMap, det vender tilbage partitionId gemt i Kort. Hvis ikke, vil det hash værdien af ​​landet og bruge det til at beregne, hvilken partition det skal gå til.
  3. Vi ringer tæt() at lukke partitioneren. Brug af denne metode sikrer, at alle ressourcer, der er erhvervet under initialiseringen, renses under nedlukning.

Bemærk, at når Kafka ringer konfigurer (), vil Kafka-producenten videregive alle de egenskaber, som vi har konfigureret til producenten, til Partitioner klasse. Det er vigtigt, at vi kun læser de egenskaber, der starter med skillevægge., analyser dem for at få partitionId, og gem ID'et i countryToPartitionMap.

Nedenfor er vores brugerdefinerede implementering af Partitioner interface.

Notering 1. CountryPartitioner

 offentlig klasse CountryPartitioner implementerer Partitioner {privat statisk kort countryToPartitionMap; offentlig ugyldig konfiguration (Map configs) {System.out.println ("Inside CountryPartitioner.configure" + configs); countryToPartitionMap = ny HashMap (); for (Map.Entry entry: configs.entrySet ()) {if (entry.getKey (). startsWith ("partitions.")) {String keyName = entry.getKey (); Strengværdi = (String) entry.getValue (); System.out.println (keyName.substring (11)); int paritionId = Integer.parseInt (keyName.substring (11)); countryToPartitionMap.put (værdi, paritionId); }}} offentlig int-partition (strengemne, objektnøgle, byte [] keyBytes, objektværdi, byte [] valueBytes, klyngeklynge) {Liste partitioner = cluster.availablePartitionsForTopic (emne); String valueStr = (String) værdi; String countryName = ((String) value) .split (":") [0]; hvis (countryToPartitionMap.containsKey (countryName)) {// Hvis landet er kortlagt til en bestemt partition, returneres det countryToPartitionMap.get (countryName); } ellers {// Hvis intet land er kortlagt til en bestemt partition, skal du distribuere mellem de resterende partitioner int noOfPartitions = cluster.topics (). størrelse (); return value.hashCode ()% noOfPartitions + countryToPartitionMap.size (); }} offentlig tomrum lukket () {}} 

Det Producent klasse i liste 2 (nedenfor) ligner meget vores enkle producent fra del 1 med to ændringer markeret med fed skrift:

  1. Vi indstiller en konfigurationsegenskab med en nøgle svarende til værdien af ProducerConfig.PARTITIONER_CLASS_CONFIG, der matcher det fuldt kvalificerede navn på vores CountryPartitioner klasse. Vi sætter også landenavn til partitionIdog kortlægger dermed de egenskaber, som vi vil overføre til CountryPartitioner.
  2. Vi passerer en forekomst af en klasse, der implementerer org.apache.kafka.clients.producer.Callback interface som et andet argument til producer.send () metode. Kafka-klienten vil ringe til sin onCompletion () metode, når en meddelelse er offentliggjort med vedhæftning af en RecordMetadata objekt. Vi vil være i stand til at bruge dette objekt til at finde ud af, hvilken partition en meddelelse blev sendt til, samt den forskydning, der er tildelt den offentliggjorte meddelelse.

Notering 2. En opdelt producent

 offentlig klasse Producer {privat statisk scanner i; public static void main (String [] argv) throw Exception {if (argv.length! = 1) {System.err.println ("Angiv 1 parametre"); System.exit (-1); } Streng topicName = argv [0]; i = ny scanner (System.in); System.out.println ("Indtast besked (skriv exit for at afslutte)"); // Konfigurer producentegenskaberne configProperties = nye egenskaber (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");  configProperties.put (ProducerConfig.PARTITIONER_CLASS_CONFIG, CountryPartitioner.class.getCanonicalName ()); configProperties.put ("partition.1", "USA"); configProperties.put ("partition.2", "Indien");  org.apache.kafka.clients.producer.Producer producer = ny KafkaProducer (configProperties); String line = in.nextLine (); while (! line.equals ("exit")) {ProducerRecord rec = new ProducerRecord (topicName, null, line); producer.send (rec, ny Callback () {public void onCompletion (RecordMetadata metadata, Exception undtagelse) {System.out.println ("Besked sendt til emne ->" + metadata.topic () + ", parition->" + metadata.partition () + "lagret ved offset->" + metadata.offset ()); ; }}); linje = in.nextLine (); } in.close (); producer.close (); }} 

Tildeling af partitioner til forbrugere

Kafka-serveren garanterer, at en partition er tildelt kun en forbruger og derved garanterer rækkefølgen af ​​meddelelsesforbrug. Du kan tildele en partition manuelt eller få den tildelt automatisk.

Hvis din forretningslogik kræver mere kontrol, skal du manuelt tildele partitioner. I dette tilfælde vil du bruge KafkaConsumer.assign () at sende en liste over partitioner, som hver forbruger var interesseret i, til Kakfa-serveren.

At have partitioner tildelt automatisk er standard og mest almindelige valg. I dette tilfælde tildeler Kafka-serveren en partition til hver forbruger og tildeler partitioner igen, så de kan skaleres til nye forbrugere.

Sig, at du opretter et nyt emne med tre partitioner. Når du starter den første forbruger til det nye emne, tildeler Kafka alle tre partitioner til den samme forbruger. Hvis du derefter starter en anden forbruger, tildeler Kafka alle partitionerne igen og tildeler en partition til den første forbruger og de resterende to partitioner til den anden forbruger. Hvis du tilføjer en tredje forbruger, tildeler Kafka partitionerne igen, så hver forbruger tildeles en enkelt partition. Endelig, hvis du starter fjerde og femte forbruger, så vil tre af forbrugerne have en tildelt partition, men de andre modtager ingen meddelelser. Hvis en af ​​de første tre partitioner går ned, bruger Kafka den samme partitioneringslogik til at omfordele forbrugerens partition til en af ​​de ekstra forbrugere.

$config[zx-auto] not found$config[zx-overlay] not found