线程 和 进程
- 空间
进程:独立的进程空间, 数据存放的堆栈空间独立
线程:堆空间共享 栈空间独立 相互影响 - 优先级:用户体验高的优先级设高点
独占式操作系统 有优先级概念,共享式则没有
线程内部 程序顺序执行
Thread.MIN_PRIORITY
Thread.MAX_PRIORITY
Thread.NORM_PRIORITY
// 当进程中只剩下守护线程时候 所有守护线程强制终止
// GC 就运行在一个守护线程上的,进程在所有前台线程结束后结束
// 软件的核心运算必须放前台线程算, 不明确都应放前台
- 守护:每个线程都可以或不可以标记为一个守护程序
void SetDaemon(boolean); // 守护线程"后台线程"
当某个线程中运行的代码创建一个新 Thread 对象时, 该新线程的初始**优先级**被设定为创建线程的优先级,并且当且仅当创建线程是**守护线程**时,新线程才是守护程序。
通常都会有 "单个非守护线程"(它通常会调用某个指定类的 main 方法)
- Java 虚拟机会继续执行线程,直到下列任一情况出现时为止:
1.调用了 **Runtime** 类的 **exit** 方法,并且**安全管理器**允许退出操作发生。
2.**非守护**线程的所有线程都已停止运行
无论是通过从对 run 方法的调用中返回 ,还是通过抛出一个传播到 run 方法之外的异常
- 线程终止方式
// 1.正常终止 // 2.异常结束
LifeCycle 线程生命周期
1.Thread 基本操作
Thread.yeild() // 正在运行的线程让出 cpu 返回 Runnable 状态
sleep(long ms) // 进入阻塞状态 到事件进入 Runnable
interrupt() // 打断返回, 被打断的线程进入Runnable 并抛InterruptedException
sleep() // 不能用来进行时间控制,控制高精度运算完全不行,TimeOut 状态线程不占用处理器
void Thread.sleep();
void Thread.join(); // 加入等待当前线程结束 抛出InterruptException
// 线程1 run()内有 t2.join(); 则 t1 要等 t2 线程运行结束才能执行
睡眠&异常打断机制有重要实际作用:
// 如看视频缓存,协调两线程的工作
2.sleep() & wait() 区别
sleep() 来自Thread() 休眠 自动唤醒,
线程暂停 但监控状态保持 自动恢复 不释放锁 只让 cpuwait() 来自Object() 手工调用 notify() notifyAll()
放弃对象锁 进入锁池
// 线程interrupt()
public class Demo06 {
static Thread ft;
static Thread dt;
public static void main(String[] args) {
ft = new FanThread();
dt = new DogThread();
ft.start();
dt.start();
}
}
class FanThread extends Thread{
@Override
public void run(){
try {
System.out.println("Start FanThread");
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
System.out.println("梦被打醒");
e.printStackTrace();
}
}
}
class DogThread extends Thread{
@Override
public void run(){
try {
Thread.sleep(500);
Demo06.ft.interrupt();
} catch (Exception e) {
// TODO: handle exception
}
}
}
// 多线程协调工作
public class Demo10 {
static Thread t1;
static Thread t2;
static List<Integer> list;
//体现两个线程间的协调工作能力
//没有join()控制 结果会相当不理想
public static void main(String[] args) {
list = new ArrayList<Integer>();
//t1向集合中添加10个数
t1 = new Thread(){
public void run(){
for (int i = 0; i < 10; i++) {
list.add((int)(Math.random()*100 + 1));
}
System.out.println(list);
}
};
//t2在t1添加完后对数排序
t2 = new Thread(){
public void run(){
try {
t1.join();
//join()2种结束方式:t1结束join()返回继续运行;
//t2等待期间 t1 t2被其他线程打断 返回继续运行 ???
//t2等待期间 t1 isInterrupted 会出什么情况
Collections.sort(list);
System.out.println(list);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
t1.start();
t2.start();
}
}
线程参数
1.类成员返回数据 & 传递数据,与主线程产生时间上的冲突
while(thread1.value1!==null&& …) sleep(100);-方法
thread.start(); thread.join() -- 更好方法
2.回掉函数返回数据
构造方法传递数据
创建对象时就传参完毕,方便 但不适合量大数据 需多重载构造方法变量&方法传参 - 字段对实例逐个赋值
回调函数方式 – 动态传递参数,本质是事件函数 – 最原始引发事件的过程
public class MyThread3 extends Thread {
private Work work;
public MyThread3(Work work){
this.work = work;
}
public void run(){
java.util.Random random = new java.util.Random();
Data data = new Data();
int n1 = random.nextInt(1000);
int n2 = random.nextInt(2000);
int n3 = random.nextInt(3000);
work.process(data,n1,n2,n3);
// 使用回调函数
System.out.println(
String.valueOf(n1)+"+"+
String.valueOf(n2)+"+"+
String.valueOf(n3)+"="+
data.value);
}
public static void main(String[] args) {
Thread thread = new MyThread3(new Work());
thread.start();
}
}
class Data{
public int value = 0;
}
// 因为要用在线程中 可能涉及到线程同步问题 所以不用int
// 而是包装为一个Data类 这样用 synchronized 同步控制就能成功
class Work{
public void process(Data data,Integer ... numbers){
for (int n : numbers) {
data.value+=n;
}
}
}
Thread 线程操作
1.wait notify 等待通知机制
wait(): 释放锁 进入WAITING状态, 等待 queue ==> 等待唤醒
notify(): 唤醒 从等待 queue 移动到同步 queue 进入BLOCKED ==> 等待 lock
竞争锁 success 进入 Runnable 状态 ==> 等待 CPU => RUNING
wait() notify() notifyAll() 调用的前提都是获得了对象锁Monitor
[等待通知]
// 线程A 作为消费者:
// 线程B 作为生产者:
/**
获取对象的锁
进入 while(判断条件),并调用 wait() 方法
当条件满足跳出循环执行具体处理逻辑
线程 B 作为生产者:
获取对象锁
更改与线程 A 共用的判断条件
调用 notify() 方法
**/
// =========================================================
//Thread A
synchronized(Object){
while(条件){
Object.wait();
}
//do something
}
//Thread B
synchronized(Object){
条件=false;//改变条件
Object.notify();
}
- 2.两Thread交替打印奇偶数
public class TwoThreadWaitNotify {
private int start = 1;
private boolean flag = false;
public static void main(String[] args) {
TwoThreadWaitNotify twoThread = new TwoThreadWaitNotify();
Thread t1 = new Thread(new OuNum(twoThread));
t1.setName("A");
Thread t2 = new Thread(new JiNum(twoThread));
t2.setName("B");
t1.start();
t2.start();
}
/**
* 偶数线程
*/
public static class OuNum implements Runnable {
private TwoThreadWaitNotify number;
public OuNum(TwoThreadWaitNotify number) {
this.number = number;
}
@Override
public void run() {
while (number.start <= 100) {
synchronized (TwoThreadWaitNotify.class) {
System.out.println("偶数线程抢到锁了");
if (number.flag) {
System.out.println(Thread.currentThread().getName() + "+-+偶数" + number.start);
number.start++;
number.flag = false;
TwoThreadWaitNotify.class.notify();
}else {
try {
TwoThreadWaitNotify.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
/**
* 奇数线程
*/
public static class JiNum implements Runnable {
private TwoThreadWaitNotify number;
public JiNum(TwoThreadWaitNotify number) {
this.number = number;
}
@Override
public void run() {
while (number.start <= 100) {
synchronized (TwoThreadWaitNotify.class) {
System.out.println("奇数线程抢到锁了");
if (!number.flag) {
System.out.println(Thread.currentThread().getName() + "+-+奇数" + number.start);
number.start++;
number.flag = true;
TwoThreadWaitNotify.class.notify();
}else {
try {
TwoThreadWaitNotify.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
}
2.wait sleep 比较
sleep: Thread 不释放锁,让出系统资源
自动唤醒,可用 interrupt 强行打断
Thread.sleep(0) 触发操作系统立刻重新进行一次 CPU 竞争
可任意处使用,必须捕获异常wait: Object 让出锁,进入等待池,让出系统资源
需 notify 才会进入 Runnable 等待系统资源分配,
只能在同步块、方法中使用
无需捕获异常
3.join 阻塞当前线程
- join = wait + notifyAll
调用了 t.join(),就必须要等待线程t执行完毕后,才能继续执行其他线程
- 本质:用 Object.wait() 通知一个 Thread 等待,让出 CPU 资源
注意这里是会放弃已经占有的资源的
直到Thread执行完毕,再调用notify()唤醒当前正在运行的线程// 核心逻辑 while (isAlive()) { wait(0); }
案例:
private static void join() throws InterruptedException { Thread t1 = new Thread(new Runnable() { @Override public void run() { LOGGER.info("running"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }) ; Thread t2 = new Thread(new Runnable() { @Override public void run() { LOGGER.info("running2"); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } } }) ; t1.start(); t2.start(); //等待线程1终止 t1.join(); //等待线程2终止 t2.join(); LOGGER.info("main over"); }
4.interrupted 中断线程
本线程中断自身是被允许的,且”中断标记”设置为 true
它线程调用本线程的 interrupt() 方法时,会通过 checkAccess() 检查权限
这有可能抛出SecurityException异常
1.thread.interrupt(): 将Thread一属性设置为true, 还是需要手动判断的
2.如果抛出了 InterruptedException 异常,该标志就会被JVM 重置为false
public class StopThread implements Runnable {
@Override
public void run() {
while ( !Thread.currentThread().isInterrupted()) {
// 线程执行具体逻辑
System.out.println(
Thread.currentThread().getName() + "运行中。。");
}
System.out.println(
Thread.currentThread().getName() + "退出。。");
}
public static void main(String[] args)
throws InterruptedException {
Thread thread = new Thread(new StopThread(), "thread A");
thread.start();
System.out.println("main 线程正在运行") ;
TimeUnit.MILLISECONDS.sleep(10) ;
thread.interrupt();
}
}// 结果
thread A运行中。。
thread A运行中。。
thread A退出。
5.suspend 挂起 resume 重启 @Deprecated
suspend() 挂起线程,并不会去释放任何锁资源,其他线程都无法访问被它占用的锁。
直到对应的线程执行 resume() 方法后,被挂起的线程才能继续,从而其它被阻塞在这个锁的线程才可以继续执行 如果 resume() 操作出现在 suspend() 之前执行,那么线程将一直处于挂起状态,同时一直占用锁,这就产生了死锁。
被挂起的线程,它的线程状态居然还是 Runnable Blog
6.awaitTermination() 等待线程池进入 Terminated 状态
主 Thread 等待 ThreadPool 中 Thread 全部执行完毕,使用此方法前提是使用了shutdown()方法,如下案例
private static void executorService() throws Exception{
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10);
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1, TimeUnit.MILLISECONDS,queue);
poolExecutor.execute(new Runnable() {
@Override
public void run() {
LOGGER.info("running");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
poolExecutor.execute(new Runnable() {
@Override
public void run() {
LOGGER.info("running2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
poolExecutor.shutdown();
while (!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){
LOGGER.info("线程还在执行。。。");
}
LOGGER.info("main over");
}
7.Channel 利用管道进行线程间通信
public static void piped() throws IOException {
//面向于字符 PipedInputStream 面向于字节
PipedWriter writer = new PipedWriter();
PipedReader reader = new PipedReader();
//输入输出流建立连接
writer.connect(reader);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("running");
try {
for (int i = 0; i < 10; i++) {
writer.write(i+"");
Thread.sleep(10);
}
} catch (Exception e) {
} finally {
try {
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("running2");
int msg = 0;
try {
while ((msg = reader.read()) != -1) {
LOGGER.info("msg={}", (char) msg);
}
} catch (Exception e) {
}
}
});
t1.start();
t2.start();
}
8.其它线程通信方式
- volatile 共享可见变量
CountDownLatch 并发工具
实现 join 相同的功能,但是更加的灵活,基于 AQS AbstractQueuedSynchronized 实现,因为Thread都通过ThreadPool来处理,这个好像没啥用。
初始化一个 CountDownLatch 时告诉并发的线程,然后在每个线程处理完毕之后调用 countDown() 方法,
该方法会将 AQS 内置的一个 state 状态 -1 , 最终在主线程调用 await() 方法,它会阻塞直到 state == 0 的时候返回。
private static void countDownLatch() throws Exception{
int thread = 3 ;
long start = System.currentTimeMillis();
final CountDownLatch countDown = new CountDownLatch(thread);
for (int i= 0 ;i<thread ; i++){
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("thread run");
try {
Thread.sleep(2000);
countDown.countDown();
LOGGER.info("thread end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
countDown.await(); // 阻塞知道所有线程处理完毕
long stop = System.currentTimeMillis();
LOGGER.info("main over total time={}",stop-start);
}
碰到的一个案例,还未解析清楚
final CountDownLatch latch = new CountDownLatch(ukeSmallClassList.size());
for (SmallClassLessonExcelVo usc : ukeSmallClassList) {
UserInfoDTO teacherInfos = teacherOrTutorUserInfoMap.get(usc.getTeacherId());
UserInfoDTO actualTeacherInfos = teacherOrTutorUserInfoMap.get(usc.getActualTeacherId());
setTeacherInfo(null, usc, null, teacherInfos, false);
setTeacherInfo(null, usc, null, actualTeacherInfos, true);
UserInfoDTO tutorInfos = teacherOrTutorUserInfoMap.get(usc.getTutorId());
UserInfoDTO actualTutorInfos = teacherOrTutorUserInfoMap.get(usc.getActualTutorId());
setTutorInfo(null, usc, null, tutorInfos, false);
setTutorInfo(null, usc, null, actualTutorInfos, true);
FixedThreadPoolUtils.getInstance().addTask(new BaseRunable(
new CourseLessonExportTask(usc, courseMap, lessonDetailMap), latch));
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
CyclicBarrier 并发工具
屏障或者是栅栏,可以用于线程间通信, 实现等待 N 个线程都达到某个状态后继续运行的效果
可以实现 CountDownLatch 同样的功能,但是要更加灵活
可以调用 reset() 方法重置 CyclicBarrier
(需要自行捕获 BrokenBarrierException 处理) 然后重新执行
首先初始化线程参与者
调用 await() 将会在所有参与者线程都调用之前等待
直到所有参与者都调用了await()后,所有线程从 await()返回继续后续逻辑
private static void cyclicBarrier() throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3) ;
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("thread run");
try {
cyclicBarrier.await() ;
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info("thread end do something");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("thread run");
try {
cyclicBarrier.await() ;
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info("thread end do something");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("thread run");
try {
Thread.sleep(5000);
cyclicBarrier.await() ;
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info("thread end do something");
}
}).start();
LOGGER.info("main thread");
}
参考:
文档信息
- 本文作者:jiushun.cheng
- 本文链接:https://minipa.github.io/2016/07/01/thread/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)