免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 2208 | 回复: 6
打印 上一主题 下一主题

perl多线程&DBI【解决】 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2013-05-17 16:41 |只看该作者 |倒序浏览
本帖最后由 mousimin 于 2013-05-20 20:33 编辑

#经测试确实是threads模块的问题,已经换多进程实现,并将普通方法、多进程一块附上,请大家多指教#
我有一个文件,大约10927行,想先将文件split为11个文件(1000行一个文件),然后用11个线程去解析这11个文件,每个线程去逐行解析文件,去数据库查询相应的信息,最后写入到另外11个文件中。
但是每个线程只能解析一行数据就退出线程了,麻烦大侠们看下!代码和打印日志如下:
  1. #!/bin/env perl
  2. use strict;
  3. use DBI;
  4. use DBIs;
  5. use Env qw(HOME);
  6. use TimeOpt;
  7. use FileHandle;
  8. use threads;
  9. use threads::shared;

  10. #####################################################
  11. #初始化打印日志句柄
  12. #####################################################
  13. my $dsn_config;
  14. my $file_time = TimeOpt::CFtime("YYYYMMDDhhmmss");
  15. my $log_file = "$HOME/trace/CIRCUIT_ADD/CIRCUIT_ADD_${file_time}.log";
  16. system("mkdir -p $HOME/trace/CIRCUIT_ADD");
  17. my $fh = new FileHandle(">>$log_file");

  18. #####################################################
  19. #读取.mapi.ini配置文件
  20. #####################################################
  21. my $mapi = DBIs::Read_MapIniFile();
  22. #print Dumper $mapi;
  23. $dsn_config->{'COOKDB'}->{'db_type'} = $mapi->{'COOKDB'}->{'CONN_DBD'};
  24. $dsn_config->{'COOKDB'}->{'db_dsn'} = $mapi->{'COOKDB'}->{'CONN_DBN'};
  25. $dsn_config->{'COOKDB'}->{'db_user'} = $mapi->{'COOKDB'}->{'CONN_UID'};
  26. $dsn_config->{'COOKDB'}->{'db_pwd'} = $mapi->{'COOKDB'}->{'CONN_PWD'};
  27. $dsn_config->{'COOKDB'}->{'db_driver'} = $mapi->{'COOKDB'}->{'CONN_DBD'};

  28. system("split -l 1000 circuit.unl split");
  29. my @files:shared;
  30. my @all_threads;
  31. @files = `ls split*`;
  32. my $num = @files;
  33. for(my $i=0;$i<11;$i++)
  34. {
  35.         my $t = threads->new(\&parser);
  36.         print "there are $#files+1 files and now make the $i+1 st thread $t...\n";
  37.         push @all_threads,$t;
  38. }
  39. foreach my $thread(@all_threads)
  40. {
  41.         if($thread->is_joinable())
  42.         {
  43.                 print "join thread $thread...\n";
  44.                 $thread->join();
  45.         }
  46. }
  47. exit 0;

  48. sub test
  49. {
  50.         my $file =shift @files;
  51.         print "$file\n";
  52. }
  53. sub parser
  54. {
  55.         my ($dbh_kpidb,$db_msg,$data_hash,$no,$file);
  56.         my $file =shift @files;
  57.         my $rel = connect_db(\$dbh_kpidb,$dsn_config->{'COOKDB'},\$db_msg);
  58.         $file =~ s/\n$//;
  59.         print ("$file---$db_msg\n");
  60.         my $r_hd = new FileHandle("</opt/BOCO.DAL/ZHJKNRM/personal/musimin/circuit/$file");
  61.         my $w_hd = new FileHandle(">/opt/BOCO.DAL/ZHJKNRM/personal/musimin/circuit/out_$file");
  62.         while(my $row = <$r_hd>)
  63.         {
  64.                 my (@sql,$trace,$a_node_name,$z_node_name);
  65.                 my @row = split ("\t",$row);
  66.                 $no += 1;
  67.                 if($row[6] =~ /(.*)\/.*/)
  68.                 {
  69.                         $a_node_name = $1
  70.                 }else
  71.                 {
  72.                         $a_node_name ='11111111111';
  73.                 }
  74.                 if($row[7] =~ /(.*)\/.*/)
  75.                 {
  76.                         $z_node_name = $1
  77.                 }else
  78.                 {
  79.                         $z_node_name ='11111111111';
  80.                 }
  81.                 $sql[0] = "select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%$a_node_name%'";
  82.                 $sql[1] = "select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%$z_node_name%'";
  83.                 if($row[8] =~ /^(\S+)\s*GSM900/)
  84.                 {
  85.                         $sql[2] = "select max(int_id),max(userlabel) from objects where object_class = 201 and userlabel like 'G-%$1%'";
  86.                 }elsif($row[8] =~ /^(\S+)\s*GSM1800/)
  87.                 {
  88.                         $sql[2] = "select max(int_id),max(userlabel) from objects where object_class = 201 and userlabel like 'D-%$1%'";
  89.                 }elsif($row[8] =~ /^(\S+)\s*WCDMA/)
  90.                 {
  91.                         $sql[2] = "select max(int_id),max(userlabel) from objects where object_class = 9201 and userlabel like '%$1%'";
  92.                 }else
  93.                 {
  94.                         $sql[2] = "select max(int_id),max(userlabel) from objects where object_class = 9201 and userlabel like '%$row[8]%'";
  95.                 }
  96.                 my $n = 0;
  97.                 foreach my $sql(@sql)
  98.                 {
  99.                         $n += 1;
  100.                         my $sth_check = exec_sql(\$dbh_kpidb,$sql,\$db_msg);
  101.                         print ("$file---$db_msg\n");
  102.                         my @rows = $sth_check->fetchrow_array;
  103.                         if(defined $trace)
  104.                         {
  105.                                 $trace .= "\t$rows[0]\t$rows[1]";
  106.                         }else
  107.                         {
  108.                                 $trace = "$rows[0]\t$rows[1]";
  109.                         }

  110.                 }
  111.                 print $w_hd "$no\t$trace\n";
  112.                 if($no =~ /00$/)
  113.                 {
  114.                         print "No. $no line is writing into circuit_add.unl\n";
  115.                 }
  116.                 #next;
  117.         }
  118.         $r_hd->close;
  119.         $w_hd->close;
  120.         disconnect_db(\$dbh_kpidb,'nrmdb',\$db_msg);
  121.         print ("$file---$db_msg\n");
  122. }

  123. sub connect_db{
  124.         my $subdbh=shift;
  125.         my $config=shift;
  126.         my $err_msg = shift;
  127.         my ($sub_dbd,$sub_dbn,$sub_uid,$sub_pwd,$odbc_dsn);
  128.         my $db_driver;

  129.         $sub_dbd=$config->{'db_type'};
  130.         $sub_dbn=$config->{'db_dsn'};
  131.         $sub_uid=$config->{'db_user'};
  132.         $sub_pwd=$config->{'db_pwd'};
  133.         $odbc_dsn=$config->{'odbc_dsn'};
  134.         $db_driver=$config->{'db_driver'};
  135.         if(!$sub_dbd or !$sub_dbn){
  136.                 print  ( "no enough info about db_type and db_dsn connect_db \n");
  137.                 return -1;
  138.         }
  139.         my $col_dsn;
  140.         if($odbc_dsn ne ''){
  141.                 $col_dsn=$odbc_dsn;       
  142.         }
  143.         else{
  144.                 $col_dsn="dbi:$sub_dbd:$sub_dbn";       
  145.         }
  146.         if($subdbh=DBI->connect($col_dsn,$sub_uid,$sub_pwd,{RaiseError => 0,PrintError => 0, AutoCommit => 1 })){
  147.                 $err_msg = "Connect Database $col_dsn success\n";
  148.         }
  149.         else{
  150.                 my $i = 0;
  151.                 while($i < 3)
  152.                 {
  153.                         $i++;
  154.                         if($subdbh=DBI->connect($col_dsn,$sub_uid,$sub_pwd,{RaiseError => 0,PrintError => 0, AutoCommit => 1 })){
  155.                                 last;
  156.                         }
  157.                         else
  158.                         {
  159.                                 print  ( "Err:Connect Database $col_dsn $sub_uid/*** for $i times \n\tErr Info:$DBI::errstr\n\n");
  160.                                 print  ( "Try once more after 5 second...\n");
  161.                                 sleep(5);
  162.                         }
  163.                 }
  164.                 if($subdbh)
  165.                 {
  166.                         $err_msg = "Connect Database $col_dsn success\n";
  167.                 }
  168.                 else
  169.                 {
  170.                         $err_msg = "Try Connect $col_dsn $sub_uid/*** $i times fail!\n";
  171.                         return -1;
  172.                 }
  173.         }
  174.         if(uc($db_driver) eq 'INFORMIX'){
  175.                 $subdbh->do("set isolation to dirty read ;");
  176.                 $subdbh->do("set lock mode to wait 600 ;");
  177.         }
  178.         if(uc(($db_driver)) eq 'ORACLE'){
  179.                 $subdbh->do("ALTER SESSION SET NLS_DATE_FORMAT = \'YYYY-MM-DD HH24:MI:SS\'");
  180.         }
  181.         return 1;
  182. }

  183. sub disconnect_db{
  184.         my $dbh=shift;
  185.         my $db_name=shift;
  186.         my $err_msg = shift;
  187.         $dbh->disconnect();
  188.         $err_msg = "Sucess disconnect $db_name\n";
  189. }

  190. sub exec_sql{
  191.         my $ex_dbh = shift;
  192.         my $ex_sql = shift;
  193.         my $err_msg = shift;
  194.         my @para = @_;
  195.         my $ex_sth;
  196.         $err_msg = "Will exec SQL :\n$ex_sql\n";
  197.         if(!($ex_sth = $ex_dbh->prepare($ex_sql)))
  198.         {
  199.                 $err_msg .= "$DBI::errstr\n";
  200.                 return -1;
  201.         }
  202.         if(!($ex_sth->execute(@para)))
  203.         {
  204.                 $err_msg .= "$DBI::errstr\n";
  205.                 return -1;
  206.         }
  207.         return \$ex_sth;
  208.        
  209. }

  210. #####################################################
  211. #打印日志函数
  212. #####################################################
  213. sub print_log
  214. {
  215.         my $msg = shift;
  216.         my $file_time = TimeOpt::CFtime("YYYY-MM-DD hh:mm:ss");
  217.         print $fh "$file_time\n----------------------\n$msg\n\n";
  218.         print "$file_time\n----------------------\n$msg\n\n";
  219. }
复制代码
程序输出如下:
  1. there are 10+1 files and now make the 0+1 st thread threads=SCALAR(0x34668b8)...
  2. there are 9+1 files and now make the 1+1 st thread threads=SCALAR(0x34668e8)...
  3. there are 8+1 files and now make the 2+1 st thread threads=SCALAR(0x3466918)...
  4. splitaa---Connect Database dbi:Oracle:UNICOMJS success

  5. splitaa---Will exec SQL :
  6. select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%F4400-利源路%'

  7. splitab---Connect Database dbi:Oracle:UNICOMJS success

  8. splitab---Will exec SQL :
  9. select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%F3248-华显%'

  10. there are 7+1 files and now make the 3+1 st thread threads=SCALAR(0x3466948)...
  11. splitac---Connect Database dbi:Oracle:UNICOMJS success

  12. splitac---Will exec SQL :
  13. select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%F4119-东井村%'

  14. there are 6+1 files and now make the 4+1 st thread threads=SCALAR(0x3466978)...
  15. splitad---Connect Database dbi:Oracle:UNICOMJS success

  16. splitaa---Will exec SQL :
  17. select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%11111111111%'

  18. splitab---Will exec SQL :
  19. select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%F3138-河西九楼%'

  20. splitad---Will exec SQL :
  21. select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%H1678-南师大体育馆%'

  22. there are 5+1 files and now make the 5+1 st thread threads=SCALAR(0x34669a8)...
  23. splitae---Connect Database dbi:Oracle:UNICOMJS success

  24. splitae---Will exec SQL :
  25. select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%F5033-双林%'

  26. there are 4+1 files and now make the 6+1 st thread threads=SCALAR(0x34669d8)...
  27. splitaa---Will exec SQL :
  28. select max(int_id),max(userlabel) from objects where object_class = 9201 and userlabel like '%利源路 3g%'

  29. splitab---Will exec SQL :
  30. select max(int_id),max(userlabel) from objects where object_class = 201 and userlabel like 'G-%华显%'

  31. splitac---Will exec SQL :
  32. select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%F4939-河西九楼(二)%'

  33. splitaf---Connect Database dbi:Oracle:UNICOMJS success

  34. splitad---Will exec SQL :
  35. select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%H1000-河西六楼%'

  36. there are 3+1 files and now make the 7+1 st thread threads=SCALAR(0x3466a08)...
  37. splitaf---Will exec SQL :
  38. select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%F5043-大花%'

  39. splitac---Will exec SQL :
  40. select max(int_id),max(userlabel) from objects where object_class = 201 and userlabel like 'G-%东井村%'

  41. splitag---Connect Database dbi:Oracle:UNICOMJS success

  42. there are 2+1 files and now make the 8+1 st thread threads=SCALAR(0x3466a38)...
  43. splitae---Will exec SQL :
  44. select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%F4611-浦口新生圩%'

  45. splitag---Will exec SQL :
  46. select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%H1970-狮树%'

  47. splitad---Will exec SQL :
  48. select max(int_id),max(userlabel) from objects where object_class = 9201 and userlabel like '%南师大体育馆%'

  49. splitah---Connect Database dbi:Oracle:UNICOMJS success

  50. splitah---Will exec SQL :
  51. select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%F4122-安怀村2%'

  52. there are 1+1 files and now make the 9+1 st thread threads=SCALAR(0x3466a68)...
  53. splitae---Will exec SQL :
  54. select max(int_id),max(userlabel) from objects where object_class = 9201 and userlabel like '%双林%'

  55. splitaf---Will exec SQL :
  56. select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%F4007-新生圩%'

  57. splitai---Connect Database dbi:Oracle:UNICOMJS success

  58. splitai---Will exec SQL :
  59. select max(int_id),max(userlabel) from objects_trans where object_class in (2001,2008) and userlabel like '%F4576-百家湖国际花园1%'
复制代码
circuit_add_bak.rar (1.69 KB, 下载次数: 7)

circuit_add_fork.rar (2.31 KB, 下载次数: 12)

circuit_add_thread.rar (2.1 KB, 下载次数: 12)





论坛徽章:
0
2 [报告]
发表于 2013-05-17 18:50 |只看该作者
没用过多线程。不过你可以先这样测试下你的代码,如果不用多线程,顺序执行某一个文件会不会正确呢?先排除掉你代码的错误再看是不是线程的问题。如果代码没错可以试着换多进程吧。

论坛徽章:
1
2015年辞旧岁徽章
日期:2015-03-03 16:54:15
3 [报告]
发表于 2013-05-18 09:04 |只看该作者
回复 1# mousimin

“threads模块是垃圾”--Marc Lehmann
  1. use AnyEvent::DBI;
复制代码

论坛徽章:
0
4 [报告]
发表于 2013-05-20 20:25 |只看该作者
回复 2# picbhan
谢谢回答,应该就是多线程的问题,我之前已经用普通方法和多进程实现了,只是用多线程的方法练习下,附上多进程的代码,请大家多指教。
circuit_add_fork.rar (2.31 KB, 下载次数: 17)

   

论坛徽章:
0
5 [报告]
发表于 2013-05-20 20:28 |只看该作者
回复 3# py

多谢指教,经过测试,确实是threads模块本身的问题,已经换多进程的方法解决,并附了代码,请多指教。
AnyEvent模块不熟,以后慢慢学习~
   

论坛徽章:
0
6 [报告]
发表于 2013-05-20 21:33 |只看该作者
回复 4# mousimin


    这个如果多进程已经对了的话就不需要再继续去看代码了吧,毕竟还是有点长啊,呵呵。

论坛徽章:
0
7 [报告]
发表于 2013-05-21 08:56 |只看该作者
回复 6# picbhan


    嗯,我放上主要是让有需要的人看下,顺便指点下程序的不足。
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP