JAVA/멀티스레딩

스레드간 통신

최-코드 2024. 8. 30. 17:43

세마포어

  • 리소스당 하나의 사용자만 제한하는 기존의 락과 달리 세마포어는 리소스에 동적으로 사용자 수를 제한할 수 있다.
  • 세마포어는 0을 포함한 양의 정수의 값을 가지는데, new Semaphore(num)을 통해 초기의 세마포어 값을 지정해줄 수 있다.
  • 이 생성한 세마포어의 acquire()을 사용했을 때, 세마포어의 값이 0이 아니라면 이 메소드를 호출한 스레드는 다음 라인으로 계속 진행하고 세마포어의 값을 -1 시킨다. 인자로 정수 값을 넘겨주면 해당 값만큼 허가를 얻을 수 있다.
  • 0의 값을 가진 세마포어에 스레드가 acquire하면 블로킹 상태에 들어간다. 다른 스레드에서 release()를 통해 세마포어의 값이 1이 되면 블로킹 상태가 풀리면서 다시 세마포어는 0의 값이 된다.
  • 특이한 점으로는 세마포어의 값을 가지지 않은 acquire하지 않은 스레드도 release를 호2출하여 세마포어의 값을 증가시킬 수 있다.
public static void main(String[] args) {
    Barrier barrier = new Barrier(3);
    Thread thread1 = new Thread(new CoordinatedWorkRunner(barrier));
    Thread thread2 = new Thread(new CoordinatedWorkRunner(barrier));
    Thread thread3 = new Thread(new CoordinatedWorkRunner(barrier));

    thread1.start();
    thread2.start();
    thread3.start();
}

public static class Barrier {
    private final int numberOfWorkers;
    private final Semaphore semaphore = new Semaphore(0);
    private int counter = 0;
    private final Lock lock = new ReentrantLock();

    public Barrier(int numberOfWorkers) {
        this.numberOfWorkers = numberOfWorkers;
    }

    public void waitForOthers() throws InterruptedException {
        lock.lock();
        boolean isLastWorker = false;
        try {
            counter++;

            if (counter == numberOfWorkers) {
                isLastWorker = true;
            }
        } finally {
            lock.unlock();
        }

        if (isLastWorker) {
            semaphore.release(numberOfWorkers - 1);
        } else {
            semaphore.acquire();
        }
    }
}

public static class CoordinatedWorkRunner implements Runnable {

    private final Barrier barrier;

    public CoordinatedWorkRunner(Barrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            task();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void task() throws InterruptedException {
        // Performing Part 1
        System.out.println(Thread.currentThread().getName()
                + " part 1 of the work is finished");

        barrier.waitForOthers();

        // Performing Part2
        System.out.println(Thread.currentThread().getName()
                + " part 2 of the work is finished");
    }
}
//출력 : 
//part 1 of the work is finished
//part 1 of the work is finished
//part 1 of the work is finished
//part 2 of the work is finished
//part 2 of the work is finished
//part 2 of the work is finished

 

조건 변수 

  • threadA에서 조건이 충족이 안 되면 블로킹 상태에 빠지는데, 이 조건은 다른 스레드에서 충족시켜줄 수 있다. 만약 조건이 충족되면 thradA의 블로킹 상태가 해제되고 작업을 실행하게 된다.
  • 이 조건 변수는 ReentrantLock을 통해 얻을 수 있는데, .newCondition()하면 된다. 그러면 Condition 타입의 객체가 반환된다.
  • await() 메소드를 통해 스레드를 블로킹 상태로 만드는데, 이 때 조건 변수를 만든 Lock의 unlock 메소드를 원자적으로 실행하게 된다. 따라서 다른 스레드에서 해당 락을 얻어 작업을 수행할 수 있게 된다.
  • 다른 스레드에서 signal()을 호출하면 대기 중인 스레드가 깨어나지만, unlock을 했으므로 다시 lock을 얻어야 한다. 따라서 스레드가 unlock()을 호출하여 lock을 얻기까지 계속 대기 상태에 있어야 한다.
public static class SimpleCountDownLatch {
    private int count;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public SimpleCountDownLatch(int count) {
        this.count = count;
        if (count < 0) {
            throw new IllegalArgumentException("count cannot be negative");
        }
    }

    public void await() throws InterruptedException {
        lock.lock();
        try {
            while (count > 0) {
                condition.await();
            }
        } finally {
            lock.unlock();
        }
    }

    public void countDown() {
        lock.lock();
        try {
            if (count > 0) {
                count--;
                if (count == 0) {
                    condition.signalAll();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    public int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

 

wait() & notify{All}()

  • 조건 변수와 같은 것으로, 대신 이는 synchronized와 같이 사용된다.
  • wait()을 통해 Lock을 해제함과 동시에 블로킹 상태에 들어가고, notify()을 통해 wait()을 호출한 스레드를 깨우게 된다.
  • 이때도 마찬가지로 nitify를 호출한 스레드가 아직 Lock을 가지고 있으면 lock을 풀고 나서 wait()을 호출한 스레드가 실행된다.
  • 조건 변수와 마찬가지로 notify의 신호는 synchronized에 설정된 객체와 동일한 메소드에만 전달된다.
public static void main(String[] args) throws InterruptedException {
        StateClass stateClass = new StateClass();

        Thread change = new ChangeStateThread(stateClass);
        Thread check1 = new CheckStateThread(stateClass);
        Thread check2 = new CheckStateThread(stateClass);

        check1.start();
        check2.start();
        Thread.sleep(2000);
        change.start();
    }

    public static class ChangeStateThread extends Thread{
        private final StateClass stateClass;

        public ChangeStateThread(StateClass stateClass) {
            this.stateClass = stateClass;
        }

        @Override
        public void run() {
            stateClass.changeState();
        }
    }

    public static class CheckStateThread extends Thread{
        private final StateClass stateClass;

        public CheckStateThread(StateClass stateClass) {
            this.stateClass = stateClass;
        }

        @Override
        public void run() {
            try {
                stateClass.isTrueState();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class StateClass{
        private boolean state =false;

        public synchronized void changeState(){
            state =true;
            notifyAll();
        }

        public synchronized void isTrueState() throws InterruptedException {
            System.out.println("enter");
            while(!state){
                wait();
            }
            System.out.println("State is " + state);
        }
    }
//출력 : 
//enter 
//enter 
//State is ture
//State is true