I am consuming from RabbitMQ using spring-amqp. In the admin UI, all the connections show as running, but the channels inside them are all idle (Prefetch: 250, Unacked: 250). Could you please help? How do I use prefetch properly? Do I need to close connections? How can I increase the number of channels per connection? Right now there is only one channel per connection. The configuration snippet follows. I am using the out-of-the-box spring-amqp configuration for most things, and a custom RabbitMQ message listener to ack or nack messages.

<!-- RabbitMQ configuration -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.vhosts}" requested-heartbeat="${rabbitmq.requestedHeartBeat}"/>

    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" message-converter="rabbitJsonConverter"/>
    <bean id="rabbitJsonConverter" class="rabbitmq.messages.converter.CustomJackson2JsonMessageConverter">
        <property name="classMapper">
            <bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
                <property name="defaultType" value="rabbitmq.messages.custom.dto.CustomRabbitMQMessage"/>
            </bean>
        </property>
    </bean>

    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" requeue-rejected="true">
        <rabbit:listener queue-names="${rabbitmq.queuename}" ref="customRabbitMQMessageListener" method="onMessage"/>
    </rabbit:listener-container>

<bean id="customRabbitMQMessageListener" class="rabbitmq.messages.listener.CustomRabbitMQMessageListener" >
        <property name="customerAccountService" ref="customerAccountService" />
    </bean>

**Listener Code**
LOG.debug("***** LISTENING RABBITMQ MESSAGES START******");
        channel.basicRecover(true);
        try {
                boolean ack = performOperationsOnMessage(msg);
                if (ack) {
                    channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
                } else
                    channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
                LOG.debug("***** LISTENING RABBITMQ MESSAGES FINISHED******");
        } catch (Exception exp) {
            LOG.error("Exception occurred during perform Change operation, RabbitMQ message: " + exp.getMessage(), exp);
            channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);
        }

private boolean performOperationsOnMessage(Message msg) {
        RabbitMQMessage message = null;
            try {
                message = (RabbitMQMessage) rabbitJsonConverter.fromMessage(msg);
            } catch (MessageConversionException exp) {
                LOG.warn("Exception occurred during the conversion or any other issue", exp);
                return true;
            }
        if (message == null || message.getOperation() == null || message.getResource() == null || message.getResource().getUuid() == null) {
            LOG.warn("Received an empty message or empty operation or empty resource or empty uuid from queue");
            return true;
        }
        if (message.getOperation().equals(RabbitMQMessage.RossoOperation.remove.name())) {
            return performRemoveOperation(message);
        }
        if (message.getOperation().equals(RabbitMQMessage.RossoOperation.change.name())) {
            return performChangeOperation(message);
        }
        return true;
    }
  • Show, please, a code of your CustomRabbitMQMessageListener. What does it do? Do you really call channel.basicAck() ? – Artem Bilan Mar 25 at 15:38
  • You haven't described a problem - the numbers just mean there are 250 messages outstanding at each consumer. If you are not seeing progress, it means you have failed to ack or nack those messages. The broker will only allow prefetch unack'd messages. The word idle here doesn't mean anything - you'll have to ask the RabbitMQ engineers what it means (rabbitmq-users Google Group). It's not clear what you mean by "one channel per connection". As you can see, each consumer has its own channel. – Gary Russell Mar 25 at 15:39
  • @ArtemBilan: yes. In the CustomRabbitMQMessageListener, if I have performed the required action on the message successfully, I call channel.basicAck(). If I get an internal exception, or some constraint means I want to process the message again later, I call channel.basicNack(). – SANKALP sharma Mar 25 at 23:03
  • @GaryRussell - If I failed to acknowledge these messages and the unacked count has reached the prefetch value, what should I do to ack or nack them, since RabbitMQ stops sending messages to the channel once prefetch = unacked? Also official rabbitmq messages say the following: – SANKALP sharma Mar 25 at 23:13
  • @GaryRussell, Artem: By the way, fixing the listener worked. Now the queue is completely consumed and it is empty. I will update the answer. Kindly accept it. Thanks guys for your suggestions : ) – SANKALP sharma Mar 28 at 20:07

The problem was in the listener code, which was configured for manual acknowledgement. Some branches in the logic returned without acknowledging the message, so the unacked count on each channel reached the prefetch value (250) and RabbitMQ stopped sending messages to those channels.

Fix: as the updated listener code in the question shows, it never leaves any message unacknowledged. Also, in the negative acknowledgement channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true), requeue (the last argument) should be true so that the message is requeued to the same queue.
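
To make the stall easier to see, here is a minimal in-memory sketch of one consumer channel with a prefetch limit. It does not use the RabbitMQ client or spring-amqp: `PrefetchWindowSketch`, `deliver()`, `consume()` and the "even tags are invalid" rule are all hypothetical stand-ins, and only the method names `basicAck`/`basicNack` are borrowed from the real channel API. It assumes the broker simply refuses to deliver once the unacked count equals prefetch, as described in the comments above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** In-memory stand-in for one consumer channel with a prefetch limit (not the real client API). */
class PrefetchWindowSketch {
    private final int prefetch;
    private final Deque<Long> queue = new ArrayDeque<>(); // delivery tags still waiting in the queue
    private final Set<Long> unacked = new HashSet<>();    // delivered, but neither acked nor nacked

    PrefetchWindowSketch(int prefetch, int messageCount) {
        this.prefetch = prefetch;
        for (long tag = 1; tag <= messageCount; tag++) {
            queue.add(tag);
        }
    }

    /** Broker side: returns the next tag, or null once unacked == prefetch or the queue is empty. */
    Long deliver() {
        if (unacked.size() >= prefetch || queue.isEmpty()) {
            return null;
        }
        long tag = queue.poll();
        unacked.add(tag);
        return tag;
    }

    void basicAck(long tag) {
        unacked.remove(tag);
    }

    void basicNack(long tag, boolean requeue) {
        // With requeue=true the message goes back to the queue for a later delivery.
        if (unacked.remove(tag) && requeue) {
            queue.add(tag);
        }
    }

    int unackedCount() { return unacked.size(); }

    int queued() { return queue.size(); }

    /** Consumes until the broker stops delivering; returns how many deliveries were handled. */
    static int consume(PrefetchWindowSketch channel, boolean ackEveryPath) {
        int handled = 0;
        Long tag;
        while ((tag = channel.deliver()) != null) {
            handled++;
            boolean invalid = tag % 2 == 0; // hypothetical rule: even tags fail validation
            if (!invalid) {
                channel.basicAck(tag);
            } else if (ackEveryPath) {
                channel.basicAck(tag); // fixed listener: invalid messages are acked and discarded
            }
            // Buggy listener: an invalid message falls through with no ack and no nack.
        }
        return handled;
    }

    public static void main(String[] args) {
        PrefetchWindowSketch buggy = new PrefetchWindowSketch(3, 10);
        int handled = consume(buggy, false);
        System.out.println("buggy: handled=" + handled
                + " unacked=" + buggy.unackedCount() + " queued=" + buggy.queued());

        PrefetchWindowSketch fixed = new PrefetchWindowSketch(3, 10);
        handled = consume(fixed, true);
        System.out.println("fixed: handled=" + handled
                + " unacked=" + fixed.unackedCount() + " queued=" + fixed.queued());
    }
}
```

With the buggy path the consumer stops as soon as the number of forgotten messages equals prefetch, while messages are still sitting in the queue. That is the same picture as Prefetch: 250, Unacked: 250 on an idle channel. With an ack or nack on every path the queue drains.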
