加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

springboot-43-并发调用并异步组装Set

(2018-02-09 14:11:10)
分类: 微服务

一、 背景

主线程希望能够同时调用多个相同服务,然后统一进行组装。

l  情形1:例如进行大量查询1000条数据,分页上限为200条,那么主线程需要循环调用5次。如果数据提供者功能强大,主线程可以启动5个子进程并发调用查询服务,然后进行组装,这样在线程池允许的情况下就能提高5倍以上的速度。

l  情形2:主线程希望能够调用分布于10台负载上的某服务,要么同步循环10次,也可以并发调用然后统一组装。

二、实现

1.         springboot异步设置

 

@Configuration
@EnableConfigurationProperties
(MyAsyncProperties.class)
@EnableAsync
public class AsyncConfig {
   
@Autowired
   
private MyAsyncProperties myAsyncProperties;

    public static int
concurrentMaxThreadCount = 500;

   
@Bean
   
public ThreadPoolTaskExecutor concurrentExecutor() {
        ThreadPoolTaskExecutor executor =
new ThreadPoolTaskExecutor();
       
executor.setCorePoolSize(concurrentMaxThreadCount);
       
executor.setMaxPoolSize(concurrentMaxThreadCount);
       
executor.setQueueCapacity(concurrentMaxThreadCount * 2);
       
executor.setKeepAliveSeconds(20);
       
//以下两个属性的功能在MyContextClosedHanlder手动实现
        //
程序关闭前等待线程池中线程执行
//        executor.setWaitForTasksToCompleteOnShutdown(true);
        //
程序关闭前等待线程池中线程执行,50秒后强制销毁
//        executor.setAwaitTerminationSeconds(50);
       
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
       
executor.initialize();
        return
executor;
   
}
}

 

2.         异步调用服务——基础异步服务

 

package com.scn7th.concurrentSet;

import
org.springframework.scheduling.annotation.Async;
import
org.springframework.scheduling.annotation.AsyncResult;
import
org.springframework.scheduling.annotation.EnableAsync;
import
org.springframework.stereotype.Service;

import
java.util.Random;
import
java.util.concurrent.Future;

import static
com.scn7th.concurrentSet.MyConcurrentService.MAX_WAIT_TIME;


@Service
@EnableAsync
public class MyConcurrentSetService {
   
@Async("concurrentExecutor")
   
public Future<Double> getResponse() {
        Random random =
new Random();
       
Double result = random.nextDouble();
        long
sleepTime = (long) (MAX_WAIT_TIME * 0.1);
        try
{
            Thread.sleep(sleepTime)
;
            return new
AsyncResult<>(result);
       
} catch (InterruptedException e) {
            e.printStackTrace()
;
       
}
       
return null;
   
}
}

 

3.         限时处理——异步调用基础异步服务

 

package com.scn7th.concurrentSet;

import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.scheduling.annotation.Async;
import
org.springframework.stereotype.Service;

import
java.util.Set;
import
java.util.concurrent.ExecutionException;
import
java.util.concurrent.Future;
import
java.util.concurrent.TimeUnit;
import
java.util.concurrent.TimeoutException;


@Service
public class MyConcurrentService {
   
public static long MAX_WAIT_TIME = 1000;
   
@Autowired
   
private MyConcurrentSetService myConcurrentSetService;
   
@Async("concurrentExecutor")
   
public void getResult(Set<Double> doubleSet) {
        Future<Double> future =
myConcurrentSetService.getResponse();
        try
{
            Double result = future.get(
MAX_WAIT_TIME, TimeUnit.MILLISECONDS);
           
doubleSet.add(result);
       
} catch (InterruptedException e) {
            e.printStackTrace()
;
       
} catch (ExecutionException e) {
            e.printStackTrace()
;
       
} catch (TimeoutException e) {
            e.printStackTrace()
;
       
}
    }
}

 

4.         主流程——CopyOnWriteArraySet获取结果

这里的结果集合必须使用线程安全的CopyOnWriteArraySet,贸然使用HashSet可能导致结果数量少于调用数量。在允许最大等待时间内,按照某一时间间隔比较结果集合中的结果数量是否等于总调用次数,一旦相等,说明所有线程都已得到响应,结果组装完毕。否则继续轮询比较直到超时结束。

package com.scn7th.concurrentSet;

import
com.scn7th.MultiThreadApplicationTests;
import
org.junit.Test;
import
org.slf4j.Logger;
import
org.slf4j.LoggerFactory;
import
org.springframework.beans.factory.annotation.Autowired;

import
java.util.Set;
import
java.util.concurrent.CopyOnWriteArraySet;

import static
com.scn7th.concurrentSet.MyConcurrentService.MAX_WAIT_TIME;
import static
com.scn7th.config.AsyncConfig.concurrentMaxThreadCount;


public class CurrentSetTest extends MultiThreadApplicationTests{
   
private static final Logger logger = LoggerFactory.getLogger(CurrentSetTest.class);
   
@Autowired
   
private MyConcurrentService myConcurrentService;

   
@Test
   
public void testSet() throws InterruptedException {
       
//必须使用CopyOnWriteArraySet并发集,普通集合线程不安全
       
Set<Double> doubles = new CopyOnWriteArraySet<>();
        for
(int i = 0; i < concurrentMaxThreadCount; i++) {
           
myConcurrentService.getResult(doubles);
//            logger.info("获取结果:" + doubles);
       
}
       
long startTime = System.currentTimeMillis();
        long
endTime = System.currentTimeMillis();
        while
(endTime - startTime <= MAX_WAIT_TIME) {
            Thread.sleep(
100);
           
endTime = System.currentTimeMillis();
           
logger.info("获取结果:" + doubles.size() + ";执行时间:" + (endTime - startTime));
            if
(doubles.size() == concurrentMaxThreadCount) {
               
break;
           
}
        }
        System.
out.println(doubles.size());
   
}
}

 

 

三、改进

1.       分析

在上文中,其实是使用了2n线程执行了n个任务,存在线程浪费和上下文切换资源消耗。同时等价于下面这种写法,并通过定量计算来有一个认识,线程池固定大小为20

1)       TimeInterval

 

@Data
@AllArgsConstructor
@NoArgsConstructor
public class TimeInterval {
   
private long start;
    private long
sleep;
}

2)       测试

 

@Autowired
private ThreadPoolTaskScheduler scheduler;
 
@Test
public void testLocalFunction() throws InterruptedException, ExecutionException {
   
int i = 0;
   
List<Future<TimeInterval>> futureList = new CopyOnWriteArrayList<>();
   
CopyOnWriteArrayList<TimeInterval> result = new CopyOnWriteArrayList<>();

    for
(;i < 20; i++) {
        Future<TimeInterval> future =
scheduler.submit(this::test);
       
futureList.add(future);
   
}

   
for(Future<TimeInterval> future : futureList) {
       
scheduler.submit(() -> getFuture(future, result));
   
}
    Thread.sleep(
3000);
   
System.out.println(result.size());
}

private TimeInterval test() {
    Random random =
new Random();
    int
randomSleep = random.nextInt(500);
   
log.info("大于----{}" randomSleep);
   
TimeInterval interval  = new TimeInterval(System.currentTimeMillis(), randomSleep);
    try
{
        Thread.sleep(randomSleep)
;
   
} catch (InterruptedException e) {
        e.printStackTrace()
;
   
}
   
return interval;
}

private void getFuture(Future<TimeInterval> future, CopyOnWriteArrayList<TimeInterval> result) {
   
try {
        TimeInterval i = future.get()
;
       
result.add(i);
       
log.info("result:{}-{}", i.getSleep(), System.currentTimeMillis() - i.getStart());
        System.out.println("result:" + i.getSleep()+ "-" + (System.currentTimeMillis() - i.getStart()));

   
} catch (InterruptedException e) {
        e.printStackTrace()
;
   
} catch (ExecutionException e) {
        e.printStackTrace()
;
   
}
}

 

3)       结果

可以发现,我们的线程池scheduler中的线程既要执行test方法,在后续同样需要获得自己的future然后get,也就是说,某线程必须先完成自己的任务test,然后才能通过future.get(),显然存在线程资源不足导致的阻塞,以及线程上下文切换带来浪费,即sleep时间只有137,整个线程却花了245毫秒完成。能不能在一个线程中即执行自己的任务,然后又获取future.get(),然后返回给总线程呢?

result:245-245

result:137-245

result:255-255

result:17-252

result:287-287

result:153-339

result:35-339

result:339-339

result:348-348

result:351-351

result:381-381

result:397-397

result:403-403

result:423-423

result:443-443

result:461-461

result:464-464

result:473-473

 

2.       改进

1)       同一线程执行任务并future.get()结果

这个warning还是上面的20个线程的线程池。

 

@Service
@Slf4j
public class PrintService {

   
@Async("warning")
   
public void doAndGetAsync(CopyOnWriteArrayList<TimeInterval> result) {
        AsyncResult<TimeInterval> asyncResult =
new AsyncResult<>(testTimeInterval());
        try
{
            TimeInterval i = asyncResult.get()
;
           
result.add(i);
           
log.info("result:{}-{}", i.getSleep(), System.currentTimeMillis() - i.getStart());
        
} catch (ExecutionException e) {
            e.printStackTrace()
;
       
}
    }

   
private TimeInterval testTimeInterval() {
        Random random =
new Random();
        int
randomSleep = random.nextInt(500);
       
log.info("大于----{}" randomSleep);
       
TimeInterval interval  = new TimeInterval(System.currentTimeMillis(), randomSleep);
        try
{
            Thread.sleep(randomSleep)
;
       
} catch (InterruptedException e) {
            e.printStackTrace()
;
       
}
       
return interval;
   
}
}

 

2)       测试

 


@Test
public void testAsyncGetInSameThread() throws InterruptedException {
   
int i = 0;
   
CopyOnWriteArrayList<TimeInterval> result = new CopyOnWriteArrayList<>();

    for
(;i < 250; i++) {
       
printService.doAndGetAsync(result);
   
}

    Thread.sleep(
10000);
   
System.out.println(result.size());
}

 

 

3)       结果 :没有浪费

 

result:4-4

result:36-36

result:47-48

result:61-61

result:65-65

result:80-80

result:86-86

result:93-94

result:103-104

result:177-178

result:180-181

result:199-200

result:209-210

result:229-230

result:262-263

result:318-319

result:419-419

result:433-433

result:443-443

 

result:469-470

0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有