![]() |
User Manual, Developers Guide and API Documentation |
![]() |
00001 /****************************************************************************** 00002 * WinProSt Protocol Stack) * 00003 * __________________________________________________________________________ * 00004 * * 00005 * Copyright (C) 2004 * 00006 * Lehrstuhl f?r Kommunikationsnetze (ComNets) * 00007 * Kopernikusstr. 16, D-52074 Aachen, Germany * 00008 * phone: ++49-241-80-27910 (phone), fax: ++49-241-80-22242 * 00009 * email: msg@comnets.de, www: http://winner.comnets.rwth-aachen.de/~msg * 00010 ******************************************************************************/ 00011 00012 #include <WNS/ldk/sar/SegAndConcat.hpp> 00013 #include <WNS/service/dll/FlowID.hpp> 00014 #include <WNS/ldk/Layer.hpp> 00015 00016 #include <boost/bind.hpp> 00017 #include <boost/function.hpp> 00018 00019 using namespace wns::ldk::sar; 00020 00021 STATIC_FACTORY_REGISTER_WITH_CREATOR( 00022 SegAndConcat, 00023 wns::ldk::FunctionalUnit, "wns.sar.SegAndConcat", 00024 wns::ldk::FUNConfigCreator); 00025 00026 SegAndConcat::SegAndConcat(wns::ldk::fun::FUN* fun, 00027 const wns::pyconfig::View& config): 00028 wns::ldk::CommandTypeSpecifier<SegAndConcatCommand>(fun), 00029 logger_(config.get("logger")), 00030 commandName_(config.get<std::string>("commandName")), 00031 segmentSize_(config.get<Bit>("segmentSize")), 00032 headerSize_(config.get<Bit>("headerSize")), 00033 sduLengthAddition_(config.get<Bit>("sduLengthAddition")), 00034 nextOutgoingSN_(0), 00035 reorderingWindow_(config.get("reorderingWindow")), 00036 isSegmenting_(config.get<bool>("isSegmenting")), 00037 segmentDropRatioProbeName_(config.get<std::string>("segmentDropRatioProbeName")) 00038 { 00039 reorderingWindow_.connectToReassemblySignal(boost::bind(&SegAndConcat::onReorderedPDU, this, _1, _2)); 00040 reorderingWindow_.connectToDiscardSignal(boost::bind(&SegAndConcat::onDiscardedPDU, this, _1, _2)); 00041 00042 wns::probe::bus::ContextProviderCollection* cpcParent = &fun->getLayer()->getContextProviderCollection(); 00043 wns::probe::bus::ContextProviderCollection cpc(cpcParent); 00044 00045 segmentDropRatioCC_ = wns::probe::bus::ContextCollectorPtr( 00046 new wns::probe::bus::ContextCollector(cpc, segmentDropRatioProbeName_)); 00047 00048 if(!config.isNone("delayProbeName")) 00049 { 00050 std::string delayProbeName = config.get<std::string>("delayProbeName"); 00051 minDelayCC_ = wns::probe::bus::ContextCollectorPtr( 00052 new wns::probe::bus::ContextCollector(cpc, 00053 delayProbeName + ".minDelay")); 00054 maxDelayCC_ = wns::probe::bus::ContextCollectorPtr( 00055 new wns::probe::bus::ContextCollector(cpc, 00056 delayProbeName + ".maxDelay")); 00057 sizeCC_ = wns::probe::bus::ContextCollectorPtr( 00058 new wns::probe::bus::ContextCollector(cpc, 00059 delayProbeName + ".stop.compoundSize")); 00060 00061 // Same name as the probe prefix 00062 probeHeaderReader_ = fun->getCommandReader(delayProbeName); 00063 00064 reassemblyBuffer_.enableDelayProbing(minDelayCC_, maxDelayCC_, probeHeaderReader_); 00065 } 00066 } 00067 00068 SegAndConcat::SegAndConcat(const SegAndConcat& other): 00069 wns::ldk::CommandTypeSpecifier<SegAndConcatCommand>(other.getFUN()), 00070 logger_(other.logger_), 00071 commandName_(other.commandName_), 00072 segmentSize_(other.segmentSize_), 00073 headerSize_(other.headerSize_), 00074 sduLengthAddition_(other.sduLengthAddition_), 00075 nextOutgoingSN_(other.nextOutgoingSN_), 00076 reorderingWindow_(other.reorderingWindow_), 00077 reassemblyBuffer_(other.reassemblyBuffer_), 00078 isSegmenting_(other.isSegmenting_), 00079 segmentDropRatioCC_(wns::probe::bus::ContextCollectorPtr( 00080 new wns::probe::bus::ContextCollector(*other.segmentDropRatioCC_))), 00081 minDelayCC_(wns::probe::bus::ContextCollectorPtr( 00082 new wns::probe::bus::ContextCollector(*other.minDelayCC_))), 00083 maxDelayCC_(wns::probe::bus::ContextCollectorPtr( 00084 new wns::probe::bus::ContextCollector(*other.minDelayCC_))), 00085 sizeCC_(wns::probe::bus::ContextCollectorPtr( 00086 new wns::probe::bus::ContextCollector(*other.sizeCC_))), 00087 probeHeaderReader_(other.probeHeaderReader_) 00088 { 00089 reorderingWindow_.connectToReassemblySignal(boost::bind(&SegAndConcat::onReorderedPDU, this, _1, _2)); 00090 00091 reorderingWindow_.connectToDiscardSignal(boost::bind(&SegAndConcat::onDiscardedPDU, this, _1, _2)); 00092 } 00093 00094 SegAndConcat::~SegAndConcat() 00095 { 00096 } 00097 00098 void 00099 SegAndConcat::onFUNCreated() 00100 { 00101 MESSAGE_SINGLE(NORMAL, logger_, "SegAndConcat::onFUNCreated()"); 00102 reassemblyBuffer_.initialize(getFUN()->getCommandReader(commandName_)); 00103 } 00104 00105 wns::ldk::CommandReaderInterface* 00106 SegAndConcat::getCommandReader() const 00107 { 00108 return getFUN()->getCommandReader(commandName_); 00109 } 00110 00111 void 00112 SegAndConcat::processIncoming(const wns::ldk::CompoundPtr& compound) 00113 { 00114 wns::ldk::CommandPool* commandPool = compound->getCommandPool(); 00115 00116 SegAndConcatCommand* command; 00117 command = getCommand(commandPool); 00118 00119 reorderingWindow_.onSegment(command->peer.sn_, compound); 00120 } 00121 00122 void 00123 SegAndConcat::processOutgoing(const wns::ldk::CompoundPtr& sdu) 00124 { 00125 if (!isSegmenting_) 00126 { 00127 this->senderPendingSegments_.push_back(sdu); 00128 MESSAGE_SINGLE(NORMAL, logger_, "Adding one SDU with " << sdu->getLengthInBits() 00129 << " bits to pending segments. Segmenting disabled."); 00130 return; 00131 } 00132 00133 Bit sduPCISize = 0; 00134 Bit sduDataSize = 0; 00135 Bit sduTotalSize = 0; 00136 Bit cumSize = 0; 00137 Bit nextSegmentSize = 0; 00138 00139 wns::ldk::CommandPool* commandPool = sdu->getCommandPool(); 00140 getFUN()->calculateSizes(commandPool, sduPCISize, sduDataSize); 00141 sduTotalSize = sduPCISize + sduDataSize + sduLengthAddition_; 00142 00143 bool isBegin = true; 00144 bool isEnd = false; 00145 00146 while(cumSize < sduTotalSize) 00147 { 00148 cumSize += segmentSize_; 00149 if (cumSize >= sduTotalSize) 00150 { 00151 nextSegmentSize = sduTotalSize - (cumSize - segmentSize_); 00152 isEnd = true; 00153 } 00154 else 00155 { 00156 nextSegmentSize = segmentSize_; 00157 } 00158 00159 // Prepare segment 00160 SegAndConcatCommand* command = NULL; 00161 00162 wns::ldk::CompoundPtr nextSegment(new wns::ldk::Compound(getFUN()->getProxy()->createCommandPool())); 00163 command = activateCommand(nextSegment->getCommandPool()); 00164 command->setSequenceNumber(nextOutgoingSN_); 00165 command->addSDU(sdu->copy()); 00166 nextOutgoingSN_ += 1; 00167 00168 isBegin ? command->setBeginFlag() : command->clearBeginFlag(); 00169 isEnd ? command->setEndFlag() : command->clearEndFlag(); 00170 00171 command->increaseDataSize(nextSegmentSize); 00172 command->increaseHeaderSize(headerSize_); 00173 this->commitSizes(nextSegment->getCommandPool()); 00174 this->senderPendingSegments_.push_back(nextSegment); 00175 00176 isBegin = false; 00177 } 00178 } 00179 00180 void 00181 SegAndConcat::onReorderedPDU(long sn, wns::ldk::CompoundPtr c) 00182 { 00183 MESSAGE_SINGLE(NORMAL, logger_, "onReorderedPDU(sn=" << sn << "):"); 00184 if (!reassemblyBuffer_.isNextExpectedSegment(c)) 00185 { 00186 // Segment missing 00187 MESSAGE_SINGLE(NORMAL, logger_, "onReorderedPDU: PDU " << reassemblyBuffer_.getNextExpectedSN() 00188 << " is missing. Clearing reassembly buffer."); 00189 00190 for(size_t ii=0; ii < reassemblyBuffer_.size(); ++ii) 00191 { 00192 segmentDropRatioCC_->put(1.0); 00193 } 00194 reassemblyBuffer_.clear(); 00195 } 00196 00197 if (reassemblyBuffer_.accepts(c)) 00198 { 00199 MESSAGE_SINGLE(NORMAL, logger_, "onReorderedPDU: Putting PDU " 00200 << getCommand(c->getCommandPool())->peer.sn_ << " of size " 00201 << getCommand(c->getCommandPool())->totalSize() << " bits into reassembly buffer"); 00202 reassemblyBuffer_.insert(c); 00203 } 00204 else 00205 { 00206 MESSAGE_SINGLE(NORMAL, logger_, "onReorderedPDU: Dropping PDU " 00207 << getCommand(c->getCommandPool())->peer.sn_ << ". isBegin=False."); 00208 } 00209 00210 reassembly::ReassemblyBuffer::SegmentContainer sc; 00211 MESSAGE_SINGLE(VERBOSE, logger_, reassemblyBuffer_.dump()); 00212 00213 int numberOfReassembledSegments = 0; 00214 sc = reassemblyBuffer_.getReassembledSegments(numberOfReassembledSegments); 00215 00216 for (int ii=0; ii < numberOfReassembledSegments; ++ii) 00217 { 00218 segmentDropRatioCC_->put(0.0); 00219 } 00220 00221 MESSAGE_SINGLE(NORMAL, logger_, "reassemble: getReassembledSegments() sc.size()=" << sc.size()); 00222 00223 if (getDeliverer()->size() > 0) 00224 { 00225 reassembly::ReassemblyBuffer::SegmentContainer::iterator it; 00226 for (it=sc.begin(); it!=sc.end(); ++it) 00227 { 00228 MESSAGE_SINGLE(NORMAL, logger_, "reassemble: Passing " << (*it)->getLengthInBits() 00229 << " bits to upper FU."); 00230 // This sends the complete PDU upwards: 00231 getDeliverer()->getAcceptor( (*it) )->onData( (*it) ); 00232 if(sizeCC_ != NULL) 00233 sizeCC_->put((*it)->getLengthInBits()); 00234 } 00235 } 00236 else 00237 { 00238 MESSAGE_SINGLE(NORMAL, logger_, "reassemble: No upper FU available."); 00239 } 00240 } 00241 00242 void 00243 SegAndConcat::onDiscardedPDU(long, wns::ldk::CompoundPtr) 00244 { 00245 segmentDropRatioCC_->put(1.0); 00246 } 00247 00248 bool 00249 SegAndConcat::hasCapacity() const 00250 { 00251 return (senderPendingSegments_.empty()); 00252 } 00253 00254 const wns::ldk::CompoundPtr 00255 SegAndConcat::hasSomethingToSend() const 00256 { 00257 if (!senderPendingSegments_.empty()) 00258 { 00259 return senderPendingSegments_.front(); 00260 } 00261 else 00262 { 00263 return wns::ldk::CompoundPtr(); 00264 } 00265 } 00266 00267 wns::ldk::CompoundPtr 00268 SegAndConcat::getSomethingToSend() 00269 { 00270 assure(hasSomethingToSend(), "getSomethingToSend although nothing to send"); 00271 wns::ldk::CompoundPtr compound = senderPendingSegments_.front(); 00272 00273 if (isSegmenting_) 00274 { 00275 MESSAGE_SINGLE(NORMAL, logger_, "getSomethingToSend: Passing segment " 00276 << getCommand(compound->getCommandPool())->peer.sn_ << " of size " 00277 << (getCommand(compound->getCommandPool())->totalSize()) << " bits to lower layer"); 00278 } 00279 00280 senderPendingSegments_.pop_front(); 00281 return compound; 00282 } 00283 00284 void 00285 SegAndConcat::calculateSizes(const wns::ldk::CommandPool* commandPool, Bit& commandPoolSize, Bit& dataSize) const 00286 { 00287 SegAndConcatCommand* command; 00288 command = getCommand(commandPool); 00289 00290 commandPoolSize = command->peer.headerSize_; 00291 dataSize = command->peer.dataSize_ + command->peer.paddingSize_; 00292 }
1.5.5