공부

NATS JetStream 비동기 push 방식 -> pull 방식 전환

chaeyuuu 2026. 2. 10. 12:25

들어가며

MSA와 같은 분산 시스템에서는 서비스 간 결합도를 낮추기 위한 메세지 큐가 필수다. Kafka, RabbitMQ 등 다양한 기술들이 있는데 NATS JetStream은 비교적 생소할 수 있다. 나도 그랬고!!
이 글에서는 'VM 환경에서의 모놀리식 구조에서 Cloud Native 환경에서 MSA 구조로' 전환하는 프로젝트를 진행하며 NATS JetStream의 push 방식에서 pull 방식으로 전환한 경험을 작성해보려 한다. 

 

먼저 간단하게 NATS JetStream의 개념 및 특징에 대해 알아보도록 하자.

 


1. NATS JetStream이란?

NATS JetStream은 단일 바이너리 메세지 시스템 중 하나로 경량화 및 고성능을 위해 고안된 오픈소스 메세지 브로커이다. 

 

NATS Core vs JetStream

NATS Core는 순수한 pub/sub 메세징시스템으로 메세지 전달 보장이 at-most-once이다. 즉, 메세지가 전달되거나 전달되지 않을 수 있고 구독자가 연결되어있지 않으면 메세지는 유실되는 형태이다. 메모리 기반으로 동작해 매우 빠르지만 persistence가 보장됮 않는다.

반면, NATS JetStream은 NATS Core에 persistence 레이어를 추가한 형태로 보면 된다.

  • 메시지 영구 저장 (Stream 개념)
  • 재전송 및 재처리 지원
  • Consumer 상태 관리

 

Stream과 Consumer 개념

Stream은 메세지를 저장하는 일종의 로그로, 특정 subject 패턴에 매칭되는 메세지들을 저장하며 보관 정책에 따라 관리된다.  

# Stream 설정 예시
Stream: ORDER_EVENTS
Subjects: orders.>
Retention: Limits (메시지 개수, 크기, 시간 기준)
Storage: File (디스크) 또는 Memory

 

Consumer는 Stream에서 메세지를 소비하는 주체로, 파티션 없이 메세지를 가져가고 각 Consumer는 독립적인 진행 상태를 유지하면서도 여러 Consumer가 동일한 Stream에서 메시지를 읽을 수 있다. 카프카는 하나의 파티션에 하나의 Consumer만 할당되어병렬 처리를 하려고 한다면 파티션을 늘려야한다는 차이점이 있다.   

Stream: [MSG1] [MSG2] [MSG3] [MSG4] [MSG5]
         ↓       ↓       ↓
Consumer A (last read: MSG2)
Consumer B (last read: MSG4)
Consumer C (last read: MSG1)

 

Kafka vs RabbitMQ vs NATS JetStream

  Kafka RabbitMQ NATS JetStream
아키텍처 - 분산 이벤트 스트리밍 플랫폼
- 로그 기반 아키텍처로 메세지를 디스크에 영구 저장
- 대용량 데이터 파이프라인에 적합
- 전통적인 메세지 브로커
- AMQP 프로토콜 기반
- Exchange-Queue-Binding 구조 사용
- 단일 바이너리로 배포되는 경량 시스템
- NATS Core에 persistence 레이어를 추가한 형태
- Cloud Native 환경에 최적화
메세지 전달 방식 - pull 방식
: Consumer가 명시적으로 메세지를 가져가고 처리 속도에 맞춰 offset을 관리
- push 방식
: 브로커가 consumer 에게 능동적으로 메세지 전달, consumer는 prefetch count를 통해 처리 속도 조절)
- push, pull 방식 모두 지원
성능 및 확장성 - 처리량 측면에서 가장 높은 성능
- 파티셔닝을 통한 뛰어난 수평 확장 
- 복잡한 라우팅에서의 유연성  - 경량성과 낮은 레이터시가 강점
: 초당 수백만 메세지 처리 가능
운영 복잡도 - 가장 복잡
: 의존성 및 클러스터 관리
- 중간 수준 - 가장 간단
: 단일 바이러니로 배포 & 설정이 직관적

 


2. NATS Messaging Pattern

Subject 기반 메시징

NATS는 topic 대신 subject를 사용한다. Subject는 계층적 구조를 가지며 와일드카드를 지원하는 방식이다.

orders.created       # 주문 생성 이벤트
orders.updated       # 주문 업데이트 이벤트
orders.*.shipped     # 모든 주문의 배송 이벤트
users.>              # users로 시작하는 모든 subject

 

주요 패턴

1. Pub/Sub (1:N)

// Publisher
nc.Publish("orders.created", orderData)

// Subscribers (모두 동일 메시지 수신)
nc.Subscribe("orders.created", handler1)
nc.Subscribe("orders.created", handler2)

 

2. Request/Reply (1:1)

// Requester
response, _ := nc.Request("user.get", userID, timeout)

// Responder
nc.Subscribe("user.get", func(msg *nats.Msg) {
    user := getUserByID(msg.Data)
    msg.Respond(user)
})

 

3. Queue Groups (경쟁 소비)

// 같은 큐 그룹 내에서는 한 인스턴스만 메시지 수신
nc.QueueSubscribe("orders.process", "workers", handler1)
nc.QueueSubscribe("orders.process", "workers", handler2)
// 한 메시지는 handler1 또는 handler2 중 하나만 처리

 

 

Push Consumer vs Pull Consumer

Push Consumer (기존 방식)

브로커가 능동적으로 Consumer에게 메시지를 전달하는 방식

// Push Consumer 생성
js.Subscribe("orders.>", func(msg *nats.Msg) {
    processOrder(msg.Data)
    msg.Ack()
}, nats.Durable("order-processor"))

 

동작 방식:

  1. Consumer 생성 시 콜백 함수 등록
  2. 메시지 도착 시 JetStream이 자동으로 콜백 호출
  3. Consumer는 메시지 처리 후 Ack

특징:

  • 구현이 간단 (콜백 패턴)
  • 실시간성이 높음
  • 브로커가 flow control 담당

 

Pull Consumer (개선 방식)

Consumer가 명시적으로 메시지를 요청하는 방식:

// Pull Consumer 생성
sub, _ := js.PullSubscribe("orders.>", "order-processor")

// 배치로 메시지 가져오기
msgs, _ := sub.Fetch(10, nats.MaxWait(5*time.Second))
for _, msg := range msgs {
    processOrder(msg.Data)
    msg.Ack()
}

 

동작 방식:

  1. Consumer가 Fetch 요청
  2. JetStream이 요청한 개수만큼 메시지 반환
  3. Consumer가 처리 후 Ack

특징:

  • Consumer가 flow control 주도
  • Back pressure 자연스럽게 처리
  • 배치 처리 가능
func processMessage(msg *nats.Msg) {
    order := unmarshalOrder(msg.Data)
    
    // 잘못된 데이터
    if order.ID == "" {
        msg.Term()  // 다시 시도해도 실패
        return
    }
    
    // 일시적 장애 (DB 연결 실패 등)
    if err := saveOrder(order); err != nil {
        msg.Nak()   // 재시도 요청
        return
    }
    
    // 장시간 작업
    if isHeavyTask(order) {
        msg.InProgress()  // timeout 방지
        // ... 처리 계속
    }
    
    msg.Ack()  // 성공
}

 


그렇다면 이 프로젝트에서 왜 NATS JetStream을 사용했냐?

'간단한 설정'이 가장 큰 이유였다. 엄청나고 거대한 MSA 프로젝트를 진행하는 것도 아니였고 기본적으로 우리가 필요한 메세징 기능들을 다 지원해주고 있었다. 만약 더 많은 처리량이 필요했으면 Kafka 도입도 고려했을 수도 있지만 현재 프로젝트 수준에서는 가장 적합하다고 판단하였다.    

 


3. 기존 Push 방식의 문제점

이제 코드를 살펴보자!!

기존에는 아래와 같은 push 방식으로 구현했었다. 

 

transaction-service:

        js.subscribe("transaction.deposit", dispatcher, this::handleDeposit, false);
        log.info("Subscribed to transaction.deposit");

        js.subscribe("transaction.withdrawal", dispatcher, this::handleWithdrawal, false);
        log.info("Subscribed to transaction.withdrawal");

 

 

account-service:

js.subscribe("transaction.result.>", dispatcher, msg -> {
    // 메시지 처리 로직
    msg.ack();
}, false);
 
 
이렇게 하니 아래와 같은 문제점이 있었다. 

 

문제 1: Consumer Group 미설정

  • Consumer Group 설정 없음: 여러 Pod가 같은 subject를 구독하면 모두 메시지 수신
  • Durable Consumer 없음: Pod 재시작 시 진행 상태 유실
  • 중복 체크 없음: 같은 메시지를 여러 번 처리해도 감지 못함

문제 2: replicas: 2 환경에서 중복 처리 발생

Kubernetes 설정:

replicas: 2  # account-service, transaction-service 모두
 

중복 처리 시나리오:

1. 사용자: 10,000원 입금 요청 1건
   ↓
2. account-service: NATS로 메시지 1개 전송 "transaction.deposit"
   ↓
3. NATS: 메시지를 구독자들에게 전달
   → transaction-service Pod A: 메시지 수신 → 잔액 +10,000원
   → transaction-service Pod B: 같은 메시지 수신 → 잔액 또 +10,000원
   ↓
4. 결과: 10,000원 입금 요청 1건인데, 잔액이 20,000원 증가 X

 

 

문제 3: Flow Control 어려움

Push 방식에서는 브로커가 메시지 전송 속도를 결정하기 때문에 Consumer의 처리 속도와 무관하게 메시지가 밀려들어오면서 문제가 발생했다.

 
[JetStream] → → → [Consumer]
              ↓↓↓    (처리 중...)
메시지 큐 쌓임      메모리 증가
                   응답 시간 증가
                   결국 OOM

 

Push Consumer의 동작:

  1. 브로커가 메시지를 계속 push
  2. Consumer의 버퍼에 메시지 누적
  3. 처리가 밀리면 메모리 사용량 급증
 

4. Pull 방식으로 전환

Pull 구독 방식을 도입하고, 중복 처리 방지 로직을 추가함으로써 발생했던 문제를 해결할 수 있었다. 

(1) Pull 구독으로 변경

transaction-service (TransactionEventSubscriber):

@Component
@RequiredArgsConstructor
public class TransactionEventSubscriber implements CommandLineRunner {

    private final Connection natsConnection;
    private final TransactionService transactionService;
    private final TransactionRepository transactionRepository;
    private final TransactionResultPublisher resultPublisher;
    
    private final ExecutorService executorService = Executors.newFixedThreadPool(2);

    @Override
    public void run(String... args) throws Exception {
        JetStream js = natsConnection.jetStream();
        
        // Pull Consumer 옵션 설정
        PullSubscribeOptions depositOptions = PullSubscribeOptions.builder()
                .durable("deposit-consumer")  // Durable Consumer 이름
                .configuration(ConsumerConfiguration.builder()
                        .maxDeliver(3)  // 최대 재전송 3번
                        .build())
                .build();

        PullSubscribeOptions withdrawalOptions = PullSubscribeOptions.builder()
                .durable("withdrawal-consumer")
                .configuration(ConsumerConfiguration.builder()
                        .maxDeliver(3)
                        .build())
                .build();

        // Pull 구독 생성
        JetStreamSubscription depositSub = js.subscribe("transaction.deposit", depositOptions);
        JetStreamSubscription withdrawalSub = js.subscribe("transaction.withdrawal", withdrawalOptions);

        // 별도 스레드에서 polling 시작
        executorService.submit(() -> pollMessages(depositSub, "deposit"));
        executorService.submit(() -> pollMessages(withdrawalSub, "withdrawal"));

        log.info("Started Pull subscribers for deposit and withdrawal");
    }

    private void pollMessages(JetStreamSubscription subscription, String type) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 한 번에 10개씩 가져오기 (배치 처리)
                List<Message> messages = subscription.fetch(10, Duration.ofSeconds(1));

                for (Message msg : messages) {
                    if ("deposit".equals(type)) {
                        handleDeposit(msg);
                    } else {
                        handleWithdrawal(msg);
                    }
                }
            } catch (Exception e) {
                log.error("Error polling messages for {}", type, e);
                if (e instanceof JetStreamStatusException) {
                    break;
                }
            }
        }
    }

    private void handleDeposit(Message msg) {
        try {
            TransactionProcessRequest request = objectMapper.readValue(
                    msg.getData(), TransactionProcessRequest.class);

            // 중복 체크 추가
            if (transactionRepository.existsByRecordId(request.getRecordId())) {
                log.warn("Duplicate deposit detected: recordId={}", request.getRecordId());
                msg.ack();  // 중복이지만 ack (재처리 방지)
                return;
            }

            log.info("Processing deposit for userId: {}", request.getUserId());

            TransactionProcessResponse response = transactionService.processDeposit(request);
            resultPublisher.publish("transaction.result.deposit", response);

            msg.ack();
        } catch (Exception e) {
            log.error("Failed to process deposit", e);
            msg.nak();
        }
    }
}

 

account-service (TransactionResultSubscriber):

@Component
@RequiredArgsConstructor
public class TransactionResultSubscriber implements CommandLineRunner {

    private final Connection natsConnection;
    private final ObjectMapper objectMapper;
    private final TransactionRecordRepository transactionRecordRepository;
    
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    @Override
    public void run(String... args) throws Exception {
        JetStream js = natsConnection.jetStream();

        PullSubscribeOptions options = PullSubscribeOptions.builder()
                .durable("account-result-consumer")
                .configuration(ConsumerConfiguration.builder()
                        .maxDeliver(3)
                        .build())
                .build();

        JetStreamSubscription subscription = js.subscribe("transaction.result.>", options);

        executorService.submit(() -> pollMessages(subscription));

        log.info("Started Pull subscriber for transaction.result.>");
    }

    private void pollMessages(JetStreamSubscription subscription) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                List<Message> messages = subscription.fetch(10, Duration.ofSeconds(1));

                for (Message msg : messages) {
                    handleResult(msg);
                }
            } catch (Exception e) {
                log.error("Error polling messages", e);
                if (e instanceof JetStreamStatusException) {
                    break;
                }
            }
        }
    }

    private void handleResult(Message msg) {
        try {
            TransactionProcessResponse response = objectMapper.readValue(
                    msg.getData(), TransactionProcessResponse.class);

            TransactionRecord record = transactionRecordRepository
                    .findById(response.getRecordId())
                    .orElseThrow(() -> new RuntimeException(
                            "TransactionRecord not found: " + response.getRecordId()));

            // 중복 체크 추가
            if (record.getTransactionId() != null) {
                log.warn("Already processed: recordId={}, transactionId={}",
                        response.getRecordId(), record.getTransactionId());
                msg.ack();
                return;
            }

            // 상태 업데이트
            if (response.getStatus() == Status.SUCCESS) {
                record.updateStatus(Status.SUCCESS);
            } else {
                record.updateStatus(Status.FAILED);
            }
            record.setTransactionId(response.getTransactionId());

            transactionRecordRepository.save(record);
            log.info("Updated record {} to {}, transactionId={}",
                    record.getId(), record.getStatus(), record.getTransactionId());

            msg.ack();
        } catch (Exception e) {
            log.error("Failed to process transaction result", e);
            msg.nak();
        }
    }
}

(2) 엔티티 및 Repository 변경

Transaction 엔티티 (transaction-service):

@Entity
@Table(name = "transactions")
public class Transaction {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)  // unique 제약 추가
    private Long recordId;

    private String userId;
    private TransactionType type;
    private BigDecimal amount;
    private BigDecimal balanceAfter;
    private LocalDateTime createdAt;

    public static Transaction create(Long recordId, String userId, 
                                     TransactionType type, BigDecimal amount, 
                                     BigDecimal balanceAfter) {
        return Transaction.builder()
                .recordId(recordId)
                .userId(userId)
                .type(type)
                .amount(amount)
                .balanceAfter(balanceAfter)
                .createdAt(LocalDateTime.now())
                .build();
    }
}

 

TransactionRepository:

public interface TransactionRepository extends JpaRepository<Transaction, Long> {
    // 중복 체크용 메서드
    boolean existsByRecordId(Long recordId);
}

 

TransactionRecord 엔티티 (account-service):

@Entity
@Table(name = "transaction_records")
public class TransactionRecord {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column
    private Long transactionId;  // transaction-service의 처리 결과 ID

    private String userId;
    private BigDecimal amount;
    private String type;
    
    @Enumerated(EnumType.STRING)
    private Status status;
    
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;

    public void updateStatus(Status status) {
        this.status = status;
        this.updatedAt = LocalDateTime.now();
    }

    public void setTransactionId(Long transactionId) {
        this.transactionId = transactionId;
    }
}

 

5. 개선 효과

(1) 중복 처리 해결

Before (Push):

입금 10,000원 요청 → Pod A, B 둘 다 처리 → 잔액 20,000원 증가 X

 

After (Pull):

입금 10,000원 요청
↓
Pod A: fetch → 메시지 수신 → existsByRecordId() 체크 → 처리 → ack
Pod B: fetch → (메시지 이미 ack됨) → 받지 못함
↓
잔액 10,000원 증가 O

만약 Pod A가 중간에 죽어서 재전송되면:
Pod B: fetch → 메시지 수신 → existsByRecordId() → 이미 처리됨 감지 → ack만

 

(2) replicas 증가에도 안전

replicas: 1 → 처리량: 500 msgs/s
replicas: 2 → 처리량: 1,000 msgs/s (중복 없이 2배)
replicas: 5 → 처리량: 2,400 msgs/s (선형 확장)

 

(3) Consumer가 flow control 주도

Push (문제):

  • 브로커가 메시지 계속 push
  • Pod가 느리면 메시지 버퍼에 쌓임
  • 메모리 증가 → OOM 위험

Pull (개선):

  • Pod가 fetch(10)으로 10개씩만 요청
  • 처리 완료 후 다음 fetch
  • 메모리 사용량 일정 유지

(4) 장애 격리

DB 장애 발생 시:

Push: 메시지 계속 push → 처리 실패 → 메모리 누적 → 서비스 죽음

Pull: fetch 실패 → 재시도 간격 증가 → 메시지는 Stream에 보관 → DB 복구 후 자동 재개

 

 

핵심 변경 사항을 요약하면 다음과 같다.

 

항목 Before (Push) After (Pull)

구독 방식 js.subscribe(subject, dispatcher, callback) js.subscribe(subject, PullSubscribeOptions)
메시지 수신 자동 push (콜백) 명시적 fetch
Consumer 이름 없음 Durable ("deposit-consumer")
중복 체크 없음 existsByRecordId()
DB 제약 없음 recordId UNIQUE
replicas: 2 대응 중복 처리 발생 x 정상 동작 O
Flow control 브로커 주도 Consumer 주도 (fetch(10))

 

6. 마치며

 

흔히 사용하는 Kafka나 RabbitMQ를 사용하지 않고 프로젝트 구조에 맞춰 NATS JetStream을 사용해본 것이 좋은 경험이였고 처음에는 아무 생각없이 push 방식으로 구현했는데 늘 작성하는 코드에 의문을 가지고 사용하는 스택에 왜? 라는 질문을 던지는 것이 중요함을 느낄 수 있었다. 

kafka 도입을 고려 중이라면 NATS JetStream도 한 번쯤 고려하는걸 추천!!!

 

참고 자료