JAVA/멀티스레딩
스레드간 통신
최-코드
2024. 8. 30. 17:43
세마포어
- 리소스당 하나의 사용자만 제한하는 기존의 락과 달리 세마포어는 리소스에 동적으로 사용자 수를 제한할 수 있다.
- 세마포어는 0을 포함한 양의 정수의 값을 가지는데, new Semaphore(num)을 통해 초기의 세마포어 값을 지정해줄 수 있다.
- 이 생성한 세마포어의 acquire()을 사용했을 때, 세마포어의 값이 0이 아니라면 이 메소드를 호출한 스레드는 다음 라인으로 계속 진행하고 세마포어의 값을 -1 시킨다. 인자로 정수 값을 넘겨주면 해당 값만큼 허가를 얻을 수 있다.
- 0의 값을 가진 세마포어에 스레드가 acquire하면 블로킹 상태에 들어간다. 다른 스레드에서 release()를 통해 세마포어의 값이 1이 되면 블로킹 상태가 풀리면서 다시 세마포어는 0의 값이 된다.
- 특이한 점으로는 세마포어의 값을 가지지 않은 acquire하지 않은 스레드도 release를 호2출하여 세마포어의 값을 증가시킬 수 있다.
public static void main(String[] args) {
Barrier barrier = new Barrier(3);
Thread thread1 = new Thread(new CoordinatedWorkRunner(barrier));
Thread thread2 = new Thread(new CoordinatedWorkRunner(barrier));
Thread thread3 = new Thread(new CoordinatedWorkRunner(barrier));
thread1.start();
thread2.start();
thread3.start();
}
public static class Barrier {
private final int numberOfWorkers;
private final Semaphore semaphore = new Semaphore(0);
private int counter = 0;
private final Lock lock = new ReentrantLock();
public Barrier(int numberOfWorkers) {
this.numberOfWorkers = numberOfWorkers;
}
public void waitForOthers() throws InterruptedException {
lock.lock();
boolean isLastWorker = false;
try {
counter++;
if (counter == numberOfWorkers) {
isLastWorker = true;
}
} finally {
lock.unlock();
}
if (isLastWorker) {
semaphore.release(numberOfWorkers - 1);
} else {
semaphore.acquire();
}
}
}
public static class CoordinatedWorkRunner implements Runnable {
private final Barrier barrier;
public CoordinatedWorkRunner(Barrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
task();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void task() throws InterruptedException {
// Performing Part 1
System.out.println(Thread.currentThread().getName()
+ " part 1 of the work is finished");
barrier.waitForOthers();
// Performing Part2
System.out.println(Thread.currentThread().getName()
+ " part 2 of the work is finished");
}
}
//출력 :
//part 1 of the work is finished
//part 1 of the work is finished
//part 1 of the work is finished
//part 2 of the work is finished
//part 2 of the work is finished
//part 2 of the work is finished
조건 변수
- threadA에서 조건이 충족이 안 되면 블로킹 상태에 빠지는데, 이 조건은 다른 스레드에서 충족시켜줄 수 있다. 만약 조건이 충족되면 thradA의 블로킹 상태가 해제되고 작업을 실행하게 된다.
- 이 조건 변수는 ReentrantLock을 통해 얻을 수 있는데, .newCondition()하면 된다. 그러면 Condition 타입의 객체가 반환된다.
- await() 메소드를 통해 스레드를 블로킹 상태로 만드는데, 이 때 조건 변수를 만든 Lock의 unlock 메소드를 원자적으로 실행하게 된다. 따라서 다른 스레드에서 해당 락을 얻어 작업을 수행할 수 있게 된다.
- 다른 스레드에서 signal()을 호출하면 대기 중인 스레드가 깨어나지만, unlock을 했으므로 다시 lock을 얻어야 한다. 따라서 스레드가 unlock()을 호출하여 lock을 얻기까지 계속 대기 상태에 있어야 한다.
public static class SimpleCountDownLatch {
private int count;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public SimpleCountDownLatch(int count) {
this.count = count;
if (count < 0) {
throw new IllegalArgumentException("count cannot be negative");
}
}
public void await() throws InterruptedException {
lock.lock();
try {
while (count > 0) {
condition.await();
}
} finally {
lock.unlock();
}
}
public void countDown() {
lock.lock();
try {
if (count > 0) {
count--;
if (count == 0) {
condition.signalAll();
}
}
} finally {
lock.unlock();
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
wait() & notify{All}()
- 조건 변수와 같은 것으로, 대신 이는 synchronized와 같이 사용된다.
- wait()을 통해 Lock을 해제함과 동시에 블로킹 상태에 들어가고, notify()을 통해 wait()을 호출한 스레드를 깨우게 된다.
- 이때도 마찬가지로 nitify를 호출한 스레드가 아직 Lock을 가지고 있으면 lock을 풀고 나서 wait()을 호출한 스레드가 실행된다.
- 조건 변수와 마찬가지로 notify의 신호는 synchronized에 설정된 객체와 동일한 메소드에만 전달된다.
public static void main(String[] args) throws InterruptedException {
StateClass stateClass = new StateClass();
Thread change = new ChangeStateThread(stateClass);
Thread check1 = new CheckStateThread(stateClass);
Thread check2 = new CheckStateThread(stateClass);
check1.start();
check2.start();
Thread.sleep(2000);
change.start();
}
public static class ChangeStateThread extends Thread{
private final StateClass stateClass;
public ChangeStateThread(StateClass stateClass) {
this.stateClass = stateClass;
}
@Override
public void run() {
stateClass.changeState();
}
}
public static class CheckStateThread extends Thread{
private final StateClass stateClass;
public CheckStateThread(StateClass stateClass) {
this.stateClass = stateClass;
}
@Override
public void run() {
try {
stateClass.isTrueState();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public static class StateClass{
private boolean state =false;
public synchronized void changeState(){
state =true;
notifyAll();
}
public synchronized void isTrueState() throws InterruptedException {
System.out.println("enter");
while(!state){
wait();
}
System.out.println("State is " + state);
}
}
//출력 :
//enter
//enter
//State is ture
//State is true