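Article 1: A Brief Introduction to ETL and Kettle
Article 2: Kettle API in Practice
Preface:
Why use Kettle and the KETTLE JAVA API?
What is Kettle? Kettle is an open-source ETL tool. It provides a Java-based graphical interface that is very easy to use, and its set of ETL components is fairly extensive and includes the ones in common use.
Why use the KETTLE JAVA API? As the Kettle documentation puts it: "KETTLE JAVA API: Program your own Kettle transformation". Kettle lets you write transformations in Java code, so you can define the ETL process flexibly, which makes custom builds and batch processing possible. That is the work a programmer should be doing, rather than only operating the Kettle user interface the way one would use Word.
KETTLE JAVA API hands-on notes: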
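1. Set up the development environment: download the Kettle source package from http://www.kettle.be and unzip it, for example to the d:/kettle folder.
2. Open Eclipse and create a new project. Use JDK 1.5.0, because Kettle calls System.getenv(), which is only supported as of JDK 1.5.0. getenv() has had its ups and downs: it was abandoned for a time and is now supported again in JDK 1.5.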
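As a quick way to confirm that the JDK configured for the project supports the call, the following minimal sketch reads environment variables with System.getenv(). The class name EnvCheck and the helper getenvOrDefault are illustrative only and are not part of Kettle.

```java
import java.util.Map;

// Illustrative helper, not part of Kettle: checks that System.getenv() works on this JDK.
class EnvCheck {

    // Returns the value of the environment variable, or the fallback when it is not set.
    static String getenvOrDefault(String name, String fallback) {
        String value = System.getenv(name);
        return value != null ? value : fallback;
    }

    public static void main(String[] args) {
        // System.getenv() without arguments returns a read-only map of all variables.
        Map<String, String> env = System.getenv();
        System.out.println("Number of environment variables: " + env.size());
        System.out.println("PATH = " + getenvOrDefault("PATH", "(not set)"));
    }
}
```

If this class runs without an error, the JDK in use should be recent enough for the getenv() call mentioned above.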
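3. Create a class, TransBuilder.java. You can copy the contents of d:/kettle/extra/TransBuilder.java into your TransBuilder.java unchanged.
4. Edit the source code as needed. The original program also needs the following change; add this at the top: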
import org.eclipse.swt.dnd.Transfer;
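// This package was left out; original location: <Kettle root folder>/libswt/win32/swt.jar
// added by chq(www.chq.name) on 2006.07.20
(It later turned out that this import is unnecessary, because it is not needed at compile time.)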
5. Prepare to compile: add the jar files to the project in Eclipse, mainly the following (based on extra/TransBuilder.bat):
/lib/kettle.jar
/libext/CacheDB.jar
/libext/SQLBaseJDBC.jar
/libext/activation.jar
/libext/db2jcc.jar
/libext/db2jcc_license_c.jar
/libext/edtftpj-1.4.5.jar
/libext/firebirdsql-full.jar
/libext/firebirdsql.jar
/libext/gis-shape.jar
/libext/hsqldb.jar
/libext/ifxjdbc.jar
/libext/javadbf.jar
/libext/jconn2.jar
/libext/js.jar
/libext/jt400.jar
/libext/jtds-1.1.jar
/libext/jxl.jar
/libext/ktable.jar
/libext/log4j-1.2.8.jar
/libext/mail.jar
/libext/mysql-connector-java-3.1.7-bin.jar
/libext/ojdbc14.jar
/libext/orai18n.jar
/libext/pg74.215.jdbc3.jar
/libext/edbc.jar
(Note: the following jar was left out and must be added. Original location: <Kettle root folder>/libswt/win32/swt.jar)
/libswt/win32/swt.jar
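6. After compiling successfully, prepare to run
To let the program run without logging in, set up the environment properties file kettle.properties. It lives in the user's home folder, typically /Documents and Settings/<user>/.kettle/, and mainly contains the following: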
KETTLE_REPOSITORY=kettle@m80
KETTLE_USER=admin
KETTLE_PASSWORD=passwd
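To check that the file is where it is expected and that the three keys are readable, a small sketch using java.util.Properties can help. The class name KettlePropertiesSketch and its methods are hypothetical, and the location <user home>/.kettle/kettle.properties is assumed from the description above; this is not how Kettle itself loads the file.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative helper, not part of Kettle: reads the keys shown above.
class KettlePropertiesSketch {

    // Parses text in kettle.properties syntax into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Assumption: the file sits in <user home>/.kettle/kettle.properties.
    static File defaultLocation() {
        return new File(new File(System.getProperty("user.home"), ".kettle"), "kettle.properties");
    }

    public static void main(String[] args) throws IOException {
        File file = defaultLocation();
        if (!file.isFile()) {
            System.out.println("No kettle.properties found at " + file);
            return;
        }
        Properties props = new Properties();
        try (InputStream in = new FileInputStream(file)) {
            props.load(in);
        }
        // Print only non-secret values; the password is deliberately not echoed.
        System.out.println("Repository: " + props.getProperty("KETTLE_REPOSITORY"));
        System.out.println("User:       " + props.getProperty("KETTLE_USER"));
    }
}
```

Running main prints the repository and user name if the file exists, which is a cheap sanity check before starting TransBuilder.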
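7. Now run it and check whether the data has been copied to the target table.
[Figure: console output during execution]
[Figure: the automatically generated transformation]
Below is the modified program source code: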
--------------------------------------------------------------------------------
- package name.chq.test;
- import java.io.DataOutputStream;
- import java.io.File;
- import java.io.FileOutputStream;
- import be.ibridge.kettle.core.Const;
- import be.ibridge.kettle.core.LogWriter;
- import be.ibridge.kettle.core.NotePadMeta;
- import be.ibridge.kettle.core.database.Database;
- import be.ibridge.kettle.core.database.DatabaseMeta;
- import be.ibridge.kettle.core.exception.KettleException;
- import be.ibridge.kettle.core.util.EnvUtil;
- import be.ibridge.kettle.trans.StepLoader;
- import be.ibridge.kettle.trans.Trans;
- import be.ibridge.kettle.trans.TransHopMeta;
- import be.ibridge.kettle.trans.TransMeta;
- import be.ibridge.kettle.trans.step.StepMeta;
- import be.ibridge.kettle.trans.step.StepMetaInterface;
- import be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;
- import be.ibridge.kettle.trans.step.tableinput.TableInputMeta;
- import be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;
- // This package was left out; original location: <Kettle root folder>/libswt/win32/swt.jar
- // added by chq(www.chq.name) on 2006.07.20
- //import org.eclipse.swt.dnd.Transfer;
- /**
- * Class created to demonstrate the creation of transformations on-the-fly.
- *
- * @author Matt
- *
- */
- public class TransBuilder
- {
- public static final String[] databasesXML = {
- "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
- "<connection>" +
- "<name>target</name>" +
- "<server>192.168.17.35</server>" +
- "<type>ORACLE</type>" +
- "<access>Native</access>" +
- "<database>test1</database>" +
- "<port>1521</port>" +
- "<username>testuser</username>" +
- "<password>pwd</password>" +
- "<servername/>" +
- "<data_tablespace/>" +
- "<index_tablespace/>" +
- "<attributes>" +
- "<attribute><code>EXTRA_OPTION_MYSQL.defaultFetchSize</code><attribute>500</attribute></attribute>" +
- "<attribute><code>EXTRA_OPTION_MYSQL.useCursorFetch</code><attribute>true</attribute></attribute>" +
- "<attribute><code>PORT_NUMBER</code><attribute>1521</attribute></attribute>" +
- "</attributes>" +
- "</connection>" ,
- "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
- "<connection>" +
- "<name>source</name>" +
- "<server>192.168.16.12</server>" +
- "<type>ORACLE</type>" +
- "<access>Native</access>" +
- "<database>test2</database>" +
- "<port>1521</port>" +
- "<username>testuser</username>" +
- "<password>pwd2</password>" +
- "<servername/>" +
- "<data_tablespace/>" +
- "<index_tablespace/>" +
- "<attributes>" +
- "<attribute><code>EXTRA_OPTION_MYSQL.defaultFetchSize</code><attribute>500</attribute></attribute>" +
- "<attribute><code>EXTRA_OPTION_MYSQL.useCursorFetch</code><attribute>true</attribute></attribute>" +
- "<attribute><code>PORT_NUMBER</code><attribute>1521</attribute></attribute>" +
- "</attributes>" +
- "</connection>"
- };
- /**
- * Creates a new Transformation using input parameters such as the tablename to read from.
- * @param transformationName The name of the transformation
- * @param sourceDatabaseName The name of the database to read from
- * @param sourceTableName The name of the table to read from
- * @param sourceFields The field names we want to read from the source table
- * @param targetDatabaseName The name of the target database
- * @param targetTableName The name of the target table we want to write to
- * @param targetFields The names of the fields in the target table (same number of fields as sourceFields)
- * @return A new transformation
- * @throws KettleException In the rare case something goes wrong
- */
- public static final TransMeta buildCopyTable(
- String transformationName,String sourceDatabaseName, String sourceTableName,
- String[] sourceFields, String targetDatabaseName, String targetTableName,
- String[] targetFields)
- throws KettleException
- {
- LogWriter log = LogWriter.getInstance();
- EnvUtil.environmentInit();
- try
- {
- //
- // Create a new transformation...
- //
- TransMeta transMeta = new TransMeta();
- transMeta.setName(transformationName);
- // Add the database connections
- for (int i=0;i<databasesXML.length;i++)
- {
- DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);
- transMeta.addDatabase(databaseMeta);
- }
- DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);
- DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);
- //
- // Add a note
- //
- String note = "Reads information from table [" + sourceTableName+ "] on database ["
- + sourceDBInfo + "]" + Const.CR;
- note += "After that, it writes the information to table [" + targetTableName + "] on database ["
- + targetDBInfo + "]";
- NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);
- transMeta.addNote(ni);
- //
- // create the source step...
- //
- String fromstepname = "read from [" + sourceTableName + "]";
- TableInputMeta tii = new TableInputMeta();
- tii.setDatabaseMeta(sourceDBInfo);
- String selectSQL = "SELECT "+Const.CR;
- for (int i=0;i<sourceFields.length;i++)
- {
- /* modified by chq(www.chq.name): use * in place of the field list; on analysis, the statements below handle '*' correctly */
- if (i>0)
- selectSQL+=", ";
- else selectSQL+=" ";
- selectSQL+=sourceFields[i]+Const.CR;
- }
- selectSQL+="FROM "+sourceTableName;
- tii.setSQL(selectSQL);
- StepLoader steploader = StepLoader.getInstance();
- String fromstepid = steploader.getStepPluginID(tii);
- StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname, (StepMetaInterface) tii);
- fromstep.setLocation(150, 100);
- fromstep.setDraw(true);
- fromstep.setDescription("Reads information from table [" + sourceTableName
- + "] on database [" + sourceDBInfo + "]");
- transMeta.addStep(fromstep);
- //
- // add logic to rename fields
- // Use metadata logic in SelectValues, use SelectValueInfo...
- //
- /* No renaming or mapping is needed; added by chq(www.chq.name) on 2006.07.20
- SelectValuesMeta svi = new SelectValuesMeta();
- svi.allocate(0, 0, sourceFields.length);
- for (int i = 0; i < sourceFields.length; i++)
- {
- svi.getMetaName()[i] = sourceFields[i];
- svi.getMetaRename()[i] = targetFields[i];
- }
- String selstepname = "Rename field names";
- String selstepid = steploader.getStepPluginID(svi);
- StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi);
- selstep.setLocation(350, 100);
- selstep.setDraw(true);
- selstep.setDescription("Rename field names");
- transMeta.addStep(selstep);
- TransHopMeta shi = new TransHopMeta(fromstep, selstep);
- transMeta.addTransHop(shi);
- fromstep = selstep; // sets the new starting point, by chq(www.chq.name) on 2006.07.20
- */
- //
- // Create the target step...
- //
- //
- // Add the TableOutputMeta step...
- //
- String tostepname = "write to [" + targetTableName + "]";
- TableOutputMeta toi = new TableOutputMeta();
- toi.setDatabase(targetDBInfo);
- toi.setTablename(targetTableName);
- toi.setCommitSize(200);
- toi.setTruncateTable(true);
- String tostepid = steploader.getStepPluginID(toi);
- StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi);
- tostep.setLocation(550, 100);
- tostep.setDraw(true);
- tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");
- transMeta.addStep(tostep);
- //
- // Add a hop between the two steps...
- //
- TransHopMeta hi = new TransHopMeta(fromstep, tostep);
- transMeta.addTransHop(hi);
- // OK, if we're still here: overwrite the current transformation...
- return transMeta;
- }
- catch (Exception e)
- {
- throw new KettleException("An unexpected error occurred creating the new transformation", e);
- }
- }
- /**
- * 1) create a new transformation
- * 2) save the transformation as XML file
- * 3) generate the SQL for the target table
- * 4) Execute the transformation
- * 5) drop the target table to make this program repeatable
- *
- * @param args
- */
- public static void main(String[] args) throws Exception
- {
- EnvUtil.environmentInit();
- // Init the logging...
- LogWriter log = LogWriter.getInstance("TransBuilder.log", true, LogWriter.LOG_LEVEL_DETAILED);
- // Load the Kettle steps & plugins
- StepLoader stloader = StepLoader.getInstance();
- if (!stloader.read())
- {
- log.logError("TransBuilder", "Error loading Kettle steps & plugins... stopping now!");
- return;
- }
- // The parameters we want, optionally this can be
- String fileName = "NewTrans.xml";
- String transformationName = "Test Transformation";
- String sourceDatabaseName = "source";
- String sourceTableName = "testuser.source_table";
- String sourceFields[] = {
- "*"
- };
- String targetDatabaseName = "target";
- String targetTableName = "testuser.target_table";
- String targetFields[] = {
- "*"
- };
- // Generate the transformation.
- TransMeta transMeta = TransBuilder.buildCopyTable(
- transformationName,
- sourceDatabaseName,
- sourceTableName,
- sourceFields,
- targetDatabaseName,
- targetTableName,
- targetFields
- );
- // Save it as a file:
- String xml = transMeta.getXML();
- DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));
- dos.write(xml.getBytes("UTF-8"));
- dos.close();
- System.out.println("Saved transformation to file: "+fileName);
- // OK, What's the SQL we need to execute to generate the target table?
- String sql = transMeta.getSQLStatementsString();
- // Execute the SQL on the target table:
- Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));
- targetDatabase.connect();
- targetDatabase.execStatements(sql);
- // Now execute the transformation...
- Trans trans = new Trans(log, transMeta);
- trans.execute(null);
- trans.waitUntilFinished();
- // For testing/repeatability, we drop the target table again
- /* modified by chq(www.chq.name) on 2006.07.20: no need to drop the table */
- //targetDatabase.execStatement("drop table "+targetTableName);
- targetDatabase.disconnect();
- }
- }
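The listing passes "*" as the only source field and relies on the SELECT-building loop to cope with it. The loop can be tried in isolation with the sketch below. The class SelectSqlSketch and its buildSelect method are hypothetical, and the line separator is passed in as a parameter as a stand-in for Kettle's Const.CR, so nothing here depends on the Kettle jars.

```java
// Illustrative only: reproduces the SELECT-building loop from buildCopyTable without Kettle.
class SelectSqlSketch {

    // Builds "SELECT <fields> FROM <table>", one field per line, as the listing does.
    // The cr parameter is a stand-in for Kettle's Const.CR (an assumption of this sketch).
    static String buildSelect(String[] fields, String table, String cr) {
        StringBuilder sql = new StringBuilder("SELECT ").append(cr);
        for (int i = 0; i < fields.length; i++) {
            // The first field gets a leading space, later fields a leading comma.
            sql.append(i > 0 ? ", " : " ");
            sql.append(fields[i]).append(cr);
        }
        sql.append("FROM ").append(table);
        return sql.toString();
    }

    public static void main(String[] args) {
        String cr = System.lineSeparator();
        // A single "*" entry produces a plain SELECT * statement.
        System.out.println(buildSelect(new String[] { "*" }, "testuser.source_table", cr));
        // An explicit field list produces one field per line.
        System.out.println(buildSelect(new String[] { "id", "name" }, "testuser.source_table", cr));
    }
}
```

Because "*" is treated as just another field name, the generated statement stays valid SQL, which is why the field arrays in main can be reduced to a single "*" entry.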
Article 3: Kettle JAVA API
http://wiki.pentaho.com/display/EAI/Pentaho+Data+Integration+-+Java+API+Examples
Program your own Kettle transformation
The example described below performs the following actions:
- create a new transformation
- save the transformation as XML file
- generate the SQL for the target table
- Execute the transformation
- drop the target table to make this program repeatable
The complete source code for the example is distributed in the distribution zip file. You can find this file in the downloads section. (Kettle version 2.1.3 or higher)
After unzipping this file, you can find the source code in the "TransBuilder.java" file in the "extra" directory.
The Kettle Java API for Kettle is found here: Kettle Java API
// Generate the transformation.
TransMeta transMeta = TransBuilder.buildCopyTable(
        transformationName,
        sourceDatabaseName,
        sourceTableName,
        sourceFields,
        targetDatabaseName,
        targetTableName,
        targetFields
        );

// Save it as a file:
String xml = transMeta.getXML();
DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));
dos.write(xml.getBytes("UTF-8"));
dos.close();
System.out.println("Saved transformation to file: "+fileName);

// OK, What's the SQL we need to execute to generate the target table?
String sql = transMeta.getSQLStatementsString();

// Execute the SQL on the target table:
Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));
targetDatabase.connect();
targetDatabase.execStatements(sql);

// Now execute the transformation...
Trans trans = new Trans(log, transMeta);
trans.execute(null);
trans.waitUntilFinished();

// For testing/repeatability, we drop the target table again
targetDatabase.execStatement("drop table "+targetTableName);
targetDatabase.disconnect();
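The "save it as a file" step writes the XML string as UTF-8 bytes, but the stream stays open if the write fails. On a newer JDK than the one this example targets, the same step could be sketched with java.nio.file.Files, which closes the file in every case. The class TransformationFileSketch and its save and load methods are hypothetical, and the XML string is a placeholder for the result of transMeta.getXML().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Illustrative only: saves and reloads an XML string as UTF-8 without Kettle classes.
class TransformationFileSketch {

    // Writes the XML as UTF-8; Files.write opens and closes the file itself.
    static void save(Path target, String xml) {
        try {
            Files.write(target, xml.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the file back as a UTF-8 string, for example to verify what was written.
    static String load(Path source) {
        try {
            return new String(Files.readAllBytes(source), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Placeholder content; in the example above this would come from transMeta.getXML().
        String xml = "<transformation><name>Test Transformation</name></transformation>";
        Path file = Paths.get(System.getProperty("java.io.tmpdir"), "NewTrans-sketch.xml");
        save(file, xml);
        System.out.println("Saved transformation to file: " + file);
        System.out.println("Round trip matches: " + xml.equals(load(file)));
    }
}
```

Naming the charset explicitly on both the write and the read keeps the file readable regardless of the platform default encoding.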
Below is the source code for the method that creates the transformation:
/**
 * Creates a new Transformation using input parameters such as the tablename to read from.
 * @param transformationName The name of the transformation
 * @param sourceDatabaseName The name of the database to read from
 * @param sourceTableName The name of the table to read from
 * @param sourceFields The field names we want to read from the source table
 * @param targetDatabaseName The name of the target database
 * @param targetTableName The name of the target table we want to write to
 * @param targetFields The names of the fields in the target table (same number of fields as sourceFields)
 * @return A new transformation metadata object
 * @throws KettleException In the rare case something goes wrong
 */
public static final TransMeta buildCopyTable(
        String transformationName,
        String sourceDatabaseName,
        String sourceTableName,
        String[] sourceFields,
        String targetDatabaseName,
        String targetTableName,
        String[] targetFields) throws KettleException
{
    LogWriter log = LogWriter.getInstance();
    try
    {
        //
        // Create a new transformation...
        //
        TransMeta transMeta = new TransMeta();
        transMeta.setName(transformationName);

        // Add the database connections
        for (int i=0;i<databasesXML.length;i++)
        {
            DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);
            transMeta.addDatabase(databaseMeta);
        }

        DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);
        DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);

        //
        // Add a note
        //
        String note = "Reads information from table [" + sourceTableName+ "] on database [" + sourceDBInfo + "]" + Const.CR;
        note += "After that, it writes the information to table [" + targetTableName + "] on database [" + targetDBInfo + "]";
        NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);
        transMeta.addNote(ni);

        //
        // create the source step...
        //
        String fromstepname = "read from [" + sourceTableName + "]";
        TableInputMeta tii = new TableInputMeta();
        tii.setDatabaseMeta(sourceDBInfo);
        String selectSQL = "SELECT "+Const.CR;
        for (int i=0;i<sourceFields.length;i++)
        {
            if (i>0) selectSQL+=", "; else selectSQL+=" ";
            selectSQL+=sourceFields[i]+Const.CR;
        }
        selectSQL+="FROM "+sourceTableName;
        tii.setSQL(selectSQL);

        StepLoader steploader = StepLoader.getInstance();
        String fromstepid = steploader.getStepPluginID(tii);
        StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname, (StepMetaInterface) tii);
        fromstep.setLocation(150, 100);
        fromstep.setDraw(true);
        fromstep.setDescription("Reads information from table [" + sourceTableName + "] on database [" + sourceDBInfo + "]");
        transMeta.addStep(fromstep);

        //
        // add logic to rename fields
        // Use metadata logic in SelectValues, use SelectValueInfo...
        //
        SelectValuesMeta svi = new SelectValuesMeta();
        svi.allocate(0, 0, sourceFields.length);
        for (int i = 0; i < sourceFields.length; i++)
        {
            svi.getMetaName()[i] = sourceFields[i];
            svi.getMetaRename()[i] = targetFields[i];
        }
        String selstepname = "Rename field names";
        String selstepid = steploader.getStepPluginID(svi);
        StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi);
        selstep.setLocation(350, 100);
        selstep.setDraw(true);
        selstep.setDescription("Rename field names");
        transMeta.addStep(selstep);

        TransHopMeta shi = new TransHopMeta(fromstep, selstep);
        transMeta.addTransHop(shi);
        fromstep = selstep;

        //
        // Create the target step...
        //
        //
        // Add the TableOutputMeta step...
        //
        String tostepname = "write to [" + targetTableName + "]";
        TableOutputMeta toi = new TableOutputMeta();
        toi.setDatabase(targetDBInfo);
        toi.setTablename(targetTableName);
        toi.setCommitSize(200);
        toi.setTruncateTable(true);

        String tostepid = steploader.getStepPluginID(toi);
        StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi);
        tostep.setLocation(550, 100);
        tostep.setDraw(true);
        tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");
        transMeta.addStep(tostep);

        //
        // Add a hop between the two steps...
        //
        TransHopMeta hi = new TransHopMeta(fromstep, tostep);
        transMeta.addTransHop(hi);

        // The transformation is complete, return it...
        return transMeta;
    }
    catch (Exception e)
    {
        throw new KettleException("An unexpected error occurred creating the new transformation", e);
    }
}
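The Javadoc of buildCopyTable says that targetFields must have the same number of entries as sourceFields, but the method itself does not check this before pairing the two arrays in the rename step. A caller could validate the arrays up front, as in the following sketch. The class FieldMappingSketch and its pairFields method are hypothetical and independent of Kettle.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: pairs source and target field names the way the rename step does.
class FieldMappingSketch {

    // Returns source -> target names in order, rejecting arrays of different length.
    static Map<String, String> pairFields(String[] sourceFields, String[] targetFields) {
        if (sourceFields.length != targetFields.length) {
            throw new IllegalArgumentException("Expected " + sourceFields.length
                    + " target fields but got " + targetFields.length);
        }
        // LinkedHashMap keeps the field order, which matters for the generated SELECT.
        Map<String, String> mapping = new LinkedHashMap<>();
        for (int i = 0; i < sourceFields.length; i++) {
            mapping.put(sourceFields[i], targetFields[i]);
        }
        return mapping;
    }

    public static void main(String[] args) {
        String[] source = { "id", "name" };
        String[] target = { "customer_id", "customer_name" };
        for (Map.Entry<String, String> e : pairFields(source, target).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Failing early with a clear message is easier to diagnose than an ArrayIndexOutOfBoundsException wrapped in the generic KettleException thrown by buildCopyTable.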