Java 线程 一 Thread

2016/07/01 Thread 线程 高并发 共 10892 字,约 32 分钟
MiniPa

线程 和 进程

  • 空间
    进程:独立的进程空间, 数据存放的堆栈空间独立
    线程:堆空间共享 栈空间独立 相互影响
  • 优先级:用户体验高的优先级设高点
    独占式操作系统 有优先级概念,共享式则没有
    线程内部 程序顺序执行
Thread.MIN_PRIORITY 
Thread.MAX_PRIORITY 
Thread.NORM_PRIORITY

// 当进程中只剩下守护线程时候 所有守护线程强制终止
// GC 就运行在一个守护线程上的,进程在所有前台线程结束后结束
// 软件的核心运算必须放前台线程算, 不明确都应放前台
  • 守护:每个线程都可以或不可以标记为一个守护程序
void SetDaemon(boolean); // 守护线程"后台线程"

当某个线程中运行的代码创建一个新 Thread 对象时, 该新线程的初始**优先级**被设定为创建线程的优先级,并且当且仅当创建线程是**守护线程**新线程才是守护程序   
通常都会有 "单个非守护线程"它通常会调用某个指定类的 main 方法  
  • Java 虚拟机会继续执行线程,直到下列任一情况出现时为止:
1.调用了 **Runtime** 类的 **exit** 方法,并且**安全管理器**允许退出操作发生。 
2.**非守护**线程的所有线程都已停止运行

无论是通过从对 run 方法的调用中返回 ,还是通过抛出一个传播到 run 方法之外的异常

  • 线程终止方式
    // 1.正常终止 // 2.异常结束

LifeCycle 线程生命周期

1.Thread 基本操作
Thread.yeild() // 正在运行的线程让出 cpu 返回 Runnable 状态
sleep(long ms) // 进入阻塞状态 到事件进入 Runnable
interrupt()    // 打断返回, 被打断的线程进入Runnable 并抛InterruptedException

sleep() // 不能用来进行时间控制,控制高精度运算完全不行,TimeOut 状态线程不占用处理器
void Thread.sleep();
void Thread.join(); // 加入等待当前线程结束 抛出InterruptException
	// 线程1 run()内有 t2.join(); 则 t1 要等 t2 线程运行结束才能执行

睡眠&异常打断机制有重要实际作用
	// 如看视频缓存,协调两线程的工作

threadlifecycle

2.sleep() & wait() 区别
  • sleep() 来自Thread() 休眠 自动唤醒,
    线程暂停 但监控状态保持 自动恢复 不释放锁 只让 cpu

  • wait() 来自Object() 手工调用 notify() notifyAll()
    放弃对象锁 进入锁池

// 线程interrupt()
public class Demo06 {
	static Thread ft;
	static Thread dt;

	public static void main(String[] args) {
		ft = new FanThread();
		dt = new DogThread();
		ft.start();
		dt.start();
	}
}

class FanThread extends Thread{
	@Override
	public void run(){
		try {
			System.out.println("Start FanThread");
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			System.out.println("梦被打醒");
			e.printStackTrace();
		}
	}
}

class DogThread extends Thread{
	@Override
	public void run(){
		try {
			Thread.sleep(500);
			Demo06.ft.interrupt();
		} catch (Exception e) {
			// TODO: handle exception
		}
	}
}

// 多线程协调工作
public class Demo10 {

	static Thread t1;
	static Thread t2;
	static List<Integer> list;
	//体现两个线程间的协调工作能力
	//没有join()控制 结果会相当不理想
	public static void main(String[] args) {
		list = new ArrayList<Integer>();
		//t1向集合中添加10个数
		t1 = new Thread(){
			public void run(){
				for (int i = 0; i < 10; i++) {
					list.add((int)(Math.random()*100 + 1));
				}
				System.out.println(list);
			}
		};
		
		//t2在t1添加完后对数排序
		t2 = new Thread(){
			public void run(){
				try {
					t1.join();
					//join()2种结束方式:t1结束join()返回继续运行;
					//t2等待期间 t1 t2被其他线程打断  返回继续运行 ???
					//t2等待期间  t1 isInterrupted 会出什么情况
					Collections.sort(list);
					System.out.println(list);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		};
		t1.start();
		t2.start();
	}
}

线程参数

1.类成员返回数据 & 传递数据,与主线程产生时间上的冲突
while(thread1.value1!==null&& …) sleep(100);-方法  
thread.start(); thread.join() -- 更好方法
2.回掉函数返回数据
  • 构造方法传递数据
    创建对象时就传参完毕,方便 但不适合量大数据 需多重载构造方法

  • 变量&方法传参 - 字段对实例逐个赋值

  • 回调函数方式 – 动态传递参数,本质是事件函数 – 最原始引发事件的过程

public class MyThread3 extends Thread {
	private Work work;
	public MyThread3(Work work){
		this.work = work;
	}
	
	public void run(){
		java.util.Random random = new java.util.Random();
		Data data =  new Data();
		int n1 = random.nextInt(1000);
		int n2 = random.nextInt(2000);
		int n3 = random.nextInt(3000);
		
		work.process(data,n1,n2,n3);
		// 使用回调函数
		System.out.println(
			String.valueOf(n1)+"+"+
			String.valueOf(n2)+"+"+
			String.valueOf(n3)+"="+
			data.value);
	}
	
	public static void main(String[] args) {
		Thread thread = new MyThread3(new Work());
		thread.start();
	}
}

class Data{
	public int value = 0;
} 

// 因为要用在线程中 可能涉及到线程同步问题 所以不用int
// 而是包装为一个Data类 这样用 synchronized 同步控制就能成功
class Work{
	public void process(Data data,Integer ... numbers){
		for (int n : numbers) {
			data.value+=n;
		}
	}
}

Thread 线程操作

1.wait notify 等待通知机制
  • wait(): 释放锁 进入WAITING状态, 等待 queue ==> 等待唤醒

  • notify(): 唤醒 从等待 queue 移动到同步 queue 进入BLOCKED ==> 等待 lock

竞争锁 success 进入 Runnable 状态 ==> 等待 CPU => RUNING
wait() notify() notifyAll() 调用的前提都是获得了对象锁Monitor

[等待通知]

// 线程A 作为消费者:
// 线程B 作为生产者:

/**
获取对象的锁
进入 while(判断条件),并调用 wait() 方法
当条件满足跳出循环执行具体处理逻辑

线程 B 作为生产者:
获取对象锁
更改与线程 A 共用的判断条件
调用 notify() 方法
**/
// =========================================================
//Thread A
synchronized(Object){
    while(条件){
        Object.wait();
    }
    //do something
}

//Thread B
synchronized(Object){
    条件=false;//改变条件
    Object.notify();
}
  • 2.两Thread交替打印奇偶数
public class TwoThreadWaitNotify {

    private int start = 1;
    private boolean flag = false;

    public static void main(String[] args) {
        TwoThreadWaitNotify twoThread = new TwoThreadWaitNotify();

        Thread t1 = new Thread(new OuNum(twoThread));
        t1.setName("A");

        Thread t2 = new Thread(new JiNum(twoThread));
        t2.setName("B");

        t1.start();
        t2.start();
    }

    /**
     * 偶数线程
     */
    public static class OuNum implements Runnable {
        private TwoThreadWaitNotify number;
        public OuNum(TwoThreadWaitNotify number) {
            this.number = number;
        }

        @Override
        public void run() {

            while (number.start <= 100) {
                synchronized (TwoThreadWaitNotify.class) {
                    System.out.println("偶数线程抢到锁了");
                    if (number.flag) {
                        System.out.println(Thread.currentThread().getName() + "+-+偶数" + number.start);
                        number.start++;
                        number.flag = false;
                        TwoThreadWaitNotify.class.notify();

                    }else {
                        try {
                            TwoThreadWaitNotify.class.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }

            }
        }
    }


    /**
     * 奇数线程
     */
    public static class JiNum implements Runnable {
        private TwoThreadWaitNotify number;
        public JiNum(TwoThreadWaitNotify number) {
            this.number = number;
        }

        @Override
        public void run() {
            while (number.start <= 100) {
                synchronized (TwoThreadWaitNotify.class) {
                    System.out.println("奇数线程抢到锁了");
                    if (!number.flag) {
                        System.out.println(Thread.currentThread().getName() + "+-+奇数" + number.start);
                        number.start++;

                        number.flag = true;

                        TwoThreadWaitNotify.class.notify();
                    }else {
                        try {
                            TwoThreadWaitNotify.class.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
}
2.wait sleep 比较
  • sleep: Thread 不释放锁,让出系统资源
    自动唤醒,可用 interrupt 强行打断
    Thread.sleep(0) 触发操作系统立刻重新进行一次 CPU 竞争
    可任意处使用,必须捕获异常

  • wait: Object 让出锁,进入等待池,让出系统资源
    需 notify 才会进入 Runnable 等待系统资源分配,
    只能在同步块、方法中使用
    无需捕获异常

3.join 阻塞当前线程

Blog

  • join = wait + notifyAll
    调用了 t.join(),就必须要等待线程t执行完毕后,才能继续执行其他线程

join

  • 本质:用 Object.wait() 通知一个 Thread 等待,让出 CPU 资源
    注意这里是会放弃已经占有的资源的
    直到Thread执行完毕,再调用notify()唤醒当前正在运行的线程
      // 核心逻辑
      while (isAlive()) {
          wait(0);
      }
    
  • 案例:

    private static void join() throws InterruptedException {
        Thread t1 = new Thread(new Runnable() {
          @Override
            public void run() {
                LOGGER.info("running");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }) ;
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("running2");
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }) ;
      
        t1.start();
        t2.start();
      
        //等待线程1终止
        t1.join();
        //等待线程2终止
        t2.join();
      
        LOGGER.info("main over");
    }
    
4.interrupted 中断线程

本线程中断自身是被允许的,且”中断标记”设置为 true
它线程调用本线程的 interrupt() 方法时,会通过 checkAccess() 检查权限
这有可能抛出SecurityException异常

Blog

1.thread.interrupt(): 将Thread一属性设置为true, 还是需要手动判断的
2.如果抛出了 InterruptedException 异常,该标志就会被JVM 重置为false

  public class StopThread implements Runnable {
      @Override
      public void run() {
          while ( !Thread.currentThread().isInterrupted()) {
              // 线程执行具体逻辑
              System.out.println(
                     Thread.currentThread().getName() + "运行中。。");
          }
          System.out.println(
  	              Thread.currentThread().getName() + "退出。。");
  
      }
  
      public static void main(String[] args) 
                              throws InterruptedException {
          Thread thread = new Thread(new StopThread(), "thread A");
          thread.start();
          System.out.println("main 线程正在运行") ;
          TimeUnit.MILLISECONDS.sleep(10) ;
          thread.interrupt();
      }
  }// 结果
  thread A运行中。。
  thread A运行中。。
  thread A退出
5.suspend 挂起 resume 重启 @Deprecated

suspend() 挂起线程,并不会去释放任何锁资源,其他线程都无法访问被它占用的锁。
直到对应的线程执行 resume() 方法后,被挂起的线程才能继续,从而其它被阻塞在这个锁的线程才可以继续执行 如果 resume() 操作出现在 suspend() 之前执行,那么线程将一直处于挂起状态,同时一直占用锁,这就产生了死锁。

被挂起的线程,它的线程状态居然还是 Runnable Blog

6.awaitTermination() 等待线程池进入 Terminated 状态

主 Thread 等待 ThreadPool 中 Thread 全部执行完毕,使用此方法前提是使用了shutdown()方法,如下案例

private static void executorService() throws Exception{
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10);
    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1, TimeUnit.MILLISECONDS,queue);

    poolExecutor.execute(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("running");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    poolExecutor.execute(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("running2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    poolExecutor.shutdown();
    while (!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){
        LOGGER.info("线程还在执行。。。");
    }
    LOGGER.info("main over");
}
7.Channel 利用管道进行线程间通信
public static void piped() throws IOException {
    //面向于字符 PipedInputStream 面向于字节
    PipedWriter writer = new PipedWriter();
    PipedReader reader = new PipedReader();

    //输入输出流建立连接
    writer.connect(reader);

    Thread t1 = new Thread(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("running");
            try {
                for (int i = 0; i < 10; i++) {
                    writer.write(i+"");
                    Thread.sleep(10);
                }
            } catch (Exception e) {

            } finally {
                try {
                    writer.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }
    });

    Thread t2 = new Thread(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("running2");
            int msg = 0;
            try {
                while ((msg = reader.read()) != -1) {
                    LOGGER.info("msg={}", (char) msg);
                }
            } catch (Exception e) {

            }
        }
    });
    t1.start();
    t2.start();
}
8.其它线程通信方式
  • volatile 共享可见变量

CountDownLatch 并发工具

实现 join 相同的功能,但是更加的灵活,基于 AQS AbstractQueuedSynchronized 实现,因为Thread都通过ThreadPool来处理,这个好像没啥用。

初始化一个 CountDownLatch 时告诉并发的线程,然后在每个线程处理完毕之后调用 countDown() 方法,
该方法会将 AQS 内置的一个 state 状态 -1 , 最终在主线程调用 await() 方法,它会阻塞直到 state == 0 的时候返回。

private static void countDownLatch() throws Exception{
    int thread = 3 ;
    long start = System.currentTimeMillis();
    final CountDownLatch countDown = new CountDownLatch(thread);

    for (int i= 0 ;i<thread ; i++){
        new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.info("thread run");
                try {
                    Thread.sleep(2000);
                    countDown.countDown();

                    LOGGER.info("thread end");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    countDown.await(); // 阻塞知道所有线程处理完毕
    long stop = System.currentTimeMillis();
    LOGGER.info("main over total time={}",stop-start);
}

countdown

碰到的一个案例,还未解析清楚

final CountDownLatch latch = new CountDownLatch(ukeSmallClassList.size());
for (SmallClassLessonExcelVo usc : ukeSmallClassList) {

    UserInfoDTO teacherInfos = teacherOrTutorUserInfoMap.get(usc.getTeacherId());
    UserInfoDTO actualTeacherInfos = teacherOrTutorUserInfoMap.get(usc.getActualTeacherId());
    setTeacherInfo(null, usc, null, teacherInfos, false);
    setTeacherInfo(null, usc, null, actualTeacherInfos, true);

    UserInfoDTO tutorInfos = teacherOrTutorUserInfoMap.get(usc.getTutorId());
    UserInfoDTO actualTutorInfos = teacherOrTutorUserInfoMap.get(usc.getActualTutorId());
    setTutorInfo(null, usc, null, tutorInfos, false);
    setTutorInfo(null, usc, null, actualTutorInfos, true);

    FixedThreadPoolUtils.getInstance().addTask(new BaseRunable(
        new CourseLessonExportTask(usc, courseMap, lessonDetailMap), latch));
}

try {
    latch.await();
} catch (InterruptedException e) {
    e.printStackTrace();
    Thread.currentThread().interrupt();
}

CyclicBarrier 并发工具

屏障或者是栅栏,可以用于线程间通信, 实现等待 N 个线程都达到某个状态后继续运行的效果
可以实现 CountDownLatch 同样的功能,但是要更加灵活
可以调用 reset() 方法重置 CyclicBarrier
(需要自行捕获 BrokenBarrierException 处理) 然后重新执行

  • 首先初始化线程参与者

  • 调用 await() 将会在所有参与者线程都调用之前等待

  • 直到所有参与者都调用了await()后,所有线程从 await()返回继续后续逻辑

private static void cyclicBarrier() throws Exception {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3) ;

    new Thread(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("thread run");
            try {
                cyclicBarrier.await() ;
            } catch (Exception e) {
                e.printStackTrace();
            }

            LOGGER.info("thread end do something");
        }
    }).start();

    new Thread(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("thread run");
            try {
                cyclicBarrier.await() ;
            } catch (Exception e) {
                e.printStackTrace();
            }

            LOGGER.info("thread end do something");
        }
    }).start();

    new Thread(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("thread run");
            try {
                Thread.sleep(5000);
                cyclicBarrier.await() ;
            } catch (Exception e) {
                e.printStackTrace();
            }

            LOGGER.info("thread end do something");
        }
    }).start();

    LOGGER.info("main thread");
}

CyclicBarrier

参考:

文档信息

Search

    Table of Contents