データベース操作とデータ永続化の実装
2025-11-25はじめに
Pythonの基本文法を学習した次のステップとして、データベース操作とデータ永続化は非常に重要なテーマです。アプリケーション開発において、ユーザー情報や設定、アプリケーションデータなどを永続的に保存することは必須です。本記事では、Pythonからデータベースを操作する方法と、データを永続化する実装手法について詳しく解説します。
データ永続化とは
データ永続化とは、プログラムの実行が終了した後もデータが失われずに保存されることを指します。Pythonプログラムで作成したデータは、変数に格納されている限りメモリ上に存在するため、プログラム終了とともに消えてしまいます。データを長期間保存するためには、ファイルやデータベースに書き出す必要があります。
主なデータ永続化の方法には以下のようなものがあります。
- テキストファイルやCSVファイルへの保存
- JSONやXML形式での保存
- データベースへの保存(SQLite、MySQL、PostgreSQLなど)
- オブジェクトシリアライゼーション(pickleなど)
SQLiteデータベースの基本
SQLiteとは
SQLiteは、軽量でファイルベースのデータベースエンジンです。サーバー設定が不要で、単一のファイルに全てのデータが格納されるため、初学者にとって最適なデータベースです。Pythonには標準ライブラリとしてsqlite3モジュールが含まれており、すぐに使い始めることができます。
データベース接続の確立
まずは、SQLiteデータベースへの接続方法から見ていきましょう。sqlite3.connect('example.db') で指定したファイル名のデータベースに接続し、存在しなければ新規作成されます。次に cursor() でカーソルオブジェクトを作成し、これを使ってSQLの実行やデータ操作を行います。操作が終わったら conn.close() でデータベース接続を閉じ、リソースを解放します。
import sqlite3
# データベースに接続(ファイルが存在しない場合は新規作成)
conn = sqlite3.connect('example.db')
# カーソルオブジェクトを作成
cursor = conn.cursor()
# データベース操作...
# 接続を閉じる
conn.close()
with文を使用すると、接続のクローズを自動的に処理できます。with sqlite3.connect('example.db') as conn: のブロック内で接続を管理することで、処理が終わると自動的に conn.close() が呼ばれ、接続が閉じられます。その中で cursor() を作成し、SQLの実行やデータ操作を行います。これにより、接続の閉じ忘れを防ぎ、リソース管理が簡単になります。
import sqlite3
with sqlite3.connect('example.db') as conn:
cursor = conn.cursor()
# データベース操作...
テーブルの作成
データを保存するためのテーブルを作成しましょう。with sqlite3.connect('example.db') as conn: でデータベースに接続し、自動的に接続を閉じる管理を行います。
cursor() を使ってSQL操作を実行し、CREATE TABLE IF NOT EXISTS users 文により、以下の列を持つテーブルを作成します。
id:自動増分の主キーname:NULL不可の文字列email:一意制約付きの文字列age:整数created_at:デフォルトで現在時刻が設定されるタイムスタンプ
import sqlite3
with sqlite3.connect('example.db') as conn:
cursor = conn.cursor()
# usersテーブルの作成
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
age INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
print("テーブルが作成されました")
テーブルが作成されると "テーブルが作成されました" と出力されます。
データの挿入(INSERT)
テーブルにデータを追加する方法です。
import sqlite3
with sqlite3.connect('example.db') as conn:
cursor = conn.cursor()
# 単一のデータ挿入
cursor.execute('''
INSERT INTO users (name, email, age)
VALUES (?, ?, ?)
''', ('山田太郎', 'taro@example.com', 30))
# 複数データの一括挿入
users_data = [
('佐藤花子', 'hanako@example.com', 25),
('鈴木一郎', 'ichiro@example.com', 35),
('田中みどり', 'midori@example.com', 28)
]
cursor.executemany('''
INSERT INTO users (name, email, age)
VALUES (?, ?, ?)
''', users_data)
# 変更を確定
conn.commit()
print("データが挿入されました")
データの検索(SELECT)
保存したデータを検索する方法です。with sqlite3.connect('example.db') as conn: でデータベースに接続し、cursor() でSQL操作を実行します。主な処理は以下の通りです。
- 全データの取得:
SELECT * FROM usersを実行し、fetchall()で全ユーザーを取得して表示。 - 条件付き検索:
age > 28の条件でユーザーを取得し、28歳以上のユーザーを表示。プレースホルダ?を使って安全に値を渡しています。 - 単一レコードの取得:特定のメールアドレス
taro@example.comを持つユーザーをfetchone()で取得して表示。
import sqlite3
with sqlite3.connect('example.db') as conn:
cursor = conn.cursor()
# 全データの取得
cursor.execute('SELECT * FROM users')
all_users = cursor.fetchall()
print("全ユーザー:")
for user in all_users:
print(user)
# 条件付き検索
cursor.execute('SELECT * FROM users WHERE age > ?', (28,))
adult_users = cursor.fetchall()
print("\n28歳以上のユーザー:")
for user in adult_users:
print(user)
# 単一レコードの取得
cursor.execute('SELECT * FROM users WHERE email = ?', ('taro@example.com',))
user = cursor.fetchone()
print(f"\n特定ユーザー: {user}")
これにより、全件取得、条件付き取得、特定レコード取得の基本操作を確認できます。
データの更新(UPDATE)と削除(DELETE)
既存データの更新と削除方法です。with sqlite3.connect('example.db') as conn: でデータベースに接続し、cursor() を使ってSQL操作を実行します。
- 更新:
UPDATE users SET age = ? WHERE name = ?により、名前が「山田太郎」のユーザーの年齢を31に変更しています。 - 削除:
DELETE FROM users WHERE name = ?により、名前が「田中みどり」のユーザーを削除しています。 conn.commit()で変更を確定し、最後に"データが更新されました"と出力されます。
import sqlite3
with sqlite3.connect('example.db') as conn:
cursor = conn.cursor()
# データの更新
cursor.execute('''
UPDATE users
SET age = ?
WHERE name = ?
''', (31, '山田太郎'))
# データの削除
cursor.execute('DELETE FROM users WHERE name = ?', ('田中みどり',))
conn.commit()
print("データが更新されました")
これにより、特定条件に基づくデータの修正と削除が行えます。
トランザクション処理
データベース操作では、複数の操作を一つの単位として扱うトランザクションが重要です。関数 transfer_funds(from_user, to_user, amount) は、指定した送金元ユーザーから送金先ユーザーへ指定額を移動させます。処理の流れは以下の通りです。
- 接続:
with sqlite3.connect('bank.db') as conn:でデータベースに接続し、自動的に接続管理。 - 残高確認:送金元ユーザーの残高を
SELECTで取得し、残高不足の場合はValueErrorを発生。 - 送金処理:送金元の残高から指定額を引き、送金先に同額を加える
UPDATEを実行。 - トランザクション確定:
conn.commit()で変更を確定。 - エラー処理:例外発生時は
conn.rollback()で変更を取り消し、エラーメッセージを表示。
import sqlite3
def transfer_funds(from_user, to_user, amount):
try:
with sqlite3.connect('bank.db') as conn:
cursor = conn.cursor()
# 送金元の残高を確認
cursor.execute('SELECT balance FROM accounts WHERE user_id = ?', (from_user,))
from_balance = cursor.fetchone()[0]
if from_balance < amount:
raise ValueError("残高不足")
# 送金元から引き落とし
cursor.execute('''
UPDATE accounts
SET balance = balance - ?
WHERE user_id = ?
''', (amount, from_user))
# 送金先に入金
cursor.execute('''
UPDATE accounts
SET balance = balance + ?
WHERE user_id = ?
''', (amount, to_user))
# トランザクションの確定
conn.commit()
print("送金が完了しました")
except Exception as e:
print(f"エラーが発生しました: {e}")
# エラー時はロールバック
conn.rollback()
これにより、残高チェックと安全なトランザクション管理を行った送金処理が実現できます。
エラーハンドリング
堅牢なアプリケーションのためには、適切なエラーハンドリングが不可欠です。処理の流れは以下の通りです。
with sqlite3.connect('example.db') as conn:でデータベースに接続し、接続管理を自動化。cursor()を使い、INSERT INTO users (name, email, age) VALUES (?, ?, ?)で指定された名前・メール・年齢をテーブルに追加。conn.commit()で変更を確定。- 例外処理として、メールアドレスが重複した場合は
sqlite3.IntegrityErrorをキャッチしエラーメッセージを表示。その他のデータベースエラーや予期せぬ例外も捕捉してメッセージを出力。
import sqlite3
def add_user(name, email, age):
try:
with sqlite3.connect('example.db') as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO users (name, email, age)
VALUES (?, ?, ?)
''', (name, email, age))
conn.commit()
print("ユーザーが正常に追加されました")
except sqlite3.IntegrityError:
print("エラー: メールアドレスが既に存在します")
except sqlite3.Error as e:
print(f"データベースエラー: {e}")
except Exception as e:
print(f"予期せぬエラー: {e}")
これにより、データ整合性を保ちながら安全にユーザーを登録できます。
ORM(Object-Relational Mapping)の利用
SQLAlchemyの基本
複雑なアプリケーションでは、ORMを使用するとPythonのオブジェクトとしてデータベースを操作でき、生産性が向上します。SQLAlchemyはPythonで最も人気のあるORMの一つです。
まずはpipコマンドを利用してインストールします。
※pipコマンドは、Pythonのパッケージ管理システムです。サードパーティ製ライブラリを簡単に導入・管理するための標準ツールです。
pip install sqlalchemy
モデルの定義
SQLAlchemyを使ってSQLiteデータベースに users テーブルを定義し、操作する準備を行います。
- モデル定義:
declarative_base()でベースクラスを作成し、Userクラスでテーブル構造を定義。id:自動増分の主キーname:必須の文字列email:一意制約付きの必須文字列age:整数created_at:デフォルトで現在時刻__repr__でインスタンス表示を分かりやすく定義
- データベース接続:
create_engine('sqlite:///example.db')でSQLiteに接続。 - テーブル作成:
Base.metadata.create_all(engine)で定義したテーブルをデータベースに作成。 - セッション作成:
sessionmaker(bind=engine)でセッションを作成し、session = Session()で操作用のセッションを準備。
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
# ベースモデルの作成
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False)
email = Column(String(100), unique=True, nullable=False)
age = Column(Integer)
created_at = Column(DateTime, default=datetime.now)
def __repr__(self):
return f""
# データベース接続の設定
engine = create_engine('sqlite:///example.db')
Base.metadata.create_all(engine)
# セッションの作成
Session = sessionmaker(bind=engine)
session = Session()
これにより、SQLAlchemyでORMを使った安全なデータベース操作の基盤が整います。
ORMを使用したCRUD操作
SQLAlchemyを使って users テーブルのデータ操作を行います。処理内容をまとめると以下の通りです。
- データの追加
new_userを作成しsession.add()で追加後、session.commit()で確定。- 複数ユーザーを
session.add_all()で一括追加し、コミット。
- データの検索
session.query(User).all()で全件取得し、ループで表示。filter()を使って年齢20以上のユーザーを取得。filter_by()で特定のメールアドレスのユーザーを1件取得。
- データの更新
- 検索したユーザーの
ageを変更し、session.commit()で反映。
- 検索したユーザーの
- データの削除
- 削除対象のユーザーを取得後、
session.delete()で削除し、コミット。
- 削除対象のユーザーを取得後、
# データの追加
new_user = User(name='高橋さくら', email='sakura@example.com', age=22)
session.add(new_user)
session.commit()
# 複数データの追加
users = [
User(name='伊藤健太', email='kenta@example.com', age=29),
User(name='山本愛', email='ai@example.com', age=26)
]
session.add_all(users)
session.commit()
# データの検索
# 全件取得
all_users = session.query(User).all()
for user in all_users:
print(user)
# 条件検索
adult_users = session.query(User).filter(User.age >= 20).all()
specific_user = session.query(User).filter_by(email='sakura@example.com').first()
# データの更新
user = session.query(User).filter_by(email='sakura@example.com').first()
user.age = 23
session.commit()
# データの削除
user_to_delete = session.query(User).filter_by(name='山本愛').first()
session.delete(user_to_delete)
session.commit()
このように、SQLAlchemyのセッションを使うことで、追加・検索・更新・削除の基本操作を簡潔に安全に実行できます。
その他のデータ永続化方法
JSONファイルの利用
小規模なデータや設定ファイルにはJSON形式が便利です。
- データの作成:
dataにユーザー情報と設定情報を辞書でまとめる。 - JSONへの保存:
open('data.json', 'w', encoding='utf-8')でファイルを開き、json.dump()でデータを書き込む。ensure_ascii=Falseにより日本語も文字化けせず保存され、indent=2で見やすく整形。 - JSONの読み込み:
open('data.json', 'r', encoding='utf-8')でファイルを開き、json.load()でデータを読み込む。 - データの確認:読み込んだ内容を
print()で表示。
import json
# データの保存
data = {
'users': [
{'name': '山田太郎', 'email': 'taro@example.com', 'age': 30},
{'name': '佐藤花子', 'email': 'hanako@example.com', 'age': 25}
],
'settings': {
'theme': 'dark',
'language': 'ja'
}
}
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# データの読み込み
with open('data.json', 'r', encoding='utf-8') as f:
loaded_data = json.load(f)
print(loaded_data)
これにより、Pythonの辞書データを簡単にJSONファイルとして保存・復元できます。
pickleによるオブジェクトのシリアライズ
Pythonオブジェクトをそのまま保存する場合に便利です。
- クラス定義:
Userクラスを定義し、名前・メール・年齢を属性として持つ。__repr__で表示形式を定義。 - オブジェクト作成:
usersリストにUserインスタンスを作成。 - オブジェクトの保存:
with open('users.pkl', 'wb')でバイナリモードでファイルを開き、pickle.dump()でオブジェクトをシリアライズして保存。 - オブジェクトの読み込み:
with open('users.pkl', 'rb')でファイルを開き、pickle.load()でオブジェクトを復元。 - 確認表示:読み込んだユーザーオブジェクトをループで表示。
import pickle
class User:
def __init__(self, name, email, age):
self.name = name
self.email = email
self.age = age
def __repr__(self):
return f"User({self.name}, {self.email}, {self.age})"
# オブジェクトの保存
users = [
User('山田太郎', 'taro@example.com', 30),
User('佐藤花子', 'hanako@example.com', 25)
]
with open('users.pkl', 'wb') as f:
pickle.dump(users, f)
# オブジェクトの読み込み
with open('users.pkl', 'rb') as f:
loaded_users = pickle.load(f)
for user in loaded_users:
print(user)
これにより、Pythonオブジェクトを簡単に保存・復元でき、プログラム間でデータをやり取りできます。
実践プログラム例】アドレス帳アプリケーション
最後に、学んだ知識を統合した簡単なアドレス帳アプリケーションの例を示します。
AddressBook クラスに主要な機能をまとめています。
- データベース初期化
init_database()でcontactsテーブルを作成(存在しない場合のみ)。- テーブルには
id(主キー)、name、phone、email、address、created_atを持つ。
- 連絡先の追加
add_contact()で名前・電話・メール・住所を指定して追加。INSERT文を使い、commit()で保存。
- 検索機能
search_contacts()で名前・電話・メールに対して部分一致検索。LIKE文を使用し、検索結果をリストで返す。
- 全件表示
list_all_contacts()で登録されている全連絡先を名前順で取得。
- 削除機能
delete_contact()で指定IDの連絡先を削除し、変更をコミット。
- 使用例
- アプリ起動時に
AddressBookを作成し、連絡先を追加。 - 全連絡先を表示し、キーワード検索の結果も表示。
- アプリ起動時に
import sqlite3
from datetime import datetime
class AddressBook:
def __init__(self, db_path='address_book.db'):
self.db_path = db_path
self.init_database()
def init_database(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS contacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
phone TEXT,
email TEXT,
address TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
def add_contact(self, name, phone=None, email=None, address=None):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO contacts (name, phone, email, address)
VALUES (?, ?, ?, ?)
''', (name, phone, email, address))
conn.commit()
print(f"連絡先 '{name}' を追加しました")
def search_contacts(self, search_term):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM contacts
WHERE name LIKE ? OR phone LIKE ? OR email LIKE ?
''', (f'%{search_term}%', f'%{search_term}%', f'%{search_term}%'))
results = cursor.fetchall()
return results
def list_all_contacts(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM contacts ORDER BY name')
return cursor.fetchall()
def delete_contact(self, contact_id):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('DELETE FROM contacts WHERE id = ?', (contact_id,))
conn.commit()
print(f"連絡先ID {contact_id} を削除しました")
# アプリケーションの使用例
if __name__ == "__main__":
ab = AddressBook()
# 連絡先の追加
ab.add_contact("山田太郎", "03-1234-5678", "taro@example.com", "東京都渋谷区")
ab.add_contact("佐藤花子", "06-9876-5432", "hanako@example.com", "大阪府大阪市")
# 全連絡先の表示
print("全連絡先:")
for contact in ab.list_all_contacts():
print(contact)
# 検索
print("\n'山田'で検索:")
results = ab.search_contacts("山田")
for contact in results:
print(contact)
まとめると、このプログラムはSQLiteで連絡先情報を管理し、追加・検索・一覧表示・削除を簡単に行えるアドレス帳アプリケーションです。
まとめ
本記事では、Pythonを用いたデータベース操作とデータ永続化について解説しました。まず、データ永続化の基本概念とその重要性を説明し、次にSQLiteデータベースを使った基本的なCRUD(作成・読み取り・更新・削除)操作を紹介しました。
さらに、トランザクション処理やエラーハンドリングによる安全なデータ操作の方法を解説し、SQLAlchemyを用いたORM(オブジェクト関係マッピング)の基本も示しました。また、JSONやpickleを利用したその他のデータ永続化手法についても触れています。最後に、これらの知識を活用した実践的なアドレス帳アプリケーションの例を通して、データ管理の具体的な応用方法を紹介しました。
データベース操作は現代のアプリケーション開発において不可欠なスキルです。基本をしっかり理解し、実際のプロジェクトで応用することで、より実践的なスキルを身につけることができます。次のステップとしては、MySQLやPostgreSQLのような本格的なデータベースシステムの学習や、Djangoなどのフレームワーク内でのデータベース操作の学習に進むことをおすすめします。
演習問題
初級問題(3問)
初級1: 基本的なデータベース接続
以下の条件を満たすPythonプログラムを作成してください
students.dbというSQLiteデータベースを作成studentsテーブルを作成(id, name, grade, scoreのカラム)- 3人分の生徒データを挿入
- 全生徒データを表示
解答例の骨格:
import sqlite3
# データベース接続
conn = sqlite3.connect('students.db')
cursor = conn.cursor()
# テーブル作成
cursor.execute('''...''')
# データ挿入
students = [
('田中太郎', 1, 85),
('佐藤花子', 2, 92),
('鈴木一郎', 1, 78)
]
cursor.executemany('''...''', students)
# データ表示
cursor.execute('''...''')
for row in cursor.fetchall():
print(row)
conn.commit()
conn.close()
初級2: 条件付き検索
studentsテーブルから以下の条件でデータを検索するプログラムを作成
- 成績(score)が80点以上の生徒
- 2年生(grade=2)の生徒
- 特定の名前(例:「田中太郎」)の生徒
初級3: データの更新と削除
- 特定の生徒の点数を更新
- 特定の生徒を削除
- 変更前後のデータを表示して確認
中級問題(6問)
中級1: エラーハンドリング
データベース操作にエラーハンドリングを追加
- 重複データ挿入時の処理
- 存在しないデータへのアクセス
- データベース接続エラー
try:
# データベース操作
except sqlite3.IntegrityError:
print("重複データエラー")
except sqlite3.Error as e:
print(f"データベースエラー: {e}")
中級2: トランザクション処理
銀行口座システムを模したプログラム
- 口座間送金機能
- 残高確認
- トランザクションの適切な管理
中級3: コンテキストマネージャの実装
with文を使用してデータベース接続を管理するクラスを作成
class DatabaseManager:
def __enter__(self):
# 接続開始
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# 接続終了処理
pass
中級4: JSON連携
- JSONファイルからデータを読み込みデータベースに保存
- データベースのデータをJSONファイルにエクスポート
JSON例:
{
"students": [
{"name": "田中太郎", "grade": 1, "score": 85},
{"name": "佐藤花子", "grade": 2, "score": 92}
]
}
中級5: バッチ処理
CSVファイルから大量のデータをデータベースに一括登録
name,grade,score
田中太郎,1,85
佐藤花子,2,92
鈴木一郎,1,78
中級6: 集計クエリ
以下の集計処理を実装
- 各学年の平均点
- 最高点・最低点
- 点数順でのランキング
上級問題(3問)
上級1: ORMを使った書籍管理システム
SQLAlchemyを使用して書籍管理システムを実装
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class Book(Base):
__tablename__ = 'books'
id = Column(Integer, primary_key=True)
title = Column(String(100), nullable=False)
author = Column(String(50), nullable=False)
price = Column(Float)
stock = Column(Integer, default=0)
実装要件:
- 書籍の登録・検索・更新・削除
- 在庫管理機能
- 著者別書籍一覧
上級2: 非同期データベース操作
asyncioとaiosqliteを使用した非同期データベース操作
import asyncio
import aiosqlite
async def main():
async with aiosqlite.connect('database.db') as db:
# 非同期データベース操作
pass
実装要件:
- 非同期でのCRUD操作
- 複数クエリの並行実行
- パフォーマンス比較
上級3: データベースマイグレーションシステム
シンプルなマイグレーションシステムの実装です。
class Migration:
def __init__(self, db_path):
self.db_path = db_path
self.init_migration_table()
def init_migration_table(self):
# マイグレーション履歴テーブル作成
pass
def create_migration(self, name, sql):
# マイグレーション実行
pass
def rollback(self, migration_id):
# ロールバック処理
pass
実装要件:
- マイグレーション履歴の管理
- ロールバック機能
- マイグレーション状態の確認
演習問題 解答例
初級問題 解答例
初級1: 基本的なデータベース接続
import sqlite3
def setup_student_database():
# データベース接続
conn = sqlite3.connect('students.db')
cursor = conn.cursor()
# テーブル作成
cursor.execute('''
CREATE TABLE IF NOT EXISTS students (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
grade INTEGER NOT NULL,
score INTEGER NOT NULL
)
''')
# データ挿入
students = [
('田中太郎', 1, 85),
('佐藤花子', 2, 92),
('鈴木一郎', 1, 78)
]
cursor.executemany('''
INSERT INTO students (name, grade, score)
VALUES (?, ?, ?)
''', students)
# データ表示
cursor.execute('SELECT * FROM students')
print("生徒一覧:")
for row in cursor.fetchall():
print(f"ID: {row[0]}, 名前: {row[1]}, 学年: {row[2]}, 点数: {row[3]}")
conn.commit()
conn.close()
if __name__ == "__main__":
setup_student_database()
初級2: 条件付き検索
import sqlite3
def search_students():
conn = sqlite3.connect('students.db')
cursor = conn.cursor()
print("=== 80点以上の生徒 ===")
cursor.execute('SELECT * FROM students WHERE score >= ?', (80,))
for row in cursor.fetchall():
print(f"ID: {row[0]}, 名前: {row[1]}, 学年: {row[2]}, 点数: {row[3]}")
print("\n=== 2年生の生徒 ===")
cursor.execute('SELECT * FROM students WHERE grade = ?', (2,))
for row in cursor.fetchall():
print(f"ID: {row[0]}, 名前: {row[1]}, 学年: {row[2]}, 点数: {row[3]}")
print("\n=== 特定の名前の生徒 ===")
cursor.execute('SELECT * FROM students WHERE name = ?', ('田中太郎',))
result = cursor.fetchone()
if result:
print(f"ID: {result[0]}, 名前: {result[1]}, 学年: {result[2]}, 点数: {result[3]}")
else:
print("該当する生徒が見つかりません")
conn.close()
if __name__ == "__main__":
search_students()
初級3: データの更新と削除
import sqlite3
def update_and_delete_students():
conn = sqlite3.connect('students.db')
cursor = conn.cursor()
# 更新前のデータ表示
print("=== 更新前のデータ ===")
cursor.execute('SELECT * FROM students')
for row in cursor.fetchall():
print(f"ID: {row[0]}, 名前: {row[1]}, 学年: {row[2]}, 点数: {row[3]}")
# データ更新(田中太郎の点数を90点に)
cursor.execute('UPDATE students SET score = ? WHERE name = ?', (90, '田中太郎'))
# データ削除(鈴木一郎を削除)
cursor.execute('DELETE FROM students WHERE name = ?', ('鈴木一郎',))
# 更新後のデータ表示
print("\n=== 更新後のデータ ===")
cursor.execute('SELECT * FROM students')
for row in cursor.fetchall():
print(f"ID: {row[0]}, 名前: {row[1]}, 学年: {row[2]}, 点数: {row[3]}")
conn.commit()
conn.close()
if __name__ == "__main__":
update_and_delete_students()
中級問題 解答例
中級1: エラーハンドリング
import sqlite3
def error_handling_example():
try:
conn = sqlite3.connect('students.db')
cursor = conn.cursor()
# 重複データ挿入を試みる(UNIQUE制約がある場合)
cursor.execute('''
CREATE TABLE IF NOT EXISTS unique_students (
id INTEGER PRIMARY KEY AUTOINCREMENT,
student_id TEXT UNIQUE NOT NULL,
name TEXT NOT NULL
)
''')
# 最初の挿入(成功)
cursor.execute('INSERT INTO unique_students (student_id, name) VALUES (?, ?)',
('001', '山田太郎'))
# 重複したstudent_idで挿入(失敗)
cursor.execute('INSERT INTO unique_students (student_id, name) VALUES (?, ?)',
('001', '山田次郎'))
conn.commit()
except sqlite3.IntegrityError:
print("エラー: 重複するstudent_idが存在します")
except sqlite3.Error as e:
print(f"データベースエラー: {e}")
except Exception as e:
print(f"予期せぬエラー: {e}")
finally:
if 'conn' in locals():
conn.close()
def handle_nonexistent_data():
try:
conn = sqlite3.connect('students.db')
cursor = conn.cursor()
# 存在しないIDで検索
cursor.execute('SELECT * FROM students WHERE id = ?', (999,))
result = cursor.fetchone()
if result:
print(f"見つかりました: {result}")
else:
print("該当するデータが見つかりません")
except sqlite3.Error as e:
print(f"データベースエラー: {e}")
finally:
conn.close()
if __name__ == "__main__":
error_handling_example()
handle_nonexistent_data()
中級2: トランザクション処理
import sqlite3
class BankSystem:
def __init__(self, db_path='bank.db'):
self.db_path = db_path
self.init_database()
def init_database(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS accounts (
account_number TEXT PRIMARY KEY,
account_holder TEXT NOT NULL,
balance REAL DEFAULT 0.0
)
''')
# サンプルデータ
sample_accounts = [
('123-456-001', '田中太郎', 10000.0),
('123-456-002', '佐藤花子', 5000.0),
('123-456-003', '鈴木一郎', 8000.0)
]
cursor.executemany('''
INSERT OR IGNORE INTO accounts
(account_number, account_holder, balance)
VALUES (?, ?, ?)
''', sample_accounts)
conn.commit()
def transfer_funds(self, from_account, to_account, amount):
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 送金元の残高確認
cursor.execute('SELECT balance FROM accounts WHERE account_number = ?',
(from_account,))
from_balance = cursor.fetchone()
if not from_balance:
raise ValueError("送金元口座が存在しません")
if from_balance[0] < amount:
raise ValueError("残高不足です")
# 送金処理
cursor.execute('''
UPDATE accounts
SET balance = balance - ?
WHERE account_number = ?
''', (amount, from_account))
cursor.execute('''
UPDATE accounts
SET balance = balance + ?
WHERE account_number = ?
''', (amount, to_account))
# トランザクション確定
conn.commit()
print(f"送金成功: {from_account} → {to_account} ({amount}円)")
except sqlite3.Error as e:
print(f"データベースエラー: {e}")
conn.rollback()
except ValueError as e:
print(f"送金エラー: {e}")
if 'conn' in locals():
conn.rollback()
def show_all_accounts(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM accounts')
print("=== 全口座一覧 ===")
for row in cursor.fetchall():
print(f"口座番号: {row[0]}, 名義人: {row[1]}, 残高: {row[2]:.2f}円")
if __name__ == "__main__":
bank = BankSystem()
bank.show_all_accounts()
print("\n--- 送金実行 ---")
bank.transfer_funds('123-456-001', '123-456-002', 2000.0)
print("\n--- 送金後 ---")
bank.show_all_accounts()
中級3: コンテキストマネージャの実装
import sqlite3
class DatabaseManager:
def __init__(self, db_path):
self.db_path = db_path
self.conn = None
def __enter__(self):
self.conn = sqlite3.connect(self.db_path)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.conn:
if exc_type is not None:
self.conn.rollback()
else:
self.conn.commit()
self.conn.close()
def execute_query(self, query, params=None):
cursor = self.conn.cursor()
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
return cursor
def fetch_all(self, query, params=None):
cursor = self.execute_query(query, params)
return cursor.fetchall()
def fetch_one(self, query, params=None):
cursor = self.execute_query(query, params)
return cursor.fetchone()
def use_database_manager():
# コンテキストマネージャを使用
with DatabaseManager('students.db') as db:
# テーブル作成
db.execute_query('''
CREATE TABLE IF NOT EXISTS courses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
instructor TEXT NOT NULL
)
''')
# データ挿入
courses = [
('Python基礎', '山田先生'),
('データベース入門', '佐藤先生'),
('Web開発', '鈴木先生')
]
for course in courses:
db.execute_query(
'INSERT INTO courses (name, instructor) VALUES (?, ?)',
course
)
# データ検索
results = db.fetch_all('SELECT * FROM courses')
print("=== コース一覧 ===")
for row in results:
print(f"ID: {row[0]}, コース名: {row[1]}, 講師: {row[2]}")
if __name__ == "__main__":
use_database_manager()
中級4: JSON連携
import sqlite3
import json
import os
class JSONDatabaseManager:
def __init__(self, db_path='students.db'):
self.db_path = db_path
self.init_database()
def init_database(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS students (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
grade INTEGER NOT NULL,
score INTEGER NOT NULL
)
''')
def import_from_json(self, json_file):
"""JSONファイルからデータをインポート"""
try:
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
for student in data.get('students', []):
cursor.execute('''
INSERT INTO students (name, grade, score)
VALUES (?, ?, ?)
''', (student['name'], student['grade'], student['score']))
conn.commit()
print(f"JSONファイルから{len(data['students'])}件のデータをインポートしました")
except FileNotFoundError:
print("JSONファイルが見つかりません")
except json.JSONDecodeError:
print("JSONファイルの形式が不正です")
def export_to_json(self, json_file):
"""データベースのデータをJSONファイルにエクスポート"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM students')
rows = cursor.fetchall()
students_data = []
for row in rows:
students_data.append({
'id': row[0],
'name': row[1],
'grade': row[2],
'score': row[3]
})
data = {'students': students_data}
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"JSONファイルに{len(students_data)}件のデータをエクスポートしました")
def show_students(self):
"""学生データを表示"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM students')
print("=== 学生一覧 ===")
for row in cursor.fetchall():
print(f"ID: {row[0]}, 名前: {row[1]}, 学年: {row[2]}, 点数: {row[3]}")
if __name__ == "__main__":
manager = JSONDatabaseManager()
# サンプルJSONファイルを作成
sample_data = {
"students": [
{"name": "田中太郎", "grade": 1, "score": 85},
{"name": "佐藤花子", "grade": 2, "score": 92},
{"name": "鈴木一郎", "grade": 1, "score": 78}
]
}
with open('sample_data.json', 'w', encoding='utf-8') as f:
json.dump(sample_data, f, ensure_ascii=False, indent=2)
# JSONからインポート
manager.import_from_json('sample_data.json')
manager.show_students()
# JSONにエクスポート
manager.export_to_json('exported_data.json')
中級5: バッチ処理
import sqlite3
import csv
import os
class BatchProcessor:
def __init__(self, db_path='students.db'):
self.db_path = db_path
self.init_database()
def init_database(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS students (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
grade INTEGER NOT NULL,
score INTEGER NOT NULL
)
''')
def create_sample_csv(self, csv_file):
"""サンプルCSVファイルを作成"""
sample_data = [
['name', 'grade', 'score'],
['田中太郎', 1, 85],
['佐藤花子', 2, 92],
['鈴木一郎', 1, 78],
['山本さくら', 3, 88],
['伊藤健太', 2, 76],
['高橋みなみ', 1, 95]
]
with open(csv_file, 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerows(sample_data)
print(f"サンプルCSVファイルを作成: {csv_file}")
def import_from_csv(self, csv_file, batch_size=100):
"""CSVファイルからデータをバッチ処理でインポート"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
with open(csv_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
batch = []
total_count = 0
for row in reader:
batch.append((row['name'], int(row['grade']), int(row['score'])))
if len(batch) >= batch_size:
cursor.executemany('''
INSERT INTO students (name, grade, score)
VALUES (?, ?, ?)
''', batch)
total_count += len(batch)
batch = []
print(f"{total_count}件処理...")
# 残りのデータを処理
if batch:
cursor.executemany('''
INSERT INTO students (name, grade, score)
VALUES (?, ?, ?)
''', batch)
total_count += len(batch)
conn.commit()
print(f"合計{total_count}件のデータをインポートしました")
except FileNotFoundError:
print("CSVファイルが見つかりません")
except Exception as e:
print(f"エラーが発生しました: {e}")
def export_to_csv(self, csv_file):
"""データベースのデータをCSVファイルにエクスポート"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM students')
rows = cursor.fetchall()
with open(csv_file, 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerow(['id', 'name', 'grade', 'score'])
writer.writerows(rows)
print(f"CSVファイルに{len(rows)}件のデータをエクスポートしました")
def show_statistics(self):
"""統計情報を表示"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 総件数
cursor.execute('SELECT COUNT(*) FROM students')
total_count = cursor.fetchone()[0]
# 平均点
cursor.execute('SELECT AVG(score) FROM students')
avg_score = cursor.fetchone()[0]
# 最高点・最低点
cursor.execute('SELECT MAX(score), MIN(score) FROM students')
max_score, min_score = cursor.fetchone()
print(f"総件数: {total_count}件")
print(f"平均点: {avg_score:.1f}点")
print(f"最高点: {max_score}点")
print(f"最低点: {min_score}点")
if __name__ == "__main__":
processor = BatchProcessor()
# サンプルCSVファイルを作成
csv_file = 'students_data.csv'
processor.create_sample_csv(csv_file)
# CSVからインポート
processor.import_from_csv(csv_file, batch_size=2)
# 統計情報表示
processor.show_statistics()
# CSVにエクスポート
processor.export_to_csv('exported_students.csv')
中級6: 集計クエリ
import sqlite3
class StudentAnalyzer:
def __init__(self, db_path='students.db'):
self.db_path = db_path
def setup_sample_data(self):
"""サンプルデータをセットアップ"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 既存のテーブルを削除
cursor.execute('DROP TABLE IF EXISTS students')
# テーブル作成
cursor.execute('''
CREATE TABLE students (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
grade INTEGER NOT NULL,
score INTEGER NOT NULL
)
''')
# サンプルデータ挿入
students = [
('田中太郎', 1, 85), ('佐藤花子', 2, 92), ('鈴木一郎', 1, 78),
('山本さくら', 3, 88), ('伊藤健太', 2, 76), ('高橋みなみ', 1, 95),
('中村勇気', 3, 82), ('小林めぐみ', 2, 89), ('加藤大輔', 1, 72),
('吉田ゆり', 3, 91)
]
cursor.executemany('''
INSERT INTO students (name, grade, score)
VALUES (?, ?, ?)
''', students)
conn.commit()
def grade_statistics(self):
"""学年別統計"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT
grade,
COUNT(*) as count,
AVG(score) as average,
MAX(score) as max_score,
MIN(score) as min_score
FROM students
GROUP BY grade
ORDER BY grade
''')
print("=== 学年別統計 ===")
for row in cursor.fetchall():
print(f"学年 {row[0]}: {row[1]}人, 平均 {row[2]:.1f}点, "
f"最高 {row[3]}点, 最低 {row[4]}点")
def score_ranking(self, limit=5):
"""点数ランキング"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT name, grade, score
FROM students
ORDER BY score DESC
LIMIT ?
''', (limit,))
print(f"\n=== 上位{limit}名 点数ランキング ===")
for i, row in enumerate(cursor.fetchall(), 1):
print(f"{i}位: {row[0]} (学年{row[1]}) - {row[2]}点")
def score_distribution(self):
"""点数分布"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT
CASE
WHEN score >= 90 THEN '90点以上'
WHEN score >= 80 THEN '80-89点'
WHEN score >= 70 THEN '70-79点'
WHEN score >= 60 THEN '60-69点'
ELSE '60点未満'
END as score_range,
COUNT(*) as count
FROM students
GROUP BY score_range
ORDER BY MIN(score) DESC
''')
print("\n=== 点数分布 ===")
for row in cursor.fetchall():
print(f"{row[0]}: {row[1]}人")
def advanced_analysis(self):
"""詳細分析"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 標準偏差(近似値)
cursor.execute('''
SELECT
AVG((score - (SELECT AVG(score) FROM students)) *
(score - (SELECT AVG(score) FROM students))) as variance
FROM students
''')
variance = cursor.fetchone()[0]
std_dev = variance ** 0.5
# 中央値(近似値)
cursor.execute('''
SELECT score
FROM students
ORDER BY score
LIMIT 1
OFFSET (SELECT COUNT(*) FROM students) / 2
''')
median = cursor.fetchone()[0]
print(f"\n=== 詳細分析 ===")
print(f"標準偏差: {std_dev:.2f}")
print(f"中央値: {median}点")
if __name__ == "__main__":
analyzer = StudentAnalyzer()
analyzer.setup_sample_data()
analyzer.grade_statistics()
analyzer.score_ranking()
analyzer.score_distribution()
analyzer.advanced_analysis()
上級問題 解答例
上級1: ORMを使った書籍管理システム
from sqlalchemy import create_engine, Column, Integer, String, Float, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import ForeignKey
Base = declarative_base()
class Author(Base):
__tablename__ = 'authors'
id = Column(Integer, primary_key=True)
name = Column(String(50), nullable=False)
email = Column(String(100))
books = relationship("Book", back_populates="author")
def __repr__(self):
return f""
class Book(Base):
__tablename__ = 'books'
id = Column(Integer, primary_key=True)
title = Column(String(100), nullable=False)
author_id = Column(Integer, ForeignKey('authors.id'))
price = Column(Float)
stock = Column(Integer, default=0)
description = Column(Text)
author = relationship("Author", back_populates="books")
def __repr__(self):
return f""
class LibraryManager:
def __init__(self, db_path='library.db'):
self.engine = create_engine(f'sqlite:///{db_path}')
Base.metadata.create_all(self.engine)
Session = sessionmaker(bind=self.engine)
self.session = Session()
def add_author(self, name, email=None):
author = Author(name=name, email=email)
self.session.add(author)
self.session.commit()
print(f"著者 '{name}' を追加しました")
return author
def add_book(self, title, author_name, price, stock=0, description=None):
# 著者を検索または作成
author = self.session.query(Author).filter_by(name=author_name).first()
if not author:
author = self.add_author(author_name)
book = Book(
title=title,
author_id=author.id,
price=price,
stock=stock,
description=description
)
self.session.add(book)
self.session.commit()
print(f"書籍 '{title}' を追加しました")
return book
def search_books(self, keyword=None, author_name=None, max_price=None):
query = self.session.query(Book).join(Author)
if keyword:
query = query.filter(Book.title.contains(keyword) |
Book.description.contains(keyword))
if author_name:
query = query.filter(Author.name == author_name)
if max_price:
query = query.filter(Book.price <= max_price)
return query.all()
def update_stock(self, book_id, new_stock):
book = self.session.query(Book).get(book_id)
if book:
old_stock = book.stock
book.stock = new_stock
self.session.commit()
print(f"在庫を更新: '{book.title}' {old_stock} → {new_stock}")
else:
print("書籍が見つかりません")
def get_books_by_author(self, author_name):
author = self.session.query(Author).filter_by(name=author_name).first()
if author:
return author.books
return []
def show_inventory(self):
books = self.session.query(Book).all()
print("=== 在庫一覧 ===")
for book in books:
status = "在庫あり" if book.stock > 0 else "在庫切れ"
print(f"『{book.title}』 - {book.author.name} "
f"({book.price:.2f}円) - {status} ({book.stock}冊)")
def low_stock_alert(self, threshold=5):
low_stock_books = self.session.query(Book).filter(Book.stock < threshold).all()
if low_stock_books:
print(f"\n=== 在庫少ない書籍 (閾値: {threshold}冊) ===")
for book in low_stock_books:
print(f"『{book.title}』 - 在庫: {book.stock}冊")
else:
print("在庫が少ない書籍はありません")
if __name__ == "__main__":
manager = LibraryManager()
# サンプルデータ追加
manager.add_book("Python入門", "山田太郎", 2500, 10, "Pythonの基礎を学ぶ入門書")
manager.add_book("データベース実践", "佐藤花子", 3200, 5, "実践的なデータベース技術")
manager.add_book("Web開発の極意", "山田太郎", 2800, 3, "モダンなWeb開発手法")
manager.add_book("機械学習入門", "鈴木一郎", 3500, 8, "機械学習の基礎理論と実装")
print("\n" + "="*50)
# 在庫一覧表示
manager.show_inventory()
# 検索機能
print("\n=== '入門'で検索 ===")
results = manager.search_books(keyword='入門')
for book in results:
print(f"『{book.title}』 - {book.author.name}")
# 著者別書籍一覧
print("\n=== 山田太郎の書籍 ===")
author_books = manager.get_books_by_author('山田太郎')
for book in author_books:
print(f"『{book.title}』 - {book.price:.2f}円")
# 在庫アラート
manager.low_stock_alert(threshold=5)
# 在庫更新
book_to_update = manager.session.query(Book).filter_by(title="Web開発の極意").first()
if book_to_update:
manager.update_stock(book_to_update.id, 15)
print("\n=== 更新後の在庫 ===")
manager.show_inventory()
上級2: 非同期データベース操作
import asyncio
import aiosqlite
import time
import sqlite3
class AsyncDatabaseManager:
def __init__(self, db_path='async_example.db'):
self.db_path = db_path
async def init_database(self):
"""データベースの初期化"""
async with aiosqlite.connect(self.db_path) as db:
await db.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
await db.execute('''
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
title TEXT NOT NULL,
content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
await db.commit()
print("データベースを初期化しました")
async def add_user(self, name, email):
"""ユーザーを追加"""
async with aiosqlite.connect(self.db_path) as db:
try:
await db.execute(
'INSERT INTO users (name, email) VALUES (?, ?)',
(name, email)
)
await db.commit()
print(f"ユーザー '{name}' を追加しました")
return True
except aiosqlite.IntegrityError:
print(f"エラー: メールアドレス '{email}' は既に存在します")
return False
async def add_post(self, user_name, title, content):
"""投稿を追加"""
async with aiosqlite.connect(self.db_path) as db:
# ユーザーIDを取得
cursor = await db.execute(
'SELECT id FROM users WHERE name = ?',
(user_name,)
)
user = await cursor.fetchone()
if user:
user_id = user[0]
await db.execute(
'INSERT INTO posts (user_id, title, content) VALUES (?, ?, ?)',
(user_id, title, content)
)
await db.commit()
print(f"投稿 '{title}' を追加しました")
return True
else:
print(f"エラー: ユーザー '{user_name}' が見つかりません")
return False
async def get_user_posts(self, user_name):
"""ユーザーの投稿を取得"""
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute('''
SELECT p.title, p.content, p.created_at
FROM posts p
JOIN users u ON p.user_id = u.id
WHERE u.name = ?
ORDER BY p.created_at DESC
''', (user_name,))
posts = await cursor.fetchall()
return posts
async def batch_add_users(self, users_data):
"""複数ユーザーを一括追加"""
async with aiosqlite.connect(self.db_path) as db:
for name, email in users_data:
try:
await db.execute(
'INSERT INTO users (name, email) VALUES (?, ?)',
(name, email)
)
print(f"ユーザー '{name}' を追加しました")
except aiosqlite.IntegrityError:
print(f"スキップ: メールアドレス '{email}' は既に存在します")
await db.commit()
async def concurrent_operations(self):
"""並行処理のデモ"""
# 複数の非同期タスクを同時に実行
tasks = [
self.add_user(f"ユーザー{i}", f"user{i}@example.com")
for i in range(5)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
print(f"並行処理完了: {len([r for r in results if r])}件成功")
class SyncDatabaseManager:
"""比較用の同期バージョン"""
def __init__(self, db_path='sync_example.db'):
self.db_path = db_path
def init_database(self):
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL
)
''')
conn.commit()
def add_users_sync(self, users_data):
start_time = time.time()
with sqlite3.connect(self.db_path) as conn:
for name, email in users_data:
try:
conn.execute(
'INSERT INTO users (name, email) VALUES (?, ?)',
(name, email)
)
except sqlite3.IntegrityError:
pass
conn.commit()
return time.time() - start_time
async def performance_comparison():
"""非同期と同期のパフォーマンス比較"""
print("=== パフォーマンス比較 ===")
# 非同期バージョン
async_manager = AsyncDatabaseManager('async_perf.db')
await async_manager.init_database()
users_data = [(f"TestUser{i}", f"test{i}@example.com") for i in range(100)]
start_time = time.time()
await async_manager.batch_add_users(users_data)
async_duration = time.time() - start_time
print(f"非同期処理時間: {async_duration:.3f}秒")
# 同期バージョン
sync_manager = SyncDatabaseManager('sync_perf.db')
sync_manager.init_database()
sync_duration = sync_manager.add_users_sync(users_data)
print(f"同期処理時間: {sync_duration:.3f}秒")
improvement = ((sync_duration - async_duration) / sync_duration) * 100
print(f"改善率: {improvement:.1f}%")
async def main():
"""メイン実行関数"""
manager = AsyncDatabaseManager()
# データベース初期化
await manager.init_database()
# ユーザー追加
await manager.add_user("田中太郎", "taro@example.com")
await manager.add_user("佐藤花子", "hanako@example.com")
# 投稿追加
await manager.add_post("田中太郎", "Pythonの非同期処理", "asyncioを使った非同期処理について...")
await manager.add_post("佐藤花子", "データベース設計", "効率的なデータベース設計方法...")
# ユーザーの投稿を取得
print("\n=== 田中太郎の投稿 ===")
posts = await manager.get_user_posts("田中太郎")
for title, content, created_at in posts:
print(f"タイトル: {title}")
print(f"内容: {content[:50]}...")
print(f"投稿日: {created_at}\n")
# 並行処理デモ
print("\n=== 並行処理デモ ===")
await manager.concurrent_operations()
# パフォーマンス比較
await performance_comparison()
if __name__ == "__main__":
asyncio.run(main())
上級3: データベースマイグレーションシステム
import sqlite3
import os
import hashlib
from datetime import datetime
class MigrationSystem:
def __init__(self, db_path, migrations_dir='migrations'):
self.db_path = db_path
self.migrations_dir = migrations_dir
self.ensure_migrations_dir()
self.init_migration_table()
def ensure_migrations_dir(self):
"""マイグレーションディレクトリの作成"""
if not os.path.exists(self.migrations_dir):
os.makedirs(self.migrations_dir)
print(f"マイグレーションディレクトリ '{self.migrations_dir}' を作成しました")
def init_migration_table(self):
"""マイグレーション履歴テーブルの初期化"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS migration_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
checksum TEXT NOT NULL,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
def create_migration(self, name, up_sql, down_sql=None):
"""新しいマイグレーションファイルを作成"""
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
filename = f"{timestamp}_{name}.sql"
filepath = os.path.join(self.migrations_dir, filename)
content = f"-- Up Migration\n{up_sql}\n"
if down_sql:
content += f"\n-- Down Migration\n{down_sql}\n"
with open(filepath, 'w', encoding='utf-8') as f:
f.write(content)
print(f"マイグレーションファイル '{filename}' を作成しました")
return filepath
def get_migration_checksum(self, filepath):
"""ファイルのチェックサムを計算"""
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
return hashlib.md5(content.encode()).hexdigest()
def get_applied_migrations(self):
"""適用済みマイグレーションの一覧を取得"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT name, checksum, applied_at
FROM migration_history
ORDER BY id
''')
return {row[0]: row[1] for row in cursor.fetchall()}
def get_pending_migrations(self):
"""未適用のマイグレーションを取得"""
applied = self.get_applied_migrations()
pending = []
if not os.path.exists(self.migrations_dir):
return pending
for filename in sorted(os.listdir(self.migrations_dir)):
if filename.endswith('.sql'):
filepath = os.path.join(self.migrations_dir, filename)
name = filename[:-4] # .sqlを除去
if name not in applied:
pending.append((name, filepath))
else:
# チェックサムの検証
current_checksum = self.get_migration_checksum(filepath)
if applied[name] != current_checksum:
print(f"警告: マイグレーション '{name}' の内容が変更されています")
return pending
def apply_migration(self, name, filepath):
"""マイグレーションを適用"""
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
# UpとDownを分割
parts = content.split('-- Down Migration')
up_sql = parts[0].replace('-- Up Migration', '').strip()
checksum = self.get_migration_checksum(filepath)
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# トランザクション開始
cursor.execute('BEGIN')
# Upマイグレーション実行
for statement in up_sql.split(';'):
statement = statement.strip()
if statement:
cursor.execute(statement)
# 履歴に記録
cursor.execute('''
INSERT INTO migration_history (name, checksum)
VALUES (?, ?)
''', (name, checksum))
conn.commit()
print(f"マイグレーション '{name}' を適用しました")
return True
except sqlite3.Error as e:
print(f"マイグレーション '{name}' の適用に失敗: {e}")
return False
def rollback_migration(self, name):
"""特定のマイグレーションをロールバック"""
filepath = None
for filename in os.listdir(self.migrations_dir):
if filename.startswith(name) or filename.endswith(f"{name}.sql"):
filepath = os.path.join(self.migrations_dir, filename)
break
if not filepath:
print(f"マイグレーション '{name}' のファイルが見つかりません")
return False
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
# Downマイグレーションを抽出
if '-- Down Migration' in content:
down_sql = content.split('-- Down Migration')[1].strip()
else:
print(f"マイグレーション '{name}' にDownスクリプトがありません")
return False
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('BEGIN')
# Downマイグレーション実行
for statement in down_sql.split(';'):
statement = statement.strip()
if statement:
cursor.execute(statement)
# 履歴から削除
cursor.execute('DELETE FROM migration_history WHERE name = ?', (name,))
conn.commit()
print(f"マイグレーション '{name}' をロールバックしました")
return True
except sqlite3.Error as e:
print(f"ロールバックに失敗: {e}")
return False
def migrate(self):
"""未適用のマイグレーションをすべて適用"""
pending = self.get_pending_migrations()
if not pending:
print("適用するマイグレーションはありません")
return
print(f"{len(pending)}件のマイグレーションを適用します...")
for name, filepath in pending:
success = self.apply_migration(name, filepath)
if not success:
print("マイグレーションを中止します")
return
print("すべてのマイグレーションを適用しました")
def status(self):
"""マイグレーションの状態を表示"""
applied = self.get_applied_migrations()
pending = self.get_pending_migrations()
print("=== マイグレーション状態 ===")
print(f"適用済み: {len(applied)}件")
print(f"未適用: {len(pending)}件")
if applied:
print("\n適用済みマイグレーション:")
for name in sorted(applied.keys()):
print(f" ✓ {name}")
if pending:
print("\n未適用マイグレーション:")
for name, _ in pending:
print(f" ○ {name}")
def demo_migration_system():
"""マイグレーションシステムのデモ"""
db_path = 'migration_demo.db'
# 既存のデータベースファイルを削除
if os.path.exists(db_path):
os.remove(db_path)
migration_system = MigrationSystem(db_path)
# マイグレーション1: ユーザーテーブル作成
up_sql1 = '''
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_users_username ON users(username);
'''
down_sql1 = '''
DROP INDEX IF EXISTS idx_users_username;
DROP TABLE IF EXISTS users;
'''
migration_system.create_migration('create_users_table', up_sql1, down_sql1)
# マイグレーション2: 投稿テーブル作成
up_sql2 = '''
CREATE TABLE posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
title TEXT NOT NULL,
content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
);
CREATE INDEX idx_posts_user_id ON posts(user_id);
CREATE INDEX idx_posts_created_at ON posts(created_at);
'''
down_sql2 = '''
DROP INDEX IF EXISTS idx_posts_created_at;
DROP INDEX IF EXISTS idx_posts_user_id;
DROP TABLE IF EXISTS posts;
'''
migration_system.create_migration('create_posts_table', up_sql2, down_sql2)
# マイグレーション3: プロファイルテーブル追加
up_sql3 = '''
CREATE TABLE profiles (
user_id INTEGER PRIMARY KEY,
bio TEXT,
avatar_url TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
);
'''
down_sql3 = '''
DROP TABLE IF EXISTS profiles;
'''
migration_system.create_migration('create_profiles_table', up_sql3, down_sql3)
# 状態確認とマイグレーション実行
print("\n" + "="*50)
migration_system.status()
print("\n" + "="*50)
migration_system.migrate()
print("\n" + "="*50)
migration_system.status()
# ロールバックのデモ
print("\n" + "="*50)
print("最後のマイグレーションをロールバックします...")
# 適用済みマイグレーションから最後のものを取得
applied = migration_system.get_applied_migrations()
if applied:
last_migration = sorted(applied.keys())[-1]
migration_system.rollback_migration(last_migration)
print("\n" + "="*50)
migration_system.status()
if __name__ == "__main__":
demo_migration_system()