Wie lässt sich die parallele Ausführung mehrerer Aufgaben (Tasks) kontrollieren?

Die Interfaces Future und ExecutorService des Package java.util.concurrent verwalten die Ausführung eines oder mehrerer nebenläufiger Tasks.

Future erzeugen

Im ersten Beispiel wird eine Liste erzeugt und mit Zufallszahlen zwischen 0 und 1000 gefüllt. Die dort gespeicherten Werte werden addiert und die Summe nach Fertigstellung der Liste ausgegeben. Während des Erstellens der Liste erscheint kontinuierlich eine Fortschrittsanzeige auf der Konsole.

Die Erzeugung der Liste erfolgt einschließlich der Berechnung der Summe der in ihr gespeicherten Werte in der Methode createList() [15]. Um diesen Prozess zu Demonstrationszwecken etwas zu verzögern, wird der ausführende Thread nach jedem Wert für einige Millisekunden pausiert [23]. Die Methode gibt schließlich die erwähnte Summe aller gespeicherten Werte als Integer zurück.

In main() wird ein Objekt vom Typ ExecutorService erzeugt, dessen Methode submit() dann createList() ausführt und ein Future-Objekt zurückgibt [32]. Die Methode submit() ist dreifach überladen und kann als Parameter sowohl Callable als auch Runnable verarbeiten. Da im vorliegenden Fall eine Integer-Rückgabe erfolgt, wird der Aufruf von createList() hier im Rahmen eines Lambda-Ausdrucks durch die Methode call() des funktionalen Interface Callable ausgeführt.
Dieser Mechanismus bewirkt die nebenläufige Ausführung der Methode createList() und deren Verwaltung im Future.

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureBsp {

    private static final int RAND_MAX = 1000;
    private static final int LIST_LENGTH = 100;
    private static final int SNORE = 20;
    private static int counter = 0;

    private static Integer createList() throws InterruptedException {
        ArrayList<Integer> target = new ArrayList<Integer>();
        Random rand = new Random();
        Integer sum = 0, num = 0;
        for (int i = 0; i < LIST_LENGTH; i++) {
            num = rand.nextInt(RAND_MAX + 1);
            sum += num;
            target.add(num);
            Thread.sleep(SNORE);
            counter = i;
        }
        return sum;
    }

    public static void main(String[] args) {
        Integer result = null;
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Integer> f = executor.submit(() -> createList());
        try {
            while (!f.isDone()) {
                System.out.println("Liste zu " + counter + "% belegt!");
                Thread.sleep(SNORE * 25);
            }
            result = f.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
        System.out.println("Summe Liste: " + result);
    }
}

Auf das im Future gespeicherte Ergebnis kann durch get() zugegriffen werden [38], jedoch erst, nachdem der Task vollständig abgearbeitet wurde. Durch isDone() kann dieser Status abgefragt werden [34], sodass zwischenzeitlich weitere Aufgaben ausgeführt werden können [35].

Mehrere Tasks ausführen

In obigem Beispiel erfolgt die Abarbeitung des Tasks durch einen Executor, der durch Aufruf von newSingleThreadExecutor() hierzu einen einzelnen Thread verwendet. Die Übergabe mehrerer Tasks (nacheinander) an submit() einer ExecutorService-Instanz führt entsprechend zu deren sequenzieller Abarbeitung. Ändert man das Programm wie im Folgenden gezeigt ab, indem man drei Tasks erzeugt, so ergibt sich die nachstehende Ausgabe.

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureBsp {

    private static final int RAND_MAX = 1000;
    private static final int LIST_LENGTH = 100;
    private static final int SNORE = 20;
    private static int counter0 = 0, counter1 = 0, counter2 = 0;

    private static Integer createList(int index) throws InterruptedException {
        ArrayList<Integer> target = new ArrayList<Integer>();
        Random rand = new Random();
        Integer sum = 0, num = 0, percent = 0;
        for (int i = 0; i < LIST_LENGTH; i++) {
            num = rand.nextInt(RAND_MAX + 1);
            sum += num;
            target.add(num);
            percent = (i+1) * 100 / LIST_LENGTH;
            switch(index) {
            case 0:
                counter0 = percent;
                break;
            case 1:
                counter1 = percent;
                break;
            case 2:
                counter2 = percent;
                break;
            }
            Thread.sleep(SNORE);
        }
        return sum;
    }

    public static void main(String[] args) {
        Integer result0 = null, result1 = null, result2 = null;
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Integer> f0 = executor.submit(() -> createList(0));
        Future<Integer> f1 = executor.submit(() -> createList(1));
        Future<Integer> f2 = executor.submit(() -> createList(2));
        try {
            while (!f0.isDone() || !f1.isDone() || !f2.isDone()) {
                System.out.println("Liste0 zu " + counter0 + "%, Liste1 zu " + counter1 + "%, Liste2 zu " + counter2 + "% belegt.");
                Thread.sleep(SNORE * 25);
            }
            result0 = f0.get();
            result1 = f1.get();
            result2 = f2.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
        System.out.println("Summe Liste 0: " + result0 + ", Summe Liste 1: " + result1 + ", Summe Liste 2: " + result2);
    }
}
// Ausgabe

Liste0 zu 1%, Liste1 zu 0%, Liste2 zu 0% belegt.
Liste0 zu 22%, Liste1 zu 0%, Liste2 zu 0% belegt.
Liste0 zu 42%, Liste1 zu 0%, Liste2 zu 0% belegt.
Liste0 zu 64%, Liste1 zu 0%, Liste2 zu 0% belegt.
Liste0 zu 85%, Liste1 zu 0%, Liste2 zu 0% belegt.
Liste0 zu 100%, Liste1 zu 5%, Liste2 zu 0% belegt.
Liste0 zu 100%, Liste1 zu 27%, Liste2 zu 0% belegt.
Liste0 zu 100%, Liste1 zu 48%, Liste2 zu 0% belegt.
Liste0 zu 100%, Liste1 zu 69%, Liste2 zu 0% belegt.
Liste0 zu 100%, Liste1 zu 90%, Liste2 zu 0% belegt.
Liste0 zu 100%, Liste1 zu 100%, Liste2 zu 11% belegt.
Liste0 zu 100%, Liste1 zu 100%, Liste2 zu 33% belegt.
Liste0 zu 100%, Liste1 zu 100%, Liste2 zu 54% belegt.
Liste0 zu 100%, Liste1 zu 100%, Liste2 zu 76% belegt.
Liste0 zu 100%, Liste1 zu 100%, Liste2 zu 97% belegt.
Summe Liste 0: 52506, Summe Liste 1: 52889, Summe Liste 2: 44307

Es ist deutlich zu erkennen, dass die drei Aufgaben durch Verwendung von newSingleThreadExecutor() nacheinander abgearbeitet werden.
Die Hilfs-Klasse Executors stellt jedoch auch Methoden zur Realisierung paralleler Ausführung mehrerer Aufgaben bereit. Ersetzt man lediglich den Methodenaufruf von newSingleThreadExecutor() durch newCachedThreadPool(), so ändert sich die Ausgabe durch die parallele Verwendung mehrerer Threads wie folgt:

// Ausgabe
            
Liste0 zu 1%, Liste1 zu 1%, Liste2 zu 1% belegt.
Liste0 zu 22%, Liste1 zu 22%, Liste2 zu 22% belegt.
Liste0 zu 43%, Liste1 zu 43%, Liste2 zu 43% belegt.
Liste0 zu 64%, Liste1 zu 64%, Liste2 zu 64% belegt.
Liste0 zu 85%, Liste1 zu 86%, Liste2 zu 85% belegt.
Summe Liste 0: 48800, Summe Liste 1: 51293, Summe Liste 2: 55855

Die Tasks werden parallel abgearbeitet. Hierzu wird ein Threadpool erzeugt, der die jeweils benötigte Anzahl an Threads selbstständig einschätzt und bereitstellt. Soll die Zahl der eingesetzten Threads begrenzt werden, so ist das durch Verwendung von newFixedThreadPool(int nThreads) möglich. Übersteigt bei Verwendung dieser Methode die Anzahl der abzuarbeitenden Tasks diejenige der zur Verfügung stehenden Threads, so werden diese nach Beendigung ihrer Aufgabe für die nächste wiederverwendet.

Wenn Ihnen javabeginners.de gefällt, freue ich mich über eine Spende an diese gemeinnützigen Organisationen.