
This question is similar to "Spring Cloud Stream topic per message for different consumers", but the difference is that I want multiple sinks in one consumer Spring Boot application, and I want to route to them through a RabbitMQ topic exchange (which is the default in Spring Cloud Stream). I cannot figure out the correct configuration, or whether something is wrong in my code. I have 3 sinks/consumers. consumer1 is the default, and every message should go there.

**Updated as suggested by Gary**

Producer application.yml. Comment: my producer app takes the routing key from the message header named '*.events'.

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: my-exchange
      rabbit:
        bindings:
          output:
            producer:
              routing-key-expression: headers['*.events']
  application:
    name: publisher-service
server:
  port: 15010

Producer code snippet. Comment: the message is sent with routing key = "test.events". I am not sure about the 2nd argument, but I am assuming it is the binding routing key = test1.events.billing, which means I want the message delivered to the billing consumer in addition to the default consumer.

 source.output().send(MessageBuilder.withPayload(eventRequest.getEventMessage())
                    .setHeader("*.events", "test1.events.billing")
                    .build());

Consumer configuration. Comment: I want 3 queues bound to the exchange "my-exchange". I am not sure if the config is right. application.yml

spring:
  cloud:
      stream:
        bindings:
          defaultconsumer:
            destination: my-exchange
            group: queue1
          billingconsumer:
            destination: my-exchange
            group: queue2
          messageconsumer:
            destination: my-exchange
            group: queue3

        rabbit:
          bindings:
            defaultconsumer:
              consumer:
                bindingRoutingKey: '*.events.#'
            billingconsumer:
              consumer:
                bindingRoutingKey: test1.events.billing
            messageconsumer:
              consumer:
                bindingRoutingKey: test2.events.messages

  application:
    name: subscriber-service
server:
  port: 15020

Consumer code: IEventConsumer.java. Comment: I am not sure the code below is right.

public interface IEventConsumer {
    String INPUT = "my-exchange";

    @Input
    SubscribableChannel defaultconsumer();

    @Input
    SubscribableChannel billingconsumer();

    @Input
    SubscribableChannel messageconsumer();
}

EventConsumer.java. Comment: All I want from the code below is that the message is not received by messageconsumer. But in reality it goes through all of these methods.



    @StreamListener("defaultconsumer")
    public void subscribe1(EventMessage eventMessage) {
        logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
    }


    @StreamListener("billingconsumer")
    public void subscribe2(EventMessage eventMessage) {
        logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @StreamListener("messageconsumer")
    public void subscribe3(EventMessage eventMessage) {
        logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

Apparently something is wrong above, and I don't see this working. Any ideas?

    @Input(INPUT)
    SubscribableChannel defaultconsumer();

    @Input(INPUT)
    SubscribableChannel billingconsumer();

    @Input(INPUT)
    SubscribableChannel messageconsumer();

You are giving all three bindings the same name; just use @Input with no value, and the method name will be used as the binding name.

And

@StreamListener("defaultconsumer")

etc.

EDIT

I just copied your code and it worked fine...

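import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication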
@EnableBinding({ IEventConsumer.class, Source.class })
public class So60879187Application {

    private static final Logger logger = LoggerFactory.getLogger(So60879187Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So60879187Application.class, args);
    }

    @StreamListener("defaultconsumer")
    public void subscribe1(String eventMessage) {
        logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @StreamListener("billingconsumer")
    public void subscribe2(String eventMessage) {
        logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @StreamListener("messageconsumer")
    public void subscribe3(String eventMessage) {
        logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> output.send(MessageBuilder.withPayload("foo")
                .setHeader("*.events", "test1.events.billing")
                .build());
    }

}

interface IEventConsumer {
    String INPUT = "my-exchange";

    @Input
    SubscribableChannel defaultconsumer();

    @Input
    SubscribableChannel billingconsumer();

    @Input
    SubscribableChannel messageconsumer();

}

spring:
  cloud:
      stream:
        bindings:
          defaultconsumer:
            destination: my-exchange
            group: queue1
          billingconsumer:
            destination: my-exchange
            group: queue2
          messageconsumer:
            destination: my-exchange
            group: queue3
          output:
            destination: my-exchange

        rabbit:
          bindings:
            defaultconsumer:
              consumer:
                bindingRoutingKey: '*.events.#'
            billingconsumer:
              consumer:
                bindingRoutingKey: test1.events.billing
            messageconsumer:
              consumer:
                bindingRoutingKey: test2.events.messages
            output:
              producer:
                routing-key-expression: headers['*.events']

  application:
    name: subscriber-service
server:
  port: 15020

and

2020-03-27 09:45:33.607  INFO 30366 --- [change.queue1-1] com.example.demo.So60879187Application   :  DefaultEventConsumer received new event [foo]
2020-03-27 09:45:33.607  INFO 30366 --- [change.queue2-1] com.example.demo.So60879187Application   :  billingEventConsumer received new event [foo]

EDIT2

Newer functional programming model equivalent...

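import java.util.function.Consumer;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;

@SpringBootApplication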
public class So608791871Application {

    private static final Logger logger = LoggerFactory.getLogger(So608791871Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So608791871Application.class, args);
    }

    @Bean
    public Consumer<String> defaultconsumer() {
        return eventMessage ->
                logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @Bean
    public Consumer<String> billingconsumer() {
        return eventMessage ->
                logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    @Bean
    public Consumer<String> messageconsumer() {
        return eventMessage ->
                logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
    }

    private final DirectProcessor<Message<?>> output = DirectProcessor.create();

    @Bean
    public Supplier<Flux<Message<?>>> output() {
        return () -> this.output;
    }

    @Bean
    public ApplicationRunner runner() {
        Message<String> msg1 = MessageBuilder.withPayload("foo")
                .setHeader("*.events", "test1.events.billing")
                .build();
        Message<String> msg2 = MessageBuilder.withPayload("bar")
                .setHeader("*.events", "test2.events.messages")
                .build();
        return args -> {
            this.output.onNext(msg1);
            this.output.onNext(msg2);
        };
    }

}

spring:
  cloud:
    function:
      definition: defaultconsumer;billingconsumer;messageconsumer;output
    stream:
      bindings:
        defaultconsumer-in-0:
          destination: my-exchange
          group: queue1
        billingconsumer-in-0:
          destination: my-exchange
          group: queue2
        messageconsumer-in-0:
          destination: my-exchange
          group: queue3
        output-out-0:
          destination: my-exchange

      rabbit:
        bindings:
          defaultconsumer-in-0:
            consumer:
              bindingRoutingKey: '*.events.#'
          billingconsumer-in-0:
            consumer:
              bindingRoutingKey: test1.events.billing
          messageconsumer-in-0:
            consumer:
              bindingRoutingKey: test2.events.messages
          output-out-0:
            producer:
              routing-key-expression: headers['*.events']

  application:
    name: subscriber-service
server:
  port: 15020

and

2020-03-27 14:28:37.426  INFO 3646 --- [change.queue3-1] com.example.demo.So608791871Application  :  messageEventConsumer received new event [bar]
2020-03-27 14:28:37.426  INFO 3646 --- [change.queue1-1] com.example.demo.So608791871Application  :  DefaultEventConsumer received new event [foo]
2020-03-27 14:28:37.426  INFO 3646 --- [change.queue2-1] com.example.demo.So608791871Application  :  billingEventConsumer received new event [foo]
2020-03-27 14:28:37.429  INFO 3646 --- [change.queue1-1] com.example.demo.So608791871Application  :  DefaultEventConsumer received new event [bar]
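
The deliveries in the log above follow from how a topic exchange compares each queue's binding key with the message's routing key: `*` stands for exactly one dot-separated word and `#` for zero or more words. The following is only a simplified sketch of that rule for checking binding keys by hand. The class `TopicMatch` and its methods are hypothetical names and are not part of RabbitMQ or Spring Cloud Stream.

```java
import java.util.List;

// Hypothetical helper: a simplified model of topic-exchange matching,
// not the broker's actual implementation.
final class TopicMatch {

    // Returns true if a queue bound with bindingKey would receive routingKey.
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            return j == w.length;           // pattern used up: all words must be too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split
            for (int k = j; k <= w.length; k++) {
                if (match(p, i + 1, w, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == w.length) {
            return false;                   // pattern expects a word, none left
        }
        // '*' matches any single word; otherwise the words must be equal
        return (p[i].equals("*") || p[i].equals(w[j])) && match(p, i + 1, w, j + 1);
    }

    public static void main(String[] args) {
        List<String> bindingKeys =
                List.of("*.events.#", "test1.events.billing", "test2.events.messages");
        List<String> routingKeys = List.of("test1.events.billing", "test2.events.messages");
        for (String routingKey : routingKeys) {
            for (String bindingKey : bindingKeys) {
                System.out.println(routingKey + " vs " + bindingKey + " -> "
                        + matches(bindingKey, routingKey));
            }
        }
    }
}
```

Under this rule, `test1.events.billing` matches the keys bound to queue1 and queue2 but not queue3, and `test2.events.messages` matches queue1 and queue3, which agrees with the four log lines.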
  • I updated (please see above). Now the message is not even arriving, and in the RabbitMQ console I now see 4 exchanges: 1 from the producer, myexchange (no bindings), and the other 3 are defaultconsumer, messageconsumer and billingconsumer. The consumer exchanges each have bindings like defaultconsumer.anonymous.VZ94MTkTSaaDPLXuhtgcIA. I think the binding config is wrong, but I am not able to figure it out. – Vivek Misra Mar 27 at 6:58
  • Anonymous bindings mean that the group property is not being applied for some reason. – Gary Russell Mar 27 at 13:23
  • I just copied your code and configuration and it works fine for me. – Gary Russell Mar 27 at 13:49
  • Thanks, Gary, for your time. I need to look closely at what silly mistake I am making. – Vivek Misra Mar 27 at 17:12
  • Once I get it fixed, I will update the post and then accept. – Vivek Misra Mar 27 at 17:19
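
The symptom described in the comments (exchanges named after the bindings, and queues with `.anonymous.` in their names) is what one would expect if neither `destination` nor `group` is being picked up for the consumer bindings. As a rough sketch of that naming rule, assuming the usual Rabbit binder convention of `<destination>.<group>` for queue names: the class `BinderNames` and its methods are hypothetical, and the random suffix is only a stand-in for whatever the binder actually generates.

```java
import java.util.UUID;

// Hypothetical illustration of the naming convention; this is not binder code.
final class BinderNames {

    // With no destination configured, the binding name itself is used.
    static String exchangeName(String bindingName, String destination) {
        return destination != null ? destination : bindingName;
    }

    // With no group configured, the consumer gets an anonymous queue.
    static String queueName(String exchange, String group) {
        String suffix = group != null
                ? group
                : "anonymous." + UUID.randomUUID(); // stand-in for the real random suffix
        return exchange + "." + suffix;
    }

    public static void main(String[] args) {
        // Properties applied: the queue is named after destination and group.
        System.out.println(queueName(exchangeName("defaultconsumer", "my-exchange"), "queue1"));
        // Properties not applied: both names fall back to defaults.
        System.out.println(queueName(exchangeName("defaultconsumer", null), null));
    }
}
```

If the console shows names of the second kind, it is worth checking that the consumer's application.yml is actually the one being loaded and that the binding names in it match the method names on the `@Input` interface.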
