liuhao163.github.io

杂七杂八


  • Home

  • Categories

  • Tags

  • Archives

  • Sitemap

JAVA虚拟机的内存管理

Posted on 2019-10-15 | Edited on 2022-09-21 | In java , jvm

Java与C++之间有一堵由内存动态分配和垃圾收集技术所围成的“高墙”,墙外面的人想进去,墙里面的人却想出来。

运行时数据区域

见图:

avator

程序计数器

一块较小的内存空间,当前线程执行字节码的行号计数器,字节码解释器通过修改它来让程序知道分支、跳转、异常处理等逻辑下一条指令去哪里执行。
在多线程环境中,由于当前线程在某一时刻,只会执行线程的一条指令,线程切换之后要恢复到之前程序执行的点。所以程序计数器线程独立的。
如果线程执行的是JAVA方法,该值记录的是虚拟机字节码指令地址。如果是native方法该值是undifined,该区域是jvm中唯一没规定oom的区域

Java虚拟机栈

也是线程独有的,生命周期和线程绑定在一起。描述的是JVM的内存模型:每个方法在执行时候会生成一个Stack Frame。存储方法的局部变量表、操作数栈、动态链接、方法出口等。
每一个方法从执行开始到执行完毕都是Stack Frame的一个入栈到出栈的操作。
局部变量表存放:java的几本类型,对象引用、returnAddress等。局部变量表的内存空间在编译时候就确定好了,当进入一个方法中,局部变量表在Stack Frame的大小是确定的不会改变。
如果线程请求的虚拟机栈超过最大深度会报StackOverFlowError异常,虚拟机栈可以动态扩展但是没申请到内存会报OutOfMemoryError异常。(这里是只没打到深度但是内存不足了)

本地方法栈

基本上和JAVA虚拟机栈类似这里是指调用的是native方法。会抛出的异常如上。

JAVA堆

存放对象最大的一块区域,可以是不连续的内存空间。具体针对gc方式可以分为young old metaspace等。后期还有g1回收期,这里暂时不做讨论,所有线程共享。如果内存不足会报OutOfMemoryError异常

方法区

我们常说的PermSpace或者MetaSpace。用于存储Java虚拟机的类信息、静态变量、常量、即时编译的代码等。为了和堆区分开也叫(Non heap)。JVM规范对方法去限制很宽松除了内存可以不连续外,还可以不实现gc,方法区的gc主要取决于常量的回收和类的卸载。

直接内存

在jdk1.4之后Java引入了Nio,可以直接通过native函数分配堆外内存 通过DirectByteBuffer进行读取/写入,也会引起OutOfMemoryError

运行时的常量池

方法区的一部分,类在编译时产生的常量在加载Class时候会存储在方法区的常量池中,常量池是动态的的比如String的inter()方法会在运行时动态的加入常量池,所以也有可能出现OutOfMemoryError异常

补充一点:
String s=”liuhao” 是一个常量是编译时候就决定好的,所以java汇总
String s1=”liu”+”hao” 俩个常量相加会进入常量池儿常量池只有一个拷贝,所以s==s1
String s=new String(“liuhao”) 不是常量所以不能放在常量池。
String s1=s1.intern() 将s1的值写入到常量池中,扩充了常量池

Object

HotSpot的Object的创建

对象创建的步骤

  1. 类装载:去常量池中查找Class的符号引用,检查类是否已经被加载、解析、初始化过;
  2. 分配内存,依据内存是否完整有俩种分配方式
    1. 指针碰撞:Bump the Pointer,规整的内存空间,Serial,ParNew这种垃圾回收机制会整理内存。将已分配、未分配用一个指针分隔开,分配时候讲指针挪动一个对象的size个位置。
    2. 空闲列表: Free List,不规整的内存空间,CMS垃圾回收机制不需要整理内存。维护一个空闲列表,分配时候找到一个空闲的内存空间进行分配。
    3. 指针碰撞的线程安全:
      1. TLAB:每个内存开辟一个独有的小的内存空间默认是Eden区1%,对象会先在TLAB上创建,这样就保证了原子性。虚拟机内部会维护一个refill_waste值,如果对象需要的空间小于TLAB的剩余空间,同时对象的size大于refill_waste,会在堆中创建,如果小于refill_waste则会新申请一个TLAB进行创建。用参数-XX:+UseTLAB开启,默认是开启的
      2. CAS+失败重试,保证分配空间的原子性。
  3. 初始化对象,将对象里的属性初始化成零值
  4. 设置对象头:元数据信息、Hash值、GC分带年龄等
  5. 对象从虚拟机的角度来看已经创建完成,java代码还需要对对象进行init的设置。

对象的内存布局

分为对象头(Header)、实例数据(Instance Data)、对齐填充(Padding)

对象头

  • MarkWord:HashCode、GC年代分龄、锁标记位、线程持有的锁、偏向的线程ID、偏向时间戳。长度32bit or 64bit。
    • 32bit:HashCode(25bit)、GC年代分龄(4bit)、锁标记位(2bit)、unused(1bit)
    • 64bit:unused(25bit)、HashCode(31bit)、GC年代分龄(4bit)、锁标记位(2bit)、unused(1bit)、block(1bit)
  • 类型指针:类元素的指针告诉虚拟机它是哪个类的实例。如果是数组还需要保存数组的长度

实例数据

保存对象的真实数据,分配策略是相同宽度的对象会分配在一起,满足这个前提条件下,父类的字段会放在子类之前。

对象填充

用于对齐对象用,因为要求对象必须是8字节的整数倍,如果不是需要这部分进行填充。

对象访问定位

创建对象之后java通过栈上的refrences数据来操作对象实例,JVM没有规定如何具体访问对象有俩种访问方式,通过句柄方式和直接指针方式

句柄方式:jvm在堆上开辟一块空间作为句柄池,refrence保存的是句柄地址,句柄来负责访问对象一部分指向堆中对象的实例地址,一部分指向方法区的类实例地址。
好处:refrence保存的是稳定的句柄地址,当对象被移动时候refrence不需要改变。(例如gc时候移动对象)
avator

直接对象指针:refrence保存的是实例数据的指针。HotSpot采用这种方式
好处:速度快。少一次句柄查找的操作。由于创建对象频繁这一步性能的节省效果很客观。
avator

OutOfMemory

todo

java堆溢出

执行如下程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* -Xms20m -Xmx20m -XX:+HeapDumpOnOutOfMemoryError
* @Author: liuhaoeric
* Create time: 2019/10/17
* Description:
*/
public class HeapOOM {

static class OOMObject {
}

public static void main(String[] args) {
List<OOMObject> list = new ArrayList<OOMObject>();
while (true) {
list.add(new OOMObject());
}
}
}

会出现异常信息
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3210)
at java.util.Arrays.copyOf(Arrays.java:3181)
at java.util.ArrayList.grow(ArrayList.java:265)
at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:239)
at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:231)
at java.util.ArrayList.add(ArrayList.java:462)
at com.ericliu.practice.toy.jvm.oom.HeapOOM.main(HeapOOM.java:19)

其中Java heap space的表示是堆溢出,可以通过HeapDumpOnOutOfMemoryError导出的文件通过MAT查看(eclipse的插件)

java方法栈和本地方法栈溢出溢出

会出现StackOverFlowError或者StackOutOfMemeoryError。执行下面代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* -Xss160k
* @Author: liuhaoeric
* Create time: 2019/10/17
* Description:
*/
public class JavaVMStackSOF {
private int stackLength = 1;

public void stackLeak() {
stackLength++;
stackLeak();
}

public static void main(String[] args) throws Throwable {
JavaVMStackSOF oom = new JavaVMStackSOF();
try {
oom.stackLeak();
} catch (Throwable e) {
System.out.println("stack length:" + oom.stackLength);
throw e;
}
}

}

会出现异常信息
Exception in thread "main" java.lang.StackOverflowError
at com.ericliu.practice.toy.jvm.oom.JavaVMStackSOF.stackLeak(JavaVMStackSOF.java:13)
......

单线程的情况会出现StackOverflowError,通过不断创建线程可能会耗尽内存导致Stack的OOM,这种情况可以通过减少线程或者增大堆内存,或者减少每个线程的内存。

多线程会出现如下异常

1
Exception in thread"main"java.lang.OutOfMemoryError:unable to create new native thread

方法区和常量池溢出

1.6执行如下代码,因为1.7采用了MetaSpace所以失效了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* -XX:PermSize=10M -XX:MaxPermSize=10M
* @Author: liuhaoeric
* Create time: 2019/10/17
* Description:
*/
public class RuntimeConstantPoolOOM {
public static void main(String[] args) {
//使用List保持着常量池引用,避免Full GC回收常量池行为
List<String> list = new ArrayList<>();
// 10MB的PermSize在integer范围内足够产生OOM了
int i = 0;
while (true) {
list.add(String.valueOf(i++).intern());
}
}
}

出现PermGen Space

本机直接内存溢出

可以通过DirectMemory容量可通过-XX:MaxDirectMemorySize指定,如果不指定,则默认与Java堆最大值(-Xmx指定)一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* VM Args:-Xmx20M -XX:MaxDirectMemorySize=10M
*/
public class DirectMemoryOOM {

private static final int MB = 1024 * 1024;

public static void main(String[] args) throws Exception {
Field unsafeField = Unsafe.class.getDeclaredFields()[0];

unsafeField.setAccessible(true);
Unsafe unsafe = (Unsafe) unsafeField.get(null);

while (true) {
unsafe.allocateMemory(MB);
}
}
}

异常:
Exception in thread"main"java.lang.OutOfMemoryError
at sun.misc.Unsafe.allocateMemory(Native Method)
at org.fenixsoft.oom.DMOOM.main(DMOOM.java:20)

特征是dump的文件很小。如果发现dump的文件很小同时用了nio可以考虑是这方面的原因。

Redis-源码理解

Posted on 2019-10-11 | Edited on 2022-09-21 | In redis

RedisObject

所有的Redis对象都有如下的头信息

1
2
3
4
5
6
7
struct ReidsObject{
int4 type; //类型 4bit
int4 encoding; //编码方式 4bit
int24 lru; //lru时间戳 24bit
int32 refcount; //引用计数如果为0回收 4byte
void *ptr; //指针 8byte
}robj

每一中RedisObject的数据类型对应一个type,但是根据情况可能对应多个encoding

字符串的原理

Redis的字符串叫SDS格式如下

1
2
3
4
5
6
7

struct SDS<T>{
T capacity; //容量 1byte
T len; //实际长度 1byte
byte flags; //特殊标志位 1byte
byte[] content; //数组长度
}

redis的字符串支持修改,为了减少修改的成本会预先申请capacity个长度的byte数组,当数组写入字符串的时候根据len定位。好处如下:

  • 当发生追加等操作时候不需要申请新的数组,copy原数组
  • 获取字符串长度时候只需要访问len变量即可,不需要遍历数组
  • len和capacity使用泛型T可以针对长度将变量设置为byte short。对对象用到了极致

    redis的字符串在小于44字节的时候encoding是embstr,大于44字节的encoding是raw。如果字符串是embstr,分配内存往往RedisObject和SDS是紧挨着的;如果是raw内存是不连续的;为什么是44个字节,原理如下:

  • 为什么是44个字节
    • SDS除content最少占用ReidsObject占16byte+SDS的3byte(假设这时候content为空)=19byte
    • 由于redis的内存管理工具分配方式是8/16/32/64,由于头文件占据了19空间,所以至少分配32字节。当content+redisObj需要分配64字节,那么content最多存储45字节,减去最后的空字符NULL,剩余44字节。
  • 超过这个阈值Redis会认为对象是个大字符串适合用raw去存

dict

Redis的hash结构,所有的key-value是一个全局的dict,以及设置了expire的key-value,另外zset的value–score的对应关系也是通过字典结构。

1
2
3
4
5
6
7
8
9
struct RedisDb{
dict* dict;
dict* expires;
}

struct zset{
dict *dict // value=>score
zskiplist *zsl
}

dict采用的rehash策略是渐进式的rehash,防止hash过大rehash影响主流程。采用的hash算法是siphash

扩容:当元素等于dict数组长度时候会扩容,扩容为原来的2倍,如果遇到bgsave尽量不会去扩容,但是如果当元素打到dict数组的5倍会进行强制扩容
缩容:当元素低于dict数组长度10%会缩容。缩容不会考虑bgsave。

ziplist

1
2
3
4
5
6
7
8
9
10
11
12
13
struct ziplist<T>{
int32 zlbytes; //压缩表占用自己数
int32 zltail_offset; //最后一位偏移量用于到这遍历
int16 length; //长度
T[] entries; //元素列表
int8 zlend; //压缩列表的结束标记,值:0xFF
}

srtuct entry{
int<var> prevlen; //前一个长度
int<var> encoding; //元素类型编码 小于254时候一个字节,大于254时候5个字节第一个自己是254(0xFE)
optional byte[] contennt
}

intset

当集合set是整数且个数较小时候会考虑使用intset

1
2
3
4
5
struct intset<T>{
int32 encoding;
int32 length;
int<T> content;
}

quicklist

list采用这种数据结构是ziplist和linkedlist的整合,将linkedlist按照ziplist分段,时内存更紧凑,每一段在用链表链接起来。

skiplist

zset采用这种方式,优点:快速定位.它的层数是2的64次方。
zset的rank的排名是通过zslforward的span对象计算出来的每次插入的时候都会维护这个值,表示当前层到这个节点跳过了多少节点

Redis-Key的过期策略

Posted on 2019-10-11 | Edited on 2022-09-21 | In redis

key的过期策略

主动过期

为key设置expire。由于redis是单线程的如果对于过期的key扫描的过多会影响服务的卡顿,所以redis提供了惰性过期和主动扫描过期俩种方案。

扫描机制,redis会把设置过期时间的key放在内存中,key的扫描过期机制如下:

  1. 为了防止频繁扫描,redis每秒扫描10次过期的key,明且每次扫描建个是25ms
  2. 每次扫描随机取20个key
  3. 判断20个key是否过期,并且回收过期key
  4. 如果过期的key占1/4会重复回收

    如果这段时间有大量的key过期,势必会增加扫描频次,同时由于redis的内存管理对数据页的回收,会导致CPU飙升,服务卡顿。

    注意:salve没有扫描机制,master过期后会生成一条del的命令给slave执行(如果这时候宕机可能造成数据不一致)。

LRU

redis本质是内存数据库当内存占用满后,根据maxmemory-policy配置,会有如下几种策略

  • noeviction:默认策略,直接拒绝写服务(del后会执行),不会丢失数据
  • volatile-lru:对设置expire的key进行淘汰。策略是lru
  • volatile-ttl:对设置expire的key进行淘汰。策略是ttl
  • volatile-random:对设置expire的key进行淘汰。策略是随机
  • allkeys-lru:对所有的key进行淘汰。策略是lru
  • allkeys-random:对所有的key进行淘汰。策略是随机

    redis为了节省内存采用了近似lru算法的淘汰机制,是一种懒惰淘汰机制,即当写入操作发现内存已经超过了maxmemory时候,根据maxmemory-policy去allkeys和设置了过期时间的keys中随机取5个key淘汰掉最旧的key。

异步线程

懒惰删除

redis如果删除一个key正好这个key是大key,会阻碍业务的执行。redis4.0增加了unlink命令可以将key放到异步队列中由异步线程去消费。

AOF的异步刷盘

由于AOF需要调用sync,会影响业务。所以AOF的刷盘也会放到一个异步队列和线程中。为了保证效率是独立的一个线程处理

redis-运维

Posted on 2019-10-10 | Edited on 2022-09-21 | In redis

Redis的监控命令和几个重点指标

info < seciton >命令

查看qps

查看redis的qps【info stats】,instantaneous_ops_per_sec 这是qps
查看具体的key redis-cli monitor

查看客户端

info clients

1
2
3
4
5
# Clients
connected_clients:1
client_longest_output_list:0
client_biggest_input_buf:0
blocked_clients:0

拒绝连接数如果过多应该开打链接
info stats

1
rejected_connections:0

查看内存

info memory

1
2
3
4
used_memory_human:796.36K //使用
used_memory_rss_human:1.26M //top的数值
used_memory_peak_human:877.38K //峰值
total_system_memory_human:251.67G

复制挤压缓冲器

info replication

1
2
3
4
5
6
7
role:master
connected_slaves:0
master_repl_offset:0
repl_backlog_active:0
repl_backlog_size:1048576 //挤压缓冲区大小
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0

info stats

1
2
3
sync_full:0
sync_partial_ok:0
sync_partial_err:0 //半同步失败次数如果过多需要增大挤压缓冲区大小

安全相关

指令安全:禁止危险命令

端口安全:不要暴露到公网,同时设置bindIp或者密码,注意设置密码从库也需要密码才能复制

脚本安全:禁止客户输入lua脚本

Redis-集群

Posted on 2019-10-10 | Edited on 2022-09-21 | In redis

Redis的集群遵循的是CAP理论中的AP,因为Redis是出现网络分区的时候,主依然提供写服务,从会采用多中方式追赶主

主从复制和从从复制

Redis的主从复制、从从复制是异步复制,从从复制是为了减少主的压力

增量复制

Redis主会将涉及到数据修改的命令写入到Buffer中,然后异步传给从库,从库执行命令并且返回偏移量给主,保证俩边的数据一致,buffer采用环状结构,如果buffer满了会覆盖原来的数据。如果出现还没有执行就被覆盖的情况会触发快照复制

快照复制

快照复制很耗时。过程如下

  • 主先bgsave将快照写入到磁盘。
  • 同步快照给从
  • 从在接收完快照后,先持久化,在Load快照
  • 从库开始执行这段时间的增量操作。

    如果这段时间buffer又被覆盖了,则继续快照复制,如此恶性循环,所以要选择一个合适的复制buffer

    快照复制前会持久化,如果这时候正好需要aof的fsync,则会推迟fsync,影响主的业务。

    新加入的从库都需要先执行一遍快照复制。

    Redis3.0针对这种情况采用了无盘复制的思路,遍历内存不持久化,直接传递给从库,从库如上面的过程先持久化在load,避免了主库的磁盘IO操作

wait命令

可以将异步复制改为同步复制,由AP变为CP

redis-sentinel 哨兵机制

redis在3.0提供sentinel服务,进行节点的healthCheck。

客户端通过连接sentinel获取当前的master信息,客户端连接Master。

如果发现master挂了,哨兵会选举新的Master,同时集群中其他的slave会和新的master开始主从同步任务。当原来的master恢复会变为slave。

注意:实际上senintel的主从切换是客户端做的。

数据切片

Codis

codis-proxy:做请求代理
codis-server:服务存储

将数据分成1024个slot,数据写入时候proxy会对key做crc32求hash,然后根据槽取模。槽的映射关系会交给zookeeper和etcd这种第三方组件来维护。

槽的迁移,当发现请求的key正在迁移时候会强制key进行迁移之后在去新的槽去请求。

支持自动平衡。

缺点:因为不是官方的方案,所以支持新的特性比较困难,且不支持事物等命令。

Cluster

redis官方集群方案。

将数据分为16384个slot,每个节点都保留一份slot的配置信息,并且客户端链接时候会获取一份Slot信息,这部分信息持久化到redis-cluster中。

slot的跳转:当客户端发现请求的数据已经迁移后客户端会收到MOVED < newSlotNum > < ip:port >可以更新客户端的配置,然后去新的server去获取。

slot的迁移:迁移是以slot为单位进行迁移,当访问到迁移的key时候流程如下:

  • 想srcNodeAddr请求,如果key在srcNodeAddr进行操作
  • 如果不在srcNodeAddr,返回-ASKING tragetNodeAddr
  • 向tragetNodeAddr发送不带参数的ASKING节点
  • 返回OK,去tragetNodeAddr getKey (目的是ASKING之后告诉tragetNodeAddr下一条指令不能不处理,因为在迁移中理论上tartNode不负责这个key)

    redis的key如果是大Key,因为迁移指令是阻塞,会影响到服务。

    redis的集群管理是通过Gossip协议的。当一个节点发现另一个节点失联(PFail),它会广播给集群,然后当集群中其他节点收到的该节点失联事件过半数(PFail Count)时候,就标记该节点下线。

    另外,redis-cluster的槽位迁移、节点切换都是在客户端做的。

分布式锁的不安全性

由于主节点宕机客户端无感知,所以当客户端从主节点获取锁后,未同步到从节点这时候发生宕机。锁数据未同步成功发生了主从切换,锁是失效的,如果业务不容忍可以采用redlock机制。

Redis-基本原理

Posted on 2019-10-09 | Edited on 2022-09-21 | In redis

redis是单线程的,IO模型采用多路复用

redis的线程模型

redis的io是单线程多路复用。

为客户端关键字关联一个请求队列和响应队列,用于处理请求和响应。

定时任务采用最小堆的方案,俩个定时任务的间隔时间就是select(timeout)的timeout

redis的备份

快照用于全量备份,AOF是命令重放用于增量备份

快照的备份原理CoW

由于redis的快照备份需要用到磁盘IO是没有NIO的,为了不影响业务的正常请求,redis的磁盘备份采用了系统的COW。方法如下

  1. 父进程调用glibc,fork出一个子进程,这时候子进程和父进程会同时返回。父进程pid大于0,子进程pid等于0,如果小于0说明没有资源,由于是fork出的子进程,俩者共享代码段和数据段
  2. 父进程依然处理正常请求,子进程处理快照的持久化
  3. 对于写的请求,会用到系统的cow。父进程在写的时候会将数据页复制一份,在复制的这份里进行修改,子进程对应的数据页不做变更,在fork出来的那一刻就固定下来了

注意,随着父进程写请求的增多,内存会持续增长,不过也不会超过原数据段的2倍。

AOF

随着日积月累,AOF的体积会逐渐增大,一旦重放会导致redis长期无法对外提供服务

瘦身:bgrewriteaof,

  1. 开辟一个新的进程遍历内存的数据,转换成一些列的指令,写到新的aof文件中
  2. 新增的aof指令追加的这个文件中
  3. 替换原文件完成瘦身

    fsync,redis写aof文件是些到一个内核的内存中,通过调用fsync刷到磁盘中,如果这段时间服务器宕机会丢失数据。redis采用glibc的fsync(int fd)方法每隔一段时间刷盘一次。(每次命令都刷盘会降低吞吐)

运维

由于aof的fsync和快照的大块写文件都影响性能,建议持久化放在从库中做,为了防止磁盘分区还建议有多个从库。

pipeline和事物

pipeline简称管道,为了减少网络的开销,piplline可以一次提交多条指令给redis一起执行,本质上是client的行为。

事物:redis支持事物,但是无法解决事物的原子性,只能保证事物的隔离性。redis事物的隔离性是通过管道来实现的

为了节省性能,redis的事物往往配合pipeline一起使用。

事物命令
multi、exec、discard

pubsub

redis自身的消息队列,缺点很明显:消息不能持久化,发送的消息只能被当时在线的消费者消费,如果消费者宕机是无法消费消息的。这个特性可以做服务发现。后续5.0提供了stream来取代它

支持模式订阅,message订阅

对象的压缩

如果小于4G可以采用32位进行编译,内存会少一半

zipList

压缩列表,redis在zset、hash、intset这些数据结构存储数据时候在一定阈值下回采用ziplist来存储。

ziplist实际上是数组的一个变种,有点是:元素长度不固定节省空间,内存地址连续对缓存友好。

zset:元素个数不超过128,k/v长度不超过64

hash、list:元素个数不超过512,k/v长度不超过64

set:不超过512,数据是intset时候,会进行升级,随着元素的增大会从uint16–>uint32–>unint64

内存管理

redis是以页为单位回收内存的,如果该数据页有一个key,也不会被回收。但是可以被重复使用,这点和mysql很类似

redis的内存管理采用第三方库默认是jemallloc

Redis-基础数据结构

Posted on 2019-10-08 | Edited on 2022-09-21 | In redis

string

redis所有数据的基础,采用预分配进行数据的分配当数据小于1MB时候,成倍扩容,大于1MB每次扩容1倍。最大512MB。

int类型支持计数

list

quicklist的数据结构(ziplist+双向链表)

hash

类似于Java的hashmap,采用数组+链表方式,数据量小采用ziplist。

rehash采用渐进式

set

底层采用一个特殊的字典来维护,字典的值都为NULL

zset

(skiplist or ziplist)+hash

bitmap

  • setbit key offset value【0 or 1】 set某一位的bit值

  • getbit key offset 返回当前位的值

  • bitcount key [start end] 返回start-end值为1的bit的数量。start-end必须是8的倍数

  • bitpos key [true false] 返回第一个true or false的值。

hyperHyperLog

  • pfadd key value 不同的才会加入到列表中
  • pfcount key 查看key的值

用于统计网站的uv,大小是12kb,缺点是有误差

bloomfilter

redis4.0以后提供插件支持

  • bf.add
  • bf.exists

重复并且存在误判。

Reids的限流–redis-cell

redis4.0后提供的限流模块

cl.throttle key capacity opsQuota opssec quota-pre-action

返回值
0 or 1 –结果0允许,1不允许
capacity–漏斗容量
left_quota–剩余容量
next_gen_quota_sec–下一次产生多少令牌的事件
maxQuotaSec–令牌桶满,下一次capacity=left_quota的时间 单位:秒

Redis的位置服务-GeoHash

 原理是,将地球变为一个平面,划分成一系列方格每个方格有一个唯一编号,方格越近编号越近,将坐标的经纬度映射为一维坐标,放在唯一的方格中,只需要查看俩个方格的距离即可。

坐标会变成一串整数编码越长越精确,这些整数编码能还原成坐标。getHash会对这个整数做一次base32。

redis的坐标编码是52位,且底层采用zset。value是key,score是哪个52位的编码,我们查询附近的坐标只需要按照score查询即可

命令

  • geoadd key 精度 纬度 value 添加
  • geodist key value1 value2 单位 查询value1和value2的距离
  • geoops key value 获取value的坐标
  • geohash key value 获取hash值可以计算出经纬度
  • georediusbymember key value 20 km [withcoord withdist withhash] count 3 asc 查找value半径20km的其他坐标【包括自己】 按照距离升序排列
  • georedius key value x y 20 km [withcoord withdist withhash] count 3 asc 查找坐标点【x、y】半径20km的其他坐标【包括自己】 按照距离升序排列

RoketMq源码学习-七-事务的实现

Posted on 2019-09-22 | Edited on 2022-09-21 | In 消息队列 , rocketmq , 源码学习

RocketMq分布式事务采用的是XA协议。其中的TransactionManager由Broker担任。

原理

  1. producer发送half-Message(prepare)给broker。
  2. 发送成功后,producer侧开始执行本地事务,sendResult会包含transactionId和本地事务进行绑定,之后broker反查消息需通过这个ID来处理事物。(用户实现)
  3. 本地事务执行成功后,发送commit或者rollback状态给broker。
  4. Broker确认是否收到了Producer发送的commit或者rollback的消息
    1. Broker收到消息
      1. 如果收到的是commit,认为事务提交成功,交由consumer处理
      2. 如果收到的是rollback,认为事务回滚,consumer看不到本条消息。broker删除half-Message。(rocketmq会先将消息发送到一个事务专用的topic中QueueId为0,然后如果commit成功会将消息移到real_topic中,消费者订阅的是real_topic这时候就能看到消息了,如果rollback是不会移动的消费者就看不到这条消息)
    2. Broker没收到消息
      1. borker定时回查本地事务,如果本地事务已经执行返回commit;如果本地事务未执行rollback;(用户实现)
        1. commit:返回commit,说明事务已经提交,consumer进行执行
        2. rollback:回滚:
        3. unknown:一直会差到成功为止
  5. 事务的消费,和传统消息消费没什么区别,不过要注意,rocketmq的消费者侧的本地事务如果失败了,需要自行解决数据一致性,毕竟rocketmq整体事务回滚代价太大了

官方Demo

发送的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//事务的监听器,本地事务在这里进行
TransactionListener transactionListener = new TransactionListenerImpl();

//生产者采用TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");

//处理broker会查本地事务的线程池
//producer接到broker发送的RequestCode.CHECK_TRANSACTION_STATE,异步来会差处理本地事务的状态,并且返回给borker
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();

String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//send half message给broker
//如果sendResult是sendOk会开始执行transactionListener.executeLocalTransaction逻辑,即本地事务。
//然后根据localTransactionState发送请求给RequestCode.END_TRANSACTION,broker来选择事务是否完成
SendResult sendResult = producer.sendMessageInTransaction(msg, null);

Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}

for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}

源码解析

Producer

TransactionMQProducer关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class TransactionMQProducer extends DefaultMQProducer {

//用于borker回查事务消息用
private ExecutorService executorService;
//监听器
private TransactionListener transactionListener;

@Override
public void start() throws MQClientException {
//初始化事务相关的环境具体见小面代码
this.defaultMQProducerImpl.initTransactionEnv();
super.start();
}

@Override
public void shutdown() {
super.shutdown();
//停止事务
this.defaultMQProducerImpl.destroyTransactionEnv();
}


@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
//发送事务类型的消息
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
}

defaultMQProducerImpl的相关代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
public void initTransactionEnv() {
//TransactionMQProducer中的defaultMQProducerImpl持有的对象只可能是TransactionMQProducer
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
if (producer.getExecutorService() != null) {
this.checkExecutor = producer.getExecutorService();
} else {
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor(
producer.getCheckThreadPoolMinSize(),
producer.getCheckThreadPoolMaxSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.checkRequestQueue);
}
}

public void destroyTransactionEnv() {
if (this.checkExecutor != null) {
this.checkExecutor.shutdown();
}
}

/**
* 1.发送half-message消息sendOk执行本地任务
* 2.执行完本地事务通过endTransaction消息通知borker事务结果
* 3.封装TransactionSendResult返回
* @param msg half message
* @param localTransactionExecuter 已经不在需要了传空就好了,5.0.0会用producer的transactionListener替代
* @param arg 消息参数
* @return TransactionSendResult 返回执行结果
* @throws MQClientException 异常
*/
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);

SendResult sendResult = null;
//为Message设置相关Tranaction的属性,根据PROPERTY_TRANSACTION_PREPARED属性broker知道是half-message
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
//发送half-message
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
//发送结果返回了transactionId,将结果写入到Property。暂时没用到感觉,而且sendResult.getTransactionId为空
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
//发送消息调用setUniqID时候生成的唯一ID作为事务ID:业务方通过这个事务ID和本地事务绑定处理反差逻辑
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}

//不用关心5.0.0会删掉
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//处理本地事物返回localTransactionState
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}

if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}

//给borker发送本地事物结束的request
try {
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}

TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}

public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
//通过half-message获取MessageId,优先取OffsetMsgId:消息追加到磁盘时候生成的ID16byte,包含【来源地址+msg在文件中的偏移量】)
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}

//为空
String transactionId = sendResult.getTransactionId();
//查找borker地址
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());

//构建EndTransactionRequest请求
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
//设置Message的标记是回滚还是commit
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}

broker

主要有3个部分

接收半消息消息

处理half-message的入口在SendMessageProcessor.processRequest()方法中,前面介绍消息发送时候介绍过这里不在过多赘述,最终我们跟到SendMessageProcessor.sendMessage方法中,这里在处理发送的事件时候会判断消息的PROPERTY_TRANSACTION_PREPARED标记是否为true如果为true。会调用brokerController.getTransactionalMessageService().prepareMessage,具体如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
......
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
//这里就是重点
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
......
}
....
}

继续跟下去:brokerController.getTransactionalMessageService中TransactionalMessageService的实现类TransactionalMessageServiceImpl的prepareMessage方法

1
2
3
4
@Override
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.putHalfMessage(messageInner);
}

继续:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}

//处理preparedMessage
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
//RMQ_SYS_TRANS_HALF_TOPIC,queueId始终为0
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}

回查half-message

brokerController在启动时候,会启动一个检查线程定期林旭RMQ_SYS_TRANS_HALF_TOPIC这个Topic中是否有过期的half-message,然后对这个half-message的producer发送RequestCode.CHECK_TRANSACTION_STATE的请求

过程和代码如下:

  1. brokerController在initialTransaction初始化TransactionalMessageCheckService
  2. brokerController在start时候调用TransactionalMessageCheckService的start方法启动线程
  3. TransactionalMessageCheckService每隔1分钟轮询一次RMQ_SYS_TRANS_HALF_TOPIC这个topic,发现过期half-message后通知producer。

TransactionalMessageCheckService关键代码

1
2
3
4
5
6
7
8
9
10
@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
//检查过期事务
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

继续跟TransactionalMessageServiceImpl.check

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@Override
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
//Half-message topic
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.info("Check topic={}, queues={}", topic, msgQueues);
//有Half-message,检查如果有消息过期默认是6sec,省略大部分校验过程
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
MessageQueue opQueue = getOpQueue(messageQueue);
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
......

List<Long> doneOpOffset = new ArrayList<>();
HashMap<Long, Long> removeMap = new HashMap<>();
//
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
......
// single thread
int getMessageNullCount = 1;
long newOffset = halfOffset;
long i = halfOffset;
while (true) {
......
if (removeMap.containsKey(i)) {
......
} else {
//读取
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) {
.....
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
//给msgExt赋值half-message的msgId等属性
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
//处理half-message
listener.resolveHalfMsg(msgExt);
} else {
......
}
}
newOffset = i + 1;
i++;
}

......
if (newOffset != halfOffset) {
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
}
} catch (Exception e) {
e.printStackTrace();
log.error("Check error", e);
}

}

继续跟进listener.resolveHalfMsg中去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}

public void sendCheckMessage(MessageExt msgExt) throws Exception {
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
//注意下这个MsgId就是MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,分布式任务开始时,发送half-message设置
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());

checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
if (channel != null) {
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}

prouder处理RequestCode.CHECK_TRANSACTION_STATE

  1. produer的ClientRemotingProcessor会调用 this.checkTransactionState(ctx, request);
  2. this.checkTransactionState(ctx, request);会调用producer.checkTransactionState(addr, messageExt, requestHeader);
  3. producer.checkTransactionState(addr, messageExt, requestHeader);处理事务的逻辑代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@Override
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
//用线程池去处理
Runnable request = new Runnable() {
private final String brokerAddr = addr;
private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

@Override
public void run() {
......
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
......
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
//...业务自己处理检查事务状态
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
}

//内部方法
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}

private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true);

//取消息的msgID,优先取事务中的MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,如果为空会取,发送来的MsgId
String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
uniqueKey = message.getMsgId();
}

thisHeader.setMsgId(uniqueKey);
//这个值和最开始发送办消息的message是一样的见上面
thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
switch (localTransactionState) {
case COMMIT_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
log.warn("when broker check, client rollback this transaction, {}", thisHeader);
break;
case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
break;
default:
break;
}

String remark = null;
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}

try {
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000);
} catch (Exception e) {
log.error("endTransactionOneway exception", e);
}
}
};

this.checkExecutor.submit(request);
}

处理EndTransaction

在本地事务完成或者检查后都需要给broker发送RequestCode.END_TRANSACTION信息,broker在接到请求流程是

EndTransactionProcessor的processRequest处理具体请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.info("Transaction request:{}", requestHeader);
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}

//请求来自于TransactionCheck,打印log
if (requestHeader.getFromTransactionCheck()) {
......
} else {
......
}

OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
//提交事务
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
//校验halfmessage的Offset和Produceguroup的信息等
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//重点,将消息移到真实的Topic和QueueId,这样consumer才能看到
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
//重置标记等信息
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
RemotingCommand sendResult = sendFinalMessage(msgInner);
//删除halfmessage
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
//回滚事务
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}

JAVA-NIO-ByteBuffer

Posted on 2019-09-16 | Edited on 2022-09-21 | In java , nio

bytebuffer

属性

  • position:下一次读取或写入的位置。
  • limit:指定还有多少数据需要取出(在从缓冲区写入通道时),或者还有多少空间可以放入数据(在从通道读入缓冲区时)。
  • capacity:指定了可以存储在缓冲区中的最大数据容量,实际上,它指定了底层数组的大小,或者至少是指定了准许我们使用的底层数组的容量。
  • mark:标记位配置mark和reset方法

初始化方法

  • allocateDirect:堆外内存
  • allocate:堆内内存

几个重要的方法介绍

compact-压缩缓冲区,将未读的数据放在缓冲区头部

将position到limit的数据复制到0到limit-postion中,同时position等于limit-postion,limit等于capacity。一般用于继续写。

例如:postion=1,limit=10,capacity=10,经过compact后,position=9,limit=10,capacity=10

clear

position为0,limit=capacity,一般在写之前调用,buffer可以从头开始写。

flip

limit变为当前的position,用于读取,一般在读取buffer之前调用,可以返回0-limit之间的数据。

rewind

读写都可以用,只是将position置为0,mark置为-1。表示可以重新读写。

mark和reset

mark:指定一个特定的position一个标记位
reset:在mark指定的标记位开始进行操作

slice

slice() 方法根据现有的缓冲区创建一个子缓冲区,俩者共享数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ByteBuffer orgBuffer = ByteBuffer.allocateDirect(1024);
orgBuffer.put("liuhao".getBytes());
orgBuffer.flip();

ByteBuffer newBuffer = orgBuffer.slice();
System.out.println("newBuffer init slice =" + newBuffer);

System.out.println("============");
byte[] stringNew = new byte[newBuffer.limit()];
newBuffer.get(stringNew);
System.out.println("read new buffer " + new String(stringNew));//和orgBuffer读出来的一致

//对newBuffer做了修改,影响了orgBuffer
newBuffer.clear();
newBuffer.put("lzh".getBytes());
byte[] stringOrg = new byte[orgBuffer.limit()];
orgBuffer.get(stringOrg);
System.out.println("read org buffer " + new String(stringOrg));

RoketMq源码学习-六-Offset的存储

Posted on 2019-09-16 | Edited on 2022-09-21 | In 消息队列 , rocketmq , 源码学习

Consumer维护OffSet的接口是:OffsetStore,有俩个实现类

  • RemoteBrokerOffsetStore:消息模式是Cluster会用到它
  • LocalFileOffsetStore:消息模式是BoardCast会用到它

我们以RemoteBrokerOffsetStore为例来看读取offset的顺序

RemoteBrokerOffsetStore读取offset的顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: {
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
return offset.get();
} else if (ReadOffsetType.READ_FROM_MEMORY == type) {
return -1;
}
}
case READ_FROM_STORE: {
try {
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
AtomicLong offset = new AtomicLong(brokerOffset);
this.updateOffset(mq, offset.get(), false);
return brokerOffset;
}
// No offset in broker
catch (MQBrokerException e) {
return -1;
}
//Other exceptions
catch (Exception e) {
log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
return -2;
}
}
default:
break;
}
}

return -1;
}

当需要从broker读取的时候调用fetchConsumeOffsetFromBroker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
//获取broker from memory
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {

this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}

//sendBroker get Offset
if (findBrokerResult != null) {
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());

//从broker取consumerOffSet
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}

public long queryConsumerOffset(
final String addr,
final QueryConsumerOffsetRequestHeader requestHeader,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);

//发送同步请求,这时候当前线程会阻塞
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);

return responseHeader.getOffset();
}
default:
break;
}

throw new MQBrokerException(response.getCode(), response.getRemark());
}

broker处理QUERY_CONSUMER_OFFSET请求的逻辑

从ConsumerQueue中获取offset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
//创建响应
final RemotingCommand response =
RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
//响应头
final QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) response.readCustomHeader();
//请求头
final QueryConsumerOffsetRequestHeader requestHeader =
(QueryConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

long offset =
this.brokerController.getConsumerOffsetManager().queryOffset(
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());

//如果>=0
if (offset >= 0) {
responseHeader.setOffset(offset);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
//如果小于等于0,并且创建ConsumerQueue成功
if (minOffset <= 0
&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
//错误
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
}
}

return response;
}


public long getMinOffsetInQueue(String topic, int queueId) {
//getAndCreateQueue
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
//找到队列里最小的offsetminLogicOffset / CQ_STORE_UNIT_SIZE
return logic.getMinOffsetInQueue();
}

return -1;
}

public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
//防止并发
ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}

ConsumeQueue logic = map.get(queueId);
if (null == logic) {

ConsumeQueue newLogic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
this);

//防止并发
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
logic = newLogic;
}
}

return logic;
}
1…101112…23

Liu hao

励志当好厨子的程序员

229 posts
54 categories
81 tags
RSS
GitHub E-Mail
© 2018 – 2023 Liu hao
Powered by Hexo v3.9.0
|
Theme – NexT.Pisces v7.0.0