Spark Struct Streaming 写入Hbase 出错 Task not serializable: java.io.NotSerializableException

时间:2023-01-16 20:49:27

在使用spark2.2d的Struct Streaming写入Hbase数据库时报错 Task not serializable: java.io.NotSerializableException:org.apache.hadoop.hbase.client.HTable

报错说是什么序列化的错误,搞得我去修改spark的配置文件.
然而真正的原因是我把创建Hbase连接的代码写在foreach里面了.

具体原因可能是因为每次遍历都创建一个hbase连接,导致每个连接之间的序列化不一样(只是个人猜测,希望有人能指正)

下面代码是错误的.创建Hbase连接的代码不应该在foreach块里.


        import spark.implicits._

        println("========================= 计算每个小时实时的应用的使用时间 ==================================")
        println("========================= 需要输入查找天数参数======================================")
        var day = "2018-03-14"
        val hourData = spark
            .readStream
            .schema(schema)
            .json("D:\\data\\behavior\\"+day+"*.log")

        val appActiveTimeByHour = hourData
            .withColumn("data", explode($"data"))
            .select("day","endtime","data.*")
            .groupBy(
                window($"endtime", "10 minutes", "5 minutes"),
                $"package"
            )
            .sum("activetime")
            .writeStream
            .outputMode("complete")
            .foreach(
                new ForeachWriter[Row]{
                    println("========================= 开始======================================")
                        //  hbase的相关配置
    val hbaseconf: Configuration = HBaseConfiguration.create()
    hbaseconf.set("hbase.zookeeper.quorum",zookeeperservers)
    hbaseconf.set("hbase.zookeeper.property.clientPort", "2181")
    //hbaseconf.set("zookeeper.znode.parent","/hbase-unsecure")
    var connection: Connection = ConnectionFactory.createConnection(hbaseconf)                  

                    println("========================= 结束======================================")

                    def open(partitionId:Long,version:Long):Boolean={
                        true
                    }
                    def process(record:Row):Unit={
                        println(record.toString)
                        println(record.get(0).toString)
                        println(record.get(1).toString)
                        println(record.get(2).toString)
                        println("========================= ggg======================================")
                        val conf = HBaseConfiguration.create()
                        val table = connection.getTable(TableName.valueOf(tablename))
                        val theput= new Put(Bytes.toBytes(record.get(0).toString))
                        theput.addColumn(Bytes.toBytes("info"),Bytes.toBytes("package"),Bytes.toBytes(record.get(0).toString))
                        theput.addColumn(Bytes.toBytes("info"),Bytes.toBytes("day"),Bytes.toBytes(record.get(0).toString))
                        theput.addColumn(Bytes.toBytes("info"),Bytes.toBytes("endtime"),Bytes.toBytes(record.get(0).toString))
                        theput.addColumn(Bytes.toBytes("info"),Bytes.toBytes("activetime"),Bytes.toBytes(record.get(0).toString))
                        table.put(theput)

                    }
                    def close(errorOrNull:Throwable):Unit={
                        println("存入hbase出错")
                    }
                }
                )
            .queryName("HourlyUsage")
            .format("foreach")
            //.format("console")
            //.trigger( Trigger.ProcessingTime("20 seconds"))
            .start()
        appActiveTimeByHour.awaitTermination()
        println("=================================================================================")




    }

}