在DStream.transform()中使用SQL ?

时间:2023-02-09 20:49:55

There are some examples for use SQL over Spark Streaming in foreachRDD(). But if I want to use SQL in tranform():

有一些在foreachRDD()中使用SQL / Spark流的示例。但是,如果我想要在tranform()中使用SQL:

case class AlertMsg(host:String, count:Int, sum:Double)
val lines = ssc.socketTextStream("localhost", 8888)
lines.transform( rdd => {
  if (rdd.count > 0) {
    val t = sqc.jsonRDD(rdd)
    t.registerTempTable("logstash")
    val sqlreport = sqc.sql("SELECT host, COUNT(host) AS host_c, AVG(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY host ORDER BY host_c DESC LIMIT 100")
    sqlreport.map(r => AlertMsg(r(0).toString,r(1).toString.toInt,r(2).toString.toDouble))
  } else {
    rdd
  }
}).print()

I got such error:

我得到了这样的错误:

[error] /Users/raochenlin/Downloads/spark-1.2.0-bin-hadoop2.4/logstash/src/main/scala/LogStash.scala:52: no type parameters for method transform: (transformFunc: org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[U])(implicit evidence$5: scala.reflect.ClassTag[U])org.apache.spark.streaming.dstream.DStream[U] exist so that it can be applied to arguments (org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[_ >: LogStash.AlertMsg with String <: java.io.Serializable]) [error] --- because --- [error] argument expression's type is not compatible with formal parameter type; [error] found : org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[_ >: LogStash.AlertMsg with String <: java.io.Serializable] [error] required: org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[?U] [error] lines.transform( rdd => { [error] ^ [error] one error found [error] (compile:compile) Compilation failed

(错误)/用户/ raochenlin /下载/ spark-1.2.0-bin-hadoop2.4 / logstash / src / main / scala / logstash。scala:52:没有方法转换的类型参数(transformFunc: org.apache.spark.rdd)。RDD[String] => org.apache.spark.rdd.RDD[U](隐式证据$5:scala.reflec.classtag [U]) org.apache.spark.stream.dstream。DStream[U]的存在使它可以应用于参数(org.apache.spark.rdd)。抽样[String]= > org.apache.spark.rdd。(_ >:LogStash抽样。由于---[错误]参数表达式的类型与形式参数类型不兼容;(错误)发现:org.apache.spark.rdd。抽样[String]= > org.apache.spark.rdd。(_ >:LogStash抽样。具有字符串<:java.io的AlertMsg。可序列化的][错误]要求:org.apache.spark.rdd。抽样[String]= > org.apache.spark.rdd.RDD[?U][错误]。变换抽样= > {(错误)^(错误)发现一个错误(错误)(编译:编译)编译失败

Seems only if I use sqlreport.map(r => r.toString) can be a correct usage?

除非我使用sqlreport。map(r => r. tostring)是正确的用法吗?

1 个解决方案

#1


0  

dstream.transform take a function transformFunc: (RDD[T]) ⇒ RDD[U] In this case, the if must result in the same type on both evaluations of the condition, which is not the case:

dstream。变换函数transformFunc:抽样[T])⇒抽样(U)在这种情况下,如果必须导致相同类型的两个条件的评估,这并非如此:

if (count == 0) => RDD[String]
if (count > 0) => RDD[AlertMsg]

In this case, remove the optimization of if rdd.count ... sothat you have an unique transformation path.

在这种情况下,删除if rdd的优化。数……所以你有一个独特的变换路径。

#1


0  

dstream.transform take a function transformFunc: (RDD[T]) ⇒ RDD[U] In this case, the if must result in the same type on both evaluations of the condition, which is not the case:

dstream。变换函数transformFunc:抽样[T])⇒抽样(U)在这种情况下,如果必须导致相同类型的两个条件的评估,这并非如此:

if (count == 0) => RDD[String]
if (count > 0) => RDD[AlertMsg]

In this case, remove the optimization of if rdd.count ... sothat you have an unique transformation path.

在这种情况下,删除if rdd的优化。数……所以你有一个独特的变换路径。