liuhao163.github.io

Miscellany



Data Structures and Algorithms - BitMap and BloomFilter

Posted on 2019-08-04 | Edited on 2022-09-21 | In Data Structures and Algorithms , Algorithms

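Consider the following requirements:

  • Deduplicating a huge number of URLs in a web crawler;
  • Counting a website's UV (unique visitors);
  • Sorting numbers from 1 to 100 million.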

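When to use a bitmap

Suppose we need to store numbers in the range 1 to 100 million and support deduplication. A bitmap works well here (see the code below). Each number maps to one bit, so a lookup or an insertion is a single bit operation. It also saves a great deal of memory: 100 million bits is 100,000,000 / 8 = 12.5 MB, roughly 12 MB. A hash table needs at least 4 bytes for every stored 32-bit integer, so even 10 million numbers already take at least 40 MB.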

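public class BitMap {
    // despite the name, each element is a 32-bit int word
    private int[] bytes;
    private int total;

    // one int holds 32 bits, so total / 32 + 1 ints cover the values 0..total
    public BitMap(int total) {
        this.bytes = new int[total / 32 + 1];
        this.total = total;
    }

    public void set(int value) {
        if (value < 0 || value > total) {
            throw new IllegalArgumentException("value out of range: " + value);
        }
        // index of the int that holds this value's bit
        int index = value / 32;
        // bit offset inside that int, found with the modulo operation
        int bit = value % 32;
        // turn the bit on with bitwise OR
        bytes[index] |= 1 << bit;
    }

    public boolean exists(int value) {
        if (value < 0 || value > total) {
            return false;
        }
        int index = value / 32;
        int bit = value % 32;
        return (bytes[index] & (1 << bit)) != 0;
    }
}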

BloomFilter

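Back to the problem above: what if there are not many numbers, but their range is large? For example, take 100 million numbers in the range 1 to 1 billion. The bitmap now needs 1 billion bits, about 120 MB, so memory use goes up instead of down. How should we store them? This is where a Bloom filter comes in.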

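A Bloom filter is still a bitmap underneath, and its size can stay at 100 million bits. When we covered hash tables, we resolved hash collisions with linked lists. Here we use multiple hash functions instead.
For example:

  • To insert a value, compute h1, h2, h3 … hn on it and set the bits at positions v1, v2, v3 … vn.
  • To query, check the bits at all n positions. If all of them are 1, return true (the value may be present). If any of them is 0, return false (the value is definitely absent).
  • Drawbacks:

    • False positives: as more bits in the filter become 1, the false-positive rate rises, so the filter should ideally support resizing;
    • Deletion is hard, because clearing a bit may also affect other values that map to it;

    For the crawler problem from the beginning: store the URLs in a Bloom filter with about 10 times as many bits as there are URLs, and use it for deduplication.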
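The insert and query steps above can be sketched as follows. This is a minimal illustration, not Guava's implementation. It assumes the n hash functions are derived from two base hashes by double hashing (position i = h1 + i * h2 mod m). It also assumes the first hash is `String.hashCode()`, which is weak and only chosen for brevity. The class name `SimpleBloomFilter` is hypothetical.

```java
import java.util.BitSet;

// Hypothetical minimal Bloom filter: hashCount bit positions per value, derived by double hashing.
class SimpleBloomFilter {
    private final BitSet bits;
    private final int size;      // number of bits (m)
    private final int hashCount; // number of hash functions (n)

    SimpleBloomFilter(int size, int hashCount) {
        if (size <= 0 || hashCount <= 0) {
            throw new IllegalArgumentException("size and hashCount must be positive");
        }
        this.bits = new BitSet(size);
        this.size = size;
        this.hashCount = hashCount;
    }

    // i-th bit position: h1 + i * h2 (mod size); int overflow is fine because floorMod handles negatives
    private int position(String value, int i) {
        int h1 = value.hashCode();
        int h2 = Integer.reverse(h1) | 1; // second hash derived from the first, forced odd so it is never 0
        return Math.floorMod(h1 + i * h2, size);
    }

    void add(String value) {
        for (int i = 0; i < hashCount; i++) {
            bits.set(position(value, i));
        }
    }

    // false: definitely absent; true: possibly present (false positives are possible)
    boolean mightContain(String value) {
        for (int i = 0; i < hashCount; i++) {
            if (!bits.get(position(value, i))) {
                return false;
            }
        }
        return true;
    }
}
```

An added value always answers true, because all its bits were set. A value that was never added usually answers false, but it can answer true once enough bits are 1. That is the false-positive drawback listed above.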

Follow-up questions

  1. Sort 100 million integers whose values range from 1 to 1 billion;
  2. Deduplicate a massive image library;
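// Sort: a method to add to the BitMap class above.
// Scans the bits in ascending order and prints every value whose bit is set.
// Duplicates collapse into one bit, so this only sorts distinct values.
public void sort() {
    for (int i = 0; i < bytes.length; i++) {
        for (int j = 0; j < 32; j++) {
            if ((bytes[i] & (1 << j)) != 0) {
                System.out.print((i * 32 + j) + " ");
            }
        }
    }
    System.out.println();
}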

Bitmaps in libraries

Java: java.util.BitSet
Google Guava: the BloomFilter class (com.google.common.hash.BloomFilter)
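As a sketch, the JDK's `java.util.BitSet` can replace the hand-written BitMap for the dedup-and-sort use case: set one bit per value, then walk the set bits with `nextSetBit`. It only handles non-negative ints, and `BitSet.set` throws for negative indices. The class name `BitSetSortDemo` is hypothetical.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical helper: deduplicate and sort non-negative ints with java.util.BitSet.
class BitSetSortDemo {
    static List<Integer> sortDistinct(int[] values) {
        BitSet bits = new BitSet();
        for (int v : values) {
            bits.set(v); // a duplicate sets the same bit again, so it is dropped
        }
        List<Integer> sorted = new ArrayList<>(bits.cardinality());
        // iteration idiom from the BitSet Javadoc: visit set bits in ascending order
        for (int i = bits.nextSetBit(0); i >= 0; i = bits.nextSetBit(i + 1)) {
            sorted.add(i);
            if (i == Integer.MAX_VALUE) {
                break; // avoid overflow of i + 1
            }
        }
        return sorted;
    }
}
```

For example, `sortDistinct(new int[]{5, 3, 9, 3, 1})` yields `[1, 3, 5, 9]`.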

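Data Structures and Algorithms - todo

Posted on 2019-08-02 | Edited on 2022-09-21 | In Data Structures and Algorithms , todo

Topics I am not interested in for now, or do not understand yet:

  • Shortest-path algorithms. Not reading them for now: min-heap +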

RocketMQ source code study - load balancing

Posted on 2019-08-02 | Edited on 2022-09-21 | In Message Queues , rocketmq , Source Code Study

How RocketMQ handles load balancing

Registration

Initiated by the broker:

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());

//synchronous path: process the first registration result
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}

this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
}

It calls BrokerOuterAPI#registerBrokerAll:

public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {

final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
//key step: build the RegisterBrokerRequestHeader request that is sent to every name server
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);

RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//send the registration request to this name server
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
//collect the registration result
if (result != null) {
registerBrokerResultList.add(result);
}

log.info("register broker to name server {} OK", namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}

try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}

return registerBrokerResultList;
}
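The method above fans one request out to every name server on a thread pool. It then waits on a `CountDownLatch` for at most `timeoutMills`. Here is a minimal sketch of that pattern, assuming a hypothetical `register` function in place of the real remoting call. The quoted code collects results in `Lists.newArrayList()`, a plain ArrayList that is not safe for concurrent `add`. This sketch therefore uses `CopyOnWriteArrayList`. `FanOutRegistrar` and `registerAll` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of the "send to every name server, wait with a timeout" pattern.
class FanOutRegistrar {
    static List<String> registerAll(List<String> nameServers,
                                    Function<String, String> register,
                                    long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>(); // written by several worker threads
        CountDownLatch latch = new CountDownLatch(nameServers.size());
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, nameServers.size()));
        try {
            for (String addr : nameServers) {
                pool.execute(() -> {
                    try {
                        String result = register.apply(addr);
                        if (result != null) {
                            results.add(result);
                        }
                    } catch (RuntimeException e) {
                        // one failing name server must not affect the others
                    } finally {
                        latch.countDown(); // always count down, even on failure
                    }
                });
            }
            // wait for all responses, but never longer than the timeout
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
        } finally {
            pool.shutdown();
        }
        return results;
    }
}
```

Counting down in `finally` matters. If a task threw before reaching `countDown()`, the caller would always wait for the full timeout.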

Handling on the NameServer side

In DefaultRequestProcessor#processRequest:

......
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
......

The registerBroker method

public RemotingCommand registerBroker(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
//
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

//decode the TopicConfigSerializeWrapper from the request body
TopicConfigSerializeWrapper topicConfigWrapper;
if (request.getBody() != null) {
topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
} else {
topicConfigWrapper = new TopicConfigSerializeWrapper();
topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
topicConfigWrapper.getDataVersion().setTimestamp(0);
}

//key call
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);

//build the response
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());

byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}

RouteInfoManager

Manages the routing information for brokers and topics. Its important fields:

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
//per topic: the queue info (read queues, write queues, broker name) of each [master] broker
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//broker info, including broker IDs and addresses
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//cluster -> broker names
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

It then calls RouteInfoManager#registerBroker:

public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();

//update the local cluster -> brokerNames table
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);

boolean registerFirst = false;

//update brokerData (brokerName -> brokerData)
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);

//if this broker is a master, update topicQueueTable
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
//only if the topic config changed or this is the first registration
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}

//record broker liveness, then update the filter server list
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}

if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}

if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}

return result;
}

Key method: createAndUpdateQueueData

//called when a broker registers
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
//build the queue info; for a given topic, each master broker has one QueueData
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName);
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
queueData.setReadQueueNums(topicConfig.getReadQueueNums());
queueData.setPerm(topicConfig.getPerm());
queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());

//a topic can live on several brokers, hence a list of QueueData
List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
if (null == queueDataList) {
queueDataList = new LinkedList<QueueData>();
queueDataList.add(queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
} else {
boolean addNewOne = true;

Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
if (qd.getBrokerName().equals(brokerName)) {
if (qd.equals(queueData)) {
addNewOne = false;
} else {
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
queueData);
it.remove();
}
}
}

if (addNewOne) {
queueDataList.add(queueData);
}
}
}
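The update rule in createAndUpdateQueueData is easy to miss in the iterator loop. For a given topic, at most one entry is kept per broker name. An identical entry is left alone, and a changed one is removed and re-added. Here is a simplified sketch of just that rule. `QueueTableSketch` and `QueueInfo` are hypothetical stand-ins: `QueueInfo` keeps only broker name and queue counts, not all QueueData fields.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified model of topicQueueTable updates.
class QueueTableSketch {
    static final class QueueInfo {
        final String brokerName;
        final int readQueueNums;
        final int writeQueueNums;

        QueueInfo(String brokerName, int readQueueNums, int writeQueueNums) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof QueueInfo)) return false;
            QueueInfo other = (QueueInfo) o;
            return brokerName.equals(other.brokerName)
                    && readQueueNums == other.readQueueNums
                    && writeQueueNums == other.writeQueueNums;
        }

        @Override
        public int hashCode() {
            return Objects.hash(brokerName, readQueueNums, writeQueueNums);
        }
    }

    // topic -> one QueueInfo per master broker hosting the topic
    final Map<String, List<QueueInfo>> topicQueueTable = new HashMap<>();

    void createOrUpdate(String topic, QueueInfo incoming) {
        List<QueueInfo> list = topicQueueTable.computeIfAbsent(topic, t -> new LinkedList<>());
        boolean addNew = true;
        Iterator<QueueInfo> it = list.iterator();
        while (it.hasNext()) {
            QueueInfo existing = it.next();
            if (existing.brokerName.equals(incoming.brokerName)) {
                if (existing.equals(incoming)) {
                    addNew = false; // unchanged: keep the existing entry
                } else {
                    it.remove();    // changed: drop the stale entry, re-added below
                }
            }
        }
        if (addNew) {
            list.add(incoming);
        }
    }
}
```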

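Producer load balancing

DefaultMQProducerImpl

The logic for sending a message and load balancing is:

First, a MessageQueue is selected from topicPublishInfo.
Then DefaultMQProducerImpl#sendKernelImpl is called. Before sending, it calls findBrokerAddressInPublish to look up the brokerAddr in local memory by mq.getBrokerName() (see the code below). If the address is not found, it calls tryToFindTopicPublishInfo to reload brokerAddrTable and topicPublishInfo.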

private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
//look up the broker address; if it is missing, refresh the topic route and try again
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
......
}

tryToFindTopicPublishInfo obtains the TopicPublishInfo:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//not cached, or its messageQueueList is empty: fetch it from the name server
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}

updateTopicRouteInfoFromNameServer fetches TopicRouteData from the name server and converts it into TopicPublishInfo:

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
//fetch the route info
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
//send a request with RequestCode GET_ROUTEINTO_BY_TOPIC to fetch topicRouteData
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}

if (topicRouteData != null) {
......
//if the route changed, convert TopicRouteData into TopicPublishInfo and subscribeInfo and update both
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
//important: cache brokerName -> bd.getBrokerAddrs() locally
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
{
//convert
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}

// Update sub info
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
}
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}

return false;
}

How the NameServer handles a GET_ROUTEINTO_BY_TOPIC request

In DefaultRequestProcessor:

case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);

//
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

//key call
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}

byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}

response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}

RouteInfoManager#pickupTopicRouteData

The key method is pickupTopicRouteData. It assembles the route data from the tables filled in by the broker registration described above:

public TopicRouteData pickupTopicRouteData(final String topic) {
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set<String> brokerNameSet = new HashSet<String>();
List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
topicRouteData.setBrokerDatas(brokerDataList);

HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
topicRouteData.setFilterServerTable(filterServerMap);

try {
try {
this.lock.readLock().lockInterruptibly();
//get queueDataList from topicQueueTable (during broker registration, each master broker of a topic contributes one QueueData)
List<QueueData> queueDataList = this.topicQueueTable.get(topic);
if (queueDataList != null) {
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;

Iterator<QueueData> it = queueDataList.iterator();
//iterate over the QueueData entries to build brokerNameSet
while (it.hasNext()) {
QueueData qd = it.next();
brokerNameSet.add(qd.getBrokerName());
}

//iterate to assemble brokerDataList
for (String brokerName : brokerNameSet) {
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
.getBrokerAddrs().clone());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
List<String> filterServerList = this.filterServerTable.get(brokerAddr);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("pickupTopicRouteData Exception", e);
}

log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

if (foundBrokerData && foundQueueData) {
return topicRouteData;
}

return null;
}

consumer

todo

TopicPublishInfo and TopicRouteData

todo: add the consumer side

TopicRouteData holds a topic's routing information (its queues and brokers) and is converted into TopicPublishInfo. Important fields:

/**
* order-topic configuration
* related to the "ORDER_TOPIC_CONFIG" namespace
* see DefaultRequestProcessor#getRouteInfoByTopic
*/
private String orderTopicConf;
/**
* where a topic's queues are stored; see RouteInfoManager.topicQueueTable
*/
private List<QueueData> queueDatas;
/**
* the brokerDatas of a topic, derived from queueDatas (see RouteInfoManager#pickupTopicRouteData)
*/
private List<BrokerData> brokerDatas;
/**
* filter server addresses for each brokerAddr
*/
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

TopicPublishInfo holds the producer's queue information. It is used for load balancing when sending messages, i.e. for choosing a MessageQueue. Important fields:

//messageQueues
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
//counter used to pick the next queue
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();

//the topicRouteData fetched from the name server
private TopicRouteData topicRouteData;
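`sendWhichQueue` drives round-robin selection over `messageQueueList`. Here is a simplified sketch of that idea, including a variant that prefers a broker other than the one that last failed (the `lastBrokerName` parameter seen in `selectOneMessageQueue`). This is an assumption-laden model, not RocketMQ's code. The source uses `ThreadLocalIndex`, a per-thread counter, while this sketch uses one shared `AtomicInteger`. `RoundRobinSelector` and its nested `Queue` class are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin queue selector modeled loosely on TopicPublishInfo.
class RoundRobinSelector {
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger counter = new AtomicInteger(); // simplification of ThreadLocalIndex

    RoundRobinSelector(List<Queue> queues) {
        this.queues = queues;
    }

    // plain round robin; floorMod keeps the index valid after int overflow
    Queue select() {
        int pos = Math.floorMod(counter.getAndIncrement(), queues.size());
        return queues.get(pos);
    }

    // prefer a queue on a different broker than lastBrokerName
    Queue select(String lastBrokerName) {
        if (lastBrokerName == null) {
            return select();
        }
        for (int i = 0; i < queues.size(); i++) {
            Queue q = select();
            if (!q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        return select(); // every queue is on lastBrokerName: fall back to plain round robin
    }
}
```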

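Data Structures and Algorithms - Topological Sort

Posted on 2019-08-01 | Edited on 2022-09-21 | In Data Structures and Algorithms , Algorithms , Advanced

Setup and principle

Topological sorting works on a directed acyclic graph (DAG). When B depends on A, i.e. A must run before B, we draw an edge A->B. The code: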

public class Graph {
private int v;
private LinkedList<Integer>[] adj;

public Graph(int v) {
this.v = v;
this.adj = new LinkedList[v];
for (int i = 0; i < v; i++) {
adj[i] = new LinkedList<>();
}
}

public void addEdge(int s, int t) {
adj[s].add(t);
}
}

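Sorting

There are two algorithms: Kahn's algorithm and DFS.

Kahn's algorithm

It is a greedy algorithm.

  • An edge A->B means A runs first, so we count the in-degree of every vertex. A vertex with in-degree 0 has no dependencies and should run first;
  • Append such a vertex to the execution order and delete it from the graph, i.e. decrement the in-degree of every vertex it points to. Whenever one reaches 0, add it to the execution order as well;
  • Repeat the previous step until every vertex is in the execution order.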
public void topoSortByKahn() {
    // count the in-degree of every vertex
    int[] indegree = new int[v];
    for (int i = 0; i < adj.length; i++) {
        // edge i -> w: i runs before w, so w's in-degree is the number of its prerequisites
        for (int w : adj[i]) {
            indegree[w]++;
        }
    }

    // one queue per vertex with in-degree 0; vertices that may run next go into the queue
    Map<Integer, LinkedList<Integer>> queueMap = new HashMap<>();

    // execution order, grouped by starting vertex
    Map<Integer, List<Integer>> result = new HashMap<>();
    for (int i = 0; i < indegree.length; i++) {
        if (indegree[i] == 0) {
            queueMap.put(i, new LinkedList<>());
            queueMap.get(i).add(i);
            result.put(i, new ArrayList<>());
        }
    }

    // process each queue
    for (Map.Entry<Integer, LinkedList<Integer>> queue : queueMap.entrySet()) {
        while (!queue.getValue().isEmpty()) {
            // dequeue a vertex with in-degree 0 and append it to the execution order
            int u = queue.getValue().remove();
            result.get(queue.getKey()).add(u);
            // for-each instead of LinkedList.get(j), which would be O(n) per call
            for (int k : adj[u]) {
                // decrement the in-degree of every vertex u points to; equivalent to deleting u
                indegree[k]--;
                // once a vertex's in-degree reaches 0, enqueue it; repeat until the queue is empty
                if (indegree[k] == 0) {
                    queue.getValue().add(k);
                }
            }
        }
    }

    // print the batches in processing order; read top to bottom they form one valid order
    for (List<Integer> ret : result.values()) {
        System.out.println(ret);
    }
}
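For comparison, here is the more common textbook form of Kahn's algorithm, written as a sketch under a few assumptions. It uses a single queue instead of one per start vertex, returns the order instead of printing it, and detects cycles: if fewer than V vertices come out, some vertices never reached in-degree 0. The graph is passed as a `List<List<Integer>>` adjacency list rather than the `Graph` class above. `KahnTopoSort` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-queue Kahn's algorithm with cycle detection.
class KahnTopoSort {
    // adj.get(s) lists every t with an edge s -> t (s must run before t)
    static List<Integer> sort(List<List<Integer>> adj) {
        int n = adj.size();
        int[] indegree = new int[n];
        for (List<Integer> targets : adj) {
            for (int t : targets) {
                indegree[t]++;
            }
        }

        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            if (indegree[i] == 0) {
                queue.add(i); // no prerequisites: ready to run
            }
        }

        List<Integer> order = new ArrayList<>(n);
        while (!queue.isEmpty()) {
            int u = queue.poll();
            order.add(u);
            for (int t : adj.get(u)) {
                if (--indegree[t] == 0) {
                    queue.add(t); // all prerequisites of t are done
                }
            }
        }

        if (order.size() != n) {
            throw new IllegalStateException("graph has a cycle");
        }
        return order;
    }
}
```

For the edges 0->1, 0->2, 1->3, 2->3, this returns `[0, 1, 2, 3]`.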

DFS

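Uses depth-first traversal of the graph:

  • Build the inverse adjacency list, which records for each vertex the vertices it depends on;
  • Visit every vertex. For each unvisited vertex, first recursively visit all the vertices it depends on, then output the vertex itself.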
// topological sort by depth-first search
public void topoSortByDFS() {
    // build the inverse adjacency list: inverseAdj[w] holds every vertex that w depends on
    LinkedList<Integer>[] inverseAdj = new LinkedList[adj.length];
    for (int i = 0; i < adj.length; i++) {
        inverseAdj[i] = new LinkedList<>();
    }
    for (int i = 0; i < adj.length; i++) {
        for (int w : adj[i]) {
            inverseAdj[w].add(i);
        }
    }

    // DFS from every vertex that has not been visited yet, starting with vertex 0
    boolean[] visited = new boolean[adj.length];
    for (int i = 0; i < adj.length; i++) {
        if (!visited[i]) {
            visited[i] = true;
            dfs(i, inverseAdj, visited);
        }
    }
    System.out.println();
}

// recursive DFS over the inverse graph
private void dfs(int v, LinkedList<Integer>[] inverseAdj, boolean[] visited) {
    for (int w : inverseAdj[v]) {
        if (visited[w]) {
            continue;
        }
        visited[w] = true;
        dfs(w, inverseAdj, visited);
    }
    // print v only after all the vertices it depends on have been printed
    System.out.print("->" + v);
}
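The same post-order idea can be packaged as a testable sketch that returns the order instead of printing it. Like the method above, it walks the inverse graph and emits a vertex only after all its dependencies, and it assumes the input is acyclic (a cycle is not detected). The adjacency-list type and the name `DfsTopoSort` are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical DFS-based topological sort over the inverse graph (assumes a DAG).
class DfsTopoSort {
    // adj.get(s) lists every t with an edge s -> t (s must run before t)
    static List<Integer> sort(List<List<Integer>> adj) {
        int n = adj.size();
        // inverse.get(t) lists the vertices t depends on
        List<List<Integer>> inverse = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            inverse.add(new ArrayList<>());
        }
        for (int s = 0; s < n; s++) {
            for (int t : adj.get(s)) {
                inverse.get(t).add(s);
            }
        }

        boolean[] visited = new boolean[n];
        List<Integer> order = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            if (!visited[i]) {
                dfs(i, inverse, visited, order);
            }
        }
        return order;
    }

    private static void dfs(int v, List<List<Integer>> inverse, boolean[] visited, List<Integer> order) {
        visited[v] = true;
        for (int dep : inverse.get(v)) {
            if (!visited[dep]) {
                dfs(dep, inverse, visited, order);
            }
        }
        order.add(v); // every dependency of v is already in order
    }
}
```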

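Time complexity

Kahn: O(V+E), because every vertex and every edge is visited once.
DFS: O(V+E), because every vertex and every edge is visited once.

Follow-up

Q: If "A runs before B" is drawn as A<-B (i.e. an edge B->A) instead of A->B, does the code still work?
A: No, it produces the reverse order. In Kahn's algorithm with A->B, A has in-degree 0. An in-degree of 0 works much like a sentinel: the vertex can be appended to the result directly and then used to decrement the in-degrees of its successors. With reversed edges, the in-degree-0 vertices are the ones that must run last. To fix it, count out-degrees instead: a vertex with out-degree 0 has no dependencies and runs first. Also build an inverse adjacency list. After outputting each out-degree-0 vertex, walk its inverse adjacency list, decrement the out-degrees and output vertices as they reach 0 (see the code below). For DFS the fix is simple: use the adjacency list directly instead of the inverse adjacency list.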

public void topoSortByKahn() {
    // edges are reversed here (B -> A means A runs before B), so count out-degrees:
    // outdegree[i] is the number of vertices i depends on
    int[] outdegree = new int[v];
    // inverseList[j] holds every vertex i with an edge i -> j, i.e. every vertex that depends on j
    LinkedList<Integer>[] inverseList = new LinkedList[v];
    for (int i = 0; i < v; i++) {
        inverseList[i] = new LinkedList<>();
    }
    for (int i = 0; i < adj.length; i++) {
        outdegree[i] = adj[i].size();
        for (int j : adj[i]) {
            inverseList[j].addLast(i);
        }
    }

    // one queue per vertex with out-degree 0 (no dependencies, runs first)
    Map<Integer, LinkedList<Integer>> queueMap = new HashMap<>();

    // execution order, grouped by starting vertex
    Map<Integer, List<Integer>> result = new HashMap<>();
    for (int i = 0; i < outdegree.length; i++) {
        if (outdegree[i] == 0) {
            queueMap.put(i, new LinkedList<>());
            queueMap.get(i).add(i);
            result.put(i, new ArrayList<>());
        }
    }

    // process each queue
    for (Map.Entry<Integer, LinkedList<Integer>> queue : queueMap.entrySet()) {
        while (!queue.getValue().isEmpty()) {
            // dequeue a vertex with out-degree 0 and append it to the execution order
            int u = queue.getValue().remove();
            result.get(queue.getKey()).add(u);
            for (int k : inverseList[u]) {
                // every vertex that depends on u now has one dependency fewer
                outdegree[k]--;
                // once all its dependencies are done, enqueue it; repeat until the queue is empty
                if (outdegree[k] == 0) {
                    queue.getValue().add(k);
                }
            }
        }
    }

    // print the batches in processing order
    for (List<Integer> ret : result.values()) {
        System.out.println(ret);
    }
}

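RocketMQ source code study - availability

Posted on 2019-08-01 | Edited on 2022-09-21 | In Message Queues , rocketmq , Source Code Study

Message sending uses MQFaultStrategy to keep the system stable. The code lives in the package org.apache.rocketmq.client.latency, and broker availability is tracked mainly by LatencyFaultToleranceImpl.

MQFaultStrategy

Main fields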

//latency fault tolerance object: tracks the brokers that are currently slow or unavailable
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

//switch for latency fault tolerance
private boolean sendLatencyFaultEnable = false;

//latency levels (ms)
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};

//not-available durations (ms), paired with latencyMax: if the elapsed time is >= latencyMax[i], the broker is unavailable for notAvailableDuration[i]
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

selectOneMessageQueue

/**
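* Logic:
* if the latency switch is on:
* 1. pick a queue by round robin; return it if its broker is not in the fault table (i.e. it is available)
* 2. otherwise pick a relatively good broker from the fault table, ignoring availability, and return a queue on it
* 3. otherwise pick any queue, ignoring availability
* if the switch is off: pick a queue without considering availability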
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//latency fault tolerance is enabled
if (this.sendLatencyFaultEnable) {
try {
//round robin (modulo) over the messageQueues; return the first one that qualifies
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
......
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
//the broker is available (not in the fault table) and lastBrokerName is null or equals this mq's broker: return it
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}

//pick a relatively good broker and return a queue on it
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
//notBestBroker has no write queues for this topic: remove it from the fault table
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}

//fallback: ignore availability
return tpInfo.selectOneMessageQueue();
}

//switch is off: ignore availability
return tpInfo.selectOneMessageQueue(lastBrokerName);
}

updateFaultItem

After a message is sent, the elapsed time is used to compute how long the broker should be considered unavailable.

/**
* Computes the not-available duration from brokerName and the elapsed time, and updates the latency fault table accordingly
*
* @param brokerName brokername
* @param currentLatency current latency (elapsed time)
* @param isolation if true, use 30000 ms instead of currentLatency, which maps to the largest latency level
*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}

//paired with latencyMax: if the elapsed time is >= latencyMax[i], the not-available duration is notAvailableDuration[i]
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}

return 0;
}
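A worked example helps show what the two arrays mean in practice. The sketch below copies the quoted `latencyMax` / `notAvailableDuration` values and the lookup loop, plus the `isolation ? 30000 : currentLatency` substitution from `updateFaultItem`. It prints the resulting penalty for a few latencies. The class name `LatencyDurationDemo` is hypothetical.

```java
// Hypothetical demo of computeNotAvailableDuration, using the values quoted above.
class LatencyDurationDemo {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // scan from the largest level down; the first level the latency reaches decides the penalty
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // mirrors updateFaultItem: an isolated failure is treated as a 30000 ms latency
    static long durationFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    }

    public static void main(String[] args) {
        System.out.println(durationFor(40, false));   // 0: below 50 ms
        System.out.println(durationFor(600, false));  // 30000: >= 550 ms
        System.out.println(durationFor(2500, false)); // 120000: >= 2000 ms
        System.out.println(durationFor(10, true));    // 600000: isolation -> 30000 ms >= 15000 ms
    }
}
```

So sends under 550 ms never penalize a broker. A send that took 600 ms removes the broker from the preferred set for 30 seconds, and an isolated failure removes it for 10 minutes.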

LatencyFaultToleranceImpl

Important methods

/**
* update the latency entry of a broker
*
* @param name brokername
* @param currentLatency time spent by the current operation
* @param notAvailableDuration how long the broker stays unavailable
*/
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}

@Override
public boolean isAvailable(final String name) {
//available if there is no entry or its unavailable period has expired
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
return faultItem.isAvailable();
}
return true;
}

@Override
public String pickOneAtLeast() {
final Enumeration<FaultItem> elements = this.faultItemTable.elements();
List<FaultItem> tmpList = new LinkedList<FaultItem>();
while (elements.hasMoreElements()) {
final FaultItem faultItem = elements.nextElement();
tmpList.add(faultItem);
}

if (!tmpList.isEmpty()) {
//shuffle first: the sort is stable, so items that compare equal end up in random order
Collections.shuffle(tmpList);

Collections.sort(tmpList);

final int half = tmpList.size() / 2;
if (half <= 0) {
return tmpList.get(0).getName();
} else {
final int i = this.whichItemWorst.getAndIncrement() % half;
return tmpList.get(i).getName();
}
}

return null;
}
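pickOneAtLeast relies on how `FaultItem` compares, which the post does not show. The sketch below assumes a particular order: available items first, then lower `currentLatency`, then earlier `startTimestamp`. It also takes availability to mean that the current time has reached `startTimestamp`, which matches how `updateFaultItem` sets `startTimestamp = now + notAvailableDuration`. Treat both as assumptions to check against the FaultItem source. The clock is passed in as `now` so the result is deterministic. `FaultTableSketch` and `Item` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of pickOneAtLeast with an assumed FaultItem ordering.
class FaultTableSketch {
    static final class Item {
        final String name;
        final long currentLatency;
        final long startTimestamp; // the broker counts as unavailable until this time

        Item(String name, long currentLatency, long startTimestamp) {
            this.name = name;
            this.currentLatency = currentLatency;
            this.startTimestamp = startTimestamp;
        }

        boolean isAvailable(long now) {
            return now >= startTimestamp;
        }
    }

    private final AtomicInteger whichItemWorst = new AtomicInteger();

    // order items from best to worst, then rotate through the better half
    String pickOneAtLeast(List<Item> items, long now) {
        if (items.isEmpty()) {
            return null;
        }
        List<Item> tmp = new ArrayList<>(items);
        Collections.shuffle(tmp); // randomize ties; List.sort is stable
        tmp.sort(Comparator
                .comparing((Item it) -> !it.isAvailable(now)) // available first (false sorts before true)
                .thenComparingLong(it -> it.currentLatency)   // then lower latency
                .thenComparingLong(it -> it.startTimestamp)); // then earlier recovery time
        int half = tmp.size() / 2;
        if (half <= 0) {
            return tmp.get(0).name;
        }
        int i = Math.floorMod(whichItemWorst.getAndIncrement(), half);
        return tmp.get(i).name;
    }
}
```

With four brokers, the picks alternate between the two best ones. The worse half is never returned by this method.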

Consumer load balancing

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer.rebalance;

import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;

import java.util.ArrayList;
import java.util.List;

/**
* Average Hashing queue algorithm
*/
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();

@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}

List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}

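//Position of the current consumer in the consumer list
int index = cidAll.indexOf(currentCID);
//mod is the number of queues left over after an even split
int mod = mqAll.size() % cidAll.size();
//averageSize is the number of queues assigned to this consumer:
// mqAll.size() <= cidAll.size(): there are at least as many consumers as queues, so each consumer gets (at most) 1 queue
// mod > 0 && index < mod: the split is uneven and this consumer is one of the first mod consumers, so it gets mqSize/consumerSize + 1 queues; otherwise it gets mqSize/consumerSize
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());

//startIndex is the index of this consumer's first queue
//mod > 0 && index < mod: every consumer before this one also got one extra queue, so the start is index*averageSize
//otherwise: the first mod consumers each took one extra queue, so the start is index*averageSize+mod (note that averageSize here is one less than in the previous case)
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;

//This consumer takes averageSize queues, capped by the queues that remain. When there are more consumers than queues and index >= mod, mqAll.size() - startIndex is negative, so the loop does not run and the surplus consumers get no queue
int range = Math.min(averageSize, mqAll.size() - startIndex);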
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}

@Override
public String getName() {
return "AVG";
}
}
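To check the arithmetic, here is a Go port of the same allocation formula. The function name allocateAveragely is hypothetical, and queues and consumers are represented by their positions in the sorted lists rather than by MessageQueue objects and client IDs.

```go
package main

// allocateAveragely mirrors AllocateMessageQueueAveragely: it returns the
// queue positions assigned to the consumer at position index.
func allocateAveragely(queueCount, consumerCount, index int) []int {
	result := []int{}
	if queueCount <= 0 || consumerCount <= 0 || index < 0 || index >= consumerCount {
		return result
	}
	// Queues left over after an even split
	mod := queueCount % consumerCount
	var averageSize int
	switch {
	case queueCount <= consumerCount:
		averageSize = 1
	case mod > 0 && index < mod:
		averageSize = queueCount/consumerCount + 1 // one of the first mod consumers: one extra queue
	default:
		averageSize = queueCount / consumerCount
	}
	startIndex := index*averageSize + mod
	if mod > 0 && index < mod {
		startIndex = index * averageSize
	}
	rangeSize := averageSize
	if queueCount-startIndex < rangeSize {
		rangeSize = queueCount - startIndex // may be <= 0: this consumer gets nothing
	}
	for i := 0; i < rangeSize; i++ {
		result = append(result, (startIndex+i)%queueCount)
	}
	return result
}
```

For example, 8 queues and 3 consumers give consumer 0 the queues [0 1 2], consumer 1 [3 4 5] and consumer 2 [6 7]; with 2 queues and 3 consumers, consumer 2 gets nothing.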

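Data Structures and Algorithms - Algorithmic Ideas - Dynamic Programming

Posted on 2019-07-29 | Edited on 2022-09-21 | In Data Structures and Algorithms , Algorithms

Principles

When to use it

One model and three characteristics: the multi-stage decision optimal-solution model, plus optimal substructure, no after-effect, and overlapping subproblems.

  • Multi-stage decision optimal-solution model: the problem is solved in several stages, each stage's decision leads to a set of states, and we look for the sequence of decisions that produces the optimal final result.
  • Three characteristics:
    • Optimal substructure: the optimal solution of the problem contains the optimal solutions of its subproblems, so it can be derived from them.
    • No after-effect: a later stage depends only on the state of the previous stage, not on how that state was reached.
    • Overlapping subproblems: different decision sequences can reach the same state at the same stage.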

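Time and space complexity

Problems that dynamic programming can solve can usually also be solved with backtracking, but backtracking tends to take exponential time, O(2^n). Dynamic programming can reduce this dramatically: for the 0-1 knapsack problem with n items and weight limit w, it becomes O(n*w).
Space complexity: because dynamic programming uses a two-dimensional array states[n][w+1], its space complexity is also O(n*w). In effect, dynamic programming trades space for time.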
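To make the gap concrete, the Go sketch below (hypothetical function names) solves a weight-only 0-1 knapsack, the largest total weight not exceeding limit, twice: by backtracking, which explores up to 2^n put/skip branches, and with a states[n][limit+1] table, which takes O(n*limit) time and space.

```go
package main

// maxWeightBacktrack enumerates every put/skip decision (up to 2^n branches).
func maxWeightBacktrack(weights []int, limit int) int {
	best := 0
	var walk func(i, cw int)
	walk = func(i, cw int) {
		if i == len(weights) {
			if cw > best {
				best = cw
			}
			return
		}
		walk(i+1, cw) // skip item i
		if cw+weights[i] <= limit {
			walk(i+1, cw+weights[i]) // put item i
		}
	}
	walk(0, 0)
	return best
}

// maxWeightDP fills states[n][limit+1]; states[i][j] means weight j is reachable
// after deciding items 0..i.
func maxWeightDP(weights []int, limit int) int {
	n := len(weights)
	if n == 0 {
		return 0
	}
	states := make([][]bool, n)
	for i := range states {
		states[i] = make([]bool, limit+1)
	}
	states[0][0] = true
	if weights[0] <= limit {
		states[0][weights[0]] = true
	}
	for i := 1; i < n; i++ {
		for j := 0; j <= limit; j++ {
			if !states[i-1][j] {
				continue
			}
			states[i][j] = true // skip item i
			if j+weights[i] <= limit {
				states[i][j+weights[i]] = true // put item i
			}
		}
	}
	// The largest reachable weight in the last row is the answer
	for j := limit; j >= 0; j-- {
		if states[n-1][j] {
			return j
		}
	}
	return 0
}
```

Both return 9 for weights {2, 2, 4, 6, 3} and limit 9; the difference is how much work they do to get there.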

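Differences between the algorithm paradigms

  • Divide and conquer: the problem cannot be abstracted into a multi-stage decision model; instead it is split into parts that are solved one by one.
  • Greedy: a special case of dynamic programming that derives the global optimum from local optima. It is more efficient but also more restricted: it needs optimal substructure, no after-effect, and the greedy-choice property.
  • Backtracking: almost any problem that greedy or dynamic programming can solve can also be solved by backtracking. It usually relies on recursion to enumerate every possibility and compares them to find the optimum; since the complexity is exponential, it only suits small inputs.
  • Dynamic programming: fits the multi-stage decision optimal-solution model described above, with optimal substructure, no after-effect, and overlapping subproblems.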
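The point that greedy is "more restricted" can be seen on a tiny 0-1 knapsack instance. The sketch below (hypothetical names; the value-per-weight greedy rule is my choice of illustration) compares a greedy pick with an exact dynamic programming answer computed over a one-dimensional table.

```go
package main

import "sort"

type item struct{ weight, value int }

// greedyByDensity takes items in descending value/weight order while they fit.
func greedyByDensity(items []item, limit int) int {
	sorted := append([]item(nil), items...)
	sort.Slice(sorted, func(i, j int) bool {
		// value_i/weight_i > value_j/weight_j, compared without floating point
		return sorted[i].value*sorted[j].weight > sorted[j].value*sorted[i].weight
	})
	total, cw := 0, 0
	for _, it := range sorted {
		if cw+it.weight <= limit {
			cw += it.weight
			total += it.value
		}
	}
	return total
}

// bestValueDP is the exact answer: best[j] is the best value with capacity j.
// Iterating j downwards ensures each item is used at most once.
func bestValueDP(items []item, limit int) int {
	best := make([]int, limit+1)
	for _, it := range items {
		for j := limit; j >= it.weight; j-- {
			if v := best[j-it.weight] + it.value; v > best[j] {
				best[j] = v
			}
		}
	}
	return best[limit]
}
```

With weight limit 10 and items (weight 6, value 30), (5, 20), (5, 20), the greedy rule grabs the densest item and stops at 30, while the DP finds 40 by taking the two lighter items.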

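How to solve DP problems

State transition table method

We usually record the state after each decision in a two-dimensional array. If the state has more dimensions, a three- or four-dimensional array would be needed, and with that many states this method is no longer a good fit.

The method is as follows (the problem and the code are below; a keepsake of working it out by hand with these steps~):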

[Figure]

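  1. First solve the problem with backtracking.
  2. Draw the recursion tree and find the overlapping subproblems.
  3. Draw a state table, usually a two-dimensional array with rows, columns, and cell values.
  4. Following the problem's requirements, simulate the decision process step by step to fill in the state table. Translating this step-by-step filling into code gives the dynamic programming solution; this is the state transition table method.

    Thought process:
    [Figure]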
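Step 1 can be illustrated on the minimum path sum problem used in the code below: positive weights in a grid, start at the top-left corner, move only right or down, and end at the bottom-right corner. A minimal Go sketch of the backtracking solution, with a hypothetical function name; it enumerates every path, so it is exponential and only meant for small grids.

```go
package main

// minDistBacktrack tries every right/down path from (0,0) to the bottom-right
// corner and keeps the smallest sum. Returns -1 for an empty grid.
func minDistBacktrack(grid [][]int) int {
	n := len(grid)
	if n == 0 || len(grid[0]) == 0 {
		return -1
	}
	m := len(grid[0])
	best := -1
	var walk func(i, j, dist int)
	walk = func(i, j, dist int) {
		dist += grid[i][j]
		if i == n-1 && j == m-1 {
			if best == -1 || dist < best {
				best = dist
			}
			return
		}
		if i+1 < n {
			walk(i+1, j, dist) // move down
		}
		if j+1 < m {
			walk(i, j+1, dist) // move right
		}
	}
	walk(0, 0, 0)
	return best
}
```

Drawing the recursion tree of walk shows the same (i, j) reached with different dist values many times; those are the overlapping subproblems that step 2 looks for.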

public class MinDist {
private static int[][] metrix = new int[][]{{1, 3, 5, 9}, {2, 1, 3, 4}, {5, 2, 6, 7}, {6, 8, 4, 3}};

/**
* State transition table method for the example:
* 1. Initialize the first row and the first column
* 2. For every other cell, compare arriving from above (stats[i-1][j], moving down) with
*    arriving from the left (stats[i][j-1], moving right) and keep only the shorter distance
* 3. The answer is stats[rows-1][cols-1], the shortest distance to the bottom-right corner
*
* @param metrix the matrix of (positive) cell weights
* @return the shortest path sum from the top-left to the bottom-right corner, or -1 for an empty matrix
*/
public static int minDist(int[][] metrix) {
if (metrix == null || metrix.length == 0 || metrix[0].length == 0) {
return -1;
}
int rows = metrix.length;
int cols = metrix[0].length;
int[][] stats = new int[rows][cols];

//Initialize the first row and the first column
//First row: 1, 1+3, 1+3+5, 1+3+5+9
//First column: 1, 1+2, 1+2+5, 1+2+5+6
stats[0][0] = metrix[0][0];
for (int i = 1; i < rows; i++) {
stats[i][0] = stats[i - 1][0] + metrix[i][0];
}
for (int j = 1; j < cols; j++) {
stats[0][j] = stats[0][j - 1] + metrix[0][j];
}

//Derive the remaining states
for (int i = 1; i < rows; i++) {
for (int j = 1; j < cols; j++) {
//From stats[i-1][j], moving down: stats[i - 1][j] + metrix[i][j]
int down = stats[i - 1][j] + metrix[i][j];
//From stats[i][j-1], moving right: stats[i][j - 1] + metrix[i][j]
int right = stats[i][j - 1] + metrix[i][j];
//Keep the smaller one
stats[i][j] = Math.min(down, right);
}
}

return stats[rows - 1][cols - 1];
}

public static void main(String[] args) {
//Prints 19 for the matrix above
System.out.println(MinDist.minDist(metrix));
}
}

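State transition equation method

Work out the state transition equation first, then translate it into code. For the example above, the state transition equation is:
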
min_dist(i, j) = w[i][j] + min(min_dist(i, j-1), min_dist(i-1, j))
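Translated directly into code, the equation becomes a recursive function with a memo table so that each (i, j) is computed only once. A minimal Go sketch with a hypothetical function name, assuming all weights are non-negative (so -1 can mark "not computed yet"):

```go
package main

// minDistMemo implements min_dist(i, j) = w[i][j] + min(min_dist(i, j-1), min_dist(i-1, j))
// top-down with memoization. Returns -1 for an empty grid.
func minDistMemo(w [][]int) int {
	n := len(w)
	if n == 0 || len(w[0]) == 0 {
		return -1
	}
	m := len(w[0])
	memo := make([][]int, n)
	for i := range memo {
		memo[i] = make([]int, m)
		for j := range memo[i] {
			memo[i][j] = -1 // not computed yet
		}
	}
	var minDist func(i, j int) int
	minDist = func(i, j int) int {
		if i == 0 && j == 0 {
			return w[0][0]
		}
		if memo[i][j] != -1 {
			return memo[i][j]
		}
		best := -1
		if j > 0 {
			best = minDist(i, j-1) // arrived from the left
		}
		if i > 0 {
			if up := minDist(i-1, j); best == -1 || up < best {
				best = up // arrived from above
			}
		}
		memo[i][j] = w[i][j] + best
		return memo[i][j]
	}
	return minDist(n-1, m-1)
}
```

For the 4x4 matrix in MinDist this returns 19, the same value the bottom-right cell of the state table holds.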

Examples

0-1 knapsack problem

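/**
Upgraded 0-1 knapsack problem:
given n items, where pkg[i] is the weight of item i and value[i] its value,
choose items whose total weight does not exceed w so that the total value is maximized.
*/
func Knapsack(pkg []int, value []int, n int, w int) int {
//state[i][j]: best value after deciding items 0..i with total weight exactly j, or -1 if unreachable; size [n][w+1]
state := make([][]int, n)
for i := 0; i < n; i++ {
state[i] = make([]int, w+1)
for j := range state[i] {
state[i][j] = -1
}
}

//Item 0 is handled separately: either leave it out or put it in
state[0][0] = 0
if pkg[0] <= w {
state[0][pkg[0]] = value[0]
}

//State transitions, starting from item 1
for i := 1; i < n; i++ {
//Leave item i out: the weight stays j and the value carries over from state[i-1][j]
for j := 0; j <= w; j++ {
//Only reachable states of the previous item are carried over
if state[i-1][j] > -1 {
state[i][j] = state[i-1][j]
}
}

//Put item i in: the weight becomes j+pkg[i] and the value becomes state[i-1][j]+value[i]
//We want the optimum, so when two decisions reach the same weight, keep the larger value
for j := 0; j <= w-pkg[i]; j++ {
if state[i-1][j] > -1 {
v := state[i-1][j] + value[i]
cw := j + pkg[i]
if v > state[i][cw] {
state[i][cw] = v
}
}
}
}

//Scan the last row for the maximum value
max := -1
for j := 0; j <= w; j++ {
if state[n-1][j] > max {
max = state[n-1][j]
}
}
return max
}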
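The state table above only yields the best value. A common follow-up is to recover which items were chosen by walking back through the table. The Go sketch below (hypothetical function knapsackWithItems) uses a slightly different, zero-initialized convention, dp[i][j] = best value using the first i items with capacity j, which makes the walk-back simple.

```go
package main

// knapsackWithItems returns the best total value and the indexes of the chosen items.
func knapsackWithItems(weights, values []int, limit int) (int, []int) {
	n := len(weights)
	dp := make([][]int, n+1)
	for i := range dp {
		dp[i] = make([]int, limit+1)
	}
	for i := 1; i <= n; i++ {
		wt, val := weights[i-1], values[i-1]
		for j := 0; j <= limit; j++ {
			dp[i][j] = dp[i-1][j] // leave item i-1 out
			if j >= wt && dp[i-1][j-wt]+val > dp[i][j] {
				dp[i][j] = dp[i-1][j-wt] + val // put item i-1 in
			}
		}
	}
	// Walk back: if the value changed between rows, item i-1 was taken
	chosen := []int{}
	j := limit
	for i := n; i >= 1; i-- {
		if dp[i][j] != dp[i-1][j] {
			chosen = append(chosen, i-1)
			j -= weights[i-1]
		}
	}
	// Reverse so the indexes come out in ascending order
	for l, r := 0, len(chosen)-1; l < r; l, r = l+1, r-1 {
		chosen[l], chosen[r] = chosen[r], chosen[l]
	}
	return dp[n][limit], chosen
}
```

For weights {2, 2, 4, 6, 3}, values {3, 4, 8, 9, 6} and a limit of 9, it returns 18 with items 1, 2 and 4.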

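Shortest path problem (number triangle)

Starting from the top of the triangle, each step moves from [i-1][j] to [i][j] or [i][j+1] in the next row; find the minimum path sum down to the bottom row, as shown in the figure:

[Figure]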

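package dp

var Triangle = [][]int{{5}, {7, 8}, {2, 3, 4}, {4, 9, 6, 1}, {2, 7, 9, 4, 5}}

/**
From [i-1][j] you can only move down-left to [i][j] or down-right to [i][j+1].
Returns the minimum path sum from the top to the bottom row, or -1 for an empty triangle.
*/
func ShortDir(arr [][]int) int {
n := len(arr)
if n == 0 {
return -1
}
//state[i][j]: shortest distance from the top to arr[i][j]; -1 means not reached yet
state := make([][]int, n)
for i := range arr {
state[i] = make([]int, n)
for j := range state[i] {
state[i][j] = -1
}
}

state[0][0] = arr[0][0]

for i := 1; i < n; i++ {
for j := 0; j < len(arr[i])-1; j++ {
if state[i-1][j] == -1 {
continue
}
//Move down-left: a cell can be reached from two parents, so keep the smaller distance
if d := state[i-1][j] + arr[i][j]; state[i][j] == -1 || d < state[i][j] {
state[i][j] = d
}
//Move down-right
if d := state[i-1][j] + arr[i][j+1]; state[i][j+1] == -1 || d < state[i][j+1] {
state[i][j+1] = d
}
}
}

min := int(^uint(0) >> 1) //largest int value

for j := 0; j < n; j++ {
if state[n-1][j] != -1 && state[n-1][j] < min {
min = state[n-1][j]
}
}

if min == int(^uint(0)>>1) {
return -1
}

return min
}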
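The same triangle can also be solved bottom-up, which avoids the -1 sentinels: start from the last row and let each cell keep the shorter of its two children. A sketch with a hypothetical function name, assuming row i has exactly i+1 numbers as in Triangle:

```go
package main

// minTrianglePath works upwards with a 1D slice:
// best[j] = shortest path from row i, position j down to the bottom row.
func minTrianglePath(tri [][]int) int {
	n := len(tri)
	if n == 0 {
		return -1
	}
	best := append([]int(nil), tri[n-1]...) // start with the bottom row
	for i := n - 2; i >= 0; i-- {
		for j := 0; j <= i; j++ {
			// From (i, j) you can go to (i+1, j) or (i+1, j+1); keep the shorter.
			// best[j+1] still holds row i+1 because j increases.
			next := best[j]
			if best[j+1] < next {
				next = best[j+1]
			}
			best[j] = tri[i][j] + next
		}
	}
	return best[0]
}
```

For Triangle it returns 20 (the path 5, 7, 2, 4, 2).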

Coin change problem

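//Coin change solved with dynamic programming: returns the minimum number of notes
//(unlimited supply of each denomination in money) that add up to sum, or -1 if impossible
func ChargeDP(money []int, sum int) int {
if sum < 0 {
return -1
}
if sum == 0 {
return 0
}
//state[i][j]: whether amount j can be reached with exactly i notes.
//Every note is worth at least 1, so at most sum notes are ever needed: i ranges over 0..sum
state := make([][]bool, sum+1)
for i := range state {
state[i] = make([]bool, sum+1)
}
state[0][0] = true
//Transition: if j is reachable with i-1 notes, then j+money[k] is reachable with i notes.
//Every denomination must be tried; always taking the largest note can miss the optimum
for i := 1; i <= sum; i++ {
for j := 0; j < sum; j++ {
if !state[i-1][j] {
continue
}
for _, m := range money {
if m > 0 && j+m <= sum {
state[i][j+m] = true
}
}
}
//Layers are filled in order of note count, so the first layer that reaches sum is the minimum
if state[i][sum] {
return i
}
}

return -1
}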
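A one-dimensional variant of the same idea, sketched with a hypothetical function name: dp[a] is the fewest notes for amount a, and a last slice records which note produced each optimum so that one optimal combination can be read back.

```go
package main

// minCoins returns the fewest notes summing to sum and one such combination,
// using dp[a] = 1 + min(dp[a-c]) over denominations c <= a. Returns -1, nil if impossible.
func minCoins(coins []int, sum int) (int, []int) {
	if sum < 0 {
		return -1, nil
	}
	const unreachable = -1
	dp := make([]int, sum+1)   // dp[0] = 0
	last := make([]int, sum+1) // last[a] = note used last to reach a optimally
	for a := 1; a <= sum; a++ {
		dp[a] = unreachable
		for _, c := range coins {
			if c <= 0 || c > a || dp[a-c] == unreachable {
				continue
			}
			if dp[a] == unreachable || dp[a-c]+1 < dp[a] {
				dp[a] = dp[a-c] + 1
				last[a] = c
			}
		}
	}
	if dp[sum] == unreachable {
		return -1, nil
	}
	used := []int{}
	for a := sum; a > 0; a -= last[a] {
		used = append(used, last[a])
	}
	return dp[sum], used
}
```

With notes {1, 3, 4} and a sum of 6 it returns 2 notes, 3 + 3; always taking the largest note first would need 3 (4 + 1 + 1).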

Levenshtein distance and longest common substring length

todo
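This part is still a todo. As a placeholder, here is a sketch of the Levenshtein distance half only, using the standard edit-distance table (my own illustration with a hypothetical function name, not the author's code; it compares bytes, so it assumes ASCII input).

```go
package main

// levenshtein returns the minimum number of single-character insertions,
// deletions and substitutions needed to turn a into b.
func levenshtein(a, b string) int {
	n, m := len(a), len(b)
	// dist[i][j] = edit distance between a[:i] and b[:j]
	dist := make([][]int, n+1)
	for i := range dist {
		dist[i] = make([]int, m+1)
		dist[i][0] = i // delete all i characters
	}
	for j := 0; j <= m; j++ {
		dist[0][j] = j // insert all j characters
	}
	for i := 1; i <= n; i++ {
		for j := 1; j <= m; j++ {
			cost := 1
			if a[i-1] == b[j-1] {
				cost = 0
			}
			best := dist[i-1][j-1] + cost // substitute (or keep if equal)
			if d := dist[i-1][j] + 1; d < best {
				best = d // delete a[i-1]
			}
			if d := dist[i][j-1] + 1; d < best {
				best = d // insert b[j-1]
			}
			dist[i][j] = best
		}
	}
	return dist[n][m]
}
```

For example, "mitcmu" and "mtacnu" are 3 edits apart.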

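Longest increasing subsequence of an array

State transition formula, where state[i] is the length of the longest increasing subsequence that ends at array[i]:
state[i] = max(state[j] + 1) over all j < i with array[j] < array[i], or 1 if there is no such j;
the answer is the largest state[i].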

//Length of the longest strictly increasing subsequence of array
public int getAscDP(int[] array) {
if (array == null || array.length == 0) {
return 0;
}
//state[i]: length of the longest increasing subsequence that ends at array[i]
int[] state = new int[array.length];
int max = 0;
for (int i = 0; i < array.length; i++) {
//array[i] on its own is an increasing subsequence of length 1
state[i] = 1;
for (int j = 0; j < i; j++) {
//array[i] can extend any increasing subsequence that ends at a smaller element
if (array[j] < array[i]) {
state[i] = Math.max(state[i], state[j] + 1);
}
}
max = Math.max(max, state[i]);
}
return max;
}
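For long arrays, the quadratic DP can be replaced by a different technique, binary search over the smallest possible tails, which runs in O(n log n). This is a sketch of that swapped-in technique with a hypothetical function name, not the author's method:

```go
package main

// lisLength computes the longest strictly increasing subsequence length in O(n log n).
// tails[k] holds the smallest possible tail of an increasing subsequence of length k+1.
func lisLength(a []int) int {
	tails := []int{}
	for _, x := range a {
		// Binary search for the first tail >= x
		lo, hi := 0, len(tails)
		for lo < hi {
			mid := (lo + hi) / 2
			if tails[mid] < x {
				lo = mid + 1
			} else {
				hi = mid
			}
		}
		if lo == len(tails) {
			tails = append(tails, x) // x extends the longest subsequence so far
		} else {
			tails[lo] = x // x is a smaller tail for length lo+1
		}
	}
	return len(tails)
}
```

For {2, 9, 3, 6, 5, 1, 7} it returns 4 (for example 2, 3, 5, 7). Note that tails is not itself a valid subsequence; only its length is meaningful.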

Data Structures and Algorithms - Divide and Conquer

Posted on 2019-07-25 | Edited on 2022-09-21 | In Data Structures and Algorithms , Algorithms

todo


type ReversedCount struct {
Num int
A []int
}

func (this *ReversedCount) Count(p int, r int) {
if p >= r {
return
}

m := (r + p) / 2

this.Count(p, m)
this.Count(m+1, r)

this.merge(p, m, r)
}

func (this *ReversedCount) merge(p int, m int, r int) {
j := p
k := m + 1
tmp := make([]int, r-p+1)
i := 0
for j <= m && k <= r {
if this.A[j] <= this.A[k] {
tmp[i] = this.A[j]
j++
i++
} else {
tmp[i] = this.A[k]
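/**
1. The range is split into two halves, the front half A[p,m] and the back half A[m+1,r], and each half is already sorted.
2. Here A[k] < A[j]. Because the front half is sorted, every remaining element A[j..m] is also greater than A[k], so each of them forms an inversion with A[k]; we only need to count them.
3. That count is m-j+1, i.e. the elements A[j], A[j+1], ..., A[m].
The +1 is there because A[j] itself must be included (I got this wrong the first time).
*/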
this.Num += m - j + 1
k++
i++
}
}

for j <= m {
tmp[i] = this.A[j]
i++
j++
}

for k <= r {
tmp[i] = this.A[k]
i++
k++
}

for idx, i := range tmp {
this.A[p+idx] = i
}

}
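To sanity-check the merge-based counter on small inputs, a brute-force O(n^2) counter is handy (hypothetical function name):

```go
package main

// countInversionsBrute checks every pair i < j with a[i] > a[j].
func countInversionsBrute(a []int) int {
	count := 0
	for i := 0; i < len(a); i++ {
		for j := i + 1; j < len(a); j++ {
			if a[i] > a[j] {
				count++
			}
		}
	}
	return count
}
```

For {2, 4, 3, 1, 5, 6} it returns 4: (2,1), (4,3), (4,1), (3,1). Running ReversedCount{A: a}.Count(0, len(a)-1) on a copy of the same slice should leave Num at the same value.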

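Data Structures and Algorithms - Algorithmic Ideas - Greedy Algorithm

Posted on 2019-07-25 | Edited on 2022-09-21 | In Data Structures and Algorithms , Algorithms

When to use the greedy algorithm

  1. We are given a set of data with a limit value and an expected value, and want the expected value to be as large as possible without exceeding the limit.
  2. Among data that contribute the same amount toward the limit, choose the one that contributes most to the expected value.
  3. Check with a few examples that the greedy algorithm actually gives the correct result.

Some classic scenarios

Distributing candies

There are N candies of different sizes to be given to M children, with N > M. Each child needs a candy of at least a certain size, and one such candy is enough to satisfy them. How should the candies be distributed?
Answer: in greedy terms, the limit is M, and since each child takes exactly one candy, every satisfied child contributes the same amount to the expected value. To satisfy as many children as possible, serve the children with the smallest demands first, giving each the smallest candy that satisfies them.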
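A minimal sketch of this rule, assuming demands and candy sizes are given as integers (hypothetical function name): sort both, then walk through the candies from small to large and hand each one to the least demanding unsatisfied child if it is big enough.

```go
package main

import "sort"

// assignCandies returns how many children can be satisfied.
// A child is satisfied by one candy whose size is at least their demand.
func assignCandies(demands, candies []int) int {
	d := append([]int(nil), demands...)
	c := append([]int(nil), candies...)
	sort.Ints(d)
	sort.Ints(c)
	satisfied := 0
	for _, size := range c {
		if satisfied == len(d) {
			break // every child already has a candy
		}
		if size >= d[satisfied] {
			satisfied++ // smallest adequate candy goes to the least demanding child
		}
	}
	return satisfied
}
```

A candy too small for the least demanding remaining child is too small for everyone else, so skipping it never loses a better assignment.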

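Making change

We have notes of 10, 20, 50, and 100. How should we make change?
Answer: the amount to pay is fixed, so the amount is the limit, and every note adds one to the note count, the expected value. For the same number of notes we want to cover as much of the amount as possible, so we hand out the largest notes first and fill the remainder with smaller ones.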
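A sketch of the largest-note-first rule (hypothetical function greedyChange; denominations are assumed to be passed in descending order):

```go
package main

// greedyChange pays amount using the largest notes first.
// It returns how many notes of each denomination were used, and ok=false
// if a remainder is left that the given notes cannot pay exactly.
func greedyChange(denominations []int, amount int) (map[int]int, bool) {
	used := map[int]int{}
	for _, d := range denominations {
		if d <= 0 {
			continue
		}
		if n := amount / d; n > 0 {
			used[d] = n // hand out n notes of value d
			amount -= n * d
		}
	}
	return used, amount == 0
}
```

greedyChange([]int{100, 50, 20, 10}, 180) hands out one note of each kind. For 10/20/50/100 this greedy result is also the fewest notes, but that does not hold for every set of denominations.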

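Interval coverage

Select as many mutually non-overlapping intervals as possible from N intervals, as shown below:

[Figure]

Answer: each time, among the intervals whose left end does not overlap the intervals already chosen, pick the one whose right end is as close to the left as possible, which leaves as much room as possible on the right.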
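A sketch of this rule in its usual form: sort by right endpoint and keep every interval that starts at or after the end of the last one kept. Treating touching endpoints as non-overlapping is an assumption, and the function name is hypothetical.

```go
package main

import "sort"

// maxDisjointIntervals returns a largest set of non-overlapping [l, r] intervals.
func maxDisjointIntervals(intervals [][2]int) [][2]int {
	sorted := append([][2]int(nil), intervals...)
	// Smallest right endpoint first: it leaves the most room for the rest
	sort.Slice(sorted, func(i, j int) bool { return sorted[i][1] < sorted[j][1] })
	chosen := [][2]int{}
	for _, iv := range sorted {
		if len(chosen) == 0 || iv[0] >= chosen[len(chosen)-1][1] {
			chosen = append(chosen, iv)
		}
	}
	return chosen
}
```

For [6,8], [2,4], [3,5], [1,5], [5,9], [8,10] it keeps [2,4], [6,8] and [8,10].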

Huffman coding

Used to compress data; the space saved is typically between 20% and 90%. The principle is explained at: https://time.geekbang.org/column/article/73188
To be completed later.
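Until that part is written, the core greedy step can be sketched: repeatedly merge the two least frequent nodes. The hypothetical function huffmanBits below only computes the total number of encoded bits (each merge adds its combined weight, because every symbol under it gets one bit longer), using Go's container/heap as the priority queue; it does not build the actual code table.

```go
package main

import "container/heap"

// intHeap is a min-heap of ints for container/heap.
type intHeap []int

func (h intHeap) Len() int            { return len(h) }
func (h intHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h intHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *intHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// huffmanBits returns the total encoded length in bits for symbol frequencies freqs.
func huffmanBits(freqs []int) int {
	h := intHeap(append([]int(nil), freqs...))
	if len(h) == 1 {
		return h[0] // a lone symbol still needs one bit per occurrence
	}
	heap.Init(&h)
	total := 0
	for h.Len() > 1 {
		a := heap.Pop(&h).(int) // least frequent
		b := heap.Pop(&h).(int) // second least frequent
		total += a + b
		heap.Push(&h, a+b)
	}
	return total
}
```

For frequencies {5, 9, 12, 13, 16, 45} it returns 224.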

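Data Structures and Algorithms - String Matching - KMP Algorithm

Posted on 2019-07-21 | Edited on 2022-09-21 | In Data Structures and Algorithms , Algorithms

The time complexity is O(m+n), where n is the length of the pattern and m is the length of the main string. The space complexity is O(n), because the failure function needs an array of length n.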
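For contrast with the O(m+n) bound, the brute-force matcher below shifts the pattern by one position on every mismatch and re-compares from the start of the pattern, which costs O(m*n) in the worst case. It is a baseline sketch with a hypothetical function name.

```go
package main

// bruteForceIndex returns the index of the first occurrence of pattern in text, or -1.
func bruteForceIndex(text, pattern string) int {
	textLen, patLen := len(text), len(pattern)
	for i := 0; i+patLen <= textLen; i++ {
		j := 0
		for j < patLen && text[i+j] == pattern[j] {
			j++
		}
		if j == patLen {
			return i
		}
		// Mismatch: slide by exactly one; KMP instead reuses the matched good prefix
	}
	return -1
}
```

For example, bruteForceIndex("ababaeabac", "abac") returns 6.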

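Key points

  1. The way the pattern slides is borrowed from the BM algorithm, but it uses a good-prefix rule (see the figure): when a bad character is hit at pattern index j, let v be the length of the longest prefix of the pattern that matches a suffix of the good prefix; the pattern then slides j-v positions at once, which is the same as setting j to v.
  2. Compute the failure function (the next array).

[Figure]

Matching code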

func KMP(str string, ptr string) int {
next := getNext(ptr)
i := 0
j := 0
for i < len(str) && j < len(ptr) {
//Characters match (or j was reset to -1): advance both i and j
if j < 0 || str[i] == ptr[j] {
j++
i++
} else {
//Bad character: slide the pattern by setting j = next[j]; if j was 0 it becomes -1, so the next iteration advances i and restarts j at 0
j = next[j]
}
}

if j == len(ptr) {
return i - j
}

return -1
}

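Code for computing the next array

The rules are as follows. For a pattern a, two rules apply:

  1. If the longest matchable prefix of a[0,i-1] is a[0,k-1] and the next character a[i] equals a[k], then the longest matchable prefix of a[0,i] is a[0,k].
  2. If a[i] does not equal a[k], find the second-longest matchable prefix a[0,k'] of a[0,i-1]; if a[k'+1] equals a[i], then a[0,k'+1] is the longest matchable prefix of a[0,i] (otherwise keep falling back to shorter prefixes).
    1. The second-longest matchable prefix a[0,k'] is itself the longest matchable prefix of a[0,k-1], so it is already recorded in the next array, as shown in the figure.

[Figure]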
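To check the next array on small patterns, it can also be computed straight from the definition used by getNext: next[0] = -1, and for i >= 1, next[i] is the length of the longest proper prefix of ptr[0,i-1] that is also its suffix. A slow reference sketch (hypothetical function name, O(n^3)):

```go
package main

// nextNaive builds the next array by brute force, for checking getNext only.
func nextNaive(ptr string) []int {
	next := make([]int, len(ptr))
	if len(ptr) == 0 {
		return next
	}
	next[0] = -1
	for i := 1; i < len(ptr); i++ {
		s := ptr[:i]
		// Try the longest proper prefix first; k == 0 always matches
		for k := i - 1; k >= 0; k-- {
			if s[:k] == s[i-k:] {
				next[i] = k
				break
			}
		}
	}
	return next
}
```

For "ababacd" it yields [-1 0 0 1 2 3 0], which is what the two rules above produce step by step.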


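//Failure function: computes the next array of ptr, which can be viewed as matching ptr against its own prefixes.
//next[i] is the length of the longest proper prefix of ptr[0,i-1] that is also a suffix of it (next[0] = -1).
//Key points:
// 1. Rule 1: if the longest matchable prefix of ptr[0,i-1] has length j and ptr[i]==ptr[j], then next[i+1]=j+1
// 2. Rule 2: if ptr[i]!=ptr[j], treat ptr[j] as a bad character and fall back to j=next[j], a shorter matchable prefix of length y; keep going until ptr[i]==ptr[y], which gives next[i+1]=y+1, or until j becomes -1
func getNext(ptr string) []int {
next := make([]int, len(ptr))
if len(ptr) == 0 {
return next
}
next[0] = -1 //a prefix of only one character has no good prefix

i := 0
j := -1

//Walk through the pattern ptr
for i < len(ptr)-1 {
//j == -1: no matchable prefix is left, so restart from j = 0.
//ptr[i] == ptr[j]: the matchable prefix grows by one, so after incrementing both, next[i] = j
if j == -1 || ptr[i] == ptr[j] {
j++
i++
next[i] = j
} else {
//ptr[i] != ptr[j] and j != -1: the current prefix cannot be extended.
//Treat ptr[j] as the bad character: the next candidate is the longest matchable prefix of ptr[0,j-1], already stored in next[j]
j = next[j] //never reads an unset entry: j < i always holds, and next[0..i] has already been filled in
}
}
return next
}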

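Data Structures and Algorithms - String Matching - BM Algorithm

Posted on 2019-07-17 | Edited on 2022-09-21 | In Data Structures and Algorithms , Algorithms

Principle: BM builds on brute-force matching, but uses two rules, the good-suffix rule and the bad-character rule, to slide the pattern forward by more than one position at a time, which speeds up matching. Straight to the code: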

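package stringsearch

//BM returns the index of the first occurrence of pattern p in s, or -1
func BM(s string, p string) int {
//Preprocess for the good-suffix rule
suffix, prefix := initGS(p)

//Preprocess for the bad-character rule
bc := initBC(p)
//i is where the pattern is aligned in s; <= so that a match at the very end is found
for i := 0; i <= len(s)-len(p); {
j := len(p) - 1

//Match from right to left
for ; j >= 0; j-- {
if s[i+j] != p[j] {
break
}
}

//The whole pattern matches: return its position
if j < 0 {
return i
}

//Bad-character rule: align the last occurrence of s[i+j] in p with it (the shift may be negative)
x := j - bc[s[i+j]]

//Good-suffix rule, only when at least one character matched; defaults to 1 so we always move forward
y := 1
if j < len(p)-1 {
y = moveGS(j, len(p), suffix, prefix)
}

//Move i, i.e. slide the pattern, by the larger of the two shifts
ret := x
if y > x {
ret = y
}
i = i + ret
}
return -1
}

//initBC records, for every byte value, the last index at which it appears in the pattern (-1 if absent)
func initBC(pattern string) []int {
bc := make([]int, 256)
for r := 0; r < 256; r++ {
bc[r] = -1
}

for idx, v := range []byte(pattern) {
bc[v] = idx
}

return bc
}

//initGS: suffix[k] is the start index of another occurrence in the pattern of its suffix of length k (-1 if none);
//prefix[k] reports whether the suffix of length k is also a prefix of the pattern
func initGS(pattern string) ([]int, []bool) {
suffix := make([]int, len(pattern))
prefix := make([]bool, len(pattern))
for r := 0; r < len(pattern); r++ {
suffix[r] = -1
}

for i := 0; i < len(pattern)-1; i++ {
j := i
k := 0 //length of the common suffix of pattern[0,i] and pattern
for j >= 0 && pattern[j] == pattern[len(pattern)-1-k] {
k++
suffix[k] = j
j--
}

if j < 0 {
prefix[k] = true //the common suffix reaches the start of the pattern, so it is also a prefix
}
}

return suffix, prefix
}

//moveGS returns the shift given by the good-suffix rule; j is the index of the bad character and m the pattern length
func moveGS(j int, m int, suffix []int, prefix []bool) int {
k := m - 1 - j //length of the good suffix
//The whole good suffix occurs elsewhere in the pattern: align with that occurrence
if suffix[k] != -1 {
return j - suffix[k] + 1
}
//Otherwise find the longest suffix of the good suffix that is also a prefix of the pattern
for r := j + 2; r <= m-1; r++ {
if prefix[m-r] {
return r
}
}

//No match at all: move past the whole pattern
return m
}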
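To see how much the bad-character rule does by itself, here is a simplified variant that drops the good-suffix tables (hypothetical function bmBadCharOnly). Because the bad-character shift can be zero or negative, it falls back to a shift of one, which the full BM code above avoids by also consulting the good-suffix rule.

```go
package main

// bmBadCharOnly matches right to left and shifts using only the bad-character rule.
func bmBadCharOnly(s, p string) int {
	n, m := len(s), len(p)
	if m == 0 {
		return 0
	}
	var last [256]int // last[c] = last index of byte c in p, or -1
	for c := range last {
		last[c] = -1
	}
	for i := 0; i < m; i++ {
		last[p[i]] = i
	}
	for i := 0; i <= n-m; {
		j := m - 1
		for j >= 0 && s[i+j] == p[j] {
			j-- // compare from right to left
		}
		if j < 0 {
			return i
		}
		shift := j - last[s[i+j]]
		if shift < 1 {
			shift = 1 // the bad character occurs to the right of j in p
		}
		i += shift
	}
	return -1
}
```

For "abcacabdc" and pattern "abd", the alignments tried are 0, 3 and 5, and the match is found at 5.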

Liu hao

A programmer who aspires to be a good cook

© 2018 – 2023 Liu hao