Project/토이 프로젝트

[선착순 상품 구매 프로젝트] 자바 synchronized 키워드 적용으로 동시성 제어하기

Lea Hwang 2024. 4. 12. 14:03

특정 시간대에 집중된 주문 요청이 발생하는 [선착순 상품 구매 프로젝트]를 진행하면서 동시성 문제를 맞땋드렸습니다. synchronized 키워드 활용, 낙관적 락, 비관적 락을 활용해 동시성 제어통합 테스트로 확인해보았습니다. 

 

두 포스팅으로 나눠서 어떤 이유로 해당 방법을 사용했는지 저만의 문제 접근 방식을 기술해 보도록 하겠습니다. 

1. [선착순 상품 구매 프로젝트] 자바 synchronized 키워드 적용으로 동시성 제어하기

2. [선착순 상품 구매 프로젝트] Pessimistic Lock(비관적 락, 선점 잠금), Optimistic Lock(낙관적 락, 비선점 잠금)으로 동시성제어하기

목차

문제 상황

문제 분석

해결 방법

구현 및 테스트 결과

아쉬운 점 및 한계점

향후 학습 


 

문제 상황

주문/결제 요청 시 재고 수량보다 많은 결제가 처리되는 문제가 발생하였습니다. 

 

문제 분석

기존 재고 증가, 감소 코드 

[재고 증가 요청 메서드]
@Transactional
public StockResponseDto increaseProductStock(StockRequestDto requestDto) {
    if (requestDto.stock() <= 0) {
        throw new CustomException(ErrorCode.INVALID_STOCK_QUANTITY);
    }

    Stock stock = stockRepository.findById(requestDto.productId())
            .orElseThrow(() -> new CustomException(ErrorCode.PRODUCT_NOT_FOUND));

    Long newStockQuantity = stock.getStock() + requestDto.stock();

    Stock updatedStock = stockRepository.save(Stock.builder()
            .productId(stock.getProductId())
            .stock(newStockQuantity)
            .build());

    return new StockResponseDto(updatedStock.getProductId(), updatedStock.getStock());
}


[재고 감소 요청 메서드]
@Transactional
public StockResponseDto decreaseProductStock(StockRequestDto requestDto) {
    Stock stock = stockRepository.findById(requestDto.productId())
            .orElseThrow(() -> new CustomException(ErrorCode.PRODUCT_NOT_FOUND));

    Long newStockQuantity = stock.getStock() - requestDto.stock();

    if(newStockQuantity < 0) {
        throw new CustomException(ErrorCode.STOCK_NOT_ENOUGH);
    }

    Stock updatedStock = stockRepository.save(Stock.builder()
            .productId(stock.getProductId())
            .stock(newStockQuantity)
            .build());

    return new StockResponseDto(updatedStock.getProductId(), updatedStock.getStock());
}

 

동시성 시뮬레이션  코드

시나리오 코드를 통해 상품을 등록하고 주문 ➡ 결제 서비스를 동시에 1,000개 요청을 보내는 상황을 가정했습니다. 

아래는 상품을 등록, 동시에 주문과 결제 요청을 하는 시나리오 코드입니다.

더보기

상품 등록 시나리오

- 일반 상품

- 예약 상품 (선착순 상품 구매 프로젝트의 '상품'이다.)

import pymysql
from datetime import datetime

# 테이블 초기화 함수
def truncate_table(connection, table_name):
    cursor = connection.cursor()
    cursor.execute(f"TRUNCATE TABLE {table_name}")
    connection.commit()
    print(f"{table_name} db 초기화 성공")
    cursor.close()

def insert_products():
    try:
        # 데이터베이스 연결 설정
        connection = pymysql.connect(
            host='localhost',
            port=13310,
            database='product_database',
            user='[user]',
            password='[password]'
        )
        
        # product 테이블 초기화
        truncate_table(connection, 'product')

        cursor = connection.cursor()

        # 일반 상품 등록
        normal_product_query = """
        INSERT INTO product (product_name, price, product_type, available_from, available_until) 
        VALUES (%s, %s, %s, %s, %s)
        """
        normal_product_values = ("일반 상품", 12500, "REGULAR", None, None)
        cursor.execute(normal_product_query, normal_product_values)
        normal_product_id = cursor.lastrowid

        # 예약 상품 등록 (매번 시간 수정 필요)
        pre_order_product_query = """
        INSERT INTO product (product_name, price, product_type, available_from, available_until) 
        VALUES (%s, %s, %s, %s, %s)
        """
        available_at = datetime(2024, 4, 7, 11, 0, 0)
        end_at = datetime(2025, 4, 7, 12, 0, 0)
        pre_order_product_values = ('예약 상품', 500, 'RESERVED', available_at, end_at)
        cursor.execute(pre_order_product_query, pre_order_product_values)
        pre_order_product_id = cursor.lastrowid

        connection.commit()

        print(f"일반 상품과 예약 상품이 성공적으로 등록되었습니다. 상품 번호: {normal_product_id}, {pre_order_product_id}")

    except pymysql.MySQLError as e:
        print("데이터베이스 연결 또는 쿼리 실행 중 오류 발생:", e)
    finally:
        # 연결 종료
        cursor.close()
        connection.close()
        print("MySQL 연결이 종료됨.")
    return [normal_product_id, pre_order_product_id]

def insert_stocks(firstProductId, secondProductId):
    try:
        # 데이터베이스 연결 설정
        connection = pymysql.connect(
            host='localhost',
            port=13313,
            database='stock_database',
            user='[user]',
            password='[password]'
        )

        # stock 테이블 초기화
        truncate_table(connection, 'stock')
        
        cursor = connection.cursor()

        # 재고 등록
        stock_query = "INSERT INTO stock (product_id, stock) VALUES (%s, %s)"
        cursor.execute(stock_query, (firstProductId, 100))

        retrieve_query = "SELECT * FROM stock WHERE product_id = %s"
        cursor.execute(retrieve_query, (firstProductId,))
        inserted_data = cursor.fetchone()
        print(f"일반 상품 삽입된 데이터: {inserted_data}")

        cursor.execute(stock_query, (secondProductId, 20))
        cursor.execute(retrieve_query, (secondProductId,))
        inserted_data = cursor.fetchone()
        print(f"예약 상품 삽입된 데이터: {inserted_data}")

        connection.commit()

    except pymysql.MySQLError as e:
        print("데이터베이스 연결 또는 쿼리 실행 중 오류 발생:", e)
    finally:
        # 연결 종료
        cursor.close()
        connection.close()
        print("MySQL 연결이 종료됨.")

if __name__ == "__main__":
    insert_product_result = insert_products()
    insert_stocks(insert_product_result[0], insert_product_result[1])

 

동시에 주문과 결제 요청을 하는 시나리오 

import pymysql
import requests
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

# 주문, 결제 테이블 초기화 (테스트 정확성 보장)
def truncate_table():
    try:
        order_table = "orders"

        connection = pymysql.connect(
            host='localhost',
            port=13311,
            database='orders_database',
            user='[user]',
            password='[password]'
        )

        cursor = connection.cursor()
        cursor.execute(f"TRUNCATE TABLE {order_table}")
        connection.commit()
        cursor.close()
        connection.close()

        payment_table = "payment"
        connection = pymysql.connect(
            host='localhost',
            port=13312,
            database='payment_database',
            user='youjung',
            password='timegate2249207'
        )

        cursor = connection.cursor()
        cursor.execute(f"TRUNCATE TABLE {payment_table}")
        connection.commit()
        cursor.close()
        connection.close()
        
        print("주문, 결제 DB 초기화 성공")

    except pymysql.MySQLError as e:
        print("DB 연결 또는 쿼리 실행 중 오류 발생:", e)
    
    finally:
        print("MySQL 연결 종료.")
    
def send_http_request(buyerNum):
    order_url = "http://localhost:8083/orders-service/api/v1/orders"
    payment_url = "http://localhost:8084/payment-service/api/v1/payments"

    try:
        # POST 요청 (매번 productId, price 수정 필요)
        response = requests.post(order_url, json={"userId": buyerNum, "productId": 2, "price": 500, "quantity":1})

        # 응답 JSON 형식으로 파싱
        data = response.json()

        # "orderId" 키가 있는지 확인
        if "orderId" in data.get("data", {}):
            orderId = data["data"]["orderId"]
            print(f"Received orderId: {orderId}")

            # 결제 요청
            payment_response = requests.post(payment_url, json={"orderId": orderId})

            if payment_response.status_code == 201:
                payment_data = payment_response.json()
                print(f"Payment for orderId {orderId} completed successfully, {payment_data['data']}")
            else:
                payment_data = payment_response.json()
                print(f"Payment for orderId {orderId} failed with status code {payment_data}")

    except requests.exceptions.RequestException as e:
        print(f"Error sending request to {order_url}: {e}")

def main():
    # 테이블 초기화
    truncate_table()

    # 주문 및 결제 서비스 동시적으로 HTTP 요청(n번 요청)
    num_requests = 1000  # buyerNum을 1부터 n까지 보낼 것이므로 요청 수를 n으로 설정

    start_time = datetime.now()  # 코드 실행 시작 시간

    # 동시에 여러 주문 요청을 처리 -> 동시성 문제 확인
    with ThreadPoolExecutor(max_workers=10) as executor:
        # Use a list comprehension to create a list of tasks
        tasks = [executor.submit(send_http_request, buyerNum) for buyerNum in range(1, num_requests + 1)]

        # 비동기 작업 완료 될 때 까지 대기 후 처리
        for future in tasks:
            future.result()

    end_time = datetime.now()  # 코드 실행 종료 시간
    duration = end_time - start_time

    print(f"실행 소요시간 {duration}")

if __name__ == "__main__":
    main()

 

MySQL DB에서 stock 엔티티를 조회하고 증가/감소하는 부분에서 동시성 문제가 발생함을 확인하였습니다.

 

해결 방법

자바의 synchronized키워드 사용한 이유
서버가 여러대인 상황에서 서버들 간 동기화된 처리를 위해 Redis 분산락 방식이 있습니다. 하지만 초기 스타트업 입장에서 바로 Redis를 사용하기엔 비용부담이 있을 수 있습니다. 단일서버에서 만큼은 원자적으로 해결해보고 싶었고 자바에서 제공하는 기술을 선 적용하여 JVM의 동작방식을 익히는 데 초점을 두고자 했습니다.

자바가 동기화를 지원하기 위해 사용하는 메커니즘 : 모니터 (Monitor)
모든 자바 객체는 기본적으로 모니터를 가지며 여러 스레드가 객체의 임계 영역(critical section)에 진입하려고 할 때  JVM 은 synchronized키워드를 이용하여 뮤텍스 동기화를 처리해 스레드 간 동기화를 보장합니다. 
➡ synchronized 키워드를 통해 자바는 동기화를 구현

synchronized 
synchronized 블록은 해당 객체의 모니터를 획득할 수 있으며 모니터를 획득한 스레드만이 임계영역에 접근 가능합니다. 추후 synchronized 블록을 빠져나오면 모니터 Lock 이 해제되고 대기 중인 다른 스레드 중 하나가 락을 얻고 임계 영역에 진입하여 작업을 수행합니다. synchronized는 메서드나 코드 블록에 적용할 수 있습니다.

 

구현 및 테스트 결과

구현 코드

재고 증가, 감소 요청 메서드

(synchronized 키워드 적용 (@Transactional 주석처리))

//@Transactional
public StockResponseDto increaseProductStock(StockRequestDto requestDto) {
    if (requestDto.stock() <= 0) {
        throw new CustomException(ErrorCode.INVALID_STOCK_QUANTITY);
    }

    synchronized(this) {
        Stock stock = stockRepository.findById(requestDto.productId())
                .orElseThrow(() -> new CustomException(ErrorCode.PRODUCT_NOT_FOUND));

        Long newStockQuantity = stock.getStock() + requestDto.stock();

        Stock updatedStock = stockRepository.save(Stock.builder()
                .productId(stock.getProductId())
                .stock(newStockQuantity)
                .build());

        return new StockResponseDto(updatedStock.getProductId(), updatedStock.getStock());
    }
}

 

//@Transactional
public synchronized StockResponseDto decreaseProductStock(StockRequestDto requestDto) {
    Stock stock = stockRepository.findById(requestDto.productId())
            .orElseThrow(() -> new CustomException(ErrorCode.PRODUCT_NOT_FOUND));

    Long newStockQuantity = stock.getStock() - requestDto.stock();

    if(newStockQuantity < 0) {
        throw new CustomException(ErrorCode.STOCK_NOT_ENOUGH);
    }

    Stock updatedStock = stockRepository.save(Stock.builder()
            .productId(stock.getProductId())
            .stock(newStockQuantity)
            .build());

    return new StockResponseDto(updatedStock.getProductId(), updatedStock.getStock());
}

 

JUnit5를 사용한 통합테스트
단위테스트가 아닌 통합테스트 한 이유

관련 메서드에 대한 단위 테스트를 진행할 경우, 실제 데이터베이스와의 연동이 이루어지지 않아 재고 감소나 증가가 올바르게 반영되지 않는 문제가 발생했습니다. 이에 통합 테스트를 통해 실제 상황을 가정하고 테스트를 수행하였습니다.

원자적 연산 적용
자바의 java.util.concurrent.atomic 패키지를 통해 원자적 연산을 지원하는 클래스들을 제공합니다. 이러한 클래스들은 멀티 스레드 환경에서 공유 변수에 안전하게 작업할 수 있도록 돕습니다.
예) AtomicInteger, AtomicLong, AtomicBoolean 등의 클래스들은 각각 int, long, boolean 타입의 변수에 대한 원자적 연산을 지원하며 incrementAndGet(), compareAndSet()와 같은 메서드를 통해 공유 변수에 대한 원자적 연산을 수행합니다. (아래 적용 코드)

더보기

@Test
@DisplayName("예약 상품 100개에 대한 120개 요청 처리 시 모든 재고 소진 및 재고 부족 예외(STOCK_NOT_ENOUGH) 발생 검증")
void whenRequesting120_OutOf100Stock_ThenRemainingStockShouldBe0_And_HandleStockNotEnoughError() throws InterruptedException {
    // given
    int requestCount = 120;
    AtomicInteger successCount = new AtomicInteger();
    AtomicInteger failureCount = new AtomicInteger();
    CountDownLatch latch = new CountDownLatch(requestCount);
    ExecutorService executor = Executors.newFixedThreadPool(10);

    // when
    for (int i = 0; i < requestCount; i++) {
        executor.submit(() -> {
            try {
                stockService.decreaseProductStock(new StockRequestDto(productId, 1L));
                successCount.incrementAndGet();
            } catch (CustomException e) {
                if (e.getMessage().contains("STOCK_NOT_ENOUGH")) {
                    failureCount.incrementAndGet();
                }
            } finally {
                latch.countDown();
            }
        });
    }

    latch.await();
    executor.shutdown();

    // then
    Optional<Stock> finalStock = stockRepository.findById(productId);
    assertEquals(0, finalStock.get().getStock().intValue());
    assertEquals(initialStock.intValue(), successCount.get());
}

 

테스트 결과

코드 링크 [ feat: 동시성 제어를 위한 synchronized 키워드 활용 및 테스트 코드 구현 ]


아쉬운 점 및 한계점

1. 파이썬으로 작성한 동시성 시뮬레이션 코드
파이썬 코드를 사용하면 간결하게 동시성 문제를 시뮬레이션하고 테스트하기 편리하다는 장점이 있습니다.하지만 인터프리터 언어이기 때문에 컴파일 언어에 비해 실행 속도가 느릴 수 있고 비동기로 진행하게 되어 동시성 높은 애플리케이션에서는 이로 인해 병목 현상이 발생할 수도 있습니다. (만 건 동시에 0.01초 차이도 안 나게 요청을 처리하기에 한계점 존재)
따라서 다음 프로젝트에서는 부하 테스트와 성능 측정을 위한 전문적인 툴을 사용해보고자합니다.
(예) JMeter, nGrinder, Locust, Gatling 도구 중에 프로젝트의 요구사항에 맞게 사용하고자 합니다.)

2. 자바의 synchronized 키워드를 사용한 동시성 제어는 멀티 서버 환경에서 보장되지 않으며, 데드락의 가능성이 큽니다.
자바의 synchronized는 단일 프로세스 내에서만 동기화를 보장합니다. 하나의 서버에서는 효과적일 수 있지만, 여러 서버가 데이터에 접근하는 환경에서는 적합하지 않습니다. 현업에서는 여러 서버를 사용하는 상황이 많기 때문에, synchronized 키워드는 일반적으로 사용되지 않습니다.


향후 학습 

1. @Transactional 안에서 synchronized 키워드 썼을 때는 왜 에러 났을까?

왜? @Transactional의 동작 방식 때문
Transactional 호출 후에, 메서드를 호출하고, 메서드 실행이 종료가 되면, Transaction을 종료하는 흐름이다. Transactional 종료시점에 DB업데이트를 하는데, 재고 감소 메서드 실행이 종료되고 DB업데이트 전에 다른 스레드가 해당 메서드를 호출할 수 있는데, 그럼 그 다른 스레드는 갱신 전 값을 가져가서 문제가 발생하는 것이다.
→ 우선 @Transactional 주석처리해서 성공처리

 

2.  Mysql을 활용한 다양한 방법 학습 후 적용
- Pessimistic Lock 비관락 

- Optimistic Lock 낙관락

- Named Lock