The post Reports of Scale-Out’s Demise Are Greatly Exaggerated appeared first on ScaleOut Software.
Scaling up is the process of migrating to an increasingly powerful single server to process a workload faster or to handle a growing workload that fits within the server. Usually this means moving to a server with more CPU cores, greater memory capacity, and higher-end networking and storage options.
At some point, scaling up becomes costly, and workloads grow beyond what even a high-end server can handle. To overcome this limitation, scaling out distributes a workload across a cluster of servers working together. Now CPU, memory, and storage resources can grow without predefined limits. Unless network bandwidth also grows in proportion to the number of servers, it usually becomes the limiting factor in scaling. (Supercomputing systems use scalable networks such as torus interconnects; commodity clusters typically use Ethernet switches with fixed bandwidth.)
There are several reasons why scaling out will continue to serve an important role, and to be sure, it’s more than the size of the workload that matters. First, scaling out enables computing capacity to be deployed incrementally and economically as the size of the workload increases, using clusters of relatively small servers (instead of investing in a single, expensive multicore server to handle the highest anticipated workload). Second, scale-out’s ability to provide high availability will always be important to mission-critical applications, even if the problem size fits within one server. Third, technology changes so fast that today’s pricey, top-of-the-line server will quickly become tomorrow’s dusty objet d’art as it awaits recycling. (Cloud computing just changes that to a per-hour calculation; top-of-the-line servers may not be cost-effective.)
Moreover, in our experience, many analytics applications host data sets much larger than the 100 GB size Microsoft cites as a typical upper bound, and new applications drive this trend by taking advantage of newly available memory. (We often host data sets in the terabytes in our in-memory data grid.) These data sets require a cluster to keep in memory. For example, a recent data set used in an e-commerce analytics application held 40 million objects totaling about 2 TB of data including replicas. This data set could not be stored in one server on Amazon EC2, even using their largest instance type; it required a cluster of servers to hold the entire data set in memory. Also, it’s often not advisable to store such large data sets in the smallest possible cluster of servers; there are benefits to using a larger cluster of small servers (see below).
That said, it’s clear that scale-out infrastructures, including middleware execution platforms like Hadoop, need to evolve to make full use of large memory capacity and many cores available within each server. As Hadoop gained popularity over the last few years, software architects have been focused on efficiently scaling out with minimum overhead, and the Microsoft paper reminds us to rethink our scale-up algorithms and extract maximum value out of new technology as it enters the mainstream. For example, fully using available cores with multi-threading and minimizing latency for inter-process data transfers with memory-mapped files can squeeze more performance out of modern servers.
Once we accept that scale-out is an integral element of mission-critical deployments, it’s finding the right balance between memory, CPU, and network bandwidth that matters in driving overall performance. One or more of these resources tends to lag in performance at any point as technology evolves, and software design has to compensate for that. For example, right now network bandwidth in commodity networks tends to be the laggard, making it costly (in time) to ship the hundreds of gigabytes a server can now hold. (A 10 Gbps network, for instance, requires 80 seconds at maximum bandwidth to send 100 gigabytes between servers; scatter/gather of many objects greatly increases that time.) As the amount of data stored in each server grows, load-balancing between servers takes longer, and the delays eventually can impact availability. Scaling out can help with this by distributing the data set across more servers, thereby reducing network delays in rebalancing workloads after a server is added or removed.
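The transfer-time arithmetic above can be reproduced with a short calculation. This is a minimal sketch: the function name `transfer_seconds` is illustrative, and it assumes decimal gigabytes and ideal throughput at line rate with no protocol or scatter/gather overhead. The four-server split is a made-up example of how spreading the same data across more servers shrinks the amount any one server has to ship.

```python
def transfer_seconds(gigabytes: float, link_gbps: float) -> float:
    """Ideal time to move `gigabytes` of data over a `link_gbps` link."""
    gigabits = gigabytes * 8  # 8 bits per byte
    return gigabits / link_gbps


# 100 GB held by a single server, sent over a 10 Gbps link.
single = transfer_seconds(100, 10)

# The same 100 GB spread evenly over 4 servers (an assumed cluster size):
# each server holds 25 GB, so moving one server's share takes a quarter as long.
per_server = transfer_seconds(100 / 4, 10)

print(single, per_server)
```

Real transfers will take longer than these figures, since they ignore serialization, per-object overhead, and contention on the switch.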
In sum, it’s not “either-or” — scale-out is here to stay. However, it’s absolutely true that maintaining support for the latest memory and processor technology is crucial to reaping the benefits of scale-up, integrating it with scale-out, and thereby maximizing performance, availability, and cost-effectiveness.
The post Using In-Memory Data Grids for ETL on Streaming Data appeared first on ScaleOut Software.
A key challenge for any data warehouse is to supply data to it in a format that can be readily ingested and analyzed, and this is the role of the well-known process called “extract-transform-load” (ETL). In the case of Hadoop, this usually means extracting data from external sources and transforming it into a form that can be stored in HDFS for use by MapReduce applications. When incoming data arrives as collections of files, it’s a straightforward matter either to just copy them into HDFS or to periodically run a batch MapReduce application which reads in the files, transforms the data as needed, and outputs it to HDFS.
Consider a company that sends end-of-day reports from its field offices to the data warehouse for aggregate analysis. After the last report has been uploaded, the data warehouse can start up a MapReduce application to read the reports from an external file system, reorganize the data, and then output the results to HDFS. For example, this application might use the keys output from the mappers to join data for various fields (such as revenue and volume) across all offices so that the reducers can output this data to HDFS by field instead of by office.
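The reorganization from "by office" to "by field" can be sketched in a few lines. This is an illustration in plain Python rather than Hadoop code: the sample reports, field names, and the `mapper`, `shuffle`, and `reducer` helpers are all hypothetical stand-ins for the corresponding MapReduce phases.

```python
from collections import defaultdict

# Hypothetical end-of-day reports, one record per field office.
reports = [
    {"office": "Seattle", "revenue": 1200, "volume": 30},
    {"office": "Boston", "revenue": 900, "volume": 22},
]


def mapper(report):
    """Emit (field, (office, value)) pairs so that records are keyed by field."""
    office = report["office"]
    for field, value in report.items():
        if field != "office":
            yield field, (office, value)


def shuffle(pairs):
    """Group mapper output by key, as the MapReduce shuffle phase does."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reducer(field, values):
    """Produce one output record per field, covering all offices."""
    return field, dict(values)


pairs = [pair for report in reports for pair in mapper(report)]
by_field = dict(reducer(field, values) for field, values in shuffle(pairs).items())
print(by_field)
```

In a real job, each reducer would write its per-field record to HDFS instead of returning it.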
Implementing ETL using MapReduce offers several advantages. It makes use of the data warehouse’s parallel infrastructure to quickly process the data on a cluster of servers. It also leverages the development team’s skill sets in developing MapReduce applications to minimize overall cost. Lastly, it avoids the need to deploy a variety of technologies, which creates unnecessary complexity and headaches for system administrators.
Running ETL using a batch MapReduce job works fine for static data, such as file-based, end-of-day reports. But what about streaming data that continuously flows into the data warehouse? For example, consider an e-commerce website that accepts orders which flow to the data warehouse for analysis to identify patterns and issues. The website generates a continuous stream of orders which must be stored as HDFS files by an ETL processing step, as illustrated by the following diagram:
The simplest possible approach to this problem is to store the incoming orders as individual files in HDFS. Of course, this does not allow for any data translation prior to saving the files in the data warehouse. Also, this creates many file I/O operations both when loading HDFS and later when reading large numbers of small files during each analysis.
A better solution would be to run a MapReduce application that reads the input stream and outputs to HDFS. This enables the translation step to reorganize and consolidate the data as necessary and to efficiently output it to HDFS. By using standard MapReduce instead of another stream processing platform, such as Spark or Storm, the skill sets already employed for the data warehouse can be used instead of requiring a different software stack to perform ETL.
However, the data warehouse’s batch-oriented MapReduce execution environment incurs high scheduling latency (typically 15 seconds or longer) that makes it unsuitable for processing an incoming data stream. Furthermore, this MapReduce application would need to run continuously, tying up resources that were intended for data analysis, not ongoing ETL.
The streaming ETL challenge can be met by deploying an in-memory data grid with an integrated MapReduce engine, such as ScaleOut hServer, to capture the data stream in real time, perform ETL, and offload the data warehouse. Let’s take a look at how this works.
IMDGs host data in memory and distribute it across a cluster of commodity servers. Using an object-oriented data storage model, they provide APIs for storing, accessing, and updating data objects in well under a millisecond (depending on the size of the object). This enables operational systems to use IMDGs for storing fast-changing, “live” data, such as the data warehouse’s incoming order stream.
An IMDG provides an ideal repository for the data stream, buffering orders as objects within the grid and running the ETL application using built-in MapReduce (more on that below). The IMDG matches the arrival rate of the incoming data stream by adding servers as needed to its cluster, ensuring that both storage capacity and update throughput scale linearly while keeping update times fast. Also, the IMDG maintains high availability using data replication so that if a server fails, the IMDG can continue to handle update requests without delay.
Because IMDGs store data in memory distributed across a cluster of servers, they can easily perform data-parallel computations on stored data, such as the ETL function needed by the data warehouse; they simply make use of the cluster’s processing power to analyze data “in place,” that is, without the need to migrate it to other servers. This enables IMDGs to complete ETL fast (possibly in less than a second) with minimal overhead.
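The "in place" pattern can be sketched as follows. This is a simplified model, not a grid API: each inner list stands in for the objects held by one server, threads stand in for servers, and the names `grid`, `local_summary`, and `merge` are assumptions made for the example. The point it shows is that only small per-server summaries cross the network, never the stored objects themselves.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical grid: each inner list is the set of order amounts held by one server.
grid = [[12.0, 7.5], [30.0], [4.5, 6.0, 10.0]]


def local_summary(partition):
    """Runs where the partition lives; only this small summary leaves the server."""
    return len(partition), sum(partition)


def merge(summaries):
    """Combine the per-server summaries into one global result."""
    count = sum(c for c, _ in summaries)
    total = sum(t for _, t in summaries)
    return count, total


# One worker per partition, mimicking one analysis task per server.
with ThreadPoolExecutor(max_workers=len(grid)) as pool:
    summaries = list(pool.map(local_summary, grid))

count, total = merge(summaries)
print(count, total)
```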
Some IMDGs, such as ScaleOut hServer, can execute standard Hadoop MapReduce applications (i.e., applications which are fully code-compatible with Apache Hadoop), allowing these applications to access in-memory data from the grid and output to HDFS. This enables the ETL function to be deployed as a conventional MapReduce application within the IMDG. The application extracts orders from the grid’s memory, transforms them as required for storage in the data warehouse, and then outputs them to HDFS using standard MapReduce techniques, as illustrated in the following diagram:
The use of an IMDG offloads the data warehouse, allowing the MapReduce application performing ETL to run continuously. It also dramatically reduces the latency required to start up each iteration from 15+ seconds to a few milliseconds. Buffering orders in memory while simultaneously migrating them to HDFS ensures that ETL processing seamlessly keeps up with the incoming data stream.
To show how continuous processing can be achieved, the following diagram depicts the use of a “double buffering” strategy to perform ETL processing. IMDGs organize collections of objects within name spaces that can be identified and used as input to a MapReduce application. In this case, while incoming orders are added to one name space, which serves as an input buffer, the MapReduce application extracts orders from a second name space that was previously filled; it then organizes them into an appropriate format and outputs the data to HDFS. Upon completion, the extracted orders are cleared from the associated name space, the name spaces are switched, and the MapReduce application is restarted on the other name space:
This technique uses the memory of the IMDG to allow orders to flow smoothly into the IMDG while processing by the MapReduce ETL application is ongoing. It requires that sufficient memory be available in the IMDG to buffer incoming order objects during the processing time of the application. Because the IMDG can scale memory capacity by adding servers and because the IMDG’s fast start-up and data-parallel execution minimize the ETL application’s processing time, continuous processing of incoming orders is ensured.
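The double-buffering cycle described above can be sketched as a small single-threaded model. The `DoubleBuffer` class and the `etl` function are hypothetical and do not correspond to any product API; in a real grid, order arrival and the ETL run proceed concurrently, and the ETL step would be a MapReduce job writing to HDFS rather than a Python function.

```python
class DoubleBuffer:
    """Two name spaces: one receives orders while the other is drained."""

    def __init__(self):
        self.namespaces = [[], []]
        self.input_index = 0  # name space currently accepting orders

    def add_order(self, order):
        self.namespaces[self.input_index].append(order)

    def swap_and_drain(self):
        """Switch name spaces, then return and clear the one that was just filled."""
        filled = self.input_index
        self.input_index = 1 - filled  # new orders now go to the other name space
        batch = list(self.namespaces[filled])
        self.namespaces[filled].clear()
        return batch


def etl(batch):
    """Stand-in for the ETL job: total the order amounts per customer."""
    totals = {}
    for customer, amount in batch:
        totals[customer] = totals.get(customer, 0) + amount
    return totals


buf = DoubleBuffer()
buf.add_order(("alice", 10))
buf.add_order(("bob", 5))
first = etl(buf.swap_and_drain())

buf.add_order(("alice", 7))  # lands in the other name space
second = etl(buf.swap_and_drain())
print(first, second)
```

The memory requirement mentioned above shows up here as the size of the input name space, which grows for as long as each ETL run takes.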
Hadoop’s powerful analytics capabilities are rapidly making it the centerpiece of next-generation data warehouses. The ability of IMDGs to implement ETL for streaming data enables them to serve as a vital component of these infrastructures. IMDGs which can run MapReduce applications provide the threefold benefits of meeting the low latency requirements for ingesting streaming data, offloading the data warehouse’s execution environment, and leveraging existing Hadoop skills. ETL on streaming data is yet another example of real-time analytics and a prime application for IMDGs.
Perhaps most exciting is that hosting ETL in an IMDG’s real-time analytics engine opens the door to analyzing the order stream (or a clickstream) in real time and generating instant feedback for web users. Over time, the ETL function can evolve to perform real-time analysis, provide guidance, and thereby drive incremental sales. The IMDG’s analytics engine forms a bridge from the data warehouse to customers, helping push the benefits of data analytics to the point of sale where it can have maximum impact.
The post How Do In-Memory Data Grids Differ from Storm? appeared first on ScaleOut Software.
(The following description of in-memory data grids (IMDGs) is excerpted from last week’s blog post; see that post for more details.)
IMDGs host data in memory and distribute it across a cluster of commodity servers. Using an object-oriented data storage model, they provide APIs for updating data objects typically in well under a millisecond (depending on the size of the object). This enables operational systems to use IMDGs for storing, accessing, and updating fast-changing, “live” data, while maintaining fast access times even as the storage workload grows.
Data storage needs can easily grow as more users store data within an IMDG. IMDGs accommodate this growth by adding servers to the cluster and automatically rebalancing stored data across the servers. This ensures that both capacity and throughput scale linearly with growth in the workload, and access and update times remain low regardless of the workload’s size. Moreover, IMDGs maintain stored data with high availability using data replication so that if a server fails, operational systems can continuously handle access requests and update requests without delay.
IMDGs Perform Data-Parallel Computation
Because IMDGs store data in memory distributed across a cluster of servers, they easily can perform data-parallel computations on stored data; they simply make use of the cluster’s processing power to analyze data “in place,” that is, without the need to migrate it to other servers. This enables IMDGs to provide fast results (often in milliseconds) with minimal overhead.
The following diagram of the architecture used by ScaleOut Analytics Server and ScaleOut hServer shows a stream of incoming changes which are applied to the grid’s memory-based data store using API updates. The real-time analytics engine performs data-parallel computation on stored data, combines the results across the cluster, and outputs a combined stream of alerts to the operational system.
A significant aspect of the IMDG’s architecture for data analytics is that it performs computations on data hosted in memory – not specifically on an incoming data stream. This memory-based storage is continuously updated by an incoming data stream, so the computation has access to the latest changes to the data. However, the computation also has access to the history of changes manifested by the current state of data stored in the grid. This gives the computation a rich data set for analysis that includes both the incoming data stream and the application’s persistent state.
Storm was originally developed by Nathan Marz at BackType to overcome the limitations of Hadoop in analyzing streams of incoming data, such as Twitter streams and web log files. Its goal was to provide real-time, continuous computation that is both scalable and fault tolerant. Described both as stream processing and event processing, its computation model incorporates a combination of task parallelism and pipelining. The developer describes two basic entities: “spouts,” which generate streams of data in the form of ordered tuples, and “bolts,” which process incoming streams and optionally generate outgoing streams for other bolts. Spouts and bolts are organized into an acyclic, directed graph to create an executable configuration. (See this slide deck, among many available, for a more detailed explanation.)
The following diagram illustrates a Storm configuration of streams and bolts processing a set of input streams and generating a set of output streams. The green circles represent tuples within an input stream, and the blue boxes represent bolts. Note that spouts which generate the input streams are not shown in the diagram. The orange circles represent an optional output data stream, which may be implemented by the bolts in an arbitrary manner (e.g., as API calls to an external agent instead of as a stream of tuples).
Application developers specify several aspects of the configuration, such as the number of tasks that can be spawned to execute each bolt, and the manner in which an incoming stream’s tuples are distributed across these tasks. Various groupings implement characteristics that correspond to behaviors found in Hadoop MapReduce. For example, the shuffle grouping implements a random distribution of tuples to tasks akin to input to mappers, and the fields grouping implements a key-based partitioning very close to that used as input to reducers. Other groupings also are available, such as “all,” which is equivalent to multicast.
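The difference between these groupings comes down to how a task is chosen for each tuple. The sketch below models that choice in plain Python; it is not Storm’s API, and the function names, the use of CRC32 as the hash, and the sample tuples are all assumptions made for illustration.

```python
import random
import zlib


def shuffle_grouping(num_tasks, rng):
    """Pick a task at random, much as input is spread across mappers."""
    return rng.randrange(num_tasks)


def fields_grouping(tuple_, field, num_tasks):
    """Pick a task by hashing one field, so equal keys always meet at one task."""
    key = str(tuple_[field]).encode("utf-8")
    return zlib.crc32(key) % num_tasks


def all_grouping(num_tasks):
    """Send the tuple to every task (multicast)."""
    return list(range(num_tasks))


tuples = [{"word": w} for w in ["storm", "grid", "storm", "spark", "grid"]]
rng = random.Random(0)  # seeded only to make runs repeatable
for t in tuples:
    print(
        t["word"],
        shuffle_grouping(4, rng),
        fields_grouping(t, "word", 4),
        all_grouping(4),
    )
```

With the fields-style routing, both occurrences of a given word land on the same task, which is what makes per-key aggregation possible in a downstream bolt.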
Storm implements and executes a specified configuration using a hierarchy of nodes whose state and fault-tolerance are maintained by the open-source ZooKeeper cluster manager. A master node (called Nimbus) manages a set of worker nodes (called Supervisors), which run tasks. Strategies are available to handle failures of each of these components and to ensure that stream tuples are reliably processed.
A major strength of Storm is its continuous execution model. Once a configuration has been deployed, incoming data streams can be processed without scheduling delays, thereby providing uninterrupted, real-time results. This overcomes a major drawback of Hadoop MapReduce, which processes data in batch jobs with significant latency (often 15+ seconds) in starting up each job.
IMDGs approximate Storm’s continuous execution model in two ways. First, they allow continuous, overlapped updates to in-memory state, enabling them to handle high arrival rates of incoming data (e.g., thousands of updates per second for each IMDG server in a cluster). Both IMDGs and Storm scale out to increase throughput. Second, some IMDGs allow data-parallel operations to be performed continuously with very low startup delay (typically a few milliseconds). This allows IMDGs to output a stream of analysis results that matches the low latency required by operational systems. (Unlike Storm, IMDGs such as ScaleOut hServer also precisely match Hadoop’s MapReduce semantics, which require that reducers be able to process all key-value pairs emitted by the mappers in a given computation.)
Storm’s data model describes a set of tuple streams. Bolts analyze and filter these streams, creating new streams to hold their results. While bolts are unconstrained in their ability to access and update external stores, such as IMDGs or file-based NoSQL stores (e.g., MongoDB or Cassandra), this is not a central aspect of their processing model. Put another way, Storm does not provide any particular semantics for managing stateful data.
In contrast, IMDGs are organized around a stateful data model implemented by an object-oriented, in-memory store which is both scalable and highly available. This store is intended to hold ongoing, business-logic state implemented by collections of objects representing fast-changing data used in operational environments. In previous blog posts, we have seen examples in e-commerce (e.g., session-state and shopping carts) and financial services (e.g., portfolios and stock histories). Incoming data streams update these entities, which hold information that persists and evolves over their lifetimes. Making these entities “first class” citizens in the computation model simplifies the design of business logic while enabling stream processing using a combination of object-oriented updates and data-parallel computation to both modify and analyze this state.
Where IMDGs and Storm really differ is in their approaches to managing the complexity of the computation model. Like Microsoft Dryad and other parallel execution platforms with task precedence graphs, Storm defines a computation using a directed graph of execution nodes, each of which has a variable number of tasks. While the modular nature of an execution pipeline has appeal, its complexity can quickly become daunting. One reason for this is that the configuration’s graph is represented by sequential code describing bolts and the streams to which they are connected. As the number of bolts and streams grows, it becomes increasingly difficult to visualize their relationships and grasp the application’s overall behavior.
Other parallel systems that, like Storm, rely on task precedence graphs, such as message-passing systems and actor models, have demonstrated substantial complexity over the last few decades. Also, the Storm application developer must specify the number of tasks executed by each bolt. As the number of bolts and streams increases, it becomes challenging for the developer to manage the graph, predict the dynamics of its execution, and tune for best performance.
A central reason that IMDGs employ a data-parallel computation model is its simplicity, both in exposition and execution. (Another key reason is that data-parallel computation minimizes data motion, which can limit scalability. Storm’s data motion between bolts may incur more network overhead than IMDGs and impact scalability, but we have not evaluated this.) Since their application code is inherently straightforward, data-parallel programs are relatively easy to understand, and they don’t need extensive tuning for high performance. Also, separating updates to business logic state from data-parallel analytics simplifies integration into operational systems.
IMDGs offer a platform for scalable, memory-based storage and data-parallel computation which was specifically designed for use in operational systems. Because they incorporate API support for accessing and updating individual data objects and for data-parallel analytics, IMDGs are easily integrated into the business logic of these systems.
Storm was designed for a different purpose, namely to analyze streams of data using a continuously running execution pipeline. Its more complex computation model fits this purpose well, and, as a result, Storm embodies a different set of tradeoffs than IMDGs. Clearly, the term “real-time analytics” encompasses a variety of solutions designed to meet diverse business requirements.
The post How Do In-Memory Data Grids Differ from Spark? appeared first on ScaleOut Software.
IMDGs host data in memory and distribute it across a cluster of commodity servers. Using an object-oriented data storage model, they provide APIs for updating data objects typically in well under a millisecond (depending on the size of the object). This enables operational systems to use IMDGs for storing, accessing, and updating fast-changing data, while maintaining fast access times even as the storage workload grows. For example, an e-commerce website can store session state and shopping carts within an IMDG, and a financial services application can store stock portfolios. In both cases, stored data must be frequently updated and accessed.
Data storage needs can easily grow as more users store data within an IMDG. IMDGs accommodate this growth by adding servers to the cluster and automatically rebalancing stored data across the servers. This ensures that both capacity and throughput grow linearly with the size of the workload and that access and update times remain low regardless of the workload’s size.
Moreover, IMDGs maintain stored data with high availability using data replication. They typically create one or more replicas of each data object on different servers so that they can continue to access all stored data even after a server (or network component) fails; they do not have to pause to recreate data after a failure. IMDGs also self-heal to automatically create new replicas during recovery. All of this is critically important to operational systems which must continuously handle access and update requests without delay.
IMDGs Add Data-Parallel Computation for Analytics
Because IMDGs store data in memory distributed across a cluster of servers, they easily can perform data-parallel computations on stored data; they simply make use of the cluster’s processing power to analyze data “in place,” that is, without the need to migrate it to other servers. This enables IMDGs to provide fast results with minimum overhead. For example, a recent demonstration of ScaleOut hServer running a MapReduce calculation for a financial services application generated analysis results in about 330 milliseconds compared to 15+ seconds for Apache Hadoop.
A significant aspect of the IMDG’s architecture for data analytics is that it performs its computations on data hosted in memory – not on an incoming data stream. This memory-based storage is continuously updated by an incoming data stream, so the computation has access to the latest changes to the data. However, the computation also has access to the history of changes as manifested by the state of the data stored in the grid. This gives the computation a much richer data set for performing an analysis than it would have if it could only see the incoming data stream. We call it “stateful” real-time analytics.
Take a look at the following diagram, which illustrates the architecture for ScaleOut Analytics Server and ScaleOut hServer. The diagram shows a stream of incoming changes which are applied to the grid’s memory-based data store using API updates. The real-time analytics engine performs data-parallel computation on the stored data, combines the results across the cluster, and outputs a combined stream of alerts to the operational system.
The power of stateful analytics is that the computation can provide deeper insights than would otherwise be possible. For example, an e-commerce website can not only analyze browser actions but also interpret them in light of each customer’s preferences and shopping history to offer feedback. Likewise, a financial services application can analyze market price fluctuations to determine trading strategies based on the trading histories of individual portfolios, which are tuned after several trades and influenced by preferences.
The Berkeley Spark project has developed a data-parallel execution engine designed to accelerate Hadoop MapReduce calculations (and add related operators) by staging data in memory instead of moving it from disk to memory and back for each operator. Using this technique and other optimizations, it has demonstrated impressive performance gains over Hadoop MapReduce. This project’s stated goal (quoting from a tutorial slide deck from U.C. Berkeley’s AMPLab) is to “extend the MapReduce model to better support two common classes of analytics apps: iterative algorithms (machine learning, graphs) [and] interactive data mining [and] enhance programmability: integrate into Scala programming language.”
A key new mechanism that supports Spark’s programming model is the resilient distributed dataset (RDD) to “allow apps to keep working sets in memory for efficient reuse.” They are “immutable, partitioned collections of objects created through parallel transformations.” To support fault tolerance, “RDDs maintain lineage information that can be used to reconstruct lost partitions.”
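The idea of rebuilding a lost partition from lineage can be illustrated with a toy model. This is not Spark code or Spark’s API: the `LineageDataset` class, its methods, and the single-step `map` lineage are assumptions chosen to keep the sketch short, and a real RDD tracks a full graph of transformations.

```python
class LineageDataset:
    """Toy partitioned collection that remembers how it was derived."""

    def __init__(self, partitions, parent=None, transform=None):
        self._partitions = [tuple(p) for p in partitions]
        self._parent = parent        # dataset this one was derived from
        self._transform = transform  # per-element function applied to the parent

    def map(self, fn):
        """Parallel-style transformation: returns a new dataset, leaving self unchanged."""
        new_parts = [[fn(x) for x in p] for p in self._partitions]
        return LineageDataset(new_parts, parent=self, transform=fn)

    def partition(self, i):
        return self._partitions[i]

    def lose_partition(self, i):
        """Simulate a server failure that drops one partition from memory."""
        self._partitions[i] = None

    def recover_partition(self, i):
        """Rebuild a lost partition by re-applying the recorded transformation."""
        if self._parent is None:
            raise RuntimeError("no lineage: source data must be reloaded")
        source = self._parent.partition(i)
        self._partitions[i] = tuple(self._transform(x) for x in source)
        return self._partitions[i]


base = LineageDataset([[1, 2], [3, 4]])
squares = base.map(lambda x: x * x)
squares.lose_partition(1)
print(squares.recover_partition(1))
```

Recovery here costs a recomputation rather than a lookup of a replica, which is the tradeoff that separates this approach from replication-based high availability.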
You can see the key differences between using an IMDG hosting data-parallel computation and Spark to perform MapReduce and similar analyses. IMDGs analyze updatable, highly available, memory-based collections of objects, and this makes them ideal for operational environments in which data is being constantly updated even while analytics computations are ongoing. In contrast, Spark was designed to create, analyze, and transform immutable collections of data hosted in memory. This makes Spark ideal for optimizing the execution of a series of analytics operators.
The following diagram illustrates Spark’s use of memory-hosted RDDs to hold data accessed by its analytics engine:
However, Spark is not well suited to operational environments for two reasons. First, data cannot be updated. In fact, if Spark inputs data from HDFS, changes have to be propagated to HDFS from another data source since HDFS files can only be appended to, not updated. Second, RDDs are not highly available. Their fault tolerance results from reconstructing them from their recorded lineage, which may take substantially more time to complete than server failover by an IMDG. This represents an appropriate tradeoff for Spark because, unlike IMDGs, it focuses on analytics computations on data that does not need to be constantly available.
Even though Spark makes different design tradeoffs than IMDGs to support fast analytics, IMDGs can still deliver comparable speedup over Hadoop. For example, we measured Apache Spark executing the well-known Hadoop “word count” benchmark on a 4-server cluster running 9.6X faster than CDH5 Hadoop MapReduce for a 10 GB dataset hosted in HDFS. On this same benchmark, ScaleOut hServer ran 14X faster than Hadoop when executing standard Java MapReduce code.
Spark Streaming extends Spark to handle streams of input data and was motivated by the need to “process large streams of live data and provide results in near-real-time” (quoting from the slide deck referenced above). It “run[s] a streaming computation as a series of very small, deterministic batch jobs” by chopping up an input stream into a sequence of RDDs which it feeds to Spark’s execution engine. “The processed results of the RDD operations are returned in batches.” Computations can create or update other RDDs in memory which hold information regarding the state or history of the stream.
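The micro-batch model can be sketched in plain Python. This is an illustration, not Spark Streaming code: the function names are hypothetical, batches are cut by item count for simplicity (Spark Streaming cuts them by time interval), and a running word count stands in for the state carried between batches.

```python
def micro_batches(stream, batch_size):
    """Chop a stream into a sequence of small, immutable batches."""
    batch = []
    for item in stream:
        batch.append(item)
        if len(batch) == batch_size:
            yield tuple(batch)
            batch = []
    if batch:
        yield tuple(batch)  # final, possibly shorter batch


def run_streaming(stream, batch_size):
    """Process each batch as a small deterministic job and carry state forward."""
    state = {}
    results = []
    for batch in micro_batches(stream, batch_size):
        counts = {}
        for word in batch:
            counts[word] = counts.get(word, 0) + 1
        # Build a new state from the old state plus this batch's counts,
        # rather than mutating the old state in place.
        state = {
            key: state.get(key, 0) + counts.get(key, 0)
            for key in state.keys() | counts.keys()
        }
        results.append(counts)
    return results, state


results, state = run_streaming(["a", "b", "a", "c", "a"], 2)
print(results, state)
```

Results come back once per batch, so the batch size (or interval) sets a floor on how quickly any single event can influence the output.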
The representation of input and output streams as RDDs can be illustrated as follows:
This model of computation overcomes Spark’s basic limitation of working only on immutable data. Spark Streaming offers stateful operators that enable incoming data to be combined with in-memory state. However, it employs a distinctly stream-oriented approach with parallel operators that does not match the typical, object-oriented usage model of asynchronous, individual updates to memory-based objects implemented by IMDGs for operational environments. It also uses Spark’s fault tolerance, which does not support high availability for individual objects.
For example, IMDGs apply incoming changes to individual objects within a stateful collection by using straightforward object updates, and they simultaneously run data-parallel operations on the collection as a whole to perform analytics. We theorize that when using Spark Streaming, the same computation would require that each collection of updates represented by an incoming RDD be applied to the appropriate subset of objects within another “stateful” RDD held in memory. This in turn would require that the two RDDs be aligned to perform a parallel operation, which could add complexity to the original algorithm, especially if updates need to be applied to more than one object in the stateful collection. Also, fault-tolerance might require checkpointing to disk since the collection’s lineage could grow lengthy over time.
IMDGs offer a platform for scalable, memory-based storage and data-parallel computation that was specifically designed for use in operational systems, such as the ones we looked at above. Because they incorporate API support for accessing and updating individual data objects with integrated high availability, IMDGs are easily integrated into the business logic of these systems. Although Spark and Spark Streaming, with their use of memory-based storage and accelerated MapReduce execution times, bear a resemblance to IMDGs such as ScaleOut hServer, they were not intended for use in operational systems and do not provide the feature set needed to make this feasible. We will take a look at how IMDGs differ from Storm and CEP in an upcoming blog.
The post How Do In-Memory Data Grids Differ from Spark? appeared first on ScaleOut Software.
The post Transforming Retail with Real-Time Analytics appeared first on ScaleOut Software.
Operational Systems Need In-Memory Data Grids
Operational systems typically manage fast-changing client data that constantly streams in for processing by business logic, which updates existing state information and initiates appropriate responses. Some responses provide feedback to clients and others commit changes to persistent storage. For example, an e-commerce system receives requests to view products from web browsers, displays requested products and offers, and sends requested information back to clients. It also receives orders from clients, which it commits to permanent storage, and then it sends out messages to other systems to process these orders.
In-memory data grids (IMDGs) have been used for several years within operational systems to ensure fast responses and to scale throughput as workloads grow. In-memory data grids enable the execution of business logic to scale out across a cluster of servers while holding fast-changing application state in memory accessible to all servers. Memory-based data storage helps minimize response times, and adding servers adds CPU capacity to handle incremental growth in the workload.
For example, an in-memory data grid can hold session state and shopping carts for an e-commerce web farm, enabling all web servers to quickly and seamlessly access this data as they handle incoming browser requests (which are distributed by an IP load-balancer to web servers):
The next step for operational systems is to add real-time analytics, and the easiest way to insert real-time analytics into an operational system is to integrate it with the system’s business logic using an IMDG. Once real-time analytics is added to an in-memory data grid, it is immediately available to analyze fast-changing data flowing through the system and produce immediate results:
As we have explored in previous blogs, the key to fast response times for real-time analytics is data-parallel programming, that is, examining many data items in parallel using a single algorithm. This approach has two major strengths: (a) it enables the algorithm to be distributed across the grid’s cluster of servers for fast execution, and (b) it avoids moving data between servers for processing. The net result is that large, memory-based data sets can be quickly analyzed to generate timely responses.
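As a minimal sketch of the data-parallel pattern, the Python below runs one algorithm over several partitions of a data set and merges the partial results. Worker threads stand in for grid servers, and the function names and the 100-unit threshold are illustrative assumptions rather than part of any product API.

```python
from concurrent.futures import ThreadPoolExecutor

def partition(items, n):
    """Split the data set into n partitions, one per (simulated) server."""
    return [items[i::n] for i in range(n)]

def analyze(part):
    """The single algorithm, applied locally to one partition:
    count shopping carts whose total exceeds 100."""
    return sum(1 for cart_total in part if cart_total > 100)

def parallel_analyze(items, servers=4):
    """Run the algorithm on every partition in parallel and merge results."""
    parts = partition(items, servers)
    with ThreadPoolExecutor(max_workers=servers) as pool:
        partials = list(pool.map(analyze, parts))
    return sum(partials)

large_carts = parallel_analyze([50, 150, 200, 99, 101, 100])
```

Only the small partial counts are combined at the end; the data items themselves stay in the partition where they are stored.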
Some IMDGs, such as ScaleOut Analytics Server, offer an integrated real-time analytics engine that automatically ships analytics code to all grid servers and then executes the code in parallel on a specified collection of data stored within the IMDG. This simplifies the task of embedding real-time analytics within an operational system and ensures high performance.
Real-time analytics also can be constructed using the Hadoop MapReduce programming model, which offers a very popular data-parallel design pattern. ScaleOut hServer hosts Hadoop MapReduce applications using its real-time analytics engine and eliminates the overheads of task scheduling and data motion usually associated with Hadoop, thereby opening the door to using MapReduce in operational systems.
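For readers unfamiliar with the MapReduce design pattern, the following single-process Python sketch shows its three stages: map, group by key, and reduce. It does not use the Hadoop or ScaleOut hServer APIs; `map_phase`, `reduce_phase`, and the sample records are hypothetical.

```python
from collections import defaultdict

def map_phase(record):
    """Emit one (key, value) pair per product view."""
    customer, product = record
    yield product, 1

def reduce_phase(key, values):
    """Combine all values emitted for one key."""
    return key, sum(values)

def map_reduce(records):
    """Map every record, group the emitted values by key, then reduce."""
    groups = defaultdict(list)
    for record in records:
        for key, value in map_phase(record):
            groups[key].append(value)
    return dict(reduce_phase(key, values) for key, values in groups.items())

views = [("alice", "shoes"), ("bob", "hats"), ("alice", "shoes")]
view_counts = map_reduce(views)
```

In a distributed setting the map and reduce calls run on many servers at once, and the grouping step is where task scheduling and data motion overheads normally arise.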
Let’s look at how real-time analytics can be integrated into an e-commerce system. In addition to sending basic page requests to the system from clients browsing a website, the browser also can be instrumented to send detailed information about which products customers are examining and the time they are spending on each product. Combining all of this information, the system can build a history of site usage for each customer and collect a set of preferences for that customer. To support a large population of customers, customer information can be persisted in a database or NoSQL store and then brought into the IMDG when the customer starts browsing.
As illustrated in the following diagram, real-time analytics can continuously examine all active customers in parallel to identify special offers that are appropriate for each customer based on a combination of that customer’s preferences, shopping history, and current browsing behavior. By analyzing access patterns, the site also can determine if a customer is having difficulty finding products or services and suggest remedies. Inactive customers can be flagged and sent emails to remind them to complete purchases in their shopping carts. In addition, common patterns across customers can be identified and used to steer strategic decisions influenced by buying trends.
As e-commerce has gained increasing dominance with the shopping public, brick and mortar stores have responded by personalizing the shopping experience. High end retailers are now beginning to send real-time information from the point of sale to back-office servers for analysis in order to provide immediate feedback to sales staff. This enables the retailer to dramatically enhance the shopping experience.
For example, opt-in customers can identify themselves to sales staff on arrival so that their preferences and history can be used to help suggest products of interest. Products can be tracked with RFID tags to alert the sales staff when an active customer’s size is not present on the sales floor and must be retrieved from the stockroom (preferably before the customer requests it). These tags also can identify which products are being taken from the shelves or racks so that buying trends can be tracked. This also helps the store determine which products are repeatedly left in the changing rooms and not purchased, increasing the store’s buying power with the manufacturer. These are some of the many potential uses for real-time analytics in brick and mortar retail.
As the following diagram illustrates, IMDGs with integrated real-time analytics provide a fast and highly scalable platform for hosting customer information and analytics algorithms used by brick and mortar stores. Streams of information regarding customer activity and product motion can be fed to an IMDG to update in-memory state information for customers and products. Using data-parallel execution, analytics algorithms can continuously analyze this in-memory state and generate alerts for the sales staff which are delivered to point of sale terminals or tablets.
These examples show the power of real-time analytics to enhance operational systems which manage retail purchases, whether online or in brick and mortar stores. By hosting real-time analytics within an IMDG, these systems easily can host customer and product information which is repeatedly updated by streams of activity data. Unlike pure streaming systems, IMDGs can integrate these two types of information to provide a more complete picture of customer activity, leading to a deeper understanding of behavior, preferences, and customer needs. Lastly, IMDGs which host data-parallel analytics algorithms can deliver fast results, avoiding the batch processing overheads of conventional analytics systems, while ensuring scalable performance to handle growing workloads.
The post What’s New in ScaleOut StateServer® Version 5.1 appeared first on ScaleOut Software.
We introduced ScaleOut StateServer® almost exactly nine years ago and have worked continuously since then to add features requested by our customers and boost the product’s performance. Version 5.1 contains several exciting new capabilities, led by our introduction of C++ APIs. Our goal was to make these C++ APIs as easy to use as possible, so the first decision was to make them open source. This allows developers to build the APIs with a variety of compilers, from GCC 4.4 (circa 2009) onward. To strike a balance that allows support for the older compilers used on some enterprise-grade distributions of Linux, some newer C++11 features were not used, and the APIs use the widely available Boost C++ libraries instead. (Releases of Boost going back to version 1.41 have been verified to work.) So, for example, rather than returning a std::shared_ptr to a retrieved object, the API returns a boost::shared_ptr. The C++ APIs are also available for Windows developers; we ship pre-built libraries for Visual Studio 2013 users in the release.
The next big challenge with the C++ APIs was how to handle data serialization, which is needed to store objects within an out-of-process, in-memory data grid (IMDG). We first introduced C# APIs in 2005, and then added Java APIs in 2008. Unlike C++, both of these languages have built-in serializers; ScaleOut StateServer uses these serializers by default to keep application development as simple for the user as possible. Having looked at other IMDGs in the market, we did not want to go down the same path of requiring the use of serialization APIs provided by the IMDG vendor (us in this case). So we chose to offer integrated support for the popular Google Protocol Buffers encoding format (with optional indexing of annotated fields to support parallel query) and also provide an extensible API mechanism that allows users to build custom serializers.
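The general shape of a pluggable serializer can be sketched as follows. This is not the ScaleOut StateServer API, which is in C++; it is a hypothetical Python illustration in which `CartSerializer` and `ToyGrid` are invented names and JSON stands in for an encoding such as Protocol Buffers.

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class Cart:
    customer_id: str
    items: list

class CartSerializer:
    """Hypothetical user-supplied serializer: converts objects to and from bytes."""
    def serialize(self, obj):
        return json.dumps(asdict(obj)).encode("utf-8")

    def deserialize(self, data):
        return Cart(**json.loads(data.decode("utf-8")))

class ToyGrid:
    """Stand-in for an out-of-process store, which can only hold raw bytes."""
    def __init__(self, serializer):
        self._serializer = serializer
        self._bytes_by_key = {}

    def put(self, key, obj):
        self._bytes_by_key[key] = self._serializer.serialize(obj)

    def get(self, key):
        return self._serializer.deserialize(self._bytes_by_key[key])

grid = ToyGrid(CartSerializer())
grid.put("cart-42", Cart("alice", ["shoes", "belt"]))
restored = grid.get("cart-42")
```

Because the store sees only bytes, swapping in a different encoding means replacing the serializer object and nothing else.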
With version 5.1, we also extended support for data replication and remote access to IMDGs hosted in public clouds using our ScaleOut GeoServer® product. This product lets users connect a local IMDG to one or more remote IMDGs so that data can be replicated off-site in case of a site-wide failure; it also allows transparent access to data stored at remote sites using the IMDG’s APIs for local data access. With this release, remote IMDGs hosted in Amazon Web Services or Windows Azure can be accessed by ScaleOut GeoServer (and by client applications) with full support for secure connections using SSL.
The challenge with accessing cloud-based IMDGs is that it is clumsy to bootstrap connectivity using IP addresses, as is standard practice for on-premises grids, since these IP addresses are highly dynamic. To solve this problem, we created a simple mechanism (first introduced in version 5.0 for remote clients) which binds clients and remote IMDGs to a cloud-hosted IMDG using a simple combination of account credentials and a “store” name. We then retrieve cloud-based metadata to automatically identify and configure the current IP addresses and ports for the client or remote IMDG. The net effect is that configuring ScaleOut GeoServer to access a cloud-hosted IMDG is simple and secure.
With 5.1, we also rolled out the Windows version of our ScaleOut hServer® product, which lets developers create and run Hadoop MapReduce applications on grid-based data. This enables analysis of “live”, fast-changing data held within the IMDG, and it also delivers real-time results in milliseconds to a few seconds (instead of the minutes to hours required by standard, open source Hadoop distributions). Now users can run ScaleOut hServer on both Linux and Windows. We also added support for the Cloudera CDH4 Hadoop APIs to supplement support for the Apache Hadoop 1.X APIs.
Some of the most exciting enhancements in version 5.1 deal with the internal architecture of ScaleOut’s IMDG. Over the last nine years, we have watched advances in CPU, memory, and networking technology. Unfortunately, these advances occur at different times and put stress on different parts of the IMDG’s architecture. Today’s IMDGs often are deployed on clusters of servers each with 32 GB of memory or more (instead of 2 GB, which was common in 2005) and 8 or more i7 or Xeon cores. However, network bandwidth has only jumped 10X to 1 Gbps from 100 Mbps since 2005, while 10 Gbps Ethernet and InfiniBand await widespread adoption in clusters of commodity servers. The net effect is that IMDG applications can easily saturate a gigabit network as servers are added to the cluster, especially when large objects are stored.
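A back-of-the-envelope calculation shows why this imbalance matters. The sketch below estimates the time to move one server’s full memory across the network under idealized assumptions: decimal gigabytes, a link running at its full rated speed, and no protocol overhead. The function name is ours, and real transfer times will be longer.

```python
def transfer_seconds(gigabytes, link_mbps):
    """Ideal time to send `gigabytes` of data over a link of `link_mbps`."""
    megabits = gigabytes * 8000  # 1 GB = 8000 megabits (decimal units)
    return megabits / link_mbps

memory_growth = 32 / 2        # server memory grew 16X
network_growth = 1000 / 100   # network bandwidth grew 10X

seconds_2005 = transfer_seconds(2, 100)     # 2 GB over 100 Mbps
seconds_today = transfer_seconds(32, 1000)  # 32 GB over 1 Gbps
```

Under these assumptions the 2005 configuration needs 160 seconds and the newer one 256 seconds, so moving a full server’s worth of data takes 1.6 times as long even though the link is ten times faster.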
To help address this, we have streamlined the IMDG’s internal transport protocol used for load-balancing to boost its effective throughput by as much as 5X. This allows load-balancing to complete much faster after a server is added to or removed from the IMDG.
Another big technology change we have seen over the last nine years is the migration to virtualized environments; many if not most of our customer deployments are now hosted on virtual servers. Because it’s all too easy to overload the underlying physical servers with too many VMs, we often see intermittent network or processing delays caused by maxing out the CPU and NIC and sometimes by paging grid-hosted memory. These transient delays make it difficult to build a reliable heart-beating mechanism to recognize and recover from server or network outages (by looking for missing heartbeat messages between servers). Version 5.0 incorporated an adaptive heart-beating mechanism that responded to intermittent delays but could be spoofed by the unpredictable behavior of virtualized systems.
We now have fully revised this mechanism with new heuristics that more effectively identify and ignore these transient delays. ScaleOut StateServer measures the network for a full 24 hours before tightening its parameters for treating a heartbeat delay as a real outage, and it fully re-measures the network after a failure is detected. (Because it’s important to handle real outages quickly, allowed heartbeat delays must be kept as short as possible.) Our tests show that this approach minimizes service interruptions caused by erratic delays endemic to virtualized environments. However, it’s important to note that because of its heuristic nature, heart-beating can still misinterpret communication delays as server failures.
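The general measure-then-tighten idea can be sketched as below. This is not ScaleOut StateServer’s actual heuristic, whose details are not described here: the class name, the sample-count window (standing in for the 24-hour measurement period), and the safety margin are all assumptions made for illustration.

```python
class AdaptiveHeartbeat:
    """Toy failure detector that tightens its timeout only after measuring."""

    def __init__(self, relaxed_timeout, window, margin=2.0):
        self.relaxed_timeout = relaxed_timeout  # conservative starting timeout
        self.window = window                    # samples required before tightening
        self.margin = margin                    # headroom above the worst observed delay
        self.reset()

    def reset(self):
        """Discard measurements and fall back to the relaxed timeout."""
        self.samples = []
        self.timeout = self.relaxed_timeout

    def observe(self, delay):
        """Record one heartbeat delay; return True if it is treated as an outage."""
        if delay > self.timeout:
            self.reset()  # re-measure the network after a detected failure
            return True
        self.samples.append(delay)
        if len(self.samples) >= self.window:
            worst = max(self.samples)
            self.timeout = min(self.relaxed_timeout, worst * self.margin)
        return False

detector = AdaptiveHeartbeat(relaxed_timeout=10.0, window=3)
results = [detector.observe(d) for d in (0.5, 1.0, 0.8)]
tightened = detector.timeout
outage = detector.observe(3.0)
```

After three normal samples the timeout tightens from 10.0 to 2.0, so the 3.0 delay is flagged as an outage and the detector returns to its relaxed setting. The same 3.0 delay would then be tolerated, which illustrates how a heuristic of this kind trades detection speed against false alarms.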
We hope this tour of version 5.1 has helped illustrate our ongoing goals to maximize both ease of use and application performance, two core objectives of our IMDG and analytics technology. Please let us know your thoughts and comments.