rtp_thread.c

Go to the documentation of this file.
00001 /* * 
00002  * This file is part of libnemesi
00003  *
00004  * Copyright (C) 2007 by LScube team <team@streaming.polito.it>
00005  * See AUTHORS for more details
00006  * 
00007  * libnemesi is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU Lesser General Public
00009  * License as published by the Free Software Foundation; either
00010  * version 2.1 of the License, or (at your option) any later version.
00011  *
00012  * libnemesi is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015  * Lesser General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU Lesser General Public
00018  * License along with libnemesi; if not, write to the Free Software
00019  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
00020  *  
00021  * */
00022 
00028 #include "rtp.h"
00029 #include "comm.h"
00030 #include "bufferpool.h"
00031 #include "parsers/rtpparsers.h"
00032 #include "utils.h"
00033 
00034 #define PO_BUFF_SIZE_SEC 0 /* presumably the seconds part of the playout buffer size; no live code visible here uses it -- TODO confirm */
00035 #define PO_BUFF_SIZE_MSEC 700 /* presumably the milliseconds part of the playout buffer size; no live code visible here uses it -- TODO confirm */
00036 
00043 static void rtp_clean(void * thrd)
00044 {
00045     rtp_thread *rtp_th = (rtp_thread *) thrd;
00046     rtp_session *rtp_sess = rtp_th->rtp_sess_head;
00047     rtp_session *prev_rtp_sess;
00048     rtp_ssrc *csrc, *psrc;
00049     struct rtp_conflict *conf, *pconf;
00050     rtp_fmts_list *fmtlist, *pfmtlist;
00051     int i;
00052 
00053     nms_printf(NMSML_DBG1, "RTP Thread is dying suicide!\n");
00054 //      pthread_mutex_lock(&rtp_th->syn);
00055 //      pthread_mutex_trylock(&rtp_th->syn);
00056 
00057     while (rtp_sess != NULL) {
00058         close(rtp_sess->transport.RTP.sock.fd);
00059         close(rtp_sess->transport.RTCP.sock.fd);
00060 
00061         csrc = rtp_sess->ssrc_queue;
00062 
00063         while (csrc != NULL) {
00064             psrc = csrc;
00065             csrc = csrc->next;
00066             for (i = 0; i < 9; i++)
00067                 free(((char **) (&(psrc->ssrc_sdes)))[i]);
00068             free(psrc->rtp_from.addr);
00069             free(psrc->rtcp_from.addr);
00070             free(psrc->rtcp_to.addr);
00071             for (i = 0; i < 128; i++)
00072                 if (rtp_sess->parsers_uninits[i])
00073                     rtp_sess->parsers_uninits[i] (psrc, i);
00074             free(psrc->po);
00075             free(psrc);
00076         }
00077         bpkill(rtp_sess->bp);
00078         free(rtp_sess->bp);
00079 
00080         // transport allocs
00081         free((rtp_sess->transport).spec);
00082 
00083         conf = rtp_sess->conf_queue;
00084         while (conf) {
00085             pconf = conf;
00086             conf = conf->next;
00087             free(pconf->transaddr.addr);
00088             free(pconf);
00089         }
00090         // announced rtp payload list
00091         for (fmtlist = rtp_sess->announced_fmts; fmtlist;
00092              pfmtlist = fmtlist, fmtlist =
00093              fmtlist->next, free(pfmtlist));
00094         // rtp payload types definitions attributes
00095         for (i = 0; i < 128; i++)
00096             if (rtp_sess->ptdefs[i]) {
00097                 int j;
00098                 for (j = 0; j< rtp_sess->ptdefs[i]->attrs.size; j++)
00099                     free(rtp_sess->ptdefs[i]->attrs.data[j]);
00100                 free(rtp_sess->ptdefs[i]->attrs.data);
00101             }
00102         // rtp payload types dynamic definitions
00103         for (i = 96; i < 128; free(rtp_sess->ptdefs[i++]));
00104 
00105         prev_rtp_sess = rtp_sess;
00106         rtp_sess = rtp_sess->next;
00107         free(prev_rtp_sess);
00108     }
00109     rtp_th->rtp_sess_head = NULL;
00110 
00111 //      pthread_mutex_unlock(&rtp_th->syn);
00112     free(rtp_th);
00113     nms_printf(NMSML_DBG1, "RTP Thread R.I.P.\n");
00114 }
00115 
00121 static void *rtp(void *args)
00122 {
00123     rtp_thread *thread = args;
00124     rtp_session *rtp_sess_head = thread->rtp_sess_head;
00125     pthread_mutex_t *syn = &thread->syn;
00126     rtp_session *rtp_sess;
00127     /* struct timespec ts; */
00128     int maxfd = 0;
00129 
00130     fd_set readset;
00131     char buffering = 1;
00132 
00133     for (rtp_sess = rtp_sess_head; rtp_sess; rtp_sess = rtp_sess->next)
00134         bpinit(rtp_sess->bp);
00135 
00136     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
00137     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
00138 /*    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); */
00139     pthread_cleanup_push(rtp_clean, args);
00140 
00141     /* Playout Buffer Size */
00142     /*
00143        dec_args->startime.tv_sec=0;
00144        dec_args->startime.tv_usec=700*(1000);
00145      */
00146     // dec_args->startime.tv_sec=PO_BUFF_SIZE_SEC;
00147     // dec_args->startime.tv_usec=PO_BUFF_SIZE_MSEC*(1000);
00148     /* 500 msec */
00149 
00150     while (1) {
00151         FD_ZERO(&readset);
00152 
00153         for (rtp_sess = rtp_sess_head; rtp_sess;
00154              rtp_sess = rtp_sess->next) {
00155             maxfd = max(rtp_sess->transport.RTP.sock.fd, maxfd);
00156             FD_SET(rtp_sess->transport.RTP.sock.fd, &readset);
00157         }
00158 
00159         select(maxfd + 1, &readset, NULL, NULL, NULL);
00160 
00161         for (rtp_sess = rtp_sess_head; rtp_sess;
00162              rtp_sess = rtp_sess->next)
00163             if (FD_ISSET(rtp_sess->transport.RTP.sock.fd, &readset)) {
00164                 if (buffering) {
00165                     if (rtp_sess->bp->flcount >= thread->prebuffer_size) {
00166                         pthread_mutex_unlock(syn);
00167                         buffering = 0;
00168                         nms_printf(NMSML_NORM, "\rPrebuffer complete.\n");
00169                     } else {    // TODO: buffering based on rtp jitter
00170                         nms_printf(NMSML_NORM, "\rBuffering (%d%%)\t",
00171                                (100 * rtp_sess->bp->flcount) /
00172                                thread->prebuffer_size);
00173                     }
00174                 }
00175                 if (rtp_recv(rtp_sess)) {
00176                     /* Waiting 20 msec for decoder ready */
00177                     nms_printf(NMSML_DBG1,
00178                            "Waiting for decoder ready!\n");
00179                     usleep(20);
00180                 }
00181             }
00182     }
00183 
00184     pthread_cleanup_pop(1);
00185 }
00186 
00192 rtp_thread *rtp_init(void)
00193 {
00194     rtp_thread *rtp_th = NULL;
00195 
00196     if (!(rtp_th = (rtp_thread *) calloc(1, sizeof(rtp_thread)))) {
00197         nms_printf(NMSML_FATAL, "Could not alloc memory!\n");
00198         return NULL;
00199     }
00200 
00201     rtp_parsers_init();
00202 
00203     if (pthread_mutex_init(&(rtp_th->syn), NULL)) {
00204         free(rtp_th);
00205         return NULL;
00206     }
00207 
00208     // use a safe default
00209     rtp_th->prebuffer_size = BP_SLOT_NUM / 2;
00210 
00211     /* Decoder blocked 'till buffering is complete */
00212     pthread_mutex_lock(&(rtp_th->syn));
00213 
00214     return rtp_th;
00215 }
00216 
00225 int rtp_thread_create(rtp_thread * rtp_th)
00226 {
00227     int err;
00228     pthread_attr_t rtp_attr;
00229     rtp_session *rtp_sess;
00230     rtp_fmts_list *fmt;
00231 
00232     pthread_attr_init(&rtp_attr);
00233     if (pthread_attr_setdetachstate(&rtp_attr, PTHREAD_CREATE_JOINABLE) != 0)
00234         return nms_printf(NMSML_FATAL,
00235                   "Cannot set RTP Thread attributes (detach state)\n");
00236 
00237     if ((err = pthread_create(&rtp_th->rtp_tid, 
00238                               &rtp_attr, &rtp, (void *) rtp_th)) > 0)
00239         return nms_printf(NMSML_FATAL, "%s\n", strerror(err));
00240 
00241     for (rtp_sess = rtp_th->rtp_sess_head; rtp_sess;
00242          rtp_sess = rtp_sess->next) {
00243         for (fmt = rtp_sess->announced_fmts; fmt; fmt = fmt->next) {
00244             if (rtp_sess->parsers_inits[fmt->pt]) {
00245                 err = rtp_sess->parsers_inits[fmt->pt] (rtp_sess, fmt->pt);
00246                 if (err)
00247                     return nms_printf(NMSML_FATAL,
00248                             "Cannot init the parser for pt %d\n", fmt->pt);
00249             }
00250         }
00251     }
00252 
00253     rtp_th->run = 1;
00254     return 0;
00255 }

Generated on Tue Feb 3 03:10:02 2009 for libnemesi by  doxygen 1.5.4