免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 5600 | 回复: 4
打印 上一主题 下一主题

[原创]发布一个分布式计算框架雏形 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2011-06-02 13:20 |只看该作者 |倒序浏览
本帖最后由 redor 于 2011-06-04 11:47 编辑

目前这个任务调度只负责任务的数据ID或者数据ID打包,不负责存储数据本身,需要单独
的数据存储,任务计算节点需要自行设计数据存储部分,这个程序包会提供一个获取任务
和提交任务的客户端接口libmtask,mtask.h;具体的实现参考btask.c 这个就是一个
benchmark。任务调度服务启动/usr/sbin/qtaskd -d -c /etc/qtaskd.ini
然后通过浏览器打开http://serverIP:2080/ 页面上可以添加任务,这些任务有编号,会显示任务目前的完成情况。
btask使用方法就是对照自己配置好的任务,任务计算节点可以有三种类型:
1. 只提交任务,比如数据源头,这个节点不获取任务(./btask -h IP -p 2066 -m 0 -q 1 -d);
2. 获取任务计算完成以后提交一个完成通知,另外同时提交给下一个任务(./btask -h IP -p 2066 -m 1 -q 2 -d);
3. 结束计算节点,只获取任务完成之后提交完成通知,不需要把任务继续提交给一个任务(./btask -h IP -p 2066 -m 2 -q 0 -d)。 



下载:
SRPM打包:http://libibase.googlecode.com/files/srpms-20110603183000.tar.gz
RPM打包: http://libibase.googlecode.com/files/rpms-20110603183000.tar.gz
源码: http://libibase.googlecode.com/files/qmtask-0.0.5.tar.gz


客户端API

  1. /* set message task */
  2. int mtask_set(MTASK *mtask, char *ip, int port, int mtaskid, int qtaskid);
  3. /* connect to qtask
  4. * -1 mtask is NULL
  5. * -2 socket() failed
  6. * -3 connect() failed
  7. * */
  8. int mtask_connect(MTASK *mtask);

  9. /* get new task count
  10. * -1 mtask is NULL
  11. * -2 mtask->fd <= 0 and mtask_connect failed
  12. * -3 write() task header failed
  13. * -4 write() task list[] failed
  14. * -5 read task header failed
  15. * -6 malloc failed
  16. * -7 read task list[] failed
  17. * ret >= 0 mhead.packetid
  18. * */
  19. int mtask_commit(MTASK *mtask, int flag, char *packet, int packet_len);

  20. /* push packet
  21. * return value
  22. * -1 mtask is NULL
  23. * -2 connection is bad
  24. * -3 Invalid packet data
  25. * -4 write() header failed
  26. * -5 write() packet failed
  27. * -6 recv() header failed
  28. * */
  29. int mtask_push(MTASK *mtask, int flag, char *packet, int packet_len);

  30. /* pop packet
  31. * return value
  32. * -1 mtask is NULL
  33. * -2 connection is bad
  34. * -3 Invalid commitid
  35. * -4 write() header failed
  36. * -5 recv() header failed
  37. * -6 malloc() for packet failed
  38. * -7 recv() packet failed
  39. */
  40. int mtask_pop(MTASK *mtask);

  41. /* over packet
  42. * return value
  43. * -1 mtask is NULL
  44. * -2 connection is bad
  45. * -3 Invalid packetid and commitid
  46. * -4 write() header failed
  47. * -5 recv() header failed
  48. * */
  49. int mtask_finish(MTASK *mtask, int flag);

  50. /* close message task */
  51. void mtask_close(MTASK *mtask);

复制代码
写了一个调用的例子,不包括数据存取部分,演示了ID包的任务提交, 任务获取, 任务完成, 还在弄具体的数据计算系统,弄完给大家分享。

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <unistd.h>
  5. #include <errno.h>
  6. #include <sys/socket.h>
  7. #include <netinet/in.h>
  8. #include <arpa/inet.h>
  9. #include <sys/types.h>
  10. #include "mtask.h"
  11. int main(int argc, char **argv)
  12. {
  13.     int i = 0, x = 0, mid = 0, qid = 0, port = 0, flag = 0,
  14.         isdaemon = 0, isout = 0, len = 0, packetid = 0;
  15.     char *ip = NULL, *packet = NULL, block[MTASK_PACKET_MAX * sizeof(int64_t)], ch = 0;
  16.     int64_t old = 0, *list = NULL;
  17.     MTASK mtask = {0};
  18.     pid_t pid = 0;

  19.     /* get configure file */
  20.     while((ch = getopt(argc, argv, "h:p:m:q:d")) != -1)
  21.     {
  22.         switch(ch)
  23.         {
  24.             case 'h':
  25.                 ip = optarg;
  26.                 break;
  27.             case 'p':
  28.                 port = atoi(optarg);
  29.                 break;
  30.             case 'm':
  31.                 mid = atoi(optarg);
  32.                 break;
  33.             case 'q':
  34.                 qid = atoi(optarg);
  35.                 break;
  36.             case 'o':
  37.                 isout = 1;
  38.                 break;
  39.             case 'd':
  40.                 isdaemon = 1;
  41.                 break;
  42.             default:
  43.                 break;
  44.             }
  45.     }
  46.     if(ip == NULL || port <= 0 || (qid == 0 && mid == 0))
  47.     {
  48.         fprintf(stderr, "Usage:%s -h host -p port -m commitid -q queueid -o output -d working as daemon\n", argv[0]);
  49.         _exit(-1);
  50.     }
  51.     /* daemon */
  52.     if(isdaemon)
  53.     {
  54.         pid = fork();
  55.         switch (pid) {
  56.             case -1:
  57.                 perror("fork()");
  58.                 exit(EXIT_FAILURE);
  59.                 break;
  60.             case 0: /* child process */
  61.                 if(setsid() == -1)
  62.                     exit(EXIT_FAILURE);
  63.                 break;
  64.             default:/* parent */
  65.                 _exit(EXIT_SUCCESS);
  66.                 break;
  67.         }
  68.     }
  69.     if(mtask_set(&mtask, ip, port, mid, qid) == 0
  70.             && mtask_connect(&mtask) == 0)
  71.     {
  72.         list = (int64_t *)block;
  73.         if(mid <= 0 && qid > 0)
  74.         {
  75.             do
  76.             {
  77.                 old = random();
  78.                 flag = 0;
  79.                 if((old%33) == 0) flag = MTASK_TO_QHEAD;
  80.                 x = 0;
  81.                 while(x < MTASK_PACKET_MAX)
  82.                 {
  83.                     list[x++] = (int64_t)random();
  84.                 }
  85.                 if((packetid = mtask_push(&mtask, flag, block,
  86.                                 sizeof(int64_t) * MTASK_PACKET_MAX)) >= 0)
  87.                 {
  88.                     if(isout)fprintf(stdout, "1:{%d:{packetid:%d}}\n", i, packetid);
  89.                     ++i;
  90.                 }
  91.                 else
  92.                 {
  93.                     sleep(1);
  94.                 }
  95.             }while(1);
  96.         }
  97.         else
  98.         {
  99.             /*
  100.             packetid = 0;
  101.             do
  102.             {
  103.                 old = random();
  104.                 flag = 0;
  105.                 if((old%33) == 0) flag = MTASK_TO_QHEAD;
  106.                 if((packetid = mtask_commit(&mtask, flag, NULL, 0)) > 0)
  107.                 {
  108.                     packet = mtask.packet;
  109.                     len = mtask.length;
  110.                     if(isout)fprintf(stdout, "2:{%d:{packetid:%d length:%d}}\n", i, packetid, len);
  111.                     ++i;
  112.                 }
  113.                 else
  114.                 {
  115.                     packet = NULL;
  116.                     len = 0;
  117.                     sleep(1);
  118.                 }
  119.             }while(1);
  120.             */
  121.             packetid = 0;
  122.             do
  123.             {
  124.                 old = random();
  125.                 flag = 0;
  126.                 if((old%33) == 0) flag = MTASK_TO_QHEAD;
  127.                 if((packetid = mtask_pop(&mtask)) > 0)
  128.                 {
  129.                     packet = mtask.packet;
  130.                     len = mtask.length;
  131.                     if(isout)fprintf(stdout, "2:{%d:{packetid:%d length:%d}}\n", i, packetid, len);
  132.                     ++i;
  133.                     mtask_finish(&mtask, flag);
  134.                 }
  135.                 else
  136.                 {
  137.                     packet = NULL;
  138.                     len = 0;
  139.                     sleep(1);
  140.                 }
  141.             }while(1);
  142.         }
  143.         mtask_close(&mtask);
  144.     }
  145.     return 0;
  146. }
复制代码

论坛徽章:
324
射手座
日期:2013-08-23 12:04:38射手座
日期:2013-08-23 16:18:12未羊
日期:2013-08-30 14:33:15水瓶座
日期:2013-09-02 16:44:31摩羯座
日期:2013-09-25 09:33:52双子座
日期:2013-09-26 12:21:10金牛座
日期:2013-10-14 09:08:49申猴
日期:2013-10-16 13:09:43子鼠
日期:2013-10-17 23:23:19射手座
日期:2013-10-18 13:00:27金牛座
日期:2013-10-18 15:47:57午马
日期:2013-10-18 21:43:38
2 [报告]
发表于 2011-06-02 14:12 |只看该作者
看不太明白{:3_184:}

论坛徽章:
0
3 [报告]
发表于 2011-06-02 14:49 |只看该作者
回复 2# hellioncu


    就是支持N个节点同时计算的一个任务调度服务,看截图。

论坛徽章:
0
4 [报告]
发表于 2011-06-03 12:53 |只看该作者
如何编写任务  发布任务呢

论坛徽章:
0
5 [报告]
发表于 2011-06-04 11:35 |只看该作者
回复 4# newmax123


    任务部分属于业务层的东西, 调度中心只负责存储数据的ID,具体的数据存储需要使用独立的存储服务,我正在弄一个基于mongodb的计算系统,弄完放出来给大家看。
任务的获取和任务的finish可以直接调用libmtask mtask.h里的函数 btask.c里有例子。
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP