![]() |
User Manual, Developers Guide and API Documentation |
![]() |
00001 /******************************************************************************* 00002 * This file is part of openWNS (open Wireless Network Simulator) 00003 * _____________________________________________________________________________ 00004 * 00005 * Copyright (C) 2004-2009 00006 * Chair of Communication Networks (ComNets) 00007 * Kopernikusstr. 5, D-52074 Aachen, Germany 00008 * phone: ++49-241-80-27910, 00009 * fax: ++49-241-80-22242 00010 * email: info@openwns.org 00011 * www: http://www.openwns.org 00012 * _____________________________________________________________________________ 00013 * 00014 * openWNS is free software; you can redistribute it and/or modify it under the 00015 * terms of the GNU Lesser General Public License version 2 as published by the 00016 * Free Software Foundation; 00017 * 00018 * openWNS is distributed in the hope that it will be useful, but WITHOUT ANY 00019 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 00020 * A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 00021 * details. 00022 * 00023 * You should have received a copy of the GNU Lesser General Public License 00024 * along with this program. If not, see <http://www.gnu.org/licenses/>. 00025 * 00026 ******************************************************************************/ 00027 00028 #include <WNS/scheduler/queue/SimpleQueue.hpp> 00029 #include <WNS/probe/bus/utils.hpp> 00030 #include <WNS/ldk/Layer.hpp> 00031 #include <WNS/ldk/Compound.hpp> 00032 00033 using namespace wns::scheduler; 00034 using namespace wns::scheduler::queue; 00035 00036 00037 STATIC_FACTORY_REGISTER_WITH_CREATOR(SimpleQueue, 00038 QueueInterface, 00039 "SimpleQueue", 00040 wns::HasReceptorConfigCreator); 00041 00042 SimpleQueue::SimpleQueue(wns::ldk::HasReceptorInterface*, const wns::pyconfig::View& _config) 00043 : probeContextProviderForCid(NULL), 00044 probeContextProviderForPriority(NULL), 00045 maxSize(0), 00046 logger(_config.get("logger")), 00047 config(_config), 00048 myFUN() 00049 { 00050 } 00051 00052 SimpleQueue::~SimpleQueue() 00053 { 00054 if (probeContextProviderForCid) { delete probeContextProviderForCid; } 00055 if (probeContextProviderForPriority) { delete probeContextProviderForPriority; } 00056 } 00057 00058 void SimpleQueue::setFUN(wns::ldk::fun::FUN* fun) 00059 { 00060 // read the localIDs from the config 00061 wns::probe::bus::ContextProviderCollection localContext(&fun->getLayer()->getContextProviderCollection()); 00062 for (int ii = 0; ii<config.len("localIDs.keys()"); ++ii) 00063 { 00064 std::string key = config.get<std::string>("localIDs.keys()",ii); 00065 unsigned long int value = config.get<unsigned long int>("localIDs.values()",ii); 00066 localContext.addProvider( wns::probe::bus::contextprovider::Constant(key, value) ); 00067 } 00068 probeContextProviderForCid = new wns::probe::bus::contextprovider::Variable("cid", 0); 00069 probeContextProviderForPriority = new wns::probe::bus::contextprovider::Variable("MAC.QoSClass", 0); 00070 localContext.addProvider(wns::probe::bus::contextprovider::Container(probeContextProviderForCid)); 00071 localContext.addProvider(wns::probe::bus::contextprovider::Container(probeContextProviderForPriority)); 00072 00073 std::string sizeProbeName = config.get<std::string>("sizeProbeName"); 00074 sizeProbeBus = wns::probe::bus::ContextCollectorPtr(new wns::probe::bus::ContextCollector(localContext, sizeProbeName)); 00075 myFUN = fun; 00076 } 00077 00078 bool SimpleQueue::isAccepting(const wns::ldk::CompoundPtr& compound ) const { 00079 int size = compound->getLengthInBits(); 00080 00081 ConnectionID cid = colleagues.registry->getCIDforPDU(compound); 00082 00083 // if this is a brand new connection, return true because couldn't have 00084 // exceeded limit 00085 if (queues.find(cid) == queues.end()) 00086 { 00087 MESSAGE_BEGIN(VERBOSE, logger, m, ""); 00088 m << "Accepting PDU of size " << size << " into queue that would be newly created for CID " << cid 00089 << " - " << colleagues.registry->getNameForUser(colleagues.registry->getUserForCID(cid)) <<"\n"; 00090 MESSAGE_END(); 00091 return true; 00092 } 00093 00094 if (size + queues.find(cid)->second.bits > maxSize) 00095 { 00096 MESSAGE_BEGIN(VERBOSE, logger, m, ""); 00097 m << "Not accepting PDU of size " 00098 << size << " because queue size " << queues.find(cid)->second.bits << " for CID " << cid 00099 << " - " << colleagues.registry->getNameForUser(colleagues.registry->getUserForCID(cid)) <<"\n"; 00100 MESSAGE_END(); 00101 return false; 00102 } 00103 00104 MESSAGE_BEGIN(VERBOSE, logger, m, ""); 00105 m << "Accepting PDU of size " << size << " because queue size is only " << queues.find(cid)->second.bits << " for CID " << cid 00106 << " - " << colleagues.registry->getNameForUser(colleagues.registry->getUserForCID(cid)) <<"\n"; 00107 MESSAGE_END(); 00108 return true; 00109 } 00110 00111 void 00112 SimpleQueue::put(const wns::ldk::CompoundPtr& compound) { 00113 assure(compound, "No valid PDU"); 00114 assure(compound != wns::ldk::CompoundPtr(), "No valid PDU" ); 00115 assure(isAccepting(compound),"sendData() has been called without isAccepting()"); 00116 assure(colleagues.registry, "Need a registry as colleague, please set first"); 00117 00118 ConnectionID cid = colleagues.registry->getCIDforPDU(compound); 00119 int priority = colleagues.registry->getPriorityForConnection(cid); // only for probes 00120 00121 // saves pdu and automatically create new queue if necessary 00122 // needs a 'map' to do so. 00123 (queues[cid].pduQueue).push(compound); 00124 queues[cid].bits += compound->getLengthInBits(); 00125 00126 if (probeContextProviderForCid && probeContextProviderForPriority && sizeProbeBus) { 00127 probeContextProviderForCid->set(cid /*int context*/); 00128 probeContextProviderForPriority->set(priority); 00129 sizeProbeBus->put((double)queues[cid].bits / (double)maxSize); // relative (0..100%) 00130 } else { 00131 MESSAGE_SINGLE(NORMAL, logger, "SimpleQueue::put(cid="<<cid<<"): size="<<queues[cid].bits<<"): undefined sizeProbeBus="<<sizeProbeBus); 00132 } 00133 00134 } 00135 00136 // [rs]: obsolete? Better use cid-related questions 00137 UserSet 00138 SimpleQueue::getQueuedUsers() const { 00139 UserSet users; 00140 for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter) 00141 { 00142 if ((*iter).second.pduQueue.size() != 0) 00143 { 00144 ConnectionID cid = iter->first; 00145 UserID user = colleagues.registry->getUserForCID(cid); 00146 users.insert(user); 00147 } 00148 } 00149 return users; 00150 } 00151 00152 ConnectionSet 00153 SimpleQueue::getActiveConnections() const 00154 { 00155 ConnectionSet result; 00156 00157 for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter) 00158 if ((*iter).second.pduQueue.size() != 0) 00159 result.insert((*iter).first); 00160 00161 return result; 00162 } 00163 00164 unsigned long int 00165 SimpleQueue::numCompoundsForCid(ConnectionID cid) const 00166 { 00167 QueueContainer::const_iterator iter = queues.find(cid); 00168 assure(iter != queues.end(),"cannot find queue for cid="<<cid); 00169 return iter->second.pduQueue.size(); 00170 } 00171 00172 unsigned long int 00173 SimpleQueue::numBitsForCid(ConnectionID cid) const 00174 { 00175 QueueContainer::const_iterator iter = queues.find(cid); 00176 assure(iter != queues.end(),"cannot find queue for cid="<<cid); 00177 return iter->second.bits; 00178 } 00179 00180 // result is sorted per-cid 00181 QueueStatusContainer 00182 SimpleQueue::getQueueStatus() const 00183 { 00184 wns::scheduler::QueueStatusContainer result; 00185 00186 // Find all queues that belong to this user (obsolete) 00187 // Find all queues 00188 for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter) 00189 { 00190 ConnectionID cid = iter->first; 00191 QueueStatus queueStatus; 00192 queueStatus.numOfBits = iter->second.bits; 00193 queueStatus.numOfCompounds = iter->second.pduQueue.size(); 00194 result.insert(cid,queueStatus); 00195 MESSAGE_SINGLE(NORMAL, logger, "SimpleQueue::getQueueStatus():" 00196 << " for cid=" << cid 00197 << ": bits=" << iter->second.bits 00198 << ", PDUs=" << iter->second.pduQueue.size()); 00199 } 00200 return result; 00201 } 00202 00203 wns::ldk::CompoundPtr 00204 SimpleQueue::getHeadOfLinePDU(ConnectionID cid) { 00205 assure(queueHasPDUs(cid), "getHeadOfLinePDU called for CID without PDUs or non-existent CID"); 00206 00207 wns::ldk::CompoundPtr pdu = queues[cid].pduQueue.front(); 00208 queues[cid].pduQueue.pop(); 00209 queues[cid].bits -= pdu->getLengthInBits(); 00210 00211 if (probeContextProviderForCid && probeContextProviderForPriority && sizeProbeBus) { 00212 probeContextProviderForCid->set(cid /*int context*/); 00213 int priority = colleagues.registry->getPriorityForConnection(cid); 00214 probeContextProviderForPriority->set(priority); 00215 sizeProbeBus->put((double)queues[cid].bits / (double)maxSize); // relative (0..100%) 00216 } 00217 00218 return pdu; 00219 } 00220 00221 int 00222 SimpleQueue::getHeadOfLinePDUbits(ConnectionID cid) 00223 { 00224 assure(queueHasPDUs(cid), "getHeadOfLinePDUbits called for CID without PDUs or non-existent CID"); 00225 return queues[cid].pduQueue.front()->getLengthInBits(); 00226 } 00227 00228 std::queue<wns::ldk::CompoundPtr> 00229 SimpleQueue::getQueueCopy(ConnectionID cid) 00230 { 00231 assure(queues.find(cid) != queues.end(), "getQueueCopy called for non-existent CID"); 00232 return queues[cid].pduQueue; 00233 } 00234 00235 bool 00236 SimpleQueue::isEmpty() const 00237 { 00238 for (QueueContainer::const_iterator iter = queues.begin(); iter != queues.end(); ++iter) 00239 { 00240 if ((*iter).second.pduQueue.size() != 0) 00241 return false; 00242 } 00243 return true; 00244 } 00245 bool 00246 SimpleQueue::hasQueue(ConnectionID cid) 00247 { 00248 return queues.find(cid) != queues.end(); 00249 } 00250 00251 bool 00252 SimpleQueue::queueHasPDUs(ConnectionID cid) const { 00253 if (queues.find(cid) == queues.end()) 00254 return false; 00255 return (queues.find(cid)->second.pduQueue.size() != 0); 00256 } 00257 00258 ConnectionSet 00259 SimpleQueue::filterQueuedCids(ConnectionSet connections) { 00260 ConnectionSet activeConnections; 00261 for ( wns::scheduler::ConnectionSet::iterator iter = connections.begin(); iter != connections.end(); ++iter ) 00262 { 00263 ConnectionID cid = *iter; 00264 if ( queueHasPDUs(cid) ) 00265 activeConnections.insert(cid); 00266 } 00267 return activeConnections; 00268 } 00269 00270 void 00271 SimpleQueue::setColleagues(RegistryProxyInterface* _registry) { 00272 colleagues.registry = _registry; 00273 maxSize = colleagues.registry->getQueueSizeLimitPerConnection(); 00274 } 00275 00276 QueueInterface::ProbeOutput 00277 SimpleQueue::resetAllQueues() 00278 { 00279 // Store number of bits and compounds for Probe which will be deleted 00280 ProbeOutput probeOutput; 00281 for (QueueContainer::iterator iter = queues.begin(); 00282 iter != queues.end(); ++iter) 00283 { 00284 ConnectionID cid = iter->first; 00285 probeOutput.bits += iter->second.bits; 00286 probeOutput.compounds += iter->second.pduQueue.size(); 00287 if (probeContextProviderForCid && probeContextProviderForPriority && sizeProbeBus) { 00288 probeContextProviderForCid->set(cid); 00289 int priority = colleagues.registry->getPriorityForConnection(cid); 00290 probeContextProviderForCid->set(priority); 00291 sizeProbeBus->put(0.0 /*double wert*/); 00292 } 00293 } 00294 00295 // queues is a std::map that stores the std::queues that store the 00296 // CompoundPtrs. So by doing a queues.clear(), the destructors are called 00297 // and the refCounting mechanism of the CompoundPtr takes care of actually 00298 // deleting the compounds. 00299 queues.clear(); 00300 00301 return probeOutput; 00302 } 00303 00304 // [rs]: obsolete? Better use cid-related questions 00305 QueueInterface::ProbeOutput 00306 SimpleQueue::resetQueues(UserID _user) 00307 { 00308 // MESSAGE_SINGLE(NORMAL, logger, "SimpleQueue::resetQueues(): obsolete"); // TODO [rs]; not supported by [aoz] 00309 // Store number of bits and compounds for Probe which will be deleted 00310 ProbeOutput probeOutput; 00311 00312 // Find all queues that belong to this user and delete them. This one is a 00313 // little bit tricky, see section 6.6.2 of Josutti's STL book: we have to be 00314 // careful when deleting the current iterator position 00315 for (QueueContainer::iterator iter = queues.begin(); iter != queues.end(); ) 00316 { 00317 ConnectionID cid = iter->first; 00318 UserID user = colleagues.registry->getUserForCID(cid); 00319 if (user == _user) 00320 { 00321 ConnectionID cid = iter->first; 00322 probeOutput.bits += iter->second.bits; 00323 probeOutput.compounds += iter->second.pduQueue.size(); 00324 if (probeContextProviderForCid && probeContextProviderForPriority && sizeProbeBus) { 00325 probeContextProviderForCid->set(cid); 00326 int priority = colleagues.registry->getPriorityForConnection(cid); 00327 probeContextProviderForCid->set(priority); 00328 sizeProbeBus->put(0.0 /*double wert*/); 00329 } 00330 queues.erase(iter++); 00331 } 00332 else 00333 ++iter; 00334 } 00335 return probeOutput; 00336 } 00337 00338 QueueInterface::ProbeOutput 00339 SimpleQueue::resetQueue(ConnectionID cid) 00340 { 00341 // Store number of bits and compounds for Probe which will be deleted 00342 ProbeOutput probeOutput; 00343 probeOutput.bits += queues[cid].bits; 00344 probeOutput.compounds += queues[cid].pduQueue.size(); 00345 if (probeContextProviderForCid && probeContextProviderForPriority && sizeProbeBus) { 00346 probeContextProviderForCid->set(cid /*int context*/); 00347 int priority = colleagues.registry->getPriorityForConnection(cid); 00348 probeContextProviderForCid->set(priority); 00349 sizeProbeBus->put(0.0 /*double wert*/); 00350 } 00351 00352 #ifndef NDEBUG 00353 int numRemoved = 00354 #endif 00355 queues.erase(cid); 00356 assure(numRemoved == 1, "Non-existing or too many queues with that CID"); 00357 00358 return probeOutput; 00359 } 00360 00361 std::string 00362 SimpleQueue::printAllQueues() 00363 { 00364 std::stringstream s; 00365 for (QueueContainer::iterator iter = queues.begin(); 00366 iter != queues.end(); ++iter) 00367 { 00368 ConnectionID cid = iter->first; 00369 int bits = iter->second.bits; 00370 int compounds = iter->second.pduQueue.size(); 00371 s << cid << ":" << bits << "," << compounds << " "; 00372 } 00373 return s.str(); 00374 } 00375 00376
1.5.5