![]() |
User Manual, Developers Guide and API Documentation |
![]() |
00001 /******************************************************************************* 00002 * This file is part of openWNS (open Wireless Network Simulator) 00003 * _____________________________________________________________________________ 00004 * 00005 * Copyright (C) 2004-2007 00006 * Chair of Communication Networks (ComNets) 00007 * Kopernikusstr. 16, D-52074 Aachen, Germany 00008 * phone: ++49-241-80-27910, 00009 * fax: ++49-241-80-22242 00010 * email: info@openwns.org 00011 * www: http://www.openwns.org 00012 * _____________________________________________________________________________ 00013 * 00014 * openWNS is free software; you can redistribute it and/or modify it under the 00015 * terms of the GNU Lesser General Public License version 2 as published by the 00016 * Free Software Foundation; 00017 * 00018 * openWNS is distributed in the hope that it will be useful, but WITHOUT ANY 00019 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 00020 * A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 00021 * details. 00022 * 00023 * You should have received a copy of the GNU Lesser General Public License 00024 * along with this program. If not, see <http://www.gnu.org/licenses/>. 00025 * 00026 ******************************************************************************/ 00027 00028 #include <WNS/module/Base.hpp> 00029 00030 #include <WNS/Exception.hpp> 00031 00032 #include <WNS/StaticFactory.hpp> 00033 #include <WNS/PyConfigViewCreator.hpp> 00034 #include <WNS/ldk/Layer.hpp> 00035 #include <WNS/probe/bus/ContextProviderCollection.hpp> 00036 00037 #include <TCP/CumulativeACK.hpp> 00038 #include <TCP/CongestionControl.hpp> 00039 #include <TCP/TCPHeader.hpp> 00040 00041 using namespace tcp; 00042 00043 STATIC_FACTORY_REGISTER_WITH_CREATOR( 00044 CumulativeACK, 00045 wns::ldk::FunctionalUnit, 00046 "tcp.CumulativeACK", 00047 wns::ldk::FUNConfigCreator 00048 ); 00049 00050 00051 CumulativeACK::CumulativeACK(wns::ldk::fun::FUN* _fun, const wns::pyconfig::View& _pyco): 00052 wns::ldk::CommandTypeSpecifier<CumulativeACKCommand>(_fun), 00053 wns::ldk::HasReceptor<>(), 00054 wns::ldk::HasConnector<>(), 00055 wns::ldk::HasDeliverer<>(), 00056 wns::Cloneable<CumulativeACK>(), 00057 pyco(_pyco), 00058 logger(pyco.get("logger")), 00059 fun(_fun), 00060 ccStrategy(NULL), 00061 advertisedWindowSize(pyco.get<unsigned long int>("advertisedWindow")), 00062 sequenceNR(0), 00063 ackNR(0), 00064 lastACKInOrder(0), 00065 receivingCompounds(), 00066 sendingCompounds(), 00067 timeoutSendingCompounds(), 00068 ackCompound(wns::ldk::CompoundPtr()), 00069 copiedACKCompound(wns::ldk::CompoundPtr()), 00070 tcpHeaderReader(NULL) 00071 { 00072 MESSAGE_SINGLE(NORMAL, logger, "TCP::CumulativeACK instance created."); 00073 00074 ccStrategy = new CongestionControl(pyco.get<wns::pyconfig::View>("congestionControl")); 00075 00076 MESSAGE_SINGLE(NORMAL, logger, "Initializing timeout value for segment retransmissions."); 00077 00078 retransmissionTimeout = ccStrategy->getRetransmissionTimeout(); 00079 00080 MESSAGE_SINGLE(NORMAL, logger, "Initializing window size."); 00081 00082 assure(sendCredit() > sendingCompounds.size(), "Wrong initialization of send credit or number of sendingCompounds!"); 00083 00084 wns::probe::bus::ContextProviderCollection localcpc(&getFUN()->getLayer()->getContextProviderCollection()); 00085 00086 windowSizeContextCollector = wns::probe::bus::ContextCollectorPtr( 00087 new wns::probe::bus::ContextCollector(localcpc, "tcp.cumulativeACK.windowSize")); 00088 00089 sendCreditContextCollector = wns::probe::bus::ContextCollectorPtr( 00090 new wns::probe::bus::ContextCollector(localcpc, "tcp.cumulativeACK.sendCredit")); 00091 } 00092 00093 00094 CumulativeACK::~CumulativeACK() 00095 { 00096 delete ccStrategy; 00097 00098 receivingCompounds.clear(); 00099 sendingCompounds.clear(); 00100 timeoutSendingCompounds.clear(); 00101 } 00102 00103 00104 bool 00105 CumulativeACK::doIsAccepting(const wns::ldk::CompoundPtr& _compound __attribute__ ((unused))) const 00106 { 00107 assure(_compound, "doIsAccepting called with invalid compound."); 00108 MESSAGE_SINGLE(NORMAL, logger, "doIsAccepting called."); 00109 00110 return sendCredit() > 0; 00111 } 00112 00113 00114 void 00115 CumulativeACK::onFUNCreated() 00116 { 00117 tcpHeaderReader = getFUN()->getCommandReader("tcp.tcpHeader"); 00118 assure(tcpHeaderReader, "No reader for the TCP Header available!"); 00119 } 00120 00121 00122 void 00123 CumulativeACK::doOnData(const wns::ldk::CompoundPtr& _compound) 00124 { 00125 assure(_compound, "doOnData called with an invalid compound."); 00126 00127 CumulativeACKCommand* cumACKCmd = dynamic_cast<CumulativeACKCommand*>(getCommand(_compound->getCommandPool())); 00128 00129 switch(cumACKCmd->peer.type) 00130 { 00131 case CumulativeACKCommand::I: 00132 MESSAGE_SINGLE(NORMAL, logger, "Received TCP segment with sequence-nr " << cumACKCmd->peer.sequenceNumber 00133 << ". Expected sequence-nr.: " << ackNR); 00134 00135 if (cumACKCmd->peer.sequenceNumber == ackNR) 00136 { 00137 // received segment is in order 00138 wns::ldk::CommandPool* ackPCI = createReply(_compound->getCommandPool()); 00139 this->ackCompound = wns::ldk::CompoundPtr(new wns::ldk::Compound(ackPCI)); 00140 CumulativeACKCommand* ackCommand = activateCommand(ackPCI); 00141 00142 //catch DuplicateKeyValue exception 00143 try 00144 { 00145 receivingCompounds.insert(ackNR, _compound); 00146 } 00147 catch (wns::container::Registry<unsigned long int, wns::ldk::CompoundPtr>::DuplicateKeyValue) 00148 { 00149 // accept duplicate TCP segments 00150 } 00151 00152 wns::ldk::CompoundPtr compound; 00153 00154 while(receivingCompounds.knows(ackNR)) 00155 { 00156 try 00157 { 00158 compound = receivingCompounds.find(ackNR); 00159 } 00160 catch (wns::Exception) 00161 { 00162 throw wns::Exception("No TCP segment available for sequence number " + ackNR); 00163 } 00164 00165 // put to upper layer 00166 getDeliverer()->getAcceptor(compound)->onData(compound); 00167 00168 receivingCompounds.erase(ackNR); 00169 00170 // continue only if next sequence number has 00171 // been received yet 00172 ackNR++; 00173 } 00174 00175 ackCommand->peer.type = CumulativeACKCommand::ACK; 00176 ackCommand->peer.ACKNumber = ackNR; 00177 ackCommand->peer.advertisedWindowSize = advertisedWindowSize - receivingCompounds.size(); 00178 00179 copiedACKCompound = ackCompound->copy(); 00180 00181 updateTCPHeader(this->ackCompound); 00182 00183 MESSAGE_SINGLE(NORMAL, logger, "Sending ACK number: " << ackCommand->peer.ACKNumber); 00184 getConnector()->getAcceptor(this->ackCompound)->sendData(this->ackCompound); 00185 00186 ackCompound = copiedACKCompound; 00187 00188 } 00189 else if(cumACKCmd->peer.sequenceNumber < ackNR) 00190 { 00191 // duplication of an already acked segment; already 00192 // deleted from container 00193 assure(!receivingCompounds.knows(cumACKCmd->peer.sequenceNumber), 00194 "Predecessor compound received. Already forwarded to upper FU."); 00195 00196 } 00197 else 00198 { 00199 // received seqNr. > expected one -- buffering 00200 // received segment is not in order; send duplicate ACK 00201 // with highest in-order sequence number 00202 00203 if (not receivingCompounds.knows(cumACKCmd->peer.sequenceNumber)) 00204 receivingCompounds.insert(cumACKCmd->peer.sequenceNumber, _compound); 00205 00206 if (!getFUN()->getProxy()->commandIsActivated(this->ackCompound->getCommandPool(), this)) 00207 activateCommand(this->ackCompound->getCommandPool()); 00208 00209 CumulativeACKCommand* ackCommand __attribute__ ((unused)) = getCommand(this->ackCompound->getCommandPool()); 00210 00211 ackCommand->peer.type = CumulativeACKCommand::ACK; 00212 ackCommand->peer.ACKNumber = ackNR; 00213 ackCommand->peer.advertisedWindowSize = advertisedWindowSize - receivingCompounds.size(); 00214 00215 copiedACKCompound = ackCompound->copy(); 00216 00217 updateTCPHeader(this->ackCompound); 00218 // ackCompound has to be resent 00219 assure(ackCommand->peer.ACKNumber == ackNR, "Wrong acknowledgement number."); 00220 MESSAGE_SINGLE(NORMAL, logger, "Sending Duplicate ACK for segment " << ackCommand->peer.ACKNumber); 00221 getConnector()->getAcceptor(this->ackCompound)->sendData(this->ackCompound); 00222 00223 ackCompound = copiedACKCompound; 00224 } 00225 break; 00226 00227 case CumulativeACKCommand::ACK: 00228 MESSAGE_SINGLE(NORMAL, logger, "Received acknowledgement. ACK-Nr. " << cumACKCmd->peer.ACKNumber); 00229 00230 // set the sending credit imposed by receiver 00231 // (sliding window mechanism) 00232 advertisedWindowSize = cumACKCmd->peer.advertisedWindowSize; 00233 00234 // the ACK-nr might never be greater than sequence number of the next 00235 // segment to be sent 00236 assure(cumACKCmd->peer.ACKNumber <= sequenceNR, 00237 "Invalid acknowledgement number!"); 00238 00239 if (cumACKCmd->peer.ACKNumber > lastACKInOrder) 00240 { 00248 assure(sendingCompounds.knows(cumACKCmd->peer.ACKNumber-1) && timeoutSendingCompounds.knows(cumACKCmd->peer.ACKNumber-1), 00249 "Sequence number " << cumACKCmd->peer.ACKNumber << " has already been acknowledged."); 00250 00251 lastACKInOrder = cumACKCmd->peer.ACKNumber; 00252 00253 // all segments up to ack-nr. - 1 successfully received 00254 // remove all predecessors 00255 unsigned long int tmp = lastACKInOrder - 1; 00256 while(sendingCompounds.knows(tmp)) 00257 { 00258 // ack all unacked packets 00259 ccStrategy->onSegmentAcknowledged(); 00260 00261 try 00262 { 00263 timeoutSendingCompounds.find(tmp)->cancelTimeout(); 00264 } 00265 catch (wns::Exception) 00266 { 00267 throw wns::Exception("Missing timeout event for TCP segment!"); 00268 } 00269 sendingCompounds.erase(tmp); 00270 tmp--; 00271 } 00272 00273 if (sendCredit() > 0) 00274 this->doWakeup(); 00275 00276 } 00277 else 00278 { 00279 // duplicate ACK segments have been acknowledged before 00280 assure(sendingCompounds.knows(cumACKCmd->peer.ACKNumber), "Invalid compound to be sent."); 00281 00282 ccStrategy->onSegmentLoss(CongestionControl::DUPLICATE_ACK, cumACKCmd->peer.ACKNumber); 00283 00284 if(ccStrategy->duplicateACKThresholdReached(cumACKCmd->peer.ACKNumber)) 00285 { 00286 // fast retransmit 00287 MESSAGE_SINGLE(NORMAL, logger, "Fast retransmit for ACK-Nr. " << cumACKCmd->peer.ACKNumber); 00288 this->retransmitData(cumACKCmd->peer.ACKNumber); 00289 } 00290 } 00291 00292 MESSAGE_SINGLE(NORMAL, logger, "Sliding window mechanism. Send credit: " << sendCredit()); 00293 00294 windowSizeContextCollector->put(ccStrategy->getWindowSize()); 00295 sendCreditContextCollector->put(sendCredit()); 00296 break; 00297 00298 default: 00299 assure(false, "Unknown command type for TCP::CumulativeACK."); 00300 break; 00301 } 00302 } 00303 00304 00305 00306 void 00307 CumulativeACK::doSendData(const wns::ldk::CompoundPtr& _compound) 00308 { 00309 assure(_compound, "doSendData called with an invalid compound"); 00310 00311 CumulativeACKCommand* cumACKCmd = activateCommand(_compound->getCommandPool()); 00312 00313 cumACKCmd->peer.sequenceNumber = sequenceNR; 00314 cumACKCmd->peer.type = CumulativeACKCommand::I; 00315 00316 // create copy for possible retransmissions 00317 wns::ldk::CompoundPtr compound = _compound->copy(); 00318 00319 assure(!sendingCompounds.knows(sequenceNR), "Invalid compound to be sent."); 00320 00321 if(isAccepting(_compound)) 00322 { 00323 try 00324 { 00325 sendingCompounds.insert(sequenceNR, compound); 00326 // set timeout 00327 timeoutSendingCompounds.insert(sequenceNR, new Timeout(this, sequenceNR, ccStrategy->getRetransmissionTimeout())); 00328 } 00329 catch(wns::Exception) 00330 { 00331 throw wns::Exception("TCP segment already sent for sequence number: " + sequenceNR); 00332 } 00333 00334 updateTCPHeader(_compound); 00335 MESSAGE_SINGLE(NORMAL, logger, "Sending data. Sequence number: " << sequenceNR); 00336 getConnector()->getAcceptor(_compound)->sendData(_compound); 00337 00338 sequenceNR++; 00339 } 00340 else 00341 { 00342 // buffer each compound 00343 } 00344 } 00345 00346 00347 void 00348 CumulativeACK::retransmitData(const unsigned long int seqNr) 00349 { 00350 wns::ldk::CompoundPtr _compound; 00351 try 00352 { 00353 _compound = sendingCompounds.find(seqNr); 00354 } 00355 catch (wns::Exception) 00356 { 00357 throw wns::Exception("No tcp segment available for sequence number " + ackNR); 00358 } 00359 00360 wns::ldk::CompoundPtr compound = _compound->copy(); 00361 00362 CumulativeACKCommand* cmd __attribute__ ((unused)) = getCommand(_compound->getCommandPool()); 00363 00364 try 00365 { 00366 sendingCompounds.update(seqNr, compound); 00367 timeoutSendingCompounds.erase(seqNr); 00368 timeoutSendingCompounds.insert(seqNr, new Timeout(this, seqNr, ccStrategy->getRetransmissionTimeout())); 00369 } 00370 catch (wns::Exception) 00371 { 00372 throw wns::Exception("TCP segment not available!"); 00373 } 00374 00375 // compound already buffered, therefore always accepting 00376 MESSAGE_SINGLE(NORMAL, logger, "Retransmitting data. Sequence number: " << cmd->peer.sequenceNumber); 00377 00378 updateTCPHeader(_compound); 00379 getConnector()->getAcceptor(_compound)->sendData(_compound); 00380 } 00381 00382 00383 void 00384 CumulativeACK::doWakeup() 00385 { 00386 MESSAGE_SINGLE(NORMAL, logger, "doWakeup called. Forwarding to upper FU."); 00387 00388 if (sendCredit() > 0) 00389 getReceptor()->wakeup(); 00390 } 00391 00392 00393 unsigned long int 00394 CumulativeACK::min(const unsigned long int x, const unsigned long int y) const 00395 { 00396 return (x<=y) ? x : y; 00397 } 00398 00399 00400 unsigned long int 00401 CumulativeACK::sendCredit() const 00402 { 00403 if (ccStrategy->getWindowSize() > sendingCompounds.size()) 00404 return (min(ccStrategy->getWindowSize()-sendingCompounds.size(),advertisedWindowSize)); 00405 else 00406 return 0; 00407 } 00408 00409 void 00410 CumulativeACK::updateTCPHeader(const wns::ldk::CompoundPtr& _compound) 00411 { 00412 CumulativeACKCommand* cumACKCmd = dynamic_cast<CumulativeACKCommand*>(getCommand(_compound->getCommandPool())); 00413 00414 assure(cumACKCmd!=NULL, "Not a Cumulative ACK Command!"); 00415 assure(tcpHeaderReader!=NULL, "No reader for the TCP Header available!"); 00416 00417 TCPCommand* tcpHeader = tcpHeaderReader->readCommand<TCPCommand>( 00418 _compound->getCommandPool()); 00419 00420 assure(tcpHeader!=NULL, "No valid TCP Header found!"); 00421 00422 tcpHeader->peer.ack = cumACKCmd->peer.type == CumulativeACKCommand::I ? false : true; 00423 }
1.5.5