liuhao163.github.io

杂七杂八


  • Home

  • Categories

  • Tags

  • Archives

  • Sitemap

Spring事务-事务传播级别

Posted on 2021-06-22 | Edited on 2022-09-21 | In java , spring , 事务

事务是逻辑处理原子性的保证手段,通过使用事务控制,可以极大的避免出现逻辑处理失败导致的脏数据等问题。
事务最重要的两个特性,是事务的传播级别和数据隔离级别。传播级别定义的是事务的控制范围,事务隔离级别定义的是事务在数据库读写方面的控制范围。

在spring中我们对事务的定义主要是通过注解@Transactional,控制传播级别的就是参数propagation是一个Propagation的枚举有如下的值

REQUIRED(PROPAGATION_REQUIRED)

默认的spring事务传播级别,使用该级别的特点是,如果上下文中已经存在事务,那么就加入到事务中执行,如果当前上下文中不存在事务,则新建事务执行。所以这个级别通常能满足处理大多数的业务场景。

即,当前的事务如果没有运行在@Transactional注解的方法中,新建一个事务,如果运行在一个@Transactional注解的方法中则加入当前事务,一旦出现异常,会连调用者的事务一起回滚。

SUPPORTS(PROPAGATION_SUPPORTS)

从字面意思就知道,supports,支持,该传播级别的特点是,如果上下文存在事务,则支持事务加入事务,如果没有事务,则使用非事务的方式执行。所以说,并非所有的包在transactionTemplate.execute中的代码都会有事务支持。这个通常是用来处理那些并非原子性的非核心业务逻辑操作。应用场景较少。

即,当前的事务如果没有运行在@Transactional注解的方法中,则没有事务,如果运行在一个@Transactional注解的方法中则加入当前事务,一旦出现异常,会连调用者的事务一起回滚。

MANDATORY(PROPAGATION_MANDATORY)

该级别的事务要求上下文中必须要存在事务,否则就会抛出异常!配置该方式的传播级别是有效的控制上下文调用代码遗漏添加事务控制的保证手段。比如一段代码不能单独被调用执行,但是一旦被调用,就必须有事务包含的情况,就可以使用这个传播级别。

即,当前的方法必须运行在有@Transactional注解的方法种,否则会抛出异常,和NEVER相反。

REQUIRES_NEW(PROPAGATION_REQUIRES_NEW)

从字面即可知道,new,每次都要一个新事务,该传播级别的特点是,每次都会新建一个事务,并且同时将上下文中的事务挂起,执行当前新建事务完成以后,上下文事务恢复再执行。

即,当前的方法单独运行会以事务方式运行,在@Transactional注解的方法中运行,运行到该方法时候回将当前事务挂起,新建一个事务运行,一旦出现异常,只会回滚该方法的事务,不会回滚调用者的事务。

这是一个很有用的传播级别,举一个应用场景:现在有一个发送100个红包的操作,在发送之前,要做一些系统的初始化、验证、数据记录操作,然后发送100封红包,然后再记录发送日志,发送日志要求100%的准确,如果日志不准确,那么整个父事务逻辑需要回滚。
怎么处理整个业务需求呢?就是通过这个PROPAGATION_REQUIRES_NEW 级别的事务传播控制就可以完成。发送红包的子事务不会直接影响到父事务的提交和回滚。

NOT_SUPPORTED(PROPAGATION_NOT_SUPPORTED)

这个也可以从字面得知,not supported ,不支持,当前级别的特点就是上下文中存在事务,则挂起事务,执行当前逻辑,结束后恢复上下文的事务。

即,当前方法单独运行不会以事务方法运行,如果运行在在@Transactional注解的方法中,运行到该方法时候会将当前事务挂起,以非事务的方式执行该方法在恢复事务

这个级别有什么好处?可以帮助你将事务极可能的缩小。我们知道一个事务越大,它存在的风险也就越多。所以在处理事务的过程中,要保证尽可能的缩小范围。比如一段代码,是每次逻辑操作都必须调用的,比如循环1000次的某个非核心业务逻辑操作。这样的代码如果包在事务中,势必造成事务太大,导致出现一些难以考虑周全的异常情况。所以这个事务这个级别的传播级别就派上用场了。用当前级别的事务模板抱起来就可以了。

NEVER(PROPAGATION_NEVER)

该事务更严格,上面一个事务传播级别只是不支持而已,有事务就挂起,而PROPAGATION_NEVER传播级别要求上下文中不能存在事务,一旦有事务,就抛出runtime异常,强制停止执行!这个级别上辈子跟事务有仇。

即,该方法不能运行在注解@Transactional的方法中,和MANDATORY相反

NESTED(PROPAGATION_NESTED)

字面也可知道,nested,嵌套级别事务。该传播级别特征是,如果上下文中存在事务,则嵌套事务执行,如果不存在事务,则新建事务。

即:和REQUIRED(PROPAGATION_REQUIRED)类似,很少见

附录

测试代码见:https://gitee.com/liuhao163/test-transational

重构-为什么要重构以及代码中的坏味道

Posted on 2021-06-02 | Edited on 2022-09-21 | In 重构

重构指什么

这里的 重构行为 是指在不改变软件功能的前提下,通过一些 重构手段 调整代码结构使其更易维护、代码更加健壮,并且能有效降低项目的学习成本。

为何重构

  • 改进软件设计
  • 使软件更易读
  • 帮助解决一些诡异的BUG
  • 提高变成速度

重构的时机很重要

我们何时重构呢?一般来说,我们很难说找到一个比较长的周期什么事都不做,只重构。那么重构应该就穿插在我们日常工作中。

  • 预备性重构:添加新的功能之前做技术设计,发现这部分代码设计的不够合理我们可以在这时候进行该功能的重构
  • 帮助理解的重构:当我读不懂这段代码的时候,我就要考虑进行重构,或者说的极限点,当我觉得这部分代码需要些注释才能解释明白的时候我可以考虑进行重构
  • 捡垃圾式的重构:在读代码时候发现这段代码逻辑相同,或者结构迂回有问题
  • 有计划的重构和见机行事的重构:在写新功能时候顺手就把不合理的代码改掉
  • 长期重构:一些比较大的架构需要调整的视乎
  • review时候重构

    另外重构尽量在程序员之间内部闭环少让产品、项目经理参与他们可能出于成本考虑阻碍重构,另外重新比重构划算时候就重写吧

重构的挑战是什么

  • 延缓新功能开发:如果一块代码被封装隔离起来了,即便写的很糟只要你不用理解它你就可以不重构它
  • 代码所有权:一旦接口发布出去了就不好在重构了,否则需要做大量兼容工作
  • 分支:主干发布,分支开发这种会导致,merge地狱出现
  • 测试:重构要提前想好测试方案,减少出现BUG的几率
  • 遗留代码:祖传代码没人敢动,还是提前准备充足测试方案
  • 数据库的重构:做好版本管理,将改动合并到代码库中便于回滚

代码有哪些坏味道

在我们工程的代码中出现如下的情况,会使我们的代码产生坏的味道入股不管,这些坏味道很快就会让我们的工程腐败,所以我们可以考虑对这些地方进行进行重构。

神秘命名

当我们发现我们对方法或者函数命名不够准确,很难理解这时候我们应该考虑架构是否合理,代码结构是否应该调整。

解决方法:

  1. 改变函数声明
  2. 字段改名
  3. 变量改名

重复代码

在一个地方看到相同的代码结构,导致后期维护需要维护多个副本

解决方法:

  1. 提炼函数
  2. 如果只是相似而不是相同,我们可以先 移动语句 重组代码顺序
  3. 如果重复代码在不同的子类中,我们可以用 函数上移 将代码放到父类中

过长函数

过长的函数会导致代码很难理解

解决方法:

  1. 提炼函数,拆分成多个小函数
  2. 以查询取代临时变量,减少难以理解的临时变量
  3. 引入参数对象、保持参数完整性,减少过长的参数列表
  4. 如果还有_太多参数和临时变量,我们可以以 以命令取代函数 将这个长的函数封装成一个命令类,构造函数是参数
  5. 对于循环 拆分循环 ,对于条件表达式可以 分解条件表达式,或者以 多态处理

过长参数列表

危害同上

解决方法:

  1. 假设参数列表中的一个参数,是通过另一个参数求解出来的,我们可以用 已查询取代参数 来去掉这个参数
  2. 保持参数完整性
  3. 已入参数对象
  4. 移除标记参数
  5. 如果多个函数引用相同的参数列表,可以 函数组合成类

全局数据

如果没有对数据的修改封装,很可能会造成系统BUG

解决方案

  1. 少用,如果必须要用缩短其作用域(包级别调用)
  2. 封装变量

可变数据

对数据的修改很可能会导致BUG,同上

解决方案

  1. 封装变量
  2. 拆分变量
  3. 移动语句,提炼函数
  4. 将查询函数和修改函数分离
  5. 移除设值函数
  6. 如果可变数据能在其他地方计算出来,我们可以 查询取代派生变量 即:字段只没有变,每次返回的都是计算过程
  7. 函数组合成类、函数组合成变换、引用对象改为值对象

发散式变化

某个模块因为不同的原因在不同的地方发生变化,最终导致这个模块无法维护,这个其实违反了地耦合的原则,一个类应该指关心自己的上下文。

解决方案

  1. 先用拆分阶段将两者拆分、然后用搬移函数将处理逻辑分开。
  2. 如果函数内部混合了俩类处理逻辑、先提炼函数讲起拆分,如果是以类的方式定义的可以用提炼类来做拆分

霰弹式修改

如果每次修改一个地方,都需要在很多其他类中做修改,这个其实违反了高内聚的原则。

解决方案

  1. 搬移函数或者搬移字段把要修改的代码放在同一个模块
  2. 如果有很多函数都在操作想死的数据,可以使用 函数组合
  3. 如果优先函数是功能转换或者数据填充,可以使用 函数组合成变换
  4. 如果一些函数输出可以组合后提供给一段专门使用这些计算结果的逻辑,可以使用 拆分阶段
  5. 一个常用策略是 内联函数 或者是 内连类 将不该分散的逻辑拽到一起。

依恋情节

todo

记一次安装opencv和gocv踩过的坑

Posted on 2020-12-07 | Edited on 2022-09-21 | In 经验积累 , 项目积累

最近负责一个项目将人脸算法工程,和人脸后端工程集成到一个arm架构的小型盒子中。我主要负责后端工程这部分,语言栈是golang,通过gocv调用opencv【ffmpeg】拉取rtsp视频流到本地抽帧进行人脸的比对。

这其中涉及到gocv+opencv,由于盒子的的架构比较特殊切公司出于安全原因无法连接外网,大部分依赖只能通过源码编译,我踩了不少的坑特此记录

如何安装ffmpeg

所以我马上手动编译ffmpeg,一般我们都可以用yum安装,但是这里不能联网只能源码安装

1
2
3
4
5
cd ffmpeg
./configure --enable-shared --enable-swscale --enable-gpl --enable-nonfree --enable-pic --prefix=/usr/local/ffmpeg --enable-version3 --enable-postproc --enable-pthreads --enable-avisynth

make
make install

如何编译opencv

下载opencv,注意版本,当前gocv的依赖库制定的是opencv4.5.0

注意安装opencv需要cmake3

1
2
3
4
5
6
cd opencv-${OPENCV_VERSION} #这里进入opencv目录
mkdir build
cd build
cmake3 -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/usr/local -DOPENCV_EXTRA_MODULES_PATH=../../opencv_contrib-${OPENCV_VERSION}/modules -DOPENCV_GENERATE_PKGCONFIG=ON -DWITH_FFMPEG=on ..
make -j $proc
make install
  • CMAKE_INSTALL_PREFIX 安装目录
  • OPENCV_EXTRA_MODULES_PATH 制定的一些插件目录这里用不到
  • OPENCV_GENERATE_PKGCONFIG 编译golang的程序时候gocv检查依赖是通过pkg-config所以要安装,会生成.pc文件,之后我们可以将.pc文件放到pkg-config,后面会讲到如何放
  • WITH_FFMPEG 处理视频流的解码要依赖

安装opencv,执行cmake3时候解决FFMPEG NO

我在编译opencv4.5.0,执行cmake时候发现ffmpeg总是no,导致程序启动时候无法顺利拉流下来提示Error read file :xxxx这是由于安装opencv时候check依赖ffmpeg失败导致的。原来opencv检查ffpmeg是通过pkg-config的,所以

解决方案:我们需要将刚才编译的ffmpeg下的*.pc都拷贝到pkg-config指定的目录中

指定pkg-config

我们在编译golang程序时候有可能会失败,指找不到opencv库,有可能是pkg-config加载失败导致的解决方案

1
2
export PKG_CONFIG_PATH=<存放*.pc的目录,一般是xxx/pkgconfig>:$PKG_CONFIG_PATH
pkg-config --cflags -- opencv4 #测试opencv4

加载动态链接库

如果想让系统找到opencv库文件需要这么做

1
2
3
4
cd /etc/ld.so.conf.d
touch OpenCV.conf
echo <libopenxxx.so所在的补录,这里一般是/usr/local/lib > OpenCV.conf
ldconfig

编译golang程序

以为没法连外网所以把源码和依赖拷贝到盒子中

下载依赖

  • go mod tidy
  • go mod vendor

    编译

  • go build -mod vendor

至此,基本上坑都踩过了,特此记录。

Spring源码-工具-EmbeddedValueResolverAware

Posted on 2020-10-15 | Edited on 2022-09-21 | In java , spring , 工具

todo

读取配置文件的配置用

SpringMvc源码-RequestMappingHandlerAdapter

Posted on 2020-10-13 | Edited on 2022-09-21 | In java , spring , mvc

HandlerAdapter接口的说明

在SprintMVC中,HandlerAdapter组件是一个handler适配器类,它通过handle方法调用request对应的controller方法来处理请求【HandlerMethod】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 /**
* Use the given handler to handle this request.
* The workflow that is required may vary widely.
* @param request current HTTP request
* @param response current HTTP response
* @param handler handler to use. This object must have previously been passed
* to the {@code supports} method of this interface, which must have 实际上是 HandlerMethod
* returned {@code true}.
* @throws Exception in case of errors
* @return a ModelAndView object with the name of the view and the required
* model data, or {@code null} if the request has been handled directly
*/
//
@Nullable
ModelAndView handle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception;

如何获取HandlerAdapter

见DispatcherServlet.getHandlerAdapter

1
2
3
4
5
6
7
8
9
10
11
12
13
protected HandlerAdapter getHandlerAdapter(Object handler) throws ServletException {
//遍历DipatcherServlet.properties里org.springframework.web.servlet.HandlerAdapter的配置,调用handlerAdpater.supports看那个符合条件
//Springboot中,会执行RequestMappingHandler.supports-->AbstractHandlerMethodAdapter.supports 即handler是HandlerMetod
if (this.handlerAdapters != null) {
for (HandlerAdapter adapter : this.handlerAdapters) {
if (adapter.supports(handler)) {
return adapter;
}
}
}
throw new ServletException("No adapter for handler [" + handler +
"]: The DispatcherServlet configuration needs to include a HandlerAdapter that supports this handler");
}

handle方法的执行

调用路径是AbstractHandlerMethodAdapter.handle()–>AbstractHandlerMethodAdapter.handleInternal()–>RequestMappingHandlerAdapter.handleInternal。

见RequestMappingHandlerAdapter.handleInternal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
protected ModelAndView handleInternal(HttpServletRequest request,
HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {

ModelAndView mav;
//校验requestMethod和requireSession
checkRequest(request);

// Execute invokeHandlerMethod in synchronized block if required.
// session线程不安全,如果想用户多次请求都访问同一个session需要加一个全局锁。。性能极差很少见
if (this.synchronizeOnSession) {
HttpSession session = request.getSession(false);
if (session != null) {
Object mutex = WebUtils.getSessionMutex(session);
synchronized (mutex) {
mav = invokeHandlerMethod(request, response, handlerMethod);
}
}
else {
// No HttpSession available -> no mutex necessary
mav = invokeHandlerMethod(request, response, handlerMethod);
}
}
else {
// No synchronization on session demanded at all...
// 关键方法,代理方法执行handlerMethod,返回modelAndView
mav = invokeHandlerMethod(request, response, handlerMethod);
}

if (!response.containsHeader(HEADER_CACHE_CONTROL)) {
if (getSessionAttributesHandler(handlerMethod).hasSessionAttributes()) {
applyCacheSeconds(response, this.cacheSecondsForSessionAttributeHandlers);
}
else {
prepareResponse(response);
}
}

return mav;
}

controller方法的执行原理,代理模式和反射的应用:invokeHandlerMethod

最终的执行实际上是利用了java的反射包,以及代理模式,关键代码见:invokeHandlerMethod

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* Invoke the {@link RequestMapping} handler method preparing a {@link ModelAndView}
* if view resolution is required.
* @since 4.2
* @see #createInvocableHandlerMethod(HandlerMethod)
*/
@Nullable
protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
//HttpServletRequest 的封装
ServletWebRequest webRequest = new ServletWebRequest(request, response);
try {
//用来处理request中的参数映射,WebDataBinderFactory里的值见:afterPropertiesSet
WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);
//用来创建初始化model
ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);
//根据handlerMethod实例化一个ServletInvocableHandlerMethod来处理请求
ServletInvocableHandlerMethod invocableMethod = createInvocableHandlerMethod(handlerMethod);
//设置argumentResolvers处理request的参数 argumentResolvers里的值见afterPropertiesSet
if (this.argumentResolvers != null) {
invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers);
}
//设置argumentResolvers处理response的参数 returnValueHandlers里的值见afterPropertiesSet
// 可自定义 TODO 比如想给所有的ResponseBody返回值封装成{code:0,msg:1,data:null}
if (this.returnValueHandlers != null) {
invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers);
}
invocableMethod.setDataBinderFactory(binderFactory);
invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer);

//modelAndView的上下文对象
ModelAndViewContainer mavContainer = new ModelAndViewContainer();
mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request));
//初始化Model对象,同时将sessionAtrributes值和ModelAtrributes的值都合并在一起放在Model中
modelFactory.initModel(webRequest, mavContainer, invocableMethod);
mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect);

//todo 未读
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
asyncWebRequest.setTimeout(this.asyncRequestTimeout);

WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.setTaskExecutor(this.taskExecutor);
asyncManager.setAsyncWebRequest(asyncWebRequest);
asyncManager.registerCallableInterceptors(this.callableInterceptors);
asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);

if (asyncManager.hasConcurrentResult()) {
Object result = asyncManager.getConcurrentResult();
mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
asyncManager.clearConcurrentResult();
LogFormatUtils.traceDebug(logger, traceOn -> {
String formatted = LogFormatUtils.formatValue(result, !traceOn);
return "Resume with async result [" + formatted + "]";
});
invocableMethod = invocableMethod.wrapConcurrentResult(result);
}

//关键代码,通过handlerMethod的代理方法,执行Controller的方法,将结果存储到mavContainer
invocableMethod.invokeAndHandle(webRequest, mavContainer);
if (asyncManager.isConcurrentHandlingStarted()) {
return null;
}

//获取ModelAndView
return getModelAndView(mavContainer, modelFactory, webRequest);
}
finally {
webRequest.requestCompleted();
}
}

invocableMethod.invokeAndHandle(webRequest, mavContainer);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void invokeAndHandle(ServletWebRequest webRequest, ModelAndViewContainer mavContainer,
Object... providedArgs) throws Exception {

//关键代码,通过java的反射机制doInvoke执行handlerMethod对应的controller方法
Object returnValue = invokeForRequest(webRequest, mavContainer, providedArgs);
setResponseStatus(webRequest);

if (returnValue == null) {
if (isRequestNotModified(webRequest) || getResponseStatus() != null || mavContainer.isRequestHandled()) {
disableContentCachingIfNecessary(webRequest);
mavContainer.setRequestHandled(true);
return;
}
}
else if (StringUtils.hasText(getResponseStatusReason())) {
mavContainer.setRequestHandled(true);
return;
}

mavContainer.setRequestHandled(false);
Assert.state(this.returnValueHandlers != null, "No return value handlers");
try {
//调用returnValueHandlers 将返回值写到response中(webRequest.getNativeResponse)里
// 由于现在开发采用前后端分离,RestApi往往会用到RequestResponseBodyMethodProcessor,可以看这个类,返回值和controller方法有@ResponseBody
this.returnValueHandlers.handleReturnValue(
returnValue, getReturnValueType(returnValue), mavContainer, webRequest);
}
catch (Exception ex) {
if (logger.isTraceEnabled()) {
logger.trace(formatErrorForReturnValue(returnValue), ex);
}
throw ex;
}
}

invokeForRequest方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Nullable
public Object invokeForRequest(NativeWebRequest request, @Nullable ModelAndViewContainer mavContainer,
Object... providedArgs) throws Exception {
//从request里获取controller执行方法的参数,request里的值变为方法里设置的参数
Object[] args = getMethodArgumentValues(request, mavContainer, providedArgs);
if (logger.isTraceEnabled()) {
logger.trace("Arguments: " + Arrays.toString(args));
}
//通过java的反射机制,执行controller里的方法,方法参数getMethodArgumentValues获取
return doInvoke(args);
}

protected Object doInvoke(Object... args) throws Exception {
ReflectionUtils.makeAccessible(getBridgedMethod());
try {
//todo 通过反射执行方法 书签
//调用java反射包的invoke 代理执行HandlerMethod里econtroller对应的方法
// getBridgedMethod= 如果没有意外等于controller被执行的方法
// getBean()=Controller对象
// args=request通过argumentResolver得到的方法参数对象
return getBridgedMethod().invoke(getBean(), args);
}
catch (IllegalArgumentException ex) {
assertTargetBean(getBridgedMethod(), getBean(), args);
String text = (ex.getMessage() != null ? ex.getMessage() : "Illegal argument");
throw new IllegalStateException(formatInvokeError(text, args), ex);
}
catch (InvocationTargetException ex) {
// Unwrap for HandlerExceptionResolvers ...
Throwable targetException = ex.getTargetException();
if (targetException instanceof RuntimeException) {
throw (RuntimeException) targetException;
}
else if (targetException instanceof Error) {
throw (Error) targetException;
}
else if (targetException instanceof Exception) {
throw (Exception) targetException;
}
else {
throw new IllegalStateException(formatInvokeError("Invocation failure", args), targetException);
}
}
}

SpringMvc源码-RequestMappingHandlerMapping

Posted on 2020-10-13 | Edited on 2022-09-21 | In java , spring , mvc

RequestMappingHandlerMapping负责将RequestMapping注解的方法与url关联起来,并且返回HandlerExecutionChain

加载过程

DispatcherServlet.initStrategies的initHandlerMappings方法中会通过读取DispatchServerlet.proeprties对HandlerMapping进行初始化,这里重点看RequestMappingHandlerMapping类

DispatcherServlet.properties的值

1
2
3
org.springframework.web.servlet.HandlerMapping=org.springframework.web.servlet.handler.BeanNameUrlHandlerMapping,\
org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping,\
org.springframework.web.servlet.function.support.RouterFunctionMapping

初始化HandlerMapping

HandlerMapping,封装了是request与他的处理类HandlerMethod的映射关系,他的职责主要是通过request找到对应的handlerMapping

AbstractHandlerMethodMapping实现了InitializingBean所以在afterPropertiesSet会进行初始化工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void afterPropertiesSet() {
//调用initHandlerMethods
initHandlerMethods();
}

//从ApplicationContext扫描所有的bean, 找到并且将handler method注册到容器中【将url和handlerMethod关联】
protected void initHandlerMethods() {
//获取application中所有的bean
for (String beanName : getCandidateBeanNames()) {
if (!beanName.startsWith(SCOPED_TARGET_NAME_PREFIX)) {
//重点方法
processCandidateBean(beanName);
}
}
handlerMethodsInitialized(getHandlerMethods());
}

重点方法:processCandidateBean

根据bean找到method,封装成handlerMethod注册到容器中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected void processCandidateBean(String beanName) {
Class<?> beanType = null;
try {
beanType = obtainApplicationContext().getType(beanName);
} catch (Throwable ex) {
// An unresolvable bean type, probably from a lazy bean - let's ignore it.
if (logger.isTraceEnabled()) {
logger.trace("Could not resolve type for bean '" + beanName + "'", ex);
}
}
//这里isHandler判断beanType是否是一个Controller【判断是否有@Controller和@RequestMapping注解】
if (beanType != null && isHandler(beanType)) {
//重点方法
detectHandlerMethods(beanName);
}
}

重点方法

detectHandlerMethods遍历bean的method方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
protected void detectHandlerMethods(Object handler) {
Class<?> handlerType = (handler instanceof String ?
obtainApplicationContext().getType((String) handler) : handler.getClass());

if (handlerType != null) {
Class<?> userType = ClassUtils.getUserClass(handlerType);
//遍历handler中的方法,通过getMappingForMethod返回RequestMappingInfo
// 通过方法上的RequestMapping注解,生成RequestMappingInfo,并且和Method关联
Map<Method, T> methods = MethodIntrospector.selectMethods(userType,
(MethodIntrospector.MetadataLookup<T>) method -> {
try {
return getMappingForMethod(method, userType);
} catch (Throwable ex) {
throw new IllegalStateException("Invalid mapping on handler class [" +
userType.getName() + "]: " + method, ex);
}
});
if (logger.isTraceEnabled()) {
logger.trace(formatMappings(userType, methods));
}

//遍历method-->RequestMappingInfo 并且将method和mapping注册到 mappingRegistry中RequestMappingHandlerMapping对其进行了改写
methods.forEach((method, mapping) -> {
Method invocableMethod = AopUtils.selectInvocableMethod(method, userType);
registerHandlerMethod(handler, invocableMethod, mapping);
});
}
}

重点方法:registerHandlerMethod(handler, invocableMethod, mapping)

向容器注册HandlerMapping:registerHandlerMethod(handler, invocableMethod, mapping);
这里的mappingRegistry 是AbstractHandlerMethodMapping的内部类MappingRegistry;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
protected void registerHandlerMethod(Object handler, Method method, T mapping) {
this.mappingRegistry.register(mapping, handler, method);
}

public void register(T mapping, Object handler, Method method) {
// Assert that the handler method is not a suspending one.
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new IllegalStateException("Unsupported suspending handler method detected: " + method);
}
this.readWriteLock.writeLock().lock();
try {
//封装成HandlerMethod
HandlerMethod handlerMethod = createHandlerMethod(handler, method);
//判断mapping是否已经存在,存在抛异常
validateMethodMapping(handlerMethod, mapping);
//关联 mapping-->handlerMethod
this.mappingLookup.put(mapping, handlerMethod);
//根据mapping获取url并且将Url和mapping关联起来
List<String> directUrls = getDirectUrls(mapping);
for (String url : directUrls) {
this.urlLookup.add(url, mapping);
}

//关联name【RequestMappingInfoHandlerMethodMappingNamingStrategy.getName】和handlerMethod
String name = null;
if (getNamingStrategy() != null) {
name = getNamingStrategy().getName(handlerMethod, mapping);
addMappingName(name, handlerMethod);
}

//corsLookup绑定handlerMethod和corsConfig,注解或者方法上带CrossOrigin
CorsConfiguration corsConfig = initCorsConfiguration(handler, method, mapping);
if (corsConfig != null) {
this.corsLookup.put(handlerMethod, corsConfig);
}

this.registry.put(mapping, new MappingRegistration<>(mapping, handlerMethod, directUrls, name));
} finally {
this.readWriteLock.writeLock().unlock();
}
}

至此所有的RequestMapping都注册到容器中

doDipatch时候

通过reqeust获取handler和拦截器封装成HandlerExecutionChain过程

找到在初始化时候注册的handlerMappings,调用它们的getHandler方法,具体实现在AbstraceHanlderMapping中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public final HandlerExecutionChain getHandler(HttpServletRequest request) throws Exception {
//根据request获取一个指定的handler,如果没找到返回null,类型是HandlerMethod,调用链路RequestMappingHandlerMapping.getHandlerInternal-->AbstractHandlerMethodMapping.getgetHandlerInternal

//重点方法1
Object handler = getHandlerInternal(request);
if (handler == null) {
handler = getDefaultHandler();
}
if (handler == null) {
return null;
}
// Bean name or resolved handler?
if (handler instanceof String) {
String handlerName = (String) handler;
handler = obtainApplicationContext().getBean(handlerName);
}

//重点方法2 返回HandlerExecutionChain【handler和拦截器】的封装
HandlerExecutionChain executionChain = getHandlerExecutionChain(handler, request);

if (logger.isTraceEnabled()) {
logger.trace("Mapped to " + handler);
} else if (logger.isDebugEnabled() && !request.getDispatcherType().equals(DispatcherType.ASYNC)) {
logger.debug("Mapped to " + executionChain.getHandler());
}

if (hasCorsConfigurationSource(handler)) {
CorsConfiguration config = (this.corsConfigurationSource != null ? this.corsConfigurationSource.getCorsConfiguration(request) : null);
CorsConfiguration handlerConfig = getCorsConfiguration(handler, request);
config = (config != null ? config.combine(handlerConfig) : handlerConfig);
executionChain = getCorsHandlerExecutionChain(request, executionChain, config);
}

return executionChain;
}

重点方法1 getHandlerInternal

这里的实现见:AbstractHandlerMethodMapping.getgetHandlerInternal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Override
protected HandlerMethod getHandlerInternal(HttpServletRequest request) throws Exception {
//获取urlPath用于查找HandlerMethod
String lookupPath = getUrlPathHelper().getLookupPathForRequest(request);
request.setAttribute(LOOKUP_PATH, lookupPath);
this.mappingRegistry.acquireReadLock();
try {
//查找HandlerMethod 优先通过Path找,没找到轮询mappingLookup,之后对找到的HandlerMethod排序找到最合适的HandlerMethod
HandlerMethod handlerMethod = lookupHandlerMethod(lookupPath, request);
//re-create bean 享元模式防止请求修改HandlerMethod
return (handlerMethod != null ? handlerMethod.createWithResolvedBean() : null);
} finally {
this.mappingRegistry.releaseReadLock();
}
}

protected HandlerMethod lookupHandlerMethod(String lookupPath, HttpServletRequest request) throws Exception {
//
List<Match> matches = new ArrayList<>();
//通过path查找到符合要求的RequestMappingInfo
List<T> directPathMatches = this.mappingRegistry.getMappingsByUrl(lookupPath);
if (directPathMatches != null) {
addMatchingMappings(directPathMatches, matches, request);
}

//如果通过path没找到去mappingLookup轮询一遍查找
if (matches.isEmpty()) {
// No choice but to go through all mappings...
addMatchingMappings(this.mappingRegistry.getMappings().keySet(), matches, request);
}

//不为空
if (!matches.isEmpty()) {
//根据RequestMappingInfoHandlerMapping.getMappingComparator排序Match.mapping的Compare然后取第一个
// todo 优先级未看
Comparator<Match> comparator = new MatchComparator(getMappingComparator(request));
matches.sort(comparator);
//排序后取第一个作为返回结果
Match bestMatch = matches.get(0);
//取出来俩个然后优先级一样直接抛出异常意思是Handler重复"多个优先级"
if (matches.size() > 1) {
if (logger.isTraceEnabled()) {
logger.trace(matches.size() + " matching mappings: " + matches);
}
if (CorsUtils.isPreFlightRequest(request)) {
return PREFLIGHT_AMBIGUOUS_MATCH;
}
Match secondBestMatch = matches.get(1);
if (comparator.compare(bestMatch, secondBestMatch) == 0) {
Method m1 = bestMatch.handlerMethod.getMethod();
Method m2 = secondBestMatch.handlerMethod.getMethod();
String uri = request.getRequestURI();
throw new IllegalStateException(
"Ambiguous handler methods mapped for '" + uri + "': {" + m1 + ", " + m2 + "}");
}
}
request.setAttribute(BEST_MATCHING_HANDLER_ATTRIBUTE, bestMatch.handlerMethod);
//为request设置值:PATH_WITHIN_HANDLER_MAPPING_ATTRIBUTE,返回对应的handlerMethod
//这里其实就是把mapping,和handler都返回只不过mapping放在了requst里
handleMatch(bestMatch.mapping, lookupPath, request);
return bestMatch.handlerMethod;
} else {
//都没匹配到调用该方法见RequestMappingInfoHandlerMapping,其中不符合RequestMappingInfo的会抛异常
return handleNoMatch(this.mappingRegistry.getMappings().keySet(), lookupPath, request);
}
}

重点方法2 getHandlerExecutionChain

返回handler和拦截器的封装,注:springboot的WebMvcConfigurer.addInterceptors方法中添加的拦截器会转成MappedInterceptor添加到RRequestMappingHandlerMapping中代码可以从WebMvcAutoConfiguration中看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected HandlerExecutionChain getHandlerExecutionChain(Object handler, HttpServletRequest request) {
//如果是一个存在的HandlerExecutionChain,直接返回,否则new一个HandlerExecutionChain
//new HandlerExecutionChain(handler)在HandlerExecutionChain的构造方法里会再判断下Handler的类型,
// 如果是HandlerExecutionChain进行merge
// 如果是HandlerMethod会进行赋值具体见HandlerExecutionChain构造方法
HandlerExecutionChain chain = (handler instanceof HandlerExecutionChain ?
(HandlerExecutionChain) handler : new HandlerExecutionChain(handler));

String lookupPath = this.urlPathHelper.getLookupPathForRequest(request, LOOKUP_PATH);
//处理拦截器,
// 如果是MappedInterceptor,判断lookupPath是否匹配如果匹配将拦截器加入到HandlerExecutionChain
// 如果是HandlerInterceptor 或者WebRequestInterceptor直接将拦截器加入到HandlerExecutionChain
// todo 注意:springboot 通过WebMvcConfigurer配置的interceptor最终都会转成MappedInterceptor,
// 见InterceptorRegistration.getInterceptor(),入口在WebMvcAutoConfiguration.requestMappingHandlerMapping()方法
for (HandlerInterceptor interceptor : this.adaptedInterceptors) {
if (interceptor instanceof MappedInterceptor) {
MappedInterceptor mappedInterceptor = (MappedInterceptor) interceptor;
if (mappedInterceptor.matches(lookupPath, this.pathMatcher)) {
chain.addInterceptor(mappedInterceptor.getInterceptor());
}
} else {
chain.addInterceptor(interceptor);
}
}
return chain;
}

返回了包含handler和interceptor的HandlerExecutionChain为下一步handlerAdapter执行做好准备

附录:MappingRegistry几个重要的容器

上面提到的register过程中涉及到了MappingRegistry类,它包含了几个重要容器用来缓存url,RequestMappingInfo HandlerMethod的对应关系

  • urlLookup: url–>List<RequestMappingInfo>
  • mappingLookup:RequestMappingInfo–>HandlerMethod

附录:@RequestMapping注解

RequestMapping是一个用来处理请求地址映射的注解,可用于类或方法上。用于类上,表示类中的所有响应请求的方法都是以该地址作为父路径。

  • value:指定请求的实际地址,指定的地址可以是URI Template 模式(后面将会说明);
  • method:指定请求的method类型, GET、POST、PUT、DELETE等;
  • consumes:指定处理请求的提交内容类型(Content-Type),例如application/json, text/html;
  • produces:指定返回的内容类型,仅当request请求头中的(Accept)类型中包含该指定类型才返回;
  • params:指定request中必须包含某些参数值是,才让该方法处理。
  • headers:指定request中必须包含某些指定的header值,才能让该方法处理请求。

如何精炼领域模型

Posted on 2020-10-04 | Edited on 2022-09-21 | In 读后感 , 领域驱动设计

如何保持领域模型的完整性描述如何通过Bounded Context使自己的领域模型保持完整,但是随着业务的不断发展模型也在不断的完善,模型本身会膨胀到我们难以控制。所以我们要时刻精简我们的模型,如何清晰的明确哪些是我们模型要解决的主要矛盾,即:“Core Domain”

Core Domain

核心领域,我们建模中亟待解决的主要矛盾

  • 它内容应该是精炼的
  • 它在工作中的优先级应该是最高的最先被解决的
  • 它应该是趋于稳定的,因为Core Domain经常被变更很有可能是我们对模型理解不够

Generic Subdomain

一些和我们当前要解决的核心问题【Core Domain】的通用知识我们应该分离出去至少在设计Core Domain时候它们的优先级不高,属于次要矛盾。比如订单系统中的“用户”,它不属于订单系统的Core Domain,订单系统的Core Domain更应该是订单的状态、流转等信息

突出Core Domain的俩种方法

  1. Domain Vision Statement-领域远景说明:它很类似我们上学时候总结文章的主要内容,应该简要指出我们当前核心要解决的问题主旨,以及我们模型的核心价值,与此无关的信息都不要提,也就是我们常说的痛点。
  2. Highted Core:突出核心,Core Domain应该能很容易的被分辨出来,应该被团队所有人非常容易的理解

Cohesive Mechanism 内聚机制

如果模型包含了很复杂、专业的算法,集成在模型中可能导致Core Domain的混乱那么我们可以把算法抽象出去并且提供api给Core Domain。很类似Generic Subdomain,它俩的区别可能Cohesive Mechanism只是封装了算法而不是模型。

Segregated Core 分离核心

模型中如果有一些起到支撑作用的对象、类和Core Domain掺杂在一起我们应该对其进行重构以保持Core Domain的简介、易理解。这时候我们往往先审核Core Domain将和其无关的代码放在其他的Moudle中,然后分别对Core Domain和那个Moudle进行重构。

难点在于,团队对于Core Domain的统一认知,因为Core Domain在重构过程中是不断变化的

##

如何保持领域模型的完整性

Posted on 2020-10-01 | Edited on 2022-09-21 | In 读后感 , 领域驱动设计

随着团队的扩张以及各种业务需求,我们单一的模型往往会逐步的扩大到一个不可维护的状态,如:概念的复用导致业务的耦合,这时候我们为了保证领域模型和Ubiquitous Language在概念上一致可能会涉及到模型的拆分,将一个单一的大模型拆分到多个小模型中,同时团队也随着模型打散、重组。让各人员各司其职,这个过程往往成本很高且漫长为了少走弯路我们可以有以下几个方法和原则去遵循

Bounded Context

字面意思限定上下文,就是我们要针对我们的业务划分出不同的边界。每一个边界是一个Bounded Context,一个概念在Bounded Context内部是高度统一的,它只作用于这个Context中。不同的Bounded Context之间应该尽量隔离,如果它们需要通信,那么最好约定好一个协议通过接口方式交互。否则,俩个Bounded Context共用一个模型轻则出现一个有N多同质属性的对象,或者一个属性在俩个Context中定义的不一样的情况。

Continuous Integration

字面理解像是持续集成的意思,当我们确定好Bounded Context,为了不断的迭代、精华模型,团队成员要持续的集成完善Bounded Context。【注意Continuous Integration是指团队中的是在一个Bounded Context中进行的】

注意,为了实现Continuous Integration我们应该

  1. 分布集成,采用可重现的合并/构建技术
  2. 自动化测试套件;【单元测试可满足】
  3. 小范围的集成试错,并且制定一每一次集成的Milestone
  4. 集成中坚持使用Ubiquitous Language

Context Map

当一个大项目出现多个Bounded Context时候我们要规划一个全局的Context Map,并且找到他们的关联,以及关联的交互方式。后面会讲集中Bounded Context的交互方式。

Shared Kernel

最理想的方式,假如俩个模型高度重合我们可用共用一部分基础设施或者底层逻辑。避免一部分重复的工作。比如共用一种数据库、共用一套数据层

Customer/Suppiler Development Team

俩个模型是下游依赖上游的单向依赖关系。下游依赖上游输出,但是上游不需要下游的反馈,如:业务的统计系统,需要上游的日志、数据,上游业务系统却不太需要统计系统的反馈。这时候俩个模型可以采用这种客户/供应商关系

  1. 当下游依赖上游的输出和结果,并且是依赖是单向的
  2. 上游愿意配合下游的工作
  3. 一个boss【个人觉得这个最重要。起码在我司】

Conformist

跟随者模式,加入上游不配合或者整个系统没有人维护了,这时候为了上游的数据能响应下游需求,加入上游模型我们可以复用质量不差,便于使用,我们完全没必要推翻他们,可以选择采用跟随者模式。

使用时候严格遵守上游的Bounded Context可以避免俩个模型之间数据格式的转换

Anticorruption Layer

防腐层【隔离层】,为俩个Bounded Context提供隔离机制。由于上游、下游的Bounded Context无法完全复用,那么数据就需要进行转化,为了防止俩个Context的概念互相渗透,这种转换我们往往放在防腐层进行。

在防腐层我们最长用到的是Adapter和Facade俩种模型,下游的service通过Adpater来兼容调用被上游的服务,Facade在被调用者端隐藏对象数据的组装。防腐层可以部署在下游也可以部署在上游

Separate Way

如果上游的Bounded Context和下游的Bounded Context区别非常大。em…各行其道吧。完全隔离开。

PS:我们审视我们的项目是否有必要把项目集成在一起?他们是否能独立存在?如果能,建议各行其到

Open Host Service

为了减少模型之间的转换工作我们可以为我们的服务提供Api,我想着就是微服务为什么兴起的原因。

总结

个人认为根据Context Map的各Bounded Context的紧密程度来划分:Shared Kernel > Customer/Suppiler Development Team > Conformist>Anticorruption Layer > Separate Way

我们在思考方案时候可以遵循这个原则

另外建议在开发中对于Bounded Context的设计采用减法,程序要保持小规模的持续集成。

Hive Sql学习记录一

Posted on 2020-09-18 | Edited on 2022-09-21 | In 大数据 , Hive

内部表和外部表

  • 内部表【managed table】:默认创建的就是内部表,删除内部表会删除meta信息和数据,适合作为中间表。
  • 外部表 【external table】:创建时候加上关键字external,删除外部表只会删除meta信息不会删除数据,这样可以利用到历史数据,适合多个表利用同一份数据。【比如一份log,多个部门都想利用这时候建议外部表】。
  • 临时表:
  • 分区表:hive的数据本质上是按照一定规范映射成对应的目录存储在hadoop的hdfs上的,一张表对饮一个文件,我们每一次查询实际是全表扫描,为了减少每次查询我们会采用 分区表 的方式存储数据,这样查询时候值需要扫描制定分区的文件即可【对应mysql的数据分表或者分区】,分区表在创建时候需要制定一个字段作为分区字段,insert时候必须带上这个字段。

Location

  • 建外部表使用LOCATION,读取到数据,同时插入数据,会在LOCATION位置新生成文件000000_0,删除外部表,数据文件还在,证明外部表功能还在。
  • 建立外部表,不使用LOCATION,会在默认在/hive/warehouse/数据库名称/表名,建立目录。然后插入数据,会生成新文件,删除表。目录和文件都还在。符合外部表功能。
  • 建内部表使用LOCATION,读取到数据,同时插入数据也会生成新文件,但删除内部表,数据文件都会消失,整个目录都会删除掉,也正好符合内部表的功能。
  • 建立内部表,不使用LOCATION。在相应位置创建新目录,插入数据,一样生成新文件。但是如果删除内部表,目录和数据文件都会删除掉。

Go-sarama的遇到的几个坑

Posted on 2020-08-17 | Edited on 2022-09-21 | In golang , 编程技巧

sarama是golang对kafka操作的封装,在使用过程中主要遇到如下的问题,总结如下:

kafka的error的处理

kakfa的error处理是通过一个 chan <-error 如果不处理error当发生error的时候会造成协程阻塞,导致协程夯住

这里建议放在另一个协程处理,防止处理error影响了message的处理

1
2
3
4
5
go func() {
for err := range c.consumer.Errors() {
log.Trace.Errorf(context.TODO(), "_kafka_error", "errorError:%v", err.Error())
}
}()

sarma-cluster的作用

这个包最大的用处是autocommit offset

封装按照timestamp和condition消费制定offset的数据

首先,根据key找到partiton,根据timestamp找到offset,然后过滤condition。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// PartitionClientConnector 封装kafka的client
// checkTimoutMills,z在拉取消息时候会启动一个ticker检测拉取是否过期,checkTimoutMills用来控制检测器的频率
// timeOutMills,判断上一次拉取消息和当前时间的间隔超过这个值且consumer处于空闲状态会停止拉取消息
type PartitionClientConnector struct {
client sarama.Client
checkTimoutMills int64
timeOutMills int64
}

// NewPartitionConsumer 构造器
func NewPartitionConsumer(brokers []string, clientID string) (*PartitionClientConnector, error) {
client, err := sarama.NewClient(brokers, newSaramaConfig(clientID, sarama.OffsetOldest))
if err != nil {
return nil, err
}
return &PartitionClientConnector{
client: client,
checkTimoutMills: 1000,
timeOutMills: 5000,
}, nil
}

func newSaramaConfig(clientID string, initialOffset int64) *sarama.Config {
config := sarama.NewConfig()
config.Version = sarama.V0_10_2_0
config.Consumer.Offsets.CommitInterval = time.Second
config.Consumer.Offsets.Initial = initialOffset
config.Consumer.Return.Errors = true
config.ClientID = clientID

//sasl
if len(conf.ServiceConfig.KafkaConf.SaslUser) > 0 && len(conf.ServiceConfig.KafkaConf.SaslPassword) > 0 {
config.Net.SASL.User = conf.ServiceConfig.KafkaConf.SaslUser
config.Net.SASL.Password = conf.ServiceConfig.KafkaConf.SaslPassword
config.Net.SASL.Handshake = true
config.Net.SASL.Enable = true
}
return config
}

// getPartition 通过key来取partition的值
func (pc *PartitionClientConnector) GetPartition(topic string, key string) (int32, error) {
partition, err := pc.client.Partitions(topic)
if err != nil {
return -1, err
}
return getPartition(key, len(partition)), nil
}

// GetStartOffsetByTime 获取开始的Offset
func (pc *PartitionClientConnector) GetStartOffsetByTime(topic string, partitionID int32, time int64) (int64, error) {
return pc.client.GetOffset(topic, partitionID, time)
}

func (pc *PartitionClientConnector) createPartitionConsumer(topic string, partitionID int32, beginOffset int64) (sarama.PartitionConsumer, error) {
consumer, err := sarama.NewConsumerFromClient(pc.client)
if err != nil {
return nil, err
}
return consumer.ConsumePartition(topic, partitionID, beginOffset)
}

// PullPartitionMessageByCondition 根据key通过javaHash找到指定的partition,根据Condition拉取消息
func (pc *PartitionClientConnector) PullPartitionMessageByCondition(topic string, key string,
startTime int64, endTime int64,
filter ConsumerMsgFilter,
consumerHandler PullMsgHandler) error {

partitionID, err := pc.GetPartition(topic, key)
if err != nil {
return err
}
startOffset, err := pc.GetStartOffsetByTime(topic, partitionID, startTime)
if err != nil {
return err
}

//
consumer, err := pc.createPartitionConsumer(topic, partitionID, startOffset)
if err != nil {
return err
}
defer consumer.Close()

endOffset, err := pc.GetStartOffsetByTime(topic, partitionID, endTime)
if err != nil {
return err
}

startOffset, endTime, endOffset)
var msgOut int64
//check timeoout
ticker := time.NewTicker(time.Duration(pc.checkTimoutMills) * time.Millisecond)

lastRecvNano := time.Now().UnixNano()
isIdle := true
for {
select {
case msg := <-consumer.Messages():
msgOut = msg.Offset
isIdle = false
if endOffset > startOffset && msg.Offset > endOffset {
isIdle = true
goto stopLoop
}
if msg.Offset < startOffset {
continue
}
if filter != nil && !filter.Conditional(msg) {
continue
}
consumerHandler.processMsg(msg)
lastRecvNano = time.Now().UnixNano()
isIdle = true
case err := <-consumer.Errors():
goto stopLoop
case <-ticker.C: //检查过期停止阻塞 当现在的事件-上一次拉取的时间>设置的timeOutMills,并且是空闲状态
if (time.Now().UnixNano()-lastRecvNano)/int64(time.Millisecond) >= pc.timeOutMills &&
isIdle {
log.Trace.Infof(context.TODO(), trace.DLTagHTTPFailed, "time out.spent")
goto stopLoop
}
}
}
stopLoop:
return nil
}

// Close 关闭清理资源
func (pc *PartitionClientConnector) Close() {
pc.client.Close()
}

func getPartition(key string, partition int) int32 {
return int32(math.Abs(float64(stringutil.JavaHashCode(key) % int64(partition))))
}
1234…23

Liu hao

励志当好厨子的程序员

229 posts
54 categories
81 tags
RSS
GitHub E-Mail
© 2018 – 2023 Liu hao
Powered by Hexo v3.9.0
|
Theme – NexT.Pisces v7.0.0