User Manual, Developers Guide and API Documentation

QueueProxy.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002  * This file is part of openWNS (open Wireless Network Simulator)
00003  * _____________________________________________________________________________
00004  *
00005  * Copyright (C) 2004-2009
00006  * Chair of Communication Networks (ComNets)
00007  * Kopernikusstr. 5, D-52074 Aachen, Germany
00008  * phone: ++49-241-80-27910,
00009  * fax: ++49-241-80-22242
00010  * email: info@openwns.org
00011  * www: http://www.openwns.org
00012  * _____________________________________________________________________________
00013  *
00014  * openWNS is free software; you can redistribute it and/or modify it under the
00015  * terms of the GNU Lesser General Public License version 2 as published by the
00016  * Free Software Foundation;
00017  *
00018  * openWNS is distributed in the hope that it will be useful, but WITHOUT ANY
00019  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
00020  * A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00021  * details.
00022  *
00023  * You should have received a copy of the GNU Lesser General Public License
00024  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
00025  *
00026  ******************************************************************************/
00027 
00028 #include <WNS/scheduler/queue/QueueProxy.hpp>
00029 #include <WNS/ldk/Layer.hpp>
00030 
00031 using namespace wns::scheduler::queue;
00032 
00033 STATIC_FACTORY_REGISTER_WITH_CREATOR(QueueProxy,
00034                                      wns::scheduler::queue::QueueInterface,
00035                                      "wns.scheduler.queue.QueueProxy",
00036                                      wns::HasReceptorConfigCreator);
00037 
00038 QueueProxy::QueueProxy(wns::ldk::HasReceptorInterface*, const wns::pyconfig::View& _config) :     
00039     queueManagerServiceName_(_config.get<std::string>("queueManagerServiceName")),
00040     supportsDynamicSegmentation_(_config.get<bool>("supportsDynamicSegmentation")),
00041     copyQueue_(NULL),
00042     logger_(_config.get("logger")),
00043     myFUN_(NULL)
00044 {
00045     MESSAGE_BEGIN(NORMAL, logger_, m, "QueueProxy");
00046     m << " Created ";
00047     m << " QueueProxy Queue using QueueManagerService ";
00048     m << queueManagerServiceName_;
00049     MESSAGE_END();
00050 
00051     if(supportsDynamicSegmentation_)
00052     {
00053         copyQueue_ = new detail::SegmentingInnerCopyQueue(_config.get("segmentingQueueConfig"));
00054     }
00055     else
00056     {
00057         copyQueue_ = new detail::SimpleInnerCopyQueue();
00058     }
00059 
00060 }
00061 
00062 QueueProxy::~QueueProxy()
00063 {
00064     assure(copyQueue_ != NULL, "Want to delete copyQueue_ but it is NULL");
00065     delete copyQueue_;
00066 }
00067 
00068 void QueueProxy::setFUN(wns::ldk::fun::FUN* fun)
00069 {
00070     myFUN_ = fun;
00071     colleagues.queueManager_ = fun->getLayer()->
00072             getManagementService<wns::scheduler::queue::IQueueManager>(
00073                 queueManagerServiceName_);
00074     assure(colleagues.queueManager_, "QueueProxy needs a QueueManager");
00075 
00076     copyQueue_->setFUN(myFUN_);
00077 
00078     MESSAGE_BEGIN(NORMAL, logger_, m, myFUN_->getName());
00079     m << " Received valid FUN pointer and QueueManagerService ";
00080     m << queueManagerServiceName_;
00081     MESSAGE_END();
00082 }
00083 
00084 bool QueueProxy::isAccepting(const wns::ldk::CompoundPtr&  compound ) const 
00085 {
00086     return false;
00087 }
00088 
00089 void
00090 QueueProxy::put(const wns::ldk::CompoundPtr& compound) 
00091 {
00092     assure(false, "Put called for readOnly QueueProxy");
00093 }
00094 
00095 wns::scheduler::UserSet
00096 QueueProxy::getQueuedUsers() const 
00097 {
00098     wns::scheduler::UserSet us;
00099 
00100     QueueContainer queues = colleagues.queueManager_->getAllQueues();    
00101 
00102     QueueContainer::const_iterator it;
00103     for(it = queues.begin(); it != queues.end(); it++)
00104     {
00105         startCollectionIfNeeded(it->first);
00106 
00107         wns::scheduler::ConnectionSet innerCs;
00108 
00109         // We have to ask our RegistryProxy to map the CID to a UserID!
00110         innerCs = it->second->getActiveConnections();
00111         wns::scheduler::ConnectionSet::iterator iit;
00112         
00113         for(iit = innerCs.begin(); iit != innerCs.end(); iit++)
00114             us.insert(colleagues.registry_->getUserForCID(*iit));
00115     }
00116     return us;
00117 
00118 }
00119 
00120 wns::scheduler::ConnectionSet
00121 QueueProxy::getActiveConnections() const
00122 {
00123     wns::scheduler::ConnectionSet cs;
00124 
00125     QueueContainer queues = colleagues.queueManager_->getAllQueues();
00126 
00127     QueueContainer::iterator it;
00128     for(it = queues.begin(); it != queues.end(); it++)
00129     {
00130         startCollectionIfNeeded(it->first);
00131 
00132         wns::scheduler::ConnectionSet innerCs;
00133         innerCs = it->second->getActiveConnections();
00134         wns::scheduler::ConnectionSet::iterator iit;
00135         
00136         for(iit = innerCs.begin(); iit != innerCs.end(); iit++)
00137             cs.insert(*iit);
00138     }
00139     return cs;
00140 }
00141 
00142 unsigned long int
00143 QueueProxy::numCompoundsForCid(wns::scheduler::ConnectionID cid) const
00144 {
00145     assure(colleagues.queueManager_->getQueue(cid) != NULL, "No queue for this CID");
00146 
00147     if(!copyQueue_->knowsCID(cid))
00148     {
00149         startCollectionIfNeeded(cid);
00150         return colleagues.queueManager_->getQueue(cid)->numCompoundsForCid(cid);
00151     }
00152     else
00153     {
00154         return copyQueue_->getSize(cid);
00155     }
00156 }
00157 
00158 unsigned long int
00159 QueueProxy::numBitsForCid(wns::scheduler::ConnectionID cid) const
00160 {
00161     assure(colleagues.queueManager_->getQueue(cid) != NULL, "No queue for this CID");
00162 
00163     if(!copyQueue_->knowsCID(cid))
00164     {
00165         startCollectionIfNeeded(cid);
00166         return colleagues.queueManager_->getQueue(cid)->numBitsForCid(cid);
00167     }
00168     else
00169     {
00170         return copyQueue_->getSizeInBit(cid);
00171     }
00172 }
00173 
00174 wns::scheduler::QueueStatusContainer
00175 QueueProxy::getQueueStatus() const
00176 {
00177     wns::scheduler::QueueStatusContainer csc;
00178 
00179     QueueContainer queues = colleagues.queueManager_->getAllQueues();    
00180 
00181     QueueContainer::iterator it;
00182     for(it = queues.begin(); it != queues.end(); it++)
00183     {
00184         startCollectionIfNeeded(it->first);
00185 
00186         wns::scheduler::QueueStatusContainer innerCsc;
00187         innerCsc = it->second->getQueueStatus();
00188         wns::scheduler::QueueStatusContainer::const_iterator iit;
00189 
00190         for(iit = innerCsc.begin(); iit != innerCsc.end(); iit++)
00191             csc.insert(iit->first, iit->second);
00192     }
00193     return csc;
00194 }
00195 
00196 wns::ldk::CompoundPtr
00197 QueueProxy::getHeadOfLinePDU(wns::scheduler::ConnectionID cid) 
00198 {        
00199     assure(!copyQueue_->isEmpty(cid), "Requested PDU from emty queue");
00200     
00201     wns::ldk::CompoundPtr pdu = copyQueue_->getPDU(cid);        
00202     return pdu;
00203 }
00204 
00205 int
00206 QueueProxy::getHeadOfLinePDUbits(wns::scheduler::ConnectionID cid)
00207 {
00208     assure(hasQueue(cid), "No queue for this CID");
00209 
00210     if(!copyQueue_->knowsCID(cid))
00211     {
00212         startCollectionIfNeeded(cid);
00213         return colleagues.queueManager_->getQueue(cid)->getHeadOfLinePDUbits(cid);
00214     }
00215     else
00216     {
00217         return copyQueue_->getHeadofLinePDUBit(cid);
00218     }
00219 }
00220 
00221 bool
00222 QueueProxy::isEmpty() const
00223 {
00224     QueueContainer queues = colleagues.queueManager_->getAllQueues();    
00225     QueueContainer::iterator it;
00226     for(it = queues.begin(); it != queues.end(); it++)
00227     {
00228         if(queueHasPDUs(it->first))
00229             return false;
00230     }
00231     return true;
00232 }
00233 
00234 bool
00235 QueueProxy::hasQueue(wns::scheduler::ConnectionID cid)
00236 {
00237     wns::scheduler::queue::QueueInterface* queue;
00238     queue = colleagues.queueManager_->getQueue(cid);
00239 
00240     return (queue == NULL)?false:true;
00241 }
00242 
00243 bool
00244 QueueProxy::queueHasPDUs(wns::scheduler::ConnectionID cid) const 
00245 {
00246     wns::scheduler::queue::QueueInterface* queue;
00247     queue = colleagues.queueManager_->getQueue(cid);
00248 
00249     if(queue != NULL)
00250     {
00251         createQueueCopyIfNeeded(cid);
00252 
00253         if(copyQueue_->knowsCID(cid))
00254         {
00255             if(copyQueue_->isEmpty(cid)) 
00256             {
00257                 MESSAGE_BEGIN(NORMAL, logger_, m, myFUN_->getName());
00258                 m << " queueHasPDUs: CopyQueue for CID " << cid << " is empty.";
00259                 MESSAGE_END();
00260 
00261                 return false;
00262             }
00263             else
00264             {
00265                 return true;
00266             }
00267         }
00268         else
00269         {
00270             MESSAGE_BEGIN(NORMAL, logger_, m, myFUN_->getName());
00271             m << " queueHasPDUs: Passing call for  CID " << cid << " to real queue.";
00272             MESSAGE_END();
00273 
00274             return queue->queueHasPDUs(cid);
00275         }
00276     }
00277     else
00278     {
00279         return false;
00280     }
00281 }
00282 
00283 wns::scheduler::ConnectionSet
00284 QueueProxy::filterQueuedCids(wns::scheduler::ConnectionSet connections) 
00285 {
00286     colleagues.queueManager_->getAllQueues();    
00287 }
00288 
00289 void
00290 QueueProxy::setColleagues(wns::scheduler::RegistryProxyInterface* registry) 
00291 {
00292     colleagues.registry_ = registry;
00293 }
00294 
00295 wns::scheduler::queue::QueueInterface::ProbeOutput
00296 QueueProxy::resetAllQueues()
00297 {
00298     wns::scheduler::queue::QueueInterface::ProbeOutput po;
00299 
00300     QueueContainer queues = colleagues.queueManager_->getAllQueues();    
00301 
00302     QueueContainer::iterator it;
00303     for(it = queues.begin(); it != queues.end(); it++)
00304     {
00305         wns::scheduler::queue::QueueInterface::ProbeOutput innerPo;
00306         innerPo = it->second->resetAllQueues();
00307         po.bits += innerPo.bits;
00308         po.compounds += innerPo.compounds;
00309 
00310         // Empty the copyQueue but do not count for statistics
00311         if(copyQueue_->knowsCID(it->first))
00312             copyQueue_->reset(it->first);
00313     }
00314     return po;   
00315 }
00316 
00317 wns::scheduler::queue::QueueInterface::ProbeOutput
00318 QueueProxy::resetQueues(wns::scheduler::UserID _user)
00319 {
00320     assure(false, "Not implemeted, use request with CID instead");
00321 }
00322 
00323 wns::scheduler::queue::QueueInterface::ProbeOutput
00324 QueueProxy::resetQueue(wns::scheduler::ConnectionID cid)
00325 {
00326     assure(hasQueue(cid), "No queue for this CID");
00327 
00328     // Empty the copyQueue but do not count for statistics
00329     if(copyQueue_->knowsCID(cid))
00330         copyQueue_->reset(cid);
00331 
00332     return colleagues.queueManager_->getQueue(cid)->resetQueue(cid);    
00333 }
00334 
00335 void
00336 QueueProxy::frameStarts()
00337 {
00338 }
00339 
00340 std::string
00341 QueueProxy::printAllQueues()
00342 {
00343     std::stringstream s;
00344     
00345     QueueContainer queues = colleagues.queueManager_->getAllQueues();    
00346 
00347     QueueContainer::iterator it;
00348     for(it = queues.begin(); it != queues.end(); it++)
00349     {    
00350         startCollectionIfNeeded(it->first);
00351         s << it->second->printAllQueues() << "\n";
00352     }
00353     return s.str();
00354 }
00355 
00356 bool
00357 QueueProxy::supportsDynamicSegmentation() const
00358 {
00359     return supportsDynamicSegmentation_;   
00360 }
00361 
00362 wns::ldk::CompoundPtr 
00363 QueueProxy::getHeadOfLinePDUSegment(wns::scheduler::ConnectionID cid, int bits)
00364 {
00365     assure(supportsDynamicSegmentation_, "Dynamic segmentation not supported");
00366     assure(!copyQueue_->isEmpty(cid), "Requested PDU from emty queue");
00367     
00368     wns::ldk::CompoundPtr pdu = copyQueue_->getPDU(cid, bits);        
00369     return pdu;
00370 }
00371 
00372 int 
00373 QueueProxy::getMinimumSegmentSize() const
00374 {
00375     assure(supportsDynamicSegmentation_, "Dynamic segmentation not supported");
00376     detail::SegmentingInnerCopyQueue* q;
00377     q = dynamic_cast<detail::SegmentingInnerCopyQueue*>(copyQueue_);
00378 
00379     return q->getMinimumSegmentSize();
00380 }
00381 
00382 void
00383 QueueProxy::createQueueCopyIfNeeded(wns::scheduler::ConnectionID cid) const
00384 {
00385     wns::simulator::Time now = wns::simulator::getEventScheduler()->getTime();
00386 
00387     wns::scheduler::queue::QueueInterface* queue;
00388     queue = colleagues.queueManager_->getQueue(cid);
00389 
00390     // New round, create new PDUs in copyQueue
00391     if(queue != NULL &&  
00392         (lastChecked_.find(cid) == lastChecked_.end() || lastChecked_[cid] != now))
00393     {
00394         lastChecked_[cid] = now;
00395 
00396         // Empty the old copy queue
00397         if(copyQueue_->knowsCID(cid))
00398         {
00399             MESSAGE_BEGIN(NORMAL, logger_, m, myFUN_->getName());
00400             m << " Removing " << copyQueue_->getSize(cid) << " PDUs form old copyQueue ";
00401             m << " for CID " << cid;
00402             MESSAGE_END();
00403 
00404             copyQueue_->reset(cid);
00405         }
00406         
00407         startCollectionIfNeeded(cid);
00408 
00409         if(!queue->queueHasPDUs(cid))
00410         {
00411             MESSAGE_BEGIN(NORMAL, logger_, m, myFUN_->getName());
00412             m << " Real queue is empty for CID: ";
00413             m << cid;
00414             MESSAGE_END();
00415             return;
00416         }
00417 
00418         copyQueue_->setQueue(cid, queue->getQueueCopy(cid));
00419 
00420         MESSAGE_BEGIN(NORMAL, logger_, m, myFUN_->getName());
00421         m << " Created a copy of " << copyQueue_->getSize(cid) << " PDUs for CID ";
00422         m << cid;
00423         MESSAGE_END();
00424 
00425     }
00426 }
00427 
00428 std::queue<wns::ldk::CompoundPtr> 
00429 QueueProxy::getQueueCopy(ConnectionID cid)
00430 { 
00431     wns::Exception("You should not call getQueueCopy of the QueueProxy."); 
00432 }
00433 
00434 void
00435 QueueProxy::startCollectionIfNeeded(wns::scheduler::ConnectionID cid) const
00436 {
00437     wns::simulator::Time now = wns::simulator::getEventScheduler()->getTime();
00438 
00439     if(lastCollected_.find(cid) == lastCollected_.end() || lastCollected_[cid] != now)
00440     {
00441         lastCollected_[cid] = now;
00442         colleagues.queueManager_->startCollection(cid);
00443     }    
00444 }
00445 

Generated on Sun May 27 03:31:53 2012 for openWNS by  doxygen 1.5.5