デバッグ技法とログ出力の基本

2025-11-12

はじめに

プログラミングにおいて、バグのない完璧なコードを最初から書くことはほとんど不可能です。実際の開発プロセスでは、コードのデバッグ(バグの発見と修正)が開発時間の大部分を占めます。効果的なデバッグ技法と体系的なログ出力を習得することは、生産性の高い開発者になるために不可欠なスキルです。

本章では、Pythonにおける実践的なデバッグ技法と、プロフェッショナルなログ出力の基本を詳細に解説します。これらのスキルを習得することで、問題の早期発見、迅速な解決、そしてより堅牢なアプリケーションの構築が可能になります。

デバッグ技法の基礎

デバッグの基本プロセス

効果的なデバッグは、体系的なアプローチが必要です。
以下のステップに従うことで、効率的に問題を解決できます。

  1. 問題の再現: バグを確実に再現する方法を見つける
  2. 問題の局所化: 問題の発生箇所を特定する
  3. 原因の特定: 根本原因を突き止める
  4. 修正の実施: 適切な修正を行う
  5. 検証: 修正が問題を解決し、新たな問題を生んでいないことを確認する

printデバッグの限界とその先

多くの初学者が最初に学ぶデバッグ方法はprint文を使用した方法です。

def calculate_average(numbers):
    print(f"入力: {numbers}")  # デバッグ用print
    total = sum(numbers)
    print(f"合計: {total}")    # デバッグ用print
    average = total / len(numbers)
    print(f"平均: {average}")  # デバッグ用print
    return average

printデバッグはシンプルで直感的ですが、以下のような限界があります。

  • 大量の出力でコンソールが汚れる
  • デバッグ終了後のprint文削除の手間
  • 詳細な実行コンテキスト情報の欠如
  • 本番環境での使用困難

ロギングによるデバッグ

ロギングはprintデバッグの進化形として、より構造化されたアプローチを提供します。Pythonの logging モジュールを用いてリストの平均を計算するcalculate_average_with_logging 関数を定義してます。

import logging

# ロギングの基本設定
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

def calculate_average_with_logging(numbers):
    logging.debug(f"関数開始: calculate_average({numbers})")

    if not numbers:
        logging.warning("空のリストが渡されました")
        return 0

    try:
        total = sum(numbers)
        logging.debug(f"合計計算: {total}")

        average = total / len(numbers)
        logging.info(f"平均計算完了: {average}")

        return average

    except Exception as e:
        logging.error(f"平均計算中にエラー: {e}", exc_info=True)
        raise

# 使用例
numbers = [10, 20, 30, 40, 50]
result = calculate_average_with_logging(numbers)

関数開始時や合計計算時にはデバッグレベルで情報を記録し、平均計算完了時には情報レベルで結果を出力します。また空リストが渡された場合には警告を出力し、計算中に例外が発生した場合にはエラーレベルで詳細情報を記録して例外を再送出することで、計算処理の流れや問題発生時の原因を追跡しやすくしている設計になっています。

アサーションによる防御的プログラミング

アサーションは、プログラムの特定の時点で真であるべき条件をチェックします。このコードは、process_user_data 関数内で Python の アサーションを使って前提条件と事後条件をチェックし、ユーザーデータの整合性を保証する設計になっています。

def process_user_data(user_data):
    # 前提条件のチェック
    assert isinstance(user_data, dict), "user_dataは辞書である必要があります"
    assert 'name' in user_data, "user_dataにはnameキーが必要です"
    assert 'age' in user_data, "user_dataにはageキーが必要です"
    assert user_data['age'] >= 0, "年齢は0以上である必要があります"

    # メインの処理
    name = user_data['name']
    age = user_data['age']

    # 事後条件のチェック
    result = f"{name}は{age}歳です"
    assert isinstance(result, str), "結果は文字列である必要があります"

    return result

# アサーションを有効にして実行(本番環境では無効化可能)
try:
    print(process_user_data({'name': 'Alice', 'age': 25}))
    print(process_user_data({'name': 'Bob', 'age': -5}))  # アサーションエラー
except AssertionError as e:
    print(f"アサーションエラー: {e}")

具体的には、引数 user_data が辞書であることや 'name''age' キーが存在すること、年齢が 0 以上であることを前提条件として検証し、処理後には結果が文字列であることを確認してから返却します。アサーション違反が発生すると例外が投げられ、try ブロックで捕捉してエラーメッセージを表示することで、開発中にデータの不正を検出しやすくしています。

pdbによる対話的デバッグ

Python標準のデバッガpdbを使用すると、プログラムの実行を一時停止し、変数の検査やステップ実行が可能です。次のコードは、complex_calculation 関数の実行を対話的に追跡できるようにした例です。

import pdb

def complex_calculation(data):
    # デバッガーの開始
    pdb.set_trace()

    result = 0
    for i, item in enumerate(data):
        # 各ループで変数を検査可能
        processed = item * 2 + i
        result += processed

        # 条件付きブレークポイント
        if result > 100:
            pdb.set_trace()  # 結果が100を超えた時点で停止

    return result

# 使用例(実際の実行時に対話的デバッグが可能)
# data = [10, 20, 30, 40, 50]
# result = complex_calculation(data)

pdb.set_trace() によって処理の途中で実行を一時停止し、変数の値やループの進行状況を確認しながらデバッグできる設計になっています。また、結果が 100 を超えた場合に条件付きでブレークポイントを設定することで、特定の状況下のみ停止させて詳細な解析が可能となっています。

pdbの主要コマンド

  • l(ist): 現在のコード表示
  • n(ext): 次の行へ進む
  • s(tep): 関数の中へステップイン
  • c(ontinue): 実行再開
  • p(rint): 変数の値表示
  • q(uit): デバッガ終了

ログ出力の基本

ロギングの重要性

ログ出力は、単なるデバッグツールではなく、以下の目的で重要な役割を果たします。

  1. 問題調査: 本番環境での問題原因究明
  2. 監視: アプリケーションの健全性監視
  3. 監査: セキュリティイベントや重要な操作の記録
  4. パフォーマンス分析: 処理時間やリソース使用状況の追跡

ロギングレベル

Pythonのloggingモジュールは、重要度に応じたログレベルを提供します。debug は開発中の詳細情報、info は正常動作の重要なイベント、warning は問題の可能性がある状況、error は重大な問題、critical はプログラム継続が困難な致命的状況をそれぞれ記録する設計になっています。

import logging

# 各レベルの使用例
logging.debug("詳細なデバッグ情報")      # レベル10 - 開発中の詳細情報
logging.info("正常な動作情報")           # レベル20 - 正常系の重要なイベント
logging.warning("警告情報")              # レベル30 - 問題の可能性がある状況
logging.error("エラー情報")              # レベル40 - 重大な問題
logging.critical("致命的なエラー情報")   # レベル50 - プログラム継続困難な状況

基本的なロギング設定

次のコードは、Python の logging モジュールを用いて包括的なロギング設定を行う例で、アプリケーション専用のロガー my_application を作成し、ログの詳細をフォーマットで整えた上で、標準出力用のコンソールハンドラー(INFO 以上)、すべてのログを記録するファイルハンドラー(DEBUG 以上)、エラーレベルのみを記録する別ファイルハンドラー(ERROR 以上)を追加することで、ログの出力先や重要度に応じた管理を柔軟に行える設計になっています。

import logging
import sys

def setup_logging():
    """包括的なロギング設定"""

    # ロガーの作成
    logger = logging.getLogger('my_application')
    logger.setLevel(logging.DEBUG)

    # フォーマッターの作成
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
    )

    # コンソールハンドラー
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)

    # ファイルハンドラー
    file_handler = logging.FileHandler('application.log', encoding='utf-8')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)

    # エラーファイルハンドラー(エラーレベルのみ)
    error_handler = logging.FileHandler('errors.log', encoding='utf-8')
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(formatter)

    # ハンドラーの追加
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    logger.addHandler(error_handler)

    return logger

# ロギングのセットアップ
logger = setup_logging()

コンテキスト情報のロギング

デバッグを容易にするために、関連するコンテキスト情報をログに含めます。次のコードは、関数呼び出しを自動的にログに記録するデコレータ log_function_call を定義し、注文処理を行う process_order 関数に適用した例です。

デコレータは関数の開始時や正常終了時、例外発生時にそれぞれデバッグ・エラーログを記録し、関数内部では注文の開始、空注文の警告、合計金額の計算などの情報をログに出力することで、処理の流れや問題発生時の原因を追跡しやすくする設計になっています。

import logging
from functools import wraps

def log_function_call(func):
    """関数呼び出しをログに記録するデコレータ"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        logger = logging.getLogger(func.__module__)

        # 関数開始のログ
        logger.debug(f"関数開始: {func.__name__}(args={args}, kwargs={kwargs})")

        try:
            result = func(*args, **kwargs)
            # 関数正常終了のログ
            logger.debug(f"関数正常終了: {func.__name__} -> {result}")
            return result

        except Exception as e:
            # 関数異常終了のログ
            logger.error(
                f"関数異常終了: {func.__name__} - {e}",
                exc_info=True,
                extra={'args': args, 'kwargs': kwargs}
            )
            raise

    return wrapper

@log_function_call
def process_order(order_id, items, user_id):
    """注文処理関数"""
    logger = logging.getLogger(__name__)

    logger.info(f"注文処理開始: order_id={order_id}, user_id={user_id}")

    # 何らかの処理...
    if not items:
        logger.warning(f"空の注文: order_id={order_id}")
        return None

    total = sum(item['price'] * item['quantity'] for item in items)
    logger.info(f"注文合計計算: order_id={order_id}, total={total}")

    return total

# 使用例
try:
    order_items = [
        {'name': '商品A', 'price': 1000, 'quantity': 2},
        {'name': '商品B', 'price': 500, 'quantity': 1}
    ]
    result = process_order("ORD123", order_items, "USER456")
except Exception as e:
    logging.error("注文処理中にエラーが発生しました")

パフォーマンスロギング

次のコードは、Python の contextlib.contextmanager を用いて、処理の実行時間を計測してログに記録するコンテキストマネージャ log_execution_time を定義した例です。

with ブロック内で指定した処理を実行する前後で開始時刻と終了時刻を取得し、処理の開始・終了および実行時間をログに出力する設計になっています。

expensive_operation 関数では、このコンテキストマネージャを利用して高負荷処理の所要時間を記録し、処理結果をデバッグログとして出力することで、性能分析や処理時間の監視を容易に行えるようになっています。

import logging
import time
from contextlib import contextmanager

@contextmanager
def log_execution_time(operation_name):
    """実行時間を計測してログに記録するコンテキストマネージャ"""
    start_time = time.time()
    logger = logging.getLogger('performance')

    try:
        logger.info(f"操作開始: {operation_name}")
        yield
    finally:
        end_time = time.time()
        execution_time = end_time - start_time
        logger.info(f"操作終了: {operation_name} - 実行時間: {execution_time:.3f}秒")

# 使用例
def expensive_operation():
    """時間のかかる処理"""
    with log_execution_time("高負荷処理"):
        # 何らかの重い処理
        time.sleep(2)
        result = sum(i * i for i in range(10000))
        logging.debug(f"処理結果: {result}")
    return result

# 実行
expensive_operation()

デバッグ技法とベストプラクティス

条件付きブレークポイント

次のコードは、条件付きで Python のデバッガ pdb を起動できる debug_conditionally 関数を定義し、process_data_batch 関数内でデータバッチを処理する際に特定の条件下でデバッグを実行する設計になっています。

具体的には、6番目の要素で必ずデバッガを起動し、さらに値が閾値を超えた場合には条件付きでデバッグを開始することで、問題のあるデータや特定の処理状況を対話的に確認しながら処理の挙動を解析できるようになっています。

def debug_conditionally(condition=True):
    """条件付きデバッグ"""
    if condition:
        import pdb
        pdb.set_trace()

def process_data_batch(data_batch, threshold=100):
    """データバッチ処理"""
    results = []

    for i, data in enumerate(data_batch):
        # 特定の条件でデバッガを起動
        if i == 5:  # 6番目の要素で停止
            debug_conditionally()

        # 値が閾値を超えた場合にデバッグ
        if data > threshold:
            debug_conditionally(condition=(data > 1000))

        processed = data * 2
        results.append(processed)

    return results

例外の詳細ロギング

次のコードは、例外発生時に詳細な情報をログに記録する detailed_exception_logging 関数を定義した例で、risky_operation 内で意図的に例外が発生した場合に、例外の種類とメッセージをエラーレベルで記録します。

スタックトレースをデバッグレベルで出力するとともに、発生時点のローカル変数の状態も取得してデバッグログに記録することで、問題発生時の原因究明やデバッグ作業を容易に行える設計になっています。

import logging
import traceback

def detailed_exception_logging():
    """詳細な例外情報のロギング"""
    try:
        # 何らかの処理
        risky_operation()

    except Exception as e:
        logger = logging.getLogger('exception_tracking')

        # 基本例外情報
        logger.error(f"例外発生: {type(e).__name__}: {e}")

        # スタックトレース
        logger.debug("スタックトレース:", exc_info=True)

        # ローカル変数の情報(デバッグレベル)
        import inspect
        frame = inspect.currentframe()
        try:
            locals_info = frame.f_back.f_locals
            logger.debug(f"ローカル変数: {locals_info}")
        finally:
            del frame

        raise

def risky_operation():
    """リスクのある操作"""
    data = {"key": "value"}
    # 意図的に例外を発生
    return data["nonexistent_key"]

ログローテーション

長時間実行されるアプリケーションでは、ログファイルのローテーションが重要です。次のコードは、ログをサイズや時間に応じて自動的にローテーションさせる設定を行う setup_rotating_logging 関数を定義した例です。

サイズベースの RotatingFileHandler により 10MB ごとにログファイルを切り替え最大 3 ファイルを保持し、時間ベースの TimedRotatingFileHandler により日次でログをローテーションして過去 7 日分を保持することで、ログファイルの肥大化を防ぎつつ、重要なログ情報を効率的に管理できる設計になっています。

import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

def setup_rotating_logging():
    """ローテーションするロギング設定"""
    logger = logging.getLogger('rotating_logger')
    logger.setLevel(logging.INFO)

    # サイズベースのローテーション(10MBごと、3ファイル保持)
    size_handler = RotatingFileHandler(
        'app.log', maxBytes=10*1024*1024, backupCount=3, encoding='utf-8'
    )

    # 時間ベースのローテーション(日次、7日分保持)
    time_handler = TimedRotatingFileHandler(
        'app_time.log', when='midnight', interval=1, backupCount=7, encoding='utf-8'
    )

    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    size_handler.setFormatter(formatter)
    time_handler.setFormatter(formatter)

    logger.addHandler(size_handler)
    logger.addHandler(time_handler)

    return logger

構造化ロギング

次のコードは、ログを JSON 形式などで構造化して出力する StructuredFormatter クラスを定義し、setup_structured_logging 関数で構造化ログ用のロガーを設定した例です。

タイムスタンプ、ログレベル、ロガー名、メッセージ、モジュール名、行番号などの基本情報に加え、extra で渡された追加情報も JSON に含めて出力することで、ログ解析や外部システムへの連携に適した形式でログを管理できる設計になっています。

import logging
import json

class StructuredFormatter(logging.Formatter):
    """構造化ログフォーマッタ"""

    def format(self, record):
        log_entry = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'line': record.lineno
        }

        # 追加のextra情報がある場合
        if hasattr(record, 'extra_data'):
            log_entry.update(record.extra_data)

        return json.dumps(log_entry, ensure_ascii=False)

def setup_structured_logging():
    """構造化ロギングの設定"""
    logger = logging.getLogger('structured')
    logger.setLevel(logging.INFO)

    handler = logging.StreamHandler()
    formatter = StructuredFormatter()
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    return logger

# 使用例
structured_logger = setup_structured_logging()
structured_logger.info(
    "ユーザーアクション",
    extra={'extra_data': {'user_id': '123', 'action': 'login', 'ip': '192.168.1.1'}}
)

まとめ

効果的なデバッグ技法と体系的なログ出力は、現代のソフトウェア開発において不可欠なスキルです。printデバッグから始まり、ロギング、対話的デバッガの使用、そして構造化ロギングへと進化することで、より効率的な問題解決と、本番環境での効果的な監視が可能になります。

これらの技術を組み合わせることで、開発時のデバッグ効率が向上するだけでなく、本番環境での問題追跡やパフォーマンス分析も容易になります。実際のプロジェクトでは、プロジェクトの規模や要件に応じて、これらの技法を適切に組み合わせて使用することが重要です。


演習問題

初級問題

問題1: 基本的なロギング設定
loggingモジュールを使用して、コンソールとファイルの両方にログを出力する基本的な設定を行ってください。DEBUGレベル以上のログをファイルに、INFOレベル以上のログをコンソールに出力するようにしてください。

問題2: 関数の実行時間計測
任意の関数の実行時間を計測し、ログに記録するデコレータを作成してください。実行時間が1秒を超える場合はWARNINGレベルでログを出力してください。

問題3: 簡単なデバッグ用print関数
デバッグモードが有効な時のみメッセージを表示する、デバッグ用のprint関数を作成してください。関数名、変数名、値の情報を含めて表示できるようにしてください。

中級問題

問題4: 条件付きブレークポイント
リスト処理関数で、特定の条件(例えば値が100以上)を満たす要素を処理する時にデバッガが起動するようにしてください。pdbを使用した条件付きブレークポイントを実装してください。

問題5: 例外の詳細ロギング
例外が発生した際に、例外情報、スタックトレース、ローカル変数の状態を詳細にログに記録する関数を作成してください。

問題6: ログローテーションの設定
ファイルサイズが1MBを超えると新しいファイルにローテーションするロギング設定を作成してください。最大3ファイルまで保持するようにしてください。

問題7: コンテキスト情報付きロギング
関数呼び出しの開始、終了、引数、戻り値を自動的にログに記録するデコレータを作成してください。

問題8: パフォーマンスモニタリング
複数の関数の実行時間を計測し、統計情報(平均、最大、最小時間)をログに出力するシステムを作成してください。

問題9: 構造化ログ出力
JSON形式で構造化されたログを出力するフォーマッタを作成してください。タイムスタンプ、ログレベル、メッセージ、カスタムフィールドを含めるようにしてください。

上級問題

問題10: 分散システムでのトレーシング
複数の関数呼び出しを跨いで一意のトレースIDを伝播させ、関連するログを同じトレースIDで記録するシステムを作成してください。

問題11: 動的ログレベル制御
アプリケーションの実行中にログレベルを動的に変更できるようにする仕組みを実装してください。設定ファイルの変更やシグナル受信でログレベルを変更できるようにしてください。

問題12: 包括的なデバッグ環境
対話的デバッグ、詳細なロギング、パフォーマンス計測、カスタム例外処理を統合した包括的なデバッグ環境をクラスとして設計してください。

初級問題

問題1: 基本的なロギング設定

import logging
import sys

def setup_basic_logging():
    """
    基本的なロギング設定
    DEBUGレベル以上をファイルに、INFOレベル以上をコンソールに出力
    """
    # ルートロガーの設定
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    # フォーマッターの作成
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # コンソールハンドラー (INFOレベル以上)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)

    # ファイルハンドラー (DEBUGレベル以上)
    file_handler = logging.FileHandler('debug.log', encoding='utf-8')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)

    # 既存のハンドラーをクリアして新しいハンドラーを追加
    logger.handlers.clear()
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)

    return logger

# テスト
print("=== 問題1テスト ===")
logger = setup_basic_logging()

logger.debug("これはDEBUGレベルのメッセージです - ファイルのみに出力")
logger.info("これはINFOレベルのメッセージです - コンソールとファイルに出力")
logger.warning("これはWARNINGレベルのメッセージです")
logger.error("これはERRORレベルのメッセージです")

# 実際の使用例
def example_function():
    logger.info("関数の実行を開始します")
    try:
        result = 10 / 2
        logger.debug(f"計算結果: {result}")
        return result
    except Exception as e:
        logger.error(f"エラーが発生しました: {e}")
        raise

example_function()

問題2: 関数の実行時間計測

import logging
import time
from functools import wraps

def log_execution_time(func):
    """
    関数の実行時間を計測しログに記録するデコレータ
    実行時間が1秒を超える場合はWARNINGレベルで出力
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        logger = logging.getLogger(func.__module__)

        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            return result
        finally:
            end_time = time.time()
            execution_time = end_time - start_time

            if execution_time > 1.0:
                logger.warning(
                    f"関数 {func.__name__} の実行時間が長い: {execution_time:.3f}秒"
                )
            else:
                logger.info(
                    f"関数 {func.__name__} の実行時間: {execution_time:.3f}秒"
                )

    return wrapper

# テスト用のロギング設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

@log_execution_time
def quick_function():
    """速い関数"""
    time.sleep(0.1)
    return "quick done"

@log_execution_time
def slow_function():
    """遅い関数"""
    time.sleep(1.5)
    return "slow done"

@log_execution_time
def process_data(data_size):
    """データ処理関数"""
    data = [i * i for i in range(data_size)]
    time.sleep(data_size / 1000)  # データサイズに比例して時間がかかる
    return sum(data)

# テスト
print("\n=== 問題2テスト ===")
print(quick_function())
print(slow_function())
print(process_data(500))
print(process_data(2000))

問題3: 簡単なデバッグ用print関数

import inspect

class DebugPrinter:
    """デバッグ用print機能を提供するクラス"""

    def __init__(self, enabled=True):
        self.enabled = enabled

    def enable(self):
        """デバッグ出力を有効化"""
        self.enabled = True

    def disable(self):
        """デバッグ出力を無効化"""
        self.enabled = False

    def debug_print(self, *args, **kwargs):
        """デバッグ情報付きでprint"""
        if not self.enabled:
            return

        # 呼び出し元の情報を取得
        frame = inspect.currentframe().f_back
        filename = frame.f_code.co_filename
        lineno = frame.f_lineno
        function_name = frame.f_code.co_name

        # 変数情報の抽出
        local_vars = frame.f_locals

        # 基本情報の表示
        print(f"[DEBUG] {filename}:{lineno} in {function_name}")

        # 引数の表示
        if args:
            print("  値:", ", ".join(str(arg) for arg in args))

        # キーワード引数の表示
        if kwargs:
            print("  キーワード引数:")
            for key, value in kwargs.items():
                print(f"    {key} = {value}")

        # 特定の変数を指定して表示
        if 'vars' in kwargs:
            var_names = kwargs['vars']
            if isinstance(var_names, str):
                var_names = [var_names]

            print("  変数値:")
            for var_name in var_names:
                if var_name in local_vars:
                    print(f"    {var_name} = {local_vars[var_name]}")
                else:
                    print(f"    {var_name} = (未定義)")

        print()  # 空行

    def watch(self, variable_name, value):
        """変数の監視表示"""
        if self.enabled:
            frame = inspect.currentframe().f_back
            filename = frame.f_code.co_filename
            lineno = frame.f_lineno
            print(f"[WATCH] {filename}:{lineno} - {variable_name} = {value}")

# グローバルなデバッグプリンターの作成
debug = DebugPrinter(enabled=True)

# テスト
print("\n=== 問題3テスト ===")
def test_function(x, y):
    z = x + y
    debug.debug_print("計算開始", x, y)
    debug.watch("z", z)

    result = z * 2
    debug.debug_print("中間結果", result, vars=['x', 'y', 'z', 'result'])

    final_result = result + 10
    debug.debug_print("最終結果", final_result)

    return final_result

# デバッグ有効で実行
result = test_function(5, 3)
print(f"結果: {result}")

print("\n--- デバッグ無効で実行 ---")
debug.disable()
result2 = test_function(10, 20)
print(f"結果: {result2}")

中級問題

問題4: 条件付きブレークポイント

import pdb
import logging

def conditional_breakpoint(condition=True, message=""):
    """
    条件付きブレークポイント
    conditionがTrueの場合にデバッガを起動
    """
    if condition:
        print(f"🔍 ブレークポイント: {message}")
        print("デバッガを起動します...")
        pdb.set_trace()

def process_data_list(data_list, threshold=100):
    """
    データリストを処理する関数
    条件付きブレークポイントを設定
    """
    results = []
    logger = logging.getLogger(__name__)

    logger.info(f"データ処理開始: {len(data_list)}件のデータ")

    for i, data in enumerate(data_list):
        # インデックスが5の倍数でブレーク
        conditional_breakpoint(
            condition=(i % 5 == 0 and i > 0),
            message=f"5の倍数のインデックス: {i}"
        )

        # データが閾値を超えた場合にブレーク
        conditional_breakpoint(
            condition=(data > threshold),
            message=f"閾値超過: data[{i}] = {data} > {threshold}"
        )

        # 特定の値の場合にブレーク
        conditional_breakpoint(
            condition=(data == 999),
            message=f"特殊値検出: data[{i}] = {data}"
        )

        # データ処理
        try:
            if data < 0:
                logger.warning(f"負の値 detected: data[{i}] = {data}")
                processed = abs(data)
            else:
                processed = data * 2

            results.append(processed)

        except Exception as e:
            logger.error(f"データ処理エラー at index {i}: {e}")
            conditional_breakpoint(
                condition=True,
                message=f"エラー発生: {e}"
            )

    logger.info(f"データ処理完了: {len(results)}件の結果")
    return results

# テスト用のロギング設定
logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s')

# テストデータ
print("=== 問題4テスト ===")
test_data = [10, 25, 150, 30, 45, 999, 80, -5, 120, 60, 200]

print("テストデータ:", test_data)
print("注: 実際のデバッグセッションではpdbコマンドを使用してください")
print("    n(ext) - 次の行, c(ontinue) - 続行, p  - 変数表示, q(uit) - 終了")

# 条件付きブレークポイントを一時無効化して実行
print("\nブレークポイント無効で実行:")
results = process_data_list(test_data)
print("処理結果:", results)

問題5: 例外の詳細ロギング

import logging
import traceback
import sys

def setup_detailed_logging():
    """詳細な例外ロギング用の設定"""
    logger = logging.getLogger('detailed_exception')
    logger.setLevel(logging.DEBUG)

    # 詳細なフォーマット
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d\n%(message)s'
    )

    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    return logger

def log_exception_details(func):
    """
    例外の詳細情報をログに記録するデコレータ
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        logger = setup_detailed_logging()

        try:
            return func(*args, **kwargs)
        except Exception as e:
            # 基本例外情報
            logger.error("=" * 50)
            logger.error("🐛 例外が発生しました")
            logger.error("=" * 50)

            # 例外の基本情報
            logger.error(f"例外タイプ: {type(e).__name__}")
            logger.error(f"例外メッセージ: {e}")

            # 詳細なスタックトレース
            logger.error("\nスタックトレース:")
            logger.error(traceback.format_exc())

            # 関数の引数情報
            logger.error(f"\n関数引数:")
            logger.error(f"  位置引数: {args}")
            logger.error(f"  キーワード引数: {kwargs}")

            # ローカル変数の状態(可能な範囲で)
            try:
                frame = sys.exc_info()[2].tb_frame.f_back
                locals_info = frame.f_locals

                logger.error(f"\nローカル変数状態:")
                for var_name, var_value in list(locals_info.items())[:10]:  # 最初の10個のみ
                    logger.error(f"  {var_name} = {repr(var_value)}")

            except Exception as frame_error:
                logger.error(f"ローカル変数の取得に失敗: {frame_error}")

            logger.error("=" * 50)
            raise

    return wrapper

# テスト関数
@log_exception_details
def risky_operation(data_list, divisor=2):
    """
    リスクのある操作を行う関数
    """
    results = []

    for i, item in enumerate(data_list):
        # 様々な例外が発生する可能性のある操作
        processed = item / divisor
        results.append(processed)

        # リスト外アクセスのシミュレーション
        if i == len(data_list) - 1:
            next_item = data_list[i + 1]  # 意図的なIndexError

    return results

@log_exception_details  
def complex_data_processing(data):
    """
    複雑なデータ処理関数
    """
    if not data:
        raise ValueError("データが空です")

    if isinstance(data, str):
        # 文字列を数値に変換しようとして失敗
        return int(data)

    return sum(data) / len(data)

# テスト
print("=== 問題5テスト ===")

print("1. ZeroDivisionError テスト:")
try:
    risky_operation([10, 20, 30], 0)
except Exception:
    pass

print("\n2. IndexError テスト:")
try:
    risky_operation([10, 20])  # 意図的にIndexErrorを発生
except Exception:
    pass

print("\n3. ValueError テスト:")
try:
    complex_data_processing("not_a_number")
except Exception:
    pass

print("\n4. 正常系テスト:")
try:
    result = complex_data_processing([1, 2, 3, 4, 5])
    print(f"正常系結果: {result}")
except Exception as e:
    print(f"予期せぬエラー: {e}")

問題6: ログローテーションの設定

import logging
from logging.handlers import RotatingFileHandler
import time
import os

def setup_rotating_logs():
    """
    ログローテーションの設定
    ファイルサイズ1MBでローテーション、最大3ファイル保持
    """
    # ログディレクトリの作成
    log_dir = "rotating_logs"
    os.makedirs(log_dir, exist_ok=True)

    # メインロガーの設定
    logger = logging.getLogger('rotating_app')
    logger.setLevel(logging.DEBUG)

    # フォーマッター
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # ローテーティングファイルハンドラー (1MBごと、3ファイル保持)
    log_file = os.path.join(log_dir, 'application.log')
    rotating_handler = RotatingFileHandler(
        log_file,
        maxBytes=1*1024*1024,  # 1MB
        backupCount=3,
        encoding='utf-8'
    )
    rotating_handler.setLevel(logging.DEBUG)
    rotating_handler.setFormatter(formatter)

    # コンソールハンドラー
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)

    # ハンドラーの追加
    logger.addHandler(rotating_handler)
    logger.addHandler(console_handler)

    return logger, log_dir

def generate_log_messages(logger, num_messages=1000, message_size=1024):
    """
    テスト用のログメッセージを生成
    """
    import random
    import string

    log_levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR]

    for i in range(num_messages):
        # ランダムなメッセージを生成
        message = ''.join(
            random.choices(string.ascii_letters + string.digits, k=message_size)
        )

        level = random.choice(log_levels)
        logger.log(level, f"メッセージ {i+1}: {message}")

        # 進捗表示
        if (i + 1) % 100 == 0:
            print(f"生成済みメッセージ: {i + 1}/{num_messages}")

        # 少し待機して実際の時間経過をシミュレート
        time.sleep(0.01)

def check_log_files(log_dir):
    """ログファイルの状態をチェック"""
    print("\nログファイル状態:")
    files = sorted([f for f in os.listdir(log_dir) if f.startswith('application.')])

    for filename in files:
        filepath = os.path.join(log_dir, filename)
        size = os.path.getsize(filepath)
        print(f"  {filename}: {size:,} bytes ({size/1024/1024:.2f} MB)")

# テスト
print("=== 問題6テスト ===")
logger, log_dir = setup_rotating_logs()

print("ログローテーション設定完了")
print("1MBを超えると新しいファイルが作成され、最大3ファイル保持されます")

# 少量のテストメッセージで動作確認
print("\n少量のテストメッセージを生成中...")
for i in range(10):
    logger.info(f"テストメッセージ {i+1}")
    logger.debug(f"詳細デバッグ情報 {i+1}")

check_log_files(log_dir)

print("\n大きなログファイルを生成する場合は以下のコードを実行してください:")
print("# generate_log_messages(logger, 1000, 2048)  # 2KBのメッセージを1000件")
print("# check_log_files(log_dir)")

問題7: コンテキスト情報付きロギング

import logging
from functools import wraps
import time

def log_function_call(level=logging.INFO):
    """
    関数呼び出しを詳細にログに記録するデコレータ
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            logger = logging.getLogger(func.__module__)

            # 関数開始のログ
            start_time = time.time()
            logger.log(level, f"🚀 関数開始: {func.__name__}")
            logger.log(level, f"   位置引数: {args}")
            logger.log(level, f"   キーワード引数: {kwargs}")

            try:
                result = func(*args, **kwargs)
                execution_time = time.time() - start_time

                # 関数正常終了のログ
                logger.log(level, f"✅ 関数正常終了: {func.__name__}")
                logger.log(level, f"   実行時間: {execution_time:.3f}秒")
                logger.log(level, f"   戻り値: {result}")

                return result

            except Exception as e:
                execution_time = time.time() - start_time

                # 関数異常終了のログ
                logger.error(f"❌ 関数異常終了: {func.__name__}")
                logger.error(f"   実行時間: {execution_time:.3f}秒")
                logger.error(f"   例外: {type(e).__name__}: {e}")
                logger.error(f"   引数: args={args}, kwargs={kwargs}")

                raise

        return wrapper
    return decorator

# ロギング設定
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('function_calls.log', encoding='utf-8')
    ]
)

# デコレータを使用した関数例
@log_function_call(level=logging.INFO)
def calculate_statistics(numbers, method='mean'):
    """
    統計量を計算する関数
    """
    if not numbers:
        raise ValueError("数値リストが空です")

    if method == 'mean':
        result = sum(numbers) / len(numbers)
    elif method == 'median':
        sorted_nums = sorted(numbers)
        n = len(sorted_nums)
        if n % 2 == 0:
            result = (sorted_nums[n//2 - 1] + sorted_nums[n//2]) / 2
        else:
            result = sorted_nums[n//2]
    else:
        raise ValueError(f"未知のメソッド: {method}")

    return result

@log_function_call(level=logging.DEBUG)
def process_user_data(user_id, data, options=None):
    """
    ユーザーデータを処理する関数
    """
    if options is None:
        options = {}

    # データ処理のシミュレーション
    time.sleep(0.1)

    if not user_id:
        raise ValueError("ユーザーIDが指定されていません")

    processed_data = {
        'user_id': user_id,
        'processed_at': time.time(),
        'data_length': len(data),
        **options
    }

    return processed_data

# テスト
print("=== 問題7テスト ===")

print("1. 正常系テスト:")
try:
    stats = calculate_statistics([1, 2, 3, 4, 5], 'mean')
    print(f"平均: {stats}")

    stats2 = calculate_statistics([1, 2, 3, 4, 5, 6], 'median')
    print(f"中央値: {stats2}")
except Exception as e:
    print(f"エラー: {e}")

print("\n2. ユーザーデータ処理テスト:")
try:
    user_data = process_user_data(
        "user123", 
        {"name": "Alice", "age": 30},
        {"verbose": True, "validate": True}
    )
    print(f"処理結果: {user_data}")
except Exception as e:
    print(f"エラー: {e}")

print("\n3. 例外発生テスト:")
try:
    calculate_statistics([], 'mean')
except Exception as e:
    print(f"期待通りのエラー: {e}")

print("\nログファイル 'function_calls.log' に関数呼び出しの詳細が記録されています")

問題8: パフォーマンスモニタリング

import logging
import time
from functools import wraps
from collections import defaultdict, deque
import statistics

class PerformanceMonitor:
    """
    パフォーマンス監視クラス
    複数関数の実行時間を計測し統計情報を提供
    """

    def __init__(self, max_records=100):
        self.stats = defaultdict(lambda: deque(maxlen=max_records))
        self.logger = logging.getLogger('performance')

    def monitor(self, func):
        """
        関数の実行時間を監視するデコレータ
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                return result
            finally:
                end_time = time.time()
                execution_time = end_time - start_time
                self.stats[func.__name__].append(execution_time)

        return wrapper

    def log_statistics(self, level=logging.INFO):
        """
        統計情報をログに出力
        """
        if not self.stats:
            self.logger.warning("計測データがありません")
            return

        self.logger.log(level, "=" * 50)
        self.logger.log(level, "📊 パフォーマンス統計レポート")
        self.logger.log(level, "=" * 50)

        for func_name, times in self.stats.items():
            if times:
                avg_time = statistics.mean(times)
                max_time = max(times)
                min_time = min(times)
                count = len(times)

                self.logger.log(level, f"関数: {func_name}")
                self.logger.log(level, f"  実行回数: {count}")
                self.logger.log(level, f"  平均時間: {avg_time:.4f}秒")
                self.logger.log(level, f"  最大時間: {max_time:.4f}秒")
                self.logger.log(level, f"  最小時間: {min_time:.4f}秒")
                self.logger.log(level, f"  合計時間: {sum(times):.4f}秒")

                # パフォーマンス警告
                if avg_time > 1.0:
                    self.logger.warning(f"  ⚠️  平均実行時間が長いです")
                if max_time > 5.0:
                    self.logger.error(f"  🚨 最大実行時間が非常に長いです")

                self.logger.log(level, "-" * 30)

    def get_function_stats(self, func_name):
        """
        特定関数の統計情報を取得
        """
        if func_name not in self.stats or not self.stats[func_name]:
            return None

        times = self.stats[func_name]
        return {
            'count': len(times),
            'average': statistics.mean(times),
            'max': max(times),
            'min': min(times),
            'total': sum(times),
            'all_times': list(times)
        }

    def clear_stats(self, func_name=None):
        """
        統計情報をクリア
        """
        if func_name:
            if func_name in self.stats:
                self.stats[func_name].clear()
        else:
            self.stats.clear()

# ロギング設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# パフォーマンスモニターのインスタンス作成
monitor = PerformanceMonitor()

# 監視対象関数
@monitor.monitor
def fast_operation():
    """速い操作"""
    time.sleep(0.01)
    return "fast done"

@monitor.monitor
def medium_operation():
    """中程度の操作"""
    time.sleep(0.1)
    return "medium done"

@monitor.monitor
def slow_operation():
    """遅い操作"""
    time.sleep(0.5)
    return "slow done"

@monitor.monitor
def variable_operation(complexity):
    """可変的な操作"""
    time.sleep(complexity * 0.1)
    return f"variable done with complexity {complexity}"

@monitor.monitor
def database_query_simulation(query_type):
    """データベースクエリのシミュレーション"""
    if query_type == "simple":
        time.sleep(0.05)
    elif query_type == "complex":
        time.sleep(0.3)
    elif query_type == "aggregate":
        time.sleep(1.0)
    else:
        time.sleep(0.1)

    return f"query {query_type} completed"

# テスト
print("=== 問題8テスト ===")

# 様々な操作を実行
for i in range(5):
    fast_operation()
    medium_operation()
    slow_operation()
    variable_operation(i % 3 + 1)
    database_query_simulation(["simple", "complex", "aggregate"][i % 3])

print("パフォーマンス統計を出力します...")
monitor.log_statistics()

print("\n個別関数の統計:")
func_names = ['fast_operation', 'slow_operation', 'database_query_simulation']
for func_name in func_names:
    stats = monitor.get_function_stats(func_name)
    if stats:
        print(f"{func_name}: {stats['count']}回実行, 平均{stats['average']:.3f}秒")

print("\n継続的な監視が可能です。追加の関数実行後にも統計を出力できます")

問題9: 構造化ログ出力

import logging
import json
from datetime import datetime
import sys

class StructuredFormatter(logging.Formatter):
    """
    JSON形式で構造化ログを出力するフォーマッタ
    """

    def format(self, record):
        """
        ログレコードをJSON形式に変換
        """
        # 基本ログ情報
        log_entry = {
            'timestamp': datetime.fromtimestamp(record.created).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }

        # 例外情報がある場合
        if record.exc_info:
            log_entry['exception'] = {
                'type': record.exc_info[0].__name__,
                'message': str(record.exc_info[1]),
                'stack_trace': self.formatException(record.exc_info)
            }

        # extraパラメータから追加フィールドを取得
        if hasattr(record, 'extra_data'):
            if isinstance(record.extra_data, dict):
                log_entry.update(record.extra_data)

        # 特定のextraフィールドを個別に処理
        extra_fields = ['user_id', 'session_id', 'request_id', 'correlation_id']
        for field in extra_fields:
            if hasattr(record, field):
                log_entry[field] = getattr(record, field)

        return json.dumps(log_entry, ensure_ascii=False, indent=2)

def setup_structured_logging():
    """
    構造化ロギングの設定
    """
    logger = logging.getLogger('structured')
    logger.setLevel(logging.DEBUG)

    # JSON形式のハンドラー
    json_handler = logging.StreamHandler(sys.stdout)
    json_formatter = StructuredFormatter()
    json_handler.setFormatter(json_formatter)

    # 人間が読みやすい形式のハンドラー(開発用)
    dev_handler = logging.StreamHandler(sys.stderr)
    dev_formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    dev_handler.setFormatter(dev_formatter)
    dev_handler.setLevel(logging.INFO)  # 開発時はINFO以上のみ

    logger.addHandler(json_handler)
    logger.addHandler(dev_handler)

    return logger

class StructuredLogger:
    """
    構造化ロギングを簡単に行うためのラッパークラス
    """

    def __init__(self, name='structured'):
        self.logger = logging.getLogger(name)

    def _log_with_context(self, level, message, **context):
        """
        コンテキスト情報付きでログを出力
        """
        extra_data = {'extra_data': context}
        self.logger.log(level, message, extra=extra_data)

    def debug(self, message, **context):
        self._log_with_context(logging.DEBUG, message, **context)

    def info(self, message, **context):
        self._log_with_context(logging.INFO, message, **context)

    def warning(self, message, **context):
        self._log_with_context(logging.WARNING, message, **context)

    def error(self, message, **context):
        self._log_with_context(logging.ERROR, message, **context)

    def critical(self, message, **context):
        self._log_with_context(logging.CRITICAL, message, **context)

    def log_event(self, event_type, **event_data):
        """
        イベントログを出力
        """
        self.info(f"イベント: {event_type}", event_type=event_type, **event_data)

# テスト
print("=== 問題9テスト ===")
structured_logger = StructuredLogger()

print("1. 基本的な構造化ログ:")
structured_logger.info("アプリケーション起動", component="main", version="1.0.0")

print("\n2. ユーザーアクションのログ:")
structured_logger.info(
    "ユーザーログイン",
    user_id="user123",
    action="login",
    ip_address="192.168.1.100",
    user_agent="Mozilla/5.0..."
)

print("\n3. ビジネスイベントのログ:")
structured_logger.log_event(
    "order_created",
    order_id="ORD-001",
    customer_id="CUST-123",
    amount=15000,
    currency="JPY",
    items=[{"product": "商品A", "quantity": 2}]
)

print("\n4. エラー状況のログ:")
try:
    # 意図的にエラーを発生させる
    result = 10 / 0
except Exception as e:
    structured_logger.error(
        "除算エラーが発生",
        operation="calculate_total",
        dividend=10,
        divisor=0,
        error_type=type(e).__name__
    )

print("\n5. パフォーマンスメトリクスのログ:")
structured_logger.info(
    "APIレスポンス時間",
    endpoint="/api/users",
    method="GET",
    response_time_ms=45.2,
    status_code=200,
    database_queries=3
)

print("\n6. システム状態のログ:")
structured_logger.debug(
    "メモリ使用量",
    memory_used_mb=256,
    memory_total_mb=1024,
    memory_percent=25.0,
    active_threads=8
)

上級問題

問題10: 分散システムでのトレーシング

import logging
import uuid
import threading
from contextvars import ContextVar
from functools import wraps

# トレースコンテキストの管理
trace_id_var = ContextVar('trace_id', default=None)
span_id_var = ContextVar('span_id', default=None)

class TraceContext:
    """
    分散トレーシングのコンテキスト管理
    """

    def __init__(self):
        self.local = threading.local()

    def generate_trace_id(self):
        """トレースIDを生成"""
        return f"trace_{uuid.uuid4().hex[:16]}"

    def generate_span_id(self):
        """スパンIDを生成"""
        return f"span_{uuid.uuid4().hex[:8]}"

    def get_current_trace(self):
        """現在のトレースコンテキストを取得"""
        return {
            'trace_id': getattr(self.local, 'trace_id', None),
            'span_id': getattr(self.local, 'span_id', None)
        }

    def set_trace_context(self, trace_id=None, span_id=None):
        """トレースコンテキストを設定"""
        if trace_id:
            self.local.trace_id = trace_id
        if span_id:
            self.local.span_id = span_id

    def clear_trace_context(self):
        """トレースコンテキストをクリア"""
        if hasattr(self.local, 'trace_id'):
            del self.local.trace_id
        if hasattr(self.local, 'span_id'):
            del self.local.span_id

class TracingLogger:
    """
    トレーシング対応ロガー
    """

    def __init__(self, name='tracing'):
        self.logger = logging.getLogger(name)
        self.trace_context = TraceContext()

        # トレーシング対応のフォーマッタを設定
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - [%(trace_id)s] [%(span_id)s] - %(message)s'
        )

        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

    def _get_trace_info(self):
        """現在のトレース情報を取得"""
        trace_info = self.trace_context.get_current_trace()
        return {
            'trace_id': trace_info['trace_id'] or 'NO_TRACE',
            'span_id': trace_info['span_id'] or 'NO_SPAN'
        }

    def log(self, level, message, **kwargs):
        """トレース情報付きでログ出力"""
        extra = self._get_trace_info()
        extra.update(kwargs)
        self.logger.log(level, message, extra=extra)

    def trace_function(self, func):
        """
        関数呼び出しをトレースするデコレータ
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            # トレースコンテキストの設定
            parent_trace = self.trace_context.get_current_trace()
            trace_id = parent_trace['trace_id'] or self.trace_context.generate_trace_id()
            span_id = self.trace_context.generate_span_id()

            self.trace_context.set_trace_context(trace_id, span_id)

            try:
                self.log(
                    logging.INFO,
                    f"🚀 関数開始: {func.__name__}",
                    function=func.__name__,
                    args=str(args)[:100],  # 長すぎる場合は切り詰め
                    kwargs=str(kwargs)[:100]
                )

                result = func(*args, **kwargs)

                self.log(
                    logging.INFO,
                    f"✅ 関数終了: {func.__name__}",
                    function=func.__name__,
                    result=str(result)[:200]  # 長すぎる場合は切り詰め
                )

                return result

            except Exception as e:
                self.log(
                    logging.ERROR,
                    f"❌ 関数エラー: {func.__name__}",
                    function=func.__name__,
                    error_type=type(e).__name__,
                    error_message=str(e)
                )
                raise
            finally:
                # スパンIDのみをクリア(トレースIDは維持)
                self.trace_context.set_trace_context(span_id=None)

        return wrapper

    def start_trace(self, trace_id=None):
        """新しいトレースを開始"""
        new_trace_id = trace_id or self.trace_context.generate_trace_id()
        self.trace_context.set_trace_context(
            trace_id=new_trace_id,
            span_id=self.trace_context.generate_span_id()
        )
        self.log(
            logging.INFO,
            "🔄 トレース開始",
            operation="start_trace"
        )
        return new_trace_id

    def end_trace(self):
        """トレースを終了"""
        trace_info = self.trace_context.get_current_trace()
        self.log(
            logging.INFO,
            "🔚 トレース終了",
            operation="end_trace"
        )
        self.trace_context.clear_trace_context()
        return trace_info

# ロギング設定
logging.basicConfig(level=logging.INFO)

# トレーシングロガーの作成
tracing_logger = TracingLogger()

# トレーシング対応関数
@tracing_logger.trace_function
def process_order(order_data, user_id):
    """注文処理関数"""
    tracing_logger.log(logging.INFO, "注文処理を開始", order_id=order_data.get('id'))

    # 注文処理のシミュレーション
    validate_order(order_data)
    calculate_total(order_data)
    update_inventory(order_data)

    result = {
        'status': 'success',
        'order_id': order_data.get('id'),
        'user_id': user_id,
        'processed_at': '2024-01-01T00:00:00Z'
    }

    tracing_logger.log(logging.INFO, "注文処理完了", result=result)
    return result

@tracing_logger.trace_function
def validate_order(order_data):
    """注文バリデーション"""
    if not order_data.get('items'):
        raise ValueError("注文に商品が含まれていません")

    tracing_logger.log(logging.DEBUG, "注文バリデーション成功")
    return True

@tracing_logger.trace_function
def calculate_total(order_data):
    """合計金額計算"""
    items = order_data.get('items', [])
    total = sum(item.get('price', 0) * item.get('quantity', 0) for item in items)

    tracing_logger.log(
        logging.INFO,
        "合計金額計算",
        item_count=len(items),
        total_amount=total
    )

    order_data['total'] = total
    return total

@tracing_logger.trace_function
def update_inventory(order_data):
    """在庫更新"""
    items = order_data.get('items', [])

    for item in items:
        tracing_logger.log(
            logging.DEBUG,
            "在庫更新",
            product_id=item.get('product_id'),
            quantity=item.get('quantity')
        )

    tracing_logger.log(logging.INFO, "在庫更新完了", updated_items=len(items))

# テスト
print("=== 問題10テスト ===")

# トレースを開始して注文処理を実行
trace_id = tracing_logger.start_trace()

try:
    order_data = {
        'id': 'ORDER-001',
        'user_id': 'USER-123',
        'items': [
            {'product_id': 'PROD-001', 'name': '商品A', 'price': 1000, 'quantity': 2},
            {'product_id': 'PROD-002', 'name': '商品B', 'price': 500, 'quantity': 1}
        ]
    }

    result = process_order(order_data, 'USER-123')
    print(f"注文処理結果: {result}")

finally:
    # トレースを終了
    trace_info = tracing_logger.end_trace()
    print(f"トレース情報: {trace_info}")

print("\n同じトレースIDで複数の操作を関連付けることができます")

問題11: 動的ログレベル制御

import logging
import logging.config
import threading
import time
import signal
import json
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class DynamicLogLevelManager:
    """
    動的ログレベル管理クラス
    実行中にログレベルを変更可能
    """

    def __init__(self, config_file='logging_config.json'):
        self.config_file = config_file
        self.observer = None
        self.loggers = {}

        # 初期設定の読み込み
        self.load_config()

        # シグナルハンドラの設定
        self.setup_signal_handlers()

    def load_config(self):
        """設定ファイルからログ設定を読み込み"""
        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                logging.config.dictConfig(config)
                print(f"✅ ログ設定を読み込みました: {self.config_file}")
                return True
            except Exception as e:
                print(f"❌ 設定ファイルの読み込みに失敗: {e}")
                self.setup_default_logging()
                return False
        else:
            print(f"⚠️ 設定ファイルが存在しません: {self.config_file}")
            self.setup_default_logging()
            self.create_default_config()
            return False

    def setup_default_logging(self):
        """デフォルトのログ設定"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

    def create_default_config(self):
        """デフォルト設定ファイルを作成"""
        default_config = {
            'version': 1,
            'disable_existing_loggers': False,
            'formatters': {
                'standard': {
                    'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
                }
            },
            'handlers': {
                'console': {
                    'class': 'logging.StreamHandler',
                    'level': 'INFO',
                    'formatter': 'standard',
                    'stream': 'ext://sys.stdout'
                },
                'file': {
                    'class': 'logging.FileHandler',
                    'level': 'DEBUG',
                    'formatter': 'standard',
                    'filename': 'application.log',
                    'encoding': 'utf-8'
                }
            },
            'loggers': {
                '': {  # ルートロガー
                    'handlers': ['console', 'file'],
                    'level': 'DEBUG',
                    'propagate': True
                },
                'security': {
                    'handlers': ['file'],
                    'level': 'INFO',
                    'propagate': False
                },
                'performance': {
                    'handlers': ['console'],
                    'level': 'WARNING',
                    'propagate': False
                }
            }
        }

        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(default_config, f, indent=2, ensure_ascii=False)
            print(f"✅ デフォルト設定ファイルを作成: {self.config_file}")
        except Exception as e:
            print(f"❌ 設定ファイルの作成に失敗: {e}")

    def change_log_level(self, logger_name, new_level):
        """
        特定ロガーのログレベルを変更
        """
        try:
            if logger_name == 'root':
                logger = logging.getLogger()
            else:
                logger = logging.getLogger(logger_name)

            # ログレベルの文字列をレベル値に変換
            if isinstance(new_level, str):
                level_value = getattr(logging, new_level.upper())
            else:
                level_value = new_level

            logger.setLevel(level_value)

            # ハンドラーのレベルも更新
            for handler in logger.handlers:
                handler.setLevel(level_value)

            print(f"✅ ロガー '{logger_name}' のレベルを {new_level} に変更")
            return True

        except Exception as e:
            print(f"❌ ログレベル変更失敗: {e}")
            return False

    def get_current_levels(self):
        """現在のログレベルを取得"""
        levels = {}
        loggers = [logging.getLogger()]  # ルートロガー
        loggers.extend(logging.Logger.manager.loggerDict.values())

        for logger in loggers:
            if isinstance(logger, logging.Logger):
                levels[logger.name] = logging.getLevelName(logger.getEffectiveLevel())

        return levels

    def setup_signal_handlers(self):
        """シグナルハンドラを設定"""
        def handle_reload_signal(signum, frame):
            print(f"\n🔄 シグナル {signum} を受信: ログ設定を再読み込み")
            self.load_config()

        def handle_level_change_signal(signum, frame):
            print(f"\n📊 現在のログレベル:")
            levels = self.get_current_levels()
            for logger_name, level in sorted(levels.items()):
                if logger_name:  # 空でないロガー名のみ表示
                    print(f"  {logger_name}: {level}")

        # シグナルハンドラを登録
        signal.signal(signal.SIGUSR1, handle_reload_signal)    # 設定再読み込み
        signal.signal(signal.SIGUSR2, handle_level_change_signal)  # レベル表示

    def start_config_watcher(self):
        """設定ファイルの変更監視を開始"""
        class ConfigFileHandler(FileSystemEventHandler):
            def __init__(self, manager):
                self.manager = manager

            def on_modified(self, event):
                if event.src_path.endswith(self.manager.config_file):
                    print(f"\n📁 設定ファイルが変更されました: {event.src_path}")
                    self.manager.load_config()

        event_handler = ConfigFileHandler(self)
        self.observer = Observer()
        self.observer.schedule(event_handler, path='.', recursive=False)
        self.observer.start()
        print(f"👀 設定ファイルの変更監視を開始: {self.config_file}")

    def stop(self):
        """監視を停止"""
        if self.observer:
            self.observer.stop()
            self.observer.join()

# テストアプリケーション
class TestApplication:
    """テスト用アプリケーション"""

    def __init__(self):
        self.log_manager = DynamicLogLevelManager()
        self.setup_loggers()

        # 設定ファイル監視を開始
        self.log_manager.start_config_watcher()

    def setup_loggers(self):
        """アプリケーションロガーを設定"""
        self.main_logger = logging.getLogger('main')
        self.security_logger = logging.getLogger('security')
        self.performance_logger = logging.getLogger('performance')
        self.database_logger = logging.getLogger('database')

    def run(self):
        """アプリケーションを実行"""
        self.main_logger.info("アプリケーションを開始します")

        try:
            while True:
                # 様々なログメッセージを出力
                self.main_logger.debug("詳細なデバッグ情報")
                self.main_logger.info("通常の情報メッセージ")
                self.security_logger.warning("セキュリティ警告")
                self.performance_logger.info("パフォーマンス情報")
                self.database_logger.debug("データベースクエリ詳細")

                # エラーログのテスト
                if int(time.time()) % 10 == 0:
                    self.main_logger.error("定期的なエラーログ")

                time.sleep(2)

        except KeyboardInterrupt:
            self.main_logger.info("アプリケーションを終了します")
            self.log_manager.stop()

# テスト
print("=== 問題11テスト ===")

# 使用方法の説明
print("""
使用方法:
1. 別のターミナルで以下のコマンドを実行:

   # 設定を再読み込み
   kill -USR1 <プロセスID>

   # 現在のログレベルを表示
   kill -USR2 <プロセスID>

   # 設定ファイルを編集して自動リロードをテスト
   nano logging_config.json

2. 設定ファイル例 (logging_config.json):
   {
     "version": 1,
     "disable_existing_loggers": false,
     "formatters": {...},
     "handlers": {...},
     "loggers": {
       "main": {
         "handlers": ["console"],
         "level": "DEBUG",  # ここを変更してテスト
         "propagate": false
       }
     }
   }
""")

# アプリケーションの実行(Ctrl+Cで終了)
app = TestApplication()

# 実際の実行はコメントアウト(テスト時は有効化)
# print("アプリケーションを起動しています...")
# print("プロセスID:", os.getpid())
# app.run()

問題12: 包括的なデバッグ環境

import logging
import pdb
import time
import inspect
from functools import wraps
from contextlib import contextmanager

class ComprehensiveDebugEnvironment:
    """
    包括的なデバッグ環境
    対話的デバッグ、ロギング、パフォーマンス計測、例外処理を統合
    """

    def __init__(self, app_name="DebugApp", log_level=logging.DEBUG):
        self.app_name = app_name
        self.log_level = log_level
        self.performance_data = {}
        self.setup_logging()
        self.setup_debug_tools()

    def setup_logging(self):
        """包括的なロギング設定"""
        self.logger = logging.getLogger(self.app_name)
        self.logger.setLevel(self.log_level)

        # 既存のハンドラーをクリア
        self.logger.handlers.clear()

        # 詳細なフォーマッタ
        formatter = logging.Formatter(
            '%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s - '
            '%(filename)s:%(lineno)d - %(funcName)s - %(message)s',
            datefmt='%H:%M:%S'
        )

        # コンソールハンドラー
        console_handler = logging.StreamHandler()
        console_handler.setLevel(self.log_level)
        console_handler.setFormatter(formatter)

        # ファイルハンドラー
        file_handler = logging.FileHandler(f'{self.app_name}.log', encoding='utf-8')
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(formatter)

        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)

        self.logger.info(f"🔧 デバッグ環境を初期化: {self.app_name}")

    def setup_debug_tools(self):
        """デバッグツールの設定"""
        self.debug_enabled = True
        self.breakpoints = {}

    def debug(self, message, **context):
        """デバッグログを出力"""
        if context:
            message = f"{message} [{' '.join(f'{k}={v}' for k, v in context.items())}]"
        self.logger.debug(f"🐛 {message}")

    def info(self, message, **context):
        """情報ログを出力"""
        if context:
            message = f"{message} [{' '.join(f'{k}={v}' for k, v in context.items())}]"
        self.logger.info(f"ℹ️  {message}")

    def warning(self, message, **context):
        """警告ログを出力"""
        if context:
            message = f"{message} [{' '.join(f'{k}={v}' for k, v in context.items())}]"
        self.logger.warning(f"⚠️  {message}")

    def error(self, message, **context):
        """エラーログを出力"""
        if context:
            message = f"{message} [{' '.join(f'{k}={v}' for k, v in context.items())}]"
        self.logger.error(f"❌ {message}")

    def trace(self, func):
        """
        関数の実行をトレースするデコレータ
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 関数開始情報
            frame = inspect.currentframe()
            self.debug(
                f"関数開始: {func.__name__}",
                args=args,
                kwargs=kwargs,
                caller=frame.f_back.f_code.co_name
            )

            start_time = time.time()

            try:
                result = func(*args, **kwargs)
                execution_time = time.time() - start_time

                # 関数終了情報
                self.debug(
                    f"関数終了: {func.__name__}",
                    result=result,
                    execution_time=f"{execution_time:.3f}s"
                )

                # パフォーマンスデータの記録
                self.record_performance(func.__name__, execution_time)

                return result

            except Exception as e:
                execution_time = time.time() - start_time

                # エラー情報
                self.error(
                    f"関数エラー: {func.__name__}",
                    error_type=type(e).__name__,
                    error_message=str(e),
                    execution_time=f"{execution_time:.3f}s"
                )

                # 対話的デバッグが有効な場合
                if self.debug_enabled:
                    self.interactive_debug(e, func.__name__, args, kwargs)

                raise

        return wrapper

    def record_performance(self, func_name, execution_time):
        """パフォーマンスデータを記録"""
        if func_name not in self.performance_data:
            self.performance_data[func_name] = []

        self.performance_data[func_name].append(execution_time)

        # 定期的にパフォーマンスレポートを出力
        if len(self.performance_data[func_name]) % 10 == 0:
            self.log_performance_report(func_name)

    def log_performance_report(self, func_name=None):
        """パフォーマンスレポートを出力"""
        if func_name:
            times = self.performance_data.get(func_name, [])
            if times:
                avg_time = sum(times) / len(times)
                max_time = max(times)
                self.info(
                    f"パフォーマンスレポート: {func_name}",
                    calls=len(times),
                    average_time=f"{avg_time:.3f}s",
                    max_time=f"{max_time:.3f}s"
                )
        else:
            for name, times in self.performance_data.items():
                if times:
                    avg_time = sum(times) / len(times)
                    self.debug(
                        f"パフォーマンスサマリー: {name}",
                        calls=len(times),
                        average_time=f"{avg_time:.3f}s"
                    )

    def interactive_debug(self, exception, func_name, args, kwargs):
        """
        対話的デバッグセッションを開始
        """
        if not self.debug_enabled:
            return

        self.info(
            "対話的デバッグセッションを開始",
            function=func_name,
            exception=type(exception).__name__
        )

        print(f"\n{'='*60}")
        print(f"🔧 対話的デバッグセッション")
        print(f"{'='*60}")
        print(f"関数: {func_name}")
        print(f"例外: {type(exception).__name__}: {exception}")
        print(f"引数: args={args}, kwargs={kwargs}")
        print(f"\n利用可能なコマンド:")
        print(f"  pdb - pdbデバッガを起動")
        print(f"  vars - ローカル変数を表示")
        print(f"  continue - デバッグを続行")
        print(f"  quit - プログラムを終了")
        print(f"{'='*60}")

        while True:
            try:
                command = input("\nデバッグコマンド> ").strip().lower()

                if command == 'pdb':
                    print("pdbデバッガを起動します...")
                    pdb.set_trace()
                    break

                elif command == 'vars':
                    frame = inspect.currentframe().f_back
                    print("ローカル変数:")
                    for var_name, var_value in frame.f_locals.items():
                        print(f"  {var_name} = {repr(var_value)}")

                elif command == 'continue':
                    print("デバッグを続行します")
                    break

                elif command == 'quit':
                    print("プログラムを終了します")
                    exit(1)

                else:
                    print("未知のコマンドです")

            except KeyboardInterrupt:
                print("\nデバッグセッションを終了します")
                break

    @contextmanager
    def breakpoint(self, condition=True, name="default"):
        """
        コンテキストマネージャとしてのブレークポイント
        """
        if condition and self.debug_enabled:
            self.info(f"ブレークポイントに到達: {name}")

            frame = inspect.currentframe()
            caller_frame = frame.f_back.f_back
            caller_info = f"{caller_frame.f_code.co_filename}:{caller_frame.f_lineno}"

            print(f"\n⏸️ ブレークポイント: {name}")
            print(f"場所: {caller_info}")
            print(f"コンテキスト: {caller_frame.f_code.co_name}")

            response = input("デバッガを起動しますか? (y/n): ").strip().lower()
            if response == 'y':
                pdb.set_trace()

        try:
            yield
        finally:
            pass

    def performance_monitor(self, func):
        """
        パフォーマンス監視デコレータ
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            with self.measure_performance(func.__name__):
                return func(*args, **kwargs)
        return wrapper

    @contextmanager
    def measure_performance(self, operation_name):
        """
        パフォーマンス計測コンテキストマネージャ
        """
        start_time = time.time()
        try:
            yield
        finally:
            end_time = time.time()
            execution_time = end_time - start_time
            self.record_performance(operation_name, execution_time)

            if execution_time > 1.0:
                self.warning(
                    f"処理時間が長い: {operation_name}",
                    execution_time=f"{execution_time:.3f}s"
                )

    def enable_debug(self):
        """デバッグを有効化"""
        self.debug_enabled = True
        self.logger.setLevel(logging.DEBUG)
        self.info("デバッグモードを有効化")

    def disable_debug(self):
        """デバッグを無効化"""
        self.debug_enabled = False
        self.logger.setLevel(logging.INFO)
        self.info("デバッグモードを無効化")

# テストアプリケーション
class TestApp:
    """テスト用アプリケーション"""

    def __init__(self):
        self.debug_env = ComprehensiveDebugEnvironment("TestApp")

        # デバッグ環境で関数を装飾
        self.process_data = self.debug_env.trace(self.process_data)
        self.validate_input = self.debug_env.trace(self.validate_input)
        self.complex_calculation = self.debug_env.performance_monitor(
            self.debug_env.trace(self.complex_calculation)
        )

    def process_data(self, data):
        """データ処理関数"""
        self.debug_env.debug("データ処理を開始", data_length=len(data))

        with self.debug_env.breakpoint(len(data) > 5, "large_dataset"):
            # データ検証
            self.validate_input(data)

            # 複雑な計算
            result = self.complex_calculation(data)

            self.debug_env.info("データ処理完了", result=result)
            return result

    def validate_input(self, data):
        """入力検証関数"""
        if not data:
            raise ValueError("データが空です")

        if any(x < 0 for x in data):
            self.debug_env.warning("負の値が含まれています")

        return True

    def complex_calculation(self, data):
        """複雑な計算関数"""
        total = 0
        for i, value in enumerate(data):
            # 特定の条件でブレークポイント
            with self.debug_env.breakpoint(value == 999, "special_value"):
                calculated = value ** 2 + i
                total += calculated

            # 人工的な遅延
            time.sleep(0.01)

        return total

    def run(self):
        """アプリケーションを実行"""
        self.debug_env.info("アプリケーションを開始")

        test_datasets = [
            [1, 2, 3, 4, 5],
            [10, -5, 20, 999, 30],  # 負の値と特殊値を含む
            [],  # 空のデータ(エラーを発生)
            [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]  # 大きなデータセット
        ]

        for i, data in enumerate(test_datasets):
            self.debug_env.info(f"データセット {i+1} を処理", data=data)

            try:
                result = self.process_data(data)
                self.debug_env.info(f"処理結果: {result}")

            except Exception as e:
                self.debug_env.error(f"処理に失敗: {e}")

            print("-" * 50)

        # パフォーマンスレポートを表示
        self.debug_env.log_performance_report()
        self.debug_env.info("アプリケーションを終了")

# テスト
print("=== 問題12テスト ===")
print("包括的なデバッグ環境のデモを実行します")

app = TestApp()
app.run()

print("\nデバッグ環境の機能:")
print("✅ 詳細なロギング")
print("✅ 関数トレーシング")
print("✅ パフォーマンス監視")
print("✅ 対話的デバッグ")
print("✅ 条件付きブレークポイント")
print("✅ パフォーマンスレポート")