Flaskで画像処理機能
2026-02-27はじめに
Webアプリケーションにおいて、画像処理は重要な機能の一つです。特にユーザーがアップロードした画像を適切なサイズで表示したり、サムネイルを生成したりする機能は、ユーザー体験の向上やサーバーの負荷軽減に大きく貢献します。本章では、Pythonの画像処理ライブラリであるPillowを使った画像加工、サムネイル生成、画像表示の最適化について学びます。これにより、前章で実装したファイルアップロード機能をさらに強化していきます。
Pillowを使った画像加工
まずは、Pillowライブラリをインストールし、基本的な画像処理の方法を見ていきましょう。
Pillowのインストール
Pillowは、Pythonで画像処理を行うための非常に代表的なオープンソースライブラリです。旧称であるPIL (Python Imaging Library) からフォークされ、現在でも活発に開発されています。画像の読み込み、リサイズ、回転、トリミング、フィルタ処理、保存などをシンプルかつ強力に実行できます。
pipコマンドでインストールします。
pip install Pillow
pip install flask
基本的な画像処理の実装
以下は、アップロードされた画像を加工する基本的な例です。
# app/utils/image_processor.py
import os
from PIL import Image, ImageOps
from io import BytesIO
import secrets
class ImageProcessor:
def __init__(self, upload_folder):
self.upload_folder = upload_folder
self.allowed_extensions = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
def allowed_file(self, filename):
"""許可されたファイル形式かチェック"""
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in self.allowed_extensions
def generate_filename(self, original_filename):
"""ランダムなファイル名を生成"""
random_hex = secrets.token_hex(8)
_, ext = os.path.splitext(original_filename)
return random_hex + ext
def save_image(self, image_file):
"""画像を保存する基本的なメソッド"""
if not self.allowed_file(image_file.filename):
return None
filename = self.generate_filename(image_file.filename)
filepath = os.path.join(self.upload_folder, filename)
# 画像を保存
image = Image.open(image_file)
image.save(filepath)
return filename
def resize_image(self, image_file, max_size=(800, 600)):
"""
画像をリサイズする
max_size: (width, height) のタプル
"""
image = Image.open(image_file)
# 画像の向きを自動修正(Exif情報対応)
image = ImageOps.exif_transpose(image)
# リサイズ(アスペクト比を保持)
image.thumbnail(max_size, Image.Resampling.LANCZOS)
return image
def convert_format(self, image, format='JPEG', quality=85):
"""画像フォーマットを変換"""
if format == 'JPEG' and image.mode in ('RGBA', 'LA', 'P'):
# JPEGはアルファチャンネルに対応していないので背景を白にする
background = Image.new('RGB', image.size, (255, 255, 255))
background.paste(image, mask=image.split()[-1] if image.mode == 'RGBA' else None)
image = background
output = BytesIO()
image.save(output, format=format, quality=quality, optimize=True)
output.seek(0)
return output
画像アップロード処理を行うImageProcessorクラスを定義します。ファイル名のランダム生成や拡張子チェックでセキュリティに配慮し、Pillowを使って画像のリサイズやフォーマット変換を行います。リサイズ時はExif情報で向きを自動修正し、アスペクト比を保持します。JPEG変換時は透過背景を白に変換するなど、実用的な画像処理機能を提供しています。
サムネイル生成
サムネイル生成は、画像一覧表示などで特に有用です。異なるサイズのサムネイルを複数生成することで、様々な画面サイズに最適化できます。
複数サイズのサムネイル生成クラス
四段階の標準サイズを定義し、指定された出力ディレクトリに各サイズのサムネイルを自動生成します。画像の向きをExif情報で修正した後、アスペクト比を保持しながら高品質にリサイズし、可能な場合はWebP形式に変換してファイルサイズを最適化します。
# app/utils/thumbnail_generator.py
import os
from PIL import Image, ImageOps
from pathlib import Path
class ThumbnailGenerator:
# 一般的なサムネイルサイズ定義
THUMBNAIL_SIZES = {
'xs': (100, 100), # 超小さいサムネイル(アイコン用)
'sm': (300, 300), # 小さいサムネイル
'md': (600, 600), # 中くらいサムネイル
'lg': (1200, 1200), # 大きいサムネイル
}
@staticmethod
def generate_thumbnails(image_path, output_dir, sizes=None):
"""
複数のサムネイルを生成する
Args:
image_path: 元画像のパス
output_dir: サムネイル保存ディレクトリ
sizes: 生成するサイズの辞書(デフォルトはTHUMBNAIL_SIZES)
Returns:
生成されたサムネイルのファイルパス辞書
"""
if sizes is None:
sizes = ThumbnailGenerator.THUMBNAIL_SIZES
# 出力ディレクトリがなければ作成
Path(output_dir).mkdir(parents=True, exist_ok=True)
# 元のファイル名と拡張子を取得
filename = Path(image_path).stem
extension = Path(image_path).suffix.lower()
# WebP対応(必要に応じて変換)
if extension in ['.jpg', '.jpeg', '.png']:
output_format = 'WEBP'
output_extension = '.webp'
else:
output_format = None
output_extension = extension
thumbnails = {}
try:
# 画像を開く
with Image.open(image_path) as img:
# 画像の向きを自動修正
img = ImageOps.exif_transpose(img)
# 各サイズのサムネイルを生成
for size_name, dimensions in sizes.items():
# 画像をコピーしてリサイズ
thumb = img.copy()
# リサイズ(アスペクト比を保持)
thumb.thumbnail(dimensions, Image.Resampling.LANCZOS)
# 出力パスを設定
if output_format == 'WEBP':
output_path = os.path.join(output_dir, f"{filename}_{size_name}.webp")
thumb.save(output_path, format='WEBP', quality=80, optimize=True)
else:
output_path = os.path.join(output_dir, f"{filename}_{size_name}{extension}")
thumb.save(output_path, optimize=True)
thumbnails[size_name] = output_path
except Exception as e:
print(f"サムネイル生成エラー: {e}")
return {}
return thumbnails
@staticmethod
def generate_smart_crop(image_path, output_path, size=(400, 400)):
"""
スマートクロップで正方形のサムネイルを生成
Args:
image_path: 元画像のパス
output_path: 出力パス
size: 出力サイズ(デフォルトは400x400)
"""
try:
with Image.open(image_path) as img:
img = ImageOps.exif_transpose(img)
# 画像の中心を基準に正方形にクロップ
width, height = img.size
if width > height:
# 横長の画像
left = (width - height) / 2
top = 0
right = left + height
bottom = height
else:
# 縦長の画像
left = 0
top = (height - width) / 2
right = width
bottom = top + width
# クロップ
img_cropped = img.crop((left, top, right, bottom))
# リサイズ
img_resized = img_cropped.resize(size, Image.Resampling.LANCZOS)
# 保存
img_resized.save(output_path, optimize=True)
except Exception as e:
print(f"スマートクロップエラー: {e}")
スマートクロップ機能では画像の縦横比に応じて中央を基準に正方形に切り出し、指定サイズにリサイズします。前回のImageProcessorが単一画像の保存や変換を担うのに対し、こちらは既存画像から複数サイズのサムネイルを効率的に量産する役割を担っています。
画像の表示最適化
Flaskアプリケーションでの画像処理統合
以下は、Flaskアプリケーションにおける画像アップロードと配信のコントローラーです。アップロードされた画像を検証し、ImageProcessorでリサイズした後、WebP形式に変換して保存することで軽量化を実現します。同時にThumbnailGeneratorで複数サイズのサムネイルを生成し、アップロード完了とともにサムネイル群が自動的に作成されます。
# app/routes/images.py
from flask import Blueprint, render_template, request, jsonify, current_app, send_file
from werkzeug.utils import secure_filename
import os
from app.utils.image_processor import ImageProcessor
from app.utils.thumbnail_generator import ThumbnailGenerator
from pathlib import Path
bp = Blueprint('images', __name__, url_prefix='/images')
def get_upload_folder():
"""アップロードフォルダのパスを取得"""
upload_folder = os.path.join(current_app.root_path, 'static', 'uploads')
Path(upload_folder).mkdir(parents=True, exist_ok=True)
return upload_folder
@bp.route('/upload', methods=['POST'])
def upload_image():
"""画像アップロード処理"""
if 'image' not in request.files:
return jsonify({'error': '画像ファイルがありません'}), 400
image_file = request.files['image']
if image_file.filename == '':
return jsonify({'error': 'ファイルが選択されていません'}), 400
# 画像プロセッサの初期化
upload_folder = get_upload_folder()
processor = ImageProcessor(upload_folder)
if not processor.allowed_file(image_file.filename):
return jsonify({'error': '許可されていないファイル形式です'}), 400
try:
# 画像をリサイズ
image_resized = processor.resize_image(image_file, max_size=(1200, 1200))
# ファイル名を生成
filename = processor.generate_filename(image_file.filename)
# 保存パス
filepath = os.path.join(upload_folder, filename)
# 画像を保存(WebP形式で最適化)
if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
image_resized.save(filepath, 'WEBP', quality=85, optimize=True)
filename = filename.rsplit('.', 1)[0] + '.webp'
filepath = os.path.join(upload_folder, filename)
else:
image_resized.save(filepath, optimize=True)
# サムネイルを生成
thumbnail_dir = os.path.join(upload_folder, 'thumbnails')
thumbnails = ThumbnailGenerator.generate_thumbnails(
filepath,
thumbnail_dir
)
return jsonify({
'success': True,
'filename': filename,
'thumbnails': list(thumbnails.keys())
}), 200
except Exception as e:
current_app.logger.error(f'画像アップロードエラー: {e}')
return jsonify({'error': '画像処理中にエラーが発生しました'}), 500
@bp.route('/serve/')
@bp.route('/serve//')
def serve_image(filename, size='original'):
"""最適化された画像を提供"""
upload_folder = get_upload_folder()
# セキュリティチェック
filename = secure_filename(filename)
if size == 'original':
# 元の画像を提供
filepath = os.path.join(upload_folder, filename)
else:
# サムネイルを提供
thumbnail_dir = os.path.join(upload_folder, 'thumbnails')
# ファイル名から拡張子を除去してサムネイル名を構築
name_without_ext = os.path.splitext(filename)[0]
thumbnail_name = f"{name_without_ext}_{size}.webp"
filepath = os.path.join(thumbnail_dir, thumbnail_name)
# サムネイルがなければ生成
if not os.path.exists(filepath):
original_path = os.path.join(upload_folder, filename)
if os.path.exists(original_path):
ThumbnailGenerator.generate_thumbnails(
original_path,
thumbnail_dir,
sizes={size: ThumbnailGenerator.THUMBNAIL_SIZES.get(size, (300, 300))}
)
if os.path.exists(filepath):
return send_file(filepath)
else:
return jsonify({'error': '画像が見つかりません'}), 404
@bp.route('/optimize', methods=['POST'])
def optimize_existing_images():
"""既存の画像を最適化(バッチ処理)"""
upload_folder = get_upload_folder()
try:
optimized_count = 0
for filename in os.listdir(upload_folder):
filepath = os.path.join(upload_folder, filename)
# ディレクトリはスキップ
if os.path.isdir(filepath):
continue
# 画像ファイルのみ処理
if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
with Image.open(filepath) as img:
# WebP形式に変換して保存
webp_filename = filename.rsplit('.', 1)[0] + '.webp'
webp_path = os.path.join(upload_folder, webp_filename)
# 画像の向きを修正
img = ImageOps.exif_transpose(img)
# サイズが大きすぎる場合はリサイズ
if img.size[0] > 2000 or img.size[1] > 2000:
img.thumbnail((2000, 2000), Image.Resampling.LANCZOS)
# WebPで保存(品質85%、最適化有効)
img.save(webp_path, 'WEBP', quality=85, optimize=True)
# サムネイルを生成
thumbnail_dir = os.path.join(upload_folder, 'thumbnails')
ThumbnailGenerator.generate_thumbnails(webp_path, thumbnail_dir)
optimized_count += 1
return jsonify({
'success': True,
'message': f'{optimized_count}枚の画像を最適化しました'
}), 200
except Exception as e:
current_app.logger.error(f'画像最適化エラー: {e}')
return jsonify({'error': '画像最適化中にエラーが発生しました'}), 500
画像配信エンドポイントは元画像とサムネイルを振り分け、要求されたサイズのサムネイルが未生成の場合はその場で生成するオンデマンド対応も備えています。さらに既存画像を一括最適化するバッチ処理機能も実装されており、巨大画像のリサイズやWebP変換、サムネイル生成をまとめて実行できます。ルーティング、バリデーション、エラーハンドリング、ログ出力まで含めた実践的な実装となっています。
HTMLテンプレートの例
フロントエンドインターフェースはBootstrapを活用したレスポンシブな二段構成で、左側にアップロードフォーム、右側にプレビューエリアを配置しています。
ユーザーは画像選択後、リサイズサイズの指定やサムネイル自動生成の有無を選択でき、フォーム送信は非同期で行われます。ファイル選択時には即座にブラウザ上でプレビューとファイルサイズが表示され、アップロード中はプログレスバーが動作します。
<!-- templates/images/upload.html -->
<!DOCTYPE html>
<html lang="ja">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>画像アップロードと処理</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
<style>
.image-preview {
max-width: 100%;
height: auto;
margin: 10px 0;
border: 1px solid #dee2e6;
border-radius: 5px;
}
.thumbnail-grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(150px, 1fr));
gap: 10px;
margin-top: 20px;
}
.thumbnail-item {
text-align: center;
}
.thumbnail-img {
width: 100%;
height: 150px;
object-fit: cover;
border-radius: 5px;
}
</style>
</head>
<body>
<div class="container mt-5">
<h1 class="mb-4">画像アップロードと処理</h1>
<div class="row">
<div class="col-md-6">
<div class="card">
<div class="card-header">
画像アップロード
</div>
<div class="card-body">
<form id="uploadForm" enctype="multipart/form-data">
<div class="mb-3">
<label for="imageFile" class="form-label">画像を選択</label>
<input class="form-control" type="file" id="imageFile" name="image" accept="image/*">
</div>
<div class="mb-3">
<label class="form-label">最大サイズ</label>
<select class="form-select" id="maxSize">
<option value="800,600">800×600px</option>
<option value="1200,900">1200×900px</option>
<option value="1920,1080">1920×1080px (HD)</option>
<option value="original">オリジナルサイズ</option>
</select>
</div>
<div class="form-check mb-3">
<input class="form-check-input" type="checkbox" id="generateThumbnails" checked>
<label class="form-check-label" for="generateThumbnails">
サムネイルを自動生成
</label>
</div>
<button type="submit" class="btn btn-primary" id="uploadBtn">
アップロードして処理
</button>
</form>
<div class="progress mt-3 d-none" id="uploadProgress">
<div class="progress-bar progress-bar-striped progress-bar-animated"
role="progressbar" style="width: 0%"></div>
</div>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card">
<div class="card-header">
プレビュー
</div>
<div class="card-body">
<div id="previewContainer">
<p class="text-muted">画像がアップロードされるとここに表示されます</p>
</div>
<div id="thumbnailContainer" class="d-none">
<h5 class="mt-3">生成されたサムネイル</h5>
<div class="thumbnail-grid" id="thumbnailGrid"></div>
</div>
</div>
</div>
</div>
</div>
<div class="row mt-4">
<div class="col-12">
<div class="card">
<div class="card-header">
画像最適化ツール
</div>
<div class="card-body">
<p class="card-text">
アップロード済みの画像をWebP形式に変換して最適化します。
ページ読み込み速度が向上します。
</p>
<button class="btn btn-warning" id="optimizeBtn">
既存画像を最適化
</button>
<div id="optimizeResult" class="mt-2"></div>
</div>
</div>
</div>
</div>
</div>
<script>
document.getElementById('uploadForm').addEventListener('submit', async function(e) {
e.preventDefault();
const fileInput = document.getElementById('imageFile');
const maxSize = document.getElementById('maxSize').value;
const generateThumbnails = document.getElementById('generateThumbnails').checked;
if (!fileInput.files[0]) {
alert('画像ファイルを選択してください');
return;
}
const formData = new FormData();
formData.append('image', fileInput.files[0]);
formData.append('max_size', maxSize);
formData.append('generate_thumbnails', generateThumbnails);
const uploadBtn = document.getElementById('uploadBtn');
const progressBar = document.getElementById('uploadProgress');
uploadBtn.disabled = true;
progressBar.classList.remove('d-none');
try {
const response = await fetch('/images/upload', {
method: 'POST',
body: formData
});
const result = await response.json();
if (response.ok) {
// プレビューを表示
const previewContainer = document.getElementById('previewContainer');
const thumbnailContainer = document.getElementById('thumbnailContainer');
const thumbnailGrid = document.getElementById('thumbnailGrid');
previewContainer.innerHTML = `
<img src="/images/serve/${result.filename}"
alt="アップロードした画像"
class="image-preview">
<p class="mt-2">
<strong>ファイル名:</strong> ${result.filename}<br>
<strong>サムネイル:</strong> ${result.thumbnails.length}種類生成されました
</p>
`;
// サムネイルを表示
if (result.thumbnails.length > 0) {
thumbnailGrid.innerHTML = '';
result.thumbnails.forEach(size => {
thumbnailGrid.innerHTML += `
<div class="thumbnail-item">
<img src="/images/serve/${size}/${result.filename}"
alt="${size}サムネイル"
class="thumbnail-img">
<small>${size}</small>
</div>
`;
});
thumbnailContainer.classList.remove('d-none');
}
alert('画像のアップロードと処理が成功しました!');
} else {
alert(`エラー: ${result.error}`);
}
} catch (error) {
alert('アップロード中にエラーが発生しました');
console.error(error);
} finally {
uploadBtn.disabled = false;
progressBar.classList.add('d-none');
}
});
document.getElementById('optimizeBtn').addEventListener('click', async function() {
const btn = this;
const resultDiv = document.getElementById('optimizeResult');
btn.disabled = true;
resultDiv.innerHTML = '<div class="spinner-border spinner-border-sm" role="status"></div> 処理中...';
try {
const response = await fetch('/images/optimize', {
method: 'POST'
});
const result = await response.json();
if (response.ok) {
resultDiv.innerHTML = `<div class="alert alert-success">${result.message}</div>`;
} else {
resultDiv.innerHTML = `<div class="alert alert-danger">エラー: ${result.error}</div>`;
}
} catch (error) {
resultDiv.innerHTML = '<div class="alert alert-danger">最適化中にエラーが発生しました</div>';
console.error(error);
} finally {
btn.disabled = false;
}
});
// ファイル選択時のプレビュー
document.getElementById('imageFile').addEventListener('change', function(e) {
const file = e.target.files[0];
if (file) {
const reader = new FileReader();
reader.onload = function(event) {
const previewContainer = document.getElementById('previewContainer');
previewContainer.innerHTML = `
<img src="${event.target.result}"
alt="選択した画像のプレビュー"
class="image-preview">
<p class="mt-2">
<strong>ファイル名:</strong> ${file.name}<br>
<strong>サイズ:</strong> ${(file.size / 1024).toFixed(2)} KB
</p>
`;
};
reader.readAsDataURL(file);
}
});
</script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"></script>
</body>
</html>
サーバーからの応答を受けると、処理済み画像とともに生成された複数サイズのサムネイルがグリッド表示されます。下部には既存画像を一括最適化するツールも配置され、処理中はスピナー表示で待機状態を明確に伝えます。全体としてユーザビリティと視覚的フィードバックを重視した実装になっています。
設定ファイルの追加
アプリケーション全体の設定を一元管理する構成ファイルです。SECRET_KEYや最大アップロードサイズなどの基本設定に加え、許可する画像形式やデフォルトのリサイズサイズ、サムネイルのサイズ定義など画像処理に特化した多数のパラメータを保持しています。
# config.py
import os
from pathlib import Path
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY') or 'your-secret-key-here'
# 画像関連の設定
MAX_CONTENT_LENGTH = 16 * 1024 * 1024 # 16MBまで
ALLOWED_IMAGE_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
# アップロード設定
UPLOAD_FOLDER = Path(__file__).parent / 'app' / 'static' / 'uploads'
THUMBNAIL_FOLDER = UPLOAD_FOLDER / 'thumbnails'
# 画像処理設定
DEFAULT_MAX_SIZE = (1200, 1200)
THUMBNAIL_SIZES = {
'xs': (100, 100),
'sm': (300, 300),
'md': (600, 600),
'lg': (1200, 1200)
}
# WebP変換設定
CONVERT_TO_WEBP = True
WEBP_QUALITY = 85
@staticmethod
def init_app(app):
# フォルダを作成
Config.UPLOAD_FOLDER.mkdir(parents=True, exist_ok=True)
Config.THUMBNAIL_FOLDER.mkdir(parents=True, exist_ok=True)
重要なのはUPLOAD_FOLDERとTHUMBNAIL_FOLDERをPathオブジェクトで宣言している点で、プロジェクトルートからの相対パスを動的に解決し、init_appメソッドで実際のディレクトリ作成まで自動実行します。
これによりImageProcessorやThumbnailGeneratorで個別にフォルダパスを記述する必要がなくなり、一箇所の変更で全体の動作を制御できます。環境変数からの読み込みとデフォルト値の併用、数値や辞書型での定数定義など、保守性と可読性を考慮した設計になっています。
アプリケーションのメインファイル
create_app関数がアプリケーションインスタンスを生成し、外部のConfigクラスから設定情報を読み込んで適用します。画像処理用のルートはブループリントとして登録され、モジュール性と関心の分離が図られています。
注目すべきはconfig_class.init_appメソッドの呼び出しで、アップロードディレクトリ作成などの初期化作業を設定ファイル側に委譲することで、アプリケーションの起動処理を簡潔に保っています。このファクトリーパターンにより環境ごとの設定切り替えやテストが容易になり、複数のアプリケーションインスタンス生成にも柔軟に対応できます。全体として設定とアプリケーション構築の責務が明確に分離された、拡張性の高い設計となっています。
# app/__init__.py
from flask import Flask
from config import Config
from app.routes.images import bp as images_bp
def create_app(config_class=Config):
app = Flask(__name__)
app.config.from_object(config_class)
# ブループリントの登録
app.register_blueprint(images_bp)
# 初期設定
config_class.init_app(app)
return app
アプリケーションの実行ファイル
わずか数行でありながら、先ほど定義されたcreate_appファクトリー関数を呼び出してFlaskアプリケーションインスタンスを生成し、開発サーバーを起動する役割を担っています。
# run.py
from app import create_app
app = create_app()
if __name__ == '__main__':
app.run(debug=True)
このシンプルな構造により、アプリケーションの起動処理と実際の実装が完全に分離され、保守性が高まっています。debugモードは開発中の利便性のために有効化されており、コード変更時の自動再読み込みや詳細なエラー表示が可能です。
まとめ
Flaskアプリケーションにおける画像処理機能の実装について学び、Pillowライブラリを用いた画像の読み込み、リサイズ、フォーマット変換、保存といった基本操作から、複数サイズのサムネイルを自動生成する方法、WebP形式への変換や適切なサイズでの提供、遅延読み込みといった画像表示の最適化テクニック、そして画像アップロードから処理、表示に至るまでのFlaskとの統合フローまでを包括的にカバーしました。このような画像処理を適切に実装することにより、ページ読み込み速度の向上、モバイルユーザーへの最適化、サーバー帯域幅の削減といった技術的利点に加え、ユーザー体験の向上という大きなメリットが得られます。
演習問題
初級問題(3問)
問題1:基本的な画像リサイズ
画像を指定した最大サイズにリサイズする関数を作成してください。アスペクト比は保持し、画像が指定サイズより小さい場合はリサイズしないようにしてください。
問題2:画像フォーマット変換
JPEG形式の画像をPNG形式に変換する関数を作成してください。変換時に透過情報が失われないように注意してください。
問題3:簡単な画像情報取得
画像ファイルから以下の情報を取得する関数を作成してください:
- 画像サイズ(幅×高さ)
- 画像モード(RGB、RGBAなど)
- ファイルサイズ
中級問題(6問)
問題4:画像の自動回転
スマートフォンで撮影した画像のExif情報を読み取り、自動的に正しい向きに回転させる機能を実装してください。
問題5:画像のウォーターマーク追加
画像の右下にテキストまたは画像のウォーターマークを追加する機能を作成してください。ウォーターマークの透過度も調整できるようにしてください。
問題6:画像の一括処理
指定したディレクトリ内のすべての画像ファイルを読み込み、一括でリサイズし、別のディレクトリに保存するバッチ処理機能を実装してください。
問題7:画像の圧縮品質調整
画像の圧縮品質を指定して保存する機能を作成してください。品質は1(最低)から100(最高)まで指定できるようにし、ファイルサイズと画質のバランスを調整できるようにしてください。
問題8:画像のトリミング機能
画像を指定した座標とサイズでトリミング(切り抜き)する機能を実装してください。座標とサイズはパラメータで指定できるようにしてください。
問題9:サムネイルキャッシュシステム
一度生成したサムネイルをキャッシュし、同じサイズのサムネイルが再度要求された時に再生成しないようにするシステムを実装してください。
上級問題(3問)
問題10:スマート画像最適化システム
画像の内容を分析し、最適な圧縮設定を自動的に決定するシステムを作成してください。例えば:
- 写真の場合は高品質で保存
- スクリーンショットの場合はロスレス圧縮
- 単色が多い画像の場合は色数を減らす
問題11:プログレッシブ画像表示システム
画像を複数の解像度でエンコードし、ページ読み込み時に低解像度から高解像度へと段階的に表示するシステムを実装してください。
問題12:クラウド画像処理パイプライン
画像アップロードから、複数サイズのサムネイル生成、CDNへの配信までを非同期で処理するパイプラインシステムを設計・実装してください。Celeryなどのタスクキューを使用して、バックグラウンドで処理を行うようにしてください。
演習問題 解答例
初級問題
問題1:基本的な画像リサイズ
from PIL import Image
import os
def resize_image_max_size(image_path, output_path, max_size=(800, 600)):
"""
画像を指定した最大サイズにリサイズ(アスペクト比保持)
Args:
image_path: 入力画像のパス
output_path: 出力画像のパス
max_size: 最大サイズ (width, height)
Returns:
bool: 成功したかどうか
"""
try:
with Image.open(image_path) as img:
# 現在のサイズを取得
current_width, current_height = img.size
max_width, max_height = max_size
# 現在のサイズが最大サイズより小さい場合はそのままコピー
if current_width <= max_width and current_height <= max_height:
img.save(output_path)
print(f"画像は既に指定サイズより小さいため、コピーしました")
return True
# アスペクト比を計算
aspect_ratio = current_width / current_height
target_aspect_ratio = max_width / max_height
if aspect_ratio > target_aspect_ratio:
# 横長の画像
new_width = max_width
new_height = int(max_width / aspect_ratio)
else:
# 縦長の画像
new_height = max_height
new_width = int(max_height * aspect_ratio)
# リサイズ
resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
resized_img.save(output_path, optimize=True)
print(f"画像を {current_width}x{current_height} から {new_width}x{new_height} にリサイズしました")
return True
except Exception as e:
print(f"画像リサイズエラー: {e}")
return False
# 使用例
if __name__ == "__main__":
resize_image_max_size("input.jpg", "output.jpg", max_size=(800, 600))
問題2:画像フォーマット変換
from PIL import Image
import os
def convert_jpeg_to_png(jpeg_path, png_path, preserve_transparency=True):
"""
JPEG画像をPNG形式に変換
Args:
jpeg_path: JPEG画像のパス
png_path: 出力PNG画像のパス
preserve_transparency: 透過情報を保持するか(JPEGには透過情報がないが、将来の拡張用)
Returns:
bool: 成功したかどうか
"""
try:
with Image.open(jpeg_path) as img:
# JPEGは通常RGBモード(透過情報なし)
if img.mode == 'RGB':
# RGBをRGBAに変換(完全不透明のアルファチャンネルを追加)
if preserve_transparency:
# アルファチャンネルを追加(完全不透明 = 255)
rgba_img = Image.new('RGBA', img.size)
rgba_img.paste(img, (0, 0))
rgba_img.save(png_path, 'PNG', optimize=True)
else:
img.save(png_path, 'PNG', optimize=True)
else:
# 他のモードの場合はそのまま保存
img.save(png_path, 'PNG', optimize=True)
# ファイルサイズを比較
jpeg_size = os.path.getsize(jpeg_path)
png_size = os.path.getsize(png_path)
print(f"変換完了: {jpeg_path} -> {png_path}")
print(f"ファイルサイズ: {jpeg_size/1024:.1f}KB -> {png_size/1024:.1f}KB")
return True
except Exception as e:
print(f"画像変換エラー: {e}")
return False
# 使用例
if __name__ == "__main__":
convert_jpeg_to_png("photo.jpg", "photo.png")
問題3:簡単な画像情報取得
from PIL import Image
import os
from datetime import datetime
def get_image_info(image_path):
"""
画像ファイルの情報を取得
Args:
image_path: 画像ファイルのパス
Returns:
dict: 画像情報の辞書
"""
info = {
'file_path': image_path,
'file_size': 0,
'file_format': None,
'image_size': None,
'image_mode': None,
'created_time': None,
'modified_time': None,
'exif_data': None
}
try:
# ファイル情報
if os.path.exists(image_path):
file_stats = os.stat(image_path)
info['file_size'] = file_stats.st_size
info['created_time'] = datetime.fromtimestamp(file_stats.st_ctime)
info['modified_time'] = datetime.fromtimestamp(file_stats.st_mtime)
# 画像情報
with Image.open(image_path) as img:
info['image_size'] = img.size # (width, height)
info['image_mode'] = img.mode
info['file_format'] = img.format
# Exif情報(存在する場合)
if hasattr(img, '_getexif') and img._getexif():
info['exif_data'] = img._getexif()
return info
except Exception as e:
print(f"画像情報取得エラー: {e}")
return info
def print_image_info(image_path):
"""
画像情報を読みやすい形式で表示
"""
info = get_image_info(image_path)
print("=" * 50)
print(f"画像情報: {info['file_path']}")
print("=" * 50)
if info['file_size'] > 0:
print(f"ファイルサイズ: {info['file_size']:,} バイト ({info['file_size']/1024:.1f} KB)")
if info['image_size']:
width, height = info['image_size']
print(f"画像サイズ: {width} × {height} ピクセル")
print(f"縦横比: {width/height:.2f}:1")
print(f"画像モード: {info['image_mode']}")
print(f"ファイル形式: {info['file_format']}")
if info['created_time']:
print(f"作成日時: {info['created_time'].strftime('%Y-%m-%d %H:%M:%S')}")
if info['modified_time']:
print(f"更新日時: {info['modified_time'].strftime('%Y-%m-%d %H:%M:%S')}")
# Exif情報から主要なデータを表示
if info['exif_data']:
print("\nExif情報:")
exif_tags = {
271: 'メーカー',
272: 'モデル',
306: '日時',
36867: '撮影日時',
36868: 'デジタル化日時',
37378: '露出時間',
37379: 'F値',
37380: 'ISO感度',
37383: '焦点距離',
37385: 'フラッシュ',
41987: 'ホワイトバランス'
}
for tag_id, tag_name in exif_tags.items():
if tag_id in info['exif_data']:
value = info['exif_data'][tag_id]
print(f" {tag_name}: {value}")
else:
print("ファイルが見つからないか、読み込みに失敗しました")
print("=" * 50)
# 使用例
if __name__ == "__main__":
print_image_info("sample.jpg")
中級問題
問題4:画像の自動回転
from PIL import Image, ImageOps
import os
def auto_rotate_image(input_path, output_path=None):
"""
Exif情報を読み取り、画像を自動回転
Args:
input_path: 入力画像パス
output_path: 出力画像パス(Noneの場合は入力ファイルを上書き)
Returns:
bool: 回転が行われたかどうか
"""
if output_path is None:
output_path = input_path
try:
with Image.open(input_path) as img:
# Exif情報を取得
exif = img.getexif()
# 画像の向きを自動修正
rotated_img = ImageOps.exif_transpose(img)
# 回転が行われたかチェック
was_rotated = rotated_img.size != img.size
# 保存
rotated_img.save(output_path, exif=exif, optimize=True)
if was_rotated:
print(f"画像を自動回転しました: {input_path}")
else:
print(f"回転の必要はありませんでした: {input_path}")
return was_rotated
except Exception as e:
print(f"自動回転エラー: {e}")
return False
def batch_auto_rotate(directory, recursive=True, extensions=None):
"""
ディレクトリ内の全画像を自動回転
Args:
directory: 対象ディレクトリ
recursive: サブディレクトリも含めるか
extensions: 処理する拡張子のリスト(Noneなら全ての画像)
"""
if extensions is None:
extensions = ['.jpg', '.jpeg', '.png', '.tiff', '.bmp']
rotated_count = 0
total_count = 0
for root, dirs, files in os.walk(directory):
for file in files:
if any(file.lower().endswith(ext) for ext in extensions):
input_path = os.path.join(root, file)
total_count += 1
if auto_rotate_image(input_path):
rotated_count += 1
if not recursive:
break
print(f"\n処理完了: {total_count}ファイル中{rotated_count}ファイルを回転しました")
# 使用例
if __name__ == "__main__":
# 単一ファイル
auto_rotate_image("photo.jpg", "photo_rotated.jpg")
# バッチ処理
batch_auto_rotate("./photos", recursive=True)
問題5:画像のウォーターマーク追加
from PIL import Image, ImageDraw, ImageFont, ImageEnhance
import os
def add_text_watermark(input_path, output_path, text,
position='bottom-right', opacity=0.7,
font_size=20, font_color=(255, 255, 255, 128)):
"""
画像にテキストウォーターマークを追加
Args:
input_path: 入力画像パス
output_path: 出力画像パス
text: ウォーターマークテキスト
position: 位置 ('top-left', 'top-right', 'bottom-left', 'bottom-right', 'center')
opacity: 不透明度 (0.0〜1.0)
font_size: フォントサイズ
font_color: フォント色 (R, G, B, A)
Returns:
bool: 成功したかどうか
"""
try:
with Image.open(input_path).convert('RGBA') as base_img:
# テキストレイヤーを作成
txt_layer = Image.new('RGBA', base_img.size, (255, 255, 255, 0))
draw = ImageDraw.Draw(txt_layer)
# フォントの読み込み(システムフォントを使用)
try:
font = ImageFont.truetype("arial.ttf", font_size)
except:
# システムフォントがなければデフォルトフォントを使用
font = ImageFont.load_default()
# テキストサイズを計算
text_bbox = draw.textbbox((0, 0), text, font=font)
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
# 位置を計算
margin = 20
if position == 'top-left':
x = margin
y = margin
elif position == 'top-right':
x = base_img.width - text_width - margin
y = margin
elif position == 'bottom-left':
x = margin
y = base_img.height - text_height - margin
elif position == 'bottom-right':
x = base_img.width - text_width - margin
y = base_img.height - text_height - margin
elif position == 'center':
x = (base_img.width - text_width) // 2
y = (base_img.height - text_height) // 2
else:
x = margin
y = margin
# 影を描画(見やすくするため)
draw.text((x+2, y+2), text, font=font,
fill=(0, 0, 0, int(font_color[3] * opacity)))
# メインテキストを描画
draw.text((x, y), text, font=font, fill=font_color)
# 不透明度を調整
if opacity < 1.0:
alpha = txt_layer.split()[3]
alpha = ImageEnhance.Brightness(alpha).enhance(opacity)
txt_layer.putalpha(alpha)
# 画像に合成
watermarked = Image.alpha_composite(base_img, txt_layer)
# RGBに変換して保存(JPEG用)
if output_path.lower().endswith(('.jpg', '.jpeg')):
watermarked = watermarked.convert('RGB')
watermarked.save(output_path, optimize=True)
print(f"ウォーターマークを追加しました: {output_path}")
return True
except Exception as e:
print(f"ウォーターマーク追加エラー: {e}")
return False
def add_image_watermark(input_path, output_path, watermark_path,
position='bottom-right', opacity=0.5, scale=0.2):
"""
画像に画像ウォーターマークを追加
Args:
input_path: 入力画像パス
output_path: 出力画像パス
watermark_path: ウォーターマーク画像パス
position: 位置
opacity: 不透明度 (0.0〜1.0)
scale: ウォーターマークのスケール(元画像に対する比率)
Returns:
bool: 成功したかどうか
"""
try:
# ベース画像を開く
with Image.open(input_path).convert('RGBA') as base_img:
# ウォーターマーク画像を開く
with Image.open(watermark_path).convert('RGBA') as watermark_img:
# ウォーターマークをリサイズ
new_width = int(base_img.width * scale)
aspect_ratio = watermark_img.height / watermark_img.width
new_height = int(new_width * aspect_ratio)
watermark_resized = watermark_img.resize((new_width, new_height),
Image.Resampling.LANCZOS)
# 不透明度を調整
if opacity < 1.0:
alpha = watermark_resized.split()[3]
alpha = ImageEnhance.Brightness(alpha).enhance(opacity)
watermark_resized.putalpha(alpha)
# 位置を計算
margin = 10
if position == 'top-left':
x = margin
y = margin
elif position == 'top-right':
x = base_img.width - new_width - margin
y = margin
elif position == 'bottom-left':
x = margin
y = base_img.height - new_height - margin
elif position == 'bottom-right':
x = base_img.width - new_width - margin
y = base_img.height - new_height - margin
elif position == 'center':
x = (base_img.width - new_width) // 2
y = (base_img.height - new_height) // 2
elif position == 'tile':
# タイル状に繰り返し
watermarked = base_img.copy()
for x in range(0, base_img.width, new_width):
for y in range(0, base_img.height, new_height):
watermarked.alpha_composite(watermark_resized, (x, y))
# RGBに変換して保存
if output_path.lower().endswith(('.jpg', '.jpeg')):
watermarked = watermarked.convert('RGB')
watermarked.save(output_path, optimize=True)
print(f"タイルウォーターマークを追加しました: {output_path}")
return True
else:
x = margin
y = margin
# 画像に合成
base_img.alpha_composite(watermark_resized, (x, y))
# RGBに変換して保存
if output_path.lower().endswith(('.jpg', '.jpeg')):
base_img = base_img.convert('RGB')
base_img.save(output_path, optimize=True)
print(f"画像ウォーターマークを追加しました: {output_path}")
return True
except Exception as e:
print(f"画像ウォーターマーク追加エラー: {e}")
return False
# 使用例
if __name__ == "__main__":
# テキストウォーターマーク
add_text_watermark(
input_path="photo.jpg",
output_path="photo_watermarked.jpg",
text="© 2024 My Company",
position='bottom-right',
opacity=0.7,
font_size=24,
font_color=(255, 255, 255, 180)
)
# 画像ウォーターマーク
add_image_watermark(
input_path="photo.jpg",
output_path="photo_logo_watermarked.jpg",
watermark_path="logo.png",
position='bottom-right',
opacity=0.5,
scale=0.1
)
問題6:画像の一括処理
from PIL import Image, ImageOps
import os
from pathlib import Path
import concurrent.futures
from tqdm import tqdm
class BatchImageProcessor:
def __init__(self, max_workers=4):
self.max_workers = max_workers
self.supported_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}
def process_single_image(self, input_path, output_path, max_size=None,
format=None, quality=85, rotate=True):
"""
単一画像を処理
Args:
input_path: 入力画像パス
output_path: 出力画像パス
max_size: 最大サイズ (width, height) または None
format: 出力形式 ('JPEG', 'PNG', 'WEBP') または None(自動)
quality: 圧縮品質 (1-100)
rotate: Exifに基づき自動回転するか
Returns:
tuple: (成功したか, エラーメッセージ)
"""
try:
with Image.open(input_path) as img:
# 自動回転
if rotate:
img = ImageOps.exif_transpose(img)
# リサイズ
if max_size:
img.thumbnail(max_size, Image.Resampling.LANCZOS)
# 出力形式を決定
if format is None:
# 拡張子から形式を判断
ext = Path(output_path).suffix.lower()
if ext == '.jpg' or ext == '.jpeg':
format = 'JPEG'
elif ext == '.png':
format = 'PNG'
elif ext == '.webp':
format = 'WEBP'
else:
format = 'JPEG' # デフォルト
# 形式に応じた保存オプション
save_kwargs = {'optimize': True}
if format == 'JPEG':
# JPEGはRGBモードが必要
if img.mode in ('RGBA', 'LA', 'P'):
# アルファチャンネルがある場合は白背景で合成
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'RGBA':
background.paste(img, mask=img.split()[-1])
else:
background.paste(img, (0, 0))
img = background
save_kwargs['quality'] = quality
elif format == 'WEBP':
save_kwargs['quality'] = quality
# 保存
img.save(output_path, format=format, **save_kwargs)
return True, ""
except Exception as e:
return False, str(e)
def process_directory(self, input_dir, output_dir,
max_size=None, format=None, quality=85,
recursive=True, overwrite=False):
"""
ディレクトリ内の全画像を処理
Args:
input_dir: 入力ディレクトリ
output_dir: 出力ディレクトリ
max_size: 最大サイズ
format: 出力形式
quality: 圧縮品質
recursive: サブディレクトリも含めるか
overwrite: 既存ファイルを上書きするか
Returns:
dict: 処理結果の統計
"""
input_path = Path(input_dir)
output_path = Path(output_dir)
# 出力ディレクトリを作成
output_path.mkdir(parents=True, exist_ok=True)
# 処理対象ファイルを収集
files_to_process = []
if recursive:
walk_iter = input_path.rglob('*')
else:
walk_iter = input_path.glob('*')
for item in walk_iter:
if item.is_file() and item.suffix.lower() in self.supported_extensions:
# 相対パスを計算
rel_path = item.relative_to(input_path)
# 出力パスを構築
out_file = output_path / rel_path
# 必要に応じて出力ディレクトリを作成
out_file.parent.mkdir(parents=True, exist_ok=True)
# フォーマット指定がある場合は拡張子を変更
if format:
out_file = out_file.with_suffix(f'.{format.lower()}')
# 既存ファイルチェック
if not overwrite and out_file.exists():
continue
files_to_process.append((item, out_file))
# 処理結果の統計
stats = {
'total': len(files_to_process),
'success': 0,
'failed': 0,
'errors': []
}
# プログレスバー
print(f"処理対象: {stats['total']}ファイル")
print(f"出力先: {output_dir}")
# マルチスレッド処理
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# タスクを実行
futures = []
for in_file, out_file in files_to_process:
future = executor.submit(
self.process_single_image,
str(in_file), str(out_file),
max_size, format, quality
)
futures.append((future, in_file, out_file))
# 結果を収集
for future, in_file, out_file in tqdm(futures, desc="処理中"):
success, error = future.result()
if success:
stats['success'] += 1
else:
stats['failed'] += 1
stats['errors'].append({
'file': str(in_file),
'error': error
})
# 結果を表示
print("\n" + "="*50)
print("バッチ処理完了")
print(f"成功: {stats['success']}ファイル")
print(f"失敗: {stats['failed']}ファイル")
if stats['errors']:
print("\nエラー詳細:")
for error in stats['errors'][:5]: # 最初の5件のみ表示
print(f" {error['file']}: {error['error']}")
if len(stats['errors']) > 5:
print(f" ... 他 {len(stats['errors']) - 5}件のエラー")
return stats
# 使用例
if __name__ == "__main__":
processor = BatchImageProcessor(max_workers=4)
stats = processor.process_directory(
input_dir="./raw_photos",
output_dir="./processed_photos",
max_size=(1920, 1080), # Full HDサイズにリサイズ
format='WEBP', # WebP形式で保存
quality=80, # 品質80%
recursive=True, # サブディレクトリも含む
overwrite=False # 既存ファイルは上書きしない
)
問題7:画像の圧縮品質調整
from PIL import Image
import os
from pathlib import Path
import io
class ImageCompressor:
def __init__(self):
self.format_settings = {
'JPEG': {'extensions': ['.jpg', '.jpeg'], 'default_quality': 85},
'WEBP': {'extensions': ['.webp'], 'default_quality': 80},
'PNG': {'extensions': ['.png'], 'default_quality': None} # PNGは品質設定なし
}
def compress_image(self, input_path, output_path=None, quality=None,
max_file_size=None, target_format=None):
"""
画像を圧縮
Args:
input_path: 入力画像パス
output_path: 出力画像パス(Noneの場合は入力ファイルを上書き)
quality: 圧縮品質 (1-100、Noneの場合はデフォルト)
max_file_size: 最大ファイルサイズ(バイト、オプション)
target_format: 出力形式(Noneの場合は入力形式を維持)
Returns:
dict: 圧縮結果
"""
if output_path is None:
output_path = input_path
try:
with Image.open(input_path) as img:
# 元のファイル情報
original_format = img.format
original_mode = img.mode
# 出力形式を決定
if target_format is None:
target_format = original_format or 'JPEG'
# 品質を決定
if quality is None:
if target_format in self.format_settings:
quality = self.format_settings[target_format]['default_quality']
else:
quality = 85
# 品質を範囲内に収める
quality = max(1, min(100, quality))
# 保存オプション
save_kwargs = {}
if target_format == 'JPEG':
# JPEGはRGBモードが必要
if img.mode in ('RGBA', 'LA', 'P'):
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'RGBA':
background.paste(img, mask=img.split()[-1])
else:
background.paste(img, (0, 0))
img = background
save_kwargs = {
'format': 'JPEG',
'quality': quality,
'optimize': True,
'progressive': True # プログレッシブJPEG
}
elif target_format == 'WEBP':
save_kwargs = {
'format': 'WEBP',
'quality': quality,
'method': 6, # 圧縮方法 (0-6、大きいほど高圧縮)
'lossless': False
}
elif target_format == 'PNG':
save_kwargs = {
'format': 'PNG',
'optimize': True,
'compress_level': 9 # 圧縮レベル (0-9、大きいほど高圧縮)
}
# 最大ファイルサイズ指定がある場合
if max_file_size:
# バイナリストリームに保存してサイズ確認
buffer = io.BytesIO()
img.save(buffer, **save_kwargs)
current_size = buffer.getbuffer().nbytes
# サイズが大きすぎる場合、品質を下げて再試行
if current_size > max_file_size:
print(f"ファイルサイズが大きすぎます: {current_size}バイト")
print(f"目標サイズ: {max_file_size}バイト")
if target_format in ['JPEG', 'WEBP']:
# バイナリサーチで最適な品質を探す
low, high = 1, quality
best_quality = low
for _ in range(10): # 最大10回試行
mid = (low + high) // 2
save_kwargs['quality'] = mid
buffer = io.BytesIO()
img.save(buffer, **save_kwargs)
current_size = buffer.getbuffer().nbytes
if current_size <= max_file_size:
best_quality = mid
low = mid + 1
else:
high = mid - 1
save_kwargs['quality'] = best_quality
print(f"最適な品質に調整: {best_quality}")
# ファイルを保存
img.save(output_path, **save_kwargs)
# 結果を比較
original_size = os.path.getsize(input_path)
compressed_size = os.path.getsize(output_path)
result = {
'success': True,
'original_size': original_size,
'compressed_size': compressed_size,
'reduction_percent': ((original_size - compressed_size) / original_size * 100)
if original_size > 0 else 0,
'original_format': original_format,
'compressed_format': target_format,
'quality_used': save_kwargs.get('quality', 'N/A')
}
print(f"圧縮完了: {input_path} -> {output_path}")
print(f" 元のサイズ: {original_size/1024:.1f} KB")
print(f" 圧縮サイズ: {compressed_size/1024:.1f} KB")
print(f" 削減率: {result['reduction_percent']:.1f}%")
return result
except Exception as e:
print(f"圧縮エラー: {e}")
return {'success': False, 'error': str(e)}
def find_optimal_quality(self, input_path, target_size_kb, max_iterations=10):
"""
目標ファイルサイズを達成する最適な品質を探す
Args:
input_path: 入力画像パス
target_size_kb: 目標ファイルサイズ(KB)
max_iterations: 最大試行回数
Returns:
dict: 最適な設定
"""
target_size_bytes = target_size_kb * 1024
with Image.open(input_path) as img:
# JPEG/WEBP用の品質探索
low, high = 1, 95
best_settings = None
for i in range(max_iterations):
mid = (low + high) // 2
# メモリ上で圧縮してサイズを計測
buffer = io.BytesIO()
if img.mode in ('RGBA', 'LA', 'P'):
# RGBAをRGBに変換
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'RGBA':
background.paste(img, mask=img.split()[-1])
else:
background.paste(img, (0, 0))
temp_img = background
else:
temp_img = img
temp_img.save(buffer, format='JPEG', quality=mid, optimize=True)
current_size = buffer.getbuffer().nbytes
print(f"試行 {i+1}: 品質={mid}, サイズ={current_size/1024:.1f}KB")
if abs(current_size - target_size_bytes) < target_size_bytes * 0.05: # 5%以内
best_settings = {'quality': mid, 'size': current_size}
break
elif current_size > target_size_bytes:
high = mid - 1
else:
low = mid + 1
best_settings = {'quality': mid, 'size': current_size}
return best_settings
# 使用例
if __name__ == "__main__":
compressor = ImageCompressor()
# 基本的な圧縮
result = compressor.compress_image(
input_path="large_photo.jpg",
output_path="compressed_photo.jpg",
quality=75
)
# 最大ファイルサイズ指定
result = compressor.compress_image(
input_path="large_photo.jpg",
output_path="optimized_photo.jpg",
max_file_size=200*1024 # 200KB以下に
)
# 最適な品質を探索
optimal = compressor.find_optimal_quality(
input_path="large_photo.jpg",
target_size_kb=150 # 150KB以下を目指す
)
print(f"最適な品質: {optimal['quality']}")
問題8:画像のトリミング機能
from PIL import Image
import os
class ImageCropper:
def __init__(self):
self.crop_modes = ['manual', 'center', 'face', 'salient']
def crop_image(self, input_path, output_path, crop_params):
"""
画像をトリミング
Args:
input_path: 入力画像パス
output_path: 出力画像パス
crop_params: トリミングパラメータの辞書
Returns:
bool: 成功したかどうか
"""
try:
with Image.open(input_path) as img:
width, height = img.size
# トリミングモードに応じて座標を計算
mode = crop_params.get('mode', 'manual')
if mode == 'manual':
# 手動で座標を指定
left = crop_params.get('left', 0)
top = crop_params.get('top', 0)
right = crop_params.get('right', width)
bottom = crop_params.get('bottom', height)
# 座標を画像範囲内に収める
left = max(0, min(left, width))
top = max(0, min(top, height))
right = max(left, min(right, width))
bottom = max(top, min(bottom, height))
elif mode == 'center':
# 中心から指定サイズでトリミング
crop_width = crop_params.get('width', min(width, height))
crop_height = crop_params.get('height', crop_width)
# 中心座標を計算
left = (width - crop_width) // 2
top = (height - crop_height) // 2
right = left + crop_width
bottom = top + crop_height
# 範囲内に収める
left = max(0, left)
top = max(0, top)
right = min(width, right)
bottom = min(height, bottom)
elif mode == 'aspect_ratio':
# 指定したアスペクト比でトリミング
target_ratio = crop_params.get('aspect_ratio', 1.0) # 幅/高さ
if width / height > target_ratio:
# 画像が横長 → 幅を調整
new_width = int(height * target_ratio)
left = (width - new_width) // 2
top = 0
right = left + new_width
bottom = height
else:
# 画像が縦長 → 高さを調整
new_height = int(width / target_ratio)
left = 0
top = (height - new_height) // 2
right = width
bottom = top + new_height
else:
raise ValueError(f"未知のモード: {mode}")
# トリミング実行
cropped_img = img.crop((left, top, right, bottom))
# 出力サイズ指定がある場合はリサイズ
output_width = crop_params.get('output_width')
output_height = crop_params.get('output_height')
if output_width and output_height:
cropped_img = cropped_img.resize(
(output_width, output_height),
Image.Resampling.LANCZOS
)
# 保存
cropped_img.save(output_path, optimize=True)
print(f"トリミング完了: {output_path}")
print(f" 元のサイズ: {width}x{height}")
print(f" トリミング後: {cropped_img.size[0]}x{cropped_img.size[1]}")
print(f" トリミング領域: ({left},{top})-({right},{bottom})")
return True
except Exception as e:
print(f"トリミングエラー: {e}")
return False
def batch_crop_by_aspect(self, input_dir, output_dir, aspect_ratios,
min_size=100, recursive=True):
"""
アスペクト比ごとにバッチトリミング
Args:
input_dir: 入力ディレクトリ
output_dir: 出力ディレクトリ
aspect_ratios: アスペクト比のリスト [(name, ratio), ...]
min_size: 最小サイズ
recursive: サブディレクトリを含むか
"""
import glob
# ファイルパターン
patterns = ['*.jpg', '*.jpeg', '*.png', '*.gif', '*.bmp']
if recursive:
search_paths = []
for pattern in patterns:
search_paths.extend(glob.glob(os.path.join(input_dir, '**', pattern), recursive=True))
else:
search_paths = []
for pattern in patterns:
search_paths.extend(glob.glob(os.path.join(input_dir, pattern)))
total_processed = 0
for input_path in search_paths:
try:
with Image.open(input_path) as img:
width, height = img.size
# 最小サイズチェック
if width < min_size or height < min_size:
continue
# 各アスペクト比でトリミング
for name, ratio in aspect_ratios:
# 出力パスを生成
rel_path = os.path.relpath(input_path, input_dir)
base_name = os.path.splitext(rel_path)[0]
output_filename = f"{base_name}_{name}.jpg"
output_path = os.path.join(output_dir, output_filename)
# 出力ディレクトリを作成
os.makedirs(os.path.dirname(output_path), exist_ok=True)
# トリミング実行
self.crop_image(
input_path=input_path,
output_path=output_path,
crop_params={
'mode': 'aspect_ratio',
'aspect_ratio': ratio
}
)
total_processed += 1
except Exception as e:
print(f"処理エラー ({input_path}): {e}")
print(f"\nバッチトリミング完了: {total_processed}ファイルを処理")
# 使用例
if __name__ == "__main__":
cropper = ImageCropper()
# 手動トリミング
cropper.crop_image(
input_path="photo.jpg",
output_path="photo_cropped.jpg",
crop_params={
'mode': 'manual',
'left': 100,
'top': 50,
'right': 700,
'bottom': 500
}
)
# 中心トリミング
cropper.crop_image(
input_path="photo.jpg",
output_path="photo_center_crop.jpg",
crop_params={
'mode': 'center',
'width': 400,
'height': 400
}
)
# アスペクト比トリミング
cropper.crop_image(
input_path="photo.jpg",
output_path="photo_16_9.jpg",
crop_params={
'mode': 'aspect_ratio',
'aspect_ratio': 16/9
}
)
# バッチ処理
aspect_ratios = [
('square', 1.0), # 正方形
('portrait', 2/3), # ポートレート (2:3)
('landscape', 16/9), # ランドスケープ (16:9)
('instagram', 4/5), # Instagram縦長 (4:5)
]
cropper.batch_crop_by_aspect(
input_dir="./photos",
output_dir="./cropped_photos",
aspect_ratios=aspect_ratios,
min_size=300,
recursive=True
)
問題9:サムネイルキャッシュシステム
from PIL import Image, ImageOps
import os
import hashlib
import json
from datetime import datetime, timedelta
from pathlib import Path
import shutil
class ThumbnailCache:
def __init__(self, cache_dir='./thumbnail_cache', max_cache_size_mb=500):
"""
サムネイルキャッシュシステム
Args:
cache_dir: キャッシュディレクトリ
max_cache_size_mb: 最大キャッシュサイズ(MB)
"""
self.cache_dir = Path(cache_dir)
self.max_cache_size = max_cache_size_mb * 1024 * 1024 # バイトに変換
self.metadata_file = self.cache_dir / 'metadata.json'
# キャッシュディレクトリを作成
self.cache_dir.mkdir(parents=True, exist_ok=True)
# メタデータを読み込み
self.metadata = self._load_metadata()
def _load_metadata(self):
"""メタデータを読み込む"""
if self.metadata_file.exists():
try:
with open(self.metadata_file, 'r', encoding='utf-8') as f:
return json.load(f)
except:
return {}
return {}
def _save_metadata(self):
"""メタデータを保存する"""
try:
with open(self.metadata_file, 'w', encoding='utf-8') as f:
json.dump(self.metadata, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"メタデータ保存エラー: {e}")
def _generate_cache_key(self, image_path, size, crop_mode=None):
"""
キャッシュキーを生成
Args:
image_path: 画像ファイルパス
size: サムネイルサイズ (width, height)
crop_mode: クロップモード
Returns:
str: キャッシュキー
"""
# ファイルの最終更新時間とサイズを取得
try:
stat = os.stat(image_path)
file_info = f"{image_path}:{stat.st_mtime}:{stat.st_size}"
except:
file_info = image_path
# パラメータを含めてハッシュを生成
params = f"{file_info}:{size[0]}x{size[1]}"
if crop_mode:
params += f":{crop_mode}"
return hashlib.md5(params.encode('utf-8')).hexdigest()
def _get_cache_path(self, cache_key, extension='.webp'):
"""キャッシュファイルのパスを取得"""
# キャッシュキーの最初の2文字をディレクトリ名に使用
sub_dir = self.cache_dir / cache_key[:2]
sub_dir.mkdir(exist_ok=True)
return sub_dir / f"{cache_key}{extension}"
def _cleanup_cache(self):
"""キャッシュをクリーンアップ"""
try:
# キャッシュサイズを計算
total_size = 0
cache_files = []
for file_path in self.cache_dir.rglob('*'):
if file_path.is_file() and file_path.suffix in ['.webp', '.jpg', '.png']:
file_size = file_path.stat().st_size
total_size += file_size
# 最終アクセス時間を取得
access_time = file_path.stat().st_atime
cache_files.append({
'path': file_path,
'size': file_size,
'access_time': access_time
})
# 最大サイズを超えている場合、古いファイルから削除
if total_size > self.max_cache_size:
print(f"キャッシュクリーンアップ: {total_size/1024/1024:.1f}MB > {self.max_cache_size/1024/1024:.1f}MB")
# アクセス時間でソート(古い順)
cache_files.sort(key=lambda x: x['access_time'])
freed_size = 0
files_deleted = 0
for cache_file in cache_files:
if total_size - freed_size <= self.max_cache_size * 0.9: # 90%まで削減
break
try:
# メタデータからも削除
cache_key = cache_file['path'].stem
if cache_key in self.metadata:
del self.metadata[cache_key]
# ファイルを削除
freed_size += cache_file['size']
files_deleted += 1
cache_file['path'].unlink()
except Exception as e:
print(f"キャッシュファイル削除エラー: {e}")
self._save_metadata()
print(f" 削除ファイル数: {files_deleted}")
print(f" 解放サイズ: {freed_size/1024/1024:.1f}MB")
except Exception as e:
print(f"キャッシュクリーンアップエラー: {e}")
def get_thumbnail(self, image_path, size=(300, 300), crop_mode='fit',
force_regenerate=False, max_age_days=30):
"""
サムネイルを取得(キャッシュがあればそれを使用)
Args:
image_path: 元画像のパス
size: サムネイルサイズ (width, height)
crop_mode: 'fit'(アスペクト比保持), 'fill'(指定サイズで埋める), 'crop'(中心でクロップ)
force_regenerate: 強制的に再生成するか
max_age_days: キャッシュの最大有効日数
Returns:
Path: サムネイルファイルのパス または None
"""
# 元画像の存在確認
if not os.path.exists(image_path):
print(f"元画像が存在しません: {image_path}")
return None
# キャッシュキーを生成
cache_key = self._generate_cache_key(image_path, size, crop_mode)
cache_path = self._get_cache_path(cache_key)
# 強制再生成でなければキャッシュをチェック
if not force_regenerate and cache_path.exists():
# キャッシュの有効期限をチェック
cache_mtime = cache_path.stat().st_mtime
cache_age = datetime.now() - datetime.fromtimestamp(cache_mtime)
# メタデータをチェック
if cache_key in self.metadata:
metadata = self.metadata[cache_key]
# 元画像が変更されていないかチェック
try:
original_mtime = os.stat(image_path).st_mtime
if (metadata.get('original_mtime') == original_mtime and
cache_age.days < max_age_days):
# キャッシュヒット - アクセス時間を更新
os.utime(cache_path, None)
print(f"キャッシュヒット: {cache_key}")
return cache_path
except:
pass
# キャッシュがないか無効の場合、サムネイルを生成
print(f"キャッシュミス、生成します: {cache_key}")
thumbnail = self._generate_thumbnail(image_path, size, crop_mode)
if thumbnail:
# サムネイルをキャッシュに保存
try:
# 最適な形式で保存(WebP)
thumbnail.save(cache_path, 'WEBP', quality=80, optimize=True)
# メタデータを更新
original_mtime = os.stat(image_path).st_mtime
self.metadata[cache_key] = {
'original_path': str(image_path),
'original_mtime': original_mtime,
'size': size,
'crop_mode': crop_mode,
'generated_at': datetime.now().isoformat(),
'cache_path': str(cache_path)
}
self._save_metadata()
# キャッシュクリーンアップを定期的に実行
if len(self.metadata) % 100 == 0:
self._cleanup_cache()
return cache_path
except Exception as e:
print(f"キャッシュ保存エラー: {e}")
return None
return None
def _generate_thumbnail(self, image_path, size, crop_mode):
"""サムネイルを生成"""
try:
with Image.open(image_path) as img:
# 画像の向きを自動修正
img = ImageOps.exif_transpose(img)
width, height = img.size
target_width, target_height = size
if crop_mode == 'fit':
# アスペクト比を保持してリサイズ
img.thumbnail(size, Image.Resampling.LANCZOS)
elif crop_mode == 'fill':
# 指定サイズで埋める(アスペクト比を無視)
img = img.resize(size, Image.Resampling.LANCZOS)
elif crop_mode == 'crop':
# アスペクト比を保持してリサイズ後、中央をクロップ
# まずはリサイズ(長辺を基準)
img_copy = img.copy()
img_copy.thumbnail(size, Image.Resampling.LANCZOS)
# 中央をクロップ
thumb_width, thumb_height = img_copy.size
if thumb_width != target_width or thumb_height != target_height:
left = (thumb_width - target_width) // 2
top = (thumb_height - target_height) // 2
right = left + target_width
bottom = top + target_height
# クロップ範囲を調整
left = max(0, left)
top = max(0, top)
right = min(thumb_width, right)
bottom = min(thumb_height, bottom)
img = img_copy.crop((left, top, right, bottom))
else:
img = img_copy
else:
raise ValueError(f"未知のクロップモード: {crop_mode}")
return img
except Exception as e:
print(f"サムネイル生成エラー: {e}")
return None
def clear_cache(self, older_than_days=None):
"""
キャッシュをクリア
Args:
older_than_days: 指定日数より古いキャッシュのみ削除(Noneなら全て)
"""
try:
deleted_count = 0
deleted_size = 0
for file_path in self.cache_dir.rglob('*'):
if file_path.is_file() and file_path.suffix in ['.webp', '.jpg', '.png']:
# 日付フィルタ
if older_than_days is not None:
file_age = datetime.now() - datetime.fromtimestamp(file_path.stat().st_mtime)
if file_age.days < older_than_days:
continue
try:
file_size = file_path.stat().st_size
file_path.unlink()
deleted_count += 1
deleted_size += file_size
except:
pass
# メタデータもクリア
self.metadata = {}
self._save_metadata()
print(f"キャッシュをクリアしました")
print(f" 削除ファイル数: {deleted_count}")
print(f" 解放サイズ: {deleted_size/1024/1024:.1f}MB")
except Exception as e:
print(f"キャッシュクリアエラー: {e}")
def get_cache_stats(self):
"""キャッシュの統計情報を取得"""
try:
total_size = 0
file_count = 0
oldest_file = None
newest_file = None
for file_path in self.cache_dir.rglob('*'):
if file_path.is_file() and file_path.suffix in ['.webp', '.jpg', '.png']:
file_count += 1
total_size += file_path.stat().st_size
mtime = file_path.stat().st_mtime
file_age = datetime.fromtimestamp(mtime)
if oldest_file is None or mtime < oldest_file[0]:
oldest_file = (mtime, file_path)
if newest_file is None or mtime > newest_file[0]:
newest_file = (mtime, file_path)
return {
'file_count': file_count,
'total_size_mb': total_size / 1024 / 1024,
'max_size_mb': self.max_cache_size / 1024 / 1024,
'metadata_entries': len(self.metadata),
'oldest_file': oldest_file[1] if oldest_file else None,
'newest_file': newest_file[1] if newest_file else None
}
except Exception as e:
print(f"キャッシュ統計取得エラー: {e}")
return {}
# 使用例
if __name__ == "__main__":
# キャッシュシステムの初期化
cache = ThumbnailCache(
cache_dir='./thumbnail_cache',
max_cache_size_mb=500
)
# サムネイルを取得(キャッシュがあれば使用)
thumbnail_path = cache.get_thumbnail(
image_path="large_photo.jpg",
size=(300, 300),
crop_mode='fit',
force_regenerate=False
)
if thumbnail_path:
print(f"サムネイルパス: {thumbnail_path}")
# 複数サイズのサムネイルを一度に取得
sizes = [(100, 100), (300, 300), (600, 600)]
for size in sizes:
thumb = cache.get_thumbnail("large_photo.jpg", size, 'fit')
if thumb:
print(f"{size} サムネイル: {thumb}")
# キャッシュ統計を表示
stats = cache.get_cache_stats()
print("\nキャッシュ統計:")
print(f" ファイル数: {stats['file_count']}")
print(f" 合計サイズ: {stats['total_size_mb']:.1f}MB / {stats['max_size_mb']:.1f}MB")
print(f" メタデータエントリ数: {stats['metadata_entries']}")
# 古いキャッシュをクリア
cache.clear_cache(older_than_days=30)
上級問題
問題10:スマート画像最適化システム
from PIL import Image, ImageStat, ImageFilter, ImageOps
import os
import numpy as np
from pathlib import Path
import json
from datetime import datetime
class SmartImageOptimizer:
def __init__(self, config_file=None):
"""
スマート画像最適化システム
Args:
config_file: 設定ファイルのパス
"""
self.config = self._load_config(config_file)
# 画像タイプの分類基準
self.image_types = {
'photograph': {
'description': '自然な写真',
'compression': {'quality': 85, 'method': 6},
'format': 'WEBP',
'resize_threshold': 4000 # 4000px以上ならリサイズ
},
'screenshot': {
'description': 'スクリーンショット・画面キャプチャ',
'compression': {'quality': 95, 'method': 4},
'format': 'PNG', # ロスレスが好ましい
'resize_threshold': 1920
},
'graphic': {
'description': 'グラフィック・ロゴ・図形',
'compression': {'quality': 100, 'method': 0},
'format': 'PNG',
'resize_threshold': 2000
},
'text': {
'description': 'テキスト中心の画像',
'compression': {'quality': 95, 'method': 5},
'format': 'WEBP',
'resize_threshold': 1600
},
'mixed': {
'description': '混合コンテンツ',
'compression': {'quality': 90, 'method': 5},
'format': 'WEBP',
'resize_threshold': 3000
}
}
def _load_config(self, config_file):
"""設定を読み込む"""
default_config = {
'max_output_size_mb': 5,
'default_quality': 85,
'enable_auto_rotation': True,
'preserve_metadata': True,
'strip_metadata': False,
'target_formats': ['WEBP', 'JPEG', 'PNG'],
'color_reduction_threshold': 64 # 色数がこの値以下なら減色を検討
}
if config_file and os.path.exists(config_file):
try:
with open(config_file, 'r', encoding='utf-8') as f:
user_config = json.load(f)
default_config.update(user_config)
except Exception as e:
print(f"設定ファイル読み込みエラー: {e}")
return default_config
def analyze_image(self, image_path):
"""
画像を分析して特徴を抽出
Returns:
dict: 画像分析結果
"""
analysis = {
'file_size': 0,
'dimensions': None,
'format': None,
'mode': None,
'is_photograph': False,
'is_screenshot': False,
'color_count': 0,
'entropy': 0.0,
'sharpness': 0.0,
'texture_complexity': 0.0,
'dominant_colors': [],
'estimated_type': 'unknown'
}
try:
with Image.open(image_path) as img:
# 基本情報
analysis['file_size'] = os.path.getsize(image_path)
analysis['dimensions'] = img.size
analysis['format'] = img.format
analysis['mode'] = img.mode
# RGBに変換(分析用)
if img.mode != 'RGB':
img_rgb = img.convert('RGB')
else:
img_rgb = img
# 画像を配列に変換
img_array = np.array(img_rgb)
# 1. 色数の推定(簡易版)
# 画像を縮小して色数を数える
small_img = img_rgb.resize((100, 100), Image.Resampling.LANCZOS)
colors = small_img.getcolors(maxcolors=10000)
if colors:
analysis['color_count'] = len(colors)
# 2. エントロピー(複雑さ)の計算
# ヒストグラムから計算
histogram = img_rgb.histogram()
total_pixels = img.size[0] * img.size[1]
entropy = 0.0
for count in histogram:
if count > 0:
probability = count / total_pixels / 3 # 3チャンネル
entropy -= probability * np.log2(probability)
analysis['entropy'] = entropy
# 3. シャープネス(鮮鋭度)の計算
# エッジ検出フィルタを適用
gray_img = img_rgb.convert('L')
edges = gray_img.filter(ImageFilter.FIND_EDGES())
edge_array = np.array(edges)
analysis['sharpness'] = np.mean(edge_array)
# 4. テクスチャの複雑さ
# 標準偏差から計算
std_r = np.std(img_array[:,:,0])
std_g = np.std(img_array[:,:,1])
std_b = np.std(img_array[:,:,2])
analysis['texture_complexity'] = (std_r + std_g + std_b) / 3
# 5. ドミナントカラー(簡易版)
# k-meansの代わりにヒストグラムから
for i in range(3): # RGB各チャンネル
channel_hist = img_array[:,:,i].flatten()
unique, counts = np.unique(channel_hist, return_counts=True)
dominant_idx = np.argmax(counts)
analysis['dominant_colors'].append(int(unique[dominant_idx]))
# 6. 画像タイプの推定
analysis['estimated_type'] = self._estimate_image_type(analysis)
return analysis
except Exception as e:
print(f"画像分析エラー: {e}")
return analysis
def _estimate_image_type(self, analysis):
"""分析結果から画像タイプを推定"""
entropy = analysis['entropy']
sharpness = analysis['sharpness']
color_count = analysis['color_count']
texture = analysis['texture_complexity']
# ヒューリスティックなルールベース分類
if color_count < 50:
return 'graphic'
elif sharpness > 50 and texture > 40:
# エッジが多く、テクスチャが複雑 → スクリーンショットの可能性
return 'screenshot'
elif 5 < entropy < 8 and color_count > 100:
# 中程度のエントロピー、多くの色 → 写真
return 'photograph'
elif entropy < 5 and color_count < 200:
# 低エントロピー、少ない色 → テキストやグラフィック
# さらにシャープネスで区別
if sharpness > 30:
return 'text'
else:
return 'graphic'
else:
return 'mixed'
def optimize_image(self, input_path, output_path=None,
target_size_mb=None, analysis=None):
"""
画像をスマートに最適化
Args:
input_path: 入力画像パス
output_path: 出力画像パス(Noneなら入力と同じ場所)
target_size_mb: 目標ファイルサイズ(MB)
analysis: 事前分析結果(Noneなら分析実行)
Returns:
dict: 最適化結果
"""
if output_path is None:
output_path = input_path
if analysis is None:
analysis = self.analyze_image(input_path)
try:
with Image.open(input_path) as img:
original_size = os.path.getsize(input_path)
original_format = img.format
# 自動回転
if self.config['enable_auto_rotation']:
img = ImageOps.exif_transpose(img)
# 画像タイプに基づいた設定を取得
image_type = analysis['estimated_type']
settings = self.image_types.get(image_type, self.image_types['mixed'])
# リサイズの必要性を判断
width, height = img.size
resize_threshold = settings['resize_threshold']
if max(width, height) > resize_threshold:
# リサイズが必要
scale_factor = resize_threshold / max(width, height)
new_width = int(width * scale_factor)
new_height = int(height * scale_factor)
img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
print(f"リサイズ: {width}x{height} -> {new_width}x{new_height}")
# 色数削減の検討
if (analysis['color_count'] < self.config['color_reduction_threshold'] and
image_type in ['graphic', 'text']):
# インデックスカラーモードに変換
if img.mode in ['RGB', 'RGBA']:
img = img.convert('P', palette=Image.Palette.ADAPTIVE, colors=256)
# 出力形式を決定
output_format = settings['format']
# 保存オプション
save_kwargs = {}
if output_format == 'WEBP':
save_kwargs = {
'format': 'WEBP',
'quality': settings['compression']['quality'],
'method': settings['compression']['method']
}
elif output_format == 'JPEG':
# RGBモードに変換
if img.mode in ('RGBA', 'LA', 'P'):
background = Image.new('RGB', img.size, (255, 255, 255))
background.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None)
img = background
save_kwargs = {
'format': 'JPEG',
'quality': settings['compression']['quality'],
'optimize': True,
'progressive': True
}
elif output_format == 'PNG':
save_kwargs = {
'format': 'PNG',
'optimize': True,
'compress_level': 9
}
# 目標ファイルサイズがある場合、品質調整
if target_size_mb:
target_size_bytes = target_size_mb * 1024 * 1024
if output_format in ['WEBP', 'JPEG']:
# バイナリサーチで最適な品質を探す
optimal_quality = self._find_optimal_quality(
img, output_format, target_size_bytes, save_kwargs
)
save_kwargs['quality'] = optimal_quality
# メタデータ処理
if not self.config['strip_metadata'] and self.config['preserve_metadata']:
# Exif情報を保持
exif = img.getexif()
if exif:
save_kwargs['exif'] = exif
# 保存
img.save(output_path, **save_kwargs)
# 結果の比較
optimized_size = os.path.getsize(output_path)
reduction = ((original_size - optimized_size) / original_size * 100)
if original_size > 0 else 0
result = {
'success': True,
'original_size_mb': original_size / 1024 / 1024,
'optimized_size_mb': optimized_size / 1024 / 1024,
'reduction_percent': reduction,
'original_format': original_format,
'optimized_format': output_format,
'image_type': image_type,
'settings_used': settings,
'analysis': analysis
}
self._print_optimization_result(result)
return result
except Exception as e:
print(f"最適化エラー: {e}")
return {'success': False, 'error': str(e)}
def _find_optimal_quality(self, img, format, target_size_bytes, save_kwargs):
"""目標ファイルサイズを達成する最適な品質を探す"""
import io
low, high = 1, save_kwargs.get('quality', 95)
best_quality = low
for _ in range(10): # 最大10回試行
mid = (low + high) // 2
buffer = io.BytesIO()
temp_kwargs = save_kwargs.copy()
temp_kwargs['quality'] = mid
# 一時的な保存
if format == 'JPEG' and img.mode in ('RGBA', 'LA', 'P'):
temp_img = img.convert('RGB')
temp_img.save(buffer, **temp_kwargs)
else:
img.save(buffer, **temp_kwargs)
current_size = buffer.getbuffer().nbytes
if current_size <= target_size_bytes:
best_quality = mid
low = mid + 1
else:
high = mid - 1
return best_quality
def _print_optimization_result(self, result):
"""最適化結果を表示"""
print("\n" + "="*60)
print("スマート画像最適化結果")
print("="*60)
if result['success']:
print(f"画像タイプ: {result['image_type']}")
print(f" 分析: {self.image_types[result['image_type']]['description']}")
print(f"ファイルサイズ:")
print(f" 元: {result['original_size_mb']:.2f} MB")
print(f" 最適化後: {result['optimized_size_mb']:.2f} MB")
print(f" 削減率: {result['reduction_percent']:.1f}%")
print(f"フォーマット:")
print(f" 元: {result['original_format']}")
print(f" 最適化後: {result['optimized_format']}")
if 'analysis' in result:
analysis = result['analysis']
print(f"分析結果:")
print(f" 解像度: {analysis['dimensions'][0]}×{analysis['dimensions'][1]}")
print(f" 推定色数: {analysis['color_count']}")
print(f" エントロピー: {analysis['entropy']:.2f}")
print(f" シャープネス: {analysis['sharpness']:.1f}")
else:
print(f"エラー: {result.get('error', '不明なエラー')}")
print("="*60)
def batch_optimize(self, input_dir, output_dir=None, recursive=True):
"""
ディレクトリ内の全画像をバッチ最適化
Args:
input_dir: 入力ディレクトリ
output_dir: 出力ディレクトリ(Noneなら入力と同じ)
recursive: サブディレクトリを含むか
"""
from pathlib import Path
input_path = Path(input_dir)
if output_dir is None:
output_path = input_path.parent / f"{input_path.name}_optimized"
else:
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# ファイルを収集
extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp']
if recursive:
file_iter = input_path.rglob('*')
else:
file_iter = input_path.glob('*')
files_to_process = []
for item in file_iter:
if item.is_file() and item.suffix.lower() in extensions:
files_to_process.append(item)
print(f"処理対象: {len(files_to_process)}ファイル")
results = {
'total': len(files_to_process),
'success': 0,
'failed': 0,
'total_reduction_mb': 0,
'by_type': {}
}
for input_file in files_to_process:
try:
# 出力パスを構築
rel_path = input_file.relative_to(input_path)
output_file = output_path / rel_path
output_file.parent.mkdir(parents=True, exist_ok=True)
# 分析と最適化
analysis = self.analyze_image(str(input_file))
result = self.optimize_image(
input_path=str(input_file),
output_path=str(output_file),
analysis=analysis
)
if result['success']:
results['success'] += 1
reduction_mb = result['original_size_mb'] - result['optimized_size_mb']
results['total_reduction_mb'] += reduction_mb
# タイプ別統計
img_type = result['image_type']
if img_type not in results['by_type']:
results['by_type'][img_type] = {
'count': 0,
'total_reduction_mb': 0
}
results['by_type'][img_type]['count'] += 1
results['by_type'][img_type]['total_reduction_mb'] += reduction_mb
else:
results['failed'] += 1
print(f"処理: {rel_path}")
except Exception as e:
print(f"処理エラー ({input_file}): {e}")
results['failed'] += 1
# 結果の要約
print("\n" + "="*60)
print("バッチ最適化完了")
print("="*60)
print(f"総ファイル数: {results['total']}")
print(f"成功: {results['success']}")
print(f"失敗: {results['failed']}")
print(f"合計削減サイズ: {results['total_reduction_mb']:.2f} MB")
if results['by_type']:
print("\n画像タイプ別統計:")
for img_type, stats in results['by_type'].items():
desc = self.image_types.get(img_type, {}).get('description', '不明')
print(f" {img_type} ({desc}):")
print(f" 件数: {stats['count']}")
print(f" 削減サイズ: {stats['total_reduction_mb']:.2f} MB")
if stats['count'] > 0:
avg_reduction = stats['total_reduction_mb'] / stats['count']
print(f" 平均削減/ファイル: {avg_reduction:.2f} MB")
return results
# 使用例
if __name__ == "__main__":
# オプティマイザの初期化
optimizer = SmartImageOptimizer()
# 単一ファイルの分析
analysis = optimizer.analyze_image("sample_photo.jpg")
print(f"分析結果: {analysis['estimated_type']}")
# 単一ファイルの最適化
result = optimizer.optimize_image(
input_path="sample_photo.jpg",
output_path="sample_photo_optimized.webp",
target_size_mb=1.0 # 1MB以下を目指す
)
# バッチ最適化
results = optimizer.batch_optimize(
input_dir="./raw_images",
output_dir="./optimized_images",
recursive=True
)
問題11:プログレッシブ画像表示システム
了解です。**Python 全体を Prism にそのまま貼れる形**(`…`)で、**HTMLとして崩れないように全面エスケープ**しました。
※ご要望どおり、**Python 内の `=>`(アロー)は置換していません**(そのまま表示されます)。
```html
from PIL import Image, ImageOps
import os
import base64
import io
from pathlib import Path
import json
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import time
class ProgressiveImageGenerator:
def __init__(self, cache_dir='./progressive_cache'):
"""
プログレッシブ画像表示システム
Args:
cache_dir: キャッシュディレクトリ
"""
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
# プログレッシブ表示用の品質レベル
self.quality_levels = [
{'quality': 10, 'size_factor': 0.1, 'blur': 5}, # レベル1: 非常に低品質
{'quality': 30, 'size_factor': 0.3, 'blur': 3}, # レベル2: 低品質
{'quality': 50, 'size_factor': 0.5, 'blur': 2}, # レベル3: 中品質
{'quality': 70, 'size_factor': 0.7, 'blur': 1}, # レベル4: 中高品質
{'quality': 90, 'size_factor': 1.0, 'blur': 0}, # レベル5: 高品質
]
def generate_progressive_layers(self, image_path, output_dir=None):
"""
プログレッシブ表示用の複数品質レイヤーを生成
Args:
image_path: 元画像のパス
output_dir: 出力ディレクトリ(Noneならキャッシュを使用)
Returns:
dict: 生成されたレイヤー情報
"""
if output_dir is None:
output_dir = self.cache_dir
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# 元画像のハッシュを計算(キャッシュキーとして使用)
with open(image_path, 'rb') as f:
file_hash = base64.urlsafe_b64encode(
os.urandom(16) # 簡易的なハッシュ
).decode('utf-8')[:8]
layers_info = {
'original': image_path,
'hash': file_hash,
'layers': [],
'total_size': 0,
'generated_at': time.time()
}
try:
with Image.open(image_path) as original_img:
# 自動回転
original_img = ImageOps.exif_transpose(original_img)
original_width, original_height = original_img.size
for i, level in enumerate(self.quality_levels):
# 一時的な画像を作成
layer_img = original_img.copy()
# サイズ縮小
size_factor = level['size_factor']
if size_factor < 1.0:
new_width = int(original_width * size_factor)
new_height = int(original_height * size_factor)
layer_img = layer_img.resize(
(new_width, new_height),
Image.Resampling.LANCZOS
)
# ぼかし適用(低品質レイヤーのみ)
if level['blur'] > 0:
layer_img = layer_img.filter(
ImageFilter.GaussianBlur(level['blur'])
)
# レイヤーを保存
layer_filename = f"{file_hash}_layer_{i}.webp"
layer_path = output_path / layer_filename
layer_img.save(
layer_path,
format='WEBP',
quality=level['quality'],
method=6 # 高圧縮
)
# レイヤー情報を記録
layer_size = layer_path.stat().st_size
layers_info['layers'].append({
'level': i,
'path': str(layer_path),
'filename': layer_filename,
'quality': level['quality'],
'size_factor': size_factor,
'blur': level['blur'],
'width': layer_img.width,
'height': layer_img.height,
'size_bytes': layer_size
})
layers_info['total_size'] += layer_size
print(f"レイヤー {i} 生成: {layer_img.width}x{layer_img.height}, "
f"品質 {level['quality']}, {layer_size/1024:.1f}KB")
# 最終的な高品質画像も保存
final_filename = f"{file_hash}_final.webp"
final_path = output_path / final_filename
original_img.save(
final_path,
format='WEBP',
quality=95,
method=6
)
final_size = final_path.stat().st_size
layers_info['final'] = {
'path': str(final_path),
'filename': final_filename,
'size_bytes': final_size
}
layers_info['total_size'] += final_size
# メタデータを保存
metadata_path = output_path / f"{file_hash}_metadata.json"
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(layers_info, f, indent=2, ensure_ascii=False)
print(f"プログレッシブ画像生成完了: {len(layers_info['layers'])}レイヤー")
print(f"合計サイズ: {layers_info['total_size']/1024:.1f}KB")
return layers_info
except Exception as e:
print(f"プログレッシブ画像生成エラー: {e}")
return None
def generate_data_urls(self, image_path, max_layers=None):
"""
プログレッシブ表示用のData URLを生成(小規模サイト用)
Args:
image_path: 画像パス
max_layers: 最大レイヤー数(Noneなら全て)
Returns:
dict: Data URLを含む情報
"""
if max_layers is None:
max_layers = len(self.quality_levels)
data_urls = {
'original_size': (0, 0),
'layers': []
}
try:
with Image.open(image_path) as img:
data_urls['original_size'] = img.size
for i in range(min(max_layers, len(self.quality_levels))):
level = self.quality_levels[i]
# 一時的な画像を作成
temp_img = img.copy()
# サイズ縮小
size_factor = level['size_factor']
if size_factor < 1.0:
width, height = img.size
new_width = int(width * size_factor)
new_height = int(height * size_factor)
temp_img = temp_img.resize(
(new_width, new_height),
Image.Resampling.LANCZOS
)
# バイトデータに変換
buffer = io.BytesIO()
temp_img.save(
buffer,
format='WEBP',
quality=level['quality'],
method=6
)
# Base64エンコードしてData URLを作成
base64_data = base64.b64encode(buffer.getvalue()).decode('utf-8')
data_url = f"data:image/webp;base64,{base64_data}"
data_urls['layers'].append({
'level': i,
'data_url': data_url[:100] + "..." if len(data_url) > 100 else data_url,
'data_url_length': len(data_url),
'size': temp_img.size,
'quality': level['quality']
})
print(f"レイヤー {i} Data URL生成: {len(data_url)}文字")
return data_urls
except Exception as e:
print(f"Data URL生成エラー: {e}")
return None
def create_html_demo(self, image_path, output_html_path):
"""
プログレッシブ読み込みのデモHTMLを生成
Args:
image_path: 画像パス
output_html_path: 出力HTMLパス
"""
# レイヤーを生成
layers_info = self.generate_progressive_layers(image_path)
if not layers_info:
print("レイヤー生成に失敗しました")
return
html_content = f"""
<!DOCTYPE html>
<html lang="ja">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>プログレッシブ画像読み込みデモ</title>
<style>
body {{
font-family: Arial, sans-serif;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
background-color: #f5f5f5;
}}
h1, h2 {{
color: #333;
}}
.demo-container {{
display: flex;
flex-wrap: wrap;
gap: 20px;
margin-bottom: 30px;
}}
.image-box {{
flex: 1;
min-width: 300px;
background: white;
padding: 15px;
border-radius: 8px;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
}}
.image-box h3 {{
margin-top: 0;
color: #555;
}}
.progressive-image {{
width: 100%;
height: 300px;
object-fit: contain;
background: linear-gradient(45deg, #eee 25%, transparent 25%),
linear-gradient(-45deg, #eee 25%, transparent 25%),
linear-gradient(45deg, transparent 75%, #eee 75%),
linear-gradient(-45deg, transparent 75%, #eee 75%);
background-size: 20px 20px;
background-position: 0 0, 0 10px, 10px -10px, -10px 0px;
}}
.loading-bar {{
width: 100%;
height: 4px;
background: #ddd;
margin: 10px 0;
overflow: hidden;
}}
.loading-progress {{
height: 100%;
background: #4CAF50;
width: 0%;
transition: width 0.3s;
}}
.stats {{
font-size: 12px;
color: #666;
margin-top: 10px;
}}
.control-panel {{
background: white;
padding: 20px;
border-radius: 8px;
margin-bottom: 20px;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
}}
.layer-info {{
background: white;
padding: 15px;
border-radius: 8px;
margin-top: 20px;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
}}
table {{
width: 100%;
border-collapse: collapse;
}}
th, td {{
padding: 8px;
text-align: left;
border-bottom: 1px solid #ddd;
}}
th {{
background-color: #f2f2f2;
}}
</style>
</head>
<body>
<h1>プログレッシブ画像読み込みデモ</h1>
<div class="control-panel">
<h2>制御パネル</h2>
<div>
<label for="networkSpeed">ネットワーク速度シミュレーション:</label>
<select id="networkSpeed">
<option value="50">3G (50KB/s)</option>
<option value="200">4G (200KB/s)</option>
<option value="1000" selected>WiFi (1MB/s)</option>
<option value="5000">高速 (5MB/s)</option>
</select>
<button id="startDemo">デモ開始</button>
<button id="resetDemo">リセット</button>
</div>
</div>
<div class="demo-container">
<div class="image-box">
<h3>従来の読み込み方式</h3>
<div class="loading-bar">
<div id="traditionalProgress" class="loading-progress"></div>
</div>
<img id="traditionalImage" class="progressive-image"
src="{layers_info['final']['path']}"
style="display: none;">
<div class="stats" id="traditionalStats"></div>
</div>
<div class="image-box">
<h3>プログレッシブ読み込み方式</h3>
<div class="loading-bar">
<div id="progressiveProgress" class="loading-progress"></div>
</div>
<img id="progressiveImage" class="progressive-image"
src="" style="display: none;">
<div class="stats" id="progressiveStats"></div>
</div>
</div>
<div class="layer-info">
<h2>レイヤー詳細情報</h2>
<table>
<tr>
<th>レベル</th>
<th>解像度</th>
<th>品質</th>
<th>サイズ</th>
<th>概要</th>
</tr>
"""
# レイヤー情報のテーブル行を追加
for i, layer in enumerate(layers_info['layers']):
html_content += f"""
<tr>
<td>レベル {layer['level']}</td>
<td>{layer['width']} × {layer['height']}</td>
<td>{layer['quality']}</td>
<td>{layer['size_bytes']/1024:.1f} KB</td>
<td>{"初期表示用" if i == 0 else "段階的改善用"}</td>
</tr>
"""
html_content += f"""
<tr>
<td><strong>最終画像</strong></td>
<td>{layers_info['original_size']}</td>
<td>95</td>
<td>{layers_info['final']['size_bytes']/1024:.1f} KB</td>
<td>高品質最終画像</td>
</tr>
</table>
</div>
<script>
const layers = {json.dumps(layers_info['layers'], ensure_ascii=False)};
const finalImage = "{layers_info['final']['path']}";
let traditionalLoaded = false;
let progressiveLoaded = false;
let currentLayer = 0;
document.getElementById('startDemo').addEventListener('click', startDemo);
document.getElementById('resetDemo').addEventListener('click', resetDemo);
function startDemo() {{
const speed = parseInt(document.getElementById('networkSpeed').value);
resetDemo();
// 従来方式のシミュレーション
simulateTraditionalLoad(speed);
// プログレッシブ方式のシミュレーション
simulateProgressiveLoad(speed);
}}
function resetDemo() {{
traditionalLoaded = false;
progressiveLoaded = false;
currentLayer = 0;
document.getElementById('traditionalProgress').style.width = '0%';
document.getElementById('progressiveProgress').style.width = '0%';
document.getElementById('traditionalImage').style.display = 'none';
document.getElementById('progressiveImage').style.display = 'none';
document.getElementById('traditionalStats').innerHTML = '';
document.getElementById('progressiveStats').innerHTML = '';
}}
function simulateTraditionalLoad(speed) {{
const finalSize = {layers_info['final']['size_bytes']};
const loadTime = (finalSize / speed) / 1000; // 秒単位
let progress = 0;
const interval = setInterval(() => {{
progress += 1;
document.getElementById('traditionalProgress').style.width = progress + '%';
document.getElementById('traditionalStats').innerHTML =
`読み込み中... ${{progress}}% (${{(finalSize * progress / 100 / 1024).toFixed(1)}}KB/${{(finalSize / 1024).toFixed(1)}}KB)`;
if (progress >= 100) {{
clearInterval(interval);
traditionalLoaded = true;
document.getElementById('traditionalImage').style.display = 'block';
document.getElementById('traditionalStats').innerHTML =
`読み込み完了: ${{loadTime.toFixed(1)}}秒 (${{(finalSize / 1024).toFixed(1)}}KB)`;
updateComparison();
}}
}}, loadTime * 10);
}}
function simulateProgressiveLoad(speed) {{
let totalLoaded = 0;
const totalSize = layers.reduce((sum, layer) => sum + layer.size_bytes, 0);
function loadNextLayer() {{
if (currentLayer >= layers.length) {{
// 最終画像をロード
loadFinalImage(speed);
return;
}}
const layer = layers[currentLayer];
const layerTime = (layer.size_bytes / speed) / 1000;
setTimeout(() => {{
totalLoaded += layer.size_bytes;
const progress = (totalLoaded / totalSize) * 100;
document.getElementById('progressiveProgress').style.width = progress + '%';
document.getElementById('progressiveImage').src = layer.path;
document.getElementById('progressiveImage').style.display = 'block';
document.getElementById('progressiveStats').innerHTML =
`レベル ${{currentLayer}} 読み込み完了 (${{layer.size_bytes/1024:.1f}}KB)<br>
累計: ${{(totalLoaded/1024).toFixed(1)}}KB/${{(totalSize/1024).toFixed(1)}}KB`;
currentLayer++;
loadNextLayer();
}}, layerTime * 1000);
}}
function loadFinalImage(speed) {{
const finalSize = {layers_info['final']['size_bytes']};
const finalTime = (finalSize / speed) / 1000;
setTimeout(() => {{
totalLoaded += finalSize;
const progress = 100;
document.getElementById('progressiveProgress').style.width = progress + '%';
document.getElementById('progressiveImage').src = finalImage;
document.getElementById('progressiveStats').innerHTML =
`最終画像読み込み完了<br>
総時間: ${{((totalLoaded - finalSize) / speed / 1000 + finalTime).toFixed(1)}}秒<br>
総データ量: ${{(totalLoaded/1024).toFixed(1)}}KB`;
progressiveLoaded = true;
updateComparison();
}}, finalTime * 1000);
}}
loadNextLayer();
}}
function updateComparison() {{
if (traditionalLoaded && progressiveLoaded) {{
const traditionalImg = document.getElementById('traditionalImage');
const progressiveImg = document.getElementById('progressiveImage');
// 両方読み込み完了時の比較表示
console.log('両方の読み込みが完了しました');
}}
}}
</script>
</body>
</html>
"""
# HTMLファイルを保存
with open(output_html_path, 'w', encoding='utf-8') as f:
f.write(html_content)
print(f"デモHTMLを生成しました: {output_html_path}")
return output_html_path
class ProgressiveImageServer:
"""プログレッシブ画像配信用の簡易HTTPサーバー"""
def __init__(self, port=8080, image_dir='.'):
self.port = port
self.image_dir = Path(image_dir)
self.generator = ProgressiveImageGenerator()
def start(self):
"""サーバーを起動"""
handler = self._create_handler()
server = HTTPServer(('localhost', self.port), handler)
print(f"サーバーを起動しました: http://localhost:{self.port}")
print(f"画像ディレクトリ: {self.image_dir}")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\nサーバーを停止しました")
def _create_handler(self):
"""HTTPリクエストハンドラを作成"""
generator = self.generator
image_dir = self.image_dir
class ProgressiveImageHandler(BaseHTTPRequestHandler):
def do_GET(self):
# パスを解析
path = self.path
if path == '/':
# インデックスページ
self._send_index()
elif path.startswith('/generate/'):
# 画像生成リクエスト
filename = path.split('/')[-1]
image_path = image_dir / filename
if image_path.exists():
layers_info = generator.generate_progressive_layers(str(image_path))
self._send_json(layers_info)
else:
self._send_error(404, "File not found")
elif path.startswith('/image/'):
# 画像ファイルの配信
filename = path.split('/')[-1]
file_path = generator.cache_dir / filename
if file_path.exists():
self._send_file(file_path)
else:
self._send_error(404, "File not found")
else:
self._send_error(404, "Not found")
def _send_index(self):
"""インデックスページを送信"""
html = """
<!DOCTYPE html>
<html>
<head>
<title>プログレッシブ画像サーバー</title>
<style>
body { font-family: Arial, sans-serif; padding: 20px; }
.image-list { display: grid; grid-template-columns: repeat(auto-fill, minmax(200px, 1fr)); gap: 20px; }
.image-item { border: 1px solid #ddd; padding: 10px; border-radius: 5px; }
.image-item img { width: 100%; height: 150px; object-fit: cover; }
</style>
</head>
<body>
<h1>プログレッシブ画像サーバー</h1>
<div class="image-list" id="imageList"></div>
<script>
// 画像リストを取得
fetch('/list')
.then(response => response.json())
.then(images => {
const container = document.getElementById('imageList');
images.forEach(image => {
const item = document.createElement('div');
item.className = 'image-item';
item.innerHTML = `
<img src="/preview/${image}" alt="${image}">
<h4>${image}</h4>
<button onclick="generateProgressive('${image}')">プログレッシブ生成</button>
`;
container.appendChild(item);
});
});
function generateProgressive(filename) {
fetch(`/generate/${filename}`)
.then(response => response.json())
.then(data => {
alert(`生成完了: ${data.layers.length}レイヤー`);
});
}
</script>
</body>
</html>
"""
self.send_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.end_headers()
self.wfile.write(html.encode('utf-8'))
def _send_json(self, data):
"""JSONデータを送信"""
self.send_response(200)
self.send_header('Content-type', 'application/json; charset=utf-8')
self.end_headers()
self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8'))
def _send_file(self, file_path):
"""ファイルを送信"""
self.send_response(200)
# コンテントタイプを設定
if file_path.suffix == '.webp':
self.send_header('Content-type', 'image/webp')
elif file_path.suffix == '.json':
self.send_header('Content-type', 'application/json')
else:
self.send_header('Content-type', 'application/octet-stream')
# ファイルサイズ
file_size = file_path.stat().st_size
self.send_header('Content-Length', str(file_size))
self.end_headers()
# ファイルを送信
with open(file_path, 'rb') as f:
self.wfile.write(f.read())
def _send_error(self, code, message):
"""エラーレスポンスを送信"""
self.send_response(code)
self.send_header('Content-type', 'text/plain; charset=utf-8')
self.end_headers()
self.wfile.write(message.encode('utf-8'))
return ProgressiveImageHandler
# 使用例
if __name__ == "__main__":
# プログレッシブ画像ジェネレータの使用
generator = ProgressiveImageGenerator()
# レイヤーを生成
layers_info = generator.generate_progressive_layers("large_photo.jpg")
# Data URLを生成(小さい画像用)
data_urls = generator.generate_data_urls("small_photo.jpg", max_layers=3)
# デモHTMLを生成
generator.create_html_demo("demo_photo.jpg", "progressive_demo.html")
# サーバーを起動(別スレッドで)
# server = ProgressiveImageServer(port=8080, image_dir='./images')
# server_thread = threading.Thread(target=server.start)
# server_thread.daemon = True
# server_thread.start()
# print("サーバーが起動しました。Ctrl+Cで終了します。")
# server_thread.join()
問題12:クラウド画像処理パイプライン
"""
注意: この実装は概念実証レベルのものです。
実際の運用では、Celery, Redis, クラウドストレージ(S3等)を使用してください。
"""
import os
import json
import time
import hashlib
import threading
import queue
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from PIL import Image, ImageOps
import redis
from celery import Celery
import boto3
from botocore.exceptions import ClientError
# 環境変数から設定を読み込む(実際の環境では.envファイルなどを使用)
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
CELERY_BROKER = os.getenv('CELERY_BROKER', 'redis://localhost:6379/0')
CELERY_BACKEND = os.getenv('CELERY_BACKEND', 'redis://localhost:6379/0')
AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY', '')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY', '')
AWS_REGION = os.getenv('AWS_REGION', 'us-east-1')
S3_BUCKET = os.getenv('S3_BUCKET', 'your-image-bucket')
CDN_URL = os.getenv('CDN_URL', 'https://cdn.example.com')
# Celeryアプリの設定
app = Celery('image_pipeline', broker=CELERY_BROKER, backend=CELERY_BACKEND)
# Redisクライアント(進捗状況の追跡用)
redis_client = redis.from_url(REDIS_URL)
# S3クライアント(実際の環境では適切な認証情報を設定)
s3_client = None
if AWS_ACCESS_KEY and AWS_SECRET_KEY:
s3_client = boto3.client(
's3',
aws_access_key_id=AWS_ACCESS_KEY,
aws_secret_access_key=AWS_SECRET_KEY,
region_name=AWS_REGION
)
class ImageProcessingPipeline:
"""画像処理パイプラインの管理クラス"""
def __init__(self):
self.processing_queue = queue.Queue()
self.processing_threads = []
self.max_threads = 4
# 処理ステータスの定義
self.statuses = {
'pending': '待機中',
'processing': '処理中',
'completed': '完了',
'failed': '失敗',
'uploading': 'アップロード中',
'distributed': '配信完了'
}
def upload_image(self, file_path: str, user_id: str,
metadata: Optional[Dict] = None) -> str:
"""
画像をパイプラインに投入
Args:
file_path: 画像ファイルのパス
user_id: ユーザーID
metadata: 追加メタデータ
Returns:
str: ジョブID
"""
# ジョブIDを生成
job_id = hashlib.md5(
f"{file_path}:{user_id}:{time.time()}".encode()
).hexdigest()[:16]
# ジョブ情報を作成
job = {
'job_id': job_id,
'file_path': file_path,
'user_id': user_id,
'metadata': metadata or {},
'status': 'pending',
'created_at': datetime.now().isoformat(),
'steps': [],
'results': {}
}
# Redisにジョブ情報を保存
redis_key = f"image_job:{job_id}"
redis_client.setex(
redis_key,
timedelta(hours=24), # 24時間有効
json.dumps(job)
)
# 処理キューに追加
self.processing_queue.put(job_id)
# 処理スレッドを起動(まだ動いていない場合)
self._start_processing_threads()
return job_id
def _start_processing_threads(self):
"""処理スレッドを起動"""
if not self.processing_threads:
for i in range(self.max_threads):
thread = threading.Thread(
target=self._process_queue,
daemon=True,
name=f"ImageProcessor-{i}"
)
thread.start()
self.processing_threads.append(thread)
def _process_queue(self):
"""キューからジョブを取り出して処理"""
while True:
try:
job_id = self.processing_queue.get(timeout=1)
self._process_job(job_id)
except queue.Empty:
continue
except Exception as e:
print(f"キュー処理エラー: {e}")
def _process_job(self, job_id: str):
"""個別のジョブを処理"""
redis_key = f"image_job:{job_id}"
try:
# ジョブ情報を取得
job_data = redis_client.get(redis_key)
if not job_data:
print(f"ジョブが見つかりません: {job_id}")
return
job = json.loads(job_data)
# ステータスを更新
job['status'] = 'processing'
redis_client.setex(redis_key, timedelta(hours=24), json.dumps(job))
file_path = job['file_path']
# 各処理ステップを実行
steps = [
('validation', self._validate_image),
('resize', self._resize_image),
('thumbnail_generation', self._generate_thumbnails),
('optimization', self._optimize_image),
('upload', self._upload_to_storage),
('cdn_distribution', self._distribute_to_cdn)
]
for step_name, step_func in steps:
try:
# ステップ開始
step_start = time.time()
# ステップ実行
result = step_func(file_path, job)
# ステップ完了情報を記録
step_duration = time.time() - step_start
job['steps'].append({
'name': step_name,
'status': 'completed',
'duration': step_duration,
'result': result
})
# Redisを更新
redis_client.setex(redis_key, timedelta(hours=24), json.dumps(job))
# 結果をjobに追加
if step_name in ['thumbnail_generation', 'upload', 'cdn_distribution']:
job['results'][step_name] = result
except Exception as e:
# ステップ失敗
job['steps'].append({
'name': step_name,
'status': 'failed',
'error': str(e)
})
job['status'] = 'failed'
redis_client.setex(redis_key, timedelta(hours=24), json.dumps(job))
print(f"ジョブ {job_id} ステップ {step_name} で失敗: {e}")
return
# すべてのステップが完了
job['status'] = 'completed'
job['completed_at'] = datetime.now().isoformat()
redis_client.setex(redis_key, timedelta(hours=24), json.dumps(job))
print(f"ジョブ {job_id} 完了")
except Exception as e:
print(f"ジョブ処理エラー: {e}")
# エラー状態を記録
try:
job['status'] = 'failed'
job['error'] = str(e)
redis_client.setex(redis_key, timedelta(hours=24), json.dumps(job))
except:
pass
def _validate_image(self, file_path: str, job: Dict) -> Dict:
"""画像の検証"""
try:
with Image.open(file_path) as img:
# 基本的な検証
width, height = img.size
format = img.format
mode = img.mode
# ファイルサイズ制限(例: 10MB)
max_size_mb = 10
file_size = os.path.getsize(file_path)
if file_size > max_size_mb * 1024 * 1024:
raise ValueError(f"ファイルサイズが大きすぎます: {file_size/1024/1024:.1f}MB")
# 許可されるフォーマット
allowed_formats = ['JPEG', 'PNG', 'GIF', 'WEBP']
if format not in allowed_formats:
raise ValueError(f"許可されていないフォーマット: {format}")
return {
'valid': True,
'dimensions': (width, height),
'format': format,
'mode': mode,
'file_size_bytes': file_size
}
except Exception as e:
raise ValueError(f"画像検証失敗: {e}")
def _resize_image(self, file_path: str, job: Dict) -> Dict:
"""画像のリサイズ"""
try:
# 設定から最大サイズを取得
max_size = job['metadata'].get('max_size', (1920, 1080))
with Image.open(file_path) as img:
# 自動回転
img = ImageOps.exif_transpose(img)
original_size = img.size
# リサイズ(必要に応じて)
if img.size[0] > max_size[0] or img.size[1] > max_size[1]:
img.thumbnail(max_size, Image.Resampling.LANCZOS)
# 一時ファイルに保存
temp_dir = Path("/tmp/image_pipeline")
temp_dir.mkdir(exist_ok=True)
temp_path = temp_dir / f"resized_{job['job_id']}.webp"
img.save(temp_path, 'WEBP', quality=85, method=6)
# 元のファイルパスを更新
job['file_path'] = str(temp_path)
return {
'original_size': original_size,
'resized_size': img.size,
'temp_path': str(temp_path)
}
except Exception as e:
raise ValueError(f"リサイズ失敗: {e}")
def _generate_thumbnails(self, file_path: str, job: Dict) -> Dict:
"""サムネイル生成"""
try:
thumbnails = {}
thumbnail_sizes = {
'xs': (100, 100),
'sm': (300, 300),
'md': (600, 600),
'lg': (1200, 1200)
}
temp_dir = Path("/tmp/image_pipeline/thumbnails")
temp_dir.mkdir(parents=True, exist_ok=True)
with Image.open(file_path) as img:
for size_name, dimensions in thumbnail_sizes.items():
# サムネイル生成
thumb = img.copy()
thumb.thumbnail(dimensions, Image.Resampling.LANCZOS)
# 保存
thumb_path = temp_dir / f"{job['job_id']}_{size_name}.webp"
thumb.save(thumb_path, 'WEBP', quality=80, method=6)
thumbnails[size_name] = {
'path': str(thumb_path),
'size': thumb.size,
'file_size': thumb_path.stat().st_size
}
return thumbnails
except Exception as e:
raise ValueError(f"サムネイル生成失敗: {e}")
def _optimize_image(self, file_path: str, job: Dict) -> Dict:
"""画像の最適化"""
try:
# 画像タイプに基づいた最適化
with Image.open(file_path) as img:
# メタデータから最適化設定を取得
optimize_settings = job['metadata'].get('optimize', {})
quality = optimize_settings.get('quality', 85)
# 最適化された一時ファイルを作成
temp_dir = Path("/tmp/image_pipeline/optimized")
temp_dir.mkdir(parents=True, exist_ok=True)
optimized_path = temp_dir / f"optimized_{job['job_id']}.webp"
# WebP形式で保存(高圧縮)
img.save(optimized_path, 'WEBP', quality=quality, method=6)
# ファイルパスを更新
job['file_path'] = str(optimized_path)
original_size = os.path.getsize(file_path)
optimized_size = os.path.getsize(optimized_path)
return {
'optimized_path': str(optimized_path),
'original_size': original_size,
'optimized_size': optimized_size,
'reduction_percent': ((original_size - optimized_size) / original_size * 100)
if original_size > 0 else 0
}
except Exception as e:
raise ValueError(f"最適化失敗: {e}")
def _upload_to_storage(self, file_path: str, job: Dict) -> Dict:
"""ストレージへのアップロード"""
try:
if not s3_client:
raise ValueError("S3クライアントが初期化されていません")
# アップロードパスを生成
user_id = job['user_id']
timestamp = datetime.now().strftime('%Y/%m/%d')
filename = Path(file_path).name
# メインファイルのアップロード
main_key = f"images/{user_id}/{timestamp}/original/{filename}"
with open(file_path, 'rb') as f:
s3_client.upload_fileobj(
f,
S3_BUCKET,
main_key,
ExtraArgs={
'ContentType': 'image/webp',
'CacheControl': 'max-age=31536000' # 1年間キャッシュ
}
)
# サムネイルのアップロード
thumbnail_urls = {}
if 'thumbnail_generation' in job['results']:
thumbnails = job['results']['thumbnail_generation']
for size_name, thumb_info in thumbnails.items():
thumb_key = f"images/{user_id}/{timestamp}/thumbnails/{size_name}/{filename}"
with open(thumb_info['path'], 'rb') as f:
s3_client.upload_fileobj(
f,
S3_BUCKET,
thumb_key,
ExtraArgs={
'ContentType': 'image/webp',
'CacheControl': 'max-age=31536000'
}
)
thumbnail_urls[size_name] = f"https://{S3_BUCKET}.s3.{AWS_REGION}.amazonaws.com/{thumb_key}"
return {
'main_file_url': f"https://{S3_BUCKET}.s3.{AWS_REGION}.amazonaws.com/{main_key}",
'thumbnail_urls': thumbnail_urls,
'bucket': S3_BUCKET,
'keys': {
'main': main_key,
'thumbnails': list(thumbnail_urls.values())
}
}
except Exception as e:
raise ValueError(f"アップロード失敗: {e}")
def _distribute_to_cdn(self, file_path: str, job: Dict) -> Dict:
"""CDNへの配信(実際の環境ではCloudFrontなどのCDNと連携)"""
try:
# ここでは簡易的な実装
# 実際の環境では、CloudFrontのInvalidation APIなどを呼び出す
upload_result = job['results']['upload']
main_url = upload_result['main_file_url']
thumbnail_urls = upload_result['thumbnail_urls']
# CDN URLに変換(仮定)
cdn_main_url = main_url.replace(
f"https://{S3_BUCKET}.s3.{AWS_REGION}.amazonaws.com/",
f"{CDN_URL}/"
)
cdn_thumbnail_urls = {}
for size_name, url in thumbnail_urls.items():
cdn_thumbnail_urls[size_name] = url.replace(
f"https://{S3_BUCKET}.s3.{AWS_REGION}.amazonaws.com/",
f"{CDN_URL}/"
)
# ここで実際のCDNキャッシュパージAPIを呼び出す
# self._purge_cdn_cache([cdn_main_url] + list(cdn_thumbnail_urls.values()))
return {
'cdn_main_url': cdn_main_url,
'cdn_thumbnail_urls': cdn_thumbnail_urls,
'distribution_time': datetime.now().isoformat()
}
except Exception as e:
raise ValueError(f"CDN配信失敗: {e}")
def get_job_status(self, job_id: str) -> Optional[Dict]:
"""ジョブのステータスを取得"""
redis_key = f"image_job:{job_id}"
job_data = redis_client.get(redis_key)
if job_data:
return json.loads(job_data)
return None
def cleanup_temp_files(self, older_than_hours: int = 24):
"""古い一時ファイルをクリーンアップ"""
temp_dir = Path("/tmp/image_pipeline")
if temp_dir.exists():
cutoff_time = time.time() - (older_than_hours * 3600)
for file_path in temp_dir.rglob("*"):
if file_path.is_file():
if file_path.stat().st_mtime < cutoff_time:
try:
file_path.unlink()
except:
pass
# Flaskアプリケーションとの連携用
from flask import Flask, request, jsonify, Blueprint
def create_image_api(pipeline: ImageProcessingPipeline):
"""Flask APIエンドポイントを作成"""
bp = Blueprint('image_api', __name__, url_prefix='/api/images')
@bp.route('/upload', methods=['POST'])
def upload_image():
"""画像アップロードエンドポイント"""
try:
if 'image' not in request.files:
return jsonify({'error': '画像ファイルがありません'}), 400
image_file = request.files['image']
user_id = request.form.get('user_id', 'anonymous')
# 一時ファイルに保存
temp_dir = Path("/tmp/uploads")
temp_dir.mkdir(exist_ok=True)
temp_path = temp_dir / f"upload_{int(time.time())}_{image_file.filename}"
image_file.save(temp_path)
# メタデータの取得
metadata = {
'max_size': tuple(map(int, request.form.get('max_size', '1920,1080').split(','))),
'optimize': {'quality': int(request.form.get('quality', 85))},
'original_filename': image_file.filename,
'content_type': image_file.content_type
}
# パイプラインに投入
job_id = pipeline.upload_image(
file_path=str(temp_path),
user_id=user_id,
metadata=metadata
)
return jsonify({
'success': True,
'job_id': job_id,
'message': '画像処理を開始しました'
}), 202
except Exception as e:
return jsonify({'error': str(e)}), 500
@bp.route('/status/', methods=['GET'])
def get_status(job_id):
"""処理ステータス取得エンドポイント"""
job_status = pipeline.get_job_status(job_id)
if job_status:
# クライアント向けに情報をフィルタリング
response = {
'job_id': job_status['job_id'],
'status': job_status['status'],
'created_at': job_status['created_at'],
'steps': [
{
'name': step['name'],
'status': step['status'],
'duration': step.get('duration')
}
for step in job_status.get('steps', [])
]
}
# 完了している場合は結果も含める
if job_status['status'] == 'completed':
response['results'] = job_status.get('results', {})
return jsonify(response), 200
else:
return jsonify({'error': 'ジョブが見つかりません'}), 404
@bp.route('/result/', methods=['GET'])
def get_result(job_id):
"""処理結果取得エンドポイント"""
job_status = pipeline.get_job_status(job_id)
if not job_status:
return jsonify({'error': 'ジョブが見つかりません'}), 404
if job_status['status'] != 'completed':
return jsonify({'error': '処理がまだ完了していません'}), 400
# 結果を整形して返す
results = job_status.get('results', {})
response = {
'job_id': job_id,
'status': 'completed',
'urls': {}
}
if 'cdn_distribution' in results:
cdn_info = results['cdn_distribution']
response['urls']['main'] = cdn_info['cdn_main_url']
response['urls']['thumbnails'] = cdn_info['cdn_thumbnail_urls']
return jsonify(response), 200
return bp
# Celeryタスクの定義(非同期処理用)
@app.task(bind=True)
def process_image_async(self, file_path: str, user_id: str, metadata: Dict):
"""非同期画像処理タスク"""
pipeline = ImageProcessingPipeline()
# ジョブIDをCeleryタスクIDとして使用
job_id = self.request.id
# ジョブ情報を作成
job = {
'job_id': job_id,
'file_path': file_path,
'user_id': user_id,
'metadata': metadata,
'status': 'processing',
'created_at': datetime.now().isoformat(),
'steps': [],
'results': {}
}
# Redisに保存
redis_key = f"celery_image_job:{job_id}"
redis_client.setex(redis_key, timedelta(hours=24), json.dumps(job))
try:
# 各処理ステップを実行(同期処理と同様)
steps = [
('validation', pipeline._validate_image),
('resize', pipeline._resize_image),
('thumbnail_generation', pipeline._generate_thumbnails),
('optimization', pipeline._optimize_image),
('upload', pipeline._upload_to_storage),
('cdn_distribution', pipeline._distribute_to_cdn)
]
current_file_path = file_path
for step_name, step_func in steps:
# タスクの進捗状況を更新
self.update_state(
state='PROGRESS',
meta={
'current': step_name,
'total': len(steps),
'progress': (steps.index((step_name, step_func)) / len(steps)) * 100
}
)
try:
# ステップ実行
result = step_func(current_file_path, job)
# ファイルパスを更新(リサイズや最適化で変更される場合)
if step_name in ['resize', 'optimization'] and 'temp_path' in result:
current_file_path = result['temp_path']
# ステップ情報を記録
job['steps'].append({
'name': step_name,
'status': 'completed',
'result': result
})
# 結果を保存
if step_name in ['thumbnail_generation', 'upload', 'cdn_distribution']:
job['results'][step_name] = result
# Redisを更新
redis_client.setex(redis_key, timedelta(hours=24), json.dumps(job))
except Exception as e:
job['steps'].append({
'name': step_name,
'status': 'failed',
'error': str(e)
})
job['status'] = 'failed'
redis_client.setex(redis_key, timedelta(hours=24), json.dumps(job))
raise e
# 完了
job['status'] = 'completed'
job['completed_at'] = datetime.now().isoformat()
redis_client.setex(redis_key, timedelta(hours=24), json.dumps(job))
# 一時ファイルのクリーンアップ(オプション)
pipeline.cleanup_temp_files()
return {
'status': 'completed',
'job_id': job_id,
'results': job['results']
}
except Exception as e:
job['status'] = 'failed'
job['error'] = str(e)
redis_client.setex(redis_key, timedelta(hours=24), json.dumps(job))
raise self.retry(exc=e, countdown=60) # 60秒後に再試行
# メインアプリケーションの例
if __name__ == "__main__":
# パイプラインの初期化
pipeline = ImageProcessingPipeline()
# サンプル画像を処理
sample_image = "sample.jpg"
if os.path.exists(sample_image):
job_id = pipeline.upload_image(
file_path=sample_image,
user_id="test_user",
metadata={
'max_size': (1200, 1200),
'optimize': {'quality': 85}
}
)
print(f"ジョブを開始しました: {job_id}")
# ステータスを監視
for _ in range(30): # 最大30秒待機
status = pipeline.get_job_status(job_id)
if status and status['status'] in ['completed', 'failed']:
print(f"ジョブ完了: {status['status']}")
if status['status'] == 'completed':
print(f"結果: {json.dumps(status.get('results', {}), indent=2)}")
break
time.sleep(1)
print("処理完了")