助力工业物联网,工业大数据之脚本开发【五】

时间:2023-01-29 16:54:54

01:脚本开发思路

  • 目标:实现自动化脚本开发的设计思路分析

  • 路径

    • step1:脚本目标

    • step2:实现流程

    • step3:脚本选型

    • step4:单个测试

  • 实施

    • 创建一个文件,存放要采集的表的名称

      #创建测试目录
      mkdir -p /opt/datas/shell
      cd /opt/datas/shell/
      #创建存放表名的文件
      vim test_full_table.txt
      ciss4.ciss_base_areas
      ciss4.ciss_base_baseinfo
      ciss4.ciss_base_csp
      ciss4.ciss_base_customer
      ciss4.ciss_base_device
    • 创建脚本

      vim test_full_import_table.sh
    • 构建采集的Sqoop命令

      sqoop import \
      -Dmapreduce.job.user.classpath.first=true \
      --connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \
      --username ciss \
      --password 123456 \
      --table CISS4.CISS_SERVICE_WORKORDER \
      --delete-target-dir \
      --target-dir /test/full_imp/ciss4.ciss_service_workorder \
      --as-avrodatafile \
      --fields-terminated-by "\001" \
      -m 1
    • 封装脚本

      #!/bin/bash
      #export path
      source /etc/profile
      #export the tbname files
      TB_NAME=/opt/datas/shell/test_full_table.txt
      #export the import opt
      IMP_OPT="sqoop import -Dmapreduce.job.user.classpath.first=true"
      #export the jdbc opt
      JDBC_OPT="--connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin --username ciss --password 123456"
      
      #read tbname and exec sqoop
      while read tbname
      do
        ${IMP_OPT} ${JDBC_OPT} --table ${tbname^^} --delete-target-dir --target-dir /test/full_imp/${tbname^^} --as-avrodatafile --fields-terminated-by "\001" -m 1
      done < ${TB_NAME}
    • Shell:Linux原生Shell脚本,命令功能全面丰富,主要用于实现自动化Linux指令,适合于Linux中简单的自动化任务开发

    • Python:多平台可移植兼容脚本,自身库功能强大,主要用于爬虫、数据科学分析计算等,适合于复杂逻辑的处理计算场景

    • 场景:一般100行以内的代码建议用Shell,超过100行的代码建议用Python

    • 采集脚本选用:Shell

    • a. 获取表名

    • b.构建Sqoop命令

    • c.执行Sqoop命令

    • d.验证结果

    • 脚本目标:实现自动化将多张Oracle中的数据表全量或者增量采集同步到HDFS中

    • 实现流程

    • 脚本选型

    • 单个测试

  • 添加执行权限

    chmod u+x test_full_import_table.sh
  • 测试执行

    sh -x test_full_import_table.sh
  • 检查结果

    助力工业物联网,工业大数据之脚本开发【五】
  • 小结

    • 实现自动化脚本开发的设计思路分析

02:全量及增量采集脚本运行

  • 目标:实现全量采集脚本的运行

  • 实施

    • /data/dw/ods/one_make/full_imp:44张表

    • /data/dw/ods/one_make/incr_imp:57张表

    • 因oracle表特殊字段类型,导致sqoop导数据任务失败

    • oracle字段类型为:clob或date等特殊类型

    • 解决方案:在sqoop命令中添加参数,指定特殊类型字段列(SERIAL_NUM)的数据类型为string

    • —map-column-java SERIAL_NUM=String

    • 全量采集

      cd /opt/sqoop/one_make
      sh -x full_import_tables.sh
    • --outdir:Sqoop解析出来的MR的Java程序等输出文件输出的文件

    • 增量采集

      cd /opt/sqoop/one_make
      sh -x incr_import_tables.sh
    • 脚本中特殊的一些参数

    • 工单数据信息、呼叫中心信息、物料仓储信息、报销费用信息等

    • HDFS路径

      /data/dw/ods/one_make/incr_imp/表名/日期
    • Oracle表:组织机构信息、地区信息、服务商信息、数据字典等

    • HDFS路径

      /data/dw/ods/one_make/full_imp/表名/日期
    • 全量目标:将所有需要将实现全量采集的表进行全量采集存储到HDFS上

    • 增量目标:将所有需要将实现全量采集的表进行增量采集存储到HDFS上

    • 运行脚本

    • 特殊问题

    • 查看结果

  • 小结

    • 实现全量采集脚本的运行

03:Schema备份及上传

  • 目标:了解如何实现采集数据备份

  • 实施

    • Avro文件HDFS存储

      hdfs_schema_dir=/data/dw/ods/one_make/avsc
      hdfs dfs -put ${workhome}/java_code/*.avsc ${hdfs_schema_dir}
    • Avro文件本地打包

      local_schema_backup_filename=schema_${biz_date}.tar.gz
      tar -czf ${local_schema_backup_filename} ./java_code/*.avsc
    • Avro文件HDFS备份

      hdfs_schema_backup_filename=${hdfs_schema_dir}/avro_schema_${biz_date}.tar.gz
      hdfs dfs -put ${local_schema_backup_filename} ${hdfs_schema_backup_filename}
    • 运行测试

      cd /opt/sqoop/one_make/
      ./upload_avro_schema.sh
    • 验证结果

      /data/dw/ods/one_make/avsc/
      *.avsc
      schema_20210101.tar.gz
    • 需求:将每张表的Schema进行上传到HDFS上,归档并且备份

    • Avro文件本地存储

      workhome=/opt/sqoop/one_make
      --outdir ${workhome}/java_code
    • 小结

    • 了解如何实现采集数据备份

04:Python脚本

  • 目标:了解如果使用Python脚本如何实现

  • 实施

    • subprocess

      call(String:LinuxCommand):用于提交Linux命令的方法
    • logging

      basicConfig(level,filename,filemode,format):用于配置日志记录的方式
      info(Messege):用于记录具体的日志内容
    • time

      sleep(15) :休眠15s
    • 问题:所有的操作是Sqoop、HDFS等命令操作,如何能通过Python代码控制?

    • 解决:本质上是使用Python执行了Linux的Shell命令来实现的

    • 导包

      # 用于实现执行系统操作的包
      import os
      # 用于实现执行Linux的命令的包
      import subprocess
      # 用于实现日期获取解析的包
      import datetime
      # 用于执行时间操作的包
      import time
      # 用于做日志记录的包
      import logging
    • 原理本质

    • 核心代码解析

  • 小结

    • 了解如果使用Python脚本如何实现