免费注册 查看新帖 |

ChinaUnix.net

  平台 论坛 博客 文库 频道自动化运维 虚拟化 储存备份 C/C++ PHP MySQL 嵌入式 Linux系统
最近访问板块 发新帖
查看: 11153 | 回复: 2

[Spark] java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWrita [复制链接]

论坛徽章:
1
戌狗
日期:2013-10-24 17:31:55
发表于 2015-11-21 19:45 |显示全部楼层
以下程序提交后报错(错误列在后边),请指教!

import com.twitter.chill.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
* Created by lyxing.bj on 2015/11/17.
*/
public class Spark_hbase_test implements Serializable {
    static String convertScanToString (Scan scan)throws IOException {
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        return Base64.encodeBytes(proto.toByteArray());
    }

    public void start() throws Exception {

        JavaSparkContext sc = new JavaSparkContext("local", "spark-hbase-test", System.getenv("SPARK_HOME", System.getenv("JARS");
        
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(conf);
        Scan scan = new Scan();
        SingleColumnValueFilter scvf = new SingleColumnValueFilter(Bytes.toBytes("meta_data",Bytes.toBytes("HDSD00_01_185_D", CompareFilter.CompareOp.GREATER_OR_EQUAL,new BinaryComparator("2015-10-01 00:00:00".getBytes()));
        SingleColumnValueFilter scvf1 = new SingleColumnValueFilter(Bytes.toBytes("meta_data",Bytes.toBytes("HDSD00_01_185_D", CompareFilter.CompareOp.LESS_OR_EQUAL,new BinaryComparator("2015-10-31 00:00:00".getBytes()));
        scvf.setFilterIfMissing(true);
        scvf1.setFilterIfMissing(true);
        List<Filter> filters = new ArrayList<Filter>();
        filters.add(scvf);
        filters.add(scvf1);
        FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL,filters);

        scan.setFilter(fl);
        scan.addColumn(Bytes.toBytes("meta_data",Bytes.toBytes("HDSD00_01_185_D");
        scan.addColumn(Bytes.toBytes("meta_data",Bytes.toBytes("HDSD00_01_003");
        try {
            String tablename = "HDSC02_09";
            conf.set(HConstants.ZOOKEEPER_QUORUM, "node1.jkzldev.com,node2.jkzldev.com,node4.jkzldev.com:2181");

            conf.set("zookeeper.znode.parent","/hbase-unsecure");

            conf.set(TableInputFormat.INPUT_TABLE, tablename);
            conf.set(TableInputFormat.SCAN, convertScanToString(scan));

            JavaPairRDD<ImmutableBytesWritable, Result> hbaseRDD = sc.newAPIHadoopRDD(conf,
                    TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

            System.out.println("hbaseRDD'S count=" + hbaseRDD.count());
            List<Tuple2<ImmutableBytesWritable,Result>> hbaseRDD_list = hbaseRDD.collect();
            for (Tuple2<ImmutableBytesWritable,Result> tuple : hbaseRDD_list) {
                System.out.println("eeeeee-----" + tuple._2().getValue(Bytes.toBytes("meta_data"),Bytes.toBytes("HDSD00_01_185_D")) );
            }

            //构建RDD transformation
            JavaPairRDD<String,Integer> discharge = hbaseRDD.mapToPair(
                    new PairFunction<Tuple2<ImmutableBytesWritable,Result>,String,Integer>(){

                        public Tuple2<String,Integer> call(
                                Tuple2<ImmutableBytesWritable,Result> immutableBytesWritableResultTuple2
                        )throws Exception{
                            //时间
                            byte[] HDSD00_01_185_D = immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("meta_data"),Bytes.toBytes("HDSD00_01_185_D"));
                            //性别
                            byte[] HDSD00_01_003 = immutableBytesWritableResultTuple2._2.getValue(Bytes.toBytes("meta_data"),Bytes.toBytes("HDSD00_01_003"));

                            if(HDSD00_01_185_D != null && HDSD00_01_003 !=null ){
                                System.out.println("time+office" + Bytes.toString(HDSD00_01_185_D).substring(0,9) + Bytes.toString(HDSD00_01_003));
                                return new Tuple2<String, Integer>(Bytes.toString(HDSD00_01_185_D).substring(0,9) + Bytes.toString(HDSD00_01_003),1);
                            }
                            return null;
                        }
                    }
            );
            System.out.println("discharge's count=" + discharge.count());
            List<Tuple2<String,Integer>> discharge1 = discharge.collect();

            System.out.println("result=" + discharge1.get(1)._1 + discharge1.get(1)._2);
            //构建RDD action
            JavaPairRDD<String,Integer> counts = discharge.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            });
            if(counts.count()>0){
                List<Tuple2<String, Integer>> one = counts.take(1);

                System.out.print(one.get(1)._1 + one.get(1)._2);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
//        new Spark_hbase_test().start();
        new Spark_hbase_test().start();
        System.exit(0);
    }
}

错误记录如下:
566a61e0x0, quorum=node1.jkzldev.com:2181,node2.jkzldev.com:2181,node4.jkzldev.com:2181, baseZNode=/hbase-unsecure
15/11/20 15:37:04 INFO ClientCnxn: Opening socket connection to server node1.jkzldev.com/172.17.110.116:2181. Will not attempt to authenticate using SASL (unknown error)
15/11/20 15:37:04 INFO ClientCnxn: Socket connection established to node1.jkzldev.com/172.17.110.116:2181, initiating session
15/11/20 15:37:04 INFO ClientCnxn: Session establishment complete on server node1.jkzldev.com/172.17.110.116:2181, sessionid = 0x151149ae9b600bf, negotiated timeout = 40000
15/11/20 15:37:04 INFO TableInputFormatBase: Input split length: 1 M bytes.
15/11/20 15:37:04 INFO ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x151149ae9b600bf
15/11/20 15:37:04 INFO ZooKeeper: Session: 0x151149ae9b600bf closed
15/11/20 15:37:04 INFO ClientCnxn: EventThread shut down
15/11/20 15:37:05 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
        - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 34 31 38 37 32 36 30 37 2d 39 5f 31 30 32 39 36 39 30 36 5f 30 30 30 36 32 33 34 32 37 5f 31 34 34 34 30 36 30 38 30 30 30 30 30 24 30)
        - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
        - object (class scala.Tuple2, (34 31 38 37 32 36 30 37 2d 39 5f 31 30 32 39 36 39 30 36 5f 30 30 30 36 32 33 34 32 37 5f 31 34 34 34 30 36 30 38 30 30 30 30 30 24 30,keyvalues={41872607-9_10282457_000614540_1443888000000$0/meta_data:HDSD00_01_185_D/1447233947980/Put/vlen=19/seqid=0}))
        - element of array (index: 0)
        - array (class [Lscala.Tuple2;, size 546)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:3
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/11/20 15:37:05 ERROR TaskSetManager: Task 0.0 in stage 1.0 (TID 1) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
        - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 34 31 38 37 32 36 30 37 2d 39 5f 31 30 32 39 36 39 30 36 5f 30 30 30 36 32 33 34 32 37 5f 31 34 34 34 30 36 30 38 30 30 30 30 30 24 30)
        - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
        - object (class scala.Tuple2, (34 31 38 37 32 36 30 37 2d 39 5f 31 30 32 39 36 39 30 36 5f 30 30 30 36 32 33 34 32 37 5f 31 34 34 34 30 36 30 38 30 30 30 30 30 24 30,keyvalues={41872607-9_10282457_000614540_1443888000000$0/meta_data:HDSD00_01_185_D/1447233947980/Put/vlen=19/seqid=0}))
        - element of array (index: 0)
        - array (class [Lscala.Tuple2;, size 546); not retrying
15/11/20 15:37:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/11/20 15:37:05 INFO TaskSchedulerImpl: Cancelling stage 1
15/11/20 15:37:05 INFO DAGScheduler: Stage 1 (collect at Spark_hbase_test.java:231) failed in 0.227 s
15/11/20 15:37:05 INFO DAGScheduler: Job 1 failed: collect at Spark_hbase_test.java:231, took 0.241115 s

论坛徽章:
15
2015七夕节徽章
日期:2015-08-21 11:06:172017金鸡报晓
日期:2017-01-10 15:19:56极客徽章
日期:2016-12-07 14:07:30shanzhi
日期:2016-06-17 17:59:3115-16赛季CBA联赛之四川
日期:2016-04-13 14:36:562016猴年福章徽章
日期:2016-02-18 15:30:34IT运维版块每日发帖之星
日期:2016-01-28 06:20:0015-16赛季CBA联赛之新疆
日期:2016-01-25 14:01:34IT运维版块每周发帖之星
日期:2016-01-07 23:04:26数据库技术版块每日发帖之星
日期:2016-01-03 06:20:00数据库技术版块每日发帖之星
日期:2015-12-01 06:20:00IT运维版块每日发帖之星
日期:2015-11-10 06:20:00
发表于 2015-11-23 21:30 |显示全部楼层
我估计你baidu一下就能得到答案,从抛出的异常来看,我估计你看看这个程序就知道了
http://my.oschina.net/132722/blog/196350

论坛徽章:
1
戌狗
日期:2013-10-24 17:31:55
发表于 2016-05-16 09:00 |显示全部楼层
回复 2# heguangwu


    谢谢
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

数据风云,十年变迁
DTCC 第十届中国数据库技术大会已启航!

2019年5月8日~5月10日,由IT168旗下ITPUB企业社区平台主办的第十届中国数据库技术大会(DTCC2019),将在北京隆重召开。大会将邀请百余位行业专家,就热点技术话题进行分享,是广大数据领域从业人士的又一次年度盛会和交流平台。与SACC2018类似,本届大会将采用“3+2”模式:3天传统技术演讲+2天深度主题培训。大会不仅提供超100场的主题演讲,还会提供连续2天的深度课程培训,深化数据领域的项目落地实践方案。
DTCC2019,一场值得期待的数据技术盛会,殷切地希望您报名参与!

活动入口>>
  

北京盛拓优讯信息技术有限公司. 版权所有 16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122
中国互联网协会会员  联系我们:huangweiwei@it168.com
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP