Programmering

Sådan bruges Redis til realtids streambehandling

Roshan Kumar er senior produktchef hos Redis Labs.

Realtidsindtagelse af streaming af data er et almindeligt krav i mange big data-brugssager. Inden for områder som IoT, e-handel, sikkerhed, kommunikation, underholdning, økonomi og detailhandel, hvor så meget afhænger af rettidig og nøjagtig datadrevet beslutningstagning, er realtids dataindsamling og analyse faktisk kernen i virksomheden.

Imidlertid udgør indsamling, lagring og behandling af streamingdata i store mængder og med høj hastighed arkitektoniske udfordringer. Et vigtigt første skridt i leveringen af ​​realtidsdataanalyse er at sikre, at der er tilstrækkelige ressourcer til netværk, beregning, lagring og hukommelse til at opfange hurtige datastrømme. Men en virksomheds softwarestak skal matche effektiviteten af ​​dens fysiske infrastruktur. Ellers vil virksomheder stå over for en massiv efterslæb af data eller værre, manglende eller ufuldstændige data.

Redis er blevet et populært valg for sådanne hurtige dataindtagsscenarier. En letvægts databasehukommelse i hukommelse, Redis opnår kapacitet i de millioner af operationer pr. Sekund med forsinkelser på under millisekunder, mens de trækker på minimale ressourcer. Det tilbyder også enkle implementeringer, aktiveret af dets mange datastrukturer og funktioner.

I denne artikel vil jeg vise, hvordan Redis Enterprise kan løse almindelige udfordringer forbundet med indtagelse og behandling af store mængder data med høj hastighed. Vi gennemgår tre forskellige tilgange (inklusive kode) til behandling af et Twitter-feed i realtid ved hjælp af henholdsvis Redis Pub / Sub, Redis Lists og Redis Sorted Sets. Som vi ser, har alle tre metoder en rolle at spille i hurtig indtagelse af data, afhængigt af brugssagen.

Udfordringer i design af hurtige dataindtagsløsninger

Hurtig dataindtagelse involverer ofte flere forskellige typer kompleksitet:

  • Store mængder data ankommer nogle gange i bursts. Bursty-data kræver en løsning, der er i stand til at behandle store datamængder med minimal latens. Ideelt set skal det være i stand til at udføre millioner af skrivninger pr. Sekund med latens på under millisekunder ved hjælp af minimale ressourcer.
  • Data fra flere kilder. Dataindtagelsesløsninger skal være fleksible nok til at håndtere data i mange forskellige formater, om nødvendigt beholde kildeidentitet og transformere eller normalisere i realtid.
  • Data, der skal filtreres, analyseres eller videresendes. De fleste dataindtagsløsninger har en eller flere abonnenter, der bruger dataene. Dette er ofte forskellige applikationer, der fungerer på samme eller forskellige steder med et varieret sæt antagelser. I sådanne tilfælde skal databasen ikke kun transformere dataene, men også filtrere eller aggregere afhængigt af kravene til de forbrugende applikationer.
  • Data, der kommer fra geografisk distribuerede kilder. I dette scenarie er det ofte praktisk at distribuere dataindsamlingsnoderne og placere dem tæt på kilderne. Selve knudepunkterne bliver en del af den hurtige dataindtagelsesløsning til at indsamle, behandle, videresende eller omdirigere indtagsdata.

Håndtering af hurtige dataindtag i Redis

Mange løsninger, der understøtter hurtig dataindtagelse i dag, er komplekse, funktionsrige og overkonstruerede til enkle krav. Redis er derimod ekstremt let, hurtig og nem at bruge. Med klienter tilgængelige på mere end 60 sprog kan Redis nemt integreres med de populære softwarestakke.

Redis tilbyder datastrukturer som lister, sæt, sorterede sæt og bindestreg, der tilbyder enkel og alsidig databehandling. Redis leverer mere end en million læs / skriv-operationer pr. Sekund med forsinkelse på under millisekunder på en beskyttet størrelse skyforekomst, hvilket gør den ekstremt ressourceeffektiv til store datamængder. Redis understøtter også messaging-tjenester og klientbiblioteker på alle de populære programmeringssprog, hvilket gør det velegnet til at kombinere højhastighedsdataindtagelse og realtidsanalyse. Redis Pub / Sub-kommandoer giver det mulighed for at spille rollen som en meddelelsesmægler mellem udgivere og abonnenter, en funktion, der ofte bruges til at sende underretninger eller meddelelser mellem distribuerede dataindtagningsnoder.

Redis Enterprise forbedrer Redis med problemfri skalering, altid tilgængelighed, automatisk implementering og muligheden for at bruge omkostningseffektiv flash-hukommelse som en RAM-forlænger, så behandlingen af ​​store datasæt kan udføres omkostningseffektivt.

I nedenstående afsnit vil jeg skitsere, hvordan du bruger Redis Enterprise til at tackle almindelige dataindtagelsesudfordringer.

Redis med hastigheden på Twitter

For at illustrere Redis enkelhed undersøger vi en prøve med hurtig dataindtagelsesløsning, der samler beskeder fra et Twitter-feed. Målet med denne løsning er at behandle tweets i realtid og skubbe dem ned ad røret, når de behandles.

Twitter-data, der indtages af løsningen, forbruges derefter af flere processorer ned ad linjen. Som vist i figur 1 handler dette eksempel om to processorer - den engelske Tweet-processor og influencer-processoren. Hver processor filtrerer tweets og sender dem ned i sine respektive kanaler til andre forbrugere. Denne kæde kan gå så langt som løsningen kræver. I vores eksempel stopper vi dog på tredje niveau, hvor vi samler populære diskussioner blandt engelsktalende og topindflydende.

Redis Labs

Bemærk, at vi bruger eksemplet på behandling af Twitter-feeds på grund af hastigheden af ​​datainkomst og enkelhed. Bemærk også, at Twitter-data når vores hurtige dataindtag via en enkelt kanal. I mange tilfælde, såsom IoT, kan der være flere datakilder, der sender data til hovedmodtageren.

Der er tre mulige måder at implementere denne løsning ved hjælp af Redis: indtage med Redis Pub / Sub, indtage med List-datastrukturen eller indtage med datastrukturen Sorted Set. Lad os undersøge hver af disse muligheder.

Indtag med Redis Pub / Sub

Dette er den enkleste implementering af hurtig dataindtagelse. Denne løsning bruger Redis's Pub / Sub-funktion, som gør det muligt for applikationer at offentliggøre og abonnere på beskeder. Som vist i figur 2 behandler hvert trin dataene og offentliggør dem til en kanal. Det efterfølgende trin abonnerer på kanalen og modtager meddelelserne til yderligere behandling eller filtrering.

Redis Labs

Fordele

  • Let at implementere.
  • Fungerer godt, når datakilderne og processerne distribueres geografisk.

Ulemper

  • Løsningen kræver, at udgivere og abonnenter er oppe hele tiden. Abonnenter mister data, når de stoppes, eller når forbindelsen mistes.
  • Det kræver flere forbindelser. Et program kan ikke offentliggøre og abonnere på den samme forbindelse, så hver mellemliggende databehandler kræver to forbindelser - en for at abonnere og en for at offentliggøre. Hvis du kører Redis på en DBaaS-platform, er det vigtigt at kontrollere, om din pakke eller dit serviceniveau har nogen begrænsninger for antallet af forbindelser.

En note om forbindelser

Hvis mere end en klient abonnerer på en kanal, skubber Redis dataene til hver klient lineært, den ene efter den anden. Store datanyttelast og mange forbindelser kan indføre ventetid mellem en udgiver og dens abonnenter. Selvom standardhardgrænsen for maksimalt antal forbindelser er 10.000, skal du teste og benchmarke, hvor mange forbindelser der passer til din nyttelast.

Redis vedligeholder en klientoutputbuffer for hver klient. Standardgrænserne for klientens outputbuffer til Pub / Sub er indstillet som:

klient-output-buffer-grænse pubsub 32 MB 8 MB 60

Med denne indstilling vil Redis tvinge klienter til at afbryde forbindelsen under to betingelser: hvis outputbufferen vokser ud over 32 MB, eller hvis outputbufferen holder 8 MB data konsekvent i 60 sekunder.

Dette er tegn på, at klienter forbruger dataene langsommere, end de offentliggøres. Hvis en sådan situation opstår, skal du først prøve at optimere forbrugerne, så de ikke tilføjer latens, mens de forbruger dataene. Hvis du bemærker, at dine kunder stadig afbrydes, kan du muligvis øge grænserne for client-output-buffer-limit pubsub ejendom i redis.conf. Vær opmærksom på, at ændringer i indstillingerne kan øge latenstiden mellem udgiveren og abonnenten. Eventuelle ændringer skal testes og verificeres grundigt.

Kodedesign til Redis Pub / Sub-løsningen

Redis Labs

Dette er den enkleste af de tre løsninger, der er beskrevet i dette papir. Her er de vigtige Java-klasser, der er implementeret til denne løsning. Download kildekoden med fuld implementering her: //github.com/redislabsdemo/IngestPubSub.

Det Abonnent klasse er kerneklassen i dette design. Hver Abonnent objekt opretholder en ny forbindelse med Redis.

klasse Abonnent udvider JedisPubSub implementerer Runnable {

privat strengnavn;

privat RedisConnection conn = null;

private Jedis jedis = null;

private String subscriberChannel;

offentlig abonnent (String subscriberName, String channelName) kaster undtagelse {

navn = abonnentnavn;

subscriberChannel = kanalnavn;

Tråd t = ny tråd (denne);

t.start ();

       }

@Override

offentlig ugyldig kørsel () {

prøve{

conn = RedisConnection.getRedisConnection ();

jedis = konn.getJedis ();

mens (sandt) {

jedis.subscribe (dette, this.subscriberChannel);

                      }

} fangst (undtagelse e) {

e.printStackTrace ();

              }

       }

@Override

offentligt ugyldigt onMessage (strengkanal, strengmeddelelse) {

super.onMessage (kanal, besked);

       }

}

Det Forlægger klasse opretholder en separat forbindelse til Redis til offentliggørelse af meddelelser til en kanal.

public class Publisher {

RedisConnection connect = null;

Jedis jedis = null;

privat String kanal;

offentlig udgiver (String channelName) kaster undtagelse {

kanal = kanalnavn;

conn = RedisConnection.getRedisConnection ();

jedis = konn.getJedis ();

       }

public void publish (String msg) kaster Undtagelse {

jedis.publish (kanal, msg);

       }

}

Det EngelskTweetFilter, InfluencerTweetFilter, HashTagCollectorog InfluencerCollector filtre udvides Abonnent, som gør det muligt for dem at lytte til de indgående kanaler. Da du har brug for separate Redis-forbindelser for at abonnere og udgive, har hver filterklasse sin egen RedisConnection objekt. Filtre lytter til de nye meddelelser i deres kanaler i en løkke. Her er prøvekoden til EngelskTweetFilter klasse:

public class EnglishTweetFilter udvider abonnenten

{

privat RedisConnection connect = null;

private Jedis jedis = null;

private String publisherChannel = null;

public EnglishTweetFilter (String name, String subscriberChannel, String publisherChannel) throw Exception {

super (navn, subscriberChannel);

this.publisherChannel = publisherChannel;

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

       }

@Override

public void onMessage (String subscriberChannel, String message) {

JsonParser jsonParser = ny JsonParser ();

JsonElement jsonElement = jsonParser.parse (meddelelse);

JsonObject jsonObject = jsonElement.getAsJsonObject ();

// filterbeskeder: offentliggør kun engelske tweets

hvis (jsonObject.get (“lang”)! = null &&

jsonObject.get (“lang”). getAsString (). er lig med (“da”)) {

jedis.publish (publisherChannel, besked);

              }

       }

}

Det Forlægger klasse har en publiceringsmetode, der offentliggør meddelelser til den krævede kanal.

public class Publisher {

.

.     

public void publish (String msg) kaster Undtagelse {

jedis.publish (kanal, msg);

       }

.

}

Hovedklassen læser data fra indtastningsstrømmen og sender dem til AllData kanal. Hovedmetoden i denne klasse starter alle filterobjekterne.

offentlig klasse IngestPubSub

{

.

offentlig ugyldig start () kaster undtagelse {

       .

       .

udgiver = ny udgiver (“AllData”);

englishFilter = nyt engelskTweetFilter (“Engelsk filter”, ”AllData”,

"EnglishTweets");

influencerFilter = ny InfluencerTweetFilter (“Influencer Filter”,

“AllData”, “InfluencerTweets”);

hashtagCollector = ny HashTagCollector (“Hashtag Collector”,

"EnglishTweets");

influencerCollector = ny InfluencerCollector (“Influencer Collector”,

“InfluencerTweets”);

       .

       .

}

Indtag med Redis-lister

List-datastrukturen i Redis gør implementeringen af ​​en køløsning let og ligetil. I denne løsning skubber producenten hver besked bag på køen, og abonnenten afstemmer køen og trækker nye beskeder fra den anden ende.

Redis Labs

Fordele

  • Denne metode er pålidelig i tilfælde af forbindelsestab. Når data er skubbet ind på listerne, bevares de der, indtil abonnenterne læser dem. Dette gælder, selvom abonnenterne stoppes eller mister deres forbindelse til Redis-serveren.
  • Producenter og forbrugere kræver ingen forbindelse mellem dem.

Ulemper

  • Når data er trukket fra listen, fjernes de og kan ikke hentes igen. Medmindre forbrugerne vedvarer dataene, går de tabt, så snart de forbruges.
  • Hver forbruger har brug for en separat kø, som kræver lagring af flere kopier af dataene.

Kodedesign til Redis Lists-løsningen

Redis Labs

Du kan downloade kildekoden til Redis Lists-løsningen her: //github.com/redislabsdemo/IngestList. Nedenfor forklares denne løsnings hovedklasser.

Beskedliste indlejrer Redis List-datastrukturen. Det skubbe() metode skubber den nye besked til venstre for køen, og pop () venter på en ny besked fra højre, hvis køen er tom.

offentlig klasse MessageList {

beskyttet String name = “MyList”; // Navn

.

.     

public void push (String msg) kaster undtagelse {

jedis.lpush (navn, msg); // Venstre skub

       }

offentlig String pop () kaster undtagelse {

returner jedis.brpop (0, navn) .toString ();

       }

.

.

}

MessageListener er en abstrakt klasse, der implementerer lytter- og forlagslogik. EN MessageListener objekt lytter kun til en liste, men kan offentliggøre til flere kanaler (MessageFilter genstande). Denne løsning kræver en separat MessageFilter objekt for hver abonnent nede i røret.

klasse MessageListener implementerer Runnable {

privat strengnavn = null;

privat MessageList inboundList = null;

Kort outBoundMsgFilters = nyt HashMap ();

.

.     

offentligt ugyldigt registerOutBoundMessageList (MessageFilter msgFilter) {

hvis (msgFilter! = null) {

hvis (outBoundMsgFilters.get (msgFilter.name) == null) {

outBoundMsgFilters.put (msgFilter.name, msgFilter);

                      }

              }

       }

.

.

@Override

offentlig ugyldig kørsel () {

.

mens (sandt) {

String msg = inboundList.pop ();

processMessage (msg);

                      }                                  

.

       }

.

beskyttet ugyldigt pushMessage (String msg) kaster undtagelse {

Sæt outBoundMsgNames = outBoundMsgFilters.keySet ();

for (String name: outBoundMsgNames) {

MessageFilter msgList = outBoundMsgFilters.get (navn);

msgList.filterAndPush (msg);

              }

       }

}

MessageFilter er en overordnet klasse, der letter filterAndPush () metode. Da data strømmer gennem indtagningssystemet, filtreres det ofte eller transformeres, inden det sendes til næste trin. Klasser, der udvider MessageFilter klasse tilsidesætter filterAndPush () metode og implementere deres egen logik for at skubbe den filtrerede besked til den næste liste.

offentlig klasse MessageFilter {

MessageList messageList = null;

.

.

public void filterAndPush (String msg) kaster undtagelse {

messageList.push (msg);

       }

.

.     

}

AllTweetsListener er en prøveimplementering af en MessageListener klasse. Dette lytter til alle tweets på AllData kanal og offentliggør dataene til EngelskTweetsFilter og InfluencerFilter.

offentlig klasse AllTweetsListener udvider MessageListener {

.

.     

offentlig statisk ugyldigt hoved (String [] args) kaster undtagelse {

MessageListener allTweetsProcessor = AllTweetsListener.getInstance ();

allTweetsProcessor.registerOutBoundMessageList (ny

EnglishTweetsFilter (“EnglishTweetsFilter”, “EnglishTweets”));

allTweetsProcessor.registerOutBoundMessageList (ny

InfluencerFilter (“InfluencerFilter”, “Influencers”));

allTweetsProcessor.start ();

       }

.

.

}

EngelskTweetsFilter strækker sig MessageFilter. Denne klasse implementerer logik for kun at vælge de tweets, der er markeret som engelske tweets. Filteret kasserer ikke-engelske tweets og skubber engelske tweets til den næste liste.

offentlig klasse EnglishTweetsFilter udvider MessageFilter {

public EnglishTweetsFilter (String name, String listName) throw Exception {

super (navn, listenavn);

       }

@Override

offentligt ugyldigt filterAndPush (streng besked) kaster undtagelse {

JsonParser jsonParser = ny JsonParser ();

JsonElement jsonElement = jsonParser.parse (meddelelse);

JsonArray jsonArray = jsonElement.getAsJsonArray ();

JsonObject jsonObject = jsonArray.get (1) .getAsJsonObject ();

hvis (jsonObject.get (“lang”)! = null &&

jsonObject.get (“lang”). getAsString (). er lig med (“da”)) {

Jedis jedis = super.getJedisInstance ();

hvis (jedis! = null) {

jedis.lpush (super.name, jsonObject.toString ());

                             }

              }

       }

}