PythonによるJSON/CSVデータの扱い
2025-11-15はじめに
Pythonを学び、基本的な文法を習得した後の次のステップとして、実際のデータを扱うスキルは非常に重要です。現代のプログラミングでは、Web APIからのデータ取得やデータ分析、システム間連携など、様々な場面でJSONやCSV形式のデータを扱う機会が多々あります。本記事では、Pythonを使ってJSONとCSVデータを効果的に処理する方法を、実践的な例を交えて詳しく解説します。
JSONデータの扱い
JSONとは
JSON(JavaScript Object Notation)は、軽量なデータ交換フォーマットです。人間にとって読み書きが容易で、マシンにとって解析や生成が簡単な形式となっています。Web APIの応答データや設定ファイル、データ保存形式など、幅広い用途で使用されています。
JSONの主なデータ型は以下の通りです。
- オブジェクト:
{"key": "value"} - 配列:
["item1", "item2"] - 文字列:
"text" - 数値:
123または12.3 - 真偽値:
trueまたはfalse - null:
null
PythonでのJSON処理基本
Pythonでは標準ライブラリのjsonモジュールを使用してJSONデータを扱います。
json.loadsでJSON文字列をPythonの辞書に変換して値を参照し、json.dumpsでPythonの辞書をJSON形式の文字列に変換して整形して出力する処理を示しています。
import json
# JSON文字列のパース(JSON → Pythonオブジェクト)
json_string = '{"name": "山田太郎", "age": 25, "hobbies": ["読書", "旅行"]}'
python_dict = json.loads(json_string)
print(python_dict["name"]) # 山田太郎
print(python_dict["hobbies"][0]) # 読書
# PythonオブジェクトのJSONシリアライズ(Pythonオブジェクト → JSON)
data = {
"company": "テック株式会社",
"employees": 150,
"is_listed": True,
"departments": ["営業", "開発", "人事"]
}
json_output = json.dumps(data, ensure_ascii=False, indent=2)
print(json_output)
ファイルからのJSON読み込み・書き込み
JSONデータをファイルから読み込んだり、ファイルに保存したりする方法も簡単です。
json.loadでJSONファイルを読み込んでPythonのオブジェクトとして扱い、その後json.dumpでPythonの辞書データを別のJSONファイルに書き込む処理を行っています。
import json
# JSONファイルの読み込み
with open('data.json', 'r', encoding='utf-8') as file:
data = json.load(file)
print(data)
# JSONファイルへの書き込み
company_data = {
"name": "サンプル企業",
"founded": 1999,
"locations": ["東京", "大阪", "福岡"]
}
with open('company.json', 'w', encoding='utf-8') as file:
json.dump(company_data, file, ensure_ascii=False, indent=2)
実践的なJSON処理例
実際のデータ処理では、複雑な構造のJSONを扱うことが多いです。ネストされたデータへのアクセス方法を学びましょう。
複雑なJSON文字列をPythonで読み込んで従業員情報を取り出し、全従業員の名前と部署を表示したうえで、特定のスキル(Python)を持つ従業員だけを抽出して名前を出力する処理を行っています。
import json
# 複雑なJSONデータの例
complex_json = '''
{
"company": {
"name": "テクノロジーソリューションズ",
"employees": [
{
"id": 1,
"name": "佐藤一郎",
"department": "開発部",
"skills": ["Python", "JavaScript", "SQL"]
},
{
"id": 2,
"name": "鈴木花子",
"department": "デザイン部",
"skills": ["UI/UXデザイン", "Illustrator", "Photoshop"]
}
]
}
}
'''
# データの解析
data = json.loads(complex_json)
# 従業員名の一覧を取得
employees = data["company"]["employees"]
for employee in employees:
print(f"名前: {employee['name']}, 部署: {employee['department']}")
# 特定のスキルを持つ従業員を検索
python_developers = [
emp for emp in employees
if "Python" in emp["skills"]
]
print("Python開発者:", [emp["name"] for emp in python_developers])
CSVデータの扱い
CSVとは
CSV(Comma-Separated Values)は、表形式データをテキスト形式で保存するためのファイル形式です。各行がデータの行を表し、各フィールドがカンマで区切られています。スプレッドシートソフトやデータベースとのデータ交換によく使用されます。
PythonでのCSV処理基本
Pythonでは標準ライブラリのcsvモジュールを使用してCSVデータを扱います。
CSVファイルを通常の行として読み込んで内容を表示し、さらにDictReaderを使って各行を辞書として扱いながら名前・年齢・部署を取り出して表示する処理を行っています。
import csv
# CSVファイルの読み込み
with open('employees.csv', 'r', encoding='utf-8') as file:
reader = csv.reader(file)
header = next(reader) # ヘッダー行を読み込み
print("ヘッダー:", header)
for row in reader:
print(f"行データ: {row}")
# 辞書形式でのCSV読み込み(より実践的)
with open('employees.csv', 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
for row in reader:
print(f"名前: {row['name']}, 年齢: {row['age']}, 部署: {row['department']}")
CSVファイルへの書き込み
データをCSV形式で保存する方法も重要です。
リストを使ってCSVファイルに複数行を書き込む方法と、辞書データを使ってヘッダー付きのCSVファイルを作成する方法を示しています。
import csv
# リスト形式でのCSV書き込み
data = [
["名前", "年齢", "部署"],
["山田太郎", 28, "営業部"],
["佐藤花子", 32, "開発部"],
["鈴木一郎", 45, "人事部"]
]
with open('output_list.csv', 'w', encoding='utf-8', newline='') as file:
writer = csv.writer(file)
writer.writerows(data)
# 辞書形式でのCSV書き込み
employees = [
{"name": "山田太郎", "age": 28, "department": "営業部"},
{"name": "佐藤花子", "age": 32, "department": "開発部"},
{"name": "鈴木一郎", "age": 45, "department": "人事部"}
]
with open('output_dict.csv', 'w', encoding='utf-8', newline='') as file:
fieldnames = ["name", "age", "department"]
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(employees)
実践的なCSV処理例
実際の業務では、CSVデータに対して様々な処理を行う必要があります。
CSVファイルから部署ごとの平均年齢と人数を集計する処理と、年齢や部署の条件で従業員データを抽出して新しいCSVとして保存する処理を行っています。
import csv
from collections import defaultdict
# 部署別の平均年齢を計算
def calculate_department_stats(filename):
department_ages = defaultdict(list)
with open(filename, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
for row in reader:
department = row['department']
age = int(row['age'])
department_ages[department].append(age)
# 結果の表示
for department, ages in department_ages.items():
avg_age = sum(ages) / len(ages)
print(f"{department}: 平均年齢 {avg_age:.1f}歳, 人数 {len(ages)}名")
# データのフィルタリングと新規CSV作成
def filter_employees(input_file, output_file, min_age=None, department=None):
with open(input_file, 'r', encoding='utf-8') as infile:
reader = csv.DictReader(infile)
filtered_rows = []
for row in reader:
# 条件に合致する行のみを抽出
age_condition = min_age is None or int(row['age']) >= min_age
dept_condition = department is None or row['department'] == department
if age_condition and dept_condition:
filtered_rows.append(row)
# フィルタリング結果を新しいCSVファイルに保存
if filtered_rows:
with open(output_file, 'w', encoding='utf-8', newline='') as outfile:
fieldnames = filtered_rows[0].keys()
writer = csv.DictWriter(outfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(filtered_rows)
print(f"フィルタリング結果を {output_file} に保存しました")
else:
print("条件に合致するデータはありませんでした")
# 使用例
calculate_department_stats('employees.csv')
filter_employees('employees.csv', 'senior_employees.csv', min_age=40)
JSONとCSVの変換
実際のプロジェクトでは、JSON形式のデータをCSVに変換したり、その逆を行ったりする必要がよくあります。
JSONからCSVへの変換
指定したJSONファイルを読み込んでその内容が辞書のリストであることを確認し、リスト内の最初の要素から取得したキーをヘッダーとして利用しながらCSVファイルに書き出すことで、JSON形式のデータをCSV形式へ変換する一連の処理を実行しています。
import json
import csv
def json_to_csv(json_file, csv_file):
# JSONファイルの読み込み
with open(json_file, 'r', encoding='utf-8') as file:
data = json.load(file)
# データがリスト形式であることを確認
if isinstance(data, list) and len(data) > 0:
# ヘッダーの取得
fieldnames = data[0].keys()
# CSVファイルへの書き込み
with open(csv_file, 'w', encoding='utf-8', newline='') as file:
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
print(f"JSONファイル {json_file} を CSVファイル {csv_file} に変換しました")
else:
print("適切なJSONデータ形式ではありません")
# 使用例
json_to_csv('employees.json', 'employees_from_json.csv')
CSVからJSONへの変換
CSVファイルを読み込んで各行を辞書として取得し、必要に応じて年齢などの数値項目を整数型に変換したうえでリストにまとめ、そのリスト全体をJSON形式に整形して指定したJSONファイルへ書き出すことで、CSVデータをJSONデータへ変換する処理を行っています。
import csv
import json
def csv_to_json(csv_file, json_file):
data = []
# CSVファイルの読み込み
with open(csv_file, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
for row in reader:
# 数値フィールドの型変換(必要に応じて)
if 'age' in row:
row['age'] = int(row['age'])
data.append(row)
# JSONファイルへの書き込み
with open(json_file, 'w', encoding='utf-8') as file:
json.dump(data, file, ensure_ascii=False, indent=2)
print(f"CSVファイル {csv_file} を JSONファイル {json_file} に変換しました")
# 使用例
csv_to_json('employees.csv', 'employees_from_csv.json')
実践プロジェクト
次の例は、簡単な従業員管理システムになります。
EmployeeManager クラスを使って従業員データを管理する仕組みを提供しており、JSONファイルから従業員情報を読み込み、追加や一覧表示を行い、必要に応じてCSV形式でエクスポートする処理を行う一連の管理機能を実装しています。
import json
import csv
import os
class EmployeeManager:
def __init__(self, data_file='employees.json'):
self.data_file = data_file
self.employees = self.load_data()
def load_data(self):
"""データファイルから従業員データを読み込む"""
if os.path.exists(self.data_file):
with open(self.data_file, 'r', encoding='utf-8') as file:
return json.load(file)
return []
def save_data(self):
"""従業員データをファイルに保存する"""
with open(self.data_file, 'w', encoding='utf-8') as file:
json.dump(self.employees, file, ensure_ascii=False, indent=2)
def add_employee(self, name, age, department, skills):
"""新しい従業員を追加する"""
employee_id = len(self.employees) + 1
new_employee = {
"id": employee_id,
"name": name,
"age": age,
"department": department,
"skills": skills
}
self.employees.append(new_employee)
self.save_data()
print(f"従業員 {name} を追加しました")
def list_employees(self):
"""全従業員を表示する"""
for employee in self.employees:
print(f"ID: {employee['id']}, 名前: {employee['name']}, "
f"年齢: {employee['age']}, 部署: {employee['department']}")
def export_to_csv(self, csv_file):
"""従業員データをCSVにエクスポートする"""
with open(csv_file, 'w', encoding='utf-8', newline='') as file:
if self.employees:
fieldnames = self.employees[0].keys()
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(self.employees)
print(f"データを {csv_file} にエクスポートしました")
else:
print("エクスポートするデータがありません")
# 使用例
if __name__ == "__main__":
manager = EmployeeManager()
# サンプルデータの追加
manager.add_employee("山田太郎", 28, "営業部", ["コミュニケーション", "英語"])
manager.add_employee("佐藤花子", 32, "開発部", ["Python", "JavaScript", "SQL"])
# 従業員一覧の表示
print("=== 従業員一覧 ===")
manager.list_employees()
# CSVへのエクスポート
manager.export_to_csv("employees_export.csv")
実際のアプリケーションでは、エラーハンドリングが重要です。
JSONファイルを安全に読み込み、データが存在する場合にCSVファイルとして安全に書き出す処理を行い、ファイルが存在しない場合やJSON解析・書き込みでエラーが発生した場合に適切なエラーメッセージを表示する仕組みを提供しています。
import json
import csv
def safe_json_load(filepath):
"""安全なJSONファイル読み込み"""
try:
with open(filepath, 'r', encoding='utf-8') as file:
return json.load(file)
except FileNotFoundError:
print(f"エラー: ファイル {filepath} が見つかりません")
return []
except json.JSONDecodeError as e:
print(f"エラー: JSONの解析に失敗しました - {e}")
return []
def safe_csv_write(data, filepath):
"""安全なCSVファイル書き込み"""
try:
with open(filepath, 'w', encoding='utf-8', newline='') as file:
if data and isinstance(data, list):
fieldnames = data[0].keys()
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
return True
else:
print("エラー: 書き込むデータがありません")
return False
except IOError as e:
print(f"エラー: ファイル書き込みに失敗しました - {e}")
return False
# 使用例
data = safe_json_load('employees.json')
if data:
success = safe_csv_write(data, 'backup.csv')
if success:
print("バックアップが完了しました")
まとめ
本記事では、Pythonを使用したJSONとCSVデータの扱い方について詳しく解説しました。これらの形式は現代のプログラミングにおいて基本的かつ重要なデータ形式であり、習得することで以下のようなことが可能になります。
- Web APIからのデータ取得と処理
- 設定ファイルの読み書き
- データの永続化と交換
- 異なるシステム間でのデータ連携
JSONとCSVの操作は、データ分析、Web開発、自動化スクリプトなど、様々な分野で必要とされる基礎スキルです。実際のプロジェクトでは、ここで学んだ基本を組み合わせて、より複雑なデータ処理を行うことになります。
次のステップでは、Web APIからのデータ取得やデータベース操作など、より実践的なデータ処理技術を学んでいきましょう。これらのスキルを組み合わせることで、実際のアプリケーション開発に役立つ強力なデータ処理能力を身につけることができま
演習問題
初級問題(3問)
初級1: JSONデータの基本操作
以下のJSONデータをPythonで処理するコードを作成してください。
# 以下のJSONデータを文字列として定義
json_data = '''
{
"students": [
{
"id": 1,
"name": "山田太郎",
"scores": {
"math": 85,
"english": 92,
"science": 78
}
},
{
"id": 2,
"name": "佐藤花子",
"scores": {
"math": 90,
"english": 88,
"science": 95
}
}
]
}
'''
# 問題1: 全生徒の名前を表示してください
# 問題2: 佐藤花子の数学の点数を表示してください
# 問題3: 各科目の平均点を計算してください
初級2: CSVファイルの読み込みと基本処理
以下のようなCSVデータを処理するコードを作成してください。
# sales.csv(想定)
# date,product,quantity,price
# 2024-01-01,商品A,10,1500
# 2024-01-01,商品B,5,2000
# 2024-01-02,商品A,8,1500
# 2024-01-02,商品C,3,3000
# 問題1: CSVファイルを読み込み、全データを表示
# 問題2: 商品別の販売数量の合計を計算
# 問題3: 売上金額(quantity * price)の合計を計算
初級3: JSONとCSVの相互変換
以下のデータ変換を行うコードを作成してください。
# 以下のPythonリストをJSONファイルとCSVファイルの両方に保存
employees = [
{"id": 1, "name": "鈴木一郎", "department": "営業部", "salary": 500000},
{"id": 2, "name": "高橋美咲", "department": "開発部", "salary": 600000},
{"id": 3, "name": "渡辺健太", "department": "営業部", "salary": 480000}
]
# 問題1: 上記データをemployees.jsonに保存
# 問題2: 上記データをemployees.csvに保存
# 問題3: 保存したJSONファイルを読み込み、部署別の平均給与を計算
中級問題(6問)
中級1: ネストされたJSONデータの処理
# 以下の複雑なJSONデータを処理
company_data = {
"company_name": "テックソリューションズ",
"departments": [
{
"name": "開発部",
"manager": "田中宏",
"employees": [
{"name": "山本涼太", "skills": ["Python", "JavaScript"], "experience": 3},
{"name": "中村優子", "skills": ["Java", "SQL", "Python"], "experience": 5}
]
},
{
"name": "営業部",
"manager": "佐々木誠",
"employees": [
{"name": "小林健一", "skills": ["英語", "プレゼン"], "experience": 2},
{"name": "加藤由美", "skills": ["中国語", "交渉"], "experience": 4}
]
}
]
}
# 問題1: 部署名とマネージャー名の一覧を表示
# 問題2: 全従業員の名前と経験年数を表示
# 問題3: Pythonスキルを持つ従業員を検索
# 問題4: 部署別の平均経験年数を計算
中級2: CSVデータの高度な集計
# 以下の在庫データを処理(inventory.csv)
# product_id,product_name,category,current_stock,min_stock,price
# P001,ノートPC,電子機器,15,5,120000
# P002,デスク,家具,8,3,45000
# P003,椅子,家具,12,4,28000
# P004,スマートフォン,電子機器,20,8,80000
# P005,本棚,家具,3,2,35000
# 問題1: カテゴリー別の在庫数量合計を計算
# 問題2: 在庫切れ危険(current_stock <= min_stock)の商品を抽出
# 問題3: 在庫金額(current_stock * price)の合計を計算
# 問題4: カテゴリー別の平均単価を計算
中級3: JSONデータのフィルタリングと変換
# 以下の書籍データを処理
books_data = [
{
"isbn": "978-4-00-000000-1",
"title": "Python入門",
"author": "技術太郎",
"year": 2023,
"price": 2500,
"categories": ["プログラミング", "Python"],
"rating": 4.5
},
{
"isbn": "978-4-00-000000-2",
"title": "データ分析の基礎",
"author": "分析花子",
"year": 2022,
"price": 3200,
"categories": ["データ分析", "統計"],
"rating": 4.2
},
{
"isbn": "978-4-00-000000-3",
"title": "機械学習実践",
"author": "技術太郎",
"year": 2024,
"price": 3800,
"categories": ["機械学習", "Python"],
"rating": 4.8
}
]
# 問題1: 2023年以降に出版された書籍を抽出
# 問題2: 著者別の書籍数を集計
# 問題3: 価格が3000円以上の書籍を抽出
# 問題4: カテゴリーに「Python」を含む書籍を抽出
中級4: CSVデータのクリーニングと変換
# 以下の不完全な顧客データを処理(customers.csv)
# customer_id,name,email,age,city,registration_date
# 1,山田太郎,yamada@example.com,28,東京,2023-01-15
# 2,佐藤花子,,32,大阪,2023-02-20
# 3,鈴木一郎,suzuki@example.com,,名古屋,2023-03-10
# 4,,tanaka@example.com,45,福岡,2023-04-05
# 5,高橋美咲,takahashi@example.com,29,東京,2023-05-12
# 問題1: 欠損データ(空欄)のある顧客を特定
# 問題2: メールアドレスが欠けている顧客のデータを補完("no-email@example.com"を設定)
# 問題3: 年齢が欠けている顧客の平均年齢を計算して補完
# 問題4: 名前が欠けている顧客をリストから削除
中級5: JSONスキーマの検証
# 以下の商品データのスキーマを検証
def validate_product_schema(product):
"""商品データのスキーマ検証"""
required_fields = ["product_id", "name", "price", "category", "in_stock"]
# 問題1: 必須フィールドが全て存在するかチェック
# 問題2: priceが数値で0以上かチェック
# 問題3: in_stockが真偽値かチェック
# 問題4: カテゴリーが許可された値かチェック(許可値: "electronics", "books", "clothing")
pass
# テストデータ
test_products = [
{"product_id": "P001", "name": "スマートフォン", "price": 80000, "category": "electronics", "in_stock": True},
{"product_id": "P002", "name": "Python本", "price": -2500, "category": "books", "in_stock": True},
{"product_id": "P003", "name": "Tシャツ", "price": 3000, "category": "fashion", "in_stock": "yes"},
{"name": "ノートPC", "price": 120000, "category": "electronics", "in_stock": True}
]
# 各商品データを検証し、エラーメッセージを表示
中級6: 複数CSVファイルの結合処理
# 以下の2つのCSVファイルを結合
# orders.csv
# order_id,customer_id,order_date,total_amount
# 1001,1,2024-01-15,15000
# 1002,2,2024-01-16,28000
# 1003,1,2024-01-18,12000
# customers.csv
# customer_id,customer_name,email,city
# 1,山田太郎,yamada@example.com,東京
# 2,佐藤花子,sato@example.com,大阪
# 3,鈴木一郎,suzuki@example.com,名古屋
# 問題1: 2つのCSVファイルをcustomer_idで結合
# 問題2: 顧客別の注文回数を計算
# 問題3: 顧客別の総購入金額を計算
# 問題4: 都市別の平均注文金額を計算
上級問題(3問)
上級1: JSON APIレスポンスのシミュレーションと分析
import json
from datetime import datetime, timedelta
import random
def generate_sales_data():
"""模擬販売データの生成"""
# 問題1: 過去30日分の日次販売データを生成する関数を作成
# データ形式: [{"date": "2024-01-01", "product": "商品A", "quantity": 10, "revenue": 15000}, ...]
# 商品は3種類(商品A, 商品B, 商品C)、数量は1-20の乱数、単価は商品別に固定
# 問題2: 生成したデータをsales.jsonに保存
# 問題3: 以下の分析を実行
# - 商品別の総販売数量と総売上
# - 日別の総売上推移
# - 週次(7日間隔)の売上集計
# 問題4: 分析結果をレポート形式でreport.jsonに保存
pass
上級2: 大規模CSVデータの効率的な処理
# 大規模な在庫データを想定(メモリ効率を考慮した処理)
def process_large_inventory_file(input_file, output_file):
"""
大規模在庫CSVファイルを処理する関数
input_file: 入力CSVファイルパス
output_file: 出力CSVファイルパス
"""
# 問題1: 在庫切れ危険(在庫数 <= 最小在庫数)の商品を抽出して別ファイルに保存
# 問題2: カテゴリー別の在庫金額合計を計算(メモリ効率を考慮)
# 問題3: 在庫回転率の計算(仮定: 月間販売数 = 在庫数 * 0.3)
# 問題4: 処理の進捗状況を定期的に表示(例: 1000行処理ごとに進捗%を表示)
# 注意: ファイル全体をメモリに読み込まず、行ごとに処理する方法を使用
pass
# チャレンジ問題: マルチスレッドを使用した並列処理の実装
上級3: データ変換パイプラインの構築
class DataPipeline:
"""JSON/CSVデータ変換パイプライン"""
def __init__(self):
self.processors = []
def add_processor(self, processor):
self.processors.append(processor)
def process(self, input_data):
# 問題1: 登録されたプロセッサを順次適用する処理を実装
pass
# 以下のプロセッサを実装し、パイプラインを構築
class JSONLoader:
"""JSONファイル読み込みプロセッサ"""
pass
class CSVLoader:
"""CSVファイル読み込みプロセッサ"""
pass
class DataFilter:
"""データフィルタリングプロセッサ"""
pass
class DataTransformer:
"""データ変換プロセッサ"""
pass
class DataAggregator:
"""データ集計プロセッサ"""
pass
class JSONExporter:
"""JSONファイル出力プロセッサ"""
pass
class CSVExporter:
"""CSVファイル出力プロセッサ"""
pass
# 問題2: 販売データを読み込み、フィルタリング、変換、集計、出力するパイプラインを構築
# 問題3: 設定ファイル(JSON)からパイプラインの設定を読み込む機能を追加
# 問題4: 処理結果の検証とエラーレポート生成機能を追加
演習問題 解答例
初級問題 解答
初級1: JSONデータの基本操作
import json
json_data = '''
{
"students": [
{
"id": 1,
"name": "山田太郎",
"scores": {
"math": 85,
"english": 92,
"science": 78
}
},
{
"id": 2,
"name": "佐藤花子",
"scores": {
"math": 90,
"english": 88,
"science": 95
}
}
]
}
'''
# JSONデータをPythonオブジェクトに変換
data = json.loads(json_data)
# 問題1: 全生徒の名前を表示
print("=== 全生徒の名前 ===")
for student in data["students"]:
print(student["name"])
# 問題2: 佐藤花子の数学の点数を表示
print("\n=== 佐藤花子の数学の点数 ===")
for student in data["students"]:
if student["name"] == "佐藤花子":
print(f"佐藤花子の数学: {student['scores']['math']}点")
# 問題3: 各科目の平均点を計算
print("\n=== 各科目の平均点 ===")
subjects = ["math", "english", "science"]
for subject in subjects:
total = 0
count = 0
for student in data["students"]:
total += student["scores"][subject]
count += 1
average = total / count
print(f"{subject}: {average:.1f}点")
初級2: CSVファイルの読み込みと基本処理
import csv
# まずサンプルCSVファイルを作成
with open('sales.csv', 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerow(['date', 'product', 'quantity', 'price'])
writer.writerow(['2024-01-01', '商品A', 10, 1500])
writer.writerow(['2024-01-01', '商品B', 5, 2000])
writer.writerow(['2024-01-02', '商品A', 8, 1500])
writer.writerow(['2024-01-02', '商品C', 3, 3000])
# 問題1: CSVファイルを読み込み、全データを表示
print("=== 全販売データ ===")
with open('sales.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
print(f"日付: {row['date']}, 商品: {row['product']}, 数量: {row['quantity']}, 単価: {row['price']}")
# 問題2: 商品別の販売数量の合計を計算
print("\n=== 商品別販売数量合計 ===")
product_quantities = {}
with open('sales.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
product = row['product']
quantity = int(row['quantity'])
if product in product_quantities:
product_quantities[product] += quantity
else:
product_quantities[product] = quantity
for product, total_quantity in product_quantities.items():
print(f"{product}: {total_quantity}個")
# 問題3: 売上金額の合計を計算
print("\n=== 総売上金額 ===")
total_revenue = 0
with open('sales.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
revenue = int(row['quantity']) * int(row['price'])
total_revenue += revenue
print(f"総売上金額: {total_revenue}円")
初級3: JSONとCSVの相互変換
import json
import csv
employees = [
{"id": 1, "name": "鈴木一郎", "department": "営業部", "salary": 500000},
{"id": 2, "name": "高橋美咲", "department": "開発部", "salary": 600000},
{"id": 3, "name": "渡辺健太", "department": "営業部", "salary": 480000}
]
# 問題1: JSONファイルに保存
with open('employees.json', 'w', encoding='utf-8') as f:
json.dump(employees, f, ensure_ascii=False, indent=2)
print("employees.json を保存しました")
# 問題2: CSVファイルに保存
with open('employees.csv', 'w', encoding='utf-8', newline='') as f:
writer = csv.DictWriter(f, fieldnames=['id', 'name', 'department', 'salary'])
writer.writeheader()
writer.writerows(employees)
print("employees.csv を保存しました")
# 問題3: JSONファイルを読み込み、部署別の平均給与を計算
print("\n=== 部署別平均給与 ===")
with open('employees.json', 'r', encoding='utf-8') as f:
employees_data = json.load(f)
department_salaries = {}
for employee in employees_data:
dept = employee['department']
salary = employee['salary']
if dept in department_salaries:
department_salaries[dept].append(salary)
else:
department_salaries[dept] = [salary]
for dept, salaries in department_salaries.items():
avg_salary = sum(salaries) / len(salaries)
print(f"{dept}: {avg_salary:.0f}円")
中級問題 解答
中級1: ネストされたJSONデータの処理
import json
company_data = {
"company_name": "テックソリューションズ",
"departments": [
{
"name": "開発部",
"manager": "田中宏",
"employees": [
{"name": "山本涼太", "skills": ["Python", "JavaScript"], "experience": 3},
{"name": "中村優子", "skills": ["Java", "SQL", "Python"], "experience": 5}
]
},
{
"name": "営業部",
"manager": "佐々木誠",
"employees": [
{"name": "小林健一", "skills": ["英語", "プレゼン"], "experience": 2},
{"name": "加藤由美", "skills": ["中国語", "交渉"], "experience": 4}
]
}
]
}
# 問題1: 部署名とマネージャー名の一覧を表示
print("=== 部署とマネージャー ===")
for dept in company_data["departments"]:
print(f"部署: {dept['name']}, マネージャー: {dept['manager']}")
# 問題2: 全従業員の名前と経験年数を表示
print("\n=== 全従業員の経験年数 ===")
for dept in company_data["departments"]:
for employee in dept["employees"]:
print(f"名前: {employee['name']}, 経験年数: {employee['experience']}年")
# 問題3: Pythonスキルを持つ従業員を検索
print("\n=== Pythonスキル保有者 ===")
python_developers = []
for dept in company_data["departments"]:
for employee in dept["employees"]:
if "Python" in employee["skills"]:
python_developers.append(employee["name"])
print("Pythonスキル保有者:", ", ".join(python_developers))
# 問題4: 部署別の平均経験年数を計算
print("\n=== 部署別平均経験年数 ===")
for dept in company_data["departments"]:
total_experience = 0
employee_count = len(dept["employees"])
for employee in dept["employees"]:
total_experience += employee["experience"]
avg_experience = total_experience / employee_count
print(f"{dept['name']}: {avg_experience:.1f}年")
中級2: CSVデータの高度な集計
import csv
# サンプルCSVファイルを作成
with open('inventory.csv', 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerow(['product_id', 'product_name', 'category', 'current_stock', 'min_stock', 'price'])
writer.writerow(['P001', 'ノートPC', '電子機器', 15, 5, 120000])
writer.writerow(['P002', 'デスク', '家具', 8, 3, 45000])
writer.writerow(['P003', '椅子', '家具', 12, 4, 28000])
writer.writerow(['P004', 'スマートフォン', '電子機器', 20, 8, 80000])
writer.writerow(['P005', '本棚', '家具', 3, 2, 35000])
# 問題1: カテゴリー別の在庫数量合計
print("=== カテゴリー別在庫数量 ===")
category_stock = {}
with open('inventory.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
category = row['category']
stock = int(row['current_stock'])
if category in category_stock:
category_stock[category] += stock
else:
category_stock[category] = stock
for category, total_stock in category_stock.items():
print(f"{category}: {total_stock}個")
# 問題2: 在庫切れ危険の商品を抽出
print("\n=== 在庫切れ危険商品 ===")
with open('inventory.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
current_stock = int(row['current_stock'])
min_stock = int(row['min_stock'])
if current_stock <= min_stock:
print(f"商品ID: {row['product_id']}, 商品名: {row['product_name']}, 在庫: {current_stock}個")
# 問題3: 在庫金額の合計
print("\n=== 在庫金額合計 ===")
total_inventory_value = 0
with open('inventory.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
value = int(row['current_stock']) * int(row['price'])
total_inventory_value += value
print(f"総在庫金額: {total_inventory_value:,}円")
# 問題4: カテゴリー別の平均単価
print("\n=== カテゴリー別平均単価 ===")
category_prices = {}
with open('inventory.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
category = row['category']
price = int(row['price'])
if category in category_prices:
category_prices[category].append(price)
else:
category_prices[category] = [price]
for category, prices in category_prices.items():
avg_price = sum(prices) / len(prices)
print(f"{category}: {avg_price:,.0f}円")
中級3: JSONデータのフィルタリングと変換
import json
books_data = [
{
"isbn": "978-4-00-000000-1",
"title": "Python入門",
"author": "技術太郎",
"year": 2023,
"price": 2500,
"categories": ["プログラミング", "Python"],
"rating": 4.5
},
{
"isbn": "978-4-00-000000-2",
"title": "データ分析の基礎",
"author": "分析花子",
"year": 2022,
"price": 3200,
"categories": ["データ分析", "統計"],
"rating": 4.2
},
{
"isbn": "978-4-00-000000-3",
"title": "機械学習実践",
"author": "技術太郎",
"year": 2024,
"price": 3800,
"categories": ["機械学習", "Python"],
"rating": 4.8
}
]
# 問題1: 2023年以降に出版された書籍を抽出
print("=== 2023年以降の書籍 ===")
recent_books = [book for book in books_data if book["year"] >= 2023]
for book in recent_books:
print(f"タイトル: {book['title']}, 出版年: {book['year']}")
# 問題2: 著者別の書籍数を集計
print("\n=== 著者別書籍数 ===")
author_counts = {}
for book in books_data:
author = book["author"]
if author in author_counts:
author_counts[author] += 1
else:
author_counts[author] = 1
for author, count in author_counts.items():
print(f"{author}: {count}冊")
# 問題3: 価格が3000円以上の書籍を抽出
print("\n=== 3000円以上の書籍 ===")
expensive_books = [book for book in books_data if book["price"] >= 3000]
for book in expensive_books:
print(f"タイトル: {book['title']}, 価格: {book['price']}円")
# 問題4: カテゴリーに「Python」を含む書籍を抽出
print("\n=== Python関連書籍 ===")
python_books = []
for book in books_data:
if "Python" in book["categories"]:
python_books.append(book)
for book in python_books:
print(f"タイトル: {book['title']}, カテゴリー: {', '.join(book['categories'])}")
中級4: CSVデータのクリーニングと変換
import csv
# サンプルCSVファイルを作成
with open('customers.csv', 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerow(['customer_id', 'name', 'email', 'age', 'city', 'registration_date'])
writer.writerow(['1', '山田太郎', 'yamada@example.com', '28', '東京', '2023-01-15'])
writer.writerow(['2', '佐藤花子', '', '32', '大阪', '2023-02-20'])
writer.writerow(['3', '鈴木一郎', 'suzuki@example.com', '', '名古屋', '2023-03-10'])
writer.writerow(['4', '', 'tanaka@example.com', '45', '福岡', '2023-04-05'])
writer.writerow(['5', '高橋美咲', 'takahashi@example.com', '29', '東京', '2023-05-12'])
# 問題1: 欠損データのある顧客を特定
print("=== 欠損データのある顧客 ===")
with open('customers.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
missing_fields = []
for field, value in row.items():
if not value:
missing_fields.append(field)
if missing_fields:
print(f"顧客ID {row['customer_id']}: 欠損フィールド {missing_fields}")
# 問題2-4: データクリーニング
cleaned_data = []
ages = []
# まず年齢の平均を計算するためにデータを収集
with open('customers.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
if row['age']:
ages.append(int(row['age']))
average_age = sum(ages) / len(ages) if ages else 0
# データクリーニングを実行
with open('customers.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
# 問題2: メールアドレスの補完
if not row['email']:
row['email'] = 'no-email@example.com'
# 問題3: 年齢の補完
if not row['age']:
row['age'] = str(int(average_age))
# 問題4: 名前があるデータのみ保持
if row['name']:
cleaned_data.append(row)
# クリーニング結果を表示
print("\n=== クリーニング後のデータ ===")
for customer in cleaned_data:
print(f"ID: {customer['customer_id']}, 名前: {customer['name']}, メール: {customer['email']}, 年齢: {customer['age']}")
# クリーニングデータを保存
with open('customers_cleaned.csv', 'w', encoding='utf-8', newline='') as f:
writer = csv.DictWriter(f, fieldnames=cleaned_data[0].keys())
writer.writeheader()
writer.writerows(cleaned_data)
中級5: JSONスキーマの検証
import json
def validate_product_schema(product):
"""商品データのスキーマ検証"""
errors = []
# 問題1: 必須フィールドが全て存在するかチェック
required_fields = ["product_id", "name", "price", "category", "in_stock"]
for field in required_fields:
if field not in product:
errors.append(f"必須フィールド '{field}' がありません")
# 問題2: priceが数値で0以上かチェック
if "price" in product:
if not isinstance(product["price"], (int, float)):
errors.append("priceは数値である必要があります")
elif product["price"] < 0:
errors.append("priceは0以上である必要があります")
# 問題3: in_stockが真偽値かチェック
if "in_stock" in product:
if not isinstance(product["in_stock"], bool):
errors.append("in_stockは真偽値である必要があります")
# 問題4: カテゴリーが許可された値かチェック
allowed_categories = ["electronics", "books", "clothing"]
if "category" in product:
if product["category"] not in allowed_categories:
errors.append(f"カテゴリーは {allowed_categories} のいずれかである必要があります")
return errors
# テストデータ
test_products = [
{"product_id": "P001", "name": "スマートフォン", "price": 80000, "category": "electronics", "in_stock": True},
{"product_id": "P002", "name": "Python本", "price": -2500, "category": "books", "in_stock": True},
{"product_id": "P003", "name": "Tシャツ", "price": 3000, "category": "fashion", "in_stock": "yes"},
{"name": "ノートPC", "price": 120000, "category": "electronics", "in_stock": True}
]
# 各商品データを検証
print("=== 商品データ検証結果 ===")
for i, product in enumerate(test_products, 1):
print(f"\n商品{i}:")
errors = validate_product_schema(product)
if errors:
for error in errors:
print(f" ✗ {error}")
else:
print(" ✓ 検証合格")
中級6: 複数CSVファイルの結合処理
import csv
# 注文データCSVを作成
with open('orders.csv', 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerow(['order_id', 'customer_id', 'order_date', 'total_amount'])
writer.writerow(['1001', '1', '2024-01-15', '15000'])
writer.writerow(['1002', '2', '2024-01-16', '28000'])
writer.writerow(['1003', '1', '2024-01-18', '12000'])
# 顧客データCSVを作成
with open('customers_join.csv', 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerow(['customer_id', 'customer_name', 'email', 'city'])
writer.writerow(['1', '山田太郎', 'yamada@example.com', '東京'])
writer.writerow(['2', '佐藤花子', 'sato@example.com', '大阪'])
writer.writerow(['3', '鈴木一郎', 'suzuki@example.com', '名古屋'])
# 問題1: 2つのCSVファイルをcustomer_idで結合
print("=== 結合データ ===")
joined_data = []
# 顧客データを読み込んで辞書に格納
customers = {}
with open('customers_join.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
customers[row['customer_id']] = row
# 注文データと結合
with open('orders.csv', 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for order in reader:
customer_id = order['customer_id']
if customer_id in customers:
joined_row = {**customers[customer_id], **order}
joined_data.append(joined_row)
print(f"注文ID: {order['order_id']}, 顧客名: {customers[customer_id]['customer_name']}, 金額: {order['total_amount']}円")
# 問題2: 顧客別の注文回数を計算
print("\n=== 顧客別注文回数 ===")
order_counts = {}
for row in joined_data:
customer_name = row['customer_name']
if customer_name in order_counts:
order_counts[customer_name] += 1
else:
order_counts[customer_name] = 1
for customer, count in order_counts.items():
print(f"{customer}: {count}回")
# 問題3: 顧客別の総購入金額を計算
print("\n=== 顧客別総購入金額 ===")
total_spent = {}
for row in joined_data:
customer_name = row['customer_name']
amount = int(row['total_amount'])
if customer_name in total_spent:
total_spent[customer_name] += amount
else:
total_spent[customer_name] = amount
for customer, total in total_spent.items():
print(f"{customer}: {total:,}円")
# 問題4: 都市別の平均注文金額を計算
print("\n=== 都市別平均注文金額 ===")
city_orders = {}
for row in joined_data:
city = row['city']
amount = int(row['total_amount'])
if city in city_orders:
city_orders[city].append(amount)
else:
city_orders[city] = [amount]
for city, amounts in city_orders.items():
avg_amount = sum(amounts) / len(amounts)
print(f"{city}: {avg_amount:,.0f}円")
上級問題 解答
上級1: JSON APIレスポンスのシミュレーションと分析
import json
from datetime import datetime, timedelta
import random
def generate_sales_data():
"""模擬販売データの生成"""
# 問題1: 過去30日分の日次販売データを生成
end_date = datetime.now()
start_date = end_date - timedelta(days=29)
products = {
"商品A": 1500,
"商品B": 2000,
"商品C": 3000
}
sales_data = []
current_date = start_date
while current_date <= end_date:
for product_name, price in products.items():
# ランダムな販売数量を生成(0-20個)
quantity = random.randint(0, 20)
revenue = quantity * price
sales_data.append({
"date": current_date.strftime("%Y-%m-%d"),
"product": product_name,
"quantity": quantity,
"revenue": revenue
})
current_date += timedelta(days=1)
# 問題2: 生成したデータをsales.jsonに保存
with open('sales_simulation.json', 'w', encoding='utf-8') as f:
json.dump(sales_data, f, ensure_ascii=False, indent=2)
print("販売データを sales_simulation.json に保存しました")
return sales_data
def analyze_sales_data(sales_data):
"""販売データの分析"""
# 問題3: 各種分析を実行
# 商品別の総販売数量と総売上
product_stats = {}
daily_revenue = {}
for sale in sales_data:
product = sale["product"]
date = sale["date"]
# 商品別統計
if product not in product_stats:
product_stats[product] = {"total_quantity": 0, "total_revenue": 0}
product_stats[product]["total_quantity"] += sale["quantity"]
product_stats[product]["total_revenue"] += sale["revenue"]
# 日別売上
if date not in daily_revenue:
daily_revenue[date] = 0
daily_revenue[date] += sale["revenue"]
# 週次売上集計(7日間隔)
weekly_revenue = {}
dates = sorted(daily_revenue.keys())
for i in range(0, len(dates), 7):
week_dates = dates[i:i+7]
week_key = f"{week_dates[0]} ~ {week_dates[-1]}"
weekly_revenue[week_key] = sum(daily_revenue[date] for date in week_dates)
# 問題4: 分析結果をレポート形式で保存
report = {
"analysis_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"analysis_period": f"{dates[0]} ~ {dates[-1]}",
"product_statistics": product_stats,
"daily_revenue_trend": daily_revenue,
"weekly_revenue": weekly_revenue,
"summary": {
"total_revenue": sum(stats["total_revenue"] for stats in product_stats.values()),
"best_selling_product": max(product_stats.items(), key=lambda x: x[1]["total_quantity"])[0]
}
}
with open('sales_report.json', 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
# 分析結果を表示
print("\n=== 販売分析レポート ===")
print(f"分析期間: {report['analysis_period']}")
print(f"総売上: {report['summary']['total_revenue']:,}円")
print(f"ベストセラー商品: {report['summary']['best_selling_product']}")
print("\n商品別統計:")
for product, stats in product_stats.items():
print(f" {product}: 数量{stats['total_quantity']}個, 売上{stats['total_revenue']:,}円")
print("\n週次売上:")
for week, revenue in weekly_revenue.items():
print(f" {week}: {revenue:,}円")
return report
# 実行
sales_data = generate_sales_data()
report = analyze_sales_data(sales_data)
上級2: 大規模CSVデータの効率的な処理
import csv
import threading
import queue
def process_large_inventory_file(input_file, output_file):
"""
大規模在庫CSVファイルを処理する関数
"""
low_stock_products = []
category_totals = {}
total_rows = 0
# まず行数をカウント(進捗表示用)
with open(input_file, 'r', encoding='utf-8') as f:
total_rows = sum(1 for _ in f) - 1 # ヘッダー行を除く
print(f"処理対象行数: {total_rows}行")
# 問題1-4: 効率的なデータ処理
processed_rows = 0
with open(input_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
processed_rows += 1
# 問題1: 在庫切れ危険商品を抽出
current_stock = int(row['current_stock'])
min_stock = int(row['min_stock'])
if current_stock <= min_stock:
low_stock_products.append(row)
# 問題2: カテゴリー別在庫金額合計
category = row['category']
stock_value = current_stock * int(row['price'])
if category in category_totals:
category_totals[category] += stock_value
else:
category_totals[category] = stock_value
# 問題3: 在庫回転率の計算(月間販売数 = 在庫数 * 0.3)
monthly_sales = current_stock * 0.3
turnover_rate = monthly_sales / current_stock if current_stock > 0 else 0
row['turnover_rate'] = round(turnover_rate, 2)
# 問題4: 進捗状況の表示
if processed_rows % 1000 == 0:
progress = (processed_rows / total_rows) * 100
print(f"進捗: {processed_rows}/{total_rows}行 ({progress:.1f}%)")
# 在庫切れ危険商品を別ファイルに保存
with open(output_file, 'w', encoding='utf-8', newline='') as f:
if low_stock_products:
fieldnames = low_stock_products[0].keys()
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(low_stock_products)
# 結果を表示
print("\n=== 処理結果 ===")
print(f"在庫切れ危険商品数: {len(low_stock_products)}商品")
print("カテゴリー別在庫金額:")
for category, total in category_totals.items():
print(f" {category}: {total:,}円")
return low_stock_products, category_totals
# 大規模サンプルデータ生成(テスト用)
def generate_large_inventory_file(filename, num_rows=10000):
"""大規模在庫データを生成"""
categories = ['電子機器', '家具', '食品', '衣類', '書籍']
with open(filename, 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerow(['product_id', 'product_name', 'category', 'current_stock', 'min_stock', 'price'])
for i in range(num_rows):
category = categories[i % len(categories)]
current_stock = (i % 20) + 1 # 1-20の在庫
min_stock = (i % 5) + 2 # 2-6の最小在庫
price = (i % 1000) * 100 + 1000 # 1000-100000の価格
writer.writerow([
f"P{i:06d}",
f"商品{i}",
category,
current_stock,
min_stock,
price
])
# 実行例
print("大規模在庫データを生成中...")
generate_large_inventory_file('large_inventory.csv', 5000)
print("データ処理を開始...")
low_stock, category_totals = process_large_inventory_file('large_inventory.csv', 'low_stock_products.csv')
上級3: データ変換パイプラインの構築
import json
import csv
from abc import ABC, abstractmethod
# プロセッサの基底クラス
class Processor(ABC):
@abstractmethod
def process(self, data):
pass
# 具象プロセッサの実装
class JSONLoader(Processor):
def __init__(self, filename):
self.filename = filename
def process(self, data=None):
with open(self.filename, 'r', encoding='utf-8') as f:
return json.load(f)
class CSVLoader(Processor):
def __init__(self, filename):
self.filename = filename
def process(self, data=None):
result = []
with open(self.filename, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
result.append(row)
return result
class DataFilter(Processor):
def __init__(self, condition_func):
self.condition_func = condition_func
def process(self, data):
if isinstance(data, list):
return [item for item in data if self.condition_func(item)]
return data
class DataTransformer(Processor):
def __init__(self, transform_func):
self.transform_func = transform_func
def process(self, data):
if isinstance(data, list):
return [self.transform_func(item) for item in data]
return self.transform_func(data)
class DataAggregator(Processor):
def __init__(self, key_func, aggregate_func):
self.key_func = key_func
self.aggregate_func = aggregate_func
def process(self, data):
if isinstance(data, list):
aggregated = {}
for item in data:
key = self.key_func(item)
if key not in aggregated:
aggregated[key] = []
aggregated[key].append(item)
return {key: self.aggregate_func(items) for key, items in aggregated.items()}
return data
class JSONExporter(Processor):
def __init__(self, filename):
self.filename = filename
def process(self, data):
with open(self.filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return data
class CSVExporter(Processor):
def __init__(self, filename):
self.filename = filename
def process(self, data):
if isinstance(data, list) and data:
with open(self.filename, 'w', encoding='utf-8', newline='') as f:
writer = csv.DictWriter(f, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
return data
# データパイプラインクラス
class DataPipeline:
def __init__(self):
self.processors = []
def add_processor(self, processor):
self.processors.append(processor)
def process(self, input_data=None):
current_data = input_data
for processor in self.processors:
current_data = processor.process(current_data)
return current_data
# 使用例
def main():
# 問題2: 販売データ処理パイプラインの構築
pipeline = DataPipeline()
# データ読み込み
pipeline.add_processor(JSONLoader('sales_simulation.json'))
# データフィルタリング(売上があるデータのみ)
pipeline.add_processor(DataFilter(lambda x: x['revenue'] > 0))
# データ変換(日付をdatetimeオブジェクトに変換など)
def add_profit_margin(item):
item['profit_margin'] = item['revenue'] * 0.3 # 仮の利益率30%
return item
pipeline.add_processor(DataTransformer(add_profit_margin))
# データ集計(商品別売上集計)
def aggregate_by_product(items):
total_revenue = sum(item['revenue'] for item in items)
total_quantity = sum(item['quantity'] for item in items)
return {
'total_revenue': total_revenue,
'total_quantity': total_quantity,
'average_price': total_revenue / total_quantity if total_quantity > 0 else 0
}
pipeline.add_processor(DataAggregator(
key_func=lambda x: x['product'],
aggregate_func=aggregate_by_product
))
# 結果出力
pipeline.add_processor(JSONExporter('product_sales_analysis.json'))
# パイプライン実行
print("データパイプラインを実行中...")
result = pipeline.process()
print("処理が完了しました")
# 結果を表示
print("\n=== 商品別売上分析 ===")
for product, stats in result.items():
print(f"{product}:")
print(f" 総売上: {stats['total_revenue']:,}円")
print(f" 総数量: {stats['total_quantity']}個")
print(f" 平均単価: {stats['average_price']:,.0f}円")
# 問題3: 設定ファイルからのパイプライン構築
def create_pipeline_from_config(config_file):
"""設定ファイルからパイプラインを構築"""
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
pipeline = DataPipeline()
for step in config['pipeline_steps']:
processor_type = step['type']
params = step.get('params', {})
if processor_type == 'JSONLoader':
pipeline.add_processor(JSONLoader(params['filename']))
elif processor_type == 'CSVLoader':
pipeline.add_processor(CSVLoader(params['filename']))
elif processor_type == 'DataFilter':
# ここでは簡単のため固定条件を使用
pipeline.add_processor(DataFilter(lambda x: x['revenue'] > 0))
elif processor_type == 'JSONExporter':
pipeline.add_processor(JSONExporter(params['filename']))
elif processor_type == 'CSVExporter':
pipeline.add_processor(CSVExporter(params['filename']))
return pipeline
# 設定ファイルの例
pipeline_config = {
"pipeline_name": "sales_analysis",
"pipeline_steps": [
{
"type": "JSONLoader",
"params": {"filename": "sales_simulation.json"}
},
{
"type": "DataFilter",
"params": {}
},
{
"type": "JSONExporter",
"params": {"filename": "filtered_sales.json"}
}
]
}
# 設定ファイルを保存
with open('pipeline_config.json', 'w', encoding='utf-8') as f:
json.dump(pipeline_config, f, ensure_ascii=False, indent=2)
if __name__ == "__main__":
main()
# 設定ファイルからのパイプライン実行
print("\n設定ファイルからパイプラインを構築...")
config_pipeline = create_pipeline_from_config('pipeline_config.json')
config_result = config_pipeline.process()
print("設定ファイルベースのパイプライン実行完了")