过滤文本文档中的数据并插入Cassandra数据库

时间:2023-03-09 13:30:32
过滤文本文档中的数据并插入Cassandra数据库

代码如下:

package com.locationdataprocess;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement; //过滤数据中的GPS数据
public class FilterGPSData {
static Connection con=null;
public static void insertGPS(){
String fileName[]={"Zhengye_DriveTesting_08-18.09-07.txt","Zhengye_DriveTesting_08-18.09-13.txt",
"Zhengye_DriveTesting_08-18.17-40.txt","Zhengye_DriveTesting_08-18.17-48.txt",
"Zhengye_DriveTesting_08-19.17-19.txt","Zhengye_DriveTesting_08-20.09-33.txt",
"Zhengye_DriveTesting_08-20.18-05.txt","Zhengye_DriveTesting_08-19.09-08.txt",
"Zhengye_DriveTesting_08-21.18-07.txt","Zhengye_DriveTesting_08-21.18-07.txt"
};
for (int k = 0; k < 10; k++) {
File file = new File("H:\\项目数据\\Zhengye_Drive_Testing_Data\\"
+ fileName[k]);
try {
FileInputStream fis = new FileInputStream(file);
InputStreamReader isr = new InputStreamReader(fis);
BufferedReader br = new BufferedReader(isr);
String line = null;
while ((line = br.readLine()) != null) {
String s = line.trim();
// String pre[] = new String[8];
// pre[0] = "GPS Time";
// pre[1] = "Longitude";
// pre[2] = "Latitude";
// pre[3] = "Altitude";
// pre[4] = "Heading";
// pre[5] = "Speed";
// pre[6] = "Source";
// pre[7] = "Satellites";
// for(int i=0;i<pre.length;i++){
// if (s.startsWith(pre[i])) {
// System.out.println(s);
// break;
// }
// } if (s.startsWith("GPS Time")) {
try {
String data[] = new String[8];
data[0] = fileName[k].substring(21, 32)+" "+s.split("=")[1].trim();
// System.out.println(s);
int i = 0;
while (i < 7 && (line = br.readLine()) != null) {
if (!line.equals("")) {
data[i + 1] = line.split("=")[1].trim();
// System.out.println(line.trim());
i++;
}
}
String insert = "insert into gpsdata.gps(gps_time,longitude,latitude,altitude,heading,speed,source,satellites)"
+ " values(?,?,?,?,?,?,?,?)";
PreparedStatement psta = con
.prepareStatement(insert);
for (int j = 0; j < 8; j++) {
psta.setString(j + 1, data[j]);
}
psta.executeUpdate();
psta.close();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
System.out.println(fileName[k]+"插入成功!");
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} //连接数据库
public static void connection(){
try {
Class.forName("org.apache.cassandra.cql.jdbc.CassandraDriver");
con = DriverManager
.getConnection("jdbc:cassandra://127.0.0.1:9160/gpsdata");
String sql = "CREATE TABLE IF NOT EXISTS gpsdata.gps (gps_time varchar,longitude varchar,latitude varchar,altitude varchar,heading varchar,speed varchar,source varchar,satellites varchar,PRIMARY KEY (gps_time))";
Statement sta = con.createStatement();
sta.execute(sql);
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SQLException e) {
System.out.println("连接断开!");
e.printStackTrace();
}
} //关闭连接
public static void disConnection(){
if(con!=null)
try {
con.close();
System.out.println("数据库连接正常关闭!");
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} public static void main(String[] args) {
connection();
insertGPS();
disConnection();
} }