Java Anonymous Functions, Lambda Expressions, Stream API, and Threads: 30 Exercises

2025-08-04

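Beginner Problems (9 problems)

Anonymous Functions and Lambda Basics

  1. Basic lambda syntax
    Implement the Runnable interface with a lambda expression and write a program that prints "Hello Lambda".
  2. Comparator as a lambda
    Write a program that sorts a List of Strings by string length using a lambda expression.
  3. Anonymous class vs. lambda
    Implement a button click event with both an anonymous class and a lambda expression, and explain the differences.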

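Stream API Basics

  4. Basic Stream operations
    Write a program that creates a Stream from a List of Integers, doubles each element, and collects the results into a new List.
  5. Using filter and collect
    Using the Stream API, write a program that extracts only the strings of five or more characters from a List of Strings.
  6. Using map
    Using the Stream API, write a program that extracts only the names from a List of Person objects into a new List.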

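Thread Basics

  7. Basic Thread implementation
    Write a program that prints 1 through 5 in two ways: by extending the Thread class and by implementing the Runnable interface.
  8. Runnable as a lambda
    Implement the Runnable interface with a lambda expression and write a program in which three threads print messages concurrently.
  9. Basic thread control
    Write a program in which the main thread and another thread print messages alternately.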

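Intermediate Problems (15 problems)

Applied Lambda Expressions

  10. Creating a functional interface
    Create your own functional interface, StringProcessor, and implement lambda expressions that process strings.
  11. Using method references
    Write a program that uses existing static methods, instance methods, and constructors through method references.
  12. Variable capture in lambdas
    Write a program that shows a lambda capturing outside variables and illustrates the concept of effectively final variables.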

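Applied Stream API

  13. Using flatMap
    Write a program that uses flatMap to turn several Lists into a single Stream and processes it.
  14. Aggregation with reduce
    Write a program that uses the Stream reduce operation to find the sum, maximum, and minimum of a list of numbers.
  15. Grouping and aggregation
    Write a program that groups a List of Person objects by age and counts the people in each group.
  16. Using parallel streams
    Write a program that processes a large data set with a parallel stream and compares the processing time with a sequential stream.
  17. Lazy evaluation of Streams
    Write a program that confirms that intermediate Stream operations are evaluated lazily.
  18. Using Optional
    Write a program that uses Optional in Stream operations for null-safe processing.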

Applied Threads

  19. Using synchronized
    Use synchronized to implement synchronization when several threads access a shared resource.
  20. Using wait/notify
    Implement the producer-consumer pattern using wait/notify.
  21. Using ExecutorService
    Write a program that manages a thread pool with ExecutorService.
  22. Callable and Future
    Write a program that runs tasks with return values using the Callable interface and Future.
  23. CompletableFuture basics
    Write a program that chains asynchronous operations using CompletableFuture.

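Comprehensive Problems

  24. Combining Streams and lambdas
    Using Streams and lambda expressions, write a program that processes employee data and computes the average salary per department.
  25. Error handling in parallel processing
    Write a program that implements exception handling for parallel stream processing.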

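Advanced Problems (6 problems)

  26. Creating a custom collector
    Create a custom collector for use with the Stream API and implement a complex aggregation with it.
  27. Asynchronous processing pipeline
    Write a program that uses CompletableFuture to pipeline several asynchronous operations and manage the dependencies between them.
  28. Imitating reactive streams
    Implement a simple reactive programming pattern using the Stream API.
  29. Performance comparison
    Write a benchmark program that compares the performance of a traditional for loop, the Stream API, and parallel Streams.
  30. Designing a large-scale data processing system
    Design a system that processes large volumes of data in a multithreaded environment, and implement it by combining the Stream API with ExecutorService.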

Exercise Solutions

Beginner Problems: Solutions

1. Basic lambda syntax

public class LambdaBasic {
    public static void main(String[] args) {
        // Traditional anonymous class
        Runnable traditionalRunnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("Hello Anonymous Class");
            }
        };

        // Lambda expression
        Runnable lambdaRunnable = () -> System.out.println("Hello Lambda");

        traditionalRunnable.run();
        lambdaRunnable.run();
    }
}

2. Comparator as a lambda

import java.util.*;

public class LambdaComparator {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("John", "Alice", "Bob", "Charlie");

        // Traditional Comparator (anonymous class)
        Collections.sort(names, new Comparator<String>() {
            @Override
            public int compare(String s1, String s2) {
                return Integer.compare(s1.length(), s2.length());
            }
        });

        // Comparator as a lambda expression
        names.sort((s1, s2) -> Integer.compare(s1.length(), s2.length()));

        // Method reference
        names.sort(Comparator.comparingInt(String::length));

        System.out.println(names);
    }
}

3. Anonymous class vs. lambda

// Simple event-listener interface
interface ClickListener {
    void onClick(String message);
}

public class AnonymousVsLambda {
    public static void main(String[] args) {
        // Implementation with an anonymous class
        ClickListener anonymousListener = new ClickListener() {
            @Override
            public void onClick(String message) {
                System.out.println("Anonymous: " + message);
            }
        };

        // Implementation with a lambda expression
        ClickListener lambdaListener = (message) ->
            System.out.println("Lambda: " + message);

        anonymousListener.onClick("Button clicked!");
        lambdaListener.onClick("Button clicked!");
    }
}
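
The exercise also asks for an explanation of the differences. One difference that is easy to show in code is the meaning of `this`: inside an anonymous class it refers to the anonymous instance, while inside a lambda it refers to the enclosing instance. The following is a minimal sketch; the class name ThisScopeDemo and the method describeThis are hypothetical and not part of the original answer.

```java
class ThisScopeDemo {
    // Reports what `this` refers to inside an anonymous class and inside a lambda.
    String[] describeThis() {
        String[] seen = new String[2];

        Runnable anonymous = new Runnable() {
            @Override
            public void run() {
                Object self = this; // the anonymous Runnable instance
                seen[0] = (self instanceof ThisScopeDemo) ? "enclosing instance" : "anonymous instance";
            }
        };

        Runnable lambda = () -> {
            Object self = this; // the enclosing ThisScopeDemo instance
            seen[1] = (self instanceof ThisScopeDemo) ? "enclosing instance" : "anonymous instance";
        };

        anonymous.run();
        lambda.run();
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = new ThisScopeDemo().describeThis();
        System.out.println("this in anonymous class: " + seen[0]);
        System.out.println("this in lambda: " + seen[1]);
    }
}
```

Other differences worth mentioning in a written answer: a lambda can only implement a functional interface (exactly one abstract method), whereas an anonymous class can implement interfaces with several methods, extend a class, and hold its own fields.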

4. Basic Stream operations

import java.util.*;
import java.util.stream.*;

public class StreamBasic {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // Double each element with a Stream
        List<Integer> doubled = numbers.stream()
            .map(n -> n * 2)
            .collect(Collectors.toList());

        System.out.println("Original: " + numbers);
        System.out.println("Doubled: " + doubled);

        // Using a method reference
        List<Integer> tripled = numbers.stream()
            .map(StreamBasic::triple)  // static method reference
            .collect(Collectors.toList());

        System.out.println("Tripled: " + tripled);
    }

    private static int triple(int n) {
        return n * 3;
    }
}

5. Using filter and collect

import java.util.*;
import java.util.stream.*;

public class StreamFilter {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("cat", "elephant", "dog", "butterfly", "ant");

        // Extract the words of five or more characters
        List<String> longWords = words.stream()
            .filter(word -> word.length() >= 5)
            .collect(Collectors.toList());

        System.out.println("All words: " + words);
        System.out.println("Long words: " + longWords);

        // Convert to upper case and collect
        List<String> upperCaseWords = words.stream()
            .filter(word -> word.length() >= 5)
            .map(String::toUpperCase)
            .collect(Collectors.toList());

        System.out.println("Upper case long words: " + upperCaseWords);
    }
}

6. Using map

import java.util.*;
import java.util.stream.*;

class Person {
    private String name;
    private int age;

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() { return name; }
    public int getAge() { return age; }
}

public class StreamMap {
    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
            new Person("Alice", 25),
            new Person("Bob", 30),
            new Person("Charlie", 35)
        );

        // Extract only the names
        List<String> names = people.stream()
            .map(Person::getName)
            .collect(Collectors.toList());

        System.out.println("Names: " + names);

        // Extract only the ages
        List<Integer> ages = people.stream()
            .map(Person::getAge)
            .collect(Collectors.toList());

        System.out.println("Ages: " + ages);
    }
}

7. Basic Thread implementation

public class ThreadBasic {
    // Approach 1: extend the Thread class
    static class MyThread extends Thread {
        @Override
        public void run() {
            for (int i = 1; i <= 5; i++) {
                System.out.println("Thread extends: " + i);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Approach 2: implement the Runnable interface
    static class MyRunnable implements Runnable {
        @Override
        public void run() {
            for (int i = 1; i <= 5; i++) {
                System.out.println("Runnable: " + i);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        // Thread subclass
        MyThread thread1 = new MyThread();
        thread1.start();

        // Runnable implementation
        Thread thread2 = new Thread(new MyRunnable());
        thread2.start();

        // The main thread does work too
        for (int i = 1; i <= 5; i++) {
            System.out.println("Main thread: " + i);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

8. Runnable as a lambda

public class LambdaThread {
    public static void main(String[] args) {
        // Implement Runnable with lambda expressions
        Thread thread1 = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                System.out.println("Thread 1 - Count: " + i);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread thread2 = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                System.out.println("Thread 2 - Count: " + i);
                try {
                    Thread.sleep(800);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread thread3 = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                System.out.println("Thread 3 - Count: " + i);
                try {
                    Thread.sleep(600);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        thread1.start();
        thread2.start();
        thread3.start();
    }
}

9. Basic thread control

public class ThreadCoordination {
    private static boolean mainTurn = true;

    public static void main(String[] args) {
        Object lock = new Object();

        Thread workerThread = new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                synchronized (lock) {
                    while (mainTurn) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("Worker: Message " + i);
                    mainTurn = true;
                    lock.notify();
                }
            }
        });

        workerThread.start();

        for (int i = 1; i <= 5; i++) {
            synchronized (lock) {
                while (!mainTurn) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("Main: Message " + i);
                mainTurn = false;
                lock.notify();
            }
        }
    }
}

Intermediate Problems: Solutions

10. Creating a functional interface

@FunctionalInterface
interface StringProcessor {
    String process(String input);

    // Default method
    default StringProcessor andThen(StringProcessor after) {
        return input -> after.process(this.process(input));
    }
}

public class CustomFunctionalInterface {
    public static void main(String[] args) {
        // Convert to upper case
        StringProcessor toUpper = String::toUpperCase;

        // Append an exclamation mark
        StringProcessor addExclamation = s -> s + "!";

        // Repeat the string twice
        StringProcessor repeatTwice = s -> s + " " + s;

        // Chain the processors
        StringProcessor pipeline = toUpper
            .andThen(addExclamation)
            .andThen(repeatTwice);

        String result = pipeline.process("hello");
        System.out.println("Result: " + result);

        // Using the processors individually
        System.out.println(toUpper.process("java"));
        System.out.println(addExclamation.process("wow"));
    }
}
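
For comparison, the same pipeline can be built with the standard java.util.function.Function interface, which already provides an andThen default method. This is only a sketch of an alternative; the class name BuiltInPipeline and the method run are hypothetical, and the custom StringProcessor above remains the answer to the exercise.

```java
import java.util.function.Function;

class BuiltInPipeline {
    // Builds the same three-step pipeline using the built-in Function interface.
    static String run(String input) {
        Function<String, String> toUpper = String::toUpperCase;
        Function<String, String> addExclamation = s -> s + "!";
        Function<String, String> repeatTwice = s -> s + " " + s;

        Function<String, String> pipeline = toUpper
            .andThen(addExclamation)
            .andThen(repeatTwice);

        return pipeline.apply(input);
    }

    public static void main(String[] args) {
        System.out.println("Result: " + run("hello"));
    }
}
```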

11. Using method references

import java.util.*;
import java.util.function.*;
import java.util.stream.Collectors;

class Calculator {
    public static int square(int x) {
        return x * x;
    }

    public int cube(int x) {
        return x * x * x;
    }
}

public class MethodReferences {
    public static void main(String[] args) {
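        List<String> names = Arrays.asList("Alice", "Bob", "Charlie");

        // 1. Static method reference
        Function<Integer, Integer> squareRef = Calculator::square;
        System.out.println("Square: " + squareRef.apply(5));

        // 2. Instance method reference (bound to a particular object)
        Calculator calc = new Calculator();
        Function<Integer, Integer> cubeRef = calc::cube;
        System.out.println("Cube: " + cubeRef.apply(3));

        // 3. Instance method reference (arbitrary object of a given type): String::toUpperCase.
        //    System.out::println, by contrast, is bound to one particular object.
        names.stream().map(String::toUpperCase).forEach(System.out::println);

        // 4. Constructor reference
        Function<String, String> stringCreator = String::new;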
        String newString = stringCreator.apply("Hello");
        System.out.println("New string: " + newString);

        // Example with a collection
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // Map with a method reference
        List<Integer> squares = numbers.stream()
            .map(Calculator::square)
            .collect(Collectors.toList());

        System.out.println("Squares: " + squares);
    }
}

12. Variable capture in lambdas

public class LambdaVariableCapture {
    private String instanceVariable = "Instance Variable";

    public void demonstrateCapture() {
        String effectivelyFinal = "Effectively Final";
        final String explicitlyFinal = "Explicitly Final";

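        // Capture outside variables in a lambda
        Runnable lambda = () -> {
            // Instance variables are accessible
            System.out.println(instanceVariable);

            // Effectively final local variables are accessible
            System.out.println(effectivelyFinal);

            // Explicitly final local variables are accessible too
            System.out.println(explicitlyFinal);

            // Instance variables may also be modified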
            instanceVariable = "Modified Instance Variable";
        };

        // Before running the lambda
        System.out.println("Before lambda: " + instanceVariable);

        lambda.run();

        // After running the lambda
        System.out.println("After lambda: " + instanceVariable);

        // Uncommenting the next line causes a compile error (the variable would no longer be effectively final)
        // effectivelyFinal = "Modified";
    }

    public static void main(String[] args) {
        new LambdaVariableCapture().demonstrateCapture();
    }
}
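
Because a captured local variable cannot be reassigned, a lambda that needs to update a running value usually mutates an object instead. The sketch below uses AtomicInteger as that holder. The class name CaptureWorkaround and the method countLongNames are hypothetical. For simple counting, `stream().filter(...).count()` would normally be preferable, and the holder is used here only to illustrate capture.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class CaptureWorkaround {
    // Counts the names with at least minLength characters by mutating a captured holder object.
    static int countLongNames(List<String> names, int minLength) {
        // `counter` itself is never reassigned, so it stays effectively final;
        // only the state inside the AtomicInteger changes.
        AtomicInteger counter = new AtomicInteger();
        names.forEach(name -> {
            if (name.length() >= minLength) {
                counter.incrementAndGet();
            }
        });
        return counter.get();
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Alice", "Bob", "Charlie");
        System.out.println("Names with 5+ characters: " + countLongNames(names, 5));
    }
}
```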

13. Using flatMap

import java.util.*;
import java.util.stream.*;

public class StreamFlatMap {
    public static void main(String[] args) {
        // Several lists
        List<List<String>> nestedLists = Arrays.asList(
            Arrays.asList("Apple", "Banana"),
            Arrays.asList("Orange", "Grape", "Mango"),
            Arrays.asList("Peach")
        );

        // Flatten into a single Stream with flatMap
        List<String> flatList = nestedLists.stream()
            .flatMap(List::stream)
            .collect(Collectors.toList());

        System.out.println("Flat list: " + flatList);

        // Break strings into characters
        List<String> words = Arrays.asList("Hello", "World");
        List<Character> characters = words.stream()
            .flatMap(word -> word.chars().mapToObj(c -> (char)c))
            .collect(Collectors.toList());

        System.out.println("Characters: " + characters);

        // A more involved example: flatMap over a list of objects
        class Order {
            List<String> items;
            Order(List<String> items) { this.items = items; }
            List<String> getItems() { return items; }
        }

        List<Order> orders = Arrays.asList(
            new Order(Arrays.asList("Laptop", "Mouse")),
            new Order(Arrays.asList("Keyboard", "Monitor", "Headphones"))
        );

        List<String> allItems = orders.stream()
            .flatMap(order -> order.getItems().stream())
            .collect(Collectors.toList());

        System.out.println("All items: " + allItems);
    }
}

14. Aggregation with reduce

import java.util.*;
import java.util.stream.*;

public class StreamReduce {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Sum
        Optional<Integer> sum = numbers.stream()
            .reduce(Integer::sum);
        System.out.println("Sum: " + sum.orElse(0));

        // Sum with an identity (initial) value
        Integer sumWithIdentity = numbers.stream()
            .reduce(0, Integer::sum);
        System.out.println("Sum with identity: " + sumWithIdentity);

        // Maximum
        Optional<Integer> max = numbers.stream()
            .reduce(Integer::max);
        System.out.println("Max: " + max.orElse(0));

        // Minimum
        Optional<Integer> min = numbers.stream()
            .reduce(Integer::min);
        System.out.println("Min: " + min.orElse(0));

        // String concatenation
        List<String> words = Arrays.asList("Hello", "World", "Java", "Stream");
        String combined = words.stream()
            .reduce("", (s1, s2) -> s1 + " " + s2)
            .trim();
        System.out.println("Combined: " + combined);

        // Average (uses IntStream.average(), a built-in reduction, rather than reduce())
        double average = numbers.stream()
            .mapToInt(Integer::intValue)
            .average()
            .orElse(0.0);
        System.out.println("Average: " + average);
    }
}
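
The string concatenation above relies on reduce with an empty identity followed by trim() to drop the leading space. A possible alternative, shown here only as a sketch, is Collectors.joining, which places the separator between elements and so needs no trimming. The class name JoinWords and the method join are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class JoinWords {
    // Joins the words with single spaces; no leading or trailing separator is produced.
    static String join(List<String> words) {
        return words.stream().collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Hello", "World", "Java", "Stream");
        System.out.println("Combined: " + join(words));
    }
}
```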

15. Grouping and aggregation

import java.util.*;
import java.util.stream.*;

class Employee {
    private String name;
    private String department;
    private int salary;
    private int age;

    public Employee(String name, String department, int salary, int age) {
        this.name = name;
        this.department = department;
        this.salary = salary;
        this.age = age;
    }

    // getters
    public String getName() { return name; }
    public String getDepartment() { return department; }
    public int getSalary() { return salary; }
    public int getAge() { return age; }

    @Override
    public String toString() { return name + "(" + department + ")"; }
}

public class StreamGrouping {
    public static void main(String[] args) {
        List<Employee> employees = Arrays.asList(
            new Employee("Alice", "IT", 5000, 25),
            new Employee("Bob", "HR", 4000, 30),
            new Employee("Charlie", "IT", 6000, 35),
            new Employee("Diana", "HR", 4500, 28),
            new Employee("Eve", "Finance", 5500, 32),
            new Employee("Frank", "IT", 5200, 29)
        );

        // Group by department
        Map<String, List<Employee>> byDepartment = employees.stream()
            .collect(Collectors.groupingBy(Employee::getDepartment));

        System.out.println("By Department:");
        byDepartment.forEach((dept, emps) -> 
            System.out.println(dept + ": " + emps));

        // Head count per department
        Map<String, Long> countByDept = employees.stream()
            .collect(Collectors.groupingBy(
                Employee::getDepartment, 
                Collectors.counting()
            ));

        System.out.println("\nCount by Department:");
        countByDept.forEach((dept, count) -> 
            System.out.println(dept + ": " + count));

        // Average salary per department
        Map<String, Double> avgSalaryByDept = employees.stream()
            .collect(Collectors.groupingBy(
                Employee::getDepartment,
                Collectors.averagingInt(Employee::getSalary)
            ));

        System.out.println("\nAverage Salary by Department:");
        avgSalaryByDept.forEach((dept, avg) -> 
            System.out.printf("%s: %.2f\n", dept, avg));

        // Group by age bracket
        Map<String, List<Employee>> byAgeGroup = employees.stream()
            .collect(Collectors.groupingBy(emp -> {
                if (emp.getAge() < 30) return "Under 30";
                else if (emp.getAge() < 40) return "30-39";
                else return "40 and above";
            }));

        System.out.println("\nBy Age Group:");
        byAgeGroup.forEach((group, emps) -> 
            System.out.println(group + ": " + emps));
    }
}
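
The problem statement asks for people grouped by age with a count per group, which the answer above covers only indirectly (by department and by age bracket). A minimal sketch of the literal request follows, assuming a small hypothetical Person class with a public age field. The names AgeCount and countByAge are also hypothetical. A TreeMap is used as the map factory so the ages print in ascending order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class AgeCount {
    // Hypothetical minimal Person type used only for this sketch.
    static class Person {
        final String name;
        final int age;
        Person(String name, int age) { this.name = name; this.age = age; }
    }

    // Groups people by exact age and counts the members of each group.
    static Map<Integer, Long> countByAge(List<Person> people) {
        return people.stream()
            .collect(Collectors.groupingBy(p -> p.age, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
            new Person("Alice", 25),
            new Person("Bob", 30),
            new Person("Carol", 25)
        );
        countByAge(people).forEach((age, count) ->
            System.out.println(age + ": " + count));
    }
}
```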

16. Using parallel streams

import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;

public class ParallelStreamDemo {
    public static void main(String[] args) {
        // Build a large list
        List<Integer> numbers = IntStream.range(0, 10_000_000)
            .boxed()
            .collect(Collectors.toList());

        // Time the sequential version
        long startTime = System.currentTimeMillis();
        long sequentialSum = numbers.stream()
            .mapToLong(Integer::longValue)
            .sum();
        long sequentialTime = System.currentTimeMillis() - startTime;

        // Time the parallel version
        startTime = System.currentTimeMillis();
        long parallelSum = numbers.parallelStream()
            .mapToLong(Integer::longValue)
            .sum();
        long parallelTime = System.currentTimeMillis() - startTime;

        System.out.println("Sequential sum: " + sequentialSum + " in " + sequentialTime + "ms");
        System.out.println("Parallel sum: " + parallelSum + " in " + parallelTime + "ms");
        System.out.println("Speedup: " + (double)sequentialTime/parallelTime + "x");

        // Check how many processors are available to the parallel stream
        System.out.println("\nAvailable processors: " + Runtime.getRuntime().availableProcessors());

        // Print which thread handles each element in a parallel stream
        numbers.parallelStream()
            .limit(20)
            .forEach(n -> {
                System.out.println(Thread.currentThread().getName() + " processing: " + n);
            });

        // Caution: avoid stateful operations such as adding to a shared list
        List<Integer> problematicList = Collections.synchronizedList(new ArrayList<>());
        numbers.parallelStream()
            .filter(n -> n % 2 == 0)
            .forEach(problematicList::add); // thread-safe but inefficient

        System.out.println("Filtered count: " + problematicList.size());

        // Preferred approach: let collect() build the list
        List<Integer> correctList = numbers.parallelStream()
            .filter(n -> n % 2 == 0)
            .collect(Collectors.toList());

        System.out.println("Correct filtered count: " + correctList.size());
    }
}

17. Lazy evaluation of Streams

import java.util.*;
import java.util.stream.*;

public class LazyEvaluation {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David", "Eve");

        System.out.println("Demonstrating Lazy Evaluation:");

        // Define intermediate operations only (nothing runs yet)
        Stream<String> intermediateStream = names.stream()
            .filter(name -> {
                System.out.println("Filtering: " + name);
                return name.length() > 3;
            })
            .map(name -> {
                System.out.println("Mapping: " + name);
                return name.toUpperCase();
            });

        System.out.println("Intermediate operations defined, but not executed yet.");

        // Execution starts only when a terminal operation is invoked
        System.out.println("\nNow executing terminal operation:");
        List<String> result = intermediateStream.collect(Collectors.toList());
        System.out.println("Result: " + result);

        // Short-circuiting operation example
        System.out.println("\nShort-circuit operation example:");
        Optional<String> firstLongName = names.stream()
            .filter(name -> {
                System.out.println("Short-circuit filtering: " + name);
                return name.length() > 4;
            })
            .findFirst();

        System.out.println("First long name: " + firstLongName.orElse("None"));

        // Infinite stream combined with lazy evaluation
        System.out.println("\nInfinite stream example:");
        Stream.iterate(1, n -> n + 1)
            .map(n -> {
                System.out.println("Processing: " + n);
                return n * 2;
            })
            .filter(n -> n > 10)
            .limit(3)
            .forEach(System.out::println);
    }
}

18. Using Optional

import java.util.*;
import java.util.stream.*;

class User {
    private String name;
    private String email;

    public User(String name, String email) {
        this.name = name;
        this.email = email;
    }

    public String getName() { return name; }
    public Optional<String> getEmail() {
        return Optional.ofNullable(email); 
    }
}

public class OptionalStream {
    public static void main(String[] args) {
        List<User> users = Arrays.asList(
            new User("Alice", "alice@example.com"),
            new User("Bob", null), // no email
            new User("Charlie", "charlie@example.com"),
            new User("Diana", null)
        );

        // Traditional-style explicit check in a loop (getEmail() never returns null, so isPresent() suffices)
        System.out.println("Traditional null check:");
        for (User user : users) {
            if (user.getEmail().isPresent()) {
                System.out.println(user.getName() + ": " + user.getEmail().get());
            }
        }

        // More elegant handling with Stream + Optional
        System.out.println("\nStream with Optional:");
        List<String> validEmails = users.stream()
            .map(User::getEmail)          // Stream of Optional<String>
            .flatMap(Optional::stream)    // keep only the values that are present (Java 9+)
            .collect(Collectors.toList());

        System.out.println("Valid emails: " + validEmails);

        // Using a default value
        System.out.println("\nEmails with defaults:");
        users.stream()
            .map(user -> user.getEmail().orElse("no-email@example.com"))
            .forEach(email -> System.out.println("Email: " + email));

        // Conditional processing
        System.out.println("\nConditional processing:");
        users.forEach(user -> {
            user.getEmail()
                .ifPresentOrElse(
                    email -> System.out.println(user.getName() + " has email: " + email),
                    () -> System.out.println(user.getName() + " has no email")
                );
        });

        // A longer chain
        System.out.println("\nComplex chain:");
        Optional<String> firstValidEmail = users.stream()
            .map(User::getEmail)
            .flatMap(Optional::stream)
            .filter(email -> email.contains("example"))
            .findFirst();

        firstValidEmail.ifPresent(email -> 
            System.out.println("First valid email: " + email));
    }
}

19. Using synchronized

import java.util.*;

class SharedCounter {
    private int count = 0;

    // synchronized method
    public synchronized void increment() {
        count++;
    }

    // synchronized block
    public void decrement() {
        synchronized(this) {
            count--;
        }
    }

    public synchronized int getCount() {
        return count;
    }
}

public class SynchronizedExample {
    public static void main(String[] args) throws InterruptedException {
        SharedCounter counter = new SharedCounter();
        List<Thread> threads = new ArrayList<>();

        // Create 100 incrementing threads
        for (int i = 0; i < 100; i++) {
            Thread thread = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                }
            });
            threads.add(thread);
        }

        // Start all threads
        threads.forEach(Thread::start);

        // Wait for all threads to finish
        for (Thread thread : threads) {
            thread.join();
        }

        System.out.println("Final count: " + counter.getCount());
        System.out.println("Expected: " + (100 * 1000));

        // Synchronizing on different lock objects. Both threads acquire the locks
        // in the same order (lock1, then lock2); acquiring them in opposite orders
        // can deadlock and leave the join() calls below waiting forever.
        Object lock1 = new Object();
        Object lock2 = new Object();

        Thread t1 = new Thread(() -> {
            synchronized(lock1) {
                System.out.println("Thread 1 acquired lock1");
                try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                synchronized(lock2) {
                    System.out.println("Thread 1 acquired lock2");
                }
            }
        });

        Thread t2 = new Thread(() -> {
            synchronized(lock1) {
                System.out.println("Thread 2 acquired lock1");
                try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                synchronized(lock2) {
                    System.out.println("Thread 2 acquired lock2");
                }
            }
        });

        t1.start();
        t2.start();

        t1.join();
        t2.join();
    }
}

20. Using wait/notify

import java.util.*;

class MessageQueue {
    private Queue<String> queue = new LinkedList<>();
    private int capacity;

    public MessageQueue(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void produce(String message) throws InterruptedException {
        while (queue.size() == capacity) {
            System.out.println("Queue full, producer waiting...");
            wait();
        }

        queue.add(message);
        System.out.println("Produced: " + message);
        notifyAll(); // wake up waiting consumers
    }

    public synchronized String consume() throws InterruptedException {
        while (queue.isEmpty()) {
            System.out.println("Queue empty, consumer waiting...");
            wait();
        }

        String message = queue.poll();
        System.out.println("Consumed: " + message);
        notifyAll(); // wake up waiting producers
        return message;
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
        MessageQueue queue = new MessageQueue(5);

        // Producer thread
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    queue.produce("Message " + i);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer thread
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    queue.consume();
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();

        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Producer-Consumer completed");
    }
}
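
The wait/notify version above is what the exercise asks for. For comparison, the same bounded-buffer behavior is available from java.util.concurrent.ArrayBlockingQueue, whose put() blocks while the queue is full and whose take() blocks while it is empty. The following is a rough sketch under that substitution. The class name BlockingQueueDemo and the method transfer are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {
    // Sends `count` messages from a producer thread to a consumer thread
    // through a bounded queue and returns them in the order consumed.
    static List<String> transfer(int count, int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        List<String> consumed = new ArrayList<>(); // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put("Message " + i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    consumed.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // join() makes the consumer's writes visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(transfer(10, 5));
    }
}
```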

21. Using ExecutorService

import java.util.concurrent.*;
import java.util.*;

public class ExecutorServiceDemo {
    public static void main(String[] args) throws InterruptedException {
        // Fixed-size thread pool
        ExecutorService executor = Executors.newFixedThreadPool(3);

        List<Future<String>> futures = new ArrayList<>();

        // Submit 10 tasks
        for (int i = 1; i <= 10; i++) {
            final int taskId = i;
            Future<String> future = executor.submit(() -> {
                System.out.println("Task " + taskId + " started by " + 
                                 Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "Result of task " + taskId;
            });
            futures.add(future);
        }

        // Collect the results
        for (Future<String> future : futures) {
            try {
                String result = future.get();
                System.out.println("Future result: " + result);
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        // Shut down the thread pool
        executor.shutdown();
        boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("All tasks completed: " + terminated);

        // Other kinds of ExecutorService
        System.out.println("\n--- Different Executor Services ---");

        // Single-thread executor
        ExecutorService singleThread = Executors.newSingleThreadExecutor();
        singleThread.submit(() -> System.out.println("Single thread task"));
        singleThread.shutdown();

        // Cached thread pool
        ExecutorService cachedPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            cachedPool.submit(() -> {
                System.out.println("Cached pool task by " + Thread.currentThread().getName());
            });
        }
        cachedPool.shutdown();

        // Scheduled executor
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        scheduler.schedule(() -> 
            System.out.println("Delayed task"), 2, TimeUnit.SECONDS);

        scheduler.scheduleAtFixedRate(() -> 
            System.out.println("Fixed rate task"), 1, 3, TimeUnit.SECONDS);

        Thread.sleep(10000);
        scheduler.shutdown();
    }
}

22. Callable and Future

ComputationTask.java

import java.util.concurrent.Callable;

public class ComputationTask implements Callable<Long> {
    private final int number;

    public ComputationTask(int number) {
        this.number = number;
    }

    @Override
    public Long call() throws Exception {
        System.out.println("Computing factorial of " + number +
                         " in " + Thread.currentThread().getName());

        long result = 1;
        for (int i = 2; i <= number; i++) {
            result *= i;
            Thread.sleep(10); // simulate computation time
        }

        return result;
    }
}

NetworkTask.java

import java.util.concurrent.Callable;

public class NetworkTask implements Callable<String> {
    private final String url;

    public NetworkTask(String url) {
        this.url = url;
    }

    @Override
    public String call() throws Exception {
        System.out.println("Fetching data from " + url +
                         " in " + Thread.currentThread().getName());

        // Simulate network latency
        Thread.sleep(2000);

        return "Data from " + url;
    }
}

CallableFutureDemo.java

import java.util.concurrent.*;
import java.util.*;

public class CallableFutureDemo {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // Create several Callable tasks
        List<Callable<Object>> tasks = new ArrayList<>();

        // Computation tasks. Executors.callable() does not accept a Callable,
        // so a method reference adapts each task to Callable<Object> instead.
        tasks.add(new ComputationTask(10)::call);
        tasks.add(new ComputationTask(15)::call);

        // Network tasks
        tasks.add(new NetworkTask("http://api.example.com/data")::call);
        tasks.add(new NetworkTask("http://api.example.com/users")::call);

        // Run all tasks in one batch
        System.out.println("Submitting all tasks...");
        List<Future<Object>> futures = executor.invokeAll(tasks);

        // Process the results
        System.out.println("\nProcessing results:");
        for (int i = 0; i < futures.size(); i++) {
            Future<Object> future = futures.get(i);
            try {
                Object result = future.get(5, TimeUnit.SECONDS);
                System.out.println("Task " + i + " result: " + result);
            } catch (TimeoutException e) {
                System.out.println("Task " + i + " timed out");
                future.cancel(true);
            } catch (ExecutionException e) {
                System.out.println("Task " + i + " failed: " + e.getCause());
            }
        }

        // Check the state of a Future
        System.out.println("\nFuture states:");
        Future<Long> computationFuture = executor.submit(new ComputationTask(20));

        // Poll until the result is ready, doing other work in the meantime
        while (!computationFuture.isDone()) {
            System.out.println("Waiting for computation...");
            Thread.sleep(500);
        }

        if (computationFuture.isDone() && !computationFuture.isCancelled()) {
            Long result = computationFuture.get();
            System.out.println("Final computation result: " + result);
        }

        executor.shutdown();
        boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("Executor terminated: " + terminated);
    }
}
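
Note on mixing result types: `invokeAll` needs a collection of callables with one common result type, and `Executors.callable` only wraps a `Runnable` (or a privileged action), not an existing `Callable`. The following is a minimal sketch of one way to batch a `Callable<Long>` and a `Callable<String>` together by adapting each with a method reference. The class name `MixedCallablesSketch` and the helper `runAll` are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MixedCallablesSketch {

    // Hypothetical helper: runs two callables with different result types as one batch
    static List<Object> runAll(Callable<Long> numeric, Callable<String> text) throws Exception {
        List<Callable<Object>> tasks = new ArrayList<>();
        tasks.add(numeric::call); // Callable<Long> adapted to Callable<Object>
        tasks.add(text::call);    // Callable<String> adapted to Callable<Object>

        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Object> results = new ArrayList<>();
            // invokeAll returns the futures in the same order as the task list
            for (Future<Object> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll(() -> 42L, () -> "done")); // prints [42, done]
    }
}
```

The price of this approach is that the results come back as `Object`, so the caller has to know (or check) the type of each entry.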

23. CompletableFuture Basics

FetchUserDataService.java

// FetchUserDataService.java
import java.util.concurrent.CompletableFuture;

public class FetchUserDataService {
    public static CompletableFuture<String> fetchUserData(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "User data for ID: " + userId;
        });
    }
}

CalculateScoreService.java

// CalculateScoreService.java
import java.util.concurrent.CompletableFuture;

public class CalculateScoreService {
    public static CompletableFuture<Integer> calculateScore(String userData) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return userData.length() * 10;
        });
    }
}

SendNotificationService.java

// SendNotificationService.java
import java.util.concurrent.CompletableFuture;

public class SendNotificationService {
    public static CompletableFuture<String> sendNotification(int score) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "Notification sent for score: " + score;
        });
    }
}

CompletableFutureDemo.java

// CompletableFutureDemo.java
import java.util.concurrent.*;

public class CompletableFutureDemo {

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "Hello, CompletableFuture!";
        });

        future.thenAccept(result ->
            System.out.println("Basic result: " + result));

        System.out.println("Starting chained operations...");

        CompletableFuture<Void> chainedFuture = FetchUserDataService.fetchUserData(123)
            .thenApply(userData -> {
                System.out.println("Processing: " + userData);
                return userData.toUpperCase();
            })
            .thenCompose(CalculateScoreService::calculateScore)
            .thenCompose(SendNotificationService::sendNotification)
            .thenAccept(notification ->
                System.out.println("Final: " + notification));

        chainedFuture.get();

        System.out.println("\nCombining multiple futures...");

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Result 1");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Result 2");
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Result 3");

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
        allFutures.thenRun(() -> {
            try {
                System.out.println("All completed: " + future1.get() + ", " +
                                 future2.get() + ", " + future3.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(
            CompletableFuture.supplyAsync(() -> {
                try { Thread.sleep(2000); return "Slow"; }
                catch (InterruptedException e) { throw new RuntimeException(e); }
            }),
            CompletableFuture.supplyAsync(() -> {
                try { Thread.sleep(500); return "Fast"; }
                catch (InterruptedException e) { throw new RuntimeException(e); }
            })
        );

        anyFuture.thenAccept(result ->
            System.out.println("First completed: " + result));

        CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("Simulated error");
            }
            return "Success";
        }).exceptionally(throwable -> {
            System.out.println("Handled exception: " + throwable.getMessage());
            return "Default value";
        }).thenAccept(result ->
            System.out.println("Exception handling result: " + result));

        Thread.sleep(3000);
        System.out.println("Demo completed");
    }
}

24. Combining Streams and Lambda Expressions

import java.util.*;
import java.util.stream.*;

class Employee {
    private String name;
    private String department;
    private double salary;

    public Employee(String name, String department, double salary) {
        this.name = name;
        this.department = department;
        this.salary = salary;
    }

    public String getName() { return name; }
    public String getDepartment() { return department; }
    public double getSalary() { return salary; }

    @Override
    public String toString() {
        return name + "(" + department + "): $" + salary;
    }
}

public class EmployeeAnalysis {
    public static void main(String[] args) {
        List<Employee> employees = Arrays.asList(
            new Employee("Alice", "Engineering", 75000),
            new Employee("Bob", "Engineering", 80000),
            new Employee("Charlie", "Engineering", 90000),
            new Employee("Diana", "Marketing", 60000),
            new Employee("Eve", "Marketing", 65000),
            new Employee("Frank", "Sales", 55000),
            new Employee("Grace", "Sales", 60000),
            new Employee("Henry", "HR", 50000),
            new Employee("Ivy", "HR", 52000)
        );

        // Average salary by department
        System.out.println("Average salary by department:");
        Map<String, Double> avgSalaryByDept = employees.stream()
            .collect(Collectors.groupingBy(
                Employee::getDepartment,
                Collectors.averagingDouble(Employee::getSalary)
            ));

        avgSalaryByDept.forEach((dept, avg) -> 
            System.out.printf("%s: $%.2f\n", dept, avg));

        // Highest salary by department
        System.out.println("\nMax salary by department:");
        Map<String, Optional<Employee>> maxSalaryByDept = employees.stream()
            .collect(Collectors.groupingBy(
                Employee::getDepartment,
                Collectors.maxBy(Comparator.comparing(Employee::getSalary))
            ));

        maxSalaryByDept.forEach((dept, emp) -> 
            emp.ifPresent(e -> 
                System.out.printf("%s: %s\n", dept, e)));

        // Group by salary range
        System.out.println("\nEmployees by salary range:");
        Map<String, List<Employee>> bySalaryRange = employees.stream()
            .collect(Collectors.groupingBy(emp -> {
                if (emp.getSalary() < 60000) return "Low (< 60k)";
                else if (emp.getSalary() < 80000) return "Medium (60k-80k)";
                else return "High (>= 80k)";
            }));

        bySalaryRange.forEach((range, emps) -> {
            System.out.println(range + ":");
            emps.forEach(emp -> System.out.println("  " + emp));
        });

        // Salary statistics by department
        System.out.println("\nSalary statistics by department:");
        Map<String, DoubleSummaryStatistics> statsByDept = employees.stream()
            .collect(Collectors.groupingBy(
                Employee::getDepartment,
                Collectors.summarizingDouble(Employee::getSalary)
            ));

        statsByDept.forEach((dept, stats) -> 
            System.out.printf("%s: count=%d, avg=%.2f, min=%.2f, max=%.2f\n",
                dept, stats.getCount(), stats.getAverage(), 
                stats.getMin(), stats.getMax()));

        // Top 3 by salary
        System.out.println("\nTop 3 highest paid employees:");
        employees.stream()
            .sorted(Comparator.comparing(Employee::getSalary).reversed())
            .limit(3)
            .forEach(emp -> System.out.println(emp));

        // Total salary by department
        System.out.println("\nTotal salary cost by department:");
        Map<String, Double> totalSalaryByDept = employees.stream()
            .collect(Collectors.groupingBy(
                Employee::getDepartment,
                Collectors.summingDouble(Employee::getSalary)
            ));

        totalSalaryByDept.forEach((dept, total) -> 
            System.out.printf("%s: $%.2f\n", dept, total));
    }
}

25. Error Handling in Parallel Processing

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class ParallelErrorHandling {

    static class DataProcessor {
        public String process(int data) {
            // Simulate failures: every multiple of 7 throws
            if (data % 7 == 0) {
                throw new RuntimeException("Error processing: " + data);
            }

            try {
                Thread.sleep(100); // simulate processing time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            return "Processed: " + data;
        }
    }

    public static void main(String[] args) {
        DataProcessor processor = new DataProcessor();
        List<Integer> data = IntStream.range(1, 51)
            .boxed()
            .collect(Collectors.toList());

        System.out.println("Sequential processing with error handling:");

        // Error handling in sequential processing
        data.stream()
            .map(item -> {
                try {
                    return Optional.of(processor.process(item));
                } catch (Exception e) {
                    System.out.println("Caught error: " + e.getMessage());
                    return Optional.empty();
                }
            })
            .filter(Optional::isPresent)
            .map(Optional::get)
            .forEach(System.out::println);

        System.out.println("\nParallel processing with error handling:");

        // Error handling in parallel processing
        List<String> successfulResults = data.parallelStream()
            .map(item -> {
                try {
                    return processor.process(item);
                } catch (Exception e) {
                    System.err.println("Error in parallel: " + e.getMessage() + 
                                     " on thread: " + Thread.currentThread().getName());
                    return null;
                }
            })
            .filter(Objects::nonNull)
            .collect(Collectors.toList());

        System.out.println("Successful results: " + successfulResults.size());

        // Error handling with CompletableFuture
        System.out.println("\nCompletableFuture with error handling:");

        List<CompletableFuture<String>> futures = data.stream()
            .map(item -> CompletableFuture.supplyAsync(() -> processor.process(item))
                .exceptionally(throwable -> {
                    System.err.println("Future error: " + throwable.getMessage());
                    return "Fallback for error";
                }))
            .collect(Collectors.toList());

        // Wait for all futures to complete
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0])
        );

        List<String> futureResults = allFutures.thenApply(v ->
            futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        ).join();

        System.out.println("Future results count: " + futureResults.size());

        // Error handling inside a custom collect operation
        System.out.println("\nCustom error handling in stream:");

        List<String> customResults = data.parallelStream()
            .collect(ArrayList::new, (list, item) -> {
                try {
                    list.add(processor.process(item));
                } catch (Exception e) {
                    System.err.println("Skipped due to error: " + item);
                }
            }, ArrayList::addAll);

        System.out.println("Custom handling results: " + customResults.size());
    }
}

Advanced Problems: Sample Solutions

26. Creating a Custom Collector

import java.util.*;
import java.util.function.*;
import java.util.stream.*;

class Statistics {
    private final int count;
    private final double sum;
    private final double min;
    private final double max;

    public Statistics(int count, double sum, double min, double max) {
        this.count = count;
        this.sum = sum;
        this.min = min;
        this.max = max;
    }

    public double average() {
        return count > 0 ? sum / count : 0.0;
    }

    @Override
    public String toString() {
        return String.format("Count: %d, Sum: %.2f, Avg: %.2f, Min: %.2f, Max: %.2f",
                           count, sum, average(), min, max);
    }
}

public class CustomCollector {

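    // Custom collector implementation
    public static Collector<Double, double[], Statistics> statisticsCollector() {
        return Collector.of(
            // Supplier: creates the mutable container [sum, min, max, count].
            // The max slot starts at NEGATIVE_INFINITY because Double.MIN_VALUE
            // is the smallest positive double, not the most negative one.
            () -> new double[]{0, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, 0},
            // Accumulator: folds one element into the container
            (acc, value) -> {
                acc[0] += value;       // sum
                acc[1] = Math.min(acc[1], value); // min
                acc[2] = Math.max(acc[2], value); // max
                acc[3]++;              // count
            },
            // Combiner: merges partial containers in parallel streams
            (acc1, acc2) -> {
                acc1[0] += acc2[0];
                acc1[1] = Math.min(acc1[1], acc2[1]);
                acc1[2] = Math.max(acc1[2], acc2[2]);
                acc1[3] += acc2[3];
                return acc1;
            },
            // Finisher: converts the container into the final result
            acc -> new Statistics(
                (int)acc[3], acc[0], acc[1], acc[2]
            )
        );
    }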

    // String summary collector (concatenates the elements)
    public static Collector<String, StringBuilder, String> stringSummaryCollector() {
        return Collector.of(
            StringBuilder::new,
            StringBuilder::append,
            StringBuilder::append,
            StringBuilder::toString
        );
    }

    // Grouping collector: a custom version of partitioningBy
    public static <T> Collector<T, ?, Map<Boolean, List<T>>> partitioningByCustom(
            Predicate<? super T> predicate) {
        return Collector.of(
            () -> {
                Map<Boolean, List<T>> map = new HashMap<>();
                map.put(true, new ArrayList<>());
                map.put(false, new ArrayList<>());
                return map;
            },
            (map, element) -> {
                boolean key = predicate.test(element);
                map.get(key).add(element);
            },
            (map1, map2) -> {
                map1.get(true).addAll(map2.get(true));
                map1.get(false).addAll(map2.get(false));
                return map1;
            }
        );
    }

    public static void main(String[] args) {
        List<Double> numbers = Arrays.asList(1.0, 2.5, 3.7, 4.2, 5.8, 6.1, 7.9);

        // Use the custom statistics collector
        Statistics stats = numbers.stream()
            .collect(statisticsCollector());

        System.out.println("Custom Statistics: " + stats);

        // Compare with the built-in statistics collector
        DoubleSummaryStatistics builtInStats = numbers.stream()
            .collect(Collectors.summarizingDouble(Double::doubleValue));

        System.out.println("Built-in Statistics: " + builtInStats);

        // String summary collector
        String concatenated = Arrays.asList("A", "B", "C", "D").stream()
            .collect(stringSummaryCollector());

        System.out.println("Concatenated: " + concatenated);

        // Custom partitioningBy
        Map<Boolean, List<Double>> partitioned = numbers.stream()
            .collect(partitioningByCustom(n -> n > 4.0));

        System.out.println("Partitioned: " + partitioned);

        // A more complex custom collector: weighted average
        class WeightedValue {
            double value;
            double weight;
            WeightedValue(double value, double weight) {
                this.value = value;
                this.weight = weight;
            }
        }

        List<WeightedValue> weightedValues = Arrays.asList(
            new WeightedValue(10.0, 1.0),
            new WeightedValue(20.0, 2.0),
            new WeightedValue(30.0, 3.0)
        );

        Collector<WeightedValue, double[], Double> weightedAverageCollector =
            Collector.of(
                () -> new double[2], // [weightedSum, totalWeight]
                (acc, wv) -> {
                    acc[0] += wv.value * wv.weight;
                    acc[1] += wv.weight;
                },
                (acc1, acc2) -> {
                    acc1[0] += acc2[0];
                    acc1[1] += acc2[1];
                    return acc1;
                },
                acc -> acc[1] > 0 ? acc[0] / acc[1] : 0.0
            );

        double weightedAvg = weightedValues.stream()
            .collect(weightedAverageCollector);

        System.out.println("Weighted Average: " + weightedAvg);
    }
}
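
Note on seeding a running maximum: when a collector like the statistics collector above tracks a maximum, the starting value matters. `Double.MIN_VALUE` is the smallest positive double (4.9E-324), not the most negative one, so it gives a wrong maximum for all-negative input. The following is a minimal sketch comparing the two seeds. The class name `MaxSeedSketch` and the helper `maxWithSeed` are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

public class MaxSeedSketch {

    // Hypothetical helper: running maximum that starts from the given seed
    static double maxWithSeed(List<Double> values, double seed) {
        Collector<Double, double[], Double> maxCollector = Collector.of(
            () -> new double[]{seed},                          // container holding the current max
            (acc, value) -> acc[0] = Math.max(acc[0], value),  // fold one element in
            (left, right) -> {                                 // merge two partial results
                left[0] = Math.max(left[0], right[0]);
                return left;
            },
            acc -> acc[0]                                      // unwrap the final value
        );
        return values.stream().collect(maxCollector);
    }

    public static void main(String[] args) {
        List<Double> negatives = Arrays.asList(-3.0, -1.5, -7.2);

        // Seeded with Double.MIN_VALUE: the tiny positive seed beats every negative input
        System.out.println(maxWithSeed(negatives, Double.MIN_VALUE));         // prints 4.9E-324

        // Seeded with negative infinity: the true maximum is found
        System.out.println(maxWithSeed(negatives, Double.NEGATIVE_INFINITY)); // prints -1.5
    }
}
```

The built-in `DoubleSummaryStatistics` starts its maximum at negative infinity for the same reason.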

27. Asynchronous Processing Pipeline

import java.util.concurrent.*;
import java.util.*;
import java.util.function.*;

public class AsyncPipeline {

    // Simulated services
    static class UserService {
        CompletableFuture<String> fetchUser(int userId) {
            return CompletableFuture.supplyAsync(() -> {
                sleep(100);
                return "User-" + userId;
            });
        }
    }

    static class OrderService {
        CompletableFuture<List<String>> fetchOrders(String username) {
            return CompletableFuture.supplyAsync(() -> {
                sleep(200);
                return Arrays.asList("Order1", "Order2", "Order3");
            });
        }
    }

    static class PaymentService {
        CompletableFuture<Double> calculateTotal(List<String> orders) {
            return CompletableFuture.supplyAsync(() -> {
                sleep(150);
                return orders.size() * 25.0;
            });
        }
    }

    static class NotificationService {
        CompletableFuture<String> sendReceipt(String username, double amount) {
            return CompletableFuture.supplyAsync(() -> {
                sleep(100);
                return "Receipt sent to " + username + " for $" + amount;
            });
        }
    }

    private static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        UserService userService = new UserService();
        OrderService orderService = new OrderService();
        PaymentService paymentService = new PaymentService();
        NotificationService notificationService = new NotificationService();

        System.out.println("Starting async pipeline...");

        // A more complex asynchronous pipeline
        CompletableFuture<Void> pipeline = userService.fetchUser(123)
            .thenCompose(username -> {
                System.out.println("Fetched user: " + username);
                return orderService.fetchOrders(username)
                    .thenCompose(orders -> {
                        System.out.println("Fetched orders: " + orders);
                        return paymentService.calculateTotal(orders)
                            .thenCompose(amount -> {
                                System.out.println("Calculated total: $" + amount);
                                return notificationService.sendReceipt(username, amount);
                            });
                    });
            })
            .thenAccept(receipt -> {
                System.out.println("Pipeline completed: " + receipt);
            })
            .exceptionally(throwable -> {
                System.err.println("Pipeline failed: " + throwable.getMessage());
                return null;
            });

        // Parallel execution and combining
        System.out.println("\n--- Parallel Execution ---");

        CompletableFuture<String> userFuture = userService.fetchUser(456);
        CompletableFuture<List<String>> productFuture = CompletableFuture.supplyAsync(() -> {
            sleep(150);
            return Arrays.asList("ProductA", "ProductB");
        });

        CompletableFuture<String> combinedResult = userFuture
            .thenCombine(productFuture, (user, products) -> 
                user + " bought " + products)
            .thenApply(String::toUpperCase);

        combinedResult.thenAccept(System.out::println);

        // Managing multiple pipelines
        System.out.println("\n--- Multiple Pipelines ---");

        List<CompletableFuture<String>> pipelines = new ArrayList<>();

        for (int i = 1; i <= 5; i++) {
            final int userId = i;
            CompletableFuture<String> userPipeline = userService.fetchUser(userId)
                .thenCompose(orderService::fetchOrders)
                .thenCompose(paymentService::calculateTotal)
                .thenCompose(amount -> 
                    notificationService.sendReceipt("User-" + userId, amount))
                .exceptionally(ex -> "Failed for user " + userId + ": " + ex.getMessage());

            pipelines.add(userPipeline);
        }

        // Wait for all pipelines to complete
        CompletableFuture<Void> allPipelines = CompletableFuture.allOf(
            pipelines.toArray(new CompletableFuture[0])
        );

        allPipelines.thenRun(() -> {
            System.out.println("\nAll pipelines completed:");
            pipelines.forEach(future -> {
                try {
                    System.out.println("Result: " + future.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        });

        // Timeout handling
        System.out.println("\n--- Timeout Handling ---");

        CompletableFuture<String> slowService = CompletableFuture.supplyAsync(() -> {
            sleep(5000); // a service that takes 5 seconds
            return "Slow result";
        });

        // orTimeout requires Java 9 or later. The TimeoutException it raises
        // carries no message, so print the exception itself.
        CompletableFuture<String> timeoutFuture = slowService
            .orTimeout(2, TimeUnit.SECONDS)
            .exceptionally(ex -> "Timeout: " + ex);

        timeoutFuture.thenAccept(result -> 
            System.out.println("Timeout result: " + result));

        // Wait for completion
        pipeline.get();
        combinedResult.get();
        allPipelines.get();
        timeoutFuture.get();

        System.out.println("All async operations completed");
    }
}
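
Note on `thenCompose` versus `thenApply`: the pipeline above chains steps that each return a `CompletableFuture`, which is why it uses `thenCompose` throughout. The following is a minimal sketch of the difference. The class name `ComposeSketch` and the helper `doubleAsync` are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;

public class ComposeSketch {

    // Hypothetical async step: doubles the input on another thread
    static CompletableFuture<Integer> doubleAsync(int x) {
        return CompletableFuture.supplyAsync(() -> x * 2);
    }

    static int viaCompose(int start) {
        // thenCompose flattens the inner future: the chain stays CompletableFuture<Integer>
        CompletableFuture<Integer> flat =
            doubleAsync(start).thenCompose(ComposeSketch::doubleAsync);
        return flat.join();
    }

    static int viaApply(int start) {
        // thenApply wraps the inner future: the result is a future of a future
        CompletableFuture<CompletableFuture<Integer>> nested =
            doubleAsync(start).thenApply(ComposeSketch::doubleAsync);
        return nested.join().join();
    }

    public static void main(String[] args) {
        System.out.println(viaCompose(3)); // prints 12
        System.out.println(viaApply(3));   // prints 12, but only after unwrapping twice
    }
}
```

Both variants compute the same value; `thenCompose` simply keeps the type flat so that further stages can be chained without unwrapping.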

28. Imitating Reactive Streams

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.*;
import java.util.stream.*;

// Simple reactive stream interface
interface ReactiveStream<T> {
    void subscribe(Consumer<T> onNext, Consumer<Throwable> onError, Runnable onComplete);
    ReactiveStream<T> filter(Predicate<T> predicate);
    <R> ReactiveStream<R> map(Function<T, R> mapper);
    ReactiveStream<T> take(int n);
}

// Reactive stream implementation
class SimpleReactiveStream<T> implements ReactiveStream<T> {
    // Copy-on-write lists allow subscribing while another thread is publishing
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();
    private final List<Consumer<Throwable>> errorHandlers = new CopyOnWriteArrayList<>();
    private final List<Runnable> completionHandlers = new CopyOnWriteArrayList<>();

    // Runs once, on the first subscribe(), so nothing is emitted before a subscriber exists
    private Runnable onStart = () -> {};
    private boolean started;

    public static <T> SimpleReactiveStream<T> fromIterable(Iterable<T> source) {
        SimpleReactiveStream<T> stream = new SimpleReactiveStream<>();

        // Publish the data on another thread once the first subscriber attaches
        stream.onStart = () -> CompletableFuture.runAsync(() -> {
            try {
                for (T item : source) {
                    stream.publish(item);
                }
                stream.complete();
            } catch (Exception e) {
                stream.error(e);
            }
        });

        return stream;
    }

    public static <T> SimpleReactiveStream<T> fromStream(Stream<T> source) {
        return fromIterable(source.collect(Collectors.toList()));
    }

    // Package-private so that a producer in the same package can push items
    void publish(T item) {
        subscribers.forEach(subscriber -> {
            try {
                subscriber.accept(item);
            } catch (Exception e) {
                errorHandlers.forEach(handler -> handler.accept(e));
            }
        });
    }

    void error(Throwable throwable) {
        errorHandlers.forEach(handler -> handler.accept(throwable));
    }

    void complete() {
        completionHandlers.forEach(Runnable::run);
    }

    @Override
    public void subscribe(Consumer<T> onNext, Consumer<Throwable> onError, Runnable onComplete) {
        subscribers.add(onNext);
        errorHandlers.add(onError);
        completionHandlers.add(onComplete);

        // Start the source only once, after the first subscriber is registered
        boolean first;
        synchronized (this) {
            first = !started;
            started = true;
        }
        if (first) {
            onStart.run();
        }
    }

    @Override
    public ReactiveStream<T> filter(Predicate<T> predicate) {
        SimpleReactiveStream<T> newStream = new SimpleReactiveStream<>();

        // Subscribe upstream only when someone subscribes to the new stream
        newStream.onStart = () -> this.subscribe(
            item -> {
                if (predicate.test(item)) {
                    newStream.publish(item);
                }
            },
            newStream::error,
            newStream::complete
        );

        return newStream;
    }

    @Override
    public <R> ReactiveStream<R> map(Function<T, R> mapper) {
        SimpleReactiveStream<R> newStream = new SimpleReactiveStream<>();

        newStream.onStart = () -> this.subscribe(
            item -> {
                try {
                    R mapped = mapper.apply(item);
                    newStream.publish(mapped);
                } catch (Exception e) {
                    newStream.error(e);
                }
            },
            newStream::error,
            newStream::complete
        );

        return newStream;
    }

    @Override
    public ReactiveStream<T> take(int n) {
        SimpleReactiveStream<T> newStream = new SimpleReactiveStream<>();
        AtomicInteger count = new AtomicInteger(0);

        // Forwards only the first n items; completion still follows the upstream
        newStream.onStart = () -> this.subscribe(
            item -> {
                if (count.getAndIncrement() < n) {
                    newStream.publish(item);
                }
            },
            newStream::error,
            newStream::complete
        );

        return newStream;
    }
}

public class ReactiveStreamDemo {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== Simple Reactive Stream Demo ===");

        // Create a reactive stream from a data source
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        ReactiveStream<Integer> numberStream = SimpleReactiveStream.fromIterable(numbers);

        // Basic subscription
        System.out.println("Basic subscription:");
        numberStream.subscribe(
            item -> System.out.println("Received: " + item),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Completed!")
        );

        Thread.sleep(1000);

        // Chaining operations
        System.out.println("\nChained operations:");
        SimpleReactiveStream.fromIterable(numbers)
            .filter(n -> n % 2 == 0)
            .map(n -> n * 10)
            .take(3)
            .subscribe(
                item -> System.out.println("Processed: " + item),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Processing completed!")
            );

        Thread.sleep(1000);

        // Error handling
        System.out.println("\nError handling:");
        List<String> mixedData = Arrays.asList("1", "2", "abc", "4", "5");

        SimpleReactiveStream.fromIterable(mixedData)
            .map(Integer::parseInt) // "abc" triggers an error here
            .subscribe(
                item -> System.out.println("Parsed: " + item),
                error -> System.err.println("Parse error: " + error.getMessage()),
                () -> System.out.println("Parsing completed")
            );

        Thread.sleep(1000);

        // Infinite stream simulation
        System.out.println("\nInfinite stream simulation:");
        SimpleReactiveStream<Long> infiniteStream = new SimpleReactiveStream<>();

        // Attach the subscriber first so that no item is published before it is listening
        infiniteStream
            .filter(n -> n % 2 == 0)
            .take(3)
            .subscribe(
                item -> System.out.println("Infinite item: " + item),
                error -> System.err.println("Infinite error: " + error),
                () -> System.out.println("Infinite completed")
            );

        // Generate the data on another thread.
        // publish() and complete() must be accessible from this class (not private).
        CompletableFuture.runAsync(() -> {
            long i = 0;
            while (i < 10) { // stop after a finite number of items
                infiniteStream.publish(i++);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            infiniteStream.complete();
        });

        Thread.sleep(2000);

        // Simple backpressure simulation
        System.out.println("\nBackpressure simulation:");
        SimpleReactiveStream<Integer> fastStream = SimpleReactiveStream.fromIterable(
            IntStream.range(0, 100).boxed().collect(Collectors.toList())
        );

        fastStream.subscribe(
            item -> {
                System.out.println("Slow processing: " + item);
                try {
                    Thread.sleep(10); // slow consumer
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            },
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Backpressure completed")
        );

        Thread.sleep(2000);
    }
}

29. Performance Comparison

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class PerformanceComparison {
    private static final int DATA_SIZE = 10_000_000;
    private static final int WARMUP_ITERATIONS = 5;
    private static final int MEASURE_ITERATIONS = 10;

    public static void main(String[] args) {
        System.out.println("Performance Comparison: Traditional vs Stream vs Parallel Stream");
        System.out.println("Data size: " + DATA_SIZE);
        System.out.println();

        // Prepare the test data
        List<Integer> data = new ArrayList<>(DATA_SIZE);
        Random random = new Random(42); // fixed seed for reproducibility

        for (int i = 0; i < DATA_SIZE; i++) {
            data.add(random.nextInt(1000));
        }

        // Warm up so the JIT has compiled the hot paths before measuring
        System.out.println("Warming up...");
        for (int i = 0; i < WARMUP_ITERATIONS; i++) {
            traditionalSum(data);
            streamSum(data);
            parallelStreamSum(data);
        }

        // Measure performance
        System.out.println("Measuring performance...");

        long traditionalTime = measureTime(() -> traditionalSum(data));
        long streamTime = measureTime(() -> streamSum(data));
        long parallelTime = measureTime(() -> parallelStreamSum(data));

        // Print the results
        System.out.println("\n=== RESULTS ===");
        System.out.printf("Traditional for-loop: %d ms\n", traditionalTime);
        System.out.printf("Stream API: %d ms\n", streamTime);
        System.out.printf("Parallel Stream: %d ms\n", parallelTime);

        System.out.println("\n=== COMPARISON ===");
        // A ratio above 1 means the first-named approach took less time than the second
        System.out.printf("Stream vs Traditional: %.2fx %s\n",
            (double) traditionalTime / streamTime,
            traditionalTime > streamTime ? "faster" : "slower");

        System.out.printf("Parallel vs Traditional: %.2fx %s\n",
            (double) traditionalTime / parallelTime,
            traditionalTime > parallelTime ? "faster" : "slower");

        System.out.printf("Parallel vs Stream: %.2fx %s\n",
            (double) streamTime / parallelTime,
            streamTime > parallelTime ? "faster" : "slower");

        // Compare memory usage
        System.out.println("\n=== MEMORY USAGE ===");
        compareMemoryUsage(data);

        // Compare across different operations
        System.out.println("\n=== DIFFERENT OPERATIONS ===");
        compareDifferentOperations(data);
    }

    private static long traditionalSum(List<Integer> data) {
        long sum = 0;
        for (int value : data) {
            if (value % 2 == 0) {
                // Widen before multiplying, matching the stream versions
                sum += (long) value * value;
            }
        }
        return sum;
    }

    private static long streamSum(List<Integer> data) {
        return data.stream()
            .filter(n -> n % 2 == 0)
            .mapToLong(n -> (long)n * n)
            .sum();
    }

    private static long parallelStreamSum(List<Integer> data) {
        return data.parallelStream()
            .filter(n -> n % 2 == 0)
            .mapToLong(n -> (long)n * n)
            .sum();
    }

    private static long measureTime(Runnable operation) {
        long totalNanos = 0;

        for (int i = 0; i < MEASURE_ITERATIONS; i++) {
            // nanoTime is monotonic, so it suits interval timing better than currentTimeMillis
            long startTime = System.nanoTime();
            operation.run();
            totalNanos += System.nanoTime() - startTime;
        }

        // Average per iteration, reported in milliseconds
        return TimeUnit.NANOSECONDS.toMillis(totalNanos / MEASURE_ITERATIONS);
    }

    private static void compareMemoryUsage(List<Integer> data) {
        Runtime runtime = Runtime.getRuntime();

        // Request a GC before each run; System.gc() is only a hint, so the deltas are rough
        System.gc();
        long beforeTraditional = runtime.totalMemory() - runtime.freeMemory();
        long result1 = traditionalSum(data);
        long afterTraditional = runtime.totalMemory() - runtime.freeMemory();

        System.gc();
        long beforeStream = runtime.totalMemory() - runtime.freeMemory();
        long result2 = streamSum(data);
        long afterStream = runtime.totalMemory() - runtime.freeMemory();

        System.gc();
        long beforeParallel = runtime.totalMemory() - runtime.freeMemory();
        long result3 = parallelStreamSum(data);
        long afterParallel = runtime.totalMemory() - runtime.freeMemory();

        // Each delta uses the baseline taken just before its own run
        System.out.printf("Traditional memory delta: %d bytes\n",
                         afterTraditional - beforeTraditional);
        System.out.printf("Stream memory delta: %d bytes\n",
                         afterStream - beforeStream);
        System.out.printf("Parallel memory delta: %d bytes\n",
                         afterParallel - beforeParallel);

        // Verify that all three approaches produce the same result
        System.out.printf("Results consistent: %b\n",
                         result1 == result2 && result2 == result3);
    }

    private static void compareDifferentOperations(List<Integer> data) {
        System.out.println("1. Filter + Map + Sum:");
        long time1 = measureTime(() -> traditionalSum(data));
        long time2 = measureTime(() -> streamSum(data));
        long time3 = measureTime(() -> parallelStreamSum(data));
        printComparison("FilterMapSum", time1, time2, time3);

        System.out.println("2. Simple Sum:");
        long time4 = measureTime(() -> {
            long sum = 0;
            for (int n : data) sum += n;
        });
        long time5 = measureTime(() -> data.stream().mapToInt(Integer::intValue).sum());
        long time6 = measureTime(() -> data.parallelStream().mapToInt(Integer::intValue).sum());
        printComparison("SimpleSum", time4, time5, time6);

        System.out.println("3. Complex Transformation:");
        long time7 = measureTime(() -> {
            List<String> result = new ArrayList<>();
            for (int n : data) {
                if (n > 500) {
                    result.add(String.valueOf(n * 2));
                }
            }
        });
        long time8 = measureTime(() -> {
            List<String> result = data.stream()
                .filter(n -> n > 500)
                .map(n -> String.valueOf(n * 2))
                .collect(Collectors.toList());
        });
        long time9 = measureTime(() -> {
            List<String> result = data.parallelStream()
                .filter(n -> n > 500)
                .map(n -> String.valueOf(n * 2))
                .collect(Collectors.toList());
        });
        printComparison("ComplexTransform", time7, time8, time9);
    }

    private static void printComparison(String operation, long traditional, long stream, long parallel) {
        System.out.printf("  Traditional: %dms, Stream: %dms, Parallel: %dms\n", 
                         traditional, stream, parallel);
        System.out.printf("  Best: %s\n", 
                         traditional < stream && traditional < parallel ? "Traditional" :
                         stream < parallel ? "Stream" : "Parallel");
    }
}
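
The measurements above pass each operation as a Runnable, so every computed sum is discarded. A JIT compiler may optimize away work whose result is never used, which can make such timings look better than they are. The following is a minimal sketch of one way to guard against that, assuming a hypothetical helper class TimingSketch and a hypothetical Measurement holder (neither is part of the solution above): the operation returns a long, and the helper folds every result into a checksum that the caller prints.

```java
import java.util.function.LongSupplier;
import java.util.stream.LongStream;

// Hypothetical helper for illustration; not part of PerformanceComparison
public class TimingSketch {

    // Average time per run plus a checksum of everything the runs returned
    static final class Measurement {
        final long averageNanos;
        final long checksum;

        Measurement(long averageNanos, long checksum) {
            this.averageNanos = averageNanos;
            this.checksum = checksum;
        }
    }

    static Measurement measure(LongSupplier operation, int iterations) {
        if (iterations <= 0) {
            throw new IllegalArgumentException("iterations must be positive");
        }
        long totalNanos = 0;
        long checksum = 0;
        for (int i = 0; i < iterations; i++) {
            long start = System.nanoTime();
            long result = operation.getAsLong();
            totalNanos += System.nanoTime() - start;
            checksum += result; // consuming the result keeps the computation observable
        }
        return new Measurement(totalNanos / iterations, checksum);
    }

    public static void main(String[] args) {
        Measurement m = measure(() -> LongStream.rangeClosed(1, 1_000_000).sum(), 10);
        System.out.println("Average: " + m.averageNanos + " ns, checksum: " + m.checksum);
    }
}
```

In PerformanceComparison this would correspond to calling something like measure(() -> streamSum(data), MEASURE_ITERATIONS). For measurements that need to be trusted, a dedicated harness such as JMH is the more usual choice; this sketch only removes one obvious source of distortion.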

30. Designing a Large-Scale Data Processing System

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
import java.nio.file.*;
import java.io.*;

public class LargeDataProcessingSystem {

    // A data processing job
    static class ProcessingJob {
        private final String jobId;
        private final List<Path> inputFiles;
        private final String outputDir;

        public ProcessingJob(String jobId, List<Path> inputFiles, String outputDir) {
            this.jobId = jobId;
            this.inputFiles = inputFiles;
            this.outputDir = outputDir;
        }

        public String getJobId() { return jobId; }
        public List<Path> getInputFiles() { return inputFiles; }
        public String getOutputDir() { return outputDir; }
    }

    // The data processor
    static class DataProcessor {
        private final ExecutorService executor;
        private final int batchSize;

        public DataProcessor(int threadCount, int batchSize) {
            this.executor = Executors.newFixedThreadPool(threadCount);
            this.batchSize = batchSize;
        }

        public CompletableFuture<Void> processJob(ProcessingJob job) {
            System.out.println("Starting job: " + job.getJobId());

            // Process each file in parallel
            List<CompletableFuture<Void>> fileFutures = job.getInputFiles().stream()
                .map(file -> processFile(file, job.getOutputDir()))
                .collect(Collectors.toList());

            // The job is complete once every file has been processed
            return CompletableFuture.allOf(
                fileFutures.toArray(new CompletableFuture[0])
            ).thenRun(() -> 
                System.out.println("Completed job: " + job.getJobId())
            );
        }

        private CompletableFuture<Void> processFile(Path inputFile, String outputDir) {
            return CompletableFuture
                .supplyAsync(() -> {
                    System.out.println("Processing file: " + inputFile +
                                     " on " + Thread.currentThread().getName());

                    // Read the file and start one asynchronous task per batch
                    return readFileInBatches(inputFile, batchSize);
                }, executor)
                // Compose rather than join() inside a pool thread: blocking here can starve
                // the fixed-size pool when there are at least as many files as threads
                .thenCompose(batchFutures -> CompletableFuture
                    .allOf(batchFutures.toArray(new CompletableFuture[0]))
                    .thenAcceptAsync(ignored -> {
                        // Every batch is done, so join() returns immediately
                        List<String> allResults = batchFutures.stream()
                            .map(CompletableFuture::join)
                            .flatMap(List::stream)
                            .collect(Collectors.toList());

                        // Write the combined results to the output file
                        Path outputFile = Paths.get(outputDir,
                            inputFile.getFileName() + ".processed");
                        try {
                            Files.write(outputFile, allResults);
                        } catch (IOException e) {
                            throw new UncheckedIOException("Failed to write file: " + outputFile, e);
                        }
                    }, executor));
        }

        private List<CompletableFuture<List<String>>> readFileInBatches(Path file, int batchSize) {
            try {
                List<String> allLines = Files.readAllLines(file);

                // Split the lines into batches
                List<List<String>> batches = new ArrayList<>();
                for (int i = 0; i < allLines.size(); i += batchSize) {
                    int end = Math.min(i + batchSize, allLines.size());
                    batches.add(allLines.subList(i, end));
                }

                // Process each batch in parallel
                return batches.stream()
                    .map(this::processBatchAsync)
                    .collect(Collectors.toList());

            } catch (IOException e) {
                throw new RuntimeException("Failed to read file: " + file, e);
            }
        }

        private CompletableFuture<List<String>> processBatchAsync(List<String> batch) {
            return CompletableFuture.supplyAsync(() -> {
                // Simulated batch processing
                return batch.stream()
                    .map(line -> line.toUpperCase()) // a trivial transformation
                    .filter(line -> !line.isEmpty())
                    .collect(Collectors.toList());
            }, executor);
        }

        public void shutdown() {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    // The job manager
    static class JobManager {
        private final DataProcessor processor;
        private final Map<String, CompletableFuture<Void>> activeJobs;

        public JobManager(int threadCount, int batchSize) {
            this.processor = new DataProcessor(threadCount, batchSize);
            this.activeJobs = new ConcurrentHashMap<>();
        }

        public CompletableFuture<Void> submitJob(ProcessingJob job) {
            CompletableFuture<Void> jobFuture = processor.processJob(job)
                .exceptionally(throwable -> {
                    System.err.println("Job failed: " + job.getJobId() + " - " + throwable.getMessage());
                    return null;
                });

            activeJobs.put(job.getJobId(), jobFuture);

            // Remove the job from the active set once it completes
            jobFuture.thenRun(() -> activeJobs.remove(job.getJobId()));

            return jobFuture;
        }

        public void waitForCompletion() {
            // Wait for every active job to complete
            CompletableFuture.allOf(
                activeJobs.values().toArray(new CompletableFuture[0])
            ).join();
        }

        public void shutdown() {
            processor.shutdown();
        }

        public int getActiveJobCount() {
            return activeJobs.size();
        }
    }

    // Usage example
    public static void main(String[] args) throws Exception {
        System.out.println("Large Data Processing System Demo");

        // Create the test data
        createTestData();

        // Initialize the job manager
        JobManager jobManager = new JobManager(
            Runtime.getRuntime().availableProcessors(), // thread count
            1000 // batch size
        );

        // Build the processing job; Files.list holds a directory handle, so close the stream
        List<Path> inputFiles;
        try (Stream<Path> paths = Files.list(Paths.get("input"))) {
            inputFiles = paths.collect(Collectors.toList());
        }

        ProcessingJob job1 = new ProcessingJob("JOB-001", inputFiles, "output");

        // Run the job
        System.out.println("Submitting job...");
        CompletableFuture<Void> jobFuture = jobManager.submitJob(job1);

        // Monitor progress
        CompletableFuture.runAsync(() -> {
            while (!jobFuture.isDone()) {
                System.out.println("Active jobs: " + jobManager.getActiveJobCount());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });

        // Wait for completion
        jobFuture.get(60, TimeUnit.SECONDS);
        jobManager.waitForCompletion();

        System.out.println("All jobs completed successfully");

        // Clean up
        jobManager.shutdown();

        // Verify the results
        verifyResults();
    }

    private static void createTestData() throws IOException {
        Files.createDirectories(Paths.get("input"));
        Files.createDirectories(Paths.get("output"));

        Random random = new Random(42);

        // Create five test files
        for (int i = 1; i <= 5; i++) {
            Path file = Paths.get("input", "data_" + i + ".txt");
            List<String> lines = new ArrayList<>();

            // 10,000 lines of data per file
            for (int j = 0; j < 10000; j++) {
                lines.add("Line_" + j + "_Value_" + random.nextInt(1000));
            }

            Files.write(file, lines);
            System.out.println("Created test file: " + file + " with " + lines.size() + " lines");
        }
    }

    private static void verifyResults() throws IOException {
        long totalProcessedLines = countLines(Paths.get("output"));
        long totalInputLines = countLines(Paths.get("input"));

        System.out.println("Verification:");
        System.out.println("Total input lines: " + totalInputLines);
        System.out.println("Total processed lines: " + totalProcessedLines);
        System.out.println("Processing successful: " + (totalInputLines == totalProcessedLines));
    }

    // Sum the line counts of all files directly under dir; unreadable files count as 0
    private static long countLines(Path dir) throws IOException {
        // Files.list holds an open directory handle, so close the stream when done
        try (Stream<Path> files = Files.list(dir)) {
            return files.mapToLong(file -> {
                try {
                    return Files.readAllLines(file).size();
                } catch (IOException e) {
                    return 0;
                }
            }).sum();
        }
    }
}
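
The solution above loads each file completely with Files.readAllLines before splitting it into batches, which is fine for the 10,000-line test files but keeps a whole file in memory at once. The following is a minimal sketch of reading in batches incrementally instead, assuming a hypothetical helper class BatchReaderSketch with a hypothetical forEachBatch method (neither is part of the solution above). It takes a BufferedReader so that it can be tried without touching the file system.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not part of LargeDataProcessingSystem
public class BatchReaderSketch {

    // Reads lines one at a time and passes each batch to the consumer; returns the batch count
    static int forEachBatch(BufferedReader reader, int batchSize,
                            Consumer<List<String>> consumer) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int batchCount = 0;
        List<String> batch = new ArrayList<>(batchSize);
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                batch.add(line);
                if (batch.size() == batchSize) {
                    consumer.accept(batch);
                    batchCount++;
                    // Start a new list so the consumer may keep the previous one
                    batch = new ArrayList<>(batchSize);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to read batch", e);
        }
        if (!batch.isEmpty()) { // final, possibly shorter batch
            consumer.accept(batch);
            batchCount++;
        }
        return batchCount;
    }

    public static void main(String[] args) {
        // A StringReader stands in for a file here
        BufferedReader reader = new BufferedReader(new StringReader("a\nb\nc\nd\ne"));
        int batches = forEachBatch(reader, 2, batch -> System.out.println("Batch: " + batch));
        System.out.println("Batches: " + batches);
    }
}
```

With a real file, the reader could come from Files.newBufferedReader(path) inside a try-with-resources block, and the consumer could hand each batch to something like processBatchAsync. Only one batch is held by the reader at a time, although batches passed to asynchronous tasks stay alive until those tasks finish.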