Add VKey workaround to spec11 pipeline (#1339)

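* Add VKey workaround to spec11 pipeline: the Cloud SQL query now selects only d.repoId and r.emailAddress, and each DomainBase is then loaded by VKey.createSql(DomainBase.class, repoId) inside a jpaTm() transaction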

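* Parallelize entity loading by moving the DomainBase load out of parseRow and into a separate "Build DomainNameInfo" ParDo step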
This commit is contained in:
sarahcaseybot 2021-10-01 15:21:16 -04:00 committed by GitHub
parent a4f4aa6b5f
commit 7cbc27af30

View file

@ -16,12 +16,14 @@ package google.registry.beam.spec11;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static google.registry.beam.BeamUtils.getQueryFromFile; import static google.registry.beam.BeamUtils.getQueryFromFile;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import com.google.auto.value.AutoValue; import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import dagger.Component; import dagger.Component;
import dagger.Module; import dagger.Module;
import dagger.Provides; import dagger.Provides;
import google.registry.backup.AppEngineEnvironment;
import google.registry.beam.common.RegistryJpaIO; import google.registry.beam.common.RegistryJpaIO;
import google.registry.beam.common.RegistryJpaIO.Read; import google.registry.beam.common.RegistryJpaIO.Read;
import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn; import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn;
@ -30,6 +32,7 @@ import google.registry.model.domain.DomainBase;
import google.registry.model.reporting.Spec11ThreatMatch; import google.registry.model.reporting.Spec11ThreatMatch;
import google.registry.model.reporting.Spec11ThreatMatch.ThreatType; import google.registry.model.reporting.Spec11ThreatMatch.ThreatType;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import google.registry.persistence.VKey;
import google.registry.util.Retrier; import google.registry.util.Retrier;
import google.registry.util.SqlTemplate; import google.registry.util.SqlTemplate;
import google.registry.util.UtilsModule; import google.registry.util.UtilsModule;
@ -41,6 +44,8 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo;
@ -113,15 +118,45 @@ public class Spec11Pipeline implements Serializable {
} }
static PCollection<DomainNameInfo> readFromCloudSql(Pipeline pipeline) { static PCollection<DomainNameInfo> readFromCloudSql(Pipeline pipeline) {
Read<Object[], DomainNameInfo> read = Read<Object[], KV<String, String>> read =
RegistryJpaIO.read( RegistryJpaIO.read(
"select d, r.emailAddress from Domain d join Registrar r on" "select d.repoId, r.emailAddress from Domain d join Registrar r on"
+ " d.currentSponsorClientId = r.clientIdentifier where r.type = 'REAL'" + " d.currentSponsorClientId = r.clientIdentifier where r.type = 'REAL' and"
+ " and d.deletionTime > now()", + " d.deletionTime > now()",
false, false,
Spec11Pipeline::parseRow); Spec11Pipeline::parseRow);
return pipeline.apply("Read active domains from Cloud SQL", read); return pipeline
.apply("Read active domains from Cloud SQL", read)
.apply(
"Build DomainNameInfo",
ParDo.of(
new DoFn<KV<String, String>, DomainNameInfo>() {
@ProcessElement
public void processElement(
@Element KV<String, String> input, OutputReceiver<DomainNameInfo> output) {
try (AppEngineEnvironment allowOfyEntity = new AppEngineEnvironment()) {
DomainBase domainBase =
jpaTm()
.transact(
() ->
jpaTm()
.loadByKey(
VKey.createSql(DomainBase.class, input.getKey())));
String emailAddress = input.getValue();
if (emailAddress == null) {
emailAddress = "";
}
DomainNameInfo domainNameInfo =
DomainNameInfo.create(
domainBase.getDomainName(),
domainBase.getRepoId(),
domainBase.getCurrentSponsorRegistrarId(),
emailAddress);
output.output(domainNameInfo);
}
}
}));
} }
static PCollection<DomainNameInfo> readFromBigQuery( static PCollection<DomainNameInfo> readFromBigQuery(
@ -142,17 +177,8 @@ public class Spec11Pipeline implements Serializable {
.withTemplateCompatibility()); .withTemplateCompatibility());
} }
private static DomainNameInfo parseRow(Object[] row) { private static KV<String, String> parseRow(Object[] row) {
DomainBase domainBase = (DomainBase) row[0]; return KV.of((String) row[0], (String) row[1]);
String emailAddress = (String) row[1];
if (emailAddress == null) {
emailAddress = "";
}
return DomainNameInfo.create(
domainBase.getDomainName(),
domainBase.getRepoId(),
domainBase.getCurrentSponsorRegistrarId(),
emailAddress);
} }
static void saveToSql( static void saveToSql(