The post Using In-Memory Data Grids for ETL on Streaming Data appeared first on ScaleOut Software.
A key challenge for any data warehouse is to supply data to it in a format that can be readily ingested and analyzed, and this is the role of the well-known process called “extract-transform-load” (ETL). In the case of Hadoop, this usually means extracting data from external sources and transforming it into a form that can be stored in HDFS for use by MapReduce applications. When incoming data arrives as collections of files, it’s a straightforward matter either to just copy them into HDFS or to periodically run a batch MapReduce application which reads in the files, transforms the data as needed, and outputs it to HDFS.
Consider a company that sends end-of-day reports from its field offices to the data warehouse for aggregate analysis. After the last report has been uploaded, the data warehouse can start up a MapReduce application that reads the reports from an external file system, reorganizes the data, and then outputs the results to HDFS. For example, this application might use the keys output from the mappers to join data for various fields (such as revenue and volume) across all offices so that the reducers can output this data to HDFS by field instead of by office.
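As a rough illustration of the field-office example, the sketch below imitates the map, shuffle, and reduce phases in plain Python rather than on Hadoop. The office names, field names, and values are made up for the example, and the helper functions are hypothetical stand-ins for a real mapper and reducer.

```python
from collections import defaultdict

# Hypothetical end-of-day reports, one record per field office.
reports = [
    {"office": "Seattle", "revenue": 1200.0, "volume": 30},
    {"office": "Austin", "revenue": 950.0, "volume": 22},
    {"office": "Boston", "revenue": 1430.0, "volume": 41},
]


def map_report(report):
    """Mapper: emit one (field, (office, value)) pair per reported field."""
    for field, value in report.items():
        if field != "office":
            yield field, (report["office"], value)


def shuffle(pairs):
    """Group mapper output by key, as the MapReduce shuffle phase would."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_field(field, values):
    """Reducer: build one per-field record that covers all offices."""
    return {
        "field": field,
        "by_office": dict(values),
        "total": sum(value for _, value in values),
    }


pairs = [pair for report in reports for pair in map_report(report)]
by_field = {
    field: reduce_field(field, values)
    for field, values in shuffle(pairs).items()
}
```

Because the mapper keys each value by field name, the reducer sees every office's value for one field together, which is what lets the output be organized by field instead of by office.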
Implementing ETL using MapReduce offers several advantages. It makes use of the data warehouse’s parallel infrastructure to quickly process the data on a cluster of servers. It also leverages the development team’s skill sets in developing MapReduce applications to minimize overall cost. Lastly, it avoids the need to deploy a variety of technologies, which creates unnecessary complexity and headaches for system administrators.
Running ETL using a batch MapReduce job works fine for static data, such as file-based, end-of-day reports. But what about streaming data that continuously flows into the data warehouse? For example, consider an e-commerce website that accepts orders which flow to the data warehouse for analysis to identify patterns and issues. The website generates a continuous stream of orders which must be stored as HDFS files by an ETL processing step, as illustrated by the following diagram:
The simplest possible approach to this problem is to store the incoming orders as individual files in HDFS. Of course, this does not allow for any data translation prior to saving the files in the data warehouse. Also, this creates many file I/O operations both when loading HDFS and later when reading large numbers of small files during each analysis.
A better solution would be to run a MapReduce application that reads the input stream and outputs to HDFS. This enables the translation step to reorganize and consolidate the data as necessary and to efficiently output it to HDFS. By using standard MapReduce instead of another stream processing platform, such as Spark or Storm, the skill sets already employed for the data warehouse can be used instead of requiring a different software stack to perform ETL.
However, the data warehouse’s batch-oriented MapReduce execution environment incurs high scheduling latency (typically 15 seconds or longer) that makes it unsuitable for processing an incoming data stream. Furthermore, this MapReduce application would need to run continuously, tying up resources that were intended for data analysis, not ongoing ETL.
The streaming ETL challenge can be met by deploying an in-memory data grid with an integrated MapReduce engine, such as ScaleOut hServer, to capture the data stream in real time, perform ETL, and offload the data warehouse. Let’s take a look at how this works.
IMDGs host data in memory and distribute it across a cluster of commodity servers. Using an object-oriented data storage model, they provide APIs for storing, accessing, and updating data objects in well under a millisecond (depending on the size of the object). This enables operational systems to use IMDGs for storing fast-changing, “live” data, such as the data warehouse’s incoming order stream.
An IMDG provides an ideal repository for the data stream, buffering orders as objects within the grid and running the ETL application using built-in MapReduce (more on that below). The IMDG matches the arrival rate of the incoming data stream by adding servers as needed to its cluster, ensuring that both storage capacity and update throughput scale linearly while keeping update times fast. Also, the IMDG maintains high availability using data replication so that if a server fails, the IMDG can continue to handle update requests without delay.
Because IMDGs store data in memory distributed across a cluster of servers, they can easily perform data-parallel computations on stored data, such as the ETL function needed by the data warehouse; they simply make use of the cluster’s processing power to analyze data “in place,” that is, without the need to migrate it to other servers. This enables IMDGs to complete ETL fast (possibly in less than a second) with minimal overhead.
Some IMDGs, such as ScaleOut hServer, can execute standard Hadoop MapReduce applications (i.e., applications which are fully code-compatible with Apache Hadoop), allowing these applications to access in-memory data from the grid and output to HDFS. This enables the ETL function to be deployed as a conventional MapReduce application within the IMDG. The application extracts orders from the grid’s memory, transforms them as required for storage in the data warehouse, and then outputs them to HDFS using standard MapReduce techniques, as illustrated in the following diagram:
The use of an IMDG offloads the data warehouse, allowing the MapReduce application performing ETL to run continuously. It also dramatically reduces the latency required to start up each iteration from 15+ seconds to a few milliseconds. Buffering orders in memory while simultaneously migrating them to HDFS ensures that ETL processing seamlessly keeps up with the incoming data stream.
To show how continuous processing can be achieved, the following diagram depicts the use of a “double buffering” strategy to perform ETL processing. IMDGs organize collections of objects within name spaces that can be identified and used as input to a MapReduce application. In this case, while incoming orders are added to one name space, which serves as an input buffer, the MapReduce application extracts orders from a second name space that was previously filled; it then organizes them into an appropriate format and outputs the data to HDFS. Upon completion, the extracted orders are cleared from the associated name space, the name spaces are switched, and the MapReduce application is restarted on the other name space:
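The double-buffering cycle described above can be sketched in a few lines of Python. This is a single-threaded toy, not ScaleOut hServer code: the `DoubleBufferedEtl` class, its method names, and the list standing in for HDFS are all assumptions made for illustration, and in a real grid order arrival and ETL would run concurrently.

```python
class DoubleBufferedEtl:
    """Toy model of two name spaces used as alternating ETL buffers."""

    def __init__(self, transform, sink):
        self.namespaces = [{}, {}]  # two in-memory name spaces
        self.active = 0             # index of the name space receiving orders
        self.transform = transform  # per-order transformation step
        self.sink = sink            # list standing in for HDFS output

    def add_order(self, order_id, order):
        """Store an incoming order in the current input name space."""
        self.namespaces[self.active][order_id] = order

    def run_etl_cycle(self):
        """Switch name spaces, then drain the one that was just filled."""
        filled = self.active
        self.active = 1 - self.active  # new orders now go to the other buffer
        batch = self.namespaces[filled]
        self.sink.append([self.transform(order) for order in batch.values()])
        batch.clear()                  # free the memory for the next switch
        return filled


hdfs = []
etl = DoubleBufferedEtl(lambda order: order["sku"].upper(), hdfs)
etl.add_order(1, {"sku": "a"})
etl.run_etl_cycle()             # drains name space 0
etl.add_order(2, {"sku": "b"})  # lands in name space 1
etl.add_order(3, {"sku": "c"})
etl.run_etl_cycle()             # drains name space 1
```

The sketch switches the input name space first and then drains the filled one, which is the same cycle viewed from the start of an iteration instead of its end.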
This technique uses the memory of the IMDG to allow orders to flow smoothly into the IMDG while processing by the MapReduce ETL application is ongoing. It requires that sufficient memory be available in the IMDG to buffer incoming order objects during the processing time of the application. Because the IMDG can scale memory capacity by adding servers and because the IMDG’s fast start-up and data-parallel execution minimize the ETL application’s processing time, continuous processing of incoming orders is ensured.
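A back-of-the-envelope estimate of the memory requirement can be written as a small function. The model is an assumption, not a vendor sizing rule: it supposes a steady arrival rate, two name spaces that can each hold one ETL cycle's worth of orders, and a configurable number of replicas per object. All parameter names and figures are hypothetical.

```python
def required_buffer_bytes(orders_per_second, bytes_per_order, etl_seconds, replicas=1):
    """Estimate peak grid memory needed for double-buffered ETL.

    Assumes one name space fills for etl_seconds while the other,
    filled during the previous cycle, is still being drained.
    """
    per_namespace = orders_per_second * etl_seconds * bytes_per_order
    copies = 1 + replicas  # primary copy plus replicas kept for availability
    return 2 * per_namespace * copies


# Hypothetical workload: 5,000 orders/s, 2 KB per order, 1 s per ETL run.
estimate = required_buffer_bytes(5000, 2048, 1, replicas=1)
```

Under these assumed figures the estimate is about 41 MB, which shows why shortening the ETL run time directly reduces the memory the grid must set aside.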
Hadoop’s powerful analytics capabilities are rapidly making it the centerpiece of next-generation data warehouses. The ability of IMDGs to implement ETL for streaming data enables them to serve as a vital component of these infrastructures. IMDGs which can run MapReduce applications provide the threefold benefits of meeting the low latency requirements for ingesting streaming data, offloading the data warehouse’s execution environment, and leveraging existing Hadoop skills. ETL on streaming data is yet another example of real-time analytics and a prime application for IMDGs.
Perhaps most exciting is that hosting ETL in an IMDG’s real-time analytics engine opens the door to analyzing the order stream (or a clickstream) in real time and generating instant feedback for web users. Over time, the ETL function can evolve to perform real-time analysis, provide guidance, and thereby drive incremental sales. The IMDG’s analytics engine forms a bridge from the data warehouse to customers, helping push the benefits of data analytics to the point of sale where it can have maximum impact.
The post IMDGs: Next Generation Parallel Supercomputers appeared first on ScaleOut Software.
Back in the 1980s, IBM, Intel, and nCube (among others) began commercializing parallel computing (“multicomputing”) technology pioneered by professors Charles Seitz and Geoffrey Fox at Caltech. They recognized that commodity servers could be clustered using a high speed network to run parallel programs which deliver highly scalable performance well beyond the power of shared memory multiprocessing servers. With the development of message passing libraries, these multicomputers were programmed using C and Fortran to implement parallel applications in matrix algebra, structural mechanics, fluid dynamics, distributed simulation, and many other areas.
While this multicomputing architecture had the potential to deliver very high scalability, it introduced several challenges. Chief among them was hiding network overhead and latency which could easily dominate processing time and impede scalability. Hardware architects developed novel high speed networks, such as Bill Dally’s pipelined torus and mesh routers, to minimize message passing latency. (Standard 10 Mbps Ethernet LANs of the 1980s were quickly determined to be too slow for use in multicomputers.)
However, to really deliver scalable performance, Cleve Moler (the creator of MATLAB, then working at Intel) and, independently, John Gustafson at Sandia Labs recognized that scaling the size of an application (e.g., the size of a matrix being multiplied) as more servers are added to the cluster helps mask networking overhead and enable linear growth in performance; this is called Gustafson’s Law. At first glance, this insight might seem counter-intuitive since one expects that adding computing power will speed up processing for a fixed size application. (See Amdahl’s Law.) But adding servers to a computing cluster to handle larger problem sizes actually is very natural: for example, think about adding web servers to a farm as a site’s incoming web load grows.
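The contrast between the two laws is easy to see numerically. The sketch below uses the standard textbook forms: Amdahl's speedup for a fixed-size problem and Gustafson's scaled speedup for a problem that grows with the cluster. The function names and the 95% parallel fraction are illustrative choices.

```python
def amdahl_speedup(parallel_fraction, servers):
    """Fixed problem size: the serial part limits the achievable speedup."""
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / servers)


def gustafson_speedup(parallel_fraction, servers):
    """Problem size scaled with the cluster: speedup grows almost linearly."""
    serial_fraction = 1.0 - parallel_fraction
    return serial_fraction + parallel_fraction * servers


# Illustrative case: 95% of the work is parallel, 100 servers.
fixed_size = amdahl_speedup(0.95, 100)
scaled_size = gustafson_speedup(0.95, 100)
```

With these inputs the fixed-size speedup stays below 17, while the scaled speedup is roughly 95, which is the effect of growing the workload along with the cluster.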
The daunting complexity inherent in the creation of parallel programs with message passing posed another big obstacle for multicomputers. It became clear that just adding message passing APIs to “dusty deck” applications could easily lead to frustrating and inscrutable deadlocks. Developers realized that higher level design patterns were needed; two that emerged were the “task parallel” and “data parallel” approaches. Data-parallel programming is by far the simpler of the two, since the developer need not write application-specific synchronization code, which can be complex and error prone. Instead, the multicomputer executes a single, sequential method on a collection of data that has been distributed across the servers in the cluster. This code automatically runs in parallel across all servers to deliver scalable performance. (Of course, message passing may be needed between execution steps to exchange data between parts of the application.)
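To make the data-parallel pattern concrete, here is a minimal sketch in which a thread pool stands in for the servers of a cluster. The developer writes only the sequential `simulate_cell` method; the partitioning and parallel dispatch are handled by a generic helper. All names are hypothetical, and threads in one process only approximate real distribution across machines.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(items, servers):
    """Spread items across servers round-robin."""
    return [items[i::servers] for i in range(servers)]


def simulate_cell(temperature):
    """Sequential per-item method; it contains no synchronization code."""
    return temperature * 0.5 + 10.0


def run_data_parallel(method, items, servers=4):
    """Run the same sequential method on every partition in parallel."""
    parts = partition(items, servers)
    with ThreadPoolExecutor(max_workers=servers) as pool:
        # Each worker processes only the partition it was handed.
        return list(pool.map(lambda part: [method(x) for x in part], parts))


temperatures = [float(t) for t in range(10)]
results_by_server = run_data_parallel(simulate_cell, temperatures, servers=4)
```

The result is one list per simulated server, so any exchange of data between partitions would happen as a separate step between invocations.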
For example, consider a climate simulation model such as NCAR’s Community Climate Model. Climate models typically partition the atmosphere, land, and oceans into a grid of boxes and model each box independently using a sequential code. They repeatedly simulate each box’s behavior and exchange data between boxes at every time step in the simulation. Using a multicomputer, the boxes all can be held in memory and distributed across the servers in the cluster, thereby avoiding disk I/O which impedes performance. The cluster can be scaled to hold larger models with more boxes to improve resolution and generate more accurate results. The multicomputer provides scalable performance, and it runs data-parallel applications to help keep development as simple as possible.
So what does all this have to do with in-memory data grids? IMDGs make use of the same parallel computing architecture as multicomputers. They host service processes on a clustered set of servers to hold application data which they spread across the servers. This data is stored as one or more collections of serialized objects, such as instances of Java, C#, or C++ objects, and accessed using simple create/read/update/delete (“CRUD”) APIs. As the data set grows in size, more servers can be added to the cluster to ensure that all data is held in memory and access throughput grows linearly.
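The storage model can be sketched as a toy grid that hashes each key to one of several in-process dictionaries, each standing in for a server. The `TinyGrid` class and its method names are invented for this example and do not correspond to any product's API; a real IMDG would also replicate objects and rebalance them when servers are added, which is not shown.

```python
import pickle
import zlib


class TinyGrid:
    """Toy in-memory grid: serialized objects partitioned by key hash."""

    def __init__(self, server_count):
        self.servers = [{} for _ in range(server_count)]

    def _server_for(self, key):
        # crc32 gives a stable hash, unlike Python's per-process str hash.
        index = zlib.crc32(key.encode("utf-8")) % len(self.servers)
        return self.servers[index]

    def create(self, key, obj):
        store = self._server_for(key)
        if key in store:
            raise KeyError(f"{key} already exists")
        store[key] = pickle.dumps(obj)

    def read(self, key):
        data = self._server_for(key).get(key)
        return None if data is None else pickle.loads(data)

    def update(self, key, obj):
        store = self._server_for(key)
        if key not in store:
            raise KeyError(key)
        store[key] = pickle.dumps(obj)

    def delete(self, key):
        return self._server_for(key).pop(key, None) is not None


grid = TinyGrid(server_count=3)
grid.create("cart:42", {"items": ["book"]})
grid.update("cart:42", {"items": ["book", "lamp"]})
cart = grid.read("cart:42")
```

Every operation touches exactly one server, chosen by the key's hash, which is why adding servers can raise total throughput without slowing individual accesses.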
By doing all of this, IMDGs keep access times constant, which is exactly the characteristic needed by applications which have to handle growing workloads. For example, consider a website holding shopping carts in an IMDG. As more and more customers are attracted to the site, web servers must be added to handle increasing traffic. Likewise, IMDG servers must be added to hold more shopping carts, scale access throughput, and keep response times low. In a real sense, the IMDG serves as a parallel supercomputer for hosting application data, delivering the same benefits as a multicomputer does for climate models and other scientific applications.
However, the IMDG’s relationship to parallel supercomputers runs deeper than this. Some IMDGs can host data-parallel applications to update and analyze data stored on the grid’s servers. For example, ScaleOut Analytics Server uses its “parallel method invocation” (PMI) APIs to run Java, C#, or C++ methods on a collection of objects specified by a parallel query. It also uses this mechanism to execute Hadoop MapReduce applications with very low latency. In this way, the IMDG serves as a parallel supercomputer by directly running data-parallel applications. These applications can implement real-time analytics on live data, such as analyzing the effect of market fluctuations on a hedge fund’s financial holdings (more on that in an upcoming blog).
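The general shape of a query-driven parallel invocation can be sketched as follows. This is not the ScaleOut PMI API: `invoke_parallel`, its parameters, and the sample holdings are hypothetical, and a thread pool again stands in for the grid's servers. Each simulated server filters its local objects with the query, evaluates the selected ones, and merges its results locally before a final global merge.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

# Hypothetical holdings, already partitioned across two grid servers.
holdings_by_server = [
    [
        {"symbol": "AAA", "sector": "tech", "shares": 100, "price": 10.0},
        {"symbol": "BBB", "sector": "energy", "shares": 50, "price": 20.0},
    ],
    [
        {"symbol": "CCC", "sector": "tech", "shares": 200, "price": 5.0},
    ],
]


def invoke_parallel(partitions, query, evaluate, merge):
    """Evaluate objects matching the query on each server, then merge."""

    def run_on_server(objects):
        selected = [evaluate(obj) for obj in objects if query(obj)]
        return reduce(merge, selected) if selected else None

    with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
        partials = [r for r in pool.map(run_on_server, partitions) if r is not None]
    return reduce(merge, partials) if partials else None


# Estimated loss if every tech holding dropped by 10% (an assumed scenario).
tech_loss = invoke_parallel(
    holdings_by_server,
    query=lambda h: h["sector"] == "tech",
    evaluate=lambda h: h["shares"] * h["price"] * 0.10,
    merge=lambda a, b: a + b,
)
```

Only the small merged values leave each simulated server, so the stored objects themselves never need to move.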
IMDGs bring parallel supercomputing to the next generation in significant ways. Unlike multicomputers, they can be deployed on cloud infrastructures to take full advantage of the cloud’s elasticity. They host an object-oriented data storage model with property-based query that integrates seamlessly into the business logic of object-oriented applications. IMDGs automatically load balance stored data across all grid servers, ensuring scalable speedup and relieving the developer of this burden. They provide built-in high availability to ensure that both data and the results of a parallel computation are not lost if a server or network component fails. Lastly, they can ship code from the developer’s workstation to the grid’s servers and automatically stage the execution environment (e.g., a JVM or .NET runtime on every grid server) to simplify deployment.
Although they share a common heritage, IMDGs are not your parent’s parallel supercomputer. They represent the next generation in parallel computing: easily deployable in the cloud, object-oriented, elastic, highly available, and powerful enough to run data-parallel applications and deliver real-time results.