User Manual, Developers Guide and API Documentation

CumulativeACK.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002  * This file is part of openWNS (open Wireless Network Simulator)
00003  * _____________________________________________________________________________
00004  *
00005  * Copyright (C) 2004-2007
00006  * Chair of Communication Networks (ComNets)
00007  * Kopernikusstr. 16, D-52074 Aachen, Germany
00008  * phone: ++49-241-80-27910,
00009  * fax: ++49-241-80-22242
00010  * email: info@openwns.org
00011  * www: http://www.openwns.org
00012  * _____________________________________________________________________________
00013  *
00014  * openWNS is free software; you can redistribute it and/or modify it under the
00015  * terms of the GNU Lesser General Public License version 2 as published by the
00016  * Free Software Foundation;
00017  *
00018  * openWNS is distributed in the hope that it will be useful, but WITHOUT ANY
00019  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
00020  * A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00021  * details.
00022  *
00023  * You should have received a copy of the GNU Lesser General Public License
00024  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
00025  *
00026  ******************************************************************************/
00027 
#include <WNS/module/Base.hpp>

#include <WNS/Exception.hpp>

#include <WNS/StaticFactory.hpp>
#include <WNS/PyConfigViewCreator.hpp>
#include <WNS/ldk/Layer.hpp>
#include <WNS/probe/bus/ContextProviderCollection.hpp>

#include <TCP/CumulativeACK.hpp>
#include <TCP/CongestionControl.hpp>
#include <TCP/TCPHeader.hpp>

#include <sstream>
00040 
00041 using namespace tcp;
00042 
00043 STATIC_FACTORY_REGISTER_WITH_CREATOR(
00044     CumulativeACK,
00045     wns::ldk::FunctionalUnit,
00046     "tcp.CumulativeACK",
00047     wns::ldk::FUNConfigCreator
00048     );
00049 
00050 
00051 CumulativeACK::CumulativeACK(wns::ldk::fun::FUN* _fun, const wns::pyconfig::View& _pyco):
00052     wns::ldk::CommandTypeSpecifier<CumulativeACKCommand>(_fun),
00053     wns::ldk::HasReceptor<>(),
00054     wns::ldk::HasConnector<>(),
00055     wns::ldk::HasDeliverer<>(),
00056     wns::Cloneable<CumulativeACK>(),
00057     pyco(_pyco),
00058     logger(pyco.get("logger")),
00059     fun(_fun),
00060     ccStrategy(NULL),
00061     advertisedWindowSize(pyco.get<unsigned long int>("advertisedWindow")),
00062     sequenceNR(0),
00063     ackNR(0),
00064     lastACKInOrder(0),
00065     receivingCompounds(),
00066     sendingCompounds(),
00067     timeoutSendingCompounds(),
00068     ackCompound(wns::ldk::CompoundPtr()),
00069     copiedACKCompound(wns::ldk::CompoundPtr()),
00070     tcpHeaderReader(NULL)
00071 {
00072     MESSAGE_SINGLE(NORMAL, logger, "TCP::CumulativeACK instance created.");
00073 
00074     ccStrategy = new CongestionControl(pyco.get<wns::pyconfig::View>("congestionControl"));
00075 
00076     MESSAGE_SINGLE(NORMAL, logger, "Initializing timeout value for segment retransmissions.");
00077 
00078     retransmissionTimeout = ccStrategy->getRetransmissionTimeout();
00079 
00080     MESSAGE_SINGLE(NORMAL, logger, "Initializing window size.");
00081 
00082     assure(sendCredit() > sendingCompounds.size(), "Wrong initialization of send credit or number of sendingCompounds!");
00083 
00084     wns::probe::bus::ContextProviderCollection localcpc(&getFUN()->getLayer()->getContextProviderCollection());
00085 
00086     windowSizeContextCollector = wns::probe::bus::ContextCollectorPtr(
00087         new wns::probe::bus::ContextCollector(localcpc, "tcp.cumulativeACK.windowSize"));
00088 
00089     sendCreditContextCollector = wns::probe::bus::ContextCollectorPtr(
00090         new wns::probe::bus::ContextCollector(localcpc, "tcp.cumulativeACK.sendCredit"));
00091 }
00092 
00093 
00094 CumulativeACK::~CumulativeACK()
00095 {
00096     delete ccStrategy;
00097 
00098     receivingCompounds.clear();
00099     sendingCompounds.clear();
00100     timeoutSendingCompounds.clear();
00101 }
00102 
00103 
00104 bool
00105 CumulativeACK::doIsAccepting(const wns::ldk::CompoundPtr& _compound __attribute__ ((unused))) const
00106 {
00107     assure(_compound, "doIsAccepting called with invalid compound.");
00108     MESSAGE_SINGLE(NORMAL, logger, "doIsAccepting called.");
00109 
00110     return sendCredit() > 0;
00111 }
00112 
00113 
00114 void
00115 CumulativeACK::onFUNCreated()
00116 {
00117     tcpHeaderReader = getFUN()->getCommandReader("tcp.tcpHeader");
00118     assure(tcpHeaderReader, "No reader for the TCP Header available!");
00119 }
00120 
00121 
00122 void
00123 CumulativeACK::doOnData(const wns::ldk::CompoundPtr& _compound)
00124 {
00125     assure(_compound, "doOnData called with an invalid compound.");
00126 
00127     CumulativeACKCommand* cumACKCmd = dynamic_cast<CumulativeACKCommand*>(getCommand(_compound->getCommandPool()));
00128 
00129     switch(cumACKCmd->peer.type)
00130     {
00131     case CumulativeACKCommand::I:
00132         MESSAGE_SINGLE(NORMAL, logger, "Received TCP segment with sequence-nr " << cumACKCmd->peer.sequenceNumber
00133                    << ". Expected sequence-nr.: " << ackNR);
00134 
00135         if (cumACKCmd->peer.sequenceNumber == ackNR)
00136         {
00137             // received segment is in order
00138             wns::ldk::CommandPool* ackPCI = createReply(_compound->getCommandPool());
00139             this->ackCompound = wns::ldk::CompoundPtr(new wns::ldk::Compound(ackPCI));
00140             CumulativeACKCommand* ackCommand = activateCommand(ackPCI);
00141 
00142             //catch DuplicateKeyValue exception
00143             try
00144             {
00145                 receivingCompounds.insert(ackNR, _compound);
00146             }
00147             catch (wns::container::Registry<unsigned long int, wns::ldk::CompoundPtr>::DuplicateKeyValue)
00148             {
00149                 // accept duplicate TCP segments
00150             }
00151 
00152             wns::ldk::CompoundPtr compound;
00153 
00154             while(receivingCompounds.knows(ackNR))
00155             {
00156                 try
00157                 {
00158                     compound = receivingCompounds.find(ackNR);
00159                 }
00160                 catch (wns::Exception)
00161                 {
00162                     throw wns::Exception("No TCP segment available for sequence number " + ackNR);
00163                 }
00164 
00165                 // put to upper layer
00166                 getDeliverer()->getAcceptor(compound)->onData(compound);
00167 
00168                 receivingCompounds.erase(ackNR);
00169 
00170                 // continue only if next sequence number has
00171                 // been received yet
00172                 ackNR++;
00173             }
00174 
00175             ackCommand->peer.type = CumulativeACKCommand::ACK;
00176             ackCommand->peer.ACKNumber = ackNR;
00177             ackCommand->peer.advertisedWindowSize = advertisedWindowSize - receivingCompounds.size();
00178 
00179             copiedACKCompound = ackCompound->copy();
00180 
00181             updateTCPHeader(this->ackCompound);
00182             
00183             MESSAGE_SINGLE(NORMAL, logger, "Sending ACK number: " << ackCommand->peer.ACKNumber);
00184             getConnector()->getAcceptor(this->ackCompound)->sendData(this->ackCompound);
00185 
00186             ackCompound = copiedACKCompound;
00187 
00188         }
00189         else if(cumACKCmd->peer.sequenceNumber < ackNR)
00190         {
00191             // duplication of an already acked segment; already
00192             // deleted from container
00193             assure(!receivingCompounds.knows(cumACKCmd->peer.sequenceNumber),
00194                    "Predecessor compound received. Already forwarded to upper FU.");
00195 
00196         }
00197         else
00198         {
00199             // received seqNr. > expected one -- buffering
00200             // received segment is not in order; send duplicate ACK
00201             // with highest in-order sequence number
00202 
00203             if (not receivingCompounds.knows(cumACKCmd->peer.sequenceNumber))
00204                 receivingCompounds.insert(cumACKCmd->peer.sequenceNumber, _compound);
00205 
00206             if (!getFUN()->getProxy()->commandIsActivated(this->ackCompound->getCommandPool(), this))
00207                 activateCommand(this->ackCompound->getCommandPool());
00208 
00209             CumulativeACKCommand* ackCommand __attribute__ ((unused)) = getCommand(this->ackCompound->getCommandPool());
00210 
00211             ackCommand->peer.type = CumulativeACKCommand::ACK;
00212             ackCommand->peer.ACKNumber = ackNR;
00213             ackCommand->peer.advertisedWindowSize = advertisedWindowSize - receivingCompounds.size();
00214 
00215             copiedACKCompound = ackCompound->copy();
00216 
00217             updateTCPHeader(this->ackCompound);
00218             // ackCompound has to be resent
00219             assure(ackCommand->peer.ACKNumber == ackNR, "Wrong acknowledgement number.");
00220             MESSAGE_SINGLE(NORMAL, logger, "Sending Duplicate ACK for segment " << ackCommand->peer.ACKNumber);
00221             getConnector()->getAcceptor(this->ackCompound)->sendData(this->ackCompound);
00222 
00223             ackCompound = copiedACKCompound;
00224         }
00225         break;
00226 
00227     case CumulativeACKCommand::ACK:
00228         MESSAGE_SINGLE(NORMAL, logger, "Received acknowledgement. ACK-Nr. " << cumACKCmd->peer.ACKNumber);
00229 
00230         // set the sending credit imposed by receiver
00231         // (sliding window mechanism)
00232         advertisedWindowSize = cumACKCmd->peer.advertisedWindowSize;
00233 
00234         // the ACK-nr might never be greater than sequence number of the next
00235         // segment to be sent
00236         assure(cumACKCmd->peer.ACKNumber <= sequenceNR,
00237                "Invalid acknowledgement number!");
00238 
00239         if (cumACKCmd->peer.ACKNumber > lastACKInOrder)
00240         {
00248             assure(sendingCompounds.knows(cumACKCmd->peer.ACKNumber-1) && timeoutSendingCompounds.knows(cumACKCmd->peer.ACKNumber-1),
00249                    "Sequence number " << cumACKCmd->peer.ACKNumber << " has already been acknowledged.");
00250 
00251             lastACKInOrder = cumACKCmd->peer.ACKNumber;
00252 
00253             // all segments up to ack-nr. - 1 successfully received
00254             // remove all predecessors
00255             unsigned long int tmp = lastACKInOrder - 1;
00256             while(sendingCompounds.knows(tmp))
00257             {
00258                 // ack all unacked packets
00259                 ccStrategy->onSegmentAcknowledged();
00260 
00261                 try
00262                 {
00263                     timeoutSendingCompounds.find(tmp)->cancelTimeout();
00264                 }
00265                 catch (wns::Exception)
00266                 {
00267                     throw wns::Exception("Missing timeout event for TCP segment!");
00268                 }
00269                 sendingCompounds.erase(tmp);
00270                 tmp--;
00271             }
00272 
00273             if (sendCredit() > 0)
00274                 this->doWakeup();
00275 
00276         }
00277         else
00278         {
00279             // duplicate ACK segments have been acknowledged before
00280             assure(sendingCompounds.knows(cumACKCmd->peer.ACKNumber), "Invalid compound to be sent.");
00281 
00282             ccStrategy->onSegmentLoss(CongestionControl::DUPLICATE_ACK, cumACKCmd->peer.ACKNumber);
00283 
00284             if(ccStrategy->duplicateACKThresholdReached(cumACKCmd->peer.ACKNumber))
00285             {
00286                 // fast retransmit
00287                 MESSAGE_SINGLE(NORMAL, logger, "Fast retransmit for ACK-Nr. " << cumACKCmd->peer.ACKNumber);
00288                 this->retransmitData(cumACKCmd->peer.ACKNumber);
00289             }
00290         }
00291 
00292         MESSAGE_SINGLE(NORMAL, logger, "Sliding window mechanism. Send credit: " << sendCredit());
00293 
00294         windowSizeContextCollector->put(ccStrategy->getWindowSize());
00295         sendCreditContextCollector->put(sendCredit());
00296         break;
00297 
00298     default:
00299         assure(false, "Unknown command type for TCP::CumulativeACK.");
00300         break;
00301     }
00302 }
00303 
00304 
00305 
00306 void
00307 CumulativeACK::doSendData(const wns::ldk::CompoundPtr& _compound)
00308 {
00309     assure(_compound, "doSendData called with an invalid compound");
00310 
00311         CumulativeACKCommand* cumACKCmd = activateCommand(_compound->getCommandPool());
00312 
00313         cumACKCmd->peer.sequenceNumber = sequenceNR;
00314         cumACKCmd->peer.type = CumulativeACKCommand::I;
00315 
00316         // create copy for possible retransmissions
00317         wns::ldk::CompoundPtr compound = _compound->copy();
00318 
00319         assure(!sendingCompounds.knows(sequenceNR), "Invalid compound to be sent.");
00320 
00321     if(isAccepting(_compound))
00322     {
00323         try
00324         {
00325             sendingCompounds.insert(sequenceNR, compound);
00326             // set timeout
00327             timeoutSendingCompounds.insert(sequenceNR, new Timeout(this, sequenceNR, ccStrategy->getRetransmissionTimeout()));
00328         }
00329         catch(wns::Exception)
00330         {
00331             throw wns::Exception("TCP segment already sent for sequence number: " + sequenceNR);
00332         }
00333 
00334         updateTCPHeader(_compound);
00335         MESSAGE_SINGLE(NORMAL, logger, "Sending data. Sequence number: " << sequenceNR);
00336         getConnector()->getAcceptor(_compound)->sendData(_compound);
00337 
00338         sequenceNR++;   
00339     }
00340     else
00341     {
00342         // buffer each compound
00343     }
00344 }
00345 
00346 
00347 void
00348 CumulativeACK::retransmitData(const unsigned long int seqNr)
00349 {
00350     wns::ldk::CompoundPtr _compound;
00351     try
00352     {
00353         _compound = sendingCompounds.find(seqNr);
00354     }
00355     catch (wns::Exception)
00356     {
00357         throw wns::Exception("No tcp segment available for sequence number " + ackNR);
00358     }
00359 
00360     wns::ldk::CompoundPtr compound = _compound->copy();
00361 
00362     CumulativeACKCommand* cmd __attribute__ ((unused)) = getCommand(_compound->getCommandPool());
00363 
00364     try
00365     {
00366         sendingCompounds.update(seqNr, compound);
00367         timeoutSendingCompounds.erase(seqNr);
00368         timeoutSendingCompounds.insert(seqNr, new Timeout(this, seqNr, ccStrategy->getRetransmissionTimeout()));
00369     }
00370     catch (wns::Exception)
00371     {
00372         throw wns::Exception("TCP segment not available!");
00373     }
00374 
00375     // compound already buffered, therefore always accepting
00376     MESSAGE_SINGLE(NORMAL, logger, "Retransmitting data. Sequence number: " << cmd->peer.sequenceNumber);
00377 
00378     updateTCPHeader(_compound);
00379     getConnector()->getAcceptor(_compound)->sendData(_compound);
00380 }
00381 
00382 
00383 void
00384 CumulativeACK::doWakeup()
00385 {
00386     MESSAGE_SINGLE(NORMAL, logger, "doWakeup called. Forwarding to upper FU.");
00387 
00388     if (sendCredit() > 0)
00389         getReceptor()->wakeup();
00390 }
00391 
00392 
00393 unsigned long int
00394 CumulativeACK::min(const unsigned long int x, const unsigned long int y) const
00395 {
00396     return (x<=y) ? x : y;
00397 }
00398 
00399 
00400 unsigned long int
00401 CumulativeACK::sendCredit() const
00402 {
00403     if (ccStrategy->getWindowSize() > sendingCompounds.size())
00404         return (min(ccStrategy->getWindowSize()-sendingCompounds.size(),advertisedWindowSize));
00405     else
00406         return 0;
00407 }
00408 
00409 void
00410 CumulativeACK::updateTCPHeader(const wns::ldk::CompoundPtr& _compound)
00411 {
00412     CumulativeACKCommand* cumACKCmd = dynamic_cast<CumulativeACKCommand*>(getCommand(_compound->getCommandPool()));
00413 
00414     assure(cumACKCmd!=NULL, "Not a Cumulative ACK Command!");
00415     assure(tcpHeaderReader!=NULL, "No reader for the TCP Header available!");
00416 
00417     TCPCommand* tcpHeader = tcpHeaderReader->readCommand<TCPCommand>(
00418         _compound->getCommandPool());
00419 
00420     assure(tcpHeader!=NULL, "No valid TCP Header found!");
00421 
00422     tcpHeader->peer.ack = cumACKCmd->peer.type == CumulativeACKCommand::I ? false : true;
00423 }

Generated on Sun May 27 03:31:40 2012 for openWNS by  doxygen 1.5.5