首页>>后端>>java->JAVA多线程之ArrayBlockingQueue看Condition的实现

JAVA多线程之ArrayBlockingQueue看Condition的实现

时间:2023-11-29 本站 点击:36

今天所要介绍的Condition,大家平时业务开发中可能少会用,一般会在框架或中间件比较常见,本文以JAVA中ArrayBlockingQueue中的Condition的使用以及实现原理。

ArrayBlockingQueue中Condition的属性定义和构造函数

finalReentrantLocklock;/**Conditionforwaitingtakes*///等待在非空privatefinalConditionnotEmpty;/**Conditionforwaitingputs*///等待队列没有满privatefinalConditionnotFull;

ArrayBlockingQueue构造函数,两个Condition都是从ReentrantLock的newConditon方法构造的ConditionObject对象,

publicArrayBlockingQueue(intcapacity,booleanfair){if(capacity<=0)thrownewIllegalArgumentException();this.items=newObject[capacity];lock=newReentrantLock(fair);notEmpty=lock.newCondition();notFull=lock.newCondition();}

通过看ReentrantLock的newCondition方法可以看到实际Condition实现类就是ConditionObject,这就是今天要分析的重点对象.

finalConditionObjectnewCondition(){returnnewConditionObject();}

ConditionObject类结构

publicclassConditionObjectimplementsCondition,java.io.Serializable{privatestaticfinallongserialVersionUID=1173984872572414699L;/**Firstnodeofconditionqueue.*/\//头节点privatetransientConditionNodefirstWaiter;/**Lastnodeofconditionqueue.*///尾节点privatetransientConditionNodelastWaiter;/***Createsanew{@codeConditionObject}instance.*/publicConditionObject(){}

ArrayBlockingQueue的notfull和notEmpty的使用如下1 notFull是放入队列前,如果队列满了,则执行notFull.await,让线程等待, 在元素入队后,通过调用notEmpty.signal唤醒消息者线程.2.notEmpty是消息队列时,如果队列为空,则执行notEmpty.await让消费者线程等待,元素消费者消息完后,执行notFull.signal唤醒等待的生产者线程去往队列放元素.

publicvoidput(Ee)throwsInterruptedException{checkNotNull(e);finalReentrantLocklock=this.lock;lock.lockInterruptibly();try{while(count==items.length)notFull.await();enqueue(e);}finally{lock.unlock();}}privatevoidenqueue(Ex){//assertlock.getHoldCount()==1;//assertitems[putIndex]==null;finalObject[]items=this.items;items[putIndex]=x;if(++putIndex==items.length)putIndex=0;count++;notEmpty.signal();}publicEtake()throwsInterruptedException{finalReentrantLocklock=this.lock;lock.lockInterruptibly();try{while(count==0)notEmpty.await();returndequeue();}finally{lock.unlock();}}privateEdequeue(){//assertlock.getHoldCount()==1;//assertitems[takeIndex]!=null;finalObject[]items=this.items;@SuppressWarnings("unchecked")Ex=(E)items[takeIndex];items[takeIndex]=null;if(++takeIndex==items.length)takeIndex=0;count--;if(itrs!=null)itrs.elementDequeued();notFull.signal();returnx;}

下面我们着重分析下ConditionObject中await和signal是如何实现的。

ConditionObject中await实现

主要步骤:1.创建一个新建等待节点加入条件等待队列2.释放所有锁(即重置state变量为0)3.判断node是否在CLH双端等待队列中,如果不在则直接LockSupport.park阻塞线程,r然后检查是否中断4.如果CLH的等待队列中,则通过CLH队列中获取锁,5.如果node的nextWaiter不为空,则表示有新的Condtion等待节点,则Conditon的单向队列中移除取消状态的节点6.判断是否需要中断或者抛出异常

publicfinalvoidawait()throwsInterruptedException{if(Thread.interrupted())thrownewInterruptedException();//加入条件等待队列尾部Nodenode=addConditionWaiter();//全部释放锁longsavedState=fullyRelease(node);intinterruptMode=0;//是否在条件等待队列中while(!isOnSyncQueue(node)){LockSupport.park(this);if((interruptMode=checkInterruptWhileWaiting(node))!=0)break;}//获取等待队列if(acquireQueued(node,savedState)&&interruptMode!=THROW_IE)interruptMode=REINTERRUPT;//判断当前node下一个等待不为空,说明已经是在等待队列中,//则清除取消状态的等待节点if(node.nextWaiter!=null)//cleanupifcancelledunlinkCancelledWaiters();//说明是处于中断if(interruptMode!=0)reportInterruptAfterWait(interruptMode);}

如果最后的等待Node状态不是Node.Condition,则需要将取消的节点从等待队列中去掉,然后创建一个Node,参数中线程是当前线程,Node的状态是Node.Condition,加入到队列的尾部。

privateNodeaddConditionWaiter(){Nodet=lastWaiter;//IflastWaiteriscancelled,cleanout.if(t!=null&&t.waitStatus!=Node.CONDITION){unlinkCancelledWaiters();t=lastWaiter;}Nodenode=newNode(Thread.currentThread(),Node.CONDITION);if(t==null)firstWaiter=node;elset.nextWaiter=node;lastWaiter=node;returnnode;}

完成释放锁的操作,从这里可以看出,执行await操作是会释放锁的,,首先获取state的状态值,然后重置为0,成为无锁状态.返回有锁之前的state的值, 如果失败,则重置Node的waitStatus状态为CANCELLED状态

finallongfullyRelease(Nodenode){booleanfailed=true;try{longsavedState=getState();if(release(savedState)){failed=false;returnsavedState;}else{thrownewIllegalMonitorStateException();}}finally{if(failed)node.waitStatus=Node.CANCELLED;}}

等待队列中的节点去争抢锁,这个之前在ReentrantLock中分析过,这里就不再赘述,

finalbooleanacquireQueued(finalNodenode,longarg){booleanfailed=true;try{booleaninterrupted=false;for(;;){//获取新建node的前驱finalNodep=node.predecessor();//如果前驱是head,并且tryAcquire或if(p==head&&tryAcquire(arg)){setHead(node);p.next=null;//helpGCfailed=false;returninterrupted;}if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())interrupted=true;}}finally{if(failed)cancelAcquire(node);}}

ConditionObject中signal实现

ConditionObect的signal去唤醒firstWaiter(即Condition等待队列中第一个节点)

publicfinalvoidsignal(){if(!isHeldExclusively())thrownewIllegalMonitorStateException();Nodefirst=firstWaiter;if(first!=null)doSignal(first);}

doSignal是从firstWaiter开始循环,唤醒第一个没有被取消的等待节点

publicArrayBlockingQueue(intcapacity,booleanfair){if(capacity<=0)thrownewIllegalArgumentException();this.items=newObject[capacity];lock=newReentrantLock(fair);notEmpty=lock.newCondition();notFull=lock.newCondition();}0

transferForSignal则是将等待的node的waitStatus重置为0,然后从Condition队列中转移加入CLH的双端等待队列。

publicArrayBlockingQueue(intcapacity,booleanfair){if(capacity<=0)thrownewIllegalArgumentException();this.items=newObject[capacity];lock=newReentrantLock(fair);notEmpty=lock.newCondition();notFull=lock.newCondition();}1

下图是表示从Condition的等待队列中,会转移到CLH的等待对队列中.

总结

今天介绍的ConditonObject主要是条件等待队列,可以创建多个条件等待队列,ReentrantLock是只有CLH的双向队列,都是在双向队列上进行的等待,相比之下,Condition控制更加灵活,控制粒度更细。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/35.html