Actor model을 기반으로 작성된 Akka 프레임워크를 공부하던 중, Actor가 가지고 있는 mailbox의 메시지 큐의 디폴트 값이 ConcurrentLinkedQueue라는 것을 알게 되었다. Actor의 디폴트 mailbox는 unbounded mailbox이고, 이는 java.util.concurrent.ConcurrentLinkedQueue 클래스로 구성되어 있다. ConcurrentLinkedQueue는 특성상 큐의 사이즈를 측정할 수 없는데, 만약 특정 mailbox에 메시지가 가득찼음에도 불구하고 메시지가 들어올 경우 큐가 터질 수도 있다. 이러한 고민을 시작으로 ConcurrentLinkedQueue와 LinkedBlockingQueue에 대해 알아 보았다.


Unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal.

LinkedBlockingQueue는 생성자의 인자에 큐의 용량 capacity 를 명시하여 사이즈를 지정할 수 있는 반면에, ConcurrentLinkedQueue는 큐의 사이즈를 지정할 수 없을 뿐만 아니라 size 메서드는 상수 시간에 호출되지 않아서 큐에 들어있는 원소의 개수를 파악하는 것이 어렵다.

ConcurrentLinkedQueue

This implementation employs an efficient “wait-free” algorithm based on one described in Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott.

자바의 java.util 에서 제공하는 Queue 클래스는 멀티 스레드 환경에서 임계영역(critical section)에 대한 동기화가 적용되어 있지 않다. 그래서 자바는 멀티 스레드 환경에서 컬렉션의 요소를 동시적으로(Concurrent) 처리 할 수 있도록 특별한 컬렉션을 제공한다. java.util.concurrent 패키지에 있는 Queue 구현체인 ConcurrentLinkedQueueNon-blocking lock-free Queue를 위해 Maged M. Michael와 Michael L. Scott의 알고리즘을 기반으로 작성되었다. 그래서 ConcurrentLinkedQueue는 큐에 꺼낼 원소가 없다면 즉시 리턴하고 다른 일을 수행하러 간다. 따라서, ConcurrentLinkedQueue는 생산자-소비자 producer-consumer 모델에서 소비자가 많고 생산자가 하나인 경우에 사용하면 좋다.

여러 개의 쓰레드에서 하나의 Queue 객체에 들어있는 데이터를 꺼내기 위해 queue.poll() 메서드를 호출할 경우, 동일한 실행 결과가 나타날 수 있다. 예를 들어, Queue에 [1, 2, 3]과 같은 데이터가 들어있을 경우, 스레드 3개가 critical section에서 poll() 메서드를 호출하면 각 스레드들은 모두 1이라는 데이터를 가져오고 Queue에는 [2, 3]만 남게 된다. 큐가 비어있을 경우 null을 리턴한다.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

Queue<Object> queue = new ConcurrentLinkedQueue<Object>();
queue.offer(data);  // put
queue.poll();       // get

LinkedBlockingQueue

The optional capacity bound constructor argument serves as a way to prevent excessive queue expansion. The capacity, if unspecified, is equal to Integer.MAX_VALUE.

LinkedBlockingQueue 은 이름에서도 알 수 있듯이 각각의 블로킹 큐가 링크드 노드로 연결된 큐이다. 큐에서 꺼내갈 원소가 없을 경우 해당 쓰레드는 wait 상태에 들어간다. 따라서, LinkedBlockingQueue는 생산자가 많고 하나의 소비자일 경우에 사용하면 좋다. 또한 이 글의 서두에서 언급한 것처럼, LinkedBlockingQueue은 큐의 폭발을 막기 위해 생성자에 큐의 사이즈를 명시할 수 있도록 설계되었다.

LinkedBlockingQueue 내에 있는 데이터를 가져오기 retrieve 위해 poll()과 take() 메소드를 제공한다. 이 두 메소드의 차이점은 큐가 비어있을 때, poll 메소드는 null을 리턴하거나 Timeout을 설정할 수 있는 반면에, take 메소드는 꺼낼 수 있는 원소가 있을 때까지 기다린다(waiting).

import java.util.concurrent.LinkedBlockingQueue

LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();

queue.offer();    // put
queue.take();     // get

참고