外部API連携
2026-03-03はじめに
Webアプリケーションにおいて、外部APIとの連携は非常に重要な機能です。天気情報、地図、翻訳、ソーシャルメディア連携など、様々な外部サービスを活用することで、アプリケーションの機能を大きく拡張できます。本章では、Pythonのrequestsライブラリを使った外部APIとの通信方法、APIキーの安全な管理方法、そして具体的な天気APIと地図APIとの連携例について学びます。
requestsライブラリの基本
まずは、外部APIと通信するための基本ライブラリであるrequestsの使い方を学びましょう。
requestsのインストール
requestsとは、HTTP通信用のPythonのライブラリです。主にWEBスクレイピングでHTMLやXMLファイルからデータを取得するのに使われます。
pip install requests
基本的なAPIリクエスト
requestsライブラリを使ったFlaskアプリケーションで使用するAPIクライアントを実装したものです。APIClientクラスは、requests.Sessionを利用して共通のヘッダーとタイムアウトを設定し、GETやPOSTなどのHTTPメソッドに対応した汎用的なリクエストメソッドを提供します。
# app/utils/api_client.py
import requests
import json
from typing import Optional, Dict, Any
from functools import wraps
import time
from flask import current_app
import os
class APIClient:
"""APIクライアントの基本クラス"""
def __init__(self, base_url: str, timeout: int = 10):
self.base_url = base_url.rstrip('/')
self.timeout = timeout
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Flask-Image-App/1.0',
'Accept': 'application/json',
})
def request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
"""汎用APIリクエストメソッド"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
try:
response = self.session.request(
method=method,
url=url,
timeout=self.timeout,
**kwargs
)
response.raise_for_status() # HTTPエラーがあれば例外を発生
if response.status_code == 204: # No Content
return None
return response.json()
except requests.exceptions.Timeout:
current_app.logger.error(f"APIリクエストがタイムアウトしました: {url}")
return None
except requests.exceptions.RequestException as e:
current_app.logger.error(f"APIリクエストエラー: {e}")
return None
except json.JSONDecodeError as e:
current_app.logger.error(f"JSONデコードエラー: {e}")
return None
def get(self, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
"""GETリクエスト"""
return self.request('GET', endpoint, **kwargs)
def post(self, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
"""POSTリクエスト"""
return self.request('POST', endpoint, **kwargs)
def put(self, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
"""PUTリクエスト"""
return self.request('PUT', endpoint, **kwargs)
def delete(self, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
"""DELETEリクエスト"""
return self.request('DELETE', endpoint, **kwargs)
def rate_limit(max_per_minute: int = 60):
"""APIレート制限デコレータ"""
def decorator(func):
last_called = [0.0]
min_interval = 60.0 / max_per_minute
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_called[0]
if elapsed < min_interval:
time.sleep(min_interval - elapsed)
result = func(*args, **kwargs)
last_called[0] = time.time()
return result
return wrapper
return decorator
def cache_api_response(ttl: int = 300):
"""APIレスポンスキャッシュデコレータ(簡易版)"""
cache = {}
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# キャッシュキーの生成(関数名と引数から)
cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
# キャッシュチェック
if cache_key in cache:
cached_data, timestamp = cache[cache_key]
if time.time() - timestamp < ttl:
return cached_data
# APIコール
result = func(*args, **kwargs)
# キャッシュ保存
if result is not None:
cache[cache_key] = (result, time.time())
return result
return wrapper
return decorator
リクエスト時にはタイムアウトやHTTPステータスコード、JSONデコードの各エラーを補足し、ログ出力した上でNoneを返すようになっています。また、レート制限用のデコレータと、簡易的なインメモリキャッシュを行うデコレータも併せて実装されています。
get、post、put、delete は request メソッドを内部的に呼び出す補助メソッドで、呼び出し側の記述を簡潔にする役割を持っています。これにより、APIClient を使うコードは HTTP メソッドの違いを意識せず、統一されたインターフェースで外部 API を利用できます。
また、このファイルにはデコレータが二つ定義されています。rate_limit は関数の呼び出し間隔を調整し、1 分あたりの最大実行回数を超えないように待機処理を入れます。cache_api_response は関数名と引数をキーにしてレスポンスを一定時間キャッシュし、同じ条件での再呼び出し時には API を叩かず結果を返します。これらにより、外部 API の負荷軽減とアプリケーションの安定性向上を図っています。
天気API連携
OpenWeatherMap APIを使用して、天気情報を取得する機能を実装します。
# app/services/weather_service.py
import os
from typing import Optional, Dict, Any
from dataclasses import dataclass
from datetime import datetime
from app.utils.api_client import APIClient, cache_api_response, rate_limit
@dataclass
class WeatherData:
"""天気データクラス"""
location: str
temperature: float
feels_like: float
humidity: int
pressure: int
description: str
icon: str
wind_speed: float
wind_direction: int
sunrise: datetime
sunset: datetime
timestamp: datetime
@classmethod
def from_openweathermap(cls, data: Dict[str, Any], location: str) -> 'WeatherData':
"""OpenWeatherMap APIのレスポンスからWeatherDataを作成"""
main = data['main']
weather = data['weather'][0]
return cls(
location=location,
temperature=main['temp'],
feels_like=main['feels_like'],
humidity=main['humidity'],
pressure=main['pressure'],
description=weather['description'],
icon=weather['icon'],
wind_speed=data['wind']['speed'] if 'wind' in data else 0,
wind_direction=data['wind']['deg'] if 'wind' in data and 'deg' in data['wind'] else 0,
sunrise=datetime.fromtimestamp(data['sys']['sunrise']),
sunset=datetime.fromtimestamp(data['sys']['sunset']),
timestamp=datetime.now()
)
def to_dict(self) -> Dict[str, Any]:
"""辞書形式に変換"""
return {
'location': self.location,
'temperature': round(self.temperature, 1),
'feels_like': round(self.feels_like, 1),
'humidity': self.humidity,
'pressure': self.pressure,
'description': self.description,
'icon': self.icon,
'icon_url': f"https://openweathermap.org/img/wn/{self.icon}@2x.png",
'wind_speed': round(self.wind_speed, 1),
'wind_direction': self.wind_direction,
'sunrise': self.sunrise.strftime('%H:%M'),
'sunset': self.sunset.strftime('%H:%M'),
'timestamp': self.timestamp.strftime('%Y-%m-%d %H:%M:%S')
}
class WeatherService:
"""天気情報サービス"""
def __init__(self):
self.api_key = os.environ.get('OPENWEATHERMAP_API_KEY')
self.base_url = "https://api.openweathermap.org/data/2.5"
self.client = APIClient(self.base_url)
@rate_limit(max_per_minute=60) # OpenWeatherMapの無料プラン制限
@cache_api_response(ttl=600) # 10分間キャッシュ
def get_current_weather(self, city: str, country_code: str = None) -> Optional[WeatherData]:
"""現在の天気情報を取得"""
if not self.api_key:
raise ValueError("OpenWeatherMap APIキーが設定されていません")
# 都市名の整形
location = city
if country_code:
location = f"{city},{country_code}"
try:
params = {
'q': location,
'appid': self.api_key,
'units': 'metric', # 摂氏温度
'lang': 'ja' # 日本語対応
}
response = self.client.get('/weather', params=params)
if response:
return WeatherData.from_openweathermap(response, city)
return None
except Exception as e:
current_app.logger.error(f"天気情報取得エラー: {e}")
return None
@rate_limit(max_per_minute=60)
@cache_api_response(ttl=3600) # 1時間キャッシュ
def get_forecast(self, city: str, days: int = 5) -> Optional[Dict[str, Any]]:
"""天気予報を取得"""
if not self.api_key:
raise ValueError("OpenWeatherMap APIキーが設定されていません")
try:
params = {
'q': city,
'appid': self.api_key,
'units': 'metric',
'lang': 'ja',
'cnt': days * 8 # 3時間ごとのデータなので、days×8
}
response = self.client.get('/forecast', params=params)
if response and 'list' in response:
forecasts = []
for item in response['list']:
forecast = {
'datetime': datetime.fromtimestamp(item['dt']).strftime('%m/%d %H:%M'),
'temperature': round(item['main']['temp'], 1),
'feels_like': round(item['main']['feels_like'], 1),
'description': item['weather'][0]['description'],
'icon': item['weather'][0]['icon'],
'icon_url': f"https://openweathermap.org/img/wn/{item['weather'][0]['icon']}.png",
'humidity': item['main']['humidity'],
'pop': round(item.get('pop', 0) * 100), # 降水確率(%)
}
forecasts.append(forecast)
return {
'city': response['city']['name'],
'country': response['city']['country'],
'forecasts': forecasts
}
return None
except Exception as e:
current_app.logger.error(f"天気予報取得エラー: {e}")
return None
def get_weather_by_coordinates(self, lat: float, lon: float) -> Optional[WeatherData]:
"""緯度経度から天気情報を取得"""
if not self.api_key:
raise ValueError("OpenWeatherMap APIキーが設定されていません")
try:
params = {
'lat': lat,
'lon': lon,
'appid': self.api_key,
'units': 'metric',
'lang': 'ja'
}
response = self.client.get('/weather', params=params)
if response:
return WeatherData.from_openweathermap(response,
response['name'])
return None
except Exception as e:
current_app.logger.error(f"座標から天気取得エラー: {e}")
return None
このコードは OpenWeatherMap API を使って天気情報を取得し、アプリで扱いやすい形に整形するサービス層を定義しています。WeatherData は天気データを表すデータクラスで、地点名、気温、体感温度、湿度、気圧、天気説明、アイコン、風、日の出入時刻、取得時刻などを保持します。from_openweathermap は API レスポンスから必要な項目を取り出して WeatherData を生成し、to_dict は表示用に丸め処理や時刻フォーマット、アイコン URL の組み立てまで行って辞書に変換します。
WeatherService は API キーを環境変数から読み込み、APIClient を使って OpenWeatherMap にアクセスします。get_current_weather は都市名と任意の国コードから現在天気を取得し、結果があれば WeatherData に変換して返します。get_forecast は 3 時間ごとの予報データを取得し、指定日数分をリストに整形して都市情報と一緒に返します。cnt を days×8 にしているのは 1 日あたり 3 時間刻みで 8 件ある前提によります。
また各取得メソッドにはレート制限とキャッシュが付いており、無料プランの制限に配慮しつつ同じ条件の呼び出しを一定時間は再利用します。現在天気は 10 分、予報は 1 時間キャッシュされます。API キー未設定時は例外を投げ、通信や処理で例外が起きた場合はログを出して None を返す方針です。さらに get_weather_by_coordinates では緯度経度から現在天気を取得し、レスポンス中の地名を使って WeatherData を生成します。
地図API連携(LeafletとOpenStreetMap)
OpenStreetMapとLeafletを使用して、インタラクティブな地図機能を実装します。
# app/services/map_service.py
import os
from typing import Optional, Dict, Any, Tuple, List
from dataclasses import dataclass
import math
from app.utils.api_client import APIClient, cache_api_response
@dataclass
class Location:
"""位置情報クラス"""
latitude: float
longitude: float
address: str = ""
def to_dict(self) -> Dict[str, Any]:
return {
'lat': self.latitude,
'lng': self.longitude,
'address': self.address
}
class MapService:
"""地図サービス"""
def __init__(self):
# OpenStreetMap Nominatim API(ジオコーディング用)
self.geocode_client = APIClient("https://nominatim.openstreetmap.org")
self.geocode_client.session.headers.update({
'Accept-Language': 'ja',
})
@cache_api_response(ttl=86400) # 24時間キャッシュ(Nominatimの利用規約に準拠)
def geocode(self, address: str) -> Optional[Location]:
"""住所から緯度経度を取得(ジオコーディング)"""
try:
params = {
'q': address,
'format': 'json',
'limit': 1,
'addressdetails': 1
}
response = self.geocode_client.get('/search', params=params)
if response and len(response) > 0:
location = response[0]
return Location(
latitude=float(location['lat']),
longitude=float(location['lon']),
address=location.get('display_name', address)
)
return None
except Exception as e:
current_app.logger.error(f"ジオコーディングエラー: {e}")
return None
@cache_api_response(ttl=86400)
def reverse_geocode(self, lat: float, lon: float) -> Optional[Location]:
"""緯度経度から住所を取得(逆ジオコーディング)"""
try:
params = {
'lat': lat,
'lon': lon,
'format': 'json',
'zoom': 18, # 詳細な住所情報
'addressdetails': 1
}
response = self.geocode_client.get('/reverse', params=params)
if response:
return Location(
latitude=lat,
longitude=lon,
address=response.get('display_name', f"{lat}, {lon}")
)
return None
except Exception as e:
current_app.logger.error(f"逆ジオコーディングエラー: {e}")
return None
def calculate_distance(self, loc1: Location, loc2: Location) -> float:
"""2点間の距離を計算(ハバーサイン公式)"""
# 地球の半径(km)
R = 6371.0
# 緯度経度をラジアンに変換
lat1 = math.radians(loc1.latitude)
lon1 = math.radians(loc1.longitude)
lat2 = math.radians(loc2.latitude)
lon2 = math.radians(loc2.longitude)
# 緯度差と経度差
dlat = lat2 - lat1
dlon = lon2 - lon1
# ハバーサイン公式
a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
# 距離(km)
distance = R * c
return round(distance, 2)
def calculate_bounding_box(self, center: Location, radius_km: float = 1.0) -> Dict[str, float]:
"""中心点と半径からバウンディングボックスを計算"""
# 1度あたりの緯度経度の距離(おおよそ)
km_per_degree_lat = 111.0
km_per_degree_lon = 111.0 * math.cos(math.radians(center.latitude))
delta_lat = radius_km / km_per_degree_lat
delta_lon = radius_km / km_per_degree_lon
return {
'north': center.latitude + delta_lat,
'south': center.latitude - delta_lat,
'east': center.longitude + delta_lon,
'west': center.longitude - delta_lon
}
def generate_static_map_url(self, location: Location, zoom: int = 15,
width: int = 600, height: int = 400) -> str:
"""静的な地図画像のURLを生成"""
return (f"https://www.openstreetmap.org/export/embed.html?"
f"bbox={location.longitude-0.01}%2C{location.latitude-0.01}%2C"
f"{location.longitude+0.01}%2C{location.latitude+0.01}&"
f"layer=mapnik&marker={location.latitude}%2C{location.longitude}")
def search_places_nearby(self, location: Location, radius_km: float = 1.0,
categories: List[str] = None) -> List[Dict[str, Any]]:
"""周辺の施設を検索(Overpass API使用)"""
if categories is None:
categories = ['amenity', 'shop', 'tourism']
# バウンディングボックスを計算
bbox = self.calculate_bounding_box(location, radius_km)
# Overpass QLクエリの構築
category_filters = '|'.join([f'node[{cat}]' for cat in categories])
query = f"""
[out:json][timeout:25];
(
{category_filters}
({bbox['south']},{bbox['west']},{bbox['north']},{bbox['east']});
);
out body;
>;
out skel qt;
"""
try:
overpass_client = APIClient("https://overpass-api.de/api")
response = overpass_client.post('/interpreter', data={'data': query})
places = []
if response and 'elements' in response:
for element in response['elements']:
if 'tags' in element:
place = {
'name': element['tags'].get('name', '名称不明'),
'lat': element.get('lat'),
'lon': element.get('lon'),
'type': next((cat for cat in categories if cat in element['tags']), 'unknown'),
'details': element['tags']
}
places.append(place)
return places
except Exception as e:
current_app.logger.error(f"周辺施設検索エラー: {e}")
return []
このコードは OpenStreetMap 系の公開 API を使って、住所と座標の変換や距離計算、周辺施設検索などを行う地図サービスを提供します。Location は緯度・経度と任意の住所文字列を保持するデータクラスで、to_dict により lat/lng と address を持つ辞書へ変換できます。MapService は Nominatim をジオコーディング用クライアントとして初期化し、Accept-Language を日本語にして結果が日本語寄りになるよう調整しています。
geocode は住所から座標を取得し、reverse_geocode は座標から住所を取得します。どちらも Nominatim の負荷や利用規約を意識して 24 時間キャッシュを付け、同じ入力なら API 呼び出しを抑えます。取得に成功した場合は Location を返し、失敗時はログを出して None を返す方針です。calculate_distance はハバーサイン公式で 2 点間の地表距離を km で計算し、少数 2 桁に丸めて返します。calculate_bounding_box は中心点と半径から検索用の矩形範囲を度換算で求め、north/south/east/west を返します。
generate_static_map_url は指定地点の周辺を埋め込み表示できる OpenStreetMap の URL を組み立て、マーカー付きの表示を想定しています。search_places_nearby は Overpass API に Overpass QL クエリを投げ、中心点周辺のバウンディングボックス内にある施設ノードを検索します。categories が未指定なら amenity、shop、tourism を対象にし、レスポンスの tags から名称や種別、詳細情報を抽出してリスト化して返し、失敗時は空配列を返します。
APIキーの安全な管理
APIキーを安全に管理するためのクラスを実装します。
# app/utils/api_key_manager.py
import os
import base64
import json
from typing import Dict, Optional
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2
from pathlib import Path
import secrets
class APIKeyManager:
"""APIキー管理クラス"""
def __init__(self, config_dir: str = None):
# 設定ディレクトリ
if config_dir is None:
self.config_dir = Path.home() / '.flask_app' / 'api_keys'
else:
self.config_dir = Path(config_dir)
# ディレクトリがなければ作成
self.config_dir.mkdir(parents=True, exist_ok=True)
# マスターキーファイルのパス
self.master_key_file = self.config_dir / '.master_key'
# マスターキーの読み込みまたは生成
self.master_key = self._load_or_create_master_key()
# Fernetインスタンスの作成
self.cipher = Fernet(self.master_key)
def _load_or_create_master_key(self) -> bytes:
"""マスターキーの読み込みまたは生成"""
if self.master_key_file.exists():
# 既存のマスターキーを読み込み
return self.master_key_file.read_bytes()
else:
# 新しいマスターキーを生成
master_key = Fernet.generate_key()
# マスターキーファイルを保存(権限を600に設定)
self.master_key_file.write_bytes(master_key)
self.master_key_file.chmod(0o600)
return master_key
def _derive_key_from_password(self, password: str, salt: bytes = None) -> bytes:
"""パスワードから暗号化キーを導出"""
if salt is None:
salt = secrets.token_bytes(16)
kdf = PBKDF2(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
return key
def save_api_key(self, service_name: str, api_key: str,
password: str = None, metadata: Dict = None) -> bool:
"""APIキーを暗号化して保存"""
try:
# サービスごとの設定ファイル
service_file = self.config_dir / f"{service_name}.json"
# 保存データの準備
key_data = {
'api_key': api_key,
'metadata': metadata or {},
'created_at': time.time(),
'updated_at': time.time()
}
# 暗号化
if password:
# パスワードベースの暗号化
salt = secrets.token_bytes(16)
key = self._derive_key_from_password(password, salt)
cipher = Fernet(key)
encrypted_data = cipher.encrypt(json.dumps(key_data).encode())
save_data = {
'encrypted': base64.b64encode(encrypted_data).decode(),
'salt': base64.b64encode(salt).decode(),
'method': 'password'
}
else:
# マスターキーによる暗号化
encrypted_data = self.cipher.encrypt(json.dumps(key_data).encode())
save_data = {
'encrypted': base64.b64encode(encrypted_data).decode(),
'method': 'master_key'
}
# ファイルに保存
service_file.write_text(json.dumps(save_data, indent=2))
service_file.chmod(0o600) # 所有者のみ読み書き
return True
except Exception as e:
current_app.logger.error(f"APIキー保存エラー: {e}")
return False
def get_api_key(self, service_name: str, password: str = None) -> Optional[str]:
"""APIキーを復号して取得"""
try:
service_file = self.config_dir / f"{service_name}.json"
if not service_file.exists():
return None
# 保存データの読み込み
save_data = json.loads(service_file.read_text())
# 復号
if save_data['method'] == 'password':
if not password:
raise ValueError("パスワードが必要です")
salt = base64.b64decode(save_data['salt'])
key = self._derive_key_from_password(password, salt)
cipher = Fernet(key)
encrypted_data = base64.b64decode(save_data['encrypted'])
decrypted_data = cipher.decrypt(encrypted_data)
else:
# マスターキーによる復号
encrypted_data = base64.b64decode(save_data['encrypted'])
decrypted_data = self.cipher.decrypt(encrypted_data)
# データをパース
key_data = json.loads(decrypted_data.decode())
return key_data['api_key']
except Exception as e:
current_app.logger.error(f"APIキー取得エラー: {e}")
return None
def list_services(self) -> List[str]:
"""保存されているサービス一覧を取得"""
services = []
for file in self.config_dir.glob("*.json"):
if file.name != '.master_key':
services.append(file.stem)
return services
def delete_api_key(self, service_name: str) -> bool:
"""APIキーを削除"""
try:
service_file = self.config_dir / f"{service_name}.json"
if service_file.exists():
service_file.unlink()
return True
return False
except Exception as e:
current_app.logger.error(f"APIキー削除エラー: {e}")
return False
# 環境変数からのAPIキー読み込みを補助する関数
def load_api_key_from_env(service_name: str, env_var: str = None,
key_manager: APIKeyManager = None) -> Optional[str]:
"""環境変数またはAPIキーマネージャーからAPIキーを読み込む"""
# 環境変数を優先
if env_var is None:
env_var = f"{service_name.upper()}_API_KEY"
api_key = os.environ.get(env_var)
if api_key:
return api_key
# APIキーマネージャーから読み込み(パスワード保護されている場合)
if key_manager:
# パスワードは環境変数から取得(例: OPENWEATHERMAP_API_PASSWORD)
password_env_var = f"{service_name.upper()}_API_PASSWORD"
password = os.environ.get(password_env_var)
return key_manager.get_api_key(service_name, password)
return None
このコードは、外部サービスの API キーを安全に保存・取得するための APIKeyManager を定義しています。保存先はデフォルトでユーザーのホーム配下の .flask_app/api_keys で、起動時にディレクトリを作成し、暗号化に使うマスターキーを .master_key から読み込むか新規生成します。生成したマスターキーファイルと各サービスの設定ファイルは権限を 600 にして、所有者のみが読み書きできるようにしています。暗号方式には Fernet を使い、マスターキー方式のほか、パスワードを与えた場合は PBKDF2(SHA-256、反復 100000 回、salt 付き)で鍵を導出して暗号化する方式も選べます。
save_api_key は service_name ごとの JSON ファイルに、API キー本体と任意のメタデータ、作成・更新時刻をまとめたデータを暗号化して保存します。パスワード指定時は salt と暗号文を base64 化して保存し、method として password を記録します。パスワードがない場合はマスターキーで暗号化し、method を master_key として保存します。get_api_key は保存された method を見て復号手順を切り替え、password 方式なら salt を使って同じ鍵を導出して復号し、マスターキー方式なら保持している cipher で復号して API キー文字列を返します。list_services は保存済みのサービス名一覧を返し、delete_api_key は該当ファイルを削除します。いずれも例外時はログを出して失敗扱いにします。
末尾の load_api_key_from_env は API キーの読み込み順を統一する補助関数で、まず環境変数を最優先で参照し、見つからない場合に限って APIKeyManager から読み込みます。パスワード保護されているケースを想定し、パスワード自体もサービス名に応じた環境変数から取得して復号に渡す設計です。なお、このコード断片には time、List、current_app の import が見当たらないため、そのままでは実行時にエラーになる可能性があります。
Flaskアプリケーションとの統合
# app/routes/api_integration.py
from flask import Blueprint, render_template, request, jsonify, current_app
import os
from app.services.weather_service import WeatherService
from app.services.map_service import MapService
from app.utils.api_key_manager import load_api_key_from_env, APIKeyManager
from config import Config
bp = Blueprint('api', __name__, url_prefix='/api')
# APIサービスの初期化
weather_service = WeatherService()
map_service = MapService()
key_manager = APIKeyManager()
@bp.route('/weather/current')
def get_current_weather():
"""現在の天気情報を取得"""
city = request.args.get('city', 'Tokyo')
country_code = request.args.get('country', 'JP')
try:
weather_data = weather_service.get_current_weather(city, country_code)
if weather_data:
return jsonify({
'success': True,
'data': weather_data.to_dict()
})
else:
return jsonify({
'success': False,
'error': '天気情報を取得できませんでした'
}), 404
except Exception as e:
current_app.logger.error(f"天気APIエラー: {e}")
return jsonify({
'success': False,
'error': '天気情報の取得中にエラーが発生しました'
}), 500
@bp.route('/weather/forecast')
def get_weather_forecast():
"""天気予報を取得"""
city = request.args.get('city', 'Tokyo')
days = min(int(request.args.get('days', 5)), 5) # 最大5日間
try:
forecast_data = weather_service.get_forecast(city, days)
if forecast_data:
return jsonify({
'success': True,
'data': forecast_data
})
else:
return jsonify({
'success': False,
'error': '天気予報を取得できませんでした'
}), 404
except Exception as e:
current_app.logger.error(f"天気予報APIエラー: {e}")
return jsonify({
'success': False,
'error': '天気予報の取得中にエラーが発生しました'
}), 500
@bp.route('/geocode')
def geocode_address():
"""住所から緯度経度を取得"""
address = request.args.get('address', '')
if not address:
return jsonify({
'success': False,
'error': '住所を指定してください'
}), 400
try:
location = map_service.geocode(address)
if location:
return jsonify({
'success': True,
'data': location.to_dict()
})
else:
return jsonify({
'success': False,
'error': '住所をジオコーディングできませんでした'
}), 404
except Exception as e:
current_app.logger.error(f"ジオコーディングエラー: {e}")
return jsonify({
'success': False,
'error': 'ジオコーディング中にエラーが発生しました'
}), 500
@bp.route('/reverse-geocode')
def reverse_geocode():
"""緯度経度から住所を取得"""
try:
lat = float(request.args.get('lat', 35.681236))
lon = float(request.args.get('lon', 139.767125))
location = map_service.reverse_geocode(lat, lon)
if location:
return jsonify({
'success': True,
'data': location.to_dict()
})
else:
return jsonify({
'success': False,
'error': '逆ジオコーディングできませんでした'
}), 404
except ValueError:
return jsonify({
'success': False,
'error': '緯度経度の形式が正しくありません'
}), 400
except Exception as e:
current_app.logger.error(f"逆ジオコーディングエラー: {e}")
return jsonify({
'success': False,
'error': '逆ジオコーディング中にエラーが発生しました'
}), 500
@bp.route('/places/nearby')
def get_places_nearby():
"""周辺施設を検索"""
try:
lat = float(request.args.get('lat', 35.681236))
lon = float(request.args.get('lon', 139.767125))
radius = float(request.args.get('radius', 1.0))
categories = request.args.getlist('categories')
if not categories:
categories = ['amenity', 'shop', 'tourism']
location = MapService.Location(latitude=lat, longitude=lon)
places = map_service.search_places_nearby(location, radius, categories)
return jsonify({
'success': True,
'data': {
'location': location.to_dict(),
'places': places,
'count': len(places)
}
})
except ValueError:
return jsonify({
'success': False,
'error': '緯度経度の形式が正しくありません'
}), 400
except Exception as e:
current_app.logger.error(f"周辺施設検索エラー: {e}")
return jsonify({
'success': False,
'error': '周辺施設の検索中にエラーが発生しました'
}), 500
@bp.route('/api-keys/manage', methods=['POST'])
def manage_api_key():
"""APIキーの管理(保存・取得・削除)"""
action = request.json.get('action')
service_name = request.json.get('service_name')
if not service_name:
return jsonify({
'success': False,
'error': 'サービス名を指定してください'
}), 400
try:
if action == 'save':
api_key = request.json.get('api_key')
password = request.json.get('password')
metadata = request.json.get('metadata', {})
if not api_key:
return jsonify({
'success': False,
'error': 'APIキーを指定してください'
}), 400
success = key_manager.save_api_key(service_name, api_key, password, metadata)
if success:
return jsonify({
'success': True,
'message': f'{service_name}のAPIキーを保存しました'
})
else:
return jsonify({
'success': False,
'error': 'APIキーの保存に失敗しました'
}), 500
elif action == 'get':
password = request.json.get('password')
api_key = key_manager.get_api_key(service_name, password)
if api_key:
return jsonify({
'success': True,
'service_name': service_name,
'api_key': api_key
})
else:
return jsonify({
'success': False,
'error': 'APIキーを取得できませんでした'
}), 404
elif action == 'delete':
success = key_manager.delete_api_key(service_name)
if success:
return jsonify({
'success': True,
'message': f'{service_name}のAPIキーを削除しました'
})
else:
return jsonify({
'success': False,
'error': 'APIキーの削除に失敗しました'
}), 500
elif action == 'list':
services = key_manager.list_services()
return jsonify({
'success': True,
'services': services
})
else:
return jsonify({
'success': False,
'error': '不明なアクションです'
}), 400
except Exception as e:
current_app.logger.error(f"APIキー管理エラー: {e}")
return jsonify({
'success': False,
'error': 'APIキーの管理中にエラーが発生しました'
}), 500
# デモ用HTMLテンプレート
@bp.route('/demo')
def api_demo():
"""API連携デモページ"""
return render_template('api/demo.html')
このコードは、外部 API と連携する機能を Flask の Blueprint としてまとめたルーティング定義です。/api をプレフィックスに、天気情報、地図情報、周辺施設検索、API キー管理などのエンドポイントを提供します。WeatherService と MapService を初期化し、それぞれ OpenWeatherMap や OpenStreetMap 系 API を利用した処理を呼び出します。各エンドポイントは JSON 形式で結果を返し、成功可否を success フラグで統一しています。
天気関連では現在の天気と最大 5 日分の予報を取得でき、都市名や日数をクエリパラメータで指定します。地図関連では住所から座標を求めるジオコーディング、座標から住所を返す逆ジオコーディング、指定地点周辺の施設検索を行います。入力値が不正な場合は 400、データが見つからない場合は 404、想定外のエラーは 500 を返すようにエラーハンドリングされています。
また API キー管理用のエンドポイントでは、保存、取得、削除、一覧取得を JSON リクエストで切り替えて実行します。API キーは APIKeyManager によって暗号化管理され、必要に応じてパスワード付きで扱えます。最後に /api/demo では、これらの API を試すためのデモ用 HTML テンプレートを表示します。
設定ファイルの更新
# config.pyの更新(API関連設定を追加)
import os
from pathlib import Path
class Config:
# ... 既存の設定 ...
# API関連の設定
OPENWEATHERMAP_API_KEY = os.environ.get('OPENWEATHERMAP_API_KEY')
OPENWEATHERMAP_BASE_URL = 'https://api.openweathermap.org/data/2.5'
# 地図API設定
NOMINATIM_BASE_URL = 'https://nominatim.openstreetmap.org'
OVERPASS_API_URL = 'https://overpass-api.de/api'
# APIレート制限
API_RATE_LIMIT_PER_MINUTE = 60
# APIキャッシュ設定
WEATHER_CACHE_TTL = 600 # 10分
GEOCACHE_CACHE_TTL = 86400 # 24時間
# APIキー管理設定
API_KEY_STORAGE_DIR = Path(__file__).parent / 'data' / 'api_keys'
@staticmethod
def init_app(app):
# ... 既存の初期化 ...
# APIキーストレージディレクトリの作成
Config.API_KEY_STORAGE_DIR.mkdir(parents=True, exist_ok=True)
このコードは、外部 API 連携に関する設定を Config クラスにまとめて追加したものです。OpenWeatherMap 用の API キーとベース URL、OpenStreetMap 系の Nominatim と Overpass API のエンドポイントを定数として定義し、サービス層から共通で参照できるようにしています。
あわせて API の利用制限やキャッシュ時間も設定として切り出しており、天気情報は 10 分、ジオコーディング結果は 24 時間キャッシュする前提を明示しています。これにより、レート制限対策やパフォーマンス調整を設定変更だけで行える設計になっています。
さらに API キーを安全に保存するためのストレージディレクトリを定義し、init_app 内でディレクトリを自動作成します。これにより、アプリ起動時に必要な保存先が確実に用意され、環境変数とファイル保存の両方を前提とした API キー管理を支える設定となっています。
HTMLテンプレート(APIデモページ)
<!-- templates/api/demo.html -->
<!DOCTYPE html>
<html lang="ja">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>外部API連携デモ</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css" />
<style>
#map {
height: 400px;
border-radius: 8px;
margin-bottom: 20px;
}
.weather-card {
border-radius: 10px;
padding: 20px;
margin-bottom: 20px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.weather-sunny { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; }
.weather-cloudy { background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%); color: white; }
.weather-rainy { background: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%); color: white; }
.api-test-btn { margin: 5px; }
.result-box {
background-color: #f8f9fa;
border-radius: 5px;
padding: 15px;
margin-top: 15px;
max-height: 300px;
overflow-y: auto;
}
</style>
</head>
<body>
<div class="container mt-4">
<h1 class="mb-4">外部API連携デモ</h1>
<div class="row">
<!-- 天気APIセクション -->
<div class="col-md-6">
<div class="card">
<div class="card-header bg-primary text-white">
<h5 class="mb-0">天気API連携</h5>
</div>
<div class="card-body">
<div class="mb-3">
<label for="weatherCity" class="form-label">都市名</label>
<input type="text" class="form-control" id="weatherCity" value="東京">
</div>
<div class="mb-3">
<label for="weatherCountry" class="form-label">国コード (オプション)</label>
<input type="text" class="form-control" id="weatherCountry" value="JP" placeholder="例: JP, US, GB">
</div>
<button class="btn btn-primary" id="getCurrentWeather">
現在の天気を取得
</button>
<button class="btn btn-secondary" id="getWeatherForecast">
天気予報を取得 (5日間)
</button>
<div id="weatherResult" class="result-box mt-3"></div>
</div>
</div>
</div>
<!-- 地図APIセクション -->
<div class="col-md-6">
<div class="card">
<div class="card-header bg-success text-white">
<h5 class="mb-0">地図API連携</h5>
</div>
<div class="card-body">
<div class="mb-3">
<label for="searchAddress" class="form-label">住所検索</label>
<div class="input-group">
<input type="text" class="form-control" id="searchAddress"
placeholder="例: 東京都千代田区" value="東京駅">
<button class="btn btn-success" id="searchLocation">
検索
</button>
</div>
</div>
<div id="map"></div>
<div class="mt-3">
<button class="btn btn-info btn-sm" id="findNearbyPlaces">
周辺施設を検索
</button>
<button class="btn btn-warning btn-sm" id="getCurrentLocation">
現在地を取得
</button>
</div>
<div id="mapResult" class="result-box mt-3"></div>
</div>
</div>
</div>
</div>
<!-- APIキー管理セクション -->
<div class="row mt-4">
<div class="col-12">
<div class="card">
<div class="card-header bg-dark text-white">
<h5 class="mb-0">APIキー管理</h5>
</div>
<div class="card-body">
<div class="row">
<div class="col-md-4">
<h6>APIキーを保存</h6>
<div class="mb-3">
<label class="form-label">サービス名</label>
<input type="text" class="form-control" id="saveServiceName"
placeholder="例: OpenWeatherMap">
</div>
<div class="mb-3">
<label class="form-label">APIキー</label>
<input type="password" class="form-control" id="saveApiKey">
</div>
<div class="mb-3">
<label class="form-label">パスワード (オプション)</label>
<input type="password" class="form-control" id="savePassword">
</div>
<button class="btn btn-primary" id="saveApiKeyBtn">保存</button>
</div>
<div class="col-md-4">
<h6>APIキーを取得</h6>
<div class="mb-3">
<label class="form-label">サービス名</label>
<input type="text" class="form-control" id="getServiceName">
</div>
<div class="mb-3">
<label class="form-label">パスワード</label>
<input type="password" class="form-control" id="getPassword">
</div>
<button class="btn btn-success" id="getApiKeyBtn">取得</button>
<button class="btn btn-info" id="listApiKeysBtn">一覧表示</button>
</div>
<div class="col-md-4">
<h6>APIキーを削除</h6>
<div class="mb-3">
<label class="form-label">サービス名</label>
<input type="text" class="form-control" id="deleteServiceName">
</div>
<button class="btn btn-danger" id="deleteApiKeyBtn">削除</button>
</div>
</div>
<div id="apiKeyResult" class="result-box mt-3"></div>
</div>
</div>
</div>
</div>
<!-- APIテスト結果表示エリア -->
<div class="row mt-4">
<div class="col-12">
<div class="card">
<div class="card-header">
<h5 class="mb-0">APIリクエストログ</h5>
</div>
<div class="card-body">
<div id="apiLog" class="result-box" style="max-height: 200px;"></div>
</div>
</div>
</div>
</div>
</div>
<!-- JavaScript -->
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"></script>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script>
// 地図の初期化
let map = null;
let currentMarker = null;
let markers = [];
function initMap() {
map = L.map('map').setView([35.681236, 139.767125], 13);
L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {
attribution: '© OpenStreetMap contributors'
}).addTo(map);
}
// APIログ表示
function addApiLog(message, type = 'info') {
const timestamp = new Date().toLocaleTimeString();
const logEntry = `<div class="text-${type}"><small>${timestamp}</small> ${message}</div>`;
$('#apiLog').prepend(logEntry);
}
// 天気API関数
async function getCurrentWeather() {
const city = $('#weatherCity').val();
const country = $('#weatherCountry').val();
addApiLog(`天気情報を取得: ${city}, ${country}`);
try {
const response = await fetch(`/api/weather/current?city=${encodeURIComponent(city)}&country=${country}`);
const data = await response.json();
if (data.success) {
const weather = data.data;
let weatherClass = 'weather-cloudy';
if (weather.description.includes('晴')) weatherClass = 'weather-sunny';
if (weather.description.includes('雨')) weatherClass = 'weather-rainy';
$('#weatherResult').html(`
<div class="weather-card ${weatherClass}">
<h4>${weather.location}の天気</h4>
<div class="d-flex align-items-center">
<img src="${weather.icon_url}" alt="${weather.description}"
style="width: 80px; height: 80px;">
<div class="ms-3">
<h2 class="mb-0">${weather.temperature}°C</h2>
<p class="mb-1">体感温度: ${weather.feels_like}°C</p>
<p class="mb-1">${weather.description}</p>
<small>湿度: ${weather.humidity}% | 気圧: ${weather.pressure}hPa</small>
</div>
</div>
<div class="row mt-3">
<div class="col-6">
<small>風速: ${weather.wind_speed}m/s</small><br>
<small>風向: ${weather.wind_direction}°</small>
</div>
<div class="col-6 text-end">
<small>日の出: ${weather.sunrise}</small><br>
<small>日の入: ${weather.sunset}</small>
</div>
</div>
</div>
`);
addApiLog('天気情報の取得に成功', 'success');
} else {
$('#weatherResult').html(`<div class="alert alert-danger">${data.error}</div>`);
addApiLog(`天気情報の取得に失敗: ${data.error}`, 'danger');
}
} catch (error) {
$('#weatherResult').html('<div class="alert alert-danger">API呼び出し中にエラーが発生しました</div>');
addApiLog(`天気APIエラー: ${error}`, 'danger');
}
}
async function getWeatherForecast() {
const city = $('#weatherCity').val();
addApiLog(`天気予報を取得: ${city}`);
try {
const response = await fetch(`/api/weather/forecast?city=${encodeURIComponent(city)}&days=5`);
const data = await response.json();
if (data.success) {
let forecastHtml = `
<h5>${data.data.city}の5日間天気予報</h5>
<div class="row">`;
// 24時間ごとのデータを表示(8件ごとに1日分)
for (let i = 0; i < Math.min(data.data.forecasts.length, 8); i += 8) {
const dayForecasts = data.data.forecasts.slice(i, i + 8);
if (dayForecasts.length > 0) {
const mainForecast = dayForecasts[4] || dayForecasts[0];
forecastHtml += `
<div class="col-md">
<div class="text-center">
<div class="mb-2">${mainForecast.datetime.split(' ')[0]}</div>
<img src="${mainForecast.icon_url}" alt="${mainForecast.description}"
style="width: 50px; height: 50px;">
<div class="mt-1">
<strong>${mainForecast.temperature}°C</strong><br>
<small>${mainForecast.description}</small>
</div>
<small class="text-muted">降水確率: ${mainForecast.pop}%</small>
</div>
</div>`;
}
}
forecastHtml += `</div>`;
$('#weatherResult').html(forecastHtml);
addApiLog('天気予報の取得に成功', 'success');
} else {
$('#weatherResult').html(`<div class="alert alert-danger">${data.error}</div>`);
addApiLog(`天気予報の取得に失敗: ${data.error}`, 'danger');
}
} catch (error) {
$('#weatherResult').html('<div class="alert alert-danger">API呼び出し中にエラーが発生しました</div>');
addApiLog(`天気予報APIエラー: ${error}`, 'danger');
}
}
// 地図API関数
async function searchLocation() {
const address = $('#searchAddress').val();
if (!address) {
alert('住所を入力してください');
return;
}
addApiLog(`住所を検索: ${address}`);
try {
const response = await fetch(`/api/geocode?address=${encodeURIComponent(address)}`);
const data = await response.json();
if (data.success) {
const location = data.data;
// 地図を更新
map.setView([location.lat, location.lng], 15);
// 既存のマーカーを削除
if (currentMarker) {
map.removeLayer(currentMarker);
}
// 新しいマーカーを追加
currentMarker = L.marker([location.lat, location.lng])
.addTo(map)
.bindPopup(`<strong>${address}</strong><br>${location.lat}, ${location.lng}`)
.openPopup();
$('#mapResult').html(`
<div class="alert alert-success">
<strong>住所:</strong> ${location.address}<br>
<strong>緯度:</strong> ${location.lat}<br>
<strong>経度:</strong> ${location.lng}
</div>
`);
addApiLog('住所検索に成功', 'success');
} else {
$('#mapResult').html(`<div class="alert alert-danger">${data.error}</div>`);
addApiLog(`住所検索に失敗: ${data.error}`, 'danger');
}
} catch (error) {
$('#mapResult').html('<div class="alert alert-danger">API呼び出し中にエラーが発生しました</div>');
addApiLog(`ジオコーディングエラー: ${error}`, 'danger');
}
}
async function findNearbyPlaces() {
if (!currentMarker) {
alert('まずは場所を検索してください');
return;
}
const lat = currentMarker.getLatLng().lat;
const lng = currentMarker.getLatLng().lng;
addApiLog(`周辺施設を検索: ${lat}, ${lng}`);
try {
const response = await fetch(`/api/places/nearby?lat=${lat}&lon=${lng}&radius=0.5`);
const data = await response.json();
if (data.success) {
// 既存のマーカーを削除
markers.forEach(marker => map.removeLayer(marker));
markers = [];
// 周辺施設のマーカーを追加
data.data.places.forEach(place => {
if (place.lat && place.lon) {
const marker = L.marker([place.lat, place.lon])
.addTo(map)
.bindPopup(`<strong>${place.name}</strong><br>${place.type}`);
markers.push(marker);
}
});
let placesHtml = `<h6>周辺施設 (${data.data.places.length}件)</h6><div class="list-group">`;
data.data.places.forEach(place => {
placesHtml += `
<div class="list-group-item list-group-item-action">
<div class="d-flex w-100 justify-content-between">
<h6 class="mb-1">${place.name}</h6>
<small class="text-muted">${place.type}</small>
</div>
<small class="text-muted">${JSON.stringify(place.details)}</small>
</div>`;
});
placesHtml += `</div>`;
$('#mapResult').html(placesHtml);
addApiLog(`周辺施設を${data.data.places.length}件見つけました`, 'success');
} else {
$('#mapResult').html(`<div class="alert alert-danger">${data.error}</div>`);
addApiLog(`周辺施設検索に失敗: ${data.error}`, 'danger');
}
} catch (error) {
$('#mapResult').html('<div class="alert alert-danger">API呼び出し中にエラーが発生しました</div>');
addApiLog(`周辺施設検索エラー: ${error}`, 'danger');
}
}
function getCurrentLocation() {
if (!navigator.geolocation) {
alert('このブラウザはGeolocationをサポートしていません');
return;
}
addApiLog('現在地を取得中...');
navigator.geolocation.getCurrentPosition(
async (position) => {
const lat = position.coords.latitude;
const lng = position.coords.longitude;
try {
// 逆ジオコーディングで住所を取得
const response = await fetch(`/api/reverse-geocode?lat=${lat}&lon=${lng}`);
const data = await response.json();
if (data.success) {
const location = data.data;
// 地図を更新
map.setView([lat, lng], 15);
// マーカーを追加
if (currentMarker) {
map.removeLayer(currentMarker);
}
currentMarker = L.marker([lat, lng])
.addTo(map)
.bindPopup(`<strong>現在地</strong><br>${location.address}`)
.openPopup();
$('#mapResult').html(`
<div class="alert alert-info">
<strong>現在地:</strong> ${location.address}<br>
<strong>緯度:</strong> ${lat}<br>
<strong>経度:</strong> ${lng}<br>
<strong>精度:</strong> ${position.coords.accuracy}m
</div>
`);
addApiLog('現在地の取得に成功', 'success');
}
} catch (error) {
addApiLog(`現在地の取得中にエラー: ${error}`, 'danger');
}
},
(error) => {
let errorMessage = '位置情報の取得に失敗しました';
switch (error.code) {
case error.PERMISSION_DENIED:
errorMessage = '位置情報の利用が許可されていません';
break;
case error.POSITION_UNAVAILABLE:
errorMessage = '位置情報が利用できません';
break;
case error.TIMEOUT:
errorMessage = '位置情報の取得がタイムアウトしました';
break;
}
$('#mapResult').html(`<div class="alert alert-danger">${errorMessage}</div>`);
addApiLog(`現在地の取得に失敗: ${errorMessage}`, 'danger');
},
{ enableHighAccuracy: true, timeout: 5000, maximumAge: 0 }
);
}
// APIキー管理関数
async function saveApiKey() {
const serviceName = $('#saveServiceName').val();
const apiKey = $('#saveApiKey').val();
const password = $('#savePassword').val();
if (!serviceName || !apiKey) {
alert('サービス名とAPIキーを入力してください');
return;
}
addApiLog(`${serviceName}のAPIキーを保存中...`);
try {
const response = await fetch('/api/api-keys/manage', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
action: 'save',
service_name: serviceName,
api_key: apiKey,
password: password || null
})
});
const data = await response.json();
if (data.success) {
$('#apiKeyResult').html(`<div class="alert alert-success">${data.message}</div>`);
addApiLog(`${serviceName}のAPIキーを保存しました`, 'success');
// 入力欄をクリア
$('#saveServiceName').val('');
$('#saveApiKey').val('');
$('#savePassword').val('');
} else {
$('#apiKeyResult').html(`<div class="alert alert-danger">${data.error}</div>`);
addApiLog(`APIキーの保存に失敗: ${data.error}`, 'danger');
}
} catch (error) {
$('#apiKeyResult').html('<div class="alert alert-danger">API呼び出し中にエラーが発生しました</div>');
addApiLog(`APIキー保存エラー: ${error}`, 'danger');
}
}
async function getApiKey() {
const serviceName = $('#getServiceName').val();
const password = $('#getPassword').val();
if (!serviceName) {
alert('サービス名を入力してください');
return;
}
addApiLog(`${serviceName}のAPIキーを取得中...`);
try {
const response = await fetch('/api/api-keys/manage', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
action: 'get',
service_name: serviceName,
password: password || null
})
});
const data = await response.json();
if (data.success) {
$('#apiKeyResult').html(`
<div class="alert alert-success">
<strong>サービス名:</strong> ${data.service_name}<br>
<strong>APIキー:</strong> <code>${data.api_key}</code>
</div>
`);
addApiLog(`${serviceName}のAPIキーを取得しました`, 'success');
} else {
$('#apiKeyResult').html(`<div class="alert alert-danger">${data.error}</div>`);
addApiLog(`APIキーの取得に失敗: ${data.error}`, 'danger');
}
} catch (error) {
$('#apiKeyResult').html('<div class="alert alert-danger">API呼び出し中にエラーが発生しました</div>');
addApiLog(`APIキー取得エラー: ${error}`, 'danger');
}
}
async function listApiKeys() {
addApiLog('APIキーの一覧を取得中...');
try {
const response = await fetch('/api/api-keys/manage', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
action: 'list'
})
});
const data = await response.json();
if (data.success) {
let html = '<h6>保存済みAPIキー一覧</h6><ul class="list-group">';
if (data.services.length > 0) {
data.services.forEach(service => {
html += `<li class="list-group-item">${service}</li>`;
});
} else {
html += '<li class="list-group-item text-muted">保存されたAPIキーはありません</li>';
}
html += '</ul>';
$('#apiKeyResult').html(html);
addApiLog(`${data.services.length}件のAPIキーを一覧表示`, 'success');
} else {
$('#apiKeyResult').html(`<div class="alert alert-danger">${data.error}</div>`);
addApiLog(`APIキーの一覧取得に失敗: ${data.error}`, 'danger');
}
} catch (error) {
$('#apiKeyResult').html('<div class="alert alert-danger">API呼び出し中にエラーが発生しました</div>');
addApiLog(`APIキー一覧取得エラー: ${error}`, 'danger');
}
}
async function deleteApiKey() {
const serviceName = $('#deleteServiceName').val();
if (!serviceName) {
alert('サービス名を入力してください');
return;
}
if (!confirm(`本当に${serviceName}のAPIキーを削除しますか?`)) {
return;
}
addApiLog(`${serviceName}のAPIキーを削除中...`);
try {
const response = await fetch('/api/api-keys/manage', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
action: 'delete',
service_name: serviceName
})
});
const data = await response.json();
if (data.success) {
$('#apiKeyResult').html(`<div class="alert alert-success">${data.message}</div>`);
$('#deleteServiceName').val('');
addApiLog(`${serviceName}のAPIキーを削除しました`, 'success');
} else {
$('#apiKeyResult').html(`<div class="alert alert-danger">${data.error}</div>`);
addApiLog(`APIキーの削除に失敗: ${data.error}`, 'danger');
}
} catch (error) {
$('#apiKeyResult').html('<div class="alert alert-danger">API呼び出し中にエラーが発生しました</div>');
addApiLog(`APIキー削除エラー: ${error}`, 'danger');
}
}
// イベントリスナーの設定
$(document).ready(function() {
// 地図の初期化
initMap();
// 天気APIイベント
$('#getCurrentWeather').click(getCurrentWeather);
$('#getWeatherForecast').click(getWeatherForecast);
// 地図APIイベント
$('#searchLocation').click(searchLocation);
$('#findNearbyPlaces').click(findNearbyPlaces);
$('#getCurrentLocation').click(getCurrentLocation);
// APIキー管理イベント
$('#saveApiKeyBtn').click(saveApiKey);
$('#getApiKeyBtn').click(getApiKey);
$('#listApiKeysBtn').click(listApiKeys);
$('#deleteApiKeyBtn').click(deleteApiKey);
// エンターキーで住所検索
$('#searchAddress').keypress(function(e) {
if (e.which === 13) {
searchLocation();
return false;
}
});
// 初期データの読み込み
addApiLog('API連携デモページを読み込みました', 'info');
});
</script>
</body>
</html>
この HTML は外部 API 連携の動作確認用デモ画面で、Bootstrap によるレイアウトと Leaflet による地図表示を組み合わせています。画面は天気 API、地図 API、API キー管理、リクエストログの4領域で構成され、結果表示用にスクロール可能なボックスを用意しています。JavaScript は jQuery と fetch を使い、各ボタン操作を Flask 側の /api 配下エンドポイントへ送信して JSON 応答を画面に反映します。
天気は都市名と国コードを入力して現在天気または5日予報を取得し、説明文に応じてカード背景を切り替えつつ、温度や湿度などを表示します。地図は住所検索でジオコーディングして地図を移動しマーカーを立て、周辺施設検索では Overpass の結果を複数マーカーとして描画し一覧表示します。現在地取得はブラウザの Geolocation を利用し、逆ジオコーディングで住所を引いて地図に反映します。
API キー管理はサービス名を指定して保存、取得、一覧、削除を実行し、結果を専用エリアに表示します。全操作は addApiLog で時刻付きのログとして蓄積され、成功や失敗が色付きで分かるようになっています。なお、予報表示のループ条件が 8 件分までに制限されているため、5日分を表示したい意図と一致しない可能性があります。
まとめ
本章では、Flask アプリケーションにおける外部 API 連携の考え方と実装方法を一通り学びました。requests ライブラリを用いた基本的な通信やエラー処理、タイムアウト設定を土台に、OpenWeatherMap API を使った天気情報や予報の取得、OpenStreetMap 系 API によるジオコーディングや周辺施設検索までを実装例として確認しました。また、API キーを暗号化して保存し、環境変数やパスワードと組み合わせて安全に管理する方法や、レート制限やキャッシュ、ロギングといった実運用を意識した設計の重要性も押さえています。
外部 API を適切に利用することで、限られた開発リソースでも高機能なアプリケーションを短期間で構築でき、専門サービスに処理を委ねることで保守負担やリスクを減らせます。一方で、利用規約の遵守やレート制限への配慮、障害を前提としたエラー処理、API キーの安全な管理、応答待ちを意識したユーザー体験設計などを怠らないことが、安定した API 連携を実現するうえで欠かせません。
演習問題
初級問題(3問)
問題1:基本的なAPIリクエスト
指定されたURLからデータを取得し、JSON形式で返す関数を作成してください。タイムアウトは10秒に設定し、エラーが発生した場合はNoneを返すようにしてください。
問題2:APIパラメータの構築
辞書形式のパラメータをURLクエリ文字列に変換する関数を作成してください。値がNoneのパラメータは除外し、日本語を含むパラメータは適切にエンコードするようにしてください。
問題3:簡易キャッシュシステム
関数の結果を指定時間キャッシュするデコレータを作成してください。同じ引数での呼び出しは、キャッシュが有効な間は再計算せずにキャッシュから結果を返すようにしてください。
中級問題(6問)
問題4:複数APIの並列呼び出し
複数のAPIエンドポイントから同時にデータを取得し、全ての結果が揃ったらまとめて返す関数を作成してください。asyncioまたはthreadingを使用して並列処理を実装してください。
問題5:APIレスポンスのバリデーション
APIからのレスポンスを検証するクラスを作成してください。必須フィールドのチェック、データ型の検証、値の範囲チェックなどを行い、不正なデータが来た場合は適切な例外を発生させるようにしてください。
問題6:リトライ機能付きAPIクライアント
一時的なネットワークエラーに対して自動リトライを行うAPIクライアントを作成してください。指数バックオフアルゴリズムを使用し、最大リトライ回数とリトライ間隔を設定可能にしてください。
問題7:API使用量のモニタリング
APIの呼び出し回数、レスポンス時間、エラー率などを記録し、統計情報を提供するモニタリングシステムを作成してください。一定の閾値を超えた場合にアラートを発行する機能も実装してください。
問題8:APIキーのローテーションシステム
複数のAPIキーを管理し、使用回数やエラー率に基づいて自動的にキーを切り替えるシステムを作成してください。キーの使用状況を記録し、最も効率的なキーを選択するアルゴリズムを実装してください。
問題9:APIレスポンスの差分検出
定期的にAPIを呼び出し、前回のレスポンスとの差分を検出するシステムを作成してください。重要な変更があった場合のみ通知を送信し、無視しても良い小さな変更はフィルタリングする機能も実装してください。
上級問題(3問)
問題10:分散型APIキャッシュシステム
複数のサーバー間でAPIレスポンスを共有できる分散キャッシュシステムを設計・実装してください。RedisやMemcachedなどの外部キャッシュサーバーを使用し、キャッシュの一貫性と効率性を確保してください。
問題11:API依存関係グラフの最適化
複数のAPI呼び出し間の依存関係を分析し、並列実行可能な部分を見つけて実行時間を最適化するシステムを作成してください。依存関係グラフを構築し、トポロジカルソートを使用して実行順序を決定してください。
問題12:機械学習を用いたAPI異常検知
APIのレスポンス時間、エラー率、レスポンス内容などの特徴量を収集し、機械学習モデルを使用して異常な振る舞いを検出するシステムを作成してください。正常なパターンから逸脱した場合に自動的にアラートを発行するようにしてください。
演習問題 解答例
初級問題(3問)
問題1:基本的な画像リサイズ
from PIL import Image
from typing import Tuple, Optional
import os
def resize_image_with_aspect_ratio(
input_path: str,
output_path: str,
max_size: Tuple[int, int] = (800, 600),
quality: int = 85
) -> bool:
"""
画像を指定した最大サイズにリサイズする。
アスペクト比は保持し、画像が指定サイズより小さい場合はリサイズしない。
Args:
input_path: 入力画像パス
output_path: 出力画像パス
max_size: 最大サイズ (幅, 高さ)
quality: 保存品質 (1-100)
Returns:
bool: 処理が成功したかどうか
"""
try:
# 画像を開く
with Image.open(input_path) as img:
# 元のサイズを取得
original_width, original_height = img.size
max_width, max_height = max_size
# 元の画像が最大サイズより小さい場合はリサイズしない
if original_width <= max_width and original_height <= max_height:
# サイズ変更なしで保存
img.save(output_path, quality=quality)
print(f"画像は指定サイズより小さいため、リサイズせずに保存しました。")
return True
# アスペクト比を計算
aspect_ratio = original_width / original_height
target_ratio = max_width / max_height
# リサイズ後のサイズを計算
if aspect_ratio > target_ratio:
# 横長の画像
new_width = max_width
new_height = int(max_width / aspect_ratio)
else:
# 縦長または正方形の画像
new_width = int(max_height * aspect_ratio)
new_height = max_height
# リサイズ実行
resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
# 保存
resized_img.save(output_path, quality=quality, optimize=True)
print(f"画像を {original_width}x{original_height} -> {new_width}x{new_height} にリサイズしました。")
return True
except FileNotFoundError:
print(f"エラー: ファイル '{input_path}' が見つかりません。")
return False
except Exception as e:
print(f"エラー: 画像処理中に問題が発生しました - {e}")
return False
# 使用例
if __name__ == "__main__":
# テスト用の画像処理
success = resize_image_with_aspect_ratio(
input_path="input.jpg",
output_path="output_resized.jpg",
max_size=(800, 600),
quality=90
)
if success:
print("画像リサイズが成功しました。")
else:
print("画像リサイズが失敗しました。")
問題2:画像フォーマット変換
from PIL import Image
from typing import Optional
import os
def convert_jpeg_to_png(
input_path: str,
output_path: str,
preserve_transparency: bool = True
) -> bool:
"""
JPEG画像をPNG形式に変換する。
透過情報が重要な場合は、背景を透明化するオプションを提供。
Args:
input_path: 入力JPEG画像パス
output_path: 出力PNG画像パス
preserve_transparency: 透過情報を保持するか(デフォルトはTrue)
Returns:
bool: 処理が成功したかどうか
"""
try:
# 入力ファイルの拡張子をチェック
if not input_path.lower().endswith(('.jpg', '.jpeg', '.jpe')):
print(f"警告: 入力ファイル '{input_path}' はJPEG形式ではない可能性があります。")
# 画像を開く
with Image.open(input_path) as img:
# 画像モードを確認
print(f"元画像のモード: {img.mode}")
if preserve_transparency:
# 透過情報を保持する処理
if img.mode == 'RGB':
# RGB画像をRGBAに変換(アルファチャンネルを追加)
rgba_img = Image.new('RGBA', img.size)
# RGB値をRGBAにコピー(アルファ値は255=完全不透明)
rgba_pixels = []
for r, g, b in img.getdata():
rgba_pixels.append((r, g, b, 255))
rgba_img.putdata(rgba_pixels)
img = rgba_img
elif img.mode == 'L': # グレースケール
# グレースケールをLAに変換
la_img = Image.new('LA', img.size)
la_pixels = []
for value in img.getdata():
la_pixels.append((value, 255))
la_img.putdata(la_pixels)
img = la_img
print(f"変換後のモード: {img.mode}")
# PNGとして保存(圧縮レベルを指定可能)
img.save(output_path, 'PNG', compress_level=6, optimize=True)
# ファイルサイズを比較
input_size = os.path.getsize(input_path)
output_size = os.path.getsize(output_path)
print(f"変換完了:")
print(f" 入力: {input_path} ({input_size:,} bytes)")
print(f" 出力: {output_path} ({output_size:,} bytes)")
print(f" サイズ比: {output_size/input_size:.2%}")
return True
except FileNotFoundError:
print(f"エラー: ファイル '{input_path}' が見つかりません。")
return False
except Exception as e:
print(f"エラー: 画像変換中に問題が発生しました - {e}")
return False
def convert_png_to_jpeg(
input_path: str,
output_path: str,
background_color: tuple = (255, 255, 255), # 白背景
quality: int = 85
) -> bool:
"""
PNG画像をJPEG形式に変換する。
透過部分は指定した背景色で塗りつぶす。
Args:
input_path: 入力PNG画像パス
output_path: 出力JPEG画像パス
background_color: 背景色 (R, G, B)
quality: JPEG品質 (1-100)
Returns:
bool: 処理が成功したかどうか
"""
try:
# 画像を開く
with Image.open(input_path) as img:
# RGBAまたはLAモードの場合は背景を追加
if img.mode in ('RGBA', 'LA', 'P'):
# 背景用のRGB画像を作成
background = Image.new('RGB', img.size, background_color)
if img.mode == 'RGBA':
# RGBAからRGBに変換(アルファチャンネルを考慮)
if img.mode == 'RGBA':
# アルファチャンネルを考慮して合成
background.paste(img, mask=img.split()[3])
img = background
elif img.mode == 'LA':
# LAからRGBに変換
rgb_img = Image.new('RGB', img.size, background_color)
rgb_img.paste(img, mask=img.split()[1])
img = rgb_img
elif img.mode == 'P':
# パレット画像をRGBに変換
img = img.convert('RGB')
# JPEGとして保存
img.save(output_path, 'JPEG', quality=quality, optimize=True)
print(f"変換完了: {input_path} -> {output_path}")
return True
except Exception as e:
print(f"エラー: {e}")
return False
# 使用例
if __name__ == "__main__":
# JPEGからPNGへの変換
print("=== JPEGからPNGへの変換 ===")
success1 = convert_jpeg_to_png(
input_path="photo.jpg",
output_path="photo_converted.png",
preserve_transparency=True
)
# PNGからJPEGへの変換
print("\n=== PNGからJPEGへの変換 ===")
success2 = convert_png_to_jpeg(
input_path="image.png",
output_path="image_converted.jpg",
background_color=(255, 255, 255), # 白背景
quality=90
)
問題3:簡単な画像情報取得
from PIL import Image
from typing import Dict, Any
import os
from datetime import datetime
def get_image_info(image_path: str) -> Dict[str, Any]:
"""
画像ファイルから基本情報を取得する。
Args:
image_path: 画像ファイルのパス
Returns:
Dict[str, Any]: 画像情報の辞書
"""
try:
# ファイルが存在するか確認
if not os.path.exists(image_path):
return {"error": f"ファイル '{image_path}' が見つかりません"}
# ファイル情報の取得
file_stat = os.stat(image_path)
with Image.open(image_path) as img:
# 基本情報を取得
info = {
'file_info': {
'path': os.path.abspath(image_path),
'filename': os.path.basename(image_path),
'size_bytes': file_stat.st_size,
'size_formatted': _format_file_size(file_stat.st_size),
'created': datetime.fromtimestamp(file_stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S'),
'modified': datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
},
'image_info': {
'format': img.format,
'mode': img.mode,
'width': img.width,
'height': img.height,
'size': f"{img.width} × {img.height}",
'aspect_ratio': round(img.width / img.height, 3) if img.height > 0 else 0,
'megapixels': round((img.width * img.height) / 1000000, 2),
}
}
# カラープロファイル情報(あれば)
if hasattr(img, 'info') and 'icc_profile' in img.info:
info['image_info']['has_icc_profile'] = True
else:
info['image_info']['has_icc_profile'] = False
# EXIF情報(あれば)
try:
exif_data = img._getexif()
if exif_data:
# 主要なEXIFタグを抽出
exif_tags = {}
# EXIFタグの定義(一部)
exif_tag_names = {
271: 'Make', # メーカー
272: 'Model', # モデル
274: 'Orientation', # 方向
282: 'XResolution', # X解像度
283: 'YResolution', # Y解像度
306: 'DateTime', # 日時
33434: 'ExposureTime', # 露出時間
33437: 'FNumber', # F値
36867: 'DateTimeOriginal', # 撮影日時
37378: 'FocalLength', # 焦点距離
}
for tag_id, value in exif_data.items():
if tag_id in exif_tag_names:
tag_name = exif_tag_names[tag_id]
exif_tags[tag_name] = str(value)
info['exif_info'] = exif_tags
except Exception:
info['exif_info'] = None
# 高度な画像解析
info['analysis'] = _analyze_image(img)
return info
except Exception as e:
return {"error": f"情報取得中にエラーが発生しました: {str(e)}"}
def _format_file_size(size_bytes: int) -> str:
"""ファイルサイズを読みやすい形式にフォーマット"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size_bytes < 1024.0:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.2f} TB"
def _analyze_image(img: Image.Image) -> Dict[str, Any]:
"""画像の詳細分析"""
analysis = {}
# モードに基づく情報
mode_info = {
'1': '1-bit pixels, black and white',
'L': '8-bit pixels, grayscale',
'P': '8-bit pixels, mapped to any mode using palette',
'RGB': '3x8-bit pixels, true color',
'RGBA': '4x8-bit pixels, true color with transparency mask',
'CMYK': '4x8-bit pixels, color separation',
'YCbCr': '3x8-bit pixels, color video format',
'LAB': '3x8-bit pixels, L*a*b color space',
'HSV': '3x8-bit pixels, Hue, Saturation, Value',
'I': '32-bit signed integer pixels',
'F': '32-bit floating point pixels',
}
analysis['mode_description'] = mode_info.get(img.mode, 'Unknown mode')
# 画像の統計情報(サンプリング)
if img.mode in ('RGB', 'RGBA'):
# 画像からサンプルピクセルを取得
sample_size = min(1000, img.width * img.height)
pixels = list(img.getdata())[:sample_size]
# RGBチャンネルの平均値を計算
if pixels:
if img.mode == 'RGB':
r_sum = g_sum = b_sum = 0
for r, g, b in pixels:
r_sum += r
g_sum += g
b_sum += b
analysis['average_color'] = (
r_sum // len(pixels),
g_sum // len(pixels),
b_sum // len(pixels)
)
elif img.mode == 'RGBA':
r_sum = g_sum = b_sum = a_sum = 0
for r, g, b, a in pixels:
r_sum += r
g_sum += g
b_sum += b
a_sum += a
analysis['average_color'] = (
r_sum // len(pixels),
g_sum // len(pixels),
b_sum // len(pixels),
a_sum // len(pixels)
)
# 画像のタイプを推測
width, height = img.size
if width == height:
analysis['shape'] = 'square'
elif width > height:
analysis['shape'] = 'landscape'
else:
analysis['shape'] = 'portrait'
# 解像度情報
dpi = img.info.get('dpi', (72, 72))
analysis['dpi'] = dpi
analysis['print_size_inch'] = (
round(width / dpi[0], 2) if dpi[0] > 0 else 0,
round(height / dpi[1], 2) if dpi[1] > 0 else 0
)
return analysis
def print_image_info(info: Dict[str, Any]):
"""画像情報を読みやすい形式で表示"""
if 'error' in info:
print(f"エラー: {info['error']}")
return
print("=" * 50)
print("画像情報")
print("=" * 50)
# ファイル情報
print("\n📁 ファイル情報:")
print(f" ファイル名: {info['file_info']['filename']}")
print(f" パス: {info['file_info']['path']}")
print(f" サイズ: {info['file_info']['size_formatted']} ({info['file_info']['size_bytes']:,} bytes)")
print(f" 作成日時: {info['file_info']['created']}")
print(f" 更新日時: {info['file_info']['modified']}")
# 画像情報
print("\n🖼️ 画像情報:")
print(f" 形式: {info['image_info']['format']}")
print(f" モード: {info['image_info']['mode']}")
print(f" サイズ: {info['image_info']['size']}")
print(f" 解像度: {info['image_info']['megapixels']} MP")
print(f" アスペクト比: {info['image_info']['aspect_ratio']}")
print(f" ICCプロファイル: {'有り' if info['image_info']['has_icc_profile'] else '無し'}")
# EXIF情報
if info.get('exif_info'):
print("\n📸 EXIF情報:")
for key, value in info['exif_info'].items():
print(f" {key}: {value}")
# 分析情報
if info.get('analysis'):
print("\n🔍 分析情報:")
analysis = info['analysis']
print(f" 形状: {analysis['shape']}")
print(f" モード詳細: {analysis['mode_description']}")
if 'average_color' in analysis:
color = analysis['average_color']
if len(color) == 3:
print(f" 平均色: RGB{color}")
elif len(color) == 4:
print(f" 平均色: RGBA{color}")
print(f" DPI: {analysis['dpi']}")
print_size = analysis['print_size_inch']
print(f" 印刷サイズ: {print_size[0]} × {print_size[1]} インチ")
# 使用例
if __name__ == "__main__":
# 画像情報を取得
image_path = "sample.jpg" # テスト用の画像パス
info = get_image_info(image_path)
# 情報を表示
print_image_info(info)
# JSON形式で保存も可能
import json
with open("image_info.json", "w", encoding="utf-8") as f:
json.dump(info, f, indent=2, ensure_ascii=False)
print("\n💾 情報を 'image_info.json' に保存しました。")
中級問題(6問)
問題4:画像の自動回転
from PIL import Image, ImageOps
import os
from typing import Optional, Tuple
class AutoImageRotator:
"""スマートフォン画像の自動回転クラス"""
# EXIFのOrientationタグと回転角度の対応
ORIENTATION_MAP = {
1: (0, False), # 通常
2: (0, True), # 左右反転
3: (180, False), # 180度回転
4: (180, True), # 180度回転 + 左右反転
5: (270, True), # 270度回転 + 左右反転
6: (270, False), # 270度回転
7: (90, True), # 90度回転 + 左右反転
8: (90, False), # 90度回転
}
def __init__(self, preserve_exif: bool = True):
"""
Args:
preserve_exif: EXIF情報を保持するかどうか
"""
self.preserve_exif = preserve_exif
def rotate_image_by_exif(self, image_path: str, output_path: Optional[str] = None) -> Tuple[bool, str]:
"""
EXIF情報に基づいて画像を自動回転
Args:
image_path: 入力画像パス
output_path: 出力画像パス(Noneの場合は上書き)
Returns:
Tuple[bool, str]: (成功したかどうか, メッセージ)
"""
try:
# 出力パスの決定
if output_path is None:
base, ext = os.path.splitext(image_path)
output_path = f"{base}_rotated{ext}"
# 画像を開く
with Image.open(image_path) as img:
# EXIF情報の取得
exif = img.getexif()
# Orientationタグの取得
orientation = exif.get(274) # 274はOrientationタグ
if orientation is None or orientation == 1:
# 回転不要
if output_path != image_path:
img.save(output_path)
return True, "回転不要(既に正しい向き)"
return False, "回転不要(既に正しい向き)"
if orientation not in self.ORIENTATION_MAP:
return False, f"未知のOrientation値: {orientation}"
# 回転角度と反転フラグを取得
rotation_angle, flip_horizontal = self.ORIENTATION_MAP[orientation]
# 画像処理
processed_img = img
# 反転処理
if flip_horizontal:
processed_img = ImageOps.mirror(processed_img)
# 回転処理
if rotation_angle != 0:
processed_img = processed_img.rotate(
rotation_angle,
expand=True, # 画像サイズを調整
resample=Image.Resampling.BICUBIC
)
# EXIF情報の更新(Orientationを1に設定)
if self.preserve_exif:
new_exif = dict(exif)
new_exif[274] = 1 # Orientationを通常に設定
else:
new_exif = None
# 画像を保存
processed_img.save(
output_path,
exif=new_exif,
quality=95,
optimize=True
)
# 処理情報を返す
action = "回転" if rotation_angle != 0 else ""
action += "および" if rotation_angle != 0 and flip_horizontal else ""
action += "反転" if flip_horizontal else ""
return True, f"画像を{action}しました(Orientation: {orientation})"
except Exception as e:
return False, f"エラー: {str(e)}"
def batch_rotate_images(self, input_dir: str, output_dir: Optional[str] = None,
extensions: Tuple[str, ...] = ('.jpg', '.jpeg', '.png', '.tiff')):
"""
ディレクトリ内の全画像を自動回転
Args:
input_dir: 入力ディレクトリ
output_dir: 出力ディレクトリ(Noneの場合はinput_dir内に'rotated'フォルダ作成)
extensions: 処理する拡張子のタプル
"""
# 出力ディレクトリの設定
if output_dir is None:
output_dir = os.path.join(input_dir, 'rotated')
# 出力ディレクトリがなければ作成
os.makedirs(output_dir, exist_ok=True)
results = {
'total': 0,
'success': 0,
'failed': 0,
'details': []
}
# ディレクトリ内のファイルを処理
for filename in os.listdir(input_dir):
# 拡張子チェック
if not filename.lower().endswith(extensions):
continue
input_path = os.path.join(input_dir, filename)
# ディレクトリはスキップ
if os.path.isdir(input_path):
continue
results['total'] += 1
# 出力パス
output_path = os.path.join(output_dir, filename)
# 回転処理
success, message = self.rotate_image_by_exif(input_path, output_path)
if success:
results['success'] += 1
status = "成功"
else:
results['failed'] += 1
status = "失敗"
results['details'].append({
'filename': filename,
'status': status,
'message': message
})
print(f"{filename}: {status} - {message}")
return results
def detect_orientation(self, image_path: str) -> Dict[str, Any]:
"""
画像の向き情報を検出
Args:
image_path: 画像パス
Returns:
Dict[str, Any]: 向き情報
"""
try:
with Image.open(image_path) as img:
# EXIF情報取得
exif = img.getexif()
orientation = exif.get(274)
# 画像のアスペクト比からも向きを推測
width, height = img.size
aspect_ratio = width / height
# 向きの判定
orientation_info = {
'exif_orientation': orientation,
'exif_description': self._get_orientation_description(orientation),
'image_size': f"{width}×{height}",
'aspect_ratio': round(aspect_ratio, 3),
'inferred_orientation': None,
'needs_rotation': False
}
# アスペクト比から向きを推測
if aspect_ratio > 1.2:
orientation_info['inferred_orientation'] = 'landscape'
elif aspect_ratio < 0.8:
orientation_info['inferred_orientation'] = 'portrait'
else:
orientation_info['inferred_orientation'] = 'square'
# 回転が必要か判定
if orientation in [3, 6, 8]:
orientation_info['needs_rotation'] = True
elif orientation is None:
# EXIFがない場合、アスペクト比で推測
if orientation_info['inferred_orientation'] == 'portrait' and width > height:
orientation_info['needs_rotation'] = True
return orientation_info
except Exception as e:
return {'error': str(e)}
def _get_orientation_description(self, orientation: Optional[int]) -> str:
"""Orientation値の説明を取得"""
if orientation is None:
return "EXIF情報なし"
descriptions = {
1: "通常(回転不要)",
2: "左右反転",
3: "180度回転",
4: "180度回転 + 左右反転",
5: "270度回転 + 左右反転",
6: "270度回転",
7: "90度回転 + 左右反転",
8: "90度回転"
}
return descriptions.get(orientation, f"未知の値: {orientation}")
# 使用例
def main():
"""自動回転機能の使用例"""
# インスタンス作成
rotator = AutoImageRotator(preserve_exif=True)
print("=== 単一画像の回転 ===")
# テスト用の画像パス
test_image = "test_photo.jpg"
if os.path.exists(test_image):
# 向き情報の検出
orientation_info = rotator.detect_orientation(test_image)
print("向き情報:")
for key, value in orientation_info.items():
print(f" {key}: {value}")
# 自動回転
success, message = rotator.rotate_image_by_exif(
test_image,
"test_photo_rotated.jpg"
)
print(f"\n回転結果: {message}")
else:
print(f"テスト画像 '{test_image}' が見つかりません")
print("\n=== バッチ処理 ===")
# テスト用ディレクトリ
test_dir = "photos"
if os.path.exists(test_dir) and os.path.isdir(test_dir):
results = rotator.batch_rotate_images(
input_dir=test_dir,
output_dir="photos_rotated",
extensions=('.jpg', '.jpeg', '.png')
)
print(f"\nバッチ処理結果:")
print(f" 合計: {results['total']} ファイル")
print(f" 成功: {results['success']}")
print(f" 失敗: {results['failed']}")
else:
print(f"テストディレクトリ '{test_dir}' が見つかりません")
if __name__ == "__main__":
main()
問題5:画像のウォーターマーク追加
from PIL import Image, ImageDraw, ImageFont, ImageEnhance
from typing import Union, Tuple, Optional
import os
class WatermarkAdder:
"""画像にウォーターマークを追加するクラス"""
def __init__(self):
self.font_cache = {}
def add_text_watermark(
self,
image_path: str,
text: str,
output_path: Optional[str] = None,
position: str = "bottom-right",
font_size: int = 36,
font_path: Optional[str] = None,
text_color: Tuple[int, int, int, int] = (255, 255, 255, 128),
bg_color: Optional[Tuple[int, int, int, int]] = None,
padding: int = 20,
rotation: int = 0,
shadow: bool = True
) -> bool:
"""
テキストウォーターマークを追加
Args:
image_path: 入力画像パス
text: ウォーターマークテキスト
output_path: 出力パス
position: 位置 ("top-left", "top-right", "bottom-left", "bottom-right", "center", "tile")
font_size: フォントサイズ
font_path: フォントファイルパス(Noneの場合はデフォルトフォント)
text_color: テキスト色 (R, G, B, A)
bg_color: 背景色 (R, G, B, A)(Noneの場合は背景なし)
padding: 余白
rotation: 回転角度(度)
shadow: 影を付けるかどうか
Returns:
bool: 成功したかどうか
"""
try:
# 出力パスの決定
if output_path is None:
base, ext = os.path.splitext(image_path)
output_path = f"{base}_watermarked{ext}"
# 画像を開く
with Image.open(image_path) as img:
# RGBAに変換(透過処理のため)
if img.mode != 'RGBA':
img = img.convert('RGBA')
# フォントの読み込み
font = self._load_font(font_path, font_size)
# テキストサイズの計算
draw = ImageDraw.Draw(Image.new('RGBA', (1, 1)))
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
# 位置の計算
img_width, img_height = img.size
position_coords = self._calculate_position(
position, img_width, img_height,
text_width, text_height, padding
)
# ウォーターマーク画像の作成
watermark = self._create_text_watermark_image(
text, font, text_color, bg_color,
text_width, text_height, rotation, shadow
)
# 元画像にウォーターマークを合成
if position == "tile":
result = self._tile_watermark(img, watermark, padding)
else:
result = self._place_watermark(img, watermark, position_coords)
# 保存
if output_path.lower().endswith('.jpg') or output_path.lower().endswith('.jpeg'):
result = result.convert('RGB')
result.save(output_path, quality=95, optimize=True)
print(f"ウォーターマークを追加しました: {output_path}")
return True
except Exception as e:
print(f"エラー: {str(e)}")
return False
def add_image_watermark(
self,
image_path: str,
watermark_path: str,
output_path: Optional[str] = None,
position: str = "bottom-right",
scale: float = 0.2,
opacity: float = 0.7,
padding: int = 20,
rotation: int = 0
) -> bool:
"""
画像ウォーターマークを追加
Args:
image_path: 入力画像パス
watermark_path: ウォーターマーク画像パス
output_path: 出力パス
position: 位置
scale: 元画像に対するウォーターマークのスケール (0.0-1.0)
opacity: 不透明度 (0.0-1.0)
padding: 余白
rotation: 回転角度
Returns:
bool: 成功したかどうか
"""
try:
# 出力パスの決定
if output_path is None:
base, ext = os.path.splitext(image_path)
output_path = f"{base}_watermarked{ext}"
# メイン画像を開く
with Image.open(image_path) as img:
# RGBAに変換
if img.mode != 'RGBA':
img = img.convert('RGBA')
# ウォーターマーク画像を開く
with Image.open(watermark_path) as watermark_img:
# ウォーターマークをRGBAに変換
if watermark_img.mode != 'RGBA':
watermark_img = watermark_img.convert('RGBA')
# スケーリング
watermark_size = self._calculate_watermark_size(
img.size, watermark_img.size, scale
)
watermark_resized = watermark_img.resize(
watermark_size, Image.Resampling.LANCZOS
)
# 不透明度調整
if opacity < 1.0:
watermark_resized = self._adjust_opacity(
watermark_resized, opacity
)
# 回転
if rotation != 0:
watermark_resized = watermark_resized.rotate(
rotation, expand=True, resample=Image.Resampling.BICUBIC
)
# 位置の計算
position_coords = self._calculate_position(
position, img.width, img.height,
watermark_resized.width, watermark_resized.height,
padding
)
# 合成
result = self._place_watermark(
img, watermark_resized, position_coords
)
# 保存
if output_path.lower().endswith(('.jpg', '.jpeg')):
result = result.convert('RGB')
result.save(output_path, quality=95, optimize=True)
print(f"画像ウォーターマークを追加しました: {output_path}")
return True
except Exception as e:
print(f"エラー: {str(e)}")
return False
def add_diagonal_watermark(
self,
image_path: str,
text: str,
output_path: Optional[str] = None,
font_size: int = 48,
font_path: Optional[str] = None,
text_color: Tuple[int, int, int, int] = (255, 255, 255, 64),
angle: int = 45,
spacing: int = 200
) -> bool:
"""
斜めに繰り返しウォーターマークを追加
Args:
image_path: 入力画像パス
text: ウォーターマークテキスト
output_path: 出力パス
font_size: フォントサイズ
font_path: フォントファイルパス
text_color: テキスト色
angle: 角度
spacing: ウォーターマーク間の間隔
Returns:
bool: 成功したかどうか
"""
try:
if output_path is None:
base, ext = os.path.splitext(image_path)
output_path = f"{base}_diagonal_watermark{ext}"
with Image.open(image_path) as img:
if img.mode != 'RGBA':
img = img.convert('RGBA')
# フォントの読み込み
font = self._load_font(font_path, font_size)
# テキストサイズの計算
draw = ImageDraw.Draw(Image.new('RGBA', (1, 1)))
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
# ウォーターマーク画像の作成
watermark = Image.new('RGBA', (text_width + 20, text_height + 20), (0, 0, 0, 0))
watermark_draw = ImageDraw.Draw(watermark)
# テキストを描画
watermark_draw.text(
(10, 10), text,
font=font, fill=text_color
)
# 画像を回転
watermark_rotated = watermark.rotate(
angle, expand=True, resample=Image.Resampling.BICUBIC
)
# 斜めに繰り返し配置
result = img.copy()
wm_width, wm_height = watermark_rotated.size
for x in range(-wm_width, img.width + wm_width, spacing):
for y in range(-wm_height, img.height + wm_height, spacing):
# 位置を調整して斜めに配置
offset_x = x
offset_y = y
# マスクを使って合成
mask = watermark_rotated.split()[3] # アルファチャンネルをマスクとして使用
result.paste(watermark_rotated, (offset_x, offset_y), mask)
# 保存
if output_path.lower().endswith(('.jpg', '.jpeg')):
result = result.convert('RGB')
result.save(output_path, quality=95, optimize=True)
print(f"斜めウォーターマークを追加しました: {output_path}")
return True
except Exception as e:
print(f"エラー: {str(e)}")
return False
def _load_font(self, font_path: Optional[str], font_size: int) -> ImageFont.FreeTypeFont:
"""フォントを読み込む(キャッシュを使用)"""
cache_key = f"{font_path}_{font_size}"
if cache_key not in self.font_cache:
try:
if font_path and os.path.exists(font_path):
font = ImageFont.truetype(font_path, font_size)
else:
# デフォルトフォントを使用
font = ImageFont.load_default()
# デフォルトフォントはサイズ調整ができないので、大きいフォントをロード
try:
font = ImageFont.truetype("arial.ttf", font_size)
except:
# 最終手段
font = ImageFont.load_default()
self.font_cache[cache_key] = font
except Exception as e:
print(f"フォント読み込みエラー: {e}")
font = ImageFont.load_default()
self.font_cache[cache_key] = font
return self.font_cache[cache_key]
def _calculate_position(
self,
position: str,
img_width: int,
img_height: int,
wm_width: int,
wm_height: int,
padding: int
) -> Tuple[int, int]:
"""ウォーターマークの位置を計算"""
if position == "top-left":
return (padding, padding)
elif position == "top-right":
return (img_width - wm_width - padding, padding)
elif position == "bottom-left":
return (padding, img_height - wm_height - padding)
elif position == "bottom-right":
return (img_width - wm_width - padding, img_height - wm_height - padding)
elif position == "center":
return ((img_width - wm_width) // 2, (img_height - wm_height) // 2)
else: # bottom-rightをデフォルト
return (img_width - wm_width - padding, img_height - wm_height - padding)
def _create_text_watermark_image(
self,
text: str,
font: ImageFont.FreeTypeFont,
text_color: Tuple[int, int, int, int],
bg_color: Optional[Tuple[int, int, int, int]],
text_width: int,
text_height: int,
rotation: int,
shadow: bool
) -> Image.Image:
"""テキストウォーターマーク画像を作成"""
# 背景を含むサイズを計算
padding = 20
img_width = text_width + padding * 2
img_height = text_height + padding * 2
# ウォーターマーク画像を作成
watermark = Image.new('RGBA', (img_width, img_height), (0, 0, 0, 0))
draw = ImageDraw.Draw(watermark)
# 背景を描画
if bg_color:
draw.rectangle(
[0, 0, img_width, img_height],
fill=bg_color
)
# 影を描画
if shadow:
shadow_color = (0, 0, 0, text_color[3] // 2)
draw.text(
(padding + 2, padding + 2), text,
font=font, fill=shadow_color
)
# テキストを描画
draw.text(
(padding, padding), text,
font=font, fill=text_color
)
# 回転
if rotation != 0:
watermark = watermark.rotate(
rotation, expand=True, resample=Image.Resampling.BICUBIC
)
return watermark
def _calculate_watermark_size(
self,
img_size: Tuple[int, int],
wm_size: Tuple[int, int],
scale: float
) -> Tuple[int, int]:
"""ウォーターマークのサイズを計算"""
img_width, img_height = img_size
wm_width, wm_height = wm_size
# スケールに基づく最大サイズ
max_width = int(img_width * scale)
max_height = int(img_height * scale)
# アスペクト比を保持
aspect_ratio = wm_width / wm_height
if max_width / aspect_ratio <= max_height:
new_width = max_width
new_height = int(max_width / aspect_ratio)
else:
new_height = max_height
new_width = int(max_height * aspect_ratio)
return (new_width, new_height)
def _adjust_opacity(self, image: Image.Image, opacity: float) -> Image.Image:
"""画像の不透明度を調整"""
if opacity >= 1.0:
return image
# アルファチャンネルを調整
alpha = image.split()[3]
alpha = ImageEnhance.Brightness(alpha).enhance(opacity)
# 新しいアルファチャンネルを設定
image.putalpha(alpha)
return image
def _place_watermark(
self,
base_image: Image.Image,
watermark: Image.Image,
position: Tuple[int, int]
) -> Image.Image:
"""ウォーターマークを配置"""
result = base_image.copy()
result.paste(watermark, position, watermark)
return result
def _tile_watermark(
self,
base_image: Image.Image,
watermark: Image.Image,
spacing: int
) -> Image.Image:
"""ウォーターマークをタイル状に配置"""
result = base_image.copy()
wm_width, wm_height = watermark.size
for x in range(0, base_image.width, wm_width + spacing):
for y in range(0, base_image.height, wm_height + spacing):
result.paste(watermark, (x, y), watermark)
return result
# 使用例
def main():
"""ウォーターマーク機能の使用例"""
# インスタンス作成
watermarker = WatermarkAdder()
print("=== テキストウォーターマーク ===")
# シンプルなテキストウォーターマーク
success1 = watermarker.add_text_watermark(
image_path="photo.jpg",
text="© My Company 2024",
output_path="photo_watermarked.jpg",
position="bottom-right",
font_size=24,
text_color=(255, 255, 255, 160),
bg_color=(0, 0, 0, 100),
padding=10
)
print(f"結果: {'成功' if success1 else '失敗'}")
print("\n=== 画像ウォーターマーク ===")
# 画像ウォーターマーク
success2 = watermarker.add_image_watermark(
image_path="photo.jpg",
watermark_path="logo.png",
output_path="photo_logo_watermarked.jpg",
position="bottom-right",
scale=0.2,
opacity=0.7
)
print(f"結果: {'成功' if success2 else '失敗'}")
print("\n=== 斜めウォーターマーク ===")
# 斜めの繰り返しウォーターマーク
success3 = watermarker.add_diagonal_watermark(
image_path="photo.jpg",
text="CONFIDENTIAL",
output_path="photo_confidential.jpg",
font_size=36,
text_color=(255, 0, 0, 64),
angle=45,
spacing=150
)
print(f"結果: {'成功' if success3 else '失敗'}")
if __name__ == "__main__":
main()
問題6:画像の一括処理
from PIL import Image, ImageOps
import os
from pathlib import Path
from typing import List, Tuple, Optional, Dict, Callable
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import json
class BatchImageProcessor:
"""画像の一括処理クラス"""
def __init__(self, max_workers: Optional[int] = None):
"""
Args:
max_workers: 並列処理の最大ワーカー数
"""
self.max_workers = max_workers or multiprocessing.cpu_count()
self.processed_count = 0
self.failed_count = 0
def process_directory(
self,
input_dir: str,
output_dir: str,
operations: List[Dict[str, any]],
extensions: Tuple[str, ...] = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'),
recursive: bool = False,
overwrite: bool = False,
use_multiprocessing: bool = True
) -> Dict[str, any]:
"""
ディレクトリ内の画像を一括処理
Args:
input_dir: 入力ディレクトリ
output_dir: 出力ディレクトリ
operations: 処理操作のリスト
extensions: 処理する拡張子
recursive: サブディレクトリも処理するか
overwrite: 既存ファイルを上書きするか
use_multiprocessing: マルチプロセスを使用するか
Returns:
Dict[str, any]: 処理結果
"""
# 出力ディレクトリの作成
Path(output_dir).mkdir(parents=True, exist_ok=True)
# 処理対象ファイルの収集
files_to_process = self._collect_files(
input_dir, extensions, recursive
)
print(f"処理対象ファイル数: {len(files_to_process)}")
# 処理の開始
start_time = time.time()
# 処理結果の記録
results = {
'total': len(files_to_process),
'processed': 0,
'failed': 0,
'success': 0,
'details': [],
'processing_time': 0
}
if use_multiprocessing and len(files_to_process) > 1:
# マルチプロセス処理
results = self._process_multiprocessing(
files_to_process, output_dir, operations, overwrite
)
else:
# シングルプロセス処理
results = self._process_single(
files_to_process, output_dir, operations, overwrite
)
# 処理時間の計算
results['processing_time'] = time.time() - start_time
# サマリーの表示
self._print_summary(results)
# 結果をJSONファイルに保存
self._save_results(results, output_dir)
return results
def _collect_files(
self,
directory: str,
extensions: Tuple[str, ...],
recursive: bool
) -> List[Tuple[str, str]]:
"""処理対象ファイルを収集"""
files = []
if recursive:
# 再帰的にファイルを収集
for root, _, filenames in os.walk(directory):
for filename in filenames:
if filename.lower().endswith(extensions):
input_path = os.path.join(root, filename)
# 相対パスを維持
rel_path = os.path.relpath(root, directory)
output_subdir = rel_path if rel_path != '.' else ''
files.append((input_path, output_subdir))
else:
# 指定ディレクトリのみ
for filename in os.listdir(directory):
filepath = os.path.join(directory, filename)
if os.path.isfile(filepath) and filename.lower().endswith(extensions):
files.append((filepath, ''))
return files
def _process_multiprocessing(
self,
files: List[Tuple[str, str]],
output_dir: str,
operations: List[Dict[str, any]],
overwrite: bool
) -> Dict[str, any]:
"""マルチプロセスで処理"""
results = {
'total': len(files),
'processed': 0,
'failed': 0,
'success': 0,
'details': []
}
with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
# 各ファイルの処理タスクを作成
future_to_file = {
executor.submit(
self._process_single_file,
input_path,
output_subdir,
output_dir,
operations,
overwrite
): (input_path, output_subdir)
for input_path, output_subdir in files
}
# タスクの完了を待機
for future in as_completed(future_to_file):
input_path, output_subdir = future_to_file[future]
try:
result = future.result()
results['details'].append(result)
results['processed'] += 1
if result['status'] == 'success':
results['success'] += 1
print(f"✓ {os.path.basename(input_path)}")
else:
results['failed'] += 1
print(f"✗ {os.path.basename(input_path)}: {result['error']}")
except Exception as e:
results['failed'] += 1
results['processed'] += 1
error_result = {
'input_path': input_path,
'output_path': '',
'status': 'failed',
'error': str(e)
}
results['details'].append(error_result)
print(f"✗ {os.path.basename(input_path)}: {e}")
return results
def _process_single(
self,
files: List[Tuple[str, str]],
output_dir: str,
operations: List[Dict[str, any]],
overwrite: bool
) -> Dict[str, any]:
"""シングルプロセスで処理"""
results = {
'total': len(files),
'processed': 0,
'failed': 0,
'success': 0,
'details': []
}
for input_path, output_subdir in files:
try:
result = self._process_single_file(
input_path, output_subdir, output_dir, operations, overwrite
)
results['details'].append(result)
results['processed'] += 1
if result['status'] == 'success':
results['success'] += 1
print(f"✓ {os.path.basename(input_path)}")
else:
results['failed'] += 1
print(f"✗ {os.path.basename(input_path)}: {result['error']}")
except Exception as e:
results['failed'] += 1
results['processed'] += 1
error_result = {
'input_path': input_path,
'output_path': '',
'status': 'failed',
'error': str(e)
}
results['details'].append(error_result)
print(f"✗ {os.path.basename(input_path)}: {e}")
return results
def _process_single_file(
self,
input_path: str,
output_subdir: str,
output_dir: str,
operations: List[Dict[str, any]],
overwrite: bool
) -> Dict[str, any]:
"""単一ファイルを処理"""
try:
# 出力パスの構築
filename = os.path.basename(input_path)
# 出力サブディレクトリの作成
if output_subdir:
final_output_dir = os.path.join(output_dir, output_subdir)
Path(final_output_dir).mkdir(parents=True, exist_ok=True)
else:
final_output_dir = output_dir
output_path = os.path.join(final_output_dir, filename)
# 既存ファイルのチェック
if not overwrite and os.path.exists(output_path):
return {
'input_path': input_path,
'output_path': output_path,
'status': 'skipped',
'error': 'ファイルが既に存在します'
}
# 画像を開く
with Image.open(input_path) as img:
# 各操作を適用
processed_img = img.copy()
for operation in operations:
processed_img = self._apply_operation(
processed_img, operation
)
# 画像を保存
processed_img.save(output_path, optimize=True)
# 元ファイルと処理後ファイルの情報を記録
input_size = os.path.getsize(input_path)
output_size = os.path.getsize(output_path)
return {
'input_path': input_path,
'output_path': output_path,
'status': 'success',
'input_size': input_size,
'output_size': output_size,
'size_reduction': (input_size - output_size) / input_size if input_size > 0 else 0,
'operations_applied': [op['name'] for op in operations]
}
except Exception as e:
return {
'input_path': input_path,
'output_path': '',
'status': 'failed',
'error': str(e)
}
def _apply_operation(self, image: Image.Image, operation: Dict[str, any]) -> Image.Image:
"""画像に操作を適用"""
op_name = operation['name']
params = operation.get('params', {})
if op_name == 'resize':
return self._resize_image(image, **params)
elif op_name == 'convert':
return self._convert_image(image, **params)
elif op_name == 'rotate':
return self._rotate_image(image, **params)
elif op_name == 'crop':
return self._crop_image(image, **params)
elif op_name == 'filter':
return self._apply_filter(image, **params)
elif op_name == 'adjust':
return self._adjust_image(image, **params)
elif op_name == 'auto_orient':
return self._auto_orient_image(image)
else:
raise ValueError(f"未知の操作: {op_name}")
def _resize_image(
self,
image: Image.Image,
width: Optional[int] = None,
height: Optional[int] = None,
max_size: Optional[Tuple[int, int]] = None,
keep_aspect: bool = True
) -> Image.Image:
"""画像をリサイズ"""
if max_size:
max_width, max_height = max_size
image.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)
return image
elif width and height:
if keep_aspect:
image.thumbnail((width, height), Image.Resampling.LANCZOS)
return image
else:
return image.resize((width, height), Image.Resampling.LANCZOS)
elif width:
aspect_ratio = image.width / image.height
new_height = int(width / aspect_ratio)
return image.resize((width, new_height), Image.Resampling.LANCZOS)
elif height:
aspect_ratio = image.width / image.height
new_width = int(height * aspect_ratio)
return image.resize((new_width, height), Image.Resampling.LANCZOS)
else:
return image
def _convert_image(
self,
image: Image.Image,
format: str = 'JPEG',
quality: int = 85,
optimize: bool = True
) -> Image.Image:
"""画像形式を変換"""
# メモリ上で変換して返す
from io import BytesIO
buffer = BytesIO()
if format.upper() == 'JPEG' and image.mode in ('RGBA', 'LA', 'P'):
# JPEGはアルファチャンネルに対応していない
background = Image.new('RGB', image.size, (255, 255, 255))
if image.mode == 'RGBA':
background.paste(image, mask=image.split()[3])
else:
background.paste(image)
image = background
image.save(buffer, format=format, quality=quality, optimize=optimize)
buffer.seek(0)
return Image.open(buffer)
def _rotate_image(
self,
image: Image.Image,
angle: int,
expand: bool = True
) -> Image.Image:
"""画像を回転"""
return image.rotate(angle, expand=expand, resample=Image.Resampling.BICUBIC)
def _crop_image(
self,
image: Image.Image,
left: int,
top: int,
right: int,
bottom: int
) -> Image.Image:
"""画像をトリミング"""
return image.crop((left, top, right, bottom))
def _apply_filter(
self,
image: Image.Image,
filter_type: str
) -> Image.Image:
"""フィルターを適用"""
from PIL import ImageFilter
filters = {
'blur': ImageFilter.BLUR,
'contour': ImageFilter.CONTOUR,
'detail': ImageFilter.DETAIL,
'edge_enhance': ImageFilter.EDGE_ENHANCE,
'emboss': ImageFilter.EMBOSS,
'find_edges': ImageFilter.FIND_EDGES,
'sharpen': ImageFilter.SHARPEN,
'smooth': ImageFilter.SMOOTH,
}
if filter_type in filters:
return image.filter(filters[filter_type])
else:
return image
def _adjust_image(
self,
image: Image.Image,
brightness: Optional[float] = None,
contrast: Optional[float] = None,
saturation: Optional[float] = None,
sharpness: Optional[float] = None
) -> Image.Image:
"""画像を調整"""
from PIL import ImageEnhance
result = image
if brightness is not None:
enhancer = ImageEnhance.Brightness(result)
result = enhancer.enhance(brightness)
if contrast is not None:
enhancer = ImageEnhance.Contrast(result)
result = enhancer.enhance(contrast)
if saturation is not None and result.mode == 'RGB':
enhancer = ImageEnhance.Color(result)
result = enhancer.enhance(saturation)
if sharpness is not None:
enhancer = ImageEnhance.Sharpness(result)
result = enhancer.enhance(sharpness)
return result
def _auto_orient_image(self, image: Image.Image) -> Image.Image:
"""EXIF情報に基づいて自動回転"""
try:
return ImageOps.exif_transpose(image)
except:
return image
def _print_summary(self, results: Dict[str, any]):
"""処理結果のサマリーを表示"""
print("\n" + "="*50)
print("バッチ処理 サマリー")
print("="*50)
print(f"総ファイル数: {results['total']}")
print(f"処理済み: {results['processed']}")
print(f"成功: {results['success']}")
print(f"失敗: {results['failed']}")
if results['success'] > 0:
# サイズ削減の統計
size_reductions = [
r['size_reduction']
for r in results['details']
if r['status'] == 'success' and 'size_reduction' in r
]
if size_reductions:
avg_reduction = sum(size_reductions) / len(size_reductions) * 100
print(f"平均サイズ削減率: {avg_reduction:.1f}%")
print(f"処理時間: {results['processing_time']:.2f}秒")
print(f"ファイルあたり平均時間: {results['processing_time']/max(results['processed'], 1):.3f}秒")
print("="*50)
def _save_results(self, results: Dict[str, any], output_dir: str):
"""結果をJSONファイルに保存"""
result_file = os.path.join(output_dir, 'batch_processing_results.json')
# datetimeオブジェクトを文字列に変換
import datetime
def json_serializer(obj):
if isinstance(obj, datetime.datetime):
return obj.isoformat()
raise TypeError(f"Type {type(obj)} not serializable")
with open(result_file, 'w', encoding='utf-8') as f:
json.dump(results, f, indent=2, default=json_serializer, ensure_ascii=False)
print(f"結果を '{result_file}' に保存しました。")
# 使用例
def main():
"""バッチ処理の使用例"""
# プロセッサの作成
processor = BatchImageProcessor(max_workers=4)
# 処理操作の定義
operations = [
{
'name': 'resize',
'params': {
'max_size': (1200, 1200),
'keep_aspect': True
}
},
{
'name': 'convert',
'params': {
'format': 'JPEG',
'quality': 85,
'optimize': True
}
},
{
'name': 'auto_orient',
'params': {}
}
]
# バッチ処理の実行
results = processor.process_directory(
input_dir='input_photos',
output_dir='output_photos',
operations=operations,
extensions=('.jpg', '.jpeg', '.png'),
recursive=True,
overwrite=False,
use_multiprocessing=True
)
# 詳細結果の表示
if results['failed'] > 0:
print("\n失敗したファイル:")
for detail in results['details']:
if detail['status'] == 'failed':
print(f" {os.path.basename(detail['input_path'])}: {detail['error']}")
if __name__ == "__main__":
main()
問題7:画像の圧縮品質調整
from PIL import Image, ImageOps
import os
from typing import Optional, Dict, Any, Tuple, List
from dataclasses import dataclass
from pathlib import Path
import hashlib
import json
@dataclass
class CompressionResult:
"""圧縮結果のデータクラス"""
input_path: str
output_path: str
original_size: int
compressed_size: int
quality: int
compression_ratio: float
estimated_saving: float
status: str # 'success', 'skipped', 'failed'
error_message: Optional[str] = None
@property
def size_reduction_percent(self) -> float:
"""サイズ削減率(%)"""
if self.original_size == 0:
return 0.0
return (1 - self.compressed_size / self.original_size) * 100
@property
def saving_kb(self) -> float:
"""節約容量(KB)"""
return (self.original_size - self.compressed_size) / 1024
def to_dict(self) -> Dict[str, Any]:
"""辞書形式に変換"""
return {
'input_path': self.input_path,
'output_path': self.output_path,
'original_size': self.original_size,
'compressed_size': self.compressed_size,
'quality': self.quality,
'compression_ratio': self.compression_ratio,
'size_reduction_percent': self.size_reduction_percent,
'saving_kb': self.saving_kb,
'status': self.status,
'error_message': self.error_message
}
class ImageCompressor:
"""画像圧縮クラス"""
# 圧縮設定のプリセット
PRESETS = {
'maximum': {
'quality': 95,
'optimize': True,
'progressive': False
},
'high': {
'quality': 85,
'optimize': True,
'progressive': False
},
'medium': {
'quality': 75,
'optimize': True,
'progressive': True
},
'low': {
'quality': 65,
'optimize': True,
'progressive': True
},
'web': {
'quality': 80,
'optimize': True,
'progressive': True
},
'mobile': {
'quality': 70,
'optimize': True,
'progressive': True
}
}
# フォーマットごとの最適設定
FORMAT_SETTINGS = {
'JPEG': {
'extensions': ['.jpg', '.jpeg', '.jpe'],
'default_quality': 85,
'supports_progressive': True
},
'PNG': {
'extensions': ['.png'],
'default_quality': None, # PNGは品質設定なし
'supports_progressive': False,
'optimize': True
},
'WEBP': {
'extensions': ['.webp'],
'default_quality': 80,
'supports_progressive': False,
'lossless': False
}
}
def __init__(self, cache_dir: Optional[str] = None):
"""
Args:
cache_dir: キャッシュディレクトリ
"""
self.cache_dir = cache_dir
if cache_dir:
Path(cache_dir).mkdir(parents=True, exist_ok=True)
# 圧縮統計
self.stats = {
'total_files': 0,
'processed': 0,
'skipped': 0,
'failed': 0,
'total_original_size': 0,
'total_compressed_size': 0,
'total_saving_kb': 0
}
def compress_image(
self,
input_path: str,
output_path: Optional[str] = None,
quality: Optional[int] = None,
preset: Optional[str] = None,
max_width: Optional[int] = None,
max_height: Optional[int] = None,
convert_to_webp: bool = False,
preserve_metadata: bool = True,
overwrite: bool = False
) -> CompressionResult:
"""
単一画像を圧縮
Args:
input_path: 入力画像パス
output_path: 出力画像パス
quality: 圧縮品質 (1-100)
preset: プリセット名 ('maximum', 'high', 'medium', 'low', 'web', 'mobile')
max_width: 最大幅
max_height: 最大高さ
convert_to_webp: WebP形式に変換するか
preserve_metadata: メタデータを保持するか
overwrite: 既存ファイルを上書きするか
Returns:
CompressionResult: 圧縮結果
"""
self.stats['total_files'] += 1
try:
# 出力パスの決定
if output_path is None:
if convert_to_webp:
base = os.path.splitext(input_path)[0]
output_path = f"{base}.webp"
else:
base, ext = os.path.splitext(input_path)
output_path = f"{base}_compressed{ext}"
# 既存ファイルのチェック
if not overwrite and os.path.exists(output_path):
original_size = os.path.getsize(input_path)
return CompressionResult(
input_path=input_path,
output_path=output_path,
original_size=original_size,
compressed_size=original_size,
quality=quality or 0,
compression_ratio=1.0,
estimated_saving=0.0,
status='skipped',
error_message='出力ファイルが既に存在します'
)
# 画像を開く
with Image.open(input_path) as img:
# 元のサイズを保存
original_size = os.path.getsize(input_path)
# リサイズが必要な場合
if max_width or max_height:
img = self._resize_image(img, max_width, max_height)
# 画像形式の決定
output_format = self._determine_output_format(
input_path, output_path, convert_to_webp
)
# 圧縮設定の取得
compress_params = self._get_compression_params(
output_format, quality, preset
)
# メタデータの保持
metadata = img.info if preserve_metadata else {}
# 保存
img.save(
output_path,
format=output_format,
**compress_params,
**metadata
)
# 圧縮後のサイズ
compressed_size = os.path.getsize(output_path)
# 結果の計算
compression_ratio = compressed_size / original_size if original_size > 0 else 1.0
estimated_saving = original_size - compressed_size
# 統計の更新
self.stats['processed'] += 1
self.stats['total_original_size'] += original_size
self.stats['total_compressed_size'] += compressed_size
self.stats['total_saving_kb'] += estimated_saving / 1024
return CompressionResult(
input_path=input_path,
output_path=output_path,
original_size=original_size,
compressed_size=compressed_size,
quality=compress_params.get('quality', 0),
compression_ratio=compression_ratio,
estimated_saving=estimated_saving,
status='success'
)
except Exception as e:
self.stats['failed'] += 1
original_size = os.path.getsize(input_path) if os.path.exists(input_path) else 0
return CompressionResult(
input_path=input_path,
output_path=output_path or '',
original_size=original_size,
compressed_size=0,
quality=quality or 0,
compression_ratio=0.0,
estimated_saving=0.0,
status='failed',
error_message=str(e)
)
def compress_directory(
self,
input_dir: str,
output_dir: str,
quality: Optional[int] = None,
preset: Optional[str] = None,
max_width: Optional[int] = None,
max_height: Optional[int] = None,
convert_to_webp: bool = False,
extensions: Optional[List[str]] = None,
recursive: bool = False,
overwrite: bool = False
) -> List[CompressionResult]:
"""
ディレクトリ内の画像を一括圧縮
Args:
input_dir: 入力ディレクトリ
output_dir: 出力ディレクトリ
quality: 圧縮品質
preset: プリセット名
max_width: 最大幅
max_height: 最大高さ
convert_to_webp: WebP形式に変換するか
extensions: 処理する拡張子
recursive: サブディレクトリも処理するか
overwrite: 既存ファイルを上書きするか
Returns:
List[CompressionResult]: 圧縮結果のリスト
"""
# 出力ディレクトリの作成
Path(output_dir).mkdir(parents=True, exist_ok=True)
# 拡張子の設定
if extensions is None:
extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff']
# ファイルの収集
files_to_process = self._collect_image_files(
input_dir, extensions, recursive
)
print(f"圧縮対象ファイル数: {len(files_to_process)}")
results = []
for input_path, rel_path in files_to_process:
# 出力パスの構築
filename = os.path.basename(input_path)
if convert_to_webp:
filename = os.path.splitext(filename)[0] + '.webp'
# サブディレクトリ構造の維持
if rel_path:
output_subdir = os.path.join(output_dir, rel_path)
Path(output_subdir).mkdir(parents=True, exist_ok=True)
output_path = os.path.join(output_subdir, filename)
else:
output_path = os.path.join(output_dir, filename)
# 圧縮実行
result = self.compress_image(
input_path=input_path,
output_path=output_path,
quality=quality,
preset=preset,
max_width=max_width,
max_height=max_height,
convert_to_webp=convert_to_webp,
preserve_metadata=True,
overwrite=overwrite
)
results.append(result)
# 進捗表示
self._print_progress(result)
# サマリー表示
self._print_summary()
return results
def smart_compress(
self,
input_path: str,
output_path: Optional[str] = None,
target_size_kb: Optional[float] = None,
target_quality: Optional[int] = None,
min_quality: int = 40,
max_quality: int = 95,
max_iterations: int = 10
) -> CompressionResult:
"""
ターゲットサイズを達成するようにスマート圧縮
Args:
input_path: 入力画像パス
output_path: 出力画像パス
target_size_kb: 目標ファイルサイズ(KB)
target_quality: 目標品質
min_quality: 最小品質
max_quality: 最大品質
max_iterations: 最大試行回数
Returns:
CompressionResult: 圧縮結果
"""
if output_path is None:
base, ext = os.path.splitext(input_path)
output_path = f"{base}_smart{ext}"
original_size = os.path.getsize(input_path)
# ターゲットサイズの計算
if target_size_kb:
target_size_bytes = target_size_kb * 1024
if original_size <= target_size_bytes:
# 既にターゲットサイズ以下
return CompressionResult(
input_path=input_path,
output_path=output_path,
original_size=original_size,
compressed_size=original_size,
quality=95,
compression_ratio=1.0,
estimated_saving=0.0,
status='skipped',
error_message='既にターゲットサイズ以下です'
)
# バイナリサーチで最適な品質を探す
low = min_quality
high = max_quality
best_result = None
for iteration in range(max_iterations):
if low > high:
break
mid = (low + high) // 2
# 一時ファイルで圧縮を試みる
temp_path = f"{output_path}.temp"
result = self.compress_image(
input_path=input_path,
output_path=temp_path,
quality=mid,
overwrite=True
)
if result.status != 'success':
# 失敗した場合は低品質側を探す
high = mid - 1
continue
# ターゲットサイズがある場合
if target_size_kb:
compressed_size_kb = result.compressed_size / 1024
if abs(compressed_size_kb - target_size_kb) < target_size_kb * 0.05:
# 目標サイズに近い
best_result = result
break
elif compressed_size_kb > target_size_kb:
# まだ大きいので品質を下げる
high = mid - 1
else:
# 小さいので品質を上げる
low = mid + 1
best_result = result
# ターゲット品質がある場合
elif target_quality:
if mid == target_quality:
best_result = result
break
elif mid < target_quality:
low = mid + 1
else:
high = mid - 1
best_result = result
else:
# デフォルト:高品質を維持
best_result = result
break
# 最適な結果が見つかったら、正式な出力ファイルに保存
if best_result and best_result.status == 'success':
# 一時ファイルを正式ファイルに移動
if os.path.exists(temp_path):
os.replace(temp_path, output_path)
best_result.output_path = output_path
return best_result
# 見つからなかった場合は最高品質で保存
return self.compress_image(
input_path=input_path,
output_path=output_path,
quality=max_quality
)
def _resize_image(
self,
img: Image.Image,
max_width: Optional[int],
max_height: Optional[int]
) -> Image.Image:
"""画像をリサイズ"""
if max_width or max_height:
if not max_width:
max_width = img.width
if not max_height:
max_height = img.height
img.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)
return img
def _determine_output_format(
self,
input_path: str,
output_path: str,
convert_to_webp: bool
) -> str:
"""出力フォーマットを決定"""
if convert_to_webp:
return 'WEBP'
# 出力パスの拡張子から判断
ext = os.path.splitext(output_path)[1].lower()
if ext in ['.jpg', '.jpeg', '.jpe']:
return 'JPEG'
elif ext == '.png':
return 'PNG'
elif ext == '.webp':
return 'WEBP'
elif ext == '.gif':
return 'GIF'
elif ext in ['.tiff', '.tif']:
return 'TIFF'
elif ext == '.bmp':
return 'BMP'
else:
# デフォルトは入力フォーマットを維持
with Image.open(input_path) as img:
return img.format or 'JPEG'
def _get_compression_params(
self,
format: str,
quality: Optional[int],
preset: Optional[str]
) -> Dict[str, Any]:
"""圧縮パラメータを取得"""
# プリセットの適用
if preset and preset in self.PRESETS:
params = self.PRESETS[preset].copy()
else:
params = {}
# フォーマット固有の設定
if format in ['JPEG', 'WEBP']:
if quality is not None:
params['quality'] = max(1, min(100, quality))
elif 'quality' not in params:
params['quality'] = self.FORMAT_SETTINGS.get(format, {}).get('default_quality', 85)
if format == 'JPEG' and 'progressive' not in params:
params['progressive'] = True
elif format == 'PNG':
if 'optimize' not in params:
params['optimize'] = True
return params
def _collect_image_files(
self,
directory: str,
extensions: List[str],
recursive: bool
) -> List[Tuple[str, str]]:
"""画像ファイルを収集"""
files = []
if recursive:
for root, _, filenames in os.walk(directory):
for filename in filenames:
if any(filename.lower().endswith(ext) for ext in extensions):
input_path = os.path.join(root, filename)
rel_path = os.path.relpath(root, directory)
if rel_path == '.':
rel_path = ''
files.append((input_path, rel_path))
else:
for filename in os.listdir(directory):
if any(filename.lower().endswith(ext) for ext in extensions):
input_path = os.path.join(directory, filename)
if os.path.isfile(input_path):
files.append((input_path, ''))
return files
def _print_progress(self, result: CompressionResult):
"""進捗を表示"""
if result.status == 'success':
reduction = result.size_reduction_percent
saving = result.saving_kb
if reduction > 0:
print(f"✓ {os.path.basename(result.input_path)}: "
f"{reduction:.1f}%削減 ({saving:.1f}KB節約)")
else:
print(f"✓ {os.path.basename(result.input_path)}: サイズ変更なし")
elif result.status == 'skipped':
print(f"⏭️ {os.path.basename(result.input_path)}: スキップ")
elif result.status == 'failed':
print(f"✗ {os.path.basename(result.input_path)}: 失敗 - {result.error_message}")
def _print_summary(self):
"""サマリーを表示"""
print("\n" + "="*60)
print("圧縮サマリー")
print("="*60)
print(f"総ファイル数: {self.stats['total_files']}")
print(f"処理済み: {self.stats['processed']}")
print(f"スキップ: {self.stats['skipped']}")
print(f"失敗: {self.stats['failed']}")
if self.stats['processed'] > 0:
total_saving_mb = self.stats['total_saving_kb'] / 1024
total_reduction = (
(self.stats['total_original_size'] - self.stats['total_compressed_size']) /
self.stats['total_original_size'] * 100
if self.stats['total_original_size'] > 0 else 0
)
print(f"合計節約容量: {total_saving_mb:.2f} MB")
print(f"合計サイズ削減率: {total_reduction:.1f}%")
print("="*60)
# 使用例
def main():
"""画像圧縮の使用例"""
# 圧縮器の作成
compressor = ImageCompressor()
print("=== 単一画像の圧縮 ===")
# 高品質圧縮
result1 = compressor.compress_image(
input_path="large_photo.jpg",
output_path="large_photo_compressed.jpg",
quality=85,
max_width=1920,
preserve_metadata=True
)
print(f"結果: {result1.status}")
if result1.status == 'success':
print(f" 削減率: {result1.size_reduction_percent:.1f}%")
print(f" 節約容量: {result1.saving_kb:.1f}KB")
print("\n=== プリセットを使用した圧縮 ===")
# プリセットを使用
result2 = compressor.compress_image(
input_path="photo.png",
output_path="photo_web.jpg",
preset='web',
convert_to_webp=True
)
print(f"結果: {result2.status}")
print("\n=== スマート圧縮 ===")
# ターゲットサイズを指定したスマート圧縮
result3 = compressor.smart_compress(
input_path="large_file.jpg",
target_size_kb=500, # 500KB以下を目標
min_quality=40,
max_quality=95
)
print(f"結果: {result3.status}")
if result3.status == 'success':
compressed_kb = result3.compressed_size / 1024
print(f" 圧縮後サイズ: {compressed_kb:.1f}KB")
print(f" 使用品質: {result3.quality}")
print("\n=== ディレクトリの一括圧縮 ===")
# ディレクトリ内の全画像をWebPに変換して圧縮
results = compressor.compress_directory(
input_dir="images",
output_dir="images_compressed",
preset='web',
convert_to_webp=True,
max_width=1600,
recursive=True,
overwrite=False
)
if __name__ == "__main__":
main()
問題8:画像のトリミング機能
from PIL import Image, ImageDraw
import os
from typing import Tuple, Optional, Union, List
from dataclasses import dataclass
import math
@dataclass
class CropArea:
"""トリミング領域のデータクラス"""
left: int
top: int
right: int
bottom: int
@property
def width(self) -> int:
"""領域の幅"""
return self.right - self.left
@property
def height(self) -> int:
"""領域の高さ"""
return self.bottom - self.top
@property
def center(self) -> Tuple[int, int]:
"""領域の中心点"""
return (
(self.left + self.right) // 2,
(self.top + self.bottom) // 2
)
def validate(self, image_width: int, image_height: int) -> bool:
"""領域が画像内に収まっているか検証"""
return (
self.left >= 0 and
self.top >= 0 and
self.right <= image_width and
self.bottom <= image_height and
self.left < self.right and
self.top < self.bottom
)
def to_tuple(self) -> Tuple[int, int, int, int]:
"""タプル形式に変換"""
return (self.left, self.top, self.right, self.bottom)
@classmethod
def from_center(
cls,
center_x: int,
center_y: int,
width: int,
height: int
) -> 'CropArea':
"""中心点とサイズから領域を作成"""
left = center_x - width // 2
top = center_y - height // 2
right = left + width
bottom = top + height
return cls(left, top, right, bottom)
@classmethod
def from_percentage(
cls,
image_width: int,
image_height: int,
left_pct: float,
top_pct: float,
right_pct: float,
bottom_pct: float
) -> 'CropArea':
"""パーセンテージから領域を作成"""
left = int(image_width * left_pct)
top = int(image_height * top_pct)
right = int(image_width * right_pct)
bottom = int(image_height * bottom_pct)
return cls(left, top, right, bottom)
class ImageCropper:
"""画像トリミングクラス"""
def __init__(self, show_preview: bool = False):
"""
Args:
show_preview: プレビューを表示するか
"""
self.show_preview = show_preview
def crop_image(
self,
image_path: str,
output_path: Optional[str] = None,
crop_area: Optional[CropArea] = None,
coordinates: Optional[Tuple[int, int, int, int]] = None,
percentage: Optional[Tuple[float, float, float, float]] = None,
size: Optional[Tuple[int, int]] = None,
position: str = "center",
aspect_ratio: Optional[float] = None,
margin: int = 0,
auto_crop_white_space: bool = False,
preview_color: Tuple[int, int, int, int] = (255, 0, 0, 128)
) -> bool:
"""
画像をトリミング
Args:
image_path: 入力画像パス
output_path: 出力画像パス
crop_area: トリミング領域(直接指定)
coordinates: 座標で指定 (left, top, right, bottom)
percentage: パーセンテージで指定 (left%, top%, right%, bottom%)
size: サイズで指定 (width, height)
position: 位置 ("center", "top-left", "top-right", "bottom-left", "bottom-right", "custom")
aspect_ratio: アスペクト比(width/height)
margin: 余白(ピクセル)
auto_crop_white_space: 余白を自動で切り取る
preview_color: プレビュー色 (R, G, B, A)
Returns:
bool: 成功したかどうか
"""
try:
# 出力パスの決定
if output_path is None:
base, ext = os.path.splitext(image_path)
output_path = f"{base}_cropped{ext}"
# 画像を開く
with Image.open(image_path) as img:
original_width, original_height = img.size
# トリミング領域の決定
if crop_area:
final_area = crop_area
elif coordinates:
final_area = CropArea(*coordinates)
elif percentage:
final_area = CropArea.from_percentage(
original_width, original_height, *percentage
)
elif size:
final_area = self._calculate_crop_area_by_size(
img, size, position, aspect_ratio
)
elif auto_crop_white_space:
final_area = self._auto_crop_white_space(img)
else:
raise ValueError("トリミング領域が指定されていません")
# 余白の追加
if margin > 0:
final_area = self._add_margin(final_area, margin, original_width, original_height)
# 領域の検証
if not final_area.validate(original_width, original_height):
raise ValueError(f"トリミング領域が画像の範囲外です: {final_area}")
# プレビューの表示
if self.show_preview:
preview_img = self._create_preview_image(
img.copy(), final_area, preview_color
)
preview_path = f"{os.path.splitext(output_path)[0]}_preview.png"
preview_img.save(preview_path)
print(f"プレビューを保存: {preview_path}")
# トリミング実行
cropped_img = img.crop(final_area.to_tuple())
# 保存
cropped_img.save(output_path, optimize=True)
print(f"画像をトリミングしました:")
print(f" 元サイズ: {original_width}x{original_height}")
print(f" トリミング領域: {final_area.width}x{final_area.height}")
print(f" 保存先: {output_path}")
return True
except Exception as e:
print(f"エラー: {str(e)}")
return False
def crop_to_square(
self,
image_path: str,
output_path: Optional[str] = None,
position: str = "center",
size: Optional[int] = None
) -> bool:
"""
正方形にトリミング
Args:
image_path: 入力画像パス
output_path: 出力画像パス
position: 位置 ("center", "top", "bottom", "left", "right")
size: 出力サイズ(Noneの場合は短辺サイズ)
Returns:
bool: 成功したかどうか
"""
try:
with Image.open(image_path) as img:
width, height = img.size
# 正方形のサイズを決定
square_size = min(width, height)
if size and size <= min(width, height):
square_size = size
# 位置に基づいて領域を計算
if position == "center":
left = (width - square_size) // 2
top = (height - square_size) // 2
elif position == "top":
left = (width - square_size) // 2
top = 0
elif position == "bottom":
left = (width - square_size) // 2
top = height - square_size
elif position == "left":
left = 0
top = (height - square_size) // 2
elif position == "right":
left = width - square_size
top = (height - square_size) // 2
else:
left = (width - square_size) // 2
top = (height - square_size) // 2
right = left + square_size
bottom = top + square_size
crop_area = CropArea(left, top, right, bottom)
return self.crop_image(
image_path=image_path,
output_path=output_path,
crop_area=crop_area
)
except Exception as e:
print(f"エラー: {str(e)}")
return False
def crop_to_circle(
self,
image_path: str,
output_path: Optional[str] = None,
diameter: Optional[int] = None,
border_width: int = 0,
border_color: Tuple[int, int, int, int] = (255, 255, 255, 255)
) -> bool:
"""
円形にトリミング
Args:
image_path: 入力画像パス
output_path: 出力画像パス
diameter: 直径(Noneの場合は短辺)
border_width: 枠線の幅
border_color: 枠線の色
Returns:
bool: 成功したかどうか
"""
try:
# 出力パスの決定
if output_path is None:
base, ext = os.path.splitext(image_path)
output_path = f"{base}_circle.png" # PNGで保存(透過対応)
with Image.open(image_path) as img:
# RGBAに変換(透過対応)
if img.mode != 'RGBA':
img = img.convert('RGBA')
width, height = img.size
# 直径の決定
if diameter is None:
diameter = min(width, height)
# 正方形にトリミング
square_size = diameter
left = (width - square_size) // 2
top = (height - square_size) // 2
square_img = img.crop((left, top, left + square_size, top + square_size))
# マスクを作成
mask = Image.new('L', (square_size, square_size), 0)
draw = ImageDraw.Draw(mask)
# 円を描画
draw.ellipse((0, 0, square_size, square_size), fill=255)
# 枠線を追加
if border_width > 0:
# 円形の枠線
draw.ellipse(
(border_width, border_width,
square_size - border_width, square_size - border_width),
fill=0
)
# マスクを適用
result = Image.new('RGBA', (square_size, square_size), (0, 0, 0, 0))
result.paste(square_img, (0, 0), mask)
# 枠線を描画
if border_width > 0:
border_img = Image.new('RGBA', (square_size, square_size), (0, 0, 0, 0))
border_draw = ImageDraw.Draw(border_img)
border_draw.ellipse(
(0, 0, square_size, square_size),
outline=border_color,
width=border_width
)
# 枠線を合成
result = Image.alpha_composite(result, border_img)
# 保存
result.save(output_path, 'PNG')
print(f"円形にトリミングしました: {output_path}")
return True
except Exception as e:
print(f"エラー: {str(e)}")
return False
def crop_multiple(
self,
image_path: str,
output_dir: str,
grid: Tuple[int, int], # (rows, columns)
overlap: int = 0,
output_size: Optional[Tuple[int, int]] = None
) -> List[str]:
"""
画像をグリッド状に分割
Args:
image_path: 入力画像パス
output_dir: 出力ディレクトリ
grid: グリッド分割 (行数, 列数)
overlap: 重なり(ピクセル)
output_size: 出力サイズ (Noneの場合は分割サイズを維持)
Returns:
List[str]: 出力ファイルパスのリスト
"""
try:
os.makedirs(output_dir, exist_ok=True)
with Image.open(image_path) as img:
width, height = img.size
rows, cols = grid
# 各セルのサイズを計算
cell_width = width // cols
cell_height = height // rows
output_files = []
for row in range(rows):
for col in range(cols):
# セルの領域を計算(重なりを考慮)
left = max(0, col * cell_width - overlap)
top = max(0, row * cell_height - overlap)
right = min(width, (col + 1) * cell_width + overlap)
bottom = min(height, (row + 1) * cell_height + overlap)
crop_area = CropArea(left, top, right, bottom)
# トリミング
cell_img = img.crop(crop_area.to_tuple())
# リサイズ
if output_size:
cell_img = cell_img.resize(output_size, Image.Resampling.LANCZOS)
# 保存
filename = f"grid_{row}_{col}_{os.path.basename(image_path)}"
output_path = os.path.join(output_dir, filename)
cell_img.save(output_path, optimize=True)
output_files.append(output_path)
print(f"画像を {rows}x{cols} のグリッドに分割しました")
print(f"出力ファイル数: {len(output_files)}")
return output_files
except Exception as e:
print(f"エラー: {str(e)}")
return []
def _calculate_crop_area_by_size(
self,
img: Image.Image,
size: Tuple[int, int],
position: str,
aspect_ratio: Optional[float]
) -> CropArea:
"""サイズと位置からトリミング領域を計算"""
width, height = img.size
crop_width, crop_height = size
# アスペクト比を考慮
if aspect_ratio:
if crop_width / crop_height > aspect_ratio:
crop_height = int(crop_width / aspect_ratio)
else:
crop_width = int(crop_height * aspect_ratio)
# 位置に基づいて座標を計算
if position == "center":
left = (width - crop_width) // 2
top = (height - crop_height) // 2
elif position == "top-left":
left = 0
top = 0
elif position == "top-right":
left = width - crop_width
top = 0
elif position == "bottom-left":
left = 0
top = height - crop_height
elif position == "bottom-right":
left = width - crop_width
top = height - crop_height
elif position == "top":
left = (width - crop_width) // 2
top = 0
elif position == "bottom":
left = (width - crop_width) // 2
top = height - crop_height
elif position == "left":
left = 0
top = (height - crop_height) // 2
elif position == "right":
left = width - crop_width
top = (height - crop_height) // 2
else:
# デフォルトは中央
left = (width - crop_width) // 2
top = (height - crop_height) // 2
return CropArea(
left, top,
left + crop_width,
top + crop_height
)
def _auto_crop_white_space(self, img: Image.Image) -> CropArea:
"""余白を自動検出して切り取る"""
# グレースケールに変換
if img.mode != 'L':
gray_img = img.convert('L')
else:
gray_img = img
width, height = gray_img.size
# 閾値を設定(白っぽい部分を検出)
threshold = 240
pixels = gray_img.load()
# 上下左右の余白を検出
left = width
right = 0
top = height
bottom = 0
for y in range(height):
for x in range(width):
if pixels[x, y] < threshold: # 白以外のピクセル
left = min(left, x)
right = max(right, x)
top = min(top, y)
bottom = max(bottom, y)
# 余白が検出されなかった場合は全体を使用
if left > right or top > bottom:
left = 0
top = 0
right = width
bottom = height
# 少しマージンを追加
margin = 5
left = max(0, left - margin)
top = max(0, top - margin)
right = min(width, right + margin)
bottom = min(height, bottom + margin)
return CropArea(left, top, right, bottom)
def _add_margin(
self,
crop_area: CropArea,
margin: int,
image_width: int,
image_height: int
) -> CropArea:
"""トリミング領域に余白を追加"""
return CropArea(
max(0, crop_area.left - margin),
max(0, crop_area.top - margin),
min(image_width, crop_area.right + margin),
min(image_height, crop_area.bottom + margin)
)
def _create_preview_image(
self,
img: Image.Image,
crop_area: CropArea,
preview_color: Tuple[int, int, int, int]
) -> Image.Image:
"""トリミング領域のプレビュー画像を作成"""
# 元の画像をコピー
preview = img.copy()
# RGBAに変換
if preview.mode != 'RGBA':
preview = preview.convert('RGBA')
draw = ImageDraw.Draw(preview)
# トリミング領域を描画
draw.rectangle(
crop_area.to_tuple(),
outline=preview_color,
width=3
)
# 交差点にマークを描画
cross_size = 10
center_x, center_y = crop_area.center
# 水平線
draw.line(
[(center_x - cross_size, center_y), (center_x + cross_size, center_y)],
fill=preview_color,
width=2
)
# 垂直線
draw.line(
[(center_x, center_y - cross_size), (center_x, center_y + cross_size)],
fill=preview_color,
width=2
)
# 寸法を表示
text = f"{crop_area.width}×{crop_area.height}"
draw.text(
(crop_area.left + 5, crop_area.top + 5),
text,
fill=preview_color,
stroke_width=1,
stroke_fill=(0, 0, 0, 255)
)
return preview
# 使用例
def main():
"""トリミング機能の使用例"""
# トリマーの作成
cropper = ImageCropper(show_preview=True)
print("=== 座標指定でのトリミング ===")
# 座標を指定してトリミング
success1 = cropper.crop_image(
image_path="photo.jpg",
output_path="photo_cropped_coords.jpg",
coordinates=(100, 100, 500, 400) # (left, top, right, bottom)
)
print(f"結果: {'成功' if success1 else '失敗'}")
print("\n=== サイズ指定でのトリミング ===")
# サイズと位置を指定してトリミング
success2 = cropper.crop_image(
image_path="photo.jpg",
output_path="photo_cropped_size.jpg",
size=(300, 200), # (width, height)
position="center",
aspect_ratio=16/9 # アスペクト比を固定
)
print(f"結果: {'成功' if success2 else '失敗'}")
print("\n=== 正方形トリミング ===")
# 正方形にトリミング
success3 = cropper.crop_to_square(
image_path="photo.jpg",
output_path="photo_square.jpg",
position="center",
size=400
)
print(f"結果: {'成功' if success3 else '失敗'}")
print("\n=== 円形トリミング ===")
# 円形にトリミング(枠線付き)
success4 = cropper.crop_to_circle(
image_path="photo.jpg",
output_path="photo_circle.png",
diameter=300,
border_width=5,
border_color=(255, 255, 255, 255)
)
print(f"結果: {'成功' if success4 else '失敗'}")
print("\n=== グリッド分割 ===")
# 画像を3x3のグリッドに分割
output_files = cropper.crop_multiple(
image_path="large_image.jpg",
output_dir="grid_parts",
grid=(3, 3),
overlap=10,
output_size=(300, 300)
)
print(f"分割ファイル数: {len(output_files)}")
print("\n=== 余白自動トリミング ===")
# 余白を自動検出して切り取り
success5 = cropper.crop_image(
image_path="document_with_margins.jpg",
output_path="document_cropped.jpg",
auto_crop_white_space=True,
margin=10 # 少し余白を残す
)
print(f"結果: {'成功' if success5 else '失敗'}")
if __name__ == "__main__":
main()
問題9:サムネイルキャッシュシステム
from PIL import Image, ImageOps
import os
import hashlib
import json
import time
from pathlib import Path
from typing import Optional, Dict, Any, Tuple, List
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import shutil
@dataclass
class ThumbnailCacheEntry:
"""サムネイルキャッシュエントリー"""
original_path: str
thumbnail_path: str
size: Tuple[int, int]
quality: int
format: str
created_at: float
last_accessed: float
access_count: int
file_size: int
@property
def age_seconds(self) -> float:
"""作成からの経過時間(秒)"""
return time.time() - self.created_at
@property
def idle_seconds(self) -> float:
"""最後のアクセスからの経過時間(秒)"""
return time.time() - self.last_accessed
def touch(self):
"""アクセス時間を更新"""
self.last_accessed = time.time()
self.access_count += 1
def is_expired(self, max_age_seconds: float) -> bool:
"""キャッシュが期限切れかどうか"""
return self.age_seconds > max_age_seconds
def to_dict(self) -> Dict[str, Any]:
"""辞書形式に変換"""
data = asdict(self)
data['age_seconds'] = self.age_seconds
data['idle_seconds'] = self.idle_seconds
return data
class ThumbnailCache:
"""サムネイルキャッシュシステム"""
def __init__(
self,
cache_dir: str = ".thumbnail_cache",
max_cache_size_mb: int = 500,
max_age_days: int = 30,
cleanup_interval_hours: int = 24
):
"""
Args:
cache_dir: キャッシュディレクトリ
max_cache_size_mb: 最大キャッシュサイズ(MB)
max_age_days: キャッシュの最大保持日数
cleanup_interval_hours: クリーンアップ実行間隔(時間)
"""
self.cache_dir = Path(cache_dir)
self.max_cache_size_bytes = max_cache_size_mb * 1024 * 1024
self.max_age_seconds = max_age_days * 24 * 3600
self.cleanup_interval_seconds = cleanup_interval_hours * 3600
# キャッシュインデックスファイル
self.index_file = self.cache_dir / "cache_index.json"
# キャッシュエントリーの辞書 {cache_key: ThumbnailCacheEntry}
self.entries: Dict[str, ThumbnailCacheEntry] = {}
# キャッシュ統計
self.stats = {
'hits': 0,
'misses': 0,
'created': 0,
'cleaned': 0,
'total_size_bytes': 0
}
# 初期化
self._initialize_cache()
def _initialize_cache(self):
"""キャッシュの初期化"""
# キャッシュディレクトリの作成
self.cache_dir.mkdir(parents=True, exist_ok=True)
# インデックスの読み込み
if self.index_file.exists():
try:
with open(self.index_file, 'r', encoding='utf-8') as f:
data = json.load(f)
for key, entry_data in data.items():
# タプルの変換
if 'size' in entry_data:
entry_data['size'] = tuple(entry_data['size'])
self.entries[key] = ThumbnailCacheEntry(**entry_data)
print(f"キャッシュインデックスを読み込みました: {len(self.entries)} エントリー")
# 統計の更新
self._update_stats()
except Exception as e:
print(f"キャッシュインデックスの読み込みエラー: {e}")
self.entries = {}
else:
print("新規キャッシュを作成します")
# 定期的なクリーンアップの設定
self._schedule_cleanup()
def get_or_create_thumbnail(
self,
image_path: str,
size: Tuple[int, int],
quality: int = 85,
format: str = 'JPEG',
force_regenerate: bool = False
) -> Optional[str]:
"""
サムネイルを取得または作成
Args:
image_path: 元画像のパス
size: サムネイルサイズ (width, height)
quality: 画質 (1-100)
format: 出力フォーマット
force_regenerate: 強制的に再生成するか
Returns:
Optional[str]: サムネイルのパス、失敗時はNone
"""
# キャッシュキーの生成
cache_key = self._generate_cache_key(image_path, size, quality, format)
# キャッシュヒットのチェック
if not force_regenerate and cache_key in self.entries:
entry = self.entries[cache_key]
# 有効性のチェック
if (os.path.exists(entry.thumbnail_path) and
os.path.exists(entry.original_path) and
not entry.is_expired(self.max_age_seconds)):
# アクセス時間を更新
entry.touch()
self.stats['hits'] += 1
print(f"キャッシュヒット: {os.path.basename(image_path)} → {size}")
return entry.thumbnail_path
self.stats['misses'] += 1
# サムネイルの生成
thumbnail_path = self._create_thumbnail(
image_path, size, quality, format, cache_key
)
if thumbnail_path:
# キャッシュエントリーの作成
entry = ThumbnailCacheEntry(
original_path=os.path.abspath(image_path),
thumbnail_path=os.path.abspath(thumbnail_path),
size=size,
quality=quality,
format=format,
created_at=time.time(),
last_accessed=time.time(),
access_count=1,
file_size=os.path.getsize(thumbnail_path)
)
self.entries[cache_key] = entry
self.stats['created'] += 1
self.stats['total_size_bytes'] += entry.file_size
# インデックスの保存
self._save_index()
print(f"キャッシュ作成: {os.path.basename(image_path)} → {size}")
# キャッシュサイズのチェック
if self.stats['total_size_bytes'] > self.max_cache_size_bytes:
self._cleanup_cache()
return thumbnail_path
def get_thumbnail_batch(
self,
image_paths: List[str],
size: Tuple[int, int],
quality: int = 85,
format: str = 'JPEG'
) -> Dict[str, Optional[str]]:
"""
複数画像のサムネイルをバッチ取得
Args:
image_paths: 画像パスのリスト
size: サムネイルサイズ
quality: 画質
format: 出力フォーマット
Returns:
Dict[str, Optional[str]]: {画像パス: サムネイルパス}
"""
results = {}
for image_path in image_paths:
thumbnail_path = self.get_or_create_thumbnail(
image_path, size, quality, format
)
results[image_path] = thumbnail_path
return results
def pregenerate_thumbnails(
self,
image_paths: List[str],
sizes: List[Tuple[int, int]],
qualities: Optional[List[int]] = None,
formats: Optional[List[str]] = None
) -> Dict[str, List[str]]:
"""
サムネイルを事前生成
Args:
image_paths: 画像パスのリスト
sizes: サイズのリスト
qualities: 画質のリスト
formats: フォーマットのリスト
Returns:
Dict[str, List[str]]: {画像パス: [サムネイルパス, ...]}
"""
if qualities is None:
qualities = [85]
if formats is None:
formats = ['JPEG']
results = {}
for image_path in image_paths:
thumbnail_paths = []
for size in sizes:
for quality in qualities:
for format in formats:
thumbnail_path = self.get_or_create_thumbnail(
image_path, size, quality, format
)
if thumbnail_path:
thumbnail_paths.append(thumbnail_path)
results[image_path] = thumbnail_paths
return results
def clear_cache(self, older_than_days: Optional[int] = None):
"""
キャッシュをクリア
Args:
older_than_days: 指定日数より古いキャッシュのみクリア
"""
if older_than_days is None:
# 全クリア
for entry in list(self.entries.values()):
self._remove_cache_entry(entry)
self.entries.clear()
self.stats['total_size_bytes'] = 0
print("キャッシュを完全にクリアしました")
else:
# 古いキャッシュのみクリア
cutoff_time = time.time() - (older_than_days * 24 * 3600)
keys_to_remove = []
for key, entry in self.entries.items():
if entry.created_at < cutoff_time:
self._remove_cache_entry(entry)
keys_to_remove.append(key)
for key in keys_to_remove:
del self.entries[key]
print(f"{len(keys_to_remove)} 個の古いキャッシュをクリアしました")
# インデックスの保存
self._save_index()
def get_cache_info(self) -> Dict[str, Any]:
"""キャッシュ情報を取得"""
total_entries = len(self.entries)
# サイズ別の統計
size_groups = {}
for entry in self.entries.values():
size_key = f"{entry.size[0]}x{entry.size[1]}"
if size_key not in size_groups:
size_groups[size_key] = 0
size_groups[size_key] += 1
# フォーマット別の統計
format_groups = {}
for entry in self.entries.values():
if entry.format not in format_groups:
format_groups[entry.format] = 0
format_groups[entry.format] += 1
return {
'total_entries': total_entries,
'total_size_mb': self.stats['total_size_bytes'] / (1024 * 1024),
'size_groups': size_groups,
'format_groups': format_groups,
'stats': self.stats.copy(),
'hits_ratio': (
self.stats['hits'] / max(self.stats['hits'] + self.stats['misses'], 1)
)
}
def _generate_cache_key(
self,
image_path: str,
size: Tuple[int, int],
quality: int,
format: str
) -> str:
"""キャッシュキーを生成"""
# ファイルの情報を含める
file_stat = os.stat(image_path)
file_info = f"{image_path}_{file_stat.st_size}_{file_stat.st_mtime}"
key_data = f"{file_info}_{size}_{quality}_{format}"
# SHA256ハッシュを使用
return hashlib.sha256(key_data.encode()).hexdigest()
def _create_thumbnail(
self,
image_path: str,
size: Tuple[int, int],
quality: int,
format: str,
cache_key: str
) -> Optional[str]:
"""サムネイルを作成"""
try:
# 画像を開く
with Image.open(image_path) as img:
# 画像の向きを自動修正
img = ImageOps.exif_transpose(img)
# リサイズ(アスペクト比を保持)
img.thumbnail(size, Image.Resampling.LANCZOS)
# サムネイルパスの決定
filename = f"{cache_key}.{format.lower()}"
thumbnail_path = self.cache_dir / filename
# 保存
save_kwargs = {'quality': quality, 'optimize': True}
if format.upper() == 'JPEG' and img.mode in ('RGBA', 'LA', 'P'):
# JPEGはアルファチャンネルに対応していない
background = Image.new('RGB', img.size, (255, 255, 255))
background.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None)
img = background
img.save(thumbnail_path, format=format, **save_kwargs)
return str(thumbnail_path)
except Exception as e:
print(f"サムネイル生成エラー ({image_path}): {e}")
return None
def _remove_cache_entry(self, entry: ThumbnailCacheEntry):
"""キャッシュエントリーを削除"""
try:
if os.path.exists(entry.thumbnail_path):
os.remove(entry.thumbnail_path)
self.stats['total_size_bytes'] -= entry.file_size
self.stats['cleaned'] += 1
except Exception as e:
print(f"キャッシュファイル削除エラー: {e}")
def _cleanup_cache(self):
"""キャッシュをクリーンアップ"""
print("キャッシュクリーンアップを実行...")
# エントリーをソート(最終アクセスが古い順)
sorted_entries = sorted(
self.entries.items(),
key=lambda x: x[1].last_accessed
)
# キャッシュサイズが制限以下になるまで削除
while (self.stats['total_size_bytes'] > self.max_cache_size_bytes * 0.8 and
sorted_entries):
key, entry = sorted_entries.pop(0)
# アクセス頻度が低いものを優先的に削除
if entry.access_count < 2 or entry.idle_seconds > (7 * 24 * 3600):
self._remove_cache_entry(entry)
del self.entries[key]
# 期限切れのキャッシュも削除
keys_to_remove = []
for key, entry in self.entries.items():
if entry.is_expired(self.max_age_seconds):
self._remove_cache_entry(entry)
keys_to_remove.append(key)
for key in keys_to_remove:
del self.entries[key]
# インデックスの保存
self._save_index()
print(f"クリーンアップ完了: {len(keys_to_remove)} エントリー削除")
def _save_index(self):
"""キャッシュインデックスを保存"""
try:
# シリアライズ可能な形式に変換
index_data = {}
for key, entry in self.entries.items():
entry_dict = asdict(entry)
# タプルをリストに変換
entry_dict['size'] = list(entry_dict['size'])
index_data[key] = entry_dict
with open(self.index_file, 'w', encoding='utf-8') as f:
json.dump(index_data, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"キャッシュインデックスの保存エラー: {e}")
def _update_stats(self):
"""キャッシュ統計を更新"""
self.stats['total_size_bytes'] = 0
for entry in self.entries.values():
if os.path.exists(entry.thumbnail_path):
try:
entry.file_size = os.path.getsize(entry.thumbnail_path)
self.stats['total_size_bytes'] += entry.file_size
except:
entry.file_size = 0
def _schedule_cleanup(self):
"""定期的なクリーンアップをスケジュール"""
# 最後のクリーンアップ時間を記録
cleanup_file = self.cache_dir / "last_cleanup.txt"
if cleanup_file.exists():
try:
with open(cleanup_file, 'r') as f:
last_cleanup = float(f.read().strip())
except:
last_cleanup = 0
else:
last_cleanup = 0
# クリーンアップ間隔を過ぎていたら実行
if time.time() - last_cleanup > self.cleanup_interval_seconds:
self._cleanup_cache()
# クリーンアップ時間を記録
with open(cleanup_file, 'w') as f:
f.write(str(time.time()))
# 使用例
def main():
"""サムネイルキャッシュの使用例"""
# キャッシュシステムの作成
cache = ThumbnailCache(
cache_dir=".thumbnail_cache",
max_cache_size_mb=100, # 100MBまで
max_age_days=7, # 7日間保持
cleanup_interval_hours=1 # 1時間ごとにクリーンアップチェック
)
print("=== 単一画像のサムネイル取得 ===")
# サムネイルを取得(キャッシュがあればそれを返す)
thumbnail_path = cache.get_or_create_thumbnail(
image_path="photo1.jpg",
size=(300, 300),
quality=85,
format="JPEG"
)
if thumbnail_path:
print(f"サムネイルパス: {thumbnail_path}")
print("\n=== 同じ画像の再度取得(キャッシュヒット) ===")
# 同じパラメータで再度取得(キャッシュから)
thumbnail_path2 = cache.get_or_create_thumbnail(
image_path="photo1.jpg",
size=(300, 300),
quality=85,
format="JPEG"
)
print(f"キャッシュから取得: {thumbnail_path2}")
print("\n=== バッチ取得 ===")
# 複数画像のサムネイルを一括取得
image_paths = ["photo1.jpg", "photo2.jpg", "photo3.jpg"]
results = cache.get_thumbnail_batch(
image_paths=image_paths,
size=(200, 200),
quality=75
)
for img_path, thumb_path in results.items():
status = "成功" if thumb_path else "失敗"
print(f"{os.path.basename(img_path)}: {status}")
print("\n=== 事前生成 ===")
# 複数サイズのサムネイルを事前生成
sizes = [(100, 100), (300, 300), (600, 600)]
pregen_results = cache.pregenerate_thumbnails(
image_paths=["photo1.jpg", "photo2.jpg"],
sizes=sizes,
qualities=[75, 85],
formats=["JPEG", "WEBP"]
)
for img_path, thumb_paths in pregen_results.items():
print(f"{os.path.basename(img_path)}: {len(thumb_paths)}個のサムネイルを生成")
print("\n=== キャッシュ情報 ===")
# キャッシュ情報を表示
cache_info = cache.get_cache_info()
print(f"総エントリー数: {cache_info['total_entries']}")
print(f"キャッシュサイズ: {cache_info['total_size_mb']:.2f} MB")
print(f"ヒット率: {cache_info['hits_ratio']:.1%}")
print("\nサイズ別分布:")
for size, count in cache_info['size_groups'].items():
print(f" {size}: {count}個")
print("\n=== キャッシュのクリア ===")
# 3日以上前のキャッシュをクリア
cache.clear_cache(older_than_days=3)
# キャッシュ情報を再表示
cache_info = cache.get_cache_info()
print(f"クリア後のエントリー数: {cache_info['total_entries']}")
if __name__ == "__main__":
main()
上級問題(3問)
問題10:スマート画像最適化システム
from PIL import Image, ImageOps, ImageStat, ImageEnhance
import os
from typing import Dict, Any, Tuple, Optional, List
from dataclasses import dataclass
from pathlib import Path
import numpy as np
from sklearn.cluster import KMeans
import colorsys
@dataclass
class ImageAnalysis:
"""画像分析結果"""
image_type: str # 'photo', 'screenshot', 'graphic', 'text', 'mixed'
color_complexity: float # 0-1, 色の複雑さ
contrast: float # 0-1, コントラスト
sharpness: float # 0-1, シャープネス
noise_level: float # 0-1, ノイズレベル
has_transparency: bool # 透過情報があるか
dominant_colors: List[Tuple[int, int, int]] # 支配色
suggested_format: str # 推奨フォーマット
suggested_quality: int # 推奨品質
compression_method: str # 圧縮方法
class SmartImageOptimizer:
"""スマート画像最適化システム"""
def __init__(self, analysis_cache_dir: Optional[str] = None):
"""
Args:
analysis_cache_dir: 分析結果のキャッシュディレクトリ
"""
self.analysis_cache_dir = analysis_cache_dir
if analysis_cache_dir:
Path(analysis_cache_dir).mkdir(parents=True, exist_ok=True)
# フォーマットごとの特徴
self.format_features = {
'JPEG': {
'best_for': ['photo', 'mixed'],
'lossy': True,
'supports_transparency': False,
'typical_quality_range': (60, 95)
},
'PNG': {
'best_for': ['screenshot', 'graphic', 'text'],
'lossy': False,
'supports_transparency': True,
'typical_quality_range': None
},
'WEBP': {
'best_for': ['photo', 'graphic', 'mixed'],
'lossy': True,
'supports_transparency': True,
'typical_quality_range': (50, 90)
}
}
def analyze_image(self, image_path: str) -> ImageAnalysis:
"""
画像を分析して最適化設定を提案
Args:
image_path: 画像パス
Returns:
ImageAnalysis: 分析結果
"""
# キャッシュチェック
if self.analysis_cache_dir:
cache_path = self._get_cache_path(image_path)
if os.path.exists(cache_path):
try:
return self._load_from_cache(cache_path)
except:
pass
try:
with Image.open(image_path) as img:
# 基本情報
width, height = img.size
mode = img.mode
has_transparency = mode in ('RGBA', 'LA', 'P')
# 画像タイプの判定
image_type = self._determine_image_type(img)
# 色の複雑さを分析
color_complexity = self._analyze_color_complexity(img)
# コントラスト分析
contrast = self._analyze_contrast(img)
# シャープネス分析
sharpness = self._analyze_sharpness(img)
# ノイズレベル分析
noise_level = self._analyze_noise_level(img)
# 支配色の抽出
dominant_colors = self._extract_dominant_colors(img, num_colors=5)
# 最適なフォーマットの提案
suggested_format = self._suggest_format(
image_type, has_transparency, color_complexity
)
# 最適な品質の提案
suggested_quality = self._suggest_quality(
suggested_format, image_type, color_complexity, contrast
)
# 圧縮方法の提案
compression_method = self._suggest_compression_method(
suggested_format, image_type
)
# 分析結果の作成
analysis = ImageAnalysis(
image_type=image_type,
color_complexity=color_complexity,
contrast=contrast,
sharpness=sharpness,
noise_level=noise_level,
has_transparency=has_transparency,
dominant_colors=dominant_colors,
suggested_format=suggested_format,
suggested_quality=suggested_quality,
compression_method=compression_method
)
# キャッシュに保存
if self.analysis_cache_dir:
self._save_to_cache(analysis, cache_path)
return analysis
except Exception as e:
# エラー時のデフォルト設定
return ImageAnalysis(
image_type='mixed',
color_complexity=0.5,
contrast=0.5,
sharpness=0.5,
noise_level=0.1,
has_transparency=False,
dominant_colors=[],
suggested_format='JPEG',
suggested_quality=85,
compression_method='standard'
)
def optimize_image(
self,
image_path: str,
output_path: Optional[str] = None,
target_size_kb: Optional[float] = None,
max_width: Optional[int] = None,
max_height: Optional[int] = None,
use_smart_settings: bool = True,
preserve_metadata: bool = True
) -> Dict[str, Any]:
"""
画像をスマート最適化
Args:
image_path: 入力画像パス
output_path: 出力画像パス
target_size_kb: 目標ファイルサイズ(KB)
max_width: 最大幅
max_height: 最大高さ
use_smart_settings: スマート設定を使用するか
preserve_metadata: メタデータを保持するか
Returns:
Dict[str, Any]: 最適化結果
"""
try:
# 出力パスの決定
if output_path is None:
base, ext = os.path.splitext(image_path)
output_path = f"{base}_optimized{ext}"
original_size = os.path.getsize(image_path)
# 画像分析
if use_smart_settings:
analysis = self.analyze_image(image_path)
format = analysis.suggested_format
quality = analysis.suggested_quality
compression_method = analysis.compression_method
else:
# デフォルト設定
format = 'JPEG'
quality = 85
compression_method = 'standard'
# 画像を開く
with Image.open(image_path) as img:
# リサイズ
if max_width or max_height:
img = self._smart_resize(img, max_width, max_height)
# 画像タイプに応じた前処理
if use_smart_settings:
img = self._apply_smart_preprocessing(img, analysis)
# 保存パラメータの設定
save_params = self._get_save_parameters(
format, quality, compression_method, preserve_metadata, img.info
)
# ターゲットサイズがある場合、品質を調整
if target_size_kb:
img, save_params = self._adjust_for_target_size(
img, format, target_size_kb, save_params
)
# 保存
img.save(output_path, **save_params)
optimized_size = os.path.getsize(output_path)
# 結果の計算
size_reduction = (original_size - optimized_size) / original_size * 100
result = {
'input_path': image_path,
'output_path': output_path,
'original_size_kb': original_size / 1024,
'optimized_size_kb': optimized_size / 1024,
'size_reduction_percent': size_reduction,
'format': format,
'quality': save_params.get('quality', quality),
'compression_method': compression_method,
'success': True
}
if use_smart_settings:
result['analysis'] = {
'image_type': analysis.image_type,
'color_complexity': analysis.color_complexity,
'contrast': analysis.contrast,
'sharpness': analysis.sharpness
}
return result
except Exception as e:
return {
'input_path': image_path,
'output_path': output_path or '',
'error': str(e),
'success': False
}
def optimize_directory(
self,
input_dir: str,
output_dir: str,
target_size_kb: Optional[float] = None,
max_width: Optional[int] = None,
max_height: Optional[int] = None,
extensions: Optional[List[str]] = None,
recursive: bool = False
) -> Dict[str, Any]:
"""
ディレクトリ内の画像をスマート最適化
Args:
input_dir: 入力ディレクトリ
output_dir: 出力ディレクトリ
target_size_kb: 目標ファイルサイズ
max_width: 最大幅
max_height: 最大高さ
extensions: 処理する拡張子
recursive: 再帰的に処理
Returns:
Dict[str, Any]: 処理結果
"""
if extensions is None:
extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff']
# ファイル収集
files_to_process = []
if recursive:
for root, _, filenames in os.walk(input_dir):
for filename in filenames:
if any(filename.lower().endswith(ext) for ext in extensions):
input_path = os.path.join(root, filename)
rel_path = os.path.relpath(root, input_dir)
files_to_process.append((input_path, rel_path))
else:
for filename in os.listdir(input_dir):
if any(filename.lower().endswith(ext) for ext in extensions):
input_path = os.path.join(input_dir, filename)
if os.path.isfile(input_path):
files_to_process.append((input_path, ''))
print(f"最適化対象ファイル数: {len(files_to_process)}")
# 出力ディレクトリの作成
Path(output_dir).mkdir(parents=True, exist_ok=True)
results = {
'total': len(files_to_process),
'success': 0,
'failed': 0,
'total_original_size_kb': 0,
'total_optimized_size_kb': 0,
'details': []
}
for input_path, rel_path in files_to_process:
# 出力パスの構築
filename = os.path.basename(input_path)
if rel_path:
output_subdir = os.path.join(output_dir, rel_path)
Path(output_subdir).mkdir(parents=True, exist_ok=True)
output_path = os.path.join(output_subdir, filename)
else:
output_path = os.path.join(output_dir, filename)
# 最適化実行
result = self.optimize_image(
image_path=input_path,
output_path=output_path,
target_size_kb=target_size_kb,
max_width=max_width,
max_height=max_height,
use_smart_settings=True,
preserve_metadata=True
)
# 結果の記録
results['details'].append(result)
if result['success']:
results['success'] += 1
results['total_original_size_kb'] += result['original_size_kb']
results['total_optimized_size_kb'] += result['optimized_size_kb']
reduction = result['size_reduction_percent']
print(f"✓ {filename}: {reduction:.1f}%削減 ({result['format']}, Q{result['quality']})")
else:
results['failed'] += 1
print(f"✗ {filename}: {result['error']}")
# 統計計算
if results['success'] > 0:
total_saving_kb = results['total_original_size_kb'] - results['total_optimized_size_kb']
total_saving_mb = total_saving_kb / 1024
avg_reduction = (total_saving_kb / results['total_original_size_kb']) * 100
results['total_saving_mb'] = total_saving_mb
results['average_reduction_percent'] = avg_reduction
# サマリー表示
self._print_optimization_summary(results)
return results
def _determine_image_type(self, img: Image.Image) -> str:
"""画像タイプを判定"""
# サイズとアスペクト比
width, height = img.size
aspect_ratio = width / height
# カラーヒストグラムを分析
if img.mode != 'RGB':
rgb_img = img.convert('RGB')
else:
rgb_img = img
# 色の数を概算
pixels = np.array(rgb_img).reshape(-1, 3)
unique_colors = len(np.unique(pixels, axis=0))
total_pixels = width * height
color_density = unique_colors / total_pixels
# エッジ検出によるテキスト/グラフィック判定
from PIL import ImageFilter
edges = rgb_img.filter(ImageFilter.FIND_EDGES)
edge_array = np.array(edges.convert('L'))
edge_density = np.sum(edge_array > 50) / total_pixels
# 判定ロジック
if color_density > 0.5 and edge_density < 0.1:
# 色が豊富でエッジが少ない → 写真
return 'photo'
elif edge_density > 0.3:
# エッジが多い → スクリーンショットやテキスト
if color_density < 0.1:
return 'text'
else:
return 'screenshot'
elif unique_colors < 256:
# 色数が少ない → グラフィック
return 'graphic'
else:
return 'mixed'
def _analyze_color_complexity(self, img: Image.Image) -> float:
"""色の複雑さを分析 (0-1)"""
if img.mode != 'RGB':
rgb_img = img.convert('RGB')
else:
rgb_img = img
pixels = np.array(rgb_img).reshape(-1, 3)
# ユニークな色の割合
unique_colors = len(np.unique(pixels, axis=0))
total_pixels = pixels.shape[0]
# 色の分散
color_std = np.std(pixels, axis=0).mean() / 255
# 複合スコア
complexity = min(1.0, (unique_colors / 10000) * 0.5 + color_std * 0.5)
return complexity
def _analyze_contrast(self, img: Image.Image) -> float:
"""コントラストを分析 (0-1)"""
if img.mode != 'L':
gray_img = img.convert('L')
else:
gray_img = img
# ヒストグラムを計算
hist = gray_img.histogram()
# 明るさの範囲
min_brightness = next(i for i, count in enumerate(hist) if count > 0)
max_brightness = next(i for i, count in enumerate(reversed(hist)) if count > 0)
max_brightness = 255 - max_brightness
contrast_range = (max_brightness - min_brightness) / 255
# ヒストグラムの分散
pixels = np.array(gray_img).flatten()
contrast_std = np.std(pixels) / 128
# 複合スコア
contrast = min(1.0, contrast_range * 0.7 + contrast_std * 0.3)
return contrast
def _analyze_sharpness(self, img: Image.Image) -> float:
"""シャープネスを分析 (0-1)"""
from PIL import ImageFilter
if img.mode != 'L':
gray_img = img.convert('L')
else:
gray_img = img
# ラプラシアンフィルタでエッジを強調
laplacian = gray_img.filter(ImageFilter.Kernel(
(3, 3),
[0, 1, 0, 1, -4, 1, 0, 1, 0],
1
))
laplacian_array = np.array(laplacian)
# エッジ強度の分散
sharpness = np.std(laplacian_array) / 128
sharpness = min(1.0, max(0.0, sharpness))
return sharpness
def _analyze_noise_level(self, img: Image.Image) -> float:
"""ノイズレベルを分析 (0-1)"""
if img.mode != 'L':
gray_img = img.convert('L')
else:
gray_img = img
from PIL import ImageFilter
# 平滑化した画像
smoothed = gray_img.filter(ImageFilter.GaussianBlur(radius=1))
# 元画像との差分
diff = np.abs(np.array(gray_img) - np.array(smoothed))
# 差分の平均値
noise_level = np.mean(diff) / 128
noise_level = min(1.0, max(0.0, noise_level))
return noise_level
def _extract_dominant_colors(
self,
img: Image.Image,
num_colors: int = 5
) -> List[Tuple[int, int, int]]:
"""支配色を抽出"""
if img.mode != 'RGB':
rgb_img = img.convert('RGB')
else:
rgb_img = img
# ダウンサンプリングして処理を軽量化
small_img = rgb_img.resize((100, 100), Image.Resampling.LANCZOS)
pixels = np.array(small_img).reshape(-1, 3)
# K-meansでクラスタリング
if len(pixels) > num_colors:
kmeans = KMeans(n_clusters=num_colors, random_state=42, n_init=10)
kmeans.fit(pixels)
colors = kmeans.cluster_centers_.astype(int)
else:
colors = pixels[:num_colors]
return [tuple(color) for color in colors]
def _suggest_format(
self,
image_type: str,
has_transparency: bool,
color_complexity: float
) -> str:
"""最適なフォーマットを提案"""
if has_transparency:
if color_complexity < 0.3:
return 'PNG' # シンプルな透過グラフィック
else:
return 'WEBP' # 複雑な透過画像
if image_type in ['screenshot', 'graphic', 'text']:
if color_complexity < 0.2:
return 'PNG' # ロスレス圧縮が適している
else:
return 'WEBP' # 良い圧縮率
# 写真や混合画像
if color_complexity > 0.5:
return 'WEBP' # 高圧縮率
else:
return 'JPEG' # 広くサポートされている
def _suggest_quality(
self,
format: str,
image_type: str,
color_complexity: float,
contrast: float
) -> int:
"""最適な品質を提案"""
if format == 'PNG':
return 100 # PNGは品質設定なし
base_quality = {
'photo': 85,
'screenshot': 90,
'graphic': 95,
'text': 100,
'mixed': 85
}.get(image_type, 85)
# 色の複雑さに応じて調整
if color_complexity > 0.7:
base_quality += 5
elif color_complexity < 0.3:
base_quality -= 10
# コントラストに応じて調整
if contrast > 0.7:
base_quality += 5
# フォーマットに応じて範囲を制限
if format == 'JPEG':
return max(60, min(95, base_quality))
elif format == 'WEBP':
return max(50, min(90, base_quality - 5))
return base_quality
def _suggest_compression_method(self, format: str, image_type: str) -> str:
"""圧縮方法を提案"""
if format == 'JPEG':
if image_type == 'photo':
return 'progressive' # プログレッシブJPEG
else:
return 'baseline'
elif format == 'PNG':
if image_type in ['graphic', 'text']:
return 'palette' # パレット化
else:
return 'deflate'
elif format == 'WEBP':
if image_type in ['photo', 'mixed']:
return 'lossy'
else:
return 'lossless'
return 'standard'
def _smart_resize(
self,
img: Image.Image,
max_width: Optional[int],
max_height: Optional[int]
) -> Image.Image:
"""スマートリサイズ"""
if max_width is None and max_height is None:
return img
width, height = img.size
# 元のアスペクト比
aspect_ratio = width / height
# ターゲットサイズの計算
if max_width and max_height:
target_aspect = max_width / max_height
if aspect_ratio > target_aspect:
# 横長の画像
new_width = max_width
new_height = int(max_width / aspect_ratio)
else:
# 縦長の画像
new_height = max_height
new_width = int(max_height * aspect_ratio)
elif max_width:
new_width = max_width
new_height = int(max_width / aspect_ratio)
elif max_height:
new_height = max_height
new_width = int(max_height * aspect_ratio)
else:
return img
# リサイズ実行
return img.resize((new_width, new_height), Image.Resampling.LANCZOS)
def _apply_smart_preprocessing(
self,
img: Image.Image,
analysis: ImageAnalysis
) -> Image.Image:
"""画像タイプに応じた前処理"""
result = img.copy()
# ノイズリダクション
if analysis.noise_level > 0.3 and analysis.image_type == 'photo':
from PIL import ImageFilter
result = result.filter(ImageFilter.GaussianBlur(radius=0.5))
# シャープネス強化
if analysis.sharpness < 0.3 and analysis.image_type in ['text', 'screenshot']:
enhancer = ImageEnhance.Sharpness(result)
result = enhancer.enhance(1.3)
# コントラスト調整
if analysis.contrast < 0.4:
enhancer = ImageEnhance.Contrast(result)
result = enhancer.enhance(1.2)
return result
def _get_save_parameters(
self,
format: str,
quality: int,
compression_method: str,
preserve_metadata: bool,
original_info: dict
) -> Dict[str, Any]:
"""保存パラメータを取得"""
params = {}
if preserve_metadata:
params.update(original_info)
if format == 'JPEG':
params.update({
'format': 'JPEG',
'quality': quality,
'optimize': True,
'progressive': (compression_method == 'progressive')
})
elif format == 'PNG':
params.update({
'format': 'PNG',
'optimize': True,
'compress_level': 9 if compression_method == 'deflate' else 6
})
if compression_method == 'palette':
# パレット化
params['bits'] = 8
elif format == 'WEBP':
params.update({
'format': 'WEBP',
'quality': quality,
'method': 6, # 圧縮品質
'lossless': (compression_method == 'lossless')
})
return params
def _adjust_for_target_size(
self,
img: Image.Image,
format: str,
target_size_kb: float,
save_params: Dict[str, Any]
) -> Tuple[Image.Image, Dict[str, Any]]:
"""ターゲットサイズに合わせて調整"""
target_size_bytes = target_size_kb * 1024
# バイナリサーチで最適な品質を探す
low = 40
high = 95
best_img = img
best_params = save_params.copy()
for _ in range(10): # 最大10回試行
if low > high:
break
mid = (low + high) // 2
# 一時ファイルでテスト
from io import BytesIO
buffer = BytesIO()
test_params = best_params.copy()
if format in ['JPEG', 'WEBP']:
test_params['quality'] = mid
img.save(buffer, **test_params)
current_size = buffer.tell()
if abs(current_size - target_size_bytes) < target_size_bytes * 0.1:
# 目標サイズに十分近い
best_params = test_params
break
elif current_size > target_size_bytes:
# まだ大きいので品質を下げる
high = mid - 1
else:
# 小さいので品質を上げる
low = mid + 1
best_params = test_params
return best_img, best_params
def _get_cache_path(self, image_path: str) -> str:
"""キャッシュファイルのパスを取得"""
import hashlib
# ファイルのハッシュを計算
with open(image_path, 'rb') as f:
file_hash = hashlib.md5(f.read()).hexdigest()
filename = os.path.basename(image_path)
cache_filename = f"{filename}_{file_hash}.json"
return os.path.join(self.analysis_cache_dir, cache_filename)
def _save_to_cache(self, analysis: ImageAnalysis, cache_path: str):
"""分析結果をキャッシュに保存"""
import json
data = {
'image_type': analysis.image_type,
'color_complexity': analysis.color_complexity,
'contrast': analysis.contrast,
'sharpness': analysis.sharpness,
'noise_level': analysis.noise_level,
'has_transparency': analysis.has_transparency,
'dominant_colors': analysis.dominant_colors,
'suggested_format': analysis.suggested_format,
'suggested_quality': analysis.suggested_quality,
'compression_method': analysis.compression_method,
'timestamp': time.time()
}
with open(cache_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def _load_from_cache(self, cache_path: str) -> ImageAnalysis:
"""分析結果をキャッシュから読み込み"""
import json
import time
with open(cache_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# キャッシュの有効期限(1日)
if time.time() - data['timestamp'] > 86400:
raise ValueError("キャッシュが古いです")
return ImageAnalysis(
image_type=data['image_type'],
color_complexity=data['color_complexity'],
contrast=data['contrast'],
sharpness=data['sharpness'],
noise_level=data['noise_level'],
has_transparency=data['has_transparency'],
dominant_colors=[tuple(color) for color in data['dominant_colors']],
suggested_format=data['suggested_format'],
suggested_quality=data['suggested_quality'],
compression_method=data['compression_method']
)
def _print_optimization_summary(self, results: Dict[str, Any]):
"""最適化サマリーを表示"""
print("\n" + "="*60)
print("スマート最適化 サマリー")
print("="*60)
print(f"総ファイル数: {results['total']}")
print(f"成功: {results['success']}")
print(f"失敗: {results['failed']}")
if results['success'] > 0:
print(f"\n合計節約容量: {results.get('total_saving_mb', 0):.2f} MB")
print(f"平均サイズ削減率: {results.get('average_reduction_percent', 0):.1f}%")
# フォーマット分布
format_counts = {}
for detail in results['details']:
if detail['success']:
fmt = detail.get('format', 'unknown')
format_counts[fmt] = format_counts.get(fmt, 0) + 1
print("\n使用フォーマット:")
for fmt, count in format_counts.items():
percentage = count / results['success'] * 100
print(f" {fmt}: {count}ファイル ({percentage:.1f}%)")
print("="*60)
# 使用例
def main():
"""スマート画像最適化の使用例"""
# 最適化システムの作成
optimizer = SmartImageOptimizer(
analysis_cache_dir=".image_analysis_cache"
)
print("=== 単一画像の分析 ===")
# 画像を分析
analysis = optimizer.analyze_image("sample_photo.jpg")
print(f"画像タイプ: {analysis.image_type}")
print(f"色の複雑さ: {analysis.color_complexity:.2f}")
print(f"コントラスト: {analysis.contrast:.2f}")
print(f"シャープネス: {analysis.sharpness:.2f}")
print(f"推奨フォーマット: {analysis.suggested_format}")
print(f"推奨品質: {analysis.suggested_quality}")
print(f"圧縮方法: {analysis.compression_method}")
if analysis.dominant_colors:
print("支配色:")
for i, color in enumerate(analysis.dominant_colors[:3], 1):
print(f" {i}. RGB{color}")
print("\n=== 単一画像の最適化 ===")
# 画像を最適化
result = optimizer.optimize_image(
image_path="sample_photo.jpg",
output_path="sample_photo_optimized.jpg",
target_size_kb=500, # 500KBを目標
max_width=1920,
use_smart_settings=True
)
if result['success']:
print(f"最適化成功:")
print(f" 元サイズ: {result['original_size_kb']:.1f} KB")
print(f" 最適化後: {result['optimized_size_kb']:.1f} KB")
print(f" 削減率: {result['size_reduction_percent']:.1f}%")
print(f" フォーマット: {result['format']}")
print(f" 品質: {result['quality']}")
else:
print(f"最適化失敗: {result['error']}")
print("\n=== ディレクトリの一括最適化 ===")
# ディレクトリ内の画像を一括最適化
results = optimizer.optimize_directory(
input_dir="images",
output_dir="images_optimized",
target_size_kb=1000, # 1MBを目標
max_width=1600,
recursive=True
)
if __name__ == "__main__":
main()
問題11:プログレッシブ画像表示システム
from PIL import Image, ImageOps
import os
import json
import base64
from typing import Dict, List, Tuple, Optional
from pathlib import Path
from dataclasses import dataclass
import time
@dataclass
class ProgressiveImageLayer:
"""プログレッシブ画像のレイヤー"""
level: int # 解像度レベル (0: 最低, 1: 中, 2: 高)
width: int
height: int
quality: int
file_size: int
file_path: str
data_url: Optional[str] = None
@property
def resolution(self) -> str:
"""解像度文字列"""
return f"{self.width}x{self.height}"
@property
def size_kb(self) -> float:
"""ファイルサイズ(KB)"""
return self.file_size / 1024
class ProgressiveImageGenerator:
"""プログレッシブ画像生成システム"""
def __init__(
self,
output_dir: str = "progressive_images",
levels: int = 3,
base_width: int = 1600
):
"""
Args:
output_dir: 出力ディレクトリ
levels: 解像度レベル数
base_width: ベースとなる最大幅
"""
self.output_dir = Path(output_dir)
self.levels = levels
self.base_width = base_width
self.config_file = self.output_dir / "config.json"
# 出力ディレクトリの作成
self.output_dir.mkdir(parents=True, exist_ok=True)
def generate_progressive_image(
self,
image_path: str,
output_name: Optional[str] = None,
format: str = "WEBP",
base_quality: int = 85,
use_blur_placeholder: bool = True
) -> Dict[str, any]:
"""
プログレッシブ画像を生成
Args:
image_path: 入力画像パス
output_name: 出力ベース名
format: 出力フォーマット
base_quality: ベース品質
use_blur_placeholder: ぼかしプレースホルダーを使用するか
Returns:
Dict[str, any]: 生成結果
"""
try:
# 出力名の決定
if output_name is None:
output_name = Path(image_path).stem
# 画像を開く
with Image.open(image_path) as img:
original_width, original_height = img.size
# レイヤーを生成
layers = self._generate_layers(
img, output_name, format, base_quality
)
# プレースホルダーの生成
placeholder = None
if use_blur_placeholder:
placeholder = self._generate_placeholder(img, output_name)
# 設定ファイルの作成
config = self._create_config(
output_name, layers, placeholder, original_width, original_height
)
# マニフェストの保存
manifest_path = self.output_dir / f"{output_name}_manifest.json"
with open(manifest_path, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
# HTMLテンプレートの生成
html_template = self._generate_html_template(config, output_name)
result = {
'success': True,
'output_name': output_name,
'original_size': f"{original_width}x{original_height}",
'layers': len(layers),
'total_size_kb': sum(layer.size_kb for layer in layers),
'manifest_path': str(manifest_path),
'html_template': html_template,
'layers_info': [
{
'level': layer.level,
'resolution': layer.resolution,
'quality': layer.quality,
'size_kb': layer.size_kb
}
for layer in layers
]
}
if placeholder:
result['placeholder_size_kb'] = placeholder.size_kb
return result
except Exception as e:
return {
'success': False,
'error': str(e)
}
def generate_directory(
self,
input_dir: str,
extensions: Optional[List[str]] = None,
recursive: bool = False
) -> Dict[str, any]:
"""
ディレクトリ内の画像をプログレッシブ化
Args:
input_dir: 入力ディレクトリ
extensions: 処理する拡張子
recursive: 再帰的に処理
Returns:
Dict[str, any]: 処理結果
"""
if extensions is None:
extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp']
input_dir_path = Path(input_dir)
results = {
'total': 0,
'success': 0,
'failed': 0,
'images': []
}
# ファイル収集
files_to_process = []
if recursive:
for file_path in input_dir_path.rglob('*'):
if file_path.suffix.lower() in extensions:
files_to_process.append(file_path)
else:
for file_path in input_dir_path.iterdir():
if file_path.suffix.lower() in extensions:
files_to_process.append(file_path)
print(f"処理対象ファイル数: {len(files_to_process)}")
for file_path in files_to_process:
results['total'] += 1
# 相対パスを維持した出力名
if recursive:
rel_path = file_path.relative_to(input_dir_path)
output_name = str(rel_path.with_suffix('')).replace(os.sep, '_')
else:
output_name = file_path.stem
print(f"処理中: {file_path.name}...")
# プログレッシブ画像を生成
result = self.generate_progressive_image(
image_path=str(file_path),
output_name=output_name,
format="WEBP",
base_quality=80,
use_blur_placeholder=True
)
if result['success']:
results['success'] += 1
results['images'].append({
'name': output_name,
'layers': result['layers'],
'total_size_kb': result['total_size_kb']
})
print(f" ✓ 成功: {result['layers']}レイヤー, {result['total_size_kb']:.1f}KB")
else:
results['failed'] += 1
print(f" ✗ 失敗: {result['error']}")
# インデックスHTMLの生成
self._generate_index_html(results['images'])
return results
def _generate_layers(
self,
img: Image.Image,
output_name: str,
format: str,
base_quality: int
) -> List[ProgressiveImageLayer]:
"""解像度レイヤーを生成"""
layers = []
original_width, original_height = img.size
for level in range(self.levels):
# 解像度の計算
scale_factor = 2 ** (self.levels - level - 1)
target_width = min(original_width, self.base_width // scale_factor)
# アスペクト比を保持
aspect_ratio = original_width / original_height
target_height = int(target_width / aspect_ratio)
# リサイズ
if target_width < original_width:
resized_img = img.resize(
(target_width, target_height),
Image.Resampling.LANCZOS
)
else:
resized_img = img.copy()
# 品質の調整(低解像度ほど低品質)
quality = max(30, base_quality - (level * 15))
# ファイル名
filename = f"{output_name}_L{level}.{format.lower()}"
file_path = self.output_dir / filename
# 保存
save_params = {
'quality': quality,
'optimize': True
}
if format.upper() == 'WEBP':
save_params['method'] = 6
resized_img.save(file_path, format=format, **save_params)
# ファイルサイズ
file_size = os.path.getsize(file_path)
# データURLの生成(小さな画像のみ)
if level == self.levels - 1: # 最低解像度
data_url = self._create_data_url(resized_img, format)
else:
data_url = None
# レイヤーの作成
layer = ProgressiveImageLayer(
level=level,
width=target_width,
height=target_height,
quality=quality,
file_size=file_size,
file_path=str(file_path),
data_url=data_url
)
layers.append(layer)
print(f" レイヤー {level}: {layer.resolution}, Q{quality}, {layer.size_kb:.1f}KB")
return layers
def _generate_placeholder(
self,
img: Image.Image,
output_name: str
) -> ProgressiveImageLayer:
"""ぼかしプレースホルダーを生成"""
# 非常に小さいサイズにリサイズ
placeholder_width = 20
aspect_ratio = img.width / img.height
placeholder_height = int(placeholder_width / aspect_ratio)
# リサイズ
small_img = img.resize(
(placeholder_width, placeholder_height),
Image.Resampling.LANCZOS
)
# 強くぼかす
from PIL import ImageFilter
blurred_img = small_img.filter(ImageFilter.GaussianBlur(radius=5))
# 保存
filename = f"{output_name}_placeholder.webp"
file_path = self.output_dir / filename
blurred_img.save(
file_path,
format='WEBP',
quality=30,
method=3
)
file_size = os.path.getsize(file_path)
# データURLの生成
data_url = self._create_data_url(blurred_img, 'WEBP')
return ProgressiveImageLayer(
level=-1, # プレースホルダー
width=placeholder_width,
height=placeholder_height,
quality=30,
file_size=file_size,
file_path=str(file_path),
data_url=data_url
)
def _create_data_url(self, img: Image.Image, format: str) -> str:
"""画像をデータURLに変換"""
from io import BytesIO
buffer = BytesIO()
if format.upper() == 'WEBP':
img.save(buffer, format='WEBP', quality=50, method=3)
mime_type = 'image/webp'
elif format.upper() == 'JPEG':
img.save(buffer, format='JPEG', quality=50, optimize=True)
mime_type = 'image/jpeg'
else:
img.save(buffer, format='PNG', optimize=True)
mime_type = 'image/png'
base64_data = base64.b64encode(buffer.getvalue()).decode('utf-8')
return f"data:{mime_type};base64,{base64_data}"
def _create_config(
self,
output_name: str,
layers: List[ProgressiveImageLayer],
placeholder: Optional[ProgressiveImageLayer],
original_width: int,
original_height: int
) -> Dict[str, any]:
"""設定ファイルを作成"""
config = {
'name': output_name,
'original_size': {
'width': original_width,
'height': original_height
},
'layers': [],
'loading_strategy': 'progressive',
'generated_at': time.time()
}
# レイヤー情報
for layer in layers:
layer_info = {
'level': layer.level,
'width': layer.width,
'height': layer.height,
'quality': layer.quality,
'file_size': layer.file_size,
'file_path': Path(layer.file_path).name,
'url': f"./{Path(layer.file_path).name}"
}
if layer.data_url:
layer_info['data_url'] = layer.data_url
config['layers'].append(layer_info)
# プレースホルダー情報
if placeholder:
config['placeholder'] = {
'width': placeholder.width,
'height': placeholder.height,
'file_size': placeholder.file_size,
'data_url': placeholder.data_url
}
return config
def _generate_html_template(self, config: Dict[str, any], output_name: str) -> str:
"""HTMLテンプレートを生成"""
html = f'''<!DOCTYPE html>
<html lang="ja">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>プログレッシブ画像: {config['name']}</title>
<style>
* {{
margin: 0;
padding: 0;
box-sizing: border-box;
}}
body {{
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
line-height: 1.6;
padding: 20px;
background: #f5f5f5;
color: #333;
}}
.container {{
max-width: 1200px;
margin: 0 auto;
}}
header {{
margin-bottom: 30px;
text-align: center;
}}
h1 {{
margin-bottom: 10px;
color: #2c3e50;
}}
.image-container {{
position: relative;
width: 100%;
max-width: {config['original_size']['width']}px;
margin: 0 auto 30px;
background: #fff;
border-radius: 8px;
overflow: hidden;
box-shadow: 0 4px 12px rgba(0,0,0,0.1);
}}
.progressive-image {{
width: 100%;
height: auto;
display: block;
transition: filter 0.3s ease;
}}
.loading {{
filter: blur(10px);
}}
.loaded {{
filter: blur(0);
}}
.placeholder {{
position: absolute;
top: 0;
left: 0;
width: 100%;
height: 100%;
background-size: cover;
background-position: center;
}}
.controls {{
display: flex;
flex-wrap: wrap;
gap: 15px;
margin-bottom: 20px;
padding: 15px;
background: white;
border-radius: 8px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}}
.control-group {{
display: flex;
align-items: center;
gap: 10px;
}}
select, button {{
padding: 8px 16px;
border: 1px solid #ddd;
border-radius: 4px;
background: white;
font-size: 14px;
cursor: pointer;
}}
button:hover {{
background: #f0f0f0;
}}
.stats {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 15px;
margin-top: 20px;
}}
.stat-card {{
background: white;
padding: 15px;
border-radius: 8px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}}
.stat-title {{
font-weight: 600;
color: #666;
margin-bottom: 5px;
font-size: 14px;
}}
.stat-value {{
font-size: 18px;
color: #2c3e50;
}}
.loading-indicator {{
display: none;
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
background: rgba(0,0,0,0.7);
color: white;
padding: 10px 20px;
border-radius: 20px;
font-size: 14px;
z-index: 10;
}}
.loading-indicator.active {{
display: block;
}}
@media (max-width: 768px) {{
.controls {{
flex-direction: column;
}}
.control-group {{
flex-direction: column;
align-items: flex-start;
}}
}}
</style>
</head>
<body>
<div class="container">
<header>
<h1>プログレッシブ画像表示</h1>
<p>画像: {config['name']} ({config['original_size']['width']}×{config['original_size']['height']})</p>
</header>
<div class="controls">
<div class="control-group">
<label for="loadStrategy">読み込み戦略:</label>
<select id="loadStrategy">
<option value="progressive" selected>プログレッシブ</option>
<option value="sequential">順次</option>
<option value="eager">即時</option>
</select>
</div>
<div class="control-group">
<label for="simulateSlow">速度シミュレーション:</label>
<select id="simulateSlow">
<option value="0">高速 (0ms)</option>
<option value="500">低速 (500ms)</option>
<option value="1000">非常に遅い (1000ms)</option>
</select>
</div>
<button id="resetBtn">リセット</button>
<button id="showInfoBtn">情報表示</button>
</div>
<div class="image-container">
<div id="loadingIndicator" class="loading-indicator">
読み込み中...
</div>
<div id="placeholder" class="placeholder" style="background-image: url('{config['placeholder']['data_url'] if 'placeholder' in config else ''}');"></div>
<img id="mainImage" class="progressive-image loading" alt="{config['name']}">
</div>
<div class="stats">
<div class="stat-card">
<div class="stat-title">現在の解像度</div>
<div class="stat-value" id="currentResolution">-</div>
</div>
<div class="stat-card">
<div class="stat-title">読み込み済みレイヤー</div>
<div class="stat-value" id="loadedLayers">0 / {len(config['layers'])}</div>
</div>
<div class="stat-card">
<div class="stat-title">転送済みデータ</div>
<div class="stat-value" id="loadedSize">0 KB</div>
</div>
<div class="stat-card">
<div class="stat-title">読み込み時間</div>
<div class="stat-value" id="loadTime">0 ms</div>
</div>
</div>
</div>
<script>
const config = {json.dumps(config, indent=2)};
class ProgressiveImageLoader {{
constructor() {{
this.layers = config.layers.sort((a, b) => a.level - b.level);
this.currentLayer = -1;
this.loadedLayers = new Set();
this.loadedSize = 0;
this.startTime = null;
this.strategy = 'progressive';
this.simulateDelay = 0;
this.elements = {{
image: document.getElementById('mainImage'),
placeholder: document.getElementById('placeholder'),
loadingIndicator: document.getElementById('loadingIndicator'),
currentResolution: document.getElementById('currentResolution'),
loadedLayers: document.getElementById('loadedLayers'),
loadedSize: document.getElementById('loadedSize'),
loadTime: document.getElementById('loadTime')
}};
this.init();
}}
init() {{
// イベントリスナーの設定
document.getElementById('loadStrategy').addEventListener('change', (e) => {{
this.strategy = e.target.value;
this.reset();
}});
document.getElementById('simulateSlow').addEventListener('change', (e) => {{
this.simulateDelay = parseInt(e.target.value);
}});
document.getElementById('resetBtn').addEventListener('click', () => {{
this.reset();
}});
document.getElementById('showInfoBtn').addEventListener('click', () => {{
this.showInfo();
}});
// 画像読み込みイベント
this.elements.image.addEventListener('load', () => {{
this.onImageLoaded();
}});
// 初期読み込み
this.startLoading();
}}
startLoading() {{
this.startTime = Date.now();
this.elements.loadingIndicator.classList.add('active');
switch(this.strategy) {{
case 'progressive':
this.loadProgressive();
break;
case 'sequential':
this.loadSequential();
break;
case 'eager':
this.loadEager();
break;
}}
}}
async loadProgressive() {{
// 低解像度から順に読み込み
for (let i = this.layers.length - 1; i >= 0; i--) {{
await this.loadLayer(i);
// 十分な解像度になったら中断
if (i === 0) break; // 最高解像度まで続行
const currentWidth = this.elements.image.naturalWidth || 0;
const viewportWidth = window.innerWidth;
if (currentWidth >= viewportWidth * 0.8) {{
// ビューポートの80%以上の解像度があれば中断
break;
}}
}}
this.finishLoading();
}}
async loadSequential() {{
// 低解像度から高解像度へ順次読み込み
for (let i = this.layers.length - 1; i >= 0; i--) {{
await this.loadLayer(i);
}}
this.finishLoading();
}}
async loadEager() {{
// 最高解像度を直接読み込み
await this.loadLayer(0);
this.finishLoading();
}}
async loadLayer(layerIndex) {{
const layer = this.layers[layerIndex];
// シミュレーション遅延
if (this.simulateDelay > 0) {{
await new Promise(resolve => setTimeout(resolve, this.simulateDelay));
}}
return new Promise((resolve) => {{
// データURLがある場合は即時表示
if (layer.data_url && layerIndex === this.layers.length - 1) {{
this.elements.image.src = layer.data_url;
this.elements.image.classList.remove('loading');
this.elements.placeholder.style.display = 'none';
}}
// 画像をプリロード
const img = new Image();
img.onload = () => {{
// メイン画像を更新
this.elements.image.src = layer.url;
this.currentLayer = layerIndex;
// 統計の更新
this.loadedLayers.add(layerIndex);
this.loadedSize += layer.file_size;
this.updateStats();
// ぼかし効果の削除
if (layerIndex === 0) {{
this.elements.image.classList.remove('loading');
this.elements.image.classList.add('loaded');
this.elements.placeholder.style.display = 'none';
}}
resolve();
}};
img.src = layer.url;
}});
}}
updateStats() {{
if (this.currentLayer >= 0) {{
const layer = this.layers[this.currentLayer];
this.elements.currentResolution.textContent = `${{layer.width}}×${{layer.height}}`;
}}
this.elements.loadedLayers.textContent = `${{this.loadedLayers.size}} / ${{this.layers.length}}`;
this.elements.loadedSize.textContent = `${{(this.loadedSize / 1024).toFixed(1)}} KB`;
const elapsed = Date.now() - this.startTime;
this.elements.loadTime.textContent = `${{elapsed}} ms`;
}}
finishLoading() {{
this.elements.loadingIndicator.classList.remove('active');
console.log('画像読み込み完了:', {{
総レイヤー数: this.layers.length,
読み込み済みレイヤー数: this.loadedLayers.size,
総データサイズ: (this.loadedSize / 1024).toFixed(1) + ' KB',
所要時間: (Date.now() - this.startTime) + ' ms'
}});
}}
reset() {{
// リセット
this.currentLayer = -1;
this.loadedLayers.clear();
this.loadedSize = 0;
// UIのリセット
this.elements.image.src = '';
this.elements.image.classList.add('loading');
this.elements.image.classList.remove('loaded');
this.elements.placeholder.style.display = 'block';
this.updateStats();
// 再読み込み
setTimeout(() => this.startLoading(), 100);
}}
showInfo() {{
let info = `画像情報:\\n`;
info += `名前: ${{config.name}}\\n`;
info += `元のサイズ: ${{config.original_size.width}}×${{config.original_size.height}}\\n`;
info += `レイヤー数: ${{config.layers.length}}\\n\\n`;
info += `レイヤー詳細:\\n`;
config.layers.forEach((layer, index) => {{
const loaded = this.loadedLayers.has(index) ? '✓' : '✗';
info += ` [${{loaded}}] L${{layer.level}}: ${{layer.width}}×${{layer.height}}, Q${{layer.quality}}, ${{(layer.file_size / 1024).toFixed(1)}}KB\\n`;
}});
alert(info);
}}
}}
// ページ読み込み時に初期化
document.addEventListener('DOMContentLoaded', () => {{
window.imageLoader = new ProgressiveImageLoader();
}});
</script>
</body>
</html>'''
# HTMLファイルとして保存
html_path = self.output_dir / f"{output_name}_demo.html"
with open(html_path, 'w', encoding='utf-8') as f:
f.write(html)
return html
def _generate_index_html(self, images: List[Dict[str, any]]):
"""インデックスHTMLを生成"""
html = '''<!DOCTYPE html>
<html lang="ja">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>プログレッシブ画像ギャラリー</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
line-height: 1.6;
padding: 20px;
background: #f5f5f5;
color: #333;
}
.container {
max-width: 1200px;
margin: 0 auto;
}
header {
margin-bottom: 30px;
text-align: center;
}
h1 {
margin-bottom: 10px;
color: #2c3e50;
}
.description {
color: #666;
max-width: 800px;
margin: 0 auto 20px;
}
.image-grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(300px, 1fr));
gap: 20px;
margin-top: 30px;
}
.image-card {
background: white;
border-radius: 8px;
overflow: hidden;
box-shadow: 0 4px 12px rgba(0,0,0,0.1);
transition: transform 0.3s ease, box-shadow 0.3s ease;
}
.image-card:hover {
transform: translateY(-5px);
box-shadow: 0 8px 24px rgba(0,0,0,0.15);
}
.image-preview {
width: 100%;
height: 200px;
object-fit: cover;
display: block;
}
.image-info {
padding: 15px;
}
.image-name {
font-weight: 600;
margin-bottom: 5px;
color: #2c3e50;
}
.image-stats {
font-size: 14px;
color: #666;
}
.view-button {
display: block;
margin-top: 10px;
padding: 8px 16px;
background: #3498db;
color: white;
text-decoration: none;
border-radius: 4px;
text-align: center;
transition: background 0.3s ease;
}
.view-button:hover {
background: #2980b9;
}
.stats-summary {
background: white;
padding: 20px;
border-radius: 8px;
margin-bottom: 30px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}
.stats-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 15px;
margin-top: 15px;
}
.stat-item {
text-align: center;
}
.stat-value {
font-size: 24px;
font-weight: 600;
color: #2c3e50;
}
.stat-label {
font-size: 14px;
color: #666;
margin-top: 5px;
}
@media (max-width: 768px) {
.image-grid {
grid-template-columns: 1fr;
}
}
</style>
</head>
<body>
<div class="container">
<header>
<h1>プログレッシブ画像ギャラリー</h1>
<p class="description">
このページでは、プログレッシブ画像読み込みのデモを表示しています。
各画像は複数の解像度レイヤーを持ち、低解像度から高解像度へと段階的に読み込まれます。
</p>
</header>
<div class="stats-summary">
<h2>統計サマリー</h2>
<div class="stats-grid" id="statsGrid">
<!-- JavaScriptで動的に生成 -->
</div>
</div>
<div class="image-grid" id="imageGrid">
<!-- JavaScriptで動的に生成 -->
</div>
</div>
<script>
// 画像データ
const images = '''
images_json = json.dumps(images, indent=2)
html += images_json
html += ''';
// 統計の計算
function calculateStats() {
const totalImages = images.length;
const totalLayers = images.reduce((sum, img) => sum + img.layers, 0);
const totalSize = images.reduce((sum, img) => sum + img.total_size_kb, 0);
const avgLayers = totalLayers / totalImages;
const avgSize = totalSize / totalImages;
return {
totalImages,
totalLayers,
totalSize: totalSize.toFixed(1),
avgLayers: avgLayers.toFixed(1),
avgSize: avgSize.toFixed(1)
};
}
// 統計の表示
function renderStats() {
const stats = calculateStats();
const statsGrid = document.getElementById('statsGrid');
const statItems = [
{ label: '総画像数', value: stats.totalImages, unit: '枚' },
{ label: '総レイヤー数', value: stats.totalLayers, unit: '層' },
{ label: '総データサイズ', value: stats.totalSize, unit: 'KB' },
{ label: '平均レイヤー数', value: stats.avgLayers, unit: '層/枚' },
{ label: '平均サイズ', value: stats.avgSize, unit: 'KB/枚' }
];
statsGrid.innerHTML = statItems.map(item => `
<div class="stat-item">
<div class="stat-value">${item.value}</div>
<div class="stat-label">${item.label}${item.unit ? ` (${item.unit})` : ''}</div>
</div>
`).join('');
}
// 画像グリッドの表示
function renderImageGrid() {
const imageGrid = document.getElementById('imageGrid');
imageGrid.innerHTML = images.map(img => `
<div class="image-card">
<img src="./${img.name}_L2.webp" alt="${img.name}" class="image-preview">
<div class="image-info">
<div class="image-name">${img.name}</div>
<div class="image-stats">
${img.layers}レイヤー, ${img.total_size_kb.toFixed(1)}KB
</div>
<a href="./${img.name}_demo.html" class="view-button">詳細表示</a>
</div>
</div>
`).join('');
}
// 初期化
document.addEventListener('DOMContentLoaded', () => {
renderStats();
renderImageGrid();
});
</script>
</body>
</html>'''
index_path = self.output_dir / "index.html"
with open(index_path, 'w', encoding='utf-8') as f:
f.write(html)
print(f"\nインデックスページを生成: {index_path}")
# 使用例
def main():
"""プログレッシブ画像生成の使用例"""
# ジェネレーターの作成
generator = ProgressiveImageGenerator(
output_dir="progressive_output",
levels=3,
base_width=1600
)
print("=== 単一画像のプログレッシブ化 ===")
# 単一画像をプログレッシブ化
result = generator.generate_progressive_image(
image_path="sample_photo.jpg",
output_name="sample_photo",
format="WEBP",
base_quality=80,
use_blur_placeholder=True
)
if result['success']:
print(f"生成成功:")
print(f" 画像名: {result['output_name']}")
print(f" レイヤー数: {result['layers']}")
print(f" 総サイズ: {result['total_size_kb']:.1f} KB")
print(f" マニフェスト: {result['manifest_path']}")
print(f" HTMLデモ: progressive_output/{result['output_name']}_demo.html")
print("\nレイヤー詳細:")
for layer_info in result['layers_info']:
print(f" レベル {layer_info['level']}: {layer_info['resolution']}, "
f"Q{layer_info['quality']}, {layer_info['size_kb']:.1f}KB")
else:
print(f"生成失敗: {result['error']}")
print("\n=== ディレクトリの一括処理 ===")
# ディレクトリ内の画像を一括処理
results = generator.generate_directory(
input_dir="images",
extensions=['.jpg', '.png'],
recursive=True
)
print(f"\n処理完了:")
print(f" 総ファイル数: {results['total']}")
print(f" 成功: {results['success']}")
print(f" 失敗: {results['failed']}")
if results['success'] > 0:
print(f"\n生成されたHTML:")
print(f" ギャラリー: progressive_output/index.html")
print(f" 各画像のデモ: progressive_output/*_demo.html")
if __name__ == "__main__":
main()
問題12:クラウド画像処理パイプライン
import os
import json
import time
import hashlib
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
import redis
import boto3
from botocore.exceptions import ClientError
from celery import Celery
from PIL import Image, ImageOps
import io
# Redis設定
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
# AWS S3設定
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_REGION = os.getenv('AWS_REGION', 'ap-northeast-1')
S3_BUCKET = os.getenv('S3_BUCKET', 'image-processing-pipeline')
# Celeryアプリの作成
celery_app = Celery(
'image_pipeline',
broker=REDIS_URL,
backend=REDIS_URL
)
# 設定
celery_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Tokyo',
enable_utc=True,
task_track_started=True,
task_time_limit=300, # 5分
task_soft_time_limit=240 # 4分
)
@dataclass
class ImageMetadata:
"""画像メタデータ"""
image_id: str
original_filename: str
file_size: int
width: int
height: int
format: str
uploaded_at: str
user_id: Optional[str] = None
tags: Optional[List[str]] = None
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@dataclass
class ProcessingJob:
"""処理ジョブ"""
job_id: str
image_id: str
status: str # 'pending', 'processing', 'completed', 'failed'
steps: List[str]
created_at: str
started_at: Optional[str] = None
completed_at: Optional[str] = None
error_message: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@dataclass
class ProcessingResult:
"""処理結果"""
job_id: str
image_id: str
processed_files: Dict[str, str] # {サイズ: S3パス}
thumbnail_urls: Dict[str, str] # {サイズ: URL}
cdn_urls: Dict[str, str] # {サイズ: CDN URL}
metadata: Dict[str, Any]
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
class StorageManager:
"""ストレージ管理クラス"""
def __init__(self):
self.s3_client = boto3.client(
's3',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=AWS_REGION
)
self.bucket = S3_BUCKET
def upload_file(
self,
file_data: bytes,
s3_key: str,
content_type: str = 'image/jpeg',
metadata: Optional[Dict[str, str]] = None
) -> str:
"""ファイルをS3にアップロード"""
try:
extra_args = {
'ContentType': content_type,
'Metadata': metadata or {}
}
self.s3_client.put_object(
Bucket=self.bucket,
Key=s3_key,
Body=file_data,
**extra_args
)
return f"s3://{self.bucket}/{s3_key}"
except ClientError as e:
raise Exception(f"S3アップロードエラー: {e}")
def generate_presigned_url(
self,
s3_key: str,
expiration: int = 3600
) -> str:
"""署名付きURLを生成"""
try:
url = self.s3_client.generate_presigned_url(
'get_object',
Params={
'Bucket': self.bucket,
'Key': s3_key
},
ExpiresIn=expiration
)
return url
except ClientError as e:
raise Exception(f"署名付きURL生成エラー: {e}")
def delete_file(self, s3_key: str) -> bool:
"""ファイルをS3から削除"""
try:
self.s3_client.delete_object(
Bucket=self.bucket,
Key=s3_key
)
return True
except ClientError:
return False
def get_file_metadata(self, s3_key: str) -> Optional[Dict[str, Any]]:
"""ファイルメタデータを取得"""
try:
response = self.s3_client.head_object(
Bucket=self.bucket,
Key=s3_key
)
return {
'content_type': response.get('ContentType'),
'content_length': response.get('ContentLength'),
'last_modified': response.get('LastModified'),
'metadata': response.get('Metadata', {})
}
except ClientError:
return None
class RedisManager:
"""Redis管理クラス"""
def __init__(self):
self.redis_client = redis.Redis.from_url(REDIS_URL)
def store_metadata(self, image_id: str, metadata: ImageMetadata) -> bool:
"""メタデータを保存"""
try:
key = f"image:{image_id}:metadata"
self.redis_client.set(
key,
json.dumps(metadata.to_dict()),
ex=86400 * 7 # 7日間有効
)
return True
except Exception as e:
print(f"Redis保存エラー: {e}")
return False
def get_metadata(self, image_id: str) -> Optional[ImageMetadata]:
"""メタデータを取得"""
try:
key = f"image:{image_id}:metadata"
data = self.redis_client.get(key)
if data:
metadata_dict = json.loads(data)
return ImageMetadata(**metadata_dict)
return None
except Exception:
return None
def create_job(self, job: ProcessingJob) -> bool:
"""ジョブを作成"""
try:
key = f"job:{job.job_id}"
self.redis_client.set(
key,
json.dumps(job.to_dict()),
ex=86400 # 1日間有効
)
# ジョブキューに追加
self.redis_client.lpush('job_queue', job.job_id)
return True
except Exception as e:
print(f"ジョブ作成エラー: {e}")
return False
def get_job(self, job_id: str) -> Optional[ProcessingJob]:
"""ジョブを取得"""
try:
key = f"job:{job_id}"
data = self.redis_client.get(key)
if data:
job_dict = json.loads(data)
return ProcessingJob(**job_dict)
return None
except Exception:
return None
def update_job_status(
self,
job_id: str,
status: str,
error_message: Optional[str] = None
) -> bool:
"""ジョブステータスを更新"""
try:
job = self.get_job(job_id)
if not job:
return False
if status == 'processing' and not job.started_at:
job.started_at = datetime.now().isoformat()
elif status in ['completed', 'failed'] and not job.completed_at:
job.completed_at = datetime.now().isoformat()
if error_message:
job.error_message = error_message
job.status = status
key = f"job:{job_id}"
self.redis_client.set(
key,
json.dumps(job.to_dict()),
ex=86400
)
return True
except Exception as e:
print(f"ジョブ更新エラー: {e}")
return False
def store_processing_result(self, result: ProcessingResult) -> bool:
"""処理結果を保存"""
try:
key = f"result:{result.job_id}"
self.redis_client.set(
key,
json.dumps(result.to_dict()),
ex=86400 * 3 # 3日間有効
)
return True
except Exception as e:
print(f"結果保存エラー: {e}")
return False
def get_processing_result(self, job_id: str) -> Optional[ProcessingResult]:
"""処理結果を取得"""
try:
key = f"result:{job_id}"
data = self.redis_client.get(key)
if data:
result_dict = json.loads(data)
return ProcessingResult(**result_dict)
return None
except Exception:
return None
class ImageProcessor:
"""画像処理クラス"""
# サムネイルサイズの定義
THUMBNAIL_SIZES = {
'xs': (100, 100), # 超小さい
'sm': (300, 300), # 小さい
'md': (600, 600), # 中くらい
'lg': (1200, 1200), # 大きい
'xl': (1920, 1920) # 特大
}
# 出力フォーマット
OUTPUT_FORMAT = 'WEBP'
OUTPUT_QUALITY = 85
def __init__(self, storage_manager: StorageManager):
self.storage = storage_manager
def process_image(
self,
image_data: bytes,
image_id: str,
original_filename: str
) -> ProcessingResult:
"""画像を処理"""
try:
# 画像を開く
image = Image.open(io.BytesIO(image_data))
# メタデータの抽出
metadata = self._extract_metadata(image, image_id, original_filename, len(image_data))
# 画像の向きを修正
image = ImageOps.exif_transpose(image)
# 処理済みファイルのパス
processed_files = {}
thumbnail_urls = {}
cdn_urls = {}
# 各サイズの画像を生成
for size_name, dimensions in self.THUMBNAIL_SIZES.items():
# 画像をコピーしてリサイズ
processed_image = image.copy()
processed_image.thumbnail(dimensions, Image.Resampling.LANCZOS)
# メモリに保存
output_buffer = io.BytesIO()
if self.OUTPUT_FORMAT == 'JPEG' and processed_image.mode in ('RGBA', 'LA', 'P'):
# JPEGはアルファチャンネルに対応していない
background = Image.new('RGB', processed_image.size, (255, 255, 255))
if processed_image.mode == 'RGBA':
background.paste(processed_image, mask=processed_image.split()[3])
else:
background.paste(processed_image)
processed_image = background
processed_image.save(
output_buffer,
format=self.OUTPUT_FORMAT,
quality=self.OUTPUT_QUALITY,
optimize=True
)
output_data = output_buffer.getvalue()
# S3にアップロード
s3_key = f"processed/{image_id}/{size_name}.{self.OUTPUT_FORMAT.lower()}"
content_type = f'image/{self.OUTPUT_FORMAT.lower()}'
s3_path = self.storage.upload_file(
output_data,
s3_key,
content_type,
metadata={'image_id': image_id, 'size': size_name}
)
processed_files[size_name] = s3_path
# 署名付きURLを生成
signed_url = self.storage.generate_presigned_url(s3_key)
thumbnail_urls[size_name] = signed_url
# CDN URLを生成(実装に応じて変更)
cdn_url = f"https://cdn.example.com/{s3_key}"
cdn_urls[size_name] = cdn_url
# 元の画像も保存(必要に応じて)
original_key = f"original/{image_id}/{original_filename}"
self.storage.upload_file(
image_data,
original_key,
'image/jpeg',
metadata={'image_id': image_id, 'type': 'original'}
)
# 処理結果の作成
result = ProcessingResult(
job_id=f"job_{image_id}",
image_id=image_id,
processed_files=processed_files,
thumbnail_urls=thumbnail_urls,
cdn_urls=cdn_urls,
metadata=metadata.to_dict()
)
return result
except Exception as e:
raise Exception(f"画像処理エラー: {e}")
def _extract_metadata(
self,
image: Image.Image,
image_id: str,
original_filename: str,
file_size: int
) -> ImageMetadata:
"""メタデータを抽出"""
width, height = image.size
return ImageMetadata(
image_id=image_id,
original_filename=original_filename,
file_size=file_size,
width=width,
height=height,
format=image.format or 'UNKNOWN',
uploaded_at=datetime.now().isoformat(),
tags=self._generate_tags(image)
)
def _generate_tags(self, image: Image.Image) -> List[str]:
"""画像からタグを生成"""
tags = []
# サイズに基づくタグ
width, height = image.size
if width > height:
tags.append('landscape')
elif width < height:
tags.append('portrait')
else:
tags.append('square')
# 解像度に基づくタグ
megapixels = (width * height) / 1000000
if megapixels > 8:
tags.append('high-res')
elif megapixels > 2:
tags.append('medium-res')
else:
tags.append('low-res')
# カラーモードに基づくタグ
if image.mode == 'RGB':
tags.append('color')
elif image.mode == 'L':
tags.append('grayscale')
elif image.mode == 'RGBA':
tags.append('transparent')
return tags
# Celeryタスク
@celery_app.task(bind=True, name='process_image_task')
def process_image_task(self, image_data: bytes, image_id: str, original_filename: str) -> Dict[str, Any]:
"""画像処理タスク"""
redis_manager = RedisManager()
storage_manager = StorageManager()
processor = ImageProcessor(storage_manager)
# ジョブの作成
job = ProcessingJob(
job_id=self.request.id,
image_id=image_id,
status='processing',
steps=['upload', 'processing', 'cdn_deployment'],
created_at=datetime.now().isoformat(),
started_at=datetime.now().isoformat()
)
redis_manager.store_metadata(image_id, job)
redis_manager.update_job_status(job.job_id, 'processing')
try:
# 画像処理
result = processor.process_image(image_data, image_id, original_filename)
# 結果の保存
redis_manager.store_processing_result(result)
redis_manager.update_job_status(job.job_id, 'completed')
return {
'success': True,
'job_id': job.job_id,
'image_id': image_id,
'thumbnail_urls': result.thumbnail_urls,
'cdn_urls': result.cdn_urls,
'metadata': result.metadata
}
except Exception as e:
redis_manager.update_job_status(job.job_id, 'failed', str(e))
return {
'success': False,
'job_id': job.job_id,
'image_id': image_id,
'error': str(e)
}
@celery_app.task(name='cleanup_old_files_task')
def cleanup_old_files_task(days_old: int = 30):
"""古いファイルをクリーンアップ"""
storage_manager = StorageManager()
redis_manager = RedisManager()
# ここでは簡易実装。実際にはS3のライフサイクルポリシーを使用することを推奨
print(f"Cleaning up files older than {days_old} days...")
# 実際の実装では、S3のリスト操作と削除を行う
return {'cleaned': 0, 'skipped': 0}
class ImageProcessingPipeline:
"""画像処理パイプラインの管理クラス"""
def __init__(self):
self.redis_manager = RedisManager()
self.storage_manager = StorageManager()
self.processor = ImageProcessor(self.storage_manager)
def upload_and_process(
self,
file_data: bytes,
filename: str,
user_id: Optional[str] = None
) -> Dict[str, Any]:
"""ファイルをアップロードして処理を開始"""
try:
# 画像IDの生成
image_id = self._generate_image_id(file_data, filename)
# メタデータの作成
metadata = self._create_initial_metadata(image_id, filename, len(file_data), user_id)
# メタデータを保存
self.redis_manager.store_metadata(image_id, metadata)
# 非同期処理タスクを開始
task = process_image_task.delay(file_data, image_id, filename)
# ジョブの作成
job = ProcessingJob(
job_id=task.id,
image_id=image_id,
status='pending',
steps=['upload', 'processing', 'cdn_deployment'],
created_at=datetime.now().isoformat()
)
self.redis_manager.create_job(job)
return {
'success': True,
'image_id': image_id,
'job_id': task.id,
'task_id': task.id,
'status_url': f"/api/jobs/{task.id}/status"
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
def get_job_status(self, job_id: str) -> Dict[str, Any]:
"""ジョブのステータスを取得"""
job = self.redis_manager.get_job(job_id)
if not job:
return {
'success': False,
'error': 'Job not found'
}
result = None
if job.status == 'completed':
result = self.redis_manager.get_processing_result(job_id)
return {
'success': True,
'job': job.to_dict(),
'result': result.to_dict() if result else None
}
def get_image_urls(self, image_id: str, size: str = 'md') -> Dict[str, Any]:
"""画像のURLを取得"""
# 実際の実装では、Redisから結果を検索
# ここでは簡易実装
s3_key = f"processed/{image_id}/{size}.webp"
try:
url = self.storage_manager.generate_presigned_url(s3_key)
return {
'success': True,
'image_id': image_id,
'size': size,
'url': url,
'cdn_url': f"https://cdn.example.com/{s3_key}"
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
def _generate_image_id(self, file_data: bytes, filename: str) -> str:
"""画像IDを生成"""
# ファイル内容とファイル名からハッシュを生成
hash_input = file_data + filename.encode()
return hashlib.sha256(hash_input).hexdigest()[:16]
def _create_initial_metadata(
self,
image_id: str,
filename: str,
file_size: int,
user_id: Optional[str]
) -> ImageMetadata:
"""初期メタデータを作成"""
return ImageMetadata(
image_id=image_id,
original_filename=filename,
file_size=file_size,
width=0, # 後で更新
height=0, # 後で更新
format='UNKNOWN', # 後で更新
uploaded_at=datetime.now().isoformat(),
user_id=user_id,
tags=[]
)
# Web API(Flaskを使用)
from flask import Flask, request, jsonify
import uuid
app = Flask(__name__)
pipeline = ImageProcessingPipeline()
@app.route('/api/upload', methods=['POST'])
def upload_image():
"""画像アップロードAPI"""
if 'image' not in request.files:
return jsonify({'error': 'No image file'}), 400
file = request.files['image']
user_id = request.form.get('user_id')
if file.filename == '':
return jsonify({'error': 'No selected file'}), 400
# ファイルを読み込み
file_data = file.read()
# パイプラインで処理
result = pipeline.upload_and_process(file_data, file.filename, user_id)
if result['success']:
return jsonify(result), 202 # Accepted
else:
return jsonify(result), 500
@app.route('/api/jobs//status', methods=['GET'])
def get_job_status(job_id: str):
"""ジョブステータス取得API"""
result = pipeline.get_job_status(job_id)
if result['success']:
return jsonify(result), 200
else:
return jsonify(result), 404
@app.route('/api/images//url', methods=['GET'])
def get_image_url(image_id: str):
"""画像URL取得API"""
size = request.args.get('size', 'md')
result = pipeline.get_image_urls(image_id, size)
if result['success']:
return jsonify(result), 200
else:
return jsonify(result), 404
@app.route('/api/health', methods=['GET'])
def health_check():
"""ヘルスチェック"""
return jsonify({
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'services': {
'redis': 'connected', # 実際には接続チェックを行う
's3': 'connected',
'celery': 'running'
}
}), 200
# ワーカーの起動スクリプト
if __name__ == '__main__':
# 開発サーバー
app.run(debug=True, port=5000)
# ワーカーの起動方法:
# celery -A pipeline worker --loglevel=info
# celery -A pipeline beat --loglevel=info # 定期タスク用