Fabian Hueske er kommitter og PMC-medlem af Apache Flink-projektet og medstifter af Data Artisans.
Apache Flink er en ramme til implementering af stateful stream-behandlingsapplikationer og kørsel af dem i skala i en computerklynge. I en tidligere artikel undersøgte vi, hvad statusfuld strømbehandling er, hvilke brugssager den adresserer, og hvorfor du skal implementere og køre dine streamingapplikationer med Apache Flink.
I denne artikel vil jeg præsentere eksempler på to almindelige anvendelsestilfælde af stateful stream-behandling og diskutere, hvordan de kan implementeres med Flink. Den første anvendelse er begivenhedsdrevne applikationer, dvs. applikationer, der indtager kontinuerlige strømme af begivenheder og anvender en vis forretningslogik til disse begivenheder. Det andet er streaming analytics use case, hvor jeg vil præsentere to analytiske forespørgsler implementeret med Flinks SQL API, som samler streaming af data i realtid. Vi hos Data Artisans leverer kildekoden til alle vores eksempler i et offentligt GitHub-lager.
Før vi dykker ned i detaljerne i eksemplerne, introducerer jeg begivenhedsstrømmen, der indtages af eksempelapplikationerne, og forklarer, hvordan du kan køre den kode, vi leverer.
En strøm af taxa-begivenheder
Vores eksempler på applikationer er baseret på et offentligt datasæt om taxikørsel, der skete i New York City i 2013. Arrangørerne af 2015 DEBS (ACM International Conference on Distributed Event-Based Systems) Grand Challenge omarrangerede det originale datasæt og konverterede det til en enkelt CSV-fil, hvorfra vi læser de følgende ni felter.
- Medaljon - en MD5 sum id for taxien
- Hack_license - en MD5-sum-id for taxalicensen
- Pickup_datetime - tidspunktet, hvor passagerer blev afhentet
- Dropoff_datetime - det tidspunkt, hvor passagererne blev afleveret
- Pickup_longitude - længden af afhentningsstedet
- Pickup_latitude - breddegraden for afhentningsstedet
- Afgivelseslængde - længden på afleveringsstedet
- Dropoff_latitude - breddegraden for afleveringsstedet
- Total_beløb - samlet betalt i dollars
CSV-filen gemmer posterne i stigende rækkefølge efter deres attribut for afleveringstid. Derfor kan filen behandles som en ordnet log over begivenheder, der blev offentliggjort, da en rejse sluttede. For at køre eksemplerne, som vi leverer på GitHub, skal du downloade datasættet for DEBS-udfordringen fra Google Drev.
Alle eksempler på applikationer læser i rækkefølge CSV-filen og indtager den som en strøm af taxiturbegivenheder. Derefter behandler applikationerne begivenhederne ligesom enhver anden strøm, dvs. som en strøm, der indtages fra et logbaseret public-subscribe-system, såsom Apache Kafka eller Kinesis. Faktisk er læsning af en fil (eller enhver anden type vedvarende data) og behandling af den som en stream en hjørnesten i Flinks tilgang til at samle batch- og stream-behandling.
Kørsel af Flink-eksemplerne
Som tidligere nævnt offentliggjorde vi kildekoden til vores eksempelapplikationer i et GitHub-arkiv. Vi opfordrer dig til at forkaste og klone lageret. Eksemplerne kan let udføres inden for dit IDE-valg; du behøver ikke at konfigurere og konfigurere en Flink-klynge til at køre dem. Først skal du importere kildekoden til eksemplerne som et Maven-projekt. Udfør derefter hovedklassen i en applikation og angiv datafilens lagerplacering (se ovenfor for linket til download af data) som et programparameter.
Når du har startet en applikation, starter den en lokal, integreret Flink-forekomst inde i applikationens JVM-proces og sender applikationen til at udføre den. Du vil se en masse logopgørelser, mens Flink starter, og opgavens opgaver planlægges. Når applikationen kører, vil dens output blive skrevet til standardoutputtet.
Opbygning af en begivenhedsdrevet applikation i Flink
Lad os nu diskutere vores første brugssag, som er en begivenhedsdrevet applikation. Begivenhedsdrevne applikationer indtager strømme af begivenheder, udfører beregninger, når begivenhederne modtages, og kan udsende nye begivenheder eller udløse eksterne handlinger. Flere hændelsesdrevne applikationer kan sammensættes ved at forbinde dem sammen via hændelseslogsystemer, svarende til hvordan store systemer kan sammensættes fra mikrotjenester. Begivenhedsdrevne applikationer, hændelseslogfiler og snapshot af applikationstilstand (kendt som savepoints i Flink) omfatter et meget kraftigt designmønster, fordi du kan nulstille deres tilstand og afspille deres input for at komme sig efter en fejl, rette en fejl eller migrere en applikation til en anden klynge.
I denne artikel vil vi undersøge en begivenhedsdrevet applikation, der bakker en service, der overvåger arbejdstiden for taxachauffører. I 2016 besluttede NYC Taxi and Limousine Commission at begrænse taxichaufførernes arbejdstid til 12 timers skift og kræve en pause på mindst otte timer inden næste skift kan startes. Et skift starter med begyndelsen af den første tur. Fra da af kan en chauffør starte nye forlystelser inden for et vindue på 12 timer. Vores applikation sporer chaufførernes forlystelser, markerer sluttidspunktet for deres 12-timers vindue (dvs. det tidspunkt, hvor de kan starte den sidste tur) og markerer forlystelser, der overtrådte forordningen. Du kan finde den fulde kildekode til dette eksempel i vores GitHub-lager.
Vores applikation er implementeret med Flinks DataStream API og en KeyedProcessFunction
. DataStream API er en funktionel API og baseret på begrebet typede datastrømme. EN DataStream
er den logiske repræsentation af en strøm af begivenheder af typen T
. En strøm behandles ved at anvende en funktion til den, der producerer en anden datastrøm, muligvis af en anden type. Flink behandler streams parallelt ved at distribuere begivenheder til stream partitioner og anvende forskellige forekomster af funktioner til hver partition.
Følgende kodestykke viser strømmen på højt niveau i vores overvågningsapplikation.
// indtage strøm af taxaturer.DataStream rides = TaxiRides.getRides (env, inputPath);
DataStream
meddelelser = forlystelser // partitionsstrøm ved kørekort-id
.keyBy (r -> r.licenseId)
// overvåge ridehændelser og generere underretninger
.proces (ny MonitorWorkTime ());
// udskriv meddelelser
notifications.print ();
Applikationen begynder at indtage en strøm af taxiturbegivenheder. I vores eksempel læses begivenhederne fra en tekstfil, parses og gemmes i TaxiRide
POJO-objekter. En applikation fra den virkelige verden indtager typisk begivenhederne fra en meddelelseskø eller hændelseslog, såsom Apache Kafka eller Pravega. Det næste trin er at nøgle TaxiRide
begivenheder af licensId
af chaufføren. Det keyBy
operation partitionerer strømmen på det deklarerede felt, således at alle begivenheder med den samme nøgle behandles af den samme parallelle forekomst af følgende funktion. I vores tilfælde deler vi på licensId
felt, fordi vi vil overvåge arbejdstiden for hver enkelt chauffør.
Dernæst anvender vi MonitorWorkTime
funktion på den partitionerede TaxiRide
begivenheder. Funktionen sporer kørsler pr. Chauffør og overvåger deres skift og pausetider. Den udsender begivenheder af typen Tuple2
, hvor hver tuple repræsenterer en meddelelse bestående af førerens licens-id og en meddelelse. Endelig udsender vores applikation meddelelserne ved at udskrive dem til standardoutputtet. En applikation fra den virkelige verden ville skrive underretningerne til en ekstern besked eller et lagringssystem, som Apache Kafka, HDFS eller et databasesystem, eller udløse et eksternt opkald for straks at skubbe dem ud.
Nu hvor vi har drøftet den samlede strøm af applikationen, skal vi se på MonitorWorkTime
funktion, som indeholder det meste af applikationens faktiske forretningslogik. Det MonitorWorkTime
funktion er en stateful KeyedProcessFunction
der indtager TaxiRide
begivenheder og udsender Tuple2
optegnelser. Det KeyedProcessFunction
interface har to metoder til at behandle data: processElement ()
og onTimer ()
. Det processElement ()
metode kaldes for hver ankomne begivenhed. Det onTimer ()
metode kaldes, når en tidligere registreret timer aktiveres. Følgende uddrag viser skelet af MonitorWorkTime
funktion og alt, hvad der er erklæret uden for behandlingsmetoderne.
offentlig statisk klasse MonitorWorkTimeudvider KeyedProcessFunction
{ // tidskonstanter i millisekunder
privat statisk endelig lang ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 timer
privat statisk endelig lang REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 timer
privat statisk endelig lang CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 timer
privat forbigående DateTimeFormatter formater;
// tilstandshåndtag for at gemme starttidspunktet for et skift
ValueState shiftStart;
@Override
offentligt tomrum åbent (Konfigurationskonf.) {
// registrer tilstandshåndtag
shiftStart = getRuntimeContext (). getState (
ny ValueStateDescriptor (“shiftStart”, Types.LONG));
// initialiser tidsformatering
this.formatter = DateTimeFormat.forPattern (“åååå-MM-dd HH: mm: ss”);
}
// processElement () og onTimer () diskuteres detaljeret nedenfor.
}
Funktionen erklærer et par konstanter for tidsintervaller i millisekunder, en tidsformatering og et tilstandshåndtag for tastet tilstand, der styres af Flink. Administreret tilstand kontrolleres periodisk og gendannes automatisk i tilfælde af en fejl. Tastetilstand er organiseret pr. Nøgle, hvilket betyder, at en funktion opretholder en værdi pr. Håndtag og nøgle. I vores tilfælde er MonitorWorkTime
funktion opretholder en Lang
værdi for hver nøgle, dvs. for hver licensId
. Det shiftStart
tilstand gemmer starttidspunktet for en chaufførs skift. Tilstandshåndtaget initialiseres i åben()
metode, som kaldes en gang, før den første begivenhed behandles.
Lad os nu se på processElement ()
metode.
@Overrideoffentlig ugyldighedsprocesElement (
TaxiRide ride,
Kontekst ctx,
Samler
ud) kaster undtagelse { // slå starttidspunktet for det sidste skift op
Lange startTs = shiftStart.value ();
hvis (startTs == null ||
startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {
// dette er den første tur i et nyt skift.
startTs = ride.pickUpTime;
shiftStart.update (startTs);
lange endeTs = startTs + ALLOWED_WORK_TIME;
out.collect (Tuple2.of (ride.licenseId,
“Du har tilladelse til at acceptere nye passagerer indtil” + formatter.print (endTs)));
// registrer timer for at rydde op i 24 timer
ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);
} ellers hvis (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {
// denne tur startede efter den tilladte arbejdstid sluttede.
// det er en overtrædelse af reglerne!
out.collect (Tuple2.of (ride.licenseId,
"Denne tur overtrådte arbejdstidens regler."));
}
}
Det processElement ()
metode kaldes for hver TaxiRide
begivenhed. Først henter metoden starttidspunktet for førerens skift fra tilstandshåndtaget. Hvis staten ikke indeholder et starttidspunkt (startTs == null
) eller hvis det sidste skift startede mere end 20 timer (ALLOWED_WORK_TIME + REQ_BREAK_TIME
) tidligere end den aktuelle tur, er den aktuelle tur den første tur i et nyt skift. I begge tilfælde starter funktionen et nyt skift ved at opdatere skiftets starttid til starttidspunktet for den aktuelle tur, udsender en besked til føreren med sluttidspunktet for det nye skift og registrerer en timer til at rydde op i tilstand på 24 timer.
Hvis den aktuelle tur ikke er den første tur i et nyt skift, kontrollerer funktionen, om den overtræder arbejdstidsreguleringen, dvs. om den startede mere end 12 timer senere end starten af førerens nuværende skift. Hvis det er tilfældet, udsender funktionen en besked for at informere føreren om overtrædelsen.
Det processElement ()
metode til MonitorWorkTime
funktionen registrerer en timer til at rydde op i tilstanden 24 timer efter starten af et skift. Fjernelse af tilstand, der ikke længere er nødvendig, er vigtig for at forhindre voksende tilstandsstørrelser på grund af utæt tilstand. En timer aktiveres, når tidspunktet for applikationen passerer timeren. På det tidspunkt blev onTimer ()
metode kaldes. Svarende til tilstand opretholdes timere pr. Nøgle, og funktionen sættes i sammenhæng med den tilknyttede nøgle før onTimer ()
metode kaldes. Derfor rettes al tilstandsadgang til den nøgle, der var aktiv, da timeren blev registreret.
Lad os se på onTimer ()
metode til MonitorWorkTime
.
@Overrideoffentlig ugyldighed påTimer (
lange timerTs,
OnTimerContext ctx,
Samler
ud) kaster undtagelse { // fjern skifttilstanden, hvis der ikke allerede var startet et nyt skift.
Lange startTs = shiftStart.value ();
hvis (startTs == timerTs - CLEAN_UP_INTERVAL) {
shiftStart.clear ();
}
}
Det processElement ()
metoden registrerer timere i 24 timer efter et skift begyndte at rydde op i tilstand, der ikke længere er nødvendig. Oprydning af staten er den eneste logik, som onTimer ()
metode implementerer. Når en timer affyrer, kontrollerer vi, om føreren startede et nyt skift i mellemtiden, dvs. om skiftets starttid ændrede sig. Hvis det ikke er tilfældet, rydder vi skiftetilstanden for føreren.