![]() |
User Manual, Developers Guide and API Documentation |
![]() |
00001 /******************************************************************************* 00002 * This file is part of openWNS (open Wireless Network Simulator) 00003 * _____________________________________________________________________________ 00004 * 00005 * Copyright (C) 2004-2009 00006 * Chair of Communication Networks (ComNets) 00007 * Kopernikusstr. 5, D-52074 Aachen, Germany 00008 * phone: ++49-241-80-27910, 00009 * fax: ++49-241-80-22242 00010 * email: info@openwns.org 00011 * www: http://www.openwns.org 00012 * _____________________________________________________________________________ 00013 * 00014 * openWNS is free software; you can redistribute it and/or modify it under the 00015 * terms of the GNU Lesser General Public License version 2 as published by the 00016 * Free Software Foundation; 00017 * 00018 * openWNS is distributed in the hope that it will be useful, but WITHOUT ANY 00019 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 00020 * A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 00021 * details. 00022 * 00023 * You should have received a copy of the GNU Lesser General Public License 00024 * along with this program. If not, see <http://www.gnu.org/licenses/>. 00025 * 00026 ******************************************************************************/ 00027 00028 #include <WNS/scheduler/queue/SegmentingQueue.hpp> 00029 #include <WNS/probe/bus/utils.hpp> 00030 #include <WNS/ldk/Layer.hpp> 00031 #include <WNS/ldk/Compound.hpp> 00032 00033 using namespace wns::scheduler; 00034 using namespace wns::scheduler::queue; 00035 00036 00037 STATIC_FACTORY_REGISTER_WITH_CREATOR(SegmentingQueue, 00038 QueueInterface, 00039 "SegmentingQueue", 00040 wns::HasReceptorConfigCreator); 00041 00042 SegmentingQueue::SegmentingQueue(wns::ldk::HasReceptorInterface*, const wns::pyconfig::View& _config) 00043 : segmentHeaderReader(NULL), 00044 probeHeaderReader(NULL), 00045 logger(_config.get("logger")), 00046 config(_config), 00047 myFUN(), 00048 maxSize(0), 00049 minimumSegmentSize(_config.get<unsigned long int>("minimumSegmentSize")), 00050 fixedHeaderSize(_config.get<Bit>("fixedHeaderSize")), 00051 extensionHeaderSize(_config.get<Bit>("extensionHeaderSize")), 00052 usePadding(_config.get<bool>("usePadding")), 00053 byteAlignHeader(_config.get<bool>("byteAlignHeader")), 00054 isDropping(_config.get<bool>("isDropping")) 00055 { 00056 } 00057 00058 SegmentingQueue::~SegmentingQueue() 00059 { 00060 if (segmentHeaderReader) { segmentHeaderReader = NULL;} 00061 } 00062 00063 void SegmentingQueue::setFUN(wns::ldk::fun::FUN* fun) 00064 { 00065 myFUN = fun; 00066 // read the localIDs from the config 00067 wns::probe::bus::ContextProviderCollection localContext(&fun->getLayer()->getContextProviderCollection()); 00068 for (int ii = 0; ii<config.len("localIDs.keys()"); ++ii) 00069 { 00070 std::string key = config.get<std::string>("localIDs.keys()",ii); 00071 unsigned long int value = config.get<unsigned long int>("localIDs.values()",ii); 00072 localContext.addProvider( wns::probe::bus::contextprovider::Constant(key, value) ); 00073 } 00074 00075 std::string sizeProbeName = config.get<std::string>("sizeProbeName"); 00076 sizeProbeBus = wns::probe::bus::ContextCollectorPtr(new wns::probe::bus::ContextCollector(localContext, sizeProbeName)); 00077 00078 std::string overheadProbeName = config.get<std::string>("overheadProbeName"); 00079 overheadProbeBus = wns::probe::bus::ContextCollectorPtr(new wns::probe::bus::ContextCollector(localContext, overheadProbeName)); 00080 00081 if(!config.isNone("delayProbeName")) 00082 { 00083 std::string delayProbeName = config.get<std::string>("delayProbeName"); 00084 delayProbeBus = wns::probe::bus::ContextCollectorPtr( 00085 new wns::probe::bus::ContextCollector(localContext, 00086 delayProbeName + ".delay")); 00087 00088 // Same name as the probe prefix 00089 probeHeaderReader = myFUN->getCommandReader(delayProbeName); 00090 } 00091 00092 std::string segmentHeaderCommandName = config.get<std::string>("segmentHeaderCommandName"); 00093 segmentHeaderReader = myFUN->getCommandReader(segmentHeaderCommandName); 00094 assure(segmentHeaderReader, "No reader for the Segment Header ("<<segmentHeaderCommandName<<") available!"); 00095 MESSAGE_SINGLE(NORMAL, logger, "SegmentingQueue::setFUN(): segmentHeaderCommandName="<<segmentHeaderCommandName); 00096 } 00097 00098 bool SegmentingQueue::isAccepting(const wns::ldk::CompoundPtr& compound ) const { 00099 int compoundSize = compound->getLengthInBits(); 00100 ConnectionID cid = colleagues.registry->getCIDforPDU(compound); 00101 00102 // if this is a brand new connection, return true because couldn't have 00103 // exceeded limit 00104 if (queues.find(cid) == queues.end()) 00105 { 00106 MESSAGE_BEGIN(VERBOSE, logger, m, ""); 00107 m << "Accepting PDU of size " << compoundSize << " into queue that would be newly created for CID=" << cid 00108 << " of user=" << colleagues.registry->getNameForUser(colleagues.registry->getUserForCID(cid)) <<"\n"; 00109 MESSAGE_END(); 00110 return true; 00111 } 00112 00113 if (compoundSize + queues.find(cid)->second.queuedNettoBits() > maxSize) 00114 { 00115 MESSAGE_BEGIN(VERBOSE, logger, m, ""); 00116 m << "Not accepting PDU of size=" << compoundSize 00117 << " because net queuesize=" << queues.find(cid)->second.queuedNettoBits() << " for CID=" << cid 00118 << " of user=" << colleagues.registry->getNameForUser(colleagues.registry->getUserForCID(cid)) <<"\n"; 00119 MESSAGE_END(); 00120 return false; 00121 } 00122 00123 MESSAGE_BEGIN(VERBOSE, logger, m, ""); 00124 m << "Accepting PDU of size=" << compoundSize << " because net queuesize=" << queues.find(cid)->second.queuedNettoBits() << " for CID=" << cid 00125 << " of user=" << colleagues.registry->getNameForUser(colleagues.registry->getUserForCID(cid)) <<"\n"; 00126 MESSAGE_END(); 00127 return true; 00128 } // isAccepting 00129 00130 void 00131 SegmentingQueue::put(const wns::ldk::CompoundPtr& compound) { 00132 assure(compound, "No valid PDU"); 00133 assure(compound != wns::ldk::CompoundPtr(), "No valid PDU" ); 00134 assure(colleagues.registry, "Need a registry as colleague, please set first"); 00135 00136 bool accepting = isAccepting(compound); 00137 if (isDropping && !accepting) 00138 { 00139 MESSAGE_SINGLE(VERBOSE, logger, "SegmentingQueue is not accepting. Dropping compound"); 00140 return; 00141 } 00142 else 00143 { 00144 assure(accepting, "sendData() has been called without isAccepting()"); 00145 } 00146 00147 ConnectionID cid = colleagues.registry->getCIDforPDU(compound); 00148 Bit compoundLength = compound->getLengthInBits(); 00149 assure(compoundLength>0,"compoundLength="<<compoundLength); 00150 00151 // saves pdu and automatically create new queue if necessary 00152 // needs a 'map' to do so. 00153 queues[cid].put(compound); 00154 00155 if (fixedOverhead.find(cid) == fixedOverhead.end()) 00156 { 00157 fixedOverhead[cid] = fixedHeaderSize; 00158 } 00159 00160 MESSAGE_SINGLE(NORMAL, logger, "SegmentingQueue::put(cid="<<cid<<"): after: bits="<<queues[cid].queuedNettoBits()<<"/"<<queues[cid].queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader)<<", PDUs="<<queues[cid].queuedCompounds()); 00161 00162 if (sizeProbeBus) { 00163 int priority = colleagues.registry->getPriorityForConnection(cid); // only for probes 00164 00165 sizeProbeBus->put((double)queues[cid].queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader) / (double)maxSize, 00166 boost::make_tuple("cid", cid, "MAC.QoSClass", priority)); // relative (0..100%) 00167 } else { 00168 MESSAGE_SINGLE(NORMAL, logger, "SegmentingQueue::put(cid="<<cid<<"): size="<<queues[cid].queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader)<<"): undefined sizeProbeBus="<<sizeProbeBus); 00169 } 00170 } // put 00171 00172 // [rs]: obsolete? Better use cid-related questions. Used frequently in OLD scheduler strategies 00173 UserSet 00174 SegmentingQueue::getQueuedUsers() const { 00175 UserSet users; 00176 00177 for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter) 00178 { 00179 if ( !( (*iter).second.empty()) ) 00180 { 00181 ConnectionID cid = iter->first; 00182 UserID user = colleagues.registry->getUserForCID(cid); 00183 users.insert(user); 00184 } 00185 } 00186 return users; 00187 } 00188 00189 ConnectionSet 00190 SegmentingQueue::getActiveConnections() const 00191 { 00192 ConnectionSet result; 00193 00194 for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter) { 00195 ConnectionID cid = iter->first; 00196 if ( !( (*iter).second.empty() ) ) { 00197 result.insert(cid); 00198 } else { 00199 assure(iter->second.queuedNettoBits()==0,"Zero packets but "<<iter->second.queuedNettoBits()<<" bits. How can this be?"); 00200 } 00201 } 00202 return result; 00203 } 00204 00205 unsigned long int 00206 SegmentingQueue::numCompoundsForCid(ConnectionID cid) const 00207 { 00208 QueueContainer::const_iterator iter = queues.find(cid); 00209 assure(iter != queues.end(),"cannot find queue for cid="<<cid); 00210 return iter->second.queuedCompounds(); 00211 } 00212 00213 unsigned long int 00214 SegmentingQueue::numBitsForCid(ConnectionID cid) const 00215 { 00216 00217 QueueContainer::const_iterator iter = queues.find(cid); 00218 assure(iter != queues.end(),"cannot find queue for cid="<<cid); 00219 00225 assure(fixedOverhead.find(cid)!=fixedOverhead.end(), "Cannot find overhead entry for cid " << cid); 00226 int overhead = fixedOverhead.find(cid)->second; 00227 return iter->second.queuedBruttoBits(overhead, extensionHeaderSize, byteAlignHeader); 00228 } // numBitsForCid() 00229 00230 // result is sorted per-cid 00231 QueueStatusContainer 00232 SegmentingQueue::getQueueStatus() const 00233 { 00234 wns::scheduler::QueueStatusContainer result; 00235 00236 // Find all queues 00237 for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter) 00238 { 00239 ConnectionID cid = iter->first; 00240 QueueStatus queueStatus; 00241 assure(fixedOverhead.find(cid)!=fixedOverhead.end(), "Cannot find overhead entry for cid " << cid); 00242 int overhead = fixedOverhead.find(cid)->second; 00243 queueStatus.numOfBits = iter->second.queuedBruttoBits(overhead, extensionHeaderSize, byteAlignHeader); 00244 queueStatus.numOfCompounds = iter->second.queuedCompounds(); 00245 result.insert(cid,queueStatus); 00246 MESSAGE_SINGLE(NORMAL, logger, "SegmentingQueue::getQueueStatus():" 00247 << " for cid=" << cid 00248 << ": bits=" << queueStatus.numOfBits 00249 << ", PDUs=" << queueStatus.numOfCompounds); 00250 // if we have bits we also must have a pdu: 00251 assure((queueStatus.numOfBits==0) 00252 || (queueStatus.numOfCompounds>0), 00253 "numOfBits="<<queueStatus.numOfBits<<" but numOfCompounds="<<queueStatus.numOfCompounds<<" for cid="<<cid); 00254 } 00255 return result; 00256 } // getQueueStatus() 00257 00258 wns::ldk::CompoundPtr 00259 SegmentingQueue::getHeadOfLinePDU(ConnectionID cid) { 00260 assure(false, "The SegmentingQueue does not support getHeadOfLinePDU"); 00261 } 00262 00263 // return only those bits which belong to one PDU 00264 int 00265 SegmentingQueue::getHeadOfLinePDUbits(ConnectionID cid) 00266 { 00267 QueueContainer::const_iterator iter = queues.find(cid); 00268 assure(iter != queues.end(),"cannot find queue for cid="<<cid); 00269 assure(queueHasPDUs(cid), "getHeadOfLinePDUbits called for CID without PDUs or non-existent CID="<<cid); 00270 00271 return numBitsForCid(cid); 00272 } 00273 00274 wns::ldk::CompoundPtr 00275 SegmentingQueue::getHeadOfLinePDUSegment(ConnectionID cid, int requestedBits) 00276 { 00277 assure(queueHasPDUs(cid), "getHeadOfLinePDUSegments(cid="<<cid<<",bits="<<requestedBits<<") called for CID without PDUs or non-existent CID"); 00278 00279 assure(segmentHeaderReader != NULL, "No valid segmentHeaderReader set! You need to call setFUN() first."); 00280 assure(fixedOverhead.find(cid)!=fixedOverhead.end(), "Cannot find overhead entry for cid " << cid); 00281 00282 wns::ldk::CompoundPtr segment; 00283 00284 int ov = fixedOverhead[cid]; 00285 00286 if (requestedBits <= ov) 00287 { 00288 ov = requestedBits - 1; 00289 } 00290 00291 segment = queues[cid].retrieve(requestedBits, ov, extensionHeaderSize, 00292 usePadding, byteAlignHeader, segmentHeaderReader, delayProbeBus, probeHeaderReader); 00293 00294 assure(segment != wns::ldk::CompoundPtr(), "Inner queue did not return a PDU"); 00295 00296 // Clear this. The next request will not include a fixed header 00297 // Will be reset in frameStarts() 00298 fixedOverhead[cid] -= ov; 00299 if (fixedOverhead[cid] < 0) 00300 { 00301 fixedOverhead[cid] = 0; 00302 } 00303 00304 segmentHeaderReader->commitSizes(segment->getCommandPool()); 00305 00306 ISegmentationCommand* header = segmentHeaderReader->readCommand<ISegmentationCommand>(segment->getCommandPool()); 00307 00308 if (sizeProbeBus) { 00309 int priority = colleagues.registry->getPriorityForConnection(cid); 00310 sizeProbeBus->put((double)queues[cid].queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader) / (double)maxSize, 00311 boost::make_tuple("cid", cid, "MAC.QoSClass", priority)); // relative (0..100%) 00312 } 00313 00314 if (overheadProbeBus) { 00315 int priority = colleagues.registry->getPriorityForConnection(cid); 00316 overheadProbeBus->put( ( (double) header->headerSize())/((double) header->totalSize()), 00317 boost::make_tuple("cid", cid, "MAC.QoSClass", priority)); // relative (0..100%) 00318 } 00319 00320 MESSAGE_SINGLE(NORMAL, logger, "getHeadOfLinePDUSegment(cid="<<cid<<",to="<<colleagues.registry->getNameForUser(colleagues.registry->getUserForCID(cid)) 00321 <<",bits="<<requestedBits<<"): totalSize="<<header->totalSize()<<" bits, sn="<< header->getSequenceNumber() ); 00322 00323 MESSAGE_SINGLE(NORMAL, logger, "getHeadOfLinePDUSegment(cid="<<cid<<"): after: bits="<<queues[cid].queuedNettoBits()<<"/"<<queues[cid].queuedBruttoBits(fixedOverhead[cid], extensionHeaderSize, byteAlignHeader)<<", PDUs="<<queues[cid].queuedCompounds() << ", fh: " << fixedOverhead[cid] << ", eh: " << extensionHeaderSize); 00324 00325 assure(header->totalSize()<=requestedBits,"pdulength="<<header->totalSize()<<" > bits="<<requestedBits); 00326 return segment; 00327 } 00328 00329 std::queue<wns::ldk::CompoundPtr> 00330 SegmentingQueue::getQueueCopy(ConnectionID cid) 00331 { 00332 assure(queues.find(cid) != queues.end(), "getQueueCopy called for non-existent CID"); 00333 return queues[cid].getQueueCopy(); 00334 } 00335 00336 bool 00337 SegmentingQueue::isEmpty() const 00338 { 00339 for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter) 00340 { 00341 if ( !( (*iter).second.empty() )) 00342 return false; 00343 } 00344 return true; 00345 } 00346 00347 bool 00348 SegmentingQueue::hasQueue(ConnectionID cid) 00349 { 00350 return queues.find(cid) != queues.end(); 00351 } 00352 00353 bool 00354 SegmentingQueue::queueHasPDUs(ConnectionID cid) const { 00355 if (queues.find(cid) == queues.end()) 00356 return false; 00357 return ( !(queues.find(cid)->second.empty()) ); 00358 } 00359 00360 ConnectionSet 00361 SegmentingQueue::filterQueuedCids(ConnectionSet connections) { 00362 ConnectionSet activeConnections; 00363 for ( wns::scheduler::ConnectionSet::iterator iter = connections.begin(); iter != connections.end(); ++iter ) 00364 { 00365 ConnectionID cid = *iter; 00366 if ( queueHasPDUs(cid) ) 00367 activeConnections.insert(cid); 00368 } 00369 return activeConnections; 00370 } 00371 00372 void 00373 SegmentingQueue::setColleagues(RegistryProxyInterface* _registry) { 00374 colleagues.registry = _registry; 00375 maxSize = colleagues.registry->getQueueSizeLimitPerConnection(); 00376 } 00377 00378 QueueInterface::ProbeOutput 00379 SegmentingQueue::resetAllQueues() 00380 { 00381 // Store number of bits and compounds for Probe which will be deleted 00382 ProbeOutput probeOutput; 00383 for (QueueContainer::iterator iter = queues.begin(); 00384 iter != queues.end(); ++iter) 00385 { 00386 ConnectionID cid = iter->first; 00387 probeOutput.bits += iter->second.queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader); 00388 probeOutput.compounds += iter->second.queuedCompounds(); 00389 if (sizeProbeBus) { 00390 int priority = colleagues.registry->getPriorityForConnection(cid); 00391 sizeProbeBus->put(0.0, boost::make_tuple("cid", cid, "MAC.QoSClass", priority)); // relative (0..100%) 00392 } 00393 } 00394 00395 // queues is a std::map that stores the std::queues that store the 00396 // CompoundPtrs. So by doing a queues.clear(), the destructors are called 00397 // and the refCounting mechanism of the CompoundPtr takes care of actually 00398 // deleting the compounds. 00399 queues.clear(); 00400 fixedOverhead.clear(); 00401 00402 return probeOutput; 00403 } 00404 00405 // [rs]: obsolete? Better use cid-related questions 00406 QueueInterface::ProbeOutput 00407 SegmentingQueue::resetQueues(UserID _user) 00408 { 00409 // MESSAGE_SINGLE(NORMAL, logger, "SegmentingQueue::resetQueues(): obsolete"); // TODO [rs]; not supported by [aoz] 00410 // Store number of bits and compounds for Probe which will be deleted 00411 ProbeOutput probeOutput; 00412 00413 // Find all queues that belong to this user and delete them. This one is a 00414 // little bit tricky, see section 6.6.2 of Josutti's STL book: we have to be 00415 // careful when deleting the current iterator position 00416 for (QueueContainer::iterator iter = queues.begin(); iter != queues.end(); ) 00417 { 00418 ConnectionID cid = iter->first; 00419 UserID user = colleagues.registry->getUserForCID(cid); 00420 if (user == _user) 00421 { 00422 ConnectionID cid = iter->first; 00423 probeOutput.bits += iter->second.queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader); 00424 probeOutput.compounds += iter->second.queuedCompounds(); 00425 if (sizeProbeBus) { 00426 int priority = colleagues.registry->getPriorityForConnection(cid); 00427 sizeProbeBus->put(0.0, boost::make_tuple("cid", cid, "MAC.QoSClass", priority)); // relative (0..100%) 00428 } 00429 queues.erase(iter++); 00430 } 00431 else 00432 ++iter; 00433 } 00434 for (FixedOverheadContainer::iterator iter = fixedOverhead.begin(); iter != fixedOverhead.end(); ) 00435 { 00436 ConnectionID cid = iter->first; 00437 UserID user = colleagues.registry->getUserForCID(cid); 00438 if (user == _user) 00439 { 00440 ConnectionID cid = iter->first; 00441 fixedOverhead.erase(iter++); 00442 } 00443 else 00444 ++iter; 00445 } 00446 00447 return probeOutput; 00448 } 00449 00450 QueueInterface::ProbeOutput 00451 SegmentingQueue::resetQueue(ConnectionID cid) 00452 { 00453 // Store number of bits and compounds for Probe which will be deleted 00454 ProbeOutput probeOutput; 00455 probeOutput.bits += queues[cid].queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader); 00456 probeOutput.compounds += queues[cid].queuedCompounds(); 00457 if (sizeProbeBus) { 00458 int priority = colleagues.registry->getPriorityForConnection(cid); 00459 sizeProbeBus->put(0.0, boost::make_tuple("cid", cid, "MAC.QoSClass", priority)); // relative (0..100%) 00460 } 00461 00462 #ifndef NDEBUG 00463 int numRemoved = 00464 #endif 00465 queues.erase(cid); 00466 fixedOverhead.erase(cid); 00467 assure(numRemoved == 1, "Non-existing or too many queues with that CID"); 00468 00469 return probeOutput; 00470 } 00471 00472 void 00473 SegmentingQueue::frameStarts() 00474 { 00475 MESSAGE_SINGLE(NORMAL, logger, "frameStarts(): resetting fixed header flags"); 00476 00477 for (FixedOverheadContainer::iterator it=fixedOverhead.begin(); it!=fixedOverhead.end(); ++it) 00478 { 00479 it->second = fixedHeaderSize; 00480 } 00481 } 00482 00483 std::string 00484 SegmentingQueue::printAllQueues() 00485 { 00486 std::stringstream s; 00487 for (QueueContainer::iterator iter = queues.begin(); 00488 iter != queues.end(); ++iter) 00489 { 00490 ConnectionID cid = iter->first; 00491 int bits = iter->second.queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader); 00492 int compounds = iter->second.queuedCompounds(); 00493 s << cid << ":" << bits << "," << compounds << " "; 00494 } 00495 return s.str(); 00496 } 00497 00498
1.5.5