Exécuteurs

Syntaxe

  • ThreadPoolExecutor

  • ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, unité TimeUnit, BlockingQueue<Runnable> workQueue)

  • ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, unité TimeUnit, BlockingQueue<Runnable> workQueue, gestionnaire RejectedExecutionHandler)

  • ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, unité TimeUnit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)

  • ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, unité TimeUnit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, gestionnaire RejectedExecutionHandler)

  • Executors.callable(PrivilegedAction<?> action)

  • Executors.callable(PrivilegedExceptionAction<?> action)

  • Executors.callable(Tâche exécutable)

  • Executors.callable(Tâche exécutable, résultat T)

  • Executors.defaultThreadFactory()

  • Executors.newCachedThreadPool()

  • Executors.newCachedThreadPool(ThreadFactory threadFactory)

  • Executors.newFixedThreadPool(int nThreads)

  • Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

  • Executors.newScheduledThreadPool(int corePoolSize)

  • Executors.newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

  • Executors.newSingleThreadExecutor()

  • Executors.newSingleThreadExecutor(ThreadFactory threadFactory)

Paramètres

Paramètre Détail
corePoolSize Nombre minimum de threads à conserver dans le pool.
maximumPoolSize Nombre maximal de threads à autoriser dans le pool.
keepAliveTime Lorsque le nombre de threads est supérieur au cœur, les threads non principaux (threads inactifs en excès) attendront le temps défini par ce paramètre pour les nouvelles tâches avant de se terminer.
unité Unité de temps pour keepAliveTime.
délai d’attente le temps maximum d’attente
workQueue Le type de file d’attente que notre exécuteur va utiliser
usine de fil La fabrique à utiliser lors de la création de nouveaux threads
nThreads Le nombre de threads dans le pool
exécuteur testamentaire L’implémentation sous-jacente
tâche la tâche à exécuter
résultat Le résultat à retourner
agir L’action privilégiée pour courir
appelable La tâche sous-jacente

Les différents types de threadpools et de files d’attente expliqués ci-dessous ont été tirés des informations et des connaissances de la [documentation oracle][1] et du blog [Jakob Jenkov][2] où vous pouvez en apprendre beaucoup sur la concurrence en java.

Différents types de pools de threads

SingleThreadExecutor : exécuteur qui utilise un seul thread de travail fonctionnant sur une file d’attente illimitée et utilise la ThreadFactory fournie pour créer un nouveau thread si nécessaire. Contrairement au newFixedThreadPool(1, threadFactory) autrement équivalent, l’exécuteur renvoyé est garanti de ne pas être reconfigurable pour utiliser des threads supplémentaires.

FixedThreadPool : pool de threads qui réutilise un nombre fixe de threads fonctionnant sur une file d’attente illimitée partagée, en utilisant la ThreadFactory fournie pour créer de nouveaux threads si nécessaire. À tout moment, au plus les threads nThreads seront des tâches de traitement actives. Si des tâches supplémentaires sont soumises lorsque tous les threads sont actifs, elles attendront dans la file d’attente jusqu’à ce qu’un thread soit disponible. Si un thread se termine en raison d’un échec lors de l’exécution avant l’arrêt, un nouveau prendra sa place si nécessaire pour exécuter les tâches suivantes. Les threads du pool existeront jusqu’à ce qu’il soit explicitement arrêté.

CachedThreadPool : Pool de threads qui crée de nouveaux threads selon les besoins, mais réutilise les threads précédemment construits lorsqu’ils sont disponibles, et utilise la ThreadFactory fournie pour créer de nouveaux threads si nécessaire.

SingleThreadScheduledExecutor : exécuteur mono-thread qui peut planifier l’exécution de commandes après un délai donné ou leur exécution périodique. (Notez cependant que si ce thread unique se termine en raison d’un échec lors de l’exécution avant l’arrêt, un nouveau prendra sa place si nécessaire pour exécuter les tâches suivantes.) Les tâches sont garanties de s’exécuter séquentiellement et pas plus d’une tâche ne sera active n’importe quand. Contrairement au newScheduledThreadPool(1, threadFactory) autrement équivalent, l’exécuteur renvoyé est garanti de ne pas être reconfigurable pour utiliser des threads supplémentaires.

ScheduledThreadPool : pool de threads pouvant planifier l’exécution de commandes après un délai donné ou leur exécution périodique. Différents types de files d’attente de travail

Runnables personnalisés au lieu de Callables

Une autre bonne pratique pour vérifier quand nos threads sont terminés sans bloquer le thread en attente de récupération de l’objet Future de notre Callable est de créer notre propre implémentation pour Runnables, en l’utilisant avec la méthode execute().

Dans l’exemple suivant, je montre une classe personnalisée qui implémente Runnable avec un rappel interne, ce qui nous permet de savoir quand les exécutables sont terminés et de l’utiliser plus tard dans notre ThreadPool :

public class CallbackTask implements Runnable {
    private final Runnable mTask;
    private final RunnableCallback mCallback;

    public CallbackTask(Runnable task, RunnableCallback runnableCallback) {
        this.mTask = task;
        this.mCallback = runnableCallback;
    }

    public void run() {
        long startRunnable = System.currentTimeMillis();
        mTask.run();
        mCallback.onRunnableComplete(startRunnable);
    }

    public interface RunnableCallback {
        void onRunnableComplete(long runnableStartTime);
    }
}

Et voici notre implémentation ThreadExecutor :

public class ThreadExecutorExample implements ThreadExecutor {

    private static String TAG = "ThreadExecutorExample";
    public static final int THREADPOOL_SIZE = 4;
    private long mSubmittedTasks;
    private long mCompletedTasks;
    private long mNotCompletedTasks;

    private ThreadPoolExecutor mThreadPoolExecutor;

    public ThreadExecutorExample() {
        Log.i(TAG, "[ThreadExecutorImpl] Initializing ThreadExecutorImpl");
        Log.i(TAG, "[ThreadExecutorImpl] current cores: " + Runtime.getRuntime().availableProcessors());
        this.mThreadPoolExecutor =
            (ThreadPoolExecutor) Executors.newFixedThreadPool(THREADPOOL_SIZE);

    }

    @Override
    public void execute(Runnable runnable) {
        try {
            if (runnable == null) {
                Log.e(TAG, "[execute] Runnable to execute cannot be null");
                return;
            }
            Log.i(TAG, "[execute] Executing new Thread");

            this.mThreadPoolExecutor.execute(new CallbackTask(runnable, new CallbackTask.RunnableCallback() {

                @Override
                public void onRunnableComplete(long RunnableStartTime) {
                    mSubmittedTasks = mThreadPoolExecutor.getTaskCount();
                    mCompletedTasks = mThreadPoolExecutor.getCompletedTaskCount();
                    mNotCompletedTasks = mSubmittedTasks - mCompletedTasks; // approximate

                    Log.i(TAG, "[execute] [onRunnableComplete] Runnable complete in " + (System.currentTimeMillis() - RunnableStartTime) + "ms");
                    Log.i(TAG, "[execute] [onRunnableComplete] Current threads working " + mNotCompletedTasks);
                }
            }));
        }
        catch (Exception e) {
            e.printStackTrace();
            Log.e(TAG, "[execute] Error, shutDown the Executor");
            this.mThreadPoolExecutor.shutdown();
        }
    }
}


 /**
 * Executor thread abstraction created to change the execution context from any thread from out ThreadExecutor.
 */
interface ThreadExecutor  extends Executor {

    void execute(Runnable runnable);

}

J’ai fait cet exemple pour vérifier la vitesse de mes threads en millisecondes lorsqu’ils sont exécutés, sans utiliser Future. Vous pouvez prendre cet exemple et l’ajouter à votre application pour contrôler le fonctionnement des tâches simultanées et celles terminées/terminées. Vérifier à tout moment, le temps dont vous avez besoin pour exécuter ces threads.

Définir un nouveau ThreadPool

Un ThreadPool est un ExecutorService qui exécute chaque tâche soumise en utilisant l’un des plusieurs threads mis en pool, normalement configurés à l’aide des méthodes d’usine Executors.

Voici un code de base pour initialiser un nouveau ThreadPool en tant que singleton à utiliser dans votre application :

public final class ThreadPool {

    private static final String TAG = "ThreadPool";
    private static final int CORE_POOL_SIZE = 4;
    private static final int MAX_POOL_SIZE = 8;
    private static final int KEEP_ALIVE_TIME = 10; // 10 seconds
    private final Executor mExecutor;

    private static ThreadPool sThreadPoolInstance;

    private ThreadPool() {
        mExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
            TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    public void execute(Runnable runnable) {
        mExecutor.execute(runnable);
    }

    public synchronized static ThreadPool getThreadPoolInstance() {
        if (sThreadPoolInstance == null) {
            Log.i(TAG, "[getThreadManagerInstance] New Instance");
            sThreadPoolInstance = new ThreadPool();
        }
        return sThreadPoolInstance;
    }
}

Vous avez deux façons d’appeler votre méthode exécutable, utilisez execute() ou submit(). la différence entre eux est que submit() renvoie un objet Future qui vous permet d’annuler par programme le thread en cours d’exécution lorsque l’objet T est renvoyé par le rappel Callable. Vous pouvez en savoir plus sur “Future” [ici][1]

[1] : http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html

Futures et exigibles

L’une des fonctionnalités que nous pouvons utiliser avec Threadpools est la méthode submit() qui nous permet de savoir quand le thread a terminé son travail. Nous pouvons le faire grâce à l’objet Future, qui nous renvoie un objet du Callable que nous pouvons utiliser pour nos propres objectifs.

Voici un exemple d’utilisation de l’instance Callable :

public class CallablesExample{

//Create MyCustomCallable instance
List<Future<String>> mFutureList = new ArrayList<Future<String>>();

//Create a list to save the Futures from the Callable
Callable<String> mCallable = new MyCustomCallable();

public void main(String args[]){
    //Get ExecutorService from Executors utility class, Creating a 5 threads pool.
    ExecutorService executor = Executors.newFixedThreadPool(5);
   
    
   
    for (int i = 0; i < 100; i++) {
        //submit Callable tasks to be executed by thread pool
        Future<String> future = executor.submit(mCallable);
        //add Future to the list, we can get return value using Future
        mFutureList.add(future);
    }
    for (Future<String> fut : mFutureList) {
        try {
            //Print the return value of Future, Notice the output delay in console
            //because Future.get() stop the thread till the task have been completed
            System.out.println(new Date() + "::" + fut.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
    //Shut down the service
    executor.shutdown();
}

 class MyCustomCallable implements Callable<String> {

    @Override
    public String call() throws Exception {
        Thread.sleep(1000);
        //return the thread name executing this callable task
        return Thread.currentThread().getName();
    }
}
}

Comme vous pouvez le voir, nous créons un Threadpool avec 5 Threads, cela signifie que nous pouvons lancer 5 callables en parallèle. Lorsque les threads se terminent, nous obtiendrons un objet Future à partir de l’appelable, dans ce cas le nom du thread.

ATTENTION

Dans cet exemple, nous utilisons simplement les Futures comme un objet à l’intérieur du tableau pour savoir combien de threads nous exécutons et imprimons autant de fois une console de connexion avec les données que nous voulons. Mais, si nous voulons utiliser la méthode Future.get(), pour nous renvoyer les données que nous avons enregistrées auparavant dans l’appelable, nous bloquerons le thread jusqu’à ce que la tâche soit terminée. Soyez prudent avec ce type d’appels lorsque vous souhaitez le faire le plus rapidement possible

Ajout de ThreadFactory à l’exécuteur

Nous utilisons ExecutorService pour attribuer des threads à partir du pool de threads internes ou les créer à la demande pour effectuer des tâches. Chaque ExecutorService a une ThreadFactory, mais l’ExecutorService utilisera toujours une valeur par défaut si nous n’en définissons pas une personnalisée. Pourquoi devrions-nous faire cela?

  • Pour définir un nom de fil plus descriptif. ThreadFactory par défaut donne des noms de thread sous la forme pool-m-thread-n, tels que pool-1-thread-1, pool-2-thread-1, pool-3-thread-1, etc. Si vous essayez de déboguer ou surveiller quelque chose, il est difficile de savoir ce que font ces threads

  • Définissez un statut Daemon personnalisé, la ThreadFactory par défaut produit des résultats non démons.

  • Définissez la priorité sur nos threads, la ThreadFactory par défaut définit une priorité moyenne sur tous leurs threads.

  • Vous pouvez spécifier UncaughtExceptionHandler pour notre thread en utilisant setUncaughtExceptionHandler() sur l’objet thread. Ceci est rappelé lorsque la méthode run de Thread lève une exception non interceptée.

Voici une implémentation simple d’une ThreadFactory sur un ThreadPool.

public class ThreadExecutorExample implements ThreadExecutor {
private static String TAG = "ThreadExecutorExample";
private static final int INITIAL_POOL_SIZE = 3;
private static final int MAX_POOL_SIZE = 5;

// Sets the amount of time an idle thread waits before terminating
private static final int KEEP_ALIVE_TIME = 10;

// Sets the Time Unit to seconds
private static final TimeUnit KEEP_ALIVE_TIME_UNIT = TimeUnit.SECONDS;

private final BlockingQueue<Runnable> workQueue;

private final ThreadPoolExecutor threadPoolExecutor;

private final ThreadFactory threadFactory;
private ThreadPoolExecutor mThreadPoolExecutor;

public ThreadExecutorExample() {
    this.workQueue = new LinkedBlockingQueue<>();
    this.threadFactory = new CustomThreadFactory();
    this.threadPoolExecutor = new ThreadPoolExecutor(INITIAL_POOL_SIZE, MAX_POOL_SIZE,
            KEEP_ALIVE_TIME, KEEP_ALIVE_TIME_UNIT, this.workQueue, this.threadFactory);
}

public void execute(Runnable runnable) {
    if (runnable == null) {
        return;
    }
    this.threadPoolExecutor.execute(runnable);
}

private static class CustomThreadFactory implements ThreadFactory {
    private static final String THREAD_NAME = "thread_";
    private int counter = 0;

    @Override public Thread newThread(Runnable runnable) {
        return new Thread(runnable, THREAD_NAME + counter++);
    }
}
}

/**
 * Executor thread abstraction created to change the execution context from any thread from out ThreadExecutor.
 */
interface ThreadExecutor extends Executor {
    
    void execute(Runnable runnable);
    
}

Cet exemple modifie simplement le nom du Thread avec un compteur, mais nous pouvons le modifier aussi longtemps que nous le voulons.