![]() |
User Manual, Developers Guide and API Documentation |
![]() |
00001 /******************************************************************************* 00002 * This file is part of openWNS (open Wireless Network Simulator) 00003 * _____________________________________________________________________________ 00004 * 00005 * Copyright (C) 2004-2007 00006 * Chair of Communication Networks (ComNets) 00007 * Kopernikusstr. 5, D-52074 Aachen, Germany 00008 * phone: ++49-241-80-27910, 00009 * fax: ++49-241-80-22242 00010 * email: info@openwns.org 00011 * www: http://www.openwns.org 00012 * _____________________________________________________________________________ 00013 * 00014 * openWNS is free software; you can redistribute it and/or modify it under the 00015 * terms of the GNU Lesser General Public License version 2 as published by the 00016 * Free Software Foundation; 00017 * 00018 * openWNS is distributed in the hope that it will be useful, but WITHOUT ANY 00019 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 00020 * A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 00021 * details. 00022 * 00023 * You should have received a copy of the GNU Lesser General Public License 00024 * along with this program. If not, see <http://www.gnu.org/licenses/>. 00025 * 00026 ******************************************************************************/ 00027 00028 #include <LTE/helper/QueueProxy.hpp> 00029 #include <LTE/timing/RegistryProxy.hpp> 00030 #include <WNS/probe/bus/utils.hpp> 00031 00032 using namespace lte::helper; 00033 using namespace wns::scheduler; 00034 using namespace wns::scheduler::queue; 00035 00036 STATIC_FACTORY_REGISTER_WITH_CREATOR(QueueProxy, 00037 wns::scheduler::queue::QueueInterface, 00038 "lte.QueueProxy", 00039 wns::HasReceptorConfigCreator); 00040 00041 QueueProxy::QueueProxy(wns::ldk::HasReceptorInterface* hasReceptor, const wns::pyconfig::View& _config) 00042 : QueueInterface(), 00043 queue(NULL), //queue(config), 00044 rrhandler(NULL), 00045 rrStorage(NULL), 00046 queueIsExternal(true), 00047 logger(_config.get("logger")), 00048 config(_config), 00049 probeContextProviderForCid(NULL), 00050 probeContextProviderForPriority(NULL) 00051 { 00052 // make a real queue if it is required so (internal, not external): 00053 if (!queueIsExternal) queue = new wns::scheduler::queue::SimpleQueue(hasReceptor, config); 00054 MESSAGE_SINGLE(NORMAL, logger, "QueueProxy::QueueProxy()"); 00055 } 00056 00057 QueueProxy::~QueueProxy() 00058 { 00059 MESSAGE_SINGLE(VERBOSE, logger, "QueueProxy::~QueueProxy()"); 00060 if (queue!=NULL && !queueIsExternal) delete queue; 00061 if (probeContextProviderForCid) { delete probeContextProviderForCid; } 00062 if (probeContextProviderForPriority) { delete probeContextProviderForPriority; } 00063 } 00064 00065 void 00066 QueueProxy::setColleagues(RegistryProxyInterface* _registry) { 00067 colleagues.registry = _registry; 00068 assure(colleagues.registry != NULL,"colleagues.registry==NULL"); 00069 if (queue!=NULL) { 00070 // Forward call to internal queue 00071 queue->setColleagues(_registry); 00072 } 00073 } 00074 00075 void 00076 QueueProxy::setFUN(wns::ldk::fun::FUN* fun) 00077 { 00078 MESSAGE_SINGLE(NORMAL, logger, "QueueProxy::setFUN()"); 00079 assure(fun!=NULL, "fun==NULL"); 00080 assure(queue!=NULL || rrhandler!=NULL,"need at least queue or rrhandler"); 00081 if (queue!=NULL) queue->setFUN(fun); 00082 if (rrhandler!=NULL) { // only needed in this case; queue has its own probes 00083 // Initialization of probes... 00084 wns::probe::bus::ContextProviderCollection localContext(&fun->getLayer()->getContextProviderCollection()); 00085 for (int ii = 0; ii<config.len("localIDs.keys()"); ++ii) 00086 { 00087 std::string key = config.get<std::string>("localIDs.keys()",ii); 00088 uint32_t value = config.get<uint32_t>("localIDs.values()",ii); 00089 localContext.addProvider( wns::probe::bus::contextprovider::Constant(key, value) ); 00090 } 00091 probeContextProviderForCid = new wns::probe::bus::contextprovider::Variable("cid", 0); 00092 probeContextProviderForPriority = new wns::probe::bus::contextprovider::Variable("MAC.QoSClass", 0); 00093 localContext.addProvider(wns::probe::bus::contextprovider::Container(probeContextProviderForCid)); 00094 localContext.addProvider(wns::probe::bus::contextprovider::Container(probeContextProviderForPriority)); 00095 std::string sizeProbeName = config.get<std::string>("sizeProbeName"); 00096 sizeProbeBus = wns::probe::bus::ContextCollectorPtr(new wns::probe::bus::ContextCollector(localContext, sizeProbeName)); 00097 } 00098 } 00099 00100 void 00101 QueueProxy::setQueue(wns::scheduler::queue::QueueInterface* _queue) 00102 { 00103 assure(rrhandler==NULL,"you can either set Queue or RRHandler. Not both!"); 00104 if (queue!=NULL) delete queue; // old object 00105 queueIsExternal = true; // don't delete in destructor later 00106 queue=_queue; 00107 MESSAGE_SINGLE(NORMAL, logger, "configured as proxy to queue"); 00108 } 00109 00110 void 00111 QueueProxy::setRRHandler(lte::controlplane::RRHandler* _rrhandler) 00112 { 00113 assure(queue==NULL,"you can either set Queue or RRHandler. Not both!"); 00114 rrhandler = dynamic_cast<lte::controlplane::RRHandlerBS*>(_rrhandler); 00115 assure(rrhandler!=NULL,"rrhandler is invalid"); 00116 MESSAGE_SINGLE(NORMAL, logger, "configured as proxy to RRHandler"); 00117 rrStorage = rrhandler->getRRStorage(); 00118 assure(rrStorage!=NULL,"rrStorage is invalid"); 00119 } 00120 00121 bool 00122 QueueProxy::isAccepting(const wns::ldk::CompoundPtr& compound ) const 00123 { 00124 assure(rrhandler==NULL,"isAccepting() cannot be used in RRHandler proxy mode"); 00125 assure(queue!=NULL,"queue==NULL"); 00126 return queue->isAccepting(compound); 00127 } 00128 00129 void 00130 QueueProxy::put(const wns::ldk::CompoundPtr& compound ) { 00131 assure(rrhandler==NULL,"put() cannot be used in RRHandler proxy mode. This compound's journey:\n"<<compound->dumpJourney()); 00132 assure(queue!=NULL,"queue==NULL"); 00133 queue->put(compound); 00134 // no probing required in this case 00135 } 00136 00137 wns::scheduler::UserSet 00138 QueueProxy::getQueuedUsers() const 00139 { 00140 assure(queue!=NULL || rrhandler!=NULL,"queue==NULL && rrhandler==NULL"); 00141 if (queue!=NULL) { 00142 return queue->getQueuedUsers(); 00143 } else { 00144 assure(rrStorage!=NULL,"rrStorage is invalid"); 00145 return rrStorage->getActiveUsers(); 00146 } 00147 } 00148 00149 wns::scheduler::ConnectionSet 00150 QueueProxy::getActiveConnections() const 00151 { 00152 assure(queue!=NULL || rrhandler!=NULL,"queue==NULL && rrhandler==NULL"); 00153 if (queue!=NULL) { 00154 return queue->getActiveConnections(); 00155 } else { 00156 assure(rrStorage!=NULL,"rrStorage is invalid"); 00157 return rrStorage->getActiveConnections(); 00158 } 00159 } 00160 00161 void 00162 QueueProxy::writeProbe(wns::scheduler::ConnectionID cid, unsigned int priority) const 00163 { 00164 if (probeContextProviderForCid && probeContextProviderForPriority && sizeProbeBus) { 00165 probeContextProviderForCid->set(cid /*int context*/); 00166 probeContextProviderForPriority->set(priority); 00167 double bits = numBitsForCid(cid); 00168 sizeProbeBus->put(bits); // absolute 00169 } 00170 } 00171 00172 unsigned long int 00173 QueueProxy::numCompoundsForCid(wns::scheduler::ConnectionID cid) const 00174 { 00175 assure(queue!=NULL || rrhandler!=NULL,"queue==NULL && rrhandler==NULL"); 00176 if (queue!=NULL) { 00177 return queue->numCompoundsForCid(cid); 00178 } else { 00179 assure(rrStorage!=NULL,"rrStorage is invalid"); 00180 return rrStorage->numCompoundsForCid(cid); 00181 } 00182 } 00183 00184 unsigned long int 00185 QueueProxy::numBitsForCid(wns::scheduler::ConnectionID cid) const 00186 { 00187 assure(queue!=NULL || rrhandler!=NULL,"queue==NULL && rrhandler==NULL"); 00188 if (queue!=NULL) { 00189 return queue->numBitsForCid(cid); 00190 } else { 00191 assure(rrStorage!=NULL,"rrStorage is invalid"); 00192 return rrStorage->numBitsForCid(cid); 00193 } 00194 } 00195 00196 wns::scheduler::QueueStatusContainer 00197 QueueProxy::getQueueStatus() const 00198 { 00199 assure(queue!=NULL || rrhandler!=NULL,"queue==NULL && rrhandler==NULL"); 00200 if (queue!=NULL) { 00201 return queue->getQueueStatus(); 00202 } else { 00203 assure(rrStorage!=NULL,"rrStorage is invalid"); 00204 return rrStorage->getQueueStatus(); 00205 } 00206 } 00207 00208 wns::ldk::CompoundPtr 00209 QueueProxy::getHeadOfLinePDU(wns::scheduler::ConnectionID cid) 00210 { 00211 assure(queue!=NULL || rrhandler!=NULL,"queue==NULL && rrhandler==NULL"); 00212 if (queue!=NULL) { 00213 return queue->getHeadOfLinePDU(cid); 00214 } else { 00215 // this case is deprecated. We support dynamicSegmentation, so why should we be asked for an arbitrary pdu? 00216 assure(rrhandler!=NULL,"rrhandler is invalid"); 00217 assure(rrStorage!=NULL,"rrStorage is invalid"); 00218 //wns::scheduler::UserID destinationUser = colleagues.registry->getUserForCID(cid); 00219 wns::service::dll::UnicastAddress destinationAddress 00220 = colleagues.registry->getPeerAddressForCID(cid); 00221 //= layer2->getStationManager()->getStationByNode(user)->getDLLAddress(); 00222 uint32_t bitsPerPDU = rrhandler->getDefaultBitsPerPDU(); 00223 wns::service::dll::FlowID flowID = cid; 00224 int servedBits = rrStorage->decrementRequest(flowID, bitsPerPDU); 00225 return rrhandler->createFakePDU(destinationAddress, servedBits, flowID); 00226 } 00227 } 00228 00229 int 00230 QueueProxy::getHeadOfLinePDUbits(wns::scheduler::ConnectionID cid) 00231 { 00232 assure(queue!=NULL || rrhandler!=NULL,"queue==NULL && rrhandler==NULL"); 00233 if (queue!=NULL) { 00234 return queue->getHeadOfLinePDUbits(cid); 00235 } else { 00236 assure(rrhandler!=NULL,"rrhandler is invalid"); 00237 assure(rrStorage!=NULL,"rrStorage is invalid"); 00238 uint32_t defaultBitsPerPDU = rrhandler->getDefaultBitsPerPDU(); // constant. No concrete knowledge available 00239 uint32_t currentBits = rrStorage->numBitsForCid(cid); 00240 uint32_t headOfLinePDUbits = currentBits; 00241 MESSAGE_SINGLE(NORMAL, logger, "getHeadOfLinePDUbits(cid="<<cid<<"): default="<<defaultBitsPerPDU<<", current="<<currentBits<<" => "<<headOfLinePDUbits<<" bits"); 00242 return headOfLinePDUbits; 00243 } 00244 } 00245 00246 bool 00247 QueueProxy::isEmpty() const 00248 { 00249 if (queue!=NULL) { 00250 return queue->isEmpty(); 00251 } else { 00252 assure(rrStorage!=NULL,"rrStorage is invalid"); 00253 return rrStorage->isEmpty(); 00254 } 00255 } 00256 00257 bool 00258 QueueProxy::hasQueue(wns::scheduler::ConnectionID cid) 00259 { 00260 if (queue!=NULL) { 00261 return queue->hasQueue(cid); 00262 } else { 00263 assure(rrStorage!=NULL,"rrStorage is invalid"); 00264 return rrStorage->knowsFlow(cid); 00265 } 00266 } 00267 00268 bool 00269 QueueProxy::queueHasPDUs(wns::scheduler::ConnectionID cid) const 00270 { 00271 if (queue!=NULL) { 00272 return queue->queueHasPDUs(cid); 00273 } else { 00274 assure(rrStorage!=NULL,"rrStorage is invalid"); 00275 return (rrStorage->numCompoundsForCid(cid) > 0); 00276 } 00277 } 00278 00279 wns::scheduler::ConnectionSet 00280 QueueProxy::filterQueuedCids(wns::scheduler::ConnectionSet connections) 00281 { 00282 if (queue!=NULL) { 00283 return queue->filterQueuedCids(connections); 00284 } else { 00285 assure(rrStorage!=NULL,"rrStorage is invalid"); 00286 return rrStorage->filterActiveConnections(connections); 00287 } 00288 } 00289 00290 QueueInterface::ProbeOutput 00291 QueueProxy::resetAllQueues() 00292 { 00293 if (queue!=NULL) { 00294 ProbeOutput probeOutput = queue->resetAllQueues(); 00295 //probeOutput.bits = p.bits; 00296 //probeOutput.compounds = p.compounds; 00297 return probeOutput; 00298 } else { 00299 ProbeOutput probeOutput; 00300 wns::scheduler::ConnectionSet allActiveConnections = rrStorage->getActiveConnections(); 00301 for (ConnectionSet::const_iterator iter = allActiveConnections.begin(); iter != allActiveConnections.end(); ) 00302 { 00303 wns::scheduler::ConnectionID flowId = (*iter); 00304 int priority = colleagues.registry->getPriorityForConnection(flowId); 00305 writeProbe(flowId, priority); 00306 probeOutput.bits += numBitsForCid(flowId); 00307 probeOutput.compounds += numCompoundsForCid(flowId); 00308 rrStorage->resetFlow(flowId); 00309 } 00310 return probeOutput; 00311 } 00312 } 00313 00314 QueueInterface::ProbeOutput 00315 QueueProxy::resetQueues(wns::scheduler::UserID user) 00316 { 00317 if (queue!=NULL) { 00318 ProbeOutput probeOutput = queue->resetQueues(user); 00319 return probeOutput; 00320 } else { 00321 ProbeOutput probeOutput; 00322 //probeOutput.bits = numBitsForUser(user); 00323 //probeOutput.compounds = numCompoundsForUser(user); 00324 probeOutput.bits = 0; 00325 probeOutput.compounds = 0; 00326 assure(rrStorage!=NULL,"rrStorage is invalid"); 00327 rrStorage->resetUser(user); 00328 return probeOutput; 00329 } 00330 } 00331 00332 QueueInterface::ProbeOutput 00333 QueueProxy::resetQueue(wns::scheduler::ConnectionID cid) 00334 { 00335 if (queue!=NULL) { 00336 ProbeOutput probeOutput = queue->resetQueue(cid); 00337 //probeOutput.bits = p.bits; 00338 //probeOutput.compounds = p.compounds; 00339 probeOutput.bits = 0; 00340 probeOutput.compounds = 0; 00341 return probeOutput; 00342 } else { 00343 ProbeOutput probeOutput; 00344 probeOutput.bits = numBitsForCid(cid); 00345 probeOutput.compounds = numCompoundsForCid(cid); 00346 assure(rrStorage!=NULL,"rrStorage is invalid"); 00347 rrStorage->resetFlow(cid); 00348 return probeOutput; 00349 } 00350 } 00351 00352 void 00353 QueueProxy::frameStarts() 00354 { 00355 if (queue!=NULL) { 00356 queue->frameStarts(); 00357 } 00358 } 00359 00360 std::string 00361 QueueProxy::printAllQueues() 00362 { 00363 if (queue!=NULL) 00364 return queue->printAllQueues(); 00365 else if (rrhandler!=NULL && rrStorage!=NULL) 00366 return rrStorage->printQueueStatus(); 00367 else 00368 return "ERROR"; 00369 } 00370 00371 bool 00372 QueueProxy::supportsDynamicSegmentation() const 00373 { 00374 MESSAGE_SINGLE(NORMAL, logger, "supportsDynamicSegmentation() = "<<(rrhandler!=NULL)); 00375 return (rrhandler!=NULL); // true for UL Master Scheduler 00376 } 00377 00378 wns::ldk::CompoundPtr 00379 QueueProxy::getHeadOfLinePDUSegment(wns::scheduler::ConnectionID cid, int bits) 00380 { 00381 if (queue!=NULL) { 00382 throw wns::Exception("getHeadOfLinePDUSegment() is unsupported"); 00383 return wns::ldk::CompoundPtr(); 00384 } else { 00385 assure(rrhandler!=NULL,"rrhandler is invalid"); 00386 assure(rrStorage!=NULL,"rrStorage is invalid"); 00387 wns::service::dll::UnicastAddress destinationAddress 00388 = colleagues.registry->getPeerAddressForCID(cid); 00389 int servedBits = rrStorage->decrementRequest(cid, bits); 00390 wns::service::dll::FlowID flowID = cid; 00391 MESSAGE_SINGLE(NORMAL, logger, "getHeadOfLinePDUSegment(cid="<<cid<<",bits="<<bits<<"): "<<servedBits<<" bits"); 00392 //return rrhandler->createFakePDU(destinationAddress, servedBits, flowID); 00393 // Try to send NULL (empty compound): 00394 return wns::ldk::CompoundPtr(); 00395 } 00396 } 00397 00398 int 00399 QueueProxy::getMinimumSegmentSize() const 00400 { 00401 if (queue!=NULL) { 00402 throw wns::Exception("getMinimumSegmentSize() is unsupported"); 00403 return 0; 00404 } else { 00405 assure(rrhandler!=NULL,"rrhandler is invalid"); 00406 return rrhandler->getDefaultBitsPerPDU();; 00407 } 00408 } 00409 00410 std::queue<wns::ldk::CompoundPtr> 00411 QueueProxy::getQueueCopy(wns::scheduler::ConnectionID cid) 00412 { 00413 wns::Exception("You should not call getQueueCopy of the QueueProxy."); 00414 } 00415
1.5.5