十一 20

1.      Executor接口
    Executor接口提供了一个类似于线程池的管理工具。用于只需要往Executor中提交Runnable对象,剩下的启动线程等工作,都会有对应的实现类来完成。ScheduledExecutorService比ExecutorService增加了,时间上的控制,即用户可以在提交的时候额外的定义该任务的启动时机,以及随后的执行间隔和延迟等。
    例子:
    任务:
    public class ETask implements Runnable{
         private int id = 0;
         public ETask(int id){
              this.id = id;
         }
         public void run(){
              try{
                  System.out.println(id+” Start”);
                  Thread.sleep(1000);
                  System.out.println(id+” Do”);
                  Thread.sleep(1000);
                  System.out.println(id+” Exit”);
             }catch(Exception e){
                  e.printStackTrace();
             }
         }
    }
   
    测试类:
    public class ETest{
         public static void main(String[] args){        
             ExecutorService executor = Executors.newFixedThreadPool(2);
             for(int i=0;i<5;i++){
                  Runnable r = new ETask(i);
                  executor.execute(r);
             }
             executor.shutdown();
         }
    }

    输出:
    0 Start
    1 Start
    0 Do
    1 Do
    0 Exit
    2 Start
    1 Exit
    3 Start
    2 Do
    3 Do
    2 Exit
    3 Exit
    4 Start
    4 Do
    4 Exit

2.      Future和Callable
    Callable是一个类似于Runnable的接口,他与Runnable的区别是,她在执行完毕之后能够返回结果。Future用于获取线程的执行结果,或者取消已向Executor的任务。当我们通过Future提供的get()方法获取任务的执行结果时,如果任务没有完成,则调用get()方法的线程将会被阻塞,知道任务完成为止。一般我们都会使用Future的实现类FutureTask。
    例子:
    Callable对象:
    public class ETask implements Callable{
         private String id = null;
         public ETask(String id){
              this.id = id;
         }
   
         public String call(){
             try{
                  System.out.println(id+” Start”);
                  Thread.sleep(1000);
                  System.out.println(id+” Do”);
                  Thread.sleep(1000);
                  System.out.println(id+” Exit”);          
             }catch(Exception e){
                  e.printStackTrace();
             }
             return id;
         }
    }

    测试类:
    public class ETest{
         public static void main(String[] args){        
             ExecutorService executor = Executors.newFixedThreadPool(2);
             for(int i=0;i<5;i++){            
                  try{
                      Callable c = new ETask(String.valueOf(i));
                       FutureTask ft = new FutureTask(c);
                       executor.execute(ft);
                       System.out.println(“Finish:” + ft.get());          
                  }catch(Exception e){
                      e.printStackTrace();
                  }
             }
              executor.shutdown();
         }
    }

    输出:
    0 Start
    0 Do
    0 Exit
    Finish:0
    1 Start
    1 Do
    1 Exit
    Finish:1
    2 Start
    …

3.      CompletionService和ExecutorCompletionService
    CompletionService类似于一个Executor和Queue的混合。我们可以通过submit()向CompletionService提交任务,然后通过poll()来获取第一个完成的任务,也可以通过take()来阻塞等待下一个完成的任务。ExecutorCompletionService是CompletionService的实现类,他需要提供一个Executor作为构造函数的参数。

    例子:
    Executor executor = …;
    CompletionService cs = new ExecutorCompletionService(executor);
    Future fs = cs.submit(…);
    Future ft = cs.take();

4.      Semaphore
    信号量是用于同步和互斥的低级原语。信号量提供的acquire()和release()操作,与操作系统上的p,v操作同。

    例子:
    缓冲区:
    public class Buffer{
         private Semaphore s = null;
         private Semaphore p = null;
         Vector v = new Vector();
         
         public Buffer(int capacity){
              s = new Semaphore(capacity);
             p = new Semaphore(0);
         }
   
         public void put(int i){
             try{
                  s.acquire();
                  v.add(new Integer(i));
                  p.release();
              }catch(Exception e){
                  e.printStackTrace();
             }
         }
   
         public int get(){  
              int i = 0;
             try{
                  p.acquire();
                  i = ((Integer)v.remove(0)).intValue();
                  s.release();
             }catch(Exception e){
                  e.printStackTrace();
             }
              return i;
         }    
    }


    生产者:
    public class Producer extends Thread{
         private Buffer b;
         private int count;
         private int step;
         private int id;

         public Producer(Buffer b,int step,int id){
              this.b =  b;
             this.step = step;
             this.id = id;
              count = 0;
         }
   
         public void run(){
             try{
                  while(true){
                      System.out.println(“In put”);
                       b.put(count);
                       System.out.println(“Producer “+id+”:”+count);
                       count++;
                      Thread.sleep(step);
                       System.out.println(“Out put”);
                  }
              }catch(Exception e){
                  e.printStackTrace();
             }
         }
    }

    消费者:
    public class Consumer extends Thread{
         private Buffer b;
         private int step;
         private int id;
   
         public Consumer(Buffer b,int step,int id){
             this.b = b;
              this.step = step;
             this.id = id;
         }
         
         public void run(){
             try{
                  while(true){
                       System.out.println(“In get”);
                      System.out.println(“\t\tConsume “+id+”:”+b.get());
                       System.out.println(“Out get”);
                       Thread.sleep(step);
                  }
              }catch(Exception e){
                  e.printStackTrace();
             }    
         }
    }

    测试程序:
    public class CPTest{
         public static void main(String[] args){
              Buffer b = new Buffer(3);
             Consumer c1 = new Consumer(b,1000,1);
             Consumer c2 = new Consumer(b,1000,2);
              Producer p1 = new Producer(b,100,1);
             Producer p2 = new Producer(b,100,2);
       
             c1.start();
              c2.start();
             p1.start();
             p2.start();
         }
    }

5.      CyclicBarrier
    CyclicBarrier可以让一组线程在某一个时间点上进行等待,当所有进程都到达该等待点后,再继续往下执行。CyclicBarrier使用完以后,通过调用reset()方法,可以重用该CyclicBarrier。线程通过调用await()来减少计数。
CyclicBarrier

    例子:
    任务:
    public class Task extends Thread{
         private String id;
         private CyclicBarrier c;
         private int time;
   
         public Task(CyclicBarrier c,String id,int time){
              this.c = c;
             this.id = id;
              this.time = time;
         }
   
         public void run(){
              try{
                  System.out.println(id+” Start”);
                 Thread.sleep(time);
                  System.out.println(id+” Finish”);
                  c.await();
                  System.out.println(id+” Exit”);          
              }catch(Exception e){
                  e.printStackTrace();
             }
         }    
    }

    测试类:
    public class Test{
         public static void main(String[] args){
             CyclicBarrier c = new CyclicBarrier(3,new Runnable(){
                  public void run(){
                       System.out.println(“All Work Done”);
                  }
             });
              Task t1 = new Task(c,”1″,1000);
             Task t2 = new Task(c,”2″,3000);
             Task t3 = new Task(c,”3″,5000);
              t1.start();
             t2.start();
             t3.start();        
         }
    }

    输出结果:
    1 Start
    2 Start
    3 Start
    1 Finish
    2 Finish
    3 Finish
    All Work Done
    3 Exit
    1 Exit
    2 Exit

6.      CountdownLatch
    CountdownLatch具有与CyclicBarrier相似的功能,也能让一组线程在某个点上进行同步。但是与CyclicBarrier不同的是:1.CountdownLatch不能重用,2.线程在CountdownLatch上调用await()操作一定会被阻塞,直到计数值为0时才会被唤醒,而且计数值只能通过conutDown()方法进行减少。

特别的,当CountdownLatch的值为1时,该Latch被称为“启动大门”,所有任务线程都在该Latch上await(),直到某个非任务线程调用countDown()触发,所有任务线程开始同时工作。

7.      Exchanger
    Exchanger是一个类似于计数值为2的CyclicBarrier。她允许两个线程在某个点上进行数据交换。
      例子:
    public class FillAndEmpty {
        Exchanger exchanger = new Exchanger();
        DataBuffer initialEmptyBuffer = … a made-up type
        DataBuffer initialFullBuffer = …

        public class FillingLoop implements Runnable {
             public void run() {
                  DataBuffer currentBuffer = initialEmptyBuffer;
                  try {
                      while (currentBuffer != null) {
                           addToBuffer(currentBuffer);
                           if (currentBuffer.full())
                                currentBuffer = exchanger.exchange(currentBuffer);
                      }
                  }catch(InterruptedException ex) { … handle … }
             }
        }

        public class EmptyingLoop implements Runnable {
             public void run() {
                  DataBuffer currentBuffer = initialFullBuffer;
                  try {
                      while (currentBuffer != null) {
                           takeFromBuffer(currentBuffer);
                           if (currentBuffer.empty())
                                currentBuffer = exchanger.exchange(currentBuffer);
                      }
                  } catch (InterruptedException ex) { … handle …}
             }
        }

        public void start() {
             new Thread(new FillingLoop()).start();
             new Thread(new EmptyingLoop()).start();
        }
    }

8.      Lock,Condition
    锁是最基本的同步原语。通过在锁上面调用lock()和unlock()操作,可以达到与synchronized关键字相似的效果,但是有一点要注意的是,锁必须显式释放,如果由于抛出异常,而没有释放锁,将导致死锁出现。Condition提供的await(),signal(),signal()操作,与原来的wai(),notify(),notifyAll()操作具有相似的含义。Lock的两个主要子类是ReentrantLock和ReadWriteLock。其中ReadWriteLock的作用是允许多人读,而一人写。

    例子:
    使用Lock和Condition的生产者,消费者问题
    public class BoundedBuffer {
        final Lock lock = new ReentrantLock();
        final Condition notFull  = lock.newCondition();
        final Condition notEmpty = lock.newCondition();
        final Object[] items = new Object[100];
        int putptr, takeptr, count;
       
        public void put(Object x) throws InterruptedException {
             lock.lock();
             try {
                  while (count == items.length)
                      notFull.await();
                  items[putptr] = x;
                  if (++putptr == items.length)
                       putptr = 0;
                  ++count;
                  notEmpty.signal();
             } finally {
                  lock.unlock();
              }
         }
   
         public Object take() throws InterruptedException {
              lock.lock();
             try {
                  while (count == 0)
                      notEmpty.await();
                  Object x = items[takeptr];
                  if (++takeptr == items.length)
                       takeptr = 0;
                  –count;
                  notFull.signal();
                  return x;
              } finally {
                  lock.unlock();
             }
         }
    }    

9.      小结:新的concurrent包提供了一个从低到高的同步操作。


Tags:

作者:Jock

Leave a Reply

You must be logged in to post a comment.

Switch to our mobile site