Распараллеливание задач в Java через InvokeAll


Одно время у меня были некоторые сомнения имеет ли смысл публиковать эту статью, т.к. в целом мне нечего сказать такого, что нельзя было найти в официальной документации об invokeAll. Затем, при общении с другими программистами, стал время от времени замечать не совсем правильное (на мой взгляд) понимание его работы. Поэтому пока есть свободное время решил все-таки закончить эту заметку.

Сразу хочу заметить, что мне не очень хочется углубляться в описание всей мощи java concurrent API, а просто расставить некоторые акценты при работе над небольшой простой задачей. Она может возникнуть в жизни каждого программиста, он замечает некоторые независимые операции и у него появляется нестерпимое желание их распределить по нескольким потокам. Грубо говоря, у вас есть какой-то метод, который можно было бы безболезненно запихнуть в Runnable-ы, стартануть и подождать когда все закончат свою работу.

Понятно, что для этого может подойти invokeAll т.к. в описании про него явно сказано: “Executes the given tasks, returning a list of Futures holding their status and results when all complete.”

Так вот, некоторые программисты не дочитывают последние слова и думают, что нужно самому контролировать процесс, ждать и “жать на тормоз”, проверять все ли потоки завершились, периодически дергать в цикле future.get(), join-ить или делать другие различные телодвижения. Конечно так поступают не все, но тем не менее такое бывает. Для того, чтобы разобраться в том, что происходит на самом деле, достаточно взглянуть в базовую реализацию этого метода (AbstractExecutorService.java). Надо же пользоваться тем, что src.zip в JDK все еще продолжают подкладывать:

    public  List> invokeAll(Collection> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List> futures = new ArrayList>(tasks.size());
        boolean done = false;
        try {
            for (Callable t : tasks) {
                RunnableFuture f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (Future f : futures) {
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future f : futures)
                    f.cancel(true);
        }
    }

Легко видеть, что действительно как сказано в документации:
во-первых будем так или иначе ждать пока не выполнятся все задачи (т.к. дергаем f.get)
во-вторых разработчики API не слишком бережно относятся к обработке исключений (о чем тоже сказано в документации).

Суть метода — “притормозить” в текущей нитке, пока все задачи из коллекции которую мы передали не закончат свою работу или не рухнут (по крайней мере в базовой реализации этого метода).

Пример.

Откликаюсь на просьбу друзей приводить ссылки или финальный пример, который бы подводил бы некую черту и окончательно всё разъяснял. Сразу хотел обратить внимание на то, что приведенный исходный код может быть использован лишь в ознакомительных целях, в “боевых” задачах его использовать конечно же нельзя, но возможно пример исходного кода поможет как-то лучше понять происходящее. Еще раз, пожалуйста не используйте его “как есть” в своей работе.

Допустим есть вымышленная задача.

Дано:
1. Массив URL-ов (String).
2. Пустая директория.

Задание:
Скачать содержимое URL-ов и поместить его в файлы указанной директории.

Решение:

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    public static void copy(String[] urls, Path dst) {
        int THREADS = 4; // кол-во потоков
        ExecutorService pool = Executors.newFixedThreadPool(THREADS);
        List> tasks = new ArrayList<>();
        try {
            for (int i = 0; i < urls.length; ++i) {
                final String url = urls[i];
                final Path dstPath = dst.resolve(i + ".html");
                // добавляем задачки
                tasks.add(new Callable() {
                    public Object call() throws Exception {
                        download(url, dstPath);
                        return null;
                    }
                });
            }
            // Запускаем пул потоков и ДОЖИДАЕМСЯ! 
            List> invokeAll = pool.invokeAll(tasks);
        } catch (InterruptedException e) {
            e.printStackTrace(); // так лучше не делать в продакшене
        } finally {
            pool.shutdown();
        }
    }
    // копируем содержимое url в dst (и так тоже лучше не делать в продакшене)
    private static long download(String url, Path dst) {
        try {
            URI u = URI.create(url);
            try (InputStream in = u.toURL().openStream()) {
                Files.copy(in, dst, StandardCopyOption.REPLACE_EXISTING);
            }
            System.out.printf("%s -> %s %n", url, dst);
            return dst.toFile().length();
        } catch (IOException e) {
            e.printStackTrace();
            return -1;
        }
    }

   public static void main(String[] args) throws Exception {
        String urls[] = {"http://oracle.com", "http://google.com", "http://apple.com",
            "http://ibm.com", "http://sap.com"};
        copy(urls, Paths.get("")); 
    }
}

Конечно в Java 8 аналогичный код с содержимым цикла будет смотреться более элегантно:

            for (int i = 0; i < urls.length; ++i) {
                String url = urls[i];
                Path dstPath = dst.resolve(i + ".html");
                // Да здравствуют лямбды!
                tasks.add(() -> download(url, dstPath));
            }

Применимость и граничные условия.

Из выше описанного думаю не сложно сделать вывод о том, что к использованию invokeAll следует подходить с пониманием происходящего и осторожностью.

Лично я использую его не слишком часто, как правило для написания “утилитных” программок, запускаемых из командной строки в обычной JVM. Например эпизодическая выгрузка данных, утилита для конвертации/загрузки чего-либо куда-либо и т.д., в условиях когда можно быстро в ручную проверить, что выполнение прошло успешно.

Также нужно учитывать, что при работе в Java EE на Application Server-ах так плодить потоки, мягко говоря, не рекомендуется, а спецификация Java EE прямо запрещает такие вольности с созданием и управлением потоков.

Известный шуточный принцип “Quod licet bovi, non licet Iovi”, как всегда проявляется на Java EE.

Думаю для Java EE программистов более привычна схема работы с асинхронностью/многопоточностью в ентерпрайзе:
– либо по старинке через JMS/MDB
– через managed thread (см. ManagedExecutorService)
– или выкручиваться на месте, используя умения и особенности конкретного Application Server-а.

Любое использование либо копирование материалов или подборки материалов сайта, элементов дизайна и оформления допускается лишь с разрешения правообладателя и только со ссылкой на источник: programador.ru

Телеграм канал: @prgrmdr
Почта для связи: vit [at] programmisty.com