golang源码学习-sync.Map

sync.Map

位于sync包中,主要解决map的线程安全的问题,适用于读多写少的场景。

sync.Map的原理

内部持有俩个map,一个是read,类型是atomic.Value实际类型是【map[interface{}]entry】,一个是dirty类型是map【[interface{}]entry】。

其中read主要解决无数据竞争的情况下数据的快速访问,它通过cas进行快速的读写操作;一旦出现数据的竞争,就会用到dirty,dirty里面保存read里所有非nil的值【通过状态来表示unexpunged】,当出现竞争后数据会写到dirty而不是read中。

数据的访问路径大致是:

  • 读,先从read里找,read里没有,去dirty里找,如果miss过一定的阈值【dirty的长度】时候,将dirty和read交换,交换后dirty置为nil;
  • 写,先判断key是否存在,如果存在且不为expunaged,先通过cas写快速返回,否则有如下分支:

    • 如果key是expunaged,说明key之前被删除了,但是dirty没有,unexpunaged之后同步修改read和dirty
    • 如果key不存在于read,但是存在于dirty修改dirty
    • 如果key不存在与read和dirty,初始化dirty【如果需要】,数据写入dirty

    因为数据在并发写的时候一旦发生竞争还是会用到锁,并发写的时候的锁是不可避免的。所以sync.Map适用于读多写少数据冲突不那么复杂的场景

sync.Map的源码分析

结构体分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
type Map struct {
mu Mutex //锁 用来保护read、dirty的并发控制

// read包含部分map的内容,他是atomic.Value类型的用来解决并发访问的安全的问题,在读的场景不需要加锁,而在写的场景需要mu的控制
read atomic.Value // readOnly

// dirty访问要在mu的保护下进行,他包含所有的readOnly里non-expunged的数据
// 在read中标记为Expunged的entries不会被存储在dirt中,如果一个在read中存在的key被标记为expunged,他需要先unexpunged在保存在dirty中
// 如果dirty是nil,当下一次修改map的时候需要初始化dirty,初始化的方式是将read的不为exunped的value都copy到dirty中
dirty map[interface{}]*entry

// 计数器,当从read读取数据时候miss回增加该值,打到一定的阈值,即miss>=len(dirty)的时候回触发dirty和read的互换
misses int
}


// read中的值
type readOnly struct {
m map[interface{}]*entry
amended bool // 一个标记为,默认是false,true说明readOnly的m和dirty已经同步过了,即dirty不为nil了,在Store操作中,dirtyLocked函数调用后会置为true
}


type entry struct {
// 封装了指向value值的指针
//
// If p == nil, the entry has been deleted and m.dirty == nil.
//
// If p == expunged, the entry has been deleted, m.dirty != nil, and the entry
// is missing from m.dirty.
//
// Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty
// != nil, in m.dirty[key].
//
// An entry can be deleted by atomic replacement with nil: when m.dirty is
// next created, it will atomically replace nil with expunged and leave
// m.dirty[key] unset.
//
// An entry's associated value can be updated by atomic replacement, provided
// p != expunged. If p == expunged, an entry's associated value can be updated
// only after first setting m.dirty[key] = e so that lookups using the dirty
// map find the entry.
p unsafe.Pointer // *interface{}
}

//标记位当entry.p=expunged说明,该对象在read中被删除,标记了expunged的key不会出现在dirty中。
var expunged = unsafe.Pointer(new(interface{}))

Store

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func (m *Map) Store(key, value interface{}) {
//从read中获取readOnly,如果key存在,通过cas修改不为expunged的值,成功后快速返回
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}

//修改失败通过锁进行鬓发保护
m.mu.Lock()
read, _ = m.read.Load().(readOnly)//双重检查
if e, ok := read.m[key]; ok {
//key存在于read但是状态是expunged,cas修改成nil,同步dirty和readOnly的值,
if e.unexpungeLocked() {
//走到这里说明dirty已经被初始化了,见m.dirtyLocked()里e.tryExpungeLocked()这一步
m.dirty[key] = e
}
//cas保存value
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
//key存在于dirty
e.storeLocked(&value)
} else {
//!read.amended说明dirty为空初始化dirty,将read中不为expunged值写入dirty,见下面
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
//修改dirty的值
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}

func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}

read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}

Load

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
//优先读取read
e, ok := read.m[key]
if !ok && read.amended {
//key不存在但是dirty已经初始化,这里需要锁来保护并发冲突
m.mu.Lock()
read, _ = m.read.Load().(readOnly)//双重检查
e, ok = read.m[key]
if !ok && read.amended {
//从dirty里取
e, ok = m.dirty[key]
//超过阈值交换dirty和read,并且dirty置为nil,见下面
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
//加载value
return e.load()
}

func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}

func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}

Delete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()
}
}

func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}
`