CS-Notes/interview/java/thread.md
2018-08-02 15:58:10 +08:00

17 KiB
Raw Blame History

CountDownLatch的用法

CountDownLatch 是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信。

如果想当前线程在别的线程执行完毕后执行可以使用CountDownLatch。

/**<p><b>Sample usage:</b> Here is a pair of classes in which a group
 * of worker threads use two countdown latches:
 * <ul>
 * <li>The first is a start signal that prevents any worker from proceeding
 * until the driver is ready for them to proceed;
 * <li>The second is a completion signal that allows the driver to wait
 * until all workers have completed.
 * </ul>
 *
 *  <pre> {@code
 * class Driver { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch startSignal = new CountDownLatch(1);
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       new Thread(new Worker(startSignal, doneSignal)).start();
 *
 *     doSomethingElse();            // don't let run yet
 *     startSignal.countDown();      // let all threads proceed
 *     doSomethingElse();
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class Worker implements Runnable {
 *   private final CountDownLatch startSignal;
 *   private final CountDownLatch doneSignal;
 *   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
 *     this.startSignal = startSignal;
 *     this.doneSignal = doneSignal;
 *   }
 *   public void run() {
 *     try {
 *       startSignal.await();
 *       doWork();
 *       doneSignal.countDown();
 *     } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }}</pre>

线程创建

线程创建通常有3中方法

  • 继承 Thread 类
  • 实现 Runnable 接口
  • 实现 Callable 接口

实现 Runnable 接口来创建一个线程:

public class MyThread extends Thread {

  public static void main(String[] args) {
    Thread t1 = new MyThread();
    t1.start();
    Thread t2 = new Thread(new MyThread2());
    t2.start();
    Callable call = new MyCallable();
    FutureTask<String> task = new FutureTask<String>(call);
    Thread t3 =  new Thread(task);
    t3.start();
  }

  @Override
  public void run() {
    System.out.println("线程运行中---------extends");
  }

  static class MyThread2 implements Runnable{

    @Override
    public void run() {
      System.out.println("线程运行中------runnable");
    }
  }

  static class MyCallable implements Callable{

    @Override
    public String call() throws Exception {
      System.out.println("线程运行中------callable");
      return "call over!";
    }
  }
}

线程间协作

join

在线程中调用另一个线程的 join() 方法,会将当前线程挂起,而不是忙等待,直到目标线程结束。

对于以下代码,虽然 b 线程先启动,但是因为在 b 线程中调用了 a 线程的 join() 方法b 线程会等待 a 线程结束才继续执行,因此最后能够保证 a 线程的输出先于 b 线程的输出。

public class JoinExample {

    public static void main(String[] args) {
        JoinExample example = new JoinExample();
        example.test();
    }
    private class A extends Thread {
        @Override
        public void run() {
            System.out.println("A");
        }
    }

    private class B extends Thread {

        private A a;

        B(A a) {
            this.a = a;
        }

        @Override
        public void run() {
            try {
                a.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B");
        }
    }

    public void test() {
        A a = new A();
        B b = new B(a);
        b.start();
        a.start();
    }
}

wait

wait 方法会使得当前线程进入等待状态,使用 wait() 挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。

wait() 和 sleep() 的区别

  • wait() 是 Object 的方法,而 sleep() 是 Thread 的静态方法;
  • wait() 会释放锁sleep() 不会。

interrupt

线程中断的方法有两种:

  1. 使用 interrupted()来作为线程循环执行的判断条件
public class InterruptExample {

    private static class MyThread2 extends Thread {
        @Override
        public void run() {
            while (!interrupted()) {
                // ..
            }
            System.out.println("Thread end");
        }
    }
}
  1. 使用InterruptedException

通过调用一个线程的 interrupt() 来中断该线程,如果该线程处于阻塞、限期等待或者无限期等待状态,那么就会抛出 InterruptedException从而提前结束该线程。但是不能中断 I/O 阻塞和 synchronized 锁阻塞

对于以下代码,在 main() 中启动一个线程之后再中断它,由于线程中调用了 Thread.sleep() 方法,因此会抛出一个 InterruptedException从而提前结束线程不执行之后的语句。

public class InterruptExample {

    private static class MyThread1 extends Thread {
        @Override
        public void run() {
            try {
                Thread.sleep(2000);
                System.out.println("Thread run");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public static void main(String[] args) throws InterruptedException {
    Thread thread1 = new MyThread1();
    thread1.start();
    thread1.interrupt();
    System.out.println("Main run");
}

Condition

java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,可以在 Condition 上调用 await() 方法使线程等待,其它线程调用 signal() 或 signalAll() 方法唤醒等待的线程。相比于 wait() 这种等待方式await() 可以指定等待的条件,因此更加灵活。

public class AwaitSignalExample {
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void before() {
        lock.lock();
        try {
            System.out.println("before");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void after() {
        lock.lock();
        try {
            condition.await();
            System.out.println("after");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

Executor 框架

Executor 框架主要由3大部分组成如下

  • 任务

被执行的任务需要实现 Runnable 或者 Callable接口

  • 任务的执行

任务执行的核心接口 Executor, 以及继承自 Executor 的 ExecutorService 接口。 Executor 框架有两个关键类实现了 ExecutorService 接口(ThreadPoolExecutor 和 ScheduledThreadPoolExecutor)

  • 异步计算的结果

Future 接口和 FutureTask 类

Executor 主要的类和接口

类和接口 说明
Executor 一个接口其定义了一个接收Runnable对象的方法executor其方法签名为executor(Runnable command), Executor 框架的基础,它将任务的提交和任务的执行分离开来
ExecutorService 是一个比Executor使用更广泛的子类接口其提供了生命周期管理的方法以及可跟踪一个或多个异步任务执行状况返回Future的方法
AbstractExecutorService ExecutorService执行方法的默认实现
ThreadPoolExecutor 线程池的核心实现类用来执行被提交的任务。通过调用Executors以下静态工厂方法来创建线程池并返回一个ExecutorService对象
ScheduledExecutorService 一个可定时调度任务的接口
ScheduledThreadPoolExecutor ScheduledExecutorService 的实现类, 可以在给定的延迟后运行命令,或者定期执行命令, 比 Timer更加灵活功能更加强大。

ThreadPoolExecutor

ThreadPoolExecutor 使用 Executors来创建提供了4种类型的ThreadPoolExectuor, 分别是 newFixedThreadPool 、 newSingleThreadExecutor 、newScheduledThreadPool、 newCachedThreadPool.

  • 构造函数
/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

参数说明:

corePoolSize核心线程数如果运行的线程少于corePoolSize则创建新线程来执行新任务即使线程池中的其他线程是空闲的

maximumPoolSize:最大线程数可允许创建的线程数corePoolSize和maximumPoolSize设置的边界自动调整池大小 corePoolSize <运行的线程数< maximumPoolSize:仅当队列满时才创建新线程 corePoolSize=运行的线程数= maximumPoolSize创建固定大小的线程池

keepAliveTime:如果线程数多于corePoolSize,则这些多余的线程的空闲时间超过keepAliveTime时将被终止

unit:keepAliveTime参数的时间单位

workQueue:保存任务的阻塞队列,与线程池的大小有关: 当运行的线程数少于corePoolSize时在有新任务时直接创建新线程来执行任务而无需再进队列 当运行的线程数等于或多于corePoolSize在有新任务添加时则选加入队列不直接创建线程 当队列满时,在有新任务时就创建新线程

threadFactory:使用ThreadFactory创建新线程默认使用defaultThreadFactory创建线程

handle:定义处理被拒绝任务的策略默认使用ThreadPoolExecutor.AbortPolicy,任务被拒绝时将抛出RejectExecutorException

newFixedThreadPool

创建可重用且固定线程数的线程池,如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程;如果线程池中的某个线程由于异常而结束时,线程池就会再补充一条新线程。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  //使用一个基于FIFO排序的阻塞队列在所有corePoolSize线程都忙时新任务将在队列中等待
                                  new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor

创建一个单线程的Executor如果该线程因为异常而结束就新建一条线程来继续执行后续的任务

public static ExecutorService newSingleThreadExecutor() {
   return new FinalizableDelegatedExecutorService
                     //corePoolSize和maximumPoolSize都等于表示固定线程池大小为1
                        (new ThreadPoolExecutor(1, 1,
                                                0L, TimeUnit.MILLISECONDS,
                                                new LinkedBlockingQueue<Runnable>()));
}

newScheduledThreadPool

创建一个可延迟执行或定期执行的线程池

/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

使用 scheduledThread 来模拟心跳:

public class HeartBeat {
    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
        Runnable task = new Runnable() {
            public void run() {
                System.out.println("HeartBeat.........................");
            }
        };
        executor.scheduleAtFixedRate(task,5,3, TimeUnit.SECONDS);   //5秒后第一次执行之后每隔3秒执行一次
    }
}

newCachedThreadPool

创建可缓存的线程池如果线程池中的线程在60秒未被使用就将被移除在执行新的任务时当线程池中有之前创建的可用线程就重用可用线程否则就新建一条线程

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  //使用同步队列,将任务直接提交给线程
                                  new SynchronousQueue<Runnable>());
}

例子:

public class ThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {
     ExecutorService threadPool = Executors.newCachedThreadPool();//线程池里面的线程数会动态变化,并可在线程线被移除前重用
        for (int i = 1; i <= 3; i ++) {
            final  int task = i;   //10个任务
            TimeUnit.SECONDS.sleep(1);
            threadPool.execute(new Runnable() {    //接受一个Runnable实例
                public void run() {
                        System.out.println("线程名字: " + Thread.currentThread().getName() +  "  任务名为: "+task);
                }
            });
        }
    }
}

输出为每个任务新建一条线程共创建了3条线程 线程名字: pool-1-thread-1 任务名为: 1 线程名字: pool-1-thread-2 任务名为: 2 线程名字: pool-1-thread-3 任务名为: 3 去掉第6行的注释其输出如下始终重复利用一条线程因为newCachedThreadPool能重用可用线程 线程名字: pool-1-thread-1 任务名为: 1 线程名字: pool-1-thread-1 任务名为: 2 线程名字: pool-1-thread-1 任务名为: 3

Future 接口

Future 接口和实现了Future接口的FutureTask类用来表示异步计算的结果。当我们把Runnable 或者 Callable 接口的实现类提交给ThreadPoolExecutor时Exector会返回给FutureTask对象

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor, 主要用来在给定的延迟后运行任务。 主要包含3个成员变量

  • long: time ; 表示这个任务将要被执行的具体时间
  • long: sequenceNumber; 表示这个任务被添加到SheduledThreadPoolExecutor 中的序号
  • longperiod; 表示任务执行的间隔周期

该类采用了DelyQueue封装了一个优先队列该队列会对队列中的SheduledFutureTask 进行排序。time小的会排在前面如果time相同则会比较sequenceNumber, 就是说如果两个任务的执行时间相同,谁先提交就谁先执行

参考文档