- 论坛徽章:
- 0
|
本人是Linux C初学者,现在需要写一个实时读取加速网卡数据并将其负载写盘的程序。我现在写的程序实现4个线程同时从网卡接收数据流,并为每一个流建立一个文件写入其负载。目前这种模式已经跑通。但是随着数据量加大,这种模式一般写到1000个文件左右,文件指针就会耗尽,程序报fopen failed! 请各位高手给想想办法,改变一下程序结构,让这个程序既能实时按每个流一个文件的形式落下负载,同时又不把文件指针耗尽。代码如下:- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <assert.h>
- #include <time.h>
- #include <fcntl.h>
- #include <sys/types.h>
- #include <unistd.h>
- #include <sys/stat.h>
- #include <sys/time.h>
- #include <arpa/inet.h>
- #include <pthread.h>
- #include "libpag.h"
- #include "eb_pcap.h"
- typedef struct receive_thread_info {
- pthread_t tid;
- int sid;
- unsigned int file_start_num;
- unsigned int file_finish_num;
- } receive_thread_info_t;
- /*
- * create a file
- */
- static FILE* get_file(receive_thread_info_t *info_p)
- {
- char file_name[16];
- FILE* fp;
- memset(file_name, 0, 16);
- sprintf(file_name, "%d_%d", info_p->sid, info_p->file_start_num);
- strcat(file_name, ".txt");
- //为每个流建立一个文件
- fp = fopen(file_name, "wb");
- if (fp == NULL) {
- printf("%s open faild!\n", file_name);
- exit(1);
- }
- return fp;
- }
- /*
- * write packet to the file
- */
- static void write_pkt(struct pktinfo *pktinfo_ptr)
- {
- struct tcp_stream *tcp = pktinfo_ptr->pdata;
- FILE *fp = (FILE *)tcp->appinfo;
- struct half_stream *half_p = pag_gethalfstream(pktinfo_ptr);
- if (half_p == NULL) {
- return;
- }
- //写入负载
- if (half_p->count_new > 0) {
- fwrite(half_p->data, half_p->count_new, 1 , fp);
- fflush(fp);
- }
- }
- static void deal_stream(struct pktinfo *pktinfo_ptr, receive_thread_info_t *info_p)
- {
- struct tcp_stream *tcp;
- FILE *fp;
-
- tcp = pktinfo_ptr->pdata;
- if (tcp->appinfo != NULL) {
- write_pkt(pktinfo_ptr);
- } else {
- fp = get_file(info_p);
- info_p->file_start_num++;
- tcp->appinfo = (void *)fp;
- write_pkt(pktinfo_ptr);
- }
-
- if (pag_isstreamclosed(pktinfo_ptr)) {
- if (tcp->appinfo != NULL) {
- fp = (FILE *)tcp->appinfo;
- fclose(fp);
- info_p->file_finish_num++;
- tcp->appinfo = NULL;
- }
- pag_delstream(pktinfo_ptr->pdata);
- }
- }
- /*
- *receive thread, get stream infomation
- */
- void * receive_pthread(void *arg)
- {
- receive_thread_info_t *info_p = (receive_thread_info_t *)arg;
- struct pktinfo *pktinfo_p = NULL;
-
- while (1) {
- /* 从网卡的一个数据流队列中读取数据流信息,可能是流也可能是包信息 */
- pktinfo_p = pag_getstream(info_p->sid);
- /* 判断:如果是udp 包则不处理 */
- if (pktinfo_p == NULL || pktinfo_p->ipproto != 0x06) {
- usleep(10);
- continue;
- }
- /* 对每个tcp包进行处理 */
- deal_stream(pktinfo_p, info_p);
- }
- return NULL;
- }
- int main(int argc, char **argv)
- {
- int i, ret;
- void *value_ptr;
- int sid_counter = 4;
- if ((ret = pag_open()) == -1){
- printf("pag_open error\n");
- exit(1);
- }
- pag_setlinknum(500);
- receive_thread_info_t * info_p = (receive_thread_info_t *)calloc(sid_counter,
- sizeof(receive_thread_info_t));
- if (info_p == NULL) {
- printf("calloc error\n");
- exit(1);
- }
-
-
- /* 建立线程 */
- for (i = 0; i < sid_counter; i++) {
- info_p[i].file_start_num = 0;
- info_p[i].file_finish_num = 0;
- info_p[i].sid = i;
- pthread_create(&info_p[i].tid,
- NULL, receive_pthread, info_p + i);
- }
- for (i = 0; i < sid_counter; i++) {
- pthread_join(info_p[i].tid, &value_ptr);
- }
- pag_close();
- free(info_p);
- exit(1);
- }
复制代码 |
|