Flink 1.11 in practice: submitting jobs to YARN with the CLI script and from Java code


1. Let's start with the official docs and work from there:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/yarn_setup.html

[Screenshot: the deployment modes listed in the official YARN setup docs]

 

2. Looking at the screenshot above, I will skip the first mode; in production we generally prefer the second or the third one.

 

3. Check the options of the run command

./bin/flink run --help 

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
     -c,--class <classname>               Class with the program entry point
                                          ("main()" method). Only needed if the
                                          JAR file does not specify the class in
                                          its manifest.
     -C,--classpath <url>                 Adds a URL to each user code
                                          classloader  on all nodes in the
                                          cluster. The paths must specify a
                                          protocol (e.g. file://) and be
                                          accessible on all nodes (e.g. by means
                                          of a NFS share). You can use this
                                          option multiple times for specifying
                                          more than one URL. The protocol must
                                          be supported by the {@link
                                          java.net.URLClassLoader}.
     -d,--detached                        If present, runs the job in detached
                                          mode
     -n,--allowNonRestoredState           Allow to skip savepoint state that
                                          cannot be restored. You need to allow
                                          this if you removed an operator from
                                          your program that was part of the
                                          program when the savepoint was
                                          triggered.
     -p,--parallelism <parallelism>       The parallelism with which to run the
                                          program. Optional flag to override the
                                          default value specified in the
                                          configuration.
     -py,--python <pythonFile>            Python script with the program entry
                                          point. The dependent resources can be
                                          configured with the `--pyFiles`
                                          option.
     -pyarch,--pyArchives <arg>           Add python archive files for job. The
                                          archive files will be extracted to the
                                          working directory of python UDF
                                          worker. Currently only zip-format is
                                          supported. For each archive file, a
                                          target directory be specified. If the
                                          target directory name is specified,
                                          the archive file will be extracted to
                                          a name can directory with the
                                          specified name. Otherwise, the archive
                                          file will be extracted to a directory
                                          with the same name of the archive
                                          file. The files uploaded via this
                                          option are accessible via relative
                                          path. '#' could be used as the
                                          separator of the archive file path and
                                          the target directory name. Comma (',')
                                          could be used as the separator to
                                          specify multiple archive files. This
                                          option can be used to upload the
                                          virtual environment, the data files
                                          used in Python UDF (e.g.: --pyArchives
                                          file:///tmp/py37.zip,file:///tmp/data.
                                          zip#data --pyExecutable
                                          py37.zip/py37/bin/python). The data
                                          files could be accessed in Python UDF,
                                          e.g.: f = open('data/data.txt', 'r').
     -pyexec,--pyExecutable <arg>         Specify the path of the python
                                          interpreter used to execute the python
                                          UDF worker (e.g.: --pyExecutable
                                          /usr/local/bin/python3). The python
                                          UDF worker depends on Python 3.5+,
                                          Apache Beam (version == 2.19.0), Pip
                                          (version >= 7.1.0) and SetupTools
                                          (version >= 37.0.0). Please ensure
                                          that the specified environment meets
                                          the above requirements.
     -pyfs,--pyFiles <pythonFiles>        Attach custom python files for job.
                                          These files will be added to the
                                          PYTHONPATH of both the local client
                                          and the remote python UDF worker. The
                                          standard python resource file suffixes
                                          such as .py/.egg/.zip or directory are
                                          all supported. Comma (',') could be
                                          used as the separator to specify
                                          multiple files (e.g.: --pyFiles
                                          file:///tmp/myresource.zip,hdfs:///$na
                                          menode_address/myresource2.zip).
     -pym,--pyModule <pythonModule>       Python module with the program entry
                                          point. This option must be used in
                                          conjunction with `--pyFiles`.
     -pyreq,--pyRequirements <arg>        Specify a requirements.txt file which
                                          defines the third-party dependencies.
                                          These dependencies will be installed
                                          and added to the PYTHONPATH of the
                                          python UDF worker. A directory which
                                          contains the installation packages of
                                          these dependencies could be specified
                                          optionally. Use '#' as the separator
                                          if the optional parameter exists
                                          (e.g.: --pyRequirements
                                          file:///tmp/requirements.txt#file:///t
                                          mp/cached_dir).
     -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job
                                          from (for example
                                          hdfs:///flink/savepoint-1537).
     -sae,--shutdownOnAttachedExit        If the job is submitted in attached
                                          mode, perform a best-effort cluster
                                          shutdown when the CLI is terminated
                                          abruptly, e.g., in response to a user
                                          interrupt, such as typing Ctrl + C.
  Options for Generic CLI mode:
     -D <property=value>   Generic configuration options for
                           execution/deployment and for the configured executor.
                           The available options can be found at
                           https://ci.apache.org/projects/flink/flink-docs-stabl
                           e/ops/config.html
     -e,--executor <arg>   DEPRECATED: Please use the -t option instead which is
                           also available with the "Application Mode".
                           The name of the executor to be used for executing the
                           given job, which is equivalent to the
                           "execution.target" config option. The currently
                           available executors are: "collection", "remote",
                           "local", "kubernetes-session", "yarn-per-job",
                           "yarn-session".
     -t,--target <arg>     The deployment target for the given application,
                           which is equivalent to the "execution.target" config
                           option. The currently available targets are:
                           "collection", "remote", "local",
                           "kubernetes-session", "yarn-per-job", "yarn-session",
                           "yarn-application" and "kubernetes-application".

  Options for yarn-cluster mode:
     -d,--detached                        If present, runs the job in detached
                                          mode
     -m,--jobmanager <arg>                Address of the JobManager to which to
                                          connect. Use this flag to connect to a
                                          different JobManager than the one
                                          specified in the configuration.
     -yat,--yarnapplicationType <arg>     Set a custom application type for the
                                          application on YARN
     -yD <property=value>                 use value for given property
     -yd,--yarndetached                   If present, runs the job in detached
                                          mode (deprecated; use non-YARN
                                          specific option instead)
     -yh,--yarnhelp                       Help for the Yarn session CLI.
     -yid,--yarnapplicationId <arg>       Attach to running YARN session
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container with
                                          optional unit (default: MB)
     -ynl,--yarnnodeLabel <arg>           Specify YARN node label for the YARN
                                          application
     -ynm,--yarnname <arg>                Set a custom name for the application
                                          on YARN
     -yq,--yarnquery                      Display available YARN resources
                                          (memory, cores)
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -ys,--yarnslots <arg>                Number of slots per TaskManager
     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with
                                          optional unit (default: MB)
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                          sub-paths for high availability mode
     -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper
                                          sub-paths for high availability mode

  Options for default mode:
     -m,--jobmanager <arg>           Address of the JobManager to which to
                                     connect. Use this flag to connect to a
                                     different JobManager than the one specified
                                     in the configuration.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                     for high availability mode

And the page listing the Flink configuration options:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/config.html#yarn

 

4. What I want to look at here is the Application mode. The difference from the cluster modes above is this: in cluster mode, once we submit, the jars under Flink's lib directory are shipped to YARN for us automatically, whereas in Application mode we put the dependency jars into HDFS ourselves and also place the jar we want to run at a specified HDFS path.

First up:

1) Submitting with the CLI script

  The command first:

export HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/jars/*;./bin/flink run-application -t yarn-application -p 3  \
-Dparallelism.default=3 \
-Djobmanager.memory.process.size=1048m \
-Dtaskmanager.memory.process.size=1096m \
-Dyarn.application.name="MyFlinkWordCount2" \
-Dtaskmanager.numberOfTaskSlots=3 \
-Dyarn.containers.vcores=1 \
-Dyarn.provided.lib.dirs="hdfs://dev-ct6-dc-master01:8020/flink/libs" \
hdfs://dev-ct6-dc-master01:8020/flink/SocketWindowWordCount.jar  --hostname 192.168.6.31 --port 12345

A note on those parameters: the first time you see them you may well be confused and wonder why you never saw them on the docs pages. Don't panic; the docs state one key thing:

[Screenshot of the relevant sentence in the official docs]

Put simply, you take the parameter names you would use in a cluster-mode submit script and prefix each of them with an uppercase D. If you are not familiar with cluster mode, just follow along.

Open the docs page: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/config.html#yarn

This is the page with the YARN configuration options.

Ctrl+F for the key 'jobmanager.memory.process.size' and you will get a detailed description:

[Screenshot: the docs entry for jobmanager.memory.process.size]

 

Search for another one:

[Screenshot: the docs entry for another option]

So as long as you can submit a cluster-mode script, you can submit an application-mode one: it is just the same properties with a 'D' in front.

2) Invoking the script from Java code

This is simply Java code calling the Linux script over SSH, so I will go straight to the code.

You need to pull in the JSch jar:

 
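The post does not spell out the coordinate; a typical Maven dependency for the JSch library used below would be (the version is my assumption, adjust it to your build):

<dependency>
    <groupId>com.jcraft</groupId>
    <artifactId>jsch</artifactId>
    <version>0.1.55</version>
</dependency>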

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import org.apache.log4j.Logger;

public class ConnectionSSH {
    private static final Logger logger = Logger.getLogger(ConnectionSSH.class);

    public static void main(String[] args) throws JSchException, IOException {
        JSch jsch = new JSch();
        // path to the private key used for authentication (despite the variable name)
        String pubKeyPath = "C:\\Users\\Administrator\\Desktop\\id_rsa";
        jsch.addIdentity(pubKeyPath);

        String username = "OpsUser";
        String host = "192.168.6.31";
        Session session = jsch.getSession(username, host, 50022); // prepare the connection
        session.setConfig("StrictHostKeyChecking", "no");
        session.connect();

        // Alternative: call the flink CLI directly instead of a wrapper script
        // String command = "cd /wyyt/software/flink;export HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/jars/*;./bin/flink run -m yarn-cluster -yD yarn.containers.vcores=2 ./examples/batch/WordCount.jar";
        String command = "export HADOOP_CONF_DIR=/etc/hadoop/conf;cd /wyyt/software/flink;sh /wyyt/software/flink/test2.sh 22223545";

        // Channel channel = session.openChannel("shell");
        ChannelExec channel = (ChannelExec) session.openChannel("exec");
        channel.setCommand(command);

        BufferedReader in = new BufferedReader(new InputStreamReader(channel.getInputStream()));
        channel.connect();

        // print whatever the remote command writes to stdout
        String msg;
        while ((msg = in.readLine()) != null) {
            System.out.println(msg);
        }

        channel.disconnect();
        session.disconnect();
    }
}

Or:

package shell;

import java.io.IOException;

/**
 * @program: flink-zeppelin
 * @description:
 * @author: Mr.Wang
 * @create: 2020-09-09 11:41
 **/
public class CcxTest {
    public static void main(String[] args) {
        try {
            testDemo1();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void testDemo1() throws IOException {

        System.out.println("--------------------------------------");
        // TODO: normally you need the host IP, user name and password
//        String commandStr="cd /root;cat aa.txt;rm -rf aa.txt";
//        String commandStr="cd /root;rm -rf aa.txt";
        String commandStr="cd /wyyt/software/flink;./bin/flink run examples/streaming/WordCount.jar;";

        Boolean result=ConnectLinuxCommand.connectLinux("192.168.12.188","root","root",commandStr);
        System.out.println("Result: "+result);
        System.out.println("--------------------------------------");

        // TODO: password-less login to the internal host did not work; use another class (e.g. the JSch-based ConnectionSSH) instead
//        String commandStr="cd /wyyt/software/flink;cat aa.txt;";
//        Boolean result=ConnectLinuxCommand.connectLinuxWithoutPwd("192.168.6.31","OpsUser",commandStr);
//        System.out.println("Result: "+result);
//        System.out.println("--------------------------------------");
    }
    }

    public static void testDemo2() throws IOException {
        try {
            ConnectLinuxCommand.scpGet("192.168.42.201","root","123456", "/home/aa.txt", "d:/aa");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void testDemo3() {
        try {
            ConnectLinuxCommand.scpPut("192.168.42.201","root","123456", "d:/aa/aa.txt", "/home/bb");
        } catch (Exception e) {
            e.printStackTrace();
        }


    }

}

 

package shell;

/**
 * @program: flink-zeppelin
 * @description:
 * @author: Mr.Wang
 * @create: 2020-09-09 11:40
 **/
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;

import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;



import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.SCPClient;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;

/**
 *
 * Description: connect to a Linux server and run shell commands
 * Author: cuicx
 * Version: V1.0
 * Created: 2018-07-04
 * Modified: 2018-07-04
 */
public class ConnectLinuxCommand {
    private static final Logger logger = Logger.getLogger(ConnectLinuxCommand.class);

    private static String DEFAULTCHARTSET = "UTF-8";
    private static Connection conn;
    /**
     *
     * @Title: login
     * @Description: log in to the remote Linux server with user name and password
     * @return: Boolean
     * @throws
     */
    public static Boolean login(RemoteConnect remoteConnect) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect(); // connect
            flag = conn.authenticateWithPassword(remoteConnect.getUserName(), remoteConnect.getPassword()); // authenticate
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }

    public static Boolean loginWithoutPwd(RemoteConnect remoteConnect) {
        boolean flag = true;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect(); // connect
            boolean authenticationPartialSuccess = conn.isAuthenticationPartialSuccess();
            System.out.println("authenticationPartialSuccess = " + authenticationPartialSuccess);
//            flag = conn.authenticateWithPassword(remoteConnect.getUserName(), remoteConnect.getPassword()); // authenticate
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }
    /**
     *
     * @Title: loginByFileKey
     * @Description: log in to the remote Linux server with a private key file
     * @param remoteConnect
     * @param keyFile a File pointing to the user's DSA or RSA private key in OpenSSH format (PEM; the "-----BEGIN DSA PRIVATE KEY-----" or "-----BEGIN RSA PRIVATE KEY-----" tag must be present)
     * @param keyfilePass passphrase used to decrypt the key file if it is encrypted; may be null otherwise
     * @return Boolean
     * @throws
     */
    public static Boolean loginByFileKey(RemoteConnect remoteConnect, File keyFile, String keyfilePass) {
        boolean flag = false;
        // path to the private key file
        // File keyfile = new File("C:\\temp\\private");
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();
            // authenticate
            flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keyFile, keyfilePass);
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     *
     * @Title: loginByCharsKey
     * @Description: log in to the remote Linux server with a private key held in memory
     * @param remoteConnect
     * @param keys a char[] containing the user's DSA or RSA private key in OpenSSH format (the "-----BEGIN DSA PRIVATE KEY-----" or "-----BEGIN RSA PRIVATE KEY-----" tag must be present); the array may contain line breaks
     * @param keyPass passphrase used to decrypt the key if it is encrypted; may be null otherwise
     * @return Boolean
     * @throws
     */
    public static Boolean loginByCharsKey(RemoteConnect remoteConnect, char[] keys, String keyPass) {
        boolean flag = false;
        // the key is passed in directly instead of being read from a file
        // File keyfile = new File("C:\\temp\\private");
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();
            // authenticate
            flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keys, keyPass);
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }
    /**
     *
     * @Title: execute
     * @Description: run a shell script or command remotely
     * @param cmd the command to run
     * @return: result the output produced once the command has finished
     * @throws
     */
    public static String execute(String cmd){
        String result = "";
        try {
            Session session = conn.openSession(); // open a session
            session.execCommand(cmd); // run the command
            result = processStdout(session.getStdout(), DEFAULTCHARTSET);
            // if stdout is empty, the command failed, so read stderr instead
            if (StringUtils.isBlank(result)) {
                result = processStdout(session.getStderr(), DEFAULTCHARTSET);
            }
            conn.close();
            session.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }
    /**
     * @Title: executeSuccess
     * @Description: run a shell script or command remotely
     * @return String the result on success; an empty string (never null) if the command fails
     * @throws
     */
    public static String executeSuccess(String cmd){
        String result = "";
        try {
            Session session = conn.openSession(); // open a session
            session.execCommand(cmd); // run the command
            result = processStdout(session.getStdout(), DEFAULTCHARTSET);
            conn.close();
            session.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }
    /**
     *
     * @Title: processStdout
     * @Description: parse the output of the executed command
     * @param in the input stream
     * @param charset the character encoding
     * @return String the output as plain text
     * @throws
     */
    public static String processStdout(InputStream in, String charset){
        InputStream stdout = new StreamGobbler(in);
        StringBuffer buffer = new StringBuffer();
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(stdout, charset));
            String line = null;
            while ((line = br.readLine()) != null) {
                buffer.append(line + "\n");
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return buffer.toString();
    }

    /**
     *
     * @Title: connectLinux
     * @Description: connect to the Linux server with user name and password and run a command
     * @return boolean true if the command produced output, false otherwise
     * @throws
     */
    public static boolean connectLinux(String ip, String userName, String password, String commandStr) {

        logger.info("ConnectLinuxCommand connectLinux===" + "ip:" + ip + "  userName:" + userName + "  commandStr:"
                + commandStr);

        String returnStr = "";
        boolean result = true;
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        try {
            if (login(remoteConnect)) {
                returnStr = execute(commandStr);
                System.out.println(returnStr);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (StringUtils.isBlank(returnStr)) {
            result = false;
        }
        return result;
    }

    public static boolean connectLinuxWithoutPwd(String ip, String userName, String commandStr) {

        logger.info("ConnectLinuxCommand connectLinuxWithoutPwd===" + "ip:" + ip + "  userName:" + userName
                + "  commandStr:" + commandStr);

        String returnStr = "";
        boolean result = true;
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        try {
            if (loginWithoutPwd(remoteConnect)) {
                returnStr = execute(commandStr);
                System.out.println(returnStr);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (StringUtils.isBlank(returnStr)) {
            result = false;
        }
        return result;
    }
    /**
     *
     * @Title: scpGet
     * @Description: copy a file from the remote server into a local directory
     * @param ip IP of the remote server
     * @param userName user name on the remote server
     * @param password password on the remote server
     * @param remoteFile path of the file on the remote server
     * @param localDir local target directory
     * @throws IOException
     * @return void
     * @throws
     */
    public static void scpGet(String ip, String userName, String password, String remoteFile, String localDir)
            throws IOException {

        logger.error("ConnectLinuxCommand  scpGet===" + "ip:" + ip + "  userName:" + userName + "  remoteFile:"
                + remoteFile + "  localDir:" + localDir);
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        if (login(remoteConnect)) {
            SCPClient client = new SCPClient(conn);
            client.get(remoteFile, localDir);
            conn.close();
        }
    }


    /**
     *
     * @Title: scpPut
     * @Description: copy a local file to the remote server
     * @param ip IP of the remote server
     * @param userName user name on the remote server
     * @param password password on the remote server
     * @param localFile local file to copy
     * @param remoteDir target directory on the remote server
     * @throws IOException
     * @return void
     * @throws
     */
    public static void scpPut(String ip, String userName, String password, String localFile, String remoteDir)
            throws IOException {
        logger.error("ConnectLinuxCommand  scpPut===" + "ip:" + ip + "  userName:" + userName + "  localFile:"
                + localFile + "  remoteDir:" + remoteDir);
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        if (login(remoteConnect)) {
            SCPClient client = new SCPClient(conn);
            client.put(localFile, remoteDir);
            conn.close();
        }
    }
}

 

Dependency:
<dependency>
    <groupId>ch.ethz.ganymed</groupId>
    <artifactId>ganymed-ssh2</artifactId>
    <version>build210</version>
</dependency>
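ConnectLinuxCommand also depends on a RemoteConnect holder class that the post never shows; a minimal sketch matching the getters and setters used above would be:

package shell;

// Hypothetical holder for the connection details used by ConnectLinuxCommand;
// the original post does not include this class, so this is a minimal reconstruction.
public class RemoteConnect {
    private String ip;
    private String userName;
    private String password;

    public String getIp() { return ip; }
    public void setIp(String ip) { this.ip = ip; }

    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }

    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
}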

 

3) Submitting the job to YARN from code

If the job can be submitted from a script, it can ultimately be submitted from code as well: whatever the script calls, code can call directly. The code first:

package yarn;

import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.util.Collections;

import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;

/**
 * @author zhangjun (follow his WeChat account [大数据技术与应用实战] for more hands-on content)
 * <p>
 * Submit a Flink job to the YARN cluster in application mode via the API
 */

public class SubmitJobApplicationMode{
    public static void main(String[] args){

        // local Flink configuration directory, used to load the Flink configuration
        String configurationDirectory = "G:/flink_working_tools/yarn-clientconfig/yarn-conf";
        // HDFS directory holding the Flink cluster jars
        String flinkLibs = "hdfs://dev-ct6-dc-master01:8020/flink/libs";
        // the user jar to run
        String userJarPath = "hdfs://dev-ct6-dc-master01:8020/flink/SocketWindowWordCount.jar";
        String flinkDistJar = "hdfs://dev-ct6-dc-master01:8020/flink/flink-yarn_2.11-1.11.0.jar";

        YarnClient yarnClient = YarnClient.createYarnClient();
        org.apache.hadoop.conf.Configuration entries = new org.apache.hadoop.conf.Configuration();
        entries.addResource(new Path("G:/flink_working_tools/yarn-clientconfig/yarn-conf/yarn-site.xml"));
        entries.addResource(new Path("G:/flink_working_tools/yarn-clientconfig/yarn-conf/hdfs-site.xml"));
        entries.addResource(new Path("G:/flink_working_tools/yarn-clientconfig/yarn-conf/core-site.xml"));
        YarnConfiguration yarnConfiguration = new YarnConfiguration(entries);
        yarnClient.init(yarnConfiguration);
        yarnClient.start();

        YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever
                .create(yarnClient);

        // load the Flink configuration
        Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
                configurationDirectory);
        flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
        flinkConfiguration.set(
                PipelineOptions.JARS,
                Collections.singletonList(
                        userJarPath));

        Path remoteLib = new Path(flinkLibs);
        flinkConfiguration.set(
                YarnConfigOptions.PROVIDED_LIB_DIRS,
                Collections.singletonList(remoteLib.toString()));

        flinkConfiguration.set(
                YarnConfigOptions.FLINK_DIST_JAR,
                flinkDistJar);
        // use application mode as the deployment target
        flinkConfiguration.set(
                DeploymentOptions.TARGET,
                YarnDeploymentTarget.APPLICATION.getName());
        //yarn application name
        flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "TestApplication");

        flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));
        flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));

        ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
                .createClusterSpecification();

        // program arguments and entry-point class of the user jar (null = take the main class from the manifest)
        ApplicationConfiguration appConfig = new ApplicationConfiguration(args, null);

        YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
                flinkConfiguration,
                yarnConfiguration,
                yarnClient,
                clusterInformationRetriever,
                true);
        ClusterClientProvider<ApplicationId> clusterClientProvider;
        try {
            clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
                    clusterSpecification,
                    appConfig);
        } catch (ClusterDeploymentException e) {
            // without a successful deployment there is no client to query, so fail fast instead of hitting an NPE below
            throw new RuntimeException("Couldn't deploy the YARN application cluster", e);
        }

        ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
        ApplicationId applicationId = clusterClient.getClusterId();
        System.out.println(applicationId);
    }
}

A few key points to explain:

1) The dependency. I am on Flink 1.11.0, so adjust the version to your own. You also need to copy this jar out of your local Maven repository, because we will need it again in a moment.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-yarn_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

2) As usual, these three files are needed (yarn-site.xml, hdfs-site.xml and core-site.xml, as loaded in the code above).

[Screenshot: yarn-site.xml, hdfs-site.xml and core-site.xml in the yarn-conf directory]

 

3) Just like with the CLI script, we have to set the configuration options.


Click into the corresponding source and you will see it boils down to the following three classes, which hold the YARN, TaskManager and JobManager options:

YarnConfigOptions
TaskManagerOptions
JobManagerOptions 

For example:

Say I want to set the JobManager memory. The docs give the property:

jobmanager.memory.process.size

Open the JobManagerOptions class and search for jobmanager.memory.process.size:

[Screenshot: the TOTAL_PROCESS_MEMORY constant in JobManagerOptions]

The corresponding code is:
flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));
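The other -D flags from the shell command earlier map the same way. A minimal sketch, assuming the option constants below from Flink 1.11 (CoreOptions, TaskManagerOptions, YarnConfigOptions) and a Configuration like the flinkConfiguration built in the code above:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

public class FlinkYarnOptionsExample {
    // Mirrors -Dparallelism.default=3, -Dtaskmanager.numberOfTaskSlots=3
    // and -Dyarn.containers.vcores=1 from the shell submission earlier.
    public static void applyResourceOptions(Configuration flinkConfiguration) {
        flinkConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, 3);
        flinkConfiguration.set(TaskManagerOptions.NUM_TASK_SLOTS, 3);
        flinkConfiguration.set(YarnConfigOptions.VCORES, 1);
    }
}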

There are further options for things like the logging configuration, savepoints and so on.


4) Upload the dependency jars to HDFS (the Flink lib jars, the flink-yarn dist jar and the user jar) at the paths referenced in the code above.
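You can do the upload with hdfs dfs -put, or from code. A minimal sketch using the Hadoop FileSystem API, with hypothetical local paths (/wyyt/software/flink/...) that you should adapt to your own environment:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadFlinkLibs {
    public static void main(String[] args) throws Exception {
        // connect to the same namenode used in the submission code above
        FileSystem fs = FileSystem.get(URI.create("hdfs://dev-ct6-dc-master01:8020"), new Configuration());

        // copy the local Flink lib directory up to hdfs:///flink/libs (yarn.provided.lib.dirs)
        fs.copyFromLocalFile(new Path("/wyyt/software/flink/lib"), new Path("/flink/libs"));
        // copy the job jar to the path used as PipelineOptions.JARS above
        fs.copyFromLocalFile(new Path("/wyyt/software/flink/examples/streaming/SocketWindowWordCount.jar"),
                new Path("/flink/SocketWindowWordCount.jar"));

        fs.close();
    }
}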

5) After that, just run it. If it fails, for example with:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster
       at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:414)
       at org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:64)
       at org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:197)
       at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:919)
       at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
       at java.security.AccessController.doPrivileged(Native Method)
       at javax.security.auth.Subject.doAs(Subject.java:422)
       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
       at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: The number of requested virtual cores per node 8 exceeds the maximum number of virtual cores 4 available in the Yarn Cluster. Please note that the number of virtual cores is set to the number of task slots by default unless configured in the Flink config with 'yarn.containers.vcores.'
       at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:294)
       at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:480)
       at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:407)

 

 

or an error saying HADOOP_CLASSPATH (or similar) is required, it means some parameter is not set correctly: add the Hadoop classpath/dependency as in the script above, and set the YARN parameters accordingly (for the vcores error above, either lower taskmanager.numberOfTaskSlots or set yarn.containers.vcores explicitly).