Use Distributed Caching to Accelerate Online Web Sites
In this time of extremely high online usage, web sites and services can quickly become overloaded as they struggle to manage high volumes of fast-changing data. Most sites maintain a wide variety of this data, including information about logged-in users, e-commerce shopping carts, requested product specifications, and records of partially completed transactions. Maintaining rapidly changing data in back-end databases creates bottlenecks that impact responsiveness. In addition, repeatedly accessing back-end databases to serve up popular items, such as product descriptions and news stories, adds to the bottleneck.
The Solution: Distributed Caching
The solution to this challenge is to use scalable, memory-based data storage for fast-changing data so that web sites can keep up with exploding workloads. A widely used technology called distributed caching meets this need by storing frequently accessed data in memory on a server farm instead of within a database. This speeds up accesses and updates while offloading back-end database servers. Also called in-memory data grids, distributed caches, such as ScaleOut StateServer®, use server farms to both scale storage capacity and accelerate access throughput, thereby maintaining fast data access at all times.
The following diagram illustrates how a distributed cache can store fast-changing data to accelerate online performance and offload a back-end database server:
The Technology Behind Distributed Caching
It’s not enough simply to lash together a set of servers hosting a collection of in-memory caches. To be reliable and easy to use, distributed caches need to incorporate technology that provides important attributes, including ease of integration, location transparency, transparent scaling, and high availability with strong consistency. Let’s take a look at some of these capabilities.
To make distributed caches easy to use and keep them fast, they typically employ a “NoSQL” key/value access model and store values as serialized objects. This enables web applications to store, retrieve, and update instances of their application-defined objects (for example, shopping carts) using a simple key, such as a user’s unique identifier. This object-oriented approach allows distributed caches to be viewed as more of an extension of an application’s in-memory data storage than as a separate storage tier.
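To make this concrete, here is a minimal C# sketch of the key/value pattern described above. The IDistributedCache interface and its Store/Retrieve methods are hypothetical placeholders standing in for a distributed cache's client library, not ScaleOut StateServer's actual API:

```csharp
using System;
using System.Collections.Generic;

// Application-defined object stored in the distributed cache.
[Serializable]
public class ShoppingCart
{
    public List<string> ProductIds { get; } = new List<string>();
    public DateTime LastUpdated { get; set; }
}

public static class CartExample
{
    public static void Run(IDistributedCache cache, string userId)
    {
        // Store the cart under the user's unique identifier; the cache
        // serializes the object and decides which server holds it.
        var cart = new ShoppingCart { LastUpdated = DateTime.UtcNow };
        cart.ProductIds.Add("sku-12345");
        cache.Store(userId, cart);

        // Later, any web server in the farm can retrieve the cart by key,
        // without knowing which caching server actually holds it.
        var saved = (ShoppingCart)cache.Retrieve(userId);
        saved.ProductIds.Add("sku-67890");
        cache.Store(userId, saved);   // the update replaces the stored value
    }
}

// Hypothetical client-API surface used above.
public interface IDistributedCache
{
    void Store(string key, object value);
    object Retrieve(string key);
}
```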
That said, a web application needs to interact with a distributed cache as a unified whole. It’s just too difficult for the application to keep track of which server within a distributed cache holds a given data object. For this reason, distributed caches handle all the bookkeeping required to keep track of where objects are stored. Applications simply present a key to the distributed cache, and the cache’s client library finds the object, regardless of which server holds it.
It’s also the distributed cache’s responsibility to distribute access requests across its farm of servers and scale throughput as servers are added. Linear scaling keeps access times low as the workload increases. Distributed caches typically use partitioning techniques to accomplish this. ScaleOut StateServer further integrates the cache’s partitioning with its client libraries so that scaling is both transparent to applications and automatic. When a server is added, the cache quietly rebalances the workload across all caching servers and makes the client libraries aware of the changes.
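As a simplified sketch of the partitioning idea (not ScaleOut's actual algorithm), a client library can hash each key to a partition and consult a partition map, refreshed whenever servers join or leave, to find the server that currently owns that partition:

```csharp
using System;

// Simplified illustration of key-based partitioning. A real distributed
// cache also rebalances partitions and replicates them for availability.
public class PartitionMap
{
    private readonly string[] _partitionOwners;   // index = partition, value = server address

    public PartitionMap(string[] partitionOwners) => _partitionOwners = partitionOwners;

    public int PartitionCount => _partitionOwners.Length;

    // Map a key to a partition using a stable hash, then look up the
    // server that currently owns that partition.
    public string ServerForKey(string key)
    {
        int hash = StableHash(key);
        int partition = (int)((uint)hash % (uint)PartitionCount);
        return _partitionOwners[partition];
    }

    private static int StableHash(string key)
    {
        unchecked
        {
            int h = 23;
            foreach (char c in key) h = h * 31 + c;
            return h;
        }
    }
}
```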
To enable their use in mission-critical applications, distributed caches need to be highly available, that is, to ensure that both stored data and access to the distributed cache can survive the failure of one of the servers. To accomplish this, distributed caches typically store each object on two (or more) servers. If a server fails, the cache detects this, removes the server from the farm, and then restores the redundancy of data storage in case another failure occurs.
When there are multiple copies of an object stored on different servers, it’s important to keep them consistent. Otherwise, stale data due to a missed update could inadvertently be returned to an application after a server fails. Unlike some distributed caches which use a simpler, “eventual” consistency model prone to this problem, ScaleOut StateServer uses a patented, quorum-based technique which ensures that all stored data is fully consistent.
There’s More: Parallel Query and Computing
Because a distributed cache stores memory-based objects on a farm of servers, it can harness the CPU power of the server farm to analyze stored data much faster than would be possible on a single server. For example, instead of just accessing individual objects using keys, it can query the servers in parallel to find all objects with specified properties. With ScaleOut StateServer, applications can use standard query mechanisms, such as Microsoft LINQ, to create parallel queries executed by the distributed cache.
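A hedged C# sketch of such a query follows; the IQueryable&lt;T&gt; source is assumed to be exposed by the cache's client library, and the type and member names are illustrative rather than the documented API:

```csharp
using System;
using System.Linq;

[Serializable]
public class ProductView
{
    public string Brand { get; set; }
    public string Category { get; set; }
    public DateTime ViewedAt { get; set; }
}

public static class QueryExample
{
    // 'products' stands in for an IQueryable<T> exposed by the cache's
    // client library; the query is evaluated in parallel across the
    // caching servers rather than on the client.
    public static int CountRecentViews(IQueryable<ProductView> products, string brand)
    {
        return products
            .Where(p => p.Brand == brand &&
                        p.ViewedAt > DateTime.UtcNow.AddHours(-1))
            .Count();
    }
}
```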
Although they are powerful, parallel queries can overload both a requesting client and the network by returning a large number of query results. In many cases, it makes more sense to let the distributed cache perform the client’s work within the cache itself. ScaleOut StateServer provides an API called Parallel Method Invocation (and also a variant of .NET’s Parallel.ForEach called Distributed ForEach) which lets a client application ship code to the cache that processes the results of a parallel query and then returns merged results back to the client. Moving code to where the data lives accelerates processing while minimizing network usage.
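The following sketch shows the shape of that pattern using a hypothetical DistributedForEach helper; the interface and method names are placeholders, not ScaleOut's actual signatures:

```csharp
using System;

[Serializable]
public class ShoppingCartSummary
{
    public string Brand { get; set; }
    public decimal TotalValue { get; set; }
}

public static class InCacheProcessingExample
{
    // Hypothetical: ships the delegates to the caching servers, runs them
    // against every object matching the filter, and merges per-server results.
    public static decimal TotalCartValue(IDistributedCompute compute, string brand)
    {
        return compute.DistributedForEach<ShoppingCartSummary, decimal>(
            filter: c => c.Brand == brand,          // evaluated in parallel in the cache
            body:   c => c.TotalValue,              // runs where each object lives
            merge:  (a, b) => a + b);               // combined across servers
    }
}

// Hypothetical compute surface used above.
public interface IDistributedCompute
{
    TResult DistributedForEach<T, TResult>(
        Func<T, bool> filter, Func<T, TResult> body, Func<TResult, TResult, TResult> merge);
}
```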
Distributed Caches Can Help Now
Online web sites and services are now more vital than ever to keeping our daily activities moving forward. Since almost all large web sites use server farms to handle growing workloads, it’s no surprise that server farms can also offer a powerful and cost-effective hardware platform for hosting a site’s fast-changing data. Distributed caches harness the power of server farms to handle this important task and remove database bottlenecks. Also, with integrated parallel query and computing, distributed caches can now do much more to offload a site’s workload. This might be a good time to take a fresh look at the power of distributed caching.
Real-Time Digital Twins: A New Approach to Streaming Analytics
A conventional pipeline combines telemetry from all data sources into a single stream which is queried by the user’s streaming analytics application. This code often takes the form of a set of SQL queries (extended with time-windowing semantics) running continuously to select interesting events from the stream. These query results are then forwarded to a data lake for offline analytics using tools such as Spark and for data visualization. Query results also might be forwarded to cloud-based serverless functions to trigger alerts or other actions in conjunction with access to a database or blob store.
These techniques are highly effective for analyzing telemetry in aggregate to identify unusual situations which might require action. For example, if the telemetry is tracking a fleet of rental cars, a query could report all cars by make and model that have reported a mechanical problem more than once over the last 24 hours so that follow-up inquiries can be made. In another application, if the telemetry stream contains key-clicks from an e-commerce clothing site, a query might count how many times a garment of a given type or brand was viewed in the last hour so that a flash sale can be started.
A key limitation of this approach is that it is difficult to separately track and analyze the behavior of each individual data source, especially when they number in the thousands or more. It’s simply not practical to create a unique query tailored for each data source. Fine-grained analysis by data source must be relegated to offline processing in the data lake, making it impossible to craft individualized, real-time responses to the data sources.
For example, a rental car company might want to alert a driver if she/he strays from an allowed region or appears to be lost or repeatedly speeding. An e-commerce company might want to offer a shopper a specific product based on analyzing the click-stream in real time with knowledge of the shopper’s brand preferences and demographics. These individualized actions are impractical using the conventional tools of real-time streaming analytics.
However, real-time digital twins easily bring these capabilities within reach. Take a look at how the streaming pipeline differs when using real-time digital twins:
The first important difference to note is that the execution platform automatically correlates telemetry events by data source. This avoids the need for the application to select events by data source using queries (which is impractical in any case when using a conventional pipeline with many data sources). The second difference is that real-time digital twins maintain immediately accessible (in-memory) state information for each data source which is used by message-processing code to analyze incoming events from that data source. This enables straightforward application code to immediately react to telemetry information in the context of knowledge about the history and state of each data source.
For example, the rental car application can keep each driver’s contract, location history, and the car’s known mechanical issues and service history within the corresponding digital twin for immediate reference to help detect whether an alert is needed. Likewise, the e-commerce application can keep each shopper’s recent product searches along with brand preferences and demographics in her/his digital twin, enabling timely suggestions targeted to each shopper.
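As a conceptual sketch (the class and method names are illustrative, not the actual real-time digital twin API), a rental car twin might hold per-driver state in memory and examine each incoming telemetry message against it:

```csharp
using System;
using System.Collections.Generic;

// Illustrative only: per-data-source state held in memory by the twin.
public class RentalCarTwin
{
    public string ContractId { get; set; }
    public List<(double Lat, double Lon, DateTime At)> LocationHistory { get; }
        = new List<(double Lat, double Lon, DateTime At)>();
    public int SpeedingEvents { get; set; }

    // Called by the execution platform for each telemetry message from this car.
    public void ProcessMessage(CarTelemetry msg, IAlertSink alerts)
    {
        LocationHistory.Add((msg.Lat, msg.Lon, msg.Timestamp));

        if (msg.SpeedMph > 80)
        {
            SpeedingEvents++;
            if (SpeedingEvents >= 3)
                alerts.Send(ContractId, "Repeated speeding detected; contacting driver.");
        }
    }
}

public class CarTelemetry
{
    public double Lat { get; set; }
    public double Lon { get; set; }
    public double SpeedMph { get; set; }
    public DateTime Timestamp { get; set; }
}

public interface IAlertSink
{
    void Send(string contractId, string message);
}
```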
The power of real-time digital twins lies in their ability to make fine-grained analysis and responses possible in real time for thousands of data sources. They are made possible by scalable, in-memory computing technology hosted on clusters of cloud-based servers. This provides the fast response times and scalable throughput needed to support many thousands of data sources.
Lastly, real-time digital twins open the door to real-time aggregate analytics that analyze state data across all instances to spot emerging patterns and trends. Instead of waiting for the data lake to provide insights, aggregate analytics on real-time digital twins can immediately surface patterns of interest, maximizing situational awareness and assisting in the creation of response strategies.
With aggregate analytics, the rental car company can identify regions with unusual delays due to weather or highway blockages and then alert the appropriate drivers to suggest alternative routes. The e-commerce company can spot hot-selling products perhaps due to social media events and respond to ensure that inventory is made available.
Real-time digital twins create exciting new capabilities that were not previously possible with conventional techniques. You can find detailed information about ScaleOut Software’s cloud service for real-time digital twins here.
AppFabric Caching: What Now?
Eighteen months ago we posted a blog on the performance and feature shortcomings of Microsoft’s Windows Server AppFabric (WSAF) Caching. Since then much has transpired. Microsoft announced earlier this year it will be ending support for Windows Server AppFabric 1.1 by April 2017. AppFabric Caching users now have to determine the right next step in migrating to an alternative distributed cache.
Recommended alternatives found lacking
With its “mobile first, cloud first” strategy, it appears that Microsoft is pushing customers to its Microsoft Azure cloud platform by recommending that “all Microsoft AppFabric customers using Cache to move to Microsoft Azure Redis Cache.” However, for many customers it is not yet practical to move to Azure, and a fully supported, on-premise solution is required for their distributed cache. Microsoft’s recommendation that customers move to Redis is both controversial and misleading since the Redis open source community does not recommend running on Windows.
Further, Redis is an in-memory database which works well on a single server but lacks many of the scalability and high-availability features of a mature, fully featured in-memory data grid, let alone real-time analytics and computing capabilities. This leaves customers with an uncertain and commercially unsupported future when migrating their on-premise, .NET compatible, distributed cache away from AppFabric Caching.
A better alternative
Luckily, ScaleOut Software has been loyally serving the .NET developer community with an industry-leading in-memory data grid for over a decade. ScaleOut’s architectural design philosophy focuses on delivering high performance with maximum ease-of-use. It employs a single, coherent architecture for integrating scalability and high availability; this architecture is transparently leveraged in all aspects of its in-memory data grid. We call this “scalable, highly available everything” — the platform goes beyond linear performance scaling for accessing grid objects and uses a common architecture for all features such as distributed locking, event processing, load-balancing, geographic replication, parallel computation and backup/restore.
The key benefits and advantages that set ScaleOut StateServer® apart from its industry peers are:
Peer-to-peer architecture: a fully peer-to-peer design with no lead hosts and no single point of failure, so the cache continues to run as long as any host survives.
Ease-of-use: straightforward installation, transparent and automatic scaling, and an intuitive GUI-based management console.
Extended functionality:
ScaleOut StateServer (and its product extensions) goes far beyond AppFabric Caching’s basic capabilities, adding important functionality such as distributed locking, event processing, geographic replication, parallel query using LINQ, and data-parallel computation.
Furthermore, unlike Redis and some of the other open-source alternatives, ScaleOut StateServer is fully commercially supported by ScaleOut Software for use on Windows (or Linux).
Replacement and migration options
Customers have two paths to choose from when planning their migration off of AppFabric Caching to ScaleOut Software’s distributed cache: retain source-code compatibility for their existing legacy applications or migrate their applications to fully take advantage of native ScaleOut APIs.
Source-code compatible library
The ScaleOut Windows Server AppFabric (WSAF) Caching Compatibility Library is a source-code compatible, drop-in replacement for Microsoft’s AppFabric Caching APIs. It allows existing applications to preserve the legacy AppFabric Caching API semantics, switch to ScaleOut StateServer without making any code changes, and continue to use familiar PowerShell commands to manage the distributed cache. This library ships as part of ScaleOut StateServer release 5.4 and later, as described in the WSAF Caching Compatibility Library Reference.
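For illustration, existing AppFabric client code along the following lines is the kind that can continue to compile and run unchanged against the compatibility library; the cache name and configuration details here are assumptions for the sketch:

```csharp
using Microsoft.ApplicationServer.Caching;   // same namespace the AppFabric client code already uses

public class SessionStore
{
    private readonly DataCache _cache;

    public SessionStore()
    {
        // Existing AppFabric code path: create a factory and open a named cache.
        // With the compatibility library, the same calls are served by
        // ScaleOut StateServer instead of an AppFabric cache cluster.
        var factory = new DataCacheFactory();
        _cache = factory.GetCache("default");
    }

    public void SaveCart(string userId, object cart) => _cache.Put(userId, cart);

    public object LoadCart(string userId) => _cache.Get(userId);
}
```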
Native ScaleOut APIs
Customers also can rewrite their applications to use ScaleOut StateServer’s native APIs, which allows applications to take full advantage of the extended functionality mentioned above. Using a hybrid approach, native ScaleOut StateServer APIs can be used alongside AppFabric APIs. We have developed a detailed technical AppFabric Caching migration guide to help developers through this process.
Using the WSAF Caching Compatibility Library
The WSAF Caching Compatibility Library is easy to integrate into applications that use AppFabric Caching’s APIs; the required steps are outlined in the WSAF Caching Compatibility Library Reference.
For More Information
More information about all the AppFabric Caching replacement and migration resources available can be found at www.scaleoutsoftware.com/appfabric, including a valuable offer for former AppFabric Caching users. We hope that the power of our platform as both a replacement and an upgrade from Microsoft’s WSAF Caching has captured your interest. Whether you run on-premises or in the cloud, it may be the right next step for your application.
AppFabric Caching: Retry Later
Given all this, we thought it would be a good opportunity to see how we are doing relative to the competition, and in particular, relative to Microsoft’s AppFabric caching for Windows on-premise servers. In addition to looking at performance differences, we also want to compare ScaleOut StateServer (SOSS) to AppFabric on qualitative measures, such as features, ease of installation, and management.
Well, performance comparisons aren’t so easy since the AppFabric license agreement states: “You may not disclose the results of any benchmark tests of the software to any third party without Microsoft’s prior written approval.” So our comments will be confined to what testing we felt was valuable and how well SOSS performed.
In a recent customer engagement, the application needed to load 10M objects into each server of the IMDG’s cluster to make full use of high-end servers with 60GB memory capacity. Measuring object creation and access rates on a single server is a good test of the IMDG’s memory management and multithreading, and this is an area in which we have made several performance optimizations. Using a workload of random object sizes varying from 200B to 2KB, SOSS was able to load 2 million objects in 59.2 seconds and then sustain 18K read/update pairs per second to random objects. That’s actually quite fast. (We invite you to test AppFabric’s performance; contact us if you need a test application.)
We also looked at load-balancing and recovery times for this workload of 2M objects when adding a server to a 2-server cluster, removing the third server, and also just killing the third server. This measures how well the IMDG’s servers use multithreading to maximize network bandwidth during load-balancing, and it also evaluates failure detection and recovery algorithms. These are areas in which we have invested heavily to take advantage of 10 Gbps (and faster) networks and to handle intermittent network delays inherent in virtual server infrastructures. While handling an access load of 6K read/update pairs per second, SOSS was measured to complete load-balancing in less than 35 seconds for joins and leaves and also to complete recovery and restore full throughput in this amount of time after a server failure.
We were surprised to discover that AppFabric throws exceptions to the client application during load-balancing and recovery, as well as due to security issues, as described in other blog posts. During load-balancing, the client gets the following exception when accessing the cache:
ErrorCode<ERRCA0017>:SubStatus<ES0006>:There is a temporary failure. Please retry later. (One or more specified cache servers are unavailable, which could be caused by busy network or servers. …)
You’ll also see a “Please retry later” exception (forever!) if the client runs with insufficient security authorization; we had to run the client as administrator to avoid problems without a much deeper investigation. The client also throws these exceptions during “graceful” host shutdowns and during recovery.
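For illustration, here is a hedged sketch of the retry wrapper an AppFabric client application typically ends up writing to ride out these temporary failures; the exact error-code check may vary by AppFabric version:

```csharp
using System;
using System.Threading;
using Microsoft.ApplicationServer.Caching;

public static class RetryHelper
{
    // Retries a cache operation when AppFabric reports a temporary failure
    // (e.g., during load-balancing or recovery). Applications using SOSS do
    // not need this wrapper because retries are handled inside the servers.
    public static T WithRetry<T>(Func<T> operation, int maxAttempts = 5)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return operation();
            }
            catch (DataCacheException ex) when (
                ex.ErrorCode == DataCacheErrorCode.RetryLater && attempt < maxAttempts)
            {
                Thread.Sleep(TimeSpan.FromMilliseconds(100 * attempt));  // simple backoff
            }
        }
    }
}

// Usage: var cart = RetryHelper.WithRetry(() => cache.Get(userId));
```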
To keep application development as straightforward as possible, SOSS handles all exceptions that occur during membership changes and load-balancing, automatically retrying requests within the server as necessary. This avoids exposing the application to the details of the IMDG’s internal operations unless the IMDG is completely unreachable.
Our experience with customers consistently reinforces the need to keep middleware software as easy to install and use as possible, especially given that it is deployed as distributed software running on a cluster of servers. This philosophy manifests itself in all areas, including installation, application development, and cluster management. We believe that installing our software should be as straightforward as we can make it, requiring minimal knowledge of the host operating system and the fewest possible explicit configuration settings.
AppFabric caching does not share this design approach and requires the configuration of numerous components, including security policies, SQL Server or a network file share with the appropriate permissions, “lead” hosts, etc. Also, a long list of PowerShell commands for managing the cache must be learned and correctly used. Running AppFabric caching in production typically requires the use of a domain and is deeply tied into the Windows Server infrastructure. As expected, AppFabric requires the comprehensive knowledge of a Windows system administrator to install and manage.
In our testing, the net result was about a half-day investment in time on our part (and some frustration) getting AppFabric up and running. After spending an hour trying to join multiple cluster hosts in a Windows workgroup, we switched to using a domain controller to make this work; it just wasn’t worth the time to sort out the incorrect configuration settings. Head scratching was required in several other areas before our AppFabric cluster showed signs of life.
A major source of frustration with AppFabric is its use of PowerShell commands to manage the cache cluster. It’s easy to forget that distributed software is running on multiple servers which need to be orchestrated as a group, and that’s hard to do with a sequence of shell commands because you can’t track the state of the cluster at a glance. It’s much easier to manage a cluster with a graphical user interface (GUI) which shows the status of all hosts and alerts you to dynamic situations, such as high usage, load-balancing, or network outages.
To take full advantage of the GUI approach, SOSS uses a Windows-based management console with intuitive controls that make management of the IMDG simple and easy to learn. The console also adds advanced visualization features, such as integrated performance charts and tabular usage charts, a “heat” map showing the availability and dynamic state of all regions within the distributed store, and an object browser that lets you see stored objects and examine both their metadata and contents.
The following screenshots illustrate SOSS’s performance charting and heat map. Note that the status of the IMDG and all cluster hosts is instantly visible in the tree list at the left:
Because the GUI management console receives periodic updates from all cluster hosts, it stays tightly integrated with the cluster and dynamically updates the latest status. In contrast, the use of shell commands just gives you a snapshot of the state of the cluster at one instant in time. We also have observed that these results quickly can become out of sync with what client applications are actually experiencing. For example, a shell command can report that the cluster is in an unknown state when in fact the client is successfully completing access requests. (In AppFabric, be prepared to wait for several seconds for management commands to time out when a cluster host goes into an unknown state.)
AppFabric uses a single store, either a file share or SQL Server, to hold the cluster’s configuration, which adds complexity to installation and creates a single point of failure. Although SQL Server can be clustered, this adds even more cost than just using a single server. To avoid these problems, SOSS automatically replicates its configuration files on every cluster host to maximize availability with a fully peer-to-peer implementation. This approach also keeps the user from having to deal with managing a configuration store.
The peer-to-peer issue arises again in AppFabric’s requirement for a majority of lead hosts (presumably) to reconfigure the cluster after a membership change. Because some hosts are lead hosts and others are not, an AppFabric caching cluster can go down even when most hosts are healthy. Moreover, the administrator has to understand and ensure that a majority quorum of lead hosts exists, and the number of lead hosts varies with cluster size. For example, a small cluster of up to 20 hosts might use 3 lead hosts, requiring two of them to form a quorum. If 2 of the 3 lead hosts go down, the cluster will go down even if there are 18 healthy hosts.
With ScaleOut, you don’t need to know what the word “quorum” means. SOSS sidesteps these issues by using a fully peer-to-peer membership so that all hosts can participate in constructing the cluster membership. (SOSS makes use of a ScaleOut Software patent which eliminates the need for a majority hosts quorum by building a logical quorum on a uniform set of servers.) This means that SOSS can avoid the use of lead hosts and maintains service as long as any host survives. System administrators view all cluster hosts as peers and do not have to be aware of SOSS’s internal mechanisms for implementing the cluster membership.
It’s actually not clear to us whether AppFabric’s design philosophy regarding high availability is more closely aligned with “best effort” distributed caches like memcached or with mission-critical in-memory data grids like SOSS and others. Data replication is turned off by default and is explicitly set on a namespace-by-namespace basis using management commands. (High availability apparently is not available on Windows Server 2008R2 Standard Edition and requires Enterprise Edition, which can cost about $2,800 more per server, or you must upgrade to Windows Server 2012.) Also, since data is hosted in managed code, access delays due to garbage collection (as well as the exceptions noted above) are to be expected. (Microsoft recommends not storing more than 16GB in a cache host to avoid GC pauses.) Lastly, AppFabric’s client cache is not kept coherent with the distributed cache, and so the client cache cannot be used for transactional data.
In contrast, SOSS automatically replicates all stored objects on multiple hosts to maintain high availability at all times. Likewise, it uses an unmanaged heap for stored data to keep access times as predictable as possible and avoid GC pauses. It also keeps all client caches coherent with the IMDG so that multi-threaded code running on the cluster can coordinate access to transactional data using the well understood sequential consistency model.
It was tempting to write this blog post as a feature comparison between AppFabric and SOSS. It’s clear that, as a full in-memory data grid, SOSS — unlike AppFabric — incorporates many features that go well beyond distributed caching. For example, SOSS lets applications query stored data by property using Microsoft’s own LINQ, and ScaleOut Analytics Server (SOAS) can perform data-parallel analysis of queried objects using application code that SOAS automatically deploys on the cluster. ScaleOut hServer takes analytics a step further by hosting full Hadoop MapReduce on the IMDG.
That said, since AppFabric is targeted at distributed caching, we felt that AppFabric users likely are more focused on issues regarding deployment, performance, and availability. Beyond just evaluating how well products like AppFabric and SOSS extract performance from scale-up, it’s also important to examine how they stack up in their overall role of providing scalable, highly available in-memory storage.
When looking at other design approaches, we feel that ScaleOut’s philosophy of easy-to-use, fully peer-to-peer design with GUI-based management provides important dividends by simplifying deployment and maximizing visibility, while driving high performance and availability. Not surprisingly, all of this lowers the total cost of ownership, which — as we have seen — even for “free” software is never zero.
How Do In-Memory Data Grids Differ from Storm?
(The following description of in-memory data grids (IMDGs) is excerpted from last week’s blog post; see that post for more details.)
IMDGs host data in memory and distribute it across a cluster of commodity servers. Using an object-oriented data storage model, they provide APIs for updating data objects typically in well under a millisecond (depending on the size of the object). This enables operational systems to use IMDGs for storing, accessing, and updating fast-changing, “live” data, while maintaining fast access times even as the storage workload grows.
Data storage needs can easily grow as more users store data within an IMDG. IMDGs accommodate this growth by adding servers to the cluster and automatically rebalancing stored data across the servers. This ensures that both capacity and throughput scale linearly with growth in the workload, and access and update times remain low regardless of the workload’s size. Moreover, IMDGs maintain stored data with high availability using data replication so that if a server fails, operational systems can continuously handle access requests and update requests without delay.
IMDGs Perform Data-Parallel Computation
Because IMDGs store data in memory distributed across a cluster of servers, they easily can perform data-parallel computations on stored data; they simply make use of the cluster’s processing power to analyze data “in place,” that is, without the need to migrate it to other servers. This enables IMDGs to provide fast results (often in milliseconds) with minimal overhead.
The following diagram of the architecture used by ScaleOut Analytics Server and ScaleOut hServer shows a stream of incoming changes which are applied to the grid’s memory-based data store using API updates. The real-time analytics engine performs data parallel computation on stored data, combines the results across the cluster, and outputs a combined stream of alerts to the operational system.
A significant aspect of the IMDG’s architecture for data analytics is that it performs computations on data hosted in memory – not specifically on an incoming data stream. This memory-based storage is continuously updated by an incoming data stream, so the computation has access to the latest changes to the data. However, the computation also has access to the history of changes manifested by the current state of data stored in the grid. This gives the computation a rich data set for analysis that includes both the incoming data stream and the application’s persistent state.
Storm originally was developed by Nathan Marz at Backtype to overcome the limitations of Hadoop in analyzing streams of incoming data, such as Twitter streams and web log files. Its goal was to provide real-time, continuous computation that is both scalable and fault tolerant. Described both as stream processing and event processing, its computation model incorporates a combination of task parallelism and pipelining. The developer describes two basic entities: “spouts,” which generate streams of data in the form of ordered tuples, and “bolts,” which process incoming streams and optionally generate outgoing streams for other bolts. Spouts and bolts are organized into an acyclic, directed graph to create an executable configuration. (See this slide deck, among many available, for a more detailed explanation.)
The following diagram illustrates a Storm configuration of streams and bolts processing a set of input streams and generating a set of output streams. The green circles represent tuples within an input stream, and the blue boxes represent bolts. Note that spouts which generate the input streams are not shown in the diagram. The orange circles represent an optional output data stream, which may be implemented by the bolts in an arbitrary manner (e.g., as API calls to an external agent instead of as a stream of tuples).
Application developers specify several aspects of the configuration, such as the number of tasks that can be spawned to execute each bolt, and the manner in which an incoming stream’s tuples are distributed across these tasks. Various groupings implement characteristics that correspond to behaviors found in Hadoop MapReduce. For example, the shuffle grouping implements a random distribution of tuples to tasks akin to input to mappers, and the field grouping implements a key-based partitioning very close to that used as input to reducers. Other groupings also are available, such as “all,” which is equivalent to multicast.
Storm implements and executes a specified configuration using a hierarchy of nodes whose state and fault-tolerance are maintained by the open-source Zookeeper cluster manager. A master node (called Nimbus) manages a set of worker nodes (called Supervisors), which run tasks. Strategies are available to handle failures of each of these components and to ensure that stream tuples are reliably processed.
A major strength of Storm is its continuous execution model. Once a configuration has been deployed, incoming data streams can be processed without scheduling delays, thereby providing uninterrupted, real-time results. This overcomes a major drawback of Hadoop MapReduce, which processes data in batch jobs with significant latency (often 15+ seconds) in starting up each job.
IMDGs approximate Storm’s continuous execution model in two ways. First, they allow continuous, overlapped updates to in-memory state, enabling them to handle high arrival rates of incoming data (e.g., 1000s of updates per second for each IMDG server in a cluster). Both IMDGs and Storm scale out to increase throughput. Second, some IMDGs allow data-parallel operations to be performed continuously with very low startup delay (typically a few milliseconds). This allows IMDGs to output a stream of analysis results that matches the low latency required by operational systems. (Unlike Storm, IMDGs such as ScaleOut hServer also precisely match Hadoop’s MapReduce semantics, which require that reducers be able to process all key-value pairs emitted by the mappers in a given computation.)
Storm’s data model describes a set of tuple streams. Bolts analyze and filter these streams, creating new streams to hold their results. While bolts are unconstrained in their ability to access and update external stores, such as IMDGs or file-based NoSQL stores (e.g., Mongo DB or Cassandra), this is not a central aspect of their processing model. Put another way, Storm does not provide any particular semantics for managing stateful data.
In contrast, IMDGs are organized around a stateful data model implemented by an object-oriented, in-memory store which is both scalable and highly available. This store is intended to hold ongoing, business-logic state implemented by collections of objects representing fast-changing data used in operational environments. In previous blog posts, we have seen examples in e-commerce (e.g., session-state and shopping carts) and financial services (e.g., portfolios and stock histories). Incoming data streams update these entities, which hold information that persists and evolves over their lifetimes. Making these entities “first class” citizens in the computation model simplifies the design of business logic while enabling stream processing using a combination of object-oriented updates and data-parallel computation to both modify and analyze this state.
Where IMDGs and Storm really differ is in their approaches to managing the complexity of the computation model. Like Microsoft Dryad and other parallel execution platforms with task precedence graphs, Storm defines a computation using a directed graph of execution nodes, each of which has a variable number of tasks. While the modular nature of an execution pipeline has appeal, its complexity can quickly become daunting. One reason for this is that the configuration’s graph is represented by sequential code describing bolts and the streams to which they are connected. As the number of bolts and streams grows, it becomes increasingly difficult to visualize their relationships and grasp the application’s overall behavior.
Other parallel systems with task precedence graphs like Storm’s, such as message-passing systems and actor models, have demonstrated substantial complexity over the last few decades. Also, the Storm application developer must specify the number of tasks executed by each bolt. As the number of bolts and streams increases, it becomes challenging for the developer to manage the graph, predict the dynamics of its execution, and tune for best performance.
A central reason that IMDGs employ a data-parallel computation model is its simplicity, both in exposition and execution. (Another key reason is that data-parallel computation minimizes data motion, which otherwise limits scalability. Storm’s data motion between bolts may incur more network overhead than IMDGs and impact scalability, but we have not evaluated this.) Since their application code is inherently straightforward, data-parallel programs are relatively easy to understand, and they don’t need extensive tuning for high performance. Also, separating updates to business logic state from data-parallel analytics simplifies integration into operational systems.
IMDGs offer a platform for scalable, memory-based storage and data-parallel computation which was specifically designed for use in operational systems. Because it incorporates API support for accessing and updating individual data objects and data-parallel analytics, IMDGs are easily integrated into the business logic of these systems.
Storm was designed for a different purpose, namely to analyze streams of data using a continuously running execution pipeline. Its more complex computation model fits this purpose well, and, as a result, Storm embodies a different set of tradeoffs than IMDGs. Clearly, the term “real-time analytics” encompasses a variety of solutions designed to meet diverse business requirements.
How Do In-Memory Data Grids Differ from Spark?
IMDGs host data in memory and distribute it across a cluster of commodity servers. Using an object-oriented data storage model, they provide APIs for updating data objects typically in well under a millisecond (depending on the size of the object). This enables operational systems to use IMDGs for storing, accessing, and updating fast-changing data, while maintaining fast access times even as the storage workload grows. For example, an e-commerce website can store session state and shopping carts within an IMDG, and a financial services application can store stock portfolios. In both cases, stored data must be frequently updated and accessed.
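As a minimal sketch (the ILockingCache interface and its ReadExclusive/UpdateAndRelease methods are hypothetical placeholders, not ScaleOut's actual API), an operational update to a stored portfolio might look like this:

```csharp
using System;
using System.Collections.Generic;

[Serializable]
public class Portfolio
{
    public Dictionary<string, int> Positions { get; } = new Dictionary<string, int>();
    public decimal CashBalance { get; set; }
}

public static class PortfolioUpdater
{
    // Apply a trade to a portfolio held in the IMDG. The read is performed
    // under an exclusive lock so concurrent updates from other web or
    // application servers cannot interleave.
    public static void ApplyTrade(ILockingCache cache, string accountId,
                                  string symbol, int shares, decimal price)
    {
        var portfolio = (Portfolio)cache.ReadExclusive(accountId);

        portfolio.Positions.TryGetValue(symbol, out int held);
        portfolio.Positions[symbol] = held + shares;
        portfolio.CashBalance -= shares * price;

        cache.UpdateAndRelease(accountId, portfolio);   // write back and release the lock
    }
}

// Hypothetical locking API surface used above.
public interface ILockingCache
{
    object ReadExclusive(string key);
    void UpdateAndRelease(string key, object value);
}
```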
Data storage needs can easily grow as more users store data within an IMDG. IMDGs accommodate this growth by adding servers to the cluster and automatically rebalancing stored data across the servers. This ensures that both capacity and throughput grow linearly with the size of the workload and that access and update times remain low regardless of the workload’s size.
Moreover, IMDGs maintain stored data with high availability using data replication. They typically create one or more replicas of each data object on different servers so that they can continue to access all stored data even after a server (or network component) fails; they do not have to pause to recreate data after a failure. IMDGs also self-heal to automatically create new replicas during recovery. All of this is critically important to operational systems which must continuously handle access and update requests without delay.
IMDGs Add Data-Parallel Computation for Analytics
Because IMDGs store data in memory distributed across a cluster of servers, they easily can perform data-parallel computations on stored data; they simply make use of the cluster’s processing power to analyze data “in place,” that is, without the need to migrate it to other servers. This enables IMDGs to provide fast results with minimum overhead. For example, a recent demonstration of ScaleOut hServer running a MapReduce calculation for a financial services application generated analysis results in about 330 milliseconds compared to 15+ seconds for Apache Hadoop.
A significant aspect of the IMDG’s architecture for data analytics is that it performs its computations on data hosted in memory – not on an incoming data stream. This memory-based storage is continuously updated by an incoming data stream, so the computation has access to the latest changes to the data. However, the computation also has access to the history of changes as manifested by the state of the data stored in the grid. This gives the computation a much richer data set for performing an analysis than it would have if it could only see the incoming data stream. We call it “stateful” real-time analytics.
Take a look at the following diagram, which illustrates the architecture for ScaleOut Analytics Server and ScaleOut hServer. The diagram shows a stream of incoming changes which are applied to the grid’s memory-based data store using API updates. The real-time analytics engine performs data parallel computation on the stored data, combines the results across the cluster, and outputs a combined stream of alerts to the operational system.
The power of stateful analytics is that the computation can provide deeper insights than otherwise. For example, an e-commerce website can analyze not just browser actions but also interpret these actions in terms of a history of customer preferences and shopping history to offer feedback. Likewise, a financial services application can analyze market price fluctuations to determine trading strategies based on the trading histories for individual portfolios tuned after several trades and influenced by preferences.
The Berkeley Spark project has developed a data-parallel execution engine designed to accelerate Hadoop MapReduce calculations (and add related operators) by staging data in memory instead of moving it from disk to memory and back for each operator. Using this technique and other optimizations, it has demonstrated impressive performance gains over Hadoop MapReduce. This project’s stated goal (quoting from a tutorial slide deck from U.C. Berkeley’s AMPLab) is to “extend the MapReduce model to better support two common classes of analytics apps: iterative algorithms (machine learning, graphs) [and] interactive data mining [and] enhance programmability: integrate into Scala programming language.”
A key new mechanism that supports Spark’s programming model is the resilient distributed dataset (RDD) to “allow apps to keep working sets in memory for efficient reuse.” They are “immutable, partitioned collections of objects created through parallel transformations.” To support fault tolerance, “RDDs maintain lineage information that can be used to reconstruct lost partitions.”
You can see the key differences between using an IMDG hosting data-parallel computation and Spark to perform MapReduce and similar analyses. IMDGs analyze updatable, highly available, memory-based collections of objects, and this makes them ideal for operational environments in which data is being constantly updated even while analytics computations are ongoing. In contrast, Spark was designed to create, analyze, and transform immutable collections of data hosted in memory. This makes Spark ideal for optimizing the execution of a series of analytics operators.
The following diagram illustrates Spark’s use of memory-hosted RDDs to hold data accessed by its analytics engine:
However, Spark is not well suited to operational environments for two reasons. First, data cannot be updated. In fact, if Spark inputs data from HDFS, changes have to be propagated to HDFS from another data source since HDFS files can only be appended, not updated. Second, RDDs are not highly available. Their fault tolerance results from reconstructing them from their recorded lineage, which may take substantially more time to complete than server failover by an IMDG. This represents an appropriate tradeoff for Spark because, unlike IMDGs, it focuses on analytics computations on data that does not need to be constantly available.
Even though Spark makes different design tradeoffs than IMDGs to support fast analytics, IMDGs can still deliver comparable speedup over Hadoop. For example, we measured Apache Spark executing the well-known Hadoop “word count” benchmark on a 4-server cluster running 9.6X faster than CDH5 Hadoop MapReduce for a 10 GB dataset hosted in HDFS. On this same benchmark, ScaleOut hServer ran 14X faster than Hadoop when executing standard Java MapReduce code.
Spark Streaming extends Spark to handle streams of input data and was motivated by the need to “process large streams of live data and provide results in near-real-time” (quoting from the slide deck referenced above). It “run[s] a streaming computation as a series of very small, deterministic batch jobs” by chopping up an input stream into a sequence of RDDs which it feeds to Spark’s execution engine. “The processed results of the RDD operations are returned in batches.” Computations can create or update other RDDs in memory which hold information regarding the state or history of the stream.
The representation of input and output streams as RDDs can be illustrated as follows:
This model of computation overcomes Spark’s basic limitation of working only on immutable data. Spark Streaming offers stateful operators that enable incoming data to be combined with in-memory state. However, it employs a distinctly stream-oriented approach with parallel operators that does not match the typical, object-oriented usage model of asynchronous, individual updates to memory-based objects implemented by IMDGs for operational environments. It also uses Spark’s fault-tolerance which does not support high availability for individual objects.
For example, IMDGs apply incoming changes to individual objects within a stateful collection by using straightforward object updates, and they simultaneously run data-parallel operations on the collection as a whole to perform analytics. We theorize that when using Spark Streaming, the same computation would require that each collection of updates represented by an incoming RDD be applied to the appropriate subset of objects within another “stateful” RDD held in memory. This in turn would require that the two RDDs be aligned to perform a parallel operation, which could add complexity to the original algorithm, especially if updates need to be applied to more than one object in the stateful collection. Also, fault-tolerance might require checkpointing to disk since the collection’s lineage could grow lengthy over time.
IMDGs offer a platform for scalable, memory-based storage and data-parallel computation which was specifically designed for use in operational systems, such as the ones we looked at above. Because it incorporates API support for accessing and updating individual data objects with integrated high availability, IMDGs are easily integrated into the business logic of these systems. Although Spark and Spark Streaming, with their use of memory-based storage and accelerated MapReduce execution times, bear a resemblance to IMDGs such as ScaleOut hServer, they were not intended for use in operational systems and do not provide the feature set needed to make this feasible. We will take a look at how IMDGs differ from Storm and CEP in an upcoming blog.