AX2009 批处理作业中使用多线程---*采摘

时间:2023-03-10 05:36:54
AX2009 批处理作业中使用多线程---*采摘

*采摘

是前两种模式的一种混合,使用使用实体表存储单任务模式那样的每个工单,任务每次都取顶行做操作。单任务数不想单任务模式,一个工单一个任务。而是类似批量捆绑那样设置任务数。
表:demoTopPickProcessTrackTable

字段 类型  
SalesId String SalesId
processedStatus Enum NoYes

Class:

/*
*采摘
*/
class DemoBatchTopPicking extends RunBaseBatch
{
str 20 fromSalesOrder, toSalesOrder;
#define.CurrentVersion(1)
#localmacro.CurrentList
fromSalesOrder, toSalesOrder
#endmacro
} public void new()
{
super();
} void run()
{
SalesTable salesTable;
SalesFormLetter formletter;
DemoTopPickProcessTrackTable demoTopPickProcessTrackTable;
Map SalesMap;
; DemoTopPickProcessTrackTable.readPast(true); do
{
ttsBegin;
// when it finds no more work item to process do-while loop will exit select pessimisticlock firstOnly * from demoTopPickProcessTrackTable
where demoTopPickProcessTrackTable.ProcessedStatus == NoYes::No; select * from salesTable
where salesTable.salesId == demoTopPickProcessTrackTable.SalesID &&
salesTable.documentStatus == DocumentStatus::none; if (salesTable)
{
formletter = SalesFormLetter::construct(DocumentStatus::Invoice);
formletter.getLast();
formletter.resetParmListCommonCS();
formletter.allowEmptyTable(formletter.initAllowEmptyTable(true));
SalesMap = new Map(Types::Int64,Types::Record);
SalesMap.insert(salesTable.recid,salesTable);
// formletter.parmDataSourceRecordsPacked(SalesMap.pack());
// formletter.createParmUpdateFromParmUpdateRecord(SalesFormletterParmData::initSalesParmUpdateFormletter(DocumentStatus::Invoice, FormLetter.pack()));
formletter.showQueryForm(false);
formletter.initLinesQuery();
formletter.update(salesTable, systemDateGet(), SalesUpdate::All, AccountOrder::None, false, false);
} if(demoTopPickProcessTrackTable)
{
demoTopPickProcessTrackTable.ProcessedStatus = NoYes::Yes;
demoTopPickProcessTrackTable.update();
}
ttsCommit;
} while ( demoTopPickProcessTrackTable);
} public static DemoBatchTopPicking construct()
{
DemoBatchTopPicking c;
;
c = new DemoBatchTopPicking();
return c;
} public container pack()
{
return [#CurrentVersion, #CurrentList]; } public boolean unpack(container packedClass)
{
Version version = RunBase::getVersion(packedClass);
switch(version)
{
case #CurrentVersion:
[version,#CurrentList] = packedClass;
break;
default:
return false;
}
return true;
}

  

Job Test

static void scheduleDemoBatchTopPickingJob(Args _args)
{
BatchHeader batchHeader;
DemoBatchTopPicking demoBatchTopPicking;
DemoTopPickProcessTrackTable demoTopPickProcessTrackTable;
SalesTable salesTable; int totalNumberOfTasksNeeded = 10; int counter;
SalesId _SalesIdFrom,_SalesIdTo;
BatchInfo batchInfo;
; _SalesIdFrom = "SSO000001";
_SalesIdTo = "SSO001000"; ttsBegin;
select count(RecId) from salesTable
where salesTable.salesId >= _SalesIdFrom &&
salesTable.salesId <= _SalesIdTo
&& salesTable.documentStatus == DocumentStatus::none; info(Strfmt("%1",salesTable.recid));
delete_from demoTopPickProcessTrackTable; if (salesTable.recid > 0)
{
//Populating the staging table with the work items insert_recordset demoTopPickProcessTrackTable (SalesId)
select SalesId from salesTable
where salesTable.SalesId >= _SalesIdFrom &&
salesTable.SalesId <= _SalesIdTo &&
salesTable.DocumentStatus == DocumentStatus::None; update_recordSet demoTopPickProcessTrackTable
setting processedStatus = NoYes::No; batchHeader = BatchHeader::construct();
batchHeader.parmCaption(strFmt('Batch job for demoBatchTopPicking')); //Creating predefined number of tasks
for(counter = 1; counter <= totalNumberOfTasksNeeded; counter++)
{
info(strfmt("-->%1",counter)); demoBatchTopPicking = DemoBatchTopPicking::construct(); batchInfo = demoBatchTopPicking.batchInfo();
BatchInfo.parmCaption(strfmt('Invoicing :%1 ',counter));
batchInfo.parmGroupId("COUNTING"); batchHeader.addTask(demoBatchTopPicking);
}
batchHeader.save(); }
ttsCommit;
info('Done');
}
/*
*采摘: 假设,我们处理10万条记录 #创建的任务 #批处理线程(在我的测试服务器) #可并行执行的并行任务
10 10 10 批处理整个时段,10个任务同时运行,直到队列中没有更多的项目(暂存表中没有要处理的项目)
*/

  AX2009 批处理作业中使用多线程---*采摘