User Manual, Developers Guide and API Documentation

SegAndConcat.cpp

Go to the documentation of this file.
00001 /******************************************************************************
00002  * WinProSt Protocol Stack)                                                   *
00003  * __________________________________________________________________________ *
00004  *                                                                            *
00005  * Copyright (C) 2004                                                         *
00006  * Lehrstuhl f?r Kommunikationsnetze (ComNets)                                *
00007  * Kopernikusstr. 16, D-52074 Aachen, Germany                                 *
00008  * phone: ++49-241-80-27910 (phone), fax: ++49-241-80-22242                   *
00009  * email: msg@comnets.de, www: http://winner.comnets.rwth-aachen.de/~msg      *
00010  ******************************************************************************/
00011 
00012 #include <WNS/ldk/sar/SegAndConcat.hpp>
00013 #include <WNS/service/dll/FlowID.hpp>
00014 #include <WNS/ldk/Layer.hpp>
00015 
00016 #include <boost/bind.hpp>
00017 #include <boost/function.hpp>
00018 
00019 using namespace wns::ldk::sar;
00020 
00021 STATIC_FACTORY_REGISTER_WITH_CREATOR(
00022     SegAndConcat,
00023     wns::ldk::FunctionalUnit, "wns.sar.SegAndConcat",
00024     wns::ldk::FUNConfigCreator);
00025 
00026 SegAndConcat::SegAndConcat(wns::ldk::fun::FUN* fun,
00027                                        const wns::pyconfig::View& config):
00028     wns::ldk::CommandTypeSpecifier<SegAndConcatCommand>(fun),
00029     logger_(config.get("logger")),
00030     commandName_(config.get<std::string>("commandName")),
00031     segmentSize_(config.get<Bit>("segmentSize")),
00032     headerSize_(config.get<Bit>("headerSize")),
00033     sduLengthAddition_(config.get<Bit>("sduLengthAddition")),
00034     nextOutgoingSN_(0),
00035     reorderingWindow_(config.get("reorderingWindow")),
00036     isSegmenting_(config.get<bool>("isSegmenting")),
00037     segmentDropRatioProbeName_(config.get<std::string>("segmentDropRatioProbeName"))
00038 {
00039     reorderingWindow_.connectToReassemblySignal(boost::bind(&SegAndConcat::onReorderedPDU, this, _1, _2));
00040     reorderingWindow_.connectToDiscardSignal(boost::bind(&SegAndConcat::onDiscardedPDU, this, _1, _2));
00041 
00042     wns::probe::bus::ContextProviderCollection* cpcParent = &fun->getLayer()->getContextProviderCollection();
00043     wns::probe::bus::ContextProviderCollection cpc(cpcParent);
00044 
00045     segmentDropRatioCC_ = wns::probe::bus::ContextCollectorPtr(
00046         new wns::probe::bus::ContextCollector(cpc, segmentDropRatioProbeName_));
00047 
00048     if(!config.isNone("delayProbeName"))
00049     {
00050         std::string delayProbeName = config.get<std::string>("delayProbeName");
00051         minDelayCC_ = wns::probe::bus::ContextCollectorPtr(
00052             new wns::probe::bus::ContextCollector(cpc, 
00053                 delayProbeName + ".minDelay"));
00054         maxDelayCC_ = wns::probe::bus::ContextCollectorPtr(
00055             new wns::probe::bus::ContextCollector(cpc, 
00056                 delayProbeName + ".maxDelay"));
00057         sizeCC_ = wns::probe::bus::ContextCollectorPtr(
00058             new wns::probe::bus::ContextCollector(cpc, 
00059                 delayProbeName + ".stop.compoundSize"));
00060     
00061         // Same name as the probe prefix
00062         probeHeaderReader_ = fun->getCommandReader(delayProbeName);
00063 
00064         reassemblyBuffer_.enableDelayProbing(minDelayCC_, maxDelayCC_, probeHeaderReader_);
00065     }
00066 }
00067 
00068 SegAndConcat::SegAndConcat(const SegAndConcat& other):
00069     wns::ldk::CommandTypeSpecifier<SegAndConcatCommand>(other.getFUN()),
00070     logger_(other.logger_),
00071     commandName_(other.commandName_),
00072     segmentSize_(other.segmentSize_),
00073     headerSize_(other.headerSize_),
00074     sduLengthAddition_(other.sduLengthAddition_),
00075     nextOutgoingSN_(other.nextOutgoingSN_),
00076     reorderingWindow_(other.reorderingWindow_),
00077     reassemblyBuffer_(other.reassemblyBuffer_),
00078     isSegmenting_(other.isSegmenting_),
00079     segmentDropRatioCC_(wns::probe::bus::ContextCollectorPtr(
00080         new wns::probe::bus::ContextCollector(*other.segmentDropRatioCC_))),
00081     minDelayCC_(wns::probe::bus::ContextCollectorPtr(
00082         new wns::probe::bus::ContextCollector(*other.minDelayCC_))),
00083     maxDelayCC_(wns::probe::bus::ContextCollectorPtr(
00084         new wns::probe::bus::ContextCollector(*other.minDelayCC_))),
00085     sizeCC_(wns::probe::bus::ContextCollectorPtr(
00086         new wns::probe::bus::ContextCollector(*other.sizeCC_))),
00087     probeHeaderReader_(other.probeHeaderReader_)
00088 {
00089     reorderingWindow_.connectToReassemblySignal(boost::bind(&SegAndConcat::onReorderedPDU, this, _1, _2));
00090 
00091     reorderingWindow_.connectToDiscardSignal(boost::bind(&SegAndConcat::onDiscardedPDU, this, _1, _2));
00092 }
00093 
00094 SegAndConcat::~SegAndConcat()
00095 {
00096 }
00097 
00098 void
00099 SegAndConcat::onFUNCreated()
00100 {
00101     MESSAGE_SINGLE(NORMAL, logger_, "SegAndConcat::onFUNCreated()");
00102     reassemblyBuffer_.initialize(getFUN()->getCommandReader(commandName_));
00103 }
00104 
00105 wns::ldk::CommandReaderInterface*
00106 SegAndConcat::getCommandReader() const
00107 {
00108   return getFUN()->getCommandReader(commandName_);
00109 }
00110 
00111 void
00112 SegAndConcat::processIncoming(const wns::ldk::CompoundPtr& compound)
00113 {
00114     wns::ldk::CommandPool* commandPool = compound->getCommandPool();
00115 
00116     SegAndConcatCommand* command;
00117     command = getCommand(commandPool);
00118 
00119     reorderingWindow_.onSegment(command->peer.sn_, compound);
00120 }
00121 
00122 void
00123 SegAndConcat::processOutgoing(const wns::ldk::CompoundPtr& sdu)
00124 {
00125     if (!isSegmenting_)
00126     {
00127         this->senderPendingSegments_.push_back(sdu);
00128         MESSAGE_SINGLE(NORMAL, logger_, "Adding one SDU with " << sdu->getLengthInBits() 
00129                 << " bits to pending segments. Segmenting disabled.");
00130         return;
00131     }
00132 
00133     Bit sduPCISize = 0;
00134     Bit sduDataSize = 0;
00135     Bit sduTotalSize = 0;
00136     Bit cumSize = 0;
00137     Bit nextSegmentSize = 0;
00138 
00139     wns::ldk::CommandPool* commandPool = sdu->getCommandPool();
00140     getFUN()->calculateSizes(commandPool, sduPCISize, sduDataSize);
00141     sduTotalSize = sduPCISize + sduDataSize + sduLengthAddition_;
00142 
00143     bool isBegin = true;
00144     bool isEnd = false;
00145 
00146     while(cumSize < sduTotalSize)
00147     {
00148         cumSize += segmentSize_;
00149         if (cumSize >= sduTotalSize)
00150         {
00151             nextSegmentSize = sduTotalSize - (cumSize - segmentSize_);
00152             isEnd = true;
00153         }
00154         else
00155         {
00156             nextSegmentSize = segmentSize_;
00157         }
00158 
00159         // Prepare segment
00160         SegAndConcatCommand* command = NULL;
00161 
00162         wns::ldk::CompoundPtr nextSegment(new wns::ldk::Compound(getFUN()->getProxy()->createCommandPool()));
00163         command = activateCommand(nextSegment->getCommandPool());
00164         command->setSequenceNumber(nextOutgoingSN_);
00165         command->addSDU(sdu->copy());
00166         nextOutgoingSN_ += 1;
00167 
00168         isBegin ? command->setBeginFlag() : command->clearBeginFlag();
00169         isEnd ? command->setEndFlag() : command->clearEndFlag();
00170 
00171         command->increaseDataSize(nextSegmentSize);
00172         command->increaseHeaderSize(headerSize_);
00173         this->commitSizes(nextSegment->getCommandPool());
00174         this->senderPendingSegments_.push_back(nextSegment);
00175 
00176         isBegin = false;
00177     }
00178 }
00179 
00180 void
00181 SegAndConcat::onReorderedPDU(long sn, wns::ldk::CompoundPtr c)
00182 {
00183     MESSAGE_SINGLE(NORMAL, logger_, "onReorderedPDU(sn=" << sn << "):");
00184     if (!reassemblyBuffer_.isNextExpectedSegment(c))
00185     {
00186         // Segment missing
00187         MESSAGE_SINGLE(NORMAL, logger_, "onReorderedPDU: PDU " << reassemblyBuffer_.getNextExpectedSN() 
00188             << " is missing. Clearing reassembly buffer.");
00189 
00190         for(size_t ii=0; ii < reassemblyBuffer_.size(); ++ii)
00191         {
00192             segmentDropRatioCC_->put(1.0);
00193         }
00194         reassemblyBuffer_.clear();
00195     }
00196 
00197     if (reassemblyBuffer_.accepts(c))
00198     {
00199         MESSAGE_SINGLE(NORMAL, logger_, "onReorderedPDU: Putting PDU " 
00200             << getCommand(c->getCommandPool())->peer.sn_ << " of size " 
00201             << getCommand(c->getCommandPool())->totalSize() << " bits into reassembly buffer");
00202         reassemblyBuffer_.insert(c);
00203     }
00204     else
00205     {
00206         MESSAGE_SINGLE(NORMAL, logger_, "onReorderedPDU: Dropping PDU " 
00207             << getCommand(c->getCommandPool())->peer.sn_ << ". isBegin=False.");
00208     }
00209 
00210     reassembly::ReassemblyBuffer::SegmentContainer sc;
00211     MESSAGE_SINGLE(VERBOSE, logger_, reassemblyBuffer_.dump());
00212 
00213     int numberOfReassembledSegments = 0;
00214     sc = reassemblyBuffer_.getReassembledSegments(numberOfReassembledSegments);
00215 
00216     for (int ii=0; ii < numberOfReassembledSegments; ++ii)
00217     {
00218         segmentDropRatioCC_->put(0.0);
00219     }
00220 
00221     MESSAGE_SINGLE(NORMAL, logger_, "reassemble: getReassembledSegments() sc.size()=" << sc.size());
00222 
00223     if (getDeliverer()->size() > 0)
00224     {
00225         reassembly::ReassemblyBuffer::SegmentContainer::iterator it;
00226         for (it=sc.begin(); it!=sc.end(); ++it)
00227         {
00228             MESSAGE_SINGLE(NORMAL, logger_, "reassemble: Passing " << (*it)->getLengthInBits() 
00229                 << " bits to upper FU.");
00230             // This sends the complete PDU upwards:
00231             getDeliverer()->getAcceptor( (*it) )->onData( (*it) );
00232             if(sizeCC_ != NULL)
00233                 sizeCC_->put((*it)->getLengthInBits());
00234         }
00235     }
00236     else
00237     {
00238         MESSAGE_SINGLE(NORMAL, logger_, "reassemble: No upper FU available.");
00239     }
00240 }
00241 
00242 void
00243 SegAndConcat::onDiscardedPDU(long, wns::ldk::CompoundPtr)
00244 {
00245     segmentDropRatioCC_->put(1.0);
00246 }
00247 
00248 bool
00249 SegAndConcat::hasCapacity() const
00250 {
00251     return (senderPendingSegments_.empty());
00252 }
00253 
00254 const wns::ldk::CompoundPtr
00255 SegAndConcat::hasSomethingToSend() const
00256 {
00257     if (!senderPendingSegments_.empty())
00258     {
00259         return senderPendingSegments_.front();
00260     }
00261     else
00262     {
00263         return wns::ldk::CompoundPtr();
00264     }
00265 }
00266 
00267 wns::ldk::CompoundPtr
00268 SegAndConcat::getSomethingToSend()
00269 {
00270     assure(hasSomethingToSend(), "getSomethingToSend although nothing to send");
00271     wns::ldk::CompoundPtr compound = senderPendingSegments_.front();
00272 
00273     if (isSegmenting_)
00274     {
00275         MESSAGE_SINGLE(NORMAL, logger_, "getSomethingToSend: Passing segment " 
00276             << getCommand(compound->getCommandPool())->peer.sn_ << " of size " 
00277             << (getCommand(compound->getCommandPool())->totalSize()) << " bits to lower layer");
00278     }
00279 
00280     senderPendingSegments_.pop_front();
00281     return compound;
00282 }
00283 
00284 void
00285 SegAndConcat::calculateSizes(const wns::ldk::CommandPool* commandPool, Bit& commandPoolSize, Bit& dataSize) const
00286 {
00287     SegAndConcatCommand* command;
00288     command = getCommand(commandPool);
00289 
00290     commandPoolSize = command->peer.headerSize_;
00291     dataSize = command->peer.dataSize_ + command->peer.paddingSize_;
00292 }

Generated on Fri May 25 03:31:46 2012 for openWNS by  doxygen 1.5.5