![]() |
User Manual, Developers Guide and API Documentation |
![]() |
00001 /******************************************************************************* 00002 * This file is part of openWNS (open Wireless Network Simulator) 00003 * _____________________________________________________________________________ 00004 * 00005 * Copyright (C) 2004-2009 00006 * Chair of Communication Networks (ComNets) 00007 * Kopernikusstr. 5, D-52074 Aachen, Germany 00008 * phone: ++49-241-80-27910, 00009 * fax: ++49-241-80-22242 00010 * email: info@openwns.org 00011 * www: http://www.openwns.org 00012 * _____________________________________________________________________________ 00013 * 00014 * openWNS is free software; you can redistribute it and/or modify it under the 00015 * terms of the GNU Lesser General Public License version 2 as published by the 00016 * Free Software Foundation; 00017 * 00018 * openWNS is distributed in the hope that it will be useful, but WITHOUT ANY 00019 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 00020 * A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 00021 * details. 00022 * 00023 * You should have received a copy of the GNU Lesser General Public License 00024 * along with this program. If not, see <http://www.gnu.org/licenses/>. 00025 * 00026 ******************************************************************************/ 00027 00028 #include <WNS/scheduler/queue/QueueProxy.hpp> 00029 #include <WNS/ldk/Layer.hpp> 00030 00031 using namespace wns::scheduler::queue; 00032 00033 STATIC_FACTORY_REGISTER_WITH_CREATOR(QueueProxy, 00034 wns::scheduler::queue::QueueInterface, 00035 "wns.scheduler.queue.QueueProxy", 00036 wns::HasReceptorConfigCreator); 00037 00038 QueueProxy::QueueProxy(wns::ldk::HasReceptorInterface*, const wns::pyconfig::View& _config) : 00039 queueManagerServiceName_(_config.get<std::string>("queueManagerServiceName")), 00040 supportsDynamicSegmentation_(_config.get<bool>("supportsDynamicSegmentation")), 00041 copyQueue_(NULL), 00042 logger_(_config.get("logger")), 00043 myFUN_(NULL) 00044 { 00045 MESSAGE_BEGIN(NORMAL, logger_, m, "QueueProxy"); 00046 m << " Created "; 00047 m << " QueueProxy Queue using QueueManagerService "; 00048 m << queueManagerServiceName_; 00049 MESSAGE_END(); 00050 00051 if(supportsDynamicSegmentation_) 00052 { 00053 copyQueue_ = new detail::SegmentingInnerCopyQueue(_config.get("segmentingQueueConfig")); 00054 } 00055 else 00056 { 00057 copyQueue_ = new detail::SimpleInnerCopyQueue(); 00058 } 00059 00060 } 00061 00062 QueueProxy::~QueueProxy() 00063 { 00064 assure(copyQueue_ != NULL, "Want to delete copyQueue_ but it is NULL"); 00065 delete copyQueue_; 00066 } 00067 00068 void QueueProxy::setFUN(wns::ldk::fun::FUN* fun) 00069 { 00070 myFUN_ = fun; 00071 colleagues.queueManager_ = fun->getLayer()-> 00072 getManagementService<wns::scheduler::queue::IQueueManager>( 00073 queueManagerServiceName_); 00074 assure(colleagues.queueManager_, "QueueProxy needs a QueueManager"); 00075 00076 copyQueue_->setFUN(myFUN_); 00077 00078 MESSAGE_BEGIN(NORMAL, logger_, m, myFUN_->getName()); 00079 m << " Received valid FUN pointer and QueueManagerService "; 00080 m << queueManagerServiceName_; 00081 MESSAGE_END(); 00082 } 00083 00084 bool QueueProxy::isAccepting(const wns::ldk::CompoundPtr& compound ) const 00085 { 00086 return false; 00087 } 00088 00089 void 00090 QueueProxy::put(const wns::ldk::CompoundPtr& compound) 00091 { 00092 assure(false, "Put called for readOnly QueueProxy"); 00093 } 00094 00095 wns::scheduler::UserSet 00096 QueueProxy::getQueuedUsers() const 00097 { 00098 wns::scheduler::UserSet us; 00099 00100 QueueContainer queues = colleagues.queueManager_->getAllQueues(); 00101 00102 QueueContainer::const_iterator it; 00103 for(it = queues.begin(); it != queues.end(); it++) 00104 { 00105 startCollectionIfNeeded(it->first); 00106 00107 wns::scheduler::ConnectionSet innerCs; 00108 00109 // We have to ask our RegistryProxy to map the CID to a UserID! 00110 innerCs = it->second->getActiveConnections(); 00111 wns::scheduler::ConnectionSet::iterator iit; 00112 00113 for(iit = innerCs.begin(); iit != innerCs.end(); iit++) 00114 us.insert(colleagues.registry_->getUserForCID(*iit)); 00115 } 00116 return us; 00117 00118 } 00119 00120 wns::scheduler::ConnectionSet 00121 QueueProxy::getActiveConnections() const 00122 { 00123 wns::scheduler::ConnectionSet cs; 00124 00125 QueueContainer queues = colleagues.queueManager_->getAllQueues(); 00126 00127 QueueContainer::iterator it; 00128 for(it = queues.begin(); it != queues.end(); it++) 00129 { 00130 startCollectionIfNeeded(it->first); 00131 00132 wns::scheduler::ConnectionSet innerCs; 00133 innerCs = it->second->getActiveConnections(); 00134 wns::scheduler::ConnectionSet::iterator iit; 00135 00136 for(iit = innerCs.begin(); iit != innerCs.end(); iit++) 00137 cs.insert(*iit); 00138 } 00139 return cs; 00140 } 00141 00142 unsigned long int 00143 QueueProxy::numCompoundsForCid(wns::scheduler::ConnectionID cid) const 00144 { 00145 assure(colleagues.queueManager_->getQueue(cid) != NULL, "No queue for this CID"); 00146 00147 if(!copyQueue_->knowsCID(cid)) 00148 { 00149 startCollectionIfNeeded(cid); 00150 return colleagues.queueManager_->getQueue(cid)->numCompoundsForCid(cid); 00151 } 00152 else 00153 { 00154 return copyQueue_->getSize(cid); 00155 } 00156 } 00157 00158 unsigned long int 00159 QueueProxy::numBitsForCid(wns::scheduler::ConnectionID cid) const 00160 { 00161 assure(colleagues.queueManager_->getQueue(cid) != NULL, "No queue for this CID"); 00162 00163 if(!copyQueue_->knowsCID(cid)) 00164 { 00165 startCollectionIfNeeded(cid); 00166 return colleagues.queueManager_->getQueue(cid)->numBitsForCid(cid); 00167 } 00168 else 00169 { 00170 return copyQueue_->getSizeInBit(cid); 00171 } 00172 } 00173 00174 wns::scheduler::QueueStatusContainer 00175 QueueProxy::getQueueStatus() const 00176 { 00177 wns::scheduler::QueueStatusContainer csc; 00178 00179 QueueContainer queues = colleagues.queueManager_->getAllQueues(); 00180 00181 QueueContainer::iterator it; 00182 for(it = queues.begin(); it != queues.end(); it++) 00183 { 00184 startCollectionIfNeeded(it->first); 00185 00186 wns::scheduler::QueueStatusContainer innerCsc; 00187 innerCsc = it->second->getQueueStatus(); 00188 wns::scheduler::QueueStatusContainer::const_iterator iit; 00189 00190 for(iit = innerCsc.begin(); iit != innerCsc.end(); iit++) 00191 csc.insert(iit->first, iit->second); 00192 } 00193 return csc; 00194 } 00195 00196 wns::ldk::CompoundPtr 00197 QueueProxy::getHeadOfLinePDU(wns::scheduler::ConnectionID cid) 00198 { 00199 assure(!copyQueue_->isEmpty(cid), "Requested PDU from emty queue"); 00200 00201 wns::ldk::CompoundPtr pdu = copyQueue_->getPDU(cid); 00202 return pdu; 00203 } 00204 00205 int 00206 QueueProxy::getHeadOfLinePDUbits(wns::scheduler::ConnectionID cid) 00207 { 00208 assure(hasQueue(cid), "No queue for this CID"); 00209 00210 if(!copyQueue_->knowsCID(cid)) 00211 { 00212 startCollectionIfNeeded(cid); 00213 return colleagues.queueManager_->getQueue(cid)->getHeadOfLinePDUbits(cid); 00214 } 00215 else 00216 { 00217 return copyQueue_->getHeadofLinePDUBit(cid); 00218 } 00219 } 00220 00221 bool 00222 QueueProxy::isEmpty() const 00223 { 00224 QueueContainer queues = colleagues.queueManager_->getAllQueues(); 00225 QueueContainer::iterator it; 00226 for(it = queues.begin(); it != queues.end(); it++) 00227 { 00228 if(queueHasPDUs(it->first)) 00229 return false; 00230 } 00231 return true; 00232 } 00233 00234 bool 00235 QueueProxy::hasQueue(wns::scheduler::ConnectionID cid) 00236 { 00237 wns::scheduler::queue::QueueInterface* queue; 00238 queue = colleagues.queueManager_->getQueue(cid); 00239 00240 return (queue == NULL)?false:true; 00241 } 00242 00243 bool 00244 QueueProxy::queueHasPDUs(wns::scheduler::ConnectionID cid) const 00245 { 00246 wns::scheduler::queue::QueueInterface* queue; 00247 queue = colleagues.queueManager_->getQueue(cid); 00248 00249 if(queue != NULL) 00250 { 00251 createQueueCopyIfNeeded(cid); 00252 00253 if(copyQueue_->knowsCID(cid)) 00254 { 00255 if(copyQueue_->isEmpty(cid)) 00256 { 00257 MESSAGE_BEGIN(NORMAL, logger_, m, myFUN_->getName()); 00258 m << " queueHasPDUs: CopyQueue for CID " << cid << " is empty."; 00259 MESSAGE_END(); 00260 00261 return false; 00262 } 00263 else 00264 { 00265 return true; 00266 } 00267 } 00268 else 00269 { 00270 MESSAGE_BEGIN(NORMAL, logger_, m, myFUN_->getName()); 00271 m << " queueHasPDUs: Passing call for CID " << cid << " to real queue."; 00272 MESSAGE_END(); 00273 00274 return queue->queueHasPDUs(cid); 00275 } 00276 } 00277 else 00278 { 00279 return false; 00280 } 00281 } 00282 00283 wns::scheduler::ConnectionSet 00284 QueueProxy::filterQueuedCids(wns::scheduler::ConnectionSet connections) 00285 { 00286 colleagues.queueManager_->getAllQueues(); 00287 } 00288 00289 void 00290 QueueProxy::setColleagues(wns::scheduler::RegistryProxyInterface* registry) 00291 { 00292 colleagues.registry_ = registry; 00293 } 00294 00295 wns::scheduler::queue::QueueInterface::ProbeOutput 00296 QueueProxy::resetAllQueues() 00297 { 00298 wns::scheduler::queue::QueueInterface::ProbeOutput po; 00299 00300 QueueContainer queues = colleagues.queueManager_->getAllQueues(); 00301 00302 QueueContainer::iterator it; 00303 for(it = queues.begin(); it != queues.end(); it++) 00304 { 00305 wns::scheduler::queue::QueueInterface::ProbeOutput innerPo; 00306 innerPo = it->second->resetAllQueues(); 00307 po.bits += innerPo.bits; 00308 po.compounds += innerPo.compounds; 00309 00310 // Empty the copyQueue but do not count for statistics 00311 if(copyQueue_->knowsCID(it->first)) 00312 copyQueue_->reset(it->first); 00313 } 00314 return po; 00315 } 00316 00317 wns::scheduler::queue::QueueInterface::ProbeOutput 00318 QueueProxy::resetQueues(wns::scheduler::UserID _user) 00319 { 00320 assure(false, "Not implemeted, use request with CID instead"); 00321 } 00322 00323 wns::scheduler::queue::QueueInterface::ProbeOutput 00324 QueueProxy::resetQueue(wns::scheduler::ConnectionID cid) 00325 { 00326 assure(hasQueue(cid), "No queue for this CID"); 00327 00328 // Empty the copyQueue but do not count for statistics 00329 if(copyQueue_->knowsCID(cid)) 00330 copyQueue_->reset(cid); 00331 00332 return colleagues.queueManager_->getQueue(cid)->resetQueue(cid); 00333 } 00334 00335 void 00336 QueueProxy::frameStarts() 00337 { 00338 } 00339 00340 std::string 00341 QueueProxy::printAllQueues() 00342 { 00343 std::stringstream s; 00344 00345 QueueContainer queues = colleagues.queueManager_->getAllQueues(); 00346 00347 QueueContainer::iterator it; 00348 for(it = queues.begin(); it != queues.end(); it++) 00349 { 00350 startCollectionIfNeeded(it->first); 00351 s << it->second->printAllQueues() << "\n"; 00352 } 00353 return s.str(); 00354 } 00355 00356 bool 00357 QueueProxy::supportsDynamicSegmentation() const 00358 { 00359 return supportsDynamicSegmentation_; 00360 } 00361 00362 wns::ldk::CompoundPtr 00363 QueueProxy::getHeadOfLinePDUSegment(wns::scheduler::ConnectionID cid, int bits) 00364 { 00365 assure(supportsDynamicSegmentation_, "Dynamic segmentation not supported"); 00366 assure(!copyQueue_->isEmpty(cid), "Requested PDU from emty queue"); 00367 00368 wns::ldk::CompoundPtr pdu = copyQueue_->getPDU(cid, bits); 00369 return pdu; 00370 } 00371 00372 int 00373 QueueProxy::getMinimumSegmentSize() const 00374 { 00375 assure(supportsDynamicSegmentation_, "Dynamic segmentation not supported"); 00376 detail::SegmentingInnerCopyQueue* q; 00377 q = dynamic_cast<detail::SegmentingInnerCopyQueue*>(copyQueue_); 00378 00379 return q->getMinimumSegmentSize(); 00380 } 00381 00382 void 00383 QueueProxy::createQueueCopyIfNeeded(wns::scheduler::ConnectionID cid) const 00384 { 00385 wns::simulator::Time now = wns::simulator::getEventScheduler()->getTime(); 00386 00387 wns::scheduler::queue::QueueInterface* queue; 00388 queue = colleagues.queueManager_->getQueue(cid); 00389 00390 // New round, create new PDUs in copyQueue 00391 if(queue != NULL && 00392 (lastChecked_.find(cid) == lastChecked_.end() || lastChecked_[cid] != now)) 00393 { 00394 lastChecked_[cid] = now; 00395 00396 // Empty the old copy queue 00397 if(copyQueue_->knowsCID(cid)) 00398 { 00399 MESSAGE_BEGIN(NORMAL, logger_, m, myFUN_->getName()); 00400 m << " Removing " << copyQueue_->getSize(cid) << " PDUs form old copyQueue "; 00401 m << " for CID " << cid; 00402 MESSAGE_END(); 00403 00404 copyQueue_->reset(cid); 00405 } 00406 00407 startCollectionIfNeeded(cid); 00408 00409 if(!queue->queueHasPDUs(cid)) 00410 { 00411 MESSAGE_BEGIN(NORMAL, logger_, m, myFUN_->getName()); 00412 m << " Real queue is empty for CID: "; 00413 m << cid; 00414 MESSAGE_END(); 00415 return; 00416 } 00417 00418 copyQueue_->setQueue(cid, queue->getQueueCopy(cid)); 00419 00420 MESSAGE_BEGIN(NORMAL, logger_, m, myFUN_->getName()); 00421 m << " Created a copy of " << copyQueue_->getSize(cid) << " PDUs for CID "; 00422 m << cid; 00423 MESSAGE_END(); 00424 00425 } 00426 } 00427 00428 std::queue<wns::ldk::CompoundPtr> 00429 QueueProxy::getQueueCopy(ConnectionID cid) 00430 { 00431 wns::Exception("You should not call getQueueCopy of the QueueProxy."); 00432 } 00433 00434 void 00435 QueueProxy::startCollectionIfNeeded(wns::scheduler::ConnectionID cid) const 00436 { 00437 wns::simulator::Time now = wns::simulator::getEventScheduler()->getTime(); 00438 00439 if(lastCollected_.find(cid) == lastCollected_.end() || lastCollected_[cid] != now) 00440 { 00441 lastCollected_[cid] = now; 00442 colleagues.queueManager_->startCollection(cid); 00443 } 00444 } 00445
1.5.5