들어가며
MSA와 같은 분산 시스템에서는 서비스 간 결합도를 낮추기 위한 메세지 큐가 필수다. Kafka, RabbitMQ 등 다양한 기술들이 있는데 NATS JetStream은 비교적 생소할 수 있다. 나도 그랬고!!
이 글에서는 'VM 환경에서의 모놀리식 구조에서 Cloud Native 환경에서 MSA 구조로' 전환하는 프로젝트를 진행하며 NATS JetStream의 push 방식에서 pull 방식으로 전환한 경험을 작성해보려 한다.
먼저 간단하게 NATS JetStream의 개념 및 특징에 대해 알아보도록 하자.
1. NATS JetStream이란?
NATS JetStream은 단일 바이너리 메세지 시스템 중 하나로 경량화 및 고성능을 위해 고안된 오픈소스 메세지 브로커이다.
NATS Core vs JetStream
NATS Core는 순수한 pub/sub 메세징시스템으로 메세지 전달 보장이 at-most-once이다. 즉, 메세지가 전달되거나 전달되지 않을 수 있고 구독자가 연결되어있지 않으면 메세지는 유실되는 형태이다. 메모리 기반으로 동작해 매우 빠르지만 persistence가 보장됮 않는다.
반면, NATS JetStream은 NATS Core에 persistence 레이어를 추가한 형태로 보면 된다.
- 메시지 영구 저장 (Stream 개념)
- 재전송 및 재처리 지원
- Consumer 상태 관리
Stream과 Consumer 개념
Stream은 메세지를 저장하는 일종의 로그로, 특정 subject 패턴에 매칭되는 메세지들을 저장하며 보관 정책에 따라 관리된다.
# Stream 설정 예시
Stream: ORDER_EVENTS
Subjects: orders.>
Retention: Limits (메시지 개수, 크기, 시간 기준)
Storage: File (디스크) 또는 Memory
Consumer는 Stream에서 메세지를 소비하는 주체로, 파티션 없이 메세지를 가져가고 각 Consumer는 독립적인 진행 상태를 유지하면서도 여러 Consumer가 동일한 Stream에서 메시지를 읽을 수 있다. 카프카는 하나의 파티션에 하나의 Consumer만 할당되어병렬 처리를 하려고 한다면 파티션을 늘려야한다는 차이점이 있다.
Stream: [MSG1] [MSG2] [MSG3] [MSG4] [MSG5]
↓ ↓ ↓
Consumer A (last read: MSG2)
Consumer B (last read: MSG4)
Consumer C (last read: MSG1)
Kafka vs RabbitMQ vs NATS JetStream
| Kafka | RabbitMQ | NATS JetStream | |
| 아키텍처 | - 분산 이벤트 스트리밍 플랫폼 - 로그 기반 아키텍처로 메세지를 디스크에 영구 저장 - 대용량 데이터 파이프라인에 적합 |
- 전통적인 메세지 브로커 - AMQP 프로토콜 기반 - Exchange-Queue-Binding 구조 사용 |
- 단일 바이너리로 배포되는 경량 시스템 - NATS Core에 persistence 레이어를 추가한 형태 - Cloud Native 환경에 최적화 |
| 메세지 전달 방식 | - pull 방식 : Consumer가 명시적으로 메세지를 가져가고 처리 속도에 맞춰 offset을 관리 |
- push 방식 : 브로커가 consumer 에게 능동적으로 메세지 전달, consumer는 prefetch count를 통해 처리 속도 조절) |
- push, pull 방식 모두 지원 |
| 성능 및 확장성 | - 처리량 측면에서 가장 높은 성능 - 파티셔닝을 통한 뛰어난 수평 확장 |
- 복잡한 라우팅에서의 유연성 | - 경량성과 낮은 레이터시가 강점 : 초당 수백만 메세지 처리 가능 |
| 운영 복잡도 | - 가장 복잡 : 의존성 및 클러스터 관리 |
- 중간 수준 | - 가장 간단 : 단일 바이러니로 배포 & 설정이 직관적 |
2. NATS Messaging Pattern
Subject 기반 메시징
NATS는 topic 대신 subject를 사용한다. Subject는 계층적 구조를 가지며 와일드카드를 지원하는 방식이다.
orders.created # 주문 생성 이벤트
orders.updated # 주문 업데이트 이벤트
orders.*.shipped # 모든 주문의 배송 이벤트
users.> # users로 시작하는 모든 subject
주요 패턴
1. Pub/Sub (1:N)
// Publisher
nc.Publish("orders.created", orderData)
// Subscribers (모두 동일 메시지 수신)
nc.Subscribe("orders.created", handler1)
nc.Subscribe("orders.created", handler2)
2. Request/Reply (1:1)
// Requester
response, _ := nc.Request("user.get", userID, timeout)
// Responder
nc.Subscribe("user.get", func(msg *nats.Msg) {
user := getUserByID(msg.Data)
msg.Respond(user)
})
3. Queue Groups (경쟁 소비)
// 같은 큐 그룹 내에서는 한 인스턴스만 메시지 수신
nc.QueueSubscribe("orders.process", "workers", handler1)
nc.QueueSubscribe("orders.process", "workers", handler2)
// 한 메시지는 handler1 또는 handler2 중 하나만 처리
Push Consumer vs Pull Consumer
Push Consumer (기존 방식)
브로커가 능동적으로 Consumer에게 메시지를 전달하는 방식
// Push Consumer 생성
js.Subscribe("orders.>", func(msg *nats.Msg) {
processOrder(msg.Data)
msg.Ack()
}, nats.Durable("order-processor"))
동작 방식:
- Consumer 생성 시 콜백 함수 등록
- 메시지 도착 시 JetStream이 자동으로 콜백 호출
- Consumer는 메시지 처리 후 Ack
특징:
- 구현이 간단 (콜백 패턴)
- 실시간성이 높음
- 브로커가 flow control 담당
Pull Consumer (개선 방식)
Consumer가 명시적으로 메시지를 요청하는 방식:
// Pull Consumer 생성
sub, _ := js.PullSubscribe("orders.>", "order-processor")
// 배치로 메시지 가져오기
msgs, _ := sub.Fetch(10, nats.MaxWait(5*time.Second))
for _, msg := range msgs {
processOrder(msg.Data)
msg.Ack()
}
동작 방식:
- Consumer가 Fetch 요청
- JetStream이 요청한 개수만큼 메시지 반환
- Consumer가 처리 후 Ack
특징:
- Consumer가 flow control 주도
- Back pressure 자연스럽게 처리
- 배치 처리 가능
func processMessage(msg *nats.Msg) {
order := unmarshalOrder(msg.Data)
// 잘못된 데이터
if order.ID == "" {
msg.Term() // 다시 시도해도 실패
return
}
// 일시적 장애 (DB 연결 실패 등)
if err := saveOrder(order); err != nil {
msg.Nak() // 재시도 요청
return
}
// 장시간 작업
if isHeavyTask(order) {
msg.InProgress() // timeout 방지
// ... 처리 계속
}
msg.Ack() // 성공
}
그렇다면 이 프로젝트에서 왜 NATS JetStream을 사용했냐?
'간단한 설정'이 가장 큰 이유였다. 엄청나고 거대한 MSA 프로젝트를 진행하는 것도 아니였고 기본적으로 우리가 필요한 메세징 기능들을 다 지원해주고 있었다. 만약 더 많은 처리량이 필요했으면 Kafka 도입도 고려했을 수도 있지만 현재 프로젝트 수준에서는 가장 적합하다고 판단하였다.
3. 기존 Push 방식의 문제점
이제 코드를 살펴보자!!
기존에는 아래와 같은 push 방식으로 구현했었다.
transaction-service:
js.subscribe("transaction.deposit", dispatcher, this::handleDeposit, false);
log.info("Subscribed to transaction.deposit");
js.subscribe("transaction.withdrawal", dispatcher, this::handleWithdrawal, false);
log.info("Subscribed to transaction.withdrawal");
account-service:
js.subscribe("transaction.result.>", dispatcher, msg -> {
// 메시지 처리 로직
msg.ack();
}, false);
문제 1: Consumer Group 미설정
- Consumer Group 설정 없음: 여러 Pod가 같은 subject를 구독하면 모두 메시지 수신
- Durable Consumer 없음: Pod 재시작 시 진행 상태 유실
- 중복 체크 없음: 같은 메시지를 여러 번 처리해도 감지 못함
문제 2: replicas: 2 환경에서 중복 처리 발생
Kubernetes 설정:
replicas: 2 # account-service, transaction-service 모두
중복 처리 시나리오:
1. 사용자: 10,000원 입금 요청 1건
↓
2. account-service: NATS로 메시지 1개 전송 "transaction.deposit"
↓
3. NATS: 메시지를 구독자들에게 전달
→ transaction-service Pod A: 메시지 수신 → 잔액 +10,000원
→ transaction-service Pod B: 같은 메시지 수신 → 잔액 또 +10,000원
↓
4. 결과: 10,000원 입금 요청 1건인데, 잔액이 20,000원 증가 X
문제 3: Flow Control 어려움
Push 방식에서는 브로커가 메시지 전송 속도를 결정하기 때문에 Consumer의 처리 속도와 무관하게 메시지가 밀려들어오면서 문제가 발생했다.
[JetStream] → → → [Consumer]
↓↓↓ (처리 중...)
메시지 큐 쌓임 메모리 증가
응답 시간 증가
결국 OOM
Push Consumer의 동작:
- 브로커가 메시지를 계속 push
- Consumer의 버퍼에 메시지 누적
- 처리가 밀리면 메모리 사용량 급증
4. Pull 방식으로 전환
Pull 구독 방식을 도입하고, 중복 처리 방지 로직을 추가함으로써 발생했던 문제를 해결할 수 있었다.
(1) Pull 구독으로 변경
transaction-service (TransactionEventSubscriber):
@Component
@RequiredArgsConstructor
public class TransactionEventSubscriber implements CommandLineRunner {
private final Connection natsConnection;
private final TransactionService transactionService;
private final TransactionRepository transactionRepository;
private final TransactionResultPublisher resultPublisher;
private final ExecutorService executorService = Executors.newFixedThreadPool(2);
@Override
public void run(String... args) throws Exception {
JetStream js = natsConnection.jetStream();
// Pull Consumer 옵션 설정
PullSubscribeOptions depositOptions = PullSubscribeOptions.builder()
.durable("deposit-consumer") // Durable Consumer 이름
.configuration(ConsumerConfiguration.builder()
.maxDeliver(3) // 최대 재전송 3번
.build())
.build();
PullSubscribeOptions withdrawalOptions = PullSubscribeOptions.builder()
.durable("withdrawal-consumer")
.configuration(ConsumerConfiguration.builder()
.maxDeliver(3)
.build())
.build();
// Pull 구독 생성
JetStreamSubscription depositSub = js.subscribe("transaction.deposit", depositOptions);
JetStreamSubscription withdrawalSub = js.subscribe("transaction.withdrawal", withdrawalOptions);
// 별도 스레드에서 polling 시작
executorService.submit(() -> pollMessages(depositSub, "deposit"));
executorService.submit(() -> pollMessages(withdrawalSub, "withdrawal"));
log.info("Started Pull subscribers for deposit and withdrawal");
}
private void pollMessages(JetStreamSubscription subscription, String type) {
while (!Thread.currentThread().isInterrupted()) {
try {
// 한 번에 10개씩 가져오기 (배치 처리)
List<Message> messages = subscription.fetch(10, Duration.ofSeconds(1));
for (Message msg : messages) {
if ("deposit".equals(type)) {
handleDeposit(msg);
} else {
handleWithdrawal(msg);
}
}
} catch (Exception e) {
log.error("Error polling messages for {}", type, e);
if (e instanceof JetStreamStatusException) {
break;
}
}
}
}
private void handleDeposit(Message msg) {
try {
TransactionProcessRequest request = objectMapper.readValue(
msg.getData(), TransactionProcessRequest.class);
// 중복 체크 추가
if (transactionRepository.existsByRecordId(request.getRecordId())) {
log.warn("Duplicate deposit detected: recordId={}", request.getRecordId());
msg.ack(); // 중복이지만 ack (재처리 방지)
return;
}
log.info("Processing deposit for userId: {}", request.getUserId());
TransactionProcessResponse response = transactionService.processDeposit(request);
resultPublisher.publish("transaction.result.deposit", response);
msg.ack();
} catch (Exception e) {
log.error("Failed to process deposit", e);
msg.nak();
}
}
}
account-service (TransactionResultSubscriber):
@Component
@RequiredArgsConstructor
public class TransactionResultSubscriber implements CommandLineRunner {
private final Connection natsConnection;
private final ObjectMapper objectMapper;
private final TransactionRecordRepository transactionRecordRepository;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
@Override
public void run(String... args) throws Exception {
JetStream js = natsConnection.jetStream();
PullSubscribeOptions options = PullSubscribeOptions.builder()
.durable("account-result-consumer")
.configuration(ConsumerConfiguration.builder()
.maxDeliver(3)
.build())
.build();
JetStreamSubscription subscription = js.subscribe("transaction.result.>", options);
executorService.submit(() -> pollMessages(subscription));
log.info("Started Pull subscriber for transaction.result.>");
}
private void pollMessages(JetStreamSubscription subscription) {
while (!Thread.currentThread().isInterrupted()) {
try {
List<Message> messages = subscription.fetch(10, Duration.ofSeconds(1));
for (Message msg : messages) {
handleResult(msg);
}
} catch (Exception e) {
log.error("Error polling messages", e);
if (e instanceof JetStreamStatusException) {
break;
}
}
}
}
private void handleResult(Message msg) {
try {
TransactionProcessResponse response = objectMapper.readValue(
msg.getData(), TransactionProcessResponse.class);
TransactionRecord record = transactionRecordRepository
.findById(response.getRecordId())
.orElseThrow(() -> new RuntimeException(
"TransactionRecord not found: " + response.getRecordId()));
// 중복 체크 추가
if (record.getTransactionId() != null) {
log.warn("Already processed: recordId={}, transactionId={}",
response.getRecordId(), record.getTransactionId());
msg.ack();
return;
}
// 상태 업데이트
if (response.getStatus() == Status.SUCCESS) {
record.updateStatus(Status.SUCCESS);
} else {
record.updateStatus(Status.FAILED);
}
record.setTransactionId(response.getTransactionId());
transactionRecordRepository.save(record);
log.info("Updated record {} to {}, transactionId={}",
record.getId(), record.getStatus(), record.getTransactionId());
msg.ack();
} catch (Exception e) {
log.error("Failed to process transaction result", e);
msg.nak();
}
}
}
(2) 엔티티 및 Repository 변경
Transaction 엔티티 (transaction-service):
@Entity
@Table(name = "transactions")
public class Transaction {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true) // unique 제약 추가
private Long recordId;
private String userId;
private TransactionType type;
private BigDecimal amount;
private BigDecimal balanceAfter;
private LocalDateTime createdAt;
public static Transaction create(Long recordId, String userId,
TransactionType type, BigDecimal amount,
BigDecimal balanceAfter) {
return Transaction.builder()
.recordId(recordId)
.userId(userId)
.type(type)
.amount(amount)
.balanceAfter(balanceAfter)
.createdAt(LocalDateTime.now())
.build();
}
}
TransactionRepository:
public interface TransactionRepository extends JpaRepository<Transaction, Long> {
// 중복 체크용 메서드
boolean existsByRecordId(Long recordId);
}
TransactionRecord 엔티티 (account-service):
@Entity
@Table(name = "transaction_records")
public class TransactionRecord {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column
private Long transactionId; // transaction-service의 처리 결과 ID
private String userId;
private BigDecimal amount;
private String type;
@Enumerated(EnumType.STRING)
private Status status;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
public void updateStatus(Status status) {
this.status = status;
this.updatedAt = LocalDateTime.now();
}
public void setTransactionId(Long transactionId) {
this.transactionId = transactionId;
}
}
5. 개선 효과
(1) 중복 처리 해결
Before (Push):
입금 10,000원 요청 → Pod A, B 둘 다 처리 → 잔액 20,000원 증가 X
After (Pull):
입금 10,000원 요청
↓
Pod A: fetch → 메시지 수신 → existsByRecordId() 체크 → 처리 → ack
Pod B: fetch → (메시지 이미 ack됨) → 받지 못함
↓
잔액 10,000원 증가 O
만약 Pod A가 중간에 죽어서 재전송되면:
Pod B: fetch → 메시지 수신 → existsByRecordId() → 이미 처리됨 감지 → ack만
(2) replicas 증가에도 안전
replicas: 1 → 처리량: 500 msgs/s
replicas: 2 → 처리량: 1,000 msgs/s (중복 없이 2배)
replicas: 5 → 처리량: 2,400 msgs/s (선형 확장)
(3) Consumer가 flow control 주도
Push (문제):
- 브로커가 메시지 계속 push
- Pod가 느리면 메시지 버퍼에 쌓임
- 메모리 증가 → OOM 위험
Pull (개선):
- Pod가 fetch(10)으로 10개씩만 요청
- 처리 완료 후 다음 fetch
- 메모리 사용량 일정 유지
(4) 장애 격리
DB 장애 발생 시:
Push: 메시지 계속 push → 처리 실패 → 메모리 누적 → 서비스 죽음
Pull: fetch 실패 → 재시도 간격 증가 → 메시지는 Stream에 보관 → DB 복구 후 자동 재개
핵심 변경 사항을 요약하면 다음과 같다.
항목 Before (Push) After (Pull)
| 구독 방식 | js.subscribe(subject, dispatcher, callback) | js.subscribe(subject, PullSubscribeOptions) |
| 메시지 수신 | 자동 push (콜백) | 명시적 fetch |
| Consumer 이름 | 없음 | Durable ("deposit-consumer") |
| 중복 체크 | 없음 | existsByRecordId() |
| DB 제약 | 없음 | recordId UNIQUE |
| replicas: 2 대응 | 중복 처리 발생 x | 정상 동작 O |
| Flow control | 브로커 주도 | Consumer 주도 (fetch(10)) |
6. 마치며
흔히 사용하는 Kafka나 RabbitMQ를 사용하지 않고 프로젝트 구조에 맞춰 NATS JetStream을 사용해본 것이 좋은 경험이였고 처음에는 아무 생각없이 push 방식으로 구현했는데 늘 작성하는 코드에 의문을 가지고 사용하는 스택에 왜? 라는 질문을 던지는 것이 중요함을 느낄 수 있었다.
kafka 도입을 고려 중이라면 NATS JetStream도 한 번쯤 고려하는걸 추천!!!