Running Kettle files from a Java project + JNDI connection configuration + variable configuration

Date: 2024-03-16 19:26:18

Running Kettle files from Java

Add the dependencies:

<dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-engine</artifactId>
    <version>7.0.0.0-25</version>
</dependency>
<dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-core</artifactId>
    <version>7.0.0.0-25</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.6</version>
</dependency>

Utility class

package com.example.kettledemo.util;

import java.io.File;
import java.util.Map;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
/**
 * @Package: com.example.kettledemo.util
 * @Since: 2020/5/13 14:26
 * @Version: V1.0
 */
public class kettleUtil {

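        /**
         * Runs a transformation (.ktr) file without arguments.
         *
         * @param transFileName path to the transformation file
         * @throws Exception if the transformation finishes with errors
         */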
        public static void callNativeTrans(String transFileName) throws Exception {
            callNativeTransWithParams(null, transFileName);
        }

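        /**
         * Runs a transformation (.ktr) file with arguments.
         *
         * @param params        arguments passed to the transformation, may be null
         * @param transFileName path to the transformation file
         * @throws Exception if the transformation finishes with errors
         */
        public static void callNativeTransWithParams(String[] params, String transFileName) throws Exception {
            // Initialize the Kettle environment
            KettleEnvironment.init();
            EnvUtil.environmentInit();
            // Load the transformation metadata from the file
            TransMeta transMeta = new TransMeta(transFileName);
            // Create the transformation
            Trans trans = new Trans(transMeta);
            // Start execution
            trans.execute(params);
            // Block until the transformation has finished
            trans.waitUntilFinished();
            // Fail if any step reported an error
            if (trans.getErrors() > 0) {
                throw new Exception("Errors occurred while running the transformation: " + transFileName);
            }
        }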

        /**
         * Runs a job (.kjb) file.
         *
         * @param jobName path to the job file
         * @param map     variables to pass to the job (name to value)
         * @throws Exception if the job finishes with errors
         */
        public static void callNativeJob(String jobName, Map map) throws Exception {
            // Point Kettle at the simple-jndi directory before initializing the environment
            String path = Thread.currentThread().getContextClassLoader().getResource("").getPath();
            File file = new File(path+"/simple-jndi");// the folder that contains jdbc.properties
            String sysPath = file.getCanonicalPath();
            Const.JNDI_DIRECTORY = sysPath;
            KettleEnvironment.init();

            JobMeta jobMeta = new JobMeta(jobName, null);
            Job job = new Job(null, jobMeta);
            // Pass variables to the job; inside the job they are read as ${variable name}
            job.injectVariables(map);
            job.start();
            job.waitUntilFinished();
            if (job.getErrors() > 0) {
                throw new Exception("Errors occurred while running the job: " + jobName);
            }
        }
}

Variables configured in XXX.kjb:

[Screenshot: variable settings in XXX.kjb]

Test: run a Kettle job

filePath is the path to the Kettle job file, and param is the value of the variable passed to the job.

@GetMapping("test")
public void test(String filePath, String param) throws Exception {
    // Kettle variables are plain strings, so use a String-to-String map
    Map<String, String> map = new HashMap<>();
    map.put("param", param);
    kettleUtil.callNativeJob(filePath, map);
}
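
To make the ${variable name} convention more concrete, here is a simplified stand-in for the substitution step. It is not Kettle's implementation; the class name VariableSketch, the method substitute, and the sample SQL text are all made up for illustration. In this sketch, placeholders without a matching variable are left untouched.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: a tiny ${name} resolver, not Kettle's own code.
public class VariableSketch {

    // Matches ${name} and captures the name
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    static String substitute(String text, Map<String, String> variables) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuffer result = new StringBuffer();
        while (matcher.find()) {
            String value = variables.get(matcher.group(1));
            // Keep the placeholder as-is when the variable is unknown
            String replacement = value != null ? value : matcher.group(0);
            matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(result);
        return result.toString();
    }

    public static void main(String[] args) {
        Map<String, String> variables = new HashMap<>();
        variables.put("param", "2024-03-16");
        // Hypothetical step setting that references the injected variable
        System.out.println(substitute("SELECT * FROM orders WHERE day = '${param}'", variables));
    }
}
```

The map passed to substitute plays the same role as the map handed to callNativeJob: each key is a variable name, and each value is the string that replaces the matching placeholder.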

 

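Configuring JNDI

1. Place the JNDI configuration file (data-integration/simple-jndi/jdbc.properties) in a known directory, for example simple-jndi/jdbc.properties under the project root. Note that the code in step 3 resolves simple-jndi against the classpath root, so the directory has to end up on the classpath (in a Maven project, typically under src/main/resources):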

[Screenshot: simple-jndi/jdbc.properties under the project root]
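
For reference, entries in jdbc.properties follow the simple-jndi pattern of a connection name followed by /type, /driver, /url, /user and /password. The fragment below is a sketch only: the connection name mydb, the MySQL driver and URL, and the credentials are placeholders, not values from the original project.

```properties
# Hypothetical data source named "mydb"; replace every value with your own
mydb/type=javax.sql.DataSource
mydb/driver=com.mysql.jdbc.Driver
mydb/url=jdbc:mysql://localhost:3306/mydb
mydb/user=root
mydb/password=secret
```

The connection name (mydb here) is what a JNDI database connection defined in the job or transformation refers to.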

2. Add the simple-jndi jar dependency.

<dependency>
    <groupId>simple-jndi</groupId>
    <artifactId>simple-jndi</artifactId>
    <version>0.11.4.1</version>
</dependency>

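3. Set Const.JNDI_DIRECTORY to the simple-jndi directory, then call Kettle's environment initialization method KettleEnvironment.init() (equivalent to KettleEnvironment.init(true)) so that JNDI is loaded: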

String path = Thread.currentThread().getContextClassLoader().getResource("").getPath();
File file = new File(path+"/simple-jndi");// the folder that contains jdbc.properties
String sysPath = file.getCanonicalPath();
Const.JNDI_DIRECTORY = sysPath;
KettleEnvironment.init();
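
One caveat with the snippet above: URL.getPath() returns a percent-encoded string, so a project directory containing spaces would yield a path with %20 in it. A possible workaround, sketched here with only the JDK, is to convert the URL to a URI first. The class name JndiDirectoryResolver and the method resolve are hypothetical, and the sketch assumes the classpath root is a plain directory on disk (it will not work when the application runs from inside a jar).

```java
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;

// Illustrative helper: resolves a directory under a classpath root to a canonical path.
public class JndiDirectoryResolver {

    static String resolve(URL classpathRoot, String directoryName)
            throws URISyntaxException, IOException {
        // Going through a URI decodes percent-encoded characters such as %20
        File root = Paths.get(classpathRoot.toURI()).toFile();
        return new File(root, directoryName).getCanonicalPath();
    }

    public static void main(String[] args) throws Exception {
        URL root = Thread.currentThread().getContextClassLoader().getResource("");
        if (root == null) {
            throw new IllegalStateException("Classpath root is not a directory on disk");
        }
        // The printed path is what would be assigned to Const.JNDI_DIRECTORY
        // before calling KettleEnvironment.init().
        System.out.println(resolve(root, "simple-jndi"));
    }
}
```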