middle

时间:2023-03-09 18:38:33
middle

/**
* Created by lkl on 2017/7/31.
*/
/**
* Created by lkl on 2017/6/26.
*///spark-shell --driver-class-path /home/hadoop/test/mysqljdbc.jar
import java.math.BigDecimal
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.text.SimpleDateFormat
import java.util.Date
object middle {

val rl= "jdbc:mysql://10.19.65.17:54321/emotion?user=emotion&password=qingxu&useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false"
classOf[com.mysql.jdbc.Driver]
val conn = DriverManager.getConnection(rl)

def main(args: Array[String]) {
val conf = new SparkConf()
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val titlesplit1 = sqlContext.jdbc("jdbc:mysql://10.19.65.17:54321/emotion?user=emotion&password=qingxu", "titlesplit")
val titlesplit = titlesplit1.toDF().registerTempTable("titlesplit")

val category1 = sqlContext.jdbc("jdbc:mysql://10.19.65.17:54321/emotion?user=emotion&password=qingxu", "category")
val category = category1.toDF().registerTempTable("category")
val categorys=sqlContext.sql("select category.`id`,category.`number`,category.`category`,category.`words` from category").toDF("id","number","category","words").registerTempTable("categorys")

val layer1 = sqlContext.jdbc("jdbc:mysql://10.19.65.17:54321/emotion?user=emotion&password=qingxu", "layer")
val layer = layer1.toDF().registerTempTable("layer")

val format = new java.text.SimpleDateFormat("yyyyMMdd")
val date = format.format(new java.util.Date().getTime())
import sqlContext.implicits._

val value = sqlContext.sql("SELECT titlesplit.`innserSessionid`," +
"titlesplit.`times`,categorys.`number`," +
"categorys.`category`," +
"SUM(layer.`VALUE`) svalue," +
"COUNT(DISTINCT titlesplit.`id`) AS wordscount," +
"COUNT(DISTINCT categorys.`id`) AS categoryscount," +
"COUNT(DISTINCT categorys.`id`)/COUNT(DISTINCT titlesplit.`id`) AS rank" +
" FROM titlesplit " +
" LEFT JOIN `layer` " +
"ON titlesplit.`words`=layer.`words` " +
" LEFT JOIN categorys " +
" ON titlesplit.`words`=categorys.`words`" +
"GROUP BY titlesplit.`innserSessionid`,titlesplit.`times`," +
"categorys.`number`,categorys.`category`").toDF("innserSessionid", "times", "number","category", "svalue", "wordscount", "categoryscount", "rank")

// value.insertIntoJDBC(rl,"middles",false)

//val jo = value.toDF("innserSessionid", "times", "category", "svalue", "wordscount", "categoryscount", "rank")

val p1 = value.map(p => {
val v0 = p.getString(0)
val v1 = p.getString(1)
val v2 = p.getString(2)
val v3 = p.getString(3)
val v4 = p.getDecimal(4)
var v5 = p.getLong(5)
val v6 = p.getLong(6)
val v7 = p.getDouble(7)
(v0,v1,v2,v3,v4,v5,v6,v7)

})
p1.foreach(p => {
val v1=p._1
val v2=p._2
val v3=p._3
val v4=p._4
val v5=p._5
val v6=p._6
val v7=p._7
val v8=p._8
val format2 = new java.text.SimpleDateFormat("yyyy-MM-dd")
val dat = format2.format(new java.util.Date().getTime() - 1 * 24 * 60 * 60 * 1000).toString
if(v2==dat)
{
insert(v1,v2,v3,v4,v5,v6,v7,v8)
}
})
conn.close()

}

def insert (value0:String,value1:String,value2:String,value3:String,value4:BigDecimal,value5:Long,value6:Long,value7:Double): Unit ={

val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE)
// CREATE TABLE words2(innersessionId VARCHAR(100),words VARCHAR(100), VARCHAR(100),posit VARCHAR(100),va VARCHAR(100))
try {
val prep = conn.prepareStatement("INSERT INTO middle(innserSessionid,times,number,category,svalue,wordscount,categoryscount,rank) VALUES (?,?,?,?,?,?,?,?) ")
prep.setString(1,value0)
prep.setString(2,value1)
prep.setString(3,value2)
prep.setString(4,value3)
prep.setBigDecimal(5,value4)
prep.setLong(6,value5)
prep.setLong(7,value6)
prep.setDouble(8,value7)
prep.executeUpdate
} catch{
case e:Exception =>e.printStackTrace
}
finally {
}

}

}