Welcome back! 🎉 In this lesson we'll learn network programming: how to make HTTP requests and call APIs from Python, so your programs can connect to the internet!
In the previous ten lessons we learned:

- Variables and basic data types
- Conditionals and loops
- Lists and dictionaries
- Functions (the magicians of code)
- File operations (persisting data)
- Modules and packages (organizing code)
- Object-oriented programming (OOP)
- Exceptions and error handling
- Regular expressions
- Dates and times
In this lesson, we'll learn how to talk to the network!
HTTP (HyperText Transfer Protocol) is the set of rules that clients and servers use to communicate with each other.
| Method | Description | Typical use |
|---|---|---|
| GET | Retrieve a resource | Querying data |
| POST | Create a resource | Submitting data |
| PUT | Replace a resource | Full update |
| PATCH | Modify part of a resource | Partial update |
| DELETE | Remove a resource | Deleting data |
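All five of these map one-to-one onto functions in the requests library, which we'll install in a moment. A minimal sketch (httpbin.org is a public test service that simply echoes requests back, so it is safe to experiment against):

```python
import requests

# Minimal sketch: one requests function per HTTP method.
# httpbin.org is a public echo service for testing HTTP clients.
base = "https://httpbin.org"

print(requests.get(f"{base}/get").status_code)                       # GET: query data
print(requests.post(f"{base}/post", data={"k": "v"}).status_code)    # POST: submit data
print(requests.put(f"{base}/put", data={"k": "v"}).status_code)      # PUT: full update
print(requests.patch(f"{base}/patch", data={"k": "v"}).status_code)  # PATCH: partial update
print(requests.delete(f"{base}/delete").status_code)                 # DELETE: remove data
```

Each call should print 200, since httpbin accepts every method on its matching endpoint.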
| Status code | Meaning |
|---|---|
| 200 | OK, the request succeeded |
| 201 | Created, a new resource was created |
| 400 | Bad Request, the request was malformed |
| 401 | Unauthorized, authentication is required |
| 404 | Not Found, the resource does not exist |
| 500 | Internal Server Error, something failed on the server |
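In requests, the raw code is available as `response.status_code`, and `response.ok` is a convenience flag that is True for every code below 400. A small sketch of branching on the ranges above (httpbin's /status/<code> endpoint replies with whatever code you ask for):

```python
import requests

# httpbin.org/status/<code> replies with exactly that status code
response = requests.get("https://httpbin.org/status/404")

if response.ok:                                   # codes below 400
    print("Success:", response.status_code)
elif response.status_code < 500:
    print("Client error:", response.status_code)  # e.g. 400, 401, 404
else:
    print("Server error:", response.status_code)  # e.g. 500
```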
pip install requests
import requests
import requests
# Send a GET request
response = requests.get("https://api.github.com")

# Check the response status
print("Status code:", response.status_code)
print("OK:", response.ok)

# Inspect the response headers
print("\nResponse headers:")
for key, value in response.headers.items():
    print(f"  {key}: {value}")

# Read the response body
print("\nResponse body (text):")
print(response.text[:500])  # show only the first 500 characters
import requests
# Option 1: splice the parameters into the URL by hand
response = requests.get("https://httpbin.org/get?name=Xiaoming&age=18")

# Option 2: use the params argument (recommended; requests URL-encodes for you)
params = {
    "name": "Xiaoming",
    "age": 18,
    "city": "Beijing"
}
response = requests.get("https://httpbin.org/get", params=params)

print("Full URL:", response.url)
print("\nResponse:")
print(response.json())
import requests
# Call the GitHub API
response = requests.get("https://api.github.com/users/octocat")

# Check whether the request succeeded
if response.status_code == 200:
    data = response.json()  # parse the JSON body
    print("Login:", data['login'])
    print("Name:", data['name'])
    print("Company:", data['company'])
    print("Location:", data['location'])
    print("Blog:", data['blog'])
    print("Public repos:", data['public_repos'])
    print("Followers:", data['followers'])
    print("Following:", data['following'])
else:
    print(f"Request failed with status code: {response.status_code}")
import requests
# Form data
data = {
    "username": "admin",
    "password": "secret123"
}
response = requests.post("https://httpbin.org/post", data=data)

print("Status code:", response.status_code)
print("\nResponse:")
print(response.json())
import requests
# JSON data
data = {
    "name": "Xiaoming",
    "age": 18,
    "hobbies": ["reading", "swimming", "programming"]
}
response = requests.post("https://httpbin.org/post", json=data)

print("Status code:", response.status_code)
print("\nResponse:")
print(response.json())
import requests
headers = {
"User-Agent": "My-Python-App/1.0",
"Accept": "application/json",
"Authorization": "Bearer YOUR_TOKEN_HERE"
}
response = requests.get("https://httpbin.org/headers", headers=headers)
print("发送的请求头:")
print(response.json())
import requests
from requests.auth import HTTPBasicAuth
# Option 1: pass a (username, password) tuple
response = requests.get(
"https://httpbin.org/basic-auth/user/pass",
auth=("user", "pass")
)
# Option 2: use HTTPBasicAuth explicitly
response = requests.get(
"https://httpbin.org/basic-auth/user/pass",
auth=HTTPBasicAuth("user", "pass")
)
print("状态码:", response.status_code)
print("响应:", response.json())
import requests
try:
    response = requests.get("https://api.github.com", timeout=5)
    response.raise_for_status()  # raises HTTPError for status codes >= 400
    print("Request succeeded!")
    print(response.json())
except requests.exceptions.ConnectionError:
    print("❌ Connection error: could not reach the server")
except requests.exceptions.Timeout:
    print("❌ Request timed out")
except requests.exceptions.HTTPError as e:
    print(f"❌ HTTP error: {e}")
except requests.exceptions.RequestException as e:
    print(f"❌ Request error: {e}")
import requests
# Set separate connect and read timeouts
try:
    response = requests.get(
        "https://api.github.com",
        timeout=(3, 10)  # (connect timeout, read timeout)
    )
    print("Request succeeded")
except requests.exceptions.Timeout:
    print("Request timed out")
import requests
# Upload a file; the with-block makes sure the file handle is closed afterwards
with open("test.txt", "rb") as f:
    files = {
        "file": f
    }
    response = requests.post("https://httpbin.org/post", files=files)

print("Status code:", response.status_code)
import requests
# Download a small file
url = "https://httpbin.org/image/png"
response = requests.get(url)

with open("image.png", "wb") as f:
    f.write(response.content)
print("File downloaded!")

# Download a large file (streamed, so it never sits in memory all at once)
url = "https://example.com/large_file.zip"
response = requests.get(url, stream=True)

with open("large_file.zip", "wb") as f:
    for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)
print("Large file downloaded!")
import requests
# Create a Session
session = requests.Session()

# Set Session-level request headers
session.headers.update({
    "User-Agent": "My-Python-App/1.0"
})

# Requests sent through the Session share cookies
response1 = session.get("https://httpbin.org/cookies/set?name=value")
response2 = session.get("https://httpbin.org/cookies")
print("Cookies:", response2.json())

# Close the Session
session.close()
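A Session is also a context manager, so instead of remembering to call close() yourself you can use a with block, the same pattern we used for files; a minimal sketch:

```python
import requests

# The with-block closes the session automatically, even if an error occurs
with requests.Session() as session:
    session.headers.update({"User-Agent": "My-Python-App/1.0"})
    session.get("https://httpbin.org/cookies/set?name=value")
    print("Cookies:", session.get("https://httpbin.org/cookies").json())
```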
Let's build a weather fetcher!
import requests
class WeatherFetcher:
    def __init__(self):
        # Use the free wttr.in weather API
        self.base_url = "https://wttr.in"

    def get_weather(self, city, format="text"):
        """Fetch the weather for a city"""
        try:
            if format == "json":
                url = f"{self.base_url}/{city}?format=j1"
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                return response.json()
            else:
                url = f"{self.base_url}/{city}?m"  # m = metric units
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                return response.text
        except requests.exceptions.RequestException as e:
            return f"Failed to fetch the weather: {e}"

    def print_weather(self, city):
        """Print the plain-text weather report"""
        print(f"\n🌤️ Weather in {city}")
        print("="*50)
        weather_text = self.get_weather(city)
        print(weather_text)
        print("="*50)

    def get_detailed_weather(self, city):
        """Fetch and print detailed weather (JSON format)"""
        data = self.get_weather(city, format="json")
        if isinstance(data, dict) and "current_condition" in data:
            current = data["current_condition"][0]
            print(f"\n📊 Detailed weather for {city}")
            print("="*50)
            print(f"Temperature: {current['temp_C']}°C")
            print(f"Feels like: {current['FeelsLikeC']}°C")
            print(f"Conditions: {current['weatherDesc'][0]['value']}")
            print(f"Humidity: {current['humidity']}%")
            print(f"Wind speed: {current['windspeedKmph']} km/h")
            print(f"Visibility: {current['visibility']} km")
            print("="*50)
        else:
            print("Could not fetch detailed weather information")

# Use the weather fetcher
if __name__ == "__main__":
    print("🌤️ Weather Fetcher")
    print("="*50)
    weather = WeatherFetcher()
    # Look up the weather for a few cities
    cities = ["Beijing", "Shanghai", "Guangzhou", "Shenzhen"]
    for city in cities:
        weather.print_weather(city)
        print("\n" + "="*50 + "\n")
    # Show detailed weather for one city
    weather.get_detailed_weather("Beijing")
Let's build a GitHub repository browser!
import requests
class GitHubBrowser:
    def __init__(self, token=None):
        self.base_url = "https://api.github.com"
        self.headers = {}
        if token:
            self.headers["Authorization"] = f"token {token}"

    def get_user(self, username):
        """Fetch a user's profile"""
        try:
            url = f"{self.base_url}/users/{username}"
            response = requests.get(url, headers=self.headers, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return None

    def get_user_repos(self, username, sort="updated", per_page=30):
        """Fetch a user's repositories"""
        try:
            url = f"{self.base_url}/users/{username}/repos"
            params = {
                "sort": sort,
                "per_page": per_page
            }
            response = requests.get(url, headers=self.headers, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return []

    def get_repo(self, owner, repo):
        """Fetch a single repository"""
        try:
            url = f"{self.base_url}/repos/{owner}/{repo}"
            response = requests.get(url, headers=self.headers, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return None

    def print_user_profile(self, username):
        """Print a user's profile"""
        user = self.get_user(username)
        if not user:
            return
        print("\n" + "="*60)
        print(f"  👤 {user['name'] or user['login']}")
        print("="*60)
        print(f"  Login: {user['login']}")
        print(f"  Name: {user['name'] or 'not set'}")
        print(f"  Company: {user['company'] or 'not set'}")
        print(f"  Location: {user['location'] or 'not set'}")
        print(f"  Blog: {user['blog'] or 'not set'}")
        print(f"  Bio: {user['bio'] or 'not set'}")
        print("\n  🏷️ Stats:")
        print(f"  Public repos: {user['public_repos']}")
        print(f"  Public gists: {user['public_gists']}")
        print(f"  Followers: {user['followers']}")
        print(f"  Following: {user['following']}")
        print(f"  📅 Joined: {user['created_at'][:10]}")
        print("="*60)

    def print_user_repos(self, username, limit=10):
        """Print a user's repositories"""
        repos = self.get_user_repos(username)
        if not repos:
            return
        print(f"\n📦 {username}'s repositories ({limit} most recently updated):")
        print("="*80)
        for i, repo in enumerate(repos[:limit], 1):
            print(f"\n{i}. 📁 {repo['name']}")
            print(f"   Description: {repo['description'] or 'none'}")
            print(f"   Language: {repo['language'] or 'unknown'}")
            print(f"   ⭐ Stars: {repo['stargazers_count']}")
            print(f"   🍴 Forks: {repo['forks_count']}")
            print(f"   👁️ Watchers: {repo['watchers_count']}")
            print(f"   🔗 {repo['html_url']}")
            print(f"   📅 Updated: {repo['updated_at'][:10]}")
        print("\n" + "="*80)

# Use the GitHub browser
if __name__ == "__main__":
    print("="*60)
    print("  🐙 GitHub Repository Browser")
    print("="*60)
    browser = GitHubBrowser()
    # Look up a user
    username = "octocat"
    browser.print_user_profile(username)
    browser.print_user_repos(username, limit=5)
    # Look up another user
    print("\n" + "="*60)
    print("  Looking up another user...")
    username2 = "python"
    browser.print_user_profile(username2)
    browser.print_user_repos(username2, limit=5)
Let's build a simple web crawler! (Besides requests, this one needs BeautifulSoup: `pip install beautifulsoup4`.)
import requests
from bs4 import BeautifulSoup
import time
from urllib.parse import urljoin, urlparse
class SimpleCrawler:
    def __init__(self, base_url, max_pages=10):
        self.base_url = base_url
        self.max_pages = max_pages
        self.visited = set()
        self.results = []

    def is_valid_url(self, url):
        """Check whether a URL is well formed"""
        parsed = urlparse(url)
        return bool(parsed.netloc) and bool(parsed.scheme)

    def get_links(self, soup, current_url):
        """Extract the links from a page"""
        links = set()
        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"]
            full_url = urljoin(current_url, href)
            if self.is_valid_url(full_url):
                # Keep only links on the same domain
                if urlparse(full_url).netloc == urlparse(self.base_url).netloc:
                    links.add(full_url)
        return links

    def crawl_page(self, url):
        """Crawl a single page"""
        if url in self.visited:
            return None
        if len(self.visited) >= self.max_pages:
            return None
        try:
            print(f"🔍 Crawling: {url}")
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            self.visited.add(url)
            soup = BeautifulSoup(response.text, "html.parser")
            # Extract the title
            title = soup.title.string if soup.title else "Untitled"
            # Extract the body text (simplified: first five paragraphs)
            paragraphs = soup.find_all("p")
            content = "\n".join([p.get_text() for p in paragraphs[:5]])
            # Extract the links
            links = self.get_links(soup, url)
            page_data = {
                "url": url,
                "title": title,
                "content": content,
                "links": list(links),
                "crawled_at": time.strftime("%Y-%m-%d %H:%M:%S")
            }
            self.results.append(page_data)
            return page_data
        except requests.exceptions.RequestException as e:
            print(f"❌ Failed to crawl {url}: {e}")
            return None

    def crawl(self):
        """Start crawling"""
        print("="*60)
        print(f"  🕷️ Simple crawler, starting at: {self.base_url}")
        print("="*60)
        to_visit = [self.base_url]
        while to_visit and len(self.visited) < self.max_pages:
            url = to_visit.pop(0)
            page_data = self.crawl_page(url)
            if page_data:
                # Queue up any newly discovered links
                for link in page_data["links"]:
                    if link not in self.visited and link not in to_visit:
                        to_visit.append(link)
            # Pause between requests so we don't hammer the server
            time.sleep(1)
        print("\n" + "="*60)
        print(f"  ✅ Done! Crawled {len(self.results)} pages")
        print("="*60)

    def print_results(self):
        """Print the results"""
        print("\n📊 Crawl results:")
        print("="*80)
        for i, result in enumerate(self.results, 1):
            print(f"\n{i}. {result['title']}")
            print(f"   URL: {result['url']}")
            print(f"   Crawled at: {result['crawled_at']}")
            print(f"   Links found: {len(result['links'])}")
            if result['content']:
                print(f"   Content preview: {result['content'][:100]}...")
        print("\n" + "="*80)

# Use the crawler
if __name__ == "__main__":
    # Note: only crawl sites that allow it
    # We use a harmless test site here
    crawler = SimpleCrawler("https://example.com", max_pages=3)
    crawler.crawl()
    crawler.print_results()
Today we learned:
✅ HTTP basics: methods, status codes, requests and responses
✅ The requests library: Python's most popular HTTP library
✅ GET requests: fetching resources, with query parameters
✅ POST requests: submitting data, as forms and as JSON
✅ Headers and authentication: custom request headers and basic auth
✅ Error handling: exceptions and timeouts
✅ File upload and download: moving files over HTTP
✅ Session persistence: the Session object
✅ Hands-on projects: a weather fetcher, a GitHub browser, and a simple crawler
Next lesson preview: a hands-on project that pulls together everything we've learned!
Congratulations, you've completed all 11 lessons! 🎉 The final lesson is the capstone project!