rtp_buffer.c

Go to the documentation of this file.
00001 /* * 
00002  * This file is part of libnemesi
00003  *
00004  * Copyright (C) 2007 by LScube team <team@streaming.polito.it>
00005  * See AUTHORS for more details
00006  * 
00007  * libnemesi is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU Lesser General Public
00009  * License as published by the Free Software Foundation; either
00010  * version 2.1 of the License, or (at your option) any later version.
00011  *
00012  * libnemesi is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015  * Lesser General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU Lesser General Public
00018  * License along with libnemesi; if not, write to the Free Software
00019  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
00020  *  
00021  * */
00022 
00023 
00029 #include "rtp.h"
00030 #include "rtpptdefs.h"
00031 #include "bufferpool.h"
00032 
00037 int rtp_fill_buffers(rtp_thread * rtp_th)
00038 {
00039     pthread_mutex_lock(&(rtp_th->syn));
00040     pthread_mutex_unlock(&(rtp_th->syn));
00041 
00042     return !rtp_th->run;
00043 }
00044 
00054 int rtp_fill_buffer(rtp_ssrc * stm_src, rtp_frame * fr, rtp_buff * config)
00055 {
00056     rtp_pkt *pkt;
00057     int err;
00058     double ts_jump;
00059 
00060     /* If we did a seek, we must wait for seek reset and bufferpool clean up,
00061      * so wait until rtp_recv receives the first new packet and resets the bufferpool
00062      */
00063     if (stm_src->done_seek) {
00064         usleep(1000);
00065         return RTP_BUFF_EMPTY;
00066     }
00067 
00068     if (!(pkt = rtp_get_pkt(stm_src, NULL))) {
00069         usleep(1000);
00070         return RTP_BUFF_EMPTY;
00071     }
00072 
00073     fr->pt = RTP_PKT_PT(pkt);
00074     fr->timestamp = RTP_PKT_TS(pkt);
00075 
00076     ts_jump = ((double) (fr->timestamp - stm_src->ssrc_stats.lastts)) /
00077         (double) stm_src->rtp_sess->ptdefs[fr->pt]->rate;
00078 
00079 /*
00080     if (ts_jump > 10) {
00081         fprintf(stderr, "Out of sync timestamp: %u - %u: %g for Payload: %d\n", fr->timestamp, stm_src->ssrc_stats.lastts, ts_jump, fr->pt);
00082         stm_src->ssrc_stats.lastts = fr->timestamp;
00083         rtp_rm_pkt(stm_src);
00084         return RTP_BUFF_EMPTY;
00085     }
00086 */
00087 
00088     fr->fps = stm_src->rtp_sess->fps;
00089     stm_src->ssrc_stats.lastts = fr->timestamp;
00090 
00091     while ((err = stm_src->rtp_sess->parsers[fr->pt] (stm_src, fr, config)) 
00092             == EAGAIN);
00093     /*
00094      * The parser can set the timestamp on its own
00095      */
00096     fr->time_sec = ((double) (fr->timestamp - stm_src->ssrc_stats.firstts)) /
00097         (double) stm_src->rtp_sess->ptdefs[fr->pt]->rate;
00098     return err;
00099 }
00100 
00109 double rtp_get_next_ts(rtp_ssrc * stm_src)
00110 {                // TODO: calculate time using RTCP infos
00111     rtp_pkt *pkt;
00112 
00113     if (!(pkt = rtp_get_pkt(stm_src, NULL)))
00114         return -1;
00115 
00116     return ((double) (RTP_PKT_TS(pkt) - stm_src->ssrc_stats.firstts)) /
00117         (double) stm_src->rtp_sess->ptdefs[pkt->pt]->rate;
00118 }
00119 
00127 int16_t rtp_get_next_pt(rtp_ssrc * stm_src)
00128 {
00129     rtp_pkt *pkt;
00130 
00131     if (!(pkt = rtp_get_pkt(stm_src, NULL)))
00132         return RTP_BUFF_EMPTY;
00133 
00134     return pkt->pt;
00135 }
00136 
00142 void rtp_update_fps(rtp_ssrc * stm_src, uint32_t timestamp, unsigned pt) {
00143     if (timestamp != stm_src->rtp_sess->ptdefs[pt]->prev_timestamp) {
00144         stm_src->rtp_sess->fps =
00145         (double) stm_src->rtp_sess->ptdefs[pt]->rate/
00146         abs(timestamp - stm_src->rtp_sess->ptdefs[pt]->prev_timestamp);
00147         stm_src->rtp_sess->ptdefs[pt]->prev_timestamp = timestamp;
00148     }
00149 }
00150 
00156 float rtp_get_fps(rtp_ssrc * stm_src)
00157 {
00158     return stm_src->rtp_sess->fps;
00159 }
00160 
00175 rtp_pkt *rtp_get_n_pkt(rtp_ssrc * stm_src, unsigned int *len, unsigned int pkt_num)
00176 {                // TODO complete;
00177     int buffer_index;
00178 
00179     pthread_mutex_lock(&(stm_src->po->po_mutex));
00180     buffer_index = stm_src->po->potail;
00181     while ((buffer_index >= 0) && (pkt_num-- > 0))
00182         buffer_index = stm_src->po->pobuff[buffer_index].next;
00183     pthread_mutex_unlock(&(stm_src->po->po_mutex));
00184 
00185     if (buffer_index < 0)
00186         return NULL;
00187 
00188     if (len)
00189         *len = (stm_src->po->pobuff[buffer_index]).pktlen;
00190 
00191     return (rtp_pkt *) (*(stm_src->po->bufferpool) + buffer_index);
00192 }
00193 
00211 rtp_pkt *rtp_get_pkt(rtp_ssrc * stm_src, size_t * len)
00212 {
00213     int index;
00214 
00215     do {
00216         pthread_mutex_lock(&(stm_src->po->po_mutex));
00217         index = stm_src->po->potail;
00218         pthread_mutex_unlock(&(stm_src->po->po_mutex));
00219 
00220         if (index < 0) return NULL;
00221     } while (!stm_src->rtp_sess->
00222          ptdefs[((rtp_pkt *) (*(stm_src->po->bufferpool) +
00223                       index))->pt]
00224          &&
00225          /* always true - XXX be careful if bufferpool API changes -> */
00226          !rtp_rm_pkt(stm_src));
00227 
00228     if (len)
00229         *len = (stm_src->po->pobuff[index]).pktlen;
00230 
00231     return (rtp_pkt *) (*(stm_src->po->bufferpool) + index);
00232 }
00233 
00239 inline int rtp_rm_pkt(rtp_ssrc * stm_src)
00240 {
00241     return bprmv(stm_src->rtp_sess->bp, stm_src->po,
00242              stm_src->po->potail);
00243 }
00244 
00249 static void socket_clear(rtp_ssrc * stm_src)
00250 {
00251     rtp_session * rtp_sess = stm_src->rtp_sess;
00252     char buffer[BP_SLOT_SIZE];
00253     fd_set readset;
00254     struct timeval timeout;
00255 
00256     memset(&timeout, 0, sizeof(struct timeval));
00257 
00258     while(1) {
00259         FD_ZERO(&readset);
00260         FD_SET(rtp_sess->transport.RTP.sock.fd, &readset);
00261 
00262         select(rtp_sess->transport.RTP.sock.fd + 1, &readset, NULL, NULL, &timeout);
00263 
00264         if (FD_ISSET(rtp_sess->transport.RTP.sock.fd, &readset))
00265             recvfrom(rtp_sess->transport.RTP.sock.fd, buffer, BP_SLOT_SIZE, 0,
00266                          NULL, NULL);
00267         else
00268             break;
00269     }
00270 }
00271 
00276 void rtp_rm_all_pkts(rtp_ssrc * stm_src)
00277 {
00278     playout_buff * po = stm_src->po;
00279     buffer_pool * bp = stm_src->rtp_sess->bp;
00280 
00281     //Clear the RECV BUFFER
00282     socket_clear(stm_src);
00283 
00284     //Clear PLAYOUTBUFFER and Bufferpool
00285     pthread_mutex_lock(&(po->po_mutex));
00286     pthread_mutex_lock(&(bp->fl_mutex));
00287     while(po->potail >= 0) {
00288         int index = po->potail;
00289 
00290         if (po->pobuff[index].next != -1)
00291             po->pobuff[po->pobuff[index].next].prev =
00292                 po->pobuff[index].prev;
00293         else
00294             po->potail = po->pobuff[index].prev;
00295         if (po->pobuff[index].prev != -1)
00296             po->pobuff[po->pobuff[index].prev].next =
00297                 po->pobuff[index].next;
00298         else
00299             po->pohead = po->pobuff[index].next;
00300 
00301         po->pocount--;
00302 
00303         bp->freelist[index] = bp->flhead;
00304         bp->flhead = index;
00305         bp->flcount--;
00306     }
00307 
00308     pthread_cond_signal(&(bp->cond_full));
00309     pthread_mutex_unlock(&(bp->fl_mutex));
00310     pthread_mutex_unlock(&(po->po_mutex));
00311 }

Generated on Tue Feb 3 03:10:02 2009 for libnemesi by  doxygen 1.5.4