Одно время у меня были некоторые сомнения имеет ли смысл публиковать эту статью, т.к. в целом мне нечего сказать такого, что нельзя было найти в официальной документации об invokeAll. Затем, при общении с другими программистами, стал время от времени замечать не совсем правильное (на мой взгляд) понимание его работы. Поэтому пока есть свободное время решил все-таки закончить эту заметку.
Сразу хочу заметить, что мне не очень хочется углубляться в описание всей мощи java concurrent API, а просто расставить некоторые акценты при работе над небольшой простой задачей. Она может возникнуть в жизни каждого программиста, он замечает некоторые независимые операции и у него появляется нестерпимое желание их распределить по нескольким потокам. Грубо говоря, у вас есть какой-то метод, который можно было бы безболезненно запихнуть в Runnable-ы, стартануть и подождать когда все закончат свою работу.
Понятно, что для этого может подойти invokeAll т.к. в описании про него явно сказано: “Executes the given tasks, returning a list of Futures holding their status and results when all complete.”
Так вот, некоторые программисты не дочитывают последние слова и думают, что нужно самому контролировать процесс, ждать и “жать на тормоз”, проверять все ли потоки завершились, периодически дергать в цикле future.get(), join-ить или делать другие различные телодвижения. Конечно так поступают не все, но тем не менее такое бывает. Для того, чтобы разобраться в том, что происходит на самом деле, достаточно взглянуть в базовую реализацию этого метода (AbstractExecutorService.java). Надо же пользоваться тем, что src.zip в JDK все еще продолжают подкладывать:
public List> invokeAll(Collection extends Callable> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
List> futures = new ArrayList>(tasks.size());
boolean done = false;
try {
for (Callable t : tasks) {
RunnableFuture f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (Future f : futures) {
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future f : futures)
f.cancel(true);
}
}
Легко видеть, что действительно как сказано в документации:
во-первых будем так или иначе ждать пока не выполнятся все задачи (т.к. дергаем f.get)
во-вторых разработчики API не слишком бережно относятся к обработке исключений (о чем тоже сказано в документации).
Суть метода — “притормозить” в текущей нитке, пока все задачи из коллекции которую мы передали не закончат свою работу или не рухнут (по крайней мере в базовой реализации этого метода).
Пример.
Откликаюсь на просьбу друзей приводить ссылки или финальный пример, который бы подводил бы некую черту и окончательно всё разъяснял. Сразу хотел обратить внимание на то, что приведенный исходный код может быть использован лишь в ознакомительных целях, в “боевых” задачах его использовать конечно же нельзя, но возможно пример исходного кода поможет как-то лучше понять происходящее. Еще раз, пожалуйста не используйте его “как есть” в своей работе.
Допустим есть вымышленная задача.
Дано:
1. Массив URL-ов (String).
2. Пустая директория.
Задание:
Скачать содержимое URL-ов и поместить его в файлы указанной директории.
Решение:
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void copy(String[] urls, Path dst) {
int THREADS = 4; // кол-во потоков
ExecutorService pool = Executors.newFixedThreadPool(THREADS);
List> tasks = new ArrayList<>();
try {
for (int i = 0; i < urls.length; ++i) {
final String url = urls[i];
final Path dstPath = dst.resolve(i + ".html");
// добавляем задачки
tasks.add(new Callable
Конечно в Java 8 аналогичный код с содержимым цикла будет смотреться более элегантно:
for (int i = 0; i < urls.length; ++i) {
String url = urls[i];
Path dstPath = dst.resolve(i + ".html");
// Да здравствуют лямбды!
tasks.add(() -> download(url, dstPath));
}
Применимость и граничные условия.
Из выше описанного думаю не сложно сделать вывод о том, что к использованию invokeAll следует подходить с пониманием происходящего и осторожностью.
Лично я использую его не слишком часто, как правило для написания “утилитных” программок, запускаемых из командной строки в обычной JVM. Например эпизодическая выгрузка данных, утилита для конвертации/загрузки чего-либо куда-либо и т.д., в условиях когда можно быстро в ручную проверить, что выполнение прошло успешно.
Также нужно учитывать, что при работе в Java EE на Application Server-ах так плодить потоки, мягко говоря, не рекомендуется, а спецификация Java EE прямо запрещает такие вольности с созданием и управлением потоков.
Известный шуточный принцип “Quod licet bovi, non licet Iovi”, как всегда проявляется на Java EE.
Думаю для Java EE программистов более привычна схема работы с асинхронностью/многопоточностью в ентерпрайзе:
– либо по старинке через JMS/MDB
– через managed thread (см. ManagedExecutorService)
– или выкручиваться на месте, используя умения и особенности конкретного Application Server-а.