免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 4914 | 回复: 6
打印 上一主题 下一主题

多线程写文件问题,求指点 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2011-11-03 21:24 |只看该作者 |倒序浏览
本人是Linux C初学者,现在需要写一个实时读取加速网卡数据并将其负载写盘的程序。我现在写的程序实现4个线程同时从网卡接收数据流,并为每一个流建立一个文件写入其负载。目前这种模式已经跑通。但是随着数据量加大,这种模式一般写到1000个文件左右,文件指针就会耗尽,程序报fopen failed! 请各位高手给想想办法,改变一下程序结构,让这个程序既能实时按每个流一个文件的形式落下负载,同时又不把文件指针耗尽。代码如下:
  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <assert.h>
  5. #include <time.h>
  6. #include <fcntl.h>
  7. #include <sys/types.h>
  8. #include <unistd.h>
  9. #include <sys/stat.h>
  10. #include <sys/time.h>
  11. #include <arpa/inet.h>
  12. #include <pthread.h>
  13. #include "libpag.h"
  14. #include "eb_pcap.h"

  15. typedef struct receive_thread_info {
  16.     pthread_t tid;
  17.     int sid;
  18.     unsigned int file_start_num;
  19.     unsigned int file_finish_num;
  20. } receive_thread_info_t;

  21. /*
  22. * create a file
  23. */
  24. static FILE* get_file(receive_thread_info_t *info_p)
  25. {
  26.     char file_name[16];
  27.     FILE* fp;
  28.     memset(file_name, 0, 16);
  29.     sprintf(file_name, "%d_%d", info_p->sid, info_p->file_start_num);
  30.     strcat(file_name, ".txt");
  31.         //为每个流建立一个文件
  32.     fp = fopen(file_name, "wb");
  33.     if (fp == NULL) {
  34.         printf("%s open faild!\n", file_name);
  35.         exit(1);       
  36.     }
  37.     return fp;
  38. }

  39. /*
  40. * write packet to the file
  41. */
  42. static void write_pkt(struct pktinfo *pktinfo_ptr)
  43. {
  44.     struct tcp_stream *tcp = pktinfo_ptr->pdata;
  45.     FILE *fp = (FILE *)tcp->appinfo;
  46.     struct half_stream *half_p = pag_gethalfstream(pktinfo_ptr);
  47.     if (half_p == NULL) {
  48.         return;
  49.     }
  50.    //写入负载
  51.     if (half_p->count_new > 0) {
  52.         fwrite(half_p->data, half_p->count_new, 1 , fp);
  53.         fflush(fp);
  54.     }
  55. }

  56. static void deal_stream(struct pktinfo *pktinfo_ptr, receive_thread_info_t *info_p)
  57. {
  58.     struct tcp_stream *tcp;
  59.     FILE *fp;
  60.    
  61.     tcp = pktinfo_ptr->pdata;
  62.     if (tcp->appinfo != NULL) {
  63.         write_pkt(pktinfo_ptr);            
  64.     } else {
  65.             fp = get_file(info_p);
  66.             info_p->file_start_num++;
  67.             tcp->appinfo = (void *)fp;
  68.             write_pkt(pktinfo_ptr);
  69.     }
  70.    
  71.     if (pag_isstreamclosed(pktinfo_ptr)) {
  72.         if (tcp->appinfo != NULL) {
  73.             fp = (FILE *)tcp->appinfo;
  74.             fclose(fp);
  75.             info_p->file_finish_num++;
  76.             tcp->appinfo = NULL;
  77.         }
  78.         pag_delstream(pktinfo_ptr->pdata);
  79.     }
  80. }

  81. /*
  82. *receive thread, get stream infomation
  83. */
  84. void * receive_pthread(void *arg)
  85. {
  86.     receive_thread_info_t *info_p = (receive_thread_info_t *)arg;
  87.     struct pktinfo *pktinfo_p = NULL;

  88.    
  89.     while (1) {
  90.             /* 从网卡的一个数据流队列中读取数据流信息,可能是流也可能是包信息 */
  91.             pktinfo_p = pag_getstream(info_p->sid);
  92.              /* 判断:如果是udp 包则不处理 */
  93.             if (pktinfo_p == NULL || pktinfo_p->ipproto != 0x06) {
  94.                 usleep(10);
  95.                 continue;
  96.             }
  97.             /* 对每个tcp包进行处理 */
  98.             deal_stream(pktinfo_p, info_p);
  99.     }
  100.     return NULL;
  101. }

  102. int main(int argc, char **argv)
  103. {
  104.     int i, ret;
  105.     void *value_ptr;
  106.     int sid_counter = 4;
  107.     if ((ret = pag_open()) == -1){
  108.             printf("pag_open error\n");
  109.             exit(1);
  110.     }
  111.     pag_setlinknum(500);
  112.     receive_thread_info_t * info_p = (receive_thread_info_t *)calloc(sid_counter,
  113.             sizeof(receive_thread_info_t));
  114.     if (info_p == NULL) {
  115.         printf("calloc error\n");
  116.         exit(1);
  117.     }
  118.    
  119.       
  120.     /* 建立线程 */
  121.     for (i = 0; i < sid_counter; i++) {
  122.         info_p[i].file_start_num = 0;
  123.         info_p[i].file_finish_num = 0;
  124.         info_p[i].sid = i;
  125.         pthread_create(&info_p[i].tid,
  126.             NULL, receive_pthread, info_p + i);
  127.     }

  128.     for (i = 0; i < sid_counter; i++) {
  129.             pthread_join(info_p[i].tid, &value_ptr);
  130.     }
  131.     pag_close();
  132.     free(info_p);
  133.     exit(1);
  134. }
复制代码

论坛徽章:
1
2015年辞旧岁徽章
日期:2015-03-03 16:54:15
2 [报告]
发表于 2011-11-04 07:41 |只看该作者

  1. /*
  2. * write packet to the file
  3. */
  4. static void write_pkt(struct pktinfo *pktinfo_ptr)
  5. {
  6.     struct tcp_stream *tcp = pktinfo_ptr->pdata;
  7.     FILE *fp = (FILE *)tcp->appinfo;
  8.     struct half_stream *half_p = pag_gethalfstream(pktinfo_ptr);
  9.     if (half_p == NULL) {
  10.         return;
  11.     }
  12.    //写入负载
  13.     if (half_p->count_new > 0) {
  14.         fwrite(half_p->data, half_p->count_new, 1 , fp);
  15.         fflush(fp);
  16.     }
  17.     fclose(fp);    // 关闭文件指针
  18. }
复制代码

论坛徽章:
0
3 [报告]
发表于 2011-11-04 14:02 |只看该作者
linux下一个进程可以打开的文件指针是有限制的

论坛徽章:
0
4 [报告]
发表于 2011-11-05 21:34 |只看该作者
不用的文件要close啊

论坛徽章:
0
5 [报告]
发表于 2011-11-06 19:23 |只看该作者
对一个文件处理完了,要记得关闭!

论坛徽章:
0
6 [报告]
发表于 2011-11-07 10:33 |只看该作者
贴了这么多源码,为什么不直接上伪代码呢?或者流程图?

论坛徽章:
2
技术图书徽章
日期:2015-12-16 09:12:1619周年集字徽章-庆
日期:2019-09-12 16:09:19
7 [报告]
发表于 2011-11-07 18:09 |只看该作者
本人是Linux C初学者,现在需要写一个实时读取加速网卡数据并将其负载写盘的程序。我现在写的程序实现4个线 ...
colrain_h 发表于 2011-11-03 21:24



    要有一個open之後,對應一個close.
如果是沒法知道是否open之後有關掉的.
好像可以用:
        fdn = getdtablesize();
        for(i=0; i<fdn; i++)
                close(i);
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP