Spark 3 migration for a top global retailer
This article sums up our work with a challenging Spark 3 migration for a top global retailer.
Technology plays an essential role in many companies, not just tech-focused organisations. Modern and ambitious offline businesses have also relied on technology for years.
When appropriately applied, technology improves business efficiency by increasing productivity or reducing costs. However, besides the benefits, technology also has associated costs, e.g. the necessary upgrades and maintenance of technological platforms. There’s a danger in relying on legacy platforms without proper maintenance. It can cause performance to regress, cost organisations their competitive edge, and leave platforms vulnerable to security incidents.
For one of our clients, the next step in their data analytics strategy is moving to a hybrid data lake capable of operating in a plug & play manner, which will be achieved by separating the computation layer from the storage layer. We worked with the client to create a high-level plan to guide their existing big data platform towards this goal.
One of the first steps was migrating to a higher version of HDP (Hortonworks Data Platform). As a prerequisite for this process, we had to upgrade Apache Spark to a version compatible with both the current and future versions of HDP. This upgrade allowed us to minimise risk by separating the migration stages, which enabled us to test the changes in isolation.
This article shares our approach to this challenge, with a detailed explanation of every aspect of the migration, including the business context, requirements analysis, technical insights, and our conclusions after completing this process. Enjoy!
Business Context – The client’s perspective on migration
Our client is a global retailer that handles thousands of orders per second. Each order requires that a series of actions be fulfilled. Operations within their retail business are expensive, complex, and demanding enough to be divided into multiple domains, and each of the domains is big enough (taking into consideration the variety, velocity, and volume of data) to be split into tens of data pipelines.
The complexity of the business impacts the size of the data platform and its operational aspects. One of the most crucial aspects is high availability, which means there is no room for long-lasting downtime. That was especially important when migrating the platform to a higher version of Spark (from 2.3.0 to 3.0.2, in this case) because the process needed to be planned and executed without interrupting operations.
Engineering capacity also influences the migration process. The data teams’ work scope was divided into a few areas, such as infrastructure, maintenance and business tasks, so the amount of work needed to be balanced between them. Another influential factor is the engineers themselves, who are in short supply — not only in this project but throughout the data industry. It is a common issue, familiar to every organisation that is (or wants to be) data-driven.
One additional aspect to consider during the migration process is the danger that data is deleted, overwritten or polluted. This can happen, for example, when the way data types are handled changes. To prevent data corruption, the data needs to be protected (i.e. backed up) during the migration process. Data consistency is enormously important, especially for data-driven organisations that depend on data insights for their operations. In our client’s case, they require data to identify the resources engaged in the fulfilment process.
The final element that impacted the migration described in this article was Christmas, as the migration happened in November and December, a particularly hot time in retail. This made the demand for a resilient and highly-available data platform even more critical.
The scope of every technical task we performed was based on both the business context (above) and technological context, which we will describe in this section.
The migration process began with a so-called spike task, as every complex and demanding epic should. This was a user story created to do research, understand stakeholder needs, find potential bottlenecks, divide the implementation into smaller pieces wherever necessary, and create actionable tasks based on the output of these activities. We ran such a spike for this migration, and the output of its technical part was an overview of the project internals, described below.
The build was handled by Gradle with the Kotlin DSL, and the business logic was implemented in Scala 2.11, with Spark 2.3.0 as the distributed computing engine. The CI/CD process was performed by Jenkins, Ansible, and custom Bash scripts.
These tools made deployment, testing, and operation more flexible for the engineers. However, they had a few drawbacks as well. One of them was sensitivity to changes in the CD process itself: any modification required at least two scripts to be altered. The scope of these modifications was task-dependent, but such an operation required more effort than adjusting a script for a single tool.
Another thing we noticed while working on the spike tasks for the migration was the state of the build source code. We had selected Gradle with the Kotlin DSL as the build tool because of its build speed and because it offers better performance than the competing sbt. The choice was rational and feasible; however, because business tasks had accumulated, the Kotlin build code had not yet been modularised or generalised, and thus any change in project dependencies required additional effort.
Now that both the business and technical aspects of the migration have been introduced, let’s move on to our conclusions from the research and implementation.
The analysis revealed a few challenges (some of them, e.g. high availability, resource shortages and data consistency, have already been touched on). Yet there was another aspect that needed to be taken into account during the migration process: backward compatibility (BC). BC can be described as a property of a system, script, or process that allows it to work with its legacy version. It applies to all aspects of the migration, namely: the business logic, the build source code, and the CI/CD scripts.
There is one more argument in favour of choosing backward compatibility. That is the shortage of engineers. As an example, our domain team consisted of four engineers – two of them needed to work on business tasks and one was engaged in support, so only the fourth was fully dedicated to infrastructure-related issues, which in this case was the Spark migration. Since people working on business tasks needed to maintain the ability to deliver, the environments used in the development process could not be frozen at any time.
There are a few ways to achieve backward compatibility. In the CD process it is easy, since it requires only a simple change in the deployment scripts. However, when it comes to the other aspects of BC in the migration (i.e. the business logic and the build code), the task is no longer as straightforward. There are two ways to get around this. The first is to fork the master branch, which comes with at least one serious drawback: the necessity to maintain both versions of the code. The second option is cross-compilation (a cross-build). It causes fewer maintenance issues, but additional effort is required to add cross-compilation logic to the build code.
We chose cross-compilation in this project, which offered some extra benefits. Proper implementation will make further migrations/upgrades less challenging and will enforce build code modularisation, which makes it easier to work with.
The last challenge we will detail is data consistency and coherency. The default output format within the project (for Spark 2.3) was Apache Parquet. Parquet employs column-oriented data representation, which enhances the performance of both bulk and aggregate queries.
The output data format of jobs running on the new version of HDP should be supported by the tools and frameworks currently used in the project. Apache Hive (v1.2.1) is particularly important in this context, since there are some well-known issues with Hive v1.2 handling Parquet files generated by Spark 3. Since these bugs have not been fixed and because Apache Hive will only be upgraded later, a workaround is required. One possible solution is to use another data format. Because we needed a format that is efficient in both storage and processing, we focused on column-oriented formats and chose ORC as the alternative to Parquet. Both formats share most of their capabilities, supporting similar types, compression codecs, encodings and optimisations. What’s more, Hive has proven to respond faster when querying ORC files than when querying Parquet.
One of the most well-known characteristics of Scala is that it doesn’t guarantee binary compatibility between major versions (such as 2.11 and 2.12). That means that, for example, code built with Scala 2.11 cannot be run on Scala 2.12, and vice versa. That is why it is necessary to use libraries built with the same version of Scala as the one used in the project.
As for library availability per Scala version, a given library version is quite often published for two or three versions of Scala. For example, pure-config v0.10 is available for Scala 2.11 and 2.12, while v0.11 is available for 2.12 and 2.13. That makes migration between adjacent Scala versions relatively easy. It is also the reason why the first step in the migration process should be to upgrade libraries to a version available for both the current and the new version of Scala.
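The selection rule described above can be illustrated with a minimal sketch. It is written in Python purely as an illustration (the project itself used Scala and Kotlin), and the availability table is a hypothetical structure that contains only the two pure-config entries mentioned in the text.

```python
from typing import Dict, List, Set

# Hypothetical availability table: library version -> Scala versions it is
# published for. The two entries mirror the pure-config example in the text.
AVAILABILITY: Dict[str, Set[str]] = {
    "0.10": {"2.11", "2.12"},
    "0.11": {"2.12", "2.13"},
}


def versions_for_both(
    availability: Dict[str, Set[str]], current: str, target: str
) -> List[str]:
    """Return library versions published for both the current and target Scala."""
    wanted = {current, target}
    return sorted(v for v, scalas in availability.items() if wanted <= scalas)


if __name__ == "__main__":
    # Candidates when moving from Scala 2.11 to 2.12.
    print(versions_for_both(AVAILABILITY, "2.11", "2.12"))
```

An empty result, as for a direct jump from 2.11 to 2.13 in this table, signals that the library has to be upgraded in more than one step or replaced.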
This process went smoothly for all our libraries apart from GeoSpark, which does not have a valid version for Scala 2.12 (or higher). The project entered the Apache Incubator and was renamed Apache Sedona. The general concept and most of the functionality were not changed, but the codebase was repackaged and some classes were renamed. These circumstances forced us to switch libraries and adjust the code within this project.
How was it resolved?
Once the dependencies were adjusted for both versions (Scala 2.11 and 2.12), we could start thinking about implementing cross-compilation. However, there was one more obstacle ahead: the build code was neither modularised nor generic. It was impossible to provide a cross-compilation mechanism while versions and dependency paths were hard-coded inside the different modules. As a first step towards modularisation, all hard-coded dependencies were moved to a file named dependencies.kt. At the same time, helper functions were implemented for building the Gradle dependency path, consisting of the library package, name, major Scala version and library version.
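To make the idea of such a helper concrete, here is a minimal sketch of how a dependency path can be assembled from those four parts. It is written in Python for illustration only, not in the Kotlin of the real build code; the names Dependency and coordinate are hypothetical, and the sketch assumes the usual convention of appending the Scala version to the artifact name with an underscore.

```python
from typing import NamedTuple


class Dependency(NamedTuple):
    """Hypothetical description of one library, independent of the Scala version."""
    group: str              # library package, e.g. "org.apache.spark"
    name: str               # artifact name without the Scala suffix
    version: str            # library version
    scala_lib: bool = True  # False for plain Java artifacts (no suffix)


def coordinate(dep: Dependency, scala_major: str) -> str:
    """Build a group:name_scala:version path for the given Scala version."""
    suffix = f"_{scala_major}" if dep.scala_lib else ""
    return f"{dep.group}:{dep.name}{suffix}:{dep.version}"


if __name__ == "__main__":
    spark_sql = Dependency("org.apache.spark", "spark-sql", "3.0.2")
    print(coordinate(spark_sql, "2.12"))
```

Keeping the Scala version out of the dependency description is what lets the same list of libraries serve both builds.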
With the project modularised and the libraries adjusted to both versions of Scala, we could focus on cross-compilation. Cross-compilation can be easily achieved with sbt, the Scala build tool. Gradle, on the other hand, does not support cross-builds out of the box, but there are at least a few ways to implement this capability. Among them are: using an existing plugin, creating a custom build task that executes builds for different versions of Scala, or passing a version as a parameter to the build code.
We chose the last option: passing a version as a parameter. Such a mechanism is simple and can be implemented relatively easily once a project is properly structured. Another argument in favour of parametrised cross-compilation is that plugins and separate tasks have drawbacks of their own, for example potential concurrency issues in the case of custom build tasks. These could have been solved, obviously, but it would have required additional time, which was an extremely valuable resource in the project.
In order to cross-compile the project by passing a version as a parameter, only two additional changes were made in the build code (not counting the code modularisation and library adjustments). The first was handling the version parameter so that it could be used in the build process; the second was making the target directory name dependent on the version we wanted to build.
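The two changes can be sketched as follows. This is a Python illustration of the logic, not the project’s Kotlin build code. The property name scalaVersion, the default value and the build/scala-<version> directory layout are all assumptions made for the example; the pairing of Scala and Spark versions is taken from the versions named in this article.

```python
from pathlib import PurePosixPath
from typing import Dict

# Pairing of Scala and Spark versions, taken from the versions named in the
# article (Scala 2.11 with Spark 2.3.0, Scala 2.12 with Spark 3.0.2).
SPARK_FOR_SCALA: Dict[str, str] = {"2.11": "2.3.0", "2.12": "3.0.2"}
DEFAULT_SCALA = "2.11"  # assumption: the legacy build stays the default


def resolve_build(properties: Dict[str, str]) -> Dict[str, str]:
    """Turn an optional version parameter into concrete build settings."""
    scala = properties.get("scalaVersion", DEFAULT_SCALA)  # hypothetical name
    if scala not in SPARK_FOR_SCALA:
        raise ValueError(f"Unsupported Scala version: {scala}")
    return {
        "scala": scala,
        "spark": SPARK_FOR_SCALA[scala],
        # A version-specific target directory keeps the two builds apart.
        "build_dir": str(PurePosixPath("build") / f"scala-{scala}"),
    }


if __name__ == "__main__":
    print(resolve_build({}))                        # legacy build
    print(resolve_build({"scalaVersion": "2.12"}))  # new build
```

On a Gradle command line, such a value could be supplied as a project property, for example -PscalaVersion=2.12 (again, the property name is an assumption). Defaulting to the legacy version means existing pipelines keep working unchanged, which matches the backward-compatibility requirement.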
The last job in implementing the cross-build logic was to adjust the CI/CD scripts used in the project. Since the details of writing Bash and Jenkins CI/CD scripts are project-dependent, we will not go into detail in this article. It’s worth mentioning, however, that the requirements related to BC were taken into account during implementation and enforced in the logic.
Production deployment & testing process
“There are two kinds of people: those who back up and those who will start.”
Once the code was written and the build process & unit tests were complete, we could deploy the new version of the application. Deployment was carried out in modules, one by one. We started with simpler and less important jobs, letting us check if the new version of the application operated properly without the risk of the most vital jobs crashing.
Before submitting Spark 3.0.2 jobs, all the data related to the given module/domain was backed up, which gave us the option to recover it in case of data pollution or any other unexpected event. Next, each job was run twice on the same input data, with both runs scheduled by Oozie: once with the old Spark version, saving the output to a table called df_old, and once with Spark 3.0.2, saving the output to a table called df_new. The output should be identical regardless of the Spark version used. To verify this, the Spark except function was executed twice (see Fig. 1): once to find rows of df_old missing from df_new, and once in the opposite direction. The prerequisite for accepting the output of a job run with Spark 3.0.2 was an empty dataframe returned by both except calls.
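The acceptance rule can be modelled in a few lines. The sketch below uses plain Python sets of tuples instead of Spark dataframes so that it runs without a cluster. It assumes the semantics of Spark’s except on a Dataset, which corresponds to EXCEPT DISTINCT in SQL. The function names are hypothetical.

```python
from typing import Iterable, Set, Tuple

Row = Tuple[object, ...]  # a row modelled as a plain tuple of column values


def except_rows(left: Iterable[Row], right: Iterable[Row]) -> Set[Row]:
    """Distinct rows of `left` that do not appear in `right` (EXCEPT DISTINCT)."""
    return set(left) - set(right)


def outputs_match(df_old: Iterable[Row], df_new: Iterable[Row]) -> bool:
    """Accept the new output only if the difference is empty in both directions."""
    old_rows, new_rows = list(df_old), list(df_new)
    return not except_rows(old_rows, new_rows) and not except_rows(new_rows, old_rows)


if __name__ == "__main__":
    df_old = [(1, "a"), (2, "b")]
    df_new = [(2, "b"), (1, "a")]      # same rows, different order
    df_broken = [(1, "a"), (2, "c")]   # one value changed
    print(outputs_match(df_old, df_new))
    print(except_rows(df_old, df_broken), except_rows(df_broken, df_old))
```

Both directions are needed because a single difference only reveals rows missing on one side. Since this comparison works on distinct rows, it does not notice a row that is merely duplicated; comparing row counts as well is one way to cover that case.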
Once the output of the Spark 3.0.2 jobs was consistent with that of the legacy Spark jobs, the new version was scheduled on the environment. The process was executed on two environments: first in a non-production environment and then, once that succeeded, in production.
The output data frames were stored in ORC format (the motivation behind changing the data format was described earlier, in the discussion of data consistency). The next step of the migration process, once the job output was proven to be valid, was to migrate the historical data (backed up earlier) from Parquet to ORC. Since this process was repeatable, a migration script was prepared and applied to perform the task. Then, once the historical data was migrated and its correctness was checked, we monitored the running jobs for 30 days to make sure we hadn’t introduced any hidden issues. Finally, we could remove the backups from HDFS, and the migration of the given job could be considered complete.
Now that we’ve described the whole process, let’s summarise the migration and evaluate the results. The main factor that proves that the migration was successful is the platform itself, which runs on a new Spark version (3.0.2). All the jobs were migrated successfully, and only three of them required meaningful changes in the codebase (the case of the GeoSpark library replaced by Apache Sedona). Moreover, the platform stayed up and running throughout the migration, so both the analytics and the development teams were able to deliver value through their day-to-day work.
The data stayed consistent, and the usage of the new data format did not require any significant changes in either codebase or configuration. The modularisation of the build source code was achieved as an additional benefit, which will facilitate future changes in the project. What is more, the potential upgrade of either the Scala version or library version should be much more straightforward.
The motivation behind this article was to present a detailed case study of a successful migration, in the hope that it provides insight into how such a process might look. At the same time, the methodology applied in this case is universal enough to be applied to other cases (for example, to the development of a new microservice or to architectural changes within a project).
As I conclude this post, I would like to mention a few people who helped to make the article more consistent and added value with their suggestions and feedback. Big kudos to Tomasz Radwan for the domain knowledge he shared and for his remarks about the article’s composition; to Mariusz Karwat for his advice and valued discussions about the infrastructural aspects of the project; and to Kamil Szeląg for his significant effort and assistance in the deployment and testing processes.
Keep learning and good luck!