diff --git a/core/src/main/java/google/registry/beam/rde/RdeIO.java b/core/src/main/java/google/registry/beam/rde/RdeIO.java index 03cbb5d24..2058556f3 100644 --- a/core/src/main/java/google/registry/beam/rde/RdeIO.java +++ b/core/src/main/java/google/registry/beam/rde/RdeIO.java @@ -167,13 +167,13 @@ public class RdeIO { Optional.ofNullable(key.revision()) .orElseGet(() -> RdeRevision.getNextRevision(tld, watermark, mode)); String id = RdeUtil.timestampToId(watermark); - String prefix = options.getJobName(); - String basename = RdeNamingUtils.makeRydeFilename(tld, watermark, mode, 1, revision); + String prefix = + options.getJobName() + + '/' + + RdeNamingUtils.makeRydeFilename(tld, watermark, mode, 1, revision); if (key.manual()) { checkState(key.directoryWithTrailingSlash() != null, "Manual subdirectory not specified"); - prefix = prefix + "/manual/" + key.directoryWithTrailingSlash() + basename; - } else { - prefix = prefix + '/' + basename; + prefix = "manual/" + key.directoryWithTrailingSlash() + prefix; } BlobId xmlFilename = BlobId.of(rdeBucket, prefix + ".xml.ghostryde"); // This file will contain the byte length (ASCII) of the raw unencrypted XML. diff --git a/core/src/main/java/google/registry/persistence/transaction/TransactionManagerFactory.java b/core/src/main/java/google/registry/persistence/transaction/TransactionManagerFactory.java index 4f053abff..a2c4da2d5 100644 --- a/core/src/main/java/google/registry/persistence/transaction/TransactionManagerFactory.java +++ b/core/src/main/java/google/registry/persistence/transaction/TransactionManagerFactory.java @@ -35,7 +35,7 @@ import org.joda.time.DateTime; /** Factory class to create {@link TransactionManager} instance. */ // TODO: Rename this to PersistenceFactory and move to persistence package. -public class TransactionManagerFactory { +public final class TransactionManagerFactory { private static final DatastoreTransactionManager ofyTm = createTransactionManager(); @@ -47,6 +47,8 @@ public class TransactionManagerFactory { private static Supplier jpaTm = Suppliers.memoize(TransactionManagerFactory::createJpaTransactionManager); + private static boolean onBeam = false; + private TransactionManagerFactory() {} private static JpaTransactionManager createJpaTransactionManager() { @@ -86,6 +88,9 @@ public class TransactionManagerFactory { if (tmForTest.isPresent()) { return tmForTest.get(); } + if (onBeam) { + return jpaTm(); + } return DatabaseMigrationStateSchedule.getValueAtTime(DateTime.now(UTC)) .getPrimaryDatabase() .equals(PrimaryDatabase.DATASTORE) @@ -127,6 +132,7 @@ public class TransactionManagerFactory { public static void setJpaTmOnBeamWorker(Supplier jpaTmSupplier) { checkNotNull(jpaTmSupplier, "jpaTmSupplier"); jpaTm = Suppliers.memoize(jpaTmSupplier::get); + onBeam = true; } /** diff --git a/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java b/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java index 3b7e9621a..8371df291 100644 --- a/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java +++ b/core/src/test/java/google/registry/beam/rde/RdePipelineTest.java @@ -497,13 +497,13 @@ public class RdePipelineTest { verifyFiles(ImmutableMap.of(brdaKey, brdaFragments, rdeKey, rdeFragments), true); - assertThat(gcsUtils.listFolderObjects("gcs-bucket", "rde-job/")) + assertThat(gcsUtils.listFolderObjects("gcs-bucket", "manual/test/rde-job/")) .containsExactly( - "manual/test/soy_2000-01-01_thin_S1_R0.xml.length", - "manual/test/soy_2000-01-01_thin_S1_R0.xml.ghostryde", - "manual/test/soy_2000-01-01_full_S1_R0.xml.length", - "manual/test/soy_2000-01-01_full_S1_R0.xml.ghostryde", - "manual/test/soy_2000-01-01_full_S1_R0-report.xml.ghostryde"); + "soy_2000-01-01_thin_S1_R0.xml.length", + "soy_2000-01-01_thin_S1_R0.xml.ghostryde", + "soy_2000-01-01_full_S1_R0.xml.length", + "soy_2000-01-01_full_S1_R0.xml.ghostryde", + "soy_2000-01-01_full_S1_R0-report.xml.ghostryde"); assertThat(loadCursorTime(CursorType.BRDA)).isEquivalentAccordingToCompareTo(now); assertThat(loadRevision(now, THIN)).isEqualTo(0); @@ -521,7 +521,7 @@ public class RdePipelineTest { rdePipeline.persistData(fragments); pipeline.run().waitUntilFinish(); - String prefix = manual ? "rde-job/manual/test/" : "rde-job/"; + String prefix = manual ? "manual/test/rde-job/" : "rde-job/"; String revision = manual ? "R0" : "R1"; // BRDA