ReentrantLock

  • 객체에 적용된 synchronized 키워드처럼 작동한다 
  • 하지만 이때 명확한 락킹과 언락킹이 필요하다는 차이점이 있다.
  • 락킹과 언락킹 사이에서 동기화가 진행된다.
  • 이 때 주의할 사항으로는 언락킹을 빼먹거나 중간에 return이나 throw가 발생하여 언락킹이 실행되지 않는 경우인데, 이를 위해 동기화가 필요한 부분에 try{}문으로 감싸고 언락킹 부분을 finally{}로 감쏸다면 언제나 언락킹 메소드가 실행된다.
  • 이 방식은 락의 공정성을 제어할 때 빛을 발한다. synchronized를 이용하면 한 스레드만 지속적으로 락을 얻게 되는 상황이 발생할 수 있다. 이는 공정하지 않은 상황으로, 이를 위해 ReentrantLock을 사용해야 한다. 이 때 중요한 스레드가 락을 얻어야 하는데 공정성에 의해 늦게 받으며 앱의 처리량이 줄어들 수 있다. isFair(true)를 해주면 된다.
  • 락을 걸었을 때 다른 스레드에서 락을 사용하고 있다면 블로킹 상태가 된다. 이때는 interrupt()을 보내도 스레드가 감지하지 않지만, lockInterruptibly()을 호출해주면 interrupt 상태를 감지하여 종료할 수도 있다. 즉, 락을 얻기 위해 대기 중일 때 interrupt를 감지할 수 있는 락을 얻는다.
  • tryLock()은 락이 다른 스레드에 없으면 lock()과 동일하게 동작하지만, 락이 다른 스레드에 존재하면 블로킹 상태에 들어가는 lock()과 달리 무조건 값을 반환하여 계속적으로 스레드가 실행되도록 할 수 있다. 따라서 실시간 UI같은 경우 지속적인 반응을 만들어낼 수 있다. 이 때 값은 boolean값으로 락을 얻었을 때는 true를 반환한다.
  • 기본적인 메커니즘은 락 객체가 (try)lock()을 호출했을 때 동시에 수행해도 상관없는 로직일지라도 unlock까지 실행하지 못한다.
public static void main(String[] args) {
    PricesContainer pricesContainer = new PricesContainer();
    Thread thread1 = new PriceUpdater(pricesContainer);
    Thread thread2 = new PriceGetter(pricesContainer);

    thread1.start();
    thread2.start();
}

public static class PricesContainer {
	//생성자 호출 시 아무 값을 안 주면 비공정 락으로 설정됨. 따라서 공정 락으로 설정하려면 true값을 인자로 줘야 함.
    private Lock lockObject = new java.util.concurrent.locks.ReentrantLock();

    private double bitcoinPrice;
    private double etherPrice;
    private double litecoinPrice;
    private double bitcoinCashPrice;
    private double ripplePrice;

    public void setLockObject(Lock lockObject) {
        this.lockObject = lockObject;
    }

    public void setBitcoinPrice(double bitcoinPrice) {
        this.bitcoinPrice = bitcoinPrice;
    }

    public void setEtherPrice(double etherPrice) {
        this.etherPrice = etherPrice;
    }

    public void setLitecoinPrice(double litecoinPrice) {
        this.litecoinPrice = litecoinPrice;
    }

    public void setBitcoinCashPrice(double bitcoinCashPrice) {
        this.bitcoinCashPrice = bitcoinCashPrice;
    }

    public void setRipplePrice(double ripplePrice) {
        this.ripplePrice = ripplePrice;
    }

    public Lock getLockObject() {
        return lockObject;
    }

    public double getBitcoinPrice() {
        return bitcoinPrice;
    }

    public double getEtherPrice() {
        return etherPrice;
    }

    public double getLitecoinPrice() {
        return litecoinPrice;
    }

    public double getBitcoinCashPrice() {
        return bitcoinCashPrice;
    }

    public double getRipplePrice() {
        return ripplePrice;
    }
}

public static class PriceUpdater extends Thread {
    private PricesContainer pricesContainer;
    private Random random = new Random();

    public PriceUpdater(PricesContainer pricesContainer) {
        this.pricesContainer = pricesContainer;
    }

    @Override
    public void run() {
        while (true) {
            pricesContainer.getLockObject().lock();
            try {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                pricesContainer.setBitcoinPrice(random.nextInt(20000));
                pricesContainer.setEtherPrice(random.nextInt(20000));
                pricesContainer.setLitecoinPrice(random.nextInt(20000));
                pricesContainer.setBitcoinCashPrice(random.nextInt(20000));
                pricesContainer.setRipplePrice(random.nextInt(20000));
            } finally {
                pricesContainer.getLockObject().unlock();
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
        }
    }
}

public static class PriceGetter extends Thread {
    private PricesContainer pricesContainer;

    public PriceGetter(PricesContainer pricesContainer) {
        this.pricesContainer = pricesContainer;
    }

    @Override
    public void run() {
        while (true) {
            if (pricesContainer.getLockObject().tryLock()) {
                try {
                    System.out.println(pricesContainer.getBitcoinPrice() + " " +
                            pricesContainer.getEtherPrice() + " " +
                            pricesContainer.getLitecoinPrice() + " " +
                            pricesContainer.getBitcoinCashPrice() + " " +
                            pricesContainer.getRipplePrice());
                } finally {
                    pricesContainer.getLockObject().unlock();
                }
            }
            else{
                System.out.println(System.currentTimeMillis());
            }
        }
    }
}

 

ReentrantReadWriteLock

  • 읽기 락과 쓰기락을 각각 가지고 있다.
  • 따라서 읽기의 경우 여러 스레드가 동시에 접근해도 블로킹 상태가 되지 않고 동시에 접근할 수 있도록 허용해주지만 쓰기 락의 경우다른 스레드를 블로킹 상태를 만들어준다.
  • 또한 읽기 락과 쓰기 락은 동시에 접근할 수 없다. 만약 스레드 하나가 쓰기 락을 얻으면 다른 어떤 스레드도 쓰기 락과 읽기 락을 얻을 수 없다. 반대로 모든 스레드가 읽기 락을 풀기 전까지 쓰기 락을 얻을 수 없다.
private static final int HIGHEST_PRICE=1000;

public static void main(String[] args) throws InterruptedException {
    InventoryDatabase inventoryDatabase = new InventoryDatabase();

    Random random = new Random();
    for (int i = 0; i < 100000; i++) {
        inventoryDatabase.addItem(random.nextInt(HIGHEST_PRICE));
    }

    Thread writer = new Thread(()->{
        while(true){
            inventoryDatabase.addItem(random.nextInt(HIGHEST_PRICE));
            inventoryDatabase.removeItem(random.nextInt(HIGHEST_PRICE));

            try{
                Thread.sleep(10);
            }catch (InterruptedException e){}
        }
    });

    writer.setDaemon(true);
    writer.start();

    int numberOfReaderThreads = 7;
    List<Thread> readers = new ArrayList<>();
    for (int readerIndex = 0; readerIndex < numberOfReaderThreads; readerIndex++) {
        Thread reader = new Thread(()->{
            for (int i = 0; i < 100000; i++) {
                int upperBoundPrice = random.nextInt(HIGHEST_PRICE);
                int lowerBoundPrice = upperBoundPrice>0? random.nextInt(upperBoundPrice) : 0;
                inventoryDatabase.getNumberOfItemsInPriceRange(lowerBoundPrice, upperBoundPrice);
            }
        });

        reader.setDaemon(true);
        readers.add(reader);
    }

    long startReadingTime = System.currentTimeMillis();

    for (Thread reader : readers) {
        reader.start();
    }

    for (Thread reader : readers) {
        reader.join();
    }

    long endReadingTime = System.currentTimeMillis();

    System.out.println(String.format("Reading took %d ms", endReadingTime-startReadingTime));
}

public static class InventoryDatabase {
    private TreeMap<Integer, Integer> priceToCountMap = new TreeMap<>();
    private ReentrantLock lock = new ReentrantLock();
    private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    private Lock readLock = reentrantReadWriteLock.readLock();
    private Lock writeLock = reentrantReadWriteLock.writeLock();

    public int getNumberOfItemsInPriceRange(int lowerBound, int upperBound) {
        readLock.lock();
        try {
            Integer fromKey = priceToCountMap.ceilingKey(lowerBound);
            Integer toKey = priceToCountMap.floorKey(upperBound);

            if (fromKey == null || toKey == null) {
                return 0;
            }
            NavigableMap<Integer, Integer> rangeOfPrices = priceToCountMap.subMap(fromKey, true,
                    toKey, true);

            int sum = 0;
            for (int numberOfItemsForPrice : rangeOfPrices.values()) {
                sum += numberOfItemsForPrice;
            }
            return sum;
        } finally {
            readLock.unlock();
        }
    }

    public void addItem(int price) {
        writeLock.lock();
        try {
            Integer numberOfItemsForPrice = priceToCountMap.get(price);
            if (numberOfItemsForPrice == null) {
                priceToCountMap.put(price, 1);
            } else {
                priceToCountMap.put(price, numberOfItemsForPrice + 1);
            }
        } finally {
            writeLock.unlock();
        }
    }

    public void removeItem(int price) {
        writeLock.lock();
        try {
            Integer numberOfItemsForPrice = priceToCountMap.get(price);
            if (numberOfItemsForPrice == null || numberOfItemsForPrice == 1) {
                priceToCountMap.remove(price);
            } else {
                priceToCountMap.put(price, numberOfItemsForPrice - 1);
            }
        }finally {
            writeLock.unlock();
        }
    }
}

'JAVA > 멀티스레딩' 카테고리의 다른 글

Lock-Free 알고리즘, 데이터 구조 및 기술  (0) 2024.09.02
스레드간 통신  (0) 2024.08.30
병행성 문제와 솔루션  (0) 2024.08.29
멀티 스레드 주의 사항(병행성 문제)  (0) 2024.08.29
Thread Pooling  (0) 2024.08.28

+ Recent posts