requestsライブラリを使ったHTTP通信
2025-11-17はじめに
現代のプログラミングにおいて、Webサービスとの連携は不可欠なスキルです。PythonでWeb通信を扱う際、最も広く使われているライブラリがrequestsです。このライブラリを使うことで、複雑なHTTP通信を簡単かつ直感的に扱えるようになります。
本記事では、Pythonの基本的な文法を習得した方が、requestsライブラリを使ってWebとの通信を始められるように、基礎から実践までを詳しく解説します。
requestsライブラリとは
requestsは、PythonでHTTP通信を行うためのシンプルでエレガントなライブラリです。標準ライブラリのurllibと比べて、より直感的で使いやすいAPIを提供しています。Web APIとの連携、Webスクレイピング、データ取得など、様々な場面で活用されています。
インストール
まずはrequestsライブラリをインストールしましょう。pipを使用して簡単にインストールできます。
※pipコマンドは、Pythonのパッケージ管理システムです。サードパーティ製ライブラリを簡単に導入・管理するための標準ツールです。
pip install requests
インストールが完了したら、Pythonスクリプトでインポートして使用できます。
import requests
基本的なHTTPリクエスト
GETリクエスト – データの取得
GETリクエストは、サーバーからデータを取得する際に使用します。最も基本的なリクエスト方法です。
requestsモジュールをインポートして、GitHubのAPIエンドポイントに対して基本的なGETリクエストを送信する処理を実装しています。まずrequests.getメソッドを使用して指定されたURLにHTTP GETリクエストを送信し、そのレスポンスをresponse変数に格納します。次にレスポンスのステータスコードをフォーマット文字列を用いて表示し、最後にレスポンスの本文内容をテキスト形式で出力します。これによりAPIからの応答を確認することができます。
import requests
# 基本的なGETリクエスト
response = requests.get('https://api.github.com')
# ステータスコードの確認
print(f"ステータスコード: {response.status_code}")
# レスポンス内容の表示
print("レスポンス内容:")
print(response.text)
レスポンスの扱い方
requestsが返すレスポンスオブジェクトには、様々な有用な情報が含まれています。
requestsライブラリをインポートしてGitHub APIに対してHTTP GETリクエストを実行し、詳細なレスポンス情報を取得するものです。リクエスト送信後、ステータスコードを最初に表示し、特に200が成功を意味することを注記しています。次にレスポンスヘッダーの全てのキーと値をループ処理で一つずつ表示し、続いてレスポンス内容のテキストデータを先頭500文字に限定して出力します。さらにレスポンスのエンコーディング方式と実際にリクエストが送信されたURL情報もそれぞれ表示します。これによりHTTPレスポンスの多角的な分析が可能になります。
import requests
response = requests.get('https://api.github.com')
# ステータスコード(200は成功を意味します)
print(f"ステータスコード: {response.status_code}")
# レスポンスヘッダー
print("レスポンスヘッダー:")
for key, value in response.headers.items():
print(f" {key}: {value}")
# レスポンス内容(テキスト形式)
print(f"レスポンステキスト(最初の500文字): {response.text[:500]}")
# エンコーディングの確認
print(f"エンコーディング: {response.encoding}")
# URLの確認
print(f"リクエストURL: {response.url}")
パラメータの付与
実際のWeb APIを使用する際には、URLにパラメータを付加してリクエストすることがよくあります。
クエリパラメータの使用
requestsライブラリを使用してパラメータ付きのGETリクエストを送信しています。まず検索クエリとして「python programming」を、結果制限数を10件に、ページ指定を1ページ目とするパラメータを辞書形式で定義しています。次にこれらのパラメータをparams引数に指定してAPIエンドポイントにGETリクエストを送信し、そのレスポンスを受け取っています。最後に実際にリクエストされた完全なURLとサーバーからのステータスコードを表示することで、リクエストの詳細と結果を確認しています。
import requests
# パラメータを辞書で定義
params = {
'q': 'python programming',
'limit': 10,
'page': 1
}
# パラメータ付きのGETリクエスト
response = requests.get('https://api.example.com/search', params=params)
print(f"実際にリクエストされたURL: {response.url}")
print(f"ステータスコード: {response.status_code}")
実際のAPIを使用した例
実際に動作する例として、JSONPlaceholderというテスト用APIを使用してみましょう。
requestsライブラリを用いてJSONPlaceholderというテスト用APIから投稿データを取得する処理を行っています。まず指定されたURLにGETリクエストを送信し、レスポンスのステータスコードが200(成功)であるかどうかを確認します。もし成功していれば、レスポンスのJSONデータをPythonのオブジェクトに変換して投稿データとして変数に格納します。その後、取得した投稿の総数を表示し、最初の3件の投稿についてループ処理を行い、各投稿のタイトルと本文の先頭50文字を番号付きで表示します。もしステータスコードが200以外の場合はエラーメッセージを表示するようになっています。
import requests
# テスト用APIからデータを取得
response = requests.get('https://jsonplaceholder.typicode.com/posts')
# ステータスコードの確認
if response.status_code == 200:
# JSONデータをPythonオブジェクトに変換
posts = response.json()
print(f"取得した投稿数: {len(posts)}")
# 最初の3件の投稿を表示
for i, post in enumerate(posts[:3]):
print(f"\n投稿 {i+1}:")
print(f" タイトル: {post['title']}")
print(f" 本文: {post['body'][:50]}...")
else:
print(f"エラーが発生しました: {response.status_code}")
さまざまなHTTPメソッド
POSTリクエスト – データの送信
POSTリクエストは、サーバーにデータを送信する際に使用します。
requestsライブラリとjsonモジュールをインポートして、新しい投稿データをAPI経由で送信する処理を実装しています。まずタイトルと本文、ユーザーIDを含む辞書形式のデータを用意し、その後requests.postメソッドを使用してJSONPlaceholderのAPIエンドポイントにPOSTリクエストを送信します。ここでjsonパラメータを使用することでデータが自動的にJSON形式に変換されます。リクエスト送信後、ステータスコードを確認し、201(作成成功)が返された場合には、レスポンスのJSONデータを解析して作成された投稿のIDとタイトルを表示します。
import requests
import json
# 送信するデータ
new_post = {
'title': '私の新しい投稿',
'body': 'これはrequestsライブラリを使って作成された投稿です。',
'userId': 1
}
# POSTリクエストの送信
response = requests.post(
'https://jsonplaceholder.typicode.com/posts',
json=new_post # jsonパラメータを使用すると自動でJSONに変換されます
)
print(f"ステータスコード: {response.status_code}")
if response.status_code == 201: # 201は作成成功を意味します
created_post = response.json()
print("作成された投稿:")
print(f" ID: {created_post['id']}")
print(f" タイトル: {created_post['title']}")
PUTとDELETEリクエスト
requestsライブラリを使用して、REST APIに対するPUTリクエストとDELETEリクエストを連続して実行するものです。まず更新用のデータとしてタイトルと本文を含む辞書を定義し、そのデータをPUTメソッドで指定したURLのリソース(IDが1の投稿)に送信して更新処理を行います。その後、同じURLに対してDELETEメソッドを実行してリソースの削除処理を行います。各リクエストの後にステータスコードを出力することで、更新と削除の操作が正常に受け付けられたかどうかを確認することができます。
import requests
# PUTリクエスト - データの更新
updated_data = {
'title': '更新されたタイトル',
'body': '更新された本文内容'
}
response = requests.put(
'https://jsonplaceholder.typicode.com/posts/1',
json=updated_data
)
print(f"PUTリクエストのステータスコード: {response.status_code}")
# DELETEリクエスト - データの削除
response = requests.delete('https://jsonplaceholder.typicode.com/posts/1')
print(f"DELETEリクエストのステータスコード: {response.status_code}")
エラーハンドリング
実際のアプリケーションでは、ネットワークエラーやサーバーエラーに対処する必要があります。
基本的なエラーハンドリング
requestsライブラリを使用して安全なGETリクエストを実行する関数を定義しています。safe_get_request関数は指定されたURLとオプションパラメータを受け取り、10秒のタイムアウト設定を付けてGETリクエストを送信します。内部では様々な例外処理を実装しており、タイムアウト発生時にはリクエストがタイムアウトしたことを通知し、接続エラー時にはネットワーク接続エラーを報告します。またHTTPエラーが発生した場合は詳細なエラーメッセージを表示し、その他の一般的なリクエスト例外も捕捉します。リクエストが成功した場合はレスポンスオブジェクトを返し、失敗した場合はNoneを返します。最後に関数の使用例としてGitHub APIから特定ユーザーのデータを取得し、ユーザー名とプロフィール情報を表示する処理が示されています。
import requests
from requests.exceptions import RequestException
def safe_get_request(url, params=None):
"""
安全なGETリクエストを実行する関数
"""
try:
response = requests.get(url, params=params, timeout=10)
# ステータスコードのチェック
response.raise_for_status() # 400番台、500番台のステータスコードで例外を発生
return response
except requests.exceptions.Timeout:
print("リクエストがタイムアウトしました")
return None
except requests.exceptions.ConnectionError:
print("ネットワーク接続エラーが発生しました")
return None
except requests.exceptions.HTTPError as e:
print(f"HTTPエラーが発生しました: {e}")
return None
except RequestException as e:
print(f"リクエスト中にエラーが発生しました: {e}")
return None
# 使用例
response = safe_get_request('https://api.github.com/users/octocat')
if response:
user_data = response.json()
print(f"ユーザー名: {user_data.get('login')}")
print(f"プロフィール: {user_data.get('bio')}")
ステータスコードに基づいた処理
requestsライブラリを使用してHTTPレスポンスを適切に処理する関数を定義しています。handle_response関数はレスポンスオブジェクトを受け取り、ステータスコードに基づいて異なる処理を行います。ステータスコードが200の場合はリクエスト成功としてJSONデータを解析して返し、400の場合はリクエストの不正を通知し、404の場合はリソース不存在を報告し、500の場合はサーバーエラーを伝えます。それ以外のステータスコードの場合も予期しないコードとして処理します。関数の使用例では具体的なAPIエンドポイントにGETリクエストを送信し、そのレスポンスをhandle_response関数で処理して、データが正常に取得できた場合のみ結果を表示する流れが示されています。
import requests
def handle_response(response):
"""
レスポンスに基づいて適切な処理を行う
"""
if response.status_code == 200:
print("リクエスト成功")
return response.json()
elif response.status_code == 400:
print("リクエストが不正です")
return None
elif response.status_code == 404:
print("リソースが見つかりません")
return None
elif response.status_code == 500:
print("サーバーエラーが発生しました")
return None
else:
print(f"予期しないステータスコード: {response.status_code}")
return None
# 使用例
response = requests.get('https://jsonplaceholder.typicode.com/posts/1')
result = handle_response(response)
if result:
print(f"取得したデータ: {result}")
ヘッダーのカスタマイズ
requestsライブラリを使用してカスタムヘッダーを設定したAPIリクエストを送信する例です。まずUser-Agentでアプリケーション名とバージョンを、Authorizationでベアラートークンを用いた認証情報を、Content-TypeでJSON形式のデータを送信することを明示するヘッダーを辞書形式で定義しています。その後、これらのカスタムヘッダーをheadersパラメータとして指定してGitHub APIのユーザー情報取得エンドポイントにGETリクエストを送信します。レスポンスのステータスコードが200(成功)の場合には取得したユーザー情報を表示し、それ以外の場合は認証エラーとしてステータスコードを表示する処理を行っています。
import requests
# カスタムヘッダーの設定
headers = {
'User-Agent': 'MyPythonApp/1.0',
'Authorization': 'Bearer your_token_here',
'Content-Type': 'application/json'
}
response = requests.get(
'https://api.github.com/user',
headers=headers
)
if response.status_code == 200:
user_info = response.json()
print(f"ユーザー情報: {user_info}")
else:
print(f"認証エラー: {response.status_code}")
セッションの使用
複数のリクエストを送信する際、セッションを使用すると効率的です。
requestsライブラリのSession機能を使用して効率的なHTTP通信を行う例です。with文を使用してセッションオブジェクトを作成し、コンテキストマネージャーとして利用することで、セッションの終了時にリソースが適切に解放されるようにしています。セッション内ではUser-AgentとAcceptヘッダーを設定して、全てのリクエストに共通して適用されるようにしています。同じセッションを使用して二つの連続したGETリクエストを異なるリソース(IDが1と2の投稿)に対して送信し、各リクエストのステータスコードをそれぞれ表示しています。セッションを使用することで接続の再利用やクッキーの保持などの利点が得られます。
import requests
# セッションの作成
with requests.Session() as session:
# セッション全体に適用する設定
session.headers.update({
'User-Agent': 'MyPythonApp/1.0',
'Accept': 'application/json'
})
# 同じセッションで複数のリクエストを送信
response1 = session.get('https://jsonplaceholder.typicode.com/posts/1')
response2 = session.get('https://jsonplaceholder.typicode.com/posts/2')
print(f"最初のリクエストステータス: {response1.status_code}")
print(f"2番目のリクエストステータス: {response2.status_code}")
ファイルのアップロード
requestsライブラリを使用してファイルをアップロードする処理を行っています。まず’example.txt’というファイルをバイナリ読み込みモードで開き、filesディクショナリに格納しています。その後、requests.postメソッドを用いて指定されたURLにPOSTリクエストを送信し、filesパラメータとして先ほど準備したファイルデータを渡しています。これによりサーバーに対してファイルのアップロード処理が実行され、そのレスポンスのステータスコードを表示してアップロードの成功可否を確認しています。
import requests
# ファイルのアップロード
files = {'file': open('example.txt', 'rb')}
response = requests.post('https://httpbin.org/post', files=files)
print(f"アップロードステータス: {response.status_code}")
OpenWeatherMap API
OpenWeatherMap APIから天気情報を取得する関数を定義しています。get_weather関数は都市名とAPIキーを受け取り、ベースURLにパラメータとして都市名、APIキー、摂氏単位、日本語表示を指定してリクエストを送信します。リクエストは10秒のタイムアウト設定付きで実行され、成功した場合はレスポンスのJSONデータから都市名、天気状況、気温、湿度、風速といった主要な気象情報を抽出して辞書形式で返します。例外処理ではリクエスト関連のエラーを捕捉してエラーメッセージを表示し、Noneを返します。使用例のコメントでは東京の天気情報を取得する方法が示されており、実際に使用するには有効なAPIキーが必要であることが注記されています。
import requests
import json
def get_weather(city_name, api_key):
"""
OpenWeatherMap APIから天気情報を取得する
"""
base_url = "http://api.openweathermap.org/data/2.5/weather"
params = {
'q': city_name,
'appid': api_key,
'units': 'metric', # 摂氏で取得
'lang': 'ja' # 日本語で取得
}
try:
response = requests.get(base_url, params=params, timeout=10)
response.raise_for_status()
weather_data = response.json()
# 天気情報の抽出
weather_info = {
'都市': weather_data['name'],
'天気': weather_data['weather'][0]['description'],
'気温': weather_data['main']['temp'],
'湿度': weather_data['main']['humidity'],
'風速': weather_data['wind']['speed']
}
return weather_info
except requests.exceptions.RequestException as e:
print(f"天気情報の取得中にエラーが発生しました: {e}")
return None
# 使用例(実際のAPIキーが必要です)
# weather = get_weather('Tokyo', 'your_api_key_here')
# if weather:
# for key, value in weather.items():
# print(f"{key}: {value}")
タイムアウトの設定
常にタイムアウトを設定し、リクエストが永遠に待機しないようにしましょう。requestsライブラリを使用する際のタイムアウト設定のベストプラクティスを示しています。良い例ではリクエストに10秒のタイムアウトを明示的に設定してます。
# 良い例
response = requests.get('https://api.example.com', timeout=10)
# 避けるべき例
response = requests.get('https://api.example.com') # タイムアウトなし
リトライメカニズムの実装
次のコードは、リトライ機能を備えた堅牢なHTTPリクエスト関数を定義しています。robust_request関数は最大3回のリトライを行い、各失敗後に指数バックオフで待機時間を増やしながら再試行します。リクエストが成功するか最大試行回数に達するまで処理を繰り返し、最終的に失敗した場合は例外を発生させます。
import requests
from time import sleep
def robust_request(url, max_retries=3):
"""
リトライ機能付きのリクエスト
"""
for attempt in range(max_retries):
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
print(f"試行 {attempt + 1} 失敗: {e}")
if attempt < max_retries - 1:
sleep(2 ** attempt) # 指数バックオフ
else:
raise e
# 使用例
try:
response = robust_request('https://api.github.com')
print("リクエスト成功")
except Exception as e:
print(f"全ての試行が失敗しました: {e}")
環境変数による設定管理
環境変数からAPIキーとベースURLを安全に取得する方法を示しています。WEATHER_API_KEY環境変数から認証キーを取得し、API_BASE_URLが設定されていない場合はデフォルトURLを使用します。APIキーが未設定の場合はエラーを発生させ、設定されている場合はBearer認証を用いてAPIリクエストを送信し、結果をJSON形式で返します。
import os
import requests
# 環境変数からAPIキーを取得
API_KEY = os.getenv('WEATHER_API_KEY')
BASE_URL = os.getenv('API_BASE_URL', 'https://api.example.com')
def get_api_data(endpoint):
"""
環境変数を使用したAPIリクエスト
"""
if not API_KEY:
raise ValueError("APIキーが設定されていません")
url = f"{BASE_URL}/{endpoint}"
headers = {'Authorization': f'Bearer {API_KEY}'}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
return response.json()
まとめ
requestsライブラリはPythonにおいてHTTP通信を実現するための強力かつ直感的なツールとして広く利用されています。
まず基本的なHTTPメソッドであるGET、POST、PUT、DELETEの使用方法を習得しました。さらにリクエストを効果的にカスタマイズするためのパラメータとヘッダーの設定方法について理解を深め、ネットワークエラーやサーバーエラーといった様々な異常状況に対処するエラーハンドリング技法です。
これらの基礎をマスターすることで、Web APIとの連携、データ収集、サービス統合など、様々な実践的なアプリケーションを開発できるようになります。次のステップでは、REST APIの設計原則や、取得したデータの処理方法について学んでいきましょう。
実際にコードを書いて試してみることが最も効果的な学習方法です。公開されている無料のAPI(JSONPlaceholder、GitHub APIなど)を使って、実際にリクエストを送信してみましょう。
演習問題
初級問題(3問)
初級1: 基本的なGETリクエスト
"""
問題1: Pythonの公式ドキュメントサイト(https://docs.python.org/3/)に
GETリクエストを送信し、以下の情報を取得してください。
1. ステータスコード
2. レスポンスのエンコーディング
3. レスポンスヘッダーの'Content-Type'
4. レスポンス内容の最初の200文字
注意: 適切なエラーハンドリングを実装してください。
"""
import requests
# ここにコードを書いてください
初級2: パラメータ付きリクエスト
"""
問題2: JSONPlaceholder APIを使用して、以下の操作を行ってください。
1. ユーザーIDが1の投稿をすべて取得
2. 投稿IDが1のコメントをすべて取得
3. タイトルに特定のキーワードを含む投稿を検索
ベースURL: https://jsonplaceholder.typicode.com
エンドポイント: /posts, /comments
パラメータ: userId, postId, title
"""
import requests
# ここにコードを書いてください
初級3: 基本的なPOSTリクエスト
"""
問題3: JSONPlaceholderの/postsエンドポイントに新しい投稿を作成してください。
送信するデータ:
- title: "My First API Request"
- body: "Learning requests library is fun!"
- userId: 1
以下の情報を表示してください:
1. レスポンスステータスコード
2. 作成された投稿のID(応答から取得)
3. 応答の全文
"""
import requests
# ここにコードを書いてください
中級問題(6問)
中級1: エラーハンドリングとリトライ
"""
問題1: リクエスト関数を作成し、以下の機能を実装してください。
1. タイムアウトを5秒に設定
2. 接続エラー時のリトライ機能(最大3回)
3. ステータスコードのチェック(200以外はエラー扱い)
4. 各種例外の適切な処理
使用するURL: https://httpstat.us/500(テスト用)
"""
import requests
import time
def robust_request(url, retries=3, timeout=5):
# ここにコードを書いてください
pass
# テスト
result = robust_request("https://httpstat.us/200")
print(result)
中級2: セッション管理と認証
"""
問題2: requests.Sessionを使用して、複数の関連リクエストを処理してください。
1. セッションを作成し、共通ヘッダーを設定
2. まず/usersエンドポイントでユーザーリストを取得
3. 取得した最初のユーザーの詳細情報を取得
4. そのユーザーの投稿を取得
ベースURL: https://jsonplaceholder.typicode.com
"""
import requests
# ここにコードを書いてください
中級3: ファイルのアップロードとダウンロード
"""
問題3: 以下のファイル操作を実装してください。
1. 小さな画像ファイルをダウンロードして保存
2. テキストファイルを作成してアップロード(httpbin.orgを使用)
3. ダウンロード進捗状況の表示
使用するURL:
- ダウンロード: https://httpbin.org/image/jpeg
- アップロード: https://httpbin.org/post
"""
import requests
import os
# ここにコードを書いてください
中級4: APIレスポンスの分析と処理
"""
問題4: GitHub APIを使用して以下の情報を取得してください。
1. 特定のユーザー(例: 'torvalds')の公開リポジトリリスト
2. 各リポジトリのスター数、フォーク数、言語
3. 最もスターの多いリポジトリトップ5を表示
4. 言語別のリポジトリ数を集計
エンドポイント: https://api.github.com/users/{username}/repos
"""
import requests
from collections import defaultdict
# ここにコードを書いてください
中級5: カスタムヘッダーとUser-Agent
"""
問題5: カスタムヘッダーを使用してリクエストを送信してください。
1. カスタムUser-Agentを設定
2. Accept-Languageヘッダーを設定
3. リファラーヘッダーを設定
4. レスポンスヘッダーを分析
使用するURL: https://httpbin.org/headers
"""
import requests
# ここにコードを書いてください
中級6: パラメータの動的生成とページネーション
"""
問題6: ページネーションに対応したAPIリクエスト関数を作成してください。
1. ページネーションパラメータを受け取る
2. 全ページのデータを取得して結合
3. 取得件数の制限オプションを追加
4. 進捗状況を表示
使用するAPI: JSONPlaceholderの/posts(実際はページネーションなしですが、模擬実装)
"""
import requests
def get_paginated_data(base_url, limit=None):
# ここにコードを書いてください
pass
# 使用例
data = get_paginated_data("https://jsonplaceholder.typicode.com/posts")
print(f"取得件数: {len(data)}")
上級問題(3問)
上級1: 非同期リクエストとパフォーマンス最適化
"""
問題1: 複数のURLに対して並行してリクエストを送信する関数を作成してください。
1. concurrent.futuresを使用した並行処理
2. 各リクエストの処理時間計測
3. エラーハンドリング
4. 結果の集計と分析
使用するURLリスト:
- https://jsonplaceholder.typicode.com/posts/1
- https://jsonplaceholder.typicode.com/posts/2
- https://jsonplaceholder.typicode.com/posts/3
- https://jsonplaceholder.typicode.com/posts/4
- https://jsonplaceholder.typicode.com/posts/5
"""
import requests
import concurrent.futures
import time
def fetch_url(url):
# ここにコードを書いてください
pass
def concurrent_requests(urls):
# ここにコードを書いてください
pass
# テスト
urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 6)]
results = concurrent_requests(urls)
上級2: OAuth認証とAPIレート制限の処理
"""
問題2: 認証が必要なAPI(模擬)との連携を実装してください。
1. OAuth2.0の認証フロー(模擬)の実装
2. アクセストークンの取得と管理
3. APIレート制限の検出と対応
4. トークンの自動更新
模擬APIエンドポイント:
- 認証: https://httpbin.org/bearer
- データ取得: https://httpbin.org/json
"""
import requests
import time
class AuthenticatedAPIClient:
def __init__(self, client_id, client_secret):
self.client_id = client_id
self.client_secret = client_secret
self.access_token = None
self.token_expiry = None
self.rate_limit_remaining = None
self.rate_limit_reset = None
def authenticate(self):
# ここに認証処理を書いてください
pass
def make_authenticated_request(self, url):
# ここに認証済みリクエスト処理を書いてください
pass
def handle_rate_limiting(self):
# ここにレート制限処理を書いてください
pass
# 使用例
client = AuthenticatedAPIClient("test_id", "test_secret")
response = client.make_authenticated_request("https://httpbin.org/json")
上級3: Webスクレイピングとデータ抽出
"""
問題3: 実際のWebサイトからデータをスクレイピングするシステムを構築してください。
1. セッション管理とクッキー処理
2. BeautifulSoupを使用したHTML解析
3. データの正規化とクリーニング
4. 結果のJSONファイルへの保存
ターゲットURL: Python公式サイトの最新ニュース
"""
import requests
from bs4 import BeautifulSoup
import json
import re
class WebScraper:
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
def fetch_news(self, url):
# ここにスクレイピングコードを書いてください
pass
def parse_news(self, html):
# ここに解析コードを書いてください
pass
def save_to_json(self, data, filename):
# ここに保存コードを書いてください
pass
# 使用例
scraper = WebScraper()
news = scraper.fetch_news("https://www.python.org/blogs/")
scraper.save_to_json(news, "python_news.json")
解答作成のヒント
初級問題のヒント
- 基本的なrequestsのメソッド(get, post)を使用
- ステータスコードのチェックを忘れずに
- パラメータは辞書形式で渡す
中級問題のヒント
- Sessionオブジェクトで接続を維持
- 適切なエラーハンドリングの実装
- 進捗表示にはレスポンスのiter_contentを使用
上級問題のヒント
- concurrent.futuresで並行処理を実現
- レート制限にはX-RateLimit-*ヘッダーを確認
- BeautifulSoupでCSSセレクターを使用
演習問題 解答例
初級問題 解答
初級1: 基本的なGETリクエスト
import requests
def basic_get_request():
try:
# GETリクエストの送信
response = requests.get('https://docs.python.org/3/', timeout=10)
# ステータスコードの確認
print(f"1. ステータスコード: {response.status_code}")
# レスポンスのエンコーディング
print(f"2. エンコーディング: {response.encoding}")
# レスポンスヘッダーの'Content-Type'
content_type = response.headers.get('Content-Type', '見つかりません')
print(f"3. Content-Type: {content_type}")
# レスポンス内容の最初の200文字
print(f"4. レスポンスの最初の200文字: {response.text[:200]}")
except requests.exceptions.RequestException as e:
print(f"エラーが発生しました: {e}")
# 実行
basic_get_request()
初級2: パラメータ付きリクエスト
import requests
def parameterized_requests():
base_url = "https://jsonplaceholder.typicode.com"
try:
# 1. ユーザーIDが1の投稿をすべて取得
params1 = {'userId': 1}
response1 = requests.get(f"{base_url}/posts", params=params1)
if response1.status_code == 200:
posts = response1.json()
print(f"ユーザー1の投稿数: {len(posts)}")
print(f"最初の投稿タイトル: {posts[0]['title']}")
# 2. 投稿IDが1のコメントをすべて取得
params2 = {'postId': 1}
response2 = requests.get(f"{base_url}/comments", params=params2)
if response2.status_code == 200:
comments = response2.json()
print(f"投稿1のコメント数: {len(comments)}")
print(f"最初のコメント: {comments[0]['body'][:50]}...")
# 3. タイトルに特定のキーワードを含む投稿を検索
# JSONPlaceholderは完全一致のみなので、全件取得してフィルタリング
response3 = requests.get(f"{base_url}/posts")
if response3.status_code == 200:
all_posts = response3.json()
keyword = "aut"
filtered_posts = [post for post in all_posts if keyword in post['title'].lower()]
print(f"タイトルに '{keyword}' を含む投稿数: {len(filtered_posts)}")
for post in filtered_posts[:3]: # 最初の3件を表示
print(f" - {post['title']}")
except requests.exceptions.RequestException as e:
print(f"エラーが発生しました: {e}")
# 実行
parameterized_requests()
初級3: 基本的なPOSTリクエスト
import requests
def basic_post_request():
url = "https://jsonplaceholder.typicode.com/posts"
# 送信データ
new_post = {
'title': "My First API Request",
'body': "Learning requests library is fun!",
'userId': 1
}
try:
# POSTリクエストの送信
response = requests.post(url, json=new_post)
# 1. レスポンスステータスコード
print(f"1. ステータスコード: {response.status_code}")
if response.status_code == 201:
response_data = response.json()
# 2. 作成された投稿のID
print(f"2. 作成された投稿ID: {response_data.get('id')}")
# 3. 応答の全文
print("3. 応答の全文:")
print(response.text)
else:
print(f"エラー: 期待されるステータスコード201ではなく{response.status_code}でした")
except requests.exceptions.RequestException as e:
print(f"リクエスト中にエラーが発生しました: {e}")
# 実行
basic_post_request()
中級問題 解答
中級1: エラーハンドリングとリトライ
import requests
import time
def robust_request(url, retries=3, timeout=5):
"""
堅牢なリクエスト関数
"""
for attempt in range(retries):
try:
print(f"試行 {attempt + 1}/{retries}: {url}")
# リクエスト送信
response = requests.get(url, timeout=timeout)
# ステータスコードのチェック
response.raise_for_status()
print("リクエスト成功")
return response
except requests.exceptions.Timeout:
print(f"タイムアウトしました(試行 {attempt + 1})")
except requests.exceptions.ConnectionError:
print(f"接続エラー(試行 {attempt + 1})")
except requests.exceptions.HTTPError as e:
print(f"HTTPエラー: {e}(試行 {attempt + 1})")
# 400番台のクライアントエラーはリトライしない
if 400 <= response.status_code < 500:
break
except requests.exceptions.RequestException as e:
print(f"リクエストエラー: {e}(試行 {attempt + 1})")
# 最終試行でなければ待機
if attempt < retries - 1:
wait_time = 2 ** attempt # 指数バックオフ
print(f"{wait_time}秒待機して再試行...")
time.sleep(wait_time)
print("全ての試行が失敗しました")
return None
# テスト
print("=== 成功するリクエスト ===")
result1 = robust_request("https://httpstat.us/200")
if result1:
print(f"ステータスコード: {result1.status_code}")
print("\n=== 失敗するリクエスト ===")
result2 = robust_request("https://httpstat.us/500")
中級2: セッション管理と認証
import requests
def session_management():
base_url = "https://jsonplaceholder.typicode.com"
# セッションの作成
with requests.Session() as session:
# 共通ヘッダーの設定
session.headers.update({
'User-Agent': 'MyPythonApp/1.0',
'Accept': 'application/json'
})
try:
# 1. ユーザーリストの取得
users_response = session.get(f"{base_url}/users")
if users_response.status_code == 200:
users = users_response.json()
print(f"ユーザー数: {len(users)}")
# 最初のユーザーを取得
first_user = users[0]
user_id = first_user['id']
print(f"最初のユーザー: {first_user['name']} (ID: {user_id})")
# 2. ユーザーの詳細情報を取得
user_detail_response = session.get(f"{base_url}/users/{user_id}")
if user_detail_response.status_code == 200:
user_detail = user_detail_response.json()
print(f"メール: {user_detail['email']}")
print(f"Webサイト: {user_detail['website']}")
# 3. ユーザーの投稿を取得
user_posts_response = session.get(f"{base_url}/posts", params={'userId': user_id})
if user_posts_response.status_code == 200:
user_posts = user_posts_response.json()
print(f"ユーザー{user_id}の投稿数: {len(user_posts)}")
# 最初の3件の投稿タイトルを表示
for i, post in enumerate(user_posts[:3]):
print(f" 投稿{i+1}: {post['title']}")
else:
print(f"ユーザーリストの取得に失敗: {users_response.status_code}")
except requests.exceptions.RequestException as e:
print(f"エラーが発生しました: {e}")
# 実行
session_management()
中級3: ファイルのアップロードとダウンロード
import requests
import os
def file_operations():
try:
# 1. 画像ファイルのダウンロード
print("画像をダウンロード中...")
image_url = "https://httpbin.org/image/jpeg"
image_response = requests.get(image_url, stream=True)
if image_response.status_code == 200:
# 進捗表示付きでファイル保存
total_size = int(image_response.headers.get('content-length', 0))
downloaded_size = 0
with open('downloaded_image.jpg', 'wb') as f:
for chunk in image_response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded_size += len(chunk)
if total_size > 0:
progress = (downloaded_size / total_size) * 100
print(f"ダウンロード進捗: {progress:.1f}%", end='\r')
print("\n画像のダウンロード完了: downloaded_image.jpg")
# 2. テキストファイルの作成とアップロード
print("\nテキストファイルを作成中...")
with open('sample.txt', 'w', encoding='utf-8') as f:
f.write("これはrequestsライブラリでアップロードするテストファイルです。\n")
f.write("Pythonの学習は楽しい!")
# ファイルのアップロード
print("ファイルをアップロード中...")
with open('sample.txt', 'rb') as f:
files = {'file': ('sample.txt', f, 'text/plain')}
upload_response = requests.post('https://httpbin.org/post', files=files)
if upload_response.status_code == 200:
upload_result = upload_response.json()
print("アップロード成功!")
print(f"アップロードされたファイル名: {upload_result['files']['file']}")
# クリーンアップ
if os.path.exists('sample.txt'):
os.remove('sample.txt')
print("一時ファイルを削除しました")
except requests.exceptions.RequestException as e:
print(f"エラーが発生しました: {e}")
except IOError as e:
print(f"ファイル操作エラー: {e}")
# 実行
file_operations()
中級4: APIレスポンスの分析と処理
import requests
from collections import defaultdict
def github_api_analysis():
username = "torvalds" # Linuxの生みの親!
try:
# ユーザーのリポジトリ情報を取得
url = f"https://api.github.com/users/{username}/repos"
response = requests.get(url, timeout=10)
if response.status_code == 200:
repos = response.json()
print(f"{username} の公開リポジトリ数: {len(repos)}")
# 言語別リポジトリ数を集計
language_stats = defaultdict(int)
# リポジトリ情報を処理
repo_data = []
for repo in repos:
repo_info = {
'name': repo['name'],
'stars': repo['stargazers_count'],
'forks': repo['forks_count'],
'language': repo['language'] or 'Unknown',
'description': repo['description'] or 'No description'
}
repo_data.append(repo_info)
language_stats[repo_info['language']] += 1
# 1. 最もスターの多いリポジトリトップ5
print("\n=== スター数トップ5 ===")
top_repos = sorted(repo_data, key=lambda x: x['stars'], reverse=True)[:5]
for i, repo in enumerate(top_repos, 1):
print(f"{i}. {repo['name']}")
print(f" スター: {repo['stars']}, フォーク: {repo['forks']}, 言語: {repo['language']}")
print(f" 説明: {repo['description'][:60]}...")
# 2. 言語別リポジトリ数
print("\n=== 言語別リポジトリ数 ===")
for lang, count in sorted(language_stats.items(), key=lambda x: x[1], reverse=True):
print(f"{lang}: {count}リポジトリ")
else:
print(f"APIリクエスト失敗: {response.status_code}")
if response.status_code == 403:
print("レート制限の可能性があります")
except requests.exceptions.RequestException as e:
print(f"エラーが発生しました: {e}")
# 実行
github_api_analysis()
中級5: カスタムヘッダーとUser-Agent
import requests
def custom_headers_request():
# カスタムヘッダーの設定
headers = {
'User-Agent': 'MyCustomPythonApp/2.0 (Learning HTTP Requests)',
'Accept-Language': 'ja-JP,ja;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Referer': 'https://www.example.com',
'Accept': 'application/json, text/plain, */*'
}
try:
# カスタムヘッダー付きでリクエスト送信
response = requests.get('https://httpbin.org/headers', headers=headers, timeout=10)
if response.status_code == 200:
result = response.json()
print("=== 送信したヘッダー ===")
sent_headers = result['headers']
for key, value in sent_headers.items():
if any(key.lower() in header.lower() for header in ['user-agent', 'accept', 'referer', 'language']):
print(f"{key}: {value}")
print("\n=== レスポンスヘッダー ===")
for key, value in response.headers.items():
if any(term in key.lower() for term in ['content', 'server', 'date']):
print(f"{key}: {value}")
else:
print(f"リクエスト失敗: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"エラーが発生しました: {e}")
# 実行
custom_headers_request()
中級6: パラメータの動的生成とページネーション
import requests
def get_paginated_data(base_url, limit=None):
"""
ページネーションに対応したデータ取得関数
"""
all_data = []
page = 1
per_page = 20 # 1ページあたりの件数
print("データ取得を開始...")
while True:
try:
# ページネーションパラメータ
params = {
'_page': page,
'_limit': per_page
}
print(f"ページ {page} を取得中...")
response = requests.get(base_url, params=params, timeout=10)
if response.status_code == 200:
page_data = response.json()
# データが空なら終了
if not page_data:
print("全てのデータを取得しました")
break
all_data.extend(page_data)
# 進捗表示
print(f" 取得済み: {len(all_data)}件")
# 制限チェック
if limit and len(all_data) >= limit:
all_data = all_data[:limit]
print(f"制限数 {limit} に達したため終了")
break
page += 1
# 実際のAPIではレート制限を考慮して少し待機
# time.sleep(0.1)
else:
print(f"ページ {page} の取得に失敗: {response.status_code}")
break
except requests.exceptions.RequestException as e:
print(f"ページ {page} の取得中にエラー: {e}")
break
return all_data
# 使用例
print("=== ページネーション実装のテスト ===")
data = get_paginated_data("https://jsonplaceholder.typicode.com/posts", limit=50)
print(f"\n最終的な取得件数: {len(data)}")
# 取得したデータのサンプル表示
if data:
print("\n=== 取得したデータのサンプル ===")
for i in range(min(3, len(data))):
print(f"{i+1}. {data[i]['title'][:50]}...")
上級問題 解答
上級1: 非同期リクエストとパフォーマンス最適化
import requests
import concurrent.futures
import time
def fetch_url(url):
"""
単一URLの取得処理
"""
start_time = time.time()
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
processing_time = time.time() - start_time
return {
'url': url,
'status': 'success',
'status_code': response.status_code,
'data': response.json(),
'processing_time': processing_time
}
except requests.exceptions.RequestException as e:
processing_time = time.time() - start_time
return {
'url': url,
'status': 'error',
'error': str(e),
'processing_time': processing_time
}
def concurrent_requests(urls, max_workers=5):
"""
並行リクエスト処理
"""
print(f"{len(urls)}個のURLを並行処理(最大{max_workers}同時)")
start_time = time.time()
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# すべてのURLを実行キューに追加
future_to_url = {executor.submit(fetch_url, url): url for url in urls}
# 完了したタスクから結果を収集
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
results.append(result)
status = "✓" if result['status'] == 'success' else "✗"
print(f"{status} {url} - {result['processing_time']:.2f}秒")
except Exception as e:
print(f"✗ {url} - 例外: {e}")
total_time = time.time() - start_time
print(f"\n=== 処理結果サマリー ===")
print(f"総処理時間: {total_time:.2f}秒")
print(f"成功: {len([r for r in results if r['status'] == 'success'])}件")
print(f"失敗: {len([r for r in results if r['status'] == 'error'])}件")
# パフォーマンス分析
if results:
avg_time = sum(r['processing_time'] for r in results) / len(results)
max_time = max(r['processing_time'] for r in results)
min_time = min(r['processing_time'] for r in results)
print(f"平均処理時間: {avg_time:.2f}秒")
print(f"最速: {min_time:.2f}秒, 最遅: {max_time:.2f}秒")
print(f"並行処理による時間節約: {(sum(r['processing_time'] for r in results) - total_time):.2f}秒")
return results
# テスト
urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 11)]
results = concurrent_requests(urls)
# 成功したリクエストのデータを表示
print("\n=== 成功リクエストのデータサンプル ===")
successful_results = [r for r in results if r['status'] == 'success']
for result in successful_results[:3]:
data = result['data']
print(f"URL: {result['url']}")
print(f"タイトル: {data['title'][:30]}...")
print(f"処理時間: {result['processing_time']:.2f}秒\n")
上級2: OAuth認証とAPIレート制限の処理
import requests
import time
import json
class AuthenticatedAPIClient:
def __init__(self, client_id, client_secret):
self.client_id = client_id
self.client_secret = client_secret
self.access_token = None
self.token_expiry = None
self.rate_limit_remaining = None
self.rate_limit_reset = None
def authenticate(self):
"""
模擬認証処理(実際のOAuth2.0フローを簡略化)
"""
print("認証処理を開始...")
# 実際の環境ではここで認証サーバーと通信
# 今回は模擬的なトークン発行
self.access_token = f"mock_token_{int(time.time())}"
self.token_expiry = time.time() + 3600 # 1時間後
print("認証成功")
return True
def check_token_expiry(self):
"""トークンの有効期限をチェック"""
if not self.access_token or not self.token_expiry:
return False
if time.time() >= self.token_expiry:
print("トークンの有効期限が切れています")
return False
return True
def handle_rate_limiting(self):
"""レート制限の処理"""
if self.rate_limit_remaining == 0:
reset_time = self.rate_limit_reset - time.time()
if reset_time > 0:
print(f"レート制限により{reset_time:.0f}秒待機します...")
time.sleep(reset_time + 1) # バッファを追加
return True
return False
def update_rate_limits(self, response):
"""レスポンスからレート制限情報を更新"""
self.rate_limit_remaining = int(response.headers.get('X-RateLimit-Remaining', 1000))
self.rate_limit_reset = int(response.headers.get('X-RateLimit-Reset', time.time() + 3600))
def make_authenticated_request(self, url, max_retries=3):
"""
認証済みリクエストの実行
"""
for attempt in range(max_retries):
try:
# トークンチェック
if not self.check_token_expiry():
print("再認証を実行...")
if not self.authenticate():
raise Exception("認証に失敗しました")
# レート制限チェック
if self.handle_rate_limiting():
continue
# 認証ヘッダーを設定
headers = {
'Authorization': f'Bearer {self.access_token}',
'User-Agent': 'AuthenticatedAPIClient/1.0'
}
print(f"リクエスト送信: {url} (試行 {attempt + 1})")
response = requests.get(url, headers=headers, timeout=10)
# レート制限情報の更新
self.update_rate_limits(response)
# ステータスコードチェック
if response.status_code == 401: # 未認証
print("トークンが無効です。再認証します...")
self.access_token = None
continue
elif response.status_code == 429: # レート制限
print("レート制限に達しました")
self.rate_limit_remaining = 0
continue
elif response.status_code == 200:
print("リクエスト成功")
return response.json()
else:
print(f"HTTPエラー: {response.status_code}")
response.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"リクエストエラー (試行 {attempt + 1}): {e}")
if attempt == max_retries - 1:
raise e
raise Exception("最大リトライ回数を超えました")
def get_protected_data(self):
"""保護されたデータを取得"""
try:
# 模擬APIエンドポイント(httpbinは実際には認証不要ですが、テスト用)
data = self.make_authenticated_request("https://httpbin.org/bearer")
return data
except Exception as e:
print(f"データ取得失敗: {e}")
return None
# 使用例
print("=== OAuth認証クライアントのテスト ===")
client = AuthenticatedAPIClient("test_client_id", "test_client_secret")
# 認証実行
if client.authenticate():
# 保護されたデータにアクセス
result = client.get_protected_data()
if result:
print("取得したデータ:")
print(json.dumps(result, indent=2, ensure_ascii=False))
# レート制限情報の表示
print(f"\nレート制限情報:")
print(f"残りリクエスト数: {client.rate_limit_remaining}")
print(f"制限リセット時間: {time.ctime(client.rate_limit_reset)}")
上級3: Webスクレイピングとデータ抽出
import requests
from bs4 import BeautifulSoup
import json
import re
from datetime import datetime
class WebScraper:
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'ja,en-US;q=0.7,en;q=0.3',
})
def fetch_news(self, url):
"""
ニュースページを取得
"""
try:
print(f"ページを取得中: {url}")
response = self.session.get(url, timeout=15)
response.raise_for_status()
# エンコーディングを設定
response.encoding = 'utf-8'
print("ページ取得成功")
return response.text
except requests.exceptions.RequestException as e:
print(f"ページ取得エラー: {e}")
return None
def parse_news(self, html):
"""
HTMLを解析してニュースデータを抽出
"""
soup = BeautifulSoup(html, 'html.parser')
news_items = []
print("HTML解析を開始...")
# Python公式ブログの構造に基づいてセレクターを調整
# 注意: 実際のサイト構造は変更される可能性があります
# ニュース項目の検索(複数の可能なセレクターを試す)
selectors = [
'.blog-widget li',
'.list-recent-posts li',
'.blog-list li',
'article.post'
]
news_elements = []
for selector in selectors:
news_elements = soup.select(selector)
if news_elements:
print(f"セレクター '{selector}' で {len(news_elements)} 件見つかりました")
break
if not news_elements:
print("ニュース項目が見つかりませんでした")
return news_items
for element in news_elements:
try:
news_item = {}
# タイトルの抽出
title_element = element.select_one('a')
if title_element:
news_item['title'] = title_element.get_text().strip()
news_item['url'] = title_element.get('href', '')
# 相対URLを絶対URLに変換
if news_item['url'] and news_item['url'].startswith('/'):
news_item['url'] = 'https://www.python.org' + news_item['url']
# 日付の抽出
date_element = element.select_one('.blog-date, time, .date')
if date_element:
news_item['date'] = date_element.get_text().strip()
# 説明の抽出
description_element = element.select_one('p, .description')
if description_element:
news_item['description'] = description_element.get_text().strip()[:200] + '...'
# データクリーニング
news_item = self.clean_news_data(news_item)
if news_item.get('title'): # タイトルがあるもののみ追加
news_items.append(news_item)
except Exception as e:
print(f"ニュース項目の解析中にエラー: {e}")
continue
print(f"{len(news_items)} 件のニュースを解析しました")
return news_items
def clean_news_data(self, news_item):
"""
ニュースデータのクリーニングと正規化
"""
# タイトルのクリーニング
if 'title' in news_item:
news_item['title'] = re.sub(r'\s+', ' ', news_item['title']).strip()
# 説明のクリーニング
if 'description' in news_item:
news_item['description'] = re.sub(r'\s+', ' ', news_item['description']).strip()
# 日付の正規化
if 'date' in news_item:
# 日付形式の標準化を試みる
news_item['date'] = self.normalize_date(news_item['date'])
return news_item
def normalize_date(self, date_string):
"""
日付文字列の正規化
"""
try:
# 様々な日付形式に対応
patterns = [
r'(\d{4}-\d{2}-\d{2})',
r'(\d{2}/\d{2}/\d{4})',
r'(\w+ \d{1,2}, \d{4})'
]
for pattern in patterns:
match = re.search(pattern, date_string)
if match:
return match.group(1)
return date_string.strip()
except:
return date_string
def save_to_json(self, data, filename):
"""
データをJSONファイルに保存
"""
try:
output_data = {
'scraped_at': datetime.now().isoformat(),
'source_url': 'https://www.python.org/blogs/',
'news_count': len(data),
'news_items': data
}
with open(filename, 'w', encoding='utf-8') as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
print(f"データを {filename} に保存しました")
except Exception as e:
print(f"ファイル保存エラー: {e}")
def scrape_news(self, url, output_file):
"""
ニューススクレイピングのメイン処理
"""
print("=== Webスクレイピング開始 ===")
# ページ取得
html = self.fetch_news(url)
if not html:
print("ページ取得に失敗したため終了します")
return
# HTML解析
news_data = self.parse_news(html)
if news_data:
# 結果の表示
print(f"\n=== 取得したニュース ({len(news_data)}件) ===")
for i, news in enumerate(news_data[:5], 1): # 最初の5件を表示
print(f"{i}. {news.get('title', 'タイトルなし')}")
if news.get('date'):
print(f" 日付: {news['date']}")
if news.get('description'):
print(f" 説明: {news['description'][:80]}...")
print()
# ファイル保存
self.save_to_json(news_data, output_file)
else:
print("ニュースデータを取得できませんでした")
# 使用例
if __name__ == "__main__":
scraper = WebScraper()
# Python公式ブログからニュースを取得
# 注意: 実際のサイト構造に合わせてURLを調整してください
scraper.scrape_news(
"https://www.python.org/blogs/",
"python_news.json"
)