Java基本構文と制御構造 課題集 (全30問)
初級問題 (9問) 基本構文とデータ型 中級問題 (15問) 制御構造 配列と制御構造 上級問題 (6問) 解答例 初級問題の解答 int age = 25; […]
StringProcessorを作成し、文字列処理を行うラムダ式を実装してください。public class LambdaBasic {
public static void main(String[] args) {
// 従来の無名クラス
Runnable traditionalRunnable = new Runnable() {
@Override
public void run() {
System.out.println("Hello Anonymous Class");
}
};
// ラムダ式
Runnable lambdaRunnable = () -> System.out.println("Hello Lambda");
traditionalRunnable.run();
lambdaRunnable.run();
}
}
import java.util.*;
public class LambdaComparator {
public static void main(String[] args) {
List names = Arrays.asList("John", "Alice", "Bob", "Charlie");
// 従来のComparator
Collections.sort(names, new Comparator() {
@Override
public int compare(String s1, String s2) {
return Integer.compare(s1.length(), s2.length());
}
});
// ラムダ式でのComparator
names.sort((s1, s2) -> Integer.compare(s1.length(), s2.length()));
// メソッド参照
names.sort(Comparator.comparingInt(String::length));
System.out.println(names);
}
}
// シンプルなイベントリスナーインターフェース
interface ClickListener {
void onClick(String message);
}
public class AnonymousVsLambda {
public static void main(String[] args) {
// 無名クラスでの実装
ClickListener anonymousListener = new ClickListener() {
@Override
public void onClick(String message) {
System.out.println("Anonymous: " + message);
}
};
// ラムダ式での実装
ClickListener lambdaListener = (message) ->
System.out.println("Lambda: " + message);
anonymousListener.onClick("Button clicked!");
lambdaListener.onClick("Button clicked!");
}
}
import java.util.*;
import java.util.stream.*;
public class StreamBasic {
public static void main(String[] args) {
List numbers = Arrays.asList(1, 2, 3, 4, 5);
// Streamを使用して各要素を2倍
List doubled = numbers.stream()
.map(n -> n * 2)
.collect(Collectors.toList());
System.out.println("Original: " + numbers);
System.out.println("Doubled: " + doubled);
// メソッド参照を使用
List tripled = numbers.stream()
.map(StreamBasic::triple) // 静的メソッド参照
.collect(Collectors.toList());
System.out.println("Tripled: " + tripled);
}
private static int triple(int n) {
return n * 3;
}
}
import java.util.*;
import java.util.stream.*;
public class StreamFilter {
public static void main(String[] args) {
List words = Arrays.asList("cat", "elephant", "dog", "butterfly", "ant");
// 5文字以上の単語を抽出
List longWords = words.stream()
.filter(word -> word.length() >= 5)
.collect(Collectors.toList());
System.out.println("All words: " + words);
System.out.println("Long words: " + longWords);
// 大文字に変換して收集
List upperCaseWords = words.stream()
.filter(word -> word.length() >= 5)
.map(String::toUpperCase)
.collect(Collectors.toList());
System.out.println("Upper case long words: " + upperCaseWords);
}
}
import java.util.*;
import java.util.stream.*;
class Person {
private String name;
private int age;
public Person(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() { return name; }
public int getAge() { return age; }
}
public class StreamMap {
public static void main(String[] args) {
List people = Arrays.asList(
new Person("Alice", 25),
new Person("Bob", 30),
new Person("Charlie", 35)
);
// 名前だけを抽出
List names = people.stream()
.map(Person::getName)
.collect(Collectors.toList());
System.out.println("Names: " + names);
// 年齢だけを抽出
List ages = people.stream()
.map(Person::getAge)
.collect(Collectors.toList());
System.out.println("Ages: " + ages);
}
}
public class ThreadBasic {
// Threadクラスを継承する方法
static class MyThread extends Thread {
@Override
public void run() {
for (int i = 1; i <= 5; i++) {
System.out.println("Thread extends: " + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// Runnableインターフェースを実装する方法
static class MyRunnable implements Runnable {
@Override
public void run() {
for (int i = 1; i <= 5; i++) {
System.out.println("Runnable: " + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
// Threadクラス継承
MyThread thread1 = new MyThread();
thread1.start();
// Runnableインターフェース実装
Thread thread2 = new Thread(new MyRunnable());
thread2.start();
// メインスレッドも実行
for (int i = 1; i <= 5; i++) {
System.out.println("Main thread: " + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class LambdaThread {
public static void main(String[] args) {
// ラムダ式でRunnableを実装
Thread thread1 = new Thread(() -> {
for (int i = 1; i <= 3; i++) {
System.out.println("Thread 1 - Count: " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread thread2 = new Thread(() -> {
for (int i = 1; i <= 3; i++) {
System.out.println("Thread 2 - Count: " + i);
try {
Thread.sleep(800);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread thread3 = new Thread(() -> {
for (int i = 1; i <= 3; i++) {
System.out.println("Thread 3 - Count: " + i);
try {
Thread.sleep(600);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread1.start();
thread2.start();
thread3.start();
}
}
public class ThreadCoordination {
private static boolean mainTurn = true;
public static void main(String[] args) {
Object lock = new Object();
Thread workerThread = new Thread(() -> {
for (int i = 1; i <= 5; i++) {
synchronized (lock) {
while (mainTurn) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Worker: Message " + i);
mainTurn = true;
lock.notify();
}
}
});
workerThread.start();
for (int i = 1; i <= 5; i++) {
synchronized (lock) {
while (!mainTurn) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Main: Message " + i);
mainTurn = false;
lock.notify();
}
}
}
}
@FunctionalInterface
interface StringProcessor {
String process(String input);
// デフォルトメソッド
default StringProcessor andThen(StringProcessor after) {
return input -> after.process(this.process(input));
}
}
public class CustomFunctionalInterface {
public static void main(String[] args) {
// 大文字に変換
StringProcessor toUpper = String::toUpperCase;
// 感嘆符を追加
StringProcessor addExclamation = s -> s + "!";
// 複数回繰り返し
StringProcessor repeatTwice = s -> s + " " + s;
// チェーン処理
StringProcessor pipeline = toUpper
.andThen(addExclamation)
.andThen(repeatTwice);
String result = pipeline.process("hello");
System.out.println("Result: " + result);
// 様々な処理
System.out.println(toUpper.process("java"));
System.out.println(addExclamation.process("wow"));
}
}
import java.util.*;
import java.util.function.*;
class Calculator {
public static int square(int x) {
return x * x;
}
public int cube(int x) {
return x * x * x;
}
}
public class MethodReferences {
public static void main(String[] args) {
List names = Arrays.asList("Alice", "Bob", "Charlie");
// 1. 静的メソッド参照
Function squareRef = Calculator::square;
System.out.println("Square: " + squareRef.apply(5));
// 2. インスタンスメソッド参照(特定のオブジェクト)
Calculator calc = new Calculator();
Function cubeRef = calc::cube;
System.out.println("Cube: " + cubeRef.apply(3));
// 3. インスタンスメソッド参照(任意のオブジェクト)
names.forEach(System.out::println);
// 4. コンストラクタ参照
Function stringCreator = String::new;
String newString = stringCreator.apply("Hello");
System.out.println("New string: " + newString);
// コレクションでの使用例
List numbers = Arrays.asList(1, 2, 3, 4, 5);
// メソッド参照でマッピング
List squares = numbers.stream()
.map(Calculator::square)
.collect(Collectors.toList());
System.out.println("Squares: " + squares);
}
}
public class LambdaVariableCapture {
private String instanceVariable = "Instance Variable";
public void demonstrateCapture() {
String effectivelyFinal = "Effectively Final";
final String explicitlyFinal = "Explicitly Final";
// ラムダ式で外部変数をキャプチャ
Runnable lambda = () -> {
// インスタンス変数はアクセス可能
System.out.println(instanceVariable);
// 実質的final変数はアクセス可能
System.out.println(effectivelyFinal);
// 明示的final変数もアクセス可能
System.out.println(explicitlyFinal);
// インスタンス変数の変更は可能
instanceVariable = "Modified Instance Variable";
};
// ラムダ実行前
System.out.println("Before lambda: " + instanceVariable);
lambda.run();
// ラムダ実行後
System.out.println("After lambda: " + instanceVariable);
// 以下のコードはコンパイルエラー(実質的finalでなくなる)
// effectivelyFinal = "Modified";
}
public static void main(String[] args) {
new LambdaVariableCapture().demonstrateCapture();
}
}
import java.util.*;
import java.util.stream.*;
public class StreamFlatMap {
public static void main(String[] args) {
// 複数のリスト
List> nestedLists = Arrays.asList(
Arrays.asList("Apple", "Banana"),
Arrays.asList("Orange", "Grape", "Mango"),
Arrays.asList("Peach")
);
// flatMapで単一のStreamに変換
List flatList = nestedLists.stream()
.flatMap(List::stream)
.collect(Collectors.toList());
System.out.println("Flat list: " + flatList);
// 文字列を文字に分解
List words = Arrays.asList("Hello", "World");
List characters = words.stream()
.flatMap(word -> word.chars().mapToObj(c -> (char)c))
.collect(Collectors.toList());
System.out.println("Characters: " + characters);
// 複雑な例:オブジェクトのリストをflatMap
class Order {
List items;
Order(List items) { this.items = items; }
List getItems() { return items; }
}
List orders = Arrays.asList(
new Order(Arrays.asList("Laptop", "Mouse")),
new Order(Arrays.asList("Keyboard", "Monitor", "Headphones"))
);
List allItems = orders.stream()
.flatMap(order -> order.getItems().stream())
.collect(Collectors.toList());
System.out.println("All items: " + allItems);
}
}
import java.util.*;
import java.util.stream.*;
public class StreamReduce {
public static void main(String[] args) {
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 合計を求める
Optional sum = numbers.stream()
.reduce(Integer::sum);
System.out.println("Sum: " + sum.orElse(0));
// 初期値を使用した合計
Integer sumWithIdentity = numbers.stream()
.reduce(0, Integer::sum);
System.out.println("Sum with identity: " + sumWithIdentity);
// 最大値を求める
Optional max = numbers.stream()
.reduce(Integer::max);
System.out.println("Max: " + max.orElse(0));
// 最小値を求める
Optional min = numbers.stream()
.reduce(Integer::min);
System.out.println("Min: " + min.orElse(0));
// 文字列の結合
List words = Arrays.asList("Hello", "World", "Java", "Stream");
String combined = words.stream()
.reduce("", (s1, s2) -> s1 + " " + s2)
.trim();
System.out.println("Combined: " + combined);
// 複雑なreduce:平均を計算
double average = numbers.stream()
.mapToInt(Integer::intValue)
.average()
.orElse(0.0);
System.out.println("Average: " + average);
}
}
import java.util.*;
import java.util.stream.*;
class Employee {
private String name;
private String department;
private int salary;
private int age;
public Employee(String name, String department, int salary, int age) {
this.name = name;
this.department = department;
this.salary = salary;
this.age = age;
}
// getters
public String getName() { return name; }
public String getDepartment() { return department; }
public int getSalary() { return salary; }
public int getAge() { return age; }
@Override
public String toString() { return name + "(" + department + ")"; }
}
public class StreamGrouping {
public static void main(String[] args) {
List employees = Arrays.asList(
new Employee("Alice", "IT", 5000, 25),
new Employee("Bob", "HR", 4000, 30),
new Employee("Charlie", "IT", 6000, 35),
new Employee("Diana", "HR", 4500, 28),
new Employee("Eve", "Finance", 5500, 32),
new Employee("Frank", "IT", 5200, 29)
);
// 部門別グループ化
Map> byDepartment = employees.stream()
.collect(Collectors.groupingBy(Employee::getDepartment));
System.out.println("By Department:");
byDepartment.forEach((dept, emps) ->
System.out.println(dept + ": " + emps));
// 部門別の人数集計
Map countByDept = employees.stream()
.collect(Collectors.groupingBy(
Employee::getDepartment,
Collectors.counting()
));
System.out.println("\nCount by Department:");
countByDept.forEach((dept, count) ->
System.out.println(dept + ": " + count));
// 部門別の平均給与
Map avgSalaryByDept = employees.stream()
.collect(Collectors.groupingBy(
Employee::getDepartment,
Collectors.averagingInt(Employee::getSalary)
));
System.out.println("\nAverage Salary by Department:");
avgSalaryByDept.forEach((dept, avg) ->
System.out.printf("%s: %.2f\n", dept, avg));
// 年齢層別グループ化
Map> byAgeGroup = employees.stream()
.collect(Collectors.groupingBy(emp -> {
if (emp.getAge() < 30) return "Under 30";
else if (emp.getAge() < 40) return "30-39";
else return "40 and above";
}));
System.out.println("\nBy Age Group:");
byAgeGroup.forEach((group, emps) ->
System.out.println(group + ": " + emps));
}
}
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;
public class ParallelStreamDemo {
public static void main(String[] args) {
// 大きなリストを作成
List numbers = IntStream.range(0, 10_000_000)
.boxed()
.collect(Collectors.toList());
// 逐次処理の時間計測
long startTime = System.currentTimeMillis();
long sequentialSum = numbers.stream()
.mapToLong(Integer::longValue)
.sum();
long sequentialTime = System.currentTimeMillis() - startTime;
// 並列処理の時間計測
startTime = System.currentTimeMillis();
long parallelSum = numbers.parallelStream()
.mapToLong(Integer::longValue)
.sum();
long parallelTime = System.currentTimeMillis() - startTime;
System.out.println("Sequential sum: " + sequentialSum + " in " + sequentialTime + "ms");
System.out.println("Parallel sum: " + parallelSum + " in " + parallelTime + "ms");
System.out.println("Speedup: " + (double)sequentialTime/parallelTime + "x");
// 並列ストリームのスレッド使用状況を確認
System.out.println("\nAvailable processors: " + Runtime.getRuntime().availableProcessors());
// 並列ストリームでスレッド情報を表示
numbers.parallelStream()
.limit(20)
.forEach(n -> {
System.out.println(Thread.currentThread().getName() + " processing: " + n);
});
// 注意点:状態を持つ操作は避ける
List problematicList = Collections.synchronizedList(new ArrayList<>());
numbers.parallelStream()
.filter(n -> n % 2 == 0)
.forEach(problematicList::add); // スレッドセーフだが非効率
System.out.println("Filtered count: " + problematicList.size());
// 正しい方法
List correctList = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.collect(Collectors.toList());
System.out.println("Correct filtered count: " + correctList.size());
}
}
import java.util.*;
import java.util.stream.*;
public class LazyEvaluation {
public static void main(String[] args) {
List names = Arrays.asList("Alice", "Bob", "Charlie", "David", "Eve");
System.out.println("Demonstrating Lazy Evaluation:");
// 中間操作だけ定義(まだ実行されない)
Stream intermediateStream = names.stream()
.filter(name -> {
System.out.println("Filtering: " + name);
return name.length() > 3;
})
.map(name -> {
System.out.println("Mapping: " + name);
return name.toUpperCase();
});
System.out.println("Intermediate operations defined, but not executed yet.");
// 終端操作で初めて実行される
System.out.println("\nNow executing terminal operation:");
List result = intermediateStream.collect(Collectors.toList());
System.out.println("Result: " + result);
// 短絡操作の例
System.out.println("\nShort-circuit operation example:");
Optional firstLongName = names.stream()
.filter(name -> {
System.out.println("Short-circuit filtering: " + name);
return name.length() > 4;
})
.findFirst();
System.out.println("First long name: " + firstLongName.orElse("None"));
// 無限ストリームと遅延評価
System.out.println("\nInfinite stream example:");
Stream.iterate(1, n -> n + 1)
.map(n -> {
System.out.println("Processing: " + n);
return n * 2;
})
.filter(n -> n > 10)
.limit(3)
.forEach(System.out::println);
}
}
import java.util.*;
import java.util.stream.*;
class User {
private String name;
private String email;
public User(String name, String email) {
this.name = name;
this.email = email;
}
public String getName() { return name; }
public Optional getEmail() {
return Optional.ofNullable(email);
}
}
public class OptionalStream {
public static void main(String[] args) {
List users = Arrays.asList(
new User("Alice", "alice@example.com"),
new User("Bob", null), // メールなし
new User("Charlie", "charlie@example.com"),
new User("Diana", null)
);
// 従来のnullチェック
System.out.println("Traditional null check:");
for (User user : users) {
if (user.getEmail() != null && user.getEmail().isPresent()) {
System.out.println(user.getName() + ": " + user.getEmail().get());
}
}
// Stream + Optionalでのエレガントな処理
System.out.println("\nStream with Optional:");
List validEmails = users.stream()
.map(User::getEmail) // OptionalのStream
.flatMap(Optional::stream) // 値があるものだけをStreamに変換
.collect(Collectors.toList());
System.out.println("Valid emails: " + validEmails);
// デフォルト値の使用
System.out.println("\nEmails with defaults:");
users.stream()
.map(user -> user.getEmail().orElse("no-email@example.com"))
.forEach(email -> System.out.println("Email: " + email));
// 条件付き処理
System.out.println("\nConditional processing:");
users.forEach(user -> {
user.getEmail()
.ifPresentOrElse(
email -> System.out.println(user.getName() + " has email: " + email),
() -> System.out.println(user.getName() + " has no email")
);
});
// 複雑なチェーン
System.out.println("\nComplex chain:");
Optional firstValidEmail = users.stream()
.map(User::getEmail)
.flatMap(Optional::stream)
.filter(email -> email.contains("example"))
.findFirst();
firstValidEmail.ifPresent(email ->
System.out.println("First valid email: " + email));
}
}
import java.util.*;
class SharedCounter {
private int count = 0;
// synchronizedメソッド
public synchronized void increment() {
count++;
}
// synchronizedブロック
public void decrement() {
synchronized(this) {
count--;
}
}
public synchronized int getCount() {
return count;
}
}
public class SynchronizedExample {
public static void main(String[] args) throws InterruptedException {
SharedCounter counter = new SharedCounter();
List threads = new ArrayList<>();
// インクリメントスレッドを100個作成
for (int i = 0; i < 100; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.increment();
}
});
threads.add(thread);
}
// すべてのスレッドを開始
threads.forEach(Thread::start);
// すべてのスレッドの終了を待機
for (Thread thread : threads) {
thread.join();
}
System.out.println("Final count: " + counter.getCount());
System.out.println("Expected: " + (100 * 1000));
// 異なるオブジェクトでの同期
Object lock1 = new Object();
Object lock2 = new Object();
Thread t1 = new Thread(() -> {
synchronized(lock1) {
System.out.println("Thread 1 acquired lock1");
try { Thread.sleep(100); } catch (InterruptedException e) {}
synchronized(lock2) {
System.out.println("Thread 1 acquired lock2");
}
}
});
Thread t2 = new Thread(() -> {
synchronized(lock2) {
System.out.println("Thread 2 acquired lock2");
try { Thread.sleep(100); } catch (InterruptedException e) {}
synchronized(lock1) {
System.out.println("Thread 2 acquired lock1");
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
}
import java.util.*;
class MessageQueue {
private Queue queue = new LinkedList<>();
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
}
public synchronized void produce(String message) throws InterruptedException {
while (queue.size() == capacity) {
System.out.println("Queue full, producer waiting...");
wait();
}
queue.add(message);
System.out.println("Produced: " + message);
notifyAll(); // 消費者に通知
}
public synchronized String consume() throws InterruptedException {
while (queue.isEmpty()) {
System.out.println("Queue empty, consumer waiting...");
wait();
}
String message = queue.poll();
System.out.println("Consumed: " + message);
notifyAll(); // 生産者に通知
return message;
}
}
public class ProducerConsumer {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(5);
// 生産者スレッド
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
queue.produce("Message " + i);
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消費者スレッド
Thread consumer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
queue.consume();
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
try {
producer.join();
consumer.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Producer-Consumer completed");
}
}
import java.util.concurrent.*;
import java.util.*;
public class ExecutorServiceDemo {
public static void main(String[] args) throws InterruptedException {
// 固定サイズのスレッドプール
ExecutorService executor = Executors.newFixedThreadPool(3);
List> futures = new ArrayList<>();
// 10個のタスクを実行
for (int i = 1; i <= 10; i++) {
final int taskId = i;
Future future = executor.submit(() -> {
System.out.println("Task " + taskId + " started by " +
Thread.currentThread().getName());
try {
Thread.sleep(1000); // シミュレートされた作業
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result of task " + taskId;
});
futures.add(future);
}
// 結果の収集
for (Future future : futures) {
try {
String result = future.get();
System.out.println("Future result: " + result);
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// スレッドプールのシャットダウン
executor.shutdown();
boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("All tasks completed: " + terminated);
// さまざまなExecutorServiceの例
System.out.println("\n--- Different Executor Services ---");
// 単一スレッドExecutor
ExecutorService singleThread = Executors.newSingleThreadExecutor();
singleThread.submit(() -> System.out.println("Single thread task"));
singleThread.shutdown();
// キャッシュスレッドプール
ExecutorService cachedPool = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
cachedPool.submit(() -> {
System.out.println("Cached pool task by " + Thread.currentThread().getName());
});
}
cachedPool.shutdown();
// スケジュールされたExecutor
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
scheduler.schedule(() ->
System.out.println("Delayed task"), 2, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(() ->
System.out.println("Fixed rate task"), 1, 3, TimeUnit.SECONDS);
Thread.sleep(10000);
scheduler.shutdown();
}
}
ComputationTask.java
import java.util.concurrent.Callable;
public class ComputationTask implements Callable {
private final int number;
public ComputationTask(int number) {
this.number = number;
}
@Override
public Long call() throws Exception {
System.out.println("Computing factorial of " + number +
" in " + Thread.currentThread().getName());
long result = 1;
for (int i = 2; i <= number; i++) {
result *= i;
Thread.sleep(10); // 計算時間をシミュレート
}
return result;
}
}
NetworkTask.java
import java.util.concurrent.Callable;
public class NetworkTask implements Callable {
private final String url;
public NetworkTask(String url) {
this.url = url;
}
@Override
public String call() throws Exception {
System.out.println("Fetching data from " + url +
" in " + Thread.currentThread().getName());
// ネットワーク遅延をシミュレート
Thread.sleep(2000);
return "Data from " + url;
}
}
CallableFutureDemo.java
import java.util.concurrent.*;
import java.util.*;
public class CallableFutureDemo {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(3);
// 複数のCallableタスクを作成
List<Callable<Object>> tasks = new ArrayList<>();
// 計算タスク
tasks.add(Executors.callable(new ComputationTask(10)));
tasks.add(Executors.callable(new ComputationTask(15)));
// ネットワークタスク
tasks.add(Executors.callable(new NetworkTask("http://api.example.com/data")));
tasks.add(Executors.callable(new NetworkTask("http://api.example.com/users")));
// 一括実行
System.out.println("Submitting all tasks...");
List<Future<Object>> futures = executor.invokeAll(tasks);
// 結果の処理
System.out.println("\nProcessing results:");
for (int i = 0; i < futures.size(); i++) {
Future<Object> future = futures.get(i);
try {
Object result = future.get(5, TimeUnit.SECONDS);
System.out.println("Task " + i + " result: " + result);
} catch (TimeoutException e) {
System.out.println("Task " + i + " timed out");
future.cancel(true);
} catch (ExecutionException e) {
System.out.println("Task " + i + " failed: " + e.getCause());
}
}
// Futureの状態確認
System.out.println("\nFuture states:");
Future<Long> computationFuture = executor.submit(new ComputationTask(20));
// 非同期で結果を待機しながら他の作業
while (!computationFuture.isDone()) {
System.out.println("Waiting for computation...");
Thread.sleep(500);
}
if (computationFuture.isDone() && !computationFuture.isCancelled()) {
Long result = computationFuture.get();
System.out.println("Final computation result: " + result);
}
executor.shutdown();
boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
System.out.println("Executor terminated: " + terminated);
}
}
FetchUserDataService.java
// FetchUserDataService.java
import java.util.concurrent.CompletableFuture;
public class FetchUserDataService {
public static CompletableFuture fetchUserData(int userId) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "User data for ID: " + userId;
});
}
}
CalculateScoreService.java
// CalculateScoreService.java
import java.util.concurrent.CompletableFuture;
public class CalculateScoreService {
public static CompletableFuture calculateScore(String userData) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return userData.length() * 10;
});
}
}
SendNotificationService.java
// SendNotificationService.java
import java.util.concurrent.CompletableFuture;
public class SendNotificationService {
public static CompletableFuture sendNotification(int score) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Notification sent for score: " + score;
});
}
}
CompletableFutureDemo.java
// CompletableFutureDemo.java
import java.util.concurrent.*;
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Hello, CompletableFuture!";
});
future.thenAccept(result ->
System.out.println("Basic result: " + result));
System.out.println("Starting chained operations...");
CompletableFuture<Void> chainedFuture = FetchUserDataService.fetchUserData(123)
.thenApply(userData -> {
System.out.println("Processing: " + userData);
return userData.toUpperCase();
})
.thenCompose(CalculateScoreService::calculateScore)
.thenCompose(SendNotificationService::sendNotification)
.thenAccept(notification ->
System.out.println("Final: " + notification));
chainedFuture.get();
System.out.println("\nCombining multiple futures...");
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result 1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result 2");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Result 3");
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
allFutures.thenRun(() -> {
try {
System.out.println("All completed: " + future1.get() + ", " +
future2.get() + ", " + future3.get());
} catch (Exception e) {
e.printStackTrace();
}
});
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(2000); return "Slow"; }
catch (InterruptedException e) { throw new RuntimeException(e); }
}),
CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(500); return "Fast"; }
catch (InterruptedException e) { throw new RuntimeException(e); }
})
);
anyFuture.thenAccept(result ->
System.out.println("First completed: " + result));
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Simulated error");
}
return "Success";
}).exceptionally(throwable -> {
System.out.println("Handled exception: " + throwable.getMessage());
return "Default value";
}).thenAccept(result ->
System.out.println("Exception handling result: " + result));
Thread.sleep(3000);
System.out.println("Demo completed");
}
}
import java.util.*;
import java.util.stream.*;
class Employee {
private String name;
private String department;
private double salary;
public Employee(String name, String department, double salary) {
this.name = name;
this.department = department;
this.salary = salary;
}
public String getName() { return name; }
public String getDepartment() { return department; }
public double getSalary() { return salary; }
@Override
public String toString() {
return name + "(" + department + "): $" + salary;
}
}
public class EmployeeAnalysis {
public static void main(String[] args) {
List employees = Arrays.asList(
new Employee("Alice", "Engineering", 75000),
new Employee("Bob", "Engineering", 80000),
new Employee("Charlie", "Engineering", 90000),
new Employee("Diana", "Marketing", 60000),
new Employee("Eve", "Marketing", 65000),
new Employee("Frank", "Sales", 55000),
new Employee("Grace", "Sales", 60000),
new Employee("Henry", "HR", 50000),
new Employee("Ivy", "HR", 52000)
);
// 部門別の平均給与
System.out.println("Average salary by department:");
Map avgSalaryByDept = employees.stream()
.collect(Collectors.groupingBy(
Employee::getDepartment,
Collectors.averagingDouble(Employee::getSalary)
));
avgSalaryByDept.forEach((dept, avg) ->
System.out.printf("%s: $%.2f\n", dept, avg));
// 部門別の最高給与
System.out.println("\nMax salary by department:");
Map> maxSalaryByDept = employees.stream()
.collect(Collectors.groupingBy(
Employee::getDepartment,
Collectors.maxBy(Comparator.comparing(Employee::getSalary))
));
maxSalaryByDept.forEach((dept, emp) ->
emp.ifPresent(e ->
System.out.printf("%s: %s\n", dept, e)));
// 給与階層別グループ化
System.out.println("\nEmployees by salary range:");
Map> bySalaryRange = employees.stream()
.collect(Collectors.groupingBy(emp -> {
if (emp.getSalary() < 60000) return "Low (< 60k)";
else if (emp.getSalary() < 80000) return "Medium (60k-80k)";
else return "High (> 80k)";
}));
bySalaryRange.forEach((range, emps) -> {
System.out.println(range + ":");
emps.forEach(emp -> System.out.println(" " + emp));
});
// 部門別の給与統計
System.out.println("\nSalary statistics by department:");
Map statsByDept = employees.stream()
.collect(Collectors.groupingBy(
Employee::getDepartment,
Collectors.summarizingDouble(Employee::getSalary)
));
statsByDept.forEach((dept, stats) ->
System.out.printf("%s: count=%d, avg=%.2f, min=%.2f, max=%.2f\n",
dept, stats.getCount(), stats.getAverage(),
stats.getMin(), stats.getMax()));
// 給与順トップ3
System.out.println("\nTop 3 highest paid employees:");
employees.stream()
.sorted(Comparator.comparing(Employee::getSalary).reversed())
.limit(3)
.forEach(emp -> System.out.println(emp));
// 部門別の給与合計
System.out.println("\nTotal salary cost by department:");
Map totalSalaryByDept = employees.stream()
.collect(Collectors.groupingBy(
Employee::getDepartment,
Collectors.summingDouble(Employee::getSalary)
));
totalSalaryByDept.forEach((dept, total) ->
System.out.printf("%s: $%.2f\n", dept, total));
}
}
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class ParallelErrorHandling {
static class DataProcessor {
public String process(int data) {
// ランダムにエラーを発生させる
if (data % 7 == 0) {
throw new RuntimeException("Error processing: " + data);
}
try {
Thread.sleep(100); // 処理時間をシミュレート
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Processed: " + data;
}
}
public static void main(String[] args) {
DataProcessor processor = new DataProcessor();
List data = IntStream.range(1, 51)
.boxed()
.collect(Collectors.toList());
System.out.println("Sequential processing with error handling:");
// 逐次処理でのエラーハンドリング
data.stream()
.map(item -> {
try {
return Optional.of(processor.process(item));
} catch (Exception e) {
System.out.println("Caught error: " + e.getMessage());
return Optional.empty();
}
})
.filter(Optional::isPresent)
.map(Optional::get)
.forEach(System.out::println);
System.out.println("\nParallel processing with error handling:");
// 並列処理でのエラーハンドリング
List successfulResults = data.parallelStream()
.map(item -> {
try {
return processor.process(item);
} catch (Exception e) {
System.err.println("Error in parallel: " + e.getMessage() +
" on thread: " + Thread.currentThread().getName());
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
System.out.println("Successful results: " + successfulResults.size());
// CompletableFutureでのエラーハンドリング
System.out.println("\nCompletableFuture with error handling:");
List> futures = data.stream()
.map(item -> CompletableFuture.supplyAsync(() -> processor.process(item))
.exceptionally(throwable -> {
System.err.println("Future error: " + throwable.getMessage());
return "Fallback for error";
}))
.collect(Collectors.toList());
// すべてのFutureの完了を待機
CompletableFuture allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
List futureResults = allFutures.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
).join();
System.out.println("Future results count: " + futureResults.size());
// カスタムコレクターでのエラーハンドリング
System.out.println("\nCustom error handling in stream:");
List customResults = data.parallelStream()
.collect(ArrayList::new, (list, item) -> {
try {
list.add(processor.process(item));
} catch (Exception e) {
System.err.println("Skipped due to error: " + item);
}
}, ArrayList::addAll);
System.out.println("Custom handling results: " + customResults.size());
}
}
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
class Statistics {
private final int count;
private final double sum;
private final double min;
private final double max;
public Statistics(int count, double sum, double min, double max) {
this.count = count;
this.sum = sum;
this.min = min;
this.max = max;
}
public double average() {
return count > 0 ? sum / count : 0.0;
}
@Override
public String toString() {
return String.format("Count: %d, Sum: %.2f, Avg: %.2f, Min: %.2f, Max: %.2f",
count, sum, average(), min, max);
}
}
public class CustomCollector {
// カスタムコレクターの実装
public static Collector statisticsCollector() {
return Collector.of(
// サプライヤー: 可変コンテナの生成
() -> new double[]{0, Double.MAX_VALUE, Double.MIN_VALUE, 0},
// アキュムレーター: 要素の蓄積
(acc, value) -> {
acc[0] += value; // sum
acc[1] = Math.min(acc[1], value); // min
acc[2] = Math.max(acc[2], value); // max
acc[3]++; // count
},
// コンバイナー: 並列処理時の結合
(acc1, acc2) -> {
acc1[0] += acc2[0];
acc1[1] = Math.min(acc1[1], acc2[1]);
acc1[2] = Math.max(acc1[2], acc2[2]);
acc1[3] += acc2[3];
return acc1;
},
// フィニッシャー: 最終結果の変換
acc -> new Statistics(
(int)acc[3], acc[0], acc[1], acc[2]
)
);
}
// 文字列統計用コレクター
public static Collector stringSummaryCollector() {
return Collector.of(
StringBuilder::new,
StringBuilder::append,
StringBuilder::append,
StringBuilder::toString
);
}
// グループ化された統計コレクター
public static Collector>> partitioningByCustom(
Predicate predicate) {
return Collector.of(
() -> {
Map> map = new HashMap<>();
map.put(true, new ArrayList<>());
map.put(false, new ArrayList<>());
return map;
},
(map, element) -> {
boolean key = predicate.test(element);
map.get(key).add(element);
},
(map1, map2) -> {
map1.get(true).addAll(map2.get(true));
map1.get(false).addAll(map2.get(false));
return map1;
}
);
}
public static void main(String[] args) {
List numbers = Arrays.asList(1.0, 2.5, 3.7, 4.2, 5.8, 6.1, 7.9);
// カスタム統計コレクターの使用
Statistics stats = numbers.stream()
.collect(statisticsCollector());
System.out.println("Custom Statistics: " + stats);
// 組み込みの統計コレクターと比較
DoubleSummaryStatistics builtInStats = numbers.stream()
.collect(Collectors.summarizingDouble(Double::doubleValue));
System.out.println("Built-in Statistics: " + builtInStats);
// 文字列統計コレクター
String concatenated = Arrays.asList("A", "B", "C", "D").stream()
.collect(stringSummaryCollector());
System.out.println("Concatenated: " + concatenated);
// カスタム partitioningBy
Map> partitioned = numbers.stream()
.collect(partitioningByCustom(n -> n > 4.0));
System.out.println("Partitioned: " + partitioned);
// 複雑なカスタムコレクター: 重み付き平均
class WeightedValue {
double value;
double weight;
WeightedValue(double value, double weight) {
this.value = value;
this.weight = weight;
}
}
List weightedValues = Arrays.asList(
new WeightedValue(10.0, 1.0),
new WeightedValue(20.0, 2.0),
new WeightedValue(30.0, 3.0)
);
Collector weightedAverageCollector =
Collector.of(
() -> new double[2], // [weightedSum, totalWeight]
(acc, wv) -> {
acc[0] += wv.value * wv.weight;
acc[1] += wv.weight;
},
(acc1, acc2) -> {
acc1[0] += acc2[0];
acc1[1] += acc2[1];
return acc1;
},
acc -> acc[1] > 0 ? acc[0] / acc[1] : 0.0
);
double weightedAvg = weightedValues.stream()
.collect(weightedAverageCollector);
System.out.println("Weighted Average: " + weightedAvg);
}
}
import java.util.concurrent.*;
import java.util.*;
import java.util.function.*;
public class AsyncPipeline {
// シミュレートされたサービス
static class UserService {
CompletableFuture fetchUser(int userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(100);
return "User-" + userId;
});
}
}
static class OrderService {
CompletableFuture> fetchOrders(String username) {
return CompletableFuture.supplyAsync(() -> {
sleep(200);
return Arrays.asList("Order1", "Order2", "Order3");
});
}
}
static class PaymentService {
CompletableFuture calculateTotal(List orders) {
return CompletableFuture.supplyAsync(() -> {
sleep(150);
return orders.size() * 25.0;
});
}
}
static class NotificationService {
CompletableFuture sendReceipt(String username, double amount) {
return CompletableFuture.supplyAsync(() -> {
sleep(100);
return "Receipt sent to " + username + " for $" + amount;
});
}
}
private static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) throws Exception {
UserService userService = new UserService();
OrderService orderService = new OrderService();
PaymentService paymentService = new PaymentService();
NotificationService notificationService = new NotificationService();
System.out.println("Starting async pipeline...");
// 複雑な非同期パイプライン
CompletableFuture pipeline = userService.fetchUser(123)
.thenCompose(username -> {
System.out.println("Fetched user: " + username);
return orderService.fetchOrders(username)
.thenCompose(orders -> {
System.out.println("Fetched orders: " + orders);
return paymentService.calculateTotal(orders)
.thenCompose(amount -> {
System.out.println("Calculated total: $" + amount);
return notificationService.sendReceipt(username, amount);
});
});
})
.thenAccept(receipt -> {
System.out.println("Pipeline completed: " + receipt);
})
.exceptionally(throwable -> {
System.err.println("Pipeline failed: " + throwable.getMessage());
return null;
});
// 並列実行と結合
System.out.println("\n--- Parallel Execution ---");
CompletableFuture userFuture = userService.fetchUser(456);
CompletableFuture> productFuture = CompletableFuture.supplyAsync(() -> {
sleep(150);
return Arrays.asList("ProductA", "ProductB");
});
CompletableFuture combinedResult = userFuture
.thenCombine(productFuture, (user, products) ->
user + " bought " + products)
.thenApply(String::toUpperCase);
combinedResult.thenAccept(System.out::println);
// 複数パイプラインの管理
System.out.println("\n--- Multiple Pipelines ---");
List> pipelines = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
final int userId = i;
CompletableFuture userPipeline = userService.fetchUser(userId)
.thenCompose(orderService::fetchOrders)
.thenCompose(paymentService::calculateTotal)
.thenCompose(amount ->
notificationService.sendReceipt("User-" + userId, amount))
.exceptionally(ex -> "Failed for user " + userId + ": " + ex.getMessage());
pipelines.add(userPipeline);
}
// すべてのパイプラインの完了を待機
CompletableFuture allPipelines = CompletableFuture.allOf(
pipelines.toArray(new CompletableFuture[0])
);
allPipelines.thenRun(() -> {
System.out.println("\nAll pipelines completed:");
pipelines.forEach(future -> {
try {
System.out.println("Result: " + future.get());
} catch (Exception e) {
e.printStackTrace();
}
});
});
// タイムアウト処理
System.out.println("\n--- Timeout Handling ---");
CompletableFuture slowService = CompletableFuture.supplyAsync(() -> {
sleep(5000); // 5秒かかるサービス
return "Slow result";
});
CompletableFuture timeoutFuture = slowService
.orTimeout(2, TimeUnit.SECONDS)
.exceptionally(ex -> "Timeout: " + ex.getMessage());
timeoutFuture.thenAccept(result ->
System.out.println("Timeout result: " + result));
// 完了を待機
pipeline.get();
combinedResult.get();
allPipelines.get();
timeoutFuture.get();
System.out.println("All async operations completed");
}
}
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.*;
// シンプルなリアクティブストリームインターフェース
interface ReactiveStream {
void subscribe(Consumer onNext, Consumer onError, Runnable onComplete);
ReactiveStream filter(Predicate predicate);
ReactiveStream map(Function mapper);
ReactiveStream take(int n);
}
// リアクティブストリームの実装
class SimpleReactiveStream implements ReactiveStream {
private final List> subscribers = new ArrayList<>();
private final List> errorHandlers = new ArrayList<>();
private final List completionHandlers = new ArrayList<>();
public static SimpleReactiveStream fromIterable(Iterable source) {
SimpleReactiveStream stream = new SimpleReactiveStream<>();
// 別スレッドでデータを発行
CompletableFuture.runAsync(() -> {
try {
for (T item : source) {
stream.publish(item);
}
stream.complete();
} catch (Exception e) {
stream.error(e);
}
});
return stream;
}
public static SimpleReactiveStream fromStream(Stream source) {
return fromIterable(source.collect(Collectors.toList()));
}
private void publish(T item) {
subscribers.forEach(subscriber -> {
try {
subscriber.accept(item);
} catch (Exception e) {
errorHandlers.forEach(handler -> handler.accept(e));
}
});
}
private void error(Throwable throwable) {
errorHandlers.forEach(handler -> handler.accept(throwable));
}
private void complete() {
completionHandlers.forEach(Runnable::run);
}
@Override
public void subscribe(Consumer onNext, Consumer onError, Runnable onComplete) {
subscribers.add(onNext);
errorHandlers.add(onError);
completionHandlers.add(onComplete);
}
@Override
public ReactiveStream filter(Predicate predicate) {
SimpleReactiveStream newStream = new SimpleReactiveStream<>();
this.subscribe(
item -> {
if (predicate.test(item)) {
newStream.publish(item);
}
},
newStream::error,
newStream::complete
);
return newStream;
}
@Override
public ReactiveStream map(Function mapper) {
SimpleReactiveStream newStream = new SimpleReactiveStream<>();
this.subscribe(
item -> {
try {
R mapped = mapper.apply(item);
newStream.publish(mapped);
} catch (Exception e) {
newStream.error(e);
}
},
newStream::error,
newStream::complete
);
return newStream;
}
@Override
public ReactiveStream take(int n) {
SimpleReactiveStream newStream = new SimpleReactiveStream<>();
AtomicInteger count = new AtomicInteger(0);
this.subscribe(
item -> {
if (count.getAndIncrement() < n) {
newStream.publish(item);
}
},
newStream::error,
newStream::complete
);
return newStream;
}
}
public class ReactiveStreamDemo {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== Simple Reactive Stream Demo ===");
// データソースからリアクティブストリームを作成
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
ReactiveStream numberStream = SimpleReactiveStream.fromIterable(numbers);
// 基本的な購読
System.out.println("Basic subscription:");
numberStream.subscribe(
item -> System.out.println("Received: " + item),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed!")
);
Thread.sleep(1000);
// 操作のチェーン
System.out.println("\nChained operations:");
SimpleReactiveStream.fromIterable(numbers)
.filter(n -> n % 2 == 0)
.map(n -> n * 10)
.take(3)
.subscribe(
item -> System.out.println("Processed: " + item),
error -> System.err.println("Error: " + error),
() -> System.out.println("Processing completed!")
);
Thread.sleep(1000);
// エラーハンドリング
System.out.println("\nError handling:");
List mixedData = Arrays.asList("1", "2", "abc", "4", "5");
SimpleReactiveStream.fromIterable(mixedData)
.map(Integer::parseInt) // ここでエラーが発生する
.subscribe(
item -> System.out.println("Parsed: " + item),
error -> System.err.println("Parse error: " + error.getMessage()),
() -> System.out.println("Parsing completed")
);
Thread.sleep(1000);
// 無限ストリームのシミュレーション
System.out.println("\nInfinite stream simulation:");
ReactiveStream infiniteStream = new SimpleReactiveStream<>();
// 別スレッドでデータを生成
CompletableFuture.runAsync(() -> {
long i = 0;
while (i < 10) { // 有限で止める
((SimpleReactiveStream)infiniteStream).publish(i++);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
((SimpleReactiveStream)infiniteStream).complete();
});
infiniteStream
.filter(n -> n % 2 == 0)
.take(3)
.subscribe(
item -> System.out.println("Infinite item: " + item),
error -> System.err.println("Infinite error: " + error),
() -> System.out.println("Infinite completed")
);
Thread.sleep(2000);
// バックプレッシャーの簡単な実装
System.out.println("\nBackpressure simulation:");
SimpleReactiveStream fastStream = SimpleReactiveStream.fromIterable(
IntStream.range(0, 100).boxed().collect(Collectors.toList())
);
fastStream.subscribe(
item -> {
System.out.println("Slow processing: " + item);
try {
Thread.sleep(10); // 遅い消費者
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
},
error -> System.err.println("Error: " + error),
() -> System.out.println("Backpressure completed")
);
Thread.sleep(2000);
}
}
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class PerformanceComparison {
private static final int DATA_SIZE = 10_000_000;
private static final int WARMUP_ITERATIONS = 5;
private static final int MEASURE_ITERATIONS = 10;
public static void main(String[] args) {
System.out.println("Performance Comparison: Traditional vs Stream vs Parallel Stream");
System.out.println("Data size: " + DATA_SIZE);
System.out.println();
// テストデータの準備
List data = new ArrayList<>(DATA_SIZE);
Random random = new Random(42); // 再現性のためにシード固定
for (int i = 0; i < DATA_SIZE; i++) {
data.add(random.nextInt(1000));
}
// ウォームアップ
System.out.println("Warming up...");
for (int i = 0; i < WARMUP_ITERATIONS; i++) {
traditionalSum(data);
streamSum(data);
parallelStreamSum(data);
}
// パフォーマンス計測
System.out.println("Measuring performance...");
long traditionalTime = measureTime(() -> traditionalSum(data));
long streamTime = measureTime(() -> streamSum(data));
long parallelTime = measureTime(() -> parallelStreamSum(data));
// 結果表示
System.out.println("\n=== RESULTS ===");
System.out.printf("Traditional for-loop: %d ms\n", traditionalTime);
System.out.printf("Stream API: %d ms\n", streamTime);
System.out.printf("Parallel Stream: %d ms\n", parallelTime);
System.out.println("\n=== COMPARISON ===");
System.out.printf("Stream vs Traditional: %.2fx %s\n",
(double)traditionalTime/streamTime,
traditionalTime > streamTime ? "slower" : "faster");
System.out.printf("Parallel vs Traditional: %.2fx %s\n",
(double)traditionalTime/parallelTime,
traditionalTime > parallelTime ? "slower" : "faster");
System.out.printf("Parallel vs Stream: %.2fx %s\n",
(double)streamTime/parallelTime,
streamTime > parallelTime ? "slower" : "faster");
// メモリ使用量の比較
System.out.println("\n=== MEMORY USAGE ===");
compareMemoryUsage(data);
// 様々な操作での比較
System.out.println("\n=== DIFFERENT OPERATIONS ===");
compareDifferentOperations(data);
}
private static long traditionalSum(List data) {
long sum = 0;
for (int value : data) {
if (value % 2 == 0) {
sum += value * value;
}
}
return sum;
}
private static long streamSum(List data) {
return data.stream()
.filter(n -> n % 2 == 0)
.mapToLong(n -> (long)n * n)
.sum();
}
private static long parallelStreamSum(List data) {
return data.parallelStream()
.filter(n -> n % 2 == 0)
.mapToLong(n -> (long)n * n)
.sum();
}
private static long measureTime(Runnable operation) {
long totalTime = 0;
for (int i = 0; i < MEASURE_ITERATIONS; i++) {
long startTime = System.currentTimeMillis();
operation.run();
long endTime = System.currentTimeMillis();
totalTime += (endTime - startTime);
}
return totalTime / MEASURE_ITERATIONS;
}
private static void compareMemoryUsage(List data) {
Runtime runtime = Runtime.getRuntime();
// ガベージコレクション
System.gc();
long memoryBefore = runtime.totalMemory() - runtime.freeMemory();
long result1 = traditionalSum(data);
long memoryAfterTraditional = runtime.totalMemory() - runtime.freeMemory();
System.gc();
memoryBefore = runtime.totalMemory() - runtime.freeMemory();
long result2 = streamSum(data);
long memoryAfterStream = runtime.totalMemory() - runtime.freeMemory();
System.gc();
memoryBefore = runtime.totalMemory() - runtime.freeMemory();
long result3 = parallelStreamSum(data);
long memoryAfterParallel = runtime.totalMemory() - runtime.freeMemory();
System.out.printf("Traditional memory delta: %d bytes\n",
memoryAfterTraditional - memoryBefore);
System.out.printf("Stream memory delta: %d bytes\n",
memoryAfterStream - memoryBefore);
System.out.printf("Parallel memory delta: %d bytes\n",
memoryAfterParallel - memoryBefore);
// 結果の検証
System.out.printf("Results consistent: %b\n",
result1 == result2 && result2 == result3);
}
private static void compareDifferentOperations(List data) {
System.out.println("1. Filter + Map + Sum:");
long time1 = measureTime(() -> traditionalSum(data));
long time2 = measureTime(() -> streamSum(data));
long time3 = measureTime(() -> parallelStreamSum(data));
printComparison("FilterMapSum", time1, time2, time3);
System.out.println("2. Simple Sum:");
long time4 = measureTime(() -> {
long sum = 0;
for (int n : data) sum += n;
});
long time5 = measureTime(() -> data.stream().mapToInt(Integer::intValue).sum());
long time6 = measureTime(() -> data.parallelStream().mapToInt(Integer::intValue).sum());
printComparison("SimpleSum", time4, time5, time6);
System.out.println("3. Complex Transformation:");
long time7 = measureTime(() -> {
List result = new ArrayList<>();
for (int n : data) {
if (n > 500) {
result.add(String.valueOf(n * 2));
}
}
});
long time8 = measureTime(() -> {
List result = data.stream()
.filter(n -> n > 500)
.map(n -> String.valueOf(n * 2))
.collect(Collectors.toList());
});
long time9 = measureTime(() -> {
List result = data.parallelStream()
.filter(n -> n > 500)
.map(n -> String.valueOf(n * 2))
.collect(Collectors.toList());
});
printComparison("ComplexTransform", time7, time8, time9);
}
private static void printComparison(String operation, long traditional, long stream, long parallel) {
System.out.printf(" Traditional: %dms, Stream: %dms, Parallel: %dms\n",
traditional, stream, parallel);
System.out.printf(" Best: %s\n",
traditional < stream && traditional < parallel ? "Traditional" :
stream < parallel ? "Stream" : "Parallel");
}
}
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
import java.nio.file.*;
import java.io.*;
public class LargeDataProcessingSystem {
// データ処理ジョブ
static class ProcessingJob {
private final String jobId;
private final List inputFiles;
private final String outputDir;
public ProcessingJob(String jobId, List inputFiles, String outputDir) {
this.jobId = jobId;
this.inputFiles = inputFiles;
this.outputDir = outputDir;
}
public String getJobId() { return jobId; }
public List getInputFiles() { return inputFiles; }
public String getOutputDir() { return outputDir; }
}
// データプロセッサ
static class DataProcessor {
private final ExecutorService executor;
private final int batchSize;
public DataProcessor(int threadCount, int batchSize) {
this.executor = Executors.newFixedThreadPool(threadCount);
this.batchSize = batchSize;
}
public CompletableFuture processJob(ProcessingJob job) {
System.out.println("Starting job: " + job.getJobId());
// 各ファイルを並列処理
List> fileFutures = job.getInputFiles().stream()
.map(file -> processFile(file, job.getOutputDir()))
.collect(Collectors.toList());
// すべてのファイル処理が完了したらジョブ完了
return CompletableFuture.allOf(
fileFutures.toArray(new CompletableFuture[0])
).thenRun(() ->
System.out.println("Completed job: " + job.getJobId())
);
}
private CompletableFuture processFile(Path inputFile, String outputDir) {
return CompletableFuture.supplyAsync(() -> {
try {
System.out.println("Processing file: " + inputFile +
" on " + Thread.currentThread().getName());
// ファイルをバッチで読み込み
List>> batchFutures =
readFileInBatches(inputFile, batchSize);
// すべてのバッチの処理を待機
List> allBatches = batchFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
// 結果を結合して出力
List allResults = allBatches.stream()
.flatMap(List::stream)
.collect(Collectors.toList());
// 出力ファイルに書き込み
Path outputFile = Paths.get(outputDir,
inputFile.getFileName() + ".processed");
Files.write(outputFile, allResults);
return null;
} catch (Exception e) {
throw new RuntimeException("Failed to process file: " + inputFile, e);
}
}, executor);
}
private List>> readFileInBatches(Path file, int batchSize) {
try {
List allLines = Files.readAllLines(file);
// 行をバッチに分割
List> batches = new ArrayList<>();
for (int i = 0; i < allLines.size(); i += batchSize) {
int end = Math.min(i + batchSize, allLines.size());
batches.add(allLines.subList(i, end));
}
// 各バッチを並列処理
return batches.stream()
.map(batch -> processBatchAsync(batch))
.collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException("Failed to read file: " + file, e);
}
}
private CompletableFuture> processBatchAsync(List batch) {
return CompletableFuture.supplyAsync(() -> {
// バッチ処理のシミュレーション
return batch.stream()
.map(line -> line.toUpperCase()) // 単純な変換
.filter(line -> !line.isEmpty())
.collect(Collectors.toList());
}, executor);
}
public void shutdown() {
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// ジョブマネージャー
static class JobManager {
private final DataProcessor processor;
private final Map> activeJobs;
public JobManager(int threadCount, int batchSize) {
this.processor = new DataProcessor(threadCount, batchSize);
this.activeJobs = new ConcurrentHashMap<>();
}
public CompletableFuture submitJob(ProcessingJob job) {
CompletableFuture jobFuture = processor.processJob(job)
.exceptionally(throwable -> {
System.err.println("Job failed: " + job.getJobId() + " - " + throwable.getMessage());
return null;
});
activeJobs.put(job.getJobId(), jobFuture);
// 完了時にアクティブジョブから削除
jobFuture.thenRun(() -> activeJobs.remove(job.getJobId()));
return jobFuture;
}
public void waitForCompletion() {
// すべてのアクティブジョブの完了を待機
CompletableFuture.allOf(
activeJobs.values().toArray(new CompletableFuture[0])
).join();
}
public void shutdown() {
processor.shutdown();
}
public int getActiveJobCount() {
return activeJobs.size();
}
}
// 使用例
public static void main(String[] args) throws Exception {
System.out.println("Large Data Processing System Demo");
// テストデータの作成
createTestData();
// ジョブマネージャーの初期化
JobManager jobManager = new JobManager(
Runtime.getRuntime().availableProcessors(), // スレッド数
1000 // バッチサイズ
);
// 処理ジョブの作成
List inputFiles = Files.list(Paths.get("input"))
.collect(Collectors.toList());
ProcessingJob job1 = new ProcessingJob("JOB-001", inputFiles, "output");
// ジョブの実行
System.out.println("Submitting job...");
CompletableFuture jobFuture = jobManager.submitJob(job1);
// 進捗監視
CompletableFuture.runAsync(() -> {
while (!jobFuture.isDone()) {
System.out.println("Active jobs: " + jobManager.getActiveJobCount());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
// 完了待機
jobFuture.get(60, TimeUnit.SECONDS);
jobManager.waitForCompletion();
System.out.println("All jobs completed successfully");
// クリーンアップ
jobManager.shutdown();
// 結果の検証
verifyResults();
}
private static void createTestData() throws IOException {
Files.createDirectories(Paths.get("input"));
Files.createDirectories(Paths.get("output"));
Random random = new Random(42);
// テストファイルを5つ作成
for (int i = 1; i <= 5; i++) {
Path file = Paths.get("input", "data_" + i + ".txt");
List lines = new ArrayList<>();
// 各ファイルに10,000行のデータ
for (int j = 0; j < 10000; j++) {
lines.add("Line_" + j + "_Value_" + random.nextInt(1000));
}
Files.write(file, lines);
System.out.println("Created test file: " + file + " with " + lines.size() + " lines");
}
}
private static void verifyResults() throws IOException {
long totalProcessedLines = Files.list(Paths.get("output"))
.mapToLong(file -> {
try {
return Files.readAllLines(file).size();
} catch (IOException e) {
return 0;
}
})
.sum();
long totalInputLines = Files.list(Paths.get("input"))
.mapToLong(file -> {
try {
return Files.readAllLines(file).size();
} catch (IOException e) {
return 0;
}
})
.sum();
System.out.println("Verification:");
System.out.println("Total input lines: " + totalInputLines);
System.out.println("Total processed lines: " + totalProcessedLines);
System.out.println("Processing successful: " + (totalInputLines == totalProcessedLines));
}
}