12.output_run函数
位置:mjpg-streamer/plugins/output_http/output_http.c
功能:为客户端提供服务,传输视频数据
函数结构比较简单,创建线程,线程函数为server_thread
/******************************************************************************
Description.: Creates the server thread for one output instance and detaches
              it, so nobody has to join it later.
Input Value.: id is the index into servers[] of the instance to start
Return Value: always 0 (kept for existing callers); a failure to create the
              thread is only reported through DBG
******************************************************************************/
int output_run(int id)
{
    DBG("launching server thread #%02d\n", id);

    /* create thread and pass this instance's context to the thread function */
    if(pthread_create(&(servers[id].threadID), NULL, server_thread, &(servers[id])) != 0) {
        /* threadID is undefined after a failed pthread_create(), so it must
           not be handed to pthread_detach() */
        DBG("could not launch server thread #%02d\n", id);
        return 0;
    }

    /* detach: the thread's resources are released automatically when it
       terminates; this call does not wait for the thread */
    pthread_detach(servers[id].threadID);

    return 0;
}
13.server_thread函数
位置:mjpg-streamer/plugins/output_http/httpd.c
功能:创建TCP socket,为客户端提供网络服务
主要调用函数socket、bind、listen、accept,最后调用pthread_create创建线程,线程函数为client_thread。
/******************************************************************************
Description.: Open one listening TCP socket per address returned by
              getaddrinfo() (at most MAX_SD_LEN of them) and wait for clients
              to connect. Every accepted connection is handed to its own
              detached client_thread.
Input Value.: arg is a pointer to the context of this server instance
Return Value: always NULL, will only return on exit
******************************************************************************/
void *server_thread(void *arg)
{
    int on;
    pthread_t client;
    struct addrinfo *aip, *aip2;
    struct addrinfo hints;
    struct sockaddr_storage client_addr;
    socklen_t addr_len;
    fd_set selectfds;
    int max_fds = 0;
    char name[NI_MAXHOST];
    int err;
    int i;
    context *pcontext = arg;

    pglobal = pcontext->pglobal;

    /* set cleanup handler to cleanup resources; it is paired with the
       pthread_cleanup_pop() at the end of this function */
    pthread_cleanup_push(server_cleanup, pcontext);

    /* ask for every local address (any family) we can listen on */
    bzero(&hints, sizeof(hints));
    hints.ai_family = PF_UNSPEC;
    hints.ai_flags = AI_PASSIVE;
    hints.ai_socktype = SOCK_STREAM;

    /* getaddrinfo() wants the port as a decimal string */
    snprintf(name, sizeof(name), "%d", ntohs(pcontext->conf.port));
    if((err = getaddrinfo(NULL, name, &hints, &aip)) != 0) {
        perror(gai_strerror(err));
        exit(EXIT_FAILURE);
    }

    /* mark every slot as unused */
    for(i = 0; i < MAX_SD_LEN; i++)
        pcontext->sd[i] = -1;

    /* open sockets for server (1 socket / address family);
       i is the next free slot in sd[] */
    i = 0;
    for(aip2 = aip; aip2 != NULL; aip2 = aip2->ai_next) {
        /* step 1: create the socket */
        if((pcontext->sd[i] = socket(aip2->ai_family, aip2->ai_socktype, 0)) < 0) {
            continue;
        }

        /* ignore "socket already in use" errors */
        on = 1;
        if(setsockopt(pcontext->sd[i], SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
            perror("setsockopt(SO_REUSEADDR) failed");
        }

        /* IPv6 socket should listen to IPv6 only, otherwise we will get "socket already in use" */
        on = 1;
        if(aip2->ai_family == AF_INET6 && setsockopt(pcontext->sd[i], IPPROTO_IPV6, IPV6_V6ONLY,
                (const void *)&on , sizeof(on)) < 0) {
            perror("setsockopt(IPV6_V6ONLY) failed");
        }

        /* perhaps we will use this keep-alive feature oneday */
        /* setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)); */

        /* step 2: bind; on failure release the descriptor so the slot can be
           reused for the next address (perror first, close() may alter errno) */
        if(bind(pcontext->sd[i], aip2->ai_addr, aip2->ai_addrlen) < 0) {
            perror("bind");
            close(pcontext->sd[i]);
            pcontext->sd[i] = -1;
            continue;
        }

        /* step 3: listen */
        if(listen(pcontext->sd[i], 10) < 0) {
            perror("listen");
            close(pcontext->sd[i]);
            pcontext->sd[i] = -1;
        } else {
            i++;
            if(i >= MAX_SD_LEN) {
                /* the socket opened last stays in sd[] and is still polled by
                   the select loop below, but is not counted in sd_len */
                OPRINT("%s(): maximum number of server sockets exceeded", __FUNCTION__);
                i--;
                break;
            }
        }
    }

    /* the address list is not needed once the sockets are set up */
    freeaddrinfo(aip);

    pcontext->sd_len = i;

    if(pcontext->sd_len < 1) {
        OPRINT("%s(): bind(%d) failed", __FUNCTION__, htons(pcontext->conf.port));
        closelog();
        exit(EXIT_FAILURE);
    }

    /* create a child for every client that connects */
    while(!pglobal->stop) {
        DBG("waiting for clients to connect\n");

        /* block until at least one listening socket is readable;
           restart the wait when select() is interrupted by a signal */
        do {
            FD_ZERO(&selectfds);

            for(i = 0; i < MAX_SD_LEN; i++) {
                if(pcontext->sd[i] != -1) {
                    FD_SET(pcontext->sd[i], &selectfds);

                    if(pcontext->sd[i] > max_fds)
                        max_fds = pcontext->sd[i];
                }
            }

            err = select(max_fds + 1, &selectfds, NULL, NULL, NULL);

            if(err < 0 && errno != EINTR) {
                perror("select");
                exit(EXIT_FAILURE);
            }
        } while(err <= 0);

        /* walk the socket slots (not the descriptor range) and accept on
           every listening socket that became readable */
        for(i = 0; i < MAX_SD_LEN; i++) {
            cfd *pcfd;

            if(pcontext->sd[i] == -1 || !FD_ISSET(pcontext->sd[i], &selectfds)) {
                continue;
            }

            /* one cfd per connection: each client thread gets its own copy */
            pcfd = malloc(sizeof(*pcfd));
            if(pcfd == NULL) {
                fprintf(stderr, "failed to allocate (a very small amount of) memory\n");
                exit(EXIT_FAILURE);
            }

            /* step 4: accept; addr_len is in/out and must be reset each time */
            addr_len = sizeof(client_addr);
            pcfd->fd = accept(pcontext->sd[i], (struct sockaddr *)&client_addr, &addr_len);
            if(pcfd->fd < 0) {
                perror("accept");
                free(pcfd);
                continue;
            }
            pcfd->pc = pcontext;

            /* start new thread that will handle this TCP connected client */
            DBG("create thread to handle client that just established a connection\n");

            if(getnameinfo((struct sockaddr *)&client_addr, addr_len, name, sizeof(name), NULL, 0, NI_NUMERICHOST) == 0) {
                syslog(LOG_INFO, "serving client: %s\n", name);
            }

            /* step 5: hand the connection to a client thread.
               NOTE(review): on success pcfd is presumably released by
               client_thread - confirm against its implementation */
            if(pthread_create(&client, NULL, &client_thread, pcfd) != 0) {
                DBG("could not launch another client thread\n");
                close(pcfd->fd);
                free(pcfd);
                continue;
            }

            /* step 6: detach, the thread's resources are released when it
               exits and this thread never joins it */
            pthread_detach(client);
        }
    }

    DBG("leaving server thread, calling cleanup function now\n");
    /* pop the cleanup handler and, because the argument is non-zero, run it */
    pthread_cleanup_pop(1);

    return NULL;
}
14.client_thread函数
位置:mjpg-streamer/plugins/output_http/httpd.c
功能:为客户端发送数据
主要调用函数send_snapshot和send_stream向socket发送数据
772行:
case A_SNAPSHOT:
DBG("Request for snapshot\n");
send_snapshot(lcfd.fd); // 6.发送快照
break;
case A_STREAM:
DBG("Request for stream\n");
send_stream(lcfd.fd); //7.发送数据
15.send_snapshot和send_stream函数
位置:mjpg-streamer/plugins/output_http/httpd.c
功能:发送数据
主要调用write发送数据,虽然兜来兜去,其实write的参数fd就是上面accept返回的已连接socket:
320行:
/* send header and image now */ //8. 往socket写入数据
if( write(fd, buffer, strlen(buffer)) < 0 || \
write(fd, frame, frame_size) < 0) {
free(frame);
return;
}
348行:
if ( write(fd, buffer, strlen(buffer)) < 0 ) { //往socket写入数据
free(frame);
return;
}
16.write函数
位置:系统函数
功能:往文件写入数据,这里的文件指的是socket。
至此,mjpg-streamer代码总算分析完毕。本次只分析mjpg_streamer.c和input_uvc和output_http这3部分,其它部分不再分析。
另外过程中出现的一些技术也没做过多描述,如:
1.多线程
2.V4L2接口
这些内容不是三言两语就能说明白,也没理解透彻,留作后面捣鼓。
|