P-C 生产者/消费者模型最大痛点:何时进行消费
1.大多消费动作都是事件驱动的
- 1.攒够了一定数量 ==> 定量
- 2.到达一定时间 ==> 定时
- 3.队列里来了新数据 ==> 适合及时性要求高场景
如客户端向服务端投递数据,只要队列有数据,就刷出数据,否则挂起等待
// 往外刷数据过程中,又得到第二批数据,伪代码如下
Executor executor = Executors.newFixedThreadPool(n);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
private Runnable task = new Runnable({
//这里由于共享队列,Runnable可以复用,故做成全局的
public void run(){
List<Message> messages = new ArrayList<>(20);
queue.drainTo(messages,20);
doSend(messages);
//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,
// 队列就有机会囤新的消息
}
});
public void send(Message message){
queue.offer(message);
executor.submit(task)
}
// 这种方式是消息延迟和批量的一个比较好的平衡,但优先响应低延迟。
// 延迟的最高程度由上一次发送的等待时间决定。
// 但可能造成的问题是发送过快的话批量的大小不够满足性能的极致。
Executor executor = Executors.newFixedThreadPool(4);
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();
volatile long last = System.currentMills();
Executors.newSingleThreadScheduledExecutor().submit(new Runnable(){
flush();
},500,500,TimeUnits.MILLS);
private Runnable task = new Runnable({
//这里由于共享队列,Runnable可以复用,顾做成全局的。
public void run(){
List<Message> messages = new ArrayList<>(20);
queue.drainTo(messages,20);
doSend(messages);
//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会屯新的消息。
}
});
public void send(Message message){
last = System.currentMills();
queue.offer(message);
flush();
}
private void flush(){
if(queue.size>200||System.currentMills()-last>200){
executor.submit(task)
}
}
相反,对于可以用适量的延迟来换取高性能的场景来说,用定时/定量二选一的方式可能会更为理想, 既到达一定数量才发送,但如果数量一直达不到,也不能干等,有一个时间上限
具体说来,在上文的 submit 之前,多判断一个时间和数量,并且 Runnable 内部维护一个定时器,避免没有新任务到来时旧的任务永远没有机会触发发送条件。 对于server端的数据落地,使用这种方式就非常方便。
2.网络请求,小包合大包会提高性能,主要原因有两个
减少无谓的请求头,如果你每个请求只有几字节,而头却有几十字节,无疑效率非常低下。
减少回复的ack包个数。把请求合并后,ack包数量必然减少,确认和重发的成本就会降低。
参考:
文档信息
- 本文作者:jiushun.cheng
- 本文链接:https://minipa.github.io/2019/07/24/mq-batch/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)