HBase Thrift 访问队列

时间:2023-03-09 16:35:56
HBase Thrift 访问队列

public class CallQueue implements BlockingQueue<Runnable> {

  private static Log LOG = LogFactory.getLog(CallQueue.class);



  private final BlockingQueue<Call> underlyingQueue;

  private final ThriftMetrics metrics;



  public CallQueue(BlockingQueue<Call> underlyingQueue,

                   ThriftMetrics metrics) {

    this.underlyingQueue = underlyingQueue;

    this.metrics = metrics;

  }



  private static long now() {

    return System.nanoTime();

  }

//访问线程

public static class Call implements Runnable {

    final long startTime;

    final Runnable underlyingRunnable;



    Call(Runnable underlyingRunnable) {

      this.underlyingRunnable = underlyingRunnable;

      this.startTime = now();

    }



    @Override

    public void run() {

      underlyingRunnable.run();

    }



    public long timeInQueue() {

      return now() - startTime;

    }



    @Override

    public boolean equals(Object other) {

      if (other instanceof Call) {

        Call otherCall = (Call)(other);

        return this.underlyingRunnable.equals(otherCall.underlyingRunnable);

      } else if (other instanceof Runnable) {

        return this.underlyingRunnable.equals(other);

      }

      return false;

    }



    @Override

    public int hashCode() {

      return this.underlyingRunnable.hashCode();

    }

  }

//在队列中获取默认Runnable

  @Override

  public Runnable poll() {

    Call result = underlyingQueue.poll();

    updateMetrics(result);

    return result;

  }



  private void updateMetrics(Call result) {

    if (result == null) {

      return;

    }

    metrics.incTimeInQueue(result.timeInQueue());

    metrics.setCallQueueLen(this.size());

  }

//在队列中获取Runnable

  @Override

  public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {

    Call result = underlyingQueue.poll(timeout, unit);

    updateMetrics(result);

    return result;

  }

 //队列中删除runnable

  @Override

  public Runnable remove() {

    Call result = underlyingQueue.remove();

    updateMetrics(result);

    return result;

  }



  @Override

  public Runnable take() throws InterruptedException {

    Call result = underlyingQueue.take();

    updateMetrics(result);

    return result;

  }

//添加到队列中

@Override

  public int drainTo(Collection<? super Runnable> destination) {

    return drainTo(destination, Integer.MAX_VALUE);

  }

//添加到队列中

@Override

  public int drainTo(Collection<? super Runnable> destination,

                     int maxElements) {

    if (destination == this) {

      throw new IllegalArgumentException(

          "A BlockingQueue cannot drain to itself.");

    }

    List<Call> drained = new ArrayList<Call>();

    underlyingQueue.drainTo(drained, maxElements);

    for (Call r : drained) {

      updateMetrics(r);

    }

    destination.addAll(drained);

    int sz = drained.size();

    LOG.info("Elements drained: " + sz);

    return sz;

  }

//队列中是否能提供call

@Override

  public boolean offer(Runnable element) {

    return underlyingQueue.offer(new Call(element));

  }



  @Override

  public boolean offer(Runnable element, long timeout, TimeUnit unit)

      throws InterruptedException {

    return underlyingQueue.offer(new Call(element), timeout, unit);

  }

@Override

public void put(Runnable element) throws InterruptedException {

    underlyingQueue.put(new Call(element));

  }



  @Override

  public boolean add(Runnable element) {

    return underlyingQueue.add(new Call(element));

  }



  @Override

  public boolean addAll(Collection<? extends Runnable> elements) {

    int added = 0;

    for (Runnable r : elements) {

      added += underlyingQueue.add(new Call(r)) ? 1 : 0;

    }

    return added != 0;

  }



  @Override

  public Runnable element() {

    return underlyingQueue.element();

  }



  @Override

  public Runnable peek() {

    return underlyingQueue.peek();

  }

//清空队列

  @Override

  public void clear() {

    underlyingQueue.clear();

  }



  @Override

  public boolean containsAll(Collection<?> elements) {

    return underlyingQueue.containsAll(elements);

  }



  @Override

  public boolean isEmpty() {

    return underlyingQueue.isEmpty();

  }



  @Override

  public Iterator<Runnable> iterator() {

    return new Iterator<Runnable>() {

      final Iterator<Call> underlyingIterator = underlyingQueue.iterator();

      @Override

      public Runnable next() {

        return underlyingIterator.next();

      }



      @Override

      public boolean hasNext() {

        return underlyingIterator.hasNext();

      }



      @Override

      public void remove() {

        underlyingIterator.remove();

      }

    };

  }



  @Override

  public boolean removeAll(Collection<?> elements) {

    return underlyingQueue.removeAll(elements);

  }



  @Override

  public boolean retainAll(Collection<?> elements) {

    return underlyingQueue.retainAll(elements);

  }



  @Override

  public int size() {

    return underlyingQueue.size();

  }



  @Override

  public Object[] toArray() {

    return underlyingQueue.toArray();

  }



  @Override

  public <T> T[] toArray(T[] array) {

    return underlyingQueue.toArray(array);

  }



  @Override

  public boolean contains(Object element) {

    return underlyingQueue.contains(element);

  }



  @Override

  public int remainingCapacity() {

    return underlyingQueue.remainingCapacity();

  }



  @Override

  public boolean remove(Object element) {

    return underlyingQueue.remove(element);

  }

}

相关文章