Redis 凭借其高性能和丰富的数据类型,在缓存、会话存储和消息队列等场景中应用广泛。通过 Python 的 redis-py 库,我们可以方便地操作 Redis。下面为你梳理关键要点和用法。
📦 安装与连接
首先需要安装 redis-py 库,并确保 Redis 服务器已运行。
| |
在 Python 中建立连接时,推荐使用连接池以提升性能。
| |
参数说明:
decode_responses=True:自动将返回的字节数据解码为字符串。max_connections=20:设置连接池的最大连接数。
🔧 核心数据类型操作
Redis 支持多种数据结构,下表汇总了它们的常见操作和典型应用场景,方便你快速查阅。
| 数据结构 | 典型应用场景 | 关键写入命令示例 | 关键读取命令示例 |
|---|---|---|---|
| String(字符串) | 缓存会话、计数器、分布式锁 | set(key, value)setex(key, 秒数, value)mset({k1:v1, k2:v2})incr(key) | get(key)mget([key1, key2]) |
| Hash(哈希) | 存储用户信息、商品详情等对象 | hset(name, key, value)hmset(name, {k1:v1, k2:v2}) | hget(name, key)hgetall(name)hkeys(name) |
| List(列表) | 消息队列、最新列表 | lpush(key, *values)rpush(key, *values) | lrange(key, 0, -1)lpop(key)rpop(key) |
| Set(集合) | 标签系统、共同好友 | sadd(key, *members) | smembers(key)sismember(key, member)sinter(key1, key2) |
| Sorted Set(有序集合) | 排行榜、优先级队列 | zadd(key, {member1:score1, member2:score2}) | zrange(key, 0, -1, withscores=True)zrevrange(...) |
以下是针对上述数据结构的常用 Python 代码示例。
1. String(字符串)
| |
2. Hash(哈希)
| |
3. List(列表)
| |
4. Set(集合)
| |
5. Sorted Set(有序集合)
| |
⚙️ 高级功能
事务处理 使用
pipeline确保多个命令的原子性执行。1 2 3 4 5 6 7 8 9try: with client.pipeline() as pipe: pipe.multi() # 开启事务 pipe.set('key1', 'val1') pipe.hset('user:1002', 'name', 'wangwu') results = pipe.execute() # 执行事务 print(f"事务执行成功: {results}") except redis.RedisError as e: print(f"事务执行失败: {e}")发布订阅 (Pub/Sub) 可用于实现简单的消息队列。
1 2 3 4 5 6 7 8 9# 发布者 client.publish('news_channel', 'Hello, World!') # 订阅者 (需要在另一个线程或进程中运行) pubsub = client.pubsub() pubsub.subscribe('news_channel') for message in pubsub.listen(): if message['type'] == 'message': print(f"收到消息: {message['data']}")过期时间 可以设置键的生存时间,到期后自动删除。
1 2client.setex('temp_data', 60, '临时数据') # 60秒后过期 ttl = client.ttl('temp_data') # 查看剩余生存时间
💡 性能优化与最佳实践
- 使用连接池:避免频繁建立和关闭连接的开销。
- 批量操作:优先使用
mset、mget等批量命令,或使用pipeline打包多个命令,减少网络往返次数。 - 选择合适的数据类型:根据场景选择最有效的数据结构。例如,存储对象使用 Hash 通常比多个 String 更节省内存。
- 异常处理:重要的 Redis 操作应放在
try-except块中。1 2 3 4 5 6 7 8 9 10from redis.exceptions import ConnectionError, TimeoutError try: client.set('key', 'value') except ConnectionError as e: print(f"Redis连接错误: {e}") except TimeoutError as e: print(f"Redis请求超时: {e}") except Exception as e: print(f"其他错误: {e}")
💎 简单应用示例
缓存函数结果
1 2 3 4 5 6 7 8 9 10def get_expensive_data(key): # 先尝试从Redis缓存获取 cached_data = client.get(key) if cached_data: return cached_data else: # 如果缓存中没有,则从数据库或其他耗时操作中获取 data = ... # 这里是耗时的数据获取逻辑 client.setex(key, 3600, data) # 存入缓存,设置1小时过期 return data简单分布式锁
1 2 3 4 5 6 7 8 9 10 11# 获取锁,设置过期时间防止死锁 lock_acquired = client.set('my_lock', 'locked', nx=True, ex=10) if lock_acquired: try: # 执行需要加锁的临界区代码 print("锁已获取,执行关键操作...") finally: # 释放锁 client.delete('my_lock') else: print("获取锁失败,其他进程正在操作。")
希望这份详细的总结能帮助你在 Python 项目中高效地使用 Redis。如果你有特定的应用场景想深入了解,我们可以继续探讨。
💬 评论