JUC
概述
JUC全名java.util.concurrent
由三部分构成:
java.util.concurrent
java.util.concurrent.atomic
java.util.concurrent.lock
什么是进程,什么是线程
什么是并行/并发
顺序
在高内聚低耦合的前提下,线程操作资源类
实现
这里的Ticket就是资源类,遇见并发,先写出资源类
同时请务必注意并理解代码中的注释
//具体的操作方法在类的内部实现,仅对外暴露方法调用接口(高内聚)
//遇见多并发先写资源类(低耦合)
class Ticket {
private int number = 50;
public synchronized void saleTicket() {
if (number > 0) {
System.out.println(Thread.currentThread().getName());
System.out.println("卖出第" + (number--) + "剩余" + number+"张");
}
}
}
public class SaleTicket {
public static void main(String[] args) {
final Ticket ticket = new Ticket();
//接口也可以new,如匿名内部类
//可以利用代码模板将下面的东西写为模板
new Thread(new Runnable() {
public void run() {
for (int i = 0; i < 30; i++) {
ticket.saleTicket();
}
}
},"thread1").start();
new Thread(new Runnable() {
public void run() {
for (int i = 0; i < 30; i++) {
ticket.saleTicket();
}
}
},"thread2").start();
new Thread(new Runnable() {
public void run() {
for (int i = 0; i < 30; i++) {
ticket.saleTicket();
}
}
},"thread3").start();
}
优化
java.util.concurrent.locks.ReentrantLock可重入锁
- Lock相对于synchronized关键字多了tryLock方法,也即可以尝试获取锁
- 同时,也允许了获取锁时候的中断,可以控制获取锁的时长
- synchronized方法会直接阻塞进程,虽然JVM对synchronized做了锁膨胀优化,但效果还是不如reentrant自选+阻塞形式好,减少了用户态和内核态的切换次数。详见:http://localhost:4000/2020/09/06/shen-ru-fen-xi-synchronized-yuan-li-he-suo-peng-zhang-guo-cheng/
手册上给的示例代码
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}
使用匿名表达式优化代码
new Thread(() -> {
for (int i = 0; i < 40; i++) ticket.saleTicket();
}, "thread1").start();
Java多线程的状态
通过查看Thread.State状态可以发现进程有如下六种状态
NEW
RUNNABLE
BLOCKED
WAITING:WAITING状态会一直等直到获得使用权
TIMED_WAITING:TIMED_WAITING会等待一定时间,若时间内没有获得资源则停止等待
TERMINATED
多线程的三步交互
判断
操作
通知
由于if的判断后wait会使线程释放原先的资源,而且此时未经再次判断就向下执行造成错误。
public synchronized void increment() throws InterruptedException {
//1.判断
if (number != 0) {
this.wait();
}
//2.操作
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3.通知
this.notify();
}
解决虚假唤醒问题
多线程判断过程中判断要用while而不是if,请看上方多线程三步交互中的3.通知
wait与sleep的区别
- wait让当前线程等待,并释放所有的锁
- sleep让当前线程进入睡眠状态,保留所有的锁
JUC中的相关操作
- 在JUC中,lock, await, signal 分别对应原先的synchronized, wait, notify
使用新版本的JUC替换传统方法
public synchronized void increment() throws InterruptedException {
lock.lock();
try {
//1.判断
while (number != 0) {
condition.await();
}
//2.操作
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
//3.通知
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
标志位的设置
lock与condition配合使用可以实现精准控制与精准通知
//用于标志三个不同的线程
private int number = 1;
Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void print5() {
lock.lock();
try {
//判断
while (number != 1) {
condition1.await();
}
//操作
for (int i = 0; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
//通知
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
8锁现象
public class EightLocks {
public synchronized void sendEmail() {
System.out.println("发邮件");
}
public synchronized void sendSMS() {
System.out.println("发短信");
}
public void hello() {
System.out.println("hello");
}
}
现有两个线程同时执行如下方法 ,求输出的结果
注意理解
- 标准访问 先email后SMS
- 邮件方法sleep四秒 先email后SMS
- 邮件方法sleep四秒,和普通的hello方法 先hello后email
- 邮件方法sleep四秒,两部手机分别email和SMS 先SMS后email
- 邮件方法sleep四秒,两个静态同步方法,同一部手机 先email后SMS
- 邮件方法sleep四秒,两个静态同步方法,两部手机 先email后SMS
- 邮件方法sleep四秒,一个普通方法,一个静态同步方法,一部手机 先SMS后email
- 邮件方法sleep四秒,一个普通方法,一个静态同步方法,两部手机 先SMS后email
List不安全类
不安全示例:
public class UnsafeList {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
}
报错信息
Exception in thread "82" java.util.ConcurrentModificationException
at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1042)
at java.base/java.util.ArrayList$Itr.next(ArrayList.java:996)
at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:472)
at java.base/java.lang.String.valueOf(String.java:2951)
at java.base/java.io.PrintStream.println(PrintStream.java:897)
at xyz.blog.JUC.UnsafeList.lambda$main$0(UnsafeList.java:18)
at java.base/java.lang.Thread.run(Thread.java:834)
解决方式:
List的子类Vector的add方法有synchronized加锁
使用Collections.synchronizedList

此外,Map, Set, HashMap, HashSet也是线程不安全的
CopyOnWriteArrayList
写时复制原理
部分源码
public boolean add(E e) { synchronized(this.lock) { Object[] es = this.getArray(); int len = es.length; es = Arrays.copyOf(es, len + 1); es[len] = e; this.setArray(es); return true; } }private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); this.resetLock(); int len = s.readInt(); SharedSecrets.getJavaObjectInputStreamAccess().checkArray(s, Object[].class, len); Object[] es = new Object[len]; for(int i = 0; i < len; ++i) { es[i] = s.readObject(); } this.setArray(es); }
实现多线程的方法
1.继承Thread类
2.实现Runnable接口
在Java5以后,多了以下两种新的方式
3.实现Callable接口
Callable与Runnable的异同:
Callable实现call方法,Runnable实现run方法
call有异常,run无异常
call有返回值,run无返回值
FutureTask
get方法一般放在最后,因为get在这里会阻塞等待直到线程运行完成
FutureTask当线程相同时仅会被调用一次
FutureTask task = new FutureTask(new Callable() { @Override public Object call() throws Exception { return null; } }); new Thread(task).start(); System.out.println(task.get());
顺序线程执行CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch(6); //进行减一计数直到减为0时,才能执行await()以后的代码 CountDownLatch.countDown(); //当计数不为0时,在此阻塞 CountDownLatch.await();加法计数器CyclicBarrier
与CountDownLatch相似,只是CyclicBarrier当计数加到指定数值时才可以触发,CyclicBarrier的每一个await计数加一
信号量Semaphore,Java的Semaphore原理与Linux相似,具体的实现方法为:acquire()与release()。需要注意的是当资源数为1时,Semaphore与synchronized关键字等价
ReadWriteLock
ReadWriteLock类似于操作系统中的读写者问题,在这里也不再介绍原理而着重于使用
ReadWriteLock.readLock().lock();
ReadWriteLock.readLock().unlock();
ReadWriteLock.writeLock().lock();
ReadWriteLock.writeLock().unlock();
线程池
ThreadPoolExecutor
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
ExecutorService threadPool3 = Executors.newCachedThreadPool();
threadPool1.execute(Runnable);
threadPool1.shutdown();
上述三个API对应的源码
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}
//newCachedThreadPool中的最大数目过大,可能造成OOM异常
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
请注意源码中的LinkedBlockingQueue与ThreadPoolExecutor
实际开发中,并不会使用以上的三个API
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
参数理解
corePoolSize:线程池中的常驻核心线程数
maximumPoolSize:线程池中的最大线程数
//计算密集型,默认为CPU核数+1(Runtime.getRunTime().availableProcessors())
//IO密集型,默认CPU核数*2
keepAliveTime:线程空闲时间大于keepAliveTime会被销毁
unit:keepAliveTime的单位
workQueue:任务队列,提交但尚未被执行的任务
threadFactory:线程工厂,用于创建线程,一般取默认
handler:拒绝策略,如何拒绝请求执行的runnable策略
线程池拒绝策略
AbortPolicy:默认方法,直接抛出RejectedExecutionException阻止系统继续运行
CallerRunsPolicy:不会抛任务也不会抛异常,而是将某些任务回退到调用者,从而降低新任务的流量
DisPolicy:直接抛弃多出的任务
DisOldestPolicy:直接抛弃等待时间最久的任务
分支合并框架ForkJoin
实现
class MyTask extends RecursiveTask<Integer> {
private static final Integer ADJUST_VALUE = 10;
private int begin;
private int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
protected Integer compute() {
if ((end - begin) <= ADJUST_VALUE) {
for (int i = begin; i <= end; i++) {
result = result + i;
}
} else {
int middle = (end + begin) / 2;
MyTask task01 = new MyTask(begin, middle);
MyTask task02 = new MyTask(middle + 1, end);
task01.fork();
task02.fork();
result = task01.join() + task02.join();
}
return result;
}
}
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask task = new MyTask(1, 100);
ForkJoinPool threadPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = threadPool.submit(task);
System.out.println(forkJoinTask.get());
threadPool.shutdown();
}
}
异步回调
琐碎的问题
redis:6379
MySQL:3306匿名表达式:
仅有一个方法的接口称为函数式接口,可以直接利用() -> {}完成匿名表达式,可以在接口上使用@FunctionalInterface注解
Foo foo = (x, y) -> { return x + y; };Java8中允许接口中添加方法的实现(default)
Java中的函数式接口允许有多个默认方法,但仅允许有一个未实现方法
wait()是Object类的方法
TimeUnit.second.sleep提供了新的sleep方法
list.foreach(System.out::println)方法引用
常用随机数生成UUID.randomUUID().toString(),System.currentTimeMillis()
HashSet的底层是一个HashMap, 源码如下
public HashSet() { this.map = new HashMap(); } //默认大小为16,负载因子为0.75 //Map大小达到16*0.75后会自行扩容到2的整数幂 //与之对应,ArrayList扩容到原来的1.5倍 this.map = new HashMap(Math.max((int)((float)c.size() / 0.75F) + 1, 16)); private static final Object PRESENT = new Object(); public boolean add(E e) { return this.map.put(e, PRESENT) == null; }HashMap中为数组+链表+红黑树
HashMap源码
transient HashMap.Node<K, V>[] table; transient Set<Entry<K, V>> entrySet;HashMap.Node源码,同样的hash值对应的节点尾插到一个队列中
HashMap.Node<K, V> next;当同一个链表的数据数目超过8以后,链表会转变为一棵红黑树
没有CopyOnWriteHashMap,但有ConcurrentHashMap
设计模式五大原则(SOLID):
- 单一功能原则(Single responsibility principle)
- 开闭原则(Open close principal)
- 里氏替换原则(Liskov Substitution principle)
- 接口隔离原则(interface-segregation principles)
- 依赖反转原则(Dependency inversion principle,DIP)
Java范型代表的意义
E:Element
T:Type
K:Key
V:Value
N:Number
11.BlockingQueue中的offer方法和poll方法在功能方面大致与add和remove方法相同,但是,前者在插入/删除失败时返回false/null,后者会抛出异常。put/take如果队列已满/空,则会阻塞直到有空位置。