Programmering

Java - Detektion og håndtering af hængende tråd

Af Alex. C. Punnen

Arkitekt - Nokia Siemens Networks

Bangalore

Hængende tråde er en almindelig udfordring i udviklingen af ​​software, der skal interface med proprietære enheder ved hjælp af proprietære eller standardiserede grænseflader såsom SNMP, Q3 eller Telnet. Dette problem er ikke begrænset til netværksadministration, men forekommer på tværs af en bred vifte af felter som webservere, processer, der påberåber fjernprocedurkald og så videre.

En tråd, der starter en anmodning til en enhed, har brug for en mekanisme til at opdage, hvis enheden ikke reagerer eller kun reagerer delvist. I nogle tilfælde, hvor en sådan hængning opdages, skal der træffes en specifik handling. Den specifikke handling kan enten være ny prøve eller fortælle slutbrugeren om opgavefejlen eller en anden gendannelsesmulighed. I nogle tilfælde hvor et stort antal opgaver skal affyres til et stort antal netværkselementer af en komponent, er detektion af hængende tråd vigtig, så den ikke bliver en flaskehals til anden opgavebehandling. Så der er to aspekter ved håndtering af hængende tråde: ydeevne og notifikation.

Til meddelelsesaspekt vi kan skræddersy Java Observer-mønsteret, så det passer ind i den multitrådede verden.

Tilpasning af Java Observer-mønster til multitrådede systemer

På grund af hængende opgaver ved hjælp af Java TrådPool klasse med en passende strategi er den første løsning, der kommer til at tænke på. Men ved hjælp af Java TrådPool i sammenhæng med nogle tråde, der tilfældigt hænger over en periode, giver uønsket adfærd baseret på den anvendte bestemte strategi, som trådsult i tilfælde af en fast trådpoolstrategi. Dette skyldes hovedsageligt, at Java TrådPool har ikke en mekanisme til at detektere en trådhængning.

Vi kunne prøve en cachelagret trådpulje, men den har også problemer. Hvis der er en høj hastighed på opgaveafskydning, og nogle tråde hænger, kan antallet af tråde skyde op, hvilket til sidst medfører ressourceudsultning og undtagelser uden for hukommelsen. Eller vi kunne bruge en brugerdefineret TrådPool strategi, der påberåber sig en CallerRunsPolicy. I dette tilfælde kan et trådhæng også medføre, at alle tråde til sidst hænger. (Hovedtråden skal aldrig være den, der ringer op, da der er en mulighed for, at enhver opgave, der sendes til hovedtråden, kan hænge og få alt til at stoppe.)

Så hvad er løsningen? Jeg demonstrerer et ikke så simpelt ThreadPool-mønster, der justerer poolstørrelsen i henhold til opgavehastigheden og baseret på antallet af hængende tråde. Lad os først gå til problemet med at opdage hængende tråde.

Registrering af hængende tråde

Figur 1 viser en abstraktion af mønsteret:

Der er to vigtige klasser her: ThreadManager og ManagedThread. Begge strækker sig fra Java Tråd klasse. Det ThreadManager holder en container, der holder ManagedThreads. Når en ny ManagedThread er oprettet tilføjer den sig selv til denne container.

 ThreadHangTester testthread = new ThreadHangTester ("threadhangertest", 2000, false); testthread.start (); thrdManger.manage (testtråd, ThreadManager.RESTART_THREAD, 10); thrdManger.start (); 

Det ThreadManager det går igennem denne liste og kalder ManagedThread's isHung () metode. Dette er dybest set en logistik til kontrol af tidsstempel.

 hvis (System.currentTimeMillis () - lastprocessingtime.get ()> maxprocessingtime) {logger.debug ("Tråden er hængt"); returner sandt; } 

Hvis den finder ud af, at en tråd er gået i en opgaveløbe og aldrig har opdateret resultaterne, tager den en gendannelsesmekanisme som fastsat af ManageThread.

 while (isRunning) {for (Iterator iterator = managedThreads.iterator (); iterator.hasNext ();) {ManagedThreadData thrddata = (ManagedThreadData) iterator.next (); if (thrddata.getManagedThread (). isHung ()) {logger.warn ("Thread Hang detected for ThreadName =" + thrddata.getManagedThread (). getName ()); switch (thrddata.getManagedAction ()) {case RESTART_THREAD: // Handlingen her er at genstarte tråden // fjerne fra manager iterator.remove (); // stop evt. behandlingen af ​​denne tråd thrddata.getManagedThread (). stopProcessing (); if (thrddata.getManagedThread (). getClass () == ThreadHangTester.class) // At vide, hvilken type tråd der skal oprettes {ThreadHangTester newThread = new ThreadHangTester ("restarted_ThrdHangTest", 5000, true); // Opret en ny tråd newThread.start (); // tilføj det igen for at blive administreret (newThread, thrddata.getManagedAction (), thrddata.getThreadChecktime ()); } pause; ......... 

For et nyt ManagedThread skal oprettes og bruges i stedet for den ophængte, bør den ikke indeholde nogen tilstand eller nogen container. Til dette beholderen, hvorpå ManagedThread handlinger bør adskilles. Her bruger vi det ENUM-baserede Singleton-mønster til at holde opgavelisten. Så beholderen med opgaverne er uafhængig af tråden, der behandler opgaverne. Klik på følgende link for at downloade kilden til det beskrevne mønster: Java Thread Manager Source.

Hængende tråde og Java ThreadPool-strategier

Java TrådPool har ikke en mekanisme til at detektere hængende tråde. Brug af en strategi som fast threadpool (Executors.newFixedThreadPool ()) fungerer ikke, for hvis nogle opgaver hænger over tid, vil alle tråde i sidste ende være i hængt tilstand. En anden mulighed er at bruge en cache-ThreadPool-politik (Executors.newCachedThreadPool ()). Dette kan sikre, at der altid vil være tråde tilgængelige til behandling af en opgave, kun begrænset af VM-hukommelse, CPU og trådgrænser. Men med denne politik er der ingen kontrol over antallet af tråde, der oprettes. Uanset om en bearbejdningstråd hænger eller ej, fører brugen af ​​denne politik, mens task rate er høj, til et stort antal tråde, der oprettes. Hvis du ikke har nok ressourcer til JVM meget snart, rammer du den maksimale hukommelsestærskel eller høje CPU. Det er ret almindeligt at se antallet af tråde ramt hundreder eller tusinder. Selvom de frigives, når opgaven er behandlet, vil det store antal tråde undertiden overvælde systemressourcerne under burst-håndtering.

En tredje mulighed er at bruge tilpassede strategier eller politikker. En sådan mulighed er at have en trådpulje, der skalerer fra 0 til noget maksimalt antal. Så selvom en tråd hang, ville der blive oprettet en ny tråd, så længe det maksimale trådantal blev nået:

 execexec = ny ThreadPoolExecutor (0, 3, 60, TimeUnit.SECONDS, ny SynchronousQueue ()); 

Her 3 er det maksimale trådantal, og holdetid er indstillet til 60 sekunder, da dette er en opgaveintensiv proces. Hvis vi giver et højt nok maksimalt trådantal, er dette mere eller mindre en rimelig politik at bruge i forbindelse med hængende opgaver. Det eneste problem er, at hvis de hængende tråde ikke frigøres til sidst, er der en lille chance for, at alle tråde på et tidspunkt kan hænge. Hvis de maksimale tråde er tilstrækkeligt høje, og forudsat at en opgave hænger er et sjældent fænomen, ville denne politik passe regningen.

Det ville have været sødt, hvis TrådPool havde også en pluggbar mekanisme til at detektere hængende tråde. Jeg vil diskutere et sådant design senere. Selvfølgelig, hvis alle tråde er fryset op, kan du konfigurere og bruge den afviste opgavepolitik i trådpuljen. Hvis du ikke vil kassere de opgaver, skal du bruge CallerRunsPolicy:

 execexec = ny ThreadPoolExecutor (0, 20, 20, TimeUnit.MILLISECONDS, ny SynchronousQueue () ny ThreadPoolExecutor.CallerRunsPolicy ()); 

I dette tilfælde, hvis en trådhæng forårsagede, at en opgave blev afvist, ville denne opgave blive givet til den opkaldende tråd, der skulle håndteres. Der er altid en chance for, at den opgave også hænger. I dette tilfælde vil hele processen fryse. Så det er bedre ikke at tilføje en sådan politik i denne sammenhæng.

 offentlig klasse NotificationProcessor implementerer Runnable {private final NotificationOriginator notificationOrginator; boolsk isRunning = sand; privat endelig ExecutorService execexec; AlarmNotificationProcessor (NotificationOriginator norginator) {// ctor // execexec = Executors.newCachedThreadPool (); // For mange tråde // execexec = Executors.newFixedThreadPool (2); //, ingen hængende opdagelser execexec = ny ThreadPoolExecutor (0, 4 , 250, TimeUnit.MILLISECONDS, ny SynchronousQueue (), ny ThreadPoolExecutor.CallerRunsPolicy ()); } public void run () {while (isRunning) {prøv {final Task task = TaskQueue.INSTANCE.getTask (); Runnable thisTrap = ny Runnable () {public void run () {++ alarmid; notificaionOrginator.notify (ny OctetString (), // Opgavebehandling nbialarmnew.getOID (), nbialarmnew.createVariableBindingPayload ()); É ........}}; execexec.execute (thisTrap); } 

En brugerdefineret trådpool med hangdetektion

Et trådbassinbibliotek med mulighed for detektion og håndtering af opgavehæng ville være dejligt at have. Jeg har udviklet en, og jeg vil demonstrere den nedenfor. Dette er faktisk en port fra en C ++ - trådpulje, som jeg designet og brugte noget tid tilbage (se referencer). Dybest set bruger denne løsning kommandomønsteret og kæden for ansvarlighed. Imidlertid er det lidt svært at implementere kommandomønsteret i Java uden hjælp fra funktionsobjekt. Til dette måtte jeg ændre implementeringen lidt for at bruge Java-refleksion. Bemærk, at den sammenhæng, hvori dette mønster blev designet, var, hvor en trådpulje skulle monteres / tilsluttes uden at ændre nogen af ​​de eksisterende klasser. (Jeg tror, ​​at den ene store fordel ved objektorienteret programmering er, at den giver os en måde at designe klasser for at gøre effektiv brug af det åbne lukkede princip. Dette gælder især for komplekse gamle arvskoder og kan være af mindre relevans for ny produktudvikling.) Derfor brugte jeg refleksion i stedet for at bruge en grænseflade til at implementere kommandomønsteret. Resten af ​​koden kunne overføres uden større ændringer, da næsten alle trådsynkroniserings- og signalprimitiver er tilgængelige i Java 1.5 og fremefter.

 public class Command {private Object [] argParameter; ........ // Ctor til en metode med to args Kommando (T pObj, String methodName, lang timeout, String key, int arg1, int arg2) {m_objptr = pObj; m_metodnavn = mthodName; m_timeout = timeout; m_key = nøgle; argParameter = nyt objekt [2]; argParameter [0] = arg1; argParameter [1] = arg2; } // Kalder metoden til objektet ugyldig udfør () {Class klass = m_objptr.getClass (); Klasse [] paramTypes = ny klasse [] {int.class, int.class}; prøv {Method methodName = klass.getMethod (m_methodName, paramTypes); //System.out.println("Fund metoden -> "+ metodenavn); hvis (argParameter.length == 2) {methodName.invoke (m_objptr, (Object) argParameter [0], (Object) argParameter [1]); } 

Eksempel på brug af dette mønster:

 offentlig klasse CTask {.. offentlig int DoSomething (int a, int b) {...}} 

Kommando cmd4 = ny kommando (task4, "DoMultiplication", 1, "key2", 2,5);

Nu har vi to vigtige klasser her. Den ene er den Trådkæde klasse, der implementerer Chain of Responsibility-mønsteret:

 offentlig klasse ThreadChain implementerer Runnable {public ThreadChain (ThreadChain p, ThreadPool pool, String name) {AddRef (); deleteMe = falsk; optaget = falsk; // -> meget vigtigt næste = p; // indstil trådkæden - bemærk dette er som en linket liste impl threadpool = pool; // sæt trådpuljen - rod af trådpuljen ........ threadId = ++ ThreadId; ...... // start tråden thisThread = ny tråd (dette, navn + inttid.toString ()); thisThread.start (); } 

Denne klasse har to hovedmetoder. Den ene er boolsk CanHandle () som er indledt af TrådPool klasse og fortsætter derefter rekursivt. Dette kontrollerer, om den aktuelle tråd (nuværende Trådkæde eksempel) er fri til at håndtere opgaven. Hvis den allerede håndterer en opgave, kalder den den næste i kæden.

 public Boolean canHandle () {if (! busy) {// Hvis ikke optaget System.out.println ("Kan håndtere denne begivenhed i id =" + threadId); // todo signal en begivenhed prøv {condLock.lock (); condWait.signal (); // Signaler HandleRequest, som venter på dette i køremetoden .................................... ..... returner sandt; } ......................................... /// Se om den næste objekt i kæden er gratis /// til at håndtere anmodningen returnere next.canHandle (); 

Bemærk, at HandleRequest er en metode til Trådkæde der påberåbes fra den Trådkørsel () metode og afventer signalet fra kanHåndtere metode. Bemærk også, hvordan opgaven håndteres via kommandomønsteret.