参考 RateLimiter 代码。
基本使用
1 2 3 4
| <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency>
|
1 2
| RateLimiter rateLimiter = RateLimiter.create(1); rateLimiter.acquire();
|
代码实现
抽取 SmoothBursty
限流器的关键代码,梳理基本的实现流程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| class RateLimiter {
private final double maxBurstSeconds;
private final double maxPermits;
private final double stableIntervalMicros;
private double storedPermits;
private long nextFreeTicketMicros = 0L;
RateLimiter(double permitsPerSecond) { this.maxBurstSeconds = 1.0; maxPermits = maxBurstSeconds * permitsPerSecond; stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond; }
public double acquire(int permits) { if (permits <= 0) { throw new IllegalArgumentException(String.format("Requested permits (%s) must be positive", permits)); } long microsToWait; synchronized (this) { Instant instant = Instant.now(); long nowMicros = instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1000; long momentAvailable = reserveEarliestAvailable(permits, nowMicros); microsToWait = max(momentAvailable - nowMicros, 0); } LockSupport.parkNanos(microsToWait * 1000); return 1.0 * microsToWait / SECONDS.toMicros(1L); }
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { if (nowMicros > nextFreeTicketMicros) { double newPermits = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros; storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } long returnValue = nextFreeTicketMicros; double storedPermitsToSpend = min(requiredPermits, this.storedPermits); double freshPermits = requiredPermits - storedPermitsToSpend; long waitMicros = (long) (freshPermits * stableIntervalMicros); try { nextFreeTicketMicros = Math.addExact(nextFreeTicketMicros, waitMicros); } catch (ArithmeticException e) { nextFreeTicketMicros = Long.MAX_VALUE; } storedPermits -= storedPermitsToSpend; return returnValue; }
public static void main(String[] args) { RateLimiter rateLimiter = new RateLimiter(1); for (int i = 0; i < 10; i++) { double wait = rateLimiter.acquire(2); System.out.printf("等待时间: %fs, 剩余许可: %f\n", wait, rateLimiter.storedPermits); } } }
|
1 2 3 4 5 6 7 8
|
等待时间: 0.000000s, 剩余许可: 0.000000
等待时间: 0.967138s, 剩余许可: 0.000000
等待时间: 1.981604s, 剩余许可: 0.000000 等待时间: 1.989831s, 剩余许可: 0.000000
|