package com.bcxin.ars.rabbitmq; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * 消息者 * @author linqinglin * @date 2018/12/15 0015 10:39 */ @Component public class RabbitConsumer implements ChannelAwareMessageListener { private Logger logger = LoggerFactory.getLogger(RabbitConsumer.class); @Override public void onMessage(Message message, Channel channel) throws Exception { String body = new String(message.getBody(), "UTF-8"); logger.info("消息内容:::::::::::"+body); logger.info(message.getMessageProperties().getConsumerQueue()); boolean mqFlag=true;//业务处理 //还有一个点就是如何获取mq消息的报文部分message? if(mqFlag){ basicACK(message,channel);//处理正常--ack }else{ basicNACK(message,channel);//处理异常--nack } } //正常消费掉后通知mq服务器移除此条mq private void basicACK(Message message,Channel channel){ try{ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }catch(IOException e){ logger.info("通知服务器移除mq时异常,异常信息:"+e); } } //处理异常,mq重回队列 private void basicNACK(Message message,Channel channel){ try{ channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); }catch(IOException e){ logger.info("mq重新进入服务器时出现异常,异常信息:"+e); } } }