diff --git a/java/google/registry/proxy/quota/QuotaConfig.java b/java/google/registry/proxy/quota/QuotaConfig.java index 8b3475b44..97a1544a7 100644 --- a/java/google/registry/proxy/quota/QuotaConfig.java +++ b/java/google/registry/proxy/quota/QuotaConfig.java @@ -14,6 +14,8 @@ package google.registry.proxy.quota; +import static com.google.common.base.Preconditions.checkState; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import google.registry.proxy.ProxyConfig.Quota; @@ -23,6 +25,10 @@ import org.joda.time.Duration; /** Value class that stores the quota configuration for a protocol. */ public class QuotaConfig { + /** A special value of token amount that indicates unlimited tokens. */ + public static final int SENTINEL_UNLIMITED_TOKENS = -1; + + private final String protocolName; private final int refreshSeconds; private final QuotaGroup defaultQuota; private final ImmutableMap customQuotaMap; @@ -33,7 +39,8 @@ public class QuotaConfig { *

Each {@link QuotaGroup} is keyed to all the {@code userId}s it contains. This allows for * fast lookup with a {@code userId}. */ - public QuotaConfig(Quota quota) { + public QuotaConfig(Quota quota, String protocolName) { + this.protocolName = protocolName; refreshSeconds = quota.refreshSeconds; defaultQuota = quota.defaultQuota; ImmutableMap.Builder mapBuilder = new ImmutableMap.Builder<>(); @@ -47,18 +54,34 @@ public class QuotaConfig { return customQuotaMap.getOrDefault(userId, defaultQuota); } + /** + * Returns if the given user ID is provisioned with unlimited tokens. + * + *

This is configured by setting {@code tokenAmount} to {@code -1} in the config file. + */ + boolean hasUnlimitedTokens(String userId) { + return findQuotaGroup(userId).tokenAmount == SENTINEL_UNLIMITED_TOKENS; + } + /** Returns the token amount for the given {@code userId}. */ - public int getTokenAmount(String userId) { + int getTokenAmount(String userId) { + checkState( + !hasUnlimitedTokens(userId), "User ID %s is provisioned with unlimited tokens", userId); return findQuotaGroup(userId).tokenAmount; } /** Returns the refill period for the given {@code userId}. */ - public Duration getRefillPeriod(String userId) { + Duration getRefillPeriod(String userId) { return Duration.standardSeconds(findQuotaGroup(userId).refillSeconds); } /** Returns the refresh period for this quota config. */ - public Duration getRefreshPeriod() { + Duration getRefreshPeriod() { return Duration.standardSeconds(refreshSeconds); } + + /** Returns the name of the protocol for which this quota config is made. */ + String getProtocolName() { + return protocolName; + } } diff --git a/java/google/registry/proxy/quota/QuotaManager.java b/java/google/registry/proxy/quota/QuotaManager.java new file mode 100644 index 000000000..4cc69e6c5 --- /dev/null +++ b/java/google/registry/proxy/quota/QuotaManager.java @@ -0,0 +1,110 @@ +// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.proxy.quota; + +import com.google.auto.value.AutoValue; +import google.registry.proxy.quota.QuotaManager.QuotaResponse.Status; +import google.registry.proxy.quota.TokenStore.TimestampedInteger; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import javax.annotation.concurrent.ThreadSafe; +import org.joda.time.DateTime; + +/** + * A thread-safe quota manager that schedules background refresh if necessary. + * + *

This class abstracts away details about the {@link TokenStore}. It: + * + *

+ * + *

There should be one {@link QuotaManager} per protocol. + */ +@ThreadSafe +public class QuotaManager { + + @AutoValue + abstract static class QuotaRequest { + + static QuotaRequest create(String userId) { + return new AutoValue_QuotaManager_QuotaRequest(userId); + } + + abstract String userId(); + } + + @AutoValue + abstract static class QuotaResponse { + + enum Status { + SUCCESS, + FAILURE, + } + + static QuotaResponse create(Status status, String userId, DateTime grantedTokenRefillTime) { + return new AutoValue_QuotaManager_QuotaResponse(status, userId, grantedTokenRefillTime); + } + + abstract Status status(); + + abstract String userId(); + + abstract DateTime grantedTokenRefillTime(); + } + + @AutoValue + abstract static class QuotaRebate { + static QuotaRebate create(QuotaResponse response) { + return new AutoValue_QuotaManager_QuotaRebate( + response.userId(), response.grantedTokenRefillTime()); + } + + abstract String userId(); + + abstract DateTime grantedTokenRefillTime(); + } + + private final TokenStore tokenStore; + + private final ExecutorService backgroundExecutor; + + QuotaManager(TokenStore tokenStore, ExecutorService backgroundExecutor) { + this.tokenStore = tokenStore; + this.backgroundExecutor = backgroundExecutor; + tokenStore.scheduleRefresh(); + } + + /** Attempts to acquire requested quota, synchronously. */ + QuotaResponse acquireQuota(QuotaRequest request) { + TimestampedInteger tokens = tokenStore.take(request.userId()); + Status status = (tokens.value() == 0) ? Status.FAILURE : Status.SUCCESS; + return QuotaResponse.create(status, request.userId(), tokens.timestamp()); + } + + /** + * Returns granted quota to the token store, asynchronously. + * + * @return a {@link Future} representing the asynchronous task to return the quota. + */ + Future releaseQuota(QuotaRebate rebate) { + return backgroundExecutor.submit( + () -> tokenStore.put(rebate.userId(), rebate.grantedTokenRefillTime())); + } +} diff --git a/java/google/registry/proxy/quota/TokenStore.java b/java/google/registry/proxy/quota/TokenStore.java new file mode 100644 index 000000000..7d1a87644 --- /dev/null +++ b/java/google/registry/proxy/quota/TokenStore.java @@ -0,0 +1,222 @@ +// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.proxy.quota; + +import static google.registry.proxy.quota.QuotaConfig.SENTINEL_UNLIMITED_TOKENS; +import static java.lang.StrictMath.max; +import static java.lang.StrictMath.min; + +import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; +import google.registry.util.Clock; +import google.registry.util.FormattingLogger; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.ThreadSafe; +import org.joda.time.DateTime; +import org.joda.time.Duration; + +/** + * A thread-safe token store that supports concurrent {@link #take}, {@link #put}, and {@link + * #refresh} operations. + * + *

The tokens represent quota allocated to each user, which needs to be leased to the user upon + * connection and optionally returned to the store upon termination. Failure to acquire tokens + * results in quota fulfillment failure, leading to automatic connection termination. For details on + * tokens, see {@code config/default-config.yaml}. + * + *

The store also lazily refills tokens for a {@code userId} when a {@link #take} or a {@link + * #put} takes place. It also exposes a {@link #refresh} method that goes through each entry in the + * store and purges stale entries, in order to prevent the token store from growing too large. + * + *

There should be one token store for each protocol. + */ +@ThreadSafe +public class TokenStore { + + /** Value class representing a timestamped integer. */ + @AutoValue + abstract static class TimestampedInteger { + + static TimestampedInteger create(int value, DateTime timestamp) { + return new AutoValue_TokenStore_TimestampedInteger(value, timestamp); + } + + abstract int value(); + + abstract DateTime timestamp(); + } + + /** + * A wrapper to get around Java lambda's closure limitation. + * + *

Use the class to modify the value of a local variable captured by an lambda. + */ + private static class Wrapper { + T value; + } + + private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass(); + + /** A map of {@code userId} to available tokens, timestamped at last refill time. */ + private final ConcurrentHashMap tokensMap = new ConcurrentHashMap<>(); + + private final QuotaConfig config; + private final ScheduledExecutorService refreshExecutor; + private final Clock clock; + + TokenStore(QuotaConfig config, ScheduledExecutorService refreshExecutor, Clock clock) { + this.config = config; + this.refreshExecutor = refreshExecutor; + this.clock = clock; + } + + /** + * Attempts to take one token from the token store. + * + *

This method first check if the user already has an existing entry in the tokens map, and if + * that entry has been last refilled before the refill period. In either case it will reset the + * token amount to the allotted to the user. + * + *

The request can be partially fulfilled or all-or-nothing, meaning if there are fewer tokens + * available than requested, we can grant all available ones, or grant nothing, depending on the + * {@code partialGrant} parameter. + * + * @param userId the identifier of the user requesting the token. + * @return the number of token granted, timestamped at refill time of the pool of tokens from + * which the granted one is taken. + */ + TimestampedInteger take(String userId) { + Wrapper grantedToken = new Wrapper<>(); + tokensMap.compute( + userId, + (user, availableTokens) -> { + DateTime now = clock.nowUtc(); + int currentTokenCount; + DateTime refillTime; + // Checks if the user is provisioned with unlimited tokens. + if (config.hasUnlimitedTokens(user)) { + grantedToken.value = TimestampedInteger.create(1, now); + return TimestampedInteger.create(SENTINEL_UNLIMITED_TOKENS, now); + } + // Checks if the entry exists. + if (availableTokens == null + // Or if refill is enabled and the entry needs to be refilled. + || (!config.getRefillPeriod(user).isEqual(Duration.ZERO) + && !new Duration(availableTokens.timestamp(), now) + .isShorterThan(config.getRefillPeriod(user)))) { + currentTokenCount = config.getTokenAmount(user); + refillTime = now; + } else { + currentTokenCount = availableTokens.value(); + refillTime = availableTokens.timestamp(); + } + int newTokenCount = max(0, currentTokenCount - 1); + grantedToken.value = + TimestampedInteger.create(currentTokenCount - newTokenCount, refillTime); + return TimestampedInteger.create(newTokenCount, refillTime); + }); + return grantedToken.value; + } + + /** + * Attempts to return the granted token to the token store. + * + *

The method first check if a refill is needed, and do it accordingly. It then checks if the + * returned token are from the current pool (i. e. has the same refill timestamp as the current + * pool), and returns the token, capped at the allotted amount for the {@code userId}. + * + * @param userId the identifier of the user returning the token. + * @param returnedTokenRefillTime The refill time of the pool of tokens from which the returned + * one is taken from. + */ + void put(String userId, DateTime returnedTokenRefillTime) { + tokensMap.computeIfPresent( + userId, + (user, availableTokens) -> { + DateTime now = clock.nowUtc(); + int currentTokenCount = availableTokens.value(); + DateTime refillTime = availableTokens.timestamp(); + int newTokenCount; + // Check if quota is unlimited. + if (!config.hasUnlimitedTokens(userId)) { + // Check if refill is enabled and a refill is needed. + if (!config.getRefillPeriod(user).isEqual(Duration.ZERO) + && !new Duration(availableTokens.timestamp(), now) + .isShorterThan(config.getRefillPeriod(user))) { + currentTokenCount = config.getTokenAmount(user); + refillTime = now; + } + // If the returned token comes from the current pool, add it back, otherwise discard it. + newTokenCount = + returnedTokenRefillTime.equals(refillTime) + ? min(currentTokenCount + 1, config.getTokenAmount(userId)) + : currentTokenCount; + } else { + newTokenCount = SENTINEL_UNLIMITED_TOKENS; + } + return TimestampedInteger.create(newTokenCount, refillTime); + }); + } + + /** + * Refreshes the token store and deletes any entry that has not been refilled for longer than the + * refresh period. + * + *

Strictly speaking it should delete the entries that have not been updated (put, taken, + * refill) for longer than the refresh period. But the last update time is not recorded. Typically + * the refill period is much shorter than the refresh period, so the last refill time should serve + * as a good proxy for last update time as the actual update time cannot be one refill period + * later from the refill time, otherwise another refill would have been performed. + */ + void refresh() { + tokensMap.forEach( + (user, availableTokens) -> { + if (!new Duration(availableTokens.timestamp(), clock.nowUtc()) + .isShorterThan(config.getRefreshPeriod())) { + tokensMap.remove(user); + } + }); + } + + /** Schedules token store refresh if enabled. */ + void scheduleRefresh() { + // Only schedule refresh if the refresh period is not zero. + if (!config.getRefreshPeriod().isEqual(Duration.ZERO)) { + Future unusedFuture = + refreshExecutor.scheduleWithFixedDelay( + () -> { + refresh(); + logger.infofmt("Refreshing quota for protocol %s", config.getProtocolName()); + }, + config.getRefreshPeriod().getStandardSeconds(), + config.getRefreshPeriod().getStandardSeconds(), + TimeUnit.SECONDS); + } + } + + /** + * Helper method to retrieve the timestamped token value for a {@code userId} for testing. + * + *

This non-mutating method is exposed solely for testing, so that the {@link #tokensMap} can + * stay private and not be altered unintentionally. + */ + @VisibleForTesting + TimestampedInteger getTokenForTests(String userId) { + return tokensMap.get(userId); + } +} diff --git a/javatests/google/registry/proxy/quota/QuotaConfigTest.java b/javatests/google/registry/proxy/quota/QuotaConfigTest.java index b6ca81bfc..6c6c57f10 100644 --- a/javatests/google/registry/proxy/quota/QuotaConfigTest.java +++ b/javatests/google/registry/proxy/quota/QuotaConfigTest.java @@ -34,13 +34,16 @@ public class QuotaConfigTest { private static QuotaConfig loadQuotaConfig(String filename) { return new QuotaConfig( new Yaml() - .loadAs(readResourceUtf8(QuotaConfigTest.class, "testdata/" + filename), Quota.class)); + .loadAs(readResourceUtf8(QuotaConfigTest.class, "testdata/" + filename), Quota.class), + "theProtocol"); } private void validateQuota(String userId, int tokenAmount, int refillSeconds) { + assertThat(quotaConfig.hasUnlimitedTokens(userId)).isFalse(); assertThat(quotaConfig.getTokenAmount(userId)).isEqualTo(tokenAmount); assertThat(quotaConfig.getRefillPeriod(userId)) .isEqualTo(Duration.standardSeconds(refillSeconds)); + assertThat(quotaConfig.getProtocolName()).isEqualTo("theProtocol"); } @Test @@ -61,6 +64,24 @@ public class QuotaConfigTest { validateQuota("no_match", 100, 60); } + @Test + public void testSuccess_noRefresh_noRefill() { + quotaConfig = loadQuotaConfig("quota_config_no_refresh_no_refill.yaml"); + assertThat(quotaConfig.getRefreshPeriod()).isEqualTo(Duration.ZERO); + assertThat(quotaConfig.getRefillPeriod("no_match")).isEqualTo(Duration.ZERO); + } + + @Test + public void testFailure_getTokenAmount_throwsOnUnlimitedTokens() { + quotaConfig = loadQuotaConfig("quota_config_unlimited_tokens.yaml"); + assertThat(quotaConfig.hasUnlimitedTokens("some_user")).isTrue(); + IllegalStateException e = + expectThrows(IllegalStateException.class, () -> quotaConfig.getTokenAmount("some_user")); + assertThat(e) + .hasMessageThat() + .contains("User ID some_user is provisioned with unlimited tokens"); + } + @Test public void testFailure_duplicateUserId() { IllegalArgumentException e = diff --git a/javatests/google/registry/proxy/quota/QuotaManagerTest.java b/javatests/google/registry/proxy/quota/QuotaManagerTest.java new file mode 100644 index 000000000..a2c3164d7 --- /dev/null +++ b/javatests/google/registry/proxy/quota/QuotaManagerTest.java @@ -0,0 +1,85 @@ +// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.proxy.quota; + +import static com.google.common.truth.Truth.assertThat; +import static google.registry.proxy.quota.QuotaManager.QuotaResponse.Status.FAILURE; +import static google.registry.proxy.quota.QuotaManager.QuotaResponse.Status.SUCCESS; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.util.concurrent.MoreExecutors; +import google.registry.proxy.quota.QuotaManager.QuotaRebate; +import google.registry.proxy.quota.QuotaManager.QuotaRequest; +import google.registry.proxy.quota.QuotaManager.QuotaResponse; +import google.registry.proxy.quota.TokenStore.TimestampedInteger; +import google.registry.testing.FakeClock; +import java.util.concurrent.Future; +import org.joda.time.DateTime; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link QuotaManager}. */ +@RunWith(JUnit4.class) +public class QuotaManagerTest { + + private static final String USER_ID = "theUser"; + + private final TokenStore tokenStore = mock(TokenStore.class); + private final FakeClock clock = new FakeClock(); + + private QuotaManager quotaManager = + new QuotaManager(tokenStore, MoreExecutors.newDirectExecutorService()); + private QuotaRequest request; + private QuotaResponse response; + private QuotaRebate rebate; + + @Test + public void testSuccess_requestApproved() { + when(tokenStore.take(anyString())).thenReturn(TimestampedInteger.create(1, clock.nowUtc())); + + request = QuotaRequest.create(USER_ID); + response = quotaManager.acquireQuota(request); + assertThat(response.status()).isEqualTo(SUCCESS); + assertThat(response.userId()).isEqualTo(USER_ID); + assertThat(response.grantedTokenRefillTime()).isEqualTo(clock.nowUtc()); + } + + @Test + public void testSuccess_requestDenied() { + when(tokenStore.take(anyString())).thenReturn(TimestampedInteger.create(0, clock.nowUtc())); + + request = QuotaRequest.create(USER_ID); + response = quotaManager.acquireQuota(request); + assertThat(response.status()).isEqualTo(FAILURE); + assertThat(response.userId()).isEqualTo(USER_ID); + assertThat(response.grantedTokenRefillTime()).isEqualTo(clock.nowUtc()); + } + + @Test + public void testSuccess_rebate() throws Exception { + DateTime grantedTokenRefillTime = clock.nowUtc(); + response = QuotaResponse.create(SUCCESS, USER_ID, grantedTokenRefillTime); + rebate = QuotaRebate.create(response); + Future unusedFuture = quotaManager.releaseQuota(rebate); + verify(tokenStore).scheduleRefresh(); + verify(tokenStore).put(USER_ID, grantedTokenRefillTime); + verifyNoMoreInteractions(tokenStore); + } +} diff --git a/javatests/google/registry/proxy/quota/TokenStoreTest.java b/javatests/google/registry/proxy/quota/TokenStoreTest.java new file mode 100644 index 000000000..b7daf263a --- /dev/null +++ b/javatests/google/registry/proxy/quota/TokenStoreTest.java @@ -0,0 +1,315 @@ +// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.proxy.quota; + +import static com.google.common.truth.Truth.assertThat; +import static google.registry.proxy.quota.QuotaConfig.SENTINEL_UNLIMITED_TOKENS; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import google.registry.proxy.quota.TokenStore.TimestampedInteger; +import google.registry.testing.FakeClock; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; + +/** Unit tests for {@link TokenStore}. */ +@RunWith(JUnit4.class) +public class TokenStoreTest { + + private final QuotaConfig quotaConfig = mock(QuotaConfig.class); + private final FakeClock clock = new FakeClock(); + private final ScheduledExecutorService refreshExecutor = mock(ScheduledExecutorService.class); + private final TokenStore tokenStore = spy(new TokenStore(quotaConfig, refreshExecutor, clock)); + private final String user = "theUser"; + private final String otherUser = "theOtherUser"; + + private DateTime assertTake(int grantAmount, int amountLeft, DateTime timestamp) { + return assertTake(user, grantAmount, amountLeft, timestamp); + } + + private DateTime assertTake(String user, int grantAmount, int amountLeft, DateTime timestamp) { + TimestampedInteger grantedToken = tokenStore.take(user); + assertThat(grantedToken).isEqualTo(TimestampedInteger.create(grantAmount, timestamp)); + assertThat(tokenStore.getTokenForTests(user)) + .isEqualTo(TimestampedInteger.create(amountLeft, timestamp)); + return grantedToken.timestamp(); + } + + private void assertPut( + DateTime returnedTokenRefillTime, int amountAfterReturn, DateTime refillTime) { + assertPut(user, returnedTokenRefillTime, amountAfterReturn, refillTime); + } + + private void assertPut( + String user, DateTime returnedTokenRefillTime, int amountAfterReturn, DateTime refillTime) { + tokenStore.put(user, returnedTokenRefillTime); + assertThat(tokenStore.getTokenForTests(user)) + .isEqualTo(TimestampedInteger.create(amountAfterReturn, refillTime)); + } + + private void submitAndWaitForTasks(ExecutorService executor, Runnable... tasks) { + List> futures = new ArrayList<>(); + for (Runnable task : tasks) { + futures.add(executor.submit(task)); + } + futures.forEach( + f -> { + try { + f.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Before + public void setUp() { + when(quotaConfig.getRefreshPeriod()).thenReturn(Duration.standardSeconds(60)); + when(quotaConfig.getRefillPeriod(user)).thenReturn(Duration.standardSeconds(10)); + when(quotaConfig.getTokenAmount(user)).thenReturn(3); + when(quotaConfig.getRefillPeriod(otherUser)).thenReturn(Duration.standardSeconds(15)); + when(quotaConfig.getTokenAmount(otherUser)).thenReturn(5); + } + + @Test + public void testSuccess_take() { + // Take 3 tokens one by one. + DateTime refillTime = clock.nowUtc(); + assertTake(1, 2, refillTime); + assertTake(1, 1, refillTime); + clock.advanceBy(Duration.standardSeconds(2)); + assertTake(1, 0, refillTime); + + // Take 1 token, not enough tokens left. + clock.advanceBy(Duration.standardSeconds(3)); + assertTake(0, 0, refillTime); + + // Refill period passed. Take 1 token - success. + clock.advanceBy(Duration.standardSeconds(6)); + refillTime = clock.nowUtc(); + assertTake(1, 2, refillTime); + } + + @Test + public void testSuccess_put_entryDoesNotExist() { + tokenStore.put(user, clock.nowUtc()); + assertThat(tokenStore.getTokenForTests(user)).isNull(); + } + + @Test + public void testSuccess_put() { + DateTime refillTime = clock.nowUtc(); + + // Initialize the entry. + DateTime grantedTokenRefillTime = assertTake(1, 2, refillTime); + + // Put into full bucket. + assertPut(grantedTokenRefillTime, 3, refillTime); + assertPut(grantedTokenRefillTime, 3, refillTime); + + clock.advanceBy(Duration.standardSeconds(3)); + + // Take 1 token out, put 1 back in. + assertTake(1, 2, refillTime); + assertPut(refillTime, 3, refillTime); + + // Do not put old token back. + grantedTokenRefillTime = assertTake(1, 2, refillTime); + clock.advanceBy(Duration.standardSeconds(11)); + refillTime = clock.nowUtc(); + assertPut(grantedTokenRefillTime, 3, refillTime); + } + + @Test + public void testSuccess_takeAndPut() { + DateTime refillTime = clock.nowUtc(); + + // Take 1 token. + DateTime grantedTokenRefillTime1 = assertTake(1, 2, refillTime); + + // Take 1 token. + DateTime grantedTokenRefillTime2 = assertTake(1, 1, refillTime); + + // Return first token. + clock.advanceBy(Duration.standardSeconds(2)); + assertPut(grantedTokenRefillTime1, 2, refillTime); + + // Refill time passed, second returned token discarded. + clock.advanceBy(Duration.standardSeconds(10)); + refillTime = clock.nowUtc(); + assertPut(grantedTokenRefillTime2, 3, refillTime); + } + + @Test + public void testSuccess_multipleUsers() { + DateTime refillTime1 = clock.nowUtc(); + DateTime refillTime2 = clock.nowUtc(); + + // Take 1 from first user. + DateTime grantedTokenRefillTime1 = assertTake(user, 1, 2, refillTime1); + + // Take 1 from second user. + DateTime grantedTokenRefillTime2 = assertTake(otherUser, 1, 4, refillTime2); + assertTake(otherUser, 1, 3, refillTime2); + assertTake(otherUser, 1, 2, refillTime2); + + // first user tokens refilled. + clock.advanceBy(Duration.standardSeconds(10)); + refillTime1 = clock.nowUtc(); + DateTime grantedTokenRefillTime3 = assertTake(user, 1, 2, refillTime1); + DateTime grantedTokenRefillTime4 = assertTake(otherUser, 1, 1, refillTime2); + assertPut(user, grantedTokenRefillTime1, 2, refillTime1); + assertPut(otherUser, grantedTokenRefillTime2, 2, refillTime2); + + // second user tokens refilled. + clock.advanceBy(Duration.standardSeconds(5)); + refillTime2 = clock.nowUtc(); + assertPut(user, grantedTokenRefillTime3, 3, refillTime1); + assertPut(otherUser, grantedTokenRefillTime4, 5, refillTime2); + } + + @Test + public void testSuccess_refresh() { + DateTime refillTime1 = clock.nowUtc(); + assertTake(user, 1, 2, refillTime1); + + clock.advanceBy(Duration.standardSeconds(5)); + DateTime refillTime2 = clock.nowUtc(); + assertTake(otherUser, 1, 4, refillTime2); + + clock.advanceBy(Duration.standardSeconds(55)); + + // Entry for user is 60s old, entry for otherUser is 55s old. + tokenStore.refresh(); + assertThat(tokenStore.getTokenForTests(user)).isNull(); + assertThat(tokenStore.getTokenForTests(otherUser)) + .isEqualTo(TimestampedInteger.create(4, refillTime2)); + } + + @Test + public void testSuccess_unlimitedQuota() { + when(quotaConfig.hasUnlimitedTokens(user)).thenReturn(true); + for (int i = 0; i < 10000; ++i) { + assertTake(1, SENTINEL_UNLIMITED_TOKENS, clock.nowUtc()); + } + for (int i = 0; i < 10000; ++i) { + assertPut(clock.nowUtc(), SENTINEL_UNLIMITED_TOKENS, clock.nowUtc()); + } + } + + @Test + public void testSuccess_noRefill() { + when(quotaConfig.getRefillPeriod(user)).thenReturn(Duration.ZERO); + DateTime refillTime = clock.nowUtc(); + assertTake(1, 2, refillTime); + assertTake(1, 1, refillTime); + assertTake(1, 0, refillTime); + clock.advanceBy(Duration.standardDays(365)); + assertTake(0, 0, refillTime); + } + + @Test + public void testSuccess_noRefresh() { + when(quotaConfig.getRefreshPeriod()).thenReturn(Duration.ZERO); + DateTime refillTime = clock.nowUtc(); + assertTake(1, 2, refillTime); + clock.advanceBy(Duration.standardDays(365)); + assertThat(tokenStore.getTokenForTests(user)) + .isEqualTo(TimestampedInteger.create(2, refillTime)); + } + + @Test + public void testSuccess_concurrency() throws Exception { + ExecutorService executor = Executors.newWorkStealingPool(); + final DateTime time1 = clock.nowUtc(); + submitAndWaitForTasks( + executor, + () -> tokenStore.take(user), + () -> tokenStore.take(otherUser), + () -> tokenStore.take(user), + () -> tokenStore.take(otherUser)); + assertThat(tokenStore.getTokenForTests(user)).isEqualTo(TimestampedInteger.create(1, time1)); + assertThat(tokenStore.getTokenForTests(otherUser)) + .isEqualTo(TimestampedInteger.create(3, time1)); + + // No refill. + clock.advanceBy(Duration.standardSeconds(5)); + submitAndWaitForTasks( + executor, () -> tokenStore.take(user), () -> tokenStore.put(otherUser, time1)); + assertThat(tokenStore.getTokenForTests(user)).isEqualTo(TimestampedInteger.create(0, time1)); + assertThat(tokenStore.getTokenForTests(otherUser)) + .isEqualTo(TimestampedInteger.create(4, time1)); + + // First user refill. + clock.advanceBy(Duration.standardSeconds(5)); + final DateTime time2 = clock.nowUtc(); + submitAndWaitForTasks( + executor, + () -> { + tokenStore.put(user, time1); + tokenStore.take(user); + }, + () -> tokenStore.take(otherUser)); + assertThat(tokenStore.getTokenForTests(user)).isEqualTo(TimestampedInteger.create(2, time2)); + assertThat(tokenStore.getTokenForTests(otherUser)) + .isEqualTo(TimestampedInteger.create(3, time1)); + + // Second user refill. + clock.advanceBy(Duration.standardSeconds(5)); + final DateTime time3 = clock.nowUtc(); + submitAndWaitForTasks( + executor, + () -> tokenStore.take(user), + () -> { + tokenStore.put(otherUser, time1); + tokenStore.take(otherUser); + }); + assertThat(tokenStore.getTokenForTests(user)).isEqualTo(TimestampedInteger.create(1, time2)); + assertThat(tokenStore.getTokenForTests(otherUser)) + .isEqualTo(TimestampedInteger.create(4, time3)); + } + + @Test + public void testSuccess_scheduleRefresh() throws Exception { + when(quotaConfig.getRefreshPeriod()).thenReturn(Duration.standardSeconds(5)); + + tokenStore.scheduleRefresh(); + + // Verify that a task is scheduled. + ArgumentCaptor argument = ArgumentCaptor.forClass(Runnable.class); + verify(refreshExecutor) + .scheduleWithFixedDelay( + argument.capture(), eq((long) 5), eq((long) 5), eq(TimeUnit.SECONDS)); + + // Verify that the scheduled task calls TokenStore.refresh(). + argument.getValue().run(); + verify(tokenStore).refresh(); + } +} diff --git a/javatests/google/registry/proxy/quota/testdata/quota_config_no_refresh_no_refill.yaml b/javatests/google/registry/proxy/quota/testdata/quota_config_no_refresh_no_refill.yaml new file mode 100644 index 000000000..9d244b1a0 --- /dev/null +++ b/javatests/google/registry/proxy/quota/testdata/quota_config_no_refresh_no_refill.yaml @@ -0,0 +1,8 @@ +refreshSeconds: 0 + +defaultQuota: + userId: [] + tokenAmount: 100 + refillSeconds: 0 + +customQuota: [] diff --git a/javatests/google/registry/proxy/quota/testdata/quota_config_unlimited_tokens.yaml b/javatests/google/registry/proxy/quota/testdata/quota_config_unlimited_tokens.yaml new file mode 100644 index 000000000..c292188dc --- /dev/null +++ b/javatests/google/registry/proxy/quota/testdata/quota_config_unlimited_tokens.yaml @@ -0,0 +1,8 @@ +refreshSeconds: 3600 + +defaultQuota: + userId: [] + tokenAmount: -1 + refillSeconds: 60 + +customQuota: []