With multiple producers and consumers, pushes and pops alternate in equal-sized batches: for example, four pushes, then four pops, then four pushes again, and so on. The size of each batch equals the queue's maximum length minus its minimum length (for example, a maximum of 5 and a minimum of 1 gives batches of 4).
package thread;

import java.util.Random;

import entity.Queue;

/**
 * Worker thread that repeatedly pushes a random value in the range [100, 109]
 * onto a shared {@link Queue}, pausing 0, 1 or 2 seconds after each attempt.
 *
 * <p>The loop ends when the thread is interrupted, either while sleeping or
 * when the interrupt flag is found set at the top of an iteration.
 */
public class Producer extends Thread {

    private final Queue queue;

    // One generator for the lifetime of the thread instead of a new one per iteration.
    private final Random random = new Random();

    /**
     * @param id    number used to build the thread name {@code "producer-<id>"}
     * @param queue shared queue this producer pushes to
     */
    public Producer(int id, Queue queue) {
        this.queue = queue;
        this.setName("producer-" + id);
    }

    /** Pushes random values until this thread is interrupted. */
    @Override
    public void run() {
        while (!isInterrupted()) {
            int value = random.nextInt(10) + 100;
            queue.push(value);
            try {
                Thread.sleep(random.nextInt(3) * 1000L);
            } catch (InterruptedException e) {
                // Restore the flag cleared by sleep() and stop, rather than
                // silently ignoring the request to shut down.
                interrupt();
                return;
            }
        }
    }
}
package thread;

import java.util.Random;

import entity.Queue;

/**
 * Worker thread that repeatedly pops from a shared {@link Queue}, pausing
 * 0, 1 or 2 seconds after each attempt. The popped value is discarded.
 *
 * <p>The loop ends when the thread is interrupted, either while sleeping or
 * when the interrupt flag is found set at the top of an iteration.
 */
public class Consumer extends Thread {

    private final Queue queue;

    // One generator for the lifetime of the thread instead of a new one per iteration.
    private final Random random = new Random();

    /**
     * @param id    number used to build the thread name {@code "consumer-<id>"}
     * @param queue shared queue this consumer pops from
     */
    public Consumer(int id, Queue queue) {
        this.queue = queue;
        this.setName("consumer-" + id);
    }

    /** Pops values until this thread is interrupted. */
    @Override
    public void run() {
        while (!isInterrupted()) {
            queue.pop();
            try {
                Thread.sleep(random.nextInt(3) * 1000L);
            } catch (InterruptedException e) {
                // Restore the flag cleared by sleep() and stop, rather than
                // silently ignoring the request to shut down.
                interrupt();
                return;
            }
        }
    }
}
package entity;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * FIFO queue of integers whose pushes and pops alternate in batches.
 *
 * <p>The queue starts in the push phase. Pushes are accepted until the size
 * reaches {@code maxLen}; the queue then switches to the pop phase, in which
 * pops are served until the size falls to {@code minLen}, and so on. A call
 * that arrives during the opposite phase blocks until the phase flips, so no
 * pushed value is dropped and {@link #pop()} does not return early with
 * {@code null}.
 *
 * <p>All state is guarded by a single {@link ReentrantLock}.
 */
public class Queue {

    private final int maxLen;
    private final int minLen;

    /** {@code true} while the queue is accepting pushes, {@code false} while it is serving pops. */
    private boolean pushOp = true;

    private final List<Integer> list = new ArrayList<Integer>();
    private final Lock lock = new ReentrantLock();
    /** Producers wait here for the push phase. */
    private final Condition proCondition = lock.newCondition();
    /** Consumers wait here for the pop phase. */
    private final Condition conCondition = lock.newCondition();

    /**
     * @param maxLen size at which the queue switches from pushing to popping
     * @param minLen size at which the queue switches from popping to pushing
     * @throws IllegalArgumentException if {@code minLen} is negative or
     *         {@code maxLen} is not greater than {@code minLen}; such bounds
     *         leave no room for a pop batch
     */
    public Queue(int maxLen, int minLen) {
        if (minLen < 0 || maxLen <= minLen) {
            throw new IllegalArgumentException(
                    "require 0 <= minLen < maxLen, got maxLen=" + maxLen + ", minLen=" + minLen);
        }
        this.maxLen = maxLen;
        this.minLen = minLen;
    }

    /**
     * Appends {@code value}, blocking while the queue is in the pop phase.
     *
     * <p>If the calling thread is interrupted while waiting, the value is not
     * added and the thread's interrupt flag is set again before returning.
     *
     * @param value element to append
     */
    public void push(Integer value) {
        // Acquire outside the try so unlock() only runs when the lock is actually held.
        lock.lock();
        try {
            // Wait on the phase flag, not on the size: during the pop phase the
            // list has spare room, yet pushes must still be held back.
            while (!pushOp) {
                proCondition.await();
            }
            list.add(value);
            System.out.println(Thread.currentThread().getName() + " push=" + value + ", List=" + list.toString());
            if (list.size() >= maxLen) {
                pushOp = false;
                conCondition.signalAll();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, blocking while the queue is in
     * the push phase.
     *
     * @return the removed element, or {@code null} if the calling thread was
     *         interrupted while waiting (its interrupt flag is set again)
     */
    public Integer pop() {
        Integer value = null;
        lock.lock();
        try {
            while (pushOp) {
                conCondition.await();
            }
            value = list.remove(0);
            System.out.println(Thread.currentThread().getName() + " pop=" + value + ", List=" + list.toString());
            if (list.size() <= minLen) {
                pushOp = true;
                proCondition.signalAll();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return value;
    }
}
package app; import entity.Queue; import thread.Consumer; import thread.Producer; public class Main { public static void main(String[] args) throws InterruptedException{ Queue myQueue = new Queue(5, 1); // set the max & min length of the queue for (int i=0;i<10;i++) { // Fewer threads, such as 3, is ok too. new Producer(i+1, myQueue).start(); new Consumer(i+1, myQueue).start(); } } }
来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/32498/viewspace-2641352/,如需转载,请注明出处,否则将追究法律责任。