springboot-43-并发调用并异步组装Set
(2018-02-09 14:11:10)分类: 微服务 |
一、 背景
主线程希望能够同时调用多个相同服务,然后统一进行组装。
l
l
二、实现
1.
@Configuration
@EnableConfigurationPrope
@EnableAsync
public class AsyncConfig {
//
//
}
2.
package com.scn7th.concurrentSet;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;
import java.util.Random;
import java.util.concurrent.Future;
import static com.scn7th.concurrentSet.MyConcurrentService.MAX_WAIT_TIME;
@Service
@EnableAsync
public class MyConcurrentSetService {
@Async("concurrentExecutor")
public Future<Double> getResponse() {
new Random(); Random random =
Double result = random.nextDouble();
sleepTime = (long) (MAX_WAIT_TIME * 0.1); long
{ try
; Thread.sleep(sleepTime)
AsyncResult<>(result); return new
} catch (InterruptedException e) {
; e.printStackTrace()
}
return null;
}
}
3.
package com.scn7th.concurrentSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Service
public class MyConcurrentService {
public static long MAX_WAIT_TIME = 1000;
@Autowired
private MyConcurrentSetService myConcurrentSetService;
@Async("concurrentExecutor")
public void getResult(Set<Double> doubleSet) {
myConcurrentSetService.getResponse(); Future<Double> future =
{ try
MAX_WAIT_TIME, TimeUnit.MILLISECONDS); Double result = future.get(
doubleSet.add(result);
} catch (InterruptedException e) {
; e.printStackTrace()
} catch (ExecutionException e) {
; e.printStackTrace()
} catch (TimeoutException e) {
; e.printStackTrace()
}
}
}
4.
这里的结果集合必须使用线程安全的CopyOnWriteArraySet,贸然使用HashSet可能导致结果数量少于调用数量。在允许最大等待时间内,按照某一时间间隔比较结果集合中的结果数量是否等于总调用次数,一旦相等,说明所有线程都已得到响应,结果组装完毕。否则继续轮询比较直到超时结束。
package com.scn7th.concurrentSet;
import com.scn7th.MultiThreadApplicationTests ;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import static com.scn7th.concurrentSet.MyConcurrentService.MAX_WAIT_TIME;
import static com.scn7th.config.AsyncConfig.concurrentMaxThreadCount;
public class CurrentSetTest extends MultiThreadApplicationTests{ private static final Logger logger = LoggerFactory.getLogger(CurrentSetTest.class);
@Autowired
private MyConcurrentService myConcurrentService;
@Test
public void testSet() throws InterruptedException {
//必须使用CopyOnWriteArraySet并发集,普通集合线程不安全
Set<Double> doubles = new CopyOnWriteArraySet<>();
(int i = 0; i < concurrentMaxThreadCount for ; i++) {
myConcurrentService.getResult(doubles);
//获取结果:" + doubles); logger.info("
}
long startTime = System.currentTimeMillis();
endTime = System.currentTimeMillis(); long
(endTime - startTime <= MAX_WAIT_TIME) { while
100); Thread.sleep(
endTime = System.currentTimeMillis();
logger.info("获取结果:" + doubles.size() + ";执行时间:" + (endTime - startTime));
(doubles.size() == concurrentMaxThreadCount if ) {
break;
}
out.println(doubles.size()); }
System.
}
}
三、改进
1.
在上文中,其实是使用了2n线程执行了n个任务,存在线程浪费和上下文切换资源消耗。同时等价于下面这种写法,并通过定量计算来有一个认识,线程池固定大小为20。
1)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TimeInterval {
private long start;
sleep; private long
}
2)
@Autowired
private ThreadPoolTaskScheduler scheduler;
@Test
public void testLocalFunction() throws InterruptedException, ExecutionException {
int i = 0;
List<Future<TimeInterval>> futureList = new CopyOnWriteArrayList<>();
CopyOnWriteArrayList<TimeInterval> result = new CopyOnWriteArrayList<>();
(;i < 20; i++) { for
scheduler.submit(this::test); Future<TimeInterval> future =
futureList.add(future);
}
for(Future<TimeInterval> future : futureList) {
scheduler.submit(() -> getFuture(future, result));
}
3000); Thread.sleep(
System.out.println(result.size());
}
private TimeInterval test() {
new Random(); Random random =
randomSleep = random.nextInt(500); int
log.info("大于----{}", randomSleep);
TimeInterval interval = new TimeInterval(System.currentTimeMillis(), randomSleep);
{ try
; Thread.sleep(randomSleep)
} catch (InterruptedException e) {
; e.printStackTrace()
}
return interval;
}
private void getFuture(Future<TimeInterval> future, CopyOnWriteArrayList<TimeInterval> result) {
try {
; TimeInterval i = future.get()
result.add(i);
log.info("result:{}-{}", i.getSleep(), System.currentTimeMillis() - i.getStart());
out.println("result:" + i.getSleep()+ "-" + (System.currentTimeMillis() - i.getStart())); System.
} catch (InterruptedException e) {
; e.printStackTrace()
} catch (ExecutionException e) {
; e.printStackTrace()
}
}
3)
可以发现,我们的线程池scheduler中的线程既要执行test方法,在后续同样需要获得自己的future然后get,也就是说,某线程必须先完成自己的任务test,然后才能通过future.get(),显然存在线程资源不足导致的阻塞,以及线程上下文切换带来浪费,即sleep时间只有137,整个线程却花了245毫秒完成。能不能在一个线程中即执行自己的任务,然后又获取future.get(),然后返回给总线程呢?
result:245-245
result:137-245
result:255-255
result:17-252
result:287-287
result:153-339
result:35-339
result:339-339
result:348-348
result:351-351
result:381-381
result:397-397
result:403-403
result:423-423
result:443-443
result:461-461
result:464-464
result:473-473
2.
1)
这个warning还是上面的20个线程的线程池。
@Service
@Slf4j
public class PrintService {
@Async("warning")
public void doAndGetAsync(CopyOnWriteArrayList<TimeInterval> result) {
new AsyncResult<>(testTimeInterval()); AsyncResult<TimeInterval> asyncResult =
{ try
; TimeInterval i = asyncResult.get()
result.add(i);
log.info("result:{}-{}", i.getSleep(), System.currentTimeMillis() - i.getStart());
} catch (ExecutionException e) {
; e.printStackTrace()
}
private TimeInterval testTimeInterval() { }
new Random(); Random random =
randomSleep = random.nextInt(500); int
log.info("大于----{}", randomSleep);
TimeInterval interval = new TimeInterval(System.currentTimeMillis(), randomSleep);
{ try
; Thread.sleep(randomSleep)
} catch (InterruptedException e) {
; e.printStackTrace()
}
return interval;
}
}
2)
@Test
public void testAsyncGetInSameThread() throws InterruptedException {
int i = 0;
CopyOnWriteArrayList<TimeInterval> result = new CopyOnWriteArrayList<>();
(;i < 250; i++) { for
printService.doAndGetAsync(result);
}
10000); Thread.sleep(
System.out.println(result.size());
}
3)
result:4-4
result:36-36
result:47-48
result:61-61
result:65-65
result:80-80
result:86-86
result:93-94
result:103-104
result:177-178
result:180-181
result:199-200
result:209-210
result:229-230
result:262-263
result:318-319
result:419-419
result:433-433
result:443-443
result:469-470