- 论坛徽章:
- 0
|
本帖最后由 redor 于 2011-06-04 11:47 编辑
目前这个任务调度只负责任务的数据ID或者数据ID打包,不负责存储数据本身,需要单独
的数据存储,任务计算节点需要自行设计数据存储部分,这个程序包会提供一个获取任务
和提交任务的客户端接口libmtask,mtask.h;具体的实现参考btask.c 这个就是一个
benchmark。任务调度服务启动/usr/sbin/qtaskd -d -c /etc/qtaskd.ini
然后通过浏览器打开http://serverIP:2080/ 页面上可以添加任务,这些任务有编号,会显示任务目前的完成情况。
btask使用方法就是对照自己配置好的任务,任务计算节点可以有三种类型:
1. 只提交任务,比如数据源头,这个节点不获取任务(./btask -h IP -p 2066 -m 0 -q 1 -d);
2. 获取任务计算完成以后提交一个完成通知,另外同时提交给下一个任务(./btask -h IP -p 2066 -m 1 -q 2 -d);
3. 结束计算节点,只获取任务完成之后提交完成通知,不需要把任务继续提交给一个任务(./btask -h IP -p 2066 -m 2 -q 0 -d)。
下载:
SRPM打包:http://libibase.googlecode.com/files/srpms-20110603183000.tar.gz
RPM打包: http://libibase.googlecode.com/files/rpms-20110603183000.tar.gz
源码: http://libibase.googlecode.com/files/qmtask-0.0.5.tar.gz
客户端API
- /* set message task */
- int mtask_set(MTASK *mtask, char *ip, int port, int mtaskid, int qtaskid);
- /* connect to qtask
- * -1 mtask is NULL
- * -2 socket() failed
- * -3 connect() failed
- * */
- int mtask_connect(MTASK *mtask);
- /* get new task count
- * -1 mtask is NULL
- * -2 mtask->fd <= 0 and mtask_connect failed
- * -3 write() task header failed
- * -4 write() task list[] failed
- * -5 read task header failed
- * -6 malloc failed
- * -7 read task list[] failed
- * ret >= 0 mhead.packetid
- * */
- int mtask_commit(MTASK *mtask, int flag, char *packet, int packet_len);
- /* push packet
- * return value
- * -1 mtask is NULL
- * -2 connection is bad
- * -3 Invalid packet data
- * -4 write() header failed
- * -5 write() packet failed
- * -6 recv() header failed
- * */
- int mtask_push(MTASK *mtask, int flag, char *packet, int packet_len);
- /* pop packet
- * return value
- * -1 mtask is NULL
- * -2 connection is bad
- * -3 Invalid commitid
- * -4 write() header failed
- * -5 recv() header failed
- * -6 malloc() for packet failed
- * -7 recv() packet failed
- */
- int mtask_pop(MTASK *mtask);
- /* over packet
- * return value
- * -1 mtask is NULL
- * -2 connection is bad
- * -3 Invalid packetid and commitid
- * -4 write() header failed
- * -5 recv() header failed
- * */
- int mtask_finish(MTASK *mtask, int flag);
- /* close message task */
- void mtask_close(MTASK *mtask);
复制代码 写了一个调用的例子,不包括数据存取部分,演示了ID包的任务提交, 任务获取, 任务完成, 还在弄具体的数据计算系统,弄完给大家分享。
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <unistd.h>
- #include <errno.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <sys/types.h>
- #include "mtask.h"
- int main(int argc, char **argv)
- {
- int i = 0, x = 0, mid = 0, qid = 0, port = 0, flag = 0,
- isdaemon = 0, isout = 0, len = 0, packetid = 0;
- char *ip = NULL, *packet = NULL, block[MTASK_PACKET_MAX * sizeof(int64_t)], ch = 0;
- int64_t old = 0, *list = NULL;
- MTASK mtask = {0};
- pid_t pid = 0;
- /* get configure file */
- while((ch = getopt(argc, argv, "h:p:m:q:d")) != -1)
- {
- switch(ch)
- {
- case 'h':
- ip = optarg;
- break;
- case 'p':
- port = atoi(optarg);
- break;
- case 'm':
- mid = atoi(optarg);
- break;
- case 'q':
- qid = atoi(optarg);
- break;
- case 'o':
- isout = 1;
- break;
- case 'd':
- isdaemon = 1;
- break;
- default:
- break;
- }
- }
- if(ip == NULL || port <= 0 || (qid == 0 && mid == 0))
- {
- fprintf(stderr, "Usage:%s -h host -p port -m commitid -q queueid -o output -d working as daemon\n", argv[0]);
- _exit(-1);
- }
- /* daemon */
- if(isdaemon)
- {
- pid = fork();
- switch (pid) {
- case -1:
- perror("fork()");
- exit(EXIT_FAILURE);
- break;
- case 0: /* child process */
- if(setsid() == -1)
- exit(EXIT_FAILURE);
- break;
- default:/* parent */
- _exit(EXIT_SUCCESS);
- break;
- }
- }
- if(mtask_set(&mtask, ip, port, mid, qid) == 0
- && mtask_connect(&mtask) == 0)
- {
- list = (int64_t *)block;
- if(mid <= 0 && qid > 0)
- {
- do
- {
- old = random();
- flag = 0;
- if((old%33) == 0) flag = MTASK_TO_QHEAD;
- x = 0;
- while(x < MTASK_PACKET_MAX)
- {
- list[x++] = (int64_t)random();
- }
- if((packetid = mtask_push(&mtask, flag, block,
- sizeof(int64_t) * MTASK_PACKET_MAX)) >= 0)
- {
- if(isout)fprintf(stdout, "1:{%d:{packetid:%d}}\n", i, packetid);
- ++i;
- }
- else
- {
- sleep(1);
- }
- }while(1);
- }
- else
- {
- /*
- packetid = 0;
- do
- {
- old = random();
- flag = 0;
- if((old%33) == 0) flag = MTASK_TO_QHEAD;
- if((packetid = mtask_commit(&mtask, flag, NULL, 0)) > 0)
- {
- packet = mtask.packet;
- len = mtask.length;
- if(isout)fprintf(stdout, "2:{%d:{packetid:%d length:%d}}\n", i, packetid, len);
- ++i;
- }
- else
- {
- packet = NULL;
- len = 0;
- sleep(1);
- }
- }while(1);
- */
- packetid = 0;
- do
- {
- old = random();
- flag = 0;
- if((old%33) == 0) flag = MTASK_TO_QHEAD;
- if((packetid = mtask_pop(&mtask)) > 0)
- {
- packet = mtask.packet;
- len = mtask.length;
- if(isout)fprintf(stdout, "2:{%d:{packetid:%d length:%d}}\n", i, packetid, len);
- ++i;
- mtask_finish(&mtask, flag);
- }
- else
- {
- packet = NULL;
- len = 0;
- sleep(1);
- }
- }while(1);
- }
- mtask_close(&mtask);
- }
- return 0;
- }
复制代码 |
|