VIP_fuck 发表于 2016-06-01 21:44

用 epoll 监听 socket的时候,还有必要让 socket 非阻塞吗?

我感觉没有必要,比如有读事件发生,说明已经有数据可读了,这时候 socket 阻塞和非阻塞貌似没有什么区别。

大牛指教。

yulihua49 发表于 2016-06-01 22:21

本帖最后由 yulihua49 于 2016-06-01 22:45 编辑

一般不需要。特殊情况需要。
特殊情况是:
少量线程处理大量连接。其中一个或几个连接正在传输大包数据。恰巧使用了低速,或拥塞网络。比如在车上使用2G或3G网络传大数据到地面数据中心。
阻塞的话,就会长时间占用一个线程,导致影响了别的用户的作业,哪怕人家只传一个小包。
这个场景需要使用NIO(NON block IO)。
但是NIO非常不方便,通过重重回调才能完成一个包,把业务逻辑切的很碎。
这时可能需要协程技术,上层对下层发一个看起来是同步阻塞的函数调用,实现一个完整的包IO。但是在底层,IO暂停期间释放线程,让他为别人服务。等IO具备条件了再有线程继续你的工作(其间不时的挂起(yield),解挂( resume))直至完成返回。函数调用时跟返回时未必是同一个线程。

特别是,如果一个线程处理大量连接,则必须要使用协程技术。

VIP_fuck 发表于 2016-06-01 22:43

回复 2# yulihua49


    非常感谢

   

VIP_fuck 发表于 2016-06-01 22:49

回复 2# yulihua49


    不过还是会有一个疑问:

    我的理解:如果 epoll 返回一个 socket 的 EPOLLIN 事件,那么说明数据已经到了接收端的缓冲区了,这时候读数据是合情合理,不应该算作影响其他作业。本次读取完毕之后,释放线程,继续调度。所以阻塞非阻塞是不影响的。


    不知道这么理解有什么问题,大牛指教。

VIP_fuck 发表于 2016-06-01 23:04

假设一个场景:一个大型的文件,比如几个 G,发送端把文件拆分成块,每块 32M,多线程发送。

接收端接收数据时,如果是非阻塞,那么可能收到假如 100k 数据之后,这个连接上因为网络或者其他原因暂时没有后续数据收到,这时候读的接口就会返回 0 或者非阻塞的 errno,然后跳出线程,线程继续参与调度。

我的问题是:如果这个连接上后续的数据来了,重新开始接收后续数据,那怎么和之前的 100k 对应上呢?

如果是阻塞模式,就不会有这个问题。

VIP_fuck 发表于 2016-06-01 23:07

哦,,,,也可以记录每个连接上读取的是第几块数据块,读取了多少数据,这样的话,每个线程都不断片儿了。

这么看的话,还是非阻塞要高效些。

yulihua49 发表于 2016-06-03 10:19

本帖最后由 yulihua49 于 2016-06-03 10:48 编辑

VIP_fuck 发表于 2016-06-01 22:49 static/image/common/back.gif
回复 2# yulihua49



如果传一个大包,只来了一部分数据,就会激活EPOLLIN,但是你真的同步接收,还是要等待全部收完。
下边是一个通用的同步/NIO两用的收发接口,平时就是同步阻塞的。在epoll激活时调用之。稍加修改你就可以使用:
typedef int (*T_YIELD)(int socket,int rwflg,int timeout);
#define TIMEOUTERR -11
/**********************************************
* @(#) SDBC 7.1 TCP RECV/SENDTools                      *
* to suport coroutine
**********************************************/
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
#include <fcntl.h>
#include <sc.h>

#ifndef MIN
#define MIN(a,b) ((a)<(b))?(a):(b)
#endif

static T_YIELD yield=NULL;

T_YIELD get_yield()
{
      return yield;
}

T_YIELD set_yield(T_YIELD new_yield)
{
T_YIELD oldyield=yield;
      yield=new_yield;
      return oldyield;
}

//timeout for second
int RecvNet(int socket,char *buf,int n,int timeout)
{
int bcount=0,br,ret;
int i,repeat=0;
int fflag=-1;

      if(socket<0) return SYSERR;
      if(!buf && n<0) return 0;
      if(yield) {
                fflag=fcntl(socket,F_GETFL,0);
                if(fflag!=-1) fcntl(socket,F_SETFL,fflag|O_ASYNC|O_NONBLOCK); //异步操作
      } else if(timeout>0) {
                struct timeval tmout;
                tmout.tv_sec=timeout;
                tmout.tv_usec=0;
                ret=setsockopt(socket,SOL_SOCKET,SO_RCVTIMEO,(char *)&tmout,sizeof(tmout));
                if(ret) ShowLog(1,"%s:setsockopt errno=%d,%s",__FUNCTION__,errno,strerror(errno));
      }

      *buf=0;
      br=0;

      while(bcount<n){
                if((br=read(socket,buf,n-bcount))>0){
                        bcount+=br;
                        buf+=br;
                        repeat=0;
                        continue;
                }
                if(fflag==-1 && errno==EAGAIN) return TIMEOUTERR;
                if(br<=0 && errno && errno != EAGAIN){
                  if(errno!=ECONNRESET)
                        ShowLog(1,"%s:br=%d,err=%d,%s",__FUNCTION__,br,errno,strerror(errno));
                  break;
                }
                if(bcount < n && fflag!=-1) { //切换任务
                        if(repeat++>3) return -errno;
ShowLog(5,"%s:tid=%lX,socket=%d,yield to schedle bcount=%d/%d",__FUNCTION__,pthread_self(),socket,bcount,n);
                        i=yield(socket,0,timeout);
                        if(i<0) {
                        if(timeout>0) {
                              struct timeval tmout;
                              tmout.tv_sec=timeout;
                              tmout.tv_usec=0;
                              ret=setsockopt(socket,SOL_SOCKET,SO_RCVTIMEO,(char *)&tmout,sizeof(tmout));
                        }
                              fcntl(socket,F_SETFL,fflag);
                              fflag=-1;
                              if(i==TIMEOUTERR) return i;
                        }
                }
      }
      if(fflag!=-1) fcntl(socket,F_SETFL,fflag);
      return bcount==0?-1:bcount;
}

int SendNet(int socket,char *buf,int n,int MTU)
{
int bcount,bw;
int sz,i=0;
int fflag=-1;
size_t SendSize;

      if(socket<0) return SYSERR;
      if(yield) {
                fflag=fcntl(socket,F_GETFL,0);
                if(fflag != -1) fcntl(socket,F_SETFL,fflag|O_NONBLOCK); //异步操作
      }
      bcount=0;
      bw=0;
      if(MTU>500) SendSize=MTU;
      else SendSize=n;
      while(bcount<n){
                sz=MIN(n-bcount,SendSize);
                if((bw=write(socket,buf,sz))>0){
                        bcount+=bw;
                        buf+=bw;
                }
                if(bw<0&&errno!=EAGAIN) {
                        ShowLog(1,"%s:err=%d,%s",__FUNCTION__,errno,strerror(errno));
                        break;
                }
                if(bw < sz && fflag != -1) { //切换任务
//ShowLog(5,"%s:tid=%lX,socket=%d,yield bw=%d/%d",__FUNCTION__,pthread_self(),socket,bw,sz);
                  i=yield(socket,1,0);
                  if(i<0) {
                        fcntl(socket,F_SETFL,fflag);
                        fflag = -1;
                  }
                }
      }
      if(fflag != -1)fcntl(socket,F_SETFL,fflag);
      return bcount==0?-1:bcount;
}

VIP_fuck 发表于 2016-06-03 10:23

回复 7# yulihua49


    曾经项目里边确实就是同步收,不过 cpu 跑满了,当时还被组长强制科普什么是非阻塞。。。

    我觉得要是不同步收的话,用 6 楼说的办法,应该也行。找时间尝试一下。

yulihua49 发表于 2016-06-03 10:29

本帖最后由 yulihua49 于 2016-06-03 10:35 编辑

VIP_fuck 发表于 2016-06-01 23:07 static/image/common/back.gif
哦,,,,也可以记录每个连接上读取的是第几块数据块,读取了多少数据,这样的话,每个线程都不断片儿了。 ...
用法是这样的,缺省情况,yield函数是NULL,这个就是同步阻塞的,项目一开始先实现简单的功能,如果将来需要异步,写一个协程接口,set到yield,就自动成了异步接口。

typedef int (*T_YIELD)(int socket,int rwflg,int timeout);

可以看出,yield接口非常简单,只要求你的socket,rwflag,0=r,1=w。timeout,in second。返回值0,已经成功resume,其他失败。超时-11。
与协程的实现方法无关,不管你采用什么方法实现按这个接口走就行了。

yulihua49 发表于 2016-06-03 10:39

VIP_fuck 发表于 2016-06-01 23:04 static/image/common/back.gif
假设一个场景:一个大型的文件,比如几个 G,发送端把文件拆分成块,每块 32M,多线程发送。

接收端接收 ...
这就需要contex了,保持一个任务的上下文。在协程(coroutine)里,上下文就是在线程栈里,协程自动保存。
页: [1] 2
查看完整版本: 用 epoll 监听 socket的时候,还有必要让 socket 非阻塞吗?