이 가이드에서는 RabbitMQ에 대한 Spring Cloud Stream의 지원을 사용하는 3가지 Spring Boot application을 개발하여 Cloud Foundry, Kubernetes 및 local machine에 배치합니다.
다른 가이드에서는 Data Flow를 사용하여 이 application을 배포합니다.
application을 수동으로 배포하면 Data Flow에서 자동으로 수행하는 단개를 보다 잘 이해할 수 있습니다.
다음 절에서는 이러한 applicaton을 처음부터 빌드하는 방법에 대해 설명합니다.
원하는 경우 이러한 application의 소스가 포함된 zip 파일을 다운로드 하여 압축을 풀고 deploy 섹션으로 진행할 수 있습니다.
브라우저에서 세 가지 application을 모두 포함하는 프로젝트를 다운로드 할 수 있습니다.
다음 예제와 같이 command line을 사용할 수도 있습니다.
wget https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/master/dataflow-website/stream-developer-guides/streams/standalone-stream-rabbitmq/dist/usage-cost-stream-rabbit.zip?raw=true -O usage-cost-stream-rabbit.zip
1. Development
우리는 RabbitMQ를 사용하여 통신하는 3개의 Spring Cloud Stream Application을 생성합니다.
시나리오는 고객을 위한 청구서를 작성하는 휴대 전화 회사입니다.
유저는 각 통화마다 duration
과 사용한 data
양이 있습니다.
청구서를 생성하는 프로세스의 일부로 원시 호출 데이터는 call 기간 동안의 비용 및 사용된 데이터의 양에 대한 비용으로 변환되어야 합니다.
call은 duration과 사용한 data 양을 포함한 UsageDetail을 사용하여 모델링됩니다.
bill은 call의 가격(costCall)과 data의 가격(costData)을 포함한 UserCostDetail을 사용하여 모델링 됩니다.
3 가지 streaming application은 다음과 같습니다.
-
UserDetailSender로 명명한
Source
application은userId
와 사용자의 통화 시간(duration
)과data
의 양을 가진 UserDetail을 전송 (UserDetail Json과 같은 객체) -
UsageCostProcess로 명명한
Processor
는 UserDetail을 소비하여 userId 당 cost of the call과 cost of the data를 계산한 후 UsageCostDetail JSON으로 객체를 보냅니다. -
UsageCostLogger로 명명한
Sink
는 UserCostDetail 객체를 소비하고 호출 및 데이터의 비용을 기록합니다.
1.1. Source
이 단계에서는 UsageDetailSender
source를 만듭니다.
Spring initializer site에서 아래처럼 프로젝트를 생성하세요.
-
새 maven 프로젝트를 만듭니다.
group :io.spring.dataflow.sample
artifact:usage-detail-sender-rabbit
-
dependency에
RabbitMQ
를 추가합니다. -
dependency에
Cloud Stream
을 추가합니다. -
dependency에
Actuator
를 추가합니다. -
만약
Cloud Foundry
가 대상 플렛폼인 경우 dependency에 SpringCloud Connector
를 추가합니다. -
Generate Project 버튼을 클릭하세요.
1.1.1. Business Logic
이제 이 application에 필요한 코드를 생성할 수 있습니다.
-
io.spring.dataflow.sample.usagedetailsender
패키지에UserDetail
class를 생성합니다.
UserDetail
class는userId
,data
,duration
속성을 가집니다. -
io.spring.dataflow.sample.usagedetailsender
패키지에UsageDetailSender
class를 생성합니다.
다음과 같은 형태입니다.package io.spring.dataflow.sample.usagedetailsender; @EnableScheduling @EnableBinding(Source.class) public class UsageDetailSender { @Autowired private Source source; private String[] users = {"user1", "user2", "user3", "user4", "user5"}; @Scheduled(fixedDelay = 1000) public void sendEvents() { UsageDetail usageDetail = new UsageDetail(); usageDetail.setUserId(this.users[new Random().nextInt(5)]); usageDetail.setDuration(new Random().nextInt(300)); usageDetail.setData(new Random().nextInt(700)); this.source.output().send(MessageBuilder.withPayload(usageDetail).build()); } }
@EnableBinding
annotation은 application을 messaging middleware에 bind 하려는 것을 나타냅니다.
이 annotation은 하나 이상의 interface를 매개 변수로 사용합니다.
이 경우 출력 채널 이름을 정의하는 Source interface입니다.
RabbitMQ의 경우 output
채널로 전송된 메시지는 TopicExchange
를 사용하여 RabbitMQ message brocker로 전송됩니다.
@EnableSCheduling
annotation은 Spring의 Scheduling 기능을 활성화 할 것을 나타내며 @Scheduled
annotation이 1초마다 fixedDelay로 수행되도록 합니다.
sendEvents
method는 UserDetail을 생성한 후 Source 객체의 output().send() method를 통해 output 채널로 전송합니다.
1.1.2. Configuration
source
application을 구성할 때 다음을 설정해야 합니다.
-
data를 publish 할
output
binding destination (RabbitMQ exchange) producer -
consumer applicaton으로 보내지는 메시지가 소비되는 그룹을 보장하게 하는 consumer group을 명시할
requiredGroups
src/main/resources/application.properties
에서 다음과 같은 속성을 추가할 수 있습니다.
spring.cloud.stream.bindings.output.destination=usage-detail
spring.cloud.stream.bindings.output.producer.requiredGroups=usage-cost-consumer
-
이
spring.cloud.stream.bindings.output.destination
속성은 UserDetailSender 객체의 output을usage-detail
RabbitMQ exchange에 바인딩합니다. -
이
spring.cloud.stream.bindings.output.producer.requiredGroups
속성은 RabbitMQ exchange의usage-detail
의usage-detail.usage-cost-consumer
에서 소비되도록 합니다.
Durable Queues
기본적으로 Spring Cloud Stream application은 anonymous
auto-delete queue를 생성합니다.
이로 인해 producer application이 consumer application보다 먼저 실행된 경우 producer가 저장 및 전달하지 않는 메시지가 발생할 수 있습니다.
따라서 이후에 소비가 되도록 durable
queue를 exchange에 연결시켜주어야 합니다.
영구 대기열을 미리 만들고 이를 exchange에 bind하려면 producer application은 다음과 같은 속성이 설정되야 합니다.
spring.cloud.stream.bindings.<channelName>.producer.requiredGroups
requiredGroups
속성은 producer가 메시지 전달을 보장해야하는 쉼표로 구분된 그룹 목록을 허용합니다.
이 속성을 설정하면 <exchange>.<requiredGroup>
형식을 사용하여 영구 대기열이 만들어집니다.
1.1.3. Building
이제 Usage Detail Sender application을 빌드할 수 있습니다.
usage-detail-sender 디렉토리에서 다음 command로 maven project를 빌드합니다.
./mvnw clean package
1.1.4. Testing
Spring cloud Stream은 Spring Cloud Stream application을 테스트하기 위해 spring-cloud-stream-test-support
dependency를 제공합니다.
RabbitMQ binder 대신 Test
binder를 사용하여 application의 outbound 및 inbound message를 추적하고 테스트합니다.
Test
binder는 MessageCollector라 불리는 utility class를 통해 메모리에 메시지를 저장합니다.
UsageDetailSender application을 단위테스트하려면 다음 코드를 추가해보세요.
package io.spring.dataflow.sample.usagedetailsender;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class UsageDetailSenderApplicationTests {
@Autowired
private MessageCollector messageCollector;
@Autowired
private Source source;
@Test
public void contextLoads() {
}
@Test
public void testUsageDetailSender() throws Exception {
Message message = this.messageCollector.forChannel(this.source.output()).poll(1, TimeUnit.SECONDS);
String usageDetailJSON = message.getPayload().toString();
assertTrue(usageDetailJSON.contains("userId"));
assertTrue(usageDetailJSON.contains("duration"));
assertTrue(usageDetailJSON.contains("data"));
}
}
-
contextLoads
테스트 케이스는 application이 성공적으로 시작되었는지 확인합니다. -
testUsageDetailSender
테스트 케이스는Test
바인더의MessageCollector
에 의해UsageDetailSender
로 전송된 메시지를 수집합니다.
1.2. Processor
이 단계에서는 UsageCostProcessor
processor를 생성합니다.
Spring initializer site에서 아래처럼 프로젝트를 생성하세요.
-
새 Maven 프로젝트를 만듭니다.
group:io.spring.dataflow.sample
artifact:usage-cost-processor-rabbit
-
dependency에
RabbitMQ
를 추가합니다. -
dependency에
Cloud Stream
을 추가합니다. -
dependency에
Actuator
를 추가합니다. -
만약
Cloud Foundry
플랫폼인 경우 dependency에Cloud Connectors
를 추가합니다. -
Generate Project 버튼을 클릭하세요.
1.2.1. Business Logic
이제 application에 필요한 코드를 생성할 수 있습니다.
-
io.spring.dataflow.sample.usagecostprocessor
패키지에UserDetail
class를 생성합니다.
UserDetail 클래스는 userId, data, duration 속성을 가집니다. -
io.spring.dataflow.sample.usagecostprocessor
패키지에UsageCostDetail
class를 생성합니다.
UserCostDetail
class는userId
,callCost
,dataCost
속성을 가집니다. -
io.spring.dataflow.sample.usagecostprocessor
패키지에UsageCostProcessor
class를 생성합니다.
UserDetail
메시지를 수신하고 데이터 비용을 계산하고UserCostDetail
메시지를 보내는 역할을 합니다.
소스코드는 다음과 같습니다.package io.spring.dataflow.sample.usagecostprocessor; @EnableBinding(Processor.class) public class UsageCostProcessor { private double ratePerSecond = 0.1; private double ratePerMB = 0.05; @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public UsageCostDetail processUsageCost(UsageDetail usageDetail) { UsageCostDetail usageCostDetail = new UsageCostDetail(); usageCostDetail.setUserId(usageDetail.getUserId()); usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond); usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB); return usageCostDetail; } }
앞의 application에서 @EnableBinding
annotation은 application을 messaging middleware에 bind하려는 것을 나타냅니다.
annotation은 하나 이상의 interface를 매개 변수로 사용합니다.
이 경우에는 채널을 정의하고 입력 및 출력하는 processor 입니다.
@StreamListener
annotation은 application의 input
채널에 processUsageCost
method를 바인딩 하고 json을 UserDetail
객체로 컨버트하여 전달합니다.
@SendTo
annotation은 processUsageCost
메소드의 output을 TopicExchange
를 사용하는 RabbitMQ의 message brocker인 application의 output
채널로 보내는 역할을 합니다.
1.2.2. Configuration
processor
application을 구성할 때 다음 properties를 설정해야 합니다.
-
anonymous
auto-delete 또는durable
queue를 통해 subscribe 할input
binding destination (RabbitMQ exchange) -
어느 consumer application에 속해 있는 consumer group인지 정의할
group
-
producer가 publish 한 데이터를 보낼
output
binding destination (RabbitMQ exchange) -
메시지 전달 보장을 밪기위한 consuper group을 정의할
requiredGroups
src/main/resources/application.properties
에서 다음과 같은 속성을 추가할 수 있습니다.
spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.input.group=usage-cost-consumer
spring.cloud.stream.bindings.output.destination=usage-cost
spring.cloud.stream.bindings.output.producer.requiredGroups=logger
-
spring.cloud.stream.bindings.input.destination
과spring.cloud.stream.bindings.input.group
속성은UsageCostProcessor
객체의input
을 RabbitMQ exchange의usage-detail
에usage-detail.usage-cost-consumer
durable queue로 바인딩합니다. -
spring.cloud.stream.bindings.output.destination
속성은UsageCostProcessor
객체의 output을 RabbitMQ exchang의usage-cost
로 바인딩합니다. -
spring.cloud.stream.bindings.output.producer.requiredGroups
속성은 RabbitMQ exchage의usage-cost
에서 소비될usage-cost.logger
durable queue를 생성합니다.
1.2.3. Building
이제 Usage Cost Processor application을 빌드할 수 있습니다.
usage-cost-processor 디렉토리에서 다음 command로 maven project를 빌드합니다.
./mvnw clean package
1.2.4. Testing
Spring cloud Stream은 Spring Cloud Stream application을 테스트하기 위해 spring-cloud-stream-test-support
dependency를 제공합니다.
RabbitMQ binder 대신 Test
binder를 사용하여 application의 outbound 및 inbound message를 추적하고 테스트합니다.
Test
binder는 MessageCollector라 불리는 utility class를 통해 메모리에 메시지를 저장합니다.
`UsageCostProcessor`를 단위 테스트 하려면 다음 코드를 추가하세요.
package io.spring.dataflow.sample.usagecostprocessor;
@RunWith(SpringRunner.class)
@SpringBootTest
public class UsageCostProcessorApplicationTests {
@Autowired
private Processor processor;
@Autowired
private MessageCollector messageCollector;
@Test
public void contextLoads() {
}
@Test
public void testUsageCostProcessor() throws Exception {
this.processor.input().send(MessageBuilder.withPayload("{\"userId\":\"user3\",\"duration\":101,\"data\":502}").build());
Message message = this.messageCollector.forChannel(this.processor.output()).poll(1, TimeUnit.SECONDS);
assertTrue(message.getPayload().toString().equals("{\"userId\":\"user3\",\"callCost\":10.100000000000001,\"dataCost\":25.1}"));
}
}
-
contextLoads
테스트 케이스는 application이 성공적으로 시작되었는지 확인합니다. -
testUsageCostProcessor
테스트 케이스는Test
바인더의MessageCollector
에 의해UsageCostProcessor
의output
메시지를 수집합니다.
1.3. Sink
이 단계에서는 UsageCostLogger
Sink를 생성합니다.
Spring initializer site에서 아래처럼 프로젝트를 생성하세요.
-
새 maven 프로젝트를 만듭니다.
group :io.spring.dataflow.sample
artifact:usage-detail-logger-rabbit
-
dependency에
RabbitMQ
를 추가합니다. -
dependency에
Cloud Stream
을 추가합니다. -
dependency에
Actuator
를 추가합니다. -
만약
Cloud Foundry
가 대상 플렛폼인 경우 dependency에 SpringCloud Connector
를 추가합니다. -
Generate Project 버튼을 클릭하세요.
1.3.1. Business Logic
이제 이 application에 필요한 코드를 생성할 수 있습니다.
-
io.spring.dataflow.sample.usagecostlogger
패키지에UserCostDetail
class를 생성합니다.
UserCostDetail
class는userId
,callCost
,dataCost
속성을 가집니다. -
io.spring.dataflow.sample.usagecostlogger
패키지에UsageCostDetail
class를 생성합니다.
다음과 같은 형태입니다.package io.spring.dataflow.sample.usagecostlogger; @EnableBinding(Sink.class) public class UsageCostLogger { private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class); @StreamListener(Sink.INPUT) public void process(UsageCostDetail usageCostDetail) { logger.info(usageCostDetail.toString()); } }
@EnableBinding
annotation은 application을 messaging middleware에 bind 하려는 것을 나타냅니다.
이 annotation은 하나 이상의 interface를 매개 변수로 사용합니다.
이 경우 입력 채널 이름을 정의하는 Sink interface입니다.
@StreamListener
annotation은 application의 input
채널을 process
method의 UserCostDetail
객체에 JSON 을 변환하여 바인딩해줍니다.
1.3.2. Configuration
sink
application을 구성할 떄 다음을 설정해야 합니다.
-
anonymous
auto-delete 또는durable
queue를 통해 subscribed 할input
binding destination (RabbitMQ exchange) -
consumer application이 속할 consumer group을 지정할
group
src/main/resources/application.properties
에서 다음과 같이 속성을 추가할 수 있습니다.
spring.cloud.stream.bindings.input.destination=usage-cost
spring.cloud.stream.bindings.input.group=logger
spring.cloud.stream.bindings.input.destination
과 spring.cloud.stream.bindings.input.group
속성은 UsageCostLogger
객체의 input
을 usage-cost.logger
durable queue를 통해 RabbitMQ exchange의 usage-cost
에 바인딩해줍니다.
1.3.3. Building
이제 Usage Cost Logger application을 빌드할 수 있습니다.
usage-cost-logger 디렉토리에서 다음 command로 maven project를 빌드합니다.
./mvnw clean package
1.3.4. Testing
UsageCostLogger application을 단위테스트하려면 다음 코드를 추가해보세요.
package io.spring.dataflow.sample.usagecostlogger;
import io.spring.dataflow.sample.UsageCostDetail;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class UsageCostLoggerApplicationTests {
@Autowired
protected Sink sink;
@Autowired
protected UsageCostLogger usageCostLogger;
@Test
public void contextLoads() {
}
@Test
public void testUsageCostLogger() throws Exception {
ArgumentCaptor<UsageCostDetail> captor = ArgumentCaptor.forClass(UsageCostDetail.class);
this.sink.input().send(MessageBuilder.withPayload("{\"userId\":\"user3\",\"callCost\":10.100000000000001,\"dataCost\":25.1}").build());
verify(this.usageCostLogger).process(captor.capture());
}
@EnableAutoConfiguration
@EnableBinding(Sink.class)
static class TestConfig {
// Override `UsageCostLogger` bean for spying.
@Bean
@Primary
public UsageCostLogger usageCostLogger() {
return spy(new UsageCostLogger());
}
}
}
-
contextLoads
테스트 케이스는 application이 성공적으로 시작되었는지 확인합니다. -
testUsageCostLogger
테스트 케이스는Mockito
를 사용하여UsageCostLogger
의process
method를 invoke 합ㅎ니다. 이를 위해UsageCostLogger`의 Mock bean을 만듭니다.
를 mocking하기 위해
`UsageCostLoggerTestConfig
는@EnableBinding
과@EnableAutoConfiguration
도 선언합니다.
2. Deploymenet
이 섹션에서는 이전에 만든 application을 local machine, Cloud Foundry 및 Kubernetes에 배포합니다.
이 세가지 application (UsageDetailSender
, UsageCostProcessor
및 UsageCostLogger
)를 배포하면 메시지 흐름은 다음과 같습니다.
UsageDetailSender -> UsageCostProcessor -> UsageCostLogger
UsageDetailSender
source application의 output이 UsageCostProcessor
processor application의 input에 연결됩니다.
UsageCostProcessor
application의 output은 UsageCostLogger
sink의 input에 연결됩니다.
application이 실행되면 RabbitMQ
binder는 applicatio의 output 및 input 경계를 RabbitMQ message brocker의 exchange와 queue에 바인딩합니다.
2.1. Local
local 환경에서 standalone application으로 실행할 수 있습니다.
RabbitMQ docker 이미지를 설치하고 실행하려면 다음 명령을 실행하세요.
docker run -d --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3.7.14-management
설치가 긑나면 local machine의 RabbitMQ 관리 콘솔 (localhost:15672)에 로그인할 수 있습니다. (기본 계정 guest:guest)
2.1.1. Running the UsageDetailSender Source
미리 정의된 configuration properties(유일한 server port와 함께)를 사용하여 UserDetailSender를 실행할 수 있습니다.
java -jar target/usage-detail-sender-rabbit-0.0.1-SNAPSHOT.jar --server.port=9001 &
application이 실행 중 일때 user-detail
RabbitMQ exchange가 생상되고 usage-detail.usage-cost-consumer
durable queue가 exchange에 아래 그림처럼 바인딩됩니다.
또한 Queues
를 클릭하여 queue를 확인하면 다음 그림 처럼 durable queue에 메시지가 소비되고 저장되고 있음을 확인할 수 있습니다.
이 Source
application에 대해 consumer application을 구성할 때 durable queue에 해당하는 group
binding property를 설정할 수 있습니다.
이 requiredGroups 속성을 설정하지 않으면 user-detail exchange 에 queue 없는 것을 볼 수 있습니다. 따라서 application이 시작하기 전에 comsumer가 실행되지 않으면 메시지가 손실됩니다.
|
2.1.2. Running the processor
미리 정의된 configuration properties(유일한 server port와 함께)를 사용하여 `UsageCostProcessort`를 구성할 수 있습니다.
java -jar target/usage-cost-processor-rabbit-0.0.1-SNAPSHOT.jar --server.port=9002 &
RabbitMQ 콘솔에서 다음을 볼 수 있습니다.
-
UsageCostProcessor
application이spring.cloud.stream.bindings.input.group=usage-cost-consumer
property로 정의된usage-detail.usage-cost-consumer
durable queue를 소비하는 것을 확인할 수 있습니다. -
UsageCostProcessor
application이spring.cloud.stream.bindings.output.destination=usage-cost
property로 정의된usage-cost
exchange로 `UsageCostDetail`을 제공하는 것을 확인할 수 있습니다. -
usage-cost.logger
durable queue가 생성되고spring.cloud.stream.bindings.output.producer.requiredGruops=logger
property로 정의된usage-cost
exchange 에서 소비되는 것을 확인할 수 있습니다.
이 application이 실행 중일 때 usage-cost
RabbieMQ exchange 가 생성되고 `usage-cost.logger`로 이름지어진 durable queue가 이 exchange에 바인딩 되는 것을 다음 그림처럼 확인할 수 있습니다.
또한 Queues
를 클릭하여 usage-cost.logger
queue 가 다음 그림처럼 소비되고 저장되는 것을 확인할 수 있습니다.
2.1.3. Running the Sink
미리 정의된 configuration properties(유일한 server port와 함께)를 사용하여 UserCostLogger를 실행할 수 있습니다.
java -jar target/usage-cost-logger-rabbit-0.0.1-SNAPSHOT.jar --server.port=9003 &
이제 이 application은 다음 예제와 같이 usage-cost
RabbitMQ 가 exchange에서 받은 usage-cost.logger
durable queue를 통해 받은 usage cost detail을 기록하게 됩니다.
2019-05-08 08:16:46.442 INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "user2", "callCost": "28.3", "dataCost": "29.8" }
2019-05-08 08:16:47.446 INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "user2", "callCost": "12.0", "dataCost": "23.75" }
2019-05-08 08:16:48.451 INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "user4", "callCost": "16.0", "dataCost": "30.05" }
2019-05-08 08:16:49.454 INFO 10769 --- [o6VmGALOP_onw-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "user1", "callCost": "17.7", "dataCost": "18.0" }