00001
00035 #include <stdlib.h>
00036 #include "packet.h"
00037 #include "ip.h"
00038 #include "rtp.h"
00039 #include <math.h>
00040
00041 #define CONGESTED 0.5
00042 #define UNLOADED 0.3
00043 #define LOADED 0.1
00044 #define DELTA 0.75
00045 #define NOW Scheduler::instance().clock()
00046
00050 static class RTPSourceClass : public TclClass {
00051 public:
00052 RTPSourceClass() : TclClass("RTPSource") {}
00053 TclObject* create(int argc, const char*const* argv) {
00054 if (argc >= 5)
00055 return (new RTPSource(atoi(argv[4])));
00056
00057 return 0;
00058 }
00059 } class_rtp_source;
00060
00066 static class RTPSessionClass : public TclClass {
00067 public:
00068 RTPSessionClass() : TclClass("Session/RTP") {}
00069 TclObject* create(int, const char*const*) {
00070 return (new RTPSession());
00071 }
00072 } class_rtp_session;
00073
00077 RTPSession::RTPSession()
00078 : allsrcs_(0), localsrc_(0), receivers_(0),last_np_(0), T_one_way_(0),rh_(0),
00079 we_sent(0),last_pkts_lost_(0),last_ehsr_(-1),tx_rate_(0), time_elapsed_(0)
00080 {
00081 bind("enableFlowControl_", &enableFlowControl_);
00082 bind("rx_recv_",&rx_recv_);
00083 bind("jitter_",&jitter_);
00084 bind("RTT_",&RTT_);
00085 bind("z_score",&z_score);
00086 bind("smooth_loss_",&smooth_loss_);
00087 smooth_rate_ = 0;
00088 alpha = 1;
00089 smooth_factor = 0.1;
00090 last_time_report_ = NOW;
00091 weight[0] = 1;
00092 weight[1] = 1;
00093 weight[2] = 1;
00094 weight[3] = 1;
00095 weight[4] = 0.8;
00096 weight[5] = 0.6;
00097 weight[6] = 0.4;
00098 weight[7] = 0.2;
00099 }
00100
00104 RTPSession::~RTPSession()
00105 {
00106 while (allsrcs_ != 0) {
00107 RTPSource* p = allsrcs_;
00108 allsrcs_ = allsrcs_->next;
00109 delete p;
00110 }
00111 delete localsrc_;
00112 while (receivers_ != 0) {
00113 RTPReceiver* p = receivers_;
00114 receivers_ = receivers_->next;
00115 delete p;
00116 }
00117 lst.~list<double>();
00118
00119 }
00120
00127 void RTPSession::localsrc_update(int)
00128 {
00129 localsrc_->np(1);
00130 localsrc_->is_sender(true);
00131 }
00132
00139 void RTPSession::localsrc_update_nbytes(int n)
00140 {
00141 localsrc_->nbytes(n);
00142
00143 }
00144
00145 #define RTCP_HDRSIZE 8
00146 #define RTCP_SR_SIZE 20
00147 #define RTCP_RR_SIZE 48
00148
00155 int RTPSession::build_report(int bye)
00156 {
00157 rh_ = new hdr_rtp;
00158 rh_->rr_ = 0;
00159 rh_->sr_ = 0;
00160 int nsrc = 0;
00161 int nrr = 0;
00162 int len = RTCP_HDRSIZE;
00163 we_sent = 0;
00164 double fraction = 0.0;
00165 time_elapsed_ = NOW - last_time_report_;
00166 last_time_report_ = NOW;
00167 if (localsrc_->np() != last_np_) {
00168 last_np_ = localsrc_->np();
00169 we_sent = 1;
00170 len += RTCP_SR_SIZE;
00171
00172 sender_report* sr;
00173
00174 sr = new sender_report;
00175 sr->s_ext_ = 0;
00176 sr->sender_srcid()= localsrc_->srcid();
00177 sr->pkts_sent() = localsrc_->np();
00178 sr->octets_sent() = localsrc_->nbytes();
00179 RTPReceiver *p;
00180 for (p = receivers_; p != 0; p = p->next) {
00181 sr_extensions* se = new sr_extensions;
00182 se->srcid()=p->srcid();
00183 se->eff_rtt()=p->eff_rtt();
00184 se->next = sr->s_ext_;
00185 sr->s_ext_ = se;
00186 }
00187
00188
00189 rh_->sr_ = sr;
00190 }
00191 for (RTPSource* sp = allsrcs_; sp != 0; sp = sp->next) {
00192 ++nsrc;
00193 int received = sp->np() - sp->snp();
00194
00195 if (received == 0) {
00196
00197 continue;
00198 }
00199 if(localsrc_->srcid() != sp->srcid()) {
00200 int bytes = received * (sp->ps());
00201 rx_recv_ = ( 8 * (double)bytes)/time_elapsed_;
00202 sp->snp(sp->np());
00203 len += RTCP_RR_SIZE;
00204
00205 int expected_interval = sp->ehsr() - last_ehsr_;
00206 last_ehsr_ = sp->ehsr();
00207 int lost_interval = expected_interval - received;
00208
00209
00210 if (lost_interval <= 0 || expected_interval == 0 ) {
00211 fraction = 0;
00212
00213 } else fraction = ((double)lost_interval / (double)expected_interval);
00214 calculate_RTT();
00215
00216 if( sp->np() >= 1) {
00217 if(fraction == 0) {
00218 increase_rate(sp->ps());
00219
00220 } else {
00221 measure_smooth_loss(fraction);
00222 calculateR_tcp(sp->ps());
00223 }
00224 }
00225
00226 receiver_report* rr;
00227 rr = new receiver_report;
00228
00229 rr->cum_pkts_lost() = sp->cum_pkts_lost();
00230 rr->LSR() = sp->LSR();
00231 rr->DLSR()= Scheduler::instance().clock() - sp->SRT();
00232 rr->R_tcp() = smooth_rate_;
00233 rr->jitter() = sp->jitter();
00234 if(bye) {
00235 printf ("TIME for BYE %f \n", NOW);
00236 rr->bye()= 1;
00237 remove_sender(sp);
00238 } else rr->bye() = 0;
00239 rh_->rr_ = rr;
00240
00241 if (++nrr >= 31)
00242 break;
00243 }
00244
00245 }
00246
00247
00248 if (bye) {
00249 len += build_bye();
00250 }
00251 else
00252 len += build_sdes();
00253
00254 Tcl::instance().evalf("%s adapt-timer %d %d %d", name(),
00255 nsrc, nrr, we_sent);
00256 Tcl::instance().evalf("%s sample-size %d", name(), len);
00257
00258 return (len);
00259 }
00260
00264 int RTPSession::build_bye()
00265 {
00266
00267 return (8);
00268 }
00272 int RTPSession::build_sdes()
00273 {
00274
00275
00276 return (20);
00277 }
00288 void RTPSession::recv(Packet* p, Handler*)
00289 {
00290 hdr_cmn* mh = hdr_cmn::access(p);
00291 hdr_rtp* rh = hdr_rtp::access(p);
00292 RTPSource* s = lookup(rh->srcid());
00293
00294 if (s == 0) {
00295 if(rh->srcid()!=localsrc_->srcid()){
00296 Tcl& tcl = Tcl::instance();
00297 tcl.evalf("%s new-source %d", name(),rh->srcid());
00298 s = (RTPSource*)TclObject::lookup(tcl.result());
00299 }
00300 }
00301 if(rh->srcid()!=localsrc_->srcid())
00302 {
00303
00304 T_one_way_ = NOW - rh->timestamp();
00305
00306 int pkts_lost = 0;
00307 int difference = rh->seqno() - s->ehsr();
00308 if (difference <= 1) {
00309 pkts_lost = 0;
00310
00311 }
00312
00313 if (difference > 1) {
00314 pkts_lost = difference -1;
00315
00316 }
00317
00318 double transit = NOW - rh->timestamp();
00319 double d = transit - s->transit();
00320 s->transit(transit);
00321 if (d < 0) d = -d;
00322 s->jitter( s->jitter() + (1./16.) * (d - s->jitter()));
00323 evaluate_jitter(s->jitter());
00324 jitter_ = s->jitter();
00325
00326 s->np(1);
00327 s->cum_pkts_lost(pkts_lost);
00328 s->ehsr(rh->seqno());
00329 s->nbytes(mh->size());
00330 s->ps(mh->size());
00331 }
00332 Packet::free(p);
00333 }
00334
00341 void RTPSession::evaluate_jitter(double value)
00342 {
00343 jitter_ = 0.8 * value - 0.2 * jitter_;
00344 jitter_lst.push_front(jitter_);
00345 list<double> temp_lst;
00346 temp_lst = jitter_lst;
00347 int size = jitter_lst.size();
00348 double temp_array[size];
00349 double dev_array[size];
00350 double mean =0;
00351 double count =0;
00352 double std_dev =0;
00353
00354 if (size >1) {
00355 for (int i=0; i<size; i++) {
00356 temp_array[i]=jitter_lst.front();
00357 count += temp_array[i];
00358 jitter_lst.pop_front();
00359 }
00360 mean = count/size;
00361 count =0;
00362 for (int j =0; j<size; j++) {
00363 dev_array[j]=temp_array[j]-mean;
00364 count+=pow(dev_array[j],2);
00365 }
00366 std_dev = sqrt(count/(size-1));
00367 double temp = temp_lst.front();
00368 z_score = (temp - mean)/std_dev;
00369 jitter_lst = temp_lst;
00370 if(-DELTA < z_score < DELTA) {
00371 smooth_factor = LOADED;
00372 } else if ( z_score <= -DELTA) {
00373 smooth_factor = UNLOADED;
00374 } else smooth_factor = CONGESTED;
00375 }
00376 }
00377
00382 void RTPSession::calculate_alpha(double value)
00383 {
00384 if( value == 0) {
00385 alpha = 1;
00386 } else
00387 alpha = (value/T_one_way_) -1;
00388 }
00389
00390
00398 void RTPSession::calculate_RTT()
00399 {
00400 double temp=(1+alpha) * T_one_way_;
00401
00402 RTT_ = temp * 0.1 + 0.9 * RTT_;
00403 }
00404
00413 void RTPSession::increase_rate(int ps)
00414 {
00415 double sample_tx = smooth_rate_ + (double)ps/RTT_;
00416
00417 smooth_rate(sample_tx);
00418 }
00419
00426 void RTPSession::measure_smooth_loss (double fraction)
00427 {
00428
00429 double I_tot0 = 0;
00430 double I_tot1 = 0;
00431 double W_tot = 0;
00432 for (int i=0; i<7; i++) {
00433 pkt_loss_history[i+1]= pkt_loss_history[i];
00434 }
00435 pkt_loss_history[0] = fraction;
00436
00437 for (int i=0; i<7; i++) {
00438 I_tot0 = I_tot0+ (pkt_loss_history[i] * weight[i]);
00439 W_tot = W_tot + weight[i];
00440 }
00441
00442 for (int i=1; i<8; i++) {
00443 I_tot1 = I_tot1 + (pkt_loss_history[i] * weight[i-1]);
00444 }
00445 double I_tot =0;
00446 if( I_tot0 > I_tot1) {
00447 I_tot = I_tot0;
00448 } else {
00449 I_tot = I_tot1;
00450 }
00451 double I_mean = I_tot/W_tot;
00452
00453 smooth_loss_= I_mean;
00454 }
00455
00460 void RTPSession::calculateR_tcp(int ps)
00461 {
00462
00463 double min =0;
00464 double fraction = 3 * sqrt((3 * smooth_loss_) / 8);
00465 if(fraction < 1 ){
00466 min = fraction;
00467 } else min =1;
00468
00469 double sample_tx = (double)ps / (RTT_ * (sqrt(2 * smooth_loss_ /3)) + 4 * (RTT_ * min * smooth_loss_) * (1 + (32 * pow(smooth_loss_,2))));
00470 printf("Time:%f Receiver:%d smooth_loss_:%f \n", NOW,localsrc_->srcid(),smooth_loss_);
00471
00472 smooth_rate(sample_tx);
00473 }
00474
00475
00476
00481 void RTPSession::smooth_rate(double inst_tx)
00482 {
00483 smooth_rate_ = smooth_factor * inst_tx + smooth_rate_ * (1 - smooth_factor);
00484 }
00485
00486
00487
00497 void RTPSession::recv_ctrl(Packet* p)
00498 {
00499 hdr_cmn* mh = hdr_cmn::access(p);
00500 hdr_rtp* rh = hdr_rtp::access(p);
00501 u_int32_t local_src = localsrc_->srcid();
00502
00503 if(rh->srcid() != local_src) {
00504
00505 if (rh->sr_ != 0) {
00506 RTPSource* source = lookup(rh->sr_->sender_srcid());
00507 if (source != 0 && source->srcid()!= local_src) {
00508 source->LSR(rh->timestamp());
00509 source->SRT(NOW);
00510
00511 if(rh->sr_->s_ext_ != 0) {
00512 sr_extensions* se = new sr_extensions;
00513 for (se = rh->sr_->s_ext_; se != 0; se = se->next) {
00514
00515 if(se->srcid()==localsrc_->srcid()) {
00516
00517 if(se->eff_rtt() == 0) {
00518 calculate_alpha(0);
00519 } else calculate_alpha(se->eff_rtt());
00520 }
00521
00522 }
00523 }
00524
00525 }
00526 }
00527
00528 if(localsrc_->is_sender()){
00529
00530 if(rh->rr_ != 0){
00531 RTPReceiver* s = lookup_rcv(rh->srcid());
00532
00533 if (s != 0) {
00534 for (s = receivers_; s!= 0; s = s->next) {
00535
00536 if(rh->srcid() == s->srcid()) {
00537
00538 double eff_rtt = 0.0;
00539
00540 if(rh->rr_->LSR()!=0) {
00541 eff_rtt = NOW - rh->rr_->LSR() - rh->rr_->DLSR();
00542 }
00543 s->eff_rtt(eff_rtt);
00544 s->rate(rh->rr_->R_tcp());
00545 if(rh->rr_->bye() == 1) {
00546 remove_receiver(s);
00547 }
00548
00549 }
00550
00551 }
00552 if(enableFlowControl_ == 1 && receivers_) {
00553 update_rate();
00554 } else {
00555 printf("TIME: %f Sender %d NOT HAVING RECEIVERS \n", NOW,localsrc_->srcid());
00556
00557
00558 }
00559 }
00560 else {
00561 s = new RTPReceiver(rh->srcid());
00562 enter_rcv(s);
00563 }
00564
00565 }
00566 }
00567 }
00568
00569 Tcl::instance().evalf("%s sample-size %d", name(), mh->size());
00570 Packet::free(p);
00571 }
00572
00573
00581 void RTPSession::update_rate()
00582 {
00583 RTPReceiver* s;
00584 for (s = receivers_; s!= 0; s = s->next) {
00585 if(s->rate() !=0) {
00586 lst.push_front(s->rate());
00587 }
00588 }
00589 lst.sort();
00590 double inst_tx =lst.front();
00591 lst.clear();
00592 if(inst_tx != tx_rate_) {
00593
00594 if(inst_tx < (tx_rate_/2)) {
00595 tx_rate_ = tx_rate_/2;
00596 } else tx_rate_ = inst_tx;
00597 printf("Time:%f Sender:%d ...Rate change to:%f \n", NOW,localsrc_->srcid(),8*tx_rate_);
00598 Tcl::instance().evalf("%s session_bw %fb/s",name(),8*tx_rate_);
00599 Tcl::instance().evalf("%s transmit %fb/s",name(),8*tx_rate_);
00600 }
00601
00602 }
00603
00608 hdr_rtp* RTPSession::access_hdr_rtp()
00609 {
00610 return rh_;
00611 }
00612
00619 RTPSource* RTPSession::lookup(u_int32_t srcid)
00620 {
00621 RTPSource *p;
00622 for (p = allsrcs_; p != 0; p = p->next)
00623 {
00624 if (p->srcid() == srcid)
00625 return (p);
00626 }
00627 return (0);
00628 }
00629
00636 RTPReceiver* RTPSession::lookup_rcv(u_int32_t srcid)
00637 {
00638
00639 RTPReceiver *p;
00640 for (p = receivers_; p != 0; p = p->next)
00641 {
00642 if (p->srcid() == srcid)
00643 return (p);
00644 }
00645 return (0);
00646 }
00647
00651 void RTPSession::print_rcv( )
00652 {
00653
00654 RTPReceiver *p;
00655 for (p = receivers_; p != 0; p = p->next)
00656 {
00657 printf("Receiver:%d\n", p->srcid());
00658 }
00659 }
00660
00665 void RTPSession::enter(RTPSource* s)
00666 {
00667 if(s->srcid() != localsrc_->srcid() ) {
00668 printf("Time:%f Receiver:%d ...I add sender %d \n", NOW,localsrc_->srcid(), s->srcid());
00669 s->next = allsrcs_;
00670 allsrcs_ = s;
00671 }
00672 }
00673
00678 void RTPSession::enter_rcv(RTPReceiver* s)
00679 {
00680 printf("Time:%f Sender:%d ...I add receiver %d \n", NOW,localsrc_->srcid(), s->srcid());
00681 s->next = receivers_;
00682 receivers_ = s;
00683 }
00684
00689 void RTPSession::remove_receiver(RTPReceiver* p)
00690 {
00691
00692 printf("Time:%f Sender:%d ...I remove receiver %d \n", NOW,localsrc_->srcid(),p->srcid());
00693 RTPReceiver* s;
00694 RTPReceiver* q;
00695 if (receivers_==p)
00696 {
00697 q = receivers_;
00698 receivers_=receivers_->next;
00699 delete q;
00700 }
00701 else
00702 {
00703 for (s = receivers_; s!=0; s=s->next)
00704 {
00705 if (s->next==p)
00706 {
00707 q = s->next;
00708 s->next=q->next;
00709 delete q;
00710 }
00711 }
00712 }
00713
00714 }
00715
00720 void RTPSession::remove_sender(RTPSource* p)
00721 {
00722 printf("Time:%f Receiver:%d ...I remove sender %d \n", NOW,localsrc_->srcid(), p->srcid());
00723 if(p->srcid() != localsrc_->srcid()) {
00724 RTPSource* s;
00725 RTPSource* q;
00726 if (allsrcs_==p)
00727 {
00728 q = allsrcs_;
00729 allsrcs_= allsrcs_->next;
00730 delete q;
00731 }
00732 else
00733 {
00734 for (s = allsrcs_; s!=0; s=s->next)
00735 {
00736 if (s->next==p)
00737 {
00738 q = s->next;
00739 s->next=q->next;
00740 delete q;
00741 }
00742 }
00743 }
00744
00745 }
00746 rx_recv_= 0;
00747 jitter_=0;
00748 }
00749
00750
00756 void RTPSession::initial_rate(double a)
00757 {
00758 tx_rate_ =(double)a/8;
00759 smooth_rate_ = tx_rate_;
00760
00761 }
00762
00767 int RTPSession::command(int argc, const char*const* argv)
00768 {
00769 if (argc == 3) {
00770 if (strcmp(argv[1], "enter") == 0) {
00771 RTPSource* s = (RTPSource*)TclObject::lookup(argv[2]);
00772 enter(s);
00773 return (TCL_OK);
00774 }
00775 if (strcmp(argv[1], "localsrc") == 0) {
00776 localsrc_ = (RTPSource*)TclObject::lookup(argv[2]);
00777 enter(localsrc_);
00778 return (TCL_OK);
00779 }
00780 if (strcmp(argv[1], "rate") == 0) {
00781 initial_rate(atoi(argv[2]));
00782 return(TCL_OK);
00783 }
00784 }
00785
00786 return (TclObject::command(argc, argv));
00787 }
00788
00793 RTPReceiver::RTPReceiver(u_int32_t srcid)
00794 : next(0),cum_pkts_lost_(0),eff_rtt_(0),rate_(0)
00795 {
00796 srcid_ = srcid;
00797 }
00798
00803 RTPSource::RTPSource(u_int32_t srcid)
00804 : next(0),np_(0), snp_(0), ehsr_(-1), nbytes_(0),cum_pkts_lost_(0),LSR_(0),SRT_(0),is_sender_(false),ps_(0),rate_(0),transit_(0),jitter_(0)
00805 {
00806 bind("srcid_", (int*)&srcid_);
00807 srcid_ = srcid;
00808 }
00809
00810
00811
00812
00813