Redis 为什么要引入 Pipeline机制?

[复制链接]
发表于 2025-11-10 11:59:47 | 显示全部楼层 |阅读模式
在 Redis 中有一种 Pipeline(管道)机制,其目标是进步数据传输服从和吞吐量。那么,Pipeline是怎样工作的?它又是怎样进步性能的?Pipeline有什么优缺点?我们该怎样使用 Pipeline?
1、Redis Pipeline是什么?

Redis Pipeline 是一种批量实行下令的技能,答应客户端在不等待服务器相应的环境下,一次性发送多个下令到 Redis 服务器。传统的哀求-相应模式中,客户端每发送一个下令,就须要等待服务器相应后才气发送下一个下令,这种模式在高延伸网络环境下,严肃影响 Redis 的性能体现。
Pipeline 通过消除或镌汰网络往返次数(Round-Trip Time, RTT),可以大概显着进步下令实行的吞吐量,客户端可以将多个下令打包发送,服务器则依次实行这些下令并将效果返回给客户端,从而有效地提升了网络使用率和团体性能
2、为什么引入 Pipeline?
在相识 Redis为什么引入 Pipeline之前,我们先来相识传统哀求-相应模式,在传统的哀求-相应模式中,客户端与服务器之间的通讯流程如下:

  • 客户端发送一个下令到服务器。
  • 服务器吸取下令并实行。
  • 服务器将实行效果返回给客户端。
  • 客户端吸取效果后,发送下一个下令。

在这种传统的模式下,每个下令都须要履历完备的 RTT,这在高延伸网络环境下会导致显着的性能瓶颈。
而 Pipeline的核心头脑是“下令打包,高效传输”。其工作流程可以总结成下面 5个步调:

  • 打包下令: 客户端将多个 Redis 下令按照特定的格式打包成一个哀求包。
  • 发送下令: 将打包好的哀求一次性发送给 Redis 服务器。
  • 实行下令: Redis 服务器按次序实行吸取到的全部下令。
  • 吸取相应: 服务器将全部下令的实行效果按次序返回给客户端。
  • 分析相应: 客户端分析吸取到的相应,并将效果对应到各个下令。

这种方式通过镌汰网络往返次数,有效低就逮络延伸对性能的影响,特殊适当于须要实行大量 Redis 下令的高并发场景。
只管 Pipeline带来了性能的提升,但它也有一些缺点:

  • 资源斲丧: 发送大量下令一次性实行,大概会斲丧较多的服务器资源,导致 Redis 其他操纵的相应时间增长。
  • 错误处置惩罚复杂: 在批量实行下令时,单个下令的错误处置惩罚大概变得复杂,须要逐一查抄每个下令的实行效果。
  • 次序依赖: 如果下令之间存在次序依赖,Pipeline 的批量实行须要确保精确的下令次序。
  • 不支持事件功能: Pipeline 只是批量实行下令的工具,不具备事件的原子性和隔离性。
  • 客户端支持: 差别的 Redis 客户端对 Pipeline 的支持程度差别,使用时需思量所选客户端库的特性和限定。
3、源码分析
在 Java中,常见的 Redis 客户端库有 Jedis 和 Lettuce两种,下面我们将分别分析这两个库实现 Pipeline功能
3.1 使用 Jedis 库
Jedis 是一个简单、直观的 Redis 客户端,支持 Pipeline 功能。下面的示例展示怎样使用 Jedis实现 Pipeline操纵。
  1. import redis.clients.jedis.Jedis;
  2. import redis.clients.jedis.Pipeline;
  3. import redis.clients.jedis.Response;
  4. import java.util.ArrayList;
  5. import java.util.List;
  6. publicclass JedisPipelineExample {
  7.     public static void main(String[] args) {
  8.         // Redis 连接参数
  9.         String redisHost = "localhost";
  10.         int redisPort = 6379;
  11.         String redisPassword = null; // 若有密码,填写密码
  12.         // 连接 Redis
  13.         try (Jedis jedis = new Jedis(redisHost, redisPort)) {
  14.             if (redisPassword != null && !redisPassword.isEmpty()) {
  15.                 jedis.auth(redisPassword);
  16.             }
  17.             // 批量设置键值对
  18.             batchSet(jedis);
  19.             // 批量获取键值对
  20.             batchGet(jedis);
  21.         } catch (Exception e) {
  22.             e.printStackTrace();
  23.         }
  24.     }
  25.     /**
  26.      * 使用 Pipeline 批量设置键值对
  27.      *
  28.      * @param jedis Jedis 实例
  29.      */
  30.     public static void batchSet(Jedis jedis) {
  31.         System.out.println("开始批量设置键值对...");
  32.         Pipeline pipeline = jedis.pipelined();
  33.         int numCommands = 1000;
  34.         for (int i = 0; i < numCommands; i++) {
  35.             pipeline.set("key-" + i, "value-" + i);
  36.         }
  37.         // 执行所有命令
  38.         pipeline.sync();
  39.         System.out.println("批量设置完成,共设置 " + numCommands + " 个键值对。");
  40.     }
  41.     /**
  42.      * 使用 Pipeline 批量获取键值对
  43.      */
  44.     public static void batchGet(Jedis jedis) {
  45.         System.out.println("开始批量获取键值对...");
  46.         Pipeline pipeline = jedis.pipelined();
  47.         int numCommands = 1000;
  48.         List<Response<String>> responses = new ArrayList<>(numCommands);
  49.         for (int i = 0; i < numCommands; i++) {
  50.             Response<String> response = pipeline.get("key-" + i);
  51.             responses.add(response);
  52.         }
  53.         // 执行所有命令
  54.         pipeline.sync();
  55.         // 处理结果
  56.         for (int i = 0; i < numCommands; i++) {
  57.             String value = responses.get(i).get();
  58.             System.out.println("key-" + i + " = " + value);
  59.         }
  60.         System.out.println("批量获取完成,共获取 " + numCommands + " 个键值对。");
  61.     }
  62. }
复制代码
代码分析
上面的代码告急总结为 4个步调:
1、毗连 Redis:
使用 Jedis 类毗连 Redis 服务器。如果 Redis 服务器设置了暗码,须要调用 jedis.auth 举行认证。
2、批量设置键值对:

  • 调用 jedis.pipelined() 获取一个 Pipeline 对象。
  • 使用循环将多个 set 下令添加到 Pipeline 中。
  • 调用 pipeline.sync() 发送全部下令并等待实行效果。
  • 通过 Pipeline 一次性提交全部下令,镌汰了网络往返次数。
3、批量获取键值对:

  • 同样使用 pipelines 获取 Pipeline 对象。
  • 使用 pipeline.get 方法批量添加 get 下令,并将 Response 对象生存到列表中。
  • 调用 pipeline.sync() 发送全部下令并等待实行效果。
  • 遍历 Response 对象列表,获取每个键的值。
4、关闭毗连:
使用 try-with-resources 语法自动关闭 Jedis 毗连,确保资源的精确开释。
3.2 使用 Lettuce 库
Lettuce 是一个基于 Netty 的可伸缩、多线程的 Redis 客户端,支持异步和反应式编程模子,同样支持 Pipeline 功能。
以下示例展示怎样使用 Lettuce 实现 Pipeline 操纵,包罗批量设置和获取键值对。
  1. import io.lettuce.core.RedisClient;
  2. import io.lettuce.core.RedisURI;
  3. import io.lettuce.core.api.StatefulRedisConnection;
  4. import io.lettuce.core.api.sync.RedisCommands;
  5. import io.lettuce.core.api.sync.SyncCommands;
  6. import io.lettuce.core.api.sync.RedisScriptingCommands;
  7. import io.lettuce.core.api.sync.RedisClusterCommands;
  8. import java.util.ArrayList;
  9. import java.util.List;
  10. publicclass LettucePipelineExample {
  11.     public static void main(String[] args) {
  12.         // Redis 连接参数
  13.         String redisHost = "localhost";
  14.         int redisPort = 6379;
  15.         String redisPassword = null; // 若有密码,填写密码
  16.         // 创建 RedisURI
  17.         RedisURI redisURI = RedisURI.Builder.redis(redisHost)
  18.                 .withPort(redisPort)
  19.                 .withPassword(redisPassword != null ? redisPassword.toCharArray() : null)
  20.                 .build();
  21.         // 创建 RedisClient
  22.         RedisClient redisClient = RedisClient.create(redisURI);
  23.         // 建立连接
  24.         try (StatefulRedisConnection<String, String> connection = redisClient.connect()) {
  25.             RedisCommands<String, String> syncCommands = connection.sync();
  26.             // 批量设置键值对
  27.             batchSet(syncCommands);
  28.             // 批量获取键值对
  29.             batchGet(syncCommands);
  30.         } catch (Exception e) {
  31.             e.printStackTrace();
  32.         } finally {
  33.             // 关闭客户端
  34.             redisClient.shutdown();
  35.         }
  36.     }
  37.     /**
  38.      * 使用 Lettuce 的 Pipeline 批量设置键值对
  39.      *
  40.      * @param syncCommands 同步命令接口
  41.      */
  42.     public static void batchSet(RedisCommands<String, String> syncCommands) {
  43.         System.out.println("开始批量设置键值对...");
  44.         int numCommands = 1000;
  45.         for (int i = 0; i < numCommands; i++) {
  46.             syncCommands.set("key-" + i, "value-" + i);
  47.         }
  48.         // 批量执行所有命令
  49.         syncCommands.getStatefulConnection().flushCommands();
  50.         System.out.println("批量设置完成,共设置 " + numCommands + " 个键值对。");
  51.     }
  52.     /**
  53.      * 使用 Lettuce 的 Pipeline 批量获取键值对
  54.      *
  55.      * @param syncCommands 同步命令接口
  56.      */
  57.     public static void batchGet(RedisCommands<String, String> syncCommands) {
  58.         System.out.println("开始批量获取键值对...");
  59.         int numCommands = 1000;
  60.         List<String> keys = new ArrayList<>(numCommands);
  61.         for (int i = 0; i < numCommands; i++) {
  62.             keys.add("key-" + i);
  63.         }
  64.         List<String> values = syncCommands.mget(keys.toArray(new String[0]))
  65.                 .stream()
  66.                 .map(res -> res.getValue())
  67.                 .toList();
  68.         for (int i = 0; i < numCommands; i++) {
  69.             System.out.println(keys.get(i) + " = " + values.get(i));
  70.         }
  71.         System.out.println("批量获取完成,共获取 " + numCommands + " 个键值对。");
  72.     }
  73. }
复制代码
代码分析
上面的代码告急总结为 4个步调:
1、毗连 Redis:
使用 RedisClient 创建毗连,RedisURI 封装了毗连参数。如果 Redis 服务器设置了暗码,须要在 RedisURI 中指定。
2、批量设置键值对:

  • 使用 syncCommands.set 方法批量添加 set 下令。
  • 调用 flushCommands() 方法将全部积累的下令一次性发送到服务器。
注:Lettuce 的 Pipeline 支持隐式的 Pipeline,即没有显式的 Pipeline API,通过积累下令并调用 flushCommands() 实现批量发送。
3、批量获取键值对:
使用 mget 方法一次性获取多个键的值,这是 Lettuce 提供的批量获取下令,天然支持 Pipeline。
mget 返回一个包罗每个键值的 List,通过流处置惩罚提取值。
4、关闭毗连:
使用 try-with-resources 语法自动关闭毗连,末了调用 redisClient.shutdown() 关闭 Redis 客户端。
只管 Lettuce 支持 Pipeline,但其 API 不如 Jedis 那样显式。要实现更细粒度的 Pipeline 控制,可以使用 Lettuce 的下令缓冲机制或异步 API。上述示例中展示的是同步方式,实用于简单的批量操纵。
4、使用场景

  • 批量设置键值对: 将大量键值对一次性写入 Redis,实用于数据初始化或大规模更新。
  • 批量获取键值对: 在须要同时获取多个键的值时,通过 Pipeline 镌汰哀求次数,进步服从。
  • 分布式计数器: 高并发环境下,使用 Pipeline 聚合多个计数操纵,提升吞吐量。
  • 缓存预热: 在应用启动或重启时,通过 Pipeline 将常用数据加载到缓存中,进步应用启动性能。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

×
登录参与点评抽奖,加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表