#include <PEtherApp.h>
Public Member Functions | |
virtual void | setNewFID (InformationItem *II, string FID, QoSList qos, vector< string > lidpath) |
Protected Types | |
typedef std::map < InformationItem *, StreamData *, InformationItem > | FIDMap |
The module's FID Table. | |
typedef FIDMap::iterator | FIDIter |
The iterator for the FID Table. | |
Protected Member Functions | |
virtual void | initialize (int stage) |
virtual int | numInitStages () const |
virtual void | handleMessage (cMessage *msg) |
virtual void | finish () |
virtual MACAddress | resolveDestMACAddress () |
virtual void | chooseSubscription () |
virtual void | cancelInformationItem () |
virtual void | sendStreamData (cMessage *timer) |
virtual void | receivePacket (cMessage *msg) |
virtual bool | genInformationItem () |
Protected Attributes | |
FIDMap | FID_map |
double | psProportion |
Proportion between the publishers and subscribers. | |
MACAddress | destMACAddress |
The publication MAC address -- Preferably Broadcast. |
The Publish Ethernet Application. This module acts more like a streaming application. The only interesting thing about it is its use of FIDs for the proper routing of the created streams from the publisher to all assigned subscribers. FID assignment is an operation of the Topology Manager, which is why the function setNewFID() is made public: the Topology Manager module of the simulation must be able to call it.
typedef FIDMap::iterator PEtherApp::FIDIter [protected] |
The iterator for the FID Table.
typedef std::map<InformationItem *, StreamData *, InformationItem> PEtherApp::FIDMap [protected] |
The module's FID Table.
void PEtherApp::cancelInformationItem | ( | ) | [protected, virtual] |
Cancels the publication of a random Information Item from this application. The difference between this publisher and the base application BasePSApp in this function is that the publisher also has to delete the chosen Information Item element from the FID Table.
00413 { 00414 00415 //If element was successfully removed from the Information Item table, remove it 00416 //from the FID Table as well 00417 if(BasePSApp::cancelInformationItem(UNPUBLISH_INFO)) { 00418 00419 ev << this->getParentModule()->getFullName() << ": Canceling publication." << endl; 00420 00421 //For all entries in the FID Table of this module find the one missing from the 00422 //subscriptions table and delete it 00423 for(FIDIter iter=FID_map.begin();iter!=FID_map.end();iter++) { 00424 00425 subscriptionItem subitem; 00426 subitem.ii=iter->first; 00427 subscrIter subiter = find(subscriptions.begin(),subscriptions.end(), subitem); 00428 00429 if(subiter==subscriptions.end()) { 00430 ev << "Deleting Information item from FID Table: " << iter->first << endl; 00431 //Cancel Timer self-message 00432 iter->second->tmsg=cancelEvent(iter->second->tmsg); 00433 00434 //Delete Timer self-message 00435 delete iter->second->tmsg; 00436 00437 FID_map.erase(iter); 00438 } 00439 } 00440 } 00441 }
void PEtherApp::chooseSubscription | ( | ) | [protected, virtual] |
Chooses a random subscription from the Information Item Table, according to the popularity of each element. Afterwards, the application module tries to become a publisher to this subscription, although it might be already a publisher of it.
00120 { 00121 00122 //Try to receive a subscriber request for an Information Item 00123 InformationItem *subscription=returnRandII(); 00124 if(subscription!=NULL) { 00125 00126 //Inform the Global Information Item Table 00127 if(iit->addPubElement(subscription,this->getParentModule())) { 00128 00129 ev << this->getParentModule()->getFullName() << ": Existing Information Item met with new publication!" << endl; 00130 00131 subscriptionItem subitem; 00132 subitem.ii=subscription; 00133 subscriptions.push_back(subitem); 00134 } 00135 } 00136 //If non returned for any reason print an error message 00137 else { 00138 ev << this->getParentModule()->getFullName() << ": Tried to find subscription but list was empty!" << endl; 00139 } 00140 }
void PEtherApp::finish | ( | ) | [protected, virtual] |
The finish function of the application. It deletes the FID Table of this publisher application and then calls the finish function of the base class.
Reimplemented from BasePSApp.
00080 { 00081 //Delete all inactive timers of FID_map 00082 for(FIDIter iter=FID_map.begin();iter!=FID_map.end();iter++) { 00083 if(iter->second->active==false) { 00084 //These items are not scheduled since they are paused 00085 if(iter->second->tmsg!=NULL) 00086 delete iter->second->tmsg; 00087 } 00088 } 00089 00090 //Clear the FID table 00091 FID_map.clear(); 00092 00093 //Call the base function finish function 00094 BasePSApp::finish(); 00095 }
bool PEtherApp::genInformationItem | ( | ) | [protected, virtual] |
Generates an Information Item, places it into the subscriptions table of the publisher, and informs the Information Item Table module, if it exists.
00308 { 00309 //Create subscription item 00310 subscriptionItem subitem; 00311 00312 //Generate random Information Item ID 00313 char ID[PURSUIT_ID_LEN+1]; 00314 for(int i=0;i<PURSUIT_ID_LEN;i++) 00315 { 00316 ID[i]=(char)(1+intrand(255)); 00317 } 00318 //Put string end character 00319 ID[PURSUIT_ID_LEN] = '\0'; 00320 00321 //Generate a scope ID -- static is just fine 00322 InformationItem *II=new InformationItem("0457A0B220ABF100",ID); 00323 00324 //Assign the successfully created Information Item in the subscription 00325 subitem.ii=II; 00326 00327 //Create randomly Meta-data for the future Information Item 00328 //TODO: Find a method for generating these metadata 00329 subitem.qos[QoS_BW_M]=(uint16_t)datarate->doubleValue(); 00330 subitem.qos[QoS_FLOW]=1; 00331 00332 /* First the Topology Manager has to be informed of the new Information Item 00333 * and it's meta data. This happens to avoid finding paths without being 00334 * informed of the QoS requirements of each item. If there is a mistake, 00335 * such as the Information Item already exists, the Topology Manager will 00336 * not accept the packet. The same information lies on the rendezvous table.*/ 00337 publishMetaDataPacket(subitem); 00338 00339 /***Inform the Global Information Item Table***/ 00340 if(iit->addPubElement(II,this->getParentModule())) { 00341 00342 subscriptions.push_back(subitem); 00343 return true; 00344 } 00345 /**********************************************/ 00346 00347 return false; 00348 }
void PEtherApp::handleMessage | ( | cMessage * | msg | ) | [protected, virtual] |
Handles incoming messages for the publisher application. For incoming packets, it records some statistics, although this functionality is only kept for future use. The most important use of this function is the interpretation of the received self-messages, which control the transmission of packets for the published Information Item streams and the creation of new publications.
Reimplemented from BasePSApp.
00144 { 00145 if (msg->isSelfMessage()) 00146 { 00147 //Self message to create or delete a new publication 00148 if(strcmp(msg->getFullName(),"chooseSubscriber")==0) { 00149 //Decide whether to make new subscription 00150 double now=simTime().dbl(); 00151 //Get a random variable according the the Gaussian distribution 00152 double pubPeak=normal(this->par("actPeak").doubleValue(),PEAK_DEV); 00153 00154 //Try to create a new publication, if the random variable is close enough to the current time. 00155 //To create a periodical functionality, the modulo of the current time is taken for the time 00156 //period equal to three times the mean value of the Gaussian random variable. 00157 if(pubPeak-(2*PEAK_DEV+1)<fmod(now,3*pubPeak) && pubPeak+(2*PEAK_DEV+1)>fmod(now,3*pubPeak)) { 00158 if(dblrand()<0.8) { 00159 chooseSubscription(); 00160 } 00161 else { 00162 genInformationItem(); 00163 } 00164 } 00165 //Or delete an old one if the current time is close enough the the double of the Gaussian mean. 00166 //Notice that a bit more space is given to the removing of publications, since it creates 00167 //better results. 00168 else if(2*pubPeak-(2*PEAK_DEV+2)<fmod(now,3*pubPeak) && 2*pubPeak+(2*PEAK_DEV+2)>fmod(now,3*pubPeak)) { 00169 cancelInformationItem(); 00170 } 00171 00172 simtime_t pt = par("pubInterval").doubleValue(); 00173 // Send packet with a small jitter 00174 simtime_t jitter = dblrand() * MAXJITTER; 00175 scheduleAt(simTime()+pt+jitter, msg); 00176 } 00177 //Self message indicating it is time to send a stream 00178 else if(strcmp(msg->getFullName(),"StreamTmr")==0) { 00179 sendStreamData(msg); 00180 } 00181 } 00182 else 00183 { 00184 receivePacket(msg); 00185 } 00186 00187 }
void PEtherApp::initialize | ( | int | stage | ) | [protected, virtual] |
Initialization function of the publisher application. Reads the class parameters and schedules the initial self-message.
Reimplemented from BasePSApp.
00048 { 00049 BasePSApp::initialize(stage); 00050 00051 // we can only initialize in the 2nd stage (stage==1), because 00052 // assignment of "auto" MAC addresses takes place in stage 0 00053 if (stage == 1) 00054 { 00055 //Read the proportion of choosing publishers against adding subscribers 00056 psProportion = simulation.getSystemModule()->par("PubSubProportion"); 00057 if(psProportion<0 || psProportion>1) 00058 psProportion=0.5; 00059 00060 //Watch the FID Table 00061 WATCH_MAP(FID_map); 00062 00063 destMACAddress = resolveDestMACAddress(); 00064 00065 // if no dest address given, nothing to do 00066 if (destMACAddress.isUnspecified()) 00067 return; 00068 00069 //Generate self message for choosing a subscriber to publish for 00070 cMessage *chooseSubmsg = new cMessage("chooseSubscriber"); 00071 simtime_t pt = par("pubInterval").doubleValue(); 00072 // Send packet with a small jitter 00073 simtime_t jitter = dblrand() * MAXJITTER; 00074 scheduleAt(simTime()+pt+jitter, chooseSubmsg); 00075 } 00076 }
virtual int PEtherApp::numInitStages | ( | ) | const [inline, protected, virtual] |
void PEtherApp::receivePacket | ( | cMessage * | msg | ) | [protected, virtual] |
Handles received packets: it only records their statistics and deletes them. It has no current use and is kept for future expansion of the class.
00296 { 00297 EV << "Received packet `" << msg->getName() << "'\n"; 00298 00299 packetsReceived++; 00300 simtime_t lastEED = simTime() - msg->getCreationTime(); 00301 eedVector.record(lastEED); 00302 eedStats.collect(lastEED); 00303 00304 delete msg; 00305 }
MACAddress PEtherApp::resolveDestMACAddress | ( | ) | [protected, virtual] |
Interprets the MAC address given as an input parameter, to which all publication packets are sent.
00099 { 00100 MACAddress destMACAddress; 00101 const char *destAddress = par("destAddress"); 00102 if (destAddress[0]) 00103 { 00104 // try as mac address first, then as a module 00105 if (!destMACAddress.tryParse(destAddress)) 00106 { 00107 cModule *destStation = simulation.getModuleByPath(destAddress); 00108 if (!destStation) 00109 error("cannot resolve MAC address '%s': not a 12-hex-digit MAC address or a valid module path name", destAddress); 00110 cModule *destMAC = destStation->getSubmodule("mac"); 00111 if (!destMAC) 00112 error("module '%s' has no 'mac' submodule", destAddress); 00113 destMACAddress.setAddress(destMAC->par("address")); 00114 } 00115 } 00116 return destMACAddress; 00117 }
void PEtherApp::sendStreamData | ( | cMessage * | timer | ) | [protected, virtual] |
Sends the packets of a stream corresponding to a specific Information Item, in the interval determined by the waitInterval variable of the class. The packets are sent to the MAC address specified by the destMACAddress variable of the class, with the FID taken from the FID Table of the application. If the FID is empty, no packet is sent until a new value is filled in.
00191 { 00192 //Access current stream 00193 InformationItem *II = (InformationItem *) timer->getContextPointer(); 00194 00195 FIDIter iter=FID_map.find(II); 00196 00197 if(iter==FID_map.end()) { 00198 ev << this->getParentModule()->getFullName() << ": Error trying to send a deleted Information Item stream!" << endl; 00199 00200 //Delete obsolete timer 00201 delete timer; 00202 } 00203 else { 00204 00205 //If FID is not empty, continue sending stream 00206 if(iter->second->FID.compare("")!=0) { 00207 00208 seqNum++; 00209 00210 // generate and send a packet 00211 char msgname[30]; 00212 sprintf(msgname, "PSmsg-%d-%ld", getId(), seqNum); 00213 ev << "Generating packet `" << msgname << "'\n"; 00214 00215 PSEtherAppMsg *datapacket = new PSEtherAppMsg(msgname, IEEE802CTRL_DATA); 00216 00217 // ---------------------------------------------// 00218 // ------------- Set Packet header -------------// 00219 // ---------------------------------------------// 00220 00221 datapacket->setFID(iter->second->FID.c_str()); 00222 00223 //Set the information item scope length 00224 datapacket->setScopeLen(iter->first->scopeSize()); 00225 00226 //Set the Information Item scope and ID 00227 datapacket->setIIID(*iter->first); 00228 00229 //Set the information item type 00230 datapacket->setType(PUBLISHED_DATA); 00231 00232 //If we are checking for false positives. This field does not count in the 00233 //bit length of the message since it is supposed not to be there! 
00234 if(simulation.getSystemModule()->par("FP_check").boolValue()==true) { 00235 //Set the path length 00236 datapacket->setPathArraySize(iter->second->path.size()); 00237 //Set the lid of each link 00238 for(uint i=0;i<iter->second->path.size();i++) 00239 datapacket->setPath(i,iter->second->path[i].c_str()); 00240 } 00241 00242 Ieee802Ctrl *etherctrl = new Ieee802Ctrl(); 00243 etherctrl->setDest(destMACAddress); 00244 datapacket->setControlInfo(etherctrl); 00245 00246 //Fix the size of the packet 00247 int length=datapacket->getByteLength(); 00248 datapacket->setByteLength(length + FID_LEN /*FID*/ + 1 /*type*/ + 1 /*scopeLen*/ + PURSUIT_ID_LEN*iter->first->scopeSize() /*Scope*/ + PURSUIT_ID_LEN /*Inf_Item ID*/); 00249 00250 //---------------------------------------------// 00251 //-------------- Set Packet Load --------------// 00252 //---------------------------------------------// 00253 datapacket->setByteLength(MAX_ETH_MTU); 00254 //Check if Information Item represents stream or a file 00255 /*if(iter->second->byteSize>=0) { 00256 //If file and the remaining size is more than the maximum remaining payload 00257 if(iter->second->byteSize>(MAX_ETH_MTU-length)) { 00258 datapacket->setByteLength(MAX_ETH_MTU); 00259 iter->second->byteSize-=(MAX_ETH_MTU-length); 00260 } 00261 //Else sent all the remaining bytes 00262 else { 00263 int length=datapacket->getByteLength(); 00264 datapacket->setByteLength(length + iter->second->byteSize); 00265 iter->second->byteSize=0; 00266 } 00267 } 00268 //If stream leave a byte free for differentiation in the reception 00269 else 00270 datapacket->setByteLength(MAX_ETH_MTU-1);*/ 00271 //---------------------------------------------// 00272 00273 //Set up timer for the next stream packet according to the requested bitrate 00274 double packet_interval=(datapacket->getBitLength()/(datarate->doubleValue()*1000000)); 00275 00276 //Check for huge intervals and mimize them 00277 if(packet_interval>100) 00278 packet_interval=100; 00279 
00280 simtime_t interval=packet_interval; // We are talking about Mbps in the datarate 00281 scheduleAt(simTime()+interval, timer); 00282 00283 send(datapacket, "ifOut", 0); 00284 iter->second->numPkSent++; 00285 packetsSent++; 00286 } 00287 //If the is no FID delete the timer self message 00288 else { 00289 iter->second->active=false; 00290 } 00291 } 00292 }
void PEtherApp::setNewFID | ( | InformationItem * | II, | |
string | FID, | |||
QoSList | qos, | |||
vector< string > | lidpath | |||
) | [virtual] |
A public function of this class that allows other modules to fill in new FIDs for the published Information Items of this application. It is usually called by the Topology Manager, in order to inform the publisher of the new FID for an Information Item currently being published.
00351 { 00352 00353 Enter_Method("TM informing %s with new FID",this->getParentModule()->getFullName()); 00354 00355 //Check is the publishing application supports this information item 00356 subscriptionItem subitem; 00357 subitem.ii=II; 00358 subscrIter subiter=find(subscriptions.begin(),subscriptions.end(),subitem); 00359 00360 //If the information item was found 00361 if(subiter!=subscriptions.end()) { 00362 //Correct the QoS requirements in case they are wrong 00363 subiter->qos=qos; 00364 00365 //Check if Information Item is already in the FID table 00366 FIDIter iter=FID_map.find(II); 00367 //If the Information Item already exists in FID Table 00368 if(iter!=FID_map.end()) { 00369 00370 //It doesn't matter if the FID is empty 00371 iter->second->FID=FID; 00372 00373 //It doesn't matter if the LID path is empty 00374 iter->second->path=lidpath; 00375 00376 //If the stream is not active right now, create new timer message right away 00377 if(iter->second->active!=true) { 00378 iter->second->active=true; 00379 00380 //Cancel the future scheduling for this stream 00381 iter->second->tmsg=cancelEvent(iter->second->tmsg); 00382 //First packet right away 00383 sendStreamData(iter->second->tmsg); 00384 } 00385 } 00386 //If the Information Item doesn't have an FID record, create it 00387 else { 00388 StreamData *d = new StreamData(); 00389 d->FID = FID; 00390 d->path=lidpath; 00391 d->numPkSent = 0; 00392 d->byteSize = par("fileSize").doubleValue(); 00393 d->active=true; 00394 00395 InformationItem *IIkey=new InformationItem(*II); 00396 FID_map.insert(std::pair<InformationItem *, StreamData *>(IIkey,d)); 00397 00398 cMessage *timer = new cMessage("StreamTmr"); 00399 timer->setContextPointer(IIkey); 00400 00401 //Set a pointer to the timer packet so that we can find it! 00402 d->tmsg=timer; 00403 00404 // first packet right away 00405 sendStreamData(timer); 00406 } 00407 } 00408 else 00409 ev << "Error: II not found in subscriptions!!!" << endl; 00410 }
MACAddress PEtherApp::destMACAddress [protected] |
The publication MAC address -- Preferably Broadcast.
FIDMap PEtherApp::FID_map [protected] |
The map defining which data stream options describe each Information Item the publisher is currently transmitting.
double PEtherApp::psProportion [protected] |
Proportion between the publishers and subscribers.
Stream transmission parameters