セキュリティ対策:CSRF・XSS・SQLインジェクション
2026-03-08はじめに
現代のWebアプリケーション開発において、セキュリティ対策は単なる「追加機能」ではなく、開発プロセスの核心部分として捉える必要があります。特にFlaskのような軽量フレームワークを使用する場合、デフォルトでは最小限のセキュリティ機能しか提供されていないため、開発者自身が適切な対策を実装することが不可欠です。
CSRF(クロスサイトリクエストフォージェリ)対策
2023年に公開されたOWASP Top 10では、従来から指摘されているクロスサイトスクリプティング(XSS)、SQLインジェクション、クロスサイトリクエストフォージェリ(CSRF)などの古典的な攻撃手法が依然として重大な脅威として挙げられています。これらの脆弱性を適切に対処しない場合、以下のような深刻な結果を招く可能性があります。
- ユーザーデータの漏洩
- 認証情報の盗難
- サービスの停止
- 法的責任の発生
- 企業イメージの毀損
本記事では、Flaskアプリケーションにおける三大セキュリティ脅威であるCSRF、XSS、SQLインジェクションへの対策を、実践的なコード例と共に詳細に解説します。各対策について理論的な背景から具体的な実装方法、テスト手法までを網羅的に説明します。
CSRF攻撃の仕組みとリスク
CSRF攻撃は、悪意あるウェブサイトがユーザーのブラウザを利用して、ユーザーが認証済みの別のウェブサイトに意図しないリクエストを送信させる攻撃手法です。例えば、ユーザーがオンラインバンキングにログインした状態で悪意あるサイトを閲覧した場合、そのサイトからユーザーの知らない間に送金リクエストが送信される可能性があります。
CSRF攻撃が成立するための条件
- ユーザーが標的サイトにログイン済みである(セッションが有効)
- 標的サイトが重要な操作をGETリクエストで受け付けている、またはフォームの検証が不十分
- 攻撃者が重要な操作のリクエストパラメータを推測可能
FlaskにおけるCSRF対策の基本実装
Flaskでは、Flask-WTF拡張を使用してCSRF対策を実装するのが一般的です。以下に基本的な実装方法を示します。
# app.py - CSRF対策を組み込んだFlaskアプリケーション
from flask import Flask, render_template, request, jsonify, session
from flask_wtf.csrf import CSRFProtect, generate_csrf
import secrets
app = Flask(__name__)
# セキュアな秘密鍵の生成と設定
app.config['SECRET_KEY'] = secrets.token_hex(32)
app.config['WTF_CSRF_SECRET_KEY'] = secrets.token_hex(32)
# CSRF保護の有効化
csrf = CSRFProtect(app)
# 開発環境でのみCSRFチェックを除外するエンドポイントを設定
if app.debug:
csrf.exempt('debug_info')
@app.route('/')
def home():
return render_template('index.html')
@app.route('/api/sensitive-action', methods=['POST'])
def sensitive_action():
"""機密性の高い操作を実行するエンドポイント"""
# CSRFトークンは自動的に検証される
data = request.get_json()
# ビジネスロジックの実行
return jsonify({'status': 'success', 'message': '操作が完了しました'})
@app.route('/api/csrf-token', methods=['GET'])
def get_csrf_token():
"""フロントエンド用にCSRFトークンを取得するエンドポイント"""
token = generate_csrf()
return jsonify({'csrf_token': token})
# CSRF保護を除外すべきエンドポイントの例
@app.route('/webhook/external-service', methods=['POST'])
@csrf.exempt
def external_webhook():
"""
外部サービスからのWebhook受信エンドポイント
CSRF保護を除外(外部サービスはCSRFトークンを持たないため)
"""
# 代わりに署名検証などの別の認証方法を実装
signature = request.headers.get('X-Signature')
# 署名検証ロジック
return jsonify({'status': 'received'})
if __name__ == '__main__':
app.run(debug=True)
フォームとAJAXリクエストでのCSRFトークン使用
フロントエンドでCSRFトークンを適切に扱う方法。
<!-- templates/form.html - フォームでのCSRFトークン使用 -->
<!DOCTYPE html>
<html lang="ja">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>CSRF保護フォーム</title>
</head>
<body>
<h1>ユーザー設定更新</h1>
<!-- 方法1: 隠しフィールドとしてトークンを埋め込む -->
<form method="POST" action="/update-profile">
{{ form.csrf_token }}
<div>
<label for="username">ユーザー名:</label>
<input type="text" id="username" name="username" required>
</div>
<div>
<label for="email">メールアドレス:</label>
<input type="email" id="email" name="email" required>
</div>
<button type="submit">更新</button>
</form>
<!-- 方法2: メタタグからトークンを取得 -->
<meta name="csrf-token" content="{{ csrf_token() }}">
<script>
// AJAXリクエストでのCSRFトークン使用
document.addEventListener('DOMContentLoaded', function() {
// メタタグからトークンを取得
const csrfToken = document
.querySelector('meta[name="csrf-token"]')
.getAttribute('content');
// すべてのAJAXリクエストにトークンを自動的に追加
const originalFetch = window.fetch;
window.fetch = function(url, options = {}) {
options.headers = options.headers || {};
options.headers['X-CSRF-Token'] = csrfToken;
return originalFetch(url, options);
};
// 個別のリクエストに手動で追加
document.getElementById('ajax-button')
.addEventListener('click', function() {
fetch('/api/sensitive-action', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRF-Token': csrfToken
},
body: JSON.stringify({ action: 'perform' })
})
.then(response => response.json())
.then(data => console.log(data));
});
});
</script>
<button id="ajax-button">AJAXリクエスト送信</button>
</body>
</html>
二重送信防止トークンの実装
CSRF対策に加えて、二重送信防止も重要な対策です。
# double_submit.py - 二重送信防止機能
import hashlib
import time
from flask import session, request, abort
class DoubleSubmitPrevention:
"""二重送信防止クラス"""
@staticmethod
def generate_token():
"""ユニークなトークンを生成"""
token = hashlib.sha256(
f"{session.sid}{time.time()}{secrets.token_hex(8)}".encode()
).hexdigest()
session['_submit_token'] = token
return token
@staticmethod
def validate_token():
"""トークンの検証"""
if request.method in ['POST', 'PUT', 'DELETE', 'PATCH']:
token = request.form.get('_submit_token') or \
request.headers.get('X-Submit-Token') or \
request.args.get('_submit_token')
if not token or token != session.get('_submit_token'):
abort(400, '無効なリクエストまたは二重送信です')
# トークンを使用した後は削除
session.pop('_submit_token', None)
# デコレータとして使用
def prevent_double_submit(f):
"""二重送信防止デコレータ"""
def decorated_function(*args, **kwargs):
DoubleSubmitPrevention.validate_token()
return f(*args, **kwargs)
return decorated_function
@app.route('/api/order', methods=['POST'])
@prevent_double_submit
def place_order():
"""注文処理(二重送信防止付き)"""
# 注文処理ロジック
return jsonify({'status': 'ordered'})
XSS(クロスサイトスクリプティング)対策
XSS攻撃の種類と影響
XSS攻撃は、悪意あるスクリプトをウェブページに注入する攻撃手法で、主に3つのタイプがあります。
- 反射型XSS:ユーザーがクリックしたURLパラメータに含まれるスクリプトが実行される
- 格納型XSS:データベースなどに保存された悪意あるスクリプトが表示時に実行される
- DOMベースXSS:クライアント側のJavaScriptによるDOM操作で発生する
XSS攻撃による影響
- セッションハイジャック
- 機密情報の漏洩
- 偽のコンテンツ表示
- マルウェア配布
入力値の検証とサニタイズ
XSS対策の第一歩は、入力値の適切な検証とサニタイズです。
# xss_prevention.py - XSS対策モジュール
import re
import html
from bleach import clean, linkify
from flask import request
class XSSPrevention:
"""XSS対策クラス"""
# 許可するHTMLタグと属性の定義
ALLOWED_TAGS = [
'a', 'b', 'i', 'u', 'em', 'strong', 'p', 'br',
'ul', 'ol', 'li', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6'
]
ALLOWED_ATTRIBUTES = {
'a': ['href', 'title', 'target', 'rel'],
'img': ['src', 'alt', 'width', 'height'],
'*': ['class', 'id']
}
# 安全なURLスキーム
ALLOWED_SCHEMES = ['http', 'https', 'mailto', 'tel']
@staticmethod
def validate_input(input_string, max_length=1000, pattern=None):
"""
入力値の基本検証
Args:
input_string: 検証する文字列
max_length: 最大文字数
pattern: 正規表現パターン(オプション)
Returns:
検証済みの文字列または例外
"""
if not isinstance(input_string, str):
raise ValueError("入力値は文字列である必要があります")
# 長さチェック
if len(input_string) > max_length:
raise ValueError(f"入力値は{max_length}文字以内である必要があります")
# 空文字列チェック(必要に応じて)
if not input_string.strip():
raise ValueError("入力値が空です")
# 正規表現パターンによる検証(オプション)
if pattern and not re.match(pattern, input_string):
raise ValueError("入力形式が正しくありません")
return input_string
@staticmethod
def sanitize_html(unsafe_html):
"""
HTMLのサニタイズ(許可されたタグと属性のみを保持)
Args:
unsafe_html: サニタイズ前のHTML文字列
Returns:
サニタイズされた安全なHTML
"""
if not unsafe_html:
return ''
# Bleachを使用したサニタイズ
safe_html = clean(
unsafe_html,
tags=XSSPrevention.ALLOWED_TAGS,
attributes=XSSPrevention.ALLOWED_ATTRIBUTES,
protocols=XSSPrevention.ALLOWED_SCHEMES,
strip=True,
strip_comments=True
)
return safe_html
@staticmethod
def escape_html(unsafe_text):
"""
HTMLエスケープ(すべてのHTMLをテキストとして表示)
Args:
unsafe_text: エスケープ前のテキスト
Returns:
エスケープされた安全なテキスト
"""
return html.escape(unsafe_text)
@staticmethod
def sanitize_url(unsafe_url):
"""
URLのサニタイズ
Args:
unsafe_url: サニタイズ前のURL
Returns:
安全なURLまたはNone
"""
if not unsafe_url:
return None
# 危険なプロトコルのチェック
dangerous_protocols = ['javascript:', 'data:', 'vbscript:']
for protocol in dangerous_protocols:
if unsafe_url.lower().startswith(protocol):
return None
# 相対URLのチェック
if unsafe_url.startswith('/'):
# 内部リンクは許可
return unsafe_url
# 絶対URLの検証
import urllib.parse
parsed = urllib.parse.urlparse(unsafe_url)
if parsed.scheme not in XSSPrevention.ALLOWED_SCHEMES:
return None
return unsafe_url
@staticmethod
def safe_json_response(data):
"""
安全なJSONレスポンスの作成
Args:
data: レスポンスデータ
Returns:
安全なJSONレスポンス
"""
from flask import jsonify
def sanitize_value(value):
"""再帰的に値をサニタイズ"""
if isinstance(value, str):
return XSSPrevention.escape_html(value)
elif isinstance(value, dict):
return {k: sanitize_value(v) for k, v in value.items()}
elif isinstance(value, list):
return [sanitize_value(item) for item in value]
else:
return value
sanitized_data = sanitize_value(data)
response = jsonify(sanitized_data)
# 安全なContent-Typeの設定
response.headers['Content-Type'] = 'application/json; charset=utf-8'
# XSS対策のHTTPヘッダー
response.headers['X-Content-Type-Options'] = 'nosniff'
return response
# 使用例
@app.route('/api/comment', methods=['POST'])
def post_comment():
"""コメント投稿エンドポイント(XSS対策付き)"""
try:
data = request.get_json()
# 入力値の検証
username = XSSPrevention.validate_input(
data.get('username'),
max_length=50,
pattern=r'^[a-zA-Z0-9_\-]{3,50}$'
)
# HTMLコンテンツのサニタイズ
content = data.get('content', '')
if data.get('allow_html', False):
# 制限付きでHTMLを許可
safe_content = XSSPrevention.sanitize_html(content)
else:
# 通常はHTMLをエスケープ
safe_content = XSSPrevention.escape_html(content)
# URLのサニタイズ
website = XSSPrevention.sanitize_url(data.get('website'))
# データベースに保存(サニタイズ済みのデータのみ)
comment = {
'username': username,
'content': safe_content,
'website': website,
'created_at': datetime.now()
}
# 保存処理...
return XSSPrevention.safe_json_response({
'status': 'success',
'message': 'コメントが投稿されました',
'comment': comment
})
except ValueError as e:
return jsonify({'status': 'error', 'message': str(e)}), 400
セキュリティHTTPヘッダーの設定
適切なHTTPヘッダー設定もXSS対策に有効です。
# security_headers.py - セキュリティHTTPヘッダー設定
from flask import Flask
from datetime import timedelta
def add_security_headers(response):
"""セキュリティ関連のHTTPヘッダーを追加"""
# Content Security Policy (CSP) - 最も重要なXSS対策ヘッダー
csp_policy = (
"default-src 'self'; "
"script-src 'self' https://cdn.example.com 'unsafe-inline' 'unsafe-eval'; "
"style-src 'self' https://cdn.example.com 'unsafe-inline'; "
"img-src 'self' https://cdn.example.com data:; "
"font-src 'self' https://cdn.example.com; "
"connect-src 'self' https://api.example.com; "
"frame-ancestors 'none'; "
"form-action 'self'; "
"base-uri 'self';"
)
response.headers['Content-Security-Policy'] = csp_policy
# XSS対策ヘッダー(レガシーブラウザ用)
response.headers['X-XSS-Protection'] = '1; mode=block'
# MIMEタイプのスニッフィング防止
response.headers['X-Content-Type-Options'] = 'nosniff'
# クリックジャッキング対策
response.headers['X-Frame-Options'] = 'DENY'
# Referrer Policy
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
# キャッシュ制御(機密情報を含むページ用)
if 'private' in response.headers.get('Cache-Control', ''):
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
return response
# Flaskアプリケーションでの使用
app = Flask(__name__)
app.after_request(add_security_headers)
# または、より高度な設定
class SecurityHeaders:
"""動的なセキュリティヘッダー設定"""
@staticmethod
def get_csp_policy(role='default'):
"""ロールに応じたCSPポリシーを生成"""
policies = {
'default': (
"default-src 'self'; "
"script-src 'self'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data:;"
),
'admin': (
"default-src 'self'; "
"script-src 'self' https://admin-cdn.example.com; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data:;"
),
'public': (
"default-src 'self' https://*.example.com; "
"script-src 'self' https://cdn.example.com; "
"style-src 'self' https://cdn.example.com 'unsafe-inline'; "
"img-src 'self' https://cdn.example.com data:;"
)
}
return policies.get(role, policies['default'])
@app.after_request
def dynamic_security_headers(response):
"""動的なセキュリティヘッダー設定"""
user_role = session.get('role', 'default')
response.headers['Content-Security-Policy'] = SecurityHeaders.get_csp_policy(user_role)
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-Content-Type-Options'] = 'nosniff'
# HSTS (HTTPS Strict Transport Security)
if request.is_secure:
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
return response
SQLインジェクション対策
SQLインジェクション攻撃のメカニズム
SQLインジェクションは、ユーザー入力を介して悪意あるSQL文を実行させる攻撃です。適切な対策が講じられていない場合、以下のような被害が発生します。
- データベース内の全データの閲覧、改ざん、削除
- 認証回避による管理者権限の取得
- ファイルシステムへのアクセス
- サーバーOSコマンドの実行
典型的なSQLインジェクションの例
-- 脆弱なログイン処理
SELECT * FROM users WHERE username = 'admin' OR '1'='1' -- AND password = '...'
パラメータ化クエリの実装
SQLインジェクション対策の基本は、パラメータ化クエリ(プリペアドステートメント)の使用です。
# sql_injection_prevention.py - SQLインジェクション対策
import sqlite3
import pymysql
from contextlib import contextmanager
from flask import g
import re
class DatabaseSecurity:
"""データベースセキュリティクラス"""
@staticmethod
def get_db_connection():
"""安全なデータベース接続を取得"""
if 'db' not in g:
g.db = sqlite3.connect('database.db')
# 行を辞書形式で取得できるように設定
g.db.row_factory = sqlite3.Row
return g.db
@staticmethod
@contextmanager
def safe_cursor():
"""安全なカーソルのコンテキストマネージャ"""
db = DatabaseSecurity.get_db_connection()
cursor = db.cursor()
try:
yield cursor
db.commit()
except Exception as e:
db.rollback()
raise e
finally:
cursor.close()
@staticmethod
def safe_execute(query, params=None, fetchone=False, fetchall=False):
"""
安全なSQL実行
Args:
query: SQLクエリ(パラメータは?または%sプレースホルダー)
params: パラメータのタプルまたはリスト
fetchone: 1行取得するか
fetchall: 全行取得するか
Returns:
クエリ結果
"""
if params is None:
params = ()
with DatabaseSecurity.safe_cursor() as cursor:
cursor.execute(query, params)
if fetchone:
return cursor.fetchone()
elif fetchall:
return cursor.fetchall()
else:
return cursor.lastrowid
@staticmethod
def validate_sql_identifier(identifier):
"""
SQL識別子(テーブル名、カラム名)の検証
Args:
identifier: 検証する識別子
Returns:
検証済みの識別子または例外
"""
if not identifier:
raise ValueError("識別子が空です")
# 許可する文字パターン
pattern = r'^[a-zA-Z_][a-zA-Z0-9_]*$'
if not re.match(pattern, identifier):
raise ValueError(f"無効な識別子: {identifier}")
# 予約語のチェック(簡易版)
reserved_words = {
'SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE',
'ALTER', 'TABLE', 'FROM', 'WHERE', 'AND', 'OR', 'NOT'
}
if identifier.upper() in reserved_words:
raise ValueError(f"SQL予約語は識別子に使用できません: {identifier}")
return identifier
# SQLインジェクション対策の実装例
class UserRepository:
"""ユーザーデータのリポジトリ(SQLインジェクション対策済み)"""
@staticmethod
def find_by_credentials(username, password):
"""
認証情報によるユーザー検索(安全な実装)
Args:
username: ユーザー名
password: パスワード(ハッシュ化済み)
Returns:
ユーザー情報またはNone
"""
# パラメータ化クエリを使用
query = """
SELECT id, username, email, role, created_at
FROM users
WHERE username = ? AND password_hash = ? AND active = 1
"""
try:
result = DatabaseSecurity.safe_execute(
query,
params=(username, password),
fetchone=True
)
return dict(result) if result else None
except Exception as e:
# ログに記録するが、詳細なエラー情報は返さない
app.logger.error(f"認証エラー: {str(e)}")
return None
@staticmethod
def search_users(keyword, limit=10, offset=0):
"""
ユーザー検索(動的クエリの安全な実装)
Args:
keyword: 検索キーワード
limit: 取得制限
offset: オフセット
Returns:
検索結果のリスト
"""
# 基本クエリ
base_query = """
SELECT id, username, email, created_at
FROM users
WHERE active = 1
"""
params = []
# キーワード検索(安全な実装)
if keyword:
# LIKE検索用にワイルドカードを追加
base_query += " AND (username LIKE ? OR email LIKE ?)"
like_keyword = f"%{keyword}%"
params.extend([like_keyword, like_keyword])
# ソートとページネーション
base_query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
try:
results = DatabaseSecurity.safe_execute(
base_query,
params=tuple(params),
fetchall=True
)
return [dict(row) for row in results]
except Exception as e:
app.logger.error(f"ユーザー検索エラー: {str(e)}")
return []
@staticmethod
def update_user_profile(user_id, update_data):
"""
ユーザープロフィール更新(動的SET句の安全な実装)
Args:
user_id: ユーザーID
update_data: 更新データの辞書
Returns:
更新成功可否
"""
if not update_data:
return False
# 許可する更新可能なカラム
allowed_columns = {'username', 'email', 'display_name', 'bio'}
# セット句とパラメータの構築
set_clauses = []
params = []
for column, value in update_data.items():
if column in allowed_columns:
# 識別子の検証
try:
validated_column = DatabaseSecurity.validate_sql_identifier(column)
set_clauses.append(f"{validated_column} = ?")
params.append(value)
except ValueError:
# 無効なカラム名はスキップ
continue
if not set_clauses:
return False
# 完全なクエリの構築
query = f"UPDATE users SET {', '.join(set_clauses)} WHERE id = ?"
params.append(user_id)
try:
DatabaseSecurity.safe_execute(query, params=tuple(params))
return True
except Exception as e:
app.logger.error(f"プロフィール更新エラー: {str(e)}")
return False
# SQLAlchemyを使用した場合の安全な実装
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text
db = SQLAlchemy()
class SecureDatabaseService:
"""SQLAlchemyを使用した安全なデータベース操作"""
@staticmethod
def raw_sql_query_with_params():
"""生SQLクエリの安全な実行例"""
# 安全でない例(絶対に使用しない)
# user_id = request.args.get('id')
# query = f"SELECT * FROM users WHERE id = {user_id}" # インジェクション脆弱性
# 安全な実装
user_id = request.args.get('id')
# パラメータ化クエリ
query = text("SELECT * FROM users WHERE id = :user_id")
# 安全な実行
result = db.session.execute(query, {'user_id': user_id})
users = result.fetchall()
return users
@staticmethod
def dynamic_query_building(search_filters):
"""動的クエリ構築の安全な方法"""
from sqlalchemy.sql import and_, or_
# モデルの定義(例)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
active = db.Column(db.Boolean, default=True)
# 動的なフィルター構築
filters = []
if search_filters.get('username'):
filters.append(User.username.ilike(f"%{search_filters['username']}%"))
if search_filters.get('email'):
filters.append(User.email.ilike(f"%{search_filters['email']}%"))
if search_filters.get('active') is not None:
filters.append(User.active == search_filters['active'])
# 安全なクエリ実行
if filters:
query = User.query.filter(and_(*filters))
else:
query = User.query
# ページネーション
page = search_filters.get('page', 1)
per_page = search_filters.get('per_page', 10)
results = query.paginate(page=page, per_page=per_page, error_out=False)
return {
'items': [user.to_dict() for user in results.items],
'total': results.total,
'pages': results.pages,
'current_page': results.page
}
# ストアドプロシージャの安全な呼び出し
class StoredProcedureService:
"""ストアドプロシージャの安全な実行"""
@staticmethod
def call_user_statistics(start_date, end_date):
"""
ストアドプロシージャの呼び出し
Args:
start_date: 開始日
end_date: 終了日
Returns:
統計結果
"""
# パラメータの検証
if not start_date or not end_date:
raise ValueError("開始日と終了日を指定してください")
# ストアドプロシージャの呼び出し
query = text("CALL GetUserStatistics(:start_date, :end_date)")
try:
result = db.session.execute(query, {
'start_date': start_date,
'end_date': end_date
})
# 複数の結果セットがある場合
statistics = []
for row in result:
statistics.append(dict(row))
return statistics
except Exception as e:
app.logger.error(f"ストアドプロシージャ実行エラー: {str(e)}")
return []
入力値の厳格な検証と正規化
SQLインジェクション対策には、入力値の検証も重要です。
# input_validation.py - 入力値の厳格な検証
import re
from datetime import datetime
class StrictInputValidator:
"""厳格な入力値検証クラス"""
# 各種パターンの定義
PATTERNS = {
'username': r'^[a-zA-Z0-9_\-.]{3,50}$',
'email': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
'password': r'^.{8,100}$', # 実際にはもっと厳格なルールを推奨
'integer': r'^-?\d+$',
'positive_integer': r'^\d+$',
'decimal': r'^-?\d+(?:\.\d+)?$',
'date_iso': r'^\d{4}-\d{2}-\d{2}$',
'datetime_iso': r'^\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:?\d{2})?$',
'safe_string': r'^[a-zA-Z0-9\s\-_,.!?@()]{1,500}$',
'url': r'^https?://[a-zA-Z0-9\-._~:/?#[\]@!$&\'()*+,;=]+$'
}
@staticmethod
def validate_pattern(value, pattern_name, optional=False):
"""
パターンによる検証
Args:
value: 検証する値
pattern_name: パターン名
optional: オプショナルかどうか
Returns:
検証済みの値または例外
"""
if optional and (value is None or value == ''):
return None
if not isinstance(value, str):
raise ValueError(f"{pattern_name}は文字列である必要があります")
pattern = StrictInputValidator.PATTERNS.get(pattern_name)
if not pattern:
raise ValueError(f"未知のパターン名: {pattern_name}")
if not re.match(pattern, value):
raise ValueError(f"{pattern_name}の形式が正しくありません")
return value
@staticmethod
def validate_integer(value, min_value=None, max_value=None):
"""整数の検証と範囲チェック"""
if value is None:
raise ValueError("値が指定されていません")
# 文字列の場合は整数に変換
if isinstance(value, str):
if not re.match(StrictInputValidator.PATTERNS['integer'], value):
raise ValueError("整数の形式が正しくありません")
value = int(value)
if not isinstance(value, int):
raise ValueError("整数である必要があります")
# 範囲チェック
if min_value is not None and value < min_value:
raise ValueError(f"値は{min_value}以上である必要があります")
if max_value is not None and value > max_value:
raise ValueError(f"値は{max_value}以下である必要があります")
return value
@staticmethod
def validate_sql_input(value, max_length=None, allow_null=False):
"""
SQL入力値の厳格な検証
Args:
value: 検証する値
max_length: 最大長
allow_null: NULLを許可するか
Returns:
検証済みの値
"""
if allow_null and value is None:
return None
if value is None:
raise ValueError("値が指定されていません")
# 文字列に変換
str_value = str(value)
# 長さチェック
if max_length and len(str_value) > max_length:
raise ValueError(f"値は{max_length}文字以内である必要があります")
# 危険な文字の検出(簡易版)
dangerous_patterns = [
r'[\'"]', # 引用符
r';', # ステートメント区切り
r'--', # SQLコメント
r'/\*.*\*/', # 複数行コメント
r'(?i)(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|UNION|EXEC)',
]
for pattern in dangerous_patterns:
if re.search(pattern, str_value):
# 本番環境ではログに記録
app.logger.warning(f"危険な文字パターンを検出: {pattern} in {str_value[:50]}")
# 例外を発生させるか、安全な値に変換
raise ValueError("無効な文字が含まれています")
return str_value
@staticmethod
def normalize_sql_value(value, column_type='string'):
"""
SQL値の正規化
Args:
value: 正規化する値
column_type: カラムのデータタイプ
Returns:
正規化された値
"""
if value is None:
return None
type_handlers = {
'integer': lambda x: int(x) if str(x).isdigit() else None,
'float': lambda x: float(x) if re.match(r'^-?\d+(?:\.\d+)?$', str(x)) else None,
'boolean': lambda x: bool(int(x)) if str(x).isdigit() else str(x).lower() in ['true', '1', 'yes'],
'date': lambda x: datetime.strptime(x, '%Y-%m-%d').date() if re.match(r'^\d{4}-\d{2}-\d{2}$', str(x)) else None,
'string': lambda x: str(x)[:255] # 長さ制限
}
handler = type_handlers.get(column_type, type_handlers['string'])
try:
return handler(value)
except (ValueError, TypeError):
return None
# 使用例
@app.route('/api/users/', methods=['GET'])
def get_user(user_id):
"""ユーザー情報取得(安全な実装)"""
try:
# IDの検証
validated_id = StrictInputValidator.validate_integer(
user_id, min_value=1, max_value=1000000
)
# 安全なクエリ実行
with DatabaseSecurity.safe_cursor() as cursor:
cursor.execute(
"SELECT id, username, email FROM users WHERE id = ? AND active = 1",
(validated_id,)
)
user = cursor.fetchone()
if not user:
return jsonify({'error': 'ユーザーが見つかりません'}), 404
return jsonify(dict(user))
except ValueError as e:
return jsonify({'error': str(e)}), 400
統合的なセキュリティテスト
セキュリティテストの自動化
実際の攻撃シナリオを想定したテストを自動化します。
# security_tests.py - セキュリティテスト
import pytest
import json
from app import app
class TestSecurityVulnerabilities:
"""セキュリティ脆弱性テストクラス"""
@pytest.fixture
def client(self):
app.config['TESTING'] = True
app.config['WTF_CSRF_ENABLED'] = True
with app.test_client() as client:
yield client
def test_csrf_protection(self, client):
"""CSRF保護のテスト"""
# CSRFトークンなしでリクエスト
response = client.post('/api/sensitive-action', data={})
assert response.status_code in [400, 403]
# 無効なCSRFトークンでリクエスト
response = client.post('/api/sensitive-action',
headers={'X-CSRF-Token': 'invalid_token'},
data={}
)
assert response.status_code in [400, 403]
def test_xss_injection(self, client):
"""XSS攻撃のテスト"""
xss_payloads = [
'',
'
',
'javascript:alert(document.cookie)',
'" onmouseover="alert(1)"',
'