- 论坛徽章:
- 0
|
给一个多线程的例子,默认是长连接的,可以改为短连接:
void * MultiThreadTest( void *pParam )
{
int MessageSum=0;
memcpy(&MessageSum, pParam,4);
int nStartTime=time(NULL);
int nReqStartTime=0,nReqEndTime=0,nTimeout=0;
TrackerServerInfo theTrackerServer;
memset(&theTrackerServer,0,sizeof(theTrackerServer));
TrackerServerInfo *pTrackerServer = &theTrackerServer;
int result=0;
TrackerServerInfo storageServer;
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
char remote_filename[256];
char buff[32];
char token[32 + 1];
char file_id[128];
char file_url[256];
int len;
int url_len;
time_t ts;
char *file_buff;
int64_t file_size;
char *meta_buff;
int store_path_index;
struct base64_context context;
nReqStartTime=time(NULL);
base64_init_ex(&context, 0, '-', '_', '.');//为了最后解析出文件大小需要初始化这个变量
int i=0;//发送数目
//处理在线请求时可以改成while(Running)
while(i<MessageSum)
{
//pTrackerServer = tracker_get_connection();
tracker_get_connection_r(pTrackerServer); //多线程安全函数
if (pTrackerServer->sock <= 0)
{
sleep(1);
continue;
}
store_path_index = 0;
if ((result=tracker_query_storage_store(pTrackerServer, \
&storageServer, &store_path_index)) != 0)
{
//查询tracker获取storage失败,重连tracker
printf("tracker_query_storage fail, " \
"error no: %d, error info: %s\n", \
result, strerror(result));
fdfs_quit(pTrackerServer);
tracker_disconnect_server(pTrackerServer);
sleep(1);
continue;
}
printf("group_name=%s, ip_addr=%s, port=%d\n", \
storageServer.group_name, \
storageServer.ip_addr, \
storageServer.port);
if ((result=tracker_connect_server(&storageServer)) != 0)
{
//tracker给了一个连不上的storage,重连tracker
fdfs_quit(pTrackerServer);
tracker_disconnect_server(pTrackerServer);
sleep(1);
continue;
}
//1下面保持tracker和storage的各1个长连接,本线程固定跟一个storage交互,出错后才返回重连进行容灾
//2实际中如果是大文件没必要保持长连接,则下面这个for循环去掉成顺序执行,出错时continue回while循环去重连tracker和storage
for(;i<MessageSum;i++)
{
//以上传为例,上传目前有3类,根据应用自己选一类即可,参见版本自带的fdfs_test.c
//if (strcmp(operation, "upload") == 0)
{
strcpy(group_name, "");
result = storage_upload_by_filename(pTrackerServer, \
&storageServer, store_path_index, \
local_filename, NULL, \
NULL, 0, \
group_name, remote_filename);
//下面的出错或成功处理,跟上述上传种类无关,通用的
if (result != 0)
{
printf("storage_upload_by_filename fail, " \
"error no: %d, error info: %s\n", \
result, strerror(result));
fdfs_quit(&storageServer);
tracker_disconnect_server(&storageServer);
fdfs_quit(pTrackerServer);
tracker_disconnect_server(pTrackerServer);
//上传次数的统计,测试用
pthread_mutex_lock(&theSumMutex);
nTradeFailed ++;
nSum++;
pthread_mutex_unlock(&theSumMutex);
break;
}
sprintf(file_id, "%s/%s", group_name, remote_filename);
url_len = sprintf(file_url, "http://%s:%d/%s", \
pTrackerServer->ip_addr, \
g_tracker_server_http_port, file_id);
/*
if (g_anti_steal_token)
{
ts = time(NULL);
fdfs_http_gen_token(&g_anti_steal_secret_key, file_id, \
ts, token);
sprintf(file_url + url_len, "?token=%s&ts=%d", \
token, (int)ts);
}*/
memset(buff, 0, sizeof(buff));
base64_decode_auto(&context, remote_filename + FDFS_FILE_PATH_LEN, \
strlen(remote_filename) - FDFS_FILE_PATH_LEN \
- (FDFS_FILE_EXT_NAME_MAX_LEN + 1), buff, &len);
printf("group_name=%s, remote_filename=%s\n", \
group_name, remote_filename);
printf("file timestamp=%d\n", buff2int(buff+sizeof(int)));
printf("file size="INT64_PRINTF_FORMAT"\n", \
buff2long(buff+sizeof(int)*2));
printf("file url: %s\n", file_url);
//上传次数的统计,测试用
pthread_mutex_lock(&theSumMutex);
nTradeOK ++;
nSum++;
pthread_mutex_unlock(&theSumMutex);
}
}
fdfs_quit(&storageServer);
tracker_disconnect_server(&storageServer);
fdfs_quit(pTrackerServer);
tracker_disconnect_server(pTrackerServer);
}
//上传次数的统计,测试用
int nEndTime=time(NULL);
pthread_mutex_lock(&theSumMutex);
OverThread ++;
nTimeAllUsed += (nEndTime - nStartTime);
pthread_mutex_unlock(&theSumMutex);
return NULL;
} |
|