
Python Tutorial - Lesson 11: Network Programming, a Bridge to the World!

Welcome back! 🎉 In this lesson we'll learn network programming: how to make HTTP requests and call APIs with Python, so your programs can connect to the internet!


A Quick Review of the First Ten Lessons

In the first ten lessons we learned:

- Variables and basic data types
- Conditionals and loops
- Lists and dictionaries
- Functions (the magicians of code)
- File operations (data persistence)
- Modules and packages (code organization)
- Object-oriented programming (OOP)
- Exceptions and error handling
- Regular expressions
- Date and time handling

In this lesson we'll learn how to interact with the network!


Topic 1: HTTP Basics

What Is HTTP?

HTTP (HyperText Transfer Protocol) is the set of rules that governs communication between a client and a server: the client sends a request (a method, a URL, headers, and an optional body), and the server replies with a response (a status code, headers, and a body).
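To make that request/response cycle concrete, here is a minimal sketch using only Python's built-in http.client module (httpbin.org is a public testing service; the requests library we meet below wraps all of this for you):

import http.client

# Open a connection, send one GET request, and read the response
conn = http.client.HTTPSConnection("httpbin.org", timeout=10)
conn.request("GET", "/get")          # request line: method + path
resp = conn.getresponse()            # status line, headers, body
print(resp.status, resp.reason)      # e.g. 200 OK
print(resp.read(200))                # first 200 bytes of the body
conn.close()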

HTTP Methods

Method   Description                 Typical Use
GET      Retrieve a resource         Query data
POST     Create a resource           Submit data
PUT      Replace a resource          Full update
PATCH    Update part of a resource   Partial update
DELETE   Delete a resource           Delete data
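Each row in the table maps to a requests function of the same name. A quick sketch against httpbin.org, a public echo service used throughout this lesson:

import requests

base = "https://httpbin.org"
requests.get(f"{base}/get")                       # GET: query data
requests.post(f"{base}/post", data={"k": "v"})    # POST: submit data
requests.put(f"{base}/put", data={"k": "v"})      # PUT: full update
requests.patch(f"{base}/patch", data={"k": "v"})  # PATCH: partial update
requests.delete(f"{base}/delete")                 # DELETE: delete data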

HTTP Status Codes

Status Code   Meaning
200           OK, success
201           Created, resource created successfully
400           Bad Request, the request was malformed
401           Unauthorized, authentication required
404           Not Found, the resource does not exist
500           Internal Server Error, something failed on the server
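httpbin.org can also return any status code on demand, which makes it easy to experiment with the table above (a small sketch):

import requests

# /status/<code> echoes back the status code you request
for code in (200, 201, 404, 500):
    response = requests.get(f"https://httpbin.org/status/{code}")
    # response.ok is True for any status code below 400
    print(response.status_code, "->", "success" if response.ok else "error")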

Topic 2: The requests Library

Installing requests

pip install requests

Importing requests

import requests

Topic 3: GET Requests

A Basic GET Request

import requests

# Send a GET request
response = requests.get("https://api.github.com")

# Check the response status
print("Status code:", response.status_code)
print("Success:", response.ok)

# Inspect the response headers
print("\nResponse headers:")
for key, value in response.headers.items():
    print(f"  {key}: {value}")

# Get the response body
print("\nResponse body (text):")
print(response.text[:500])  # Show only the first 500 characters

GET Requests with Parameters

import requests

# Option 1: build the query string into the URL
response = requests.get("https://httpbin.org/get?name=小明&age=18")

# Option 2: pass a params dict (recommended)
params = {
    "name": "小明",
    "age": 18,
    "city": "北京"
}
response = requests.get("https://httpbin.org/get", params=params)

print("完整URL:", response.url)
print("\n响应:")
print(response.json())

Parsing a JSON Response

import requests

# Call the GitHub API
response = requests.get("https://api.github.com/users/octocat")

# Check whether the request succeeded
if response.status_code == 200:
    data = response.json()  # Parse the JSON body

    print("用户名:", data['login'])
    print("姓名:", data['name'])
    print("公司:", data['company'])
    print("位置:", data['location'])
    print("博客:", data['blog'])
    print("公开仓库数:", data['public_repos'])
    print("关注者:", data['followers'])
    print("关注中:", data['following'])
else:
    print(f"请求失败,状态码: {response.status_code}")

Topic 4: POST Requests

Sending Form Data

import requests

# Form data
data = {
    "username": "admin",
    "password": "secret123"
}

response = requests.post("https://httpbin.org/post", data=data)

print("状态码:", response.status_code)
print("\n响应:")
print(response.json())

Sending JSON Data

import requests

# JSON data
data = {
    "name": "小明",
    "age": 18,
    "hobbies": ["读书", "游泳", "编程"]
}

response = requests.post("https://httpbin.org/post", json=data)

print("状态码:", response.status_code)
print("\n响应:")
print(response.json())
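What's the difference between data= and json=? With data=, requests sends the payload form-encoded (Content-Type: application/x-www-form-urlencoded); with json=, it serializes the dict to JSON and sets Content-Type: application/json for you. A small sketch to see this on the request that was actually sent:

import requests

r1 = requests.post("https://httpbin.org/post", data={"a": 1})
r2 = requests.post("https://httpbin.org/post", json={"a": 1})

# response.request records the prepared request that went over the wire
print(r1.request.headers["Content-Type"])  # application/x-www-form-urlencoded
print(r2.request.headers["Content-Type"])  # application/json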

Topic 5: Request Headers and Authentication

Custom Request Headers

import requests

headers = {
    "User-Agent": "My-Python-App/1.0",
    "Accept": "application/json",
    "Authorization": "Bearer YOUR_TOKEN_HERE"
}

response = requests.get("https://httpbin.org/headers", headers=headers)

print("发送的请求头:")
print(response.json())

Basic Authentication

import requests
from requests.auth import HTTPBasicAuth

# Option 1: a (username, password) tuple
response = requests.get(
    "https://httpbin.org/basic-auth/user/pass",
    auth=("user", "pass")
)

# Option 2: an explicit HTTPBasicAuth object
response = requests.get(
    "https://httpbin.org/basic-auth/user/pass",
    auth=HTTPBasicAuth("user", "pass")
)

print("状态码:", response.status_code)
print("响应:", response.json())

Topic 6: Error Handling and Timeouts

Exception Handling

import requests

try:
    response = requests.get("https://api.github.com", timeout=5)
    response.raise_for_status()  # Raises an HTTPError for 4xx/5xx status codes

    print("请求成功!")
    print(response.json())

except requests.exceptions.ConnectionError:
    print("❌ Connection error: could not reach the server")
except requests.exceptions.Timeout:
    print("❌ Request timed out")
except requests.exceptions.HTTPError as e:
    print(f"❌ HTTP error: {e}")
except requests.exceptions.RequestException as e:
    print(f"❌ Request exception: {e}")

Setting Timeouts

import requests

# Set separate connect and read timeouts
try:
    response = requests.get(
        "https://api.github.com",
        timeout=(3, 10)  # (connect timeout, read timeout)
    )
    print("请求成功")
except requests.exceptions.Timeout:
    print("请求超时")

Topic 7: File Upload and Download

Uploading a File

import requests

# Upload a file; the with block ensures the file handle gets closed
with open("test.txt", "rb") as f:
    files = {"file": f}
    response = requests.post("https://httpbin.org/post", files=files)

print("Status code:", response.status_code)

Downloading a File

import requests

# Download a small file
url = "https://httpbin.org/image/png"
response = requests.get(url)

with open("image.png", "wb") as f:
    f.write(response.content)

print("文件下载完成!")

# Download a large file (streaming)
url = "https://example.com/large_file.zip"
response = requests.get(url, stream=True)

with open("large_file.zip", "wb") as f:
    for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)

print("大文件下载完成!")

Topic 8: Sessions

Using a Session to Persist State Across Requests

import requests

# Create a Session
session = requests.Session()

# Set headers at the Session level
session.headers.update({
    "User-Agent": "My-Python-App/1.0"
})

# Requests sent through the Session share cookies
response1 = session.get("https://httpbin.org/cookies/set?name=value")
response2 = session.get("https://httpbin.org/cookies")

print("Cookie:", response2.json())

# Close the Session
session.close()
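Session also works as a context manager, which closes it automatically even if an exception occurs. The same example, rewritten with a with block:

import requests

# Equivalent to the example above; no explicit close() needed
with requests.Session() as session:
    session.headers.update({"User-Agent": "My-Python-App/1.0"})
    session.get("https://httpbin.org/cookies/set?name=value")
    print("Cookie:", session.get("https://httpbin.org/cookies").json())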

Exercise 1: Weather Fetcher

Let's build a weather fetcher!

import requests

class WeatherFetcher:
    def __init__(self):
        # Use the free wttr.in weather API
        self.base_url = "https://wttr.in"

    def get_weather(self, city, format="text"):
        """获取天气"""
        try:
            if format == "json":
                url = f"{self.base_url}/{city}?format=j1"
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                return response.json()
            else:
                url = f"{self.base_url}/{city}?m"  # m=公制单位
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                return response.text

        except requests.exceptions.RequestException as e:
            return f"获取天气失败: {e}"

    def print_weather(self, city):
        """打印天气"""
        print(f"\n🌤️  {city} 的天气")
        print("="*50)

        weather_text = self.get_weather(city)
        print(weather_text)
        print("="*50)

    def get_detailed_weather(self, city):
        """获取详细天气(JSON格式)"""
        data = self.get_weather(city, format="json")

        if isinstance(data, dict) and "current_condition" in data:
            current = data["current_condition"][0]
            print(f"\n📊 {city} 详细天气")
            print("="*50)
            print(f"温度: {current['temp_C']}°C")
            print(f"体感温度: {current['FeelsLikeC']}°C")
            print(f"天气: {current['weatherDesc'][0]['value']}")
            print(f"湿度: {current['humidity']}%")
            print(f"风速: {current['windspeedKmph']} km/h")
            print(f"能见度: {current['visibility']} km")
            print("="*50)
        else:
            print("无法获取详细天气信息")

# Use the weather fetcher
if __name__ == "__main__":
    print("🌤️  Weather Fetcher")
    print("="*50)

    weather = WeatherFetcher()

    # Look up the weather for a few cities
    cities = ["Beijing", "Shanghai", "Guangzhou", "Shenzhen"]

    for city in cities:
        weather.print_weather(city)
        print("\n" + "="*50 + "\n")

    # Show detailed weather for one city
    weather.get_detailed_weather("Beijing")

Exercise 2: GitHub Repository Browser

Let's build a GitHub repository browser!

import requests

class GitHubBrowser:
    def __init__(self, token=None):
        self.base_url = "https://api.github.com"
        self.headers = {}
        if token:
            self.headers["Authorization"] = f"token {token}"

    def get_user(self, username):
        """获取用户信息"""
        try:
            url = f"{self.base_url}/users/{username}"
            response = requests.get(url, headers=self.headers, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"请求失败: {e}")
            return None

    def get_user_repos(self, username, sort="updated", per_page=30):
        """获取用户的仓库"""
        try:
            url = f"{self.base_url}/users/{username}/repos"
            params = {
                "sort": sort,
                "per_page": per_page
            }
            response = requests.get(url, headers=self.headers, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"请求失败: {e}")
            return []

    def get_repo(self, owner, repo):
        """获取仓库信息"""
        try:
            url = f"{self.base_url}/repos/{owner}/{repo}"
            response = requests.get(url, headers=self.headers, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"请求失败: {e}")
            return None

    def print_user_profile(self, username):
        """打印用户资料"""
        user = self.get_user(username)
        if not user:
            return

        print("\n" + "="*60)
        print(f"  👤 {user['name'] or user['login']}")
        print("="*60)
        print(f"  用户名: {user['login']}")
        print(f"  姓名: {user['name'] or '未设置'}")
        print(f"  公司: {user['company'] or '未设置'}")
        print(f"  位置: {user['location'] or '未设置'}")
        print(f"  博客: {user['blog'] or '未设置'}")
        print(f"  简介: {user['bio'] or '未设置'}")
        print(f"\n  🏷️  统计:")
        print(f"    公开仓库: {user['public_repos']}")
        print(f"    公开Gist: {user['public_gists']}")
        print(f"    关注者: {user['followers']}")
        print(f"    关注中: {user['following']}")
        print(f"  📅  加入时间: {user['created_at'][:10]}")
        print("="*60)

    def print_user_repos(self, username, limit=10):
        """打印用户的仓库"""
        repos = self.get_user_repos(username)
        if not repos:
            return

        print(f"\n📦 {username} 的仓库(最近更新的前{limit}个):")
        print("="*80)

        for i, repo in enumerate(repos[:limit], 1):
            print(f"\n{i}. 📁 {repo['name']}")
            print(f"   描述: {repo['description'] or '无描述'}")
            print(f"   语言: {repo['language'] or '未知'}")
            print(f"   ⭐ Star: {repo['stargazers_count']}")
            print(f"   🍴 Fork: {repo['forks_count']}")
            print(f"   👁️  Watch: {repo['watchers_count']}")
            print(f"   🔗 {repo['html_url']}")
            print(f"   📅 更新: {repo['updated_at'][:10]}")

        print("\n" + "="*80)

# Use the GitHub browser
if __name__ == "__main__":
    print("="*60)
    print("  🐙 GitHub仓库浏览器")
    print("="*60)

    browser = GitHubBrowser()

    # Look up a user
    username = "octocat"
    browser.print_user_profile(username)
    browser.print_user_repos(username, limit=5)

    # Look up another user
    print("\n" + "="*60)
    print("  Looking up another user...")

    username2 = "python"
    browser.print_user_profile(username2)
    browser.print_user_repos(username2, limit=5)

Exercise 3: A Simple Web Crawler

Let's build a simple web crawler! This one needs BeautifulSoup, which you can install with pip install beautifulsoup4.

import requests
from bs4 import BeautifulSoup
import time
from urllib.parse import urljoin, urlparse

class SimpleCrawler:
    def __init__(self, base_url, max_pages=10):
        self.base_url = base_url
        self.max_pages = max_pages
        self.visited = set()
        self.results = []

    def is_valid_url(self, url):
        """检查URL是否有效"""
        parsed = urlparse(url)
        return bool(parsed.netloc) and bool(parsed.scheme)

    def get_links(self, soup, current_url):
        """提取页面中的链接"""
        links = set()

        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"]
            full_url = urljoin(current_url, href)

            if self.is_valid_url(full_url):
                # Keep only links on the same domain
                if urlparse(full_url).netloc == urlparse(self.base_url).netloc:
                    links.add(full_url)

        return links

    def crawl_page(self, url):
        """爬取单个页面"""
        if url in self.visited:
            return None

        if len(self.visited) >= self.max_pages:
            return None

        try:
            print(f"🔍 正在爬取: {url}")

            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }

            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()

            self.visited.add(url)

            soup = BeautifulSoup(response.text, "html.parser")

            # Extract the title
            title = soup.title.string if soup.title else "Untitled"

            # Extract the body text (simplified)
            paragraphs = soup.find_all("p")
            content = "\n".join([p.get_text() for p in paragraphs[:5]])

            # Extract the links
            links = self.get_links(soup, url)

            page_data = {
                "url": url,
                "title": title,
                "content": content,
                "links": list(links),
                "crawled_at": time.strftime("%Y-%m-%d %H:%M:%S")
            }

            self.results.append(page_data)

            return page_data

        except requests.exceptions.RequestException as e:
            print(f"❌ 爬取失败 {url}: {e}")
            return None

    def crawl(self):
        """开始爬取"""
        print("="*60)
        print(f"  🕷️  简易爬虫 - 开始爬取: {self.base_url}")
        print("="*60)

        to_visit = [self.base_url]

        while to_visit and len(self.visited) < self.max_pages:
            url = to_visit.pop(0)

            page_data = self.crawl_page(url)

            if page_data:
                # Add new links to the queue of pages to visit
                for link in page_data["links"]:
                    if link not in self.visited and link not in to_visit:
                        to_visit.append(link)

            # Pause between requests to avoid hammering the server
            time.sleep(1)

        print("\n" + "="*60)
        print(f"  ✅ 爬取完成!共爬取 {len(self.results)} 个页面")
        print("="*60)

    def print_results(self):
        """打印结果"""
        print("\n📊 爬取结果:")
        print("="*80)

        for i, result in enumerate(self.results, 1):
            print(f"\n{i}. {result['title']}")
            print(f"   URL: {result['url']}")
            print(f"   时间: {result['crawled_at']}")
            print(f"   链接数: {len(result['links'])}")
            if result['content']:
                print(f"   内容预览: {result['content'][:100]}...")

        print("\n" + "="*80)

# Use the crawler
if __name__ == "__main__":
    # Note: only crawl sites that permit it (check the site's robots.txt)
    # We use a harmless test site here
    crawler = SimpleCrawler("https://example.com", max_pages=3)
    crawler.crawl()
    crawler.print_results()
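A small design note: crawl() uses to_visit.pop(0), which makes the crawl breadth-first. For larger crawls, a collections.deque with popleft() is the more efficient choice, since pop(0) on a list has to shift every remaining element.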

Lesson 11 Recap

Today we learned:

- HTTP basics: methods, status codes, requests and responses
- The requests library: Python's most popular HTTP library
- GET requests: retrieving resources, with and without parameters
- POST requests: submitting data as forms and as JSON
- Headers and authentication: custom request headers, basic auth
- Error handling: exceptions and timeouts
- File upload and download: working with files
- Sessions: the Session object
- Hands-on projects: a weather fetcher, a GitHub browser, and a simple crawler


Homework ✏️

  1. Write a program that calls a public API, fetches data, and displays it
  2. Write a program that downloads a web page and parses all the links in it
  3. Write a program that downloads a batch of images
  4. Write a simple API client that supports create, read, update, and delete
  5. Write a program that monitors a web page for changes and sends a notification

Coming up next: a hands-on capstone project that pulls together everything we've covered!

Congratulations! You've now completed all 11 lessons! 🎉 See you in the final project!