SparkConf sparkConf = new SparkConf()
.setMaster("local").setAppName("ClzMap"); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); JavaRDD<String> line_str = javaSparkContext.textFile("C:\\Users\\Administrator\\Desktop\\stud.txt"); JavaRDD<KK> line_kk = line_str.map(new Function<String, KK>() {
@Override
public KK call(String s) throws Exception {
String attr[] = s.split(",");
KK k = new KK();
k.setName(attr[0]);
k.setAge(Integer.parseInt(attr[1]));
k.setYear(attr[2]);
return k;
}
}); SQLContext sqlContext = new SQLContext(javaSparkContext); DataFrame df = sqlContext.createDataFrame(line_kk, KK.class);//反射的方式 //在这理由两种方法进行数据过滤(1:使用DataFrame的javaApi,2:使用临时表的sql查询方式) //-------------------------第1种-----------------------
DataFrame df_filter = df.filter(df.col("age").geq(19));
//-------------------------end----------------------- //-------------------------第2种-----------------------
DataFrame df_filter1 = df.filter(df.col("age").geq(19));
df_filter1.registerTempTable("KK");//创建一个临时表,参数为表名
sqlContext.sql("select * from KK where age>=19");
//-------------------------end----------------------- JavaRDD<Row> df_row = df_filter1.javaRDD();//将DataFrame转化成RDD JavaRDD<KK> df_kk = df_row.map(new Function<Row, KK>() {
@Override
public KK call(Row row) throws Exception {//row的顺序和原来的文件输入可能有不同
KK k = new KK();
k.setAge(row.getInt(0));
k.setName(row.getString(1));
k.setYear(row.getString(2));
return k;
}
}); df_kk.foreach(new VoidFunction<KK>() {
@Override
public void call(KK kk) throws Exception {
System.out.println("getAge->" + kk.getAge());
System.out.println("getYear->" + kk.getYear());
System.out.println("getName->" + kk.getName());
System.out.println("=============");
}
});
文本文件的内容:
由上述代码可以看出,KK是一个实体类型并且可序列化(Serializable)!
zzq,19,2016
yyu,18,2016
uui,90,2015 ps:如果在运行期才能确定类型,则需要使用StructType动态构建类型,代码如下:
//构建一个动态类型
List<StructField> structFieldList = new ArrayList<StructField>();
structFieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));//第三个参数决定这个属性是否可以为null
structFieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
structFieldList.add(DataTypes.createStructField("year", DataTypes.StringType, true));
StructType structType = DataTypes.createStructType(structFieldList); SQLContext sqlContext = new SQLContext(javaSparkContext); DataFrame df = sqlContext.createDataFrame(line_row, structType);