mirror of
https://github.com/google/nomulus.git
synced 2025-04-30 12:07:51 +02:00
Remove sharding parameter from RegistryJpaIO (#1856)
This parameter is misleading and does not do what it purports to do. Namely, it does not impact the level of parallelism. Given the input n for this parameter, and m for the batch size, the elements are divided (keyed) into n groups, each of which are then spread evenly across all threads, which are eventually in turn batched into batches with size m: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L227 This is also evident in the implementation itself, where the ShardedKey is determined by the unique number for a worker/thread combo and the original key: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L268 Using a more concrete example, suppose we have 100 elements and 10 worker threads, with a target batch size of 5. If the "shard" number is set to 1, we first spread the 100 elements across 10 threads, resulting in 10 elements per thread, each thread then batches the elements into 2 batches of size 5. If the "shard" number is set to 2, the 100 elements are first divided into 2 "shards" of 50 each. Each "shard" is then distributed within the 10 threads, resulting in 5 elements per "shard" per thread. They then get turned into 1 batch per "shard" per thread. In the end, each thread still processes 2 batches, even though they are from 2 different "shards". Therefore this "shard" number does not perform horizontal partitioning that one normally associates with sharding, and provides no performance benefits but rather confuses the user. It is also suggested that using withShardedKey() alone is already sufficient to achieve auto-sharding within the keyed group. There is no need to manually divide the input by keying them differently based on the "shard" number specified: https://youtu.be/jses0W4Zalc?t=967
This commit is contained in:
parent
d8a835cf43
commit
2a222ca935
7 changed files with 6 additions and 75 deletions
|
@ -26,7 +26,6 @@ import google.registry.persistence.transaction.JpaTransactionManager;
|
|||
import google.registry.persistence.transaction.TransactionManagerFactory;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.persistence.criteria.CriteriaQuery;
|
||||
import org.apache.beam.sdk.coders.Coder;
|
||||
|
@ -263,38 +262,11 @@ public final class RegistryJpaIO {
|
|||
|
||||
public static final int DEFAULT_BATCH_SIZE = 1;
|
||||
|
||||
/** The default number of write shard. Please refer to {@link #shards} for more information. */
|
||||
public static final int DEFAULT_SHARDS = 1;
|
||||
|
||||
public abstract String name();
|
||||
|
||||
/** Number of elements to be written in one call. */
|
||||
public abstract int batchSize();
|
||||
|
||||
/**
|
||||
* The number of shards the output should be split into.
|
||||
*
|
||||
* <p>This value is a hint to the pipeline runner on the level of parallelism, and should be
|
||||
* significantly greater than the number of threads working on this transformation (see next
|
||||
* paragraph for more information). On the other hand, it should not be too large to the point
|
||||
* that the number of elements per shard is lower than {@link #batchSize()}. As a rule of thumb,
|
||||
* the following constraint should hold: {@code shards * batchSize * nThreads <=
|
||||
* inputElementCount}. Although it is not always possible to determine the number of threads
|
||||
* working on this transform, when the pipeline run is IO-bound, it most likely is close to the
|
||||
* total number of threads in the pipeline, which is explained below.
|
||||
*
|
||||
* <p>With Cloud Dataflow runner, the total number of worker threads in a batch pipeline (which
|
||||
* includes all existing Registry pipelines) is the number of vCPUs used by the pipeline, and
|
||||
* can be set by the {@code --maxNumWorkers} and {@code --workerMachineType} parameters. The
|
||||
* number of worker threads in a streaming pipeline can be set by the {@code --maxNumWorkers}
|
||||
* and {@code --numberOfWorkerHarnessThreads} parameters.
|
||||
*
|
||||
* <p>Note that connections on the database server are a limited resource, therefore the number
|
||||
* of threads that interact with the database should be set to an appropriate limit. Again, we
|
||||
* cannot control this number, but can influence it by controlling the total number of threads.
|
||||
*/
|
||||
public abstract int shards();
|
||||
|
||||
public abstract SerializableFunction<T, Object> jpaConverter();
|
||||
|
||||
public Write<T> withName(String name) {
|
||||
|
@ -305,10 +277,6 @@ public final class RegistryJpaIO {
|
|||
return toBuilder().batchSize(batchSize).build();
|
||||
}
|
||||
|
||||
public Write<T> withShards(int shards) {
|
||||
return toBuilder().shards(shards).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* An optional function that converts the input entities to a form that can be written into the
|
||||
* database.
|
||||
|
@ -322,10 +290,7 @@ public final class RegistryJpaIO {
|
|||
@Override
|
||||
public PCollection<Void> expand(PCollection<T> input) {
|
||||
return input
|
||||
.apply(
|
||||
"Shard data " + name(),
|
||||
WithKeys.<Integer, T>of(e -> ThreadLocalRandom.current().nextInt(shards()))
|
||||
.withKeyType(integers()))
|
||||
.apply("Add key to data " + name(), WithKeys.<Integer, T>of(0).withKeyType(integers()))
|
||||
// The call to withShardedKey() is performance critical. The resulting transform ensures
|
||||
// that data is spread evenly across all worker threads.
|
||||
.apply(
|
||||
|
@ -340,7 +305,6 @@ public final class RegistryJpaIO {
|
|||
return new AutoValue_RegistryJpaIO_Write.Builder<T>()
|
||||
.name(DEFAULT_NAME)
|
||||
.batchSize(DEFAULT_BATCH_SIZE)
|
||||
.shards(DEFAULT_SHARDS)
|
||||
.jpaConverter(x -> x);
|
||||
}
|
||||
|
||||
|
@ -351,8 +315,6 @@ public final class RegistryJpaIO {
|
|||
|
||||
abstract Builder<T> batchSize(int batchSize);
|
||||
|
||||
abstract Builder<T> shards(int jdbcNumConnsHint);
|
||||
|
||||
abstract Builder<T> jpaConverter(SerializableFunction<T, Object> jpaConverter);
|
||||
|
||||
abstract Write<T> build();
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
|
||||
package google.registry.beam.common;
|
||||
|
||||
import google.registry.beam.common.RegistryJpaIO.Write;
|
||||
import google.registry.config.RegistryEnvironment;
|
||||
import google.registry.model.annotations.DeleteAfterMigration;
|
||||
import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
|
||||
|
@ -29,9 +28,9 @@ import org.apache.beam.sdk.options.Description;
|
|||
* Defines Nomulus-specific pipeline options, e.g. JPA configurations.
|
||||
*
|
||||
* <p>When using the Cloud Dataflow runner, users are recommended to set an upper bound on active
|
||||
* database connections by setting the pipeline worker options including {@code --maxNumWorkers},
|
||||
* {@code workerMachineType}, and {@code numberOfWorkerHarnessThreads}. Please refer to {@link
|
||||
* Write#shards()} for more information.
|
||||
* database connections by setting the max number of pipeline worker threads using {@code
|
||||
* --maxNumWorkers} and {@code workerMachineType} for batch pipelines, or {@code --maxNumWorkers}
|
||||
* and {@code --numberOfWorkerHarnessThreads} for streaming pipelines.
|
||||
*/
|
||||
public interface RegistryPipelineOptions extends GcpOptions {
|
||||
|
||||
|
@ -58,14 +57,6 @@ public interface RegistryPipelineOptions extends GcpOptions {
|
|||
|
||||
void setSqlWriteBatchSize(int sqlWriteBatchSize);
|
||||
|
||||
@Description(
|
||||
"Number of shards to create out of the data before writing to the SQL database. Please refer "
|
||||
+ "to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.")
|
||||
@Default.Integer(100)
|
||||
int getSqlWriteShards();
|
||||
|
||||
void setSqlWriteShards(int maxConcurrentSqlWriters);
|
||||
|
||||
@DeleteAfterMigration
|
||||
@Description(
|
||||
"Whether to use self allocated primary IDs when building entities. This should only be used"
|
||||
|
|
|
@ -33,7 +33,6 @@ import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
|
|||
import google.registry.persistence.VKey;
|
||||
import google.registry.util.DateTimeUtils;
|
||||
import java.io.Serializable;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import org.apache.beam.sdk.Pipeline;
|
||||
import org.apache.beam.sdk.PipelineResult;
|
||||
import org.apache.beam.sdk.coders.StringUtf8Coder;
|
||||
|
@ -143,15 +142,13 @@ public class ResaveAllEppResourcesPipeline implements Serializable {
|
|||
/** Projects and re-saves all resources with repo IDs provided by the {@link Read}. */
|
||||
private <T extends EppResource> void projectAndResaveResources(
|
||||
Pipeline pipeline, Class<T> clazz, Read<?, String> repoIdRead) {
|
||||
int numShards = options.getSqlWriteShards();
|
||||
int batchSize = options.getSqlWriteBatchSize();
|
||||
String className = clazz.getSimpleName();
|
||||
pipeline
|
||||
.apply("Read " + className, repoIdRead)
|
||||
.apply(
|
||||
"Shard data for class" + className,
|
||||
WithKeys.<Integer, String>of(e -> ThreadLocalRandom.current().nextInt(numShards))
|
||||
.withKeyType(integers()))
|
||||
WithKeys.<Integer, String>of(0).withKeyType(integers()))
|
||||
.apply(
|
||||
"Group into batches for class" + className,
|
||||
GroupIntoBatches.<Integer, String>ofSize(batchSize).withShardedKey())
|
||||
|
|
|
@ -162,7 +162,6 @@ public class Spec11Pipeline implements Serializable {
|
|||
RegistryJpaIO.<KV<DomainNameInfo, ThreatMatch>>write()
|
||||
.withName(transformId)
|
||||
.withBatchSize(options.getSqlWriteBatchSize())
|
||||
.withShards(options.getSqlWriteShards())
|
||||
.withJpaConverter(
|
||||
(kv) -> {
|
||||
DomainNameInfo domainNameInfo = kv.getKey();
|
||||
|
|
|
@ -29,15 +29,6 @@
|
|||
"^[1-9][0-9]*$"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "sqlWriteShards",
|
||||
"label": "Number of output shards to create when writing to SQL.",
|
||||
"helpText": "Number of shards to create out of the data before writing to the SQL database. Please refer to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.",
|
||||
"is_optional": true,
|
||||
"regexes": [
|
||||
"^[1-9][0-9]*$"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "yearMonth",
|
||||
"label": "The year and month we generate invoice and detailed reports for.",
|
||||
|
|
|
@ -29,15 +29,6 @@
|
|||
"^[1-9][0-9]*$"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "sqlWriteShards",
|
||||
"label": "Number of output shards to create when writing to SQL.",
|
||||
"helpText": "Number of shards to create out of the data before writing to the SQL database. Please refer to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.",
|
||||
"is_optional": true,
|
||||
"regexes": [
|
||||
"^[1-9][0-9]*$"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "date",
|
||||
"label": "The date when the pipeline runs",
|
||||
|
|
|
@ -59,7 +59,7 @@ class RegistryJpaWriteTest implements Serializable {
|
|||
ImmutableList<Contact> contacts = contactsBuilder.build();
|
||||
testPipeline
|
||||
.apply(Create.of(contacts))
|
||||
.apply(RegistryJpaIO.<Contact>write().withName("Contact").withBatchSize(4).withShards(2));
|
||||
.apply(RegistryJpaIO.<Contact>write().withName("Contact").withBatchSize(4));
|
||||
testPipeline.run().waitUntilFinish();
|
||||
|
||||
assertThat(loadAllOf(Contact.class))
|
||||
|
|
Loading…
Add table
Reference in a new issue