top of page
Hussain Poonawala

Data Migration from BigQuery to MongoDB

Updated: Aug 23, 2021

In this article, I’ll introduce the problem, the approaches thought off, and the one which we decided to use.


Data Migration from BigQuery to MongoDB

Challenge: We wanted to build a recommendation engine but as the calculation of scores would be computationally heavy (MapReduce and Aggregation queries in real time would not be of much help), we wanted to use the capabilities of bigquery and keep the scores/recommendations ready on the DB side. We could then update the recommendations every day by running a job.

For this we came up with 3 approaches:

  1. Retrieve the data from the bigquery and export the data to the CSV file(s) to a common location. A scheduler job process those files going through the ETL process and persist them into a MongoDB collection.

  2. Create an AirFlow job that does the extraction of the data from the bigquery, transforms the data as per the requirement, and loads them to the MongoDB collection.

  3. Last but not least, using the bigquery client SDK, retrieve the required data from BigQuery, transform and insert the data into MongoDB.

We decided to go ahead with the 3rd approach.


Step 1: Ready the query to get the required data from Bigquery

For this blog, I am using a sample dataset and retrieving all the data from the table and will insert this into MongoDB.


Ready the query to get the required data from Bigquery

Step 2: Initial Set Up to use the client library

Follow all the steps mentioned in the Before you begin section.


Step 3: Install the client library

We will be using maven, so we need to import the following dependencies in the pom:


<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>16.2.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
</dependency>
</dependencies>

Step 4: Complete source code to retrieve data from bigquery


BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

   // table ref to insert
   TableId tableId = TableId.of("project-id", "dataset-name", "food_events");

   // Query config
   QueryJobConfiguration queryConfig = QueryJobConfiguration
       // Query to run
       .newBuilder("SELECT * FROM `bigquery-public-data.fda_food.food_events`")
       // Sets the table where to put query results. If not provided a new table is created.
       .setDestinationTable(tableId).setUseLegacySql(false)
       // Sets the action that should occur if the destination table already exists.
       .setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE).build();

   // Create a job ID so that we can safely retry.
   JobId jobId = JobId.of(UUID.randomUUID().toString());
   Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

   // Wait for the query to complete.
   queryJob = queryJob.waitFor();

   // Check for errors
   if (queryJob == null) {
     throw new RuntimeException("Job no longer exists");
   } else if (queryJob.getStatus().getError() != null) {
     // You can also look at queryJob.getStatus().getExecutionErrors() for all
     // errors, not just the latest one.
     throw new RuntimeException(queryJob.getStatus().getError().toString());
   }


After finishing the job, we can iterate over the result and insert the records into MongoDB:


// Get the results.
   TableResult result = queryJob.getQueryResults();

   int batchSize = 10000;
   ArrayList<FoodEvent> foodEventArrayList = new ArrayList<>();
   // Iterate over all the rows
   for (FieldValueList row : result.iterateAll()) {

     try {
       // Fetch the values
       String reportNumber = row.get("report_number").getStringValue();
       String reactions = row.get("reactions").getStringValue();
       String outcomes = row.get("outcomes").getStringValue();
       String brandName = row.get("products_brand_name").getStringValue();
       String industryCode = row.get("products_industry_code").getStringValue();


       FoodEvent foodEvent = FoodEvent.builder().reportNumber(reportNumber).reactions(reactions)
           .outcomes(outcomes).brandName(brandName).industryCode(industryCode).build();
       foodEventArrayList.add(foodEvent);

       // Bulk insert in batches
       if (foodEventArrayList.size() >= batchSize) {
         // add all the orders to db
         this.bulkMongoDbOperationService.bulkInsertEvents(foodEventArrayList);
         foodEventArrayList.clear();
       }
     } catch (Exception e) {
       log.error("Exception while parsing ({})", row, e);
     }

   }
   if (foodEventArrayList.size() > 0) {
     // add all the orders to db
     this.bulkMongoDbOperationService.bulkInsertEvents(foodEventArrayList);
     foodEventArrayList.clear();
   }

Step 5: Inserting into MongoDB


You can persist or save the documents into MongoDB either by loading them one by one or in several batches. Inserting the records in batches can improve the performance dramatically. So we insert them in batches as below:


public void bulkInsertEvents(List<FoodEvent> foodEventList) {
   try {
     BulkWriteResult bulkWriteResult =
         mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, FoodEvent.class)
             .insert(foodEventList).execute();
   } catch (BulkWriteException bwe) {
     List<Integer> failedEventIndexes = new ArrayList<>();
     for (BulkWriteError e : bwe.getWriteErrors()) {
       failedEventIndexes.add(e.getIndex());
       log.error("Insertion failed for index: {} with error: {}", e.getIndex(), e);
     }
   } catch (Exception e) {
     log.error("Bulk Insertion failed for {} events: {}", foodEventList.size(), foodEventList, e);
   }
 }

Conclusion: We were able to migrate data from BigQuery to MongoDB using the above steps. But we found that BigQuery is quite fast at processing large sets of data, however retrieving large datasets was a bottleneck. It took a lot of time to paginate and insert into the DB. We played a lot with different page sizes and batch sizes to find the optimal parameters for us. And as our data size was not very huge (approx 15 lakh), we were fine with this approach. But if your dataset size is huge, you might want to try some other approach. Do comment if you have any suggestions or a totally new/better approach.


References:

Also, read our blog on JavaScript Engine Functionality

22 views0 comments

Recent Posts

See All

コメント


bottom of page