KAFKA-2312: use atomic long for thread id reference; reviewed by Ewen Cheslack-Postava, Jason Gustafson, Ismael Juma and Guozhang Wang

This commit is contained in:
Tim Brooks 2015-07-13 13:13:02 -07:00 committed by Guozhang Wang
parent 4aba4bc1df
commit 69b451e289
1 changed files with 6 additions and 4 deletions

View File

@ -51,6 +51,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.common.utils.Utils.min;
@ -395,6 +396,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
private static final long LATEST_OFFSET_TIMESTAMP = -1L;
private static final long NO_CURRENT_THREAD = -1L;
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private final Coordinator coordinator;
@ -417,7 +419,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// currentThread holds the threadId of the current thread accessing KafkaConsumer
// and is used to prevent multi-threaded access
private final AtomicReference<Long> currentThread = new AtomicReference<Long>();
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
// refcount is used to allow reentrant access by the thread who has acquired currentThread
private final AtomicInteger refcount = new AtomicInteger(0);
@ -1355,8 +1357,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
private void acquire() {
ensureNotClosed();
Long threadId = Thread.currentThread().getId();
if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId))
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
}
@ -1366,6 +1368,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
private void release() {
if (refcount.decrementAndGet() == 0)
currentThread.set(null);
currentThread.set(NO_CURRENT_THREAD);
}
}