PEtherApp Class Reference

#include <PEtherApp.h>

Inheritance diagram for PEtherApp:
BasePSApp cSimpleModule cModule cComponent cDefaultList cNoncopyableOwnedObject cOwnedObject noncopyable cNamedObject cObject

List of all members.

Public Member Functions

virtual void setNewFID (InformationItem *II, string FID, QoSList qos, vector< string > lidpath)

Protected Types

typedef std::map
< InformationItem
*, StreamData
*, InformationItem
FIDMap
 The module's FID Table.
typedef FIDMap::iterator FIDIter
 The iterator for the FID Table.

Protected Member Functions

virtual void initialize (int stage)
virtual int numInitStages () const
virtual void handleMessage (cMessage *msg)
virtual void finish ()
virtual MACAddress resolveDestMACAddress ()
virtual void chooseSubscription ()
virtual void cancelInformationItem ()
virtual void sendStreamData (cMessage *timer)
virtual void receivePacket (cMessage *msg)
virtual bool genInformationItem ()

Protected Attributes

FIDMap FID_map
double psProportion
 Proportion between the publishers and subscribers.
MACAddress destMACAddress
 The publication MAC address -- Preferably Broadcast.

Detailed Description

The Publish Ethernet Application. This module acts more like a streaming application. The only interesting this about it, is the use of FID for the proper routing of the created streams, from the publisher to all assigned subscribers. The FID assignment is a operation of the Topology Manager, and this is why, the function setNewFID() is made public, so that the Topology Manager module of the simulation will be able to call it.


Member Typedef Documentation

typedef FIDMap::iterator PEtherApp::FIDIter [protected]

The iterator for the FID Table.

typedef std::map<InformationItem *, StreamData *, InformationItem> PEtherApp::FIDMap [protected]

The module's FID Table.


Member Function Documentation

void PEtherApp::cancelInformationItem (  )  [protected, virtual]

Cancels the publication of a random Information Item from this application. The difference between this publisher and the base application BasePSApp, in this function is that the publisher has also to delete the chosen Information Item element form the FID Table as well.

00413                                       {
00414 
00415         //If element was successfully removed from the Information Item table, remove it
00416         //from the FID Table as well
00417         if(BasePSApp::cancelInformationItem(UNPUBLISH_INFO)) {
00418 
00419                 ev << this->getParentModule()->getFullName() << ": Canceling publication." << endl;
00420 
00421                 //For all entries in the FID Table of this module find the one missing from the
00422                 //subscriptions table and delete it
00423                 for(FIDIter iter=FID_map.begin();iter!=FID_map.end();iter++) {
00424 
00425                         subscriptionItem subitem;
00426                         subitem.ii=iter->first;
00427                         subscrIter subiter = find(subscriptions.begin(),subscriptions.end(), subitem);
00428 
00429                         if(subiter==subscriptions.end()) {
00430                                 ev << "Deleting Information item from FID Table: " << iter->first << endl;
00431                                 //Cancel Timer self-message
00432                                 iter->second->tmsg=cancelEvent(iter->second->tmsg);
00433 
00434                                 //Delete Timer self-message
00435                                 delete iter->second->tmsg;
00436 
00437                                 FID_map.erase(iter);
00438                         }
00439                 }
00440         }
00441 }

void PEtherApp::chooseSubscription (  )  [protected, virtual]

Chooses a random subscription from the Information Item Table, according to the popularity of each element. Afterwards, the application module tries to become a publisher to this subscription, although it might be already a publisher of it.

00120                                    {
00121 
00122         //Try to receive a subscriber request for an Information Item
00123         InformationItem *subscription=returnRandII();
00124         if(subscription!=NULL) {
00125 
00126                 //Inform the Global Information Item Table
00127                 if(iit->addPubElement(subscription,this->getParentModule())) {
00128 
00129                         ev << this->getParentModule()->getFullName() << ": Existing Information Item met with new publication!" << endl;
00130 
00131                         subscriptionItem subitem;
00132                         subitem.ii=subscription;
00133                         subscriptions.push_back(subitem);
00134                 }
00135         }
00136         //If non returned for any reason print an error message
00137         else {
00138                 ev << this->getParentModule()->getFullName() << ": Tried to find subscription but list was empty!" << endl;
00139         }
00140 }

void PEtherApp::finish (  )  [protected, virtual]

The finish function of the application. Just calls the finish function of the base class and deletes the FID Table of this publisher application.

Reimplemented from BasePSApp.

00080 {
00081         //Delete all inactive timers of FID_map
00082         for(FIDIter iter=FID_map.begin();iter!=FID_map.end();iter++) {
00083         if(iter->second->active==false) {
00084                         //These items are not scheduled since they are paused
00085                         if(iter->second->tmsg!=NULL)
00086                                 delete iter->second->tmsg;
00087                 }
00088         }
00089 
00090         //Clear the FID table
00091         FID_map.clear();
00092 
00093         //Call the base function finish function
00094         BasePSApp::finish();
00095 }

bool PEtherApp::genInformationItem (  )  [protected, virtual]

It generates a information Item and places it into the subscriptions table of the publisher, and informs the Information Item Table module, if exists.

00308                                    {
00309         //Create subscription item
00310         subscriptionItem subitem;
00311 
00312         //Generate random Information Item ID
00313         char ID[PURSUIT_ID_LEN+1];
00314         for(int i=0;i<PURSUIT_ID_LEN;i++)
00315         {
00316                         ID[i]=(char)(1+intrand(255));
00317         }
00318         //Put string end character
00319         ID[PURSUIT_ID_LEN] = '\0';
00320 
00321         //Generate a scope ID -- static is just fine
00322         InformationItem *II=new InformationItem("0457A0B220ABF100",ID);
00323 
00324         //Assign the successfully created Information Item in the subscription
00325         subitem.ii=II;
00326 
00327         //Create randomly Meta-data for the future Information Item
00328         //TODO: Find a method for generating these metadata
00329         subitem.qos[QoS_BW_M]=(uint16_t)datarate->doubleValue();
00330         subitem.qos[QoS_FLOW]=1;
00331 
00332         /* First the Topology Manager has to be informed of the new Information Item
00333          * and it's meta data. This happens to avoid finding paths without being
00334          * informed of the QoS requirements of each item. If there is a mistake,
00335          * such as the Information Item already exists, the Topology Manager will
00336          * not accept the packet. The same information lies on the rendezvous table.*/
00337         publishMetaDataPacket(subitem);
00338 
00339         /***Inform the Global Information Item Table***/
00340         if(iit->addPubElement(II,this->getParentModule())) {
00341 
00342                 subscriptions.push_back(subitem);
00343                 return true;
00344         }
00345         /**********************************************/
00346 
00347         return false;
00348 }

void PEtherApp::handleMessage ( cMessage msg  )  [protected, virtual]

Handles incoming messages for the publisher application. In case of incoming messages, it records some statistics, although this functionality is just kept for future use. The most important use of this function is the interpretation of the received self messages, controlling the transmission of packets for the published Information Item streams and the creation new publications.

Reimplemented from BasePSApp.

00144 {
00145     if (msg->isSelfMessage())
00146     {
00147         //Self message to create or delete a new publication
00148         if(strcmp(msg->getFullName(),"chooseSubscriber")==0) {
00149                 //Decide whether to make new subscription
00150                         double now=simTime().dbl();
00151                         //Get a random variable according the the Gaussian distribution
00152                         double pubPeak=normal(this->par("actPeak").doubleValue(),PEAK_DEV);
00153 
00154                         //Try to create a new publication, if the random variable is close enough to the current time.
00155                         //To create a periodical functionality, the modulo of the current time is taken for the time
00156                         //period equal to three times the mean value of the Gaussian random variable.
00157                         if(pubPeak-(2*PEAK_DEV+1)<fmod(now,3*pubPeak) && pubPeak+(2*PEAK_DEV+1)>fmod(now,3*pubPeak)) {
00158                                 if(dblrand()<0.8) {
00159                                         chooseSubscription();
00160                                 }
00161                                 else {
00162                                         genInformationItem();
00163                                 }
00164                 }
00165                         //Or delete an old one if the current time is close enough the the double of the Gaussian mean.
00166                         //Notice that a bit more space is given to the removing of publications, since it creates
00167                         //better results.
00168                         else if(2*pubPeak-(2*PEAK_DEV+2)<fmod(now,3*pubPeak) && 2*pubPeak+(2*PEAK_DEV+2)>fmod(now,3*pubPeak)) {
00169                         cancelInformationItem();
00170                 }
00171 
00172                 simtime_t pt = par("pubInterval").doubleValue();
00173                 // Send packet with a small jitter
00174                 simtime_t jitter = dblrand() * MAXJITTER;
00175                 scheduleAt(simTime()+pt+jitter, msg);
00176         }
00177         //Self message indicating it is time to send a stream
00178         else if(strcmp(msg->getFullName(),"StreamTmr")==0) {
00179                 sendStreamData(msg);
00180         }
00181     }
00182     else
00183     {
00184         receivePacket(msg);
00185     }
00186 
00187 }

void PEtherApp::initialize ( int  stage  )  [protected, virtual]

Initialization function of the publisher application. Schedules the initial self messages and the class parameters.

Reimplemented from BasePSApp.

00048 {
00049         BasePSApp::initialize(stage);
00050 
00051     // we can only initialize in the 2nd stage (stage==1), because
00052     // assignment of "auto" MAC addresses takes place in stage 0
00053     if (stage == 1)
00054     {
00055         //Read the proportion of choosing publishers against adding subscribers
00056         psProportion = simulation.getSystemModule()->par("PubSubProportion");
00057                 if(psProportion<0 || psProportion>1)
00058                         psProportion=0.5;
00059 
00060         //Watch the FID Table
00061         WATCH_MAP(FID_map);
00062 
00063         destMACAddress = resolveDestMACAddress();
00064 
00065         // if no dest address given, nothing to do
00066         if (destMACAddress.isUnspecified())
00067             return;
00068 
00069         //Generate self message for choosing a subscriber to publish for
00070         cMessage *chooseSubmsg = new cMessage("chooseSubscriber");
00071                 simtime_t pt = par("pubInterval").doubleValue();
00072                 // Send packet with a small jitter
00073                 simtime_t jitter = dblrand() * MAXJITTER;
00074                 scheduleAt(simTime()+pt+jitter, chooseSubmsg);
00075     }
00076 }

virtual int PEtherApp::numInitStages (  )  const [inline, protected, virtual]

Returns the number of the initialization stages.

Reimplemented from BasePSApp.

00086 {return 5;}

void PEtherApp::receivePacket ( cMessage msg  )  [protected, virtual]

It actually only records their statistics and deletes them. It has now current use and it is kept for future expansion of the class.

00296 {
00297     EV << "Received packet `" << msg->getName() << "'\n";
00298 
00299     packetsReceived++;
00300     simtime_t lastEED = simTime() - msg->getCreationTime();
00301     eedVector.record(lastEED);
00302     eedStats.collect(lastEED);
00303 
00304     delete msg;
00305 }

MACAddress PEtherApp::resolveDestMACAddress (  )  [protected, virtual]

Interprets the MAC address chosen as in input parameter for sending all publication packets to.

00099 {
00100     MACAddress destMACAddress;
00101     const char *destAddress = par("destAddress");
00102     if (destAddress[0])
00103     {
00104         // try as mac address first, then as a module
00105         if (!destMACAddress.tryParse(destAddress))
00106         {
00107             cModule *destStation = simulation.getModuleByPath(destAddress);
00108             if (!destStation)
00109                 error("cannot resolve MAC address '%s': not a 12-hex-digit MAC address or a valid module path name", destAddress);
00110             cModule *destMAC = destStation->getSubmodule("mac");
00111             if (!destMAC)
00112                 error("module '%s' has no 'mac' submodule", destAddress);
00113             destMACAddress.setAddress(destMAC->par("address"));
00114         }
00115     }
00116     return destMACAddress;
00117 }

void PEtherApp::sendStreamData ( cMessage timer  )  [protected, virtual]

Send the packets of a stream corresponding to a specific Information Item, in the interval determined by the waitInterval variable of the class. The packets are sent to the MAC address specified by the destMACAddress variable of the class and with FID taken from the FID Table of the application. In case the FID is empty, no packet is sent until a new value is filled in.

00191 {
00192         //Access current stream
00193         InformationItem *II = (InformationItem *) timer->getContextPointer();
00194 
00195         FIDIter iter=FID_map.find(II);
00196 
00197         if(iter==FID_map.end()) {
00198                 ev << this->getParentModule()->getFullName() << ": Error trying to send a deleted Information Item stream!" << endl;
00199 
00200                 //Delete obsolete timer
00201                 delete timer;
00202         }
00203         else {
00204 
00205                 //If FID is not empty, continue sending stream
00206                 if(iter->second->FID.compare("")!=0) {
00207 
00208                         seqNum++;
00209 
00210                         // generate and send a packet
00211                         char msgname[30];
00212                         sprintf(msgname, "PSmsg-%d-%ld", getId(), seqNum);
00213                         ev << "Generating packet `" << msgname << "'\n";
00214 
00215                         PSEtherAppMsg *datapacket = new PSEtherAppMsg(msgname, IEEE802CTRL_DATA);
00216 
00217                         // ---------------------------------------------//
00218                         // ------------- Set Packet header -------------//
00219                         // ---------------------------------------------//
00220 
00221                         datapacket->setFID(iter->second->FID.c_str());
00222 
00223                         //Set the information item scope length
00224                         datapacket->setScopeLen(iter->first->scopeSize());
00225 
00226                         //Set the Information Item scope and ID
00227                         datapacket->setIIID(*iter->first);
00228 
00229                         //Set the information item type
00230                         datapacket->setType(PUBLISHED_DATA);
00231 
00232                         //If we are checking for false positives. This field does not count in the
00233                         //bit length of the message since it is supposed not to be there!
00234                         if(simulation.getSystemModule()->par("FP_check").boolValue()==true) {
00235                                 //Set the path length
00236                                 datapacket->setPathArraySize(iter->second->path.size());
00237                                 //Set the lid of each link
00238                                 for(uint i=0;i<iter->second->path.size();i++)
00239                                         datapacket->setPath(i,iter->second->path[i].c_str());
00240                         }
00241 
00242                         Ieee802Ctrl *etherctrl = new Ieee802Ctrl();
00243                         etherctrl->setDest(destMACAddress);
00244                         datapacket->setControlInfo(etherctrl);
00245 
00246                         //Fix the size of the packet
00247                         int length=datapacket->getByteLength();
00248                         datapacket->setByteLength(length + FID_LEN /*FID*/ + 1 /*type*/ + 1 /*scopeLen*/ + PURSUIT_ID_LEN*iter->first->scopeSize() /*Scope*/ + PURSUIT_ID_LEN /*Inf_Item ID*/);
00249 
00250                         //---------------------------------------------//
00251                         //-------------- Set Packet Load --------------//
00252                         //---------------------------------------------//
00253                         datapacket->setByteLength(MAX_ETH_MTU);
00254                         //Check if Information Item represents stream or a file
00255                         /*if(iter->second->byteSize>=0) {
00256                                 //If file and the remaining size is more than the maximum remaining payload
00257                                 if(iter->second->byteSize>(MAX_ETH_MTU-length)) {
00258                                         datapacket->setByteLength(MAX_ETH_MTU);
00259                                         iter->second->byteSize-=(MAX_ETH_MTU-length);
00260                                 }
00261                                 //Else sent all the remaining bytes
00262                                 else {
00263                                         int length=datapacket->getByteLength();
00264                                         datapacket->setByteLength(length + iter->second->byteSize);
00265                                         iter->second->byteSize=0;
00266                                 }
00267                         }
00268                         //If stream leave a byte free for differentiation in the reception
00269                         else
00270                                 datapacket->setByteLength(MAX_ETH_MTU-1);*/
00271                         //---------------------------------------------//
00272 
00273                         //Set up timer for the next stream packet according to the requested bitrate
00274                         double packet_interval=(datapacket->getBitLength()/(datarate->doubleValue()*1000000));
00275 
00276                         //Check for huge intervals and mimize them
00277                         if(packet_interval>100)
00278                                 packet_interval=100;
00279 
00280                         simtime_t interval=packet_interval;             // We are talking about Mbps in the datarate
00281                         scheduleAt(simTime()+interval, timer);
00282 
00283                         send(datapacket, "ifOut", 0);
00284                         iter->second->numPkSent++;
00285                         packetsSent++;
00286                 }
00287                 //If the is no FID delete the timer self message
00288                 else {
00289                         iter->second->active=false;
00290                 }
00291         }
00292 }

void PEtherApp::setNewFID ( InformationItem II,
string  FID,
QoSList  qos,
vector< string >  lidpath 
) [virtual]

A public function of this class that allows other modules to fill in new FIDs for the published Information Items of this application. It is usually called by the Topology Manager, in order to inform the publisher of the new FID for an Information Item currently being published.

00351                                                                                               {
00352 
00353         Enter_Method("TM informing %s with new FID",this->getParentModule()->getFullName());
00354 
00355         //Check is the publishing application supports this information item
00356         subscriptionItem subitem;
00357         subitem.ii=II;
00358         subscrIter subiter=find(subscriptions.begin(),subscriptions.end(),subitem);
00359 
00360         //If the information item was found
00361         if(subiter!=subscriptions.end()) {
00362                 //Correct the QoS requirements in case they are wrong
00363                 subiter->qos=qos;
00364 
00365                 //Check if Information Item is already in the FID table
00366                 FIDIter iter=FID_map.find(II);
00367                 //If the Information Item already exists in FID Table
00368                 if(iter!=FID_map.end()) {
00369 
00370                         //It doesn't matter if the FID is empty
00371                         iter->second->FID=FID;
00372 
00373                         //It doesn't matter if the LID path is empty
00374                         iter->second->path=lidpath;
00375 
00376                         //If the stream is not active right now, create new timer message right away
00377                         if(iter->second->active!=true) {
00378                                 iter->second->active=true;
00379 
00380                                 //Cancel the future scheduling for this stream
00381                                 iter->second->tmsg=cancelEvent(iter->second->tmsg);
00382                                 //First packet right away
00383                                 sendStreamData(iter->second->tmsg);
00384                         }
00385                 }
00386                 //If the Information Item doesn't have an FID record, create it
00387                 else {
00388                         StreamData *d = new StreamData();
00389                         d->FID = FID;
00390                         d->path=lidpath;
00391                         d->numPkSent = 0;
00392                         d->byteSize =  par("fileSize").doubleValue();
00393                         d->active=true;
00394 
00395                         InformationItem *IIkey=new InformationItem(*II);
00396                         FID_map.insert(std::pair<InformationItem *, StreamData *>(IIkey,d));
00397 
00398                         cMessage *timer = new cMessage("StreamTmr");
00399                         timer->setContextPointer(IIkey);
00400 
00401                         //Set a pointer to the timer packet so that we can find it!
00402                         d->tmsg=timer;
00403 
00404                         // first packet right away
00405                         sendStreamData(timer);
00406                 }
00407         }
00408         else
00409                 ev << "Error: II not found in subscriptions!!!" << endl;
00410 }


Member Data Documentation

MACAddress PEtherApp::destMACAddress [protected]

The publication MAC address -- Preferably Broadcast.

The map defining which data stream options describe each Information item, the Publisher is currently transmitting.

double PEtherApp::psProportion [protected]

Proportion between the publishers and subscribers.

Stream transmission parameters


The documentation for this class was generated from the following files:
Generated on Thu Jun 14 17:12:42 2012 for PAL by  doxygen 1.6.3