ForkJoin的基本使用

简介: “分而治之” 按照设定的阈值进行分解成多个计算,然后将各个计算结果进行汇总。相应的ForkJoin将复杂的计算当做一个任务。而分解的多个计算则是当做一个子任务。将大的任务缩小化,forkjoin是你成为架构师的必经之路。

1.工作窃取算法

fork/join优秀的地方就在于这个算法,假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一 一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

想到真正意义的理解,更多的还是要查看源码。

2.fork/join的基本使用

(1)ForkjoinTask

是任务本身,使用它来创建任务,提供fork(),join(),compute()等核心方法,提供两个实现子类:

RecursiveTask :需要返回值时;
RecursiveAction:不需要返回值时;

(2) ForkJoinPool:

ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务.

(3)基本使用实例

如有很多的数据需要往数据库中存储,下面使用forkjoin来实现对其操作

package com.dangle.MyThread;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * @author by danglea on 2019/11/7
 */
public class MyForkJoin extends RecursiveTask<Integer> {
    List<Integer> list; //模拟数据库数据
    public  MyForkJoin(List<Integer> list){   //构造方法
        this.list=list;
    }

    @Override
    protected Integer compute() {  //重写RecursiveTask的compute方法
        if(list.size()<3){
            System.out.print("小于3");
            return computeDirectly();
        }else{
            int size=list.size();
            MyForkJoin my1=new MyForkJoin(list.subList(0,size/2)); //分而治之思想
            MyForkJoin my2=new MyForkJoin(list.subList(size/2,list.size()));
            //
            invokeAll(my1,my2); 
            /*my1.fork();分别执行子任务
            my2.fork();*/
            return my1.join()+my2.join();
        }
    }
    private Integer computeDirectly() {
        try {

            Thread.sleep(1000);
        }catch (Exception e){
            e.printStackTrace();
        }
        System.out.println("插入了"+ Arrays.toString(list.toArray())+list.size());
        return list.size();
    }

    public static void main(String[] args) throws  Exception {
        ForkJoinPool forkJoinPool = new ForkJoinPool(8);
        List<Integer> list = new ArrayList<>();
        list.add(1);
        list.add(1);
        list.add(1);
        list.add(1);
        list.add(1);
        list.add(1);
        list.add(1);
        list.add(1);
        list.add(1);
        list.add(1);
        list.add(1);
        list.add(1);

        MyForkJoin batchInsertTask = new MyForkJoin(list);
        long t1 = System.currentTimeMillis();
        ForkJoinTask<Integer> reslut = forkJoinPool.submit(batchInsertTask);
        System.out.println(reslut.get());
        long t2 = System.currentTimeMillis();
        System.out.println(t2-t1);

    }
}

invokeAll()和fork()的区别,为什么fork的执行时间长呢?

线程池的线程是有数,比如现在有400的任务,有四个工人,1,2,3,4使用fork方法,就使用将400的任务分给1,2,每个200,但是1,2不干活只是分配任务,分给3,4.但是3,4也是继续向下分分给其他的一直分到100,所以执行结束就需要7个工人在一天的情况下,而invokeAll方法则是直接1,2,3,4.完成·任务线程的充分利用,而不存在等待的线程。

# 基础 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×