[Original] Big Data Basics: Spark (3): Spark Thrift Implementation Principles and Code Walkthrough

Time: 2023-03-09 04:28:11

Spark 2.1.1

1. Startup Command

The command that starts the Spark Thrift server:

$SPARK_HOME/sbin/start-thriftserver.sh

The script then runs:

org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

2. Startup Process and Code Analysis

For the Hive Thrift code, see: https://www.cnblogs.com/barneywill/p/10185168.html

HiveThriftServer2 is the core class of Spark Thrift; it extends Hive's HiveServer2:

org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 extends org.apache.hive.service.server.HiveServer2

Startup sequence:

HiveThriftServer2.main

SparkSQLEnv.init (sparkConf sparkSession sparkContext sqlContext)

HiveThriftServer2.init

addService(ThriftBinaryCLIService)

HiveThriftServer2.start

ThriftBinaryCLIService.run

TServer.serve

Class hierarchy (interface or parent class -> subclass):

TServer->TThreadPoolServer

TProcessorFactory->SQLPlainProcessorFactory

TProcessor->TSetIpAddressProcessor

ThriftCLIService->ThriftBinaryCLIService

CLIService->SparkSQLCLIService (the core subclass)

Service initialization sequence:

CLIService.init

SparkSQLCLIService.init

addService(SparkSQLSessionManager)

initCompositeService

SparkSQLSessionManager.init

addService(SparkSQLOperationManager)

initCompositeService

SparkSQLOperationManager.init
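
The initialization sequence above follows a composite-service pattern: each service registers its children with addService and then initializes them in turn. A minimal sketch of that cascade is shown below. The Service class, the registerChildren hook and the initOrder buffer are hypothetical stand-ins written for this illustration; they are not the Hive or Spark classes, only the service names are taken from the text.

```scala
import scala.collection.mutable.ArrayBuffer

object CompositeServiceSketch {
  // Records the order in which init() is called (illustration only).
  val initOrder: ArrayBuffer[String] = ArrayBuffer.empty[String]

  // Hypothetical stand-in for a composite service; not the real Hive class.
  class Service(val name: String) {
    private val children = ArrayBuffer.empty[Service]

    protected def addService(child: Service): Unit = children += child

    // Hook where a subclass registers its own child services.
    protected def registerChildren(): Unit = ()

    def init(): Unit = {
      initOrder += name
      registerChildren()
      // Plays the role of initCompositeService: initialize every child.
      children.foreach(_.init())
    }
  }

  class OperationManager extends Service("SparkSQLOperationManager")

  class SessionManager extends Service("SparkSQLSessionManager") {
    override protected def registerChildren(): Unit = addService(new OperationManager)
  }

  class CliService extends Service("SparkSQLCLIService") {
    override protected def registerChildren(): Unit = addService(new SessionManager)
  }

  // Initializes the top-level service and returns the observed init order.
  def run(): Seq[String] = {
    initOrder.clear()
    new CliService().init()
    initOrder.toList
  }
}
```

Calling CompositeServiceSketch.run() initializes the CLI service first, then the session manager, then the operation manager, which mirrors the order listed above.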

3. DDL Execution

Executing DDL requires interacting with the Hive metastore.

Start from the execution plan:

spark-sql> explain create table test_table(id string);
== Physical Plan ==
ExecutedCommand
+- CreateTableCommand CatalogTable(
Table: `test_table`
Created: Wed Dec 19 18:04:15 CST 2018
Last Access: Thu Jan 01 07:59:59 CST 1970
Type: MANAGED
Schema: [StructField(id,StringType,true)]
Provider: hive
Storage(InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat)), false
Time taken: 0.28 seconds, Fetched 1 row(s)

The execution plan names the concrete command; here it is CreateTableCommand.

org.apache.spark.sql.execution.command.tables

case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    sparkSession.sessionState.catalog.createTable(table, ifNotExists)
    Seq.empty[Row]
  }
}

As the code shows, the command simply dispatches the request to sparkSession.sessionState.catalog.
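
The delegation can be sketched in isolation. In the example below, TableDef and Catalog are hypothetical, much-simplified stand-ins for CatalogTable and SessionCatalog, and the command returns Seq[String] instead of Seq[Row]; the only point is that the command carries no logic of its own and forwards straight to the catalog.

```scala
object CommandDispatchSketch {
  // Hypothetical stand-in for CatalogTable.
  final case class TableDef(name: String)

  // Hypothetical stand-in for SessionCatalog; keeps table names in memory.
  class Catalog {
    private var tables = Set.empty[String]

    def createTable(table: TableDef, ifNotExists: Boolean): Unit = {
      if (tables.contains(table.name)) {
        if (!ifNotExists) {
          throw new IllegalStateException(s"Table ${table.name} already exists")
        }
      } else {
        tables += table.name
      }
    }

    def tableExists(name: String): Boolean = tables.contains(name)
  }

  trait RunnableCommand {
    def run(catalog: Catalog): Seq[String]
  }

  // The command only forwards the request to the catalog and returns no rows.
  final case class CreateTable(table: TableDef, ifNotExists: Boolean) extends RunnableCommand {
    override def run(catalog: Catalog): Seq[String] = {
      catalog.createTable(table, ifNotExists)
      Seq.empty[String]
    }
  }
}
```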

org.apache.spark.sql.internal.SessionState

  /**
   * Internal catalog for managing table and database states.
   */
  lazy val catalog = new SessionCatalog(
    sparkSession.sharedState.externalCatalog,
    sparkSession.sharedState.globalTempViewManager,
    functionResourceLoader,
    functionRegistry,
    conf,
    newHadoopConf())

The catalog it wraps is sparkSession.sharedState.externalCatalog.

org.apache.spark.sql.internal.SharedState

  /**
   * A catalog that interacts with external systems.
   */
  val externalCatalog: ExternalCatalog =
    SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
      SharedState.externalCatalogClassName(sparkContext.conf),
      sparkContext.conf,
      sparkContext.hadoopConfiguration)
...
  private val HIVE_EXTERNAL_CATALOG_CLASS_NAME = "org.apache.spark.sql.hive.HiveExternalCatalog"

  private def externalCatalogClassName(conf: SparkConf): String = {
    conf.get(CATALOG_IMPLEMENTATION) match {
      case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
      case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
    }
  }

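As shown here, the catalog is instantiated by reflection from the class name that externalCatalogClassName returns; when the catalog implementation is "hive", that name is hardcoded as org.apache.spark.sql.hive.HiveExternalCatalog.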
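
A minimal sketch of this "pick a class name from configuration, then instantiate it by reflection" technique follows. Catalog, HiveLikeCatalog, InMemoryLikeCatalog and the single String constructor argument are all hypothetical names invented for the example; the real SharedState.reflect takes a SparkConf and a Hadoop Configuration. The sketch assumes it is compiled as an ordinary source file, so that the nested classes have plain one-argument constructors.

```scala
object CatalogReflectionSketch {
  // Hypothetical stand-ins for ExternalCatalog and its two implementations.
  trait Catalog { def kind: String }
  class HiveLikeCatalog(conf: String) extends Catalog { val kind = "hive" }
  class InMemoryLikeCatalog(conf: String) extends Catalog { val kind = "in-memory" }

  // Maps the configured implementation name to a class name,
  // in the same spirit as externalCatalogClassName.
  def catalogClassName(impl: String): String = impl match {
    case "hive"      => classOf[HiveLikeCatalog].getName
    case "in-memory" => classOf[InMemoryLikeCatalog].getName
  }

  // Loads the class by name and calls its single-argument constructor.
  def reflect(className: String, conf: String): Catalog = {
    val clazz = classOf[Catalog].getClassLoader.loadClass(className)
    clazz.getConstructor(classOf[String]).newInstance(conf).asInstanceOf[Catalog]
  }
}
```

Because the caller only ever sees the Catalog trait, switching implementations is purely a matter of which name the configuration maps to.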

org.apache.spark.sql.hive.HiveExternalCatalog

  /**
   * A Hive client used to interact with the metastore.
   */
  val client: HiveClient = {
    HiveUtils.newClientForMetadata(conf, hadoopConf)
  }

  private def withClient[T](body: => T): T = synchronized {
    try {
      body
    } catch {
      case NonFatal(exception) if isClientException(exception) =>
        val e = exception match {
          // Since we are using shim, the exceptions thrown by the underlying method of
          // Method.invoke() are wrapped by InvocationTargetException
          case i: InvocationTargetException => i.getCause
          case o => o
        }
        throw new AnalysisException(
          e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
    }
  }

  override def createDatabase(
      dbDefinition: CatalogDatabase,
      ignoreIfExists: Boolean): Unit = withClient {
    client.createDatabase(dbDefinition, ignoreIfExists)
  }

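Every DDL method in this class runs inside withClient, and withClient is synchronized, so DDL calls on the same catalog are serialized. Execution simply dispatches the request to client; the next step is to see what client is.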
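
The practical effect of the synchronized wrapper can be observed with a small experiment. The sketch below is an assumption-laden simplification: Catalog, CatalogException and the counters are hypothetical, and unlike the real withClient it wraps every non-fatal exception rather than only those matched by isClientException.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NonFatal

object WithClientSketch {
  // Hypothetical stand-in for AnalysisException.
  final class CatalogException(msg: String, cause: Throwable)
    extends RuntimeException(msg, cause)

  class Catalog {
    private val active = new AtomicInteger(0)
    // Highest number of DDL calls observed running at the same time.
    val maxConcurrent = new AtomicInteger(0)

    // Every DDL method goes through this wrapper, so calls on one
    // catalog instance never overlap. Simplification: wraps all
    // non-fatal exceptions, not only client exceptions.
    private def withClient[T](body: => T): T = synchronized {
      try body
      catch {
        case NonFatal(e) =>
          throw new CatalogException(e.getClass.getCanonicalName + ": " + e.getMessage, e)
      }
    }

    def createDatabase(name: String): Unit = withClient {
      val now = active.incrementAndGet()
      if (now > maxConcurrent.get()) maxConcurrent.set(now)
      Thread.sleep(5) // pretend to talk to the metastore
      active.decrementAndGet()
      ()
    }

    def failingDdl(): Unit = withClient {
      throw new IllegalStateException("metastore unavailable")
    }
  }

  // Runs createDatabase from several threads and reports the peak concurrency.
  def runConcurrently(threads: Int): Int = {
    val catalog = new Catalog
    val workers = (1 to threads).map { i =>
      new Thread(new Runnable {
        override def run(): Unit = catalog.createDatabase(s"db$i")
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    catalog.maxConcurrent.get()
  }
}
```

However many threads are used, runConcurrently reports a peak of one concurrent call, which is why heavy DDL traffic through a single Thrift server can queue up at this point.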

org.apache.spark.sql.hive.client.IsolatedClientLoader

  /** The isolated client interface to Hive. */
  private[hive] def createClient(): HiveClient = {
    if (!isolationOn) {
      return new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this)
    }
    // Pre-reflective instantiation setup.
    logDebug("Initializing the logger to avoid disaster...")
    val origLoader = Thread.currentThread().getContextClassLoader
    Thread.currentThread.setContextClassLoader(classLoader)

    try {
      classLoader
        .loadClass(classOf[HiveClientImpl].getName)
        .getConstructors.head
        .newInstance(version, sparkConf, hadoopConf, config, classLoader, this)
        .asInstanceOf[HiveClient]
    } catch {

So the client is simply an org.apache.spark.sql.hive.client.HiveClientImpl.

org.apache.spark.sql.hive.client.HiveClientImpl

  def withHiveState[A](f: => A): A = retryLocked {
    val original = Thread.currentThread().getContextClassLoader
    // Set the thread local metastore client to the client associated with this HiveClientImpl.
    Hive.set(client)
    // The classloader in clientLoader could be changed after addJar, always use the latest
    // classloader
    state.getConf.setClassLoader(clientLoader.classLoader)
    // setCurrentSessionState will use the classLoader associated
    // with the HiveConf in `state` to override the context class loader of the current
    // thread.
    shim.setCurrentSessionState(state)
    val ret = try f finally {
      Thread.currentThread().setContextClassLoader(original)
      HiveCatalogMetrics.incrementHiveClientCalls(1)
    }
    ret
  }

  private def retryLocked[A](f: => A): A = clientLoader.synchronized {
...

  override def createDatabase(
      database: CatalogDatabase,
      ignoreIfExists: Boolean): Unit = withHiveState {
    client.createDatabase(
      new HiveDatabase(
        database.name,
        database.description,
        database.locationUri,
        Option(database.properties).map(_.asJava).orNull),
      ignoreIfExists)
  }

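Every DDL method in this class goes through withHiveState, which calls retryLocked, and retryLocked is itself synchronized (on clientLoader). Here too the request is dispatched straight to client, which this time is Hive's own class org.apache.hadoop.hive.ql.metadata.Hive.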
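
The two mechanics visible in withHiveState, a shared lock plus a context class loader that is swapped in and restored in a finally block, can be sketched on their own. This is a simplification under stated assumptions: withLoader and lock are hypothetical names, the loader is set directly rather than through shim.setCurrentSessionState, and the body of retryLocked, which is elided in the listing above, is not reproduced.

```scala
object HiveStateSketch {
  // Plays the role of clientLoader as the shared lock object (hypothetical).
  private val lock = new Object

  // Runs f under the shared lock with `loader` as the thread's context
  // class loader, and restores the caller's loader afterwards, even if f throws.
  def withLoader[A](loader: ClassLoader)(f: => A): A = lock.synchronized {
    val original = Thread.currentThread().getContextClassLoader
    Thread.currentThread().setContextClassLoader(loader)
    try f
    finally Thread.currentThread().setContextClassLoader(original)
  }
}
```

Restoring the loader in finally matters because the calling thread belongs to the Thrift server's pool and will be reused for unrelated requests.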

4. DML Execution

DML execution ultimately ends up in spark.sql.

SQL execution sequence:

CLIService.executeStatement (returns an OperationHandle)

SessionManager.getSession

SessionManager.openSession

SparkSQLSessionManager.openSession

SparkSQLOperationManager.sessionToContexts.set (at openSession: maps the session to its sqlContext)

HiveSession.executeStatement

HiveSessionImpl.executeStatementInternal

OperationManager.newExecuteStatementOperation

SparkSQLOperationManager.newExecuteStatementOperation

SparkSQLOperationManager.sessionToContexts.get (looks up the sqlContext by session)

ExecuteStatementOperation.run

SparkExecuteStatementOperation.run

SparkExecuteStatementOperation.execute

SQLContext.sql (the familiar Spark SQL)
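
The two sessionToContexts steps in the sequence above, registering a context when a session opens and looking it up when a statement is executed, can be sketched as follows. All class names here are hypothetical stand-ins, the use of ConcurrentHashMap is an assumption made for the sketch, and an "operation" is reduced to a plain function that returns a string.

```scala
import java.util.concurrent.ConcurrentHashMap

object SessionContextSketch {
  // Hypothetical stand-ins for SessionHandle and SQLContext.
  final case class SessionHandle(id: Long)
  final class SqlContext(val name: String) {
    def sql(statement: String): String = s"[$name] $statement"
  }

  // Base manager, standing in for Hive's OperationManager.
  class OperationManager {
    def newExecuteStatementOperation(session: SessionHandle, statement: String): () => String =
      () => s"hive runs: $statement"
  }

  // Spark-flavoured subclass: finds the session's context and runs the SQL through it.
  class SparkLikeOperationManager extends OperationManager {
    val sessionToContexts = new ConcurrentHashMap[SessionHandle, SqlContext]()

    override def newExecuteStatementOperation(
        session: SessionHandle,
        statement: String): () => String = {
      val ctx = sessionToContexts.get(session)
      require(ctx != null, s"Session $session has no context")
      () => ctx.sql(statement)
    }
  }

  // Session manager that registers a context whenever a session is opened.
  class SparkLikeSessionManager(operations: SparkLikeOperationManager) {
    private var nextId = 0L

    def openSession(): SessionHandle = synchronized {
      nextId += 1
      val handle = SessionHandle(nextId)
      operations.sessionToContexts.put(handle, new SqlContext(s"ctx-$nextId"))
      handle
    }
  }
}
```

Code written against the base OperationManager keeps working unchanged; only the overriding subclass decides that the statement is executed through the per-session context.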

So, starting from the initialization of SparkSQLCLIService, each implementation class is replaced one by one with a Spark subclass, for example:

org.apache.spark.sql.hive.thriftserver.SparkSQLSessionManager extends org.apache.hive.service.cli.session.SessionManager
org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager extends org.apache.hive.service.cli.operation.OperationManager
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation extends org.apache.hive.service.cli.operation.ExecuteStatementOperation

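This is how the underlying implementation gets swapped out.

As for why Hive's HiveServer2 can be extended this easily, see sql/hive-thriftserver in the Spark source; it appears that the Hive 1.2 code was modified heavily there, which will make future upgrades harder.
As for why Spark went to such lengths to extend HiveServer2 instead of reimplementing it, the likely reason is interface compatibility: users of Hive Thrift can migrate smoothly to Spark Thrift, since the only change is switching the URL. In practice, though, Spark Thrift and Hive Thrift still behave quite differently on the same SQL.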