00001
00035 #include <stdlib.h>
00036 #include "packet.h"
00037 #include "ip.h"
00038 #include "rtp.h"
00039 #include <math.h>
00040 #include "simstream.h"
00041
00042
// Smoothing-factor presets: evaluate_jitter() assigns one of these three values to
// smooth_factor depending on where the jitter z-score falls relative to +/-DELTA.
00043 #define CONGESTED 0.5
00044 #define UNLOADED 0.3
00045 #define LOADED 0.1
// z-score threshold used by evaluate_jitter() to pick a smoothing factor.
00046 #define DELTA 0.75
// Shorthand for the current scheduler clock value.
00047 #define NOW Scheduler::instance().clock()
// Delay passed to sender_timer_.resched() from timeout() (presumably seconds -- TODO confirm).
00048 #define TIMER_TIMEOUT 10
// Delay passed to sender_timer_.resched() from start_timer() (presumably seconds -- TODO confirm).
00049 #define INITIAL_TIMEOUT 10
00050
00051
00052
00056 static class RTPSourceClass : public TclClass {
00057 public:
00058 RTPSourceClass() : TclClass("RTPSource") {}
00059 TclObject* create(int argc, const char*const* argv) {
00060 if (argc >= 5)
00061 return (new RTPSource(atoi(argv[4])));
00062
00063 return 0;
00064 }
00065 } class_rtp_source;
00066
00072 static class RTPSessionClass : public TclClass {
00073 public:
00074 RTPSessionClass() : TclClass("Session/RTP") {}
00075 TclObject* create(int, const char*const*) {
00076 return (new RTPSession());
00077 }
00078 } class_rtp_session;
00079
00083 RTPSession::RTPSession()
00084 : allsrcs_(0), localsrc_(0), receivers_(0),stream_(0),last_np_(0),sender_timer_(this), T_one_way_(0),rh_(0),
00085 we_sent(0),last_pkts_lost_(0),last_ehsr_(-1),tx_rate_(0), time_elapsed_(0),running_(0)
00086 {
00087 bind("enableFlowControl_", &enableFlowControl_);
00088 bind("rx_recv_",&rx_recv_);
00089 bind("jitter_",&jitter_);
00090 bind("RTT_",&RTT_);
00091 bind("z_score",&z_score);
00092 bind("smooth_loss_",&smooth_loss_);
00093 bind("level_",&level_);
00094 smooth_rate_ = 0;
00095 alpha = 1;
00096 smooth_factor = 0.9;
00097 last_time_report_ = NOW;
00098 low_ = 0;
00099 high_= 0;
00100 avg_tx_ = 0;
00101
00102 weight[0] = 1;
00103 weight[1] = 1;
00104 weight[2] = 1;
00105 weight[3] = 1;
00106 weight[4] = 0.8;
00107 weight[5] = 0.6;
00108 weight[6] = 0.4;
00109 weight[7] = 0.2;
00110 }
00111
00115 RTPSession::~RTPSession()
00116 {
00117 while (allsrcs_ != 0) {
00118 RTPSource* p = allsrcs_;
00119 allsrcs_ = allsrcs_->next;
00120 delete p;
00121 }
00122 delete localsrc_;
00123 while (receivers_ != 0) {
00124 RTPReceiver* p = receivers_;
00125 receivers_ = receivers_->next;
00126 delete p;
00127 }
00128 lst.~list<double>();
00129 tx_lst.~list<double>();
00130
00131 }
00132
00139 void RTPSession::localsrc_update(int)
00140 {
00141 localsrc_->np(1);
00142 localsrc_->is_sender(true);
00143 }
00144
00151 void RTPSession::localsrc_update_nbytes(int n)
00152 {
00153 localsrc_->nbytes(n);
00154
00155 }
00156
// Size contributions used by build_report() when it accumulates the report length
// (presumably bytes -- TODO confirm).
// Starting length of every report.
00157 #define RTCP_HDRSIZE 8
// Added once when a sender report is attached.
00158 #define RTCP_SR_SIZE 20
// Added for each receiver report that is built.
00159 #define RTCP_RR_SIZE 48
00160
00167 int RTPSession::build_report(int bye)
00168 {
00169
00170
00171 rh_ = new hdr_rtp;
00172 rh_->rr_ = 0;
00173 rh_->sr_ = 0;
00174
00175 int nsrc = 0;
00176 int nrr = 0;
00177 int len = RTCP_HDRSIZE;
00178 we_sent = 0;
00179 double fraction = 0.0;
00180 time_elapsed_ = NOW - last_time_report_;
00181 last_time_report_ = NOW;
00182 if (localsrc_->np() != last_np_) {
00183 last_np_ = localsrc_->np();
00184 we_sent = 1;
00185 len += RTCP_SR_SIZE;
00186
00187 sender_report* sr;
00188
00189 sr = new sender_report;
00190 sr->s_ext_ = 0;
00191 sr->sender_srcid()= localsrc_->srcid();
00192 sr->pkts_sent() = localsrc_->np();
00193 sr->octets_sent() = localsrc_->nbytes();
00194 sr->low_tx()=stream_->getLow_tx();
00195 sr->medium_tx()=stream_->getMedium_tx();
00196 sr->high_tx()=stream_->getHigh_tx();
00197 RTPReceiver *p;
00198 for (p = receivers_; p != 0; p = p->next) {
00199 sr_extensions* se = new sr_extensions;
00200 se->srcid()=p->srcid();
00201 se->eff_rtt()=p->eff_rtt();
00202 se->next = sr->s_ext_;
00203 sr->s_ext_ = se;
00204 }
00205
00206 rh_->sr_ = sr;
00207 }
00208 for (RTPSource* sp = allsrcs_; sp != 0; sp = sp->next) {
00209 ++nsrc;
00210 int received = sp->np() - sp->snp();
00211
00212 if (received == 0) {
00213
00214 continue;
00215 }
00216 if(localsrc_->srcid() != sp->srcid()) {
00217
00218 int bytes = received * (sp->ps());
00219 rx_recv_ = ( 8 * (double)bytes)/time_elapsed_;
00220 stream_->setRcv_rate(rx_recv_);
00221 sp->snp(sp->np());
00222 len += RTCP_RR_SIZE;
00223
00224 int expected_interval = sp->ehsr() - last_ehsr_;
00225 last_ehsr_ = sp->ehsr();
00226 int lost_interval = expected_interval - received;
00227
00228
00229 if (lost_interval <= 0 || expected_interval == 0 ) {
00230 fraction = 0;
00231
00232 } else fraction = ((double)lost_interval / (double)expected_interval);
00233 calculate_RTT();
00234
00235 if( sp->np() >= 1) {
00236 if(fraction == 0) {
00237 increase_rate(sp->ps());
00238
00239 } else {
00240 measure_smooth_loss(fraction);
00241 calculateR_tcp(sp->ps());
00242 }
00243 }
00244
00245 receiver_report* rr;
00246 rr = new receiver_report;
00247
00248 rr->cum_pkts_lost() = sp->cum_pkts_lost();
00249 rr->LSR() = sp->LSR();
00250 rr->DLSR()= Scheduler::instance().clock() - sp->SRT();
00251 rr->R_tcp() = smooth_rate_;
00252 rr->jitter() = sp->jitter();
00253 rr->rx()=stream_->getAvg_rx();
00254 if(bye) {
00255 rr->bye()= 1;
00256 remove_sender(sp);
00257 } else rr->bye() = 0;
00258
00259 rh_->rr_ = rr;
00260
00261 if (++nrr >= 31)
00262 break;
00263 }
00264
00265 }
00266
00267
00268 if (bye) {
00269 len += build_bye();
00270 }
00271 else
00272 len += build_sdes();
00273
00274 Tcl::instance().evalf("%s adapt-timer %d %d %d", name(),
00275 nsrc, nrr, we_sent);
00276 Tcl::instance().evalf("%s sample-size %d", name(), len);
00277
00278 return (len);
00279 }
00280
00284 int RTPSession::build_bye()
00285 {
00286
00287 return (8);
00288 }
00292 int RTPSession::build_sdes()
00293 {
00294
00295
00296 return (20);
00297 }
00308 void RTPSession::recv(Packet* p, Handler*)
00309 {
00310
00311 hdr_cmn* mh = hdr_cmn::access(p);
00312 hdr_rtp* rh = hdr_rtp::access(p);
00313 RTPSource* s = lookup(rh->srcid());
00314
00315 if (s == 0) {
00316 if(rh->srcid()!=localsrc_->srcid()){
00317 Tcl& tcl = Tcl::instance();
00318 tcl.evalf("%s new-source %d", name(),rh->srcid());
00319 s = (RTPSource*)TclObject::lookup(tcl.result());
00320 last_ehsr_ = rh->seqno();
00321 }
00322 }
00323 if(rh->srcid()!=localsrc_->srcid())
00324 {
00325
00326 T_one_way_ = NOW - rh->timestamp();
00327
00328 int pkts_lost = 0;
00329 int difference = rh->seqno() - s->ehsr();
00330 if (difference <= 1) {
00331 pkts_lost = 0;
00332
00333 }
00334
00335 if (difference > 1) {
00336 pkts_lost = difference -1;
00337
00338 }
00339
00340 double transit = NOW - rh->timestamp();
00341 double d = transit - s->transit();
00342 s->transit(transit);
00343 if (d < 0) d = -d;
00344 s->jitter( s->jitter() + (1./16.) * (d - s->jitter()));
00345 evaluate_jitter(s->jitter());
00346 jitter_ = s->jitter();
00347
00348 s->np(1);
00349 s->cum_pkts_lost(pkts_lost);
00350 s->ehsr(rh->seqno());
00351 s->nbytes(mh->size());
00352 s->ps(mh->size());
00353 if(rh->T_epoch() == 1) {
00354 stream_->compare();
00355 }
00356 }
00357 Packet::free(p);
00358 }
00359
00366 void RTPSession::evaluate_jitter(double value)
00367 {
00368 jitter_ = 0.8 * value - 0.2 * jitter_;
00369 jitter_lst.push_front(jitter_);
00370 list<double> temp_lst;
00371 temp_lst = jitter_lst;
00372 int size = jitter_lst.size();
00373 double temp_array[size];
00374 double dev_array[size];
00375 double mean =0;
00376 double count =0;
00377 double std_dev =0;
00378
00379 if (size >1) {
00380 for (int i=0; i<size; i++) {
00381 temp_array[i]=jitter_lst.front();
00382 count += temp_array[i];
00383 jitter_lst.pop_front();
00384 }
00385 mean = count/size;
00386 count =0;
00387 for (int j =0; j<size; j++) {
00388 dev_array[j]=temp_array[j]-mean;
00389 count+=pow(dev_array[j],2);
00390 }
00391 std_dev = sqrt(count/(size-1));
00392 double temp = temp_lst.front();
00393 z_score = (temp - mean)/std_dev;
00394
00395 jitter_lst = temp_lst;
00396 if(-DELTA < z_score < DELTA) {
00397 smooth_factor = LOADED;
00398 } else if ( z_score <= -DELTA) {
00399 smooth_factor = UNLOADED;
00400 } else smooth_factor = CONGESTED;
00401 }
00402 }
00403
00404
00405
00410 void RTPSession::calculate_alpha(double value)
00411 {
00412
00413 if( value == 0) {
00414 alpha = 1;
00415 } else
00416 alpha = (value/T_one_way_) -1;
00417 }
00418
00419
00427 void RTPSession::calculate_RTT()
00428 {
00429 double temp = (1+alpha) * T_one_way_;
00430
00431 RTT_ = temp * 0.1 + 0.9 * RTT_;
00432 }
00433
00442 void RTPSession::increase_rate(int ps)
00443 {
00444 double sample_tx = smooth_rate_ + (double)ps/RTT_;
00445
00446 smooth_rate(sample_tx);
00447 }
00448
00455 void RTPSession::measure_smooth_loss (double fraction)
00456 {
00457
00458 double I_tot0 = 0;
00459 double I_tot1 = 0;
00460 double W_tot = 0;
00461 for (int i=0; i<7; i++) {
00462 pkt_loss_history[i+1]= pkt_loss_history[i];
00463 }
00464 pkt_loss_history[0] = fraction;
00465
00466 for (int i=0; i<7; i++) {
00467 I_tot0 = I_tot0+ (pkt_loss_history[i] * weight[i]);
00468 W_tot = W_tot + weight[i];
00469 }
00470
00471 for (int i=1; i<8; i++) {
00472 I_tot1 = I_tot1 + (pkt_loss_history[i] * weight[i-1]);
00473 }
00474 double I_tot =0;
00475 if( I_tot0 > I_tot1) {
00476 I_tot = I_tot0;
00477 } else {
00478 I_tot = I_tot1;
00479 }
00480 double I_mean = I_tot/W_tot;
00481
00482 smooth_loss_= I_mean;
00483 }
00484
00489 void RTPSession::calculateR_tcp(int ps)
00490 {
00491
00492 double min =0;
00493 double fraction = 3 * sqrt((3 * smooth_loss_) / 8);
00494 if(fraction < 1 ){
00495 min = fraction;
00496 } else min =1;
00497
00498 double sample_tx = (double)ps / (RTT_ * (sqrt(2 * smooth_loss_ /3)) + 4 * (RTT_ * min * smooth_loss_) * (1 + (32 * pow(smooth_loss_,2))));
00499
00500 smooth_rate(sample_tx);
00501 }
00502
00503
00504
00509 void RTPSession::smooth_rate(double inst_tx)
00510 {
00511 smooth_rate_ = smooth_factor * inst_tx + smooth_rate_ * (1 - smooth_factor);
00512 stream_->setInst_rx_rate(smooth_rate_);
00513 }
00514
00515
00516
00526 void RTPSession::recv_ctrl(Packet* p)
00527 {
00528 hdr_cmn* mh = hdr_cmn::access(p);
00529 hdr_rtp* rh = hdr_rtp::access(p);
00530 u_int32_t local_src = localsrc_->srcid();
00531
00532 if(rh->srcid() != local_src) {
00533
00534 if (rh->sr_ != 0) {
00535 RTPSource* source = lookup(rh->sr_->sender_srcid());
00536 if (source != 0 && source->srcid()!= local_src) {
00537 source->LSR(rh->timestamp());
00538 source->SRT(NOW);
00539 stream_->setLow_tx(rh->sr_->low_tx());
00540 stream_->setMedium_tx(rh->sr_->medium_tx());
00541 stream_->setHigh_tx(rh->sr_->high_tx());
00542 if(rh->sr_->s_ext_ != 0) {
00543 sr_extensions* se = new sr_extensions;
00544 for (se = rh->sr_->s_ext_; se != 0; se = se->next) {
00545
00546 if(se->srcid()==localsrc_->srcid()) {
00547
00548 if(se->eff_rtt() == 0) {
00549 calculate_alpha(0);
00550 } else calculate_alpha(se->eff_rtt());
00551 }
00552
00553 }
00554 }
00555
00556 }
00557 }
00558
00559 if(localsrc_->is_sender()){
00560
00561 if(rh->rr_ != 0){
00562 RTPReceiver* s = lookup_rcv(rh->srcid());
00563
00564 if (s != 0) {
00565 for (s = receivers_; s!= 0; s = s->next) {
00566
00567 if(rh->srcid() == s->srcid()) {
00568
00569 double eff_rtt = 0.0;
00570
00571 if(rh->rr_->LSR()!=0) {
00572 eff_rtt = NOW - rh->rr_->LSR() - rh->rr_->DLSR();
00573 }
00574 s->eff_rtt(eff_rtt);
00575 s->rate(rh->rr_->R_tcp());
00576 if(rh->rr_->bye() == 1) {
00577 remove_receiver(s);
00578 }
00579
00580 }
00581
00582 }
00583 if(enableFlowControl_ == 1 && receivers_) {
00584 update_rate();
00585 } else {
00586 printf("TIME: %f Sender %d NOT HAVING RECEIVERS \n", NOW,localsrc_->srcid());
00587 slow_start();
00588
00589 }
00590 }
00591 else {
00592 s = new RTPReceiver(rh->srcid());
00593 enter_rcv(s);
00594 }
00595
00596 }
00597 }
00598 }
00599
00600 Tcl::instance().evalf("%s sample-size %d", name(), mh->size());
00601 Packet::free(p);
00602 }
00603
00612 void RTPSession::update_rate()
00613 {
00614 RTPReceiver* s;
00615 for (s = receivers_; s!= 0; s = s->next) {
00616 if(s->rate() > 0) {
00617 lst.push_front(s->rate());
00618 }
00619 }
00620 lst.sort();
00621 double inst_tx =lst.front();
00622 lst.clear();
00623 if(inst_tx != tx_rate_) {
00624 tx_rate_ = inst_tx;
00625 if(tx_rate_ <= low_) {
00626 tx_rate_ = low_;
00627 }
00628 if(tx_rate_ > high_) {
00629 tx_rate_ = high_;
00630 }
00631 setInst_tx(tx_rate_);
00632 Tcl::instance().evalf("%s session_bw %fb/s",name(),8*tx_rate_);
00633 Tcl::instance().evalf("%s transmit %fb/s",name(),8*tx_rate_);
00634 }
00635 }
00636
00637
00641 void RTPSession::slow_start()
00642 {
00643 tx_rate_ = low_;
00644 setInst_tx(tx_rate_);
00645 printf("Starting Slow start at ...%f\n", tx_rate_ * 8);
00646 Tcl::instance().evalf("%s session_bw %fb/s",name(),8*tx_rate_);
00647 Tcl::instance().evalf("%s transmit %fb/s",name(),8*tx_rate_);
00648 }
00649
00650
00655 hdr_rtp* RTPSession::access_hdr_rtp()
00656 {
00657 return rh_;
00658 }
00659
00666 RTPSource* RTPSession::lookup(u_int32_t srcid)
00667 {
00668 RTPSource *p;
00669 for (p = allsrcs_; p != 0; p = p->next)
00670 {
00671 if (p->srcid() == srcid)
00672 return (p);
00673 }
00674 return (0);
00675 }
00676
00683 RTPReceiver* RTPSession::lookup_rcv(u_int32_t srcid)
00684 {
00685
00686 RTPReceiver *p;
00687 for (p = receivers_; p != 0; p = p->next)
00688 {
00689 if (p->srcid() == srcid)
00690 return (p);
00691 }
00692 return (0);
00693 }
00694
00698 void RTPSession::print_rcvs( )
00699 {
00700
00701 RTPReceiver *p;
00702 for (p = receivers_; p != 0; p = p->next)
00703 {
00704 printf("Receiver:%d\n", p->srcid());
00705 }
00706 }
00707
00711 void RTPSession::print_srcs( )
00712 {
00713
00714 RTPSource* s;
00715 for (s = allsrcs_; s != 0; s = s->next)
00716 {
00717 printf("Source:%d\n", s->srcid());
00718 }
00719 }
00720
00721
00726 void RTPSession::enter(RTPSource* s)
00727 {
00728
00729 s->next = allsrcs_;
00730 allsrcs_ = s;
00731 }
00732
00737 void RTPSession::enter_rcv(RTPReceiver* s)
00738 {
00739 printf("Time:%f Sender:%d ...I add receiver %d \n", NOW,localsrc_->srcid(), s->srcid());
00740 s->next = receivers_;
00741 receivers_ = s;
00742
00743 }
00744
00749 void RTPSession::remove_receiver(RTPReceiver* p)
00750 {
00751
00752 printf("Time:%f Sender:%d ...I remove receiver %d \n", NOW,localsrc_->srcid(),p->srcid());
00753
00754 RTPReceiver* s;
00755 RTPReceiver* q;
00756 if (receivers_== p)
00757 {
00758 q = receivers_;
00759 receivers_= receivers_->next;
00760 delete q;
00761 }
00762 else
00763 {
00764 for (s = receivers_; s!=0; s=s->next)
00765 {
00766 if (s->next==p)
00767 {
00768 q = s->next;
00769 s->next=q->next;
00770 delete q;
00771 }
00772 }
00773 }
00774
00775 }
00776
00781 void RTPSession::remove_sender(RTPSource* p)
00782 {
00783
00784 if(p->srcid() != localsrc_->srcid()) {
00785 RTPSource* s;
00786 RTPSource* q;
00787 if (allsrcs_==p)
00788 {
00789 q = allsrcs_;
00790 allsrcs_= allsrcs_->next;
00791 delete q;
00792 }
00793 else
00794 {
00795 for (s = allsrcs_; s!=0; s=s->next)
00796 {
00797 if (s->next==p)
00798 {
00799 q = s->next;
00800 s->next=q->next;
00801 delete q;
00802 }
00803 }
00804 }
00805
00806 }
00807 }
00808
00814 void RTPSession::initial_rate(double a)
00815 {
00816 tx_rate_ =(double)a/8;
00817 smooth_rate_ = tx_rate_;
00818
00819
00820 }
00821
00826 int RTPSession::command(int argc, const char*const* argv)
00827 {
00828 if (argc == 3) {
00829 if (strcmp(argv[1], "enter") == 0) {
00830 RTPSource* s = (RTPSource*)TclObject::lookup(argv[2]);
00831 enter(s);
00832 return (TCL_OK);
00833 }
00834 if (strcmp(argv[1], "localsrc") == 0) {
00835 localsrc_ = (RTPSource*)TclObject::lookup(argv[2]);
00836 enter(localsrc_);
00837 return (TCL_OK);
00838 }
00839 if (strcmp(argv[1], "rate") == 0) {
00840 initial_rate(atoi(argv[2]));
00841 return(TCL_OK);
00842 }
00843 if (strcmp(argv[1], "stream") == 0) {
00844 stream_ = (SimStream*)TclObject::lookup(argv[2]);
00845 return (TCL_OK);
00846 }
00847 }
00848 if (argc == 4) {
00849 if (strcmp(argv[1], "bw-limits") == 0) {
00850 low_ = atoi(argv[2])/8;
00851 high_ = atoi(argv[3])/8;
00852 return(TCL_OK);
00853 }
00854 }
00855
00856 return (TclObject::command(argc, argv));
00857 }
00858
00863 RTPReceiver::RTPReceiver(u_int32_t srcid)
00864 : next(0),cum_pkts_lost_(0),eff_rtt_(0),rate_(0),sent_report_(false)
00865 {
00866 srcid_ = srcid;
00867 }
00868
00869
00874 RTPSource::RTPSource(u_int32_t srcid)
00875 : next(0),np_(0), snp_(0), ehsr_(-1), nbytes_(0),cum_pkts_lost_(0),LSR_(0),SRT_(0),is_sender_(false),ps_(0),rate_(0),transit_(0),jitter_(0)
00876 {
00877 bind("srcid_", (int*)&srcid_);
00878 srcid_ = srcid;
00879 }
00880
00881
00882
00883
00884 void SenderTimer::expire(Event* ) {
00885 a_->timeout(0);
00886 }
00887
00888 void RTPSession::start_timer()
00889 {
00890 running_ = 1;
00891 sender_timer_.resched(INITIAL_TIMEOUT);
00892
00893 }
00894
00895 void RTPSession::stop_timer()
00896 {
00897 running_ = 0;
00898 sender_timer_.force_cancel();
00899
00900 }
00901
00902
00903
00904 void RTPSession::timeout(int)
00905 {
00906 if(running_) {
00907 average_tx();
00908 stream_->setAvg_tx(avg_tx_,level_);
00909 sender_timer_.resched(TIMER_TIMEOUT);
00910 }
00911 }
00912
00917 void RTPSession::setInst_tx(double newValue)
00918 {
00919 tx_lst.push_front(newValue);
00920 }
00921
00922
00923
00927 void RTPSession::average_tx()
00928 {
00929 int size = tx_lst.size();
00930 double sum = 0;
00931 if(size > 1) {
00932 for(int i = 0; i<size; i++) {
00933 sum += tx_lst.front();
00934 tx_lst.pop_front();
00935 }
00936 avg_tx_ = sum/size;
00937 }
00938 }
00939
00940
00941
00942