redis学习笔记之pipeline

时间:2022-06-08 15:33:07

redis是一个cs模式的tcp server,使用和http类似的请求响应协议。一个client可以通过一个socket连接发起多个请求命令。每个请求命令发出后client通常 会阻塞并等待redis服务处理,redis处理完后请求命令后会将结果通过响应报文返回给client。基本的通信过程如下
Client: INCR X
Server: 1
Client: INCR X
Server: 2
Client: INCR X
Server: 3
Client: INCR X
Server: 4
基 本上四个命令需要8个tcp报文才能完成。由于通信会有网络延迟,假如从client和server之间的包传输时间需要0.125秒。那么上面的四个命 令8个报文至少会需要1秒才能完成。这样即使redis每秒能处理100个命令,而我们的client也只能一秒钟发出四个命令。这显示没有充分利用 redis的处理能力。除了可以利用mget,mset 之类的单条命令处理多个key的命令外
我们还可以利用pipeline的方式从client打包多条命令一起发出,不需要等待单条命令的响应返回,而redis服务端会处理完多条命令后会将多条命令的处理结果打包到一起返回给客户端。通信过程如下

Client: INCR X
Client: INCR X
Client: INCR X
Client: INCR X
Server: 1
Server: 2
Server: 3
Server: 4

假 设不会因为tcp 报文过长而被拆分。可能两个tcp报文就能完成四条命令,client可以将四个incr命令放到一个tcp报文一起发送,server则可以将四条命令 的处理结果放到一个tcp报文返回。通过pipeline方式当有大批量的操作时候。我们可以节省很多原来浪费在网络延迟的时间。需要注意到是用 pipeline方式打包命令发送,redis必须在处理完所有命令前先缓存起所有命令的处理结果。打包的命令越多,缓存消耗内存也越多。所以并是不是打 包的命令越多越好。具体多少合适需要根据具体情况测试。下面是个jredis客户端使用pipeline的测试
package jredisStudy;
import org.jredis.JRedis;
import org.jredis.connector.ConnectionSpec;
import org.jredis.ri.alphazero.JRedisClient;
import org.jredis.ri.alphazero.JRedisPipelineService;
import org.jredis.ri.alphazero.connection.DefaultConnectionSpec;
public class PipeLineTest {
    public static void main(String[] args) {
          long start = System.currentTimeMillis();
          usePipeline();
          long end = System.currentTimeMillis();
          System.out.println(end-start);

start =  System.currentTimeMillis();
          withoutPipeline();
          end  = System.currentTimeMillis();
          System.out.println(end-start);
  
    }
    
    private static void withoutPipeline()
    {
         try { 
             JRedis  jredis = new JRedisClient("192.168.56.55",6379);
                for(int i =0 ; i < 100000 ; i++)
                {
                    jredis.incr("test2");
                }
                jredis.quit();
        } catch (Exception e) {
        }
    }

private static void usePipeline() {
        try {
            ConnectionSpec spec = DefaultConnectionSpec.newSpec("192.168.56.55", 6379, 0, null);
            JRedis jredis = new JRedisPipelineService(spec);
            for(int i =0 ; i < 100000 ; i++)
            {
                jredis.incr("test2");
            }
            jredis.quit();
        } catch (Exception e) {
        }
    }
}
输出
103408 //使用了pipeline
104598 //没有使用

测试结果不是很明显,这应该是跟我的测试环境有关。我是在自己win连接虚拟机的linux。网络延迟比较小。所以pipeline
优势不明显。如果网络延迟小的话,最好还是不用pipeline。除了增加复杂外,带来的性能提升不明显。

=================

一般情况下,Redis Client端发出一个请求后,通常会阻塞并等待Redis服务端处理,Redis服务端处理完后请求命令后会将结果通过响应报文返回给Client。
这有点类似于HBase的Scan,通常是Client端获取每一条记录都是一次RPC调用服务端。
在Redis中,有没有类似HBase Scanner Caching的东西呢,一次请求,返回多条记录呢?
有,这就是Pipline。官方介绍 http://redis.io/topics/pipelining

通过pipeline方式当有大批量的操作时候,我们可以节省很多原来浪费在网络延迟的时间,需要注意到是用pipeline方式打包命令发送,redis必须在处理完所有命令前先缓存起所有命令的处理结果。打包的命令越多,缓存消耗内存也越多。所以并不是打包的命令越多越好。

使用Pipeline在对Redis批量读写的时候,性能上有非常大的提升。

使用Java测试了一下:

  1. package com.lxw1234.redis;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import java.util.Set;
  5. import redis.clients.jedis.Jedis;
  6. import redis.clients.jedis.Pipeline;
  7. import redis.clients.jedis.Response;
  8. public class Test {
  9. public static void main(String[] args) throws Exception {
  10. Jedis redis = new Jedis("127.0.0.1", 6379, 400000);
  11. Map<String,String> data = new HashMap<String,String>();
  12. redis.select(8);
  13. redis.flushDB();
  14. //hmset
  15. long start = System.currentTimeMillis();
  16. //直接hmset
  17. for (int i=0;i<10000;i++) {
  18. data.clear();
  19. data.put("k_" + i, "v_" + i);
  20. redis.hmset("key_" + i, data);
  21. }
  22. long end = System.currentTimeMillis();
  23. System.out.println("dbsize:[" + redis.dbSize() + "] .. ");
  24. System.out.println("hmset without pipeline used [" + (end - start) / 1000 + "] seconds ..");
  25. redis.select(8);
  26. redis.flushDB();
  27. //使用pipeline hmset
  28. Pipeline p = redis.pipelined();
  29. start = System.currentTimeMillis();
  30. for (int i=0;i<10000;i++) {
  31. data.clear();
  32. data.put("k_" + i, "v_" + i);
  33. p.hmset("key_" + i, data);
  34. }
  35. p.sync();
  36. end = System.currentTimeMillis();
  37. System.out.println("dbsize:[" + redis.dbSize() + "] .. ");
  38. System.out.println("hmset with pipeline used [" + (end - start) / 1000 + "] seconds ..");
  39. //hmget
  40. Set keys = redis.keys("*");
  41. //直接使用Jedis hgetall
  42. start = System.currentTimeMillis();
  43. Map<String,Map<String,String>> result = new HashMap<String,Map<String,String>>();
  44. for(String key : keys) {
  45. result.put(key, redis.hgetAll(key));
  46. }
  47. end = System.currentTimeMillis();
  48. System.out.println("result size:[" + result.size() + "] ..");
  49. System.out.println("hgetAll without pipeline used [" + (end - start) / 1000 + "] seconds ..");
  50. //使用pipeline hgetall
  51. Map<String,Response<Map<String,String>>> responses = new HashMap<String,Response<Map<String,String>>>(keys.size());
  52. result.clear();
  53. start = System.currentTimeMillis();
  54. for(String key : keys) {
  55. responses.put(key, p.hgetAll(key));
  56. }
  57. p.sync();
  58. for(String k : responses.keySet()) {
  59. result.put(k, responses.get(k).get());
  60. }
  61. end = System.currentTimeMillis();
  62. System.out.println("result size:[" + result.size() + "] ..");
  63. System.out.println("hgetAll with pipeline used [" + (end - start) / 1000 + "] seconds ..");
  64. redis.disconnect();
  65. }
  66. }
    1. dbsize:[10000] ..
    2. hmset without pipeline used [243] seconds ..
    3. dbsize:[10000] ..
    4. hmset with pipeline used [0] seconds ..
    5. result size:[10000] ..
    6. hgetAll without pipeline used [243] seconds ..
    7. result size:[10000] ..
    8. hgetAll with pipeline used [0] seconds ..

    使用pipeline来批量读写10000条记录,就是小菜一碟,秒完。