User Manual, Developers Guide and API Documentation

QueueProxy.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002  * This file is part of openWNS (open Wireless Network Simulator)
00003  * _____________________________________________________________________________
00004  *
00005  * Copyright (C) 2004-2007
00006  * Chair of Communication Networks (ComNets)
00007  * Kopernikusstr. 5, D-52074 Aachen, Germany
00008  * phone: ++49-241-80-27910,
00009  * fax: ++49-241-80-22242
00010  * email: info@openwns.org
00011  * www: http://www.openwns.org
00012  * _____________________________________________________________________________
00013  *
00014  * openWNS is free software; you can redistribute it and/or modify it under the
00015  * terms of the GNU Lesser General Public License version 2 as published by the
00016  * Free Software Foundation;
00017  *
00018  * openWNS is distributed in the hope that it will be useful, but WITHOUT ANY
00019  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
00020  * A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00021  * details.
00022  *
00023  * You should have received a copy of the GNU Lesser General Public License
00024  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
00025  *
00026  ******************************************************************************/
00027 
00028 #include <LTE/helper/QueueProxy.hpp>
00029 #include <LTE/timing/RegistryProxy.hpp>
00030 #include <WNS/probe/bus/utils.hpp>
00031 
00032 using namespace lte::helper;
00033 using namespace wns::scheduler;
00034 using namespace wns::scheduler::queue;
00035 
00036 STATIC_FACTORY_REGISTER_WITH_CREATOR(QueueProxy,
00037                                      wns::scheduler::queue::QueueInterface,
00038                                      "lte.QueueProxy",
00039                                      wns::HasReceptorConfigCreator);
00040 
00041 QueueProxy::QueueProxy(wns::ldk::HasReceptorInterface* hasReceptor, const wns::pyconfig::View& _config)
00042     : QueueInterface(),
00043       queue(NULL), //queue(config),
00044       rrhandler(NULL),
00045       rrStorage(NULL),
00046       queueIsExternal(true),
00047       logger(_config.get("logger")),
00048       config(_config),
00049       probeContextProviderForCid(NULL),
00050       probeContextProviderForPriority(NULL)
00051 {
00052     // make a real queue if it is required so (internal, not external):
00053     if (!queueIsExternal) queue = new wns::scheduler::queue::SimpleQueue(hasReceptor, config);
00054     MESSAGE_SINGLE(NORMAL, logger, "QueueProxy::QueueProxy()");
00055 }
00056 
00057 QueueProxy::~QueueProxy()
00058 {
00059     MESSAGE_SINGLE(VERBOSE, logger, "QueueProxy::~QueueProxy()");
00060     if (queue!=NULL && !queueIsExternal) delete queue;
00061     if (probeContextProviderForCid) { delete probeContextProviderForCid; }
00062     if (probeContextProviderForPriority) { delete probeContextProviderForPriority; }
00063 }
00064 
00065 void
00066 QueueProxy::setColleagues(RegistryProxyInterface* _registry) {
00067     colleagues.registry = _registry;
00068     assure(colleagues.registry != NULL,"colleagues.registry==NULL");
00069     if (queue!=NULL) {
00070         // Forward call to internal queue
00071         queue->setColleagues(_registry);
00072     }
00073 }
00074 
00075 void
00076 QueueProxy::setFUN(wns::ldk::fun::FUN* fun)
00077 {
00078     MESSAGE_SINGLE(NORMAL, logger, "QueueProxy::setFUN()");
00079     assure(fun!=NULL, "fun==NULL");
00080     assure(queue!=NULL || rrhandler!=NULL,"need at least queue or rrhandler");
00081     if (queue!=NULL) queue->setFUN(fun);
00082     if (rrhandler!=NULL) { // only needed in this case; queue has its own probes
00083         // Initialization of probes...
00084         wns::probe::bus::ContextProviderCollection localContext(&fun->getLayer()->getContextProviderCollection());
00085         for (int ii = 0; ii<config.len("localIDs.keys()"); ++ii)
00086         {
00087             std::string key = config.get<std::string>("localIDs.keys()",ii);
00088             uint32_t value  = config.get<uint32_t>("localIDs.values()",ii);
00089             localContext.addProvider( wns::probe::bus::contextprovider::Constant(key, value) );
00090         }
00091         probeContextProviderForCid  = new wns::probe::bus::contextprovider::Variable("cid", 0);
00092         probeContextProviderForPriority  = new wns::probe::bus::contextprovider::Variable("MAC.QoSClass", 0);
00093         localContext.addProvider(wns::probe::bus::contextprovider::Container(probeContextProviderForCid));
00094         localContext.addProvider(wns::probe::bus::contextprovider::Container(probeContextProviderForPriority));
00095         std::string sizeProbeName = config.get<std::string>("sizeProbeName");
00096         sizeProbeBus = wns::probe::bus::ContextCollectorPtr(new wns::probe::bus::ContextCollector(localContext, sizeProbeName));
00097     }
00098 }
00099 
00100 void
00101 QueueProxy::setQueue(wns::scheduler::queue::QueueInterface* _queue)
00102 {
00103     assure(rrhandler==NULL,"you can either set Queue or RRHandler. Not both!");
00104     if (queue!=NULL) delete queue; // old object
00105     queueIsExternal = true; // don't delete in destructor later
00106     queue=_queue;
00107     MESSAGE_SINGLE(NORMAL, logger, "configured as proxy to queue");
00108 }
00109 
00110 void
00111 QueueProxy::setRRHandler(lte::controlplane::RRHandler* _rrhandler)
00112 {
00113     assure(queue==NULL,"you can either set Queue or RRHandler. Not both!");
00114     rrhandler = dynamic_cast<lte::controlplane::RRHandlerBS*>(_rrhandler);
00115     assure(rrhandler!=NULL,"rrhandler is invalid");
00116     MESSAGE_SINGLE(NORMAL, logger, "configured as proxy to RRHandler");
00117     rrStorage = rrhandler->getRRStorage();
00118     assure(rrStorage!=NULL,"rrStorage is invalid");
00119 }
00120 
00121 bool
00122 QueueProxy::isAccepting(const wns::ldk::CompoundPtr&  compound ) const
00123 {
00124     assure(rrhandler==NULL,"isAccepting() cannot be used in RRHandler proxy mode");
00125     assure(queue!=NULL,"queue==NULL");
00126     return queue->isAccepting(compound);
00127 }
00128 
00129 void
00130 QueueProxy::put(const wns::ldk::CompoundPtr&  compound ) {
00131     assure(rrhandler==NULL,"put() cannot be used in RRHandler proxy mode. This compound's journey:\n"<<compound->dumpJourney());
00132     assure(queue!=NULL,"queue==NULL");
00133     queue->put(compound);
00134     // no probing required in this case
00135 }
00136 
00137 wns::scheduler::UserSet
00138 QueueProxy::getQueuedUsers() const
00139 {
00140     assure(queue!=NULL || rrhandler!=NULL,"queue==NULL && rrhandler==NULL");
00141     if (queue!=NULL) {
00142         return queue->getQueuedUsers();
00143     } else {
00144         assure(rrStorage!=NULL,"rrStorage is invalid");
00145         return rrStorage->getActiveUsers();
00146     }
00147 }
00148 
00149 wns::scheduler::ConnectionSet
00150 QueueProxy::getActiveConnections() const
00151 {
00152     assure(queue!=NULL || rrhandler!=NULL,"queue==NULL && rrhandler==NULL");
00153     if (queue!=NULL) {
00154         return queue->getActiveConnections();
00155     } else {
00156         assure(rrStorage!=NULL,"rrStorage is invalid");
00157         return rrStorage->getActiveConnections();
00158     }
00159 }
00160 
00161 void
00162 QueueProxy::writeProbe(wns::scheduler::ConnectionID cid, unsigned int priority) const
00163 {
00164     if (probeContextProviderForCid && probeContextProviderForPriority && sizeProbeBus) {
00165         probeContextProviderForCid->set(cid /*int context*/);
00166         probeContextProviderForPriority->set(priority);
00167         double bits = numBitsForCid(cid);
00168         sizeProbeBus->put(bits); // absolute
00169     }
00170 }
00171 
00172 unsigned long int
00173 QueueProxy::numCompoundsForCid(wns::scheduler::ConnectionID cid) const
00174 {
00175     assure(queue!=NULL || rrhandler!=NULL,"queue==NULL && rrhandler==NULL");
00176     if (queue!=NULL) {
00177         return queue->numCompoundsForCid(cid);
00178     } else {
00179         assure(rrStorage!=NULL,"rrStorage is invalid");
00180         return rrStorage->numCompoundsForCid(cid);
00181     }
00182 }
00183 
00184 unsigned long int
00185 QueueProxy::numBitsForCid(wns::scheduler::ConnectionID cid) const
00186 {
00187     assure(queue!=NULL || rrhandler!=NULL,"queue==NULL && rrhandler==NULL");
00188     if (queue!=NULL) {
00189         return queue->numBitsForCid(cid);
00190     } else {
00191         assure(rrStorage!=NULL,"rrStorage is invalid");
00192         return rrStorage->numBitsForCid(cid);
00193     }
00194 }
00195 
00196 wns::scheduler::QueueStatusContainer
00197 QueueProxy::getQueueStatus() const
00198 {
00199     assure(queue!=NULL || rrhandler!=NULL,"queue==NULL && rrhandler==NULL");
00200     if (queue!=NULL) {
00201         return queue->getQueueStatus();
00202     } else {
00203         assure(rrStorage!=NULL,"rrStorage is invalid");
00204         return rrStorage->getQueueStatus();
00205     }
00206 }
00207 
00208 wns::ldk::CompoundPtr
00209 QueueProxy::getHeadOfLinePDU(wns::scheduler::ConnectionID cid)
00210 {
00211     assure(queue!=NULL || rrhandler!=NULL,"queue==NULL && rrhandler==NULL");
00212     if (queue!=NULL) {
00213         return queue->getHeadOfLinePDU(cid);
00214     } else {
00215         // this case is deprecated. We support dynamicSegmentation, so why should we be asked for an arbitrary pdu?
00216         assure(rrhandler!=NULL,"rrhandler is invalid");
00217         assure(rrStorage!=NULL,"rrStorage is invalid");
00218         //wns::scheduler::UserID destinationUser = colleagues.registry->getUserForCID(cid);
00219         wns::service::dll::UnicastAddress destinationAddress
00220             = colleagues.registry->getPeerAddressForCID(cid);
00221         //= layer2->getStationManager()->getStationByNode(user)->getDLLAddress();
00222         uint32_t bitsPerPDU = rrhandler->getDefaultBitsPerPDU();
00223         wns::service::dll::FlowID flowID = cid;
00224         int servedBits = rrStorage->decrementRequest(flowID, bitsPerPDU);
00225         return rrhandler->createFakePDU(destinationAddress, servedBits, flowID);
00226     }
00227 }
00228 
00229 int
00230 QueueProxy::getHeadOfLinePDUbits(wns::scheduler::ConnectionID cid)
00231 {
00232     assure(queue!=NULL || rrhandler!=NULL,"queue==NULL && rrhandler==NULL");
00233     if (queue!=NULL) {
00234         return queue->getHeadOfLinePDUbits(cid);
00235     } else {
00236         assure(rrhandler!=NULL,"rrhandler is invalid");
00237         assure(rrStorage!=NULL,"rrStorage is invalid");
00238         uint32_t defaultBitsPerPDU = rrhandler->getDefaultBitsPerPDU(); // constant. No concrete knowledge available
00239         uint32_t currentBits = rrStorage->numBitsForCid(cid);
00240         uint32_t headOfLinePDUbits = currentBits;
00241         MESSAGE_SINGLE(NORMAL, logger, "getHeadOfLinePDUbits(cid="<<cid<<"): default="<<defaultBitsPerPDU<<", current="<<currentBits<<" => "<<headOfLinePDUbits<<" bits");
00242         return headOfLinePDUbits;
00243     }
00244 }
00245 
00246 bool
00247 QueueProxy::isEmpty() const
00248 {
00249     if (queue!=NULL) {
00250         return queue->isEmpty();
00251     } else {
00252         assure(rrStorage!=NULL,"rrStorage is invalid");
00253         return rrStorage->isEmpty();
00254     }
00255 }
00256 
00257 bool
00258 QueueProxy::hasQueue(wns::scheduler::ConnectionID cid)
00259 {
00260     if (queue!=NULL) {
00261         return queue->hasQueue(cid);
00262     } else {
00263         assure(rrStorage!=NULL,"rrStorage is invalid");
00264         return rrStorage->knowsFlow(cid);
00265     }
00266 }
00267 
00268 bool
00269 QueueProxy::queueHasPDUs(wns::scheduler::ConnectionID cid) const
00270 {
00271     if (queue!=NULL) {
00272         return queue->queueHasPDUs(cid);
00273     } else {
00274         assure(rrStorage!=NULL,"rrStorage is invalid");
00275         return (rrStorage->numCompoundsForCid(cid) > 0);
00276     }
00277 }
00278 
00279 wns::scheduler::ConnectionSet
00280 QueueProxy::filterQueuedCids(wns::scheduler::ConnectionSet connections)
00281 {
00282     if (queue!=NULL) {
00283         return queue->filterQueuedCids(connections);
00284     } else {
00285         assure(rrStorage!=NULL,"rrStorage is invalid");
00286         return rrStorage->filterActiveConnections(connections);
00287     }
00288 }
00289 
00290 QueueInterface::ProbeOutput
00291 QueueProxy::resetAllQueues()
00292 {
00293     if (queue!=NULL) {
00294         ProbeOutput probeOutput = queue->resetAllQueues();
00295         //probeOutput.bits = p.bits;
00296         //probeOutput.compounds = p.compounds;
00297         return probeOutput;
00298     } else {
00299         ProbeOutput probeOutput;
00300         wns::scheduler::ConnectionSet allActiveConnections = rrStorage->getActiveConnections();
00301         for (ConnectionSet::const_iterator iter = allActiveConnections.begin(); iter != allActiveConnections.end(); )
00302         {
00303             wns::scheduler::ConnectionID flowId = (*iter);
00304             int priority = colleagues.registry->getPriorityForConnection(flowId);
00305             writeProbe(flowId, priority);
00306             probeOutput.bits += numBitsForCid(flowId);
00307             probeOutput.compounds += numCompoundsForCid(flowId);
00308             rrStorage->resetFlow(flowId);
00309         }
00310         return probeOutput;
00311     }
00312 }
00313 
00314 QueueInterface::ProbeOutput
00315 QueueProxy::resetQueues(wns::scheduler::UserID user)
00316 {
00317     if (queue!=NULL) {
00318         ProbeOutput probeOutput = queue->resetQueues(user);
00319         return probeOutput;
00320     } else {
00321         ProbeOutput probeOutput;
00322         //probeOutput.bits = numBitsForUser(user);
00323         //probeOutput.compounds = numCompoundsForUser(user);
00324         probeOutput.bits = 0;
00325         probeOutput.compounds = 0;
00326         assure(rrStorage!=NULL,"rrStorage is invalid");
00327         rrStorage->resetUser(user);
00328         return probeOutput;
00329     }
00330 }
00331 
00332 QueueInterface::ProbeOutput
00333 QueueProxy::resetQueue(wns::scheduler::ConnectionID cid)
00334 {
00335     if (queue!=NULL) {
00336         ProbeOutput probeOutput = queue->resetQueue(cid);
00337         //probeOutput.bits = p.bits;
00338         //probeOutput.compounds = p.compounds;
00339         probeOutput.bits = 0;
00340         probeOutput.compounds = 0;
00341         return probeOutput;
00342     } else {
00343         ProbeOutput probeOutput;
00344         probeOutput.bits = numBitsForCid(cid);
00345         probeOutput.compounds = numCompoundsForCid(cid);
00346         assure(rrStorage!=NULL,"rrStorage is invalid");
00347         rrStorage->resetFlow(cid);
00348         return probeOutput;
00349     }
00350 }
00351 
00352 void
00353 QueueProxy::frameStarts()
00354 {
00355     if (queue!=NULL) {
00356         queue->frameStarts();
00357     }
00358 }
00359 
00360 std::string
00361 QueueProxy::printAllQueues()
00362 {
00363     if (queue!=NULL)
00364         return queue->printAllQueues();
00365     else if (rrhandler!=NULL && rrStorage!=NULL)
00366         return rrStorage->printQueueStatus();
00367     else
00368         return "ERROR";
00369 }
00370 
00371 bool
00372 QueueProxy::supportsDynamicSegmentation() const
00373 {
00374     MESSAGE_SINGLE(NORMAL, logger, "supportsDynamicSegmentation() = "<<(rrhandler!=NULL));
00375     return (rrhandler!=NULL); // true for UL Master Scheduler
00376 }
00377 
00378 wns::ldk::CompoundPtr
00379 QueueProxy::getHeadOfLinePDUSegment(wns::scheduler::ConnectionID cid, int bits)
00380 {
00381     if (queue!=NULL) {
00382         throw wns::Exception("getHeadOfLinePDUSegment() is unsupported");
00383         return wns::ldk::CompoundPtr();
00384     } else {
00385         assure(rrhandler!=NULL,"rrhandler is invalid");
00386         assure(rrStorage!=NULL,"rrStorage is invalid");
00387         wns::service::dll::UnicastAddress destinationAddress
00388             = colleagues.registry->getPeerAddressForCID(cid);
00389         int servedBits = rrStorage->decrementRequest(cid, bits);
00390         wns::service::dll::FlowID flowID = cid;
00391         MESSAGE_SINGLE(NORMAL, logger, "getHeadOfLinePDUSegment(cid="<<cid<<",bits="<<bits<<"): "<<servedBits<<" bits");
00392         //return rrhandler->createFakePDU(destinationAddress, servedBits, flowID);
00393         // Try to send NULL (empty compound):
00394         return wns::ldk::CompoundPtr();
00395     }
00396 }
00397 
00398 int
00399 QueueProxy::getMinimumSegmentSize() const
00400 {
00401     if (queue!=NULL) {
00402         throw wns::Exception("getMinimumSegmentSize() is unsupported");
00403         return 0;
00404     } else {
00405         assure(rrhandler!=NULL,"rrhandler is invalid");
00406         return rrhandler->getDefaultBitsPerPDU();;
00407     }
00408 }
00409 
00410 std::queue<wns::ldk::CompoundPtr> 
00411 QueueProxy::getQueueCopy(wns::scheduler::ConnectionID cid)
00412 { 
00413     wns::Exception("You should not call getQueueCopy of the QueueProxy."); 
00414 }
00415 

Generated on Sun May 27 03:31:53 2012 for openWNS by  doxygen 1.5.5