liuhao163.github.io

杂七杂八


  • Home

  • Categories

  • Tags

  • Archives

  • Sitemap

ThreadLocal:减少线程之间的共享

Posted on 2019-05-18 | Edited on 2022-09-21 | In java , 并发

解决并发问题最简单的方案就是减少共享,JAVA提供了一个简单的工具类即ThreadLocal,我们会用他来保存线程之间独立存在的变量,比如springmvc中,我们每个请求特有的一些属性。

ThreadLocal的设计

在设计上JAVA的Thread内部类会持有一个ThreadLocalMap的对象threadLocals,Map的结构(ThreadLocal,Value),其中ThreadLocal是弱引用。
这么做的设计有俩点:

  1. 因为ThreadLocal对象和线程的声明周期息息相关,从数据的亲原性来讲,由Thread对象管理更为合适
  2. 防止内存泄露,因为ThreadLocal内部持有线程和Value的对应关系,当线程声明结束后,ThreadLocal中的Thread对象不会主动释放。从而造成内存泄露

ThreadLocal的内存泄露

在线程池中,Thread的生命周期往往很长,而因为ThreadLocalMap对于ThreadLocal是软引用,所以当ThreadLocal没有引用时候会被释放掉,但是Value确是强引用,所以会造成ThreadLocal释放掉了,但是Value没法释放导致内存泄露

解决方法:手动释放

InheritableThreadLocal继承性

因为ThreadLocal是线程不共享的,所以如果线程在创建一个子线程是看不到主线程ThreadLocal对象的值的,为了让子线程能看到父类线程提供InheritableThreadLocal,但是不建议使用,因为往往会导致子线程修改了值引起主线程的逻辑混乱。

Copy-on-Write模式Copy-on-Write模式:不是延时策略的COW

Posted on 2019-05-18 | Edited on 2022-09-21 | In java , 并发

在操作系统和编程领域中充斥这大量的CoW。linux的进程fork,redis扩容时候的cow策略,以及java中的CopyOnWriteArrayList和CopyOnWriteArraySet等。

JAVA中的CoW

我们在面试会经常问List或者Set的线程安全的替代方案是什么?Vector但是Vector性能不高,性能高的替代方案是什么呢?就是CopyOnWriteArrayList和CopyOnWriteArraySet。

Cow容器的缺点:

  1. 因为在修改时候会复制整个容器,所以是以牺牲内存为代价的
  2. 因为复制容器所以在访问时候会有一定的时延。

Cow的原理和使用场景

在读取数据的时候共享,在修改的时候会开始复制,并且为了保证线程安全可能会在复制时候采取阻塞策略。使用场景读多写少。

额外的思考

redis的cow的扩容机制

Immutability模式:如何利用不变性解决并发问题?

Posted on 2019-05-18 | Edited on 2022-09-21 | In java , 并发

并发情况下如果对一个共享变量进行读写就会有并发问题,如果只有读是没有并发问题的,所以解决并发问题的一个重要思路就是不提供变量的写的功能

如何快速实现不可变类

  1. 类的声明以及属性都要声明成final对象(类的声明是为了防止子类继承父类修改父类的属性)
  2. 只提供只读的方法,如果要修改对象,去创建一个新的不可变对象。

例如String,是典型的一个不可变对象,String中的replace对象在修改时候我们是创建了一个新的不可变对象

利用flyweight模式减少重复对象的创建

由于不可变对象不能修改,如果遇到修改话可能会产生大量的对象占用系统内存,为了避免这种情况发生JAVA采用了flyweight模式来减少对象的产生,用一句话概述

提前缓存好一部分对象,如果没有在去创建,如果有就直接取用,因为这些对象是不可变的共享完全没问题。
比较特殊的是Long,因为对象范围比较大,所以Long只保存了-128-127的对象。具体见代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Long valueOf(long l) {
final int offset = 128;
// [-128,127] 直接的数字做了缓存
if (l >= -128 && l <= 127) {
return LongCache
.cache[(int)l + offset];
}
return new Long(l);
}
// 缓存,等价于对象池
// 仅缓存 [-128,127] 直接的数字
static class LongCache {
static final Long cache[]= new Long[-(-128) + 127 + 1];

static {
for(int i=0; i<cache.length; i++)
cache[i] = new Long(i-128);
}
}

注意事项

不可变对象的属性即时声明了不可变,但是他的属性也有可能改变如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//Foo 线程安全
class Foo {
int age = 0;
String name = "abc";
}

final class Bar {
final Foo foo=new Foo();//可改变foo里的属性
final int[] i=new int[10];//可改变数组元素

public void setAge(int i){
foo.age=i;
}
}

不可变类作为属性本身不具备不可见性并且是线程不安全的

1
2
3
4
5
6
7
8
9
10
11
12
//Foo 线程安全
final class Foo{
final int age=0;
final int name="abc";
}
//Bar 线程不安全
class Bar {
Foo foo;//并发时候会有问题
void setFoo(Foo f){
this.foo=f;
}
}

Future:如何用多线程实现最优的“烧水泡茶”程序?

Posted on 2019-04-28 | Edited on 2022-09-21 | In java , 并发

线程池中可以通过Future模式实现对异步任务的状态的监控和获取异步任务的结果,具体见ExecutorService下的3个方法

获取异步任务的结果

1
2
3
4
5
6
// 提交 Runnable 任务
Future<?> submit(Runnable task)
// 提交 Callable 任务
<T> Future<T> submit(Callable<T> task);
// 提交 Runnable 任务及结果引用
<T> Future<T> submit(Runnable task, T result);

第一个方法是无返回值很类似join方法
第二个方法是可以获取task的返回值
第三个方法是可以通过T result来实现俩个线程之间的交互。如下

FutreTask

和Furtre很类似但是它继承自Furture和Runable可以作为对象传递给线程池或者线程。然后通过get来获取执行结果。如下代码,开水泡茶问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class FutureTest {
private static ExecutorService executor = Executors.newFixedThreadPool(2);

public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> ft2 = new FutureTask<>(() -> {
System.out.println("洗茶杯");
Thread.sleep(1000L);

System.out.println("找茶叶");
Thread.sleep(1000L);

return "龙井";
});

FutureTask<String> ft1 = new FutureTask<>(() -> {
System.out.println("洗茶壶");
Thread.sleep(1000L);

System.out.println("烧水");
Thread.sleep(10000L);
System.out.println("水开了");
String ret = ft2.get();
return "上茶"+ret;
});


executor.submit(ft1);
executor.submit(ft2);
System.out.println(ft1.get());
}
}

近期的使用场景:
比如一个很大的列表查询,你可以拆分成10个线程同时查询然后在查询完后合并成一个结果集。

CompletableFuture

简化Future的线程调度难度。构造函数是4个静态方法

1
2
3
4
5
6
// 使用默认线程池
static CompletableFuture<Void> runAsync(Runnable runnable)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 可以指定线程池 对于IO这种消耗线程的方式强烈推荐
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

ft1完成后执行的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Function对象有参数有返回值
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);

# Consumer对象有参数无返回值
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);

# Runnable对象无参数无返回值
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);

CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);

And合并关系

ft1、ft2都结束后做一些操作,类似join。方法区别如上

1
2
3
4
5
6
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

OR或者关系

ft1、ft2有一个完成

1
2
3
4
5
6
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

异常处理

1
2
3
4
5
CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);

CompletionService

我们用ThreadPool和Future优化了步编程中协作的部分,取代了线程的join。但是我们在实际使用中往往会碰到下面的问题,具体见代码

  1. 运行了3个异步任务,分别用了15s,10s,5s。
  2. 之后会在另一个线程池对这些任务的结果进行计算。这时候在计算的线程池中会变成串行,由于1任务过久,所以任务2,3即便先完成了前面的步骤也要等1完成后在执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);

Future<Integer> ft1 = threadPool.submit(() -> {
Thread.sleep(15000);
System.out.println("ft1 sleep 15s return 1");
return 1;
});

Future<Integer> ft2 = threadPool.submit(() -> {
Thread.sleep(10000);
System.out.println("ft2 sleep 10s return 2");
return 2;
});

Future<Integer> ft3 = threadPool.submit(() -> {
Thread.sleep(5000);
System.out.println("ft3 sleep 5s return 3");
return 3;
});


ExecutorService calT=Executors.newSingleThreadExecutor();
Future ftRes=calT.submit(()->{
try {
System.out.println("save:"+ft1.get());
System.out.println("save:"+ft2.get());
System.out.println("save:"+ft3.get());
//todo 比较大小
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});

ftRes.get();
}

如果对这段代码进一步优化呢我们可以采用增加阻塞队列的方式。许多人加个阻塞队列就好了。异步执行的结果加入到阻塞队列中,最终通过阻塞队列后去方法。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
BlockingQueue<Integer> queue=new ArrayBlockingQueue<Integer>(3);
ExecutorService calT=Executors.newFixedThreadPool(3);
calT.execute(()-> {
queue.put(ft1.get());
});
calT.execute(()-> {
queue.put(ft2.get());
});
calT.execute(()-> {
queue.put(ft3.get());
});
for(int i=0;i<3;i++){
try {
System.out.println("save:"+ queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("all finish");

确实能解决上述的问题,JAVA8之后为我们提供了更方便的写法(实现原理相同),即:CompletionService,有另种狗仔方法

1
2
new ExecutorCompletionService(Executor executor); //LinkedBlockingQueue 默认使用
new ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)

上面的程序可以简写为这样

1
2
3
4
5
6
7
8
9
10
11
ExecutorCompletionService<Integer> ecs=new ExecutorCompletionService(threadPool,queue);
ecs.submit(callable1);
ecs.submit(callable2);
ecs.submit(callable3);
for(int i=0;i<3;i++){
try {
System.out.println("save:"+ ecs.take().get());
} catch (InterruptedException e) {
e.printStackTrace();
}
}

关于take()和pool()区别。take()在队列为空会阻塞线程,poll如果队列为空会返回null,

1
2
3
4
5
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;//会等待一段时间。

使用场景

Dubbo的Forking Cluster模式,对于实时性较高的操作,consumer端会同时并行调用俩个请求,哪个现有结果优先返回,代码如下,不过缺点是比较浪费资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
*
* @param fork 并行度是2
* @param timeOutMills 超时时间
* @param callable 逻辑方法
* @param <T> 返回值,多个方法返回第一个返回的
* @return
*/
public <T> T forkCluster(int fork, int timeOutMills, Callable<T> callable) {
ExecutorCompletionService<T> forkService = new ExecutorCompletionService(Executors.newFixedThreadPool(fork), new ArrayBlockingQueue<>(fork));
List<Future<T>> ftContainer=new ArrayList<>();
for (int i = 0; i < 2; i++) {
ftContainer.add(forkService.submit(callable));
}

try {
T result = forkService.poll(timeOutMills, TimeUnit.MILLISECONDS).get();
if (result == null) {
throw new RuntimeException("返回结果为空抛异常");
}
return result;
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException("返回结果为空抛异常");
} catch (ExecutionException e) {
e.printStackTrace();
throw new RuntimeException("返回结果为空抛异常");
}finally {
ftContainer.forEach(ft->{
ft.cancel(true);
});
}
}

Fork/join用分治思想加速我们的递归

待完善

原子类:无锁工具类的典范

Posted on 2019-04-28 | Edited on 2022-09-21 | In java , 并发

无锁方案,在竞争冲突不大的情况下比锁性能会高不少。对应JAVA中各种的Atmoic类。实现原理是硬件的支持,CPU提供了原子化的支持即CAS(Compare And Swap)指令。CAS包含3个字段:共享变量的内存地址A;用于比较值B和共享变量的新值C。只有A的值等于B才允许更新成C。作为一条CPU指令其本身就是原子性的。很类似乐观锁的原理。JAVA在实现是是通过自旋来解决的如下伪代码:

1
2
3
4
5
6
7
8
public void add10K() {
int newVal = -1;
int cur = -1;
do {
cur = ai.get();//get 单前值
newVal = cur + 10000; //得到新值
} while (ai.compareAndSet(cur, newVal)); //失败后自旋
}

java中是通过sun.misc.Unsafe来实现的

CountDownLatch和CyclicBarrier:如何让多线程步调一致?

Posted on 2019-04-28 | Edited on 2022-09-21 | In java , 并发

CountDownLatch和CyclicBarrier主要解决线程的调度问题。例如如下场景:订单的对账,订单系统完成的订单和已经派发的订单的对账,对于有异常的订单要修复。这里是3个操作,见下面伪代码

1
2
3
4
5
Order order=getOrder();

POrder porder=getPOder();

checkAndFix(order,porder);

如果每天的订单量很大这就会很耗时,我们可以考虑串行的方法,可以用线程池的方法同时去查询俩个订单,然后在check这时候我们要解决线程调度的问题,CountDownLatch就排上了用场,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CountDownLatch countDownLatch = new CountDownLatch(2);
ExecutorService pool = Executors.newFixedThreadPool(2);
AtomicReference<Object> order = new AtomicReference<>();
pool.execute(() -> {
order.set(getOrder());
countDownLatch.countDown();
});

AtomicReference<Object> porder = new AtomicReference<>();
pool.execute(() -> {
porder.set(getPOrder());
countDownLatch.countDown();
});

try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
checkAndFix(order.get(),porder.get());

不过这里checkAndFix还是主线程串行的能否在进一步优化呢,答案是能这里用到CyclicBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
ExecutorService callBack = Executors.newFixedThreadPool(1);
Vector<Object> orders=new Vector<>();
Vector<Object> porders=new Vector<>();
CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{
callBack.execute(()->{
checkAndFix( orders.get(0),porders.get(0));
});
});

while(true){
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.execute(() -> {
orders.add(getOrder());
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
pool.execute(() -> {
porders.add(getPOrder());
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}

CyclicBarrier和CountDownLatch的区别是可重入的,当见到0还会从2开始。另外CyclicBarrier可以设置Callback

CountDownLatch和CyclicBarrier:如何让多线程步调一致?

Posted on 2019-04-24 | Edited on 2022-09-21

StampdLock:Java8中的乐观锁

Posted on 2019-04-24 | Edited on 2022-09-21 | In java , 并发

上文中讲到了ReadWriteLock这种读写锁方法作为缓存策略那么有没有效率更高的方法

重点方法
long stamp = stampedLock.tryOptimisticRead();//乐观锁
long stamp = stampedLock.readLock(); //悲观读锁
long stamp = stampedLock.writeLock();//主观读锁
boolean hasWriteLock = stampedLock.validate();//如果有写锁返回false如果没有返回true

如下面的代码先通过tryOptimisticRead获取一个stamp这个操作是没有锁的,这中间可以读取变量,然后在调用validate方法校验是否有写锁,如果有,将锁升级成读锁。如果没有直接返回。因为没有写锁时候是不加锁的所以效率很高。这点很类似咱们mysql的乐观锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public long read() {
long stamp = stampedLock.tryOptimisticRead();
//无锁操作
//如果有写锁返回false,升级成悲观读
if (!stampedLock.validate(stamp)) {
stamp = stampedLock.readLock();
try {
//读模板
} finally {
stampedLock.unlockRead(stamp);
}
}
return i;
}

//写模板
public void write(long i) {
long stamp = stampedLock.writeLock();
try {
//todo 写
} finally {
stampedLock.unlock(stamp);
}
}

ReadWriteLock:如何快速实现一个完备的缓存

Posted on 2019-04-21 | Edited on 2022-09-21 | In java , 并发

ReadWirteLock的作用和说明

ReadWriteLock是锁的一种,他适合哪种读多写少的场景,做有名的就是缓存场景。因为他们的读锁是不互斥的,但是读锁和写锁、写锁之间是互斥的。具体使用方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Cache<K,V> {
final Map<K, V> m =
new HashMap<>();
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();
// 读缓存
V get(K key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
// 写缓存
V put(String key, Data v) {
w.lock();
try { return m.put(key, v); }
finally { w.unlock(); }
}
}

进一步的实现懒加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class Cache<K,V> {
final Map<K, V> m =
new HashMap<>();

final ReadWriteLock rwl = new ReentrantReadWriteLock();
final Lock r = rwl.readLock();
final Lock w = rwl.writeLock();
V get(K key) {
V v = null;
// 读缓存
r.lock(); ①
try {
v = m.get(key); ②
} finally{
r.unlock(); ③
}
// 缓存中存在,返回
if(v != null) { ④
return v;
}
// 缓存中不存在,查询数据库
w.lock(); ⑤
try {
// 再次验证
// 其他线程可能已经查询过数据库
v = m.get(key); ⑥
if(v == null){ ⑦
// 查询数据库
v= 省略代码无数
m.put(key, v);
}
} finally{
w.unlock();
}
return v;
}
}

读写锁的升级和降级

注意,读写锁是不支持锁的升级的即,在读锁中升级到写锁,因为读锁和写锁互斥,因为读锁升级到写锁的时候,读写锁互斥、写锁需要等待读锁释放,而读锁被写锁阻塞住会造成死锁,但是反过来写锁降级是可以的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class CachedData {
Object data;
volatile boolean cacheValid;
final ReadWriteLock rwl =
new ReentrantReadWriteLock();
// 读锁
final Lock r = rwl.readLock();
// 写锁
final Lock w = rwl.writeLock();

void processCachedData() {
// 获取读锁
r.lock();
if (!cacheValid) {
// 释放读锁,因为不允许读锁的升级
r.unlock();
// 获取写锁
w.lock();
try {
// 再次检查状态
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 释放写锁前,降级为读锁
// 降级是可以的
r.lock(); ①
} finally {
// 释放写锁
w.unlock();
}
}
// 此处仍然持有读锁
try {use(data);}

finally {r.unlock();}
}
}

Semaphore:如何构建限流器

Posted on 2019-04-21 | Edited on 2022-09-21 | In java , 并发

信号量模型

如图:

avatar

简单来说就是一个队列,一个计数器,三个方法

init()–初始化队列和信号量
down()–信号量-1如果<0,当前线程阻塞,并且进入队列等待。
up()–信号量+1如果>=0,则唤醒一个线程别且把它从当前队列中剔除。

在java当中的Semaphore的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
 static int count;
// 初始化信号量
static final Semaphore s
= new Semaphore(1);
// 用信号量保证互斥
static void addOne() {
s.acquire();
try {
count+=1;
} finally {
s.release();
}
}

信号量是如何保证原子性的呢?
俩个线程都执行s.acquire()是原子性的会修改计数器,假设线程1先执行,所以计数器是0,线程2为-1,因为线程2小于0所以线程2阻塞,并且将当前线程加入到队列中线程1大于等于0所以线程1执行count+=1;这时候线程1执行s.release();计数器为0,大于等于0,所以唤醒队列中的一个线程。

Semaphore的作用

和Lock相比可以同时允许多个线程同时访问临界区。下面的代码是事先了一个队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;
// 构造函数
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 利用对象池的对象,调用 func
R exec(Function<T,R> func) {
T t = null;
sem.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release();
}
}
}
// 创建对象池
ObjPool<Long, String> pool =
new ObjPool<Long, String>(10, 2);
// 通过对象池获取 t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});
1…161718…23

Liu hao

励志当好厨子的程序员

229 posts
54 categories
81 tags
RSS
GitHub E-Mail
© 2018 – 2023 Liu hao
Powered by Hexo v3.9.0
|
Theme – NexT.Pisces v7.0.0