免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 3209 | 回复: 7
打印 上一主题 下一主题

[NetBSD] 多进程编程求助 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2006-08-14 23:16 |只看该作者 |倒序浏览
现在在NetBSD环境中编写一个I/O benchmark测试程序,大家帮我分析一下这样做法对不对?

为了测试存储设备(如磁盘)的性能,NetBSD自带一些小程序如rawIO, randread等都可以测试磁盘的随机或者顺序读写性能,但不符合我的要求。我现在想使用真实的trace作为

I/O请求,这样的测试结果更能代表磁盘的真实环境中的性能。但这样做有些问题,由于NetBSD不支持异步I/O,在单进程环境下,发出一个I/O请求必须等待其完成之后才可以发送

下一个I/O请求,那么对于在上一个请求未完成之后而必须按照trace的下一个时间戳必须发送下一个请求的请求,单进程就无能为力;我设想的一个解决办法是,将其改成多进程的测

试程序,如果上一个请求未完成而又需要发送下一个请求,则在另外一个进程内发送.基于上述思想,我做了个小程序,主要的设计流程如下:
1.主进程将trace文件的I/O请求序列(时间戳,请求地址,请求长度,读或者写)读取到内存;
2.1主进程创建多个fifo管道,作为主进程与执行I/O的通信通道;
2.2主进程fork几个I/O执行进程,其任务是打开fifo管道,不断读取是否有写入的数据,如果有,则执行对块设备的I/O请求并记录每次I/O的响应时间;
3.1主进程初始化一个定时器;
3.2定时器定时发送一个信号,信号接收函数检查当前时间是否到达下一个时间戳,如果到达,则将这个I/O
请求通过fifo管道写入到某个子进程上;
4.主进程等待;
5.定时器检查到trace中所有的请求发送完毕,就发送一个信号,让所有执行进程退出;
6.主进程一直等待所有子进程结束,然后打印所有I/O请求的响应时间;
7.主进程退出;

目前的进展是,主进程能够fork多个子进程用于执行I/O操作,也能够通过定时器发送
信号与子进程进行通讯,子进程也能够按照信号要求执行对应的I/O操作,但目前的问题在于:
主进程等待这一步,我采用的是while(1);导致所有的子进程接收到信号也不立即开始执行I/O
操作,一直等待所有的trace的请求全部分发完毕才开始,就好像所有的子进程都被主进程的while(1);阻塞了,这样
显然无法满足时间戳的对应要求,请大侠看看,帮我出出主意.谢谢了

论坛徽章:
0
2 [报告]
发表于 2006-08-15 10:03 |只看该作者
请各位大侠帮忙看看!

论坛徽章:
2
亥猪
日期:2014-03-19 16:36:35午马
日期:2014-11-23 23:48:46
3 [报告]
发表于 2006-08-15 10:34 |只看该作者
文字太多,没怎么看明白。依感觉说 你把while换成sleep试试

论坛徽章:
0
4 [报告]
发表于 2006-08-15 11:00 |只看该作者
实际上问题只有一点:
为什么主进程运行while的同时,其子进程好像也被阻塞了

论坛徽章:
0
5 [报告]
发表于 2006-08-15 19:39 |只看该作者
顶!

论坛徽章:
2
亥猪
日期:2014-03-19 16:36:35午马
日期:2014-11-23 23:48:46
6 [报告]
发表于 2006-08-16 08:07 |只看该作者
能否贴上核心逻辑代码片断

论坛徽章:
0
7 [报告]
发表于 2006-08-16 09:46 |只看该作者
完整的代码如下:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <unistd.h>
#include <sys/errno.h>
#include <sys/param.h>
#include <sys/wait.h>
#include <sys/resource.h>
#include <signal.h>

#ifdef __osf__
#include <sys/ioctl.h>
#endif
#ifdef BSD4_4
#include <sys/disklabel.h>
#endif

#ifdef __NetBSD__
#include <errno.h>
#endif

#ifdef __alpha
#define        Int64           long
#define        Quad           "l"
#define        atoi64(x)       strtol(x, NULL, 10)
#else
#define        Int64   long long
#define        Quad   "ll"
#define        atoi64(x)       strtoq(x, NULL, 10)
#endif

#ifdef __osf__
pid_t wait4 (pid_t process_id, int *status_location, int options, struct rusage *resource_usage);
#endif

#define USEC_PER_SEC 1000000
/* get time of day */
#define GETTIME(_t) /*microtime(&(_t))*/gettimeofday(&(_t),NULL)
#define TIMEVAL_DIFF(_start_,_end_,_diff_) { \
        if ((_end_)->tv_usec < (_start_)->tv_usec) { \
                (_diff_)->tv_usec = ((_end_)->tv_usec + USEC_PER_SEC) \
                                - (_start_)->tv_usec; \
                (_diff_)->tv_sec = ((_end_)->tv_sec-1) - (_start_)->tv_sec; \
        } \
        else { \
                (_diff_)->tv_usec = (_end_)->tv_usec - (_start_)->tv_usec; \
                (_diff_)->tv_sec  = (_end_)->tv_sec  - (_start_)->tv_sec; \
        } \
}

#define         BLOCK_SIZE                                 512
#define         MAX_TRACE_COUNT                 100008

#define         WRITE                                          0x00000000
#define         READ                                            0x00000001

#define         MAXCHILD                                 256         /* maximum number of processes to start */

#define         SHUTDOWN_CODE                      99999999

typedef struct io_ti{
        struct timeval starttime;
        struct timeval endtime;
        struct timeval elpsdtime;       
}io_time;

typedef struct io_tr{
  double time ;
  int    blkno ;
  int    blkcount;
  unsigned int flag;
} io_trace ;



/*global variants*/
int proc;                                                    /* index of this process in childinfo (child) */
pid_t apid [MAXCHILD];
int nproc = 8;                                                    /* (default) number to start */

int filedes;
FILE* fifofp[MAXCHILD];

int dev_blksize;

io_trace trace[MAX_TRACE_COUNT];       
struct timeval begin_time[MAX_TRACE_COUNT];       

io_time* testtime;

unsigned long request_no;
unsigned long trace_num;

/*function declaration*/
int  trreader(char * filename, io_trace * trace);
void init_sigaction(void);
void send_request(int signo);
int do_io(int filedes,unsigned int rwflag, int blkaddr, int blklength, char* io_buffer);
void io_thread(int proc);



int main (int argc, unsigned char *argv [])
{
          FILE * fp;
        int retcode;
        int i;
       struct stat sb;

       unsigned long request_shutdown,sec_time,microsec_time;
          
        char* fifoname[16]={
                "fifo_0", "fifo_1", "fifo_2", "fifo_3", "fifo_4", "fifo_5", "fifo_6", "fifo_7", "fifo_8", "fifo_9", "fifo_a", "fifo_b", "fifo_c", "fifo_d", "fifo_e", "fifo_f",
        };

#ifdef BSD4_4
  testtime = mmap (NULL,
                    MAX_TRACE_COUNT * sizeof (struct io_ti),
                    PROT_READ | PROT_WRITE,
                    MAP_INHERIT | MAP_SHARED | MAP_ANON,
                    -1,
                    (off_t) 0 );
#else
  testtime = mmap (NULL,
                    MAX_TRACE_COUNT * sizeof (struct io_ti),
                    PROT_READ | PROT_WRITE,
                    MAP_SHARED | MAP_ANON,
                    -1,
                    (off_t) 0 );
#endif
  if (testtime == (struct io_ti *) MAP_FAILED)
    {
    fprintf (stderr, "Can't mmap shared memory: %s\n", strerror (errno));
    exit (1);
    }

        /*read trace */
        trace_num = trreader("./mytrace", trace);
        if(trace_num <= 0)
                    printf("error \n");
        printf("%d records\n", trace_num);

        trace_num=100;

        if ((filedes = open ("/dev/raid0e", O_RDWR, 0)) < 0)
            {
                    fprintf (stderr, "Can't open device: %s\n", strerror (errno));
                    exit (1);
            }

        /*get the device size*/
        retcode=fstat(filedes, &sb);
        dev_blksize=sb.st_size /BLOCK_SIZE;
        printf("block device capacity:%d blocks\n",dev_blksize);


        /*create fifo*/
        for (proc = 0; proc < nproc; proc++)
        {
                retcode=mkfifo(fifoname[proc], S_IWGRP|S_IWOTH);
                if(retcode!=0){
                            fprintf (stderr, "mkfifo %s failed: %s\n", fifoname[proc], strerror (errno));
                            exit (1);
                }
        }

      /* First, start up the procs */
      for (proc = 0; proc < nproc; proc++)                    /* spawn some processes */
        {
        pid_t pid = fork ();
        if (pid == 0)
          io_thread (proc);
        else if (pid < 0)
          {
          fprintf (stderr, "Can't fork process %d: %s\n", proc, strerror (errno));
          exit (1);
          }
        else
          apid [proc] = pid;
              }
          
      for (proc = 0; proc < nproc; proc++)
        {
                if((fifofp[proc] = fopen(fifoname[proc], "w")) == NULL) {
                            printf("open fifo file %s error\n", fifoname[proc]);
                            return -1 ;
                  }
              }
          
        init_sigaction();

        struct itimerval mytimer;
        mytimer.it_value.tv_sec=0;
        mytimer.it_value.tv_usec=10000;  /*10ms*/
        mytimer.it_interval=mytimer.it_value;
        setitimer(ITIMER_REAL ,&mytimer,NULL);



#if 1
        while(1){
                if(request_no==SHUTDOWN_CODE)
                        break;
                else
                        sleep(1);
        }
#endif
  

      /* Now wait for them to finish */
      for (proc = 0; proc < nproc; proc++)
        {
                int status;
                pid_t pid;

                pid = waitpid (apid [proc], &status, 0);

                if (pid < 0)
                  {
                          fprintf (stderr, "Can't wait4: %s\n", strerror (errno));
                          exit (1);
                  }
        }
          
        close(filedes);
       
        for(i=0; i<trace_num; i++)
                {
                printf("request send time=%d.%06d\n",(int) begin_time[i].tv_sec, (int) begin_time[i].tv_usec);               
                printf("io_thread(%d):starttime=%d.%06d endtime=%d.%06d responsetime=%d.%06d\n",i,
                (int) testtime[i].starttime.tv_sec, (int) testtime[i].starttime.tv_usec,
                (int) testtime[i].endtime.tv_sec, (int) testtime[i].endtime.tv_usec,
                (int) testtime[i].elpsdtime.tv_sec, (int) testtime[i].elpsdtime.tv_usec);
                }

}


int  trreader(char * filename, io_trace * trace)
{
          FILE * fp ;
          int i ;
        int count ;

        if((fp = fopen(filename, "r")) == NULL) {
                    printf("open trace file error \n");
                    return -1 ;
          }

          i = 0 ;
  
        while(! feof(fp)){
                    if((count = fread(&trace[i], sizeof(io_trace), 1, fp)) < 1)
                              break;
                            i++ ;
         }
       
          fclose(fp);

          return i ; // return the total number of trace entry.
}


void init_sigaction(void)
{
        struct sigaction act;
        act.sa_handler=send_request;
        act.sa_flags=0;
        sigemptyset(&act.sa_mask);
        sigaction(SIGALRM,&act,NULL);
}

void send_request(int signo)
{
        if(request_no<trace_num){
                gettimeofday (&begin_time[request_no], NULL);       
                fwrite(&request_no, sizeof(unsigned long), 1, fifofp[(request_no % nproc)]);
                request_no++;
        }
        else{
                if(request_no==trace_num){
                        for (proc = 0; proc < nproc; proc++)
                        {
                                request_no=SHUTDOWN_CODE;
                                fwrite(&request_no, sizeof(unsigned long), 1, fifofp[proc]);               
                                 fclose(fifofp[proc]);       
                              }

                /*disable the timer*/
                struct itimerval mytimer2;
                mytimer2.it_value.tv_sec=0;
                mytimer2.it_value.tv_usec=0;  /*disable a timer*/
                mytimer2.it_interval=mytimer2.it_value;
                setitimer(ITIMER_REAL, &mytimer2, NULL);

                }
        }

}


/*
* Do one block of I/O, read or write depending on the current mode.
* Return 0 on success, -1 on failure.
*/
int do_io(int filedes,unsigned int rwflag, int blkaddr, int blklength, char* io_buffer)
{
          int retval;
        off_t pos;                                                    /* where we actually seek to */

        if ((pos = lseek (filedes, blkaddr*BLOCK_SIZE, SEEK_SET)) != blkaddr*BLOCK_SIZE)
        fprintf (stderr,
                 "can't seek to %" Quad "x (%" Quad "x): %s\n",
                 blkaddr*BLOCK_SIZE,
                 pos,
                 strerror (errno));

            if (rwflag && READ)
                retval = read(filedes, io_buffer, (size_t) blklength * BLOCK_SIZE);
            else
                retval = write(filedes, io_buffer, (size_t) blklength * BLOCK_SIZE);
/*#define DEBUG*/
#ifdef DEBUG
                fprintf(stderr, "%s filedes=%d, buffer=0x%lx, blkaddr=%d,  blksize=%d\n",
                            (rwflag && READ) ? "read" : "write", filedes, (long) io_buffer, blkaddr,
                            blklength );
#endif
            if (retval == -1) {
                fprintf(stderr, "Error %s %s: %s\n",
                            (rwflag && READ)  ? "reading" : "writing",
                            "(unknown file)", strerror(errno));
                return -1;
           }
            if (retval != blklength * BLOCK_SIZE) {
                fprintf(stderr, "Error %s %d bytes from %s instead of %d.\n",
                            (rwflag && READ)  ? "read" : "wrote",
                            retval, "(unknown file)", blklength * BLOCK_SIZE);
                return -1;
            }
            return 0;
}


void io_thread(int proc)
{
        char buf[128*512];
        FILE* fp;
      unsigned long request,i;

        char* fifoname[16]={
                "fifo_0", "fifo_1", "fifo_2", "fifo_3", "fifo_4", "fifo_5", "fifo_6", "fifo_7", "fifo_8", "fifo_9", "fifo_a", "fifo_b", "fifo_c", "fifo_d", "fifo_e", "fifo_f",
        };

       
        if((fp = fopen(fifoname[proc], "r")) == NULL) {
                    printf("open fifo file error\n");
                    exit(-1) ;
          }

        while(1){
       
        fread(&request, sizeof(unsigned long), 1, fp);
        if(request==SHUTDOWN_CODE)
                goto end;
       
        i=request;

        if(trace[i].blkno>=dev_blksize)
                trace[i].blkno=trace[i].blkno % dev_blksize;
                       
        GETTIME(testtime[i].starttime);
        do_io(filedes,trace[i].flag,trace[i].blkno,trace[i].blkcount,buf);
        GETTIME(testtime[i].endtime);
        TIMEVAL_DIFF(&testtime[i].starttime, &testtime[i].endtime, &testtime[i].elpsdtime);
#if 0
        printf("io_thread(%d):starttime=%d.%06d endtime=%d.%06d responsetime=%d.%06d\n",proc,
                (int) testtime[i].starttime.tv_sec, (int) testtime[i].starttime.tv_usec,
                (int) testtime[i].endtime.tv_sec, (int) testtime[i].endtime.tv_usec,
                (int) testtime[i].elpsdtime.tv_sec, (int) testtime[i].elpsdtime.tv_usec);       
#endif

        }

end:
        fclose(fp);

        exit(0);
}

论坛徽章:
0
8 [报告]
发表于 2006-08-20 16:40 |只看该作者
顶一下。。。。。
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP