openresty 学习笔记三:连接redis和进行相关操作

时间:2021-09-01 16:40:07

openresty 学习笔记三:连接redis和进行相关操作

openresty 因其非阻塞的调用,令服务器拥有高性能高并发,当涉及到数据库操作时,更应该选择有高速读写速度的redis进行数据处理。避免其应为读写数据而造成瓶颈。

openresty 默认就带了redis的库,这里先梳理下其自带redis连接库的操作流程,再根据存在问题进行二次封装。

自带redis连接库的操作流程

首先是连接redis

local redis = require "resty.redis"
local redisCache = redis.new()
local ok ,err = redisCache.connect(redisCache,127.0.0.1,6379)
redisCache:set_timeout(1000) if not ok then
ngx.log(ngx.ERR, "failed to connect: ", err)
return
end

  

简单的reids操作

local code , err = redisCache:get("code")
local res , err = redisCache:incr("code")
local ok , err = redisCache:set("code",1)

  

然后操作后要进行close

ok , err = redisCache:close()

  

如果项目中有大量redis操作,使用这个标准库就变成会有大量的代码在重复创建连接-->数据操作-->关闭连接(或放到连接池),甚至还要考虑不同的 return 情况做不同处理。

redis 库的二次封装

针对以上问题,也通过学习openresty的最佳实践,直接用上其中已经做好的二次封装。这个封装是要实现一下目的

  1. new、connect 函数合体,使用时只负责申请,尽量少关心什么时候具体连接、释放;
  2. 默认 redis 数据库连接地址,但是允许自定义;
  3. 每次 redis 使用完毕,自动释放 redis 连接到连接池供其他请求复用;
  4. 要支持 redis 的重要优化手段 pipeline;

我只理解了前三点,最后的pipeline也还不是很懂,先贴上代码:

local redis_c = require "resty.redis"

local ok, new_tab = pcall(require, "table.new")
if not ok or type(new_tab) ~= "function" then
new_tab = function (narr, nrec) return {} end
end local _M = new_tab(0, 155)
_M._VERSION = '0.01' local commands = {
"append", "auth", "bgrewriteaof",
"bgsave", "bitcount", "bitop",
"blpop", "brpop",
"brpoplpush", "client", "config",
"dbsize",
"debug", "decr", "decrby",
"del", "discard", "dump",
"echo",
"eval", "exec", "exists",
"expire", "expireat", "flushall",
"flushdb", "get", "getbit",
"getrange", "getset", "hdel",
"hexists", "hget", "hgetall",
"hincrby", "hincrbyfloat", "hkeys",
"hlen",
"hmget", "hmset", "hscan",
"hset",
"hsetnx", "hvals", "incr",
"incrby", "incrbyfloat", "info",
"keys",
"lastsave", "lindex", "linsert",
"llen", "lpop", "lpush",
"lpushx", "lrange", "lrem",
"lset", "ltrim", "mget",
"migrate",
"monitor", "move", "mset",
"msetnx", "multi", "object",
"persist", "pexpire", "pexpireat",
"ping", "psetex", "psubscribe",
"pttl",
"publish", --[[ "punsubscribe", ]] "pubsub",
"quit",
"randomkey", "rename", "renamenx",
"restore",
"rpop", "rpoplpush", "rpush",
"rpushx", "sadd", "save",
"scan", "scard", "script",
"sdiff", "sdiffstore",
"select", "set", "setbit",
"setex", "setnx", "setrange",
"shutdown", "sinter", "sinterstore",
"sismember", "slaveof", "slowlog",
"smembers", "smove", "sort",
"spop", "srandmember", "srem",
"sscan",
"strlen", --[[ "subscribe", ]] "sunion",
"sunionstore", "sync", "time",
"ttl",
"type", --[[ "unsubscribe", ]] "unwatch",
"watch", "zadd", "zcard",
"zcount", "zincrby", "zinterstore",
"zrange", "zrangebyscore", "zrank",
"zrem", "zremrangebyrank", "zremrangebyscore",
"zrevrange", "zrevrangebyscore", "zrevrank",
"zscan",
"zscore", "zunionstore", "evalsha"
} local mt = { __index = _M } local function is_redis_null( res )
if type(res) == "table" then
for k,v in pairs(res) do
if v ~= ngx.null then
return false
end
end
return true
elseif res == ngx.null then
return true
elseif res == nil then
return true
end return false
end -- change connect address as you need
function _M.connect_mod( self, redis )
redis:set_timeout(self.timeout)
return redis:connect(self.db_host, self.db_port)
end function _M.set_keepalive_mod( redis )
-- put it into the connection pool of size 100, with 60 seconds max idle time
return redis:set_keepalive(60000, 1000)
end function _M.init_pipeline( self )
self._reqs = {}
end function _M.commit_pipeline( self )
local reqs = self._reqs if nil == reqs or 0 == #reqs then
return {}, "no pipeline"
else
self._reqs = nil
end local redis, err = redis_c:new()
if not redis then
return nil, err
end local ok, err = self:connect_mod(redis)
if not ok then
return {}, err
end redis:init_pipeline()
for _, vals in ipairs(reqs) do
local fun = redis[vals[1]]
table.remove(vals , 1) fun(redis, unpack(vals))
end local results, err = redis:commit_pipeline()
if not results or err then
return {}, err
end if is_redis_null(results) then
results = {}
ngx.log(ngx.WARN, "is null")
end
-- table.remove (results , 1) self.set_keepalive_mod(redis) for i,value in ipairs(results) do
if is_redis_null(value) then
results[i] = nil
end
end return results, err
end function _M.subscribe( self, channel )
local redis, err = redis_c:new()
if not redis then
return nil, err
end local ok, err = self:connect_mod(redis)
if not ok or err then
return nil, err
end local res, err = redis:subscribe(channel)
if not res then
return nil, err
end local function do_read_func ( do_read )
if do_read == nil or do_read == true then
res, err = redis:read_reply()
if not res then
return nil, err
end
return res
end redis:unsubscribe(channel)
self.set_keepalive_mod(redis)
return
end return do_read_func
end local function do_command(self, cmd, ... )
if self._reqs then
table.insert(self._reqs, {cmd, ...})
return
end local redis, err = redis_c:new()
if not redis then
return nil, err
end local ok, err = self:connect_mod(redis)
if not ok or err then
return nil, err
end local fun = redis[cmd]
local result, err = fun(redis, ...)
if not result or err then
-- ngx.log(ngx.ERR, "pipeline result:", result, " err:", err)
return nil, err
end if is_redis_null(result) then
result = nil
end self.set_keepalive_mod(redis) return result, err
end function _M.new(self, opts)
opts = opts or {}
local timeout = (opts.timeout and opts.timeout * 1000) or 1000
local db_index= opts.db_index or 0
local db_host= opts.host or '127.0.0.1'
local db_port= opts.port or 6379 for i = 1, #commands do
local cmd = commands[i]
_M[cmd] =
function (self, ...)
return do_command(self, cmd, ...)
end
end return setmetatable({
timeout = timeout,
db_index = db_index,
db_host = db_host,
db_port = db_port,
_reqs = nil }, mt)
end return _M

  

这个二次封装除了解决上面几个问题,其实还重写了两个方法,不过订阅和发布暂时没用到,所以还没去细看这一部分。

使用示例

_config.redisConfig = {
timeout = 1000,
host = '127.0.0.1',
port = 6379,
} local redis = require (ngx.var.SERVER_DIR .. ".common.myRedis")
local redisCache = redis:new(config.redisConfig) local accessDetails , err = redisCache:get("code")

  

甚至可以直接

local redisCache = redis:new()