免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 1035 | 回复: 0
打印 上一主题 下一主题

进程间通讯-无名管道 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2008-10-02 16:12 |只看该作者 |倒序浏览
管道机制是所有unix都愿意提供的一种进程通信机制。管道是进程之间一个单向的数据流,在POSIX标准中只定义了半双工管道,既每个进程在使用一个文件描述符之前必须把另一个描述符关闭。在Linux中实现有所不同:每个管道的文件描述符仍然是单向的,但是在使用一个描述符时不必关闭另一个描述符。
管道操作主要涉及两个数据结构,pipe_inode_info和pipe_buffer.
在2.6中管道的缓冲区已经增加到16个,每个缓冲区对应一个页面(在高端内存中),这个改变大大增强了向管道写大量数据的用户态应用的性能。
1.管道的创建:

pipe系统调用有sys_pipe处理:
asmlinkage int sys_pipe(unsigned long __user * fildes)
{
int fd[2];
int error;
error = do_pipe(fd);
if (!error) {
   //将描述符拷贝到用户空间
  if (copy_to_user(fildes, fd, 2*sizeof(int)))
   error = -EFAULT;
}
return error;
}
它会调用do_pipe:
int do_pipe(int *fd)
{
       struct file *fw, *fr;
       int error;
       int fdw, fdr;

       fw = create_write_pipe();
       if (IS_ERR(fw))
              return PTR_ERR(fw);
       fr = create_read_pipe(fw);
       error = PTR_ERR(fr);
       if (IS_ERR(fr))
              goto err_write_pipe;

       error = get_unused_fd();
       if (error
              goto err_read_pipe;
       fdr = error;

       error = get_unused_fd();
       if (error
              goto err_fdr;
       fdw = error;

       error = audit_fd_pair(fdr, fdw);
       if (error
              goto err_fdw;

       fd_install(fdr, fr);
       fd_install(fdw, fw);
       fd[0] = fdr;
       fd[1] = fdw;

       return 0;

err_fdw:
       put_unused_fd(fdw);
err_fdr:
       put_unused_fd(fdr);
err_read_pipe:
       dput(fr->f_dentry);
       mntput(fr->f_vfsmnt);
       put_filp(fr);
err_write_pipe:
       free_write_pipe(fw);
       return error;
}
由于创建的过程比较简单,所以代码就不在这讲解了,主要就是用两个file对象引用同一个inode

2.管道的读操作:
先看一下读操作的过程:
  

对应的函数是pipe_read:

static ssize_t
pipe_read(struct kiocb *iocb, const struct iovec *_iov,
    unsigned long nr_segs, loff_t pos)
{
struct file *filp = iocb->ki_filp;
struct inode *inode = filp->f_path.dentry->d_inode;
struct pipe_inode_info *pipe;
int do_wakeup;
ssize_t ret;
struct iovec *iov = (struct iovec *)_iov;
size_t total_len;
//待读数据长度
total_len = iov_length(iov, nr_segs);
/* Null read succeeds. */
if (unlikely(total_len == 0))
  return 0;
do_wakeup = 0;
ret = 0;
mutex_lock(&inode->i_mutex);
pipe = inode->i_pipe;
for (;;) {
  // 获取管道大小,如果待读数据包含的缓冲区个数为0,证明管道中并没有待读数据
  int bufs = pipe->nrbufs;
  if (bufs) {
   //管道大小>0
   //获得第一个数据所在的管道索引
   int curbuf = pipe->curbuf;
   //第一个数据所在的缓冲区,是16个中的一个
   struct pipe_buffer *buf = pipe->bufs + curbuf;
   const struct pipe_buf_operations *ops = buf->ops;
   void *addr;
   //缓冲中有效数据的长度
   size_t chars = buf->len;
   int error, atomic;
   //如果有效数据长度>待读长度
   if (chars > total_len)
    chars = total_len;
   error = ops->pin(pipe, buf);
   if (error) {
    if (!ret)
     error = ret;
    break;
   }
   atomic = !iov_fault_in_pages_write(iov, chars);
redo:

   addr = ops->map(pipe, buf, atomic);
   
   //具体拷贝过程
   error = pipe_iov_copy_to_user(iov, addr + buf->offset, chars, atomic);
   
   ops->unmap(pipe, buf, addr);
   
   if (unlikely(error)) {
    /*
     * Just retry with the slow path if we failed.
     */
    if (atomic) {
     atomic = 0;
     goto redo;
    }
    if (!ret)
     ret = error;
    break;
   }
   ret += chars;
   buf->offset += chars;
   buf->len -= chars;
   if (!buf->len) {
    //如果这个缓冲空了,则读下一个缓冲
    buf->ops = NULL;
    //如果这个缓冲空了,释放对应的页
    ops->release(pipe, buf);
    //到下一个缓冲区
    curbuf = (curbuf + 1) & (PIPE_BUFFERS-1);
    pipe->curbuf = curbuf;
    //有效数据缓冲区个数减一
    pipe->nrbufs = --bufs;
    do_wakeup = 1;
   }
   total_len -= chars;
   if (!total_len)
    //如果待读数据都读出
    break; /* common path: read succeeded */
   //如果total_len不为0,证明并没有获得全部数据
  }
  if (bufs) /* More to do? */
   //其他缓冲区还有有效数据,继续循环读下一个缓冲区
   continue;
  if (!pipe->writers)
   break;
  //有没有等待的写者
  if (!pipe->waiting_writers) {
   /* syscall merging: Usually we must not sleep
    * if O_NONBLOCK is set, or if we got some data.
    * But if a writer sleeps in kernel space, then
    * we can wait for that data without violating POSIX.
    */
   
   //没有等待的写者
   
   if (ret)
    //已经读了一部分数据,返回读出的数据长度
    break;
   //没有读过数据
   
   if (filp->f_flags & O_NONBLOCK) {
    //不是阻塞的
    ret = -EAGAIN;
    break;
   }
   //且可以阻塞
  }
  //有写者等待
  
  if (signal_pending(current)) {
   if (!ret)
    ret = -ERESTARTSYS;
   break;
  }
  if (do_wakeup) {
   wake_up_interruptible_sync(&pipe->wait);
    kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
  }
  //读者等待
  pipe_wait(pipe);
}
mutex_unlock(&inode->i_mutex);
/* Signal writers asynchronously that there is more room. */
if (do_wakeup) {
  wake_up_interruptible(&pipe->wait);
  kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
}
if (ret > 0)
  file_accessed(filp);
return ret;
}

结合上图应该不难看明白

3.管道的写:
过程如下:

static ssize_t
pipe_write(struct kiocb *iocb, const struct iovec *_iov,
     unsigned long nr_segs, loff_t ppos)
{
struct file *filp = iocb->ki_filp;
struct inode *inode = filp->f_path.dentry->d_inode;
struct pipe_inode_info *pipe;
ssize_t ret;
int do_wakeup;
struct iovec *iov = (struct iovec *)_iov;
size_t total_len;
ssize_t chars;
//要写入的长度
total_len = iov_length(iov, nr_segs);
/* Null write succeeds. */
if (unlikely(total_len == 0))
  return 0;
do_wakeup = 0;
ret = 0;
mutex_lock(&inode->i_mutex);
pipe = inode->i_pipe;
if (!pipe->readers) {
  //如果没有读进程,就向当前进程发送一个SIGPIPE信号,这就是"Broken pipe(损坏的管道)"消息
  send_sig(SIGPIPE, current, 0);
  ret = -EPIPE;
  goto out;
}
/* We try to merge small writes */
//写的数据的长度不能超过一个页面
chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */
if (pipe->nrbufs && chars != 0) {
  //第一个有效数据的管道号+包含有效数据的管道数目-1=最后一个非空管道缓冲区
  int lastbuf = (pipe->curbuf + pipe->nrbufs - 1) &
       (PIPE_BUFFERS-1);
  struct pipe_buffer *buf = pipe->bufs + lastbuf;
  const struct pipe_buf_operations *ops = buf->ops;
  int offset = buf->offset + buf->len;
  if (ops->can_merge && offset + chars
   
   int error, atomic = 1;
   void *addr;
   error = ops->pin(pipe, buf);
   if (error)
    goto out;
   iov_fault_in_pages_read(iov, chars);
redo1:
   //将数据拷贝到缓冲区中
   addr = ops->map(pipe, buf, atomic);
   error = pipe_iov_copy_from_user(offset + addr, iov,
       chars, atomic);
   ops->unmap(pipe, buf, addr);
   ret = error;
   do_wakeup = 1;
   if (error) {
    if (atomic) {
     atomic = 0;
     goto redo1;
    }
    goto out;
   }
   buf->len += chars;
   total_len -= chars;
   ret = chars;
   if (!total_len)
    goto out;
   //如果还有数据没有写入
  }
}
//向一个空缓冲区写
for (;;) {
  int bufs;
  if (!pipe->readers) {
   send_sig(SIGPIPE, current, 0);
   if (!ret)
    ret = -EPIPE;
   break;
  }
  bufs = pipe->nrbufs;
  if (bufs curbuf + bufs) & (PIPE_BUFFERS-1);
   struct pipe_buffer *buf = pipe->bufs + newbuf;
   struct page *page = pipe->tmp_page;
   char *src;
   int error, atomic = 1;
   if (!page) {
    page = alloc_page(GFP_HIGHUSER);
    if (unlikely(!page)) {
     ret = ret ? : -ENOMEM;
     break;
    }
    pipe->tmp_page = page;
   }
   /* Always wake up, even if the copy fails. Otherwise
    * we lock up (O_NONBLOCK-)readers that sleep due to
    * syscall merging.
    * FIXME! Is this really true?
    */
   do_wakeup = 1;
   chars = PAGE_SIZE;
   //写的数据最大为一个页
   if (chars > total_len)
    chars = total_len;
   iov_fault_in_pages_read(iov, chars);
redo2:
   //永久映射
   if (atomic)
    src = kmap_atomic(page, KM_USER0);
   else
   
    src = kmap(page);
   //将数据拷贝到缓冲区
   error = pipe_iov_copy_from_user(src, iov, chars,
       atomic);
   //释放映射
   if (atomic)
    kunmap_atomic(src, KM_USER0);
   else
    kunmap(page);
   if (unlikely(error)) {
    if (atomic) {
     atomic = 0;
     goto redo2;
    }
    if (!ret)
     ret = error;
    break;
   }
   ret += chars;
   /* Insert it into the buffer array */
   buf->page = page;
   buf->ops = &anon_pipe_buf_ops;
   buf->offset = 0;
   buf->len = chars;
   pipe->nrbufs = ++bufs;
   pipe->tmp_page = NULL;
   total_len -= chars;
   if (!total_len)
    break;
  }
  if (bufs
  //运行到这说明没有空缓冲区了
  if (filp->f_flags & O_NONBLOCK) {
   if (!ret)
    ret = -EAGAIN;
   break;
  }
  if (signal_pending(current)) {
   if (!ret)
    ret = -ERESTARTSYS;
   break;
  }
  if (do_wakeup) {
   wake_up_interruptible_sync(&pipe->wait);
   kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
   do_wakeup = 0;
  }
  //写者等待
  pipe->waiting_writers++;
  pipe_wait(pipe);
  pipe->waiting_writers--;
}
out:
mutex_unlock(&inode->i_mutex);
if (do_wakeup) {
  wake_up_interruptible(&pipe->wait);
  kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
}
if (ret > 0)
  file_update_time(filp);
return ret;
}


本文来自ChinaUnix博客,如果查看原文请点:http://blog.chinaunix.net/u2/79926/showart_1271551.html
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP