Reading the Hive Source Code: Why Big Data Development Can Break Free of SQL, Java, and Scala

Date: 2022-03-01 01:27:20

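Preface

[This article is meant for readers with some computer science background or about half a year of work experience. Planting a flag here: may the world see no more shallow SQL Boys.]

When it comes to big data development, the overwhelming majority of practitioners are SQL Boys, and I will not hear otherwise. After all, big data mostly serves machine learning and statistical reporting, so plenty of people moved over from Oracle database development knowing only a few SQL statements, and the few who can write spark.sql("a SQL string") in Python already count as the talented ones among SQL Boys. Developers of this most basic kind, who can only handle structured tables, are the meme we keep hearing: the shallow SQL Boy... They do not really understand big data, their thinking is stuck in the database era, and they use every big data component as if it were an RDBMS... The technical level of such business developers is, frankly, nothing to write home about.

Then there are those who came from Java back-end development. It may feel unfamiliar at first, but they can still drive Spark and Flink fluently from a single main method, hand-write some JDBC, and do simple secondary development. These are the platform developers, and their technical level is somewhat higher. Anyone who writes Java well can pick up Scala quickly too.

But... that does not mean big data can only be done in SQL/Java/Scala... Being that narrow is not much better than being a SQL Boy.

I started out in embedded development, so I naturally know that C/C#/C++ can do big data too...

Using drop table in Hive, the most common data warehouse component in big data development, as the example, this article offers a starting point for discussion and explains why big data development can break free of SQL, Java, and Scala.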

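Why we can break free of SQL

Data is either structured or unstructured. SQL can only handle a very limited set of structured tables [RDBMS, tidy csv/tsv files, and so on]; it is helpless against the bulk of semi-structured and unstructured data [log files, audio, images, and so on]. The ancient MapReduce could not be driven by SQL at all, and early versions of Spark and Flink were API-based; people got along fine in the years before SQL. Big data components only became SQL-friendly later, mainly to lower the barrier to entry so that SQL Boys could use big data technology too.

The shallow SQL Boys, of course, only know: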

drop table db_name.tb_name;

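Normally this drops the Hive table, and their understanding stops at "Hive is a database".

A big data platform developer, however, knows to look up Hive's Java API:

https://svn.apache.org/repos/infra/websites/production/hive/content/javadocs/r3.1.3/api/index.html

and knows there is also this way: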

package com.zhiyong;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

/**
 * @program: zhiyong_study
 * @description: test the MetaStore client
 * @author: zhiyong
 * @create: 2023-03-22 22:57
 **/
public class MetaStoreDemo {
    public static void main(String[] args) throws Exception{
        HiveConf hiveConf = new HiveConf();
        HiveMetaStoreClient client = new HiveMetaStoreClient(hiveConf);
        client.dropTable("db_name","tb_name");
    }
}

Calling the API drops the table just the same, so DDL is clearly not required. HiveMetaStoreClient can also create tables and perform other operations.

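Platform developers who understand the internals have an even more ruthless option: connect directly to the MySQL database where Hive stores its metadata and perform precise CRUD on the metadata tables...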
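To make the "go straight to the metadata database" idea concrete, here is a minimal read-only sketch using plain JDBC. It assumes the commonly seen metastore schema with tables DBS and TBLS and the columns named below; the class and method names are made up for illustration, and the schema should be checked against your own Hive version before relying on it.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Sketch only: looks up a table's metadata row directly in the metastore database.
// Assumes the common metastore schema (tables DBS and TBLS); verify for your version.
class MetastoreLookup {

    // Parameterized query joining databases (DBS) to tables (TBLS).
    static String lookupSql() {
        return "SELECT t.TBL_ID, t.TBL_TYPE FROM TBLS t "
             + "JOIN DBS d ON t.DB_ID = d.DB_ID "
             + "WHERE d.NAME = ? AND t.TBL_NAME = ?";
    }

    // Returns the table type, or null when no such table is registered.
    static String tableType(Connection conn, String db, String table) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(lookupSql())) {
            ps.setString(1, db);
            ps.setString(2, table);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? rs.getString("TBL_TYPE") : null;
            }
        }
    }
}
```

The Connection would come from DriverManager.getConnection with your own metastore JDBC URL and credentials. Writing to these tables by hand bypasses every check Hive performs, which is exactly why the approach counts as ruthless.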

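ETL and other processing of structured tables can be done entirely with Spark's DataFrame or Flink's DataStream programming, purely through APIs. Whatever SQL can do, Java and Scala can do; and what SQL cannot do, Java and Scala can do as well...

Apart from RDBMSs and assorted repackaged products [thin wrappers around open-source Apache components], I honestly cannot think of any scenario where SQL is the only option...

This shows that big data can break free of SQL.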

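Why we can break free of Java

Hive is written in Java under the hood, but that does not mean Hive can only be operated from Java. With an understanding that shallow, one deserves a lifetime of tuning parameters and calling APIs...

Finding the real entry point of dropTable

In the Hive 3.1.2 source code we can find the dropTable methods: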

@Override
  public void dropTable(String dbname, String name, boolean deleteData,
      boolean ignoreUnknownTab) throws MetaException, TException,
      NoSuchObjectException, UnsupportedOperationException {
    dropTable(getDefaultCatalog(conf), dbname, name, deleteData, ignoreUnknownTab, null);
  }

  @Override
  public void dropTable(String dbname, String name, boolean deleteData,
      boolean ignoreUnknownTab, boolean ifPurge) throws TException {
    dropTable(getDefaultCatalog(conf), dbname, name, deleteData, ignoreUnknownTab, ifPurge);
  }

  @Override
  public void dropTable(String dbname, String name) throws TException {
    dropTable(getDefaultCatalog(conf), dbname, name, true, true, null);
  }

  @Override
  public void dropTable(String catName, String dbName, String tableName, boolean deleteData,
                        boolean ignoreUnknownTable, boolean ifPurge) throws TException {
    //build new environmentContext with ifPurge;
    EnvironmentContext envContext = null;
    if(ifPurge){
      Map<String, String> warehouseOptions;
      warehouseOptions = new HashMap<>();
      warehouseOptions.put("ifPurge", "TRUE");
      envContext = new EnvironmentContext(warehouseOptions);
    }
    dropTable(catName, dbName, tableName, deleteData, ignoreUnknownTable, envContext);

  }

There are several overloads, but they all end up calling the same method:

  /**
   * Drop the table and choose whether to: delete the underlying table data;
   * throw if the table doesn't exist; save the data in the trash.
   *
   * @param catName catalog name
   * @param dbname database name
   * @param name table name
   * @param deleteData
   *          delete the underlying data or just delete the table in metadata
   * @param ignoreUnknownTab
   *          don't throw if the requested table doesn't exist
   * @param envContext
   *          for communicating with thrift
   * @throws MetaException
   *           could not drop table properly
   * @throws NoSuchObjectException
   *           the table wasn't found
   * @throws TException
   *           a thrift communication error occurred
   * @throws UnsupportedOperationException
   *           dropping an index table is not allowed
   * @see org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface#drop_table(java.lang.String,
   *      java.lang.String, boolean)
   */
  public void dropTable(String catName, String dbname, String name, boolean deleteData,
      boolean ignoreUnknownTab, EnvironmentContext envContext) throws MetaException, TException,
      NoSuchObjectException, UnsupportedOperationException {
    Table tbl;
    try {
      tbl = getTable(catName, dbname, name);
    } catch (NoSuchObjectException e) {
      if (!ignoreUnknownTab) {
        throw e;
      }
      return;
    }
    HiveMetaHook hook = getHook(tbl);
    if (hook != null) {
      hook.preDropTable(tbl);
    }
    boolean success = false;
    try {
      drop_table_with_environment_context(catName, dbname, name, deleteData, envContext);
      if (hook != null) {
        hook.commitDropTable(tbl, deleteData || (envContext != null && "TRUE".equals(envContext.getProperties().get("ifPurge"))));
      }
      success=true;
    } catch (NoSuchObjectException e) {
      if (!ignoreUnknownTab) {
        throw e;
      }
    } finally {
      if (!success && (hook != null)) {
        hook.rollbackDropTable(tbl);
      }
    }
  }

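It mainly fetches the table object, then calls preDropTable as the pre-commit step and commitDropTable as the actual commit, with rollbackDropTable in the finally block if the drop did not succeed. On the surface this 2PC-style approach looks quite rigorous...

HiveMetaHook turns out to be an interface: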
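The pre/commit/rollback control flow above can be isolated in a few lines. This is a simplified sketch: DropHook, HookedDrop, and RecordingHook are illustrative names, not Hive classes, and the "action" stands in for the actual metastore call.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the pre/commit/rollback callbacks (not Hive's API).
interface DropHook {
    void preDrop(String table);
    void commitDrop(String table);
    void rollbackDrop(String table);
}

class HookedDrop {

    // Records the callback order so the control flow can be inspected.
    static class RecordingHook implements DropHook {
        final List<String> events = new ArrayList<>();
        public void preDrop(String table) { events.add("pre"); }
        public void commitDrop(String table) { events.add("commit"); }
        public void rollbackDrop(String table) { events.add("rollback"); }
    }

    // Runs the action between pre and commit; rolls back if it throws.
    static void drop(String table, DropHook hook, Runnable action) {
        hook.preDrop(table);
        boolean success = false;
        try {
            action.run();
            hook.commitDrop(table);
            success = true;
        } finally {
            if (!success) {
                hook.rollbackDrop(table);
            }
        }
    }
}
```

With an action that succeeds, the recorded events are pre then commit; with an action that throws, they are pre then rollback, and the exception still propagates to the caller.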

package org.apache.hadoop.hive.metastore;

/**
 * HiveMetaHook defines notification methods which are invoked as part
 * of transactions against the metastore, allowing external catalogs
 * such as HBase to be kept in sync with Hive's metastore.
 *
 *<p>
 *
 * Implementations can use {@link MetaStoreUtils#isExternalTable} to
 * distinguish external tables from managed tables.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface HiveMetaHook {

  public String ALTER_TABLE_OPERATION_TYPE = "alterTableOpType";

  public List<String> allowedAlterTypes = ImmutableList.of("ADDPROPS", "DROPPROPS");

  /**
   * Called before a table definition is removed from the metastore
   * during DROP TABLE.
   *
   * @param table table definition
   */
  public void preDropTable(Table table)
    throws MetaException;

  /**
   * Called after failure removing a table definition from the metastore
   * during DROP TABLE.
   *
   * @param table table definition
   */
  public void rollbackDropTable(Table table)
    throws MetaException;

  /**
   * Called after successfully removing a table definition from the metastore
   * during DROP TABLE.
   *
   * @param table table definition
   *
   * @param deleteData whether to delete data as well; this should typically
   * be ignored in the case of an external table
   */
  public void commitDropTable(Table table, boolean deleteData)
    throws MetaException;
}

Inheritance hierarchy:

[Figure: inheritance hierarchy of HiveMetaHook]

Clearly it is not this one:

package org.apache.hadoop.hive.metastore;

public abstract class DefaultHiveMetaHook implements HiveMetaHook {
  /**
   * Called after successfully INSERT [OVERWRITE] statement is executed.
   * @param table table definition
   * @param overwrite true if it is INSERT OVERWRITE
   *
   * @throws MetaException
   */
  public abstract void commitInsertTable(Table table, boolean overwrite) throws MetaException;

  /**
   * called before commit insert method is called
   * @param table table definition
   * @param overwrite true if it is INSERT OVERWRITE
   *
   * @throws MetaException
   */
  public abstract void preInsertTable(Table table, boolean overwrite) throws MetaException;

  /**
   * called in case pre commit or commit insert fail.
   * @param table table definition
   * @param overwrite true if it is INSERT OVERWRITE
   *
   * @throws MetaException
   */
  public abstract void rollbackInsertTable(Table table, boolean overwrite) throws MetaException;
}

Still less could it be this mock class used for tests:

/**
 * Mock class used for unit testing.
 * {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2#testLockingOnInsertIntoNonNativeTables()}
 */
public class StorageHandlerMock extends DefaultStorageHandler {
 
}

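So the one to follow is the AccumuloStorageHandler class. Note that this hook only applies to tables backed by a storage handler; for an ordinary Hive table getHook returns null and the drop is performed by drop_table_with_environment_context, which is itself a Thrift call to the metastore. The class: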

package org.apache.hadoop.hive.accumulo;

/**
 * Create table mapping to Accumulo for Hive. Handle predicate pushdown if necessary.
 */
public class AccumuloStorageHandler extends DefaultStorageHandler implements HiveMetaHook,
    HiveStoragePredicateHandler {
    }

However:

  @Override
  public void preDropTable(Table table) throws MetaException {
    // do nothing
  }

It does nothing!!! Words fail me. On the surface this 2PC-style approach really does look rigorous...

So the entry point for dropTable is:

  @Override
  public void commitDropTable(Table table, boolean deleteData) throws MetaException {
    String tblName = getTableName(table);
    if (!isExternalTable(table)) {
      try {
        if (deleteData) {
          TableOperations tblOpts = connectionParams.getConnector().tableOperations();
          if (tblOpts.exists(tblName)) {
            tblOpts.delete(tblName);
          }
        }
      } catch (AccumuloException e) {
        throw new MetaException(StringUtils.stringifyException(e));
      } catch (AccumuloSecurityException e) {
        throw new MetaException(StringUtils.stringifyException(e));
      } catch (TableNotFoundException e) {
        throw new MetaException(StringUtils.stringifyException(e));
      }
    }
  }

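Taking the simplest case, a managed table whose data must be deleted, what actually gets called is this delete method. And TableOperations is yet another interface: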

package org.apache.accumulo.core.client.admin;

/**
 * Provides a class for administering tables
 *
 */

public interface TableOperations {
  /**
   * Delete a table
   *
   * @param tableName
   *          the name of the table
   * @throws AccumuloException
   *           if a general error occurs
   * @throws AccumuloSecurityException
   *           if the user does not have permission
   * @throws TableNotFoundException
   *           if the table does not exist
   */
  void delete(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
}

The inheritance hierarchy is simple:

[Figure: inheritance hierarchy of TableOperations]

So it has to be this implementation class:

package org.apache.accumulo.core.client.impl;

public class TableOperationsImpl extends TableOperationsHelper {
  @Override
  public void delete(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
    checkArgument(tableName != null, "tableName is null");

    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableName.getBytes(UTF_8)));
    Map<String,String> opts = new HashMap<>();

    try {
      doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_DELETE, args, opts);
    } catch (TableExistsException e) {
      // should not happen
      throw new AssertionError(e);
    }

  }
}

So the real entry point is the doTableFateOperation method here. In the FateOperation enum, TABLE_DELETE = 2.

Following the call stack of doTableFateOperation

Jump to:

  private void doTableFateOperation(String tableOrNamespaceName, Class<? extends Exception> namespaceNotFoundExceptionClass, FateOperation op,
      List<ByteBuffer> args, Map<String,String> opts) throws AccumuloSecurityException, AccumuloException, TableExistsException, TableNotFoundException {
    try {
      doFateOperation(op, args, opts, tableOrNamespaceName);
    } // catch clauses omitted in this excerpt
  }

Keep jumping:

  String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts, String tableOrNamespaceName) throws AccumuloSecurityException,
      TableExistsException, TableNotFoundException, AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
    return doFateOperation(op, args, opts, tableOrNamespaceName, true);
  }

Keep jumping:

  String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts, String tableOrNamespaceName, boolean wait)
      throws AccumuloSecurityException, TableExistsException, TableNotFoundException, AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
    Long opid = null;

    try {
      opid = beginFateOperation();
      executeFateOperation(opid, op, args, opts, !wait);
      if (!wait) {
        opid = null;
        return null;
      }
      String ret = waitForFateOperation(opid);
      return ret;
    } catch (ThriftSecurityException e) {
      switch (e.getCode()) {
        case TABLE_DOESNT_EXIST:
          throw new TableNotFoundException(null, tableOrNamespaceName, "Target table does not exist");
        case NAMESPACE_DOESNT_EXIST:
          throw new NamespaceNotFoundException(null, tableOrNamespaceName, "Target namespace does not exist");
        default:
          String tableInfo = Tables.getPrintableTableInfoFromName(context.getInstance(), tableOrNamespaceName);
          throw new AccumuloSecurityException(e.user, e.code, tableInfo, e);
      }
    } catch (ThriftTableOperationException e) {
      switch (e.getType()) {
        case EXISTS:
          throw new TableExistsException(e);
        case NOTFOUND:
          throw new TableNotFoundException(e);
        case NAMESPACE_EXISTS:
          throw new NamespaceExistsException(e);
        case NAMESPACE_NOTFOUND:
          throw new NamespaceNotFoundException(e);
        case OFFLINE:
          throw new TableOfflineException(context.getInstance(), Tables.getTableId(context.getInstance(), tableOrNamespaceName));
        default:
          throw new AccumuloException(e.description, e);
      }
    } catch (Exception e) {
      throw new AccumuloException(e.getMessage(), e);
    } finally {
      Tables.clearCache(context.getInstance());
      // always finish table op, even when exception
      if (opid != null)
        try {
          finishFateOperation(opid);
        } catch (Exception e) {
          log.warn(e.getMessage(), e);
        }
    }
  }

Something odd shows up here: it catches quite a few Thrift-related exceptions. Keep jumping:

  // This method is for retrying in the case of network failures; anything else it passes to the caller to deal with
  private void executeFateOperation(long opid, FateOperation op, List<ByteBuffer> args, Map<String,String> opts, boolean autoCleanUp)
      throws ThriftSecurityException, TException, ThriftTableOperationException {
    while (true) {
      MasterClientService.Iface client = null;
      try {
        client = MasterClient.getConnectionWithRetry(context);
        client.executeFateOperation(Tracer.traceInfo(), context.rpcCreds(), opid, op, args, opts, autoCleanUp);
        break;
      } catch (TTransportException tte) {
        log.debug("Failed to call executeFateOperation(), retrying ... ", tte);
        UtilWaitThread.sleep(100);
      } finally {
        MasterClient.close(client);
      }
    }
  }

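This infinite loop obtains the client object and retries whenever a TTransportException occurs. But this client is clearly not that simple... The executeFateOperation method it calls cannot be opened by clicking through in IntelliJ IDEA; it has to be located by hand.

Analyzing the client object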
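The retry loop follows a common pattern: retry only on transport-level failures and let everything else propagate. A minimal JDK-only sketch, in which IOException stands in for TTransportException and RetryOnTransportError is a hypothetical name:

```java
import java.io.IOException;
import java.util.concurrent.Callable;

class RetryOnTransportError {

    // Retries only on IOException (standing in for TTransportException);
    // any other exception is passed straight to the caller.
    static <T> T call(Callable<T> op, long sleepMillis) throws Exception {
        while (true) {
            try {
                return op.call();
            } catch (IOException e) {
                // Transient network problem: wait briefly, then try again.
                Thread.sleep(sleepMillis);
            }
        }
    }
}
```

Like the original, this sketch never gives up; production code would normally add a maximum attempt count or a deadline.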

package org.apache.accumulo.core.client.impl;

import com.google.common.net.HostAndPort;

public class MasterClient {
  private static final Logger log = LoggerFactory.getLogger(MasterClient.class);

  public static MasterClientService.Client getConnectionWithRetry(ClientContext context) {
    while (true) {

      MasterClientService.Client result = getConnection(context);
      if (result != null)
        return result;
      UtilWaitThread.sleep(250);
    }
  }
}

which in turn is actually this:

public static class Client extends FateService.Client implements Iface {
}

so its parent class is:

package org.apache.accumulo.core.master.thrift;

@SuppressWarnings({"unchecked", "serial", "rawtypes", "unused"}) public class FateService {
    public interface Iface {

    public void executeFateOperation(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, long opid, FateOperation op, List<ByteBuffer> arguments, Map<String,String> options, boolean autoClean) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException, org.apache.thrift.TException;

  }
    public static class Client extends org.apache.thrift.TServiceClient implements Iface {
    // In the generated code this method lives inside Client, which implements Iface.
    public void executeFateOperation(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, long opid, FateOperation op, List<ByteBuffer> arguments, Map<String,String> options, boolean autoClean) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException, org.apache.thrift.TException
    {
      send_executeFateOperation(tinfo, credentials, opid, op, arguments, options, autoClean);
      recv_executeFateOperation();
    }
    }
}

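That is why a client object of this kind can call executeFateOperation.

Looking at the executeFateOperation method

It has two steps. As the names suggest, send_executeFateOperation sends something and recv_executeFateOperation receives something. Sending is obviously the part to focus on: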

public void send_executeFateOperation(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, long opid, FateOperation op, List<ByteBuffer> arguments, Map<String,String> options, boolean autoClean) throws org.apache.thrift.TException
{
  executeFateOperation_args args = new executeFateOperation_args();
  args.setTinfo(tinfo);
  args.setCredentials(credentials);
  args.setOpid(opid);
  args.setOp(op);
  args.setArguments(arguments);
  args.setOptions(options);
  args.setAutoClean(autoClean);
  sendBase("executeFateOperation", args);
}

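The send method packs its arguments, including the table name and the operation type [drop table], into the args object that is passed to sendBase.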

package org.apache.thrift;

/**
 * A TServiceClient is used to communicate with a TService implementation
 * across protocols and transports.
 */
public abstract class TServiceClient {

  protected void sendBase(String methodName, TBase<?,?> args) throws TException {
    sendBase(methodName, args, TMessageType.CALL);
  }


  private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException {
    oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));
    args.write(oprot_);
    oprot_.writeMessageEnd();
    oprot_.getTransport().flush();
  }

}

where:

package org.apache.thrift.protocol;

/**
 * Message type constants in the Thrift protocol.
 *
 */
public final class TMessageType {
  public static final byte CALL  = 1;
  public static final byte REPLY = 2;
  public static final byte EXCEPTION = 3;
  public static final byte ONEWAY = 4;
}

So the type passed in is actually 1. It is used in this constructor:

package org.apache.thrift.protocol;

/**
 * Helper class that encapsulates struct metadata.
 *
 */
public final class TMessage {

  public TMessage(String n, byte t, int s) {
    name = n;
    type = t;
    seqid = s;
  }

  public final String name;
  public final byte type;
  public final int seqid;


}

The other one, the generic TBase:

package org.apache.thrift;

import java.io.Serializable;

import org.apache.thrift.protocol.TProtocol;

/**
 * Generic base interface for generated Thrift objects.
 *
 */
public interface TBase<T extends TBase<?,?>, F extends TFieldIdEnum> extends Comparable<T>,  Serializable {

  /**
   * Reads the TObject from the given input protocol.
   *
   * @param iprot Input protocol
   */
  public void read(TProtocol iprot) throws TException;

  /**
   * Writes the objects out to the protocol
   *
   * @param oprot Output protocol
   */
  public void write(TProtocol oprot) throws TException;
}

According to the comments, the write method writes the Java object out to the protocol.

The executeFateOperation_args class:

public static class executeFateOperation_args implements org.apache.thrift.TBase<executeFateOperation_args, executeFateOperation_args._Fields>, java.io.Serializable, Cloneable, Comparable<executeFateOperation_args>   {
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
    }   
}

The write method it delegates to:

package org.apache.thrift.scheme;

import org.apache.thrift.TBase;

public interface IScheme<T extends TBase> {

  public void read(org.apache.thrift.protocol.TProtocol iproto, T struct) throws org.apache.thrift.TException;

  public void write(org.apache.thrift.protocol.TProtocol oproto, T struct) throws org.apache.thrift.TException;

}

Yet another jump to an interface...

[Figure: inheritance hierarchy of IScheme]

It shows two main abstract classes.

What getScheme returns:


package org.apache.thrift.protocol;

import java.nio.ByteBuffer;

import org.apache.thrift.TException;
import org.apache.thrift.scheme.IScheme;
import org.apache.thrift.scheme.StandardScheme;
import org.apache.thrift.transport.TTransport;

/**
 * Protocol interface definition.
 *
 */
public abstract class TProtocol {
  public Class<? extends IScheme> getScheme() {
    return StandardScheme.class;
  }
    
  public abstract void writeMessageBegin(TMessage message) throws TException;
}

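Clearly what we get is the StandardScheme class. And writeMessageBegin is an abstract method of this abstract class.

The inheritance hierarchy of this abstract class:

[Figure: inheritance hierarchy of TProtocol]

From this we can see which protocols are supported natively. The most commonly used is of course the binary protocol: TBinaryProtocol.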
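The line schemes.get(oprot.getScheme()).getScheme().write(oprot, this) is a lookup in a map keyed by scheme class. A toy sketch of that dispatch idea, where every name is illustrative and strings stand in for real serialization:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class SchemeDispatch {

    // Stand-in for a serialization scheme; returns a label instead of writing bytes.
    interface Scheme {
        String write(String struct);
    }

    static class StandardLike implements Scheme {
        public String write(String struct) { return "standard:" + struct; }
    }

    static class TupleLike implements Scheme {
        public String write(String struct) { return "tuple:" + struct; }
    }

    // Registry keyed by scheme class; the protocol decides which key to use.
    static final Map<Class<? extends Scheme>, Supplier<Scheme>> SCHEMES = new HashMap<>();
    static {
        SCHEMES.put(StandardLike.class, StandardLike::new);
        SCHEMES.put(TupleLike.class, TupleLike::new);
    }

    // Looks up the factory for the requested scheme and delegates the write.
    static String write(Class<? extends Scheme> key, String struct) {
        return SCHEMES.get(key).get().write(struct);
    }
}
```

The point of the indirection is that the struct does not hard-code how it is serialized; the protocol picks the scheme and the struct delegates to whatever is registered under that key.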

Looking at the TBinaryProtocol binary protocol

package org.apache.thrift.protocol;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;

import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;

/**
 * Binary protocol implementation for thrift.
 *
 */
public class TBinaryProtocol extends TProtocol {
  public void writeMessageBegin(TMessage message) throws TException {
    if (strictWrite_) {
      int version = VERSION_1 | message.type;
      writeI32(version);
      writeString(message.name);
      writeI32(message.seqid);
    } else {
      writeString(message.name);
      writeByte(message.type);
      writeI32(message.seqid);
    }
  }
}

As we can see, writeMessageBegin is where the data actually gets written: it splits the message into parts and writes them out.
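Nothing in the strict branch depends on Thrift classes; it is just integers and a length-prefixed string. A JDK-only sketch of the same byte layout, assuming VERSION_1 is 0x80010000 and that 32-bit integers go out big-endian as in the Thrift binary protocol (BinaryHeaderSketch is a hypothetical name):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class BinaryHeaderSketch {
    // Assumed value of the strict-mode version marker.
    static final int VERSION_1 = 0x80010000;
    // Message type for an outgoing call, as in TMessageType.CALL.
    static final byte CALL = 1;

    // Encodes: (version | type), name length, name bytes, sequence id.
    static byte[] encodeHeader(String name, byte type, int seqid) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf); // writeInt is big-endian
        out.writeInt(VERSION_1 | type);
        byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length);
        out.write(utf8, 0, utf8.length);
        out.writeInt(seqid);
        out.flush();
        return buf.toByteArray();
    }
}
```

For the method name executeFateOperation (20 bytes in UTF-8) the header is 4 + 4 + 20 + 4 = 32 bytes long and begins with 0x80 0x01 0x00 0x01. Any language that can write bytes can produce the same header.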

public void writeString(String str) throws TException {
  try {
    byte[] dat = str.getBytes("UTF-8");
    writeI32(dat.length);
    trans_.write(dat, 0, dat.length);
  } catch (UnsupportedEncodingException uex) {
    throw new TException("JVM DOES NOT SUPPORT UTF-8");
  }
}

Taking this as the example, it writes the data out as a byte array:

package org.apache.thrift.transport;

import java.io.Closeable;

/**
 * Generic class that encapsulates the I/O layer. This is basically a thin
 * wrapper around the combined functionality of Java input/output streams.
 *
 */
public abstract class TTransport implements Closeable {

  /**
   * Reads up to len bytes into buffer buf, starting at offset off.
   *
   * @param buf Array to read into
   * @param off Index to start reading at
   * @param len Maximum number of bytes to read
   * @return The number of bytes actually read
   * @throws TTransportException if there was an error reading data
   */
  public abstract int read(byte[] buf, int off, int len)
    throws TTransportException;

  /**
   * Writes up to len bytes from the buffer.
   *
   * @param buf The output data buffer
   * @param off The offset to start writing from
   * @param len The number of bytes to write
   * @throws TTransportException if there was an error writing data
   */
  public abstract void write(byte[] buf, int off, int len)
    throws TTransportException;
}

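This is the real transport object. Its inheritance hierarchy:

[Figure: inheritance hierarchy of TTransport]

Anyone who has done embedded development will surely recognize this Socket!!! Yes, that Socket of IP + port, the abstraction layer between the application layer and the TCP/IP transport layer...

Looking at the TIOStreamTransport transport class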

package org.apache.thrift.transport;

/**
 * This is the most commonly used base transport. It takes an InputStream
 * and an OutputStream and uses those to perform all transport operations.
 * This allows for compatibility with all the nice constructs Java already
 * has to provide a variety of types of streams.
 *
 */
public class TIOStreamTransport extends TTransport {

  public int read(byte[] buf, int off, int len) throws TTransportException {
    if (inputStream_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot read from null inputStream");
    }
    int bytesRead;
    try {
      bytesRead = inputStream_.read(buf, off, len);
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
    if (bytesRead < 0) {
      throw new TTransportException(TTransportException.END_OF_FILE);
    }
    return bytesRead;
  }

  /**
   * Writes to the underlying output stream if not null.
   */
  public void write(byte[] buf, int off, int len) throws TTransportException {
    if (outputStream_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");
    }
    try {
      outputStream_.write(buf, off, len);
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
  }

  /**
   * Flushes the underlying output stream if not null.
   */
  public void flush() throws TTransportException {
    if (outputStream_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot flush null outputStream");
    }
    try {
      outputStream_.flush();
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
  }
}

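Its subclass TSocket overrides the handling of IP, port, initialization, and so on.

Summary of the drop table flow

At this point we know roughly how operating Hive through the Java API works:

Top-level API [dropTable] → the delete method [doTableFateOperation] of the table operations implementation class [TableOperationsImpl]
→ the executeFateOperation method → the executeFateOperation method of the Client instance
→ the sendBase method → the write method of an instance of the static class executeFateOperation_args, which outputs the data to the protocol TProtocol
→ the protocol class's write methods actually write the data out to the Thrift server
→ the Thrift server receives the message and performs the corresponding operation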
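Stripped of the Thrift wrappers, the transport layer is an output stream on one side of a TCP socket and an input stream on the other. A self-contained loopback sketch (SocketTransportSketch is a hypothetical name; it assumes the payload is small enough to fit in the socket buffers, because the write happens before the receiving side accepts):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

class SocketTransportSketch {

    // Sends a small payload over loopback TCP and returns what the receiver read.
    static byte[] roundTrip(byte[] payload) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for any free port.
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            try (Socket client = new Socket(loopback, server.getLocalPort())) {
                OutputStream out = client.getOutputStream();
                out.write(payload, 0, payload.length);
                out.flush();
            } // closing the client signals end-of-stream to the reader
            try (Socket peer = server.accept();
                 InputStream in = peer.getInputStream()) {
                ByteArrayOutputStream received = new ByteArrayOutputStream();
                byte[] chunk = new byte[1024];
                int n;
                while ((n = in.read(chunk, 0, chunk.length)) >= 0) {
                    received.write(chunk, 0, n);
                }
                return received.toByteArray();
            }
        }
    }
}
```

The receiving end only ever sees bytes, so it cannot tell which language produced them, which is the property the rest of the article builds on.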

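The best-known Thrift servers are of course Hive's own Hive Server [standalone] and Hive Server2, plus Spark's Thrift Server. Through them, Hive can be operated via JDBC or the CLI.

But!!! Thrift was designed to be language-neutral in the first place. After all, the lower layer only needs to get the data across, and moving data is no privilege of Java's.

Thrift in other languages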

[Figure: contents of the service-rpc directory]

Under the service-rpc path there are packages for cpp, Java, php, py, and rb!!!

Hive's official documentation states it plainly:

https://cwiki.apache.org/confluence/display/Hive/HiveClient#HiveClient-ThriftJavaClient

The command line client currently only supports an embedded server. The JDBC and Thrift-Java clients support both embedded and standalone servers. Clients in other languages only support standalone servers.

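In short: the command line works only against an embedded server, the JDBC and Thrift-Java clients work with both embedded and standalone servers, and clients in other languages need a standalone server.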

Connection con = DriverManager.getConnection("jdbc:hive://localhost:10000/default", "", "");
Statement stmt = con.createStatement();

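This is the URL form of the ancient Hive Server (HiveServer1). With a host and port, as here, it targets a standalone server; according to the same documentation, embedded mode uses the bare URI jdbc:hive://.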

Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "");

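This one is Hive Server2, deployed standalone.

The official documentation also gives a Python example (written in Python 2 syntax, against the old Hive Server):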

#!/usr/bin/env python
 
import sys
 
from hive import ThriftHive
from hive.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
 
try:
    transport = TSocket.TSocket('localhost', 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
 
    client = ThriftHive.Client(protocol)
    transport.open()
 
    client.execute("CREATE TABLE r(a STRING, b INT, c DOUBLE)")
    client.execute("LOAD TABLE LOCAL INPATH '/path' INTO TABLE r")
    client.execute("SELECT * FROM r")
    while (1):
      row = client.fetchOne()
      if (row == None):
        break
      print row
    client.execute("SELECT * FROM r")
    print client.fetchAll()
 
    transport.close()
 
except Thrift.TException, tx:
    print '%s' % (tx.message)

And a PHP example:

<?php
// set THRIFT_ROOT to php directory of the hive distribution
$GLOBALS['THRIFT_ROOT'] = '/lib/php/';
// load the required files for connecting to Hive
require_once $GLOBALS['THRIFT_ROOT'] . 'packages/hive_service/ThriftHive.php';
require_once $GLOBALS['THRIFT_ROOT'] . 'transport/TSocket.php';
require_once $GLOBALS['THRIFT_ROOT'] . 'protocol/TBinaryProtocol.php';
// Set up the transport/protocol/client
$transport = new TSocket('localhost', 10000);
$protocol = new TBinaryProtocol($transport);
$client = new ThriftHiveClient($protocol);
$transport->open();
 
// run queries, metadata calls etc
$client->execute('SELECT * from src');
var_dump($client->fetchAll());
$transport->close();

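For Ruby there is at least a reference: https://github.com/forward3d/rbhive

As for Java and C++, no client examples are given... which is easy to understand... After all, Java has JDBC and high-level APIs, so hardly anyone reaches for the low-level API.

In platform or component development, where the low-level API really is needed, a seasoned (and balding) Java programmer can certainly look up the API, fill in the parameters, and get the program running; that much engineering ability is a given.

As for C++ programmers with their formidable wheel-building skills, who knows: just as Redpanda re-created Kafka, one day they may well knock out a C++ version of Hive along the same lines...

Since Thrift makes things language-neutral, calling a component need not be confined to Java or Scala. And building wheels has never been the exclusive preserve of Java and Scala either.

That is why big data development can break free of Java and Scala.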

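Closing remarks

Big data is not converging on SQL; SQL support exists to widen the audience, so that the many business developers with modest technical skills can also share in the dividends of big data technology. SQL is also more productive in the specific scenario of processing structured tables.
But... even in that narrow niche SQL still has many shortcomings, although the API approach is not much better.

Building wheels and calling components are even more language-agnostic... A programming language is often just a vehicle for expressing ideas, and only a sufficiently broad technology stack gives you the right to choose.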

Please credit the source when reposting: https://lizhiyong.blog.csdn.net/article/details/129742904