如何在ActiveJobs中检查队列?

时间:2022-10-30 08:22:42

Before enqueueing a job I would like to inspect the queue and see if a job already exists in the queue with the exact same arguments and in that case not enqueue the job. But I cannot find out how I should be able to do this. Is it possible?

在对作业进行排队之前,我希望检查队列,看看是否有一个作业已经存在于具有相同参数的队列中,并且在这种情况下不会对作业进行排队。但我不知道该怎么做。是可能的吗?

I know I can easily do it in my tests with the use of the TestHelper. TestHelper relies on the TestAdapter which we of course does not use in the production environment.

我知道我可以很容易地在我的测试中使用TestHelper。TestHelper依赖于TestAdapter,我们当然不会在生产环境中使用它。

A little bit more background. In our API we retrieve the clients version number in each request. We use Intercom for support and want to present the app version in Intercom so we can see what version our customers use when addressing support issues. But to limit the number of calls to Intercom I delay each posting to Intercom a couple of minutes and while a post is enqueued I do not want to enqueue new ones with the same data.

多一点背景知识。在我们的API中,我们在每个请求中检索客户机版本号。我们使用对讲机进行支持,并希望在对讲机中显示应用程序版本,这样我们就可以看到客户在解决支持问题时使用的是什么版本。但是,为了限制对讲机的调用数量,我将每个贴子延迟到对讲机上几分钟,并且当一个贴子被放入队列时,我不希望对具有相同数据的新贴子进行排队。

My question is related to List queued tasks with ActiveJob AsyncAdapter but that question only deals with the number of enqueued jobs.

我的问题与使用ActiveJob AsyncAdapter列出排队任务有关,但这个问题只涉及排队任务的数量。

Efficiently reschedule ActiveJob (resque/sidekiq) indicates that this is not possible and I would need to implement the solution separately.

有效地重新安排ActiveJob (resque/sidekiq)表明这是不可能的,我需要分别实现这个解决方案。

Can I somehow inspect a queue and the jobs in it with ActiveJobs or do I need to keep track of what I have queued and what has been performed?

我是否可以使用ActiveJobs检查队列和其中的作业,或者我是否需要跟踪我已排队的内容和已执行的内容?

1 个解决方案

#1


2  

The following is an unfinished implementation. It only supports Sidekiq at the moment (I assumed you're using Sidekiq).

下面是未完成的实现。它现在只支持Sidekiq(我假设您在使用Sidekiq)。

Note: Have only partially tested this, so you might need to update some things below. I'll check this fully later if I get the time.

注意:只对其进行了部分测试,因此您可能需要更新下面的一些内容。如果我有时间的话,我以后会详细检查的。

class ActiveJob::Base
  def self.where(jid: nil, arguments: [])
    found_jobs = []

    job_adapter = Rails.application.config.active_job.queue_adapter

    case job_adapter

    when :sidekiq
      queue = Sidekiq::Queue.new(queue_name)

      if jid.present?
        job = queue.find_job(jid)

        if job
          # if arguments supplied, check also if args match
          if arguments.present?
            should_disregard_job_class = self == ActiveJob::Base
            job_has_same_class = self.to_s == job.args[0]['job_class']
            job_has_same_arguments = arguments == job.args[0]['arguments'][3..-1]

            if (should_disregard_job_class || job_has_same_class) && job_has_same_arguments
              found_jobs << job
            end

          else
            found_jobs << job
          end
        end

      else
        # TODO: optimise below as it is slow
        queue.each do |job|
          should_disregard_job_class = self == ActiveJob::Base
          job_has_same_class = self.to_s == job.args[0]['job_class']
          job_has_same_arguments = arguments == job.args[0]['arguments'][3..-1]

          if (should_disregard_job_class || job_has_same_class) && job_has_same_arguments
            found_jobs << job
          end 
        end
      end

    when :resque
      # TODO
    else
      raise "TODO: missing Adapter implementation for #{job_adapter}"
    end

    found_jobs
  end
end

Usage

jobs = ActiveJob::Base.where(jid: '12345678abcdef')
jobs = MyCustomJob.where(jid: '12345678abcdef')
jobs = MyCustomJob.where(arguments: ['firstjobargumentvalue', 'secondjobargumentvalue'])

#1


2  

The following is an unfinished implementation. It only supports Sidekiq at the moment (I assumed you're using Sidekiq).

下面是未完成的实现。它现在只支持Sidekiq(我假设您在使用Sidekiq)。

Note: Have only partially tested this, so you might need to update some things below. I'll check this fully later if I get the time.

注意:只对其进行了部分测试,因此您可能需要更新下面的一些内容。如果我有时间的话,我以后会详细检查的。

class ActiveJob::Base
  def self.where(jid: nil, arguments: [])
    found_jobs = []

    job_adapter = Rails.application.config.active_job.queue_adapter

    case job_adapter

    when :sidekiq
      queue = Sidekiq::Queue.new(queue_name)

      if jid.present?
        job = queue.find_job(jid)

        if job
          # if arguments supplied, check also if args match
          if arguments.present?
            should_disregard_job_class = self == ActiveJob::Base
            job_has_same_class = self.to_s == job.args[0]['job_class']
            job_has_same_arguments = arguments == job.args[0]['arguments'][3..-1]

            if (should_disregard_job_class || job_has_same_class) && job_has_same_arguments
              found_jobs << job
            end

          else
            found_jobs << job
          end
        end

      else
        # TODO: optimise below as it is slow
        queue.each do |job|
          should_disregard_job_class = self == ActiveJob::Base
          job_has_same_class = self.to_s == job.args[0]['job_class']
          job_has_same_arguments = arguments == job.args[0]['arguments'][3..-1]

          if (should_disregard_job_class || job_has_same_class) && job_has_same_arguments
            found_jobs << job
          end 
        end
      end

    when :resque
      # TODO
    else
      raise "TODO: missing Adapter implementation for #{job_adapter}"
    end

    found_jobs
  end
end

Usage

jobs = ActiveJob::Base.where(jid: '12345678abcdef')
jobs = MyCustomJob.where(jid: '12345678abcdef')
jobs = MyCustomJob.where(arguments: ['firstjobargumentvalue', 'secondjobargumentvalue'])