Többszálúság Javában

Kategória: Java standard könyvtárak.

Áttekintés

Ha lehetséges lenne statisztikát készíteni arról, hogy a valaha előfordult programozási hiba mire vezethető vissza, valószínűleg toronymagasan a többszálúság nyerne. Arról van itt szó, hogy egyszerre több folyamat fut párhuzamosan. Például képzeljük el azt, hogy egy webes program várja a felhasználói kéréseket, és a kérések feldolgozása időigényes. Ha úgy valósítanánk meg a programot, hogy amint beérkezik a kérés, rögtön feldolgozzuk adott szálon belül, akkor a feldolgozási idő alatt a többi kérést még fogadni sem tudjuk. Ehelyett tipikus megoldás az, hogy az egyik szál feladata az, hogy fogadja a beérkezett kéréseket és azonnal továbbítsa egy másik szálnak, és utána egyből fogadhatja a következőt, a szálak a háttérben pedig feldolgozzák a kérést és ha kész vannak, visszatérnek az eredménnyel.

Kezdetben egyébként a többszálúság virtuális volt: a valóságban az operációs rendszer szimulálta ezt oly módon, hogy rövid ideig az egyik szál futott, utána a másik, ténylegesen fizikailag tehát egyszerre egy szál futott, ma viszont a többmagos processzorok világában lehetne ténylegesen párhuzamosan futó szálak is. Programozási szempontból egyébként ennek nincs túl nagy jelentősége.

A többszálúság rendkívül sok olyan problémát vet fel, amire elsőre nem is gondolunk. Klasszikus példa az, hogy két szál ugyanazt az erőforrást szeretné használni. Ha mindegyik csak olvasni szeretné, az még jól megoldható, de ha többen szeretnék írni is és olvasni is, akkor az számos problémát felvet. Vegyünk például egy, a valóságtól nem is túl elrugaszkodott esetet: az egyik szál egy tranzakció belül módosítja az adatot, majd kiderül, hogy a tranzakció nem sikerül, és visszacsinálja. A másik szál viszont a két lépés között olvass ki az ideiglenesen módosított adatot.

Erre a problémára már rá lehet vágni a választ: ha valaki írja az adatot, addig a többiek ne férjenek hozzá. Persze azon túl, hogy ennek következtében belassul a program, mert sok lehet a holtidő (amikor sok szál vár egyre), újabb nem várt problémák merülnek fel: pl. az, hogy A erőforrás vár B-re, és B vár A-ra. Ezenkívül ott van az időzítés, annak minden nyűgével.

Ebben a fejezetben megnézzük, hogy hogyan lehet Java-ban szálakat létrehozni, futtatni, és alapvető műveleteket végrehajtani. A téma teljes mélységű elemzése meghaladja az erre a fejezetre szánt kereteket.

A Thread osztály

A Java nyelvben a Thread osztály segítségével tudunk szálakat létrehozni. Egyik módszer az, hogy ebből az osztályból származtatjuk az osztályunkat, és megvalósítjuk a run() metódust. Indítani példányosítás után a start() függvény meghívásával lehet. A szálat a Thread.sleep() eljárással tudjuk adott ideig megállítani, de sajnos itt kezelnünk kell a rendkívül ritkán bekövetkező InterruptedException kivételt. Lássunk egy példát!

class MyThread extends Thread {
    private int n;
 
    public MyThread(int n) {
        this.n = n;
    }
 
    @Override
    public void run() {
        System.out.println("Thread " + n + " started.");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Thread " + n +  " stopped.");
    }
}
 
public class ThreadExample {
    public static void main(String[] args) {
        Thread t1 = new MyThread(1);
        Thread t2 = new MyThread(2);
        Thread t3 = new MyThread(3);
        t1.start();
        t2.start();
        t3.start();
    }
}

A lefutás sorrendje nem garantált, pl. egy konkrét futás a következőképpen néz ki:

Thread 3 started.
Thread 1 started.
Thread 2 started.
Thread 2 stopped.
Thread 3 stopped.
Thread 1 stopped.

Kicsit formálisabban, egy szálnak a Java-ban ötféle állapota lehet:

  • New (új): ebbe kerül, amikor létrehozzuk a Thread példányt. Innen a start() hatására Runnable állapotba kerül, ill. Dead állapotba is kerülhet.
  • Runnable (futható): ez a start() hívás és a tényleges indulás közti rövid állapot. A run() függvényt a Java virtuális gép hívja meg a háttérben, aminek hatására Running állapotba kerül a szál.
  • Running (futó): futás során ebbe kerül. Innen Wait állapotba kerülhet a sleep() (ezt már láttuk) vagy a wait() (ezt majd látni fogjuk) hatására, ill. ha befejeződtt, akkor a Dead állapotba.
  • Waiting (várakozó): amikor ideiglenesen nem fut a szál, pl. mert vár valamire. Normál esetben visszakerül Running állapotba, de elvben egyből Dead állapotba is kerülhet.
  • Dead (halott): miután befejeződött a szál; ezt újraindítani nem lehet.

Egy szálnak van prioritása, ami egy egész szám 1 (legkisebb) és 10 (legnagyobb) között. Alapértelmezésben ez 5, de a setPriority() eljárással ezt át tudjuk állítani.

A Thread osztálynak számos egyéb metódusa van, melyekkel idővel érdemes megismerkedni.

A Runnable interfész

Mivel a Java csak egy ősosztályból való származtatást engedélyez, a fenti módszerrel nem tudnánk egyszerre örökölni is saját osztályból és futtathatóvá tenni. E problémát a következőképpen tudjuk megkerülni: az osztály nem a Thread osztályból származik, hanem a Runnable interfészt valósítja meg. Ugyanúgy a run() metódust kell megírni, az indításhoz viszont a saját osztályunk példányosítása mellett mellett egy Thread példányt is létre kell hoznunk, melynek átadjuk a saját példányunkat paraméterként. A következő példa a fentivel ekvivalens:

class MyRunnable implements Runnable {
    private int n;
 
    public MyRunnable(int n) {
        this.n = n;
    }
 
    public void run() {
        System.out.println("Thread " + n + " started.");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Thread " + n +  " stopped.");
    }
}
 
public class ThreadExample {
    public static void main(String[] args) {
        Thread t1 = new Thread(new MyRunnable(1));
        Thread t2 = new Thread(new MyRunnable(2));
        Thread t3 = new Thread(new MyRunnable(3));
 
        t1.start();
        t2.start();
        t3.start();
    }
}

A syncronized kulcsszó

Ezt követően a szakasz végéig szinkronizálásról lesz szó: hogyan kezeljük azokat a helyzeteket, amikor a szálaknak egymásra kell várniuk? Vegyünk a való életből is egy példát, amivel talán egyszerűbben meg tudjuk érteni az absztrakt problémát! Tegyük fel, hogy van egy orvos mint erőforrás, és betegek mint szálak. Mindenki előbb-utóbb be szeretne menni az orvoshoz, de az orvos egyszerre csak egy beteget tud fogadni.

Vegyük a következő, immár absztrakt problémát! Van egy MyCounter osztályunk, ami - ahogy azt a neve is sugallja - számol, mégpedig a következőképpen: a létrehozásakor 0-ra állítjuk a számlálót, majd egy increment() függvényhívással azt növeljük. A növelés, szándékosan kissé sután, a következőképpen történik: egy ideiglenes változóba tesszük az aktuális számláló értéke + 1-et, majd a számláló felveszi az ideiglenes változó értékét. Egyébként az n++ művelet a háttérben pont így működik, azaz ez nem atomi művelet. Viszont annak érdekében, hogy a fordítónak se legyen lehetősége ezt "kioptimalizálni", a két művelet közé tegyünk egy rövid szünetet.

Most ha több szál ezt az eljárást párhuzamosan hívhatja, akkor a következő problémába futunk:

  • Kezdetben a számláló értéke 0.
  • Az egyik szál meghívja a növelést, melynek hatására az ideiglenes változó értéke 1 lesz, és leáll a futás egy időre.
  • Közben elindul egy másik szál, a számláló még mindig 0, az abban szereplő, más memóriahelyen levő ideiglenes változó felveszi az 1 értéket.
  • Az első szál befejeződik, belehelyezi az ideiglenes változó értékét, azaz az 1-et a számláló értékébe.
  • A második szál is befejeződik, és az is behelyezi az ő ideiglenes változó értékét, azaz szintén az 1-et, a számláló értékébe.

Végeredményben tehát a számláló aktuális értéke 1 lesz, noha 2-nek kellene lennie. A problémát az okozta, hogy a növelés eljárást párhuzamosan hívta két szál, melyet nem lenne szabad megengedni. Ha azt szeretnénk, hogy jól működjön, ezt meg kellene tiltani, magyarán a második szálnak meg kellene várnia, míg az első szál által hívott eljárás befejeződik. A Java-ban ezt a synchronized kulcsszóval tudjuk megtenni, a következőképpen:

class MyCounter {
    private int n = 0;
 
    public synchronized void increment() {
        int temp = n + 1;
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        n = temp;
    }
 
    public int get() {
        return n;
    }
}
 
class MyThread extends Thread {
    private MyCounter counter;
 
    public MyThread(MyCounter counter) {
        this.counter = counter;
    }
 
    @Override
    public void run() {
        counter.increment();
    }
 
}
 
public class SynchronizedExample {
 
    public static void main(String[] args) {
        MyCounter counter = new MyCounter();
 
        Thread t1 = new MyThread(counter);
        Thread t2 = new MyThread(counter);
        Thread t3 = new MyThread(counter);
 
        t1.start();
        t2.start();
        t3.start();
 
        try {
            t1.join();
            t2.join();
            t3.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        System.out.println("Final result: " + counter.get());
    }
 
}

Az orvosos példára visszatérve, a sychronized biztosítja azt, hogy egyszerre legfeljebb egy beteg lehet bent az orvosnál. Ha valaki kijön, akkor utána valaki bemegy, de a sorrend nem garantált.

Itt láthatunk egy példát a join() eljárásra is, ami megvárja, hogy a szál, amin meghívtuk, befejezze a futását, az aktuális szál csak akkor folytatódik. A példában megvárjuk, hogy a 3 szál befejeződjön, és utána írjuk ki a számláló értékét. Az orvosos példánál ez lehet mondjuk a portás, aki megvárja, míg az utolsó beteg is távozik.

Futtassuk le a fenti programot kétféleképpen! Az egyik legyen az, ahogy most kinéz, a másik pedig a synchronized nélkül! Az első esetben az eredmény mindig 3 lesz, a második esetben pedig majdnem biztosan 1. (Hogy miért csak majdnem biztosan, annak a következő az oka: semmi garancia sincs a szálak indítási sorrendjére, azok futására; még a prioritás is csak ajánlás. ELméletben előfordulhat az, hogy ténylegesen olyan lassan megy át Runnable státuszból Running-ba, hogy közben a korábban indított szál már be is fejeződött. Ez esetben 2, vagy akár 3 is lehet a végeredmény.)

A synchronized(this) és társai

Az orvosos példát folytatva tegyük fel, hogy a művelet több részből áll: vetkőzés, vizsgálat, öltözés. Tegyük fel továbbá, hogy van külön női és férfi öltöző, valamint a vizsgálat nettó hossza jelentősen rövidebb a bruttó hossznál (tehát ha beleértjük a vetkőzés és az öltözést is). Adja magát a megoldás: egyszerre több beteget behívnak vetkőzésre, kizárólagosságra csak a vizsgálat ideje alatt van szükség, és utána többen is öltözhetnek egyszerre. Ez a rész pont erről szól!

Írjuk át egy picit az increment() eljárást a következőképpen:

    public synchronized void increment() {
        try {
            System.out.println("increment()");
            Thread.sleep(1000);
            int temp = n + 1;
            Thread.sleep(10);
            n = temp;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

Ezzel azt szimuláljuk, hogy van egy viszonylag hosszú művelet, ami megelőzi a kritikus részt, de ez a szakasz mehetne párhuzamosan. A fenti megoldásban viszont lassú lesz a lefutás, 3 másodperc, mert mindegyik esetben meg kell várni az előző szál 1 másodperces lefutását. Igazából elég lenne csak pár sort szinkronizálttá tenni. Most eltekintve attól, hogy ebben a konkrét esetben át lehetne szervezni a kódot, a megoldás a következő:

    public void increment() {
        try {
            System.out.println("increment()");
            Thread.sleep(1000);
            synchronized (this) {
                int temp = n + 1;
                Thread.sleep(10);
                n = temp;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

Tehát a teljes eljárás nem lesz szinkronizált, csak 3 utasítás. Ennek viszont egy picit más a szintaxisa, megjelent paraméterként a this. Ez egy referencia, ami az adott példányra hivatkozik. Valójában itt tetszőleges objektumot átadhatunk paraméterül, és azt veszi figyelembe a szinkronizálás során. Ha kettő vagy több eljáráson belül hivatkozunk ugyanarra az objektumra, tehát pl. lenne egy decrement() eljárás, ami hasonló módon csökkenti a számláló értékét, és az is synchronized(this) lenne, akkor amíg az increment() fut, a decrement() nem indulhatna el, és fordítva. Akár több osztályból is hivatkozhatunk ugyanarra az objektumra, és ez esetben az egyik "megakasztja" a másikat. Az orvosos példában elég nyakatekerten ezt úgy tudjuk elképzelni, mintha ugyanabban a szobában egyszerre két orvos is vizsgálna, és egyszerre csak egy beteg tartózkodhatna bent, függetlenül attől, hogy melyik orvoshoz jött. (Tehát nem véletlenül nem egy nagy hodályban rendelnek az orvosok, hanem külön szobákban…)

Ugyanakkor, ha két synchronized különböző objektumot kap paraméterül, akkor az egyik lefutása nem befolyásolja a másikat. Tehát ha van két orvos és két rendelő, akkor az, hogy valaki bent van az egyik rendelőben, nem akadályozza meg azt, hogy más valaki belépje egy másikba.

A következő példa a szintaxist illusztrálja:

public class MyCounter {
    private int m = 0;
    private int n = 0;
    private Object objectM = new Object();
    private Object objectN = new Object();
 
    public void incrementM() {
        synchronized (objectM) {m++;}
    }
 
    public void decrementM() {
        synchronized (objectM) {m--;}
    }
 
    public int getM() {
        return m;
    }
 
    public void incrementN() {
        synchronized (objectN) {n++;}
    }
 
    public void decrementN() {
        synchronized (objectN) {n--;}
    }
 
    public int getN() {
        return n;
    }
}

Az m növelése ill. csökkentése egyszerre egy példányban futhat (tehát ha pl. fut a növelés, akkor nem futhat a csökkenés), és ugyanez igaz az n-re is, de pl. az n növelése és az m csökkentése futhat párhuzamosan.

A wait-notify mechanizmus

Az orvosos példában tegyük fel, hogy a következőképpen zajlik a dolog. Ha megérkezik egy páciens, akkor vár. Amikor kijön az orvostól a beteg, akkor azt kell mondania, hogy "jöhet a következő", és a legfürgébb beteg besurran (nem feltétlenül az, aki legrégebb óta várakozik). Erről szól a wait-notify mechanizmus! Egy másik megoldás: a betegek sorban állnak, a nővér időnként kinéz, és ekkor mindenki beadja a TAJ kártáyáját. Ez utóbbi a wait-notifyAll.

Ha az egyik szálnak meg kell várnia azt, hogy egy másik szál egy bizonyos pontig eljusson, akkor tudjuk használni a wait-notify mechanizmust. Tehát itt nem arról van szó, hogy az egyik szál megvárja, míg a másik befejezi a futást, hanem utána mindkettő fut tovább, de valami miatt (pl. egy részeredmény kiszámolásáig) az egyiknek meg kell várnia a másikat.

Lássunk erre egy példát! A példában két szál fut: az egyik a várakozó (Waiter), a másik a küldő Notifier. A küldő egy másodperces várakozást követően küldi az üzenetet:

package basics.thread;
 
class Message {
    private String message;
 
    public String getMessage() {
        return message;
    }
 
    public void setMessage(String message) {
        this.message = message;
    }
}
 
class Waiter extends Thread {
    private Message message;
 
    public Waiter(Message message) {
        this.message = message;
    }
 
    @Override
    public void run() {
        System.out.println("[Waiter] thread started.");
        try {
            synchronized(message) {
                System.out.println("[Waiter] waiting for message.");
                message.wait();
                System.out.println("[Waiter] message received: " + message.getMessage());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("[Waiter] thread ended.");
    }
}
 
class Notifier extends Thread {
    private Message message;
 
    public Notifier(Message message) {
        this.message = message;
    }
 
    @Override
    public void run() {
        System.out.println("[Notifier] thread started.");
        synchronized(message) {
            System.out.println("[Notifier]  preparing for notify.");
            message.setMessage("my message");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            message.notifyAll();
            System.out.println("[Notifier] notify called.");
        }
        System.out.println("[Notifier] thread ended.");
    }
}
 
public class WaitNotifyExample {
    public static void main(String[] args) throws InterruptedException {
        Message message = new Message();
        Waiter waiter = new Waiter(message);
        Notifier notifier = new Notifier(message);
        waiter.start();
        notifier.start();
        waiter.join();
        notifier.join();
    }
}

Figyeljük meg a következőket:

  • Úgy a várakozónak mind a küldőnek ugyanarra az objektumra kell szinkronizálnia (ld. synchronized(message)).
  • A küldő kódjában van egy ilyen sor: message.notifyAll(), valójában ez az értesítés küldése. Ez kétféle lehet: vagy notify(), amikor pontosan egy várakozó lesz értesítve (ha több Waiter példány lenne, akkor közülük az egyik befejezné a futást, a többi várakozó állapotban maradna), ill. notifyAll(), amikor mindegyik várakozó folytatja a futást.

Lock

Az eddig említett megoldások viszonylag egyszerű szinkronizációs műveletek végrehajtására alkalmasak: egyik vár a másikra. Olyan finomhangolásokra viszont nem alkalmas, mint pl.:

  • A notify során garantáltan a legrégebb óta várakozó szál kapja meg a vezérlést. (Egy orvosnál ez elvárt.)
  • Nézzük meg, hogy a megakadna-e a futás, és ha nem, akkor akkor foglaljuk le az erőforrást, egyébként csináljunk valami mást. Tehát van a szálnak egy kritikus szakasza, de mindegy, mikor hajtja végre, feleslegesen nem szeretne megállni. (Pl. egy csomó mindent el kell intézni, az orvoshoz is el kell menni, de ebben a pillanatban annyira nem fontos, hogy csak akkor megyünk oda, ha nincs ott más. Tehát megnézzük, hogy van-e ott valaki; ha nincs, bemegyünk, ha van, akkor elintézünk mást, és megyünk később. Ill. nézzük me, hogy hányan állnak sorban. Vagy azt nézzük meg, hogy hányan állnak hosszú vizsgálatért sorban, és hányan jöttek csak receptért.)
  • Álljunk be a sorba egy erőforrásért, de ha nem kapjuk meg adott időn belül, akkor lépjünk tovább. (Pl. max. 10 percig állunk sorban az orvosnál, és ha nem szólítottak, akkor hazamegyünk.)
  • A szinkronizálás legyen eltérő adott erőforrás olvasása és írása során.
  • Kívülről meg lehessen szakítani egy szál várakozását abban az esetben is, ha nem kapja meg az erőforrást. (Az orvos kiszól a várakozó betegeknek, hogy menjenek haza, ma már nem kerülnek sorra.)

A példákat lehetne sorolni. Erre alkották meg a Java nyelvben az 1.5-ös verziótól kezdve a Lock osztályhierarchiát (a Lock önmaga egy interfész), melyben számos megvalósítás készült. A két legfontosabb eljárása a lock() és az unlock(), mellyel az erőforrást legyen lefoglalni ill. elengedni, de számos más eljárást is definiáltak.

Először lássunk egy példát, ami a fentiekkel együtt lehet futtathatóvá tenni, majd megnézzük a különböző megvalósításokban rejlő lehetőségeket!

import java.util.concurrent.locks.*;
 
class MyCounter {
    private int n = 0;
    private Lock lock = new ReentrantLock();
 
    public void increment() {
        lock.lock();
        try {
            int temp = n + 1;
            Thread.sleep(10);
            n = temp;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
 
    public int get() {
        return n;
    }
}

A példából már látható, hogy ez sokkal rugalmasabb mint a synchronized, pl. eleve nem kell, hogy ugyanazon a logikai szinten legyen a lock és az unlock. De lássuk, mi mindent tudnak még!

  • A Lock interfész:
    • lock() és unlock(): ezeket már láttuk.
    • tryLock(): ha szabad a lock, akkor megkapja, és igaz értékkel tér vissza. Ha nem akkor a visszatérési érték hamis lesz, de a futás azonnal folytatódik. Ennek párja az, amikor paraméterként meg lehet neki adni, hogy mennyi ideig próbálkozzon. (Ez az a példa, amikor csak akkor megyek be az orvoshoz, ha azonnal be tudok menni, vagy max. 10 percig várok. Ne feledjük: alapértelmezésben a lock mechanizmus olyan, hogy addig vár, amíg az erőforrás meg nem kapja, ami akár észszerűtlenül hosszú is lehet.)
    • lockInterruptibly(): a lock kívülről megszakítható. (A beteg elhatározza, hogy addig vár, amíg nem szólítják, de szól az orvos vagy egy családtag, hogy ne várjon tovább.)
    • newCondition(): tetszőleges számú feltételt (Condition osztály) lehet ennek segítségével a lockhoz hozzáadni. Ez hasonlóképpen működik, mint a már megismert wait-notify: az await(), a notify() ill. notifyAll() a már megismert technológia szerint működik. (Tegyük fel, hogy külön sor van a receptekért és a vizsgálatokért.)
  • ReentrantLock: alap Lock megvalósítás, amely a fenti eljárásokon túl többek között a következőket is tartalmazza:
    • getQueueLength(): az adott sorra várakozó erőforrások hosszát adja vissza. (Tehát pl. hányan állnak sorban az orvosnál.)
    • getWaitQueueLength(condition): adott Condition-re vonatkozó várakozók száma. (Pl. hányan várnak vérvételre.)
    • getHoldCount(): azt adja vissza, hogy az adott szál hányszor foglalta le a lock-ot (ugyanis többször is lefoglalhatja). (Ezt úgy lehetne elképzelni, hogy a beteg bent van az orvosnál, majd indítanak egy újabb folyamatot, pl. EKG-t; azaz mintha kétszer lenne bent: először egy általános vizsgálat miatt, másodszor pedig az EKG miatt. Tehát meg kell várni az EKG-t, majd a teljes vizsgálat végét, hogy a következő beeg is beléphessen.)
    • hasQueueTreads() ill. hasQueueTread(thread): segítségével lekérdezhetjük, hogy van-e várakozó szál, ill. egy adott szál éppen várakozik-e. (Van-e még valaki, aki sorban áll, mert ettől függ, hogy végrehajtanak-e egy időigényes vizsgálatot. A másikra példa: Mari néni itt van-e még.)
    • isFair(): be lehet állítani azt, hogy a legrégebb óta várakozó kapja meg legelőször a felszabaduló erőforrást. Ez az eljárás azzal tér vissza, hogy ez be van-e állítva. (Az orvosi példában ez nyilván azt jelenti, hogy a legrégebb óta sorban álló beteg léphet be következőnek.)
  • ReentrantReadWriteLock: a valóságban leggyakrabban író és olvasó műveletek vannak. Az eddigi megoldásokkal íráskor és olvasáskor is meg kellett szerezni a lockot. De valójában ezt optimálisabban is meg lehet valósítani: párhuzamosan több szál is olvashat biztonságosan, azt viszont biztosítani kell, hogy egyszerre több ne írhasson, ill. ha valamelyik ír, akkor egyik se olvashasson. Kicsit precízebben: két olvasó futhat egyszerre; egy olvasó megakasztja az írót; az író megakasztja az olvasót; az író megakasztja a többi írót. A való életben képzeljünk el egy piacot, ahol néha felülírják az árakat. Akárhány vásárló nézheti az árakat egyszerre. Írás közben jó, ha senki sem látja, mert a 200 forintos paradicsom árának kiírásánál lesz olyan időpillanat, amikor 20-at lát a vevő. Vásárlás közben (pl. az áru kiválasztása és a fizetés között) nem illik árat emelni. Azt is biztosítani kell, hogy egyszerre egy eladó írja ki az árat a táblára. Ezt deklarálja a ReadWriteLock interfész ill. valósítja meg a ReentrantReadWriteLock osztály.
    • readLock(): visszaad egy read lockot, melyet hagyományos módon tudunk használni olvasásra: lock(), unlock().
    • writeLock(): hasonló a read lockhoz, csak a célja írás, és nem olvasás.
    • getReadLockCount() és getWriteLockCount: azoknak a szálaknak a számát adja vissza, amelyek a read ill. write lockra várnak. (Van még több, kifejezetten a read ill. write lockkal kapcsolatos lekérdezéssel kapcsolatos eljárás, melyhez érdemes részletesebben megnézni a dokumentációt.)
    • isFair(): a konstruktorban át lehet adni azt, hogy igazságos-e a lock. Alapértelmezésben nem az, de ha beállítjuk, akkor garantált lesz az, hogy a legrégebb óta sorban álló kapja meg az erőforrást legelőször.
  • StampedLock: a tényleges ütközések száma, tehát amikor egyszerre akarnak többen írni és olvasni, igazából elég ritka. Ha mondjuk ezer esetből csak egyszer fordul elő, akkor valójában 999 esetben feleslegesen hajtottuk végre a lock-unlock műveletet. Ennek kezelésére találták ki az optimista lockolási mechanizmust: azt feltételezzük, hogy nem okoz problémát a végrehajtandó művelet, és ha kiderül, hogy mégis, akkor újra végrehajtjuk, immár rendes lockolással. (A valóságban ezt úgy lehetne elképzelni, hogy van egy hivatal, ahol rendkívül ritka az ügyfél. Nehéz ezt persze elképzelni, de most tegyük fel! Ebben a hivatalban azonnal kiszolgálják az ügyfelet, ha nincs más. Egyébként sorszámot kell kérni, a sorszámkiadó automata viszont egy másik épületben van. A fenti megvalósítás úgy működne, hogy elmegyünk sorszámot kérni, majd utána ügyet intézni. Ha viszont 1000 esetből 999-ben feleslegesen kérünk sorszámot, jobban megéri először megpróbálni elintézni az ügyet, és ha vannak előttünk, csak akkor átmenni sorszámot kérni. Ebben az egyetlen esetben ez a módszer persze hosszabb lesz, mintha egyből sorszámot kértük volna, pl. akár közben a sorszámkiadó automatánál meg is előzhetnek minket, mégis, hosszú távon jobban járunk azzal, ha először bekockáztatjuk azt, hogy sorszám nélkül is kiszolgálnak.) Ezt valósítja meg a Java-ban a StampedLock, ami az (egyébként rendkívül sok alapvető újítást tartalmazó) 1.8-as verzióban jelent meg. Ebben megjelenik a bélyegző (stamp) fogalma: az egyes műveletek egy bélyeget adnak vissza. Lássuk a tipikus műveleteket:
    • long stamp = lock.tryOptimisticRead();: olvasás optimista hozzáállással.
    • if (!lock.validate(stamp)) {…}: annak ellenőrzése, hogy a bélyegző még érvényes-e. Ezt tipikusan az írás után hajtjuk végre, és ha érvényes (a visszatérési érték igaz), az azt jelenti, hogy az olvasás során nem történt pl. változás az adatbázisban, tehát a kiolvasott érték megfelelő. Ha nem, akkor a visszatérési érték hamis, és ez esetben a hagyományos módon kell végrehajtanunk a kiolvasást.
    • stamp = lock.readLock();: ez a nem optimista lockolás: megvárjuk, amíg az írók befejezték az írást, nem feltételezzük, hogy nincs ilyen, és csak utána olvasunk.
    • lock.unlock(stamp);: a lock elengedésének a módja. Ezt tipikusan egy finally blokkban hajtjuk végre, hogy mindenképpen lefusson, és ne fordulhasson elő az, hogy a végtelenségig "beragad".

Szemafor

Sokszor előfordul az a probléma, hogy a terhelés során elfogy valamilyen erőforrás, mert túl sokan használják egyszerre a rendszert. Erre túl drasztikus az a megoldás, hogy egyszerre csak egy használhassa, viszont korlátokat mégis érdemes bevezetni, pl. azt, hogy egyszerre maximálisan hányan használhatják. Ezt valósítja meg a szemafor, a Java-ban a Semaphore osztály. Lássunk egy példát!

import java.util.concurrent.*;
 
class Resource {
    private Semaphore semaphore = new Semaphore(3);
 
    public void process() {
        try {
            System.out.println(Thread.currentThread().getName() + " Resource.process() called");
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + " Resource.process() semaphore acquired, processing");
            Thread.sleep(1000);
            semaphore.release();
            System.out.println(Thread.currentThread().getName() + " Resource.process() semaphore released");
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        }
    }
}
 
class MyThread extends Thread {
    private Resource resource;
 
    public MyThread(String name, Resource resource) {
        super(name);
        this.resource = resource;
    }
 
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " started");
        resource.process();
        System.out.println(Thread.currentThread().getName() + " ended");
    }
 
}
 
public class SemaphoreExample {
    public static void main(String[] args) throws InterruptedException {
        Resource resource = new Resource();
        MyThread t1 = new MyThread("T1", resource);
        MyThread t2 = new MyThread("T2", resource);
        MyThread t3 = new MyThread("T3", resource);
        MyThread t4 = new MyThread("T4", resource);
        MyThread t5 = new MyThread("T5", resource);
        t1.start();
        Thread.sleep(10);
        t2.start();
        Thread.sleep(10);
        t3.start();
        Thread.sleep(10);
        t4.start();
        Thread.sleep(10);
        t5.start();
    }
}

A futás eredménye:

T1 started
T1 Resource.process() called
T1 Resource.process() semaphore acquired, processing
T2 started
T2 Resource.process() called
T2 Resource.process() semaphore acquired, processing
T3 started
T3 Resource.process() called
T3 Resource.process() semaphore acquired, processing
T4 started
T4 Resource.process() called
T5 started
T5 Resource.process() called
T1 Resource.process() semaphore released
T1 ended
T4 Resource.process() semaphore acquired, processing
T2 Resource.process() semaphore released
T2 ended
T5 Resource.process() semaphore acquired, processing
T3 Resource.process() semaphore released
T3 ended
T4 Resource.process() semaphore released
T4 ended
T5 Resource.process() semaphore released
T5 ended

Tehát a T4 csak a T1, a T5 pedig a T2 lefutása után jut szóhoz.

A szál pool mechanizmus

A pool-t a programozásban nagyjából úgy kell elképzelni, mint egy többablakos postahivatalt: egyszerre annyi ügyfelet tudnak kiszolgálni, ahány ablak van, és ha egy ablak felszabadul, akkor szólítják a következőt. A szemafor felfogható egyfajta kezdeti megvalósításnak: a fenti példát elképzelhetjük úgy is, hogy egyszerre öten érnek a postára, de csak 3 ablak van. A pool általában viszont jobban hasonlít a postai példára, mint csak az, hogy egyszerre kizárjuk azt, hogy egy adott határnál több szál fusson: azt is meg tudjuk mondani, hogy ki melyik erőforrást használja.

Ez a mechanizmus az 5-ös Java-ban jelent meg. Lássunk egy példát!

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
class CommandThread extends Thread {
    private String command;
 
    public CommandThread(String command) {
        this.command = command;
    }
 
    @Override
    public void run() {
        System.out.println("Thread " + Thread.currentThread().getName() + " started. Command: " + command);
        try {
            Thread.sleep(Math.round(Math.random() * 5000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Thread " + Thread.currentThread().getName() + " ended. Command: " + command);
    }
}
 
public class ThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            CommandThread commandThread = new CommandThread("" + i);
            executor.execute(commandThread);
          }
        executor.shutdown();
        while (!executor.isTerminated()) {}
        System.out.println("Finished all threads");
    }
}

A szál maga egy maximálisan 5 másodperc hosszú véletlen ideig vár. A főprogramban létrehozunk egy 5 szálat tartalmazó pool-t. A háttérben feltehetőleg előkészületeket hajt végre a rendszer; ilyen értelemben hasonlít a pool a postás példára: a postán akkor is van 5 ablak, ha épp zárva van a posta, és nem az első ügyfélnél kell azt kialakítani. Majd indítunk 10 szálat. Ebből az első ötöt elindítja azonnal, hozzárendeli a pool megfelelő elemeihez, majd ha valamelyik végzett, akkor indítja a következőt. A végén megvárja hogy mind befejezze a futást.

A java.util.concurrent csomagban érdemes még szétnézni, mert olyan osztályokat is találunk, amellyel a fentinél még jobban tudjuk vezérelni a szál pool-okat. Pl. a ThreadPoolExecutor segítségével megadhatunk egy maximális értéket, ami fölé már a várakozók száma sem mehet. (A postás hasonlattal ez olyan, hogy 5 ablak van de ha már 20-an bent tartózkodnak, akkor a 21. már sorba sem állhat.) Azt is meg tudjuk adni, hogy ha egy bizonyos ideig nincs kihasználva a pool akkor a lefoglalt erőforrást felszabadítja, és újraindítja, ha szükséges. (A postai példát folytatva: ha tartósan legfeljebb 3 az egyszerre bent tartózkodó ügyfelek száma, akkor két alakot bezárnak, és újra nyitják, ha ismét szükség lesz rá.)

A jövőbeli eredmény

Ha egy művelet hosszú ideig tart, aszinkron módon is meghívhatjuk úgy, hogy új szálat indítunk. Ez esetben nem kapjuk meg rögtön az eredményt, azt rendszeresen le kell kérdezni. Noha magunk is meg tudjuk valósítani például globálisan elérhető (statikus) változók segítségével, sokkal elegánsabb ezt megtenni a Java által nyújtott osztályokat használva. Callable és Future osztályok segítségével. A legegyszerűbb egy példán keresztül megismerni!

import java.util.concurrent.*;
 
class SquareCalculator {
    private ExecutorService executor = Executors.newSingleThreadExecutor();
 
    public Future<Integer> calculate(Integer input) {
        return executor.submit(() -> {
            Thread.sleep(1000);
            return input * input;
        });
    }
}
 
public class FutureExample {
    public static void main(String[] args) throws Exception {
        Future<Integer> future = new SquareCalculator().calculate(5);
        while (!future.isDone()) {
            System.out.println("Calculating...");
            Thread.sleep(300);
        }
        Integer result = future.get();
        System.out.println(result);
        System.exit(0);
    }
}

A példában a négyzetszámoló vár egy másodpercet, mielőtt visszatérne az eredménnyel, a hívó pedig aszinkron módon hívja, és 3 tizedmásodpercenként lekérdezi, hogy meg van-e már az eredmény, az isDone() függvény segítségével. Végül a get() adja vissza az eredményt. Ezen eljárások során különféle kivételek váltódhatnak ki, melyeket a példában nem kezeljük, hogy lehetőleg minél tömörebb marajdon a kód.

Összefoglalás

Az alfejezet hosszából is láthatjuk, hogy valójában mennyire összetett probléma a többszálúság megfelelő kezelése. A Java nyelvi elemekkel (synchronized), az Object által nyújtott alapfüggvényekkel (wait, notify, notifyAll) és standard könyvtárakkal (a java.util.concurrent csomagban található osztályok, interfészek) támogatja a többszálúságot.

Unless otherwise stated, the content of this page is licensed under Creative Commons Attribution-ShareAlike 3.0 License