diff --git a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java
index fe40932d6..76f178e7b 100644
--- a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java
+++ b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java
@@ -16,12 +16,14 @@ package google.registry.beam.spec11;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static google.registry.beam.BeamUtils.getQueryFromFile;
+import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableSet;
 import dagger.Component;
 import dagger.Module;
 import dagger.Provides;
+import google.registry.backup.AppEngineEnvironment;
 import google.registry.beam.common.RegistryJpaIO;
 import google.registry.beam.common.RegistryJpaIO.Read;
 import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn;
@@ -30,6 +32,7 @@ import google.registry.model.domain.DomainBase;
 import google.registry.model.reporting.Spec11ThreatMatch;
 import google.registry.model.reporting.Spec11ThreatMatch.ThreatType;
 import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
+import google.registry.persistence.VKey;
 import google.registry.util.Retrier;
 import google.registry.util.SqlTemplate;
 import google.registry.util.UtilsModule;
@@ -41,6 +44,8 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -113,15 +118,45 @@ public class Spec11Pipeline implements Serializable {
   }
 
   static PCollection readFromCloudSql(Pipeline pipeline) {
-    Read read =
+    Read> read =
         RegistryJpaIO.read(
-            "select d, r.emailAddress from Domain d join Registrar r on"
-                + " d.currentSponsorClientId = r.clientIdentifier where r.type = 'REAL'"
-                + " and d.deletionTime > now()",
+            "select d.repoId, r.emailAddress from Domain d join Registrar r on"
+                + " d.currentSponsorClientId = r.clientIdentifier where r.type = 'REAL' and"
+                + " d.deletionTime > now()",
             false,
             Spec11Pipeline::parseRow);
 
-    return pipeline.apply("Read active domains from Cloud SQL", read);
+    return pipeline
+        .apply("Read active domains from Cloud SQL", read)
+        .apply(
+            "Build DomainNameInfo",
+            ParDo.of(
+                new DoFn, DomainNameInfo>() {
+                  @ProcessElement
+                  public void processElement(
+                      @Element KV input, OutputReceiver output) {
+                    try (AppEngineEnvironment allowOfyEntity = new AppEngineEnvironment()) {
+                      DomainBase domainBase =
+                          jpaTm()
+                              .transact(
+                                  () ->
+                                      jpaTm()
+                                          .loadByKey(
+                                              VKey.createSql(DomainBase.class, input.getKey())));
+                      String emailAddress = input.getValue();
+                      if (emailAddress == null) {
+                        emailAddress = "";
+                      }
+                      DomainNameInfo domainNameInfo =
+                          DomainNameInfo.create(
+                              domainBase.getDomainName(),
+                              domainBase.getRepoId(),
+                              domainBase.getCurrentSponsorRegistrarId(),
+                              emailAddress);
+                      output.output(domainNameInfo);
+                    }
+                  }
+                }));
   }
 
   static PCollection readFromBigQuery(
@@ -142,17 +177,8 @@ public class Spec11Pipeline implements Serializable {
             .withTemplateCompatibility());
   }
 
-  private static DomainNameInfo parseRow(Object[] row) {
-    DomainBase domainBase = (DomainBase) row[0];
-    String emailAddress = (String) row[1];
-    if (emailAddress == null) {
-      emailAddress = "";
-    }
-    return DomainNameInfo.create(
-        domainBase.getDomainName(),
-        domainBase.getRepoId(),
-        domainBase.getCurrentSponsorRegistrarId(),
-        emailAddress);
+  private static KV parseRow(Object[] row) {
+    return KV.of((String) row[0], (String) row[1]);
   }
 
   static void saveToSql(