Always use JPA TM on Beam (#1474)

* Always use JPA TM on Beam

Beam does not have access to datastore. Using ofy on Beam always results
in an error. Normally we should use database migration state schedule to
determine which TM to use, but on Beam there's no point in doing so. By
hard-coding the TM on beam to be SQL we can start testing features before
we migrate to SQL mode, for example the new RDE pipeline.

Also made a change to where the manual deposits are stored. It made more
sense to store them under manual/[direcitory]/[jobname]/ instead of
[jobname]/manual/[directory]/.

TESTED=deployed the pipeline on production and ran a job.
This commit is contained in:
Lai Jiang 2021-12-22 14:43:25 -05:00 committed by GitHub
parent 143cea6dbe
commit a440fbe9b0
3 changed files with 19 additions and 13 deletions

View file

@ -167,13 +167,13 @@ public class RdeIO {
Optional.ofNullable(key.revision())
.orElseGet(() -> RdeRevision.getNextRevision(tld, watermark, mode));
String id = RdeUtil.timestampToId(watermark);
String prefix = options.getJobName();
String basename = RdeNamingUtils.makeRydeFilename(tld, watermark, mode, 1, revision);
String prefix =
options.getJobName()
+ '/'
+ RdeNamingUtils.makeRydeFilename(tld, watermark, mode, 1, revision);
if (key.manual()) {
checkState(key.directoryWithTrailingSlash() != null, "Manual subdirectory not specified");
prefix = prefix + "/manual/" + key.directoryWithTrailingSlash() + basename;
} else {
prefix = prefix + '/' + basename;
prefix = "manual/" + key.directoryWithTrailingSlash() + prefix;
}
BlobId xmlFilename = BlobId.of(rdeBucket, prefix + ".xml.ghostryde");
// This file will contain the byte length (ASCII) of the raw unencrypted XML.

View file

@ -35,7 +35,7 @@ import org.joda.time.DateTime;
/** Factory class to create {@link TransactionManager} instance. */
// TODO: Rename this to PersistenceFactory and move to persistence package.
public class TransactionManagerFactory {
public final class TransactionManagerFactory {
private static final DatastoreTransactionManager ofyTm = createTransactionManager();
@ -47,6 +47,8 @@ public class TransactionManagerFactory {
private static Supplier<JpaTransactionManager> jpaTm =
Suppliers.memoize(TransactionManagerFactory::createJpaTransactionManager);
private static boolean onBeam = false;
private TransactionManagerFactory() {}
private static JpaTransactionManager createJpaTransactionManager() {
@ -86,6 +88,9 @@ public class TransactionManagerFactory {
if (tmForTest.isPresent()) {
return tmForTest.get();
}
if (onBeam) {
return jpaTm();
}
return DatabaseMigrationStateSchedule.getValueAtTime(DateTime.now(UTC))
.getPrimaryDatabase()
.equals(PrimaryDatabase.DATASTORE)
@ -127,6 +132,7 @@ public class TransactionManagerFactory {
public static void setJpaTmOnBeamWorker(Supplier<JpaTransactionManager> jpaTmSupplier) {
checkNotNull(jpaTmSupplier, "jpaTmSupplier");
jpaTm = Suppliers.memoize(jpaTmSupplier::get);
onBeam = true;
}
/**

View file

@ -497,13 +497,13 @@ public class RdePipelineTest {
verifyFiles(ImmutableMap.of(brdaKey, brdaFragments, rdeKey, rdeFragments), true);
assertThat(gcsUtils.listFolderObjects("gcs-bucket", "rde-job/"))
assertThat(gcsUtils.listFolderObjects("gcs-bucket", "manual/test/rde-job/"))
.containsExactly(
"manual/test/soy_2000-01-01_thin_S1_R0.xml.length",
"manual/test/soy_2000-01-01_thin_S1_R0.xml.ghostryde",
"manual/test/soy_2000-01-01_full_S1_R0.xml.length",
"manual/test/soy_2000-01-01_full_S1_R0.xml.ghostryde",
"manual/test/soy_2000-01-01_full_S1_R0-report.xml.ghostryde");
"soy_2000-01-01_thin_S1_R0.xml.length",
"soy_2000-01-01_thin_S1_R0.xml.ghostryde",
"soy_2000-01-01_full_S1_R0.xml.length",
"soy_2000-01-01_full_S1_R0.xml.ghostryde",
"soy_2000-01-01_full_S1_R0-report.xml.ghostryde");
assertThat(loadCursorTime(CursorType.BRDA)).isEquivalentAccordingToCompareTo(now);
assertThat(loadRevision(now, THIN)).isEqualTo(0);
@ -521,7 +521,7 @@ public class RdePipelineTest {
rdePipeline.persistData(fragments);
pipeline.run().waitUntilFinish();
String prefix = manual ? "rde-job/manual/test/" : "rde-job/";
String prefix = manual ? "manual/test/rde-job/" : "rde-job/";
String revision = manual ? "R0" : "R1";
// BRDA