学习dbms_parallel_execute包

时间:2023-03-08 23:56:42
学习dbms_parallel_execute包

一、简介

  ORACLE11g R2版本的新特性之一就是引进了DBMS_PARALLEL_EXECUTE包,使用DBMS_PARALLEL_EXECUTE包批量并行递增式的更新表。

  更多ORACLE11g新特性请参考:http://www.cnblogs.com/oracle-dba/articles/3632223.html
基本原理:
  (1) 把数据集分割成小的块(chunk),可基于rowid进行分块,也可根据指定范围的行数(rows)进行分块。
  (2) 在每一个块上以并行的方式应用update语句,在每个块执行完成后,立即提交,原理是通过调用JOB进行并发操作。
好处在于:
  (1) 在执行update操作时,仅仅锁住一个chunk而非锁住整个表;
  (2) 因为对每个chunk 执行完毕就提交,所以当update操作失败后,之前变更的并不会回滚;
  (3) 减小回滚空间(undo)的使用;
  (4) 提高性能,可根据服务器的性能设置chunk_size与parallel_level的大小。
备注:parallel_level取决于CPU的个数,以及parallel_threads_per_cpu。

DBMS_PARALLEL_EXECUTE 使用三种将一个表的数据分割成多个chunk的方法:
  (1) CREATE_CHUNKS_BY_NUMBER_COL : 通过指定的字段来切割表
  (2) CREATE_CHUNKS_BY_ROWID : 通过ROWID来切割表,本文只介绍通过BY ROWID 进行分割
  (4) CREATE_CHUNKS_BY_SQL : 通过用户提供的sql语句来切割表

二、实践操作

  DBMS_PARALLEL_EXECUTE这个包操作起来比较简单,大体步骤为:

  (1)创建任务task,即调用create_task()过程;

  (2)创建分块规则,即调用create_chunk_by_rowid()或者create_chunk_by_number_col()等过程;

  (3)写动态update的sql语句,复杂和简单的均可,可以通过dbms_output.put_line()来验证动态sql的正确性;

  (4)运行task任务,即调用run_task()过程;

  (5)监控task任务,可通过视图user_parallel_execute_tasks和user_parallel_execute_chunks来监控task状态;

  (6)task成功运行完毕后,将任务删除,即调用drop_task()过程。

 2.1 准备工作

   给用户授权

    调用dbms_parallel_execute包时,需要有执行create job的权限,因此需要给用于赋予create job权限。

      grant create job to username;

   也可把grant execute on dbms_scheduler to username;赋予某用户。

 2.2 写代码

   实践操作代码,有改动,仅做参考。

CREATE OR REPLACE PROCEDURE Pro_Parallel_Exec_Update(i_Taskid IN VARCHAR2,
--任务ID 传入是序列 para_task_seq.nextval
i_Inputdate IN DATE,
--日期
i_Tabname IN VARCHAR2,
--更新表名
i_Column IN VARCHAR2,
--更新列名
i_Tasktp IN VARCHAR2 DEFAULT 'BY ROWID'
-- 任务类型 未启用的,可设置不能类型的 update
-- 如: 如不同类型的分块方法 by rowid , by number_col ,by sql
) IS
/*******************************************************************
PROC_NAME : 并行更新过程,本例采用BY ROWID 方法
EDIT_NAME : zzd @2014-03-26
REQUIREMENT :
1) ORACLE 11g R2 版本
2) DBMS_PARALLEL_EXECUTE
3) OWN CREATE JOB PRIVILEGE eg: GRANT EXECUTE ON DBMS_SCHEDULER TO USER ;
4) SET JOB_QUEUE_PROCESSES
REFERENCE:
(1) USER_PARALLEL_EXECUTE_CHUNKS
(2) USER_PARALLEL_EXECUTE_TASKS
(3) DBMS_PARALLEL_EXECUTE.DROP_TASK(TASK_NAME);
ATTACH :
(1) 分块范围 Chunk_Size 和并行度 Paralle_Level 可以根据服务器性能自行设置
(2) DBMS_PARALLEL_EXECUTE 包三种 (BY ROWID, BY NUMBER_COL ,BY SQL ) 分块方法,原理差不多
(3) 更多说明参考PLSQL Packages and Types Reference_11R2.pdf
********************************************************************/
--- 任务名常量
v_Taskname VARCHAR2(20) := 'UPDATE_TASK';
--- schema常量
v_Schema CONSTANT VARCHAR2(20) := 'USER';
--- rowid范围常量
v_Cksize CONSTANT INT := 8000;
--- 并行度大小
v_Plevel CONSTANT INT := 10;
--- 日期变量
v_Inputdate VARCHAR2(64);
--- 临时动态sql 变量
v_Sql_Stmt VARCHAR2(4000);
--- 临时动态insert_sql 变量
v_Sql_Insert VARCHAR2(4000);
--- 控制尝试运行次数
v_Trynum NUMBER;
--- 返回运行状态
v_Status NUMBER;
BEGIN
v_Trynum := 0;
--- 日期转换
v_Inputdate := 'TO_DATE(''' || To_Char(i_Inputdate, 'YYYY-MM-DD') ||
''',''' || 'YYYY-MM-DD' || ''')';
--- 写日志开始更新
Dbms_Output.Put_Line('开始运行 ' || ' USER = ' || v_Schema ||
' Table_Name= ' || i_Tabname ||
' Update_Column = ' || i_Column || ' 任务类型为: ' ||
i_Tasktp);
--- 注意这里只能用实体表作为临时表,此处称“中间表”否则在创建task 这步会把临时表的数据情况,事务性和会话性临时表均不可以
--- 清空 中间表数据
EXECUTE IMMEDIATE ' Truncate TABLE User_Table_Temp '; -- 注这个是个实体中间表
--- 动态 insert sql 语句
v_Sql_Insert := ' INSERT INTO User_Table_Temp(Table_Name, Status)
SELECT a.Table_Name, a.Status
FROM User_Tables a
WHERE a.Table_Name LIKE ' || '''' ||
' USER_NAME% ' || ''''; --- 执行动态sql
EXECUTE IMMEDIATE v_Sql_Insert;
--- 测试动态sql
Dbms_Output.Put_Line(v_Sql_Insert); --- 创建任务
v_Taskname := v_Taskname || ' _ ' || i_Taskid;
Dbms_Parallel_Execute.Create_Task(v_Taskname); --- 使用 ROWID 进行范围分组
Dbms_Parallel_Execute.Create_Chunks_By_Rowid(Task_Name => v_Taskname,
Table_Owner => Upper(v_Schema),
Table_Name => Upper(i_Tabname),
By_Row => TRUE,
Chunk_Size => v_Cksize); --- 动态update的sql 脚本
v_Sql_Stmt := ' UPDATE /*+ ROWID (dda) */ ' || Upper(i_Tabname) || ' b
SET b.' || i_Column; v_Sql_Stmt := v_Sql_Stmt || ' = (SELECT a.TABLE_NAME|| A.STATUS
FROM User_Table_Temp a
WHERE b. Object_Name' ||
' = a.TABLE_NAME
AND b.Created = ' || v_Inputdate || ') '; v_Sql_Stmt := v_Sql_Stmt || '
WHERE EXISTS (SELECT 1
FROM User_Table_Temp a
WHERE b.Object_Name ' ||
' = a.TABLE_NAME
AND b.Created = ' || v_Inputdate || ')'; v_Sql_Stmt := v_Sql_Stmt || Chr(10) ||
'AND ROWID BETWEEN :Start_Id AND :End_Id '; --- 测试动态SQL
Dbms_Output.Put_Line(v_Sql_Stmt);
--- 执行并行更新
Dbms_Parallel_Execute.Run_Task(Task_Name => v_Taskname,
Sql_Stmt => v_Sql_Stmt,
Language_Flag => Dbms_Sql.Native,
Parallel_Level => v_Plevel);
--- 更新结束
Dbms_Output.Put_Line('完成运行 ' || ' USER = ' || v_Schema ||
' Table_Name= ' || i_Tabname ||
' Update_Column = ' || i_Column || ' 任务类型为: ' ||
i_Tasktp); --- 如果 task任务出差,则恢复它,并重新执行,如此进行尝试两次
v_Status := Dbms_Parallel_Execute.Task_Status(v_Taskname); WHILE (v_Trynum < 2 AND v_Status != Dbms_Parallel_Execute.Finished) LOOP
v_Trynum := v_Trynum + 1;
--- 恢复任务,尝试继续执行
Dbms_Parallel_Execute.Resume_Task(v_Taskname);
--- 获取当前任务状态
v_Status := Dbms_Parallel_Execute.Task_Status(v_Taskname); END LOOP;
-- 更新完毕后,删除此任务
Dbms_Parallel_Execute.Drop_Task(v_Taskname); END Pro_Parallel_Exec_Update;

 三、相关视图

  (1)USER_PARALLEL_EXECUTE_CHUNKS

  (2)USER_PARALLEL_EXECUTE_TASKS