catalog

第一章-JUC概述

1juc、线程介绍

尚硅谷视频:https://www.bilibili.com/video/BV1vE411D7KE?p=40

java8API:https://www.matools.com/api/java8

狂神笔记:https://gitee.com/kuangstudy/openclass/raw/master/%E7%8B%82%E7%A5%9E%E8%AF%B4JUC%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/%E7%8B%82%E7%A5%9E%E8%AF%B4JUC.pdf

java.util.current包下的类

线程状态 Thread.State是个enum枚举类,可以查看状态。

new新生、runnable就绪/运行、blocked阻塞、waiting等待、time_watting超时等待、terminated结束

  1. 初始(NEW):新创建了一个线程对象,但还没有调用start()方法。
  2. 运行(RUNNABLE):Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。 线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得CPU时间片后变为运行中状态(running)。
  3. 阻塞(BLOCKED):表示线程阻塞于锁。 等待进入synchronized方法,代码块
  4. 等待(WAITING):进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。 wait(),Thread.join()
  5. 超时等待(TIMED_WAITING):该状态不同于WAITING,它可以在指定的时间后自行返回。wait(Long),Thread.join(Long),wait(Long)
  6. 终止(TERMINATED):表示该线程已经执行完毕。

==sleep()和wait()区别,sleep()是抱着锁睡觉,wait()是释放资源等待。== ==wait()和notify()、notifyAll()配套使用。只能在synchronized代码块下使用。== java1.8的notify()是随机唤醒,但随机唤醒的是等待时间最长的。

线程 操作 资源

判断 干活 通知

多线程交互,防止多线程虚假唤醒。 (wait notify就是交互,判断用while)

2Lock

Lock lock=new ReentrantLock();可重入锁

Condition condition=lock.newCondition();

image-20200617154016435

synchronized换为了lock() wait()换成condition.await() notifyAll()换成condition.signalAll()

lock可以实现精确唤醒。

1-卖票

1常见卖票:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.sgg.saleticket;

public class SaleTicket {

    int ticket=40;

    public synchronized  void saleTicket() {
        if(ticket>0){
            System.out.println(Thread.currentThread().getName()+"卖出第"+(ticket--)+" 剩下:"+ticket);
        }
    }
    
}
class h1{
    public static void main(String[] args) {
        SaleTicket ticket=new SaleTicket();

        //线程操作资源类
        new Thread(() -> {
            for (int i = 0; i <40 ; i++) {
                ticket.saleTicket();
            }
        },"A").start();
        new Thread(() -> {
            for (int i = 0; i <40 ; i++) {
                ticket.saleTicket();
            }
        },"B").start();
        new Thread(() -> {
            for (int i = 0; i <40 ; i++) {
                ticket.saleTicket();
            }
        },"C").start();

    }//main
}

存在问题,在这里锁的是整个方法,但有可能,只需要这个方法的部门代码加锁,而整个方法加锁会影响其它线程执行,影响代码效率。

使用reentrant

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
lock.lock();

try {
  if(ticket>0){
    System.out.println(Thread.currentThread().getName()+"卖出第"+(ticket--)+" 剩下:"+ticket);
  }
} catch (Exception e) {
  e.printStackTrace();
} finally {
  lock.unlock();
}

2- 生产消费

productive、consumption

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.sgg.productive_consumption;

import static java.lang.Thread.sleep;

public class Productive_Consumption {
    public static void main(String[] args) {
        conditioner con=new conditioner();
        //生产消费,两个线程,可操作初始值为0变量
        //一个+1一个-1,10轮后,为0
        //线程 操作 资源
        //判断 干活 通知
        //多线程交互,防止多线程虚假唤醒。  (wait notify就是交互,判断用while)
        new Thread(()->{
            for (int i = 0; i <10 ; i++) {
                try {
                    sleep(40);
                    con.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A1 productive").start();

        new Thread(()->{
            for (int i = 0; i <10 ; i++) {
                try {
                    sleep(70);
                    con.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B1 consumption").start();

        new Thread(()->{
            for (int i = 0; i <10 ; i++) {
                try {
                    sleep(38);
                    con.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A2 productive").start();

        new Thread(()->{
            for (int i = 0; i <20 ; i++) {
                try {
                    sleep(120);
                    con.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"B2 consumption").start();

    }
}

 class conditioner{
 private int number=0;

public synchronized void increment() throws InterruptedException {
    while (number != 0) {
        wait();
    }
        number++;
    System.out.println(Thread.currentThread().getName()+" \t "+number);
    notifyAll();
}

public synchronized void decrement() throws InterruptedException {
    while (number == 0) {
        wait();
    }
        number--;
    System.out.println(Thread.currentThread().getName()+" \t "+number);
    notifyAll();
}

}//conditioner

生产消费新版本

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.sgg.productive_consumption;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static java.lang.Thread.sleep;

public class Productive_Consumption {
    public static void main(String[] args) {
        //productive consumption 生产消费
        //线程操作资源类
        //判断干活通知
        //线程交互用while
        conditioner conditioner=new conditioner();
        Runnable increment=() ->{
               for (int i = 0; i < 20; i++) {
                   conditioner.increment();
                }
        };
        Runnable decrement=() ->{
                for (int i = 0; i < 20; i++) {
                    conditioner.decrement();
                }
        };

        Thread incrementT=new Thread(increment,"add A");
        Thread decrementT=new Thread(decrement,"add B");
        incrementT.start();
        decrementT.start();
    }
}

 class conditioner{
    int number=0;
    Lock lock=new ReentrantLock();
Condition condition=lock.newCondition();

    public   void increment()  {
        lock.lock();
        try {
            while (number!=0){
                condition.await();
            }
            number++;
            System.out.println(Thread.currentThread().getName()+" "+number);
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
     public  void decrement()   {
        lock.lock();
         try {
             while (number==0){
                 condition.await();
             }
             number--;
             System.out.println(Thread.currentThread().getName()+" "+number);
             condition.signalAll();
         } catch (InterruptedException e) {
             e.printStackTrace();
         } finally {
         lock.unlock();
         }
     }

}//conditioner

3-精确唤醒

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package com.sgg.print;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PrintABC {
    public static void main(String[] args) {
        Repository repository=new Repository();
        //循环打印abc
        new Thread(()->{
            for (int i = 0; i <15 ; i++) {
               // System.out.println("线程A开启,执行"+i);
                repository.print();
            }
        },"A线程").start();
        new Thread(()->{
            for (int i = 0; i <15 ; i++) {
              //  System.out.println("线程B开启,执行"+i);
                repository.print();
           }
        },"B线程").start();
        new Thread(()->{
            for (int i = 0; i <15 ; i++) {
               // System.out.println("线程C开启,执行"+i);
                repository.print();
            }
        },"C线程").start();
    }
}//class Print
    class Repository{
        private int number=1;//1A 2B 3C
        private Lock lock=new ReentrantLock();
        private Condition condition1=lock.newCondition();
        private Condition condition2=lock.newCondition();
        private Condition condition3=lock.newCondition();

        public void print5(){
            lock.lock();
            try{

                while (number!=1){
                    condition1.await();
                }
                for (int i = 0; i <5 ; i++) {
                    System.out.println(Thread.currentThread().getName() +i +"  A");
                }
                number=2;
                condition2.signal();
            }catch(Exception e){
             e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }

        public void print10(){
            lock.lock();
            try{

                while (number!=2){
                    condition2.await();
                }
                for (int i = 0; i <5 ; i++) {
                    System.out.println(Thread.currentThread().getName()+i+"  B");
                }
                number=3;
                condition3.signal();
            }catch(Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }

        public void print15(){
            lock.lock();
            try{
                while (number!=3){
                    condition3.await();
                }
                for (int i = 0; i <5 ; i++) {
                    System.out.println(Thread.currentThread().getName()+i+"   C");
                }
                number=1;
                condition1.signal();
            }catch(Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }

        public void print(){
            lock.lock();
            try{
                if(number==1){
                    System.out.println("A");
                    number=2;
                    condition2.signal();
                    condition1.await();
                }else if (number==2){
                    System.out.println("B");
                    number=3;
                    condition3.signal();
                    condition2.await();
                }else if(number==3){
                    number=1;
                    System.out.println("C");
                    condition1.signal();
                    condition3.await();
                }

            }catch(Exception e){
             e.printStackTrace();
            }finally{
                lock.unlock();
            }
        }

    }//repository

3线程8锁

1标准访问,请问先打印邮件还是短信? 2邮件方法暂停4秒钟,请问先打印邮件还是短信? 3新增一个普通方法hello(),请问先打印邮件还是hello? 4两部手机,请问先打印邮件还是短信? 5两个静态同步方法,同- -部手机,请问先打印邮件还是短信? 6两个静态同步方法,2部手机,请问先打印邮件还是短信? 7 1个普通同步方法,1个静态同步方法,1部手机,请问先打印邮件还是短信? 8 1个普通同步方法, 1个静态同步方法,2部手机,请问先打印邮件还是短信?

若synchronized修饰普通方法上,那么锁的就是整个对象,同一时间,只有一个线程能获得整个对象的锁,然后去执行里面的方法。(就是粒度很大)

4list安全性

main方法运行这样一段代码,当I为3时,线程数据虽然有误,但是不会报错。

当I为30,会出现==java.util.ConcurrentModificationException==

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public static void main(String[] args) {
    //list不安全测试
    ArrayList<String> list=new ArrayList<String >();

    for (int i = 0; i <30 ; i++) {
        new Thread(() -> {
            list.add(UUID.randomUUID().toString().substring(0,8));
            System.out.println(list);
        },String.valueOf(i)).start();
    }
}

2导致原因:

3解决方案:

  • 1使用Vector集合
  • 2使用Collections.synchronizedList(new ArrayList<>());
  • ==3 new CopyOnWriteArrayList<>();==

4优化建议:image-20200618232345335

5Set安全性

HashSet的底层数据结构就是HashMap()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
 public HashSet() {
        map = new HashMap<>();
}
//当Set调用add()方法时
public boolean add(E e) {
        return map.put(e, PRESENT)==null;
}
PRESENT是一个静态常量
// Dummy value to associate with an Object in the backing Map
private static final Object PRESENT = new Object();

JUC下的HashSet的集合类为==CopyOnWriteArraySet();==

CopyOnWriteArraySet的底层为CopyOnWriteArrayList() 但CopyOnWriteArraySet是无序不重复集合,所以其每次add()前都会遍历数组,当集合内不存在时才添加。

6Map安全性

juc下的包为ConcurrentHashMap

7Callable

Callable是第三种创建线程方式:会抛出异常,有返回值,要重写的方法名,一个是run,一个是call

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class claaable1 implements Callable<Integer>{

    @Override
    public Integer call() throws Exception {
        System.out.println("hello Callable Thread");
        return null;
    }
}

//Callable线程的使用
//线程的启动是用new Thread(Runnable run).start();
//也就是说必须是Runnable的子类,Callable却不是
//所以需要一个类连接callable和runnable  --> FutureTask()
//FutureTask实现了Runnable接口,其构造函数要传入Callable接口,这样就实现了连接。
public static void main(String[] arg){
   FutureTask future=new FutureTask(new classble1());
        new Thread(future).start();
  //若手动调用get()那么主线程会被阻塞。
        System.out.println(future.get());
}

注意:

  1. 若使用一个FutureTask去启用两个线程,那么只会去计算一次。(因为上次的执行结果被存储了)
  2. 手动调用get()会立马取得结果,程序也将阻塞于此。

Volatile变量

Volatile是Java虚拟机提供的轻量级的同步机制

保证可见、不保证原子性、禁止指令重排

1变量可见性

在多线程并发执行下,多个线程修改共享的成员变量,会出现一个线程修改了共享变量的值后,另一个线程不能直接看到该线程修改后的变星的最新值。

1变量的不可见性案例: 主线程:一直判断flag是否为true,若为true则打印语句。 某线程:启动后修改flag为true。

由于测试中某线程总是会抢到cpu,所以模仿业务,某线程等一下才会修改为true。进行sleep一会,但我们发现,flag为true后,主线程不能及时发现flag修改了。==主线程此时的flag还是为false==

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.volatile1;

import lombok.SneakyThrows;

public class visibilityChar {

    public static void main(String[] args) {
        //volatile关键字测试

        MyThread thread=new MyThread();
        thread.start();

        while (true){
            if(thread.flag){
                System.out.println("主线程循环执行");
            }
        }

    }

}
class MyThread extends Thread{
    //成员变量
     boolean flag=false;

    @SneakyThrows
    @Override
    public void run(){
        sleep(500);
        flag=true;
        System.out.println("flag="+flag);
    }
}

image-20200618220446741

多线程并发修改变量不可见的原因,JMM(java memory model)java内存模型,是java虚拟机中规范定义的内存模型,其是标准化的,屏蔽了不同计算机的区别。

JMM的规定

  • Java内存模型规定所有的变量都是存在主存当中。(不包含局部变量,局部变量是线程私有)
  • 每个线程都有自己的工作内存。线程对变量的所有操作都是其复制主存的工作副本,而不是直接对主存进行操作。
  • 并且每个线程不能访问其他线程的工作内存,线程变量的值传递需要通过主存传递,变量的值何时从线程的工作内存写回主存,无法确定。

解决方案一 对主线程的if语句做一个synchronized包裹。

1
2
3
4
5
6
a线程获得锁
b清空工作内存
c从主存中拷贝共享变量最新值到工作内存成为副本
d执行代码
e把修改后的副本值刷新回主内存
f线程释放锁

解决方案2 为flag加volatile关键字修饰 原理:当线程修改volatile关键字修饰的变量时,该线程会立刻通知主存(堆)更新这个变量的值,然后主存会立刻通知其它线程重新读取。

2原子性

volatile无法保证原子性

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.volatile1;

import lombok.SneakyThrows;

public class visibilityChar {

    public static void main(String[] args) {
        //volatile关键字测试
        MyThread thread = new MyThread();
        //开启100个线程对count进行+到100000操作
        for (int i = 0; i <100 ; i++) {
            new Thread(thread,String.valueOf(i)).start();
        }


    }

}
class MyThread implements Runnable{
    //成员变量
    volatile int count;

    @SneakyThrows
    @Override
    public void run(){

        for (int i = 0; i <10000 ; i++) {
            count++;
            System.out.println("线程"+Thread.currentThread().getName()+"  ===>"+count);
        }
    }
}
//结果有时为1000000  有时为999995

image-20200618230615225

可以使用JDK1.5的juc的atomic包下的AtomicInteger类,这个类可以实现原子性操作。

第二章-JUC进阶

1线程辅助类

1-CountDownLatch

  • CountDownLatch/lætʃ/ 占有,抓住;闭锁主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
  • 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),
  • 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CountDownLatch latch=new CountDownLatch(6);
//辅助类
for (int i = 1; i <=6 ; i++) {
    new Thread(()->{
        System.out.println(Thread.currentThread().getName()+"进攻");
        latch.countDown();//计数器 减一
    }).start();
}
latch.await();//等待,必须等latch.countDown()计数变为0才能通过。
System.out.println("司令出击了-----");

2-CyclicBarrier

  • CyclicBarrier /ˈsaɪklɪk,ˈsɪklɪk/环的;循环的;周期的 /ˈbæriər/障碍物,屏障;界线
  • 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,
  • 直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
  • 线程进入屏障通过CyclicBarrier的await()方法。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
private static void cyclicBarrier() {
    CyclicBarrier cyclicBarrier=new CyclicBarrier(7);
    for (int i = 1; i <=7 ; i++) {
        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"准备");
            try {
                cyclicBarrier.await();//所有屏障到达干活
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"一起进攻");
        }).start();
    }
}

3-semaphore

semaphore /ˈseməfɔːr/ 信号标,旗语;臂板信号装置

在信号量上我们定义两种操作:

  • acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。

  • release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。

  • 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
private static void semaphore() {
    Semaphore semaphore=new Semaphore(3);
    for (int i = 0; i <5 ; i++) {
        new Thread(() -> {
            try {
                //程序运行到这里 Semaphore会减一
                semaphore.acquire(); //  /əˈkwaɪər/ 获得;取得;学到;捕获
                System.out.println(Thread.currentThread().getName()+"线程抢到了车位");
                TimeUnit.MILLISECONDS.sleep(3000);
                System.out.println(Thread.currentThread().getName()+"离开了车位");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                semaphore.release();
            }
        }).start();
    }
}

应用:可以做抢购等、

2读写锁

读锁与读锁不互斥,读锁与写锁互斥,写锁与写锁互斥。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
ReadWriteLock readWriteLock=new ReentrantReadWriteLock();

//读锁
 readWriteLock.readLock().lock();
cache.get(k+"");
readWriteLock.readLock().unlock();

//写锁
  readWriteLock.writeLock().lock();
cache.put(k+"",k+"");
readWriteLock.writeLock().unlock();

案例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.juc;

import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockTest {
    //读写锁
    public static void main(String[] args) {
        //进一步细化 读写分离
        ReadWriteLock readWriteLock=new ReentrantReadWriteLock();

        MyCache cache=new MyCache();
        for (int i = 0; i <50 ; i++) {
            final int k=i;
            new Thread(()->{
                readWriteLock.writeLock().lock();
                cache.put(k+"",k+"");
                readWriteLock.writeLock().unlock();
            },String.valueOf(i)).start();
        }

        for (int i = 0; i <50 ; i++) {
            final int k=i;
            new Thread(()->{
                readWriteLock.readLock().lock();
                cache.get(k+"");
                readWriteLock.readLock().unlock();
            },String.valueOf(i)).start();
        }

    }
}
class MyCache{
    private  HashMap<String,Object> map=new HashMap<>();
    public void put(String key,Object v){
        System.out.println(Thread.currentThread().getName()+"------开始写"+" "+key);
        try {
            TimeUnit.MILLISECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        map.put(key,v);
        System.out.println(Thread.currentThread().getName()+"------结束写");
    }

    public void get(String key){

        System.out.println(Thread.currentThread().getName()+"//////开始读//////");
        Object o = map.get(key);
        System.out.println(Thread.currentThread().getName()+"//////结束读//////"+" "+o);
    }

}

3BlockingQueue

阻塞队列。

当队列是空的,从队列中获取元素的操作将会被阻塞。 当队列是满的,从队列中添加元素的操作将会被阻塞。

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一 旦条件满足,被挂起的线程又会自动被唤起

image-20200628160841723

ArrayBlockingQueue只是其中一个实现类。

==ArrayBlockingQueue:由数组结构组成的有界阻塞队列。== ==LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。== ==SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。==

PriorityBlockingQueue:支持优先级排序的无界阻塞队列。 DelayQueue:使用优先级队列实现的延迟无界阻塞队列。 LinkedTransferQueue:由链表组成的无界阻塞队列。 LinkedBlockingDeque:由链表组成的双向阻塞队列。

阻塞队列的方法

image-20200628162118286

add()方法成功返回true,队列满了会抛异常

element检查是否有元素。会返回队列第一个元素,若不存在则抛出异常。


第二类特殊Boolean值的

offer()会返回true/false poll()移除成功返回队列里的值,移除失败返回null peek()返回队列首的元素,成功返回值,失败返回null


第三类阻塞的

put()会一直等到队列有空位,然后进行添加。 take()会一致等到队列有元素,然后进行移除。 否则会一直等待


第四类是超时等待

4悲观/乐观锁

什么是悲观锁和乐观锁

悲观锁

修改时,先锁住资源再进行修改,其他人无法访问。

java中如:synchronized、Lock就是悲观锁的实现

举例说明:数据库层面

  • 悲观锁 select * for update;
  • 乐观锁 添加一个字段version; 修改时 update xcc where version=1 若被别人改过,version不一致。则这个语句就是无效的 若没被别人改。修改时顺便version+1。

乐观锁

常用就是CAS算法、原子类AtomicInteger

举例说明 乐观锁:git push代码到仓库,会对比本地版本号和远程版本号,若版本号一致就可以修改。不一致则无法修改

第三章-线程池

1线程池好处

线程池的优势: 线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

它的主要特点为:线程复用;控制最大并发数;管理线程。

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。 第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。 第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

ThreadPoolExecutor

2三种线程池

1Executors.newFixedThreadPool(int)执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程 2Executors.newSingleThreadExecutor()一个任务一个任务的执行,一池一线程 3Executors.newCachedThreadPool()执行很多短期异步任务,线程池根据需要创建新线程, 但在先前构建的线程可用时将重用它们。可扩容,遇强则强

==这三个线程池的底层都是ThreadPoolExecutor==

1Executors.newFixedThreadPool(int) /fɪkst/ 确定的;固执的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) {
    //创建一个含有n个线程的线程池
    ExecutorService executorService = Executors.newFixedThreadPool(5);

    for (int i = 0; i <10 ; i++) {
        //模拟10个线程向线程池请求服务
        int j=i;
        executorService.execute(()->{
            System.out.println(Thread.currentThread().getName()+"进行业务办理:"+j);
        });
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    executorService.shutdown();//线程池使用完后需要关闭
}
//结果是线程池5个线程 轮询执行10个线程 直到执行完毕

3线程池7参数

ThreadpoolExecutor的构造方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler) {
   if (corePoolSize < 0 ||
       maximumPoolSize <= 0 ||
       maximumPoolSize < corePoolSize ||
       keepAliveTime < 0)
     throw new IllegalArgumentException();
   if (workQueue == null || threadFactory == null || handler == null)
     throw new NullPointerException();
   this.acc = System.getSecurityManager() == null ?null :
   AccessController.getContext();
   this.corePoolSize = corePoolSize;
   this.maximumPoolSize = maximumPoolSize;
   this.workQueue = workQueue;
   this.keepAliveTime = unit.toNanos(keepAliveTime);
   this.threadFactory = threadFactory;
   this.handler = handler;
    }

七大参数详解

1、corePoolSize:线程池中的常驻核心线程数 2、maximumPoolSize:线程池中能够容纳同时 执行的最大线程数,此值必须大于等于1 3、keepAliveTime:多余的空闲线程的存活时间当前池中线程数量超过corePoolSize时,当空闲时间达到keepAliveTime时,多余线程会被销毁直到只剩下corePoolSize个线程为止。 4、unit:keepAliveTime的单位 5、workQueue:任务队列,被提交但尚未被执行的任务 6、threadFactory:表示生成线程池中工作线程的线程工厂, 用于创建线程,一般默认的即可 7、handler:拒绝策略,表示当队列满了,并且工作线程大于 等于线程池的最大线程数(maximumPoolSize)时如何来拒绝 请求执行的runnable的策略

image-20200701223834516

1任务来了,核心线程执行。然后其它任务放到阻塞队列。 2阻塞队列满了以后就进行扩容(达到上线),然后继续执行。 3若达到了最大线程数,则执行拒绝策略。

4拒绝策略和使用

image-20200704151457425

==能够容纳的请求=最大线程数+阻塞队列的长度。==

四种拒绝策略

AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行

CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。

DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。

DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.executor;

import java.util.concurrent.*;

public class ThreadPoolExecutorTest {
   public static void main(String[] args) {
       ExecutorService executorService = Executors.newFixedThreadPool(4);

       ThreadPoolExecutor pool=new ThreadPoolExecutor(
               2,
               Integer.MAX_VALUE,
               2,TimeUnit.SECONDS,
               new LinkedBlockingQueue<Runnable>(3),
               Executors.defaultThreadFactory(),
               new ThreadPoolExecutor.DiscardOldestPolicy());
       for (int i = 0; i <110 ; i++) {
           int j=i;
       pool.execute(()->{
          try {
               TimeUnit.MILLISECONDS.sleep(300);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
          int ii=j;
           System.out.println(Thread.currentThread().getName()+": hello world!!!"+ii);
       });
       }
   }
}

最大线程设置

若是CPU密集:Runtime.getRuntime().availableProcessors();cpu核心+1,上面就是获得cpu核心。 若是IO密集