
[WIP] Create mongodb-to-mongodb template#3752

Open
michaeltle-goog wants to merge 16 commits into GoogleCloudPlatform:main from michaeltle-goog:mongo-to-mongo-template

Conversation

@michaeltle-goog

No description provided.

@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a new Dataflow template designed to copy data between MongoDB instances. It includes the necessary project configuration, the main pipeline logic, and custom transformation classes to handle data writing with built-in error management via Dead Letter Queues.

Highlights

  • New Template Implementation: Introduced a new Dataflow template to facilitate data migration from one MongoDB database to another.
  • Custom Write Transform: Implemented a custom PTransform with support for Dead Letter Queue (DLQ) handling to manage write failures robustly.
  • Configuration Options: Added comprehensive pipeline options including support for BSON filtering, projections, batch sizing, and write concerns.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double-check it and use code with caution.

@michaeltle-goog michaeltle-goog added addition New feature or request MongoDB labels May 1, 2026

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a new Dataflow template for migrating data between MongoDB databases, supporting both single and multi-collection transfers with integrated Dead Letter Queue (DLQ) functionality. The code review identifies several critical areas for improvement regarding data integrity and pipeline efficiency. Key feedback includes addressing silent data loss for documents missing '_id' fields, refining bulk write error handling to prevent successful records from being sent to the DLQ, and avoiding generic exception catching to allow for standard Beam retries on transient errors. Additionally, the reviewer suggested optimizing the MongoClient lifecycle, improving write parallelism through distributed keying, and extending standard template options to align with existing patterns.

Comment on lines +155 to +161
if (id != null) {
  updates.add(
      new ReplaceOneModel<>(
          new Document("_id", id),
          doc,
          new ReplaceOptions().upsert(true)));
}

high

Documents that do not contain an _id field are currently skipped silently. This can lead to data loss. These documents should be identified and sent to the failure tag. To ensure consistent implementation, particularly for aspects like character encoding, use shared helper functions for these common data manipulations.

if (id != null) {
  updates.add(
      new ReplaceOneModel<>(
          new Document("_id", id),
          doc,
          new ReplaceOptions().upsert(true)));
} else {
  c.output(failureTag, doc.toJson() + " - Error: Document missing _id");
}
References
  1. Create shared helper functions for common data manipulations to ensure consistent implementation, especially for character encoding.

Comment on lines +168 to +172
} catch (MongoBulkWriteException e) {
  for (Document doc : docList) {
    c.output(
        failureTag, doc.toJson() + " - Error: " + e.getMessage());
  }

high

When a MongoBulkWriteException occurs, the entire batch is sent to the DLQ. This is problematic because successful writes within the batch are also marked as failures. Use e.getWriteErrors() to identify and output only the specific documents that failed. Ensure that the conversion to the failure output uses shared helper functions to maintain consistency in data formatting and encoding.

References
  1. Create shared helper functions for common data manipulations to ensure consistent implementation, especially for character encoding.
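The reviewer's point can be sketched with a small pure-Java helper (class and method names here are illustrative, not from the PR). MongoBulkWriteException#getWriteErrors() returns one BulkWriteError per failed operation, and BulkWriteError#getIndex() gives that operation's position within the submitted batch, so only those positions need routing to the failure tag:

```java
import java.util.ArrayList;
import java.util.List;

public class BulkFailurePartition {

  /**
   * Given the submitted batch and the indexes reported by
   * MongoBulkWriteException#getWriteErrors() (via BulkWriteError#getIndex()),
   * return only the documents that actually failed. Everything else in the
   * batch was written successfully and must NOT be sent to the DLQ.
   */
  static <T> List<T> failedOnly(List<T> batch, List<Integer> failedIndexes) {
    List<T> failed = new ArrayList<>();
    for (int i : failedIndexes) {
      if (i >= 0 && i < batch.size()) {
        failed.add(batch.get(i));
      }
    }
    return failed;
  }

  public static void main(String[] args) {
    List<String> batch = List.of("docA", "docB", "docC");
    // Suppose the bulk write reported a failure only at index 1.
    System.out.println(failedOnly(batch, List.of(1))); // prints [docB]
  }
}
```

In the DoFn's catch block, `docList.get(err.getIndex())` would then be routed to failureTag while the rest of the batch is treated as successfully written.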

Comment on lines +173 to +178
} catch (Exception e) {
  for (Document doc : docList) {
    c.output(
        failureTag, doc.toJson() + " - Error: " + e.getMessage());
  }
}

high

Catching a generic Exception and sending the entire batch to the DLQ is discouraged for transient errors (e.g., network timeouts). This prevents the Beam runner from retrying the bundle, which is the preferred way to handle temporary issues. Only non-transient exceptions related to data content should be caught and sent to the DLQ.
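One way to follow this advice is to classify exceptions before deciding between rethrow and DLQ. The sketch below uses stand-in JDK exception types so it stays self-contained; in the actual transform the transient branch would instead match driver types such as MongoSocketException or MongoTimeoutException. This taxonomy is an assumption for illustration, not part of the PR:

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class WriteErrorPolicy {

  /**
   * Decide whether an exception should dead-letter the record (true) or be
   * rethrown so the Beam runner retries the bundle (false). Stand-in types:
   * with the MongoDB driver, transient cases would include
   * MongoSocketException / MongoTimeoutException.
   */
  static boolean shouldDeadLetter(Exception e) {
    if (e instanceof IOException || e instanceof TimeoutException) {
      return false; // transient: rethrow and let the runner retry the bundle
    }
    return true; // treat anything else as a permanent data error
  }

  public static void main(String[] args) {
    System.out.println(shouldDeadLetter(new IOException("socket reset")));         // prints false
    System.out.println(shouldDeadLetter(new IllegalArgumentException("bad doc"))); // prints true
  }
}
```

In the DoFn this would look like `if (!shouldDeadLetter(e)) { throw e; }` at the top of the catch block, before any output to failureTag.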

optionsClass = MongoDbToMongoDb.Options.class)
public class MongoDbToMongoDb {

  public interface Options extends PipelineOptions {

medium

The Options interface should extend CommonTemplateOptions to include standard Dataflow parameters such as extraFilesToStage and disabledAlgorithms, which are useful for configuring worker environments and handling security requirements.

Suggested change
- public interface Options extends PipelineOptions {
+ public interface Options extends PipelineOptions, com.google.cloud.teleport.v2.options.CommonTemplateOptions {

for (String collection : collections) {

medium

Iterating over collections and applying transforms within a loop can lead to an excessively large pipeline graph if the source database contains many collections. This can result in slow job submission or even failure to submit the job due to graph size limits. Consider adding a mechanism to limit the number of collections or processing them in a more dynamic way if possible.
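One simple guard, assuming a hypothetical maxCollections cap (the constant name and limit below are illustrative, not from the PR), is to validate the collection list before building one transform chain per collection:

```java
import java.util.List;

public class CollectionCap {
  // Illustrative guard against an oversized pipeline graph; tune per runner limits.
  static final int MAX_COLLECTIONS = 50;

  /** Fail fast before the loop expands the graph beyond practical submission limits. */
  static List<String> capped(List<String> collections) {
    if (collections.size() > MAX_COLLECTIONS) {
      throw new IllegalArgumentException(
          "Too many collections (" + collections.size()
              + "); pipeline graph would exceed practical submission limits");
    }
    return collections;
  }

  public static void main(String[] args) {
    System.out.println(capped(List.of("users", "orders"))); // prints [users, orders]
  }
}
```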


PCollectionTuple writeResults =
    input
        .apply("AddDummyKey", WithKeys.of(""))

medium

Using a single static key ("") for GroupIntoBatches limits the parallelism of the write stage, as all documents for a given collection will be routed to a single worker thread for batching. To improve throughput, consider using a distributed keying strategy, such as a random integer or a hash of the document.

Suggested change
- .apply("AddDummyKey", WithKeys.of(""))
+ .apply("AddKeys", WithKeys.of(doc -> java.util.concurrent.ThreadLocalRandom.current().nextInt(100)))
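A side note on the suggested change: in Beam's Java SDK, passing a lambda to WithKeys.of generally needs a following .withKeyType(TypeDescriptors.integers()) so the key coder can be inferred. The key function itself is trivial; a self-contained sketch (the shard count of 100 is an assumed tuning knob):

```java
import java.util.concurrent.ThreadLocalRandom;

public class ShardKeys {
  // Assumed tuning knob: number of distinct keys, and thus the upper bound
  // on parallel batching threads for GroupIntoBatches.
  static final int NUM_SHARDS = 100;

  /** Assign each element a random shard key so batching can fan out across
   *  up to NUM_SHARDS worker threads instead of one. */
  static int shardKey() {
    return ThreadLocalRandom.current().nextInt(NUM_SHARDS);
  }

  public static void main(String[] args) {
    for (int i = 0; i < 1000; i++) {
      int k = shardKey();
      if (k < 0 || k >= NUM_SHARDS) {
        throw new AssertionError("key out of range: " + k);
      }
    }
    System.out.println("ok"); // prints ok
  }
}
```

The trade-off is that a higher shard count improves write parallelism but produces smaller, less efficient batches per key.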

Comment on lines +121 to +144
@StartBundle
public void startBundle() {
  mongoClient = MongoClients.create(uri);
  MongoDatabase db = mongoClient.getDatabase(database);

  WriteConcern wc = WriteConcern.ACKNOWLEDGED;
  if (writeConcern != null) {
    if (writeConcern.equalsIgnoreCase("majority")) {
      wc = WriteConcern.MAJORITY;
    } else {
      try {
        int w = Integer.parseInt(writeConcern);
        wc = new WriteConcern(w);
      } catch (NumberFormatException e) {
        // Fallback to default
      }
    }
  }
  if (journal != null) {
    wc = wc.withJournal(journal);
  }

  mongoCollection = db.getCollection(collection).withWriteConcern(wc);
}

medium

The MongoClient is currently initialized in @StartBundle and closed in @FinishBundle. It is more efficient to use @setup and @teardown for managing the client's lifecycle. This allows the client and its underlying connection pool to be reused across multiple bundles within the same worker process, reducing the overhead of establishing connections.
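The cost difference is easy to see with a stand-in client (all names below are illustrative, not from the PR): @StartBundle/@FinishBundle opens a connection per bundle, while @Setup/@Teardown opens one per DoFn instance and reuses it across every bundle that instance processes:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ClientLifecycle {
  /** Stand-in for MongoClient; counts how many connections are opened. */
  static final AtomicInteger CONNECTIONS = new AtomicInteger();

  static class FakeClient {
    FakeClient() { CONNECTIONS.incrementAndGet(); }
    void close() {}
  }

  /** @StartBundle/@FinishBundle style: one client per bundle. */
  static void perBundle(int bundles) {
    for (int i = 0; i < bundles; i++) {
      FakeClient c = new FakeClient(); // opened in @StartBundle
      c.close();                       // closed in @FinishBundle
    }
  }

  /** @Setup/@Teardown style: one client reused across every bundle. */
  static void perWorkerInstance(int bundles) {
    FakeClient c = new FakeClient();   // opened once in @Setup
    for (int i = 0; i < bundles; i++) { /* process bundle with c */ }
    c.close();                         // closed once in @Teardown
  }

  public static void main(String[] args) {
    CONNECTIONS.set(0);
    perBundle(50);
    int startBundleCost = CONNECTIONS.get();
    CONNECTIONS.set(0);
    perWorkerInstance(50);
    int setupCost = CONNECTIONS.get();
    System.out.println(startBundleCost + " vs " + setupCost); // prints 50 vs 1
  }
}
```

In streaming runners bundles can be very small, so moving client construction to @Setup and closing it in @Teardown avoids repeated connection-pool establishment.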

@codecov

codecov Bot commented May 1, 2026

Codecov Report

❌ Patch coverage is 57.53968% with 107 lines in your changes missing coverage. Please review.
✅ Project coverage is 53.18%. Comparing base (0af55a0) to head (5645f8f).
⚠️ Report is 13 commits behind head on main.

Files with missing lines Patch % Lines
.../cloud/teleport/v2/templates/MongoDbToMongoDb.java 0.00% 68 Missing ⚠️
...e/cloud/teleport/v2/transforms/MongoDbWriteFn.java 80.15% 17 Missing and 9 partials ⚠️
...loud/teleport/v2/transforms/MongoDbTransforms.java 75.47% 9 Missing and 4 partials ⚠️

❌ Your patch check has failed because the patch coverage (57.53%) is below the target coverage (80.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #3752      +/-   ##
============================================
+ Coverage     53.09%   53.18%   +0.08%     
- Complexity     6031     6520     +489     
============================================
  Files          1072     1078       +6     
  Lines         64967    65515     +548     
  Branches       7198     7261      +63     
============================================
+ Hits          34493    34841     +348     
- Misses        28160    28334     +174     
- Partials       2314     2340      +26     
Components Coverage Δ
spanner-templates 72.82% <ø> (-0.05%) ⬇️
spanner-import-export 68.65% <ø> (+0.01%) ⬆️
spanner-live-forward-migration 80.91% <ø> (-0.09%) ⬇️
spanner-live-reverse-replication 77.03% <ø> (-0.18%) ⬇️
spanner-bulk-migration 91.09% <ø> (-0.06%) ⬇️
gcs-spanner-dv 85.72% <ø> (-0.11%) ⬇️
Files with missing lines Coverage Δ
...loud/teleport/v2/transforms/MongoDbTransforms.java 75.47% <75.47%> (ø)
...e/cloud/teleport/v2/transforms/MongoDbWriteFn.java 80.15% <80.15%> (ø)
.../cloud/teleport/v2/templates/MongoDbToMongoDb.java 0.00% <0.00%> (ø)

... and 21 files with indirect coverage changes


Comment thread v2/pom.xml
<module>kinesis-to-pubsub</module>
<module>kudu-to-bigquery</module>
<module>managed-io-to-managed-io</module>
<module>mongodb-to-mongodb</module>
Contributor


There is an effort to consolidate templates by their source/sink dependency types. In particular, the MongoDB and Google Cloud templates have been put into the v2/mongodb-and-googlecloud module and share the template image: #3315. We can use the existing module.


Labels

addition New feature or request MongoDB size/XXL
