Go-sarama的遇到的几个坑

sarama是golang对kafka操作的封装,在使用过程中主要遇到如下的问题,总结如下:

kafka的error的处理

kafka的error处理是通过一个 `<-chan error`。如果不处理error,当发生error的时候会造成协程阻塞,导致协程夯住。

这里建议放在另一个协程处理,防止处理error影响了message的处理

// Drain the consumer's Errors() channel in a dedicated goroutine so that an
// unread error can never block message processing (sarama requires the Errors()
// channel to be consumed when Consumer.Return.Errors is enabled).
go func() {
	for err := range c.consumer.Errors() {
		log.Trace.Errorf(context.TODO(), "_kafka_error", "errorError:%v", err.Error())
	}
}()

sarama-cluster的作用

这个包最大的用处是autocommit offset

封装按照timestamp和condition消费指定offset的数据

首先,根据key找到partition,根据timestamp找到offset,然后按照condition过滤。

// PartitionClientConnector wraps a sarama Kafka client for partition-level
// consumption with an idle-timeout mechanism.
//
// checkTimoutMills: while pulling messages a ticker fires at this interval
// (in milliseconds) to check whether the pull has timed out.
// timeOutMills: if the gap between the last received message and now exceeds
// this value (in milliseconds) and the consumer is idle, pulling stops.
type PartitionClientConnector struct {
	client           sarama.Client
	checkTimoutMills int64
	timeOutMills     int64
}

// NewPartitionConsumer builds a PartitionClientConnector connected to the
// given brokers, consuming from the oldest available offset by default.
// The timeout-check ticker runs every 1000ms and the idle timeout is 5000ms.
func NewPartitionConsumer(brokers []string, clientID string) (*PartitionClientConnector, error) {
	cli, err := sarama.NewClient(brokers, newSaramaConfig(clientID, sarama.OffsetOldest))
	if err != nil {
		return nil, err
	}
	connector := &PartitionClientConnector{
		client:           cli,
		checkTimoutMills: 1000,
		timeOutMills:     5000,
	}
	return connector, nil
}

// newSaramaConfig builds the sarama configuration shared by consumers:
// Kafka 0.10.2 wire protocol, 1s offset-commit interval, error reporting
// enabled, and SASL auth when credentials are present in the service config.
func newSaramaConfig(clientID string, initialOffset int64) *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.ClientID = clientID
	cfg.Version = sarama.V0_10_2_0
	cfg.Consumer.Return.Errors = true
	cfg.Consumer.Offsets.Initial = initialOffset
	cfg.Consumer.Offsets.CommitInterval = time.Second

	// Enable SASL only when both user and password are configured.
	user := conf.ServiceConfig.KafkaConf.SaslUser
	password := conf.ServiceConfig.KafkaConf.SaslPassword
	if len(user) > 0 && len(password) > 0 {
		cfg.Net.SASL.Enable = true
		cfg.Net.SASL.Handshake = true
		cfg.Net.SASL.User = user
		cfg.Net.SASL.Password = password
	}
	return cfg
}

// GetPartition maps key to a partition id of topic, using the
// Java-compatible string hash modulo the topic's partition count.
func (pc *PartitionClientConnector) GetPartition(topic string, key string) (int32, error) {
	partitions, err := pc.client.Partitions(topic)
	if err != nil {
		return -1, err
	}
	return getPartition(key, len(partitions)), nil
}

// GetStartOffsetByTime resolves a timestamp (milliseconds since epoch, per
// the broker's offset API) to an offset in the given partition.
// NOTE(review): the parameter `time` shadows the time package inside this
// method — consider renaming in a follow-up.
func (pc *PartitionClientConnector) GetStartOffsetByTime(topic string, partitionID int32, time int64) (int64, error) {
	offset, err := pc.client.GetOffset(topic, partitionID, time)
	return offset, err
}

// createPartitionConsumer opens a sarama PartitionConsumer for the given
// topic/partition starting at beginOffset, backed by the shared client.
// NOTE(review): the parent sarama.Consumer created here is never closed by
// callers (only the returned PartitionConsumer is) — verify whether this
// leaks resources per pull and whether Consumer.Close should be deferred by
// the caller once consumption finishes.
func (pc *PartitionClientConnector) createPartitionConsumer(topic string, partitionID int32, beginOffset int64) (sarama.PartitionConsumer, error) {
	consumer, err := sarama.NewConsumerFromClient(pc.client)
	if err != nil {
		return nil, err
	}
	return consumer.ConsumePartition(topic, partitionID, beginOffset)
}

// PullPartitionMessageByCondition 根据key通过javaHash找到指定的partition,根据Condition拉取消息
func (pc *PartitionClientConnector) PullPartitionMessageByCondition(topic string, key string,
startTime int64, endTime int64,
filter ConsumerMsgFilter,
consumerHandler PullMsgHandler) error {

partitionID, err := pc.GetPartition(topic, key)
if err != nil {
return err
}
startOffset, err := pc.GetStartOffsetByTime(topic, partitionID, startTime)
if err != nil {
return err
}

//
consumer, err := pc.createPartitionConsumer(topic, partitionID, startOffset)
if err != nil {
return err
}
defer consumer.Close()

endOffset, err := pc.GetStartOffsetByTime(topic, partitionID, endTime)
if err != nil {
return err
}

startOffset, endTime, endOffset)
var msgOut int64
//check timeoout
ticker := time.NewTicker(time.Duration(pc.checkTimoutMills) * time.Millisecond)

lastRecvNano := time.Now().UnixNano()
isIdle := true
for {
select {
case msg := <-consumer.Messages():
msgOut = msg.Offset
isIdle = false
if endOffset > startOffset && msg.Offset > endOffset {
isIdle = true
goto stopLoop
}
if msg.Offset < startOffset {
continue
}
if filter != nil && !filter.Conditional(msg) {
continue
}
consumerHandler.processMsg(msg)
lastRecvNano = time.Now().UnixNano()
isIdle = true
case err := <-consumer.Errors():
goto stopLoop
case <-ticker.C: //检查过期停止阻塞 当现在的事件-上一次拉取的时间>设置的timeOutMills,并且是空闲状态
if (time.Now().UnixNano()-lastRecvNano)/int64(time.Millisecond) >= pc.timeOutMills &&
isIdle {
log.Trace.Infof(context.TODO(), trace.DLTagHTTPFailed, "time out.spent")
goto stopLoop
}
}
}
stopLoop:
return nil
}

// Close releases the resources held by the underlying sarama client.
func (pc *PartitionClientConnector) Close() {
	// The close error is deliberately discarded; there is nothing useful the
	// caller can do with it here.
	_ = pc.client.Close()
}

// getPartition buckets key into [0, partition) using the Java-compatible
// string hash, taking the absolute value since the hash may be negative.
func getPartition(key string, partition int) int32 {
	bucket := stringutil.JavaHashCode(key) % int64(partition)
	return int32(math.Abs(float64(bucket)))
}