Storm: reading from a MySQL database and writing to HDFS (suitable for small data volumes)


1. Spout:

package com.TestStorm;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TestSpout extends BaseRichSpout {

    // MySQL connection settings
    private String url = "jdbc:mysql://192.168.0.44:3306/mysql";
    private String username = "root";
    private String password = "mysql";

    private ResultSet rs;
    private SpoutOutputCollector collector;

    public void nextTuple() {
        try {
            // Emit one row per call: the first two columns joined by a space, newline-terminated
            if (rs.next()) {
                String str = rs.getString(1) + " " + rs.getString(2) + "\n";
                System.out.print(str);
                collector.emit(new Values(str));
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {
        this.collector = arg2;
        try {
            // Load the JDBC driver and run the query once; nextTuple() then walks the ResultSet row by row
            Class.forName("com.mysql.jdbc.Driver");
            Connection conn = DriverManager.getConnection(url, username, password);
            PreparedStatement state = conn.prepareStatement("select * from storm");
            rs = state.executeQuery();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("str"));
    }
}
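
The spout only assumes that a table named storm exists in the mysql database and that its first two columns can be read as strings. A minimal JDBC sketch that creates and seeds such a table is shown below; the column names id and name are assumptions for illustration, not part of the original post.

package com.TestStorm;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical helper: prepares the `storm` table that TestSpout reads.
// Column names (id, name) are assumptions; only "two readable columns" is required.
public class PrepareTestTable {

    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.0.44:3306/mysql", "root", "mysql");
             Statement st = conn.createStatement()) {
            st.executeUpdate("create table if not exists storm (id varchar(32), name varchar(64))");
            st.executeUpdate("insert into storm values ('1', 'hadoop')");
            st.executeUpdate("insert into storm values ('2', 'storm')");
        }
    }
}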


2. Bolt:

package com.TestStorm;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class TestBolt extends BaseBasicBolt {

    private FSDataOutputStream fo;

    public void execute(Tuple arg0, BasicOutputCollector arg1) {

        String str = String.valueOf(arg0.getValueByField("str"));
        System.out.println(str.length());

        try {
            Configuration conf = new Configuration();
            conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
            conf.set("hadoop.job.ugi", "hadoop");
            // Keep appends working on small clusters: never look for a replacement datanode on failure
            conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
            conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");

            String hdfs = "/user/input/a.txt";
            FileSystem fs = FileSystem.get(URI.create("hdfs://h11:9000/user"), conf);

            // Append to the target file if it already exists, otherwise create it
            if (fs.exists(new Path(hdfs))) {
                fo = fs.append(new Path(hdfs));
            } else {
                fo = fs.create(new Path(hdfs));
            }

            // Only write non-empty tuples
            if (str.length() > 0) {
                // Wrap the string in a byte stream, then in a DataInputStream, and copy it into HDFS
                // dis  - input source
                // fo   - output stream on HDFS
                // 2046 - buffer size in bytes
                // true - close both streams when the copy finishes
                DataInputStream dis = new DataInputStream(new ByteArrayInputStream(str.getBytes()));
                IOUtils.copyBytes(dis, fo, 2046, true);
            }

            fs.close();
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void prepare(Map stormConf, TopologyContext context) {
    }

    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("str"));
    }
}
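
Because execute() builds a new Configuration and opens and closes the FileSystem for every tuple, this bolt is only practical for small data volumes, as the title says. One possible variation, shown as a sketch below and not part of the original post, creates the FileSystem once per task in prepare() and closes it in cleanup(); the class name TestBoltReuseFs is an assumption.

package com.TestStorm;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

// Sketch only: same write logic as TestBolt, but the FileSystem is created once per task.
public class TestBoltReuseFs extends BaseBasicBolt {

    private transient FileSystem fs;
    private final String hdfsPath = "/user/input/a.txt";

    public void prepare(Map stormConf, TopologyContext context) {
        try {
            Configuration conf = new Configuration();
            conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
            conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
            conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
            fs = FileSystem.get(URI.create("hdfs://h11:9000/user"), conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = String.valueOf(input.getValueByField("str"));
        try {
            // Append if the file exists, otherwise create it, then copy the tuple's bytes in
            FSDataOutputStream fo = fs.exists(new Path(hdfsPath))
                    ? fs.append(new Path(hdfsPath))
                    : fs.create(new Path(hdfsPath));
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(str.getBytes()));
            IOUtils.copyBytes(dis, fo, 2046, true); // true closes dis and fo
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void cleanup() {
        try {
            if (fs != null) {
                fs.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("str"));
    }
}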


3. Job:

package com.TestStorm;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;

public class Job {

    public static void main(String[] args)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {

        // Wire the spout to the bolt with a shuffle grouping
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Testspout", new TestSpout(), 1);
        builder.setBolt("Testbolt", new TestBolt(), 1).shuffleGrouping("Testspout");

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(2);

        // The topology name is taken from the first command-line argument
        StormSubmitter.submitTopology(args[0], config, builder.createTopology());
    }
}
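
Before packaging the jar and submitting it to the cluster, the same topology can be tried in-process. A minimal sketch using Storm's LocalCluster follows; the class name LocalJob, the topology name "test", and the 30-second run time are assumptions for illustration.

package com.TestStorm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

// Hypothetical helper for in-process testing; not part of the original post.
public class LocalJob {

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Testspout", new TestSpout(), 1);
        builder.setBolt("Testbolt", new TestBolt(), 1).shuffleGrouping("Testspout");

        Config config = new Config();
        config.setDebug(true);

        // Run the topology inside the JVM for a short while, then shut it down
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", config, builder.createTopology());
        Thread.sleep(30 * 1000);
        cluster.shutdown();
    }
}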