00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00029 #include "rtp.h"
00030 #include "rtpptdefs.h"
00031 #include "bufferpool.h"
00032
00037 int rtp_fill_buffers(rtp_thread * rtp_th)
00038 {
00039 pthread_mutex_lock(&(rtp_th->syn));
00040 pthread_mutex_unlock(&(rtp_th->syn));
00041
00042 return !rtp_th->run;
00043 }
00044
00054 int rtp_fill_buffer(rtp_ssrc * stm_src, rtp_frame * fr, rtp_buff * config)
00055 {
00056 rtp_pkt *pkt;
00057 int err;
00058 double ts_jump;
00059
00060
00061
00062
00063 if (stm_src->done_seek) {
00064 usleep(1000);
00065 return RTP_BUFF_EMPTY;
00066 }
00067
00068 if (!(pkt = rtp_get_pkt(stm_src, NULL))) {
00069 usleep(1000);
00070 return RTP_BUFF_EMPTY;
00071 }
00072
00073 fr->pt = RTP_PKT_PT(pkt);
00074 fr->timestamp = RTP_PKT_TS(pkt);
00075
00076 ts_jump = ((double) (fr->timestamp - stm_src->ssrc_stats.lastts)) /
00077 (double) stm_src->rtp_sess->ptdefs[fr->pt]->rate;
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088 fr->fps = stm_src->rtp_sess->fps;
00089 stm_src->ssrc_stats.lastts = fr->timestamp;
00090
00091 while ((err = stm_src->rtp_sess->parsers[fr->pt] (stm_src, fr, config))
00092 == EAGAIN);
00093
00094
00095
00096 fr->time_sec = ((double) (fr->timestamp - stm_src->ssrc_stats.firstts)) /
00097 (double) stm_src->rtp_sess->ptdefs[fr->pt]->rate;
00098 return err;
00099 }
00100
00109 double rtp_get_next_ts(rtp_ssrc * stm_src)
00110 {
00111 rtp_pkt *pkt;
00112
00113 if (!(pkt = rtp_get_pkt(stm_src, NULL)))
00114 return -1;
00115
00116 return ((double) (RTP_PKT_TS(pkt) - stm_src->ssrc_stats.firstts)) /
00117 (double) stm_src->rtp_sess->ptdefs[pkt->pt]->rate;
00118 }
00119
00127 int16_t rtp_get_next_pt(rtp_ssrc * stm_src)
00128 {
00129 rtp_pkt *pkt;
00130
00131 if (!(pkt = rtp_get_pkt(stm_src, NULL)))
00132 return RTP_BUFF_EMPTY;
00133
00134 return pkt->pt;
00135 }
00136
00142 void rtp_update_fps(rtp_ssrc * stm_src, uint32_t timestamp, unsigned pt) {
00143 if (timestamp != stm_src->rtp_sess->ptdefs[pt]->prev_timestamp) {
00144 stm_src->rtp_sess->fps =
00145 (double) stm_src->rtp_sess->ptdefs[pt]->rate/
00146 abs(timestamp - stm_src->rtp_sess->ptdefs[pt]->prev_timestamp);
00147 stm_src->rtp_sess->ptdefs[pt]->prev_timestamp = timestamp;
00148 }
00149 }
00150
00156 float rtp_get_fps(rtp_ssrc * stm_src)
00157 {
00158 return stm_src->rtp_sess->fps;
00159 }
00160
00175 rtp_pkt *rtp_get_n_pkt(rtp_ssrc * stm_src, unsigned int *len, unsigned int pkt_num)
00176 {
00177 int buffer_index;
00178
00179 pthread_mutex_lock(&(stm_src->po->po_mutex));
00180 buffer_index = stm_src->po->potail;
00181 while ((buffer_index >= 0) && (pkt_num-- > 0))
00182 buffer_index = stm_src->po->pobuff[buffer_index].next;
00183 pthread_mutex_unlock(&(stm_src->po->po_mutex));
00184
00185 if (buffer_index < 0)
00186 return NULL;
00187
00188 if (len)
00189 *len = (stm_src->po->pobuff[buffer_index]).pktlen;
00190
00191 return (rtp_pkt *) (*(stm_src->po->bufferpool) + buffer_index);
00192 }
00193
00211 rtp_pkt *rtp_get_pkt(rtp_ssrc * stm_src, size_t * len)
00212 {
00213 int index;
00214
00215 do {
00216 pthread_mutex_lock(&(stm_src->po->po_mutex));
00217 index = stm_src->po->potail;
00218 pthread_mutex_unlock(&(stm_src->po->po_mutex));
00219
00220 if (index < 0) return NULL;
00221 } while (!stm_src->rtp_sess->
00222 ptdefs[((rtp_pkt *) (*(stm_src->po->bufferpool) +
00223 index))->pt]
00224 &&
00225
00226 !rtp_rm_pkt(stm_src));
00227
00228 if (len)
00229 *len = (stm_src->po->pobuff[index]).pktlen;
00230
00231 return (rtp_pkt *) (*(stm_src->po->bufferpool) + index);
00232 }
00233
00239 inline int rtp_rm_pkt(rtp_ssrc * stm_src)
00240 {
00241 return bprmv(stm_src->rtp_sess->bp, stm_src->po,
00242 stm_src->po->potail);
00243 }
00244
00249 static void socket_clear(rtp_ssrc * stm_src)
00250 {
00251 rtp_session * rtp_sess = stm_src->rtp_sess;
00252 char buffer[BP_SLOT_SIZE];
00253 fd_set readset;
00254 struct timeval timeout;
00255
00256 memset(&timeout, 0, sizeof(struct timeval));
00257
00258 while(1) {
00259 FD_ZERO(&readset);
00260 FD_SET(rtp_sess->transport.RTP.sock.fd, &readset);
00261
00262 select(rtp_sess->transport.RTP.sock.fd + 1, &readset, NULL, NULL, &timeout);
00263
00264 if (FD_ISSET(rtp_sess->transport.RTP.sock.fd, &readset))
00265 recvfrom(rtp_sess->transport.RTP.sock.fd, buffer, BP_SLOT_SIZE, 0,
00266 NULL, NULL);
00267 else
00268 break;
00269 }
00270 }
00271
00276 void rtp_rm_all_pkts(rtp_ssrc * stm_src)
00277 {
00278 playout_buff * po = stm_src->po;
00279 buffer_pool * bp = stm_src->rtp_sess->bp;
00280
00281
00282 socket_clear(stm_src);
00283
00284
00285 pthread_mutex_lock(&(po->po_mutex));
00286 pthread_mutex_lock(&(bp->fl_mutex));
00287 while(po->potail >= 0) {
00288 int index = po->potail;
00289
00290 if (po->pobuff[index].next != -1)
00291 po->pobuff[po->pobuff[index].next].prev =
00292 po->pobuff[index].prev;
00293 else
00294 po->potail = po->pobuff[index].prev;
00295 if (po->pobuff[index].prev != -1)
00296 po->pobuff[po->pobuff[index].prev].next =
00297 po->pobuff[index].next;
00298 else
00299 po->pohead = po->pobuff[index].next;
00300
00301 po->pocount--;
00302
00303 bp->freelist[index] = bp->flhead;
00304 bp->flhead = index;
00305 bp->flcount--;
00306 }
00307
00308 pthread_cond_signal(&(bp->cond_full));
00309 pthread_mutex_unlock(&(bp->fl_mutex));
00310 pthread_mutex_unlock(&(po->po_mutex));
00311 }