9.1 消息队列选型对比
面试重要程度:⭐⭐⭐⭐⭐
常见提问方式: "你们项目中用的什么消息队列?为什么选择它?"
技术深度: 架构选型、性能对比、可靠性保证
预计阅读时间:30分钟
🎯 消息队列核心概念
什么是消息队列
消息队列(Message Queue,MQ)是一种应用程序间的通信方法,通过在消息的传输过程中保存消息的容器来实现。
核心作用:
- 解耦:降低系统间的耦合度
- 异步:提升系统响应速度
- 削峰:应对流量高峰
- 可靠性:保证消息不丢失
消息队列基本模型
/**
* 点对点模式(Queue)
*/
@RabbitListener(queues = "order.queue")
public void processOrder(OrderMessage message) {
log.info("Processing order: {}", message.getOrderId());
// 只有一个消费者能收到这条消息
}
/**
* 发布订阅模式(Topic)
*/
@RabbitListener(queues = "user.register.email")
public void sendEmail(UserRegisterEvent event) {
log.info("Sending welcome email to: {}", event.getEmail());
}
@RabbitListener(queues = "user.register.sms")
public void sendSMS(UserRegisterEvent event) {
log.info("Sending welcome SMS to: {}", event.getPhone());
}
🔄 主流消息队列对比
RabbitMQ详解
特点:
- 基于AMQP协议,Erlang开发
- 功能丰富,管理界面友好
- 支持多种消息模式
核心配置:
@Configuration
public class RabbitMQConfig {
// Direct Exchange - 精确匹配
@Bean
public DirectExchange orderDirectExchange() {
return new DirectExchange("order.direct");
}
// Topic Exchange - 模式匹配
@Bean
public TopicExchange userTopicExchange() {
return new TopicExchange("user.topic");
}
// 队列声明
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.process")
.withArgument("x-message-ttl", 60000)
.withArgument("x-max-length", 10000)
.build();
}
}
@Service
public class RabbitMQProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendOrderMessage(OrderMessage message) {
MessageProperties properties = new MessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
properties.setExpiration("60000");
Message rabbitMessage = new Message(
JSON.toJSONBytes(message), properties);
rabbitTemplate.send("order.direct", "order.create", rabbitMessage);
}
}
可靠性保证:
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// 生产者确认
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("Message sent successfully: {}", correlationData);
} else {
log.error("Message send failed: {}", cause);
}
});
// 消息返回确认
template.setReturnsCallback(returned -> {
log.error("Message returned: {}", returned.getMessage());
});
return template;
}
@RabbitListener(queues = "order.queue")
public void processOrder(OrderMessage message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
orderService.processOrder(message);
channel.basicAck(deliveryTag, false); // 手动确认
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true); // 重新入队
}
}
Kafka详解
特点:
- 高吞吐量、低延迟
- 分布式、可扩展
- 适合大数据场景
生产者配置:
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
return new DefaultKafkaProducerFactory<>(props);
}
}
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void sendOrderEvent(OrderEvent event) {
kafkaTemplate.send("order-events", event.getOrderId(), event)
.addCallback(
result -> log.info("Order event sent: {}", event),
failure -> log.error("Send failed: {}", event, failure)
);
}
}
消费者配置:
@KafkaListener(topics = "order-events", groupId = "order-service")
public void handleOrderEvent(OrderEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
log.info("Received: partition={}, offset={}, event={}",
partition, offset, event);
orderService.processOrderEvent(event);
}
// 批量消费
@KafkaListener(topics = "user-behavior",
containerFactory = "batchKafkaListenerContainerFactory")
public void handleBatch(List<UserBehaviorEvent> events) {
log.info("Processing batch of {} events", events.size());
analyticsService.processBatch(events);
}
RocketMQ详解
特点:
- 阿里开源,Java开发
- 支持事务消息、顺序消息
- 丰富的消息类型
基本使用:
@Service
public class RocketMQProducerService {
@Autowired
private DefaultMQProducer producer;
// 同步发送
public void sendSyncMessage(String topic, String tag, Object message) {
try {
Message msg
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java面试圣经 文章被收录于专栏
Java面试圣经,带你练透java圣经
