User Manual, Developers Guide and API Documentation

SimpleQueue.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002  * This file is part of openWNS (open Wireless Network Simulator)
00003  * _____________________________________________________________________________
00004  *
00005  * Copyright (C) 2004-2009
00006  * Chair of Communication Networks (ComNets)
00007  * Kopernikusstr. 5, D-52074 Aachen, Germany
00008  * phone: ++49-241-80-27910,
00009  * fax: ++49-241-80-22242
00010  * email: info@openwns.org
00011  * www: http://www.openwns.org
00012  * _____________________________________________________________________________
00013  *
00014  * openWNS is free software; you can redistribute it and/or modify it under the
00015  * terms of the GNU Lesser General Public License version 2 as published by the
00016  * Free Software Foundation;
00017  *
00018  * openWNS is distributed in the hope that it will be useful, but WITHOUT ANY
00019  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
00020  * A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00021  * details.
00022  *
00023  * You should have received a copy of the GNU Lesser General Public License
00024  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
00025  *
00026  ******************************************************************************/
00027 
00028 #include <WNS/scheduler/queue/SimpleQueue.hpp>
00029 #include <WNS/probe/bus/utils.hpp>
00030 #include <WNS/ldk/Layer.hpp>
00031 #include <WNS/ldk/Compound.hpp>
00032 
00033 using namespace wns::scheduler;
00034 using namespace wns::scheduler::queue;
00035 
00036 
// Register SimpleQueue with the static factory under the name "SimpleQueue",
// as an implementation of QueueInterface, using wns::HasReceptorConfigCreator
// as the creator type.
STATIC_FACTORY_REGISTER_WITH_CREATOR(SimpleQueue,
                                     QueueInterface,
                                     "SimpleQueue",
                                     wns::HasReceptorConfigCreator);
00041 
00042 SimpleQueue::SimpleQueue(wns::ldk::HasReceptorInterface*, const wns::pyconfig::View& _config)
00043     : probeContextProviderForCid(NULL),
00044       probeContextProviderForPriority(NULL),
00045       maxSize(0),
00046       logger(_config.get("logger")),
00047       config(_config),
00048       myFUN()
00049 {
00050 }
00051 
00052 SimpleQueue::~SimpleQueue()
00053 {
00054     if (probeContextProviderForCid) { delete probeContextProviderForCid; }
00055     if (probeContextProviderForPriority) { delete probeContextProviderForPriority; }
00056 }
00057 
00058 void SimpleQueue::setFUN(wns::ldk::fun::FUN* fun)
00059 {
00060     // read the localIDs from the config
00061     wns::probe::bus::ContextProviderCollection localContext(&fun->getLayer()->getContextProviderCollection());
00062     for (int ii = 0; ii<config.len("localIDs.keys()"); ++ii)
00063     {
00064         std::string key = config.get<std::string>("localIDs.keys()",ii);
00065         unsigned long int value  = config.get<unsigned long int>("localIDs.values()",ii);
00066         localContext.addProvider( wns::probe::bus::contextprovider::Constant(key, value) );
00067     }
00068     probeContextProviderForCid  = new wns::probe::bus::contextprovider::Variable("cid", 0);
00069     probeContextProviderForPriority  = new wns::probe::bus::contextprovider::Variable("MAC.QoSClass", 0);
00070     localContext.addProvider(wns::probe::bus::contextprovider::Container(probeContextProviderForCid));
00071     localContext.addProvider(wns::probe::bus::contextprovider::Container(probeContextProviderForPriority));
00072 
00073     std::string sizeProbeName = config.get<std::string>("sizeProbeName");
00074     sizeProbeBus = wns::probe::bus::ContextCollectorPtr(new wns::probe::bus::ContextCollector(localContext, sizeProbeName));
00075     myFUN = fun;
00076 }
00077 
00078 bool SimpleQueue::isAccepting(const wns::ldk::CompoundPtr&  compound ) const {
00079     int size = compound->getLengthInBits();
00080 
00081     ConnectionID cid = colleagues.registry->getCIDforPDU(compound);
00082 
00083     // if this is a brand new connection, return true because couldn't have
00084     // exceeded limit
00085     if (queues.find(cid) == queues.end())
00086     {
00087         MESSAGE_BEGIN(VERBOSE, logger, m, "");
00088         m << "Accepting PDU of size " <<  size <<  " into queue that would be newly created for CID " << cid
00089           << " - " << colleagues.registry->getNameForUser(colleagues.registry->getUserForCID(cid)) <<"\n";
00090         MESSAGE_END();
00091         return true;
00092     }
00093 
00094     if (size + queues.find(cid)->second.bits > maxSize)
00095     {
00096         MESSAGE_BEGIN(VERBOSE, logger, m, "");
00097         m << "Not accepting PDU of size "
00098           << size << " because queue size " << queues.find(cid)->second.bits << " for CID " << cid
00099           << " - " << colleagues.registry->getNameForUser(colleagues.registry->getUserForCID(cid)) <<"\n";
00100         MESSAGE_END();
00101         return  false;
00102     }
00103 
00104     MESSAGE_BEGIN(VERBOSE, logger, m, "");
00105     m << "Accepting PDU of size " <<  size <<  " because queue size is only " << queues.find(cid)->second.bits << " for CID " << cid
00106       << " - " << colleagues.registry->getNameForUser(colleagues.registry->getUserForCID(cid)) <<"\n";
00107     MESSAGE_END();
00108     return true;
00109 }
00110 
00111 void
00112 SimpleQueue::put(const wns::ldk::CompoundPtr& compound) {
00113     assure(compound, "No valid PDU");
00114     assure(compound != wns::ldk::CompoundPtr(), "No valid PDU" );
00115     assure(isAccepting(compound),"sendData() has been called without isAccepting()");
00116     assure(colleagues.registry, "Need a registry as colleague, please set first");
00117 
00118     ConnectionID cid = colleagues.registry->getCIDforPDU(compound);
00119     int     priority = colleagues.registry->getPriorityForConnection(cid); // only for probes
00120 
00121     // saves pdu and automatically create new queue if necessary
00122     // needs a 'map' to do so.
00123     (queues[cid].pduQueue).push(compound);
00124     queues[cid].bits += compound->getLengthInBits();
00125 
00126     if (probeContextProviderForCid && probeContextProviderForPriority && sizeProbeBus) {
00127         probeContextProviderForCid->set(cid /*int context*/);
00128         probeContextProviderForPriority->set(priority);
00129         sizeProbeBus->put((double)queues[cid].bits / (double)maxSize); // relative (0..100%)
00130     } else {
00131         MESSAGE_SINGLE(NORMAL, logger, "SimpleQueue::put(cid="<<cid<<"): size="<<queues[cid].bits<<"): undefined sizeProbeBus="<<sizeProbeBus);
00132     }
00133 
00134 }
00135 
00136 // [rs]: obsolete? Better use cid-related questions
00137 UserSet
00138 SimpleQueue::getQueuedUsers() const {
00139     UserSet users;
00140     for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter)
00141     {
00142         if ((*iter).second.pduQueue.size() != 0)
00143         {
00144             ConnectionID cid = iter->first;
00145             UserID user = colleagues.registry->getUserForCID(cid);
00146             users.insert(user);
00147         }
00148     }
00149     return users;
00150 }
00151 
00152 ConnectionSet
00153 SimpleQueue::getActiveConnections() const
00154 {
00155     ConnectionSet result;
00156 
00157     for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter)
00158         if ((*iter).second.pduQueue.size() != 0)
00159             result.insert((*iter).first);
00160 
00161     return result;
00162 }
00163 
00164 unsigned long int
00165 SimpleQueue::numCompoundsForCid(ConnectionID cid) const
00166 {
00167     QueueContainer::const_iterator iter = queues.find(cid);
00168     assure(iter != queues.end(),"cannot find queue for cid="<<cid);
00169     return iter->second.pduQueue.size();
00170 }
00171 
00172 unsigned long int
00173 SimpleQueue::numBitsForCid(ConnectionID cid) const
00174 {
00175     QueueContainer::const_iterator iter = queues.find(cid);
00176     assure(iter != queues.end(),"cannot find queue for cid="<<cid);
00177     return iter->second.bits;
00178 }
00179 
00180 // result is sorted per-cid
00181 QueueStatusContainer
00182 SimpleQueue::getQueueStatus() const
00183 {
00184     wns::scheduler::QueueStatusContainer result;
00185 
00186     // Find all queues that belong to this user (obsolete)
00187     // Find all queues
00188     for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter)
00189     {
00190         ConnectionID cid = iter->first;
00191         QueueStatus queueStatus;
00192         queueStatus.numOfBits      = iter->second.bits;
00193         queueStatus.numOfCompounds = iter->second.pduQueue.size();
00194         result.insert(cid,queueStatus);
00195         MESSAGE_SINGLE(NORMAL, logger, "SimpleQueue::getQueueStatus():"
00196                        << " for cid=" << cid
00197                        << ": bits=" << iter->second.bits
00198                        << ", PDUs=" << iter->second.pduQueue.size());
00199     }
00200     return result;
00201 }
00202 
00203 wns::ldk::CompoundPtr
00204 SimpleQueue::getHeadOfLinePDU(ConnectionID cid) {
00205     assure(queueHasPDUs(cid), "getHeadOfLinePDU called for CID without PDUs or non-existent CID");
00206 
00207     wns::ldk::CompoundPtr pdu = queues[cid].pduQueue.front();
00208     queues[cid].pduQueue.pop();
00209     queues[cid].bits -= pdu->getLengthInBits();
00210 
00211     if (probeContextProviderForCid && probeContextProviderForPriority && sizeProbeBus) {
00212         probeContextProviderForCid->set(cid /*int context*/);
00213         int priority = colleagues.registry->getPriorityForConnection(cid);
00214         probeContextProviderForPriority->set(priority);
00215         sizeProbeBus->put((double)queues[cid].bits / (double)maxSize); // relative (0..100%)
00216     }
00217 
00218     return pdu;
00219 }
00220 
00221 int
00222 SimpleQueue::getHeadOfLinePDUbits(ConnectionID cid)
00223 {
00224     assure(queueHasPDUs(cid), "getHeadOfLinePDUbits called for CID without PDUs or non-existent CID");
00225     return queues[cid].pduQueue.front()->getLengthInBits();
00226 }
00227 
00228 std::queue<wns::ldk::CompoundPtr> 
00229 SimpleQueue::getQueueCopy(ConnectionID cid)
00230 {
00231     assure(queues.find(cid) != queues.end(), "getQueueCopy called for non-existent CID");
00232     return queues[cid].pduQueue;
00233 }
00234 
00235 bool
00236 SimpleQueue::isEmpty() const
00237 {
00238     for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter)
00239     {
00240         if ((*iter).second.pduQueue.size() != 0)
00241             return false;
00242     }
00243     return true;
00244 }
00245 bool
00246 SimpleQueue::hasQueue(ConnectionID cid)
00247 {
00248     return queues.find(cid) != queues.end();
00249 }
00250 
00251 bool
00252 SimpleQueue::queueHasPDUs(ConnectionID cid) const {
00253     if (queues.find(cid) == queues.end())
00254         return false;
00255     return (queues.find(cid)->second.pduQueue.size() != 0);
00256 }
00257 
00258 ConnectionSet
00259 SimpleQueue::filterQueuedCids(ConnectionSet connections) {
00260     ConnectionSet activeConnections;
00261     for ( wns::scheduler::ConnectionSet::iterator iter = connections.begin(); iter != connections.end(); ++iter )
00262     {
00263         ConnectionID cid = *iter;
00264         if ( queueHasPDUs(cid) )
00265             activeConnections.insert(cid);
00266     }
00267     return activeConnections;
00268 }
00269 
00270 void
00271 SimpleQueue::setColleagues(RegistryProxyInterface* _registry) {
00272     colleagues.registry = _registry;
00273     maxSize = colleagues.registry->getQueueSizeLimitPerConnection();
00274 }
00275 
00276 QueueInterface::ProbeOutput
00277 SimpleQueue::resetAllQueues()
00278 {
00279     // Store number of bits and compounds for Probe which will be deleted
00280     ProbeOutput probeOutput;
00281     for (QueueContainer::iterator iter = queues.begin();
00282          iter != queues.end(); ++iter)
00283     {
00284         ConnectionID cid = iter->first;
00285         probeOutput.bits += iter->second.bits;
00286         probeOutput.compounds += iter->second.pduQueue.size();
00287         if (probeContextProviderForCid && probeContextProviderForPriority && sizeProbeBus) {
00288             probeContextProviderForCid->set(cid);
00289             int priority = colleagues.registry->getPriorityForConnection(cid);
00290             probeContextProviderForCid->set(priority);
00291             sizeProbeBus->put(0.0 /*double wert*/);
00292         }
00293     }
00294 
00295     // queues is a std::map that stores the std::queues that store the
00296     // CompoundPtrs. So by doing a queues.clear(), the destructors are called
00297     // and the refCounting mechanism of the CompoundPtr takes care of actually
00298     // deleting the compounds.
00299     queues.clear();
00300 
00301     return probeOutput;
00302 }
00303 
00304 // [rs]: obsolete? Better use cid-related questions
00305 QueueInterface::ProbeOutput
00306 SimpleQueue::resetQueues(UserID _user)
00307 {
00308     // MESSAGE_SINGLE(NORMAL, logger, "SimpleQueue::resetQueues(): obsolete"); // TODO [rs]; not supported by [aoz]
00309     // Store number of bits and compounds for Probe which will be deleted
00310     ProbeOutput probeOutput;
00311 
00312     // Find all queues that belong to this user and delete them.  This one is a
00313     // little bit tricky, see section 6.6.2 of Josutti's STL book: we have to be
00314     // careful when deleting the current iterator position
00315     for (QueueContainer::iterator iter = queues.begin(); iter != queues.end(); )
00316     {
00317         ConnectionID cid = iter->first;
00318         UserID user = colleagues.registry->getUserForCID(cid);
00319         if (user == _user)
00320         {
00321             ConnectionID cid = iter->first;
00322             probeOutput.bits += iter->second.bits;
00323             probeOutput.compounds += iter->second.pduQueue.size();
00324             if (probeContextProviderForCid && probeContextProviderForPriority && sizeProbeBus) {
00325                 probeContextProviderForCid->set(cid);
00326                 int priority = colleagues.registry->getPriorityForConnection(cid);
00327                 probeContextProviderForCid->set(priority);
00328                 sizeProbeBus->put(0.0 /*double wert*/);
00329             }
00330             queues.erase(iter++);
00331         }
00332         else
00333             ++iter;
00334     }
00335     return probeOutput;
00336 }
00337 
00338 QueueInterface::ProbeOutput
00339 SimpleQueue::resetQueue(ConnectionID cid)
00340 {
00341     // Store number of bits and compounds for Probe which will be deleted
00342     ProbeOutput probeOutput;
00343     probeOutput.bits += queues[cid].bits;
00344     probeOutput.compounds += queues[cid].pduQueue.size();
00345     if (probeContextProviderForCid && probeContextProviderForPriority && sizeProbeBus) {
00346         probeContextProviderForCid->set(cid /*int context*/);
00347         int priority = colleagues.registry->getPriorityForConnection(cid);
00348         probeContextProviderForCid->set(priority);
00349         sizeProbeBus->put(0.0 /*double wert*/);
00350     }
00351 
00352 #ifndef NDEBUG
00353     int numRemoved =
00354 #endif
00355         queues.erase(cid);
00356     assure(numRemoved == 1, "Non-existing or too many queues with that CID");
00357 
00358     return probeOutput;
00359 }
00360 
00361 std::string
00362 SimpleQueue::printAllQueues()
00363 {
00364     std::stringstream s;
00365     for (QueueContainer::iterator iter = queues.begin();
00366          iter != queues.end(); ++iter)
00367     {
00368         ConnectionID cid = iter->first;
00369         int bits      = iter->second.bits;
00370         int compounds = iter->second.pduQueue.size();
00371         s << cid << ":" << bits << "," << compounds << " ";
00372     }
00373     return s.str();
00374 }
00375 
00376 

Generated on Fri May 25 03:31:54 2012 for openWNS by  doxygen 1.5.5