In this article, I’ll introduce the problem, the approaches thought off, and the one which we decided to use.
Challenge: We wanted to build a recommendation engine but as the calculation of scores would be computationally heavy (MapReduce and Aggregation queries in real time would not be of much help), we wanted to use the capabilities of bigquery and keep the scores/recommendations ready on the DB side. We could then update the recommendations every day by running a job.
For this we came up with 3 approaches:
Retrieve the data from the bigquery and export the data to the CSV file(s) to a common location. A scheduler job process those files going through the ETL process and persist them into a MongoDB collection.
Create an AirFlow job that does the extraction of the data from the bigquery, transforms the data as per the requirement, and loads them to the MongoDB collection.
Last but not least, using the bigquery client SDK, retrieve the required data from BigQuery, transform and insert the data into MongoDB.
We decided to go ahead with the 3rd approach.
Step 1: Ready the query to get the required data from Bigquery
For this blog, I am using a sample dataset and retrieving all the data from the table and will insert this into MongoDB.
Step 2: Initial Set Up to use the client library
Follow all the steps mentioned in the Before you begin section.
Step 3: Install the client library
We will be using maven, so we need to import the following dependencies in the pom:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>16.2.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
</dependency>
</dependencies>
Step 4: Complete source code to retrieve data from bigquery
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
// table ref to insert
TableId tableId = TableId.of("project-id", "dataset-name", "food_events");
// Query config
QueryJobConfiguration queryConfig = QueryJobConfiguration
// Query to run
.newBuilder("SELECT * FROM `bigquery-public-data.fda_food.food_events`")
// Sets the table where to put query results. If not provided a new table is created.
.setDestinationTable(tableId).setUseLegacySql(false)
// Sets the action that should occur if the destination table already exists.
.setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE).build();
// Create a job ID so that we can safely retry.
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
// Wait for the query to complete.
queryJob = queryJob.waitFor();
// Check for errors
if (queryJob == null) {
throw new RuntimeException("Job no longer exists");
} else if (queryJob.getStatus().getError() != null) {
// You can also look at queryJob.getStatus().getExecutionErrors() for all
// errors, not just the latest one.
throw new RuntimeException(queryJob.getStatus().getError().toString());
}
After finishing the job, we can iterate over the result and insert the records into MongoDB:
// Get the results.
TableResult result = queryJob.getQueryResults();
int batchSize = 10000;
ArrayList<FoodEvent> foodEventArrayList = new ArrayList<>();
// Iterate over all the rows
for (FieldValueList row : result.iterateAll()) {
try {
// Fetch the values
String reportNumber = row.get("report_number").getStringValue();
String reactions = row.get("reactions").getStringValue();
String outcomes = row.get("outcomes").getStringValue();
String brandName = row.get("products_brand_name").getStringValue();
String industryCode = row.get("products_industry_code").getStringValue();
FoodEvent foodEvent = FoodEvent.builder().reportNumber(reportNumber).reactions(reactions)
.outcomes(outcomes).brandName(brandName).industryCode(industryCode).build();
foodEventArrayList.add(foodEvent);
// Bulk insert in batches
if (foodEventArrayList.size() >= batchSize) {
// add all the orders to db
this.bulkMongoDbOperationService.bulkInsertEvents(foodEventArrayList);
foodEventArrayList.clear();
}
} catch (Exception e) {
log.error("Exception while parsing ({})", row, e);
}
}
if (foodEventArrayList.size() > 0) {
// add all the orders to db
this.bulkMongoDbOperationService.bulkInsertEvents(foodEventArrayList);
foodEventArrayList.clear();
}
Step 5: Inserting into MongoDB
You can persist or save the documents into MongoDB either by loading them one by one or in several batches. Inserting the records in batches can improve the performance dramatically. So we insert them in batches as below:
public void bulkInsertEvents(List<FoodEvent> foodEventList) {
try {
BulkWriteResult bulkWriteResult =
mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, FoodEvent.class)
.insert(foodEventList).execute();
} catch (BulkWriteException bwe) {
List<Integer> failedEventIndexes = new ArrayList<>();
for (BulkWriteError e : bwe.getWriteErrors()) {
failedEventIndexes.add(e.getIndex());
log.error("Insertion failed for index: {} with error: {}", e.getIndex(), e);
}
} catch (Exception e) {
log.error("Bulk Insertion failed for {} events: {}", foodEventList.size(), foodEventList, e);
}
}
Conclusion: We were able to migrate data from BigQuery to MongoDB using the above steps. But we found that BigQuery is quite fast at processing large sets of data, however retrieving large datasets was a bottleneck. It took a lot of time to paginate and insert into the DB. We played a lot with different page sizes and batch sizes to find the optimal parameters for us. And as our data size was not very huge (approx 15 lakh), we were fine with this approach. But if your dataset size is huge, you might want to try some other approach. Do comment if you have any suggestions or a totally new/better approach.
References:
Also, read our blog on JavaScript Engine Functionality
Comments