이 가이드에서는 RabbitMQ에 대한 Spring Cloud Stream의 지원을 사용하는 3가지 Spring Boot application을 개발하여 Cloud Foundry, Kubernetes 및 local machine에 배치합니다.
다른 가이드에서는 Data Flow를 사용하여 이 application을 배포합니다.
application을 수동으로 배포하면 Data Flow에서 자동으로 수행하는 단개를 보다 잘 이해할 수 있습니다.

다음 절에서는 이러한 applicaton을 처음부터 빌드하는 방법에 대해 설명합니다.
원하는 경우 이러한 application의 소스가 포함된 zip 파일을 다운로드 하여 압축을 풀고 deploy 섹션으로 진행할 수 있습니다.

브라우저에서 세 가지 application을 모두 포함하는 프로젝트를 다운로드 할 수 있습니다.
다음 예제와 같이 command line을 사용할 수도 있습니다.

wget https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/master/dataflow-website/stream-developer-guides/streams/standalone-stream-rabbitmq/dist/usage-cost-stream-rabbit.zip?raw=true -O usage-cost-stream-rabbit.zip

1. Development

우리는 RabbitMQ를 사용하여 통신하는 3개의 Spring Cloud Stream Application을 생성합니다.

시나리오는 고객을 위한 청구서를 작성하는 휴대 전화 회사입니다.
유저는 각 통화마다 duration 과 사용한 data 양이 있습니다.
청구서를 생성하는 프로세스의 일부로 원시 호출 데이터는 call 기간 동안의 비용 및 사용된 데이터의 양에 대한 비용으로 변환되어야 합니다.

call은 duration과 사용한 data 양을 포함한 UsageDetail을 사용하여 모델링됩니다.
bill은 call의 가격(costCall)과 data의 가격(costData)을 포함한 UserCostDetail을 사용하여 모델링 됩니다.

3 가지 streaming application은 다음과 같습니다.

  • UserDetailSender로 명명한 Source application은 userId 와 사용자의 통화 시간(duration)과 data 의 양을 가진 UserDetail을 전송 (UserDetail Json과 같은 객체)

  • UsageCostProcess로 명명한 Processor 는 UserDetail을 소비하여 userId 당 cost of the call과 cost of the data를 계산한 후 UsageCostDetail JSON으로 객체를 보냅니다.

  • UsageCostLogger로 명명한 Sink 는 UserCostDetail 객체를 소비하고 호출 및 데이터의 비용을 기록합니다.

1.1. Source

이 단계에서는 UsageDetailSender source를 만듭니다.

Spring initializer site에서 아래처럼 프로젝트를 생성하세요.

  1. 새 maven 프로젝트를 만듭니다.
    group : io.spring.dataflow.sample
    artifact: usage-detail-sender-rabbit

  2. dependency에 RabbitMQ 를 추가합니다.

  3. dependency에 Cloud Stream 을 추가합니다.

  4. dependency에 Actuator 를 추가합니다.

  5. 만약 Cloud Foundry 가 대상 플렛폼인 경우 dependency에 Spring Cloud Connector 를 추가합니다.

  6. Generate Project 버튼을 클릭하세요.

1.1.1. Business Logic

이제 이 application에 필요한 코드를 생성할 수 있습니다.

  1. io.spring.dataflow.sample.usagedetailsender 패키지에 UserDetail class를 생성합니다.
    UserDetail class는 userId, data, duration 속성을 가집니다.

  2. io.spring.dataflow.sample.usagedetailsender 패키지에 UsageDetailSender class를 생성합니다.
    다음과 같은 형태입니다.

    package io.spring.dataflow.sample.usagedetailsender;
    
    import java.util.Random;
    
    import io.spring.dataflow.sample.domain.UsageDetail;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    
    @EnableScheduling
    @EnableBinding(Source.class)
    public class UsageDetailSender {
    
        @Autowired
        private Source source;
    
        private String[] users = {"user1", "user2", "user3", "user4", "user5"};
    
        @Scheduled(fixedDelay = 1000)
        public void sendEvents() {
            UsageDetail usageDetail = new UsageDetail();
            usageDetail.setUserId(this.users[new Random().nextInt(5)]);
            usageDetail.setDuration(new Random().nextInt(300));
            usageDetail.setData(new Random().nextInt(700));
            this.source.output().send(MessageBuilder.withPayload(usageDetail).build());
        }
    }
    

@EnableBinding annotation은 application을 messaging middleware에 bind 하려는 것을 나타냅니다.
이 annotation은 하나 이상의 interface를 매개 변수로 사용합니다.
이 경우 출력 채널 이름을 정의하는 Source interface입니다.
RabbitMQ의 경우 output 채널로 전송된 메시지는 TopicExchange 를 사용하여 RabbitMQ message brocker로 전송됩니다.

@EnableSCheduling annotation은 Spring의 Scheduling 기능을 활성화 할 것을 나타내며 @Scheduled annotation이 1초마다 fixedDelay로 수행되도록 합니다.

sendEvents method는 UserDetail을 생성한 후 Source 객체의 output().send() method를 통해 output 채널로 전송합니다.

1.1.2. Configuration

source application을 구성할 때 다음을 설정해야 합니다.

  • data를 publish 할 output binding destination (RabbitMQ exchange) producer

  • consumer applicaton으로 보내지는 메시지가 소비되는 그룹을 보장하게 하는 consumer group을 명시할 requiredGroups

src/main/resources/application.properties 에서 다음과 같은 속성을 추가할 수 있습니다.

spring.cloud.stream.bindings.output.destination=usage-detail
spring.cloud.stream.bindings.output.producer.requiredGroups=usage-cost-consumer
  • spring.cloud.stream.bindings.output.destination 속성은 UserDetailSender 객체의 output을 usage-detail RabbitMQ exchange에 바인딩합니다.

  • spring.cloud.stream.bindings.output.producer.requiredGroups 속성은 RabbitMQ exchange의 usage-detailusage-detail.usage-cost-consumer 에서 소비되도록 합니다.

Durable Queues

기본적으로 Spring Cloud Stream application은 anonymous auto-delete queue를 생성합니다.
이로 인해 producer application이 consumer application보다 먼저 실행된 경우 producer가 저장 및 전달하지 않는 메시지가 발생할 수 있습니다.
따라서 이후에 소비가 되도록 durable queue를 exchange에 연결시켜주어야 합니다.

영구 대기열을 미리 만들고 이를 exchange에 bind하려면 producer application은 다음과 같은 속성이 설정되야 합니다.

spring.cloud.stream.bindings.<channelName>.producer.requiredGroups

requiredGroups 속성은 producer가 메시지 전달을 보장해야하는 쉼표로 구분된 그룹 목록을 허용합니다.
이 속성을 설정하면 <exchange>.<requiredGroup> 형식을 사용하여 영구 대기열이 만들어집니다.

1.1.3. Building

이제 Usage Detail Sender application을 빌드할 수 있습니다.

usage-detail-sender 디렉토리에서 다음 command로 maven project를 빌드합니다.

./mvnw clean package

1.1.4. Testing

Spring cloud Stream은 Spring Cloud Stream application을 테스트하기 위해 spring-cloud-stream-test-support dependency를 제공합니다.
RabbitMQ binder 대신 Test binder를 사용하여 application의 outbound 및 inbound message를 추적하고 테스트합니다.
Test binder는 MessageCollector라 불리는 utility class를 통해 메모리에 메시지를 저장합니다.

UsageDetailSender application을 단위테스트하려면 다음 코드를 추가해보세요.

package io.spring.dataflow.sample.usagedetailsender;

import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.spring.dataflow.sample.UsageDetail;
import org.json.JSONObject;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.Message;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.Assert;

import static org.junit.Assert.assertTrue;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class UsageDetailSenderApplicationTests {

    @Autowired
    private MessageCollector messageCollector;

    @Autowired
    private Source source;

    @Test
    public void contextLoads() {
    }

    @Test
    public void testUsageDetailSender() throws Exception {
        Message message = this.messageCollector.forChannel(this.source.output()).poll(1, TimeUnit.SECONDS);
        String usageDetailJSON = message.getPayload().toString();
        assertTrue(usageDetailJSON.contains("userId"));
        assertTrue(usageDetailJSON.contains("duration"));
        assertTrue(usageDetailJSON.contains("data"));
    }
}
  • contextLoads 테스트 케이스는 application이 성공적으로 시작되었는지 확인합니다.

  • testUsageDetailSender 테스트 케이스는 Test 바인더의 MessageCollector 에 의해 UsageDetailSender 로 전송된 메시지를 수집합니다.

1.2. Processor

이 단계에서는 UsageCostProcessor processor를 생성합니다.

Spring initializer site에서 아래처럼 프로젝트를 생성하세요.

  1. 새 Maven 프로젝트를 만듭니다.
    group: io.spring.dataflow.sample
    artifact: usage-cost-processor-rabbit

  2. dependency에 RabbitMQ 를 추가합니다.

  3. dependency에 Cloud Stream 을 추가합니다.

  4. dependency에 Actuator 를 추가합니다.

  5. 만약 Cloud Foundry 플랫폼인 경우 dependency에 Cloud Connectors 를 추가합니다.

  6. Generate Project 버튼을 클릭하세요.

1.2.1. Business Logic

이제 application에 필요한 코드를 생성할 수 있습니다.

  1. io.spring.dataflow.sample.usagecostprocessor 패키지에 UserDetail class를 생성합니다.
    UserDetail 클래스는 userId, data, duration 속성을 가집니다.

  2. io.spring.dataflow.sample.usagecostprocessor 패키지에 UsageCostDetail class를 생성합니다.
    UserCostDetail class는 userId, callCost, dataCost 속성을 가집니다.

  3. io.spring.dataflow.sample.usagecostprocessor 패키지에 UsageCostProcessor class를 생성합니다.
    UserDetail 메시지를 수신하고 데이터 비용을 계산하고 UserCostDetail 메시지를 보내는 역할을 합니다.
    소스코드는 다음과 같습니다.

    package io.spring.dataflow.sample.usagecostprocessor;
    
    import io.spring.dataflow.sample.UsageCostDetail;
    import io.spring.dataflow.sample.UsageDetail;
    
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Processor;
    import org.springframework.messaging.handler.annotation.SendTo;
    
    @EnableBinding(Processor.class)
    public class UsageCostProcessor {
    
        private double ratePerSecond = 0.1;
    
        private double ratePerMB = 0.05;
    
        @StreamListener(Processor.INPUT)
        @SendTo(Processor.OUTPUT)
        public UsageCostDetail processUsageCost(UsageDetail usageDetail) {
            UsageCostDetail usageCostDetail = new UsageCostDetail();
            usageCostDetail.setUserId(usageDetail.getUserId());
            usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
            usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
            return usageCostDetail;
        }
    }
    

앞의 application에서 @EnableBinding annotation은 application을 messaging middleware에 bind하려는 것을 나타냅니다.
annotation은 하나 이상의 interface를 매개 변수로 사용합니다.
이 경우에는 채널을 정의하고 입력 및 출력하는 processor 입니다.

@StreamListener annotation은 application의 input 채널에 processUsageCost method를 바인딩 하고 json을 UserDetail 객체로 컨버트하여 전달합니다.

@SendTo annotation은 processUsageCost 메소드의 output을 TopicExchange 를 사용하는 RabbitMQ의 message brocker인 application의 output 채널로 보내는 역할을 합니다.

1.2.2. Configuration

processor application을 구성할 때 다음 properties를 설정해야 합니다.

  • anonymous auto-delete 또는 durable queue를 통해 subscribe 할 input binding destination (RabbitMQ exchange)

  • 어느 consumer application에 속해 있는 consumer group인지 정의할 group

  • producer가 publish 한 데이터를 보낼 output binding destination (RabbitMQ exchange)

  • 메시지 전달 보장을 밪기위한 consuper group을 정의할 requiredGroups

src/main/resources/application.properties 에서 다음과 같은 속성을 추가할 수 있습니다.

spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.input.group=usage-cost-consumer
spring.cloud.stream.bindings.output.destination=usage-cost
spring.cloud.stream.bindings.output.producer.requiredGroups=logger
  • spring.cloud.stream.bindings.input.destinationspring.cloud.stream.bindings.input.group 속성은 UsageCostProcessor 객체의 input 을 RabbitMQ exchange의 usage-detailusage-detail.usage-cost-consumer durable queue로 바인딩합니다.

  • spring.cloud.stream.bindings.output.destination 속성은 UsageCostProcessor 객체의 output을 RabbitMQ exchang의 usage-cost 로 바인딩합니다.

  • spring.cloud.stream.bindings.output.producer.requiredGroups 속성은 RabbitMQ exchage의 usage-cost 에서 소비될 usage-cost.logger durable queue를 생성합니다.

1.2.3. Building

이제 Usage Cost Processor application을 빌드할 수 있습니다.

usage-cost-processor 디렉토리에서 다음 command로 maven project를 빌드합니다.

./mvnw clean package

1.2.4. Testing

Spring cloud Stream은 Spring Cloud Stream application을 테스트하기 위해 spring-cloud-stream-test-support dependency를 제공합니다.
RabbitMQ binder 대신 Test binder를 사용하여 application의 outbound 및 inbound message를 추적하고 테스트합니다.
Test binder는 MessageCollector라 불리는 utility class를 통해 메모리에 메시지를 저장합니다.

`UsageCostProcessor`를 단위 테스트 하려면 다음 코드를 추가하세요.

package io.spring.dataflow.sample.usagecostprocessor;

import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

import static org.junit.Assert.assertTrue;

@RunWith(SpringRunner.class)
@SpringBootTest
public class UsageCostProcessorApplicationTests {

  @Autowired
  private Processor processor;

  @Autowired
  private MessageCollector messageCollector;

  @Test
  public void contextLoads() {
  }

  @Test
  public void testUsageCostProcessor() throws Exception {
    this.processor.input().send(MessageBuilder.withPayload("{\"userId\":\"user3\",\"duration\":101,\"data\":502}").build());
    Message message = this.messageCollector.forChannel(this.processor.output()).poll(1, TimeUnit.SECONDS);
    assertTrue(message.getPayload().toString().equals("{\"userId\":\"user3\",\"callCost\":10.100000000000001,\"dataCost\":25.1}"));
  }

}
  • contextLoads 테스트 케이스는 application이 성공적으로 시작되었는지 확인합니다.

  • testUsageCostProcessor 테스트 케이스는 Test 바인더의 MessageCollector 에 의해 UsageCostProcessoroutput 메시지를 수집합니다.

1.3. Sink

이 단계에서는 UsageCostLogger Sink를 생성합니다.

Spring initializer site에서 아래처럼 프로젝트를 생성하세요.

  1. 새 maven 프로젝트를 만듭니다.
    group : io.spring.dataflow.sample
    artifact: usage-detail-logger-rabbit

  2. dependency에 RabbitMQ 를 추가합니다.

  3. dependency에 Cloud Stream 을 추가합니다.

  4. dependency에 Actuator 를 추가합니다.

  5. 만약 Cloud Foundry 가 대상 플렛폼인 경우 dependency에 Spring Cloud Connector 를 추가합니다.

  6. Generate Project 버튼을 클릭하세요.

1.3.1. Business Logic

이제 이 application에 필요한 코드를 생성할 수 있습니다.

  1. io.spring.dataflow.sample.usagecostlogger 패키지에 UserCostDetail class를 생성합니다.
    UserCostDetail class는 userId, callCost, dataCost 속성을 가집니다.

  2. io.spring.dataflow.sample.usagecostlogger 패키지에 UsageCostDetail class를 생성합니다.
    다음과 같은 형태입니다.

    package io.spring.dataflow.sample.usagecostlogger;
    
    import io.spring.dataflow.sample.UsageCostDetail;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    
    @EnableBinding(Sink.class)
    public class UsageCostLogger {
    
        private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class);
    
        @StreamListener(Sink.INPUT)
        public void process(UsageCostDetail usageCostDetail) {
            logger.info(usageCostDetail.toString());
        }
    }
    

@EnableBinding annotation은 application을 messaging middleware에 bind 하려는 것을 나타냅니다.
이 annotation은 하나 이상의 interface를 매개 변수로 사용합니다.
이 경우 입력 채널 이름을 정의하는 Sink interface입니다.

@StreamListener annotation은 application의 input 채널을 process method의 UserCostDetail 객체에 JSON 을 변환하여 바인딩해줍니다.

1.3.2. Configuration

sink application을 구성할 떄 다음을 설정해야 합니다.

  • anonymous auto-delete 또는 durable queue를 통해 subscribed 할 input binding destination (RabbitMQ exchange)

  • consumer application이 속할 consumer group을 지정할 group

src/main/resources/application.properties 에서 다음과 같이 속성을 추가할 수 있습니다.

spring.cloud.stream.bindings.input.destination=usage-cost
spring.cloud.stream.bindings.input.group=logger

spring.cloud.stream.bindings.input.destinationspring.cloud.stream.bindings.input.group 속성은 UsageCostLogger 객체의 inputusage-cost.logger durable queue를 통해 RabbitMQ exchange의 usage-cost 에 바인딩해줍니다.

1.3.3. Building

이제 Usage Cost Logger application을 빌드할 수 있습니다.

usage-cost-logger 디렉토리에서 다음 command로 maven project를 빌드합니다.

./mvnw clean package

1.3.4. Testing

UsageCostLogger application을 단위테스트하려면 다음 코드를 추가해보세요.

package io.spring.dataflow.sample.usagecostlogger;

 import io.spring.dataflow.sample.UsageCostDetail;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;

 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.cloud.stream.annotation.EnableBinding;
 import org.springframework.cloud.stream.messaging.Sink;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Primary;
 import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.test.context.junit4.SpringRunner;

 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;

 @RunWith(SpringRunner.class)
 @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
 public class UsageCostLoggerApplicationTests {

    @Autowired
    protected Sink sink;

    @Autowired
    protected UsageCostLogger usageCostLogger;

    @Test
    public void contextLoads() {
    }

    @Test
    public void testUsageCostLogger() throws Exception {
        ArgumentCaptor<UsageCostDetail> captor = ArgumentCaptor.forClass(UsageCostDetail.class);
        this.sink.input().send(MessageBuilder.withPayload("{\"userId\":\"user3\",\"callCost\":10.100000000000001,\"dataCost\":25.1}").build());
        verify(this.usageCostLogger).process(captor.capture());
    }

    @EnableAutoConfiguration
    @EnableBinding(Sink.class)
    static class TestConfig {

        // Override `UsageCostLogger` bean for spying.
        @Bean
        @Primary
        public UsageCostLogger usageCostLogger() {
            return spy(new UsageCostLogger());
        }
    }
 }
  • contextLoads 테스트 케이스는 application이 성공적으로 시작되었는지 확인합니다.

  • testUsageCostLogger 테스트 케이스는 Mockito 를 사용하여 UsageCostLoggerprocess method를 invoke 합ㅎ니다. 이를 위해 UsageCostLogger`의 Mock bean을 만듭니다.
    `UsageCostLogger
    를 mocking하기 위해 TestConfig@EnableBinding@EnableAutoConfiguration 도 선언합니다.

2. Deploymenet

이 섹션에서는 이전에 만든 application을 local machine, Cloud Foundry 및 Kubernetes에 배포합니다.

이 세가지 application (UsageDetailSender , UsageCostProcessorUsageCostLogger)를 배포하면 메시지 흐름은 다음과 같습니다.

UsageDetailSender -> UsageCostProcessor -> UsageCostLogger

UsageDetailSender source application의 output이 UsageCostProcessor processor application의 input에 연결됩니다.
UsageCostProcessor application의 output은 UsageCostLogger sink의 input에 연결됩니다.

application이 실행되면 RabbitMQ binder는 applicatio의 output 및 input 경계를 RabbitMQ message brocker의 exchange와 queue에 바인딩합니다.

2.1. Local

local 환경에서 standalone application으로 실행할 수 있습니다.

RabbitMQ docker 이미지를 설치하고 실행하려면 다음 명령을 실행하세요.

docker run -d --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3.7.14-management

설치가 긑나면 local machine의 RabbitMQ 관리 콘솔 (localhost:15672)에 로그인할 수 있습니다. (기본 계정 guest:guest)

2.1.1. Running the UsageDetailSender Source

미리 정의된 configuration properties(유일한 server port와 함께)를 사용하여 UserDetailSender를 실행할 수 있습니다.

java -jar target/usage-detail-sender-rabbit-0.0.1-SNAPSHOT.jar --server.port=9001 &

application이 실행 중 일때 user-detail RabbitMQ exchange가 생상되고 usage-detail.usage-cost-consumer durable queue가 exchange에 아래 그림처럼 바인딩됩니다.

standalone rabbitmq usage detail sender

또한 Queues 를 클릭하여 queue를 확인하면 다음 그림 처럼 durable queue에 메시지가 소비되고 저장되고 있음을 확인할 수 있습니다.

standalone rabbitmq usage detail sender message guarantee

Source application에 대해 consumer application을 구성할 때 durable queue에 해당하는 group binding property를 설정할 수 있습니다.

requiredGroups 속성을 설정하지 않으면 user-detail exchange 에 queue 없는 것을 볼 수 있습니다. 따라서 application이 시작하기 전에 comsumer가 실행되지 않으면 메시지가 손실됩니다.

2.1.2. Running the processor

미리 정의된 configuration properties(유일한 server port와 함께)를 사용하여 `UsageCostProcessort`를 구성할 수 있습니다.

java -jar target/usage-cost-processor-rabbit-0.0.1-SNAPSHOT.jar --server.port=9002 &

RabbitMQ 콘솔에서 다음을 볼 수 있습니다.

  • UsageCostProcessor application이 spring.cloud.stream.bindings.input.group=usage-cost-consumer property로 정의된 usage-detail.usage-cost-consumer durable queue를 소비하는 것을 확인할 수 있습니다.

  • UsageCostProcessor application이 spring.cloud.stream.bindings.output.destination=usage-cost property로 정의된 usage-cost exchange로 `UsageCostDetail`을 제공하는 것을 확인할 수 있습니다.

  • usage-cost.logger durable queue가 생성되고 spring.cloud.stream.bindings.output.producer.requiredGruops=logger property로 정의된 usage-cost exchange 에서 소비되는 것을 확인할 수 있습니다.

이 application이 실행 중일 때 usage-cost RabbieMQ exchange 가 생성되고 `usage-cost.logger`로 이름지어진 durable queue가 이 exchange에 바인딩 되는 것을 다음 그림처럼 확인할 수 있습니다.

standalone rabbitmq usage cost processor

또한 Queues 를 클릭하여 usage-cost.logger queue 가 다음 그림처럼 소비되고 저장되는 것을 확인할 수 있습니다.

standalone rabbitmq usage cost processor message guarantee

2.1.3. Running the Sink

미리 정의된 configuration properties(유일한 server port와 함께)를 사용하여 UserCostLogger를 실행할 수 있습니다.

java -jar target/usage-cost-logger-rabbit-0.0.1-SNAPSHOT.jar --server.port=9003 &

이제 이 application은 다음 예제와 같이 usage-cost RabbitMQ 가 exchange에서 받은 usage-cost.logger durable queue를 통해 받은 usage cost detail을 기록하게 됩니다.

2019-05-08 08:16:46.442  INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLoggerApplication     : {"userId": "user2", "callCost": "28.3", "dataCost": "29.8" }
2019-05-08 08:16:47.446  INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLoggerApplication     : {"userId": "user2", "callCost": "12.0", "dataCost": "23.75" }
2019-05-08 08:16:48.451  INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLoggerApplication     : {"userId": "user4", "callCost": "16.0", "dataCost": "30.05" }
2019-05-08 08:16:49.454  INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLoggerApplication     : {"userId": "user1", "callCost": "17.7", "dataCost": "18.0" }

2.2. Cloud Foundry