00001
00035 #include <stdlib.h>
00036 #include "packet.h"
00037 #include "ip.h"
00038 #include "rtp.h"
00039 #include <math.h>
00040
00041 #define NOW Scheduler::instance().clock()
00042
00046 static class RTPSourceClass : public TclClass {
00047 public:
00048 RTPSourceClass() : TclClass("RTPSource") {}
00049 TclObject* create(int argc, const char*const* argv) {
00050 if (argc >= 5)
00051 return (new RTPSource(atoi(argv[4])));
00052
00053 return 0;
00054 }
00055 } class_rtp_source;
00056
00062 static class RTPSessionClass : public TclClass {
00063 public:
00064 RTPSessionClass() : TclClass("Session/RTP") {}
00065 TclObject* create(int, const char*const*) {
00066 return (new RTPSession());
00067 }
00068 } class_rtp_session;
00069
00073 RTPSession::RTPSession()
00074 : allsrcs_(0), localsrc_(0), receivers_(0),last_np_(0), T_one_way_(0),rh_(0),
00075 we_sent(0),last_pkts_lost_(0),last_ehsr_(-1),tx_rate_(0), time_elapsed_(0)
00076 {
00077 bind("enableFlowControl_", &enableFlowControl_);
00078 bind("rx_recv_",&rx_recv_);
00079 bind("jitter_",&jitter_);
00080 bind("RTT_",&RTT_);
00081 bind("smooth_loss_",&smooth_loss_);
00082 alpha = 1;
00083 last_time_report_ = NOW;
00084
00085 weight[0] = 1;
00086 weight[1] = 1;
00087 weight[2] = 1;
00088 weight[3] = 1;
00089 weight[4] = 0.8;
00090 weight[5] = 0.6;
00091 weight[6] = 0.4;
00092 weight[7] = 0.2;
00093 }
00094
00098 RTPSession::~RTPSession()
00099 {
00100 while (allsrcs_ != 0) {
00101 RTPSource* p = allsrcs_;
00102 allsrcs_ = allsrcs_->next;
00103 delete p;
00104 }
00105 delete localsrc_;
00106 while (receivers_ != 0) {
00107 RTPReceiver* p = receivers_;
00108 receivers_ = receivers_->next;
00109 delete p;
00110 }
00111 lst.~list<double>();
00112
00113 }
00114
00121 void RTPSession::localsrc_update(int)
00122 {
00123 localsrc_->np(1);
00124 localsrc_->is_sender(true);
00125 }
00126
00133 void RTPSession::localsrc_update_nbytes(int n)
00134 {
00135 localsrc_->nbytes(n);
00136
00137 }
00138
00139 #define RTCP_HDRSIZE 8
00140 #define RTCP_SR_SIZE 20
00141 #define RTCP_RR_SIZE 48
00142
00149 int RTPSession::build_report(int bye)
00150 {
00151 rh_ = new hdr_rtp;
00152 rh_->rr_ = 0;
00153 rh_->sr_ = 0;
00154 int nsrc = 0;
00155 int nrr = 0;
00156 int len = RTCP_HDRSIZE;
00157 we_sent = 0;
00158 double fraction = 0.0;
00159 time_elapsed_ = NOW - last_time_report_;
00160 last_time_report_ = NOW;
00161 if (localsrc_->np() != last_np_) {
00162 last_np_ = localsrc_->np();
00163 we_sent = 1;
00164 len += RTCP_SR_SIZE;
00165
00166 sender_report* sr;
00167
00168 sr = new sender_report;
00169 sr->sender_srcid()= localsrc_->srcid();
00170 sr->pkts_sent() = localsrc_->np();
00171 sr->octets_sent() = localsrc_->nbytes();
00172 sr->rcvrs_ = receivers_;
00173
00174 rh_->sr_ = sr;
00175 }
00176 for (RTPSource* sp = allsrcs_; sp != 0; sp = sp->next) {
00177 ++nsrc;
00178 int received = sp->np() - sp->snp();
00179
00180 if (received == 0) {
00181
00182 continue;
00183 }
00184 if(localsrc_->srcid() != sp->srcid()) {
00185 int bytes = received * (sp->ps());
00186 rx_recv_ = ( 8 * (double)bytes)/time_elapsed_;
00187 sp->snp(sp->np());
00188 len += RTCP_RR_SIZE;
00189
00190 int expected_interval = sp->ehsr() - last_ehsr_;
00191 last_ehsr_ = sp->ehsr();
00192 int lost_interval = expected_interval - received;
00193
00194
00195 if (lost_interval <= 0 || expected_interval == 0 ) {
00196 fraction = 0;
00197
00198 } else fraction = ((double)lost_interval / (double)expected_interval);
00199 calculate_RTT();
00200
00201 if( sp->np() >= 1) {
00202 if(fraction == 0) {
00203 increase_rate(sp->ps());
00204
00205 } else {
00206 measure_smooth_loss(fraction);
00207 calculateR_tcp(sp->ps());
00208 }
00209 }
00210
00211 receiver_report* rr;
00212 rr = new receiver_report;
00213
00214 rr->cum_pkts_lost() = sp->cum_pkts_lost();
00215 rr->LSR() = sp->LSR();
00216 rr->DLSR()= NOW - sp->SRT();
00217 rr->R_tcp() = tx_rate_;
00218 rr->jitter() = sp->jitter();
00219 if(bye) {
00220 printf ("TIME for BYE %f \n", NOW);
00221 rr->bye()= 1;
00222 } else rr->bye() = 0;
00223 rh_->rr_ = rr;
00224
00225 if (++nrr >= 31)
00226 break;
00227 }
00228
00229 }
00230
00231
00232 if (bye) {
00233 len += build_bye();
00234 }
00235 else
00236 len += build_sdes();
00237
00238 Tcl::instance().evalf("%s adapt-timer %d %d %d", name(),
00239 nsrc, nrr, we_sent);
00240 Tcl::instance().evalf("%s sample-size %d", name(), len);
00241
00242 return (len);
00243 }
00244
00248 int RTPSession::build_bye()
00249 {
00250
00251 return (8);
00252 }
00256 int RTPSession::build_sdes()
00257 {
00258
00259
00260 return (20);
00261 }
00271 void RTPSession::recv(Packet* p, Handler*)
00272 {
00273
00274
00275 hdr_cmn* mh = hdr_cmn::access(p);
00276 hdr_rtp* rh = hdr_rtp::access(p);
00277
00278 RTPSource* s = lookup(rh->srcid());
00279
00280 if (s == 0) {
00281 if(rh->srcid()!=localsrc_->srcid()){
00282 Tcl& tcl = Tcl::instance();
00283 tcl.evalf("%s new-source %d", name(),rh->srcid());
00284 s = (RTPSource*)TclObject::lookup(tcl.result());
00285 }
00286 }
00287
00288 if(rh->srcid()!=localsrc_->srcid())
00289 {
00290
00291
00292 T_one_way_ = NOW - rh->timestamp();
00293
00294 int pkts_lost = 0;
00295 int difference = rh->seqno() - s->ehsr();
00296 if (difference <= 1) {
00297 pkts_lost = 0;
00298
00299 }
00300
00301 if (difference > 1) {
00302 pkts_lost = difference -1;
00303
00304 }
00305
00306 double transit = NOW - rh->timestamp();
00307 double d = transit - s->transit();
00308 s->transit(transit);
00309 if (d < 0) d = -d;
00310 s->jitter( s->jitter() + (1./16.) * (d - s->jitter()));
00311 jitter_= s->jitter();
00312 s->np(1);
00313 s->cum_pkts_lost(pkts_lost);
00314 s->ehsr(rh->seqno());
00315 s->nbytes(mh->size());
00316 s->ps(mh->size());
00317
00318 }
00319
00320 Packet::free(p);
00321 }
00322
00328 void RTPSession::calculate_alpha(double value)
00329 {
00330 if( value == 0) {
00331 alpha = 1;
00332 } else
00333 alpha = (value/T_one_way_) -1;
00334 }
00335
00336
00340 void RTPSession::calculate_RTT()
00341 {
00342 double temp=(1+alpha) * T_one_way_;
00343 RTT_ = temp * 0.1 + 0.9 * RTT_;
00344 }
00345
00352 void RTPSession::increase_rate(int ps)
00353 {
00354 tx_rate_ = tx_rate_ + (double)ps/RTT_;
00355
00356 }
00357
00364 void RTPSession::measure_smooth_loss (double fraction)
00365 {
00366
00367 double I_tot0 = 0;
00368 double I_tot1 = 0;
00369 double W_tot = 0;
00370 for (int i=0; i<7; i++) {
00371 pkt_loss_history[i+1]= pkt_loss_history[i];
00372 }
00373 pkt_loss_history[0] = fraction;
00374
00375 for (int i=0; i<7; i++) {
00376 I_tot0 = I_tot0+ (pkt_loss_history[i] * weight[i]);
00377 W_tot = W_tot + weight[i];
00378 }
00379
00380 for (int i=1; i<8; i++) {
00381 I_tot1 = I_tot1 + (pkt_loss_history[i] * weight[i-1]);
00382 }
00383 double I_tot =0;
00384 if( I_tot0 > I_tot1) {
00385 I_tot = I_tot0;
00386 } else {
00387 I_tot = I_tot1;
00388 }
00389 double I_mean = I_tot/W_tot;
00390
00391 smooth_loss_= I_mean;
00392
00393
00394
00395 }
00396
00401 void RTPSession::calculateR_tcp(int ps)
00402 {
00403
00404 double min =0;
00405 double fraction = 3 * sqrt((3 * smooth_loss_) / 8);
00406 if(fraction < 1 ){
00407 min = fraction;
00408 } else min =1;
00409
00410 tx_rate_ = (double)ps / (RTT_ * (sqrt(2 * smooth_loss_ /3)) + 4 * (RTT_ * min * smooth_loss_) * (1 + (32 * pow(smooth_loss_,2))));
00411 printf("Time:%f Receiver:%d smooth_loss_:%f \n", NOW,localsrc_->srcid(),smooth_loss_);
00412
00413
00414 }
00415
00425 void RTPSession::recv_ctrl(Packet* p)
00426 {
00427 hdr_cmn* mh = hdr_cmn::access(p);
00428 hdr_rtp* rh = hdr_rtp::access(p);
00429 u_int32_t local_src = localsrc_->srcid();
00430
00431 if(rh->srcid() != local_src) {
00432
00433 if (rh->sr_ != 0) {
00434 double temp = 0.0;
00435 RTPSource* source = lookup(rh->sr_->sender_srcid());
00436 if (source != 0 && source->srcid()!= local_src) {
00437 source->LSR(rh->timestamp());
00438 source->SRT(Scheduler::instance().clock());
00439 for (RTPReceiver* p = rh->sr_->rcvrs_; p != 0; p = p->next) {
00440
00441 if(p->srcid() == localsrc_->srcid()) {
00442 temp = p->eff_rtt();
00443 calculate_alpha(temp);
00444 }
00445 }
00446 }
00447 }
00448
00449 if(localsrc_->is_sender()){
00450
00451 if(rh->rr_!= 0){
00452
00453 RTPReceiver* s = lookup_rcv(rh->srcid());
00454 if (s == 0 ) {
00455 s = new RTPReceiver(rh->srcid());
00456 enter_rcv(s);
00457 return;
00458 }
00459
00460 if (s != 0) {
00461 for (s = receivers_; s!= 0; s = s->next) {
00462
00463 if(rh->srcid() == s->srcid()) {
00464 double eff_rtt = 0.0;
00465 double alpha = Scheduler::instance().clock();
00466 if(rh->rr_->LSR()!=0) {
00467 eff_rtt = alpha - rh->rr_->LSR() - rh->rr_->DLSR();
00468 }
00469 s->eff_rtt(eff_rtt);
00470 s->rate(rh->rr_->R_tcp());
00471 if(rh->rr_->bye() == 1) remove_receiver(s);
00472 }
00473
00474 }
00475
00476 }
00477 if(enableFlowControl_ == 1) {
00478 update_rate();
00479 }
00480 }
00481 }
00482 }
00483
00484 Tcl::instance().evalf("%s sample-size %d", name(), mh->size());
00485 Packet::free(p);
00486 }
00487
00488
00493 void RTPSession::update_rate()
00494 {
00495 RTPReceiver* s;
00496 for (s = receivers_; s!= 0; s = s->next) {
00497 if(s->rate() !=0) {
00498 lst.push_front(s->rate());
00499 }
00500 }
00501 lst.sort();
00502 double inst_tx =lst.front();
00503 lst.clear();
00504 if(inst_tx != tx_rate_) {
00505 if(inst_tx < (tx_rate_/2)) {
00506 tx_rate_ = tx_rate_/2;
00507 } else tx_rate_ = inst_tx;
00508 printf("Time:%f Sender:%d ...Rate change to:%f \n", NOW,localsrc_->srcid(),8*tx_rate_);
00509 Tcl::instance().evalf("%s session_bw %fb/s",name(),8*tx_rate_);
00510 Tcl::instance().evalf("%s transmit %fb/s",name(),8*tx_rate_);
00511 }
00512
00513 }
00514
00519 hdr_rtp* RTPSession::access_hdr_rtp()
00520 {
00521 return rh_;
00522 }
00523
00530 RTPSource* RTPSession::lookup(u_int32_t srcid)
00531 {
00532 RTPSource *p;
00533 for (p = allsrcs_; p != 0; p = p->next)
00534 {
00535 if (p->srcid() == srcid)
00536 return (p);
00537 }
00538 return (0);
00539 }
00540
00547 RTPReceiver* RTPSession::lookup_rcv(u_int32_t srcid)
00548 {
00549
00550 RTPReceiver *p;
00551 for (p = receivers_; p != 0; p = p->next)
00552 {
00553 if (p->srcid() == srcid)
00554 return (p);
00555 }
00556 return (0);
00557 }
00558
00562 void RTPSession::print_rcv( )
00563 {
00564
00565 RTPReceiver *p;
00566 for (p = receivers_; p != 0; p = p->next)
00567 {
00568 printf("Receiver:%d\n", p->srcid());
00569 }
00570 }
00571
00576 void RTPSession::enter(RTPSource* s)
00577 {
00578 if(s->srcid() != localsrc_->srcid() ) {
00579 printf("Time:%f Receiver:%d ...I add sender %d \n", NOW,localsrc_->srcid(), s->srcid());
00580 s->next = allsrcs_;
00581 allsrcs_ = s;
00582 }
00583 }
00584
00589 void RTPSession::enter_rcv(RTPReceiver* s)
00590 {
00591 printf("Time:%f Sender:%d ...I add receiver %d \n", NOW,localsrc_->srcid(), s->srcid());
00592 s->next = receivers_;
00593 receivers_ = s;
00594 }
00595
00600 void RTPSession::remove_receiver(RTPReceiver* p)
00601 {
00602
00603 printf("Time:%f Sender:%d ...I remove receiver %d \n", NOW,localsrc_->srcid(),p->srcid());
00604 RTPReceiver* s;
00605 RTPReceiver* q;
00606 if (receivers_==p)
00607 {
00608 q = receivers_;
00609 receivers_=receivers_->next;
00610 delete q;
00611 }
00612 else
00613 {
00614 for (s = receivers_; s!=0; s=s->next)
00615 {
00616 if (s->next==p)
00617 {
00618 q = s->next;
00619 s->next=q->next;
00620 delete q;
00621 }
00622 }
00623 }
00624
00625 }
00626
00632 void RTPSession::initial_rate(double a)
00633 {
00634 tx_rate_ =(double)a/8;
00635
00636
00637 }
00638
00643 int RTPSession::command(int argc, const char*const* argv)
00644 {
00645 if (argc == 3) {
00646 if (strcmp(argv[1], "enter") == 0) {
00647 RTPSource* s = (RTPSource*)TclObject::lookup(argv[2]);
00648 enter(s);
00649 return (TCL_OK);
00650 }
00651 if (strcmp(argv[1], "localsrc") == 0) {
00652 localsrc_ = (RTPSource*)TclObject::lookup(argv[2]);
00653 enter(localsrc_);
00654 return (TCL_OK);
00655 }
00656 if (strcmp(argv[1], "rate") == 0) {
00657 initial_rate(atoi(argv[2]));
00658 return(TCL_OK);
00659 }
00660 }
00661
00662 return (TclObject::command(argc, argv));
00663 }
00664
00669 RTPReceiver::RTPReceiver(u_int32_t srcid)
00670 : next(0),cum_pkts_lost_(0),eff_rtt_(0),rate_(0)
00671 {
00672 srcid_ = srcid;
00673 }
00674
00679 RTPSource::RTPSource(u_int32_t srcid)
00680 : next(0),np_(0), snp_(0), ehsr_(-1), nbytes_(0),cum_pkts_lost_(0),LSR_(0),SRT_(0),is_sender_(false),ps_(0),rate_(0),transit_(0),jitter_(0)
00681 {
00682 bind("srcid_", (int*)&srcid_);
00683 srcid_ = srcid;
00684 }
00685
00686
00687
00688
00689