MQ 05 批量消息问题

2019/07/24 MQ 共 1627 字,约 5 分钟
MiniPa

P-C 生产者/消费者模型最大痛点:何时进行消费

1.大多消费动作都是事件驱动的
  • 1.攒够了一定数量 ==> 定量
  • 2.到达一定时间 ==> 定时
  • 3.队列里来了新数据 ==> 适合及时性要求高场景
    如客户端向服务端投递数据,只要队列有数据,就刷出数据,否则挂起等待
// 往外刷数据过程中,又得到第二批数据,伪代码如下
Executor executor = Executors.newFixedThreadPool(n);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();

private Runnable task = new Runnable({

    //这里由于共享队列,Runnable可以复用,故做成全局的
   public void run(){
      List<Message> messages  = new ArrayList<>(20);
      queue.drainTo(messages20);
      doSend(messages);
	//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,
	// 队列就有机会囤新的消息
   }
});

public void send(Message message){
    queue.offer(message);
    executor.submit(task)
}

// 这种方式是消息延迟和批量的一个比较好的平衡,但优先响应低延迟。
// 延迟的最高程度由上一次发送的等待时间决定。
// 但可能造成的问题是发送过快的话批量的大小不够满足性能的极致。

Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();

volatile long last = System.currentMills();
Executors.newSingleThreadScheduledExecutor().submit(new Runnable(){
   flush();
}500500TimeUnits.MILLS);

private Runnable task = new Runnable({
	//这里由于共享队列,Runnable可以复用,顾做成全局的。
   public void run(){
      List<Message> messages  = new ArrayList<>(20);
      queue.drainTo(messages20);
      doSend(messages);
//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会屯新的消息。
   }
});

public void send(Message message){
    last = System.currentMills();
    queue.offer(message);
    flush();
}

private void flush(){
 if(queue.size>200||System.currentMills()-last>200){
       executor.submit(task)
  }
}

相反,对于可以用适量的延迟来换取高性能的场景来说,用定时/定量二选一的方式可能会更为理想, 既到达一定数量才发送,但如果数量一直达不到,也不能干等,有一个时间上限

具体说来,在上文的 submit 之前,多判断一个时间和数量,并且 Runnable 内部维护一个定时器,避免没有新任务到来时旧的任务永远没有机会触发发送条件。 对于server端的数据落地,使用这种方式就非常方便。

2.网络请求,小包合大包会提高性能,主要原因有两个

  • 减少无谓的请求头,如果你每个请求只有几字节,而头却有几十字节,无疑效率非常低下。

  • 减少回复的ack包个数。把请求合并后,ack包数量必然减少,确认和重发的成本就会降低。

参考:

文档信息

Search

    Table of Contents