如何在Promise.all中正确地链接Promise?

时间:2022-09-15 20:47:31

Basically, I grab all the rows from a schedule table, then process each row individually. If the row is already in the command table, I skip it. Otherwise, I insert it.

基本上,我从调度表中获取所有行,然后处理它的各行。如果该行已在命令表中,请跳过。否则,我插入它。

I have 2 chained promises within Promise.all(rows.map(function(row){

我在Promise.all中有2个链式承诺(rows.map(function(row){

return is_schedule_cmd_already_pending(schedule_id).then(function(num){
   return insert_into_pending_cmd(num, schedule_id, device_name, cmd);  
});

I print the sql statement in console.log in is_schedule_cmd_already_pending and insert_into_pending_cmd

我在is_schedule_cmd_already_pending和insert_into_pending_cmd中的console.log中打印sql语句

The printing order is out of sync.

打印顺序不同步。

It should be executed in order like this, for each row, in sync style.

它应按照这样的顺序执行,对于每一行,以同步方式。

- schedule_id, device_name, day_code, schedule_time, schedule_hr, schedule_min, cmd -
- curr_date_code, curr_h, curr_min - 
.. match up day ..
~~ match up hour ~~
## match up d-h-m, run ##
is_schedule_cmd_already_pending
insert_into_pending_cmd


- schedule_id, device_name, day_code, schedule_time, schedule_hr, schedule_min, cmd -
- curr_date_code, curr_h, curr_min - 
.. match up day ..
~~ match up hour ~~
## match up d-h-m, run ##
is_schedule_cmd_already_pending
insert_into_pending_cmd


- schedule_id, device_name, day_code, schedule_time, schedule_hr, schedule_min, cmd -
- curr_date_code, curr_h, curr_min - 
.. match up day ..
~~ match up hour ~~
## match up d-h-m, run ##
is_schedule_cmd_already_pending
insert_into_pending_cmd
.......
.......

Instead, it is like (i.e. all insert_into_pending_cmd happen at the very end, which is not what I want)

相反,它就像(即所有insert_into_pending_cmd发生在最后,这不是我想要的)

- schedule_id, device_name, day_code, schedule_time, schedule_hr, schedule_min, cmd -
- curr_date_code, curr_h, curr_min - 
.. match up day ..
~~ match up hour ~~
## match up d-h-m, run ##
is_schedule_cmd_already_pending



- schedule_id, device_name, day_code, schedule_time, schedule_hr, schedule_min, cmd -
- curr_date_code, curr_h, curr_min - 
.. match up day ..
~~ match up hour ~~
## match up d-h-m, run ##
is_schedule_cmd_already_pending



- schedule_id, device_name, day_code, schedule_time, schedule_hr, schedule_min, cmd -
- curr_date_code, curr_h, curr_min - 
.. match up day ..
~~ match up hour ~~
## match up d-h-m, run ##
is_schedule_cmd_already_pending

.......
.......
.......

insert_into_pending_cmd
insert_into_pending_cmd
insert_into_pending_cmd

Full code

var config = require("./config.js");
var Promise = require('bluebird');
var mysql = require('promise-mysql');
var ON_DEATH = require('death');


// Shared connection pool; stays null until connect_db() has run.
var g_pool = null;

/**
 * Create the connection pool from config.db_config and store it in g_pool.
 */
function connect_db() {
  var db_settings = config.db_config;
  g_pool = mysql.createPool(db_settings);
}


/**
 * Shut down the shared connection pool, if one was created.
 *
 * Does nothing when g_pool is still null (connect_db() was never called), so
 * the shutdown handler cannot crash with a TypeError. An error handed to the
 * end() callback is logged instead of being silently dropped.
 */
function close_db() {
  if (!g_pool) {
    return;
  }

  g_pool.end(function (err) {
    // all connections in the pool have ended
    if (err) {
      console.error("close db failed:", err);
    }
  });
}



// http://thecodeship.com/web-development/alternative-to-javascript-evil-setinterval/
/**
 * Call `func` repeatedly, `wait` milliseconds apart, using chained setTimeout
 * calls instead of setInterval.
 *
 * @param {Function} func  Callback to run; invoked with `this` set to null.
 * @param {number} wait    Delay in milliseconds before each run.
 * @param {number} [times] Maximum number of runs; when undefined the callback
 *                         repeats indefinitely.
 *
 * If `func` throws, no further runs happen and the original error object is
 * rethrown (previously a string was thrown, which discarded the stack trace).
 */
function interval(func, wait, times) {
  var remaining = times;

  function tick() {
    // An undefined counter means "no limit"; otherwise stop once it is used up.
    if (typeof remaining !== "undefined" && !(remaining-- > 0)) {
      return;
    }

    // Schedule the next run before calling func.
    setTimeout(tick, wait);

    try {
      func.call(null);
    }
    catch (e) {
      // Zero the counter so the run that is already scheduled does nothing.
      remaining = 0;
      throw e;
    }
  }

  setTimeout(tick, wait);
}


/**
 * Build the current UTC time as a "YYYY-MM-DD HH:MM:00" string, print it,
 * and return it. Seconds are always written as "00".
 *
 * @returns {string} The formatted UTC timestamp.
 */
function get_current_utc_time() {
  var now = new Date();

  // Formatted by hand: per the original note, a
  // dateFormat(now, "yyyy-mm-dd h:MM:ss", true) call did not give accurate output.
  var date_part = [
    now.getUTCFullYear(),
    add_zero(now.getUTCMonth() + 1), // getUTCMonth() counts from 0
    add_zero(now.getUTCDate())       // getUTCDate() counts from 1
  ].join("-");

  var time_part = [
    add_zero(now.getUTCHours()),
    add_zero(now.getUTCMinutes()),
    "00" // we ignore the second
  ].join(":");

  var time_utc = date_part + " " + time_part;

  console.log("-current utc-");
  console.log(time_utc);

  return time_utc;
}


// http://www.w3schools.com/jsref/jsref_getutchours.asp
/**
 * Prefix a value below 10 with a single "0".
 *
 * @param {number} i Value to pad.
 * @returns {string|number} The string "0" + i when i < 10; otherwise i is
 *                          returned unchanged (not converted to a string).
 */
function add_zero(i) {
  return (i < 10) ? "0" + i : i;
}


/**
 * Insert a row into the Command table for a schedule entry, unless the
 * message says a command is already pending.
 *
 * @param {Object} msg_obj
 * @param {*} msg_obj.schedule_id  Stored in Command.ScheduleId.
 * @param {*} msg_obj.device_name  Stored in Command.RemoteName.
 * @param {*} msg_obj.cmd          Stored in Command.CommandJSON.
 * @param {boolean} msg_obj.is_pending  When truthy, nothing is inserted.
 * @returns {Promise<undefined>} Resolves with no value once done; rejects if
 *                               the query fails.
 */
function insert_into_pending_cmd(msg_obj) {
  console.log();
  console.log("-insert_into_pending_cmd-");

  var schedule_id = msg_obj.schedule_id;
  var device_name = msg_obj.device_name;
  var cmd = msg_obj.cmd;

  if (msg_obj.is_pending) {
    return Promise.resolve();
  }

  var curr_time = get_current_utc_time();

  // Values go through "?" placeholders so the driver escapes them. Splicing
  // them into the SQL text broke the statement whenever the JSON command or
  // the device name contained a single quote.
  var sql = "insert into Command set CommandDate = ?, RemoteName = ?, CommandJSON = ?, CommandComplete = 0, ScheduleId = ?";
  var values = [curr_time, device_name, cmd, schedule_id];

  return g_pool.query(sql, values).then(function () {
    // Deliberately resolve with no value; the query result is not used.
  });
}


/**
 * Decide whether a command for this schedule entry should be treated as
 * already pending.
 *
 * When msg_obj.is_run is falsy, no query is made and the entry is reported as
 * pending (so the next step skips it). Otherwise the Command table is checked
 * for an incomplete command with the same ScheduleId whose CommandDate is
 * between 0 and 600 seconds in the past.
 *
 * @param {Object} msg_obj
 * @param {*} msg_obj.schedule_id
 * @param {*} msg_obj.device_name
 * @param {*} msg_obj.cmd
 * @param {boolean} msg_obj.is_run
 * @returns {Promise<Object>} Resolves with
 *          { schedule_id, device_name, cmd, is_pending }.
 */
function is_schedule_cmd_already_pending(msg_obj) {
  console.log();
  console.log("-is_schedule_cmd_already_pending-");

  var schedule_id = msg_obj.schedule_id;
  var device_name = msg_obj.device_name;
  var cmd = msg_obj.cmd;

  // The three outcomes differ only in is_pending, so build the result in one place.
  function make_result(is_pending) {
    return {
      schedule_id: schedule_id,
      device_name: device_name,
      cmd: cmd,
      is_pending: is_pending
    };
  }

  if (!msg_obj.is_run) {
    return Promise.resolve(make_result(true));
  }

  // schedule_id is bound through a placeholder rather than spliced into the SQL text.
  var sql = "select count(*) as num from Command where ScheduleId = ? and CommandComplete = 0 and (UNIX_TIMESTAMP(UTC_TIMESTAMP()) - UNIX_TIMESTAMP(CommandDate)) < 600 and (UNIX_TIMESTAMP(UTC_TIMESTAMP()) - UNIX_TIMESTAMP(CommandDate)) > 0";

  return g_pool.query(sql, [schedule_id]).then(function (rows) {
    // Loose comparison kept on purpose so a count delivered as "0" still counts as zero.
    return make_result(rows[0].num != 0);
  });
}




/**
 * Compare a Schedule row against the current UTC day, hour and minute.
 *
 * Prints the row's fields and the current time values, then checks day, hour
 * and minute in that order, printing a marker for each level that matches and
 * stopping at the first mismatch.
 *
 * @param {Object} row A row with ScheduleId, ScheduleRemoteName,
 *                     ScheduleDaycode, ScheduleTime ("H:M..." string) and
 *                     ScheduleCommandJSON.
 * @returns {Promise<Object>} Resolves with
 *          { schedule_id, device_name, cmd, is_run }; is_run is true only
 *          when day, hour and minute all match.
 */
function is_matchup_schedule_time(row) {
  var schedule_id = row.ScheduleId;
  var device_name = row.ScheduleRemoteName;
  var day_code = row.ScheduleDaycode;
  var schedule_time = row.ScheduleTime;
  var cmd = row.ScheduleCommandJSON;

  // "HH:MM[:SS]" -> hour and minute parts
  var time_parts = schedule_time.split(":");
  var schedule_hour = time_parts[0];
  var schedule_min = time_parts[1];

  console.log();
  console.log();
  console.log("- schedule_id, device_name, day_code, schedule_time, schedule_hr, schedule_min, cmd -");
  [schedule_id, device_name, day_code, schedule_time, schedule_hour, schedule_min, cmd].forEach(function (value) {
    console.log(value);
  });

  // Current UTC values, padded the same way the schedule values are expected to be.
  var now = new Date();
  var curr_date_code = add_zero(now.getUTCDay());
  var curr_hour = add_zero(now.getUTCHours());
  var curr_min = add_zero(now.getUTCMinutes());

  console.log();
  console.log("- curr_date_code, curr_h, curr_min - ");
  console.log(curr_date_code);
  console.log(curr_hour);
  console.log(curr_min);

  // Checked in order; every() stops at the first level that does not match,
  // so a marker is printed only for the levels reached.
  var levels = [
    { matched: day_code == curr_date_code, marker: ".. match up day .." },
    { matched: schedule_hour == curr_hour, marker: "~~ match up hour ~~" },
    { matched: schedule_min == curr_min, marker: "## match up d-h-m, run ##" }
  ];

  var is_run = levels.every(function (level) {
    if (level.matched) {
      console.log();
      console.log(level.marker);
    }
    return level.matched;
  });

  return Promise.resolve({
    schedule_id: schedule_id,
    device_name: device_name,
    cmd: cmd,
    is_run: is_run
  });
}


// NOTE -------------
/**
 * Run every schedule row through the three steps in order: time match,
 * pending check, insert. Rows are handed to Promise.mapSeries (Bluebird),
 * which handles them one after another rather than in parallel.
 *
 * @param {Array<Object>} rows Rows from the Schedule table.
 * @returns {Promise<Array>} Settles when all rows are done; rejects with the
 *                           first error raised by any step.
 */
function process_schedule_rows(rows) {
  function handle_row(row) {
    return is_matchup_schedule_time(row)
      .then(function (match_msg) {
        return is_schedule_cmd_already_pending(match_msg);
      })
      .then(function (pending_msg) {
        return insert_into_pending_cmd(pending_msg);
      });
  }

  return Promise.mapSeries(rows, handle_row);
}


/**
 * Run one scheduling pass: load all Schedule rows (ordered by ScheduleId) and
 * hand them to process_schedule_rows.
 *
 * The promise chain is returned so callers can wait for the pass to finish.
 * A failure is logged here rather than rethrown: nothing was attached to the
 * old chain, so rethrowing only produced an unhandled rejection.
 *
 * @returns {Promise} Resolves with the result of process_schedule_rows, or
 *                    with undefined after a failure has been logged.
 */
function do_schedule() {
  console.log();
  console.log("---- start do_schedule ----");

  return g_pool.query("select * from Schedule order by ScheduleId asc")
    .then(process_schedule_rows)
    .catch(function (e) {
      console.error("do_schedule failed:", e);
    });
}


// main func
/**
 * Script entry point: print the configured DB host, open the connection pool,
 * start the repeating schedule check, and register a shutdown handler.
 */
function main() {
  var poll_every_ms = 5000;

  // One pass of the scheduler; looked up at call time on every tick.
  function run_schedule() {
    do_schedule();
  }

  // Handler passed to ON_DEATH: close the pool, then exit the process.
  function on_death(signal, err) {
    console.log();
    console.log("-- script interupted --");
    console.log("close db");

    close_db();

    process.exit();
  }

  console.log("db host:");
  console.log(config.db_host);

  connect_db();

  // No repeat limit is given, so the check keeps running.
  interval(run_schedule, poll_every_ms, undefined);

  ON_DEATH(on_death);
}


// Entry point: start the scheduler as soon as this script is loaded.
main();

1 个解决方案

#1


1  

Your Promise.all() pattern is running all your rows in parallel where completion of the various operations involved in processing them can happen in any order. That's how your code is designed to work.

您的Promise.all()模式正在并行运行所有行,其中处理它们所涉及的各种操作的完成可以按任何顺序进行。这就是您的代码设计工作的方式。

To sequence them so one row runs at a time and the next row is processed after the prior one is completely done, you need to use a different design pattern. A classic way to serialize promises that are processing an array is using .reduce() like this:

要对它们进行排序,以便一次运行一行,并在完成前一行之后处理下一行,则需要使用不同的设计模式。序列化处理数组的promise的经典方法是使用.reduce(),如下所示:

// Process each row sequentially: each row's work is chained onto the promise
// returned for the previous row, starting from an already-resolved promise.
rows.reduce(function(p, row) {
    // p is the accumulated promise covering every row handled so far
    return p.then(function() {
        // NOTE(review): schedule_id, device_name and cmd are not defined in this
        // snippet; presumably they are meant to be read from `row`. Confirm.
        return is_schedule_cmd_already_pending(schedule_id).then(function(num) {
            return insert_into_pending_cmd(num, schedule_id, device_name, cmd);
        });
    });
}, Promise.resolve()).then(function(data) {
    // everything done here
}).catch(function(err) {
    // error here
});

This creates an extended promise chain where each row is processed as a step in the promise chain and the next link in the chain doesn't run until the prior one is done.

这将创建一个扩展的promise链,其中每一行都作为promise链中的一个步骤进行处理,并且链中的下一个链接在前一个链完成之前不会运行。


The above scheme works with standard ES6 promises. I personally prefer using the Bluebird promise library which has Promise.mapSeries() which is explicitly designed for this:

上述方案适用于标准的ES6承诺。我个人更喜欢使用具有Promise.mapSeries()的Bluebird promise库,它是为此明确设计的:

const Promise = require('bluebird');

// Bluebird's Promise.mapSeries calls the callback for one row at a time,
// waiting for the returned promise before moving on to the next row.
Promise.mapSeries(rows, function(row) {
    // NOTE(review): schedule_id, device_name and cmd are not defined in this
    // snippet; presumably they are meant to be read from `row`. Confirm.
    return is_schedule_cmd_already_pending(schedule_id).then(function(num) {
        return insert_into_pending_cmd(num, schedule_id, device_name, cmd);
    });
}).then(function(data) {
    // everything done here
}).catch(function(err) {
    // error here
});

FYI, there are lots of issues with error handling in your real code. Promises make async error handling orders of magnitude easier. If you promisify your lower level operations (or use the promise interface to your database) and then write your control flow and logic only in promise-based code, it will be massively easier to write proper error handling. Lines of code like this:

仅供参考,您的实际代码中存在许多错误处理问题。Promise使异步错误处理容易了几个数量级。如果您将底层操作Promise化(或使用数据库的promise接口),然后只用基于promise的代码编写控制流和逻辑,那么编写正确的错误处理将容易得多。像这样的代码行:

if (err) throw err;

that are inside a plain async callback are NOT going to give you proper error handling. Use promises for everything in your control flow and it will be very easy to propagate and handle async errors. It's actually quite difficult to do this properly with nested plain async callbacks and your code shows several mistakes. Convert all async operations to promises and it will be easy to do it right.

在普通异步回调中的内容不会给你正确的错误处理。对控制流中的所有内容使用promises,传播和处理异步错误非常容易。使用嵌套的普通异步回调正确地执行此操作实际上非常困难,并且您的代码显示了几个错误。将所有异步操作转换为promises,并且很容易做到正确。

#1


1  

Your Promise.all() pattern is running all your rows in parallel where completion of the various operations involved in processing them can happen in any order. That's how your code is designed to work.

您的Promise.all()模式正在并行运行所有行,其中处理它们所涉及的各种操作的完成可以按任何顺序进行。这就是您的代码设计工作的方式。

To sequence them so one row runs at a time and the next row is processed after the prior one is completely done, you need to use a different design pattern. A classic way to serialize promises that are processing an array is using .reduce() like this:

要对它们进行排序,以便一次运行一行,并在完成前一行之后处理下一行,则需要使用不同的设计模式。序列化处理数组的promise的经典方法是使用.reduce(),如下所示:

// Process each row sequentially: each row's work is chained onto the promise
// returned for the previous row, starting from an already-resolved promise.
rows.reduce(function(p, row) {
    // p is the accumulated promise covering every row handled so far
    return p.then(function() {
        // NOTE(review): schedule_id, device_name and cmd are not defined in this
        // snippet; presumably they are meant to be read from `row`. Confirm.
        return is_schedule_cmd_already_pending(schedule_id).then(function(num) {
            return insert_into_pending_cmd(num, schedule_id, device_name, cmd);
        });
    });
}, Promise.resolve()).then(function(data) {
    // everything done here
}).catch(function(err) {
    // error here
});

This creates an extended promise chain where each row is processed as a step in the promise chain and the next link in the chain doesn't run until the prior one is done.

这将创建一个扩展的promise链,其中每一行都作为promise链中的一个步骤进行处理,并且链中的下一个链接在前一个链完成之前不会运行。


The above scheme works with standard ES6 promises. I personally prefer using the Bluebird promise library which has Promise.mapSeries() which is explicitly designed for this:

上述方案适用于标准的ES6承诺。我个人更喜欢使用具有Promise.mapSeries()的Bluebird promise库,它是为此明确设计的:

const Promise = require('bluebird');

// Bluebird's Promise.mapSeries calls the callback for one row at a time,
// waiting for the returned promise before moving on to the next row.
Promise.mapSeries(rows, function(row) {
    // NOTE(review): schedule_id, device_name and cmd are not defined in this
    // snippet; presumably they are meant to be read from `row`. Confirm.
    return is_schedule_cmd_already_pending(schedule_id).then(function(num) {
        return insert_into_pending_cmd(num, schedule_id, device_name, cmd);
    });
}).then(function(data) {
    // everything done here
}).catch(function(err) {
    // error here
});

FYI, there are lots of issues with error handling in your real code. Promises make async error handling orders of magnitude easier. If you promisify your lower level operations (or use the promise interface to your database) and then write your control flow and logic only in promise-based code, it will be massively easier to write proper error handling. Lines of code like this:

仅供参考,您的实际代码中存在许多错误处理问题。Promise使异步错误处理容易了几个数量级。如果您将底层操作Promise化(或使用数据库的promise接口),然后只用基于promise的代码编写控制流和逻辑,那么编写正确的错误处理将容易得多。像这样的代码行:

if (err) throw err;

that are inside a plain async callback are NOT going to give you proper error handling. Use promises for everything in your control flow and it will be very easy to propagate and handle async errors. It's actually quite difficult to do this properly with nested plain async callbacks and your code shows several mistakes. Convert all async operations to promises and it will be easy to do it right.

在普通异步回调中的内容不会给你正确的错误处理。对控制流中的所有内容使用promises,传播和处理异步错误非常容易。使用嵌套的普通异步回调正确地执行此操作实际上非常困难,并且您的代码显示了几个错误。将所有异步操作转换为promises,并且很容易做到正确。