Java并发编程实践

线程安全

定义: 1)不管运行时线程采取何种调度方式, 2)代码中不需要任何额外的同步或协同, 这个类都能表现出正确的行为, 无状态的对象, 不可变对象一定是线程安全的

同步机制: synchronized, volatile, lock, atom

两个维度: 防止某个线程正在使用对象状态而另一个线程在同时修改状态(同步机制); 确保当一个线程修改了对象状态后, 其他线程能够看到发生的状态变化(可见性)

synchronized 是可重入的, 重入的一种实现方法是为每个锁关联一个获取计数值和一个所有者线程。当计数值为0时, 这个锁就被认为是没有被任何线程持有

重排序: 在没有同步的情况下, 编译器、处理器以及运行时等都可能对操作的执行顺序进行一些意想不到的调整。在缺乏足够同步的多线程程序中, 要相对内存操作的执行顺序进行判断, 几乎无法得出正确的结论

进一步衍生: 重排序的As-if-serial语义意思是,所有的动作(Action)都可以为了优化而被重排序,但是必须保证它们重排序后的结果和程序代码本身的应有结果是一致的。Java编译器、运行时和处理器都会保证单线程下的as-if-serial语义。
比如,为了保证这一语义,重排序不会发生在有数据依赖的操作之中

加锁的意义: 保证互斥行为(可见性和原子性)以及内存可见性, 确保所有线程都能看到共享变量的最新值, 所有执行读操作或者写操作的线程都必须在同一个锁上同步

volatile 的实现原理: 不会将该变量上的操作重排序(内存屏障), 也不会放入缓存寄存器中

同步机制可确保原子性和可见性, volatile 只能保证可见性(++count 无法保证原子性)

ThreadLocal, 线程封闭, 不共享的变量: 为变量在每个线程中都创建一个副本,每个线程可以访问自己内部的副本变量, 实现为: ThreadLocal -> Map -> thread.threadLocals -> 内部类 ThreadLocalMap, 场景: 数据库连接, Session 管理

想要做到线程安全: 同步保证原子性操作, 轻量 Volatile 保证可见性, 不变的对象一定是线程安全的(final)

内存屏障:
1) Happens-before的前后两个操作不会被重排序且后者对前者的内存可见
2) LoadLoad屏障:对于这样的语句Load1; LoadLoad; Load2,在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。
StoreStore屏障:对于这样的语句Store1; StoreStore; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。
LoadStore屏障:对于这样的语句Load1; LoadStore; Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。
StoreLoad屏障:对于这样的语句Store1; StoreLoad; Load2,在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见

volatile语义中的内存屏障:

  • 在每个volatile写操作前插入StoreStore屏障,在写操作后插入StoreLoad屏障;

  • 在每个volatile读操作前插入LoadLoad屏障,在读操作后插入LoadStore屏障;

final语义中的内存屏障:

  • 写final域:在编译器写final域完毕,构造体结束之前,会插入一个StoreStore屏障,保证前面的对final写入对其他线程/CPU可见,并阻止重排序。

  • 读final域:在上述规则2中,两步操作不能重排序的机理就是在读final域前插入了LoadLoad屏障。

锁的实现

乐观锁和悲观锁

  • 乐观锁, 持有后再请求, 告知失败, 请求线程可在一定时间内重试, 不会挂起

  • 悲观锁, 持有后再请求, 被挂起, 直到锁释放后恢复线程执行, 有可能获得该锁

构建线程安全类

同步容器: Vector 和 HashTable, 方法级加锁(synchronized), 并发效率低

并发容器: ConcurrentHashMap, ConcurrentLinkedQueue, CopyOnWriteArrayList 等, 灵活加锁, 并发效率高

阻塞队列实现生产者-消费者: BlockingQueue 的实现有 LinkedBlockingQueue 和 ArrayBlockingQueue 都是 FIFO 队列, PriorityBlockingQueue 是优先级队列, 其中 put 和 take 方法是阻塞, offer 和 poll 是非阻塞, 对于无边界的队列而言永远不会阻塞

同步工具类: 信号量(Semaphore), 栅栏(Barrier, 调用 await 的线程会中断, 直到等于 partion 时再开始执行), 闭锁(Latch, 调用 await 的线程会中断, 直到 countDown 为 0 被唤醒)

区别性: 闭锁是一次性的, 不能反复使用, 栅栏可以多次使用; 闭锁用于等待事件,而栅栏用于等待其他线程

工作密取: 消费者各自拥有一个双端队列 Deque, 如果一个消费者完成了自己双端队列中的全部工作,那么他就可以从其他消费者的双端队列末尾秘密的获取工作。具有更好的可伸缩性, 使用场景为网络爬虫

CopyOnWrite 容器适用于读多写少的并发场景, set 方法加锁, get 方法不加锁, set 方法是 copy 一份新的内容, 修改, 再讲原引用指向新的, 这篇文章一目了然: https://www.cnblogs.com/dolphin0520/p/3938914.html

结构化并发程序

无限制创建线程的不足: 线程生命周期开销非常高, 资源消耗大, 稳定性低

Executor 线程池框架, 抽象了执行任务的主要对象不是 Thread, 而是 Executor, 其实现了对生命周期的支持(ExecutorService), 以及统计信息收集, 应用程序管理机制和性能监控等机制, Executor 基于生产者 - 消费者模式

Executors 是线程池的工厂类, 本质上是初始化 ThreadPoolExecutor, 实际开发的时候应该避免使用工厂类

ExecutorService 封装了线程生命周期接口

线程中断, 取消, 关闭

调用 interrput 并不意味着立即停止目标线程正在进行的工作, 而只是传递了请求中断的消息, 通常, 中断是实现取消的最合理方式

通过 Future 和 Executor 框架可以构建可取消的任务

ExecutorService 提供了正常关闭 shutdown() 和 立刻关闭 shutdownNow()两种方法, 正常关闭速度慢但线程安全, 它会等到队列中所有任务都执行完成后才关闭

还可以通过 JVM 钩子关闭线程, 但是极度不推荐, 线程可分为两种: 普通线程和守护线程

使用 newTaskFor 钩子函数来改进用来封装非标准取消的方法。这是 ThreadPoolExecutor 的新特性, 当提交一个 callable 给 ExecutorService 时,submit 返回一个 Future,可以用 Future 来取消任务。

newTaskFor 钩子是一个工厂方法,创建一个 Future 来代表任务,这个 Future 属于由 FutureTask 实现的 RunnableFuture 接口,这个接口可以自定义 cancel 方法,实现自定义的取消方式

线程池

线程池大小如何界定, 是 Fix 还是 Cache?

  • 计算密集型: 线程池大小为: Ncpu + 1, I/O 密集型: 线程池大小为: 2 * Ncpu

  • newFixedThreadPool

    • corePoolSize == maximumPoolSize

    • workQueue 是无界队列(容易造成资源耗尽)

    • 当需要限制当前任务的数量以满足资源管理需求时, 那么可以选择固定大小的线程池(服务器网络请求限制)

  • newCachedThreadPool

    • workQueue 是有界队列, 饱和之后执行拒绝策略, 而实现中采用了 SynchronousQueue(同步移交)

    • 提供比固定大小的线程池更好的排队性能(而不是一直hang住等待线程)

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize, // 基本大小
int maximumPoolSize, // 最大大小
long keepAliveTime, //
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {...}

只有当任务相互独立时, 为线程池或者工作队列设置界限才是合理的, 如果任务之间存在依赖性, 那么有界的线程池或队列就可能导致线程 “饥饿” 死锁问题

活跃性, 性能和测试

死锁与线程获得锁的顺序有关, 在数据库系统的设计中考虑了死锁的检测以及恢复, 即随机选择放弃一个事务, 让其他事务执行

可伸缩性是指: 当增加计算资源时(例如 CPU, 内存, 存储容量或 I/O 带宽), 程序的吞吐量或者处理能力能相应增加

缩小锁的范围(方法 -> 对象 -> 并发容器), 减小锁的粒度, 锁分段(ConcurrentHashMap)

锁的实现

锁干了什么? 答: 对共享资源的互斥性以及内存可见性

内置锁有什么功能局限? 答: 无法中断一个正在等待获取锁的线程, 或者无法在请求获取一个锁时无限等待下去, 阻塞加锁

显示锁有什么好处? 答: 锁粒度更低, 可实现如 ConcurrentHashMap 式的分段加锁

内置锁: Synchronized 和 volatile

显示锁: ReentrantLock

内置锁的实现原理

synchronized 有以下3种应用方式

  • 修饰实例方法,作用于当前实例加锁

  • 修饰静态方法,作用于当前类对象加锁

  • 修饰代码块,指定加锁对象,对给定对象加锁

同步代码块的实现是基于进入和退出管程(Monitor)对象实现, 翻译成字节码是 monitorenter 和 monitorexit

同步方法的实现是JVM通过ACC_SYNCHRONIZED访问标志来辨别一个方法是否声明为同步方法

volatile 的实现原理: 不会将该变量上的操作重排序(内存屏障), 也不会放入缓存寄存器中

volatile语义中的内存屏障:

  • 在每个volatile写操作前插入StoreStore屏障,在写操作后插入StoreLoad屏障;

  • 在每个volatile读操作前插入LoadLoad屏障,在读操作后插入LoadStore屏障;

Synchronized 可确保原子性和可见性, volatile 只能保证可见性(++count 无法保证原子性)

显示锁的实现原理

1
2
3
4
5
6
7
8
9
//juc lock
public interface lock{
void lock();
void lockInterruptibly() throws InterruptedException; //中断锁
boolean tryLock(); //轮询锁
boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException; //定时锁
void unlock();
Condition newCondition();
}

公平性: 在公平锁上, 线程将按照它们发出请求的顺序来获取锁, 非公平则允许插队, 取决于场景中挂起和恢复线程的开销大小, 在激烈竞争的情况下, 则采取非公平锁比较好

实现: CopyOnWriteArrayList

一个资源可以被多个读操作访问, 或者被一个写操作访问, 但两者不能同时进行

内部是非公平锁, 读不加锁, 写加锁

显示锁的底层是 AQS(AbstractQueuedSynchronizer), 它使得一组线程(称之为等待线程集合)能够通过某种方式来等待特定的条件变成真, 并自动唤醒, 与传统队列不同的是它的对象是一个个正在等待的线程

总结: 与内置锁相比, 显示锁提供了一些扩展功能, 在处理锁的不可用性方面有着更高的灵活性

读写锁允许多个读线程并发地访问被保护的对象, 当访问以读取操作为主的数据结构时, 它能够提高程序的可伸缩性

无锁

CAS(Compare-and-Swap): 操作系统级别支持的一种无锁操作, 通过代码中显示的重试机制来实现

1
2
3
4
5
6
7
public synchronized int compareAndSwap(int expectedValue, int newValue){
int oldValue = value; //当前值
if(oldValue == expectedValue){ //当前值 == 期望值 才改变当前值
value = newValue;
}
return oldValue; //返回当前值
}

原子变量类是基于 CAS 实现的, 比锁的粒度更细, 量级更轻, 以下是 i++ 原子性的实现

1
2
3
4
5
int v;
do{
v = value.get();
}while(v != value.compareAndSwap(v, v + 1));
return v + 1;

在高度竞争的情况下, 锁的性能将超过原子变量的性能, 但通常竞争量很小的情况下, 原子变量更优, 这是因为锁在发生竞争时会挂起线程, 降低了CPU使用率, 调用原子变量的类可自行负责竞争管理(重试, 退让)

如需转载,请注明出处