JUC


JUC

概述

  1. JUC全名java.util.concurrent

  2. 由三部分构成:

    • java.util.concurrent

    • java.util.concurrent.atomic

    • java.util.concurrent.lock

  3. 什么是进程,什么是线程

  4. 什么是并行/并发

顺序

在高内聚低耦合的前提下,线程操作资源类

实现

这里的Ticket就是资源类,遇见并发,先写出资源类

同时请务必注意并理解代码中的注释

//具体的操作方法在类的内部实现,仅对外暴露方法调用接口(高内聚)
//遇见多并发先写资源类(低耦合)
class Ticket {
    private int number = 50;

    public synchronized void saleTicket() {
        if (number > 0) {
            System.out.println(Thread.currentThread().getName());
            System.out.println("卖出第" + (number--) + "剩余" + number+"张");
        }
    }
}

public class SaleTicket {
    public static void main(String[] args) {
        final Ticket ticket = new Ticket();

        //接口也可以new,如匿名内部类
        //可以利用代码模板将下面的东西写为模板
        new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 30; i++) {
                    ticket.saleTicket();
                }
            }
        },"thread1").start();

        new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 30; i++) {
                    ticket.saleTicket();
                }
            }
        },"thread2").start();

        new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 30; i++) {
                    ticket.saleTicket();
                }
            }
        },"thread3").start();
    }

优化

java.util.concurrent.locks.ReentrantLock可重入锁

手册上给的示例代码

 class X {
   private final ReentrantLock lock = new ReentrantLock();
   // ...

   public void m() {
     lock.lock();  // block until condition holds
     try {
       // ... method body
     } finally {
       lock.unlock()
     }
   }
 }

使用匿名表达式优化代码

new Thread(() -> {
    for (int i = 0; i < 40; i++) ticket.saleTicket();
}, "thread1").start();

Java多线程的状态

通过查看Thread.State状态可以发现进程有如下六种状态

  1. NEW

  2. RUNNABLE

  3. BLOCKED

  4. WAITING:WAITING状态会一直等直到获得使用权

  5. TIMED_WAITING:TIMED_WAITING会等待一定时间,若时间内没有获得资源则停止等待

  6. TERMINATED

多线程的三步交互

  1. 判断

  2. 操作

  3. 通知

    由于if的判断后wait会使线程释放原先的资源,而且此时未经再次判断就向下执行造成错误。

   public synchronized void increment() throws InterruptedException {
       //1.判断
       if (number != 0) {
           this.wait();
       }
       //2.操作
       number++;
       System.out.println(Thread.currentThread().getName() + "\t" + number);
       //3.通知
       this.notify();
   }

解决虚假唤醒问题

多线程判断过程中判断要用while而不是if,请看上方多线程三步交互中的3.通知

wait与sleep的区别

  • wait让当前线程等待,并释放所有的锁
  • sleep让当前线程进入睡眠状态,保留所有的锁

JUC中的相关操作

  • 在JUC中,lock, await, signal 分别对应原先的synchronized, wait, notify

使用新版本的JUC替换传统方法

public synchronized void increment() throws InterruptedException {
    lock.lock();
    try {
        //1.判断
        while (number != 0) {
            condition.await();
        }
        //2.操作
        number++;
        System.out.println(Thread.currentThread().getName() + "\t" + number);
        //3.通知
        condition.signalAll();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}

标志位的设置

lock与condition配合使用可以实现精准控制与精准通知

//用于标志三个不同的线程
private int number = 1;
Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();

public void print5() {
    lock.lock();

    try {
        //判断
        while (number != 1) {
            condition1.await();
        }
        //操作
        for (int i = 0; i <= 5; i++) {
            System.out.println(Thread.currentThread().getName() + "\t" + i);
        }
        //通知
        number = 2;
        condition2.signal();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}

8锁现象

public class EightLocks {
    public synchronized void sendEmail() {
        System.out.println("发邮件");
    }

    public synchronized void sendSMS() {
        System.out.println("发短信");
    }

    public void hello() {
        System.out.println("hello");
    }
}

现有两个线程同时执行如下方法 ,求输出的结果

注意理解

  1. 标准访问 先email后SMS
  2. 邮件方法sleep四秒 先email后SMS
  3. 邮件方法sleep四秒,和普通的hello方法 先hello后email
  4. 邮件方法sleep四秒,两部手机分别email和SMS 先SMS后email
  5. 邮件方法sleep四秒,两个静态同步方法,同一部手机 先email后SMS
  6. 邮件方法sleep四秒,两个静态同步方法,两部手机 先email后SMS
  7. 邮件方法sleep四秒,一个普通方法,一个静态同步方法,一部手机 先SMS后email
  8. 邮件方法sleep四秒,一个普通方法,一个静态同步方法,两部手机 先SMS后email

List不安全类

不安全示例:

public class UnsafeList {
    public static void main(String[] args) {
        ArrayList<String> list = new ArrayList<>();

        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().substring(0, 8));
                System.out.println(list);
            }, String.valueOf(i)).start();
        }
    }
}

报错信息

Exception in thread "82" java.util.ConcurrentModificationException
    at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1042)
    at java.base/java.util.ArrayList$Itr.next(ArrayList.java:996)
    at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:472)
    at java.base/java.lang.String.valueOf(String.java:2951)
    at java.base/java.io.PrintStream.println(PrintStream.java:897)
    at xyz.blog.JUC.UnsafeList.lambda$main$0(UnsafeList.java:18)
    at java.base/java.lang.Thread.run(Thread.java:834)

解决方式:

  1. List的子类Vector的add方法有synchronized加锁

  2. 使用Collections.synchronizedList

此外,Map, Set, HashMap, HashSet也是线程不安全的

CopyOnWriteArrayList

  • 写时复制原理

    部分源码

    public boolean add(E e) {
        synchronized(this.lock) {
            Object[] es = this.getArray();
            int len = es.length;
            es = Arrays.copyOf(es, len + 1);
            es[len] = e;
            this.setArray(es);
            return true;
        }
    }
    private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        this.resetLock();
        int len = s.readInt();
        SharedSecrets.getJavaObjectInputStreamAccess().checkArray(s, Object[].class, len);
        Object[] es = new Object[len];
    
        for(int i = 0; i < len; ++i) {
            es[i] = s.readObject();
        }
    
        this.setArray(es);
    }

实现多线程的方法

1.继承Thread类

2.实现Runnable接口

在Java5以后,多了以下两种新的方式

3.实现Callable接口

Callable与Runnable的异同:

  • Callable实现call方法,Runnable实现run方法

  • call有异常,run无异常

  • call有返回值,run无返回值

  • FutureTask

    get方法一般放在最后,因为get在这里会阻塞等待直到线程运行完成

    FutureTask当线程相同时仅会被调用一次

    FutureTask task = new FutureTask(new Callable() {
        @Override
        public Object call() throws Exception {
             return null;
        }
      });
    new Thread(task).start();
    System.out.println(task.get());
  • 顺序线程执行CountDownLatch

    CountDownLatch countDownLatch = new CountDownLatch(6);
    
    //进行减一计数直到减为0时,才能执行await()以后的代码
    CountDownLatch.countDown();
    
    //当计数不为0时,在此阻塞
    CountDownLatch.await();
  • 加法计数器CyclicBarrier

    与CountDownLatch相似,只是CyclicBarrier当计数加到指定数值时才可以触发,CyclicBarrier的每一个await计数加一

  • 信号量Semaphore,Java的Semaphore原理与Linux相似,具体的实现方法为:acquire()与release()。需要注意的是当资源数为1时,Semaphore与synchronized关键字等价

ReadWriteLock

ReadWriteLock类似于操作系统中的读写者问题,在这里也不再介绍原理而着重于使用

ReadWriteLock.readLock().lock();

ReadWriteLock.readLock().unlock();

ReadWriteLock.writeLock().lock();

ReadWriteLock.writeLock().unlock();

线程池

ThreadPoolExecutor

ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
ExecutorService threadPool3 = Executors.newCachedThreadPool();

threadPool1.execute(Runnable);

threadPool1.shutdown();

上述三个API对应的源码

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}

public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}

//newCachedThreadPool中的最大数目过大,可能造成OOM异常
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}

请注意源码中的LinkedBlockingQueue与ThreadPoolExecutor

实际开发中,并不会使用以上的三个API

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

参数理解

  1. corePoolSize:线程池中的常驻核心线程数

  2. maximumPoolSize:线程池中的最大线程数

    //计算密集型,默认为CPU核数+1(Runtime.getRunTime().availableProcessors())

    //IO密集型,默认CPU核数*2

  3. keepAliveTime:线程空闲时间大于keepAliveTime会被销毁

  4. unit:keepAliveTime的单位

  5. workQueue:任务队列,提交但尚未被执行的任务

  6. threadFactory:线程工厂,用于创建线程,一般取默认

  7. handler:拒绝策略,如何拒绝请求执行的runnable策略

线程池拒绝策略

  1. AbortPolicy:默认方法,直接抛出RejectedExecutionException阻止系统继续运行

  2. CallerRunsPolicy:不会抛任务也不会抛异常,而是将某些任务回退到调用者,从而降低新任务的流量

  3. DisPolicy:直接抛弃多出的任务

  4. DisOldestPolicy:直接抛弃等待时间最久的任务

分支合并框架ForkJoin

实现

class MyTask extends RecursiveTask<Integer> {

    private static final Integer ADJUST_VALUE = 10;

    private int begin;
    private int end;
    private int result;

    public MyTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    protected Integer compute() {

        if ((end - begin) <= ADJUST_VALUE) {
            for (int i = begin; i <= end; i++) {
                result = result + i;
            }
        } else {
            int middle = (end + begin) / 2;
            MyTask task01 = new MyTask(begin, middle);
            MyTask task02 = new MyTask(middle + 1, end);
            task01.fork();
            task02.fork();
            result = task01.join() + task02.join();
        }
        return result;
    }
}

public class ForkJoinDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyTask task = new MyTask(1, 100);

        ForkJoinPool threadPool = new ForkJoinPool();

        ForkJoinTask<Integer> forkJoinTask = threadPool.submit(task);

        System.out.println(forkJoinTask.get());

        threadPool.shutdown();
    }
}

异步回调

琐碎的问题

  1. redis:6379
    MySQL:3306

  2. 匿名表达式:

    仅有一个方法的接口称为函数式接口,可以直接利用() -> {}完成匿名表达式,可以在接口上使用@FunctionalInterface注解

    Foo foo = (x, y) -> {
        return x + y;
    };
  3. Java8中允许接口中添加方法的实现(default)

    Java中的函数式接口允许有多个默认方法,但仅允许有一个未实现方法

  4. wait()是Object类的方法

  5. TimeUnit.second.sleep提供了新的sleep方法

  6. list.foreach(System.out::println)方法引用

  7. 常用随机数生成UUID.randomUUID().toString(),System.currentTimeMillis()

  8. HashSet的底层是一个HashMap, 源码如下

    public HashSet() {
        this.map = new HashMap();
    }
    
    //默认大小为16,负载因子为0.75
    //Map大小达到16*0.75后会自行扩容到2的整数幂
    //与之对应,ArrayList扩容到原来的1.5倍
    this.map = new HashMap(Math.max((int)((float)c.size() / 0.75F) + 1, 16));
    
    private static final Object PRESENT = new Object();
    public boolean add(E e) {
        return this.map.put(e, PRESENT) == null;
    }

    HashMap中为数组+链表+红黑树

    HashMap源码

    transient HashMap.Node<K, V>[] table;
    transient Set<Entry<K, V>> entrySet;

    HashMap.Node源码,同样的hash值对应的节点尾插到一个队列中

    HashMap.Node<K, V> next;

    当同一个链表的数据数目超过8以后,链表会转变为一棵红黑树

    没有CopyOnWriteHashMap,但有ConcurrentHashMap

  9. 设计模式五大原则(SOLID):

    • 单一功能原则(Single responsibility principle)
    • 开闭原则(Open close principal)
    • 里氏替换原则(Liskov Substitution principle)
    • 接口隔离原则(interface-segregation principles)
    • 依赖反转原则(Dependency inversion principle,DIP)
  10. Java范型代表的意义

    • E:Element

    • T:Type

    • K:Key

    • V:Value

    • N:Number

11.BlockingQueue中的offer方法和poll方法在功能方面大致与add和remove方法相同,但是,前者在插入/删除失败时返回false/null,后者会抛出异常。put/take如果队列已满/空,则会阻塞直到有空位置。


文章作者: Dale Mo
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Dale Mo !
 上一篇
深入分析synchronized原理和锁膨胀过程 深入分析synchronized原理和锁膨胀过程
深入分析synchronized原理和锁膨胀过程 synchronized实现原理synchronized是依赖于JVM来实现同步的,在同步方法和代码块的原理有点区别。 同步代码块我们在代码块加上synchronized关键字 public
2020-09-06
下一篇 
解决idea无法读取properties文件的解决方案 解决idea无法读取properties文件的解决方案
解决idea无法读取properties文件的解决方案最近在学JDBC连接的时候遇到的idea无法读取properties文件的问题,一直在报无法读取文件的错误,后经查询发现,idea与eclipse不同,idea有着 Content Ro
2020-09-04
  目录