2012. március 13., kedd

Szálak kordában tartása

Előfordul, hogy olyan programot írunk, amely előbb-utóbb 20-30, esetleg 50-100 szálat is indít, ami nem biztos, hogy egészséges. Szükségünk lehet arra, hogy limitáljuk az egy időben futó szálak számát. Ez azonban nem egy triviális feladat.

Első gondolat: statikus counter (a saját szál osztályunkban). Szinkronizált get, inc, dec, és várakoztatás indítás előtt, egy üres while ciklussal. Néhány szálnál ez látszólag működőképes megoldás (ha jól meg van írva). Azonban ez így önmagában nem elég. Elképzelhető, hogy két szál "egyszerre" lép ki a while ciklusból (vagyis "A" szál érzékeli, hogy csökkent a számláló, azaz kiléphet, de nem tudja olyan gyorsan visszanövelni a számlálót, hogy "B" szál ne tudjon szintén kilépni a várakozásból). Ebben az esetben máris limit+1 darab szálunk fut. Minél több szálunk van, annál többen várakoznak, annál nagyobb az esély arra, hogy "egyszerre" indulnak el a várakozás után, és annál több szál indulhat el, vagyis annál jobban meghaladja a limitet a szálak száma. Más megoldás kell: ütemező.

Ma sikerült saját magamtól összeraknom egy ilyet, és működik, nem merészkedik a limit fölé a counter. Igaz: nincs is counter! :-)

Alapötlet: tároljuk a futtatandó, ütemezendő szálakat egy listában, és legyen egy szál, ami menedzsel, ütemez. Folyamatosan figyeli a futó szálakat, és ha valamelyik leállt, akkor törli a listából, és indít egy újat (a legelső még nem futó szálat). Ennek értelmében a futó szálak egymás melletti listaelemek lesznek, mégpedig a lista elején, így elég csak a [0..(limit-1)] intervallumot figyelni. Ha egy szál leállt, töröljük a listából, ekkor a (limit-1)-es indexű listaelem lesz az a szál, amit el kell indítani (persze, ha van egyáltalán annyi elem).

Ezzel a megoldással teljes mértékben felügyeljük a szálak indítását, hiszen csak az ütemező maga indítja azokat. Mivel ciklusba van szervezve a vizsgálat, a szálakat, mint listaelemeket egymás után nézi meg, így konkurencia sincs, nem fordulhat elő, hogy "egyszerre" két szál is elinduljon. :-)

Ezen felül, ha a sok szálat elindító főszálnak szüksége van arra, hogy megvárja, amíg a szálai befejeződnek, elég csak az ütemező szálat figyelni, hogy fut-e még. Persze ez implementáció kérdése, de az én megvalósításom így működik.
import java.util.ArrayList;
import java.util.List;

public class ThreadScheduler extends Thread {

    private int limit = 3;
    private List<Thread> threads = new ArrayList<Thread>();

    public ThreadScheduler() {
        setName("ThreadScheduler");
    }

    public synchronized void add(Thread t) {
        threads.add(t);
    }

    private synchronized boolean manage() {
        for (int i = 0; i < limit && i < threads.size(); i++) {
            if (!threads.get(i).isAlive()) {
                threads.remove(i);
                if (threads.size() >= limit) {
                    threads.get(limit - 1).start();
                }
                break;
            }
        }
        return !threads.isEmpty();
    }

    @Override
    public void run() {
        for (int i = 0; i < limit && i < threads.size(); i++) {
            threads.get(i).start();
        }
        while (manage());
    }
}

A run() metódus először elindítja az első szálakat, a [0..(limit-1)] intervallumban, hogy a menedzselés ne törölje őket.

Használata pedig:
ThreadScheduler sched = new ThreadScheduler();

// szálak hozzáadása
sched.add(t);

// ütemezés indítása
sched.start();

// megvárhatjuk, mikor futott le mindegyik szál
while (sched.isAlive());
System.out.println("The End");

Egy lehetséges kiegészítés lehet még a megszakíthatóság. Ez esetben a run() metódus while ciklusának feltételéhez hozzá kell venni az isInterrupted() vizsgálatát, illetve a ciklus után egy elágazásban: ha megszakítás volt, akkor küldjön megszakítást a futó szálaknak. (És persze megvárni a még futó szálak befejeződését, végül pedig esetleg üríteni a listát.)