redis 常用

Redis Cluster 使用 lua 问题:

EVAL 定义是: EVAL script numkeys key [key…] arg [arg…]
设计者期望所有操作的KEY 都是用 KEYS传进去的,EVAL执行前会检查一下所有的KEYS,根据这些KEYS找到把指令发送到那个节点
这里的节点对应 Cluster 就是 Slots,一共16384个Slots,规则是 SLOT = CRC16(KEY) mod 16384

`
16384 The reason is:
1. Normal heartbeat packets carry the full configuration of a node, that can be replaced in an idempotent way with the old in order to update an old config. This means they contain the slots configuration for a node, in raw form, that uses 2k of space with16k slots, but would use a prohibitive 8k of space using 65k slots.
2. At the same time it is unlikely that Redis Cluster would scale to more than 1000 mater nodes because of other design tradeoffs.
So 16k was in the right range to ensure enough slots per master with a max of 1000 maters, but a small enough number to propagate the slot configuration as a raw bitmap easily. Note that in small clusters the bitmap would be hard to compress because when N is small the bitmap would have slots/N bits set that is a large percentage of bits set.

From https://github.com/redis/redis/issues/2576
`

根据上面的规则,传入的KEYS如果不是同样的SLOT,就会报错  CROSSSLOT Keys in request don't hash to the same slot
如果不传 KEYS 只使用 ARGV,或者直接lua写死多个KEY的方式,比如:
redis.call('get', '123abc'); redis.call('get', '123def'); -- 这里没有做测试,认为 '123abc' 和 '123def' 是不同的SLOT
这时候会报错类似 ERR Error running script (call to f_8ead0f68893988e15c455c0b6c8ab9982e2e707c): @user_script:1: @user_script: 1: Lua script attempted to access a non local key in a cluster node
可以用 {} 括起来计算SLOT用的部分,保证 {} 括起来的部分是相同的,
Redis 集群的拓扑结构是是一个全连通的网络,每一个节点之间都会建立一个 Cluster Bus,所以集群的任何配置变动都会立即同步到各个节点,也就是说,每一个节点都知道哪些 Slot 对应哪个节点。
所以不论客户端连接到哪个节点进行执行指令,服务端都会正确的指示客户端应当重定向到哪一个节点来操作。
redis.call('get', '{123}abc'); redis.call('get', '{123}def'); -- SLOT 计算将会用 123 计算
但是这样 SLOT 所在节点压力就会变大,不均衡
原本这两个操作会分布在两个SLOT,但是现在都用一个SLOT执行,而且这样的做法业务不友好


unable to connect to RedisURI 报错:

1
2
3
4
5
# By default protected mode is enabled. You should disable it only if
# you are sure you want clients from other hosts to connect to Redis
# even if no authentication is configured, nor a specific set of interfaces
# are explicitly listed using the "bind" directive.
protected-mode no

Redis Cluster 批量删除(shell):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#!/bin/bash

if [ $# != 1 ]; then
echo "Usage: ./del_redis.sh pattern"
exit
fi;

ips=(192.168.1.1 192.168.1.2)
ports=(6379 6380 6381)

redis_cluster_ip=192.168.2.1
redis_cluster_port=6379

for ip in ${ips[@]}; do
for port in ${ports[@]}; do
echo "ip: ${ip} port: ${port}"
redis-cli -h $ip -p $port --scan --pattern "$1" | xargs -i redis-cli -h $ip -p $port del {}
done;
done;

redis_nodes=`redis-cli -h $redis_cluster_ip -p $redis_cluster_port -c cluster nodes | cut -d ' ' -f 2`
for node in $redis_nodes; do
node_redis_ip=`echo "$node" | awk -F ':' '{print $1}'`
node_redis_port=`echo "$node" | awk -F ':' '{print $2}'`
echo "------- redis-cluster ${node} ${node_redis_ip} ${node_redis_port} flushall -------"
redis-cli -h $node_redis_ip -p $node_redis_port -c flushall
done

排行榜(c++ & lua):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// redis zset score between -9007199254740992 and 9007199254740992
// ( MAX_SCORE & score ) << 32 | timeseconds
int64_t calc_score(int64_t base_value) {
int64_t time_curr = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
return ((MAX_SCORE & base_value) << 32) | (time_curr - SUB_TIME);
}

-- 前 count 名
"local count = KEYS[1];\n" +
"local revrange = redis.pcall('ZREVRANGE', '"+ RANK_REDIS_ZSET_PVP_KEY + "', 0, count - 1);\n" +
"local result = {};\n" +
"for _, v in ipairs(revrange) do\n" +
" local ones = redis.pcall('GET', '"+ RANK_REDIS_ONES_PVP_INFO + "'..v);\n" +
" table.insert(result, ones);\n" +
"end\n" +
"return result;\n"

-- 自己的排名
"local user = KEYS[1];\n" +
"local revrank = redis.pcall('ZREVRANK', '"+ RANK_REDIS_ZSET_PVP_KEY + "', user);\n" +
"local result = {};\n" +
"if (revrank ~= false) then\n" +
" local ones = redis.pcall('GET', '"+ RANK_REDIS_ONES_PVP_INFO + "'..user);\n" +
" table.insert(result, tostring(revrank + 1));\n" +
" table.insert(result, ones);\n" +
"else\n" +
" table.insert(result, tostring(-1));\n" +
" table.insert(result, '{}');\n" +
"end\n" +
"return result;\n";

定时任务(lua):

key: 任务数据,score:时间戳

1
2
3
4
5
local taskList = redis.pcall('ZRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'limit', ARGV[3], ARGV[4]);
for _, v in pairs(taskList) do
redis.call('ZADD',KEYS[1], ARGV[5], v) -- 更新为下一个触发时间点, 避免任务丢失
end;
return taskList

对于分布式系统,服务器间时间可能有些许差异,只关注超时触发的话可以使用redis拿到的时间

  • add task:
    1
    2
    3
    4
    5
    6
    7
    local timestamp = redis.call('TIME');
    local timeout_ms = tonumber(timestamp[1]) * 1000 + tonumber(ARGV[2]);
    if (ARGV[3] == nil) then
    return redis.call('ZADD', KEYS[1], timeout_ms, ARGV[1])
    else
    return redis.call('ZADD', KEYS[1], ARGV[3], timeout_ms, ARGV[1])
    end
  • consume task:
    1
    2
    3
    4
    5
    6
    7
    8
    local timestamp = redis.call('TIME');
    local curr_ms = tonumber(timestamp[1]) * 1000 + tonumber(timestamp[2]) / 1000;
    local next_ms = curr_ms + tonumber(ARGV[5]);
    local taskList = redis.pcall('ZRANGEBYSCORE', KEYS[1], ARGV[1], curr_ms, 'limit', ARGV[3], ARGV[4]);
    for k,v in pairs( taskList) do
    redis.call('ZADD', KEYS[1], next_ms, v)
    end;
    return taskList

分布式锁(java):

  • 基于redis单线程模型+lua原子的特性

  • 前置条件是redis的lock和unlock是在业务的同一个线程里面的

  • lock 任何情况下都会释放

  • 线程局部变量

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class IdThreadLocalHelper {
    private static ThreadLocal<String> idThreadLocal = new ThreadLocal();

    IdThreadLocalHelper() {
    }

    public static void put(final String value) {

    idThreadLocal.set(value);
    }

    public static String get() {

    return idThreadLocal.get();
    }
    }
  • 加锁

    1
    2
    3
    4
    5
    6
    7
    public boolean lock(String key, long expireSecs) {
    final String uuid = IdUtils.getUUid();
    IdThreadLocalHelper.put(uuid);

    String result = redisStandalone.set(key, uuid, "NX", "EX", expireSecs);
    return "OK".equalsIgnoreCase(result);
    }
  • 解锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Override
    public boolean unLock(String key) {

    final String requestId = IdThreadLocalHelper.get();
    if(requestId == null) {
    return false;
    }

    List<String> listKeys = new ArrayList<>();
    listKeys.add(key);
    String[] keys = listKeys.toArray(new String[0]);

    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    int result = redisStandalone.eval(script, ScriptOutputType.INTEGER, keys, requestId);
    return 1 == result;
    }
  • 应用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    final String lockKey = "test";
    boolean locked = lock(lockKey, 15);
    if (!locked) {
    // TODO: lock error
    return;
    }

    try {
    // TODO: logic
    } finally {
    lobbyServiceRedis.lobbyUnlock(lockKey);
    }

重度redis lua使用的一些基础功能(lua):

  • local 前置声明定义
    1
    2
    local KEY_XXX = 'test.lua.KEY_XXX';
    local KEY_YYY = 'test.lua.KEY_YYY';
  • final atomic 用于lua return 前更新版本号一类的东西
    • any : 具体要返回的内容
    • inc : 是否操作版本号变更(+1)
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    local atomicInc = ARGV;
    local lua_oper = {}
    lua_oper.final = function(any, inc)
    if inc == true then
    if #atomicInc == 1 then
    redis.call('INCR', KEY_ATOMIC_INC..atomicInc[1])
    end
    end
    return any;
    end
  • redis operator 一些常用的操作封装
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    ----------------------------------------------
    -- redis operator
    local redis_oper = {}
    redis_oper.zset_element = function(key, st, ed)
    local elescores = redis.call('ZRANGE', key, st, ed, 'WITHSCORES');
    local ele = {};
    local eleKey; local eleScore = 0;
    for index, val in pairs(elescores) do
    if index % 2 == 1 then eleKey = val;
    else eleScore = tonumber(val);
    end
    ele[eleKey] = eleScore;
    end
    return ele;
    end
    redis_oper.zset_insert = function(key, ele, score)
    return redis.call('ZADD', key, score, ele);
    end
    redis_oper.zset_rem = function(key, ele)
    return redis.call('ZREM', key, ele);
    end
    redis_oper.zset_card = function(key)
    return tonumber(redis.call('ZCARD', key));
    end
    redis_oper.set = function(key, key_plus, data)
    return redis.call('SET', key..key_plus, data);
    end
    redis_oper.get = function(key, key_plus)
    local res = redis.call('GET', key..key_plus);
    if (res == nil or (type(res) == 'boolean' and not res)) then
    return nil;
    end
    return res;
    end
    redis_oper.del = function(key, key_plus)
    return redis.call('DEL', key..key_plus);
    end
    redis_oper.exist = function(key, key_plus)
    return tonumber(redis.call('EXISTS', key..key_plus));
    end
  • lua algorithm 取集合常用的函数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    -----------------------------------------------
    -- lua algorithm
    local lua_res = {}
    -- 数组转化成键值对table
    lua_res.new = function(list)
    local tab = {}
    for _, v in pairs(list) do
    tab[v] = true
    end
    return tab
    end
    -- 并集 a,b为键值对形式的table
    res.union = function(a, b)
    local tab = {}
    for k, _ in pairs(a) do tab[k] = true end
    for k, _ in pairs(b) do tab[k] = true end
    return res.changeToArr(tab)
    end
    -- 交集 a,b为键值对形式的table
    lua_res.intersecion = function(a, b)
    local tab = {}
    for k, v in pairs(a) do
    tab[k] = b[k]
    end
    return lua_res.change2arr(tab)
    end
    -- 键值对转换成数组
    lua_res.change2arr = function(tab)
    local list = {}
    for k, _ in pairs(tab) do
    list[#list + 1] = k
    end
    return list
    end
  • 基于上面一系列的基础内容,剩下的就是写逻辑处理代码了,这样子的逻辑代码看起来更整齐也更干净

备忘:一些 redis lua 需要注意的地方

redis 内存置换策略

  • noeviction
    内存不足时直接报错 OOM command not allowed when used memory > maxmemory;不会清理数据
  • volatile-ttl
    清理数据根据 ttl 时间(expire设置了生存时间的)从小到大依次清楚,不会清理 persist 的 key(没有过期时间的);直到所有 expire 的 key 都清除了,报错 OOM
  • volatile-lru (默认)
    清理数据根据 expire key 按 lru 算法(最近最少使用)采样清除;直到所有 expire 的 key 都清除了,报错 OOM
  • volatile-random
    清理数据根据 expire key 随机清除;直到所有 expire 的 key 都清除了,报错 OOM
  • allkeys-lru
    和 volatile-lru 的区别是不判断是否是 expire key;删除一个插入一个,不再报 OOM
  • allkeys-random
    和 volatile-random 的区别是不判断是否是 expire key;删除一个插入一个,不再报 OOM

redis 监控项

  • aof_enabled, AOF是否处于打开状态
  • aof_last_write_status, 记录最近一次AOF写的结果是成功还是失败
  • blocked_clients, 正在等待阻塞命令(keys*, monitor, blpop, brpop, brpoplpush)的客户端数量
  • client_biggest_input_buf, 当前连接的客户端当中,最大输入缓存
  • client_longest_output_list, 当前连接的客户端当中,最长的输出列表
  • cluster_enabled, 集群功能是否已经开启
  • connected_clients, 已连接客户端的数量(不包括通过slave服务器连接的客户端)
  • connected_slaves, 已连接的slave服务器数量
  • evicted_keys, 因最大内存容量限制而被LRU算法置换出内存的键数量
  • expired_keys, 因过期而被自动删除的键数量
  • keyspace_hits, 查找键hit次数
  • keyspace_misses, 查找键miss次数
  • lru_clock, 以分钟为单位进行自增的时钟,用于LRU管理
  • master_link_status, slave节点复制连接当前的状态,up表示连接正常,down表示连接断开。
  • maxmemory, redis最大可用内存
  • maxmemory_policy, 内存不足时,数据清除策略,默认为volatile-lru。
      1. volatile-lru:从已设置过期时间的数据集(server.db[i].expires)使用LRU算法淘汰。
      1. volatile-ttl:从已设置过期时间的数据集(server.db[i].expires)采取TTL算法(最小存活时间),移除即将过期的数据。
      1. volatile-random:从已设置过期时间的数据集(server.db[i].expires)采取随机选取算法,并移除选中的K-V,直到”内存足够”为止,如果如果”过期集合”中全部移除全部移除仍不能满足,将OOM
      1. allkeys-lru:对所有数据(server.db[i].dict),采用LRU算法淘汰。
      1. allkeys-random:对所有的数据(server.db[i].dict),采取”随机选取”算法,并移除选中的K-V,直到”内存足够”为止
      1. noeviction:禁止驱逐数据,直接返回OOM异常
  • mem_fragmentation_ratio, 碎片率,计算公式used_memory_rss/used_memory
  • pubsub_channels, 当前被订阅的频道数量
  • pubsub_patterns, 当前被订阅的模式数量
  • rdb_last_bgsave_status, 最近一次创建RDB文件的结果是成功还是失败
  • role, 本机在主从架构中的角色,master和slave两种选择
  • total_commands_processed, 当前已执行的命令数量
  • total_connections_received, 当前已接受的连接请求数量
  • uptime_in_seconds, Redis服务器启动以来经过的秒数
  • used_cpu_sys, Redis服务器耗费的系统CPU
  • used_cpu_user, Redis服务器耗费的用户CPU
  • used_memory, 由Redis分配器分配的内存总量
  • used_memory_rss, 从操作系统的角度,返回Redis已分配的内存总量
  • instantaneous_ops_per_sec, 当前每秒钟执行的命令数量

redis 内存相关

  • redis内存相关的

    • rdb持久化
    • AOF重写
    • 内存剔除策略(高版本redis还存在着内存碎片整理的配置选项),
    • 其中AOF重写和rdb持久化都属于fork子进程来完成的。本次就以rdb持久化为例,rdb的持久化可以由持久化的配置策略或者命令行bgsave或者主从全同步触发。 redis在做bgsave的时候,fork出子进程来做bgsave。具体的过程如下:
    • rdbSaveBackground()中fork子进程 —> rdbSave() —> rdbSaveRio()。
    • fork后子进程拥有和父进程一模一样的进程空间,虽然采用了COW机制(父子进程的虚拟内存指向相同的物理page),但是ps或者top命令中的RSS显示的值都会算成自己进程所占的物理内存,这个可能是很多运维同学/DBA同学经常可以眼见的现象,恐怕这个就是潜意识里需要内存预留一半的重要因素。
  • Linux下的进程下的地址都是虚拟地址,CPU使用的也是虚拟地址,Linux将每个进程的地址空间人为地分为用户地址空间和内核地址空间

    • 32位下 0-3G为用户地址空间,3-4G为内核地址空间(每个进程都是这样),
    • 64位下,0-128T 为用户地址空间,高位-128T为内核地址空间。
    • 进程中的虚拟地址和内存物理地址存在映射关系,这个映射关系由进程的页表pte来维护。
    • 虚拟地址和物理地址是多对1或者1对1的关系。Linux默认情况下fork子进程会采用写时复制(Copy On Write)。
    • 为了解决默认glibc内存分配器的性能和碎片率问题,redis引入了jemalloc,并成为默认配置。
  • Linux的fork COW机制

    • 在内核层面看,fork 创建一个进程的动作:
      • do_fork()
      • copy_process()
      • copy_mm()
      • dup_mm()
      • dump_nmap()
      • copy_page_range()
      • copyp4d_range()
      • copy_pmd_range()
      • copy_pte_range()
      • copy_one_pte()
      • ptep_set_wrprotect()
    • 大体上的功能就是:
      复制当前进程的结构,复制当前进程路径,文件句柄,信号,namespace, 虚拟内存(当然包含页目录和页表),内核栈, CPU体系结构相关信息, 在复制页表的过程中,内核会将物理page权限为设置为只读,一旦父进程
    • 修改物理page的时候,会触发page fault, 内核在异常处理过程中通过pgd_alloc重新分配物理page,将先前物理page中的数据复制到新分配的物理page,同时修改父进程中页表和物理page的映射关系。(参见ULK 2.4和3.3)
  • 从理论上看,redis 在fork bgsave的时候,是不会让内存翻倍的, 那么是不是只要父进程的内存管够,就可以安全地进行bgsave呢?

      1. 在bgsave期间,业务产生的’update’类数据量(新增/修改)。
      1. redis运行过程中rehash产生的内存消耗。
------ 本文结束 ------
------ 版权声明:转载请注明出处 ------