User Manual, Developers Guide and API Documentation

SegmentingQueue.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002  * This file is part of openWNS (open Wireless Network Simulator)
00003  * _____________________________________________________________________________
00004  *
00005  * Copyright (C) 2004-2009
00006  * Chair of Communication Networks (ComNets)
00007  * Kopernikusstr. 5, D-52074 Aachen, Germany
00008  * phone: ++49-241-80-27910,
00009  * fax: ++49-241-80-22242
00010  * email: info@openwns.org
00011  * www: http://www.openwns.org
00012  * _____________________________________________________________________________
00013  *
00014  * openWNS is free software; you can redistribute it and/or modify it under the
00015  * terms of the GNU Lesser General Public License version 2 as published by the
00016  * Free Software Foundation;
00017  *
00018  * openWNS is distributed in the hope that it will be useful, but WITHOUT ANY
00019  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
00020  * A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00021  * details.
00022  *
00023  * You should have received a copy of the GNU Lesser General Public License
00024  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
00025  *
00026  ******************************************************************************/
00027 
00028 #include <WNS/scheduler/queue/SegmentingQueue.hpp>
00029 #include <WNS/probe/bus/utils.hpp>
00030 #include <WNS/ldk/Layer.hpp>
00031 #include <WNS/ldk/Compound.hpp>
00032 
00033 using namespace wns::scheduler;
00034 using namespace wns::scheduler::queue;
00035 
00036 
00037 STATIC_FACTORY_REGISTER_WITH_CREATOR(SegmentingQueue,
00038                                      QueueInterface,
00039                                      "SegmentingQueue",
00040                                      wns::HasReceptorConfigCreator);
00041 
00042 SegmentingQueue::SegmentingQueue(wns::ldk::HasReceptorInterface*, const wns::pyconfig::View& _config)
00043     : segmentHeaderReader(NULL),
00044       probeHeaderReader(NULL),
00045       logger(_config.get("logger")),
00046       config(_config),
00047       myFUN(),
00048       maxSize(0),
00049       minimumSegmentSize(_config.get<unsigned long int>("minimumSegmentSize")),
00050       fixedHeaderSize(_config.get<Bit>("fixedHeaderSize")),
00051       extensionHeaderSize(_config.get<Bit>("extensionHeaderSize")),
00052       usePadding(_config.get<bool>("usePadding")),
00053       byteAlignHeader(_config.get<bool>("byteAlignHeader")),
00054       isDropping(_config.get<bool>("isDropping"))
00055 {
00056 }
00057 
00058 SegmentingQueue::~SegmentingQueue()
00059 {
00060     if (segmentHeaderReader) { segmentHeaderReader = NULL;}
00061 }
00062 
00063 void SegmentingQueue::setFUN(wns::ldk::fun::FUN* fun)
00064 {
00065     myFUN = fun;
00066     // read the localIDs from the config
00067     wns::probe::bus::ContextProviderCollection localContext(&fun->getLayer()->getContextProviderCollection());
00068     for (int ii = 0; ii<config.len("localIDs.keys()"); ++ii)
00069     {
00070         std::string key = config.get<std::string>("localIDs.keys()",ii);
00071         unsigned long int value  = config.get<unsigned long int>("localIDs.values()",ii);
00072         localContext.addProvider( wns::probe::bus::contextprovider::Constant(key, value) );
00073     }
00074 
00075     std::string sizeProbeName = config.get<std::string>("sizeProbeName");
00076     sizeProbeBus = wns::probe::bus::ContextCollectorPtr(new wns::probe::bus::ContextCollector(localContext, sizeProbeName));
00077 
00078     std::string overheadProbeName = config.get<std::string>("overheadProbeName");
00079     overheadProbeBus = wns::probe::bus::ContextCollectorPtr(new wns::probe::bus::ContextCollector(localContext, overheadProbeName));
00080 
00081     if(!config.isNone("delayProbeName"))
00082     {
00083         std::string delayProbeName = config.get<std::string>("delayProbeName");
00084         delayProbeBus = wns::probe::bus::ContextCollectorPtr(
00085             new wns::probe::bus::ContextCollector(localContext, 
00086                 delayProbeName + ".delay"));
00087     
00088         // Same name as the probe prefix
00089         probeHeaderReader = myFUN->getCommandReader(delayProbeName);
00090     }
00091 
00092     std::string segmentHeaderCommandName = config.get<std::string>("segmentHeaderCommandName");
00093     segmentHeaderReader = myFUN->getCommandReader(segmentHeaderCommandName);
00094     assure(segmentHeaderReader, "No reader for the Segment Header ("<<segmentHeaderCommandName<<") available!");
00095     MESSAGE_SINGLE(NORMAL, logger, "SegmentingQueue::setFUN(): segmentHeaderCommandName="<<segmentHeaderCommandName);
00096 }
00097 
00098 bool SegmentingQueue::isAccepting(const wns::ldk::CompoundPtr&  compound ) const {
00099     int compoundSize = compound->getLengthInBits();
00100     ConnectionID cid = colleagues.registry->getCIDforPDU(compound);
00101 
00102     // if this is a brand new connection, return true because couldn't have
00103     // exceeded limit
00104     if (queues.find(cid) == queues.end())
00105     {
00106         MESSAGE_BEGIN(VERBOSE, logger, m, "");
00107         m << "Accepting PDU of size " <<  compoundSize <<  " into queue that would be newly created for CID=" << cid
00108           << " of user=" << colleagues.registry->getNameForUser(colleagues.registry->getUserForCID(cid)) <<"\n";
00109         MESSAGE_END();
00110         return true;
00111     }
00112 
00113     if (compoundSize + queues.find(cid)->second.queuedNettoBits() > maxSize)
00114     {
00115         MESSAGE_BEGIN(VERBOSE, logger, m, "");
00116         m << "Not accepting PDU of size=" << compoundSize
00117           << " because net queuesize=" << queues.find(cid)->second.queuedNettoBits() << " for CID=" << cid
00118           << " of user=" << colleagues.registry->getNameForUser(colleagues.registry->getUserForCID(cid)) <<"\n";
00119         MESSAGE_END();
00120         return  false;
00121     }
00122 
00123     MESSAGE_BEGIN(VERBOSE, logger, m, "");
00124     m << "Accepting PDU of size=" <<  compoundSize <<  " because net queuesize=" << queues.find(cid)->second.queuedNettoBits() << " for CID=" << cid
00125       << " of user=" << colleagues.registry->getNameForUser(colleagues.registry->getUserForCID(cid)) <<"\n";
00126     MESSAGE_END();
00127     return true;
00128 } // isAccepting
00129 
00130 void
00131 SegmentingQueue::put(const wns::ldk::CompoundPtr& compound) {
00132     assure(compound, "No valid PDU");
00133     assure(compound != wns::ldk::CompoundPtr(), "No valid PDU" );
00134     assure(colleagues.registry, "Need a registry as colleague, please set first");
00135 
00136     bool accepting = isAccepting(compound);
00137     if (isDropping && !accepting)
00138     {
00139         MESSAGE_SINGLE(VERBOSE, logger, "SegmentingQueue is not accepting. Dropping compound");
00140         return;
00141     }
00142     else
00143     {
00144         assure(accepting, "sendData() has been called without isAccepting()");
00145     }
00146 
00147     ConnectionID cid = colleagues.registry->getCIDforPDU(compound);
00148     Bit compoundLength = compound->getLengthInBits();
00149     assure(compoundLength>0,"compoundLength="<<compoundLength);
00150 
00151     // saves pdu and automatically create new queue if necessary
00152     // needs a 'map' to do so.
00153     queues[cid].put(compound);
00154 
00155     if (fixedOverhead.find(cid) == fixedOverhead.end())
00156     {
00157         fixedOverhead[cid] = fixedHeaderSize;
00158     }
00159 
00160     MESSAGE_SINGLE(NORMAL, logger, "SegmentingQueue::put(cid="<<cid<<"): after: bits="<<queues[cid].queuedNettoBits()<<"/"<<queues[cid].queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader)<<", PDUs="<<queues[cid].queuedCompounds());
00161 
00162     if (sizeProbeBus) {
00163         int priority = colleagues.registry->getPriorityForConnection(cid); // only for probes
00164 
00165         sizeProbeBus->put((double)queues[cid].queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader) / (double)maxSize,
00166                           boost::make_tuple("cid", cid, "MAC.QoSClass", priority)); // relative (0..100%)
00167     } else {
00168         MESSAGE_SINGLE(NORMAL, logger, "SegmentingQueue::put(cid="<<cid<<"): size="<<queues[cid].queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader)<<"): undefined sizeProbeBus="<<sizeProbeBus);
00169     }
00170 } // put
00171 
00172 // [rs]: obsolete? Better use cid-related questions. Used frequently in OLD scheduler strategies
00173 UserSet
00174 SegmentingQueue::getQueuedUsers() const {
00175     UserSet users;
00176 
00177     for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter)
00178     {
00179         if ( !( (*iter).second.empty()) )
00180         {
00181             ConnectionID cid = iter->first;
00182             UserID user = colleagues.registry->getUserForCID(cid);
00183             users.insert(user);
00184         }
00185     }
00186     return users;
00187 }
00188 
00189 ConnectionSet
00190 SegmentingQueue::getActiveConnections() const
00191 {
00192     ConnectionSet result;
00193 
00194     for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter) {
00195         ConnectionID cid = iter->first;
00196         if ( !( (*iter).second.empty() ) ) {
00197             result.insert(cid);
00198         } else {
00199             assure(iter->second.queuedNettoBits()==0,"Zero packets but "<<iter->second.queuedNettoBits()<<" bits. How can this be?");
00200         }
00201     }
00202     return result;
00203 }
00204 
00205 unsigned long int
00206 SegmentingQueue::numCompoundsForCid(ConnectionID cid) const
00207 {
00208     QueueContainer::const_iterator iter = queues.find(cid);
00209     assure(iter != queues.end(),"cannot find queue for cid="<<cid);
00210     return iter->second.queuedCompounds();
00211 }
00212 
00213 unsigned long int
00214 SegmentingQueue::numBitsForCid(ConnectionID cid) const
00215 {
00216 
00217     QueueContainer::const_iterator iter = queues.find(cid);
00218     assure(iter != queues.end(),"cannot find queue for cid="<<cid);
00219 
00225     assure(fixedOverhead.find(cid)!=fixedOverhead.end(), "Cannot find overhead entry for cid " << cid);
00226     int overhead = fixedOverhead.find(cid)->second;
00227     return iter->second.queuedBruttoBits(overhead, extensionHeaderSize, byteAlignHeader);
00228 } // numBitsForCid()
00229 
00230 // result is sorted per-cid
00231 QueueStatusContainer
00232 SegmentingQueue::getQueueStatus() const
00233 {
00234     wns::scheduler::QueueStatusContainer result;
00235 
00236     // Find all queues
00237     for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter)
00238     {
00239         ConnectionID cid = iter->first;
00240         QueueStatus queueStatus;
00241         assure(fixedOverhead.find(cid)!=fixedOverhead.end(), "Cannot find overhead entry for cid " << cid);
00242         int overhead = fixedOverhead.find(cid)->second;
00243         queueStatus.numOfBits      = iter->second.queuedBruttoBits(overhead, extensionHeaderSize, byteAlignHeader);
00244         queueStatus.numOfCompounds = iter->second.queuedCompounds();
00245         result.insert(cid,queueStatus);
00246         MESSAGE_SINGLE(NORMAL, logger, "SegmentingQueue::getQueueStatus():"
00247                        << " for cid=" << cid
00248                        << ": bits=" << queueStatus.numOfBits
00249                        << ", PDUs=" << queueStatus.numOfCompounds);
00250         // if we have bits we also must have a pdu:
00251         assure((queueStatus.numOfBits==0)
00252                || (queueStatus.numOfCompounds>0),
00253                "numOfBits="<<queueStatus.numOfBits<<" but numOfCompounds="<<queueStatus.numOfCompounds<<" for cid="<<cid);
00254     }
00255     return result;
00256 } // getQueueStatus()
00257 
00258 wns::ldk::CompoundPtr
00259 SegmentingQueue::getHeadOfLinePDU(ConnectionID cid) {
00260     assure(false, "The SegmentingQueue does not support getHeadOfLinePDU");
00261 }
00262 
00263 // return only those bits which belong to one PDU
00264 int
00265 SegmentingQueue::getHeadOfLinePDUbits(ConnectionID cid)
00266 {
00267     QueueContainer::const_iterator iter = queues.find(cid);
00268     assure(iter != queues.end(),"cannot find queue for cid="<<cid);
00269     assure(queueHasPDUs(cid), "getHeadOfLinePDUbits called for CID without PDUs or non-existent CID="<<cid);
00270 
00271     return numBitsForCid(cid);
00272 }
00273 
00274 wns::ldk::CompoundPtr
00275 SegmentingQueue::getHeadOfLinePDUSegment(ConnectionID cid, int requestedBits)
00276 {
00277     assure(queueHasPDUs(cid), "getHeadOfLinePDUSegments(cid="<<cid<<",bits="<<requestedBits<<") called for CID without PDUs or non-existent CID");
00278 
00279     assure(segmentHeaderReader != NULL, "No valid segmentHeaderReader set! You need to call setFUN() first.");
00280     assure(fixedOverhead.find(cid)!=fixedOverhead.end(), "Cannot find overhead entry for cid " << cid);
00281 
00282     wns::ldk::CompoundPtr segment;
00283 
00284     int ov = fixedOverhead[cid];
00285 
00286     if (requestedBits <= ov)
00287     {
00288         ov = requestedBits - 1;
00289     }
00290 
00291     segment = queues[cid].retrieve(requestedBits, ov, extensionHeaderSize, 
00292         usePadding, byteAlignHeader, segmentHeaderReader, delayProbeBus, probeHeaderReader);
00293 
00294     assure(segment != wns::ldk::CompoundPtr(), "Inner queue did not return a PDU");
00295 
00296     // Clear this. The next request will not include a fixed header
00297     // Will be reset in frameStarts()
00298     fixedOverhead[cid] -= ov;
00299     if (fixedOverhead[cid] < 0)
00300     {
00301         fixedOverhead[cid] = 0;
00302     }
00303 
00304     segmentHeaderReader->commitSizes(segment->getCommandPool());
00305 
00306     ISegmentationCommand* header = segmentHeaderReader->readCommand<ISegmentationCommand>(segment->getCommandPool());
00307 
00308     if (sizeProbeBus) {
00309         int priority = colleagues.registry->getPriorityForConnection(cid);
00310         sizeProbeBus->put((double)queues[cid].queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader) / (double)maxSize,
00311                           boost::make_tuple("cid", cid, "MAC.QoSClass", priority)); // relative (0..100%)
00312     }
00313 
00314     if (overheadProbeBus) {
00315         int priority = colleagues.registry->getPriorityForConnection(cid);
00316         overheadProbeBus->put( ( (double) header->headerSize())/((double) header->totalSize()),
00317                           boost::make_tuple("cid", cid, "MAC.QoSClass", priority)); // relative (0..100%)
00318     }
00319     
00320     MESSAGE_SINGLE(NORMAL, logger, "getHeadOfLinePDUSegment(cid="<<cid<<",to="<<colleagues.registry->getNameForUser(colleagues.registry->getUserForCID(cid))
00321                    <<",bits="<<requestedBits<<"): totalSize="<<header->totalSize()<<" bits, sn="<< header->getSequenceNumber() );
00322 
00323     MESSAGE_SINGLE(NORMAL, logger, "getHeadOfLinePDUSegment(cid="<<cid<<"): after: bits="<<queues[cid].queuedNettoBits()<<"/"<<queues[cid].queuedBruttoBits(fixedOverhead[cid], extensionHeaderSize, byteAlignHeader)<<", PDUs="<<queues[cid].queuedCompounds() << ", fh: " << fixedOverhead[cid] << ", eh: " << extensionHeaderSize);
00324 
00325     assure(header->totalSize()<=requestedBits,"pdulength="<<header->totalSize()<<" > bits="<<requestedBits);
00326     return segment;
00327 }
00328 
00329 std::queue<wns::ldk::CompoundPtr> 
00330 SegmentingQueue::getQueueCopy(ConnectionID cid)
00331 {
00332     assure(queues.find(cid) != queues.end(), "getQueueCopy called for non-existent CID");
00333     return queues[cid].getQueueCopy();
00334 }
00335 
00336 bool
00337 SegmentingQueue::isEmpty() const
00338 {
00339     for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter)
00340     {
00341         if ( !( (*iter).second.empty() ))
00342             return false;
00343     }
00344     return true;
00345 }
00346 
00347 bool
00348 SegmentingQueue::hasQueue(ConnectionID cid)
00349 {
00350     return queues.find(cid) != queues.end();
00351 }
00352 
00353 bool
00354 SegmentingQueue::queueHasPDUs(ConnectionID cid) const {
00355     if (queues.find(cid) == queues.end())
00356         return false;
00357     return ( !(queues.find(cid)->second.empty()) );
00358 }
00359 
00360 ConnectionSet
00361 SegmentingQueue::filterQueuedCids(ConnectionSet connections) {
00362     ConnectionSet activeConnections;
00363     for ( wns::scheduler::ConnectionSet::iterator iter = connections.begin(); iter != connections.end(); ++iter )
00364     {
00365         ConnectionID cid = *iter;
00366         if ( queueHasPDUs(cid) )
00367             activeConnections.insert(cid);
00368     }
00369     return activeConnections;
00370 }
00371 
00372 void
00373 SegmentingQueue::setColleagues(RegistryProxyInterface* _registry) {
00374     colleagues.registry = _registry;
00375     maxSize = colleagues.registry->getQueueSizeLimitPerConnection();
00376 }
00377 
00378 QueueInterface::ProbeOutput
00379 SegmentingQueue::resetAllQueues()
00380 {
00381     // Store number of bits and compounds for Probe which will be deleted
00382     ProbeOutput probeOutput;
00383     for (QueueContainer::iterator iter = queues.begin();
00384          iter != queues.end(); ++iter)
00385     {
00386         ConnectionID cid = iter->first;
00387         probeOutput.bits += iter->second.queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader);
00388         probeOutput.compounds += iter->second.queuedCompounds();
00389         if (sizeProbeBus) {
00390             int priority = colleagues.registry->getPriorityForConnection(cid);
00391             sizeProbeBus->put(0.0, boost::make_tuple("cid", cid, "MAC.QoSClass", priority)); // relative (0..100%)
00392         }
00393     }
00394 
00395     // queues is a std::map that stores the std::queues that store the
00396     // CompoundPtrs. So by doing a queues.clear(), the destructors are called
00397     // and the refCounting mechanism of the CompoundPtr takes care of actually
00398     // deleting the compounds.
00399     queues.clear();
00400     fixedOverhead.clear();
00401 
00402     return probeOutput;
00403 }
00404 
00405 // [rs]: obsolete? Better use cid-related questions
00406 QueueInterface::ProbeOutput
00407 SegmentingQueue::resetQueues(UserID _user)
00408 {
00409     // MESSAGE_SINGLE(NORMAL, logger, "SegmentingQueue::resetQueues(): obsolete"); // TODO [rs]; not supported by [aoz]
00410     // Store number of bits and compounds for Probe which will be deleted
00411     ProbeOutput probeOutput;
00412 
00413     // Find all queues that belong to this user and delete them.  This one is a
00414     // little bit tricky, see section 6.6.2 of Josutti's STL book: we have to be
00415     // careful when deleting the current iterator position
00416     for (QueueContainer::iterator iter = queues.begin(); iter != queues.end(); )
00417     {
00418         ConnectionID cid = iter->first;
00419         UserID user = colleagues.registry->getUserForCID(cid);
00420         if (user == _user)
00421         {
00422             ConnectionID cid = iter->first;
00423             probeOutput.bits += iter->second.queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader);
00424             probeOutput.compounds += iter->second.queuedCompounds();
00425             if (sizeProbeBus) {
00426                 int priority = colleagues.registry->getPriorityForConnection(cid);
00427                 sizeProbeBus->put(0.0, boost::make_tuple("cid", cid, "MAC.QoSClass", priority)); // relative (0..100%)
00428             }
00429             queues.erase(iter++);
00430         }
00431         else
00432             ++iter;
00433     }
00434     for (FixedOverheadContainer::iterator iter = fixedOverhead.begin(); iter != fixedOverhead.end(); )
00435     {
00436         ConnectionID cid = iter->first;
00437         UserID user = colleagues.registry->getUserForCID(cid);
00438         if (user == _user)
00439         {
00440             ConnectionID cid = iter->first;
00441             fixedOverhead.erase(iter++);
00442         }
00443         else
00444             ++iter;
00445     }
00446 
00447     return probeOutput;
00448 }
00449 
00450 QueueInterface::ProbeOutput
00451 SegmentingQueue::resetQueue(ConnectionID cid)
00452 {
00453     // Store number of bits and compounds for Probe which will be deleted
00454     ProbeOutput probeOutput;
00455     probeOutput.bits += queues[cid].queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader);
00456     probeOutput.compounds += queues[cid].queuedCompounds();
00457     if (sizeProbeBus) {
00458         int priority = colleagues.registry->getPriorityForConnection(cid);
00459         sizeProbeBus->put(0.0, boost::make_tuple("cid", cid, "MAC.QoSClass", priority)); // relative (0..100%)
00460     }
00461 
00462 #ifndef NDEBUG
00463     int numRemoved =
00464 #endif
00465         queues.erase(cid);
00466     fixedOverhead.erase(cid);
00467     assure(numRemoved == 1, "Non-existing or too many queues with that CID");
00468 
00469     return probeOutput;
00470 }
00471 
00472 void
00473 SegmentingQueue::frameStarts()
00474 {
00475     MESSAGE_SINGLE(NORMAL, logger, "frameStarts(): resetting fixed header flags");
00476 
00477     for (FixedOverheadContainer::iterator it=fixedOverhead.begin(); it!=fixedOverhead.end(); ++it)
00478     {
00479         it->second = fixedHeaderSize;
00480     }
00481 }
00482 
00483 std::string
00484 SegmentingQueue::printAllQueues()
00485 {
00486     std::stringstream s;
00487     for (QueueContainer::iterator iter = queues.begin();
00488          iter != queues.end(); ++iter)
00489     {
00490         ConnectionID cid = iter->first;
00491         int bits      = iter->second.queuedBruttoBits(fixedOverhead[cid],extensionHeaderSize, byteAlignHeader);
00492         int compounds = iter->second.queuedCompounds();
00493         s << cid << ":" << bits << "," << compounds << " ";
00494     }
00495     return s.str();
00496 }
00497 
00498 

Generated on Fri May 25 03:31:54 2012 for openWNS by  doxygen 1.5.5