如何限制Google Cloud Pub / Sub Queue

时间:2022-08-07 15:26:10

I'm using Google's Pub/Sub queue to handle messages between services. Some of the subscribers connect to rate-limit APIs.

我正在使用Google的Pub / Sub队列来处理服务之间的消息。一些订户连接到速率限制API。

For example, I'm pushing street addresses onto a pub/sub topic. I have a Cloud function which subscribes (via push) to that topic, and calls out to an external rate-limited geocoding service. Ideally, my street addresses could be pushed onto the topic with no delay, and the topic would retain those messages - calling the subscriber in a rate-limited fashion.

例如,我正在将街道地址推送到pub / sub主题。我有一个Cloud功能,可以订阅(通过推送)到该主题,并呼叫外部速率限制地理编码服务。理想情况下,我的街道地址可以毫不拖延地推到主题上,主题将保留这些消息 - 以限速的方式呼叫用户。

Is there anyway to configure such a delay, or a message distribution rate limit? Increasing the Ack window doesn't really help: I've architected this system to prevent long-running functions.

无论如何配置这样的延迟或消息分发率限制?增加Ack窗口并没有多大帮助:我已经构建了这个系统以防止长时间运行的功能。

1 个解决方案

#1


2  

An aproach to solve your problem is by using: async.queue

解决问题的方法是使用:async.queue

There you have a concurrency attribute wich you can manage the rate limit.

你有一个并发属性,你可以管理速率限制。

// create a queue object with concurrency 2
var q = async.queue(function(task, callback) {
    console.log('hello ' + task.name);
    callback();
}, 2);

// assign a callback
q.drain = function() {
    console.log('all items have been processed');
};

// add some items to the queue
q.push({name: 'foo'}, function(err) {
    console.log('finished processing foo');
});

// quoted from async documentation

#1


2  

An aproach to solve your problem is by using: async.queue

解决问题的方法是使用:async.queue

There you have a concurrency attribute wich you can manage the rate limit.

你有一个并发属性,你可以管理速率限制。

// create a queue object with concurrency 2
var q = async.queue(function(task, callback) {
    console.log('hello ' + task.name);
    callback();
}, 2);

// assign a callback
q.drain = function() {
    console.log('all items have been processed');
};

// add some items to the queue
q.push({name: 'foo'}, function(err) {
    console.log('finished processing foo');
});

// quoted from async documentation