-1

I am new to Rabbitmq and i would like to build a non-blocking consumer. i followed How to build a nonblocking Consumer when using AsyncRabbitTemplate with Request/Reply Pattern and created an application and deployed in local. But i get AmqpReplyTimeoutException .

Here i attach some log :


2020-03-30 23:13:33,781 DEBUG [main] RabbitTemplate []>: Publishing message [(Body:'test' MessageProperties [headers={}, correlationId=a6a94f00-c5a3-4f41-a5b9-69293bc6d153, replyTo=amq.rabbitmq.reply-to, contentType=text/plain, contentEncoding=UTF-8, contentLength=4, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [], routingKey = [tax.webflux.reactor.determine]

AmqpReplyTimeoutException [Reply timed out, requestMessage=(Body:'test' MessageProperties [headers={}, correlationId=a6a94f00-c5a3-4f41-a5b9-69293bc6d153, replyTo=amq.rabbitmq.reply-to, contentType=text/plain, contentEncoding=UTF-8, contentLength=4, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])]
    at org.springframework.amqp.rabbit.AsyncRabbitTemplate$RabbitFuture$TimeoutTask.run(AsyncRabbitTemplate.java:757)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Here is my class :


import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.PostConstruct;

import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitConverterFuture;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

import com.rabbitmq.client.Channel;

@ComponentScan("com.sap.slh.tax.*")
@SpringBootApplication
@EnableAsync
public class SpringwebfluxdemoApplication {
    private final ExecutorService exec = Executors.newCachedThreadPool();

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Bean
    public ApplicationRunner runner(AsyncRabbitTemplate asyncTemplate) {
        return args -> {
            RabbitConverterFuture<Object> future = asyncTemplate.convertSendAndReceive("tax.webflux.reactor.determine", "test");
            future.addCallback(r -> {
                System.out.println("Reply: " + r);
            }, t -> {
                t.printStackTrace();
            });
        };
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate template) {
        return new AsyncRabbitTemplate(template);
    }

    @PostConstruct
    public void initializeQueue() {
        TopicExchange taxServiceTopicExchange = (TopicExchange) ExchangeBuilder.topicExchange("tax.webflux.reactor.TAXSERVICE")
                .durable(true).build();
        Queue taxAttributesDeterminationQueue = QueueBuilder.durable("tax.webflux.reactor.queue").build();
        Binding binding = BindingBuilder.bind(taxAttributesDeterminationQueue).to(taxServiceTopicExchange)
                .with("tax.webflux.reactor.determine");
        amqpAdmin.declareExchange(taxServiceTopicExchange);
        amqpAdmin.declareQueue(taxAttributesDeterminationQueue);
        amqpAdmin.declareBinding(binding);
    }

    @RabbitListener(queues = "tax.webflux.reactor.queue")
    public void listen(String in, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
            @Header(AmqpHeaders.CORRELATION_ID) String correlationId,
            @Header(AmqpHeaders.REPLY_TO) String replyTo) {

        ListenableFuture<String> future = handleInput(in);
        future.addCallback(result -> {
            Address address = new Address(replyTo);
            this.template.convertAndSend(address.getExchangeName(), address.getRoutingKey(), result, m -> {
                m.getMessageProperties().setCorrelationId(correlationId);
                return m;
            });
            try {
                channel.basicAck(tag, false);
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }, t -> {
            t.printStackTrace();
        });
    }

    private ListenableFuture<String> handleInput(String in) {
        SettableListenableFuture<String> future = new SettableListenableFuture<String>();
        exec.execute(() -> {
            try {
                Thread.sleep(2000);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            future.set(in.toUpperCase());
        });
        return future;
    }


    public static void main(String[] args) {
        SpringApplication.run(SpringwebfluxdemoApplication.class, args);
    }

}

2020欧洲杯时间表My application.yml file:

---
spring:
  rabbitmq:
    host: "${vcap.services.rabbitmq.credentials.hostname:localhost}"
    password: "${vcap.services.rabbitmq.credentials.password:guest}"
    port: "${vcap.services.rabbitmq.credentials.port:5672}"
    username: "${vcap.services.rabbitmq.credentials.username:guest}"
    virtual_host: "${vcap.services.rabbitmq.credentials.virtual_host:/}"

2020欧洲杯时间表Any help to solve this would be appreciated. Thanks in advance.

0

That answer is 18 months old; the framework now supports the listener method returning a Mono<?> or Future<?>.

See .

Then you don't need all that code in the listener - just complete the returned value.

| improve this answer | |

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service, privacy policy and cookie policy

Not the answer you're looking for? Browse other questions tagged or ask your own question.