- 论坛徽章:
- 0
|
一个线程版本的长连接转发程序,在工作繁忙时会发生在系统tcp协议栈中的堵塞,请大家帮忙找找问题所在。
程序的作用是同server和client间建立一个使用线程转发的通道,从sever发起的交易可以通过通道发送到client然后回来,
反之就是从client发起的通过通道到达server端,但是在交易量比较大的时候会发生在tcp协议栈中堵塞而无法转发的情况
请大家帮忙找找问题。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <dlfcn.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <malloc.h>
#include <stdarg.h>
#include <signal.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#define MAXSLOT 200
char remoteip[20];
long remotept = 0;
int headlen = 0;
struct sock_table{
char rmtip[20];
short sockfd;
}_TAB[MAXSLOT];
void *procTran(void *args)
{
fd_set rset;
int *rcvsockptr = args;
int rcvsock = 0;
int sndsock = 0;
int maxsock = 0;
int rc = 0;
int rlen = 0 ,slen = 0, len = 0;
char headbuf[7];
char rbuf[4097];
char sbuf[4097];
char tbuf[4097];
pthread_detach(pthread_self());
/* get the rcv socket fd */
rcvsock = *rcvsockptr;
free(rcvsockptr);
rcvsockptr = NULL;
/* set snd socket fd */
sndsock = TcpConnectToOther(remoteip, remotept);
if(sndsock <= 0) {
printf("TcpConnectToOther Fail!\n" ;
close(rcvsock);
pthread_exit(NULL);
}
for(;
{
maxsock = ( rcvsock > sndsock ? rcvsock : sndsock);
FD_ZERO(&rset);
FD_SET(rcvsock, &rset);
FD_SET(sndsock, &rset);
/* wait socket to read */
if ( (rc = select(maxsock+1, &rset, NULL, NULL, 0)) < 0) {
printf("select error happend!\n" ;
close(rcvsock);
close(sndsock);
pthread_exit(NULL);
}
/* rcv sock fd read something */
if (FD_ISSET(rcvsock, &rset)) {
/* recv head buffer */
memset(rbuf, 0x00, sizeof(rbuf));
len = 0;
rc = ReadN(rcvsock, rbuf, headlen);
if(rc <= 0 || rc != headlen) {
printf("read from Client error or Clinet disconnect!\n" ;
close(rcvsock);
close(sndsock);
pthread_exit(NULL);
}
len = atoi(rbuf);
/* recv body buffer */
rc = ReadN(rcvsock, rbuf+headlen, len);
if(rc <= 0 || rc != len) {
printf("read from Client error or disconnect!\n" ;
close(rcvsock);
close(sndsock);
pthread_exit(NULL);
}
/* send buffer out to server */
rc = WriteN(sndsock, rbuf, len+headlen);
if(rc <= 0 || rc != len+headlen) {
printf("write To Server fail!\n" ;
close(rcvsock);
close(sndsock);
pthread_exit(NULL);
}
}
/* snd sock fd read something */
if (FD_ISSET(sndsock, &rset)) {
/* recv head buffer */
memset(sbuf, 0x00, sizeof(sbuf));
rc = ReadN(sndsock, sbuf, headlen);
if(rc <= 0 || rc != headlen) {
printf("read from Server error or Server disconnect!\n" ;
close(rcvsock);
close(sndsock);
pthread_exit(NULL);
}
len = atoi(sbuf);
/* recv body buffer */
rc = ReadN(sndsock, sbuf+headlen, len);
if(rc <= 0 || rc != len) {
printf("read from Server error or disconnect!\n" ;
close(rcvsock);
close(sndsock);
pthread_exit(NULL);
}
/* send buffer out to client */
rc = WriteN(rcvsock, sbuf, len+headlen);
if(rc <= 0 || rc != len+headlen) {
printf("write To Client fail!\n" ;
close(rcvsock);
close(sndsock);
pthread_exit(NULL);
}
}
}
pthread_exit(NULL);
}
int main(int argc,char ** argv)
{
int listenfd = 0;
int i = 0;
unsigned int cli_len;
int errno;
int *sockfd = NULL;
void *procTran(void *);
char tmpbuf[100];
struct sockaddr_in cli_addr;
pthread_attr_t attr;
pthread_t tid;
/* check parameter num */
if (argc != 5) {
printf("usage:tranexch lsnport remoteip remoteport headlen\n" ;
return (-1);
}
/* init the rmtip structure */
for(i=0; i<MAXSLOT; i++)
memset(&_TAB, 0x00, sizeof(struct sock_table));
/* set global variable */
memset(remoteip, 0x00, sizeof(remoteip));
strncpy(remoteip, argv[2], sizeof(remoteip));
remotept = atoi(argv[3]);
headlen = atoi(argv[4]);
/* set listen sock */
listenfd = TcpBindSocketPortListen( atoi(argv[1]) );
if ( listenfd < 0 ) {
printf("TcpBindSocketPortListen error\n" );
return(-1);
}
if (pthread_attr_init(&attr) != 0) {
printf( "init thread attr error\n" );
return (-1);
}
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
memset((char *)&cli_addr, 0, sizeof(struct sockaddr_in));
for (;;) {
cli_len = sizeof(cli_addr);
sockfd = malloc(sizeof(int));
if( sockfd == NULL ) {
printf("malloc sockfd error!\n");
break;
}
*sockfd = accept(listenfd, (struct sockaddr *)&cli_addr, &cli_len);
if( *sockfd < 0 ) {
free(sockfd);
printf("accept socketfd error![%s]\n",strerror(errno));
break;
}
memset(tmpbuf, 0x00, sizeof(tmpbuf));
sprintf(tmpbuf, "%s", inet_ntoa(cli_addr.sin_addr));
for(i = 0; i < MAXSLOT; i++) {
if ( strcmp(tmpbuf, _TAB.rmtip) == 0 ) {
close(_TAB.sockfd);
_TAB.sockfd = *sockfd;
break;
}
if ( strlen(_TAB.rmtip) == 0 ) {
memcpy(_TAB.rmtip, tmpbuf, strlen(tmpbuf));
_TAB.sockfd = *sockfd;
break;
}
}
if ( i == MAXSLOT ) {
printf("max slot reached,please check it!\n");
close(*sockfd);
free(sockfd);
continue;
}
errno = pthread_create( &tid, &attr, procTran, (void *)sockfd);
if ( errno ) {
free(sockfd);
printf("pthread_create procTran error!\n");
break;
}
}
pthread_attr_destroy(&attr);
return(0);
} |
|