内容字号:默认大号超大号

段落设置:段首缩进取消段首缩进

字体设置:切换到微软雅黑切换到宋体

Java并发基础:并发工具类(2)

2018-09-11 21:36 出处:清屏网 人气: 评论(0

并发工具类

本系列文章主要讲解 Java 并发相关的内容,包括同步、锁、信号量、阻塞队列、线程池等,整体思维导图如下:

本文主要以实例讲解 Semaphore 、阻塞队列等内容。

Semaphore

基本概念和用途

Semaphore 常称信号量,其维护了一个许可集,可以用来控制线程并发数。线程调用 acquire() 方法去或者许可证,然后执行相关任务,任务完成后,调用 release() 方法释放该许可证,让其他阻塞的线程可以运行。

Semaphore 可以用于流量控制,尤其是一些公共资源有限的场景,比如数据库连接。假设我们上面的账户余额管理中的账户修改操作涉及到去更改 mysql 数据库,为了避免数据库并发太大,我们进行相关限制。

常用方法

Semaphore(int permits) :构造方法,初始化许可证数量

void acquire() :获取许可证

void release() :释放许可证

int availablePermits() :返回此信号量中当前可用的许可证数。

int getQueueLength() :返回正在等待获取许可证的线程数。

boolean hasQueuedThreads() :是否有线程正在等待获取许可证。

void reducePermits(int reduction) :减少reduction个许可证。是个protected方法。

Collection getQueuedThreads() :返回所有等待获取许可证的线程集合。是个protected方法。

运行示例

虽然在代码中设置了 20 个线程去运行,但同时设置了许可证的数量为 5 ,因而实际的最大并发数还是 5

package com.aidodoo.java.concurrent;

import java.util.concurrent.*;

/**
 * Created by zhangkh on 2018/9/9.
 */
public class SemaphoreDemo {
    public static void main(String[] args){
        Semaphore semaphore=new Semaphore(5);
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        Account account=new Account();
        for(int i=0;i<20;i++){
            SpenderWithSemaphore spender = new SpenderWithSemaphore(account, semaphore);
            executorService.submit(spender);
        }

        executorService.shutdown();
    }
}
class SpenderWithSemaphore implements Runnable {
    private final Account account;
    private final Semaphore semaphore;

    public SpenderWithSemaphore(Account account, Semaphore semaphore) {
        this.account = account;
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try{
            semaphore.acquire();
            System.out.println(String.format("%s get a premit at time %s,change and save data to mysql",Thread.currentThread().getName(),System.currentTimeMillis()/1000));
            Thread.sleep(2000);
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
//            System.out.println(String.format("%s release a premit",Thread.currentThread().getName()));
            semaphore.release();
        }
    }
}

获取许可证后,模拟操作 mysql ,我们让线程睡眠 2 秒,程序输出如下:

pool-1-thread-2 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-5 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-3 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-4 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-1 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-8 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-7 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-6 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-9 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-10 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-11 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-13 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-12 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-14 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-15 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-16 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-17 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-19 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-18 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-20 get a premit at time 1536480864,change and save data to mysql

可以看到前面 5 个线程同一时间 1536480858 获得许可证,然后执行操作,并不是 20 个线程一起操作,这样能降低对 mysql 数据库的影响。

如果把上面 Semaphore 的构造方法中的许可证数量改为 20 ,大家可以看到 20 个线程的运行时间基本一致。

源码实现

Semaphore 实现直接基于 AQS ,有公平和非公平两种模式。公平模式即按照调用 acquire() 的顺序依次获得许可证,遵循 FIFO (先进先出),非公平模式是抢占式的,谁先抢到先使用。

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

获取许可证

acquire() 方法最终调用父类 AQS 中的 acquireSharedInterruptibly 方法。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)              //(1)
        doAcquireSharedInterruptibly(arg);      //(2)
}

(1):调用 tryAcquireShared ,尝试去获取许可证

(2):如果获取失败,则调用 doAcquireSharedInterruptibly ,将线程加入到等待队列中

tryAcquireShared 方法由 Semaphore 的内部类,同时也是 AQS 的子类去实现,即 NonfairSyncFairSync ,下面我们以 NonfairSync 为例说明其实现。

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

nonfairTryAcquireShared 方法如下:

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();             //(1)
        int remaining = available - acquires;   //(2)
        if (remaining < 0 ||
            compareAndSetState(available, remaining)) (3)
            return remaining;
    }
}

(1):获取 state 的值,也就是总许可证数量

(2):计算本次申请后,剩余的许可证数量

(3):如果剩余的许可证数量大于 0 且通过 CASstate 的值修改成功后,返回剩余的许可证数量,否则继续循环阻塞。

释放许可证

release() 方法的调用最终会调用父类 AQSreleaseShared() 方法:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {        //(1)
        doReleaseShared();              //(2)
        return true;
    }
    return false;
}

(1):尝试释放许可证

(2):如果释放许可证成功,则通知阻塞的线程,让其执行

tryReleaseShared 方法很简单,基本上是 nonfairTryAcquireShared 的逆过程,即增加许可证的数量,并通过 CAS 修改 state 的值。

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

BlockingQueue

基本概念

阻塞队列主要是解决如何高效安全传输数据的问题,此外能降低程序耦合度,让代码逻辑更加清晰。

其继承了 Queue ,并在其基础上支持了两个附加的操作:

  • 当队列为空时,获取元素的线程会阻塞,等待队列变为非空
  • 当队列满时,添加元素的线程会阻塞,等待队列可用

比较典型的使用场景是生产者和消费者。

BlockingQueue 根据对于不能立即满足但可能在将来某一时刻可以满足的操作,提供了不同的处理方法,进而导致众多的 api 操作:

  Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek()} not applicable not applicable

Throws exception :指当阻塞队列满时候,再往队列里插入元素,会抛出 IllegalStateException 异常。当队列为空时,从队列里获取元素时会抛出 NoSuchElementException 异常

Special value :插入方法会返回是否成功,成功则返回 true 。移除方法,则是从队列里拿出一个元素,如果没有则返回 null

Blocks :当阻塞队列满时,如果生产者线程往队列里 put 元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里 take 元素,队列也会阻塞消费者线程,直到队列可用。

Time out :当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

整体架构和类图

Java 并发包根据不同的结构和功能提供了不同的阻塞队列,整体类图如下:

其中 BlockingQueue 有如下子类:

ArrayBlockingQueue
DelayQueue
PriorityBlockingQueue
SynchronousQueue
LinkedBlockingQueue

其中 BlockingDeque 有一个子类:

  • LinkedBlockingDeque :一个由链表结构组成的双向阻塞队列。

    BlockingDeque 作为双端队列,针对头部元素,还提供了如下方法:

First Element (Head)
  Throws exception Special value Blocks Times out
Insert addFirst(e) offerFirst(e) putFirst(e) offerFirst(e, time, unit)
Remove removeFirst() pollFirst() takeFirst() pollFirst(time, unit)
Examine getFirst() peekFirst() not applicable not applicable

针对尾部元素

Last Element (Tail)
  Throws exception Special value Blocks Times out
Insert addLast(e) offerLast(e) putLast(e) offerLast(e, time, unit)
Remove removeLast() pollLast() takeLast() pollLast(time, unit)
Examine getLast() peekLast() not applicable not applicable

使用示例

一个典型的生产者和消费者实例如下,一个 BlockingQueue 可以安全地与多个生产者和消费者一起使用, Producer 线程调用 NumerGenerator . getNextNumber() 生成自增整数,不断地写入数字,然后 Consumer 循环消费。

package com.aidodoo.java.concurrent;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by zhangkh on 2018/7/17.
 */
public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue queue = new ArrayBlockingQueue(1024,true);
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 5; i++) {
            executorService.submit(new Producer(queue));
        }
        for (int i = 0; i < 3; i++) {
            executorService.submit(new Consumer(queue));
        }
        Thread.sleep(30 * 1000L);
        executorService.shutdown();
    }
}

class Producer implements Runnable {
    Logger logger = LoggerFactory.getLogger(Producer.class.getName());
    protected BlockingQueue queue = null;
    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            for(int i=0;i<3;i++){
                int num = NumerGenerator.getNextNumber();
                queue.put(num);
                Thread.sleep(1000);
                logger.info("{} producer put {}", Thread.currentThread().getName(), num);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


class Consumer implements Runnable {
    Logger logger = LoggerFactory.getLogger(Consumer.class.getName());

    protected BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                int ele = (int) queue.take();
                logger.info("{} Consumer take {}", Thread.currentThread().getName(), ele);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class NumerGenerator{
    private static AtomicInteger count = new AtomicInteger();
    public static Integer getNextNumber(){
        return count.incrementAndGet();
    }
}

程序输出如下:

18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 1
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 2
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 3
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-3 producer put 3
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-2 producer put 2
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-1 producer put 1
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-5 producer put 5
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-4 producer put 4
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 4
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 5
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 6
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-3 producer put 6
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-1 producer put 8
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-2 producer put 7
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-5 producer put 9
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-4 producer put 10
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 7
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 8
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 9
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-1 producer put 12
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-3 producer put 11
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-5 producer put 14
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-4 producer put 15
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-2 producer put 13
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 10
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 11
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 12
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 13
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 14
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 15

其他 BlockingQueue 子类的使用可参考对应的 Java Api

源码分析

由于 BlockingQueue 相关的子类众多,我们仅以 ArrayBlockingQueue 从源码角度分析相关实现。

构造方法

ArrayBlockingQueue 中定义的成员变量如下:

final Object[] items; 
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
transient Itrs itrs = null

各变量的解释如下,以便了解后续的代码:

items
takeIndex
putIndex
count
notEmpty
notFull
itrs

内部结构如下:

构造方法如下:

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);  //(1)                               
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];      //(2)               
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();         //(3)
    notFull =  lock.newCondition();         //(4)
}
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {                 //(5)
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

(1):默认情况下,非公平模式,即抢占式

(2):数组初始化

(3)/(4):条件变量初始化

(5):如果构造方法中,含有初始化集合的话,则将对应元素添加到内部数组,并更改 countputIndex 的值。

插入数据

插入数据,我们主要看 put() 方法的实现,重点看生产者和消费者插入和获取数据时,线程何时阻塞,同时又何时唤醒。

public void put(E e) throws InterruptedException {
    checkNotNull(e);                        //(1)
    final ReentrantLock lock = this.lock;   //(2)
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();                //(3)
        enqueue(e);
    } finally {
        lock.unlock();                      //(4)
    }
}

private void enqueue(E x) {
final Object[] items = this.items;
    items[putIndex] = x;                    //(5)
    if (++putIndex == items.length)         //(6)
        putIndex = 0;
    count++;                                //(7)
    notEmpty.signal();                      //(8)
}
( 1 ):非空检查,插入的元素不能为 null ,否则抛出 NullPointerException

( 2 ):获取互斥锁

( 3 ):如果当前队列的元素个数等于队列总长度,即队列已满,则通过条件变量,释放和 notFull 相关的锁,当前线程阻塞。当前线程唤醒的条件如下:

  • 其他某个线程调用此 Conditionsignal() 方法,并且碰巧将当前线程选为被唤醒的线程;
  • 或者其他某个线程调用此 ConditionsignalAll() 方法;
  • 或者其他某个线程中断当前线程,且支持中断线程的挂起;
  • 或者发生“虚假唤醒”

(5):如果队列未满,则将元素添加的 putIndex 索引的位置

(6): putIndex 增加 1 后和队列长度相等,即已到达队列尾部,则 putIndex0
(7):队列已有元素数量加 1

(8):通知 notEmpty 条件变量,唤醒等待获取元素的线程

(4):释放互斥锁

可以看到 ArrayBlockingQueue 每次插入元素后,都会去唤醒等待获取元素的线程。

获取数据

take() 方法源码如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;   //(1)
    lock.lockInterruptibly();
    try {
        while (count == 0)                  
            notEmpty.await();               //(2)
        return dequeue();
    } finally {
        lock.unlock();                      //(9)
    }
}

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];             //(3)
    items[takeIndex] = null;                //(4)
    if (++takeIndex == items.length)
        takeIndex = 0;                      //(5)
    count--;                                //(6)
    if (itrs != null)
        itrs.elementDequeued();             //(7)
    notFull.signal();                       //(8)
    return x;
}

(1):获取互斥锁

(2):如果 count0 ,即队列为空,则释放互斥锁,然后挂起当前线程

(3):根据 takeIndex 索引到数组中获取具体的值,并赋值给 x

(4):赋值完成后, takeIndex 索引位置数据置 null ,便于回收

(5): takeIndex1 ,然后和队列长度比较,如果相等,即已经读取到队列尾部, takeIndex0
(6):获取后,将队列元素个数 count1

(7):维护和 queue 相关的迭代器

(8):唤醒等待插入元素的线程

(9):释放互斥锁

可以看到 ArrayBlockingQueue 每次获取元素后,都会唤醒等待插入元素的线程。

迭代器

在分析源码前,我们先看在一个迭代器的示例

package com.aidodoo.java.concurrent;

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Created by zhangkh on 2018/9/10.
 */
public class ArrayBlockingQueueIterDemo {
        public static void main(String[] args) throws InterruptedException{
            BlockingQueue<String> queue=new ArrayBlockingQueue(5);
            queue.put("hadoop");
            queue.put("spark");
            queue.put("storm");
            queue.put("flink");

            Iterator<String> iterator1 = queue.iterator();
            System.out.println( queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println();
            while(iterator1.hasNext()) {
                System.out.println(iterator1.next());
            }
            System.out.println();
            Iterator<String> iterator2 = queue.iterator();
            while(iterator2.hasNext()) {
                System.out.println(iterator2.next());
            }
        }
}

程序输出如下:

hadoop
spark
storm

hadoop
flink

flink

我们结合这个示例来具体分析数据插入和获取时,内部成员变量的值

当分别插入 hadoopsparkstormflink 四个元素后,内部变量的值如下:

此时, ArrayBlockingQueue 的成员变量的值 itrsnull

调用 iterator() 方法后,源码如下:

public Iterator<E> iterator() {
    return new Itr();                   //(1)
}

Itr() {
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();                            //(2)
try {
    if (count == 0) {                   //(3)
        cursor = NONE;
        nextIndex = NONE;
        prevTakeIndex = DETACHED;
    } else {
        final int takeIndex = ArrayBlockingQueue.this.takeIndex;
        prevTakeIndex = takeIndex;
        nextItem = itemAt(nextIndex = takeIndex);   //(4)
        cursor = incCursor(takeIndex);              //(5)
        if (itrs == null) {
            itrs = new Itrs(this);                  //(6)
        } else {
            itrs.register(this);                    //(7)
            itrs.doSomeSweeping(false);
        }
        prevCycles = itrs.cycles;
    }
} finally {
    lock.unlock();                                  //(8)
}

}

(1):调用内部类 Itr 的构造方法

(2):获取外部类即 ArrayBlockingQueue 的锁

(3):没有没有元素,初始化变量值。内部类 Itr 的成员变量如下:

/** Index to look for new nextItem; NONE at end */
private int cursor;

/** Element to be returned by next call to next(); null if none */
private E nextItem;

/** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
private int nextIndex;

/** Last element returned; null if none or not detached. */
private E lastItem;

/** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
private int lastRet;

/** Previous value of takeIndex, or DETACHED when detached */
private int prevTakeIndex;

/** Previous value of iters.cycles */
private int prevCycles;

(4):将外部类的 takeIndex 赋值给内部类 nextIndex ,并获取数组具体的值赋值给 nextItem
(5):计算游标 cursor 的下个值,其中 incCursor 方法如下:

private int incCursor(int index) {
    // assert lock.getHoldCount() == 1;
    if (++index == items.length)
        index = 0;
    if (index == putIndex)
        index = NONE;
    return index;
}

(6):注册,主要是维护链表

(7):清理 itrs

(8):释放外部类的互斥锁

在上面的示例中,调用 iterator() 方法后, Itr 的内部变量值如下:

由于后面三次调用了 queue . take() ,依次输出 hadoopsparkstorm 后,相关成员变量的值见图片标识,重点关注 takeIndex = 3

当调用 next() 方法时,代码如下:

public E next() {
    final E x = nextItem;
    if (x == null)
        throw new NoSuchElementException();
    final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    lock.lock();
    try {
        if (!isDetached())          //(1)
            incorporateDequeues();
        lastRet = nextIndex;
        final int cursor = this.cursor;
        if (cursor >= 0) {
            nextItem = itemAt(nextIndex = cursor);
            this.cursor = incCursor(cursor);
        } else {
            nextIndex = NONE;
            nextItem = null;
        }
    } finally {
        lock.unlock();
    }
    return x;
}

其中(1)处的 isDetached 方法如下

boolean isDetached() {
    // assert lock.getHoldCount() == 1;
    return prevTakeIndex < 0;
}

由于我们示例中初始化 Itr 的时候的 prevTakeIndex0 ,故 isDetached 返回为 false ,程序将调用 incorporateDequeues 方法,根据注释我们也知道,该方法主要是调整和迭代器相关的内部索引。

/**
 * Adjusts indices to incorporate all dequeues since the last
 * operation on this iterator.  Call only from iterating thread.
 */
private void incorporateDequeues() {
    final int cycles = itrs.cycles;
    final int takeIndex = ArrayBlockingQueue.this.takeIndex;
    final int prevCycles = this.prevCycles;
    final int prevTakeIndex = this.prevTakeIndex;

    if (cycles != prevCycles || takeIndex != prevTakeIndex) {
        final int len = items.length;
        // how far takeIndex has advanced since the previous
        // operation of this iterator
        long dequeues = (cycles - prevCycles) * len
            + (takeIndex - prevTakeIndex);

        // Check indices for invalidation
        if (invalidated(lastRet, prevTakeIndex, dequeues, len))
            lastRet = REMOVED;
        if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
            nextIndex = REMOVED;
        if (invalidated(cursor, prevTakeIndex, dequeues, len))
            cursor = takeIndex;

        if (cursor < 0 && nextIndex < 0 && lastRet < 0)
            detach();
        else {
            this.prevCycles = cycles;
            this.prevTakeIndex = takeIndex;
        }
    }
}

注意 cursor = takeIndex 这句代码,将外部内的 takeIndex 赋值给 cursor ,这样子将队列和迭代器数据读取进行了同步。

对于 iterator1 ,第一次调用 next() 方法时, cursor 被赋值为 3 首先将 nextItem 的值保持在 x 变量中,即 hadoop 字符串。

然后设置 nextItemcursor 的值

nextItem = itemAt(nextIndex = cursor);
this.cursor = incCursor(cursor);

设置完成后, nextItemflink , cursor 为- 1

最后返回保存在 x 变量中的值,即返回 hadoop 字符串。

第二次调用 next() 方法时,输出的值即上次保存的 nextItem 值,即 flink 字符串。

迭代器运行过程中,相关变量内容如下:

至于 iterator2 迭代器,各位可以自己去分析,不再赘述。

本文主要以实例讲解 Semaphore 、阻塞队列,并分析了相关核心源码实现。

本文参考

Java 7 Concurrency Cookbook

concurrency-modle-seven-week

java-concurrency

java-util-concurrent

java se 8 apidoc

分享给小伙伴们:
本文标签: Java

相关文章

发表评论愿您的每句评论,都能给大家的生活添色彩,带来共鸣,带来思索,带来快乐。

CopyRight © 2015-2016 QingPingShan.com , All Rights Reserved.

清屏网 版权所有 豫ICP备15026204号