00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00028 #include "rtp.h"
00029 #include "comm.h"
00030 #include "bufferpool.h"
00031 #include "parsers/rtpparsers.h"
00032 #include "utils.h"
00033
00034 #define PO_BUFF_SIZE_SEC 0
00035 #define PO_BUFF_SIZE_MSEC 700
00036
00043 static void rtp_clean(void * thrd)
00044 {
00045 rtp_thread *rtp_th = (rtp_thread *) thrd;
00046 rtp_session *rtp_sess = rtp_th->rtp_sess_head;
00047 rtp_session *prev_rtp_sess;
00048 rtp_ssrc *csrc, *psrc;
00049 struct rtp_conflict *conf, *pconf;
00050 rtp_fmts_list *fmtlist, *pfmtlist;
00051 int i;
00052
00053 nms_printf(NMSML_DBG1, "RTP Thread is dying suicide!\n");
00054
00055
00056
00057 while (rtp_sess != NULL) {
00058 close(rtp_sess->transport.RTP.sock.fd);
00059 close(rtp_sess->transport.RTCP.sock.fd);
00060
00061 csrc = rtp_sess->ssrc_queue;
00062
00063 while (csrc != NULL) {
00064 psrc = csrc;
00065 csrc = csrc->next;
00066 for (i = 0; i < 9; i++)
00067 free(((char **) (&(psrc->ssrc_sdes)))[i]);
00068 free(psrc->rtp_from.addr);
00069 free(psrc->rtcp_from.addr);
00070 free(psrc->rtcp_to.addr);
00071 for (i = 0; i < 128; i++)
00072 if (rtp_sess->parsers_uninits[i])
00073 rtp_sess->parsers_uninits[i] (psrc, i);
00074 free(psrc->po);
00075 free(psrc);
00076 }
00077 bpkill(rtp_sess->bp);
00078 free(rtp_sess->bp);
00079
00080
00081 free((rtp_sess->transport).spec);
00082
00083 conf = rtp_sess->conf_queue;
00084 while (conf) {
00085 pconf = conf;
00086 conf = conf->next;
00087 free(pconf->transaddr.addr);
00088 free(pconf);
00089 }
00090
00091 for (fmtlist = rtp_sess->announced_fmts; fmtlist;
00092 pfmtlist = fmtlist, fmtlist =
00093 fmtlist->next, free(pfmtlist));
00094
00095 for (i = 0; i < 128; i++)
00096 if (rtp_sess->ptdefs[i]) {
00097 int j;
00098 for (j = 0; j< rtp_sess->ptdefs[i]->attrs.size; j++)
00099 free(rtp_sess->ptdefs[i]->attrs.data[j]);
00100 free(rtp_sess->ptdefs[i]->attrs.data);
00101 }
00102
00103 for (i = 96; i < 128; free(rtp_sess->ptdefs[i++]));
00104
00105 prev_rtp_sess = rtp_sess;
00106 rtp_sess = rtp_sess->next;
00107 free(prev_rtp_sess);
00108 }
00109 rtp_th->rtp_sess_head = NULL;
00110
00111
00112 free(rtp_th);
00113 nms_printf(NMSML_DBG1, "RTP Thread R.I.P.\n");
00114 }
00115
00121 static void *rtp(void *args)
00122 {
00123 rtp_thread *thread = args;
00124 rtp_session *rtp_sess_head = thread->rtp_sess_head;
00125 pthread_mutex_t *syn = &thread->syn;
00126 rtp_session *rtp_sess;
00127
00128 int maxfd = 0;
00129
00130 fd_set readset;
00131 char buffering = 1;
00132
00133 for (rtp_sess = rtp_sess_head; rtp_sess; rtp_sess = rtp_sess->next)
00134 bpinit(rtp_sess->bp);
00135
00136 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
00137 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
00138
00139 pthread_cleanup_push(rtp_clean, args);
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150 while (1) {
00151 FD_ZERO(&readset);
00152
00153 for (rtp_sess = rtp_sess_head; rtp_sess;
00154 rtp_sess = rtp_sess->next) {
00155 maxfd = max(rtp_sess->transport.RTP.sock.fd, maxfd);
00156 FD_SET(rtp_sess->transport.RTP.sock.fd, &readset);
00157 }
00158
00159 select(maxfd + 1, &readset, NULL, NULL, NULL);
00160
00161 for (rtp_sess = rtp_sess_head; rtp_sess;
00162 rtp_sess = rtp_sess->next)
00163 if (FD_ISSET(rtp_sess->transport.RTP.sock.fd, &readset)) {
00164 if (buffering) {
00165 if (rtp_sess->bp->flcount >= thread->prebuffer_size) {
00166 pthread_mutex_unlock(syn);
00167 buffering = 0;
00168 nms_printf(NMSML_NORM, "\rPrebuffer complete.\n");
00169 } else {
00170 nms_printf(NMSML_NORM, "\rBuffering (%d%%)\t",
00171 (100 * rtp_sess->bp->flcount) /
00172 thread->prebuffer_size);
00173 }
00174 }
00175 if (rtp_recv(rtp_sess)) {
00176
00177 nms_printf(NMSML_DBG1,
00178 "Waiting for decoder ready!\n");
00179 usleep(20);
00180 }
00181 }
00182 }
00183
00184 pthread_cleanup_pop(1);
00185 }
00186
00192 rtp_thread *rtp_init(void)
00193 {
00194 rtp_thread *rtp_th = NULL;
00195
00196 if (!(rtp_th = (rtp_thread *) calloc(1, sizeof(rtp_thread)))) {
00197 nms_printf(NMSML_FATAL, "Could not alloc memory!\n");
00198 return NULL;
00199 }
00200
00201 rtp_parsers_init();
00202
00203 if (pthread_mutex_init(&(rtp_th->syn), NULL)) {
00204 free(rtp_th);
00205 return NULL;
00206 }
00207
00208
00209 rtp_th->prebuffer_size = BP_SLOT_NUM / 2;
00210
00211
00212 pthread_mutex_lock(&(rtp_th->syn));
00213
00214 return rtp_th;
00215 }
00216
00225 int rtp_thread_create(rtp_thread * rtp_th)
00226 {
00227 int err;
00228 pthread_attr_t rtp_attr;
00229 rtp_session *rtp_sess;
00230 rtp_fmts_list *fmt;
00231
00232 pthread_attr_init(&rtp_attr);
00233 if (pthread_attr_setdetachstate(&rtp_attr, PTHREAD_CREATE_JOINABLE) != 0)
00234 return nms_printf(NMSML_FATAL,
00235 "Cannot set RTP Thread attributes (detach state)\n");
00236
00237 if ((err = pthread_create(&rtp_th->rtp_tid,
00238 &rtp_attr, &rtp, (void *) rtp_th)) > 0)
00239 return nms_printf(NMSML_FATAL, "%s\n", strerror(err));
00240
00241 for (rtp_sess = rtp_th->rtp_sess_head; rtp_sess;
00242 rtp_sess = rtp_sess->next) {
00243 for (fmt = rtp_sess->announced_fmts; fmt; fmt = fmt->next) {
00244 if (rtp_sess->parsers_inits[fmt->pt]) {
00245 err = rtp_sess->parsers_inits[fmt->pt] (rtp_sess, fmt->pt);
00246 if (err)
00247 return nms_printf(NMSML_FATAL,
00248 "Cannot init the parser for pt %d\n", fmt->pt);
00249 }
00250 }
00251 }
00252
00253 rtp_th->run = 1;
00254 return 0;
00255 }