[WIP] Create mongodb-to-mongodb template #3752
michaeltle-goog wants to merge 16 commits into GoogleCloudPlatform:main from mongo-to-mongo-template
Conversation
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a new Dataflow template designed to copy data between MongoDB instances. It includes the necessary project configuration, the main pipeline logic, and custom transformation classes to handle data writing with built-in error management via Dead Letter Queues.
Code Review
This pull request introduces a new Dataflow template for migrating data between MongoDB databases, supporting both single and multi-collection transfers with integrated Dead Letter Queue (DLQ) functionality. The code review identifies several critical areas for improvement regarding data integrity and pipeline efficiency. Key feedback includes addressing silent data loss for documents missing '_id' fields, refining bulk write error handling to prevent successful records from being sent to the DLQ, and avoiding generic exception catching to allow for standard Beam retries on transient errors. Additionally, the reviewer suggested optimizing the MongoClient lifecycle, improving write parallelism through distributed keying, and extending standard template options to align with existing patterns.
if (id != null) {
  updates.add(
      new ReplaceOneModel<>(
          new Document("_id", id),
          doc,
          new ReplaceOptions().upsert(true)));
}
Documents that do not contain an _id field are currently skipped silently. This can lead to data loss. These documents should be identified and sent to the failure tag. To ensure consistent implementation, particularly for aspects like character encoding, use shared helper functions for these common data manipulations.
if (id != null) {
updates.add(
new ReplaceOneModel<>(
new Document("_id", id),
doc,
new ReplaceOptions().upsert(true)));
} else {
c.output(failureTag, doc.toJson() + " - Error: Document missing _id");
}

References
- Create shared helper functions for common data manipulations to ensure consistent implementation, especially for character encoding.
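For the shared helper mentioned above, a minimal sketch (the name toFailureRecord and its home in a common utility class are assumptions for illustration, not part of this PR):

static String toFailureRecord(Document doc, String error) {
  // Relaxed extended JSON keeps the output encoding and type formatting consistent.
  return doc.toJson(JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build())
      + " - Error: " + error;
}

The else branch above would then become c.output(failureTag, toFailureRecord(doc, "Document missing _id")), and the catch blocks below could reuse the same helper. (Imports assumed: org.bson.json.JsonWriterSettings, org.bson.json.JsonMode.)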
} catch (MongoBulkWriteException e) {
  for (Document doc : docList) {
    c.output(
        failureTag, doc.toJson() + " - Error: " + e.getMessage());
  }
When a MongoBulkWriteException occurs, the entire batch is sent to the DLQ. This is problematic because successful writes within the batch are also marked as failures. Use e.getWriteErrors() to identify and output only the specific documents that failed. Ensure that the conversion to the failure output uses shared helper functions to maintain consistency in data formatting and encoding.
References
- Create shared helper functions for common data manipulations to ensure consistent implementation, especially for character encoding.
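A sketch of the per-document handling (assuming the bulk write was submitted in docList order, so each error's index maps back into docList; import com.mongodb.bulk.BulkWriteError is assumed):

} catch (MongoBulkWriteException e) {
  // Route only the writes that actually failed; the rest of the batch succeeded.
  for (BulkWriteError writeError : e.getWriteErrors()) {
    Document failedDoc = docList.get(writeError.getIndex());
    c.output(failureTag, failedDoc.toJson() + " - Error: " + writeError.getMessage());
  }
}

Note that with ordered bulk writes the driver stops at the first error, so documents after that index are unattempted rather than failed; an unordered bulk write (or explicit handling of the remainder) avoids misclassifying them.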
} catch (Exception e) {
  for (Document doc : docList) {
    c.output(
        failureTag, doc.toJson() + " - Error: " + e.getMessage());
  }
}
Catching a generic Exception and sending the entire batch to the DLQ is discouraged for transient errors (e.g., network timeouts). This prevents the Beam runner from retrying the bundle, which is the preferred way to handle temporary issues. Only non-transient exceptions related to data content should be caught and sent to the DLQ.
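One possible shape for this, sketched under the assumption that connectivity-related driver exceptions are the transient cases worth retrying:

} catch (MongoSocketException | MongoTimeoutException e) {
  // Transient connectivity failure: rethrow so the Beam runner retries the bundle.
  throw e;
} catch (MongoException e) {
  // Non-transient, data-related failure: route the batch to the DLQ.
  for (Document doc : docList) {
    c.output(failureTag, doc.toJson() + " - Error: " + e.getMessage());
  }
}

The exact split between retryable and non-retryable exception types is a judgment call and should be tuned to the driver version in use.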
    optionsClass = MongoDbToMongoDb.Options.class)
public class MongoDbToMongoDb {

  public interface Options extends PipelineOptions {
The Options interface should extend CommonTemplateOptions to include standard Dataflow parameters such as extraFilesToStage and disabledAlgorithms, which are useful for configuring worker environments and handling security requirements.
- public interface Options extends PipelineOptions {
+ public interface Options extends PipelineOptions, com.google.cloud.teleport.v2.options.CommonTemplateOptions {
}
}

for (String collection : collections) {
Iterating over collections and applying transforms within a loop can lead to an excessively large pipeline graph if the source database contains many collections. This can result in slow job submission or even failure to submit the job due to graph size limits. Consider adding a mechanism to limit the number of collections or processing them in a more dynamic way if possible.
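As one option, a construction-time guard could fail fast instead of submitting an oversized graph (MAX_COLLECTIONS and its value are illustrative assumptions):

// Hypothetical cap on per-collection pipeline branches; tune to the runner's graph limits.
private static final int MAX_COLLECTIONS = 100;

if (collections.size() > MAX_COLLECTIONS) {
  throw new IllegalArgumentException(
      "Source database has " + collections.size()
          + " collections; this template supports at most " + MAX_COLLECTIONS + " per job.");
}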
PCollectionTuple writeResults =
    input
        .apply("AddDummyKey", WithKeys.of(""))
Using a single static key ("") for GroupIntoBatches limits the parallelism of the write stage, as all documents for a given collection will be routed to a single worker thread for batching. To improve throughput, consider using a distributed keying strategy, such as a random integer or a hash of the document.
| .apply("AddDummyKey", WithKeys.of("")) | |
| .apply("AddKeys", WithKeys.of(doc -> java.util.concurrent.ThreadLocalRandom.current().nextInt(100))) |
@StartBundle
public void startBundle() {
  mongoClient = MongoClients.create(uri);
  MongoDatabase db = mongoClient.getDatabase(database);

  WriteConcern wc = WriteConcern.ACKNOWLEDGED;
  if (writeConcern != null) {
    if (writeConcern.equalsIgnoreCase("majority")) {
      wc = WriteConcern.MAJORITY;
    } else {
      try {
        int w = Integer.parseInt(writeConcern);
        wc = new WriteConcern(w);
      } catch (NumberFormatException e) {
        // Fallback to default
      }
    }
  }
  if (journal != null) {
    wc = wc.withJournal(journal);
  }

  mongoCollection = db.getCollection(collection).withWriteConcern(wc);
}
The MongoClient is currently initialized in @StartBundle and closed in @FinishBundle. It is more efficient to use @Setup and @Teardown for managing the client's lifecycle. This allows the client and its underlying connection pool to be reused across multiple bundles within the same worker process, reducing the overhead of establishing connections.
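A minimal sketch of that lifecycle (field names follow the diff above; buildWriteConcern() is a hypothetical helper wrapping the write-concern logic shown earlier):

@Setup
public void setup() {
  // Runs once per DoFn instance: the client and its connection pool are
  // reused across all bundles this instance processes.
  mongoClient = MongoClients.create(uri);
  mongoCollection =
      mongoClient.getDatabase(database).getCollection(collection).withWriteConcern(buildWriteConcern());
}

@Teardown
public void teardown() {
  if (mongoClient != null) {
    mongoClient.close();
  }
}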
Codecov Report

❌ Patch coverage is 57.53%. Your patch check has failed because the patch coverage (57.53%) is below the target coverage (80.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files

@@ Coverage Diff @@
## main #3752 +/- ##
============================================
+ Coverage 53.09% 53.18% +0.08%
- Complexity 6031 6520 +489
============================================
Files 1072 1078 +6
Lines 64967 65515 +548
Branches 7198 7261 +63
============================================
+ Hits 34493 34841 +348
- Misses 28160 28334 +174
- Partials 2314 2340 +26
| <module>kinesis-to-pubsub</module> | ||
| <module>kudu-to-bigquery</module> | ||
| <module>managed-io-to-managed-io</module> | ||
| <module>mongodb-to-mongodb</module> |
There is an ongoing effort to consolidate templates by their source/sink dependency types. In particular, the MongoDB and Google Cloud templates have been consolidated into the v2/mongodb-and-googlecloud module and share a template image: #3315. We can use that existing module here.
No description provided.