Python総合演習問題50問

2025-11-30

初級問題(15問)

1. 基本データ型操作

整数、浮動小数点数、文字列、ブーリアンの各データ型を使用して、簡単な計算と文字列操作を行うプログラムを作成してください。

2. 条件分岐の基本

ユーザーから数値を入力させ、その数値が正の数、負の数、ゼロのどれかを判定するプログラムを作成してください。

3. ループ処理の基礎

1から100までの数字の中で、3の倍数のときは”Fizz”、5の倍数のときは”Buzz”、3と5の倍数のときは”FizzBuzz”を表示するプログラムを作成してください。

4. リスト操作

数値のリストから最大値、最小値、平均値を求めるプログラムを作成してください。

5. 辞書の基本操作

学生の名前と点数を管理する辞書を作成し、特定の学生の点数を検索・更新・削除する機能を実装してください。

6. 関数の作成

2つの数値を受け取り、その和、差、積、商を返す関数を作成してください。ゼロ除算の例外処理も含めてください。

7. ファイル読み込み

テキストファイルを読み込み、行数をカウントするプログラムを作成してください。

8. ファイル書き込み

ユーザーに入力させた内容を新しいテキストファイルに保存するプログラムを作成してください。

9. クラスの基本

「Person」クラスを作成し、名前と年齢を属性として持ち、自己紹介メソッドを実装してください。

10. 例外処理の基礎

数値入力を受け付けるプログラムで、数字以外が入力された場合に適切なエラーメッセージを表示するようにしてください。

11. リスト内包表記

1から20までの数字のリストから、偶数のみを取り出して新しいリストを作成するプログラムを、リスト内包表記を使用して書いてください。

12. 文字列操作

英文の文字列を受け取り、単語数をカウントし、各単語の先頭を大文字にするプログラムを作成してください。

13. 基本的な演算

円の半径を受け取り、円周と面積を計算するプログラムを作成してください。

14. 日付と時間

現在の日付と時刻を表示し、100日後の日付を計算するプログラムを作成してください。

15. モジュールの利用

mathモジュールを使用して、三角関数の計算を行うプログラムを作成してください。

中級問題(25問)

16. オブジェクト指向プログラミング

銀行口座を管理する「BankAccount」クラスを作成し、入金、出金、残高照会のメソッドを実装してください。

17. 継承の実装

「Vehicle」クラスを基底クラスとし、「Car」クラスと「Motorcycle」クラスを継承して作成してください。

18. ポリモーフィズムの実装

異なる図形クラス(円、四角形、三角形)を作成し、面積を計算する共通メソッドを持たせてポリモーフィズムを実現してください。

19. カプセル化の実装

プライベート属性とゲッター/セッターメソッドを使用して、データのカプセル化を実装してください。

20. 特殊メソッドの活用

__str____repr____add__などの特殊メソッドを実装したクラスを作成してください。

21. 独自例外の定義

残高不足の際に発生する独自例外「InsufficientFundsError」を定義し、使用してください。

22. デコレータの作成

関数の実行時間を計測するデコレータを作成してください。

23. ジェネレータの実装

フィボナッチ数列を生成するジェネレータ関数を作成してください。

24. コンテキストマネージャ

ファイル操作のためのコンテキストマネージャをクラスおよびcontextlibを使用して実装してください。

25. ログ出力の実装

アプリケーションにログ出力機能を追加し、異なるログレベルでメッセージを出力するプログラムを作成してください。

26. JSONデータの処理

辞書データをJSONファイルに保存し、読み込むプログラムを作成してください。

27. CSVデータの操作

CSVファイルを読み込み、データを加工して新しいCSVファイルに出力するプログラムを作成してください。

28. データベース接続

SQLiteデータベースに接続し、簡単なテーブルを作成するプログラムを作成してください。

29. CRUD操作の実装

SQLiteデータベースで、作成、読み込み、更新、削除の各操作を行うプログラムを作成してください。

30. データ永続化クラス

ユーザー設定をJSONファイルで永続化するクラスを作成してください。

31. HTTPリクエストの送信

requestsライブラリを使用して、Web APIからデータを取得するプログラムを作成してください。

32. REST APIクライアント

公開されているREST API(例: JSONPlaceholder)と連携するクライアントプログラムを作成してください。

33. データ可視化の基礎

matplotlibを使用して、簡単な折れ線グラフと棒グラフを作成するプログラムを作成してください。

34. データ分析の基本

Pandasを使用してCSVデータを読み込み、基本的な統計情報を表示するプログラムを作成してください。

35. 正規表現の活用

正規表現を使用して、テキストからメールアドレスや電話番号を抽出するプログラムを作成してください。

36. マルチスレッド処理

threadingモジュールを使用して、簡単なマルチスレッドプログラムを作成してください。

37. イテレータの実装

独自のイテレータクラスを作成し、forループで使用できるようにしてください。

38. プロパティの使用

プロパティデコレータを使用して、属性のゲッターとセッターを実装してください。

39. モックオブジェクトの使用

unittest.mockを使用して、外部依存のあるコードのテストを作成してください。

40. パッケージ作成

複数のモジュールからなる簡単なパッケージを作成し、setup.pyでインストール可能にしてください。

上級問題(10問)

41. Webアプリケーションフレームワーク

Flaskを使用して、簡単なWebアプリケーションを作成してください。

42. 非同期プログラミング

asyncioを使用して、非同期処理を行うプログラムを作成してください。

43. データベースORM

SQLAlchemyを使用して、データベース操作を行うプログラムを作成してください。

44. APIサーバーの実装

FlaskでREST APIサーバーを実装し、CRUD操作を提供してください。

45. ユニットテストの包括的実装

unittestまたはpytestを使用して、これまで作成したクラスや関数の包括的なテストスイートを作成してください。

46. デザインパターンの実装

ObserverパターンまたはSingletonパターンをPythonで実装してください。

47. データ可視化ダッシュボード

複数のグラフを含むデータ可視化ダッシュボードを作成してください。

48. 機械学習の基礎

scikit-learnを使用して、簡単な回帰または分類モデルを作成してください。

49. パフォーマンス最適化

プロファイリングツールを使用してコードのボトルネックを特定し、最適化してください。

50. 総合プロジェクト

これまで学んだすべての概念を統合した、データ管理Webアプリケーションを作成してください。

Python総合演習問題50問 解答例

初級問題(15問)

1. 基本データ型操作

def basic_data_types():
    # 整数
    integer_num = 10
    # 浮動小数点数
    float_num = 3.14
    # 文字列
    string_text = "Hello, Python!"
    # ブーリアン
    boolean_value = True

    # 計算
    result1 = integer_num + float_num
    result2 = integer_num * 2

    # 文字列操作
    upper_text = string_text.upper()
    text_length = len(string_text)

    print(f"整数: {integer_num}")
    print(f"浮動小数点数: {float_num}")
    print(f"文字列: {string_text}")
    print(f"ブーリアン: {boolean_value}")
    print(f"計算結果1: {result1}")
    print(f"計算結果2: {result2}")
    print(f"大文字変換: {upper_text}")
    print(f"文字列長: {text_length}")

basic_data_types()

2. 条件分岐の基本

def number_classifier():
    try:
        number = float(input("数値を入力してください: "))

        if number > 0:
            print("正の数です")
        elif number < 0:
            print("負の数です")
        else:
            print("ゼロです")
    except ValueError:
        print("数値を入力してください")

number_classifier()

3. ループ処理の基礎

def fizzbuzz():
    for i in range(1, 101):
        if i % 3 == 0 and i % 5 == 0:
            print("FizzBuzz")
        elif i % 3 == 0:
            print("Fizz")
        elif i % 5 == 0:
            print("Buzz")
        else:
            print(i)

fizzbuzz()

4. リスト操作

def list_operations():
    numbers = [12, 45, 23, 67, 34, 89, 1, 56]

    max_value = max(numbers)
    min_value = min(numbers)
    average_value = sum(numbers) / len(numbers)

    print(f"リスト: {numbers}")
    print(f"最大値: {max_value}")
    print(f"最小値: {min_value}")
    print(f"平均値: {average_value:.2f}")

list_operations()

5. 辞書の基本操作

def student_management():
    students = {
        "山田太郎": 85,
        "佐藤花子": 92,
        "鈴木一郎": 78,
        "田中みどり": 88
    }

    print("初期データ:", students)

    # 検索
    name = "佐藤花子"
    if name in students:
        print(f"{name}の点数: {students[name]}")

    # 更新
    students["鈴木一郎"] = 85
    print("更新後:", students)

    # 削除
    del students["田中みどり"]
    print("削除後:", students)

student_management()

6. 関数の作成

def calculator(a, b):
    try:
        addition = a + b
        subtraction = a - b
        multiplication = a * b
        division = a / b

        return {
            "和": addition,
            "差": subtraction,
            "積": multiplication,
            "商": division
        }
    except ZeroDivisionError:
        return {"エラー": "ゼロ除算はできません"}

# 使用例
result1 = calculator(10, 5)
print(result1)

result2 = calculator(10, 0)
print(result2)

7. ファイル読み込み

def count_lines(filename):
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            lines = file.readlines()
            line_count = len(lines)
            print(f"ファイル '{filename}' の行数: {line_count}")
            return line_count
    except FileNotFoundError:
        print(f"ファイル '{filename}' が見つかりません")
        return 0

# 使用例(ファイルが存在する場合)
count_lines("sample.txt")

8. ファイル書き込み

def write_to_file():
    user_input = input("ファイルに書き込む内容を入力してください: ")
    filename = input("ファイル名を入力してください: ")

    try:
        with open(filename, 'w', encoding='utf-8') as file:
            file.write(user_input)
        print(f"内容を '{filename}' に保存しました")
    except Exception as e:
        print(f"エラーが発生しました: {e}")

write_to_file()

9. クラスの基本

class Person:
    def __init__(self, name, age):
        self.name = name
        self.age = age

    def introduce(self):
        return f"私は{self.name}です。{self.age}歳です。"

    def have_birthday(self):
        self.age += 1
        return f"お誕生日おめでとう!{self.name}は{self.age}歳になりました。"

# 使用例
person1 = Person("山田太郎", 25)
print(person1.introduce())
print(person1.have_birthday())

10. 例外処理の基礎

def safe_number_input():
    while True:
        try:
            number = float(input("数値を入力してください: "))
            print(f"入力された数値: {number}")
            break
        except ValueError:
            print("エラー: 数値を入力してください")
        except Exception as e:
            print(f"予期せぬエラー: {e}")

safe_number_input()

11. リスト内包表記

def list_comprehension_example():
    # 1から20までのリストを作成
    numbers = list(range(1, 21))
    print(f"元のリスト: {numbers}")

    # 偶数のみを取り出す(リスト内包表記)
    even_numbers = [x for x in numbers if x % 2 == 0]
    print(f"偶数のみ: {even_numbers}")

    # 各数字を2乗(リスト内包表記)
    squared_numbers = [x**2 for x in numbers]
    print(f"2乗したリスト: {squared_numbers}")

list_comprehension_example()

12. 文字列操作

def text_processor(text):
    # 単語数をカウント
    words = text.split()
    word_count = len(words)

    # 各単語の先頭を大文字にする
    capitalized_words = [word.capitalize() for word in words]
    processed_text = ' '.join(capitalized_words)

    print(f"元のテキスト: {text}")
    print(f"単語数: {word_count}")
    print(f"処理後のテキスト: {processed_text}")

    return processed_text

# 使用例
sample_text = "hello world, this is a python programming example"
text_processor(sample_text)

13. 基本的な演算

import math

def circle_calculations():
    try:
        radius = float(input("円の半径を入力してください: "))

        circumference = 2 * math.pi * radius
        area = math.pi * radius ** 2

        print(f"半径 {radius} の円:")
        print(f"円周: {circumference:.2f}")
        print(f"面積: {area:.2f}")

    except ValueError:
        print("数値を入力してください")

circle_calculations()

14. 日付と時間

from datetime import datetime, timedelta

def date_operations():
    # 現在の日付と時刻
    current_datetime = datetime.now()
    print(f"現在の日時: {current_datetime}")

    # 日付のみ
    current_date = current_datetime.date()
    print(f"現在の日付: {current_date}")

    # 100日後の日付
    future_date = current_date + timedelta(days=100)
    print(f"100日後の日付: {future_date}")

    # フォーマットして表示
    formatted_date = current_datetime.strftime("%Y年%m月%d日 %H時%M分%S秒")
    print(f"フォーマット済み: {formatted_date}")

date_operations()

15. モジュールの利用

import math

def trigonometry_calculations():
    angle_degrees = 45
    angle_radians = math.radians(angle_degrees)

    sin_value = math.sin(angle_radians)
    cos_value = math.cos(angle_radians)
    tan_value = math.tan(angle_radians)

    print(f"{angle_degrees}度の三角関数値:")
    print(f"sin({angle_degrees}°) = {sin_value:.4f}")
    print(f"cos({angle_degrees}°) = {cos_value:.4f}")
    print(f"tan({angle_degrees}°) = {tan_value:.4f}")

    # 逆三角関数
    asin_value = math.degrees(math.asin(sin_value))
    print(f"asin({sin_value:.4f}) = {asin_value:.2f}度")

trigonometry_calculations()

中級問題(25問)

16. オブジェクト指向プログラミング

class BankAccount:
    def __init__(self, account_holder, initial_balance=0):
        self.account_holder = account_holder
        self._balance = initial_balance
        self._transaction_history = []

    def deposit(self, amount):
        if amount > 0:
            self._balance += amount
            self._transaction_history.append(f"入金: +{amount}円")
            return True
        return False

    def withdraw(self, amount):
        if 0 < amount <= self._balance:
            self._balance -= amount
            self._transaction_history.append(f"出金: -{amount}円")
            return True
        return False

    def get_balance(self):
        return self._balance

    def get_transaction_history(self):
        return self._transaction_history.copy()

    def display_info(self):
        print(f"口座名義: {self.account_holder}")
        print(f"現在の残高: {self._balance}円")

# 使用例
account = BankAccount("山田太郎", 1000)
account.deposit(500)
account.withdraw(200)
account.display_info()
print("取引履歴:", account.get_transaction_history())

17. 継承の実装

class Vehicle:
    def __init__(self, make, model, year):
        self.make = make
        self.model = model
        self.year = year
        self._speed = 0

    def start_engine(self):
        return f"{self.make} {self.model}のエンジンを始動しました"

    def stop_engine(self):
        self._speed = 0
        return f"{self.make} {self.model}のエンジンを停止しました"

    def get_info(self):
        return f"{self.year}年式 {self.make} {self.model}"

class Car(Vehicle):
    def __init__(self, make, model, year, doors):
        super().__init__(make, model, year)
        self.doors = doors

    def honk(self):
        return "プップー!"

    def get_info(self):
        return f"{super().get_info()} ({self.doors}ドア)"

class Motorcycle(Vehicle):
    def __init__(self, make, model, year, engine_size):
        super().__init__(make, model, year)
        self.engine_size = engine_size

    def wheelie(self):
        return "ウィリー!"

    def get_info(self):
        return f"{super().get_info()} ({self.engine_size}cc)"

# 使用例
car = Car("Toyota", "Camry", 2023, 4)
bike = Motorcycle("Honda", "CBR", 2023, 600)

print(car.get_info())
print(car.start_engine())
print(car.honk())

print(bike.get_info())
print(bike.start_engine())
print(bike.wheelie())

18. ポリモーフィズムの実装

from math import pi

class Shape:
    def area(self):
        raise NotImplementedError("サブクラスで実装してください")

    def perimeter(self):
        raise NotImplementedError("サブクラスで実装してください")

class Circle(Shape):
    def __init__(self, radius):
        self.radius = radius

    def area(self):
        return pi * self.radius ** 2

    def perimeter(self):
        return 2 * pi * self.radius

class Rectangle(Shape):
    def __init__(self, width, height):
        self.width = width
        self.height = height

    def area(self):
        return self.width * self.height

    def perimeter(self):
        return 2 * (self.width + self.height)

class Triangle(Shape):
    def __init__(self, base, height, side1, side2, side3):
        self.base = base
        self.height = height
        self.side1 = side1
        self.side2 = side2
        self.side3 = side3

    def area(self):
        return 0.5 * self.base * self.height

    def perimeter(self):
        return self.side1 + self.side2 + self.side3

def print_shape_info(shape):
    print(f"面積: {shape.area():.2f}")
    print(f"周長: {shape.perimeter():.2f}")

# ポリモーフィズムの実演
shapes = [
    Circle(5),
    Rectangle(4, 6),
    Triangle(3, 4, 3, 4, 5)
]

for i, shape in enumerate(shapes, 1):
    print(f"\n図形{i}:")
    print_shape_info(shape)

19. カプセル化の実装

class Student:
    def __init__(self, name, age):
        self._name = name
        self._age = age
        self._grades = []
        self._is_graduated = False

    # ゲッターメソッド
    def get_name(self):
        return self._name

    def get_age(self):
        return self._age

    def get_grades(self):
        return self._grades.copy()

    def get_average_grade(self):
        if not self._grades:
            return 0
        return sum(self._grades) / len(self._grades)

    # セッターメソッド
    def set_name(self, name):
        if isinstance(name, str) and len(name) > 0:
            self._name = name
        else:
            raise ValueError("名前は空でない文字列である必要があります")

    def set_age(self, age):
        if isinstance(age, int) and 0 <= age <= 120:
            self._age = age
        else:
            raise ValueError("年齢は0から120の整数である必要があります")

    def add_grade(self, grade):
        if isinstance(grade, (int, float)) and 0 <= grade <= 100:
            self._grades.append(grade)
        else:
            raise ValueError("成績は0から100の数値である必要があります")

    def graduate(self):
        self._is_graduated = True
        return f"{self._name}は卒業しました"

# 使用例
student = Student("山田太郎", 20)
student.add_grade(85)
student.add_grade(92)
student.add_grade(78)

print(f"名前: {student.get_name()}")
print(f"年齢: {student.get_age()}")
print(f"成績: {student.get_grades()}")
print(f"平均点: {student.get_average_grade():.1f}")

20. 特殊メソッドの活用

class Vector:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __str__(self):
        return f"Vector({self.x}, {self.y})"

    def __repr__(self):
        return f"Vector(x={self.x}, y={self.y})"

    def __add__(self, other):
        if isinstance(other, Vector):
            return Vector(self.x + other.x, self.y + other.y)
        return NotImplemented

    def __sub__(self, other):
        if isinstance(other, Vector):
            return Vector(self.x - other.x, self.y - other.y)
        return NotImplemented

    def __mul__(self, scalar):
        if isinstance(scalar, (int, float)):
            return Vector(self.x * scalar, self.y * scalar)
        return NotImplemented

    def __eq__(self, other):
        if isinstance(other, Vector):
            return self.x == other.x and self.y == other.y
        return False

    def __len__(self):
        # ベクトルの大きさ(整数部分)を返す
        return int((self.x**2 + self.y**2)**0.5)

    def magnitude(self):
        return (self.x**2 + self.y**2)**0.5

# 使用例
v1 = Vector(3, 4)
v2 = Vector(1, 2)

print(v1)  # __str__が呼ばれる
print(repr(v1))  # __repr__が呼ばれる

v3 = v1 + v2  # __add__が呼ばれる
print(f"加算: {v3}")

v4 = v1 * 2  # __mul__が呼ばれる
print(f"スカラー倍: {v4}")

print(f"ベクトルの大きさ: {v1.magnitude():.2f}")
print(f"長さ(整数): {len(v1)}")
print(f"等価比較: {v1 == v2}")

21. 独自例外の定義

class InsufficientFundsError(Exception):
    """残高不足エラー"""
    def __init__(self, current_balance, amount):
        self.current_balance = current_balance
        self.amount = amount
        self.shortage = amount - current_balance
        super().__init__(f"残高不足: 現在の残高 {current_balance}円, 要求額 {amount}円, 不足額 {self.shortage}円")

class AccountNotFoundError(Exception):
    """口座不存在エラー"""
    def __init__(self, account_number):
        self.account_number = account_number
        super().__init__(f"口座番号 {account_number} は存在しません")

class BankingSystem:
    def __init__(self):
        self.accounts = {}

    def create_account(self, account_number, initial_balance=0):
        self.accounts[account_number] = initial_balance

    def withdraw(self, account_number, amount):
        if account_number not in self.accounts:
            raise AccountNotFoundError(account_number)

        current_balance = self.accounts[account_number]

        if amount > current_balance:
            raise InsufficientFundsError(current_balance, amount)

        self.accounts[account_number] -= amount
        return self.accounts[account_number]

# 使用例
bank = BankingSystem()
bank.create_account("12345", 1000)

try:
    new_balance = bank.withdraw("12345", 500)
    print(f"引き出し成功。残高: {new_balance}円")

    # 残高不足のテスト
    bank.withdraw("12345", 1000)

except InsufficientFundsError as e:
    print(f"エラー: {e}")

except AccountNotFoundError as e:
    print(f"エラー: {e}")

22. デコレータの作成

import time
import functools

def timer(func):
    """関数の実行時間を計測するデコレータ"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"関数 {func.__name__} の実行時間: {execution_time:.4f}秒")
        return result
    return wrapper

def retry(max_attempts=3, delay=1):
    """リトライ機能を提供するデコレータ"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    print(f"試行 {attempt + 1}/{max_attempts} 失敗: {e}")
                    if attempt < max_attempts - 1:
                        print(f"{delay}秒後に再試行します...")
                        time.sleep(delay)
            print(f"すべての試行が失敗しました")
            raise last_exception
        return wrapper
    return decorator

# 使用例
@timer
def slow_function():
    """時間がかかる関数のシミュレーション"""
    time.sleep(1)
    return "完了"

@retry(max_attempts=3, delay=1)
def potentially_failing_function(should_fail=True):
    """時々失敗する関数のシミュレーション"""
    if should_fail:
        raise ValueError("何か問題が発生しました")
    return "成功"

# テスト
print(slow_function())

try:
    print(potentially_failing_function(should_fail=True))
except Exception as e:
    print(f"最終エラー: {e}")

23. ジェネレータの実装

def fibonacci_generator(limit=None):
    """
    フィボナッチ数列を生成するジェネレータ
    limit: 生成する最大値(オプション)
    """
    a, b = 0, 1
    count = 0

    while True:
        if limit is not None and a > limit:
            return
        yield a
        a, b = b, a + b
        count += 1

def prime_generator(limit):
    """素数を生成するジェネレータ"""
    def is_prime(n):
        if n < 2:
            return False
        for i in range(2, int(n**0.5) + 1):
            if n % i == 0:
                return False
        return True

    for num in range(2, limit + 1):
        if is_prime(num):
            yield num

# 使用例
print("フィボナッチ数列(100以下):")
fib_gen = fibonacci_generator(limit=100)
for num in fib_gen:
    print(num, end=" ")
print("\n")

print("100以下の素数:")
prime_gen = prime_generator(100)
for prime in prime_gen:
    print(prime, end=" ")
print()

# ジェネレータ式の例
squares = (x**2 for x in range(10))
print("\nジェネレータ式による2乗数列:")
for square in squares:
    print(square, end=" ")

24. コンテキストマネージャ

import sqlite3
from contextlib import contextmanager

class DatabaseConnection:
    """データベース接続のコンテキストマネージャ(クラス版)"""
    def __init__(self, db_path):
        self.db_path = db_path
        self.connection = None

    def __enter__(self):
        self.connection = sqlite3.connect(self.db_path)
        print(f"データベース '{self.db_path}' に接続しました")
        return self.connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.connection:
            if exc_type is not None:
                self.connection.rollback()
                print("変更をロールバックしました")
            else:
                self.connection.commit()
                print("変更をコミットしました")
            self.connection.close()
            print("データベース接続を閉じました")
        return False  # 例外を再発生させる

@contextmanager
def file_manager(filename, mode):
    """ファイル操作のコンテキストマネージャ(contextlib版)"""
    try:
        file = open(filename, mode, encoding='utf-8')
        print(f"ファイル '{filename}' を {mode} モードで開きました")
        yield file
    except Exception as e:
        print(f"エラーが発生しました: {e}")
        raise
    finally:
        file.close()
        print(f"ファイル '{filename}' を閉じました")

# 使用例
def test_database_context():
    with DatabaseConnection('test.db') as conn:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL
            )
        ''')
        cursor.execute("INSERT INTO users (name) VALUES (?)", ("山田太郎",))
        print("ユーザーデータを挿入しました")

def test_file_context():
    with file_manager('test.txt', 'w') as f:
        f.write("Hello, Context Manager!\n")
        f.write("これはコンテキストマネージャのテストです\n")
        print("ファイルに書き込みました")

    with file_manager('test.txt', 'r') as f:
        content = f.read()
        print("ファイルの内容:")
        print(content)

# テスト実行
test_database_context()
print()
test_file_context()

25. ログ出力の実装

import logging
import logging.handlers
from datetime import datetime

def setup_logging():
    """ロギングの設定"""
    # ロガーの作成
    logger = logging.getLogger('my_app')
    logger.setLevel(logging.DEBUG)

    # フォーマッタの作成
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # コンソールハンドラ
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)

    # ファイルハンドラ
    file_handler = logging.FileHandler('app.log', encoding='utf-8')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)

    # ローテーティングファイルハンドラ
    rotating_handler = logging.handlers.RotatingFileHandler(
        'app_rotating.log', maxBytes=1024*1024, backupCount=5, encoding='utf-8'
    )
    rotating_handler.setLevel(logging.DEBUG)
    rotating_handler.setFormatter(formatter)

    # ハンドラをロガーに追加
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    logger.addHandler(rotating_handler)

    return logger

class Application:
    def __init__(self):
        self.logger = setup_logging()

    def process_data(self, data):
        self.logger.info(f"データ処理を開始: {data}")

        try:
            if not data:
                raise ValueError("データが空です")

            # 処理のシミュレーション
            result = len(data) * 2
            self.logger.debug(f"処理結果: {result}")

            self.logger.info("データ処理が正常に完了しました")
            return result

        except Exception as e:
            self.logger.error(f"データ処理中にエラーが発生: {e}")
            self.logger.exception("詳細な例外情報:")  # スタックトレースも出力
            raise

    def startup(self):
        self.logger.info("アプリケーションを起動しました")

        # 様々なレベルのログを出力
        self.logger.debug("デバッグ情報")
        self.logger.info("情報メッセージ")
        self.logger.warning("警告メッセージ")

        # データ処理のテスト
        self.process_data([1, 2, 3])
        self.process_data([])  # エラーを発生させる

        self.logger.info("アプリケーションを終了しました")

# 使用例
if __name__ == "__main__":
    app = Application()
    app.startup()

26. JSONデータの処理

import json
import os

class JSONDataManager:
    def __init__(self, filename):
        self.filename = filename
        self.data = self.load_data()

    def load_data(self):
        """JSONファイルからデータを読み込む"""
        try:
            if os.path.exists(self.filename):
                with open(self.filename, 'r', encoding='utf-8') as f:
                    return json.load(f)
            else:
                return {}
        except (json.JSONDecodeError, IOError) as e:
            print(f"データ読み込みエラー: {e}")
            return {}

    def save_data(self):
        """データをJSONファイルに保存する"""
        try:
            with open(self.filename, 'w', encoding='utf-8') as f:
                json.dump(self.data, f, ensure_ascii=False, indent=2)
            print(f"データを '{self.filename}' に保存しました")
        except IOError as e:
            print(f"データ保存エラー: {e}")

    def get_value(self, key, default=None):
        """値を取得する"""
        return self.data.get(key, default)

    def set_value(self, key, value):
        """値を設定する"""
        self.data[key] = value

    def delete_value(self, key):
        """値を削除する"""
        if key in self.data:
            del self.data[key]
            return True
        return False

    def display_data(self):
        """データを表示する"""
        print("現在のデータ:")
        print(json.dumps(self.data, ensure_ascii=False, indent=2))

# 使用例
def json_example():
    manager = JSONDataManager('config.json')

    # データの設定
    manager.set_value('app_name', 'My Application')
    manager.set_value('version', '1.0.0')
    manager.set_value('settings', {
        'theme': 'dark',
        'language': 'ja',
        'notifications': True
    })
    manager.set_value('recent_files', ['file1.txt', 'file2.py', 'file3.json'])

    # データの表示
    manager.display_data()

    # データの保存
    manager.save_data()

    # 値の取得と削除
    print(f"\nアプリ名: {manager.get_value('app_name')}")
    manager.delete_value('recent_files')

    # 変更後の表示
    manager.display_data()
    manager.save_data()

# 複雑なJSONデータの処理例
def complex_json_example():
    # ネストされたJSONデータ
    complex_data = {
        "users": [
            {
                "id": 1,
                "name": "山田太郎",
                "email": "taro@example.com",
                "preferences": {
                    "theme": "dark",
                    "notifications": True
                }
            },
            {
                "id": 2,
                "name": "佐藤花子",
                "email": "hanako@example.com",
                "preferences": {
                    "theme": "light",
                    "notifications": False
                }
            }
        ],
        "metadata": {
            "total_users": 2,
            "created_at": "2024-01-01"
        }
    }

    # JSON文字列に変換
    json_string = json.dumps(complex_data, ensure_ascii=False, indent=2)
    print("JSON文字列:")
    print(json_string)

    # ファイルに保存
    with open('users.json', 'w', encoding='utf-8') as f:
        json.dump(complex_data, f, ensure_ascii=False, indent=2)

    # ファイルから読み込み
    with open('users.json', 'r', encoding='utf-8') as f:
        loaded_data = json.load(f)

    print("\n読み込んだデータからユーザー情報を表示:")
    for user in loaded_data['users']:
        print(f"名前: {user['name']}, メール: {user['email']}")

# テスト実行
json_example()
print("\n" + "="*50)
complex_json_example()

27. CSVデータの操作

import csv
import os

class CSVManager:
    def __init__(self, filename):
        self.filename = filename

    def read_csv(self):
        """CSVファイルを読み込む"""
        data = []
        try:
            with open(self.filename, 'r', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                for row in reader:
                    data.append(row)
            print(f"CSVファイル '{self.filename}' から {len(data)} 行読み込みました")
        except FileNotFoundError:
            print(f"ファイル '{self.filename}' が見つかりません")
        except Exception as e:
            print(f"読み込みエラー: {e}")

        return data

    def write_csv(self, data, fieldnames=None):
        """データをCSVファイルに書き込む"""
        if not data:
            print("書き込むデータがありません")
            return

        if fieldnames is None:
            fieldnames = data[0].keys()

        try:
            with open(self.filename, 'w', encoding='utf-8', newline='') as file:
                writer = csv.DictWriter(file, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(data)
            print(f"データを '{self.filename}' に書き込みました")
        except Exception as e:
            print(f"書き込みエラー: {e}")

    def add_row(self, row_data):
        """CSVファイルに行を追加する"""
        try:
            # 既存のデータを読み込む
            existing_data = self.read_csv()

            # 新しい行を追加
            existing_data.append(row_data)

            # フィールド名を取得
            fieldnames = existing_data[0].keys() if existing_data else row_data.keys()

            # 書き戻す
            self.write_csv(existing_data, fieldnames)
            print("行を追加しました")

        except Exception as e:
            print(f"行追加エラー: {e}")

    def filter_data(self, condition_func):
        """条件に合致するデータをフィルタリングする"""
        data = self.read_csv()
        filtered_data = [row for row in data if condition_func(row)]
        print(f"フィルタリング結果: {len(filtered_data)} 行")
        return filtered_data

def create_sample_data():
    """サンプルデータを作成"""
    sample_data = [
        {'名前': '山田太郎', '年齢': '25', '都市': '東京', '給与': '500000'},
        {'名前': '佐藤花子', '年齢': '30', '都市': '大阪', '給与': '600000'},
        {'名前': '鈴木一郎', '年齢': '22', '都市': '名古屋', '給与': '450000'},
        {'名前': '田中みどり', '年齢': '28', '都市': '福岡', '給与': '550000'},
        {'名前': '高橋健太', '年齢': '35', '都市': '札幌', '給与': '700000'}
    ]

    manager = CSVManager('employees.csv')
    manager.write_csv(sample_data)
    return manager

def demonstrate_csv_operations():
    """CSV操作のデモンストレーション"""
    # サンプルデータ作成
    manager = create_sample_data()

    print("\n=== CSVデータ読み込み ===")
    data = manager.read_csv()
    for row in data:
        print(row)

    print("\n=== フィルタリング(年齢25歳以上) ===")
    def age_filter(row):
        return int(row['年齢']) >= 25

    filtered = manager.filter_data(age_filter)
    for row in filtered:
        print(row)

    print("\n=== 新しい行を追加 ===")
    new_row = {'名前': '伊藤さくら', '年齢': '26', '都市': '京都', '給与': '480000'}
    manager.add_row(new_row)

    # 追加後のデータを表示
    updated_data = manager.read_csv()
    for row in updated_data:
        print(row)

    print("\n=== 給与の統計情報 ===")
    salaries = [int(row['給与']) for row in updated_data]
    print(f"平均給与: {sum(salaries) / len(salaries):.0f}円")
    print(f"最高給与: {max(salaries)}円")
    print(f"最低給与: {min(salaries)}円")

def advanced_csv_processing():
    """高度なCSV処理"""
    import csv

    # カスタムDialectの定義
    csv.register_dialect('my_dialect', delimiter='|', quoting=csv.QUOTE_ALL)

    data = [
        ['商品名', '価格', '在庫'],
        ['ノートパソコン', '120000', '15'],
        ['スマートフォン', '80000', '30'],
        ['タブレット', '50000', '25']
    ]

    # カスタムDialectで書き込み
    with open('products.csv', 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f, dialect='my_dialect')
        writer.writerows(data)

    print("カスタムDialectでCSVファイルを作成しました")

    # 読み込み
    with open('products.csv', 'r', encoding='utf-8') as f:
        reader = csv.reader(f, dialect='my_dialect')
        for row in reader:
            print(row)

# テスト実行
demonstrate_csv_operations()
print("\n" + "="*50)
advanced_csv_processing()

28. データベース接続

import sqlite3
import os

class DatabaseManager:
    def __init__(self, db_name):
        self.db_name = db_name
        self.connection = None

    def connect(self):
        """データベースに接続する"""
        try:
            self.connection = sqlite3.connect(self.db_name)
            print(f"データベース '{self.db_name}' に接続しました")
            return True
        except sqlite3.Error as e:
            print(f"データベース接続エラー: {e}")
            return False

    def disconnect(self):
        """データベース接続を閉じる"""
        if self.connection:
            self.connection.close()
            print("データベース接続を閉じました")

    def execute_query(self, query, params=None):
        """クエリを実行する"""
        try:
            cursor = self.connection.cursor()
            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)

            self.connection.commit()
            return cursor
        except sqlite3.Error as e:
            print(f"クエリ実行エラー: {e}")
            return None

    def create_tables(self):
        """必要なテーブルを作成する"""
        tables = {
            'users': '''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    email TEXT UNIQUE NOT NULL,
                    age INTEGER,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''',
            'products': '''
                CREATE TABLE IF NOT EXISTS products (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    price REAL NOT NULL,
                    category TEXT,
                    stock INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''',
            'orders': '''
                CREATE TABLE IF NOT EXISTS orders (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    product_id INTEGER,
                    quantity INTEGER NOT NULL,
                    total_price REAL NOT NULL,
                    order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (user_id) REFERENCES users (id),
                    FOREIGN KEY (product_id) REFERENCES products (id)
                )
            '''
        }

        for table_name, query in tables.items():
            self.execute_query(query)
            print(f"テーブル '{table_name}' を作成または確認しました")

    def insert_sample_data(self):
        """サンプルデータを挿入する"""
        # ユーザーデータ
        users = [
            ('山田太郎', 'taro@example.com', 25),
            ('佐藤花子', 'hanako@example.com', 30),
            ('鈴木一郎', 'ichiro@example.com', 22)
        ]

        for user in users:
            self.execute_query(
                "INSERT OR IGNORE INTO users (username, email, age) VALUES (?, ?, ?)",
                user
            )

        # 商品データ
        products = [
            ('ノートパソコン', 120000, '電子機器', 15),
            ('スマートフォン', 80000, '電子機器', 30),
            ('デスク', 25000, '家具', 10),
            ('椅子', 15000, '家具', 20)
        ]

        for product in products:
            self.execute_query(
                "INSERT OR IGNORE INTO products (name, price, category, stock) VALUES (?, ?, ?, ?)",
                product
            )

        print("サンプルデータを挿入しました")

    def display_tables(self):
        """テーブルの内容を表示する"""
        tables = ['users', 'products', 'orders']

        for table in tables:
            print(f"\n=== {table}テーブル ===")
            cursor = self.execute_query(f"SELECT * FROM {table}")
            if cursor:
                # カラム名を取得
                columns = [description[0] for description in cursor.description]
                print(" | ".join(columns))
                print("-" * 50)

                rows = cursor.fetchall()
                for row in rows:
                    print(" | ".join(str(item) for item in row))

def database_demo():
    """データベースデモンストレーション"""
    # データベースファイルが既に存在する場合は削除
    if os.path.exists('demo.db'):
        os.remove('demo.db')

    # データベースマネージャの作成
    db_manager = DatabaseManager('demo.db')

    # 接続
    if db_manager.connect():
        # テーブル作成
        db_manager.create_tables()

        # サンプルデータ挿入
        db_manager.insert_sample_data()

        # データ表示
        db_manager.display_tables()

        # 切断
        db_manager.disconnect()

# テスト実行
database_demo()

29. CRUD操作の実装

import sqlite3
from typing import List, Dict, Any, Optional

class CRUDOperations:
    def __init__(self, db_name: str):
        self.db_name = db_name
        self._create_tables()

    def _get_connection(self):
        """データベース接続を取得する"""
        return sqlite3.connect(self.db_name)

    def _create_tables(self):
        """必要なテーブルを作成する"""
        with self._get_connection() as conn:
            cursor = conn.cursor()

            # 学生テーブル
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS students (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    email TEXT UNIQUE NOT NULL,
                    age INTEGER,
                    grade TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # 科目テーブル
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS courses (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    instructor TEXT NOT NULL,
                    credits INTEGER DEFAULT 3
                )
            ''')

            # 成績テーブル
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS grades (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    student_id INTEGER,
                    course_id INTEGER,
                    score INTEGER CHECK(score >= 0 AND score <= 100),
                    semester TEXT,
                    FOREIGN KEY (student_id) REFERENCES students (id),
                    FOREIGN KEY (course_id) REFERENCES courses (id)
                )
            ''')

            conn.commit()

    # CREATE操作
    def create_student(self, name: str, email: str, age: int, grade: str) -> bool:
        """新しい学生を作成する"""
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute(
                    "INSERT INTO students (name, email, age, grade) VALUES (?, ?, ?, ?)",
                    (name, email, age, grade)
                )
                conn.commit()
                print(f"学生 '{name}' を作成しました (ID: {cursor.lastrowid})")
                return True
        except sqlite3.IntegrityError:
            print(f"エラー: メールアドレス '{email}' は既に存在します")
            return False
        except sqlite3.Error as e:
            print(f"学生作成エラー: {e}")
            return False

    def create_course(self, name: str, instructor: str, credits: int = 3) -> bool:
        """新しい科目を作成する"""
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute(
                    "INSERT INTO courses (name, instructor, credits) VALUES (?, ?, ?)",
                    (name, instructor, credits)
                )
                conn.commit()
                print(f"科目 '{name}' を作成しました (ID: {cursor.lastrowid})")
                return True
        except sqlite3.Error as e:
            print(f"科目作成エラー: {e}")
            return False

    # READ操作
    def get_all_students(self) -> List[Dict[str, Any]]:
        """全ての学生を取得する"""
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT * FROM students")
                columns = [description[0] for description in cursor.description]
                students = []
                for row in cursor.fetchall():
                    students.append(dict(zip(columns, row)))
                return students
        except sqlite3.Error as e:
            print(f"学生取得エラー: {e}")
            return []

    def get_student_by_id(self, student_id: int) -> Optional[Dict[str, Any]]:
        """IDで学生を取得する"""
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT * FROM students WHERE id = ?", (student_id,))
                row = cursor.fetchone()
                if row:
                    columns = [description[0] for description in cursor.description]
                    return dict(zip(columns, row))
                return None
        except sqlite3.Error as e:
            print(f"学生取得エラー: {e}")
            return None

    def search_students(self, **filters) -> List[Dict[str, Any]]:
        """条件で学生を検索する"""
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()

                where_clauses = []
                params = []

                for key, value in filters.items():
                    if value is not None:
                        where_clauses.append(f"{key} = ?")
                        params.append(value)

                query = "SELECT * FROM students"
                if where_clauses:
                    query += " WHERE " + " AND ".join(where_clauses)

                cursor.execute(query, params)
                columns = [description[0] for description in cursor.description]
                students = []
                for row in cursor.fetchall():
                    students.append(dict(zip(columns, row)))
                return students
        except sqlite3.Error as e:
            print(f"学生検索エラー: {e}")
            return []

    # UPDATE操作
    def update_student(self, student_id: int, **updates) -> bool:
        """学生情報を更新する"""
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()

                set_clauses = []
                params = []

                for key, value in updates.items():
                    if value is not None:
                        set_clauses.append(f"{key} = ?")
                        params.append(value)

                if not set_clauses:
                    print("更新する項目がありません")
                    return False

                params.append(student_id)
                query = f"UPDATE students SET {', '.join(set_clauses)} WHERE id = ?"

                cursor.execute(query, params)
                conn.commit()

                if cursor.rowcount > 0:
                    print(f"学生ID {student_id} を更新しました")
                    return True
                else:
                    print(f"学生ID {student_id} が見つかりません")
                    return False
        except sqlite3.Error as e:
            print(f"学生更新エラー: {e}")
            return False

    # DELETE操作
    def delete_student(self, student_id: int) -> bool:
        """学生を削除する"""
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("DELETE FROM students WHERE id = ?", (student_id,))
                conn.commit()

                if cursor.rowcount > 0:
                    print(f"学生ID {student_id} を削除しました")
                    return True
                else:
                    print(f"学生ID {student_id} が見つかりません")
                    return False
        except sqlite3.Error as e:
            print(f"学生削除エラー: {e}")
            return False

    # 高度なREAD操作
    def get_student_grades(self, student_id: int) -> List[Dict[str, Any]]:
        """学生の成績を取得する"""
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT s.name as student_name, c.name as course_name, 
                           g.score, g.semester, c.instructor
                    FROM grades g
                    JOIN students s ON g.student_id = s.id
                    JOIN courses c ON g.course_id = c.id
                    WHERE s.id = ?
                    ORDER BY g.semester, c.name
                ''', (student_id,))

                columns = [description[0] for description in cursor.description]
                grades = []
                for row in cursor.fetchall():
                    grades.append(dict(zip(columns, row)))
                return grades
        except sqlite3.Error as e:
            print(f"成績取得エラー: {e}")
            return []

    def add_grade(self, student_id: int, course_id: int, score: int, semester: str) -> bool:
        """成績を追加する"""
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute(
                    "INSERT INTO grades (student_id, course_id, score, semester) VALUES (?, ?, ?, ?)",
                    (student_id, course_id, score, semester)
                )
                conn.commit()
                print(f"成績を追加しました (ID: {cursor.lastrowid})")
                return True
        except sqlite3.Error as e:
            print(f"成績追加エラー: {e}")
            return False

def crud_demonstration():
    """CRUD操作のデモンストレーション"""
    # CRUDオペレーションのインスタンス作成
    crud = CRUDOperations('school.db')

    print("=== 学生の作成 ===")
    crud.create_student("山田太郎", "taro@example.com", 20, "A")
    crud.create_student("佐藤花子", "hanako@example.com", 22, "B")
    crud.create_student("鈴木一郎", "ichiro@example.com", 19, "A")

    print("\n=== 科目の作成 ===")
    crud.create_course("数学", "田中先生", 4)
    crud.create_course("英語", "佐々木先生", 3)
    crud.create_course("プログラミング", "山本先生", 3)

    print("\n=== 全ての学生を表示 ===")
    students = crud.get_all_students()
    for student in students:
        print(f"ID: {student['id']}, 名前: {student['name']}, 年齢: {student['age']}")

    print("\n=== 条件で学生を検索 (年齢20歳以上) ===")
    adult_students = crud.search_students(age=20)
    for student in adult_students:
        print(f"名前: {student['name']}, 年齢: {student['age']}")

    print("\n=== 学生情報の更新 ===")
    crud.update_student(1, age=21, grade="A+")

    print("\n=== 更新後の学生情報 ===")
    student = crud.get_student_by_id(1)
    if student:
        print(f"名前: {student['name']}, 年齢: {student['age']}, グレード: {student['grade']}")

    print("\n=== 成績の追加 ===")
    crud.add_grade(1, 1, 85, "2024前期")
    crud.add_grade(1, 2, 92, "2024前期")
    crud.add_grade(2, 1, 78, "2024前期")

    print("\n=== 学生の成績表示 ===")
    grades = crud.get_student_grades(1)
    for grade in grades:
        print(f"科目: {grade['course_name']}, 点数: {grade['score']}, 学期: {grade['semester']}")

    print("\n=== 学生の削除 ===")
    # 新しい学生を作成して削除
    crud.create_student("テスト学生", "test@example.com", 25, "C")
    crud.delete_student(4)

# テスト実行
crud_demonstration()

30. データ永続化クラス

import json
import os
from typing import Any, Dict, Optional
from datetime import datetime

class SettingsManager:
    """設定をJSONファイルで永続化するクラス"""

    def __init__(self, filename: str = "settings.json"):
        self.filename = filename
        self.settings = self._load_settings()

    def _load_settings(self) -> Dict[str, Any]:
        """設定をファイルから読み込む"""
        try:
            if os.path.exists(self.filename):
                with open(self.filename, 'r', encoding='utf-8') as f:
                    return json.load(f)
            else:
                return self._get_default_settings()
        except (json.JSONDecodeError, IOError) as e:
            print(f"設定読み込みエラー: {e}")
            return self._get_default_settings()

    def _get_default_settings(self) -> Dict[str, Any]:
        """デフォルト設定を返す"""
        return {
            "app_name": "My Application",
            "version": "1.0.0",
            "theme": "dark",
            "language": "ja",
            "notifications": True,
            "auto_save": True,
            "save_interval": 5,
            "recent_files": [],
            "window_size": {"width": 800, "height": 600},
            "created_at": datetime.now().isoformat(),
            "updated_at": datetime.now().isoformat()
        }

    def save_settings(self) -> bool:
        """設定をファイルに保存する"""
        try:
            # 更新日時を設定
            self.settings["updated_at"] = datetime.now().isoformat()

            with open(self.filename, 'w', encoding='utf-8') as f:
                json.dump(self.settings, f, ensure_ascii=False, indent=2)
            print(f"設定を '{self.filename}' に保存しました")
            return True
        except IOError as e:
            print(f"設定保存エラー: {e}")
            return False

    def get_setting(self, key: str, default: Any = None) -> Any:
        """設定値を取得する"""
        return self.settings.get(key, default)

    def set_setting(self, key: str, value: Any) -> None:
        """設定値を設定する"""
        self.settings[key] = value

    def update_settings(self, **kwargs) -> None:
        """複数の設定値を一度に更新する"""
        self.settings.update(kwargs)

    def reset_to_defaults(self) -> None:
        """設定をデフォルト値にリセットする"""
        self.settings = self._get_default_settings()
        self.save_settings()
        print("設定をデフォルト値にリセットしました")

    def add_recent_file(self, filepath: str) -> None:
        """最近使用したファイルを追加する"""
        recent_files = self.settings.get("recent_files", [])

        # 既に存在する場合は削除
        if filepath in recent_files:
            recent_files.remove(filepath)

        # 先頭に追加
        recent_files.insert(0, filepath)

        # 最大10個まで保持
        self.settings["recent_files"] = recent_files[:10]
        self.save_settings()

    def display_settings(self) -> None:
        """設定を表示する"""
        print("現在の設定:")
        print(json.dumps(self.settings, ensure_ascii=False, indent=2))

    def export_settings(self, export_filename: str) -> bool:
        """設定を別ファイルにエクスポートする"""
        try:
            with open(export_filename, 'w', encoding='utf-8') as f:
                json.dump(self.settings, f, ensure_ascii=False, indent=2)
            print(f"設定を '{export_filename}' にエクスポートしました")
            return True
        except IOError as e:
            print(f"設定エクスポートエラー: {e}")
            return False

    def import_settings(self, import_filename: str) -> bool:
        """設定を別ファイルからインポートする"""
        try:
            with open(import_filename, 'r', encoding='utf-8') as f:
                imported_settings = json.load(f)

            self.settings.update(imported_settings)
            self.save_settings()
            print(f"設定を '{import_filename}' からインポートしました")
            return True
        except (IOError, json.JSONDecodeError) as e:
            print(f"設定インポートエラー: {e}")
            return False

class UserPreferences:
    """ユーザー設定を管理するクラス(より具体的な実装)"""

    def __init__(self, username: str):
        self.username = username
        self.filename = f"{username}_preferences.json"
        self.manager = SettingsManager(self.filename)

    def set_theme(self, theme: str):
        """テーマを設定する"""
        valid_themes = ["light", "dark", "auto"]
        if theme in valid_themes:
            self.manager.set_setting("theme", theme)
            self.manager.save_settings()
        else:
            raise ValueError(f"無効なテーマ: {theme}. 有効な値: {valid_themes}")

    def set_language(self, language: str):
        """言語を設定する"""
        valid_languages = ["ja", "en", "es", "fr"]
        if language in valid_languages:
            self.manager.set_setting("language", language)
            self.manager.save_settings()
        else:
            raise ValueError(f"無効な言語: {language}. 有効な値: {valid_languages}")

    def get_preferences(self) -> Dict[str, Any]:
        """全ての設定を取得する"""
        return self.manager.settings

    def get_recent_projects(self) -> List[str]:
        """最近のプロジェクトを取得する"""
        return self.manager.get_setting("recent_projects", [])

    def add_recent_project(self, project_path: str):
        """最近のプロジェクトを追加する"""
        recent_projects = self.get_recent_projects()

        if project_path in recent_projects:
            recent_projects.remove(project_path)

        recent_projects.insert(0, project_path)
        self.manager.set_setting("recent_projects", recent_projects[:10])
        self.manager.save_settings()

def settings_demonstration():
    """設定管理のデモンストレーション"""
    print("=== 基本設定管理のデモ ===")

    # 設定マネージャの作成
    settings = SettingsManager()

    # 現在の設定を表示
    settings.display_settings()

    # 設定の変更
    print("\n=== 設定の変更 ===")
    settings.set_setting("theme", "light")
    settings.set_setting("save_interval", 10)
    settings.add_recent_file("/path/to/document1.txt")
    settings.add_recent_file("/path/to/spreadsheet.xlsx")

    # 変更後の設定を表示
    settings.display_settings()

    # 設定を保存
    settings.save_settings()

    # エクスポート
    settings.export_settings("settings_backup.json")

    print("\n=== ユーザー設定のデモ ===")
    user_prefs = UserPreferences("yamada")

    # ユーザー設定の変更
    user_prefs.set_theme("dark")
    user_prefs.set_language("ja")
    user_prefs.add_recent_project("/projects/web_app")
    user_prefs.add_recent_project("/projects/data_analysis")

    # ユーザー設定を表示
    print("ユーザー設定:")
    print(json.dumps(user_prefs.get_preferences(), ensure_ascii=False, indent=2))

def advanced_persistence_example():
    """高度な永続化の例"""
    import pickle

    class DataCache:
        """データキャッシュを管理するクラス(pickle使用)"""

        def __init__(self, cache_file: str = "cache.pkl"):
            self.cache_file = cache_file
            self.cache = self._load_cache()

        def _load_cache(self) -> Dict[str, Any]:
            """キャッシュを読み込む"""
            try:
                if os.path.exists(self.cache_file):
                    with open(self.cache_file, 'rb') as f:
                        return pickle.load(f)
                return {}
            except Exception as e:
                print(f"キャッシュ読み込みエラー: {e}")
                return {}

        def save_cache(self) -> bool:
            """キャッシュを保存する"""
            try:
                with open(self.cache_file, 'wb') as f:
                    pickle.dump(self.cache, f)
                return True
            except Exception as e:
                print(f"キャッシュ保存エラー: {e}")
                return False

        def set(self, key: str, value: Any, ttl: int = None) -> None:
            """キャッシュにデータを設定する"""
            cache_data = {
                'value': value,
                'created_at': datetime.now().isoformat(),
                'ttl': ttl
            }
            self.cache[key] = cache_data

        def get(self, key: str) -> Any:
            """キャッシュからデータを取得する"""
            if key not in self.cache:
                return None

            data = self.cache[key]

            # TTLチェック
            if data['ttl']:
                created = datetime.fromisoformat(data['created_at'])
                elapsed = (datetime.now() - created).seconds
                if elapsed > data['ttl']:
                    del self.cache[key]
                    return None

            return data['value']

        def clear(self) -> None:
            """キャッシュをクリアする"""
            self.cache.clear()
            self.save_cache()

    # キャッシュの使用例
    cache = DataCache()
    cache.set("user_data", {"name": "山田太郎", "age": 25}, ttl=3600)  # 1時間有効
    cache.set("app_config", {"theme": "dark", "language": "ja"})  # 無期限

    print("キャッシュからデータ取得:")
    print("user_data:", cache.get("user_data"))
    print("app_config:", cache.get("app_config"))

    cache.save_cache()

# テスト実行
settings_demonstration()
print("\n" + "="*50)
advanced_persistence_example()

31. HTTPリクエストの送信

import requests
import json
from typing import Dict, Any, Optional

class HTTPClient:
    """HTTPリクエストを送信するクライアントクラス"""

    def __init__(self, base_url: str = "", timeout: int = 10):
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()

        # デフォルトヘッダー
        self.session.headers.update({
            'User-Agent': 'Python-HTTP-Client/1.0',
            'Accept': 'application/json'
        })

    def _build_url(self, endpoint: str) -> str:
        """完全なURLを構築する"""
        if self.base_url:
            return f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
        return endpoint

    def get(self, endpoint: str, params: Dict = None, **kwargs) -> Optional[Dict[str, Any]]:
        """GETリクエストを送信する"""
        url = self._build_url(endpoint)

        try:
            response = self.session.get(
                url, 
                params=params, 
                timeout=self.timeout,
                **kwargs
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"GETリクエストエラー: {e}")
            return None

    def post(self, endpoint: str, data: Dict = None, json_data: Dict = None, **kwargs) -> Optional[Dict[str, Any]]:
        """POSTリクエストを送信する"""
        url = self._build_url(endpoint)

        try:
            response = self.session.post(
                url,
                data=data,
                json=json_data,
                timeout=self.timeout,
                **kwargs
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"POSTリクエストエラー: {e}")
            return None

    def put(self, endpoint: str, data: Dict = None, json_data: Dict = None, **kwargs) -> Optional[Dict[str, Any]]:
        """PUTリクエストを送信する"""
        url = self._build_url(endpoint)

        try:
            response = self.session.put(
                url,
                data=data,
                json=json_data,
                timeout=self.timeout,
                **kwargs
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"PUTリクエストエラー: {e}")
            return None

    def delete(self, endpoint: str, **kwargs) -> bool:
        """DELETEリクエストを送信する"""
        url = self._build_url(endpoint)

        try:
            response = self.session.delete(url, timeout=self.timeout, **kwargs)
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            print(f"DELETEリクエストエラー: {e}")
            return False

    def set_auth(self, auth_type: str, **auth_params):
        """認証を設定する"""
        if auth_type == 'basic':
            from requests.auth import HTTPBasicAuth
            self.session.auth = HTTPBasicAuth(auth_params.get('username'), auth_params.get('password'))
        elif auth_type == 'bearer':
            self.session.headers['Authorization'] = f"Bearer {auth_params.get('token')}"
        elif auth_type == 'api_key':
            self.session.headers[auth_params.get('header_name', 'X-API-Key')] = auth_params.get('api_key')

    def download_file(self, endpoint: str, local_filename: str) -> bool:
        """ファイルをダウンロードする"""
        url = self._build_url(endpoint)

        try:
            response = self.session.get(url, stream=True, timeout=self.timeout)
            response.raise_for_status()

            with open(local_filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

            print(f"ファイルを '{local_filename}' にダウンロードしました")
            return True
        except requests.exceptions.RequestException as e:
            print(f"ファイルダウンロードエラー: {e}")
            return False

def http_client_demo():
    """HTTPクライアントのデモンストレーション"""
    print("=== HTTPクライアントデモ ===")

    # JSONPlaceholder APIを使用(テスト用の無料API)
    client = HTTPClient("https://jsonplaceholder.typicode.com")

    print("\n1. GETリクエスト - 投稿一覧を取得")
    posts = client.get("/posts")
    if posts:
        print(f"投稿数: {len(posts)}")
        for post in posts[:3]:  # 最初の3件のみ表示
            print(f"  - {post['title'][:50]}...")

    print("\n2. GETリクエスト - 特定の投稿を取得")
    post = client.get("/posts/1")
    if post:
        print(f"タイトル: {post['title']}")
        print(f"本文: {post['body'][:100]}...")

    print("\n3. POSTリクエスト - 新しい投稿を作成")
    new_post = {
        'title': 'テスト投稿',
        'body': 'これはテスト投稿です。',
        'userId': 1
    }
    created_post = client.post("/posts", json_data=new_post)
    if created_post:
        print(f"作成された投稿ID: {created_post.get('id')}")
        print(f"タイトル: {created_post.get('title')}")

    print("\n4. PUTリクエスト - 投稿を更新")
    updated_post = {
        'id': 1,
        'title': '更新されたタイトル',
        'body': '更新された本文',
        'userId': 1
    }
    result = client.put("/posts/1", json_data=updated_post)
    if result:
        print("投稿を更新しました")

    print("\n5. DELETEリクエスト - 投稿を削除")
    if client.delete("/posts/1"):
        print("投稿を削除しました")

def advanced_http_examples():
    """高度なHTTPリクエストの例"""
    print("\n=== 高度なHTTP例 ===")

    client = HTTPClient()

    # クエリパラメータ付きリクエスト
    print("1. クエリパラメータ付きリクエスト")
    params = {
        'q': 'Python',
        'page': 1,
        'limit': 10
    }
    # 注: これは実際のAPIではないのでエラーになりますが、構文の例として
    # result = client.get("https://api.example.com/search", params=params)

    # カスタムヘッダー
    print("2. カスタムヘッダー付きリクエスト")
    headers = {
        'X-Custom-Header': 'CustomValue',
        'Accept-Language': 'ja-JP'
    }
    # 一時的にカスタムヘッダーを設定
    client.session.headers.update(headers)

    # ファイルアップロード(概念実装)
    print("3. ファイルアップロードの例")
    def upload_file(file_path: str, upload_url: str) -> bool:
        try:
            with open(file_path, 'rb') as f:
                files = {'file': f}
                response = requests.post(upload_url, files=files)
                response.raise_for_status()
                print("ファイルアップロード成功")
                return True
        except Exception as e:
            print(f"ファイルアップロードエラー: {e}")
            return False

    # エラーハンドリングの詳細例
    print("4. 詳細なエラーハンドリング")
    try:
        response = requests.get("https://httpbin.org/status/404", timeout=5)
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        print(f"HTTPエラー: {e}")
        print(f"ステータスコード: {response.status_code}")
    except requests.exceptions.ConnectionError as e:
        print(f"接続エラー: {e}")
    except requests.exceptions.Timeout as e:
        print(f"タイムアウトエラー: {e}")
    except requests.exceptions.RequestException as e:
        print(f"リクエストエラー: {e}")

def web_scraping_example():
    """ウェブスクレイピングの例(簡単な例)"""
    print("\n=== ウェブスクレイピング例 ===")

    try:
        from bs4 import BeautifulSoup

        # 簡単なHTMLパースの例
        html_content = """
        
            
                テストページ
            
            
                

メインタイトル

最初の段落

二番目の段落

リンク
  • 項目1
  • 項目2
  • 項目3
""" soup = BeautifulSoup(html_content, 'html.parser') print("タイトル:", soup.title.string) print("メインタイトル:", soup.h1.string) print("段落:") for p in soup.find_all('p'): print(" -", p.string) print("リンク:") for a in soup.find_all('a'): print(" -", a.get('href')) print("リスト項目:") for li in soup.find_all('li'): print(" -", li.string) except ImportError: print("BeautifulSoup4がインストールされていません。インストールするには: pip install beautifulsoup4") # テスト実行 http_client_demo() advanced_http_examples() web_scraping_example()

32. REST APIクライアント

```python
import requests
import json
from typing import List, Dict, Any, Optional

class JSONPlaceholderClient:
"""JSONPlaceholder APIのクライアント"""

def __init__(self):
    self.base_url = "https://jsonplaceholder.typicode.com"
    self.session = requests.Session()
    self.session.headers.update({
        'Content-Type': 'application/json',
        'User-Agent': 'JSONPlaceholder-Client/1.0'
    })

def _handle_response(self, response: requests.Response) -> Optional[Dict[str, Any]]:
    """レスポンスを処理する"""
    try:
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        print(f"HTTPエラー: {e}")
        return None
    except json.JSONDecodeError as e:
        print(f"JSONデコードエラー: {e}")
        return None

# 投稿関連のメソッド
def get_posts(self, user_id: Optional[int] = None) -> List[Dict[str, Any]]:
    """投稿一覧を取得する"""
    url = f"{self.base_url}/posts"
    params = {}
    if user_id:
        params['userId'] = user_id

    response = self.session.get(url, params=params)
    return self._handle_response(response) or []

def get_post(self, post_id: int) -> Optional[Dict[str, Any]]:
    """特定の投稿を取得する"""
    url = f"{self.base_url}/posts/{post_id}"
    response = self.session.get(url)
    return self._handle_response(response)

def create_post(self, title: str, body: str, user_id: int) -> Optional[Dict[str, Any]]:
    """新しい投稿を作成する"""
    url = f"{self.base_url}/posts"
    data = {
        'title': title,
        'body': body,
        'userId': user_id
    }
    response = self.session.post(url, json=data)
    return self._handle_response(response)

def update_post(self, post_id: int, title: str, body: str) -> Optional[Dict[str, Any]]:
    """投稿を更新する"""
    url = f"{self.base_url}/posts/{post_id}"
    data = {
        'title': title,
        'body': body
    }
    response = self.session.put(url, json=data)
    return self._handle_response(response)

def delete_post(self, post_id: int) -> bool:
    """投稿を削除する"""
    url = f"{self.base_url}/posts/{post_id}"
    response = self.session.delete(url)
    try:
        response.raise_for_status()
        return True
    except requests.exceptions.HTTPError as e:
        print(f"削除エラー: {e}")
        return False

# ユーザー関連のメソッド
def get_users(self) -> List[Dict[str, Any]]:
    """ユーザー一覧を取得する"""
    url = f"{self.base_url}/users"
    response = self.session.get(url)
    return self._handle_response(response) or []

def get_user(self, user_id: int) -> Optional[Dict[str, Any]]:
    """特定のユーザーを取得する"""
    url = f"{self.base_url}/users/{user_id}"
    response = self.session.get(url)
    return self._handle_response(response)

# コメント関連のメソッド
def get_comments(self, post_id: Optional[int] = None) -> List[Dict[str, Any]]:
    """コメント一覧を取得する"""
    url = f"{self.base_url}/comments"
    params = {}
    if post_id:
        params['postId'] = post_id

    response = self.session.get(url, params=params)
    return self._handle_response(response) or []

def get_post_comments(self, post_id: int) -> List[Dict[str, Any]]:
    """特定の投稿のコメントを取得する"""
    url = f"{self.base_url}/posts/{post_id}/comments"
    response = self.session.get(url)
    return self._handle_response(response) or []

33. データ可視化の基礎

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from datetime import datetime, timedelta

class DataVisualizer:
    """データ可視化クラス"""

    def __init__(self, style='seaborn'):
        plt.style.use(style)
        self.fig_size = (10, 6)

    def line_chart(self, x_data, y_data, title="折れ線グラフ", xlabel="X軸", ylabel="Y軸"):
        """折れ線グラフを作成"""
        plt.figure(figsize=self.fig_size)
        plt.plot(x_data, y_data, marker='o', linewidth=2, markersize=6)
        plt.title(title, fontsize=14, fontweight='bold')
        plt.xlabel(xlabel)
        plt.ylabel(ylabel)
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

    def bar_chart(self, categories, values, title="棒グラフ", xlabel="カテゴリ", ylabel="値"):
        """棒グラフを作成"""
        plt.figure(figsize=self.fig_size)
        bars = plt.bar(categories, values, color='skyblue', edgecolor='navy', alpha=0.7)
        plt.title(title, fontsize=14, fontweight='bold')
        plt.xlabel(xlabel)
        plt.ylabel(ylabel)

        # 値のラベルを追加
        for bar, value in zip(bars, values):
            plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.1,
                    f'{value}', ha='center', va='bottom')

        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()

    def pie_chart(self, labels, sizes, title="円グラフ"):
        """円グラフを作成"""
        plt.figure(figsize=(8, 8))
        colors = plt.cm.Pastel1(np.linspace(0, 1, len(labels)))
        plt.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90)
        plt.title(title, fontsize=14, fontweight='bold')
        plt.axis('equal')
        plt.tight_layout()
        plt.show()

    def scatter_plot(self, x_data, y_data, categories=None, title="散布図"):
        """散布図を作成"""
        plt.figure(figsize=self.fig_size)

        if categories:
            unique_categories = list(set(categories))
            colors = plt.cm.Set3(np.linspace(0, 1, len(unique_categories)))

            for i, category in enumerate(unique_categories):
                mask = [cat == category for cat in categories]
                plt.scatter(np.array(x_data)[mask], np.array(y_data)[mask],
                          color=colors[i], label=category, alpha=0.7, s=60)
            plt.legend()
        else:
            plt.scatter(x_data, y_data, alpha=0.7, s=60, color='red')

        plt.title(title, fontsize=14, fontweight='bold')
        plt.xlabel('X軸')
        plt.ylabel('Y軸')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

    def histogram(self, data, bins=10, title="ヒストグラム"):
        """ヒストグラムを作成"""
        plt.figure(figsize=self.fig_size)
        plt.hist(data, bins=bins, color='lightgreen', edgecolor='black', alpha=0.7)
        plt.title(title, fontsize=14, fontweight='bold')
        plt.xlabel('値')
        plt.ylabel('頻度')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

    def multiple_plots(self, data_dict, plot_type='line'):
        """複数のプロットを一度に表示"""
        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        axes = axes.flatten()

        for i, (title, data) in enumerate(data_dict.items()):
            if i >= len(axes):
                break

            if plot_type == 'line':
                axes[i].plot(data['x'], data['y'])
            elif plot_type == 'bar':
                axes[i].bar(data['categories'], data['values'])

            axes[i].set_title(title)
            axes[i].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

def visualization_demo():
    """可視化デモンストレーション"""
    viz = DataVisualizer()

    # サンプルデータの生成
    x = list(range(1, 11))
    y = [i**2 for i in x]

    # 1. 折れ線グラフ
    print("1. 折れ線グラフ")
    viz.line_chart(x, y, "2乗関数", "X", "X²")

    # 2. 棒グラフ
    print("2. 棒グラフ")
    categories = ['Python', 'Java', 'JavaScript', 'C++', 'Go']
    values = [35, 25, 20, 15, 5]
    viz.bar_chart(categories, values, "プログラミング言語人気", "言語", "割合(%)")

    # 3. 円グラフ
    print("3. 円グラフ")
    viz.pie_chart(categories, values, "プログラミング言語シェア")

    # 4. 散布図
    print("4. 散布図")
    np.random.seed(42)
    x_scatter = np.random.randn(100)
    y_scatter = x_scatter * 2 + np.random.randn(100) * 0.5
    categories_scatter = ['A' if i < 50 else 'B' for i in range(100)]
    viz.scatter_plot(x_scatter, y_scatter, categories_scatter, "カテゴリ別散布図")

    # 5. ヒストグラム
    print("5. ヒストグラム")
    data_hist = np.random.normal(0, 1, 1000)
    viz.histogram(data_hist, bins=20, title="正規分布のヒストグラム")

# テスト実行
visualization_demo()

34. データ分析の基本

import pandas as pd
import numpy as np
from sklearn.datasets import load_iris
import matplotlib.pyplot as plt
import seaborn as sns

class DataAnalyzer:
    """データ分析クラス"""

    def __init__(self):
        self.df = None

    def load_sample_data(self):
        """サンプルデータを読み込み"""
        iris = load_iris()
        self.df = pd.DataFrame(iris.data, columns=iris.feature_names)
        self.df['species'] = [iris.target_names[i] for i in iris.target]
        return self.df

    def basic_info(self):
        """基本情報を表示"""
        print("=== データ基本情報 ===")
        print(f"形状: {self.df.shape}")
        print(f"\nカラム情報:")
        print(self.df.info())
        print(f"\n先頭5行:")
        print(self.df.head())

    def statistical_summary(self):
        """統計的要約を表示"""
        print("\n=== 統計的要約 ===")
        print(self.df.describe())

        print("\n=== カテゴリ変数の要約 ===")
        categorical_cols = self.df.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            print(f"\n{col}の分布:")
            print(self.df[col].value_counts())

    def missing_values_analysis(self):
        """欠損値分析"""
        print("\n=== 欠損値分析 ===")
        missing = self.df.isnull().sum()
        missing_percent = (missing / len(self.df)) * 100

        missing_df = pd.DataFrame({
            '欠損値数': missing,
            '欠損率(%)': missing_percent
        })
        print(missing_df[missing_df['欠損値数'] > 0])

    def correlation_analysis(self):
        """相関分析"""
        print("\n=== 相関行列 ===")
        numeric_df = self.df.select_dtypes(include=[np.number])
        correlation = numeric_df.corr()
        print(correlation)

        # 相関行列のヒートマップ
        plt.figure(figsize=(8, 6))
        sns.heatmap(correlation, annot=True, cmap='coolwarm', center=0)
        plt.title('相関行列ヒートマップ')
        plt.tight_layout()
        plt.show()

    def outlier_detection(self):
        """外れ値検出"""
        print("\n=== 外れ値分析 ===")
        numeric_df = self.df.select_dtypes(include=[np.number])

        for col in numeric_df.columns:
            Q1 = numeric_df[col].quantile(0.25)
            Q3 = numeric_df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR

            outliers = numeric_df[(numeric_df[col] < lower_bound) | (numeric_df[col] > upper_bound)]
            print(f"{col}: {len(outliers)}個の外れ値")

    def group_analysis(self, group_col, target_col):
        """グループ別分析"""
        print(f"\n=== {group_col}別{target_col}分析 ===")
        grouped = self.df.groupby(group_col)[target_col]

        print("基本統計量:")
        print(grouped.agg(['mean', 'median', 'std', 'min', 'max']))

        # ボックスプロット
        plt.figure(figsize=(10, 6))
        self.df.boxplot(column=target_col, by=group_col)
        plt.title(f'{group_col}別{target_col}の分布')
        plt.suptitle('')  # 自動タイトルを削除
        plt.tight_layout()
        plt.show()

    def time_series_analysis(self):
        """時系列分析の例"""
        print("\n=== 時系列分析例 ===")

        # サンプルの時系列データ作成
        dates = pd.date_range('2023-01-01', periods=100, freq='D')
        ts_data = pd.DataFrame({
            'date': dates,
            'value': np.cumsum(np.random.randn(100)) + 100,
            'volume': np.random.randint(100, 1000, 100)
        })
        ts_data.set_index('date', inplace=True)

        print("時系列データ基本情報:")
        print(ts_data.head())
        print(f"\nデータ範囲: {ts_data.index.min()} 〜 {ts_data.index.max()}")

        # 時系列プロット
        plt.figure(figsize=(12, 6))
        plt.plot(ts_data.index, ts_data['value'])
        plt.title('時系列データ')
        plt.xlabel('日付')
        plt.ylabel('値')
        plt.grid(True, alpha=0.3)
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()

def data_analysis_demo():
    """データ分析デモンストレーション"""
    analyzer = DataAnalyzer()

    # サンプルデータ読み込み
    df = analyzer.load_sample_data()

    # 各種分析の実行
    analyzer.basic_info()
    analyzer.statistical_summary()
    analyzer.missing_values_analysis()
    analyzer.correlation_analysis()
    analyzer.outlier_detection()
    analyzer.group_analysis('species', 'sepal length (cm)')
    analyzer.time_series_analysis()

# テスト実行
data_analysis_demo()

35. 正規表現の活用

import re
import pandas as pd

class TextProcessor:
    """テキスト処理クラス"""

    def __init__(self):
        # 一般的なパターンの定義
        self.patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            'phone_jp': r'\b0\d{1,4}-\d{1,4}-\d{4}\b',
            'url': r'https?://[^\s]+',
            'ip_address': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
            'date_ymd': r'\b\d{4}[-/]\d{1,2}[-/]\d{1,2}\b',
            'time': r'\b\d{1,2}:\d{2}(?::\d{2})?\b',
            'hashtag': r'#\w+',
            'mention': r'@\w+'
        }

    def find_pattern(self, text, pattern_name):
        """特定のパターンを検索"""
        if pattern_name not in self.patterns:
            raise ValueError(f"未知のパターン: {pattern_name}")

        pattern = self.patterns[pattern_name]
        return re.findall(pattern, text)

    def extract_emails(self, text):
        """メールアドレスを抽出"""
        return self.find_pattern(text, 'email')

    def extract_phones(self, text):
        """電話番号を抽出"""
        return self.find_pattern(text, 'phone_jp')

    def extract_urls(self, text):
        """URLを抽出"""
        return self.find_pattern(text, 'url')

    def validate_email(self, email):
        """メールアドレスの検証"""
        pattern = self.patterns['email']
        return bool(re.fullmatch(pattern, email))

    def validate_phone(self, phone):
        """電話番号の検証"""
        pattern = self.patterns['phone_jp']
        return bool(re.fullmatch(pattern, phone))

    def clean_text(self, text, remove_patterns=None):
        """テキストのクリーニング"""
        if remove_patterns is None:
            remove_patterns = ['url', 'hashtag', 'mention']

        cleaned_text = text
        for pattern_name in remove_patterns:
            pattern = self.patterns.get(pattern_name, pattern_name)
            cleaned_text = re.sub(pattern, '', cleaned_text)

        # 余分な空白を削除
        cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
        return cleaned_text

    def extract_named_entities(self, text):
        """簡単な固有表現抽出(名前、組織など)"""
        entities = {}

        # 日本語の名前パターン(簡易版)
        name_pattern = r'[A-Za-z][a-z]+\s[A-Za-z][a-z]+|[々〇〻\u3400-\u9FFF\uF900-\uFAFF]{2,4}'
        entities['names'] = re.findall(name_pattern, text)

        # 組織名(大文字の単語の連続)
        org_pattern = r'[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\s+(Company|Corp|Inc|Ltd)'
        entities['organizations'] = re.findall(org_pattern, text)

        return entities

    def advanced_search(self, text, custom_pattern, flags=0):
        """高度な検索機能"""
        try:
            matches = re.finditer(custom_pattern, text, flags)
            results = []
            for match in matches:
                results.append({
                    'match': match.group(),
                    'start': match.start(),
                    'end': match.end(),
                    'groups': match.groups()
                })
            return results
        except re.error as e:
            print(f"正規表現エラー: {e}")
            return []

    def replace_pattern(self, text, pattern, replacement, count=0):
        """パターン置換"""
        return re.sub(pattern, replacement, text, count=count)

def regex_demo():
    """正規表現デモンストレーション"""
    processor = TextProcessor()

    # テスト用テキスト
    sample_text = """
    連絡先情報:
    メール: tanaka.taro@example.com, yamada.hanako@company.co.jp
    電話: 03-1234-5678, 090-1234-5678
    Web: https://www.example.com, http://test-site.org
    IP: 192.168.1.1, 10.0.0.1
    日付: 2024-01-15, 2024/12/25
    時間: 14:30, 09:00:00
    SNS: #python #programming @username
    住所: 〒100-0001 東京都千代田区千代田1-1-1
    """

    print("=== 正規表現デモ ===")

    # 1. メールアドレス抽出
    print("1. メールアドレス:")
    emails = processor.extract_emails(sample_text)
    for email in emails:
        print(f"  - {email} (有効: {processor.validate_email(email)})")

    # 2. 電話番号抽出
    print("\n2. 電話番号:")
    phones = processor.extract_phones(sample_text)
    for phone in phones:
        print(f"  - {phone} (有効: {processor.validate_phone(phone)})")

    # 3. URL抽出
    print("\n3. URL:")
    urls = processor.extract_urls(sample_text)
    for url in urls:
        print(f"  - {url}")

    # 4. テキストクリーニング
    print("\n4. クリーニング後のテキスト:")
    cleaned = processor.clean_text(sample_text)
    print(cleaned)

    # 5. 高度な検索
    print("\n5. 高度な検索(日付の検出):")
    date_pattern = r'\d{4}[-/]\d{1,2}[-/]\d{1,2}'
    date_matches = processor.advanced_search(sample_text, date_pattern)
    for match in date_matches:
        print(f"  - {match['match']} (位置: {match['start']}-{match['end']})")

    # 6. パターン置換
    print("\n6. メールアドレスのマスキング:")
    masked_text = processor.replace_pattern(
        sample_text, 
        processor.patterns['email'], 
        '***@***.***'
    )
    print(masked_text)

    # 7. 固有表現抽出
    print("\n7. 固有表現抽出:")
    entity_text = "田中太郎さんはABC Companyで働いています。山田花子さんはXYZ Corpの社員です。"
    entities = processor.extract_named_entities(entity_text)
    for entity_type, values in entities.items():
        print(f"  {entity_type}: {values}")

def password_validator():
    """パスワードバリデーター"""
    def validate_password(password):
        patterns = {
            'length': r'^.{8,}$',
            'uppercase': r'[A-Z]',
            'lowercase': r'[a-z]',
            'digit': r'\d',
            'special': r'[!@#$%^&*(),.?":{}|<>]'
        }

        results = {}
        for rule, pattern in patterns.items():
            results[rule] = bool(re.search(pattern, password))

        return results

    test_passwords = [
        "weak",
        "Medium123",
        "Strong@123",
        "NoSpecial123",
        "nouppercase123!",
        "Perfect@Password123"
    ]

    print("\n=== パスワード強度チェック ===")
    for pwd in test_passwords:
        validation = validate_password(pwd)
        score = sum(validation.values())
        status = "強" if score >= 4 else "中" if score >= 3 else "弱"
        print(f"{pwd}: {status} (スコア: {score}/5)")
        for rule, passed in validation.items():
            print(f"  {rule}: {'✓' if passed else '✗'}")

# テスト実行
regex_demo()
password_validator()

36. マルチスレッド処理

import threading
import time
import queue
import concurrent.futures
import requests
from typing import List, Callable

class ThreadManager:
    """スレッド管理クラス"""

    def __init__(self, max_workers=5):
        self.max_workers = max_workers
        self.results = queue.Queue()
        self.lock = threading.Lock()

    def simple_threads_demo(self):
        """簡単なスレッドデモ"""
        def worker(thread_id, duration):
            print(f"スレッド {thread_id} 開始")
            time.sleep(duration)
            print(f"スレッド {thread_id} 終了")
            return f"スレッド {thread_id} の結果"

        threads = []
        start_time = time.time()

        # スレッドの作成と開始
        for i in range(3):
            thread = threading.Thread(
                target=worker,
                args=(i, 2 - i * 0.5)  # 各スレッドで異なる待機時間
            )
            threads.append(thread)
            thread.start()

        # すべてのスレッドの終了を待機
        for thread in threads:
            thread.join()

        end_time = time.time()
        print(f"総実行時間: {end_time - start_time:.2f}秒")

    def synchronized_counter(self):
        """同期されたカウンター"""
        class Counter:
            def __init__(self):
                self.value = 0
                self._lock = threading.Lock()

            def increment(self):
                with self._lock:
                    current = self.value
                    time.sleep(0.001)  # 競合状態をシミュレート
                    self.value = current + 1

        counter = Counter()

        def increment_worker():
            for _ in range(100):
                counter.increment()

        threads = []
        for _ in range(10):
            thread = threading.Thread(target=increment_worker)
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

        print(f"最終カウント: {counter.value} (期待値: 1000)")

    def producer_consumer_pattern(self):
        """プロデューサー・コンシューマーパターン"""
        def producer(q, items):
            for item in items:
                print(f"生産: {item}")
                q.put(item)
                time.sleep(0.1)
            q.put(None)  # 終了信号

        def consumer(q, name):
            while True:
                item = q.get()
                if item is None:
                    q.put(None)  # 他のコンシューマーのために戻す
                    break
                print(f"消費 {name}: {item}")
                time.sleep(0.2)
                q.task_done()

        q = queue.Queue()
        items = [f"アイテム{i}" for i in range(10)]

        # プロデューサースレッド
        producer_thread = threading.Thread(target=producer, args=(q, items))

        # コンシューマースレッド
        consumer_threads = []
        for i in range(2):
            thread = threading.Thread(target=consumer, args=(q, f"コンシューマー{i}"))
            consumer_threads.append(thread)

        # スレッド開始
        producer_thread.start()
        for thread in consumer_threads:
            thread.start()

        # 終了待機
        producer_thread.join()
        for thread in consumer_threads:
            thread.join()

    def thread_pool_executor_demo(self):
        """ThreadPoolExecutorのデモ"""
        def download_url(url):
            try:
                response = requests.get(url, timeout=5)
                return f"{url}: {len(response.content)} bytes"
            except Exception as e:
                return f"{url}: エラー - {e}"

        urls = [
            'https://httpbin.org/delay/1',
            'https://httpbin.org/delay/2',
            'https://httpbin.org/delay/1',
            'https://httpbin.org/status/404',
            'https://invalid-url-test.com'
        ]

        print("ThreadPoolExecutorによる並列ダウンロード:")
        start_time = time.time()

        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            future_to_url = {executor.submit(download_url, url): url for url in urls}

            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    print(f"  {result}")
                except Exception as e:
                    print(f"  {url} で例外: {e}")

        end_time = time.time()
        print(f"総実行時間: {end_time - start_time:.2f}秒")

    def monitoring_threads(self):
        """スレッドの監視と管理"""
        class Monitor:
            def __init__(self):
                self.threads = {}
                self.lock = threading.Lock()

            def add_thread(self, name, target):
                with self.lock:
                    thread = threading.Thread(target=target, name=name)
                    self.threads[name] = {
                        'thread': thread,
                        'status': 'created',
                        'start_time': None
                    }
                    return thread

            def start_thread(self, name):
                with self.lock:
                    if name in self.threads:
                        self.threads[name]['status'] = 'running'
                        self.threads[name]['start_time'] = time.time()
                        self.threads[name]['thread'].start()

            def get_status(self):
                with self.lock:
                    status = {}
                    for name, info in self.threads.items():
                        thread = info['thread']
                        if thread.is_alive():
                            info['status'] = 'alive'
                        else:
                            info['status'] = 'dead'
                        status[name] = info.copy()
                    return status

        def long_running_task(name, duration):
            print(f"タスク {name} 開始")
            for i in range(duration):
                print(f"タスク {name}: {i+1}/{duration}")
                time.sleep(1)
            print(f"タスク {name} 終了")

        monitor = Monitor()

        # スレッドの作成と開始
        for i in range(3):
            name = f"worker-{i}"
            thread = monitor.add_thread(
                name, 
                lambda i=i: long_running_task(name, 3 + i)
            )
            monitor.start_thread(name)

        # 監視ループ
        for _ in range(10):
            status = monitor.get_status()
            print("\nスレッド状態:")
            for name, info in status.items():
                print(f"  {name}: {info['status']}")
            time.sleep(1)

            # すべてのスレッドが終了したらループを抜ける
            if all(info['status'] == 'dead' for info in status.values()):
                break

def threading_demo():
    """マルチスレッドデモンストレーション"""
    manager = ThreadManager()

    print("=== 1. 簡単なスレッドデモ ===")
    manager.simple_threads_demo()

    print("\n=== 2. 同期されたカウンター ===")
    manager.synchronized_counter()

    print("\n=== 3. プロデューサー・コンシューマーパターン ===")
    manager.producer_consumer_pattern()

    print("\n=== 4. ThreadPoolExecutorデモ ===")
    manager.thread_pool_executor_demo()

    print("\n=== 5. スレッド監視 ===")
    manager.monitoring_threads()

# テスト実行
threading_demo()

37. イテレータの実装

class RangeIterator:
    """範囲イテレータ"""

    def __init__(self, start, end, step=1):
        self.current = start
        self.end = end
        self.step = step

    def __iter__(self):
        return self

    def __next__(self):
        if (self.step > 0 and self.current >= self.end) or \
           (self.step < 0 and self.current <= self.end):
            raise StopIteration

        current = self.current
        self.current += self.step
        return current

class FibonacciIterator:
    """フィボナッチ数列イテレータ"""

    def __init__(self, limit=None):
        self.limit = limit
        self.a, self.b = 0, 1
        self.count = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.limit is not None and self.count >= self.limit:
            raise StopIteration

        if self.count == 0:
            self.count += 1
            return self.a
        elif self.count == 1:
            self.count += 1
            return self.b
        else:
            self.a, self.b = self.b, self.a + self.b
            self.count += 1
            return self.b

class FileLineIterator:
    """ファイル行イテレータ"""

    def __init__(self, filename):
        self.filename = filename
        self.file = None

    def __iter__(self):
        self.file = open(self.filename, 'r', encoding='utf-8')
        return self

    def __next__(self):
        line = self.file.readline()
        if not line:
            self.file.close()
            raise StopIteration
        return line.strip()

    def __del__(self):
        if self.file and not self.file.closed:
            self.file.close()

class BatchIterator:
    """バッチ処理イテレータ"""

    def __init__(self, data, batch_size):
        self.data = data
        self.batch_size = batch_size
        self.index = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.index >= len(self.data):
            raise StopIteration

        batch = self.data[self.index:self.index + self.batch_size]
        self.index += self.batch_size
        return batch

class TreeIterator:
    """ツリー構造イテレータ"""

    def __init__(self, tree, traversal='inorder'):
        self.tree = tree
        self.traversal = traversal
        self.stack = []
        self.current = tree
        self._setup_traversal()

    def _setup_traversal(self):
        """走査方法の設定"""
        if self.traversal == 'preorder' and self.current:
            self.stack.append(self.current)
        elif self.traversal == 'inorder':
            while self.current:
                self.stack.append(self.current)
                self.current = self.current.get('left', None)

    def __iter__(self):
        return self

    def __next__(self):
        if not self.stack:
            raise StopIteration

        if self.traversal == 'preorder':
            node = self.stack.pop()
            if node.get('right'):
                self.stack.append(node['right'])
            if node.get('left'):
                self.stack.append(node['left'])
            return node['value']

        elif self.traversal == 'inorder':
            node = self.stack.pop()
            current = node.get('right')
            while current:
                self.stack.append(current)
                current = current.get('left')
            return node['value']

        elif self.traversal == 'postorder':
            # 実装は少し複雑なので省略
            pass

def iterator_demo():
    """イテレータデモンストレーション"""

    print("=== 1. RangeIterator ===")
    print("1から10まで2ステップ:")
    for i in RangeIterator(1, 10, 2):
        print(i, end=' ')
    print()

    print("\n=== 2. FibonacciIterator ===")
    print("フィボナッチ数列(最初の10個):")
    fib_iter = FibonacciIterator(10)
    for num in fib_iter:
        print(num, end=' ')
    print()

    print("\n=== 3. BatchIterator ===")
    data = list(range(20))
    print("データをバッチ処理:")
    batch_iter = BatchIterator(data, 5)
    for i, batch in enumerate(batch_iter):
        print(f"バッチ {i+1}: {batch}")

    print("\n=== 4. カスタムイテレータの実装例 ===")

    class Countdown:
        """カウントダウンイテレータ"""
        def __init__(self, start):
            self.start = start

        def __iter__(self):
            self.n = self.start
            return self

        def __next__(self):
            if self.n < 0:
                raise StopIteration
            current = self.n
            self.n -= 1
            return current

    print("カウントダウン:")
    for i in Countdown(5):
        print(i, end=' ')
    print()

    print("\n=== 5. ジェネレータとしてのイテレータ ===")

    def prime_generator(limit):
        """素数ジェネレータ"""
        def is_prime(n):
            if n < 2:
                return False
            for i in range(2, int(n**0.5) + 1):
                if n % i == 0:
                    return False
            return True

        for num in range(2, limit + 1):
            if is_prime(num):
                yield num

    print("100以下の素数:")
    for prime in prime_generator(100):
        print(prime, end=' ')
    print()

    print("\n=== 6. itertoolsモジュールの使用例 ===")
    import itertools

    # 無限イテレータ
    print("無限カウント:")
    counter = itertools.count(10, 2)
    for i, num in enumerate(counter):
        if i >= 5:
            break
        print(num, end=' ')
    print()

    # 循環イテレータ
    print("循環イテレータ:")
    cycle_iter = itertools.cycle(['A', 'B', 'C'])
    for i, item in enumerate(cycle_iter):
        if i >= 8:
            break
        print(item, end=' ')
    print()

    # 組み合わせイテレータ
    print("組み合わせ:")
    combinations = itertools.combinations(['A', 'B', 'C', 'D'], 2)
    for combo in combinations:
        print(combo, end=' ')
    print()

def advanced_iterator_example():
    """高度なイテレータの例"""

    class PaginatedAPIIterator:
        """ページネーションAPIのイテレータ"""

        def __init__(self, api_client, endpoint, page_size=10):
            self.api_client = api_client
            self.endpoint = endpoint
            self.page_size = page_size
            self.current_page = 0
            self.current_data = []
            self.has_more = True

        def __iter__(self):
            return self

        def __next__(self):
            if not self.current_data and self.has_more:
                self._fetch_next_page()

            if not self.current_data:
                raise StopIteration

            return self.current_data.pop(0)

        def _fetch_next_page(self):
            """次のページを取得"""
            self.current_page += 1
            response = self.api_client.get(
                self.endpoint,
                params={'page': self.current_page, 'size': self.page_size}
            )

            if response and 'data' in response:
                self.current_data = response['data']
                self.has_more = response.get('has_more', False)
            else:
                self.has_more = False
                self.current_data = []

# テスト実行
iterator_demo()

38. プロパティの使用

class BankAccount:
    """銀行口座クラス(プロパティ使用)"""

    def __init__(self, account_holder, initial_balance=0):
        self._account_holder = account_holder
        self._balance = initial_balance
        self._transaction_history = []
        self._is_active = True

    @property
    def account_holder(self):
        """口座名義人(読み取り専用)"""
        return self._account_holder

    @property
    def balance(self):
        """残高(読み取り専用)"""
        return self._balance

    @property
    def is_active(self):
        """口座状態"""
        return self._is_active

    @is_active.setter
    def is_active(self, value):
        """口座状態の設定"""
        if not isinstance(value, bool):
            raise ValueError("is_activeはブーリアン値である必要があります")
        self._is_active = value
        self._transaction_history.append(
            f"口座状態変更: {'有効' if value else '無効'}"
        )

    @property
    def transaction_count(self):
        """取引回数(計算プロパティ)"""
        return len(self._transaction_history)

    @property
    def recent_transactions(self):
        """最近の取引(最後の5件)"""
        return self._transaction_history[-5:]

    def deposit(self, amount):
        """入金"""
        if not self._is_active:
            raise ValueError("口座が無効です")

        if amount <= 0:
            raise ValueError("入金額は正の数である必要があります")

        self._balance += amount
        self._transaction_history.append(f"入金: +{amount}円")
        return self._balance

    def withdraw(self, amount):
        """出金"""
        if not self._is_active:
            raise ValueError("口座が無効です")

        if amount <= 0:
            raise ValueError("出金額は正の数である必要があります")

        if amount > self._balance:
            raise ValueError("残高不足")

        self._balance -= amount
        self._transaction_history.append(f"出金: -{amount}円")
        return self._balance

class Temperature:
    """温度クラス(摂氏と華氏の変換)"""

    def __init__(self, celsius=0):
        self._celsius = celsius

    @property
    def celsius(self):
        """摂氏温度"""
        return self._celsius

    @celsius.setter
    def celsius(self, value):
        """摂氏温度の設定"""
        self._celsius = value

    @property
    def fahrenheit(self):
        """華氏温度(計算プロパティ)"""
        return (self._celsius * 9/5) + 32

    @fahrenheit.setter
    def fahrenheit(self, value):
        """華氏温度から摂氏を設定"""
        self._celsius = (value - 32) * 5/9

    @property
    def kelvin(self):
        """ケルビン温度(計算プロパティ)"""
        return self._celsius + 273.15

    @kelvin.setter
    def kelvin(self, value):
        """ケルビン温度から摂氏を設定"""
        self._celsius = value - 273.15

class Person:
    """人物クラス(バリデーション付きプロパティ)"""

    def __init__(self, name, age):
        self.name = name
        self.age = age

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        if not isinstance(value, str):
            raise TypeError("名前は文字列である必要があります")
        if len(value.strip()) == 0:
            raise ValueError("名前は空にできません")
        if len(value) > 50:
            raise ValueError("名前は50文字以内である必要があります")
        self._name = value.strip()

    @property
    def age(self):
        return self._age

    @age.setter
    def age(self, value):
        if not isinstance(value, int):
            raise TypeError("年齢は整数である必要があります")
        if value < 0:
            raise ValueError("年齢は0以上である必要があります")
        if value > 150:
            raise ValueError("年齢は150以下である必要があります")
        self._age = value

    @property
    def is_adult(self):
        """成人かどうか(計算プロパティ)"""
        return self._age >= 20

    @property
    def generation(self):
        """世代の分類(計算プロパティ)"""
        if self._age >= 75:
            return "団塊の世代"
        elif self._age >= 60:
            return "バブル世代"
        elif self._age >= 45:
            return "団塊ジュニア"
        elif self._age >= 30:
            return "ゆとり世代"
        else:
            return "Z世代"

class Rectangle:
    """四角形クラス(依存プロパティ)"""

    def __init__(self, width=0, height=0):
        self.width = width
        self.height = height

    @property
    def width(self):
        return self._width

    @width.setter
    def width(self, value):
        if value < 0:
            raise ValueError("幅は0以上である必要があります")
        self._width = value

    @property
    def height(self):
        return self._height

    @height.setter
    def height(self, value):
        if value < 0:
            raise ValueError("高さは0以上である必要があります")
        self._height = value

    @property
    def area(self):
        """面積(計算プロパティ)"""
        return self._width * self._height

    @property
    def perimeter(self):
        """周長(計算プロパティ)"""
        return 2 * (self._width + self._height)

    @property
    def is_square(self):
        """正方形かどうか(計算プロパティ)"""
        return self._width == self._height

    @classmethod
    def from_area(cls, area, aspect_ratio=1):
        """面積とアスペクト比から四角形を作成"""
        width = (area * aspect_ratio) ** 0.5
        height = area / width
        return cls(width, height)

def property_demo():
    """プロパティデモンストレーション"""

    print("=== 1. BankAccountクラス ===")
    account = BankAccount("山田太郎", 1000)
    print(f"口座名義: {account.account_holder}")
    print(f"初期残高: {account.balance}円")

    account.deposit(500)
    account.withdraw(200)
    print(f"現在残高: {account.balance}円")
    print(f"取引回数: {account.transaction_count}")
    print(f"最近の取引: {account.recent_transactions}")

    # 口座状態の変更
    account.is_active = False
    print(f"口座状態: {'有効' if account.is_active else '無効'}")

    print("\n=== 2. Temperatureクラス ===")
    temp = Temperature(25)
    print(f"摂氏: {temp.celsius}°C")
    print(f"華氏: {temp.fahrenheit}°F")
    print(f"ケルビン: {temp.kelvin}K")

    # 華氏から設定
    temp.fahrenheit = 100
    print(f"\n華氏100°F → 摂氏: {temp.celsius:.1f}°C")

    # ケルビンから設定
    temp.kelvin = 300
    print(f"ケルビン300K → 摂氏: {temp.celsius:.1f}°C")

    print("\n=== 3. Personクラス ===")
    person = Person("田中花子", 25)
    print(f"名前: {person.name}")
    print(f"年齢: {person.age}")
    print(f"成人: {person.is_adult}")
    print(f"世代: {person.generation}")

    # バリデーションのテスト
    try:
        person.age = -5
    except ValueError as e:
        print(f"年齢バリデーション: {e}")

    try:
        person.name = ""
    except ValueError as e:
        print(f"名前バリデーション: {e}")

    print("\n=== 4. Rectangleクラス ===")
    rect = Rectangle(5, 3)
    print(f"幅: {rect.width}, 高さ: {rect.height}")
    print(f"面積: {rect.area}")
    print(f"周長: {rect.perimeter}")
    print(f"正方形: {rect.is_square}")

    # 面積から作成
    rect2 = Rectangle.from_area(100, aspect_ratio=2)
    print(f"\n面積100、アスペクト比2の四角形:")
    print(f"幅: {rect2.width:.1f}, 高さ: {rect2.height:.1f}")
    print(f"面積: {rect2.area:.1f}")

def advanced_property_example():
    """高度なプロパティの例"""

    class CachedProperty:
        """キャッシュ付きプロパティデコレータ"""

        def __init__(self, func):
            self.func = func
            self.name = func.__name__

        def __get__(self, instance, owner):
            if instance is None:
                return self

            # キャッシュがなければ計算
            if not hasattr(instance, '_cache'):
                instance._cache = {}

            if self.name not in instance._cache:
                instance._cache[self.name] = self.func(instance)

            return instance._cache[self.name]

    class DataProcessor:
        """データプロセッサ(キャッシュ付きプロパティ使用)"""

        def __init__(self, data):
            self.data = data
            self._cache = {}

        @CachedProperty
        def sorted_data(self):
            """ソートされたデータ(重い計算)"""
            print("ソート実行中...")
            return sorted(self.data)

        @CachedProperty
        def statistics(self):
            """統計情報(重い計算)"""
            print("統計計算中...")
            return {
                'mean': sum(self.data) / len(self.data),
                'max': max(self.data),
                'min': min(self.data),
                'count': len(self.data)
            }

    print("\n=== 5. キャッシュ付きプロパティ ===")
    processor = DataProcessor([5, 2, 8, 1, 9, 3])

    print("初回アクセス(計算実行):")
    print(f"ソートデータ: {processor.sorted_data}")
    print(f"統計: {processor.statistics}")

    print("\n2回目アクセス(キャッシュから):")
    print(f"ソートデータ: {processor.sorted_data}")
    print(f"統計: {processor.statistics}")

# テスト実行
property_demo()
advanced_property_example()

39. モックオブジェクトの使用

import unittest
from unittest.mock import Mock, MagicMock, patch, PropertyMock
import requests

class PaymentProcessor:
    """支払い処理クラス"""

    def __init__(self, payment_gateway):
        self.payment_gateway = payment_gateway

    def process_payment(self, amount, card_number):
        """支払いを処理する"""
        if amount <= 0:
            raise ValueError("金額は正の数である必要があります")

        if not self._validate_card(card_number):
            raise ValueError("無効なカード番号です")

        return self.payment_gateway.charge(amount, card_number)

    def _validate_card(self, card_number):
        """カード番号の検証(簡易版)"""
        return len(str(card_number)) == 16 and str(card_number).isdigit()

class EmailService:
    """メールサービスクラス"""

    def send_email(self, to_address, subject, body):
        """メールを送信する"""
        # 実際の実装ではSMTPサーバーなどと連携
        print(f"メール送信: {to_address} - {subject}")
        return True

class UserService:
    """ユーザーサービスクラス"""

    def __init__(self, email_service):
        self.email_service = email_service

    def register_user(self, username, email):
        """ユーザーを登録する"""
        if not username or not email:
            raise ValueError("ユーザー名とメールアドレスは必須です")

        # ユーザー登録処理...
        user_id = hash(username + email) % 10000

        # ウェルカムメール送信
        self.email_service.send_email(
            email,
            "アカウント登録完了",
            f"{username}さん、登録ありがとうございます!"
        )

        return user_id

class TestPaymentProcessor(unittest.TestCase):
    """PaymentProcessorのテストクラス"""

    def setUp(self):
        """各テストの前処理"""
        self.mock_gateway = Mock()
        self.processor = PaymentProcessor(self.mock_gateway)

    def test_successful_payment(self):
        """成功する支払い処理のテスト"""
        # モックの設定
        self.mock_gateway.charge.return_value = {
            'success': True,
            'transaction_id': '12345',
            'message': '支払い成功'
        }

        # テスト実行
        result = self.processor.process_payment(1000, '1234567812345678')

        # 検証
        self.assertTrue(result['success'])
        self.mock_gateway.charge.assert_called_once_with(1000, '1234567812345678')

    def test_invalid_amount(self):
        """無効な金額のテスト"""
        with self.assertRaises(ValueError):
            self.processor.process_payment(0, '1234567812345678')

        # モックが呼ばれていないことを確認
        self.mock_gateway.charge.assert_not_called()

    def test_invalid_card_number(self):
        """無効なカード番号のテスト"""
        with self.assertRaises(ValueError):
            self.processor.process_payment(1000, '1234')

        self.mock_gateway.charge.assert_not_called()

    def test_gateway_failure(self):
        """支払いゲートウェイの失敗テスト"""
        self.mock_gateway.charge.side_effect = Exception("ネットワークエラー")

        with self.assertRaises(Exception):
            self.processor.process_payment(1000, '1234567812345678')

class TestUserService(unittest.TestCase):
    """UserServiceのテストクラス"""

    def test_successful_registration(self):
        """成功するユーザー登録のテスト"""
        # メールサービスのモック作成
        mock_email_service = MagicMock()
        mock_email_service.send_email.return_value = True

        user_service = UserService(mock_email_service)

        # テスト実行
        user_id = user_service.register_user("testuser", "test@example.com")

        # 検証
        self.assertIsInstance(user_id, int)
        mock_email_service.send_email.assert_called_once_with(
            "test@example.com",
            "アカウント登録完了",
            "testuserさん、登録ありがとうございます!"
        )

    def test_registration_with_invalid_data(self):
        """無効なデータでの登録テスト"""
        mock_email_service = MagicMock()
        user_service = UserService(mock_email_service)

        with self.assertRaises(ValueError):
            user_service.register_user("", "test@example.com")

        with self.assertRaises(ValueError):
            user_service.register_user("testuser", "")

        # メール送信が呼ばれていないことを確認
        mock_email_service.send_email.assert_not_called()

class TestWithPatch(unittest.TestCase):
    """patchデコレータを使用したテスト"""

    @patch('requests.get')
    def test_api_call_success(self, mock_get):
        """API呼び出し成功のテスト"""
        # モックの設定
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {'data': 'test'}
        mock_get.return_value = mock_response

        # テスト実行
        response = requests.get('https://api.example.com/data')

        # 検証
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.json(), {'data': 'test'})
        mock_get.assert_called_once_with('https://api.example.com/data')

    @patch('requests.get')
    def test_api_call_failure(self, mock_get):
        """API呼び出し失敗のテスト"""
        mock_get.side_effect = requests.exceptions.ConnectionError("接続エラー")

        with self.assertRaises(requests.exceptions.ConnectionError):
            requests.get('https://api.example.com/data')

class AdvancedMockExamples:
    """高度なモックの使用例"""

    def test_property_mock(self):
        """プロパティのモック"""
        class MyClass:
            @property
            def my_property(self):
                return "original"

        obj = MyClass()

        with patch.object(MyClass, 'my_property', new_callable=PropertyMock) as mock_prop:
            mock_prop.return_value = "mocked"
            self.assertEqual(obj.my_property, "mocked")

    def test_multiple_calls(self):
        """複数回の呼び出しテスト"""
        mock_obj = Mock()

        # 異なる呼び出しで異なる値を返す
        mock_obj.method.side_effect = [10, 20, 30]

        self.assertEqual(mock_obj.method(), 10)
        self.assertEqual(mock_obj.method(), 20)
        self.assertEqual(mock_obj.method(), 30)

    def test_call_args(self):
        """呼び出し引数の検証"""
        mock_obj = Mock()

        mock_obj.method(1, 2, 3, key='value')

        # 呼び出しの検証
        mock_obj.method.assert_called_once()
        mock_obj.method.assert_called_with(1, 2, 3, key='value')

        # 呼び出し履歴
        print("呼び出し履歴:", mock_obj.method.call_args_list)

def mock_demo():
    """モックデモンストレーション"""

    print("=== 1. 基本的なモックの使用 ===")

    # 簡単なモックの作成
    mock_obj = Mock()
    mock_obj.method.return_value = "mock response"

    print(f"モックメソッド: {mock_obj.method()}")
    print(f"呼び出し回数: {mock_obj.method.call_count}")

    # 副作用の設定
    mock_obj.method_with_side_effect = Mock(side_effect=[1, 2, 3])
    print(f"副作用1: {mock_obj.method_with_side_effect()}")
    print(f"副作用2: {mock_obj.method_with_side_effect()}")

    print("\n=== 2. MagicMockの使用 ===")

    magic_mock = MagicMock()
    magic_mock.__len__.return_value = 5
    magic_mock.__getitem__.return_value = "item"

    print(f"長さ: {len(magic_mock)}")
    print(f"インデックスアクセス: {magic_mock[0]}")

    print("\n=== 3. テストの実行 ===")

    # テストスイートの作成と実行
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromTestCase(TestPaymentProcessor)
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(suite)

# テスト実行
mock_demo()

# 個別のテスト実行(必要に応じてコメントアウトを解除)
if __name__ == '__main__':
    unittest.main()

40. パッケージ作成

"""
mycalculatorパッケージの例
ディレクトリ構造:
mycalculator/
├── __init__.py
├── basic_operations.py
├── advanced_operations.py
├── constants.py
└── utils.py
"""

# mycalculator/__init__.py
"""
mycalculatorパッケージ
"""

__version__ = "1.0.0"
__author__ = "Your Name"

from .basic_operations import add, subtract, multiply, divide
from .advanced_operations import power, sqrt, factorial
from .constants import PI, E

__all__ = [
    'add', 'subtract', 'multiply', 'divide',
    'power', 'sqrt', 'factorial',
    'PI', 'E'
]

# mycalculator/basic_operations.py
"""
基本的な算術演算
"""

def add(a, b):
    """2つの数値を加算する"""
    return a + b

def subtract(a, b):
    """2つの数値を減算する"""
    return a - b

def multiply(a, b):
    """2つの数値を乗算する"""
    return a * b

def divide(a, b):
    """2つの数値を除算する"""
    if b == 0:
        raise ValueError("0で除算することはできません")
    return a / b

# mycalculator/advanced_operations.py
"""
高度な数学演算
"""

import math

def power(base, exponent):
    """べき乗を計算する"""
    return base ** exponent

def sqrt(number):
    """平方根を計算する"""
    if number < 0:
        raise ValueError("負の数の平方根は計算できません")
    return math.sqrt(number)

def factorial(n):
    """階乗を計算する"""
    if not isinstance(n, int) or n < 0:
        raise ValueError("階乗は非負の整数でのみ計算できます")
    return math.factorial(n)

# mycalculator/constants.py
"""
数学定数
"""

PI = 3.141592653589793
E = 2.718281828459045

# mycalculator/utils.py
"""
ユーティリティ関数
"""

def is_number(value):
    """値が数値かどうかを判定する"""
    return isinstance(value, (int, float, complex))

def format_result(result, decimals=2):
    """結果を指定された小数点以下桁数でフォーマットする"""
    if isinstance(result, (int, float)):
        return round(result, decimals)
    return result

# setup.py
"""
パッケージ設定ファイル
"""

from setuptools import setup, find_packages

setup(
    name="mycalculator",
    version="1.0.0",
    description="A simple calculator package",
    author="Your Name",
    author_email="your.email@example.com",
    packages=find_packages(),
    install_requires=[
        # 依存パッケージ(必要な場合)
    ],
    classifiers=[
        "Development Status :: 4 - Beta",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: MIT License",
        "Programming Language :: Python :: 3",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
    ],
    python_requires=">=3.8",
)

def package_usage_example():
    """パッケージ使用例"""

    # パッケージの使用(実際には別ファイルでインポート)
    print("=== パッケージ使用例 ===")

    # 基本的な演算
    print("基本的な演算:")
    print(f"5 + 3 = {5 + 3}")
    print(f"10 - 4 = {10 - 4}")
    print(f"6 * 7 = {6 * 7}")
    print(f"15 / 3 = {15 / 3}")

    # 高度な演算
    print("\n高度な演算:")
    print(f"2^8 = {2**8}")
    print(f"√25 = {25**0.5}")

    # 定数
    print("\n数学定数:")
    print(f"π ≈ {3.141592653589793}")
    print(f"e ≈ {2.718281828459045}")

def create_package_structure():
    """パッケージ構造の作成例"""
    import os
    import shutil

    package_structure = {
        'mycalculator': [
            '__init__.py',
            'basic_operations.py',
            'advanced_operations.py',
            'constants.py',
            'utils.py'
        ],
        'tests': [
            '__init__.py',
            'test_basic_operations.py',
            'test_advanced_operations.py'
        ],
        'docs': [
            'README.md',
            'LICENSE'
        ],
        'examples': [
            'basic_usage.py',
            'advanced_usage.py'
        ]
    }

    print("=== パッケージ構造 ===")
    for directory, files in package_structure.items():
        print(f"{directory}/")
        for file in files:
            print(f"  └── {file}")

def packaging_guidelines():
    """パッケージ作成ガイドライン"""

    guidelines = """
パッケージ作成のベストプラクティス:

1. 明確な構造:
   - 関連する機能ごとにモジュールを分割
   - __init__.pyで公開APIを定義
   - テストディレクトリを用意

2. ドキュメンテーション:
   - 各モジュール、クラス、関数にdocstringを記載
   - README.mdで使用方法を説明
   - 例となるコードを提供

3. 依存関係の管理:
   - requirements.txtまたはsetup.pyで依存関係を明記
   - バージョン指定を行う

4. テスト:
   - 単体テストを作成
   - カバレッジを測定
   - CI/CDパイプラインを設定

5. バージョン管理:
   - セマンティックバージョニングに従う
   - CHANGELOG.mdを維持

6. 配布:
   - PyPIへのアップロード
   - ホイールパッケージの作成
   - ソース配布の提供
"""
    print(guidelines)

# 実行例
package_usage_example()
print()
create_package_structure()
print()
packaging_guidelines()

上級問題(41-50)解答例

41. Webアプリケーションフレームワーク(Flask)

from flask import Flask, render_template, request, jsonify, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
import os

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///tasks.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

# データベースモデル
class Task(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    completed = db.Column(db.Boolean, default=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    due_date = db.Column(db.Date)

    def to_dict(self):
        return {
            'id': self.id,
            'title': self.title,
            'description': self.description,
            'completed': self.completed,
            'created_at': self.created_at.isoformat(),
            'due_date': self.due_date.isoformat() if self.due_date else None
        }

# ルートの定義
@app.route('/')
def index():
    """ホームページ"""
    return render_template('index.html')

@app.route('/tasks')
def tasks():
    """タスク一覧ページ"""
    tasks = Task.query.all()
    return render_template('tasks.html', tasks=tasks)

@app.route('/tasks/create', methods=['GET', 'POST'])
def create_task():
    """タスク作成"""
    if request.method == 'POST':
        title = request.form['title']
        description = request.form['description']
        due_date_str = request.form['due_date']

        due_date = datetime.strptime(due_date_str, '%Y-%m-%d').date() if due_date_str else None

        task = Task(title=title, description=description, due_date=due_date)
        db.session.add(task)
        db.session.commit()

        flash('タスクが作成されました!', 'success')
        return redirect(url_for('tasks'))

    return render_template('create_task.html')

@app.route('/tasks//edit', methods=['GET', 'POST'])
def edit_task(task_id):
    """タスク編集"""
    task = Task.query.get_or_404(task_id)

    if request.method == 'POST':
        task.title = request.form['title']
        task.description = request.form['description']
        due_date_str = request.form['due_date']
        task.due_date = datetime.strptime(due_date_str, '%Y-%m-%d').date() if due_date_str else None

        db.session.commit()
        flash('タスクが更新されました!', 'success')
        return redirect(url_for('tasks'))

    return render_template('edit_task.html', task=task)

@app.route('/tasks//delete', methods=['POST'])
def delete_task(task_id):
    """タスク削除"""
    task = Task.query.get_or_404(task_id)
    db.session.delete(task)
    db.session.commit()
    flash('タスクが削除されました!', 'success')
    return redirect(url_for('tasks'))

@app.route('/tasks//toggle', methods=['POST'])
def toggle_task(task_id):
    """タスク完了状態の切り替え"""
    task = Task.query.get_or_404(task_id)
    task.completed = not task.completed
    db.session.commit()
    return jsonify({'completed': task.completed})

# APIエンドポイント
@app.route('/api/tasks')
def api_tasks():
    """タスク一覧API"""
    tasks = Task.query.all()
    return jsonify([task.to_dict() for task in tasks])

@app.route('/api/tasks', methods=['POST'])
def api_create_task():
    """タスク作成API"""
    data = request.get_json()
    task = Task(
        title=data['title'],
        description=data.get('description', ''),
        due_date=datetime.strptime(data['due_date'], '%Y-%m-%d').date() if data.get('due_date') else None
    )
    db.session.add(task)
    db.session.commit()
    return jsonify(task.to_dict()), 201

@app.route('/api/tasks/')
def api_get_task(task_id):
    """タスク取得API"""
    task = Task.query.get_or_404(task_id)
    return jsonify(task.to_dict())

# エラーハンドラー
@app.errorhandler(404)
def not_found(error):
    if request.accept_mimetypes.accept_json and not request.accept_mimetypes.accept_html:
        return jsonify({'error': 'Not found'}), 404
    return render_template('404.html'), 404

@app.errorhandler(500)
def internal_error(error):
    if request.accept_mimetypes.accept_json and not request.accept_mimetypes.accept_html:
        return jsonify({'error': 'Internal server error'}), 500
    return render_template('500.html'), 500

# コンテキストプロセッサ
@app.context_processor
def inject_now():
    return {'now': datetime.utcnow()}

# テンプレートフィルター
@app.template_filter('format_date')
def format_date(value, format='%Y年%m月%d日'):
    if value is None:
        return ""
    return value.strftime(format)

def init_db():
    """データベースの初期化"""
    with app.app_context():
        db.create_all()

        # サンプルデータの追加
        if not Task.query.first():
            sample_tasks = [
                Task(title="Pythonの学習", description="Flaskフレームワークを学ぶ", completed=True),
                Task(title="買い物", description="牛乳と卵を買う", due_date=datetime.utcnow().date()),
                Task(title="プロジェクトの提案", description="新しいプロジェクトの提案書を作成")
            ]
            db.session.add_all(sample_tasks)
            db.session.commit()

if __name__ == '__main__':
    init_db()
    app.run(debug=True)

# templates/index.html
"""



    タスク管理アプリ
    


    

タスク管理アプリ

Flaskで作られたシンプルなタスク管理アプリケーション

機能
  • タスクの作成・編集・削除
  • タスクの完了状態管理
  • 期日設定
  • REST API
タスク一覧 新規タスク
""" # templates/tasks.html """ タスク一覧

タスク一覧

{% with messages = get_flashed_messages(with_categories=true) %} {% if messages %} {% for category, message in messages %}
{{ message }}
{% endfor %} {% endif %} {% endwith %} 新規タスク
{% for task in tasks %}
{% if task.completed %} {{ task.title }} {% else %} {{ task.title }} {% endif %}

{{ task.description }}

{% if task.due_date %}

期日: {{ task.due_date|format_date }}

{% endif %}
編集
{% endfor %}
"""

42. 非同期プログラミング

import asyncio
import aiohttp
import aiofiles
import asyncpg
from datetime import datetime
import json
import time

class AsyncWebScraper:
    """非同期ウェブスクレイパー"""

    def __init__(self, max_concurrent=5):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_url(self, session, url):
        """URLを非同期で取得"""
        async with self.semaphore:
            try:
                async with session.get(url, timeout=30) as response:
                    content = await response.text()
                    return {
                        'url': url,
                        'status': response.status,
                        'content_length': len(content),
                        'timestamp': datetime.now()
                    }
            except Exception as e:
                return {
                    'url': url,
                    'error': str(e),
                    'timestamp': datetime.now()
                }

    async def scrape_multiple_urls(self, urls):
        """複数URLを並行でスクレイピング"""
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_url(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            return results

    async def save_results(self, results, filename):
        """結果を非同期でファイルに保存"""
        async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
            await f.write(json.dumps(results, indent=2, default=str))

class AsyncDatabaseManager:
    """非同期データベースマネージャー"""

    def __init__(self, connection_string):
        self.connection_string = connection_string
        self.pool = None

    async def connect(self):
        """データベース接続"""
        self.pool = await asyncpg.create_pool(self.connection_string)

    async def create_tables(self):
        """テーブル作成"""
        async with self.pool.acquire() as conn:
            await conn.execute('''
                CREATE TABLE IF NOT EXISTS scraped_data (
                    id SERIAL PRIMARY KEY,
                    url TEXT NOT NULL,
                    status INTEGER,
                    content_length INTEGER,
                    error TEXT,
                    timestamp TIMESTAMP
                )
            ''')

    async def insert_results(self, results):
        """結果をデータベースに挿入"""
        async with self.pool.acquire() as conn:
            for result in results:
                await conn.execute('''
                    INSERT INTO scraped_data 
                    (url, status, content_length, error, timestamp)
                    VALUES ($1, $2, $3, $4, $5)
                ''', result['url'], result.get('status'), 
                   result.get('content_length'), result.get('error'), 
                   result['timestamp'])

    async def get_statistics(self):
        """統計情報を取得"""
        async with self.pool.acquire() as conn:
            total = await conn.fetchval('SELECT COUNT(*) FROM scraped_data')
            successful = await conn.fetchval(
                'SELECT COUNT(*) FROM scraped_data WHERE status = 200'
            )
            avg_size = await conn.fetchval(
                'SELECT AVG(content_length) FROM scraped_data WHERE status = 200'
            )

            return {
                'total_requests': total,
                'successful_requests': successful,
                'success_rate': (successful / total * 100) if total > 0 else 0,
                'average_content_size': avg_size or 0
            }

class AsyncFileProcessor:
    """非同期ファイルプロセッサー"""

    async def process_file_chunked(self, input_file, output_file, chunk_size=1024):
        """ファイルをチャンクで非同期処理"""
        async with aiofiles.open(input_file, 'r', encoding='utf-8') as infile:
            async with aiofiles.open(output_file, 'w', encoding='utf-8') as outfile:
                async for chunk in infile:
                    processed_chunk = await self.process_chunk(chunk)
                    await outfile.write(processed_chunk)

    async def process_chunk(self, chunk):
        """チャンクを処理(例: 大文字に変換)"""
        # 非同期処理をシミュレート
        await asyncio.sleep(0.01)
        return chunk.upper()

class AsyncTaskScheduler:
    """非同期タスクスケジューラー"""

    def __init__(self):
        self.tasks = []
        self.running = False

    async def add_periodic_task(self, coro, interval):
        """定期的なタスクを追加"""
        while self.running:
            await coro()
            await asyncio.sleep(interval)

    async def add_delayed_task(self, coro, delay):
        """遅延タスクを追加"""
        await asyncio.sleep(delay)
        await coro()

    async def start(self):
        """スケジューラーを開始"""
        self.running = True
        await asyncio.gather(*self.tasks)

    def stop(self):
        """スケジューラーを停止"""
        self.running = False

async def performance_comparison():
    """同期 vs 非同期のパフォーマンス比較"""

    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/3',
        'https://httpbin.org/delay/1'
    ] * 4  # 20個のURL

    # 非同期版
    print("非同期スクレイピング開始...")
    start_time = time.time()

    scraper = AsyncWebScraper(max_concurrent=5)
    results = await scraper.scrape_multiple_urls(urls)

    async_time = time.time() - start_time
    print(f"非同期版 完了: {async_time:.2f}秒")
    print(f"処理したURL数: {len(results)}")

    # 同期版のシミュレーション(実際には実行しない)
    estimated_sync_time = sum([1, 2, 1, 3, 1]) * 4  # 各遅延の合計
    print(f"推定同期版時間: {estimated_sync_time}秒")
    print(f"速度向上: {estimated_sync_time/async_time:.1f}倍")

async def complex_async_example():
    """複雑な非同期処理の例"""

    async def monitor_system():
        """システム監視タスク"""
        while True:
            print(f"[監視] {datetime.now().strftime('%H:%M:%S')} - システム正常")
            await asyncio.sleep(10)

    async def data_processing_pipeline():
        """データ処理パイプライン"""
        # 1. データ収集
        urls = [
            'https://httpbin.org/json',
            'https://httpbin.org/xml',
            'https://httpbin.org/html'
        ]

        scraper = AsyncWebScraper()
        raw_data = await scraper.scrape_multiple_urls(urls)

        # 2. データ変換
        processed_data = []
        for item in raw_data:
            if 'content_length' in item:
                processed_data.append({
                    'source': item['url'],
                    'size': item['content_length'],
                    'processed_at': datetime.now()
                })

        # 3. データ保存
        async with aiofiles.open('processed_data.json', 'w') as f:
            await f.write(json.dumps(processed_data, indent=2, default=str))

        print(f"処理完了: {len(processed_data)}件のデータを保存")

    async def main():
        """メイン処理"""
        # 並行して実行するタスク
        tasks = [
            monitor_system(),
            data_processing_pipeline(),
            asyncio.sleep(30)  # タイムアウト
        ]

        try:
            await asyncio.wait_for(asyncio.gather(*tasks), timeout=35)
        except asyncio.TimeoutError:
            print("処理がタイムアウトしました")

    await main()

async def async_database_example():
    """非同期データベースの使用例"""

    # PostgreSQLの接続文字列(実際の環境に合わせて変更)
    connection_string = "postgresql://user:password@localhost/dbname"

    db_manager = AsyncDatabaseManager(connection_string)

    try:
        await db_manager.connect()
        await db_manager.create_tables()

        # サンプルデータの挿入
        sample_results = [
            {
                'url': 'https://example.com',
                'status': 200,
                'content_length': 1234,
                'timestamp': datetime.now()
            },
            {
                'url': 'https://test.org',
                'status': 404,
                'error': 'Not found',
                'timestamp': datetime.now()
            }
        ]

        await db_manager.insert_results(sample_results)

        # 統計情報の取得
        stats = await db_manager.get_statistics()
        print("データベース統計:")
        for key, value in stats.items():
            print(f"  {key}: {value}")

    except Exception as e:
        print(f"データベースエラー: {e}")

# メイン実行関数
async def main():
    """メイン関数"""
    print("=== 非同期プログラミングデモ ===")

    # 1. パフォーマンス比較
    await performance_comparison()

    print("\n=== 複雑な非同期処理 ===")
    await complex_async_example()

    print("\n=== 非同期データベース操作 ===")
    await async_database_example()

# 実行
if __name__ == "__main__":
    asyncio.run(main())

43. データベースORM(SQLAlchemy)

from sqlalchemy import create_engine, Column, Integer, String, Text, Boolean, DateTime, ForeignKey, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, backref
from sqlalchemy import func
from datetime import datetime
import os

Base = declarative_base()

# データモデルの定義
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    password_hash = Column(String(128), nullable=False)
    first_name = Column(String(50))
    last_name = Column(String(50))
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # リレーションシップ
    posts = relationship('Post', back_populates='author', cascade='all, delete-orphan')
    comments = relationship('Comment', back_populates='user', cascade='all, delete-orphan')

    def __repr__(self):
        return f""

    def to_dict(self):
        return {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'first_name': self.first_name,
            'last_name': self.last_name,
            'is_active': self.is_active,
            'created_at': self.created_at.isoformat(),
            'post_count': len(self.posts)
        }

class Category(Base):
    __tablename__ = 'categories'

    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True, nullable=False)
    description = Column(Text)
    created_at = Column(DateTime, default=datetime.utcnow)

    posts = relationship('Post', back_populates='category')

    def __repr__(self):
        return f""

class Post(Base):
    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(Text, nullable=False)
    summary = Column(Text)
    is_published = Column(Boolean, default=False)
    view_count = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    published_at = Column(DateTime)

    # 外部キー
    author_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    category_id = Column(Integer, ForeignKey('categories.id'))

    # リレーションシップ
    author = relationship('User', back_populates='posts')
    category = relationship('Category', back_populates='posts')
    comments = relationship('Comment', back_populates='post', cascade='all, delete-orphan')
    tags = relationship('Tag', secondary='post_tags', back_populates='posts')

    def __repr__(self):
        return f""

    def to_dict(self, include_comments=False):
        data = {
            'id': self.id,
            'title': self.title,
            'content': self.content,
            'summary': self.summary,
            'is_published': self.is_published,
            'view_count': self.view_count,
            'created_at': self.created_at.isoformat(),
            'author': self.author.username,
            'category': self.category.name if self.category else None,
            'tags': [tag.name for tag in self.tags]
        }

        if include_comments:
            data['comments'] = [comment.to_dict() for comment in self.comments]

        return data

class Tag(Base):
    __tablename__ = 'tags'

    id = Column(Integer, primary_key=True)
    name = Column(String(30), unique=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    posts = relationship('Post', secondary='post_tags', back_populates='tags')

    def __repr__(self):
        return f""

class Comment(Base):
    __tablename__ = 'comments'

    id = Column(Integer, primary_key=True)
    content = Column(Text, nullable=False)
    is_approved = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    # 外部キー
    user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    post_id = Column(Integer, ForeignKey('posts.id'), nullable=False)

    # リレーションシップ
    user = relationship('User', back_populates='comments')
    post = relationship('Post', back_populates='comments')

    def __repr__(self):
        return f""

    def to_dict(self):
        return {
            'id': self.id,
            'content': self.content,
            'is_approved': self.is_approved,
            'created_at': self.created_at.isoformat(),
            'user': self.user.username,
            'post_title': self.post.title
        }

# 多対多の関連テーブル
post_tags = Table('post_tags', Base.metadata,
    Column('post_id', Integer, ForeignKey('posts.id'), primary_key=True),
    Column('tag_id', Integer, ForeignKey('tags.id'), primary_key=True)
)

class BlogManager:
    """ブログ管理クラス"""

    def __init__(self, database_url='sqlite:///blog.db'):
        self.engine = create_engine(database_url)
        self.Session = sessionmaker(bind=self.engine)
        self.create_tables()

    def create_tables(self):
        """テーブルを作成"""
        Base.metadata.create_all(self.engine)

    def add_sample_data(self):
        """サンプルデータを追加"""
        session = self.Session()

        try:
            # カテゴリーの追加
            categories = [
                Category(name='Python', description='Pythonプログラミング'),
                Category(name='Web開発', description='Web開発関連'),
                Category(name='データベース', description='データベース技術')
            ]
            session.add_all(categories)

            # タグの追加
            tags = [
                Tag(name='flask'),
                Tag(name='django'),
                Tag(name='sqlalchemy'),
                Tag(name='orm'),
                Tag(name='web')
            ]
            session.add_all(tags)

            # ユーザーの追加
            users = [
                User(username='alice', email='alice@example.com', password_hash='hash1'),
                User(username='bob', email='bob@example.com', password_hash='hash2')
            ]
            session.add_all(users)

            session.commit()

            # 投稿の追加
            python_category = session.query(Category).filter_by(name='Python').first()
            web_category = session.query(Category).filter_by(name='Web開発').first()

            posts = [
                Post(
                    title='SQLAlchemy入門',
                    content='SQLAlchemyはPythonのORMです...',
                    summary='SQLAlchemyの基本的な使い方',
                    is_published=True,
                    author_id=users[0].id,
                    category_id=python_category.id,
                    published_at=datetime.utcnow()
                ),
                Post(
                    title='FlaskでWebアプリ開発',
                    content='Flaskは軽量なWebフレームワークです...',
                    summary='Flaskを使ったWeb開発',
                    is_published=True,
                    author_id=users[1].id,
                    category_id=web_category.id,
                    published_at=datetime.utcnow()
                )
            ]
            session.add_all(posts)

            # タグの関連付け
            posts[0].tags.extend([tags[2], tags[3]])  # sqlalchemy, orm
            posts[1].tags.extend([tags[0], tags[4]])  # flask, web

            # コメントの追加
            comments = [
                Comment(
                    content='とても参考になりました!',
                    is_approved=True,
                    user_id=users[1].id,
                    post_id=posts[0].id
                ),
                Comment(
                    content='続編を期待しています',
                    user_id=users[0].id,
                    post_id=posts[1].id
                )
            ]
            session.add_all(comments)

            session.commit()
            print("サンプルデータを追加しました")

        except Exception as e:
            session.rollback()
            print(f"エラー: {e}")
        finally:
            session.close()

    def get_published_posts(self):
        """公開済みの投稿を取得"""
        session = self.Session()
        try:
            posts = session.query(Post).filter_by(is_published=True).all()
            return [post.to_dict() for post in posts]
        finally:
            session.close()

    def get_posts_by_author(self, username):
        """特定の作者の投稿を取得"""
        session = self.Session()
        try:
            posts = session.query(Post).join(User).filter(User.username == username).all()
            return [post.to_dict() for post in posts]
        finally:
            session.close()

    def get_posts_by_category(self, category_name):
        """カテゴリー別の投稿を取得"""
        session = self.Session()
        try:
            posts = session.query(Post).join(Category).filter(Category.name == category_name).all()
            return [post.to_dict() for post in posts]
        finally:
            session.close()

    def search_posts(self, keyword):
        """投稿を検索"""
        session = self.Session()
        try:
            posts = session.query(Post).filter(
                (Post.title.contains(keyword)) | 
                (Post.content.contains(keyword))
            ).all()
            return [post.to_dict() for post in posts]
        finally:
            session.close()

    def get_post_with_comments(self, post_id):
        """投稿とコメントを取得"""
        session = self.Session()
        try:
            post = session.query(Post).get(post_id)
            if post:
                return post.to_dict(include_comments=True)
            return None
        finally:
            session.close()

    def create_post(self, title, content, author_username, category_name=None, tags=None):
        """新しい投稿を作成"""
        session = self.Session()
        try:
            author = session.query(User).filter_by(username=author_username).first()
            if not author:
                raise ValueError("ユーザーが見つかりません")

            category = None
            if category_name:
                category = session.query(Category).filter_by(name=category_name).first()

            post = Post(
                title=title,
                content=content,
                author_id=author.id,
                category_id=category.id if category else None,
                is_published=True,
                published_at=datetime.utcnow()
            )

            if tags:
                tag_objects = session.query(Tag).filter(Tag.name.in_(tags)).all()
                post.tags.extend(tag_objects)

            session.add(post)
            session.commit()
            return post.to_dict()
        except Exception as e:
            session.rollback()
            raise e
        finally:
            session.close()

    def get_statistics(self):
        """統計情報を取得"""
        session = self.Session()
        try:
            stats = {}

            # 投稿数
            stats['total_posts'] = session.query(Post).count()
            stats['published_posts'] = session.query(Post).filter_by(is_published=True).count()

            # ユーザー数
            stats['total_users'] = session.query(User).count()
            stats['active_users'] = session.query(User).filter_by(is_active=True).count()

            # コメント数
            stats['total_comments'] = session.query(Comment).count()
            stats['approved_comments'] = session.query(Comment).filter_by(is_approved=True).count()

            # 人気のカテゴリー
            popular_categories = session.query(
                Category.name, 
                func.count(Post.id).label('post_count')
            ).join(Post).group_by(Category.id).order_by(func.count(Post.id).desc()).limit(5).all()

            stats['popular_categories'] = [
                {'name': name, 'post_count': count} 
                for name, count in popular_categories
            ]

            return stats
        finally:
            session.close()

def orm_demo():
    """ORMデモンストレーション"""
    manager = BlogManager()

    print("=== SQLAlchemy ORM デモ ===")

    # サンプルデータの追加
    manager.add_sample_data()

    print("\n1. 公開済み投稿の取得:")
    posts = manager.get_published_posts()
    for post in posts:
        print(f"  - {post['title']} by {post['author']}")

    print("\n2. 作者別投稿の取得:")
    author_posts = manager.get_posts_by_author('alice')
    for post in author_posts:
        print(f"  - {post['title']}")

    print("\n3. 投稿の検索:")
    search_results = manager.search_posts('Flask')
    for post in search_results:
        print(f"  - {post['title']}")

    print("\n4. 統計情報:")
    stats = manager.get_statistics()
    for key, value in stats.items():
        if key != 'popular_categories':
            print(f"  {key}: {value}")

    print("\n5. 人気カテゴリー:")
    for category in stats['popular_categories']:
        print(f"  - {category['name']}: {category['post_count']}投稿")

    print("\n6. 新しい投稿の作成:")
    try:
        new_post = manager.create_post(
            title='新しいORMの機能',
            content='SQLAlchemyの新しい機能について...',
            author_username='bob',
            category_name='Python',
            tags=['sqlalchemy', 'orm']
        )
        print(f"  作成した投稿: {new_post['title']}")
    except Exception as e:
        print(f"  エラー: {e}")

# 高度なクエリの例
def advanced_queries_demo():
    """高度なクエリのデモ"""
    manager = BlogManager()
    session = manager.Session()

    try:
        print("\n=== 高度なクエリ例 ===")

        # 1. 集計クエリ
        print("1. ユーザー別投稿数:")
        user_post_counts = session.query(
            User.username,
            func.count(Post.id).label('post_count')
        ).outerjoin(Post).group_by(User.id).all()

        for username, count in user_post_counts:
            print(f"  {username}: {count}投稿")

        # 2. サブクエリ
        print("\n2. コメント数の多い投稿トップ3:")
        subquery = session.query(
            Comment.post_id,
            func.count(Comment.id).label('comment_count')
        ).group_by(Comment.post_id).subquery()

        top_posts = session.query(
            Post.title,
            func.coalesce(subquery.c.comment_count, 0).label('comment_count')
        ).outerjoin(subquery, Post.id == subquery.c.post_id)\
         .order_by(func.coalesce(subquery.c.comment_count, 0).desc())\
         .limit(3).all()

        for title, count in top_posts:
            print(f"  {title}: {count}コメント")

        # 3. ウィンドウ関数
        print("\n3. カテゴリー別投稿ランキング:")
        from sqlalchemy import over
        from sqlalchemy import desc

        category_rank = session.query(
            Category.name,
            Post.title,
            func.row_number().over(
                partition_by=Category.id,
                order_by=desc(Post.created_at)
            ).label('rank_in_category')
        ).join(Post).filter(Post.is_published == True).all()

        for category_name, post_title, rank in category_rank:
            if rank <= 3:  # 各カテゴリーのトップ3のみ表示
                print(f"  {category_name} - {rank}位: {post_title}")

    finally:
        session.close()

# 実行
if __name__ == "__main__":
    orm_demo()
    advanced_queries_demo()

44. APIサーバーの実装

from flask import Flask, request, jsonify, abort
from flask_restful import Api, Resource, reqparse, fields, marshal_with
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime, timedelta
import os

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///api.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['JWT_SECRET_KEY'] = 'jwt-secret-key'
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=24)

db = SQLAlchemy(app)
api = Api(app)
jwt = JWTManager(app)

# データモデル
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(128))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    is_active = db.Column(db.Boolean, default=True)

    products = db.relationship('Product', backref='owner', lazy=True)
    orders = db.relationship('Order', backref='user', lazy=True)

class Product(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    price = db.Column(db.Float, nullable=False)
    stock_quantity = db.Column(db.Integer, default=0)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    category_id = db.Column(db.Integer, db.ForeignKey('category.id'))

    category = db.relationship('Category', backref='products')
    order_items = db.relationship('OrderItem', backref='product', lazy=True)

class Category(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    description = db.Column(db.Text)

class Order(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    total_amount = db.Column(db.Float, nullable=False)
    status = db.Column(db.String(20), default='pending')  # pending, confirmed, shipped, delivered, cancelled
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)

    items = db.relationship('OrderItem', backref='order', lazy=True, cascade='all, delete-orphan')

class OrderItem(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    quantity = db.Column(db.Integer, nullable=False)
    unit_price = db.Column(db.Float, nullable=False)

    order_id = db.Column(db.Integer, db.ForeignKey('order.id'), nullable=False)
    product_id = db.Column(db.Integer, db.ForeignKey('product.id'), nullable=False)

# マーシャリングフィールド
user_fields = {
    'id': fields.Integer,
    'username': fields.String,
    'email': fields.String,
    'created_at': fields.DateTime(dt_format='iso8601'),
    'is_active': fields.Boolean
}

product_fields = {
    'id': fields.Integer,
    'name': fields.String,
    'description': fields.String,
    'price': fields.Float,
    'stock_quantity': fields.Integer,
    'created_at': fields.DateTime(dt_format='iso8601'),
    'owner': fields.String(attribute='owner.username'),
    'category': fields.String(attribute='category.name')
}

category_fields = {
    'id': fields.Integer,
    'name': fields.String,
    'description': fields.String
}

order_fields = {
    'id': fields.Integer,
    'total_amount': fields.Float,
    'status': fields.String,
    'created_at': fields.DateTime(dt_format='iso8601'),
    'user': fields.String(attribute='user.username'),
    'items': fields.List(fields.Nested({
        'product_name': fields.String(attribute='product.name'),
        'quantity': fields.Integer,
        'unit_price': fields.Float
    }))
}

# リクエストパーサー
user_parser = reqparse.RequestParser()
user_parser.add_argument('username', type=str, required=True, help='Username is required')
user_parser.add_argument('email', type=str, required=True, help='Email is required')
user_parser.add_argument('password', type=str, required=True, help='Password is required')

product_parser = reqparse.RequestParser()
product_parser.add_argument('name', type=str, required=True, help='Product name is required')
product_parser.add_argument('description', type=str)
product_parser.add_argument('price', type=float, required=True, help='Price is required')
product_parser.add_argument('stock_quantity', type=int, default=0)
product_parser.add_argument('category_id', type=int)

category_parser = reqparse.RequestParser()
category_parser.add_argument('name', type=str, required=True, help='Category name is required')
category_parser.add_argument('description', type=str)

order_parser = reqparse.RequestParser()
order_parser.add_argument('items', type=list, location='json', required=True, help='Order items are required')

# リソースクラス
class AuthResource(Resource):
    def post(self):
        """ユーザー認証"""
        data = request.get_json()
        username = data.get('username')
        password = data.get('password')

        user = User.query.filter_by(username=username).first()

        if user and check_password_hash(user.password_hash, password):
            access_token = create_access_token(identity=user.id)
            return {
                'access_token': access_token,
                'user': {
                    'id': user.id,
                    'username': user.username,
                    'email': user.email
                }
            }, 200

        return {'message': 'Invalid credentials'}, 401

class UserListResource(Resource):
    def post(self):
        """新規ユーザー登録"""
        args = user_parser.parse_args()

        if User.query.filter_by(username=args['username']).first():
            return {'message': 'Username already exists'}, 400

        if User.query.filter_by(email=args['email']).first():
            return {'message': 'Email already exists'}, 400

        user = User(
            username=args['username'],
            email=args['email'],
            password_hash=generate_password_hash(args['password'])
        )

        db.session.add(user)
        db.session.commit()

        return {'message': 'User created successfully'}, 201

class UserResource(Resource):
    @jwt_required()
    def get(self, user_id):
        """ユーザー情報取得"""
        current_user_id = get_jwt_identity()

        if current_user_id != user_id:
            return {'message': 'Access denied'}, 403

        user = User.query.get_or_404(user_id)
        return marshal_with(user_fields)(user)

class ProductListResource(Resource):
    @marshal_with(product_fields)
    def get(self):
        """商品一覧取得"""
        products = Product.query.all()
        return products

    @jwt_required()
    def post(self):
        """新規商品作成"""
        current_user_id = get_jwt_identity()
        args = product_parser.parse_args()

        product = Product(
            name=args['name'],
            description=args.get('description'),
            price=args['price'],
            stock_quantity=args.get('stock_quantity', 0),
            user_id=current_user_id,
            category_id=args.get('category_id')
        )

        db.session.add(product)
        db.session.commit()

        return marshal_with(product_fields)(product), 201

class ProductResource(Resource):
    @marshal_with(product_fields)
    def get(self, product_id):
        """商品詳細取得"""
        product = Product.query.get_or_404(product_id)
        return product

    @jwt_required()
    def put(self, product_id):
        """商品更新"""
        current_user_id = get_jwt_identity()
        product = Product.query.get_or_404(product_id)

        if product.owner.id != current_user_id:
            return {'message': 'Access denied'}, 403

        args = product_parser.parse_args()

        product.name = args['name']
        product.description = args.get('description')
        product.price = args['price']
        product.stock_quantity = args.get('stock_quantity', 0)
        product.category_id = args.get('category_id')

        db.session.commit()

        return marshal_with(product_fields)(product)

    @jwt_required()
    def delete(self, product_id):
        """商品削除"""
        current_user_id = get_jwt_identity()
        product = Product.query.get_or_404(product_id)

        if product.owner.id != current_user_id:
            return {'message': 'Access denied'}, 403

        db.session.delete(product)
        db.session.commit()

        return {'message': 'Product deleted'}, 200

class CategoryListResource(Resource):
    @marshal_with(category_fields)
    def get(self):
        """カテゴリー一覧取得"""
        categories = Category.query.all()
        return categories

    @jwt_required()
    def post(self):
        """新規カテゴリー作成"""
        args = category_parser.parse_args()

        if Category.query.filter_by(name=args['name']).first():
            return {'message': 'Category already exists'}, 400

        category = Category(
            name=args['name'],
            description=args.get('description')
        )

        db.session.add(category)
        db.session.commit()

        return marshal_with(category_fields)(category), 201

class OrderListResource(Resource):
    @jwt_required()
    @marshal_with(order_fields)
    def get(self):
        """注文一覧取得(自分の注文のみ)"""
        current_user_id = get_jwt_identity()
        orders = Order.query.filter_by(user_id=current_user_id).all()
        return orders

    @jwt_required()
    def post(self):
        """新規注文作成"""
        current_user_id = get_jwt_identity()
        data = request.get_json()

        if not data.get('items') or not isinstance(data['items'], list):
            return {'message': 'Order items are required'}, 400

        total_amount = 0
        order_items = []

        for item in data['items']:
            product_id = item.get('product_id')
            quantity = item.get('quantity', 1)

            product = Product.query.get(product_id)
            if not product:
                return {'message': f'Product {product_id} not found'}, 404

            if product.stock_quantity < quantity:
                return {'message': f'Insufficient stock for {product.name}'}, 400

            total_amount += product.price * quantity

            order_item = OrderItem(
                product_id=product_id,
                quantity=quantity,
                unit_price=product.price
            )
            order_items.append(order_item)

            # 在庫を減らす
            product.stock_quantity -= quantity

        order = Order(
            total_amount=total_amount,
            user_id=current_user_id,
            items=order_items
        )

        db.session.add(order)
        db.session.commit()

        return marshal_with(order_fields)(order), 201

class OrderResource(Resource):
    @jwt_required()
    @marshal_with(order_fields)
    def get(self, order_id):
        """注文詳細取得"""
        current_user_id = get_jwt_identity()
        order = Order.query.get_or_404(order_id)

        if order.user_id != current_user_id:
            abort(403)

        return order

    @jwt_required()
    def put(self, order_id):
        """注文ステータス更新"""
        current_user_id = get_jwt_identity()
        order = Order.query.get_or_404(order_id)

        if order.user_id != current_user_id:
            return {'message': 'Access denied'}, 403

        data = request.get_json()
        new_status = data.get('status')

        valid_statuses = ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']
        if new_status not in valid_statuses:
            return {'message': 'Invalid status'}, 400

        order.status = new_status
        db.session.commit()

        return marshal_with(order_fields)(order)

# ルーティング
api.add_resource(AuthResource, '/auth/login')
api.add_resource(UserListResource, '/users')
api.add_resource(UserResource, '/users/')
api.add_resource(ProductListResource, '/products')
api.add_resource(ProductResource, '/products/')
api.add_resource(CategoryListResource, '/categories')
api.add_resource(OrderListResource, '/orders')
api.add_resource(OrderResource, '/orders/')

# エラーハンドラー
@app.errorhandler(404)
def not_found(error):
    return jsonify({'message': 'Resource not found'}), 404

@app.errorhandler(500)
def internal_error(error):
    return jsonify({'message': 'Internal server error'}), 500

@jwt.unauthorized_loader
def unauthorized_callback(callback):
    return jsonify({'message': 'Missing or invalid token'}), 401

@jwt.invalid_token_loader
def invalid_token_callback(callback):
    return jsonify({'message': 'Invalid token'}), 401

@jwt.expired_token_loader
def expired_token_callback(callback):
    return jsonify({'message': 'Token has expired'}), 401

def init_db():
    """データベースの初期化"""
    with app.app_context():
        db.create_all()

        # サンプルデータの追加
        if not User.query.first():
            # カテゴリー
            categories = [
                Category(name='Electronics', description='Electronic devices'),
                Category(name='Books', description='Books and magazines'),
                Category(name='Clothing', description='Clothes and accessories')
            ]
            db.session.add_all(categories)

            # ユーザー
            users = [
                User(
                    username='admin',
                    email='admin@example.com',
                    password_hash=generate_password_hash('password')
                ),
                User(
                    username='user1',
                    email='user1@example.com',
                    password_hash=generate_password_hash('password')
                )
            ]
            db.session.add_all(users)
            db.session.commit()

            # 商品
            products = [
                Product(
                    name='Laptop',
                    description='High-performance laptop',
                    price=999.99,
                    stock_quantity=10,
                    user_id=users[0].id,
                    category_id=categories[0].id
                ),
                Product(
                    name='Python Book',
                    description='Learn Python programming',
                    price=39.99,
                    stock_quantity=50,
                    user_id=users[0].id,
                    category_id=categories[1].id
                )
            ]
            db.session.add_all(products)
            db.session.commit()

if __name__ == '__main__':
    init_db()
    app.run(debug=True, port=5000)

45. ユニットテストの包括的実装

import unittest
from unittest.mock import Mock, patch, MagicMock
import pytest
import sys
import os

# テスト対象のモジュールをインポート
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

class TestCalculator(unittest.TestCase):
    """電卓クラスのテスト"""

    def setUp(self):
        """各テストの前処理"""
        self.calc = Calculator()

    def test_add(self):
        """加算のテスト"""
        self.assertEqual(self.calc.add(2, 3), 5)
        self.assertEqual(self.calc.add(-1, 1), 0)
        self.assertEqual(self.calc.add(0, 0), 0)

    def test_subtract(self):
        """減算のテスト"""
        self.assertEqual(self.calc.subtract(5, 3), 2)
        self.assertEqual(self.calc.subtract(0, 5), -5)

    def test_multiply(self):
        """乗算のテスト"""
        self.assertEqual(self.calc.multiply(3, 4), 12)
        self.assertEqual(self.calc.multiply(0, 5), 0)

    def test_divide(self):
        """除算のテスト"""
        self.assertEqual(self.calc.divide(10, 2), 5)
        self.assertEqual(self.calc.divide(5, 2), 2.5)

    def test_divide_by_zero(self):
        """ゼロ除算のテスト"""
        with self.assertRaises(ValueError):
            self.calc.divide(10, 0)

    def test_power(self):
        """べき乗のテスト"""
        self.assertEqual(self.calc.power(2, 3), 8)
        self.assertEqual(self.calc.power(5, 0), 1)

    @unittest.skip("まだ実装されていません")
    def test_future_feature(self):
        """将来の機能のテスト(スキップ)"""
        self.fail("このテストはまだ実装されていません")

class TestBankAccount(unittest.TestCase):
    """銀行口座クラスのテスト"""

    def setUp(self):
        self.account = BankAccount("test_user", 1000)

    def test_initial_balance(self):
        """初期残高のテスト"""
        self.assertEqual(self.account.get_balance(), 1000)

    def test_deposit(self):
        """入金のテスト"""
        self.account.deposit(500)
        self.assertEqual(self.account.get_balance(), 1500)

    def test_withdraw(self):
        """出金のテスト"""
        self.account.withdraw(300)
        self.assertEqual(self.account.get_balance(), 700)

    def test_withdraw_insufficient_funds(self):
        """残高不足のテスト"""
        with self.assertRaises(InsufficientFundsError):
            self.account.withdraw(2000)

    def test_transaction_history(self):
        """取引履歴のテスト"""
        self.account.deposit(500)
        self.account.withdraw(200)
        history = self.account.get_transaction_history()

        self.assertEqual(len(history), 2)
        self.assertIn("入金: +500", history[0])
        self.assertIn("出金: -200", history[1])

class TestMocking(unittest.TestCase):
    """モックを使用したテスト"""

    def test_api_call_with_mock(self):
        """API呼び出しのモックテスト"""
        # モックの作成
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {'data': 'test'}

        with patch('requests.get') as mock_get:
            mock_get.return_value = mock_response

            # テスト対象の関数を呼び出し
            result = fetch_data_from_api('https://api.example.com/data')

            # 検証
            self.assertEqual(result, {'data': 'test'})
            mock_get.assert_called_once_with('https://api.example.com/data')

    def test_database_operation(self):
        """データベース操作のモックテスト"""
        with patch('sqlite3.connect') as mock_connect:
            mock_cursor = Mock()
            mock_connection = Mock()
            mock_connection.cursor.return_value = mock_cursor
            mock_connect.return_value = mock_connection

            # テスト実行
            save_user_to_db('test_user', 'test@example.com')

            # 検証
            mock_cursor.execute.assert_called_once()
            mock_connection.commit.assert_called_once()

class TestIntegration(unittest.TestCase):
    """統合テスト"""

    def setUp(self):
        self.app = create_test_app()
        self.client = self.app.test_client()
        self.db = SQLAlchemy(self.app)

    def test_user_registration_flow(self):
        """ユーザー登録フローの統合テスト"""
        # ユーザー登録
        response = self.client.post('/register', json={
            'username': 'testuser',
            'email': 'test@example.com',
            'password': 'password123'
        })

        self.assertEqual(response.status_code, 201)

        # ログイン
        response = self.client.post('/login', json={
            'username': 'testuser',
            'password': 'password123'
        })

        self.assertEqual(response.status_code, 200)
        self.assertIn('access_token', response.json)

    def test_product_purchase_flow(self):
        """商品購入フローの統合テスト"""
        # ログイン
        login_response = self.client.post('/login', json={
            'username': 'testuser',
            'password': 'password123'
        })
        token = login_response.json['access_token']

        # 商品一覧取得
        response = self.client.get('/products', 
                                 headers={'Authorization': f'Bearer {token}'})
        self.assertEqual(response.status_code, 200)

        # 注文作成
        order_data = {
            'items': [
                {'product_id': 1, 'quantity': 2},
                {'product_id': 2, 'quantity': 1}
            ]
        }
        response = self.client.post('/orders', 
                                  json=order_data,
                                  headers={'Authorization': f'Bearer {token}'})
        self.assertEqual(response.status_code, 201)

class TestPerformance(unittest.TestCase):
    """パフォーマンステスト"""

    def test_calculation_performance(self):
        """計算性能のテスト"""
        import time

        calc = Calculator()
        start_time = time.time()

        # 大量の計算を実行
        for i in range(10000):
            calc.add(i, i+1)
            calc.multiply(i, i+1)

        end_time = time.time()
        execution_time = end_time - start_time

        # 実行時間が1秒未満であることを確認
        self.assertLess(execution_time, 1.0,
                       f"計算が遅すぎます: {execution_time:.2f}秒")

class TestSecurity(unittest.TestCase):
    """セキュリティテスト"""

    def test_password_hashing(self):
        """パスワードハッシュのテスト"""
        password = "my_secret_password"
        hashed = hash_password(password)

        # 同じパスワードから同じハッシュが生成されること
        self.assertTrue(verify_password(password, hashed))

        # 異なるパスワードからは異なるハッシュが生成されること
        different_password = "different_password"
        self.assertFalse(verify_password(different_password, hashed))

    def test_sql_injection_prevention(self):
        """SQLインジェクション防止のテスト"""
        malicious_input = "'; DROP TABLE users; --"

        # 安全なクエリ構築のテスト
        user = get_user_safely(malicious_input)

        # ユーザーが存在しないことを確認(テーブルが削除されていない)
        self.assertIsNone(user)

# pytestを使用したテスト
class TestWithPytest:
    """pytestを使用したテストクラス"""

    def test_addition(self):
        """加算のテスト(pytest)"""
        calc = Calculator()
        assert calc.add(2, 3) == 5

    def test_division_by_zero(self):
        """ゼロ除算のテスト(pytest)"""
        calc = Calculator()
        with pytest.raises(ValueError):
            calc.divide(10, 0)

    @pytest.mark.parametrize("a,b,expected", [
        (1, 1, 2),
        (2, 3, 5),
        (-1, 1, 0),
        (0, 0, 0)
    ])
    def test_add_parameterized(self, a, b, expected):
        """パラメータ化された加算テスト"""
        calc = Calculator()
        assert calc.add(a, b) == expected

    @pytest.mark.slow
    def test_slow_operation(self):
        """遅い操作のテスト"""
        import time
        time.sleep(2)  # 遅い操作をシミュレート
        assert True

# カスタムテストランナー
class CustomTestRunner:
    """カスタムテストランナー"""

    def run_tests(self):
        """テストを実行"""
        loader = unittest.TestLoader()
        suite = loader.discover('.', pattern='test_*.py')

        runner = unittest.TextTestRunner(verbosity=2)
        result = runner.run(suite)

        # テスト結果の分析
        self.analyze_results(result)

    def analyze_results(self, result):
        """テスト結果を分析"""
        print(f"\nテスト結果分析:")
        print(f"実行テスト数: {result.testsRun}")
        print(f"成功: {result.testsRun - len(result.failures) - len(result.errors)}")
        print(f"失敗: {len(result.failures)}")
        print(f"エラー: {len(result.errors)}")
        print(f"スキップ: {len(getattr(result, 'skipped', []))}")

        if result.failures:
            print("\n失敗したテスト:")
            for test, traceback in result.failures:
                print(f"  - {test}")

# テスト対象のクラス(実際の実装)
class Calculator:
    def add(self, a, b):
        return a + b

    def subtract(self, a, b):
        return a - b

    def multiply(self, a, b):
        return a * b

    def divide(self, a, b):
        if b == 0:
            raise ValueError("Division by zero is not allowed")
        return a / b

    def power(self, base, exponent):
        return base ** exponent

class BankAccount:
    def __init__(self, account_holder, initial_balance=0):
        self.account_holder = account_holder
        self._balance = initial_balance
        self._transaction_history = []

    def deposit(self, amount):
        if amount <= 0:
            raise ValueError("Amount must be positive")
        self._balance += amount
        self._transaction_history.append(f"入金: +{amount}")

    def withdraw(self, amount):
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if amount > self._balance:
            raise InsufficientFundsError("Insufficient funds")
        self._balance -= amount
        self._transaction_history.append(f"出金: -{amount}")

    def get_balance(self):
        return self._balance

    def get_transaction_history(self):
        return self._transaction_history.copy()

class InsufficientFundsError(Exception):
    pass

# ユーティリティ関数
def fetch_data_from_api(url):
    import requests
    response = requests.get(url)
    return response.json()

def save_user_to_db(username, email):
    import sqlite3
    conn = sqlite3.connect('test.db')
    cursor = conn.cursor()
    cursor.execute("INSERT INTO users (username, email) VALUES (?, ?)", 
                   (username, email))
    conn.commit()
    conn.close()

def hash_password(password):
    import hashlib
    return hashlib.sha256(password.encode()).hexdigest()

def verify_password(password, hashed):
    return hash_password(password) == hashed

def get_user_safely(username):
    import sqlite3
    conn = sqlite3.connect('test.db')
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
    user = cursor.fetchone()
    conn.close()
    return user

def create_test_app():
    from flask import Flask
    app = Flask(__name__)
    app.config['TESTING'] = True
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
    return app

# テストの実行
if __name__ == '__main__':
    # unittestの実行
    print("=== unittestの実行 ===")
    unittest.main(verbosity=2, exit=False)

    # カスタムテストランナーの実行
    print("\n=== カスタムテストランナーの実行 ===")
    runner = CustomTestRunner()
    runner.run_tests()

    # pytestの実行(コメントアウト。実際に実行する場合はpytestをインストール)
    # print("\n=== pytestの実行 ===")
    # pytest.main([__file__, '-v'])

46. デザインパターンの実装

from abc import ABC, abstractmethod
from typing import List, Dict, Any
import threading
import json

# Singletonパターン
class DatabaseConnection:
    """データベース接続のSingletonクラス"""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialize()
            return cls._instance

    def _initialize(self):
        """初期化処理"""
        self.connection_string = "database://localhost:5432/mydb"
        self.is_connected = False
        print("データベース接続インスタンスを作成しました")

    def connect(self):
        """データベースに接続"""
        if not self.is_connected:
            self.is_connected = True
            print("データベースに接続しました")

    def disconnect(self):
        """データベースから切断"""
        if self.is_connected:
            self.is_connected = False
            print("データベースから切断しました")

    def execute_query(self, query):
        """クエリを実行"""
        if self.is_connected:
            print(f"クエリを実行: {query}")
            return f"結果: {query}"
        else:
            raise Exception("データベースに接続されていません")

# Observerパターン
class Subject(ABC):
    """Subjectインターフェース"""

    def __init__(self):
        self._observers: List[Observer] = []

    def attach(self, observer: 'Observer'):
        """オブザーバーを登録"""
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: 'Observer'):
        """オブザーバーを解除"""
        self._observers.remove(observer)

    def notify(self):
        """すべてのオブザーバーに通知"""
        for observer in self._observers:
            observer.update(self)

class Observer(ABC):
    """Observerインターフェース"""

    @abstractmethod
    def update(self, subject: Subject):
        pass

class WeatherStation(Subject):
    """気象観測所(Subject)"""

    def __init__(self):
        super().__init__()
        self._temperature = 0
        self._humidity = 0
        self._pressure = 0

    @property
    def temperature(self):
        return self._temperature

    @property
    def humidity(self):
        return self._humidity

    @property
    def pressure(self):
        return self._pressure

    def set_measurements(self, temperature, humidity, pressure):
        """測定値を設定して通知"""
        self._temperature = temperature
        self._humidity = humidity
        self._pressure = pressure
        self.notify()

    def get_state(self) -> Dict[str, Any]:
        """状態を取得"""
        return {
            'temperature': self._temperature,
            'humidity': self._humidity,
            'pressure': self._pressure
        }

class DisplayDevice(Observer):
    """表示デバイス(Observer)"""

    def __init__(self, name: str):
        self.name = name
        self.temperature = 0
        self.humidity = 0

    def update(self, subject: Subject):
        if isinstance(subject, WeatherStation):
            self.temperature = subject.temperature
            self.humidity = subject.humidity
            self.display()

    def display(self):
        """測定値を表示"""
        print(f"[{self.name}] 温度: {self.temperature}°C, 湿度: {self.humidity}%")

class MobileApp(Observer):
    """モバイルアプリ(Observer)"""

    def __init__(self, user_name: str):
        self.user_name = user_name
        self.notifications = []

    def update(self, subject: Subject):
        if isinstance(subject, WeatherStation):
            state = subject.get_state()
            notification = f"{self.user_name}さん、天気が更新されました: {state}"
            self.notifications.append(notification)
            print(f"モバイル通知: {notification}")

# Factory Methodパターン
class Document(ABC):
    """ドキュメントの抽象クラス"""

    @abstractmethod
    def open(self):
        pass

    @abstractmethod
    def save(self):
        pass

class PDFDocument(Document):
    """PDFドキュメント"""

    def open(self):
        return "PDFドキュメントを開きました"

    def save(self):
        return "PDFドキュメントを保存しました"

class WordDocument(Document):
    """Wordドキュメント"""

    def open(self):
        return "Wordドキュメントを開きました"

    def save(self):
        return "Wordドキュメントを保存しました"

class DocumentFactory(ABC):
    """ドキュメントファクトリーの抽象クラス"""

    @abstractmethod
    def create_document(self) -> Document:
        pass

class PDFDocumentFactory(DocumentFactory):
    """PDFドキュメントファクトリー"""

    def create_document(self) -> Document:
        return PDFDocument()

class WordDocumentFactory(DocumentFactory):
    """Wordドキュメントファクトリー"""

    def create_document(self) -> Document:
        return WordDocument()

# Strategyパターン
class PaymentStrategy(ABC):
    """支払い戦略のインターフェース"""

    @abstractmethod
    def pay(self, amount: float) -> str:
        pass

class CreditCardPayment(PaymentStrategy):
    """クレジットカード支払い"""

    def __init__(self, card_number: str, expiry_date: str, cvv: str):
        self.card_number = card_number
        self.expiry_date = expiry_date
        self.cvv = cvv

    def pay(self, amount: float) -> str:
        return f"クレジットカードで{amount}円を支払いました"

class PayPalPayment(PaymentStrategy):
    """PayPal支払い"""

    def __init__(self, email: str):
        self.email = email

    def pay(self, amount: float) -> str:
        return f"PayPal({self.email})で{amount}円を支払いました"

class BankTransferPayment(PaymentStrategy):
    """銀行振込支払い"""

    def __init__(self, account_number: str):
        self.account_number = account_number

    def pay(self, amount: float) -> str:
        return f"銀行振込({self.account_number})で{amount}円を支払いました"

class PaymentProcessor:
    """支払いプロセッサー"""

    def __init__(self):
        self._strategy = None

    def set_payment_strategy(self, strategy: PaymentStrategy):
        """支払い戦略を設定"""
        self._strategy = strategy

    def process_payment(self, amount: float) -> str:
        """支払いを処理"""
        if self._strategy is None:
            raise ValueError("支払い方法が選択されていません")
        return self._strategy.pay(amount)

# Decoratorパターン
class Coffee(ABC):
    """コーヒーのインターフェース"""

    @abstractmethod
    def get_cost(self) -> float:
        pass

    @abstractmethod
    def get_description(self) -> str:
        pass

class SimpleCoffee(Coffee):
    """シンプルなコーヒー"""

    def get_cost(self) -> float:
        return 2.0

    def get_description(self) -> str:
        return "シンプルコーヒー"

class CoffeeDecorator(Coffee):
    """コーヒーデコレーターの基底クラス"""

    def __init__(self, coffee: Coffee):
        self._coffee = coffee

    def get_cost(self) -> float:
        return self._coffee.get_cost()

    def get_description(self) -> str:
        return self._coffee.get_description()

class MilkDecorator(CoffeeDecorator):
    """ミルクデコレーター"""

    def get_cost(self) -> float:
        return self._coffee.get_cost() + 0.5

    def get_description(self) -> str:
        return self._coffee.get_description() + ", ミルク"

class SugarDecorator(CoffeeDecorator):
    """砂糖デコレーター"""

    def get_cost(self) -> float:
        return self._coffee.get_cost() + 0.2

    def get_description(self) -> str:
        return self._coffee.get_description() + ", 砂糖"

class WhippedCreamDecorator(CoffeeDecorator):
    """ホイップクリームデコレーター"""

    def get_cost(self) -> float:
        return self._coffee.get_cost() + 0.7

    def get_description(self) -> str:
        return self._coffee.get_description() + ", ホイップクリーム"

# Adapterパターン
class LegacySystem:
    """レガシーシステム"""

    def specific_request(self) -> str:
        return "レガシーシステムのデータ"

class ModernSystem:
    """モダンシステムのインターフェース"""

    def request(self) -> str:
        pass

class LegacySystemAdapter(ModernSystem):
    """レガシーシステムのアダプター"""

    def __init__(self, legacy_system: LegacySystem):
        self.legacy_system = legacy_system

    def request(self) -> str:
        legacy_data = self.legacy_system.specific_request()
        return f"変換されたデータ: {legacy_data}"

def design_patterns_demo():
    """デザインパターンのデモンストレーション"""

    print("=== 1. Singletonパターン ===")
    # 同じインスタンスが返されることを確認
    db1 = DatabaseConnection()
    db2 = DatabaseConnection()

    print(f"同じインスタンスですか: {db1 is db2}")

    db1.connect()
    db1.execute_query("SELECT * FROM users")

    print("\n=== 2. Observerパターン ===")
    weather_station = WeatherStation()

    # オブザーバーの登録
    display1 = DisplayDevice("リビング表示機")
    display2 = DisplayDevice("寝室表示機")
    mobile_app = MobileApp("山田太郎")

    weather_station.attach(display1)
    weather_station.attach(display2)
    weather_station.attach(mobile_app)

    # 測定値の更新(自動的に通知される)
    weather_station.set_measurements(25, 60, 1013)
    print("---")
    weather_station.set_measurements(28, 55, 1012)

    # オブザーバーの解除
    weather_station.detach(display2)
    print("--- 寝室表示機を解除 ---")
    weather_station.set_measurements(22, 65, 1014)

    print("\n=== 3. Factory Methodパターン ===")
    pdf_factory = PDFDocumentFactory()
    word_factory = WordDocumentFactory()

    pdf_doc = pdf_factory.create_document()
    word_doc = word_factory.create_document()

    print(pdf_doc.open())
    print(word_doc.open())

    print("\n=== 4. Strategyパターン ===")
    payment_processor = PaymentProcessor()

    # クレジットカード支払い
    credit_card = CreditCardPayment("1234-5678-9012-3456", "12/25", "123")
    payment_processor.set_payment_strategy(credit_card)
    print(payment_processor.process_payment(5000))

    # PayPal支払い
    paypal = PayPalPayment("user@example.com")
    payment_processor.set_payment_strategy(paypal)
    print(payment_processor.process_payment(3000))

    print("\n=== 5. Decoratorパターン ===")
    # シンプルなコーヒー
    coffee = SimpleCoffee()
    print(f"{coffee.get_description()}: ${coffee.get_cost():.2f}")

    # ミルクと砂糖を追加
    coffee_with_milk_and_sugar = SugarDecorator(MilkDecorator(coffee))
    print(f"{coffee_with_milk_and_sugar.get_description()}: ${coffee_with_milk_and_sugar.get_cost():.2f}")

    # すべてのトッピングを追加
    deluxe_coffee = WhippedCreamDecorator(
        SugarDecorator(
            MilkDecorator(coffee)
        )
    )
    print(f"{deluxe_coffee.get_description()}: ${deluxe_coffee.get_cost():.2f}")

    print("\n=== 6. Adapterパターン ===")
    legacy_system = LegacySystem()
    adapter = LegacySystemAdapter(legacy_system)

    print(f"レガシーシステム: {legacy_system.specific_request()}")
    print(f"アダプター経由: {adapter.request()}")

# 追加のデザインパターン
class CommandPattern:
    """コマンドパターン"""

    class Command(ABC):
        @abstractmethod
        def execute(self):
            pass

        @abstractmethod
        def undo(self):
            pass

    class LightOnCommand(Command):
        def __init__(self, light):
            self.light = light

        def execute(self):
            return self.light.turn_on()

        def undo(self):
            return self.light.turn_off()

    class Light:
        def turn_on(self):
            return "照明をつけました"

        def turn_off(self):
            return "照明を消しました"

    class RemoteControl:
        def __init__(self):
            self.commands = []
            self.history = []

        def set_command(self, command):
            self.commands.append(command)

        def press_button(self, index):
            if 0 <= index < len(self.commands):
                result = self.commands[index].execute()
                self.history.append(self.commands[index])
                return result
            return "無効なボタン"

        def press_undo(self):
            if self.history:
                command = self.history.pop()
                return command.undo()
            return "実行できるコマンドがありません"

def additional_patterns_demo():
    """追加のデザインパターンデモ"""

    print("\n=== 7. Commandパターン ===")
    remote = CommandPattern.RemoteControl()
    light = CommandPattern.Light()

    light_on = CommandPattern.LightOnCommand(light)
    remote.set_command(light_on)

    print(remote.press_button(0))  # 照明をつける
    print(remote.press_undo())     # 元に戻す

if __name__ == "__main__":
    design_patterns_demo()
    additional_patterns_demo()

47. データ可視化ダッシュボード

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from datetime import datetime, timedelta
import matplotlib.gridspec as gridspec

plt.style.use('seaborn-v0_8')

class DataVisualizationDashboard:
    """データ可視化ダッシュボード"""

    def __init__(self):
        self.fig = None
        self.data = self.generate_sample_data()

    def generate_sample_data(self):
        """サンプルデータを生成"""
        np.random.seed(42)

        # 時系列データ
        dates = pd.date_range('2023-01-01', periods=100, freq='D')

        data = {
            'date': dates,
            'sales': np.random.normal(1000, 200, 100).cumsum(),
            'website_visitors': np.random.poisson(500, 100),
            'conversion_rate': np.random.beta(5, 95, 100) * 100,
            'customer_satisfaction': np.random.normal(4.2, 0.5, 100),
            'region': np.random.choice(['North', 'South', 'East', 'West'], 100),
            'product_category': np.random.choice(['Electronics', 'Clothing', 'Books', 'Home'], 100),
            'revenue': np.random.exponential(500, 100)
        }

        return pd.DataFrame(data)

    def create_sales_dashboard(self):
        """セールスダッシュボードを作成"""
        self.fig = plt.figure(figsize=(20, 12))
        gs = gridspec.GridSpec(3, 3, figure=self.fig)

        # 1. 売上推移
        ax1 = self.fig.add_subplot(gs[0, :])
        self._plot_sales_trend(ax1)

        # 2. 地域別売上
        ax2 = self.fig.add_subplot(gs[1, 0])
        self._plot_regional_sales(ax2)

        # 3. カテゴリー別売上
        ax3 = self.fig.add_subplot(gs[1, 1])
        self._plot_category_sales(ax3)

        # 4. コンバージョン率
        ax4 = self.fig.add_subplot(gs[1, 2])
        self._plot_conversion_rate(ax4)

        # 5. 顧客満足度
        ax5 = self.fig.add_subplot(gs[2, 0])
        self._plot_customer_satisfaction(ax5)

        # 6. 訪問者数 vs 売上
        ax6 = self.fig.add_subplot(gs[2, 1:])
        self._plot_visitors_vs_sales(ax6)

        plt.tight_layout()
        plt.show()

    def _plot_sales_trend(self, ax):
        """売上推移をプロット"""
        ax.plot(self.data['date'], self.data['sales'], 
                linewidth=2, color='#2E86AB', marker='o', markersize=3)
        ax.set_title('売上推移', fontsize=14, fontweight='bold', pad=20)
        ax.set_xlabel('日付')
        ax.set_ylabel('売上額')
        ax.grid(True, alpha=0.3)

        # トレンドライン
        z = np.polyfit(range(len(self.data)), self.data['sales'], 1)
        p = np.poly1d(z)
        ax.plot(self.data['date'], p(range(len(self.data))), 
                'r--', alpha=0.8, label='トレンド')
        ax.legend()

        # 最終売上額を表示
        last_sales = self.data['sales'].iloc[-1]
        ax.annotate(f'最終売上: {last_sales:,.0f}', 
                   xy=(self.data['date'].iloc[-1], last_sales),
                   xytext=(10, 10), textcoords='offset points',
                   bbox=dict(boxstyle='round,pad=0.3', facecolor='yellow', alpha=0.7))

    def _plot_regional_sales(self, ax):
        """地域別売上をプロット"""
        regional_sales = self.data.groupby('region')['revenue'].sum()
        colors = ['#A23B72', '#F18F01', '#C73E1D', '#2E86AB']

        wedges, texts, autotexts = ax.pie(regional_sales, labels=regional_sales.index,
                                         autopct='%1.1f%%', colors=colors,
                                         startangle=90)

        for autotext in autotexts:
            autotext.set_color('white')
            autotext.set_fontweight('bold')

        ax.set_title('地域別売上', fontsize=12, fontweight='bold')

    def _plot_category_sales(self, ax):
        """カテゴリー別売上をプロット"""
        category_sales = self.data.groupby('product_category')['revenue'].sum().sort_values()

        bars = ax.barh(category_sales.index, category_sales.values,
                      color=['#F18F01', '#C73E1D', '#A23B72', '#2E86AB'])

        ax.set_title('カテゴリー別売上', fontsize=12, fontweight='bold')
        ax.set_xlabel('売上額')

        # バーに数値を表示
        for bar in bars:
            width = bar.get_width()
            ax.text(width + max(category_sales.values) * 0.01, bar.get_y() + bar.get_height()/2,
                   f'{width:,.0f}', ha='left', va='center')

    def _plot_conversion_rate(self, ax):
        """コンバージョン率をプロット"""
        ax.hist(self.data['conversion_rate'], bins=15, alpha=0.7, 
               color='#2E86AB', edgecolor='black')
        ax.set_title('コンバージョン率分布', fontsize=12, fontweight='bold')
        ax.set_xlabel('コンバージョン率 (%)')
        ax.set_ylabel('頻度')
        ax.grid(True, alpha=0.3)

        # 平均値を表示
        mean_rate = self.data['conversion_rate'].mean()
        ax.axvline(mean_rate, color='red', linestyle='--', linewidth=2)
        ax.text(mean_rate, ax.get_ylim()[1] * 0.9, f'平均: {mean_rate:.2f}%',
               ha='center', va='center', backgroundcolor='white')

    def _plot_customer_satisfaction(self, ax):
        """顧客満足度をプロット"""
        satisfaction_by_region = self.data.groupby('region')['customer_satisfaction'].mean()

        bars = ax.bar(satisfaction_by_region.index, satisfaction_by_region.values,
                     color=['#A23B72', '#F18F01', '#C73E1D', '#2E86AB'])

        ax.set_title('地域別顧客満足度', fontsize=12, fontweight='bold')
        ax.set_ylabel('満足度 (1-5)')
        ax.set_ylim(0, 5)

        # バーに数値を表示
        for bar in bars:
            height = bar.get_height()
            ax.text(bar.get_x() + bar.get_width()/2, height + 0.05,
                   f'{height:.2f}', ha='center', va='bottom')

    def _plot_visitors_vs_sales(self, ax):
        """訪問者数 vs 売上の散布図"""
        scatter = ax.scatter(self.data['website_visitors'], self.data['sales'],
                           c=self.data['conversion_rate'], cmap='viridis',
                           alpha=0.6, s=50)

        ax.set_title('訪問者数 vs 売上', fontsize=12, fontweight='bold')
        ax.set_xlabel('ウェブサイト訪問者数')
        ax.set_ylabel('売上額')

        # カラーバーを追加
        cbar = plt.colorbar(scatter, ax=ax)
        cbar.set_label('コンバージョン率 (%)')

        # 相関係数を表示
        correlation = np.corrcoef(self.data['website_visitors'], self.data['sales'])[0,1]
        ax.text(0.05, 0.95, f'相関係数: {correlation:.2f}',
               transform=ax.transAxes, fontsize=10,
               bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))

class RealTimeDashboard:
    """リアルタイムダッシュボード"""

    def __init__(self):
        self.fig, self.axes = plt.subplots(2, 2, figsize=(15, 10))
        self.data = self.generate_realtime_data()

    def generate_realtime_data(self):
        """リアルタイムデータを生成"""
        np.random.seed(42)
        timestamps = [datetime.now() - timedelta(minutes=i) for i in range(100, 0, -1)]

        return pd.DataFrame({
            'timestamp': timestamps,
            'cpu_usage': np.random.normal(50, 15, 100),
            'memory_usage': np.random.normal(60, 10, 100),
            'network_traffic': np.random.poisson(1000, 100),
            'active_users': np.random.randint(50, 200, 100)
        })

    def update_dashboard(self):
        """ダッシュボードを更新"""
        # 新しいデータポイントを追加
        new_timestamp = datetime.now()
        new_data = {
            'timestamp': new_timestamp,
            'cpu_usage': np.random.normal(50, 15),
            'memory_usage': np.random.normal(60, 10),
            'network_traffic': np.random.poisson(1000),
            'active_users': np.random.randint(50, 200)
        }

        self.data = self.data.append(new_data, ignore_index=True)
        self.data = self.data.tail(100)  # 最新100ポイントのみ保持

        self._clear_axes()
        self._plot_realtime_data()

    def _clear_axes(self):
        """軸をクリア"""
        for ax in self.axes.flat:
            ax.clear()

    def _plot_realtime_data(self):
        """リアルタイムデータをプロット"""
        # 1. CPU使用率
        self.axes[0, 0].plot(self.data['timestamp'], self.data['cpu_usage'], 
                            color='red', linewidth=2)
        self.axes[0, 0].set_title('CPU使用率 (%)', fontweight='bold')
        self.axes[0, 0].set_ylim(0, 100)
        self.axes[0, 0].grid(True, alpha=0.3)

        # 2. メモリ使用率
        self.axes[0, 1].plot(self.data['timestamp'], self.data['memory_usage'],
                            color='blue', linewidth=2)
        self.axes[0, 1].set_title('メモリ使用率 (%)', fontweight='bold')
        self.axes[0, 1].set_ylim(0, 100)
        self.axes[0, 1].grid(True, alpha=0.3)

        # 3. ネットワークトラフィック
        self.axes[1, 0].plot(self.data['timestamp'], self.data['network_traffic'],
                            color='green', linewidth=2)
        self.axes[1, 0].set_title('ネットワークトラフィック', fontweight='bold')
        self.axes[1, 0].grid(True, alpha=0.3)

        # 4. アクティブユーザー数
        self.axes[1, 1].bar(self.data['timestamp'], self.data['active_users'],
                           color='orange', alpha=0.7)
        self.axes[1, 1].set_title('アクティブユーザー数', fontweight='bold')
        self.axes[1, 1].tick_params(axis='x', rotation=45)

        plt.tight_layout()
        plt.pause(0.1)

def create_interactive_dashboard():
    """インタラクティブなダッシュボードを作成"""
    import plotly.express as px
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots

    # サンプルデータの生成
    np.random.seed(42)
    categories = ['Electronics', 'Clothing', 'Books', 'Home', 'Sports']
    regions = ['North', 'South', 'East', 'West']

    data = []
    for month in range(1, 13):
        for category in categories:
            for region in regions:
                data.append({
                    'month': month,
                    'category': category,
                    'region': region,
                    'sales': np.random.exponential(1000),
                    'profit': np.random.normal(200, 50),
                    'units_sold': np.random.poisson(50)
                })

    df = pd.DataFrame(data)

    # インタラクティブなダッシュボードの作成
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=('月別売上', 'カテゴリー別売上', '地域別利益', '売上 vs 利益'),
        specs=[[{"type": "scatter"}, {"type": "bar"}],
               [{"type": "bar"}, {"type": "scatter"}]]
    )

    # 1. 月別売上
    monthly_sales = df.groupby('month')['sales'].sum().reset_index()
    fig.add_trace(
        go.Scatter(x=monthly_sales['month'], y=monthly_sales['sales'],
                  mode='lines+markers', name='月別売上'),
        row=1, col=1
    )

    # 2. カテゴリー別売上
    category_sales = df.groupby('category')['sales'].sum().reset_index()
    fig.add_trace(
        go.Bar(x=category_sales['category'], y=category_sales['sales'],
               name='カテゴリー別売上'),
        row=1, col=2
    )

    # 3. 地域別利益
    region_profit = df.groupby('region')['profit'].sum().reset_index()
    fig.add_trace(
        go.Bar(x=region_profit['region'], y=region_profit['profit'],
               name='地域別利益'),
        row=2, col=1
    )

    # 4. 売上 vs 利益の散布図
    fig.add_trace(
        go.Scatter(x=df['sales'], y=df['profit'], mode='markers',
                  name='売上 vs 利益', marker=dict(size=8, opacity=0.6)),
        row=2, col=2
    )

    fig.update_layout(height=800, title_text="インタラクティブ販売ダッシュボード")
    fig.show()

# デモの実行
def run_dashboard_demo():
    """ダッシュボードデモを実行"""

    print("=== 静的ダッシュボード ===")
    static_dashboard = DataVisualizationDashboard()
    static_dashboard.create_sales_dashboard()

    print("=== インタラクティブダッシュボード ===")
    create_interactive_dashboard()

    # リアルタイムダッシュボードのデモ(コメントアウト)
    # print("=== リアルタイムダッシュボード ===")
    # realtime_dashboard = RealTimeDashboard()
    # for _ in range(10):  # 10回更新
    #     realtime_dashboard.update_dashboard()

if __name__ == "__main__":
    run_dashboard_demo()

48. 機械学習の基礎

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.svm import SVC
from sklearn.metrics import (accuracy_score, classification_report, 
                           confusion_matrix, mean_squared_error, r2_score)
from sklearn.datasets import load_iris, load_boston, make_classification
import warnings
warnings.filterwarnings('ignore')

class MachineLearningDemo:
    """機械学習デモクラス"""

    def __init__(self):
        self.models = {}
        self.scaler = StandardScaler()

    def load_iris_dataset(self):
        """アイリスデータセットをロード"""
        iris = load_iris()
        self.X_iris = iris.data
        self.y_iris = iris.target
        self.feature_names_iris = iris.feature_names
        self.target_names_iris = iris.target_names

        print("アイリスデータセット:")
        print(f"特徴量数: {self.X_iris.shape[1]}")
        print(f"サンプル数: {self.X_iris.shape[0]}")
        print(f"クラス: {self.target_names_iris}")

        return pd.DataFrame(self.X_iris, columns=self.feature_names_iris)

    def load_boston_dataset(self):
        """ボストンデータセットをロード"""
        boston = load_boston()
        self.X_boston = boston.data
        self.y_boston = boston.target
        self.feature_names_boston = boston.feature_names

        print("\nボストンデータセット:")
        print(f"特徴量数: {self.X_boston.shape[1]}")
        print(f"サンプル数: {self.X_boston.shape[0]}")
        print(f"ターゲット: 住宅価格")

        return pd.DataFrame(self.X_boston, columns=self.feature_names_boston)

    def create_synthetic_dataset(self):
        """合成データセットを作成"""
        X, y = make_classification(
            n_samples=1000, n_features=20, n_informative=15,
            n_redundant=5, n_clusters_per_class=1, random_state=42
        )

        self.X_synthetic = X
        self.y_synthetic = y

        print("\n合成データセット:")
        print(f"特徴量数: {self.X_synthetic.shape[1]}")
        print(f"サンプル数: {self.X_synthetic.shape[0]}")
        print(f"クラス数: {len(np.unique(y))}")

        return pd.DataFrame(X, columns=[f'feature_{i}' for i in range(20)])

    def exploratory_data_analysis(self, df, target=None):
        """探索的データ分析"""
        print("\n=== 探索的データ分析 ===")

        # 基本統計量
        print("基本統計量:")
        print(df.describe())

        # 欠損値の確認
        print(f"\n欠損値の数:")
        print(df.isnull().sum())

        # 相関行列(数値特徴量のみ)
        numeric_df = df.select_dtypes(include=[np.number])
        if len(numeric_df.columns) > 1:
            plt.figure(figsize=(10, 8))
            correlation_matrix = numeric_df.corr()
            sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
            plt.title('相関行列')
            plt.tight_layout()
            plt.show()

        # 特徴量の分布
        if len(numeric_df.columns) > 0:
            numeric_df.hist(bins=20, figsize=(12, 10))
            plt.suptitle('特徴量の分布')
            plt.tight_layout()
            plt.show()

    def train_classification_model(self, X, y, model_name='logistic_regression'):
        """分類モデルを訓練"""
        print(f"\n=== {model_name} 分類モデルの訓練 ===")

        # データ分割
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # 特徴量の標準化
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        # モデルの選択
        if model_name == 'logistic_regression':
            model = LogisticRegression(random_state=42, max_iter=1000)
        elif model_name == 'random_forest':
            model = RandomForestClassifier(random_state=42, n_estimators=100)
        elif model_name == 'svm':
            model = SVC(random_state=42, probability=True)
        else:
            raise ValueError("未知のモデルです")

        # モデルの訓練
        model.fit(X_train_scaled, y_train)

        # 予測
        y_pred = model.predict(X_test_scaled)
        y_pred_proba = model.predict_proba(X_test_scaled)

        # 評価
        accuracy = accuracy_score(y_test, y_pred)
        print(f"正解率: {accuracy:.4f}")
        print(f"\n分類レポート:")
        print(classification_report(y_test, y_pred))

        # 混同行列
        plt.figure(figsize=(8, 6))
        cm = confusion_matrix(y_test, y_pred)
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
        plt.title(f'混同行列 - {model_name}')
        plt.ylabel('実際のクラス')
        plt.xlabel('予測クラス')
        plt.show()

        # モデルの保存
        self.models[model_name] = {
            'model': model,
            'accuracy': accuracy,
            'feature_importance': getattr(model, 'feature_importances_', None)
        }

        return accuracy

    def train_regression_model(self, X, y, model_name='linear_regression'):
        """回帰モデルを訓練"""
        print(f"\n=== {model_name} 回帰モデルの訓練 ===")

        # データ分割
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # 特徴量の標準化
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        # モデルの選択
        if model_name == 'linear_regression':
            model = LinearRegression()
        elif model_name == 'random_forest':
            model = RandomForestRegressor(random_state=42, n_estimators=100)
        else:
            raise ValueError("未知のモデルです")

        # モデルの訓練
        model.fit(X_train_scaled, y_train)

        # 予測
        y_pred = model.predict(X_test_scaled)

        # 評価
        mse = mean_squared_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)

        print(f"平均二乗誤差: {mse:.4f}")
        print(f"決定係数 (R²): {r2:.4f}")

        # 予測値 vs 実際の値のプロット
        plt.figure(figsize=(10, 6))
        plt.scatter(y_test, y_pred, alpha=0.7)
        plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', lw=2)
        plt.xlabel('実際の値')
        plt.ylabel('予測値')
        plt.title(f'予測値 vs 実際の値 - {model_name}')
        plt.grid(True, alpha=0.3)
        plt.show()

        # 残差のプロット
        residuals = y_test - y_pred
        plt.figure(figsize=(10, 6))
        plt.scatter(y_pred, residuals, alpha=0.7)
        plt.axhline(y=0, color='r', linestyle='--')
        plt.xlabel('予測値')
        plt.ylabel('残差')
        plt.title(f'残差プロット - {model_name}')
        plt.grid(True, alpha=0.3)
        plt.show()

        # モデルの保存
        self.models[model_name] = {
            'model': model,
            'mse': mse,
            'r2': r2,
            'feature_importance': getattr(model, 'feature_importances_', None)
        }

        return r2

    def hyperparameter_tuning(self, X, y, model_type='classification'):
        """ハイパーパラメータチューニング"""
        print(f"\n=== ハイパーパラメータチューニング ({model_type}) ===")

        # データ分割
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42,
            stratify=y if model_type == 'classification' else None
        )

        # 特徴量の標準化
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        if model_type == 'classification':
            # ランダムフォレストのハイパーパラメータチューニング
            param_grid = {
                'n_estimators': [50, 100, 200],
                'max_depth': [None, 10, 20],
                'min_samples_split': [2, 5, 10]
            }
            model = RandomForestClassifier(random_state=42)
        else:
            # ランダムフォレスト回帰のハイパーパラメータチューニング
            param_grid = {
                'n_estimators': [50, 100, 200],
                'max_depth': [None, 10, 20],
                'min_samples_split': [2, 5, 10]
            }
            model = RandomForestRegressor(random_state=42)

        # グリッドサーチ
        grid_search = GridSearchCV(
            model, param_grid, cv=5, scoring='accuracy' if model_type == 'classification' else 'r2',
            n_jobs=-1, verbose=1
        )

        grid_search.fit(X_train_scaled, y_train)

        print(f"最適なパラメータ: {grid_search.best_params_}")
        print(f"最良のスコア: {grid_search.best_score_:.4f}")

        # 最良モデルでの評価
        best_model = grid_search.best_estimator_
        y_pred = best_model.predict(X_test_scaled)

        if model_type == 'classification':
            accuracy = accuracy_score(y_test, y_pred)
            print(f"テストデータでの正解率: {accuracy:.4f}")
        else:
            r2 = r2_score(y_test, y_pred)
            print(f"テストデータでの決定係数: {r2:.4f}")

        return grid_search.best_estimator_

    def feature_importance_analysis(self, X, model, feature_names):
        """特徴量重要度の分析"""
        if hasattr(model, 'feature_importances_'):
            print("\n=== 特徴量重要度 ===")

            importances = model.feature_importances_
            indices = np.argsort(importances)[::-1]

            # 重要度のプロット
            plt.figure(figsize=(10, 6))
            plt.title("特徴量重要度")
            plt.bar(range(len(importances)), importances[indices])
            plt.xticks(range(len(importances)), 
                      [feature_names[i] for i in indices], rotation=45)
            plt.tight_layout()
            plt.show()

            # 重要度の表示
            print("特徴量重要度ランキング:")
            for i in range(min(10, len(importances))):
                print(f"{i+1:2d}. {feature_names[indices[i]]}: {importances[indices[i]]:.4f}")

    def cross_validation_evaluation(self, X, y, model, cv=5):
        """交差検証による評価"""
        print(f"\n=== {cv}-fold 交差検証 ===")

        X_scaled = self.scaler.fit_transform(X)

        if hasattr(model, 'predict_proba'):  # 分類モデル
            scores = cross_val_score(model, X_scaled, y, cv=cv, scoring='accuracy')
            print(f"交差検証スコア: {scores}")
            print(f"平均正解率: {scores.mean():.4f} (+/- {scores.std() * 2:.4f})")
        else:  # 回帰モデル
            scores = cross_val_score(model, X_scaled, y, cv=cv, scoring='r2')
            print(f"交差検証スコア: {scores}")
            print(f"平均決定係数: {scores.mean():.4f} (+/- {scores.std() * 2:.4f})")

        return scores

def machine_learning_demo():
    """機械学習デモの実行"""
    ml_demo = MachineLearningDemo()

    # データセットのロード
    iris_df = ml_demo.load_iris_dataset()
    boston_df = ml_demo.load_boston_dataset()
    synthetic_df = ml_demo.create_synthetic_dataset()

    # 探索的データ分析
    ml_demo.exploratory_data_analysis(iris_df)
    ml_demo.exploratory_data_analysis(boston_df)

    # 分類モデルの訓練(アイリスデータセット)
    ml_demo.train_classification_model(
        ml_demo.X_iris, ml_demo.y_iris, 'logistic_regression'
    )
    ml_demo.train_classification_model(
        ml_demo.X_iris, ml_demo.y_iris, 'random_forest'
    )

    # 回帰モデルの訓練(ボストンデータセット)
    ml_demo.train_regression_model(
        ml_demo.X_boston, ml_demo.y_boston, 'linear_regression'
    )
    ml_demo.train_regression_model(
        ml_demo.X_boston, ml_demo.y_boston, 'random_forest'
    )

    # ハイパーパラメータチューニング
    best_classifier = ml_demo.hyperparameter_tuning(
        ml_demo.X_iris, ml_demo.y_iris, 'classification'
    )
    best_regressor = ml_demo.hyperparameter_tuning(
        ml_demo.X_boston, ml_demo.y_boston, 'regression'
    )

    # 特徴量重要度の分析
    ml_demo.feature_importance_analysis(
        ml_demo.X_boston, best_regressor, ml_demo.feature_names_boston
    )

    # 交差検証
    ml_demo.cross_validation_evaluation(
        ml_demo.X_iris, ml_demo.y_iris, best_classifier
    )
    ml_demo.cross_validation_evaluation(
        ml_demo.X_boston, ml_demo.y_boston, best_regressor
    )

# 追加の機械学習例
class AdvancedMLExamples:
    """高度な機械学習の例"""

    @staticmethod
    def pca_analysis(X, feature_names):
        """主成分分析"""
        from sklearn.decomposition import PCA

        print("\n=== 主成分分析 ===")

        # データの標準化
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)

        # PCAの実行
        pca = PCA()
        X_pca = pca.fit_transform(X_scaled)

        # 寄与率の表示
        explained_variance_ratio = pca.explained_variance_ratio_
        cumulative_variance = np.cumsum(explained_variance_ratio)

        print("各主成分の寄与率:")
        for i, (var, cum_var) in enumerate(zip(explained_variance_ratio, cumulative_variance)):
            print(f"PC{i+1}: {var:.4f} (累積: {cum_var:.4f})")

        # スクリープロット
        plt.figure(figsize=(10, 6))
        plt.plot(range(1, len(explained_variance_ratio) + 1), explained_variance_ratio, 'bo-')
        plt.plot(range(1, len(cumulative_variance) + 1), cumulative_variance, 'ro-')
        plt.xlabel('主成分')
        plt.ylabel('寄与率')
        plt.title('スクリープロット')
        plt.legend(['各主成分', '累積'])
        plt.grid(True, alpha=0.3)
        plt.show()

        # 第1主成分と第2主成分のプロット
        if X_pca.shape[1] >= 2:
            plt.figure(figsize=(10, 8))
            plt.scatter(X_pca[:, 0], X_pca[:, 1], alpha=0.7)
            plt.xlabel(f'PC1 ({explained_variance_ratio[0]:.2%})')
            plt.ylabel(f'PC2 ({explained_variance_ratio[1]:.2%})')
            plt.title('PCA: 第1主成分 vs 第2主成分')
            plt.grid(True, alpha=0.3)
            plt.show()

        return X_pca, pca

    @staticmethod
    def clustering_analysis(X, n_clusters=3):
        """クラスタリング分析"""
        from sklearn.cluster import KMeans
        from sklearn.metrics import silhouette_score

        print(f"\n=== K-meansクラスタリング (クラスタ数: {n_clusters}) ===")

        # データの標準化
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)

        # K-meansクラスタリング
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        clusters = kmeans.fit_predict(X_scaled)

        # シルエットスコアの計算
        silhouette_avg = silhouette_score(X_scaled, clusters)
        print(f"シルエットスコア: {silhouette_avg:.4f}")

        # クラスタの可視化(PCAを使用)
        pca = PCA(n_components=2)
        X_pca = pca.fit_transform(X_scaled)

        plt.figure(figsize=(10, 8))
        scatter = plt.scatter(X_pca[:, 0], X_pca[:, 1], c=clusters, cmap='viridis', alpha=0.7)
        plt.colorbar(scatter)
        plt.xlabel('PC1')
        plt.ylabel('PC2')
        plt.title(f'K-meansクラスタリング結果 (シルエットスコア: {silhouette_avg:.3f})')
        plt.grid(True, alpha=0.3)
        plt.show()

        return clusters, kmeans

if __name__ == "__main__":
    machine_learning_demo()

    # 高度な機械学習の例
    print("\n" + "="*50)
    print("高度な機械学習の例")
    print("="*50)

    ml_demo = MachineLearningDemo()
    iris_df = ml_demo.load_iris_dataset()

    advanced_ml = AdvancedMLExamples()

    # 主成分分析
    X_pca, pca = advanced_ml.pca_analysis(ml_demo.X_iris, ml_demo.feature_names_iris)

    # クラスタリング分析
    clusters, kmeans = advanced_ml.clustering_analysis(ml_demo.X_iris, n_clusters=3)

49. パフォーマンス最適化

import time
import cProfile
import pstats
import memory_profiler
from functools import lru_cache
import numpy as np
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp

class PerformanceOptimizer:
    """パフォーマンス最適化クラス"""

    def __init__(self):
        self.results = {}

    def timer_decorator(self, func):
        """実行時間を計測するデコレータ"""
        def wrapper(*args, **kwargs):
            start_time = time.time()
            result = func(*args, **kwargs)
            end_time = time.time()
            execution_time = end_time - start_time
            print(f"{func.__name__} の実行時間: {execution_time:.4f}秒")
            self.results[func.__name__] = execution_time
            return result
        return wrapper

    def profile_function(self, func, *args, **kwargs):
        """関数のプロファイリング"""
        print(f"\n=== {func.__name__} のプロファイリング ===")
        profiler = cProfile.Profile()
        profiler.enable()

        result = func(*args, **kwargs)

        profiler.disable()
        stats = pstats.Stats(profiler)
        stats.sort_stats('cumulative')
        stats.print_stats(10)  # 上位10個の関数を表示

        return result

    def memory_usage_decorator(self, func):
        """メモリ使用量を計測するデコレータ"""
        def wrapper(*args, **kwargs):
            mem_usage = memory_profiler.memory_usage((func, args, kwargs))
            max_memory = max(mem_usage)
            print(f"{func.__name__} の最大メモリ使用量: {max_memory:.2f} MiB")
            self.results[f"{func.__name__}_memory"] = max_memory
            return func(*args, **kwargs)
        return wrapper

class OptimizationExamples:
    """最適化の例"""

    @staticmethod
    @PerformanceOptimizer().timer_decorator
    def slow_fibonacci(n):
        """遅いフィボナッチ数列の実装"""
        if n <= 1:
            return n
        return OptimizationExamples.slow_fibonacci(n-1) + OptimizationExamples.slow_fibonacci(n-2)

    @staticmethod
    @lru_cache(maxsize=None)
    def cached_fibonacci(n):
        """メモ化を使用したフィボナッチ数列"""
        if n <= 1:
            return n
        return OptimizationExamples.cached_fibonacci(n-1) + OptimizationExamples.cached_fibonacci(n-2)

    @staticmethod
    @PerformanceOptimizer().timer_decorator
    def fast_fibonacci(n):
        """高速なフィボナッチ数列の実装"""
        if n <= 1:
            return n

        a, b = 0, 1
        for _ in range(2, n + 1):
            a, b = b, a + b
        return b

    @staticmethod
    @PerformanceOptimizer().timer_decorator
    @PerformanceOptimizer().memory_usage_decorator
    def inefficient_data_processing(data_size=1000000):
        """非効率なデータ処理"""
        # リストを使用した処理
        numbers = list(range(data_size))

        # フィルタリングと変換
        result = []
        for num in numbers:
            if num % 2 == 0:
                result.append(num ** 2)

        return result

    @staticmethod
    @PerformanceOptimizer().timer_decorator
    @PerformanceOptimizer().memory_usage_decorator
    def efficient_data_processing(data_size=1000000):
        """効率的なデータ処理"""
        # NumPyを使用した処理
        numbers = np.arange(data_size)

        # ベクトル化された操作
        mask = numbers % 2 == 0
        result = numbers[mask] ** 2

        return result

    @staticmethod
    @PerformanceOptimizer().timer_decorator
    def slow_matrix_operations(size=1000):
        """遅い行列操作"""
        # ネストされたループを使用
        A = [[i * j for j in range(size)] for i in range(size)]
        B = [[i + j for j in range(size)] for i in range(size)]

        result = [[0 for _ in range(size)] for _ in range(size)]
        for i in range(size):
            for j in range(size):
                for k in range(size):
                    result[i][j] += A[i][k] * B[k][j]

        return result

    @staticmethod
    @PerformanceOptimizer().timer_decorator
    def fast_matrix_operations(size=1000):
        """高速な行列操作"""
        # NumPyを使用
        A = np.random.rand(size, size)
        B = np.random.rand(size, size)

        result = np.dot(A, B)
        return result

class ParallelProcessing:
    """並列処理の例"""

    @staticmethod
    @PerformanceOptimizer().timer_decorator
    def sequential_processing(data):
        """逐次処理"""
        results = []
        for item in data:
            results.append(ParallelProcessing.process_item(item))
        return results

    @staticmethod
    @PerformanceOptimizer().timer_decorator
    def thread_pool_processing(data):
        """スレッドプールを使用した並列処理"""
        with ThreadPoolExecutor(max_workers=4) as executor:
            results = list(executor.map(ParallelProcessing.process_item, data))
        return results

    @staticmethod
    @PerformanceOptimizer().timer_decorator
    def process_pool_processing(data):
        """プロセスプールを使用した並列処理"""
        with ProcessPoolExecutor(max_workers=4) as executor:
            results = list(executor.map(ParallelProcessing.process_item, data))
        return results

    @staticmethod
    def process_item(x):
        """処理するアイテム(重い計算をシミュレート)"""
        time.sleep(0.01)  # 重い処理をシミュレート
        return x ** 2

class MemoryOptimization:
    """メモリ最適化の例"""

    @staticmethod
    @PerformanceOptimizer().memory_usage_decorator
    def memory_inefficient_method():
        """メモリ非効率な方法"""
        # 大きなリストを作成
        big_list = [i for i in range(1000000)]

        # 不要なコピーを作成
        processed_data = []
        for item in big_list:
            temp = item * 2
            processed_data.append(temp)

        # 中間結果を保持
        intermediate = [x for x in processed_data if x % 2 == 0]
        result = [x ** 2 for x in intermediate]

        return result

    @staticmethod
    @PerformanceOptimizer().memory_usage_decorator
    def memory_efficient_method():
        """メモリ効率の良い方法"""
        # ジェネレータを使用
        def number_generator(n):
            for i in range(n):
                yield i

        # チェーンされたジェネレータ式
        result = (
            (x * 2) ** 2 
            for x in number_generator(1000000) 
            if (x * 2) % 2 == 0
        )

        # 必要に応じてリストに変換
        return list(result)

    @staticmethod
    def demonstrate_data_types():
        """データ型のメモリ使用量の比較"""
        import sys

        data_structures = {
            'list': list(range(1000)),
            'tuple': tuple(range(1000)),
            'set': set(range(1000)),
            'numpy_array': np.arange(1000),
            'pandas_series': pd.Series(range(1000))
        }

        print("\n=== データ構造のメモリ使用量 ===")
        for name, data in data_structures.items():
            memory_usage = sys.getsizeof(data)
            print(f"{name:15}: {memory_usage:8} bytes")

class AlgorithmOptimization:
    """アルゴリズム最適化の例"""

    @staticmethod
    @PerformanceOptimizer().timer_decorator
    def naive_search(numbers, target):
        """単純な線形探索"""
        for i, num in enumerate(numbers):
            if num == target:
                return i
        return -1

    @staticmethod
    @PerformanceOptimizer().timer_decorator
    def binary_search(numbers, target):
        """二分探索"""
        low, high = 0, len(numbers) - 1

        while low <= high:
            mid = (low + high) // 2
            if numbers[mid] == target:
                return mid
            elif numbers[mid] < target:
                low = mid + 1
            else:
                high = mid - 1

        return -1

    @staticmethod
    @PerformanceOptimizer().timer_decorator
    def bubble_sort(numbers):
        """バブルソート(O(n²))"""
        n = len(numbers)
        for i in range(n):
            for j in range(0, n - i - 1):
                if numbers[j] > numbers[j + 1]:
                    numbers[j], numbers[j + 1] = numbers[j + 1], numbers[j]
        return numbers

    @staticmethod
    @PerformanceOptimizer().timer_decorator
    def quick_sort(numbers):
        """クイックソート(O(n log n))"""
        if len(numbers) <= 1:
            return numbers

        pivot = numbers[len(numbers) // 2]
        left = [x for x in numbers if x < pivot]
        middle = [x for x in numbers if x == pivot]
        right = [x for x in numbers if x > pivot]

        return AlgorithmOptimization.quick_sort(left) + middle + AlgorithmOptimization.quick_sort(right)

def performance_optimization_demo():
    """パフォーマンス最適化デモ"""
    optimizer = PerformanceOptimizer()
    examples = OptimizationExamples()

    print("=== パフォーマンス最適化デモ ===")

    # フィボナッチ数列の比較
    print("\n1. フィボナッチ数列の計算比較")
    n = 35

    # 遅い実装
    try:
        examples.slow_fibonacci(n)
    except RecursionError:
        print("再帰の深さ制限に達しました")

    # メモ化を使用
    examples.cached_fibonacci(n)

    # 高速な実装
    examples.fast_fibonacci(n)

    # データ処理の比較
    print("\n2. データ処理の比較")
    data_size = 1000000

    examples.inefficient_data_processing(data_size // 10)  # サイズを小さくして実行
    examples.efficient_data_processing(data_size // 10)

    # 行列操作の比較
    print("\n3. 行列操作の比較")
    matrix_size = 500  # サイズを小さくして実行

    examples.slow_matrix_operations(matrix_size // 5)  # さらに小さく
    examples.fast_matrix_operations(matrix_size)

    # 並列処理の比較
    print("\n4. 並列処理の比較")
    data = list(range(100))

    ParallelProcessing.sequential_processing(data)
    ParallelProcessing.thread_pool_processing(data)
    ParallelProcessing.process_pool_processing(data)

    # メモリ使用量の比較
    print("\n5. メモリ使用量の比較")
    MemoryOptimization.memory_inefficient_method()
    MemoryOptimization.memory_efficient_method()
    MemoryOptimization.demonstrate_data_types()

    # アルゴリズムの比較
    print("\n6. アルゴリズムの比較")
    large_list = list(range(1000000))
    target = 999999

    AlgorithmOptimization.naive_search(large_list, target)
    AlgorithmOptimization.binary_search(large_list, target)

    # ソートアルゴリズムの比較
    small_list = list(range(1000, 0, -1))  # 逆順のリスト

    AlgorithmOptimization.bubble_sort(small_list.copy())
    AlgorithmOptimization.quick_sort(small_list.copy())

    # 結果のまとめ
    print("\n=== パフォーマンス結果のまとめ ===")
    for func_name, execution_time in optimizer.results.items():
        if 'memory' not in func_name:
            print(f"{func_name:30}: {execution_time:8.4f}秒")

# 高度な最適化テクニック
class AdvancedOptimization:
    """高度な最適化テクニック"""

    @staticmethod
    def just_in_time_compilation():
        """JITコンパイルの例"""
        try:
            from numba import jit
            import numba

            @jit(nopython=True)
            def numba_optimized_function(n):
                result = 0
                for i in range(n):
                    result += i ** 2
                return result

            print("\n=== Numba JITコンパイル ===")
            optimizer = PerformanceOptimizer()

            @optimizer.timer_decorator
            def regular_python(n):
                result = 0
                for i in range(n):
                    result += i ** 2
                return result

            @optimizer.timer_decorator
            def numba_compiled(n):
                return numba_optimized_function(n)

            n = 10000000
            regular_python(n)
            numba_compiled(n)

        except ImportError:
            print("Numbaがインストールされていません")

    @staticmethod
    def cython_optimization():
        """Cython最適化の例"""
        print("\n=== Cython最適化 ===")
        print("Cythonを使用すると、PythonコードをCのようにコンパイルして高速化できます")
        print("使用方法:")
        print("1. .pyxファイルを作成")
        print("2. setup.pyでコンパイル設定")
        print("3. コンパイルしてインポート")

    @staticmethod
    def pandas_optimization():
        """Pandas最適化の例"""
        print("\n=== Pandas最適化 ===")

        # 大きなDataFrameを作成
        n = 1000000
        df = pd.DataFrame({
            'A': np.random.randn(n),
            'B': np.random.randint(0, 100, n),
            'C': np.random.choice(['X', 'Y', 'Z'], n)
        })

        optimizer = PerformanceOptimizer()

        @optimizer.timer_decorator
        def iterative_operation(df):
            """イテレーティブな操作"""
            result = []
            for i in range(len(df)):
                if df.iloc[i]['A'] > 0:
                    result.append(df.iloc[i]['B'] * 2)
                else:
                    result.append(df.iloc[i]['B'])
            return result

        @optimizer.timer_decorator
        def vectorized_operation(df):
            """ベクトル化された操作"""
            return np.where(df['A'] > 0, df['B'] * 2, df['B'])

        @optimizer.timer_decorator
        def apply_operation(df):
            """applyを使用した操作"""
            return df.apply(lambda row: row['B'] * 2 if row['A'] > 0 else row['B'], axis=1)

        # 小さいデータでテスト
        small_df = df.head(10000)
        iterative_operation(small_df)
        vectorized_operation(small_df)
        apply_operation(small_df)

if __name__ == "__main__":
    performance_optimization_demo()

    # 高度な最適化テクニック
    print("\n" + "="*50)
    print("高度な最適化テクニック")
    print("="*50)

    AdvancedOptimization.just_in_time_compilation()
    AdvancedOptimization.cython_optimization()
    AdvancedOptimization.pandas_optimization()

50. 総合プロジェクト

from flask import Flask, render_template, request, jsonify, redirect, url_for, flash, session
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from io import BytesIO
import base64
import json
import os
from functools import wraps

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///project_management.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'

# データモデル
class User(UserMixin, db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(128))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    is_active = db.Column(db.Boolean, default=True)
    role = db.Column(db.String(20), default='user')  # user, manager, admin

    projects = db.relationship('Project', backref='owner', lazy=True)
    tasks = db.relationship('Task', backref='assigned_user', lazy=True)
    time_entries = db.relationship('TimeEntry', backref='user', lazy=True)

class Project(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    status = db.Column(db.String(20), default='active')  # active, completed, on_hold
    priority = db.Column(db.String(20), default='medium')  # low, medium, high
    start_date = db.Column(db.Date)
    end_date = db.Column(db.Date)
    budget = db.Column(db.Float)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)

    tasks = db.relationship('Task', backref='project', lazy=True, cascade='all, delete-orphan')
    time_entries = db.relationship('TimeEntry', backref='project', lazy=True)

class Task(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    description = db.Column(db.Text)
    status = db.Column(db.String(20), default='todo')  # todo, in_progress, review, done
    priority = db.Column(db.String(20), default='medium')
    due_date = db.Column(db.Date)
    estimated_hours = db.Column(db.Float)
    actual_hours = db.Column(db.Float, default=0)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    project_id = db.Column(db.Integer, db.ForeignKey('project.id'), nullable=False)
    assigned_to = db.Column(db.Integer, db.ForeignKey('user.id'))

    time_entries = db.relationship('TimeEntry', backref='task', lazy=True, cascade='all, delete-orphan')

class TimeEntry(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    description = db.Column(db.Text)
    hours = db.Column(db.Float, nullable=False)
    date = db.Column(db.Date, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    project_id = db.Column(db.Integer, db.ForeignKey('project.id'), nullable=False)
    task_id = db.Column(db.Integer, db.ForeignKey('task.id'))

@login_manager.user_loader
def load_user(user_id):
    return User.query.get(int(user_id))

def admin_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated or current_user.role != 'admin':
            flash('管理者権限が必要です', 'error')
            return redirect(url_for('index'))
        return f(*args, **kwargs)
    return decorated_function

def manager_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated or current_user.role not in ['manager', 'admin']:
            flash('マネージャー権限が必要です', 'error')
            return redirect(url_for('index'))
        return f(*args, **kwargs)
    return decorated_function

# ルートの定義
@app.route('/')
def index():
    """ホームページ"""
    if current_user.is_authenticated:
        return redirect(url_for('dashboard'))
    return render_template('index.html')

@app.route('/dashboard')
@login_required
def dashboard():
    """ダッシュボード"""
    # 基本統計
    total_projects = Project.query.filter_by(user_id=current_user.id).count()
    active_projects = Project.query.filter_by(user_id=current_user.id, status='active').count()
    total_tasks = Task.query.join(Project).filter(Project.user_id == current_user.id).count()
    completed_tasks = Task.query.join(Project).filter(
        Project.user_id == current_user.id, 
        Task.status == 'done'
    ).count()

    # 最近のタスク
    recent_tasks = Task.query.join(Project).filter(
        Project.user_id == current_user.id
    ).order_by(Task.created_at.desc()).limit(5).all()

    # 時間追跡の統計
    time_entries = TimeEntry.query.filter_by(user_id=current_user.id).all()
    total_hours = sum(entry.hours for entry in time_entries)

    # プロジェクト進捗状況
    projects = Project.query.filter_by(user_id=current_user.id).all()
    project_progress = []

    for project in projects:
        project_tasks = Task.query.filter_by(project_id=project.id).all()
        if project_tasks:
            completed = len([t for t in project_tasks if t.status == 'done'])
            progress = (completed / len(project_tasks)) * 100
        else:
            progress = 0
        project_progress.append({
            'name': project.name,
            'progress': progress
        })

    return render_template('dashboard.html',
                         total_projects=total_projects,
                         active_projects=active_projects,
                         total_tasks=total_tasks,
                         completed_tasks=completed_tasks,
                         recent_tasks=recent_tasks,
                         total_hours=total_hours,
                         project_progress=project_progress)

@app.route('/projects')
@login_required
def projects():
    """プロジェクト一覧"""
    user_projects = Project.query.filter_by(user_id=current_user.id).all()
    return render_template('projects.html', projects=user_projects)

@app.route('/projects/create', methods=['GET', 'POST'])
@login_required
def create_project():
    """プロジェクト作成"""
    if request.method == 'POST':
        name = request.form['name']
        description = request.form['description']
        status = request.form['status']
        priority = request.form['priority']
        start_date = datetime.strptime(request.form['start_date'], '%Y-%m-%d').date() if request.form['start_date'] else None
        end_date = datetime.strptime(request.form['end_date'], '%Y-%m-%d').date() if request.form['end_date'] else None
        budget = float(request.form['budget']) if request.form['budget'] else None

        project = Project(
            name=name,
            description=description,
            status=status,
            priority=priority,
            start_date=start_date,
            end_date=end_date,
            budget=budget,
            user_id=current_user.id
        )

        db.session.add(project)
        db.session.commit()

        flash('プロジェクトが作成されました', 'success')
        return redirect(url_for('projects'))

    return render_template('create_project.html')

@app.route('/projects/')
@login_required
def project_detail(project_id):
    """プロジェクト詳細"""
    project = Project.query.get_or_404(project_id)

    # 権限チェック
    if project.user_id != current_user.id and current_user.role not in ['manager', 'admin']:
        flash('アクセス権限がありません', 'error')
        return redirect(url_for('projects'))

    tasks = Task.query.filter_by(project_id=project_id).all()
    time_entries = TimeEntry.query.filter_by(project_id=project_id).all()

    return render_template('project_detail.html', 
                         project=project, 
                         tasks=tasks,
                         time_entries=time_entries)

@app.route('/tasks')
@login_required
def tasks():
    """タスク一覧"""
    user_tasks = Task.query.join(Project).filter(Project.user_id == current_user.id).all()
    return render_template('tasks.html', tasks=user_tasks)

@app.route('/time_entries')
@login_required
def time_entries():
    """時間記録一覧"""
    user_time_entries = TimeEntry.query.filter_by(user_id=current_user.id).all()
    return render_template('time_entries.html', time_entries=user_time_entries)

@app.route('/analytics')
@login_required
def analytics():
    """分析ダッシュボード"""
    # プロジェクト別時間集計
    project_hours = db.session.query(
        Project.name,
        db.func.sum(TimeEntry.hours).label('total_hours')
    ).join(TimeEntry).filter(
        TimeEntry.user_id == current_user.id
    ).group_by(Project.id).all()

    # タスクステータス分布
    task_status = db.session.query(
        Task.status,
        db.func.count(Task.id).label('count')
    ).join(Project).filter(
        Project.user_id == current_user.id
    ).group_by(Task.status).all()

    # 週次時間追跡
    weekly_hours = db.session.query(
        db.func.strftime('%Y-%W', TimeEntry.date).label('week'),
        db.func.sum(TimeEntry.hours).label('total_hours')
    ).filter(
        TimeEntry.user_id == current_user.id
    ).group_by('week').order_by('week').all()

    # チャートの生成
    charts = {
        'project_hours': generate_pie_chart(project_hours),
        'task_status': generate_bar_chart(task_status),
        'weekly_hours': generate_line_chart(weekly_hours)
    }

    return render_template('analytics.html', charts=charts)

def generate_pie_chart(data):
    """円グラフを生成"""
    labels = [item[0] for item in data]
    values = [float(item[1]) for item in data]

    plt.figure(figsize=(8, 6))
    plt.pie(values, labels=labels, autopct='%1.1f%%', startangle=90)
    plt.axis('equal')
    plt.title('プロジェクト別作業時間')

    # 画像をBase64エンコード
    buffer = BytesIO()
    plt.savefig(buffer, format='png', bbox_inches='tight')
    buffer.seek(0)
    image_png = buffer.getvalue()
    buffer.close()

    graphic = base64.b64encode(image_png).decode('utf-8')
    plt.close()

    return f"data:image/png;base64,{graphic}"

def generate_bar_chart(data):
    """棒グラフを生成"""
    labels = [item[0] for item in data]
    values = [item[1] for item in data]

    plt.figure(figsize=(10, 6))
    plt.bar(labels, values, color=['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4'])
    plt.title('タスクステータス分布')
    plt.xlabel('ステータス')
    plt.ylabel('タスク数')
    plt.xticks(rotation=45)

    buffer = BytesIO()
    plt.savefig(buffer, format='png', bbox_inches='tight')
    buffer.seek(0)
    image_png = buffer.getvalue()
    buffer.close()

    graphic = base64.b64encode(image_png).decode('utf-8')
    plt.close()

    return f"data:image/png;base64,{graphic}"

def generate_line_chart(data):
    """折れ線グラフを生成"""
    weeks = [item[0] for item in data]
    hours = [float(item[1]) for item in data]

    plt.figure(figsize=(12, 6))
    plt.plot(weeks, hours, marker='o', linewidth=2)
    plt.title('週次作業時間')
    plt.xlabel('週')
    plt.ylabel('作業時間 (時間)')
    plt.xticks(rotation=45)
    plt.grid(True, alpha=0.3)

    buffer = BytesIO()
    plt.savefig(buffer, format='png', bbox_inches='tight')
    buffer.seek(0)
    image_png = buffer.getvalue()
    buffer.close()

    graphic = base64.b64encode(image_png).decode('utf-8')
    plt.close()

    return f"data:image/png;base64,{graphic}"

# APIエンドポイント
@app.route('/api/projects//progress')
@login_required
def api_project_progress(project_id):
    """プロジェクト進捗API"""
    project = Project.query.get_or_404(project_id)

    if project.user_id != current_user.id and current_user.role not in ['manager', 'admin']:
        return jsonify({'error': 'アクセス権限がありません'}), 403

    tasks = Task.query.filter_by(project_id=project_id).all()
    total_tasks = len(tasks)
    completed_tasks = len([t for t in tasks if t.status == 'done'])

    progress = (completed_tasks / total_tasks * 100) if total_tasks > 0 else 0

    return jsonify({
        'project_id': project_id,
        'total_tasks': total_tasks,
        'completed_tasks': completed_tasks,
        'progress': progress
    })

@app.route('/api/time_entries', methods=['POST'])
@login_required
def api_create_time_entry():
    """時間記録作成API"""
    data = request.get_json()

    time_entry = TimeEntry(
        description=data['description'],
        hours=float(data['hours']),
        date=datetime.strptime(data['date'], '%Y-%m-%d').date(),
        user_id=current_user.id,
        project_id=data['project_id'],
        task_id=data.get('task_id')
    )

    db.session.add(time_entry)
    db.session.commit()

    return jsonify({'message': '時間記録が作成されました', 'id': time_entry.id}), 201

# 認証関連
@app.route('/register', methods=['GET', 'POST'])
def register():
    """ユーザー登録"""
    if request.method == 'POST':
        username = request.form['username']
        email = request.form['email']
        password = request.form['password']

        if User.query.filter_by(username=username).first():
            flash('ユーザー名は既に使用されています', 'error')
            return render_template('register.html')

        if User.query.filter_by(email=email).first():
            flash('メールアドレスは既に使用されています', 'error')
            return render_template('register.html')

        user = User(
            username=username,
            email=email,
            password_hash=generate_password_hash(password)
        )

        db.session.add(user)
        db.session.commit()

        flash('登録が完了しました。ログインしてください。', 'success')
        return redirect(url_for('login'))

    return render_template('register.html')

@app.route('/login', methods=['GET', 'POST'])
def login():
    """ログイン"""
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']

        user = User.query.filter_by(username=username).first()

        if user and check_password_hash(user.password_hash, password):
            login_user(user)
            next_page = request.args.get('next')
            return redirect(next_page or url_for('dashboard'))
        else:
            flash('ユーザー名またはパスワードが正しくありません', 'error')

    return render_template('login.html')

@app.route('/logout')
@login_required
def logout():
    """ログアウト"""
    logout_user()
    flash('ログアウトしました', 'success')
    return redirect(url_for('index'))

# 管理機能
@app.route('/admin')
@admin_required
def admin_dashboard():
    """管理ダッシュボード"""
    total_users = User.query.count()
    total_projects = Project.query.count()
    total_tasks = Task.query.count()

    recent_users = User.query.order_by(User.created_at.desc()).limit(5).all()
    recent_projects = Project.query.order_by(Project.created_at.desc()).limit(5).all()

    return render_template('admin_dashboard.html',
                         total_users=total_users,
                         total_projects=total_projects,
                         total_tasks=total_tasks,
                         recent_users=recent_users,
                         recent_projects=recent_projects)

def init_db():
    """データベースの初期化"""
    with app.app_context():
        db.create_all()

        # 管理者ユーザーの作成
        if not User.query.filter_by(username='admin').first():
            admin_user = User(
                username='admin',
                email='admin@example.com',
                password_hash=generate_password_hash('admin123'),
                role='admin'
            )
            db.session.add(admin_user)
            db.session.commit()

            # サンプルプロジェクトの作成
            sample_project = Project(
                name='サンプルプロジェクト',
                description='これはサンプルプロジェクトです',
                status='active',
                priority='high',
                start_date=datetime.utcnow().date(),
                user_id=admin_user.id
            )
            db.session.add(sample_project)
            db.session.commit()

            # サンプルタスクの作成
            sample_tasks = [
                Task(
                    title='プロジェクト計画の作成',
                    description='プロジェクトの計画書を作成する',
                    status='done',
                    priority='high',
                    project_id=sample_project.id,
                    assigned_to=admin_user.id
                ),
                Task(
                    title='要件定義',
                    description='プロジェクトの要件を定義する',
                    status='in_progress',
                    priority='high',
                    project_id=sample_project.id,
                    assigned_to=admin_user.id
                ),
                Task(
                    title='開発環境のセットアップ',
                    description='開発環境を準備する',
                    status='todo',
                    priority='medium',
                    project_id=sample_project.id
                )
            ]
            db.session.add_all(sample_tasks)
            db.session.commit()

if __name__ == '__main__':
    init_db()
    app.run(debug=True, port=5000)

# テンプレートファイルの例 (templates/dashboard.html)
"""



    ダッシュボード - プロジェクト管理システム
    
    


    

    

ダッシュボード

{% with messages = get_flashed_messages(with_categories=true) %} {% if messages %} {% for category, message in messages %}
{{ message }}
{% endfor %} {% endif %} {% endwith %}

{{ total_projects }}

プロジェクト数

{{ active_projects }}

アクティブプロジェクト

{{ total_tasks }}

総タスク数

{{ total_hours|round(1) }}

総作業時間

最近のタスク
{% if recent_tasks %}
{% for task in recent_tasks %}
{{ task.title }}
{{ task.status }}

{{ task.description[:100] }}{% if task.description|length > 100 %}...{% endif %}

プロジェクト: {{ task.project.name }}
{% endfor %}
{% else %}

タスクがありません

{% endif %}
プロジェクト進捗
{% if project_progress %} {% for project in project_progress %}
{{ project.name }} {{ "%.1f"|format(project.progress) }}%
{% endfor %} {% else %}

プロジェクトがありません

{% endif %}
"""