Programmering

Sådan oprettes stateful streaming-applikationer med Apache Flink

Fabian Hueske er kommitter og PMC-medlem af Apache Flink-projektet og medstifter af Data Artisans.

Apache Flink er en ramme til implementering af stateful stream-behandlingsapplikationer og kørsel af dem i skala i en computerklynge. I en tidligere artikel undersøgte vi, hvad statusfuld strømbehandling er, hvilke brugssager den adresserer, og hvorfor du skal implementere og køre dine streamingapplikationer med Apache Flink.

I denne artikel vil jeg præsentere eksempler på to almindelige anvendelsestilfælde af stateful stream-behandling og diskutere, hvordan de kan implementeres med Flink. Den første anvendelse er begivenhedsdrevne applikationer, dvs. applikationer, der indtager kontinuerlige strømme af begivenheder og anvender en vis forretningslogik til disse begivenheder. Det andet er streaming analytics use case, hvor jeg vil præsentere to analytiske forespørgsler implementeret med Flinks SQL API, som samler streaming af data i realtid. Vi hos Data Artisans leverer kildekoden til alle vores eksempler i et offentligt GitHub-lager.

Før vi dykker ned i detaljerne i eksemplerne, introducerer jeg begivenhedsstrømmen, der indtages af eksempelapplikationerne, og forklarer, hvordan du kan køre den kode, vi leverer.

En strøm af taxa-begivenheder

Vores eksempler på applikationer er baseret på et offentligt datasæt om taxikørsel, der skete i New York City i 2013. Arrangørerne af 2015 DEBS (ACM International Conference on Distributed Event-Based Systems) Grand Challenge omarrangerede det originale datasæt og konverterede det til en enkelt CSV-fil, hvorfra vi læser de følgende ni felter.

  • Medaljon - en MD5 sum id for taxien
  • Hack_license - en MD5-sum-id for taxalicensen
  • Pickup_datetime - tidspunktet, hvor passagerer blev afhentet
  • Dropoff_datetime - det tidspunkt, hvor passagererne blev afleveret
  • Pickup_longitude - længden af ​​afhentningsstedet
  • Pickup_latitude - breddegraden for afhentningsstedet
  • Afgivelseslængde - længden på afleveringsstedet
  • Dropoff_latitude - breddegraden for afleveringsstedet
  • Total_beløb - samlet betalt i dollars

CSV-filen gemmer posterne i stigende rækkefølge efter deres attribut for afleveringstid. Derfor kan filen behandles som en ordnet log over begivenheder, der blev offentliggjort, da en rejse sluttede. For at køre eksemplerne, som vi leverer på GitHub, skal du downloade datasættet for DEBS-udfordringen fra Google Drev.

Alle eksempler på applikationer læser i rækkefølge CSV-filen og indtager den som en strøm af taxiturbegivenheder. Derefter behandler applikationerne begivenhederne ligesom enhver anden strøm, dvs. som en strøm, der indtages fra et logbaseret public-subscribe-system, såsom Apache Kafka eller Kinesis. Faktisk er læsning af en fil (eller enhver anden type vedvarende data) og behandling af den som en stream en hjørnesten i Flinks tilgang til at samle batch- og stream-behandling.

Kørsel af Flink-eksemplerne

Som tidligere nævnt offentliggjorde vi kildekoden til vores eksempelapplikationer i et GitHub-arkiv. Vi opfordrer dig til at forkaste og klone lageret. Eksemplerne kan let udføres inden for dit IDE-valg; du behøver ikke at konfigurere og konfigurere en Flink-klynge til at køre dem. Først skal du importere kildekoden til eksemplerne som et Maven-projekt. Udfør derefter hovedklassen i en applikation og angiv datafilens lagerplacering (se ovenfor for linket til download af data) som et programparameter.

Når du har startet en applikation, starter den en lokal, integreret Flink-forekomst inde i applikationens JVM-proces og sender applikationen til at udføre den. Du vil se en masse logopgørelser, mens Flink starter, og opgavens opgaver planlægges. Når applikationen kører, vil dens output blive skrevet til standardoutputtet.

Opbygning af en begivenhedsdrevet applikation i Flink

Lad os nu diskutere vores første brugssag, som er en begivenhedsdrevet applikation. Begivenhedsdrevne applikationer indtager strømme af begivenheder, udfører beregninger, når begivenhederne modtages, og kan udsende nye begivenheder eller udløse eksterne handlinger. Flere hændelsesdrevne applikationer kan sammensættes ved at forbinde dem sammen via hændelseslogsystemer, svarende til hvordan store systemer kan sammensættes fra mikrotjenester. Begivenhedsdrevne applikationer, hændelseslogfiler og snapshot af applikationstilstand (kendt som savepoints i Flink) omfatter et meget kraftigt designmønster, fordi du kan nulstille deres tilstand og afspille deres input for at komme sig efter en fejl, rette en fejl eller migrere en applikation til en anden klynge.

I denne artikel vil vi undersøge en begivenhedsdrevet applikation, der bakker en service, der overvåger arbejdstiden for taxachauffører. I 2016 besluttede NYC Taxi and Limousine Commission at begrænse taxichaufførernes arbejdstid til 12 timers skift og kræve en pause på mindst otte timer inden næste skift kan startes. Et skift starter med begyndelsen af ​​den første tur. Fra da af kan en chauffør starte nye forlystelser inden for et vindue på 12 timer. Vores applikation sporer chaufførernes forlystelser, markerer sluttidspunktet for deres 12-timers vindue (dvs. det tidspunkt, hvor de kan starte den sidste tur) og markerer forlystelser, der overtrådte forordningen. Du kan finde den fulde kildekode til dette eksempel i vores GitHub-lager.

Vores applikation er implementeret med Flinks DataStream API og en KeyedProcessFunction. DataStream API er en funktionel API og baseret på begrebet typede datastrømme. EN DataStream er den logiske repræsentation af en strøm af begivenheder af typen T. En strøm behandles ved at anvende en funktion til den, der producerer en anden datastrøm, muligvis af en anden type. Flink behandler streams parallelt ved at distribuere begivenheder til stream partitioner og anvende forskellige forekomster af funktioner til hver partition.

Følgende kodestykke viser strømmen på højt niveau i vores overvågningsapplikation.

// indtage strøm af taxaturer.

DataStream rides = TaxiRides.getRides (env, inputPath);

DataStream meddelelser = forlystelser

// partitionsstrøm ved kørekort-id

.keyBy (r -> r.licenseId)

// overvåge ridehændelser og generere underretninger

.proces (ny MonitorWorkTime ());

// udskriv meddelelser

notifications.print ();

Applikationen begynder at indtage en strøm af taxiturbegivenheder. I vores eksempel læses begivenhederne fra en tekstfil, parses og gemmes i TaxiRide POJO-objekter. En applikation fra den virkelige verden indtager typisk begivenhederne fra en meddelelseskø eller hændelseslog, såsom Apache Kafka eller Pravega. Det næste trin er at nøgle TaxiRide begivenheder af licensId af chaufføren. Det keyBy operation partitionerer strømmen på det deklarerede felt, således at alle begivenheder med den samme nøgle behandles af den samme parallelle forekomst af følgende funktion. I vores tilfælde deler vi på licensId felt, fordi vi vil overvåge arbejdstiden for hver enkelt chauffør.

Dernæst anvender vi MonitorWorkTime funktion på den partitionerede TaxiRide begivenheder. Funktionen sporer kørsler pr. Chauffør og overvåger deres skift og pausetider. Den udsender begivenheder af typen Tuple2, hvor hver tuple repræsenterer en meddelelse bestående af førerens licens-id og en meddelelse. Endelig udsender vores applikation meddelelserne ved at udskrive dem til standardoutputtet. En applikation fra den virkelige verden ville skrive underretningerne til en ekstern besked eller et lagringssystem, som Apache Kafka, HDFS eller et databasesystem, eller udløse et eksternt opkald for straks at skubbe dem ud.

Nu hvor vi har drøftet den samlede strøm af applikationen, skal vi se på MonitorWorkTime funktion, som indeholder det meste af applikationens faktiske forretningslogik. Det MonitorWorkTime funktion er en stateful KeyedProcessFunction der indtager TaxiRide begivenheder og udsender Tuple2 optegnelser. Det KeyedProcessFunction interface har to metoder til at behandle data: processElement () og onTimer (). Det processElement () metode kaldes for hver ankomne begivenhed. Det onTimer () metode kaldes, når en tidligere registreret timer aktiveres. Følgende uddrag viser skelet af MonitorWorkTime funktion og alt, hvad der er erklæret uden for behandlingsmetoderne.

offentlig statisk klasse MonitorWorkTime

udvider KeyedProcessFunction {

// tidskonstanter i millisekunder

privat statisk endelig lang ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 timer

privat statisk endelig lang REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 timer

privat statisk endelig lang CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 timer

privat forbigående DateTimeFormatter formater;

// tilstandshåndtag for at gemme starttidspunktet for et skift

ValueState shiftStart;

@Override

offentligt tomrum åbent (Konfigurationskonf.) {

// registrer tilstandshåndtag

shiftStart = getRuntimeContext (). getState (

ny ValueStateDescriptor (“shiftStart”, Types.LONG));

// initialiser tidsformatering

this.formatter = DateTimeFormat.forPattern (“åååå-MM-dd HH: mm: ss”);

  }

// processElement () og onTimer () diskuteres detaljeret nedenfor.

}

Funktionen erklærer et par konstanter for tidsintervaller i millisekunder, en tidsformatering og et tilstandshåndtag for tastet tilstand, der styres af Flink. Administreret tilstand kontrolleres periodisk og gendannes automatisk i tilfælde af en fejl. Tastetilstand er organiseret pr. Nøgle, hvilket betyder, at en funktion opretholder en værdi pr. Håndtag og nøgle. I vores tilfælde er MonitorWorkTime funktion opretholder en Lang værdi for hver nøgle, dvs. for hver licensId. Det shiftStart tilstand gemmer starttidspunktet for en chaufførs skift. Tilstandshåndtaget initialiseres i åben() metode, som kaldes en gang, før den første begivenhed behandles.

Lad os nu se på processElement () metode.

@Override

offentlig ugyldighedsprocesElement (

TaxiRide ride,

Kontekst ctx,

Samler ud) kaster undtagelse {

// slå starttidspunktet for det sidste skift op

Lange startTs = shiftStart.value ();

hvis (startTs == null ||

startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

// dette er den første tur i et nyt skift.

startTs = ride.pickUpTime;

shiftStart.update (startTs);

lange endeTs = startTs + ALLOWED_WORK_TIME;

out.collect (Tuple2.of (ride.licenseId,

“Du har tilladelse til at acceptere nye passagerer indtil” + formatter.print (endTs)));

// registrer timer for at rydde op i 24 timer

ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

} ellers hvis (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

// denne tur startede efter den tilladte arbejdstid sluttede.

// det er en overtrædelse af reglerne!

out.collect (Tuple2.of (ride.licenseId,

"Denne tur overtrådte arbejdstidens regler."));

  }

}

Det processElement () metode kaldes for hver TaxiRide begivenhed. Først henter metoden starttidspunktet for førerens skift fra tilstandshåndtaget. Hvis staten ikke indeholder et starttidspunkt (startTs == null) eller hvis det sidste skift startede mere end 20 timer (ALLOWED_WORK_TIME + REQ_BREAK_TIME) tidligere end den aktuelle tur, er den aktuelle tur den første tur i et nyt skift. I begge tilfælde starter funktionen et nyt skift ved at opdatere skiftets starttid til starttidspunktet for den aktuelle tur, udsender en besked til føreren med sluttidspunktet for det nye skift og registrerer en timer til at rydde op i tilstand på 24 timer.

Hvis den aktuelle tur ikke er den første tur i et nyt skift, kontrollerer funktionen, om den overtræder arbejdstidsreguleringen, dvs. om den startede mere end 12 timer senere end starten af ​​førerens nuværende skift. Hvis det er tilfældet, udsender funktionen en besked for at informere føreren om overtrædelsen.

Det processElement () metode til MonitorWorkTime funktionen registrerer en timer til at rydde op i tilstanden 24 timer efter starten af ​​et skift. Fjernelse af tilstand, der ikke længere er nødvendig, er vigtig for at forhindre voksende tilstandsstørrelser på grund af utæt tilstand. En timer aktiveres, når tidspunktet for applikationen passerer timeren. På det tidspunkt blev onTimer () metode kaldes. Svarende til tilstand opretholdes timere pr. Nøgle, og funktionen sættes i sammenhæng med den tilknyttede nøgle før onTimer () metode kaldes. Derfor rettes al tilstandsadgang til den nøgle, der var aktiv, da timeren blev registreret.

Lad os se på onTimer () metode til MonitorWorkTime.

@Override

offentlig ugyldighed påTimer (

lange timerTs,

OnTimerContext ctx,

Samler ud) kaster undtagelse {

// fjern skifttilstanden, hvis der ikke allerede var startet et nyt skift.

Lange startTs = shiftStart.value ();

hvis (startTs == timerTs - CLEAN_UP_INTERVAL) {

shiftStart.clear ();

  }

}

Det processElement () metoden registrerer timere i 24 timer efter et skift begyndte at rydde op i tilstand, der ikke længere er nødvendig. Oprydning af staten er den eneste logik, som onTimer () metode implementerer. Når en timer affyrer, kontrollerer vi, om føreren startede et nyt skift i mellemtiden, dvs. om skiftets starttid ændrede sig. Hvis det ikke er tilfældet, rydder vi skiftetilstanden for føreren.