使用 Semaphore 实现一个简单限流器
# Semaphore(信号量)的基本使用
Semaphore 内部实现是一个计数器 + 队列,计数器小于 0 时进入阻塞方法,直到计数器 >= 0 时放行。计数器和队列对外都是透明的。
// jdk 源码
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 限流器实现
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
private static final ExecutorService POOL = Executors.newFixedThreadPool(10);
// 任务执行时长
private static final int SLEEP = 1;
// 限制并发数
private static final int LIMIT = 3;
public static void main(String[] args) throws InterruptedException {
Server server = new Server();
Instant start = Instant.now();
// 测试任务数
int count = 10;
for (int i = 0; i < count; i++) {
int finalI = i;
POOL.submit(() -> {
server.handle(finalI);
});
}
POOL.shutdown();
// 保证任务能完成再退出主线程
POOL.awaitTermination(count * SLEEP / LIMIT + 10, TimeUnit.SECONDS);
System.out.println(Duration.between(start, Instant.now()).toSeconds());
}
static class Server {
private final Semaphore semaphore = new Semaphore(LIMIT);
void handle(int i) {
try {
semaphore.acquire();
Thread.sleep(SLEEP * 1000);
System.out.println("handle " + i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
demo 中模拟同时有10个客户端请求,Server 限制并发数为 3,通过打印耗时,可以看到需要 4 秒才能处理完所有请求
Last Updated: 2024/04/23, 01:30:37