The post Deploying ScaleOut’s Distributed Cache In Google Cloud appeared first on ScaleOut Software.
By Olivier Tritschler, Senior Software Engineer
Because of their ability to provide highly elastic computing resources, public clouds have become an attractive platform for hosting distributed caches, such as ScaleOut StateServer®. To complement its current offerings on Amazon AWS and Microsoft Azure, ScaleOut Software has just announced support for the Google Cloud Platform. Let’s take a look at some of the benefits of hosting distributed caches in the cloud and understand how we have worked to make both deployment and management as simple as possible.
Distributed Caching in the Cloud
Distributed caches, like ScaleOut StateServer, enhance a wide range of applications by offering shared, in-memory storage for fast-changing state information, such as shopping carts, financial transactions, and geolocation data. This data needs to be quickly updated and shared across all application servers, ensuring consistent tracking of user state regardless of the server handling a request. Distributed caches also offer a powerful computing platform for analyzing live data and generating immediate feedback or operational intelligence for applications.
Built using a cluster of virtual or physical servers, distributed caches automatically scale access throughput and analytics to handle large workloads. With their tightly integrated client-side caching, these caches typically provide faster access to fast-changing data than backing stores, such as blob stores and database servers. In addition, they incorporate redundant data storage and recovery techniques to provide built-in high availability and ensure uninterrupted access if a server fails.
To meet the needs of elastic applications, distributed caches must themselves be elastic. They are designed to transparently scale upwards or downwards by adding or removing servers as the workload varies. This is where the power of the cloud becomes clear.
Because cloud infrastructures provide inherent elasticity, they can benefit both applications and distributed caches. As more computing resources are needed to handle a growing workload, clouds can deploy additional virtual servers (also called cloud “instances”). Once a period of high demand subsides, resources can be dialed back to minimize cost without compromising quality of service. The flexibility of on-demand servers also avoids costly capital investments and reduces management costs.
Deploying ScaleOut’s Distributed Cache in the Google Cloud
A key challenge in using a distributed cache as part of a cloud-hosted application is making the cache easy to deploy and manage, and easy for the application to access. Distributed caches are typically deployed in the cloud as a cluster of virtual servers that scales as the workload demands. To keep things simple, a cloud-hosted application should view a distributed cache as a single, abstract entity and not have to keep track of individual caching servers or the data they hold. The application should not have to connect N application instances to M caching servers, especially when N and M (as well as the servers’ cloud IP addresses) vary over time. In particular, it should not have to discover and track the caching servers’ IP addresses.
Even though a distributed cache comprises several servers, the simplest way to deploy and manage it in the cloud is to identify the cache as a single, coherent service. ScaleOut StateServer takes this approach by identifying a cloud-hosted distributed cache with a single “store” name combined with access credentials. This name becomes the basis for both managing the deployed servers and connecting applications to the cache. It lets applications connect to the caching cluster without needing to be aware of the IP addresses for the cluster’s virtual servers.
The following diagram shows a ScaleOut StateServer distributed cache deployed in Google Cloud. It shows both cloud-hosted and on-premises applications connected to the cache, as well as ScaleOut’s management console, which lets users deploy and manage the cache. Note that while the distributed cache and applications all contain multiple servers, applications and users can access the cache just by using its store name.
Building on the features developed for integration with Amazon AWS and Microsoft Azure, the ScaleOut Management Console now lets users deploy and manage a cache in Google Cloud by just specifying a store name and initial number of servers, as well as other optional parameters. The console does the rest, interacting with Google Cloud to start up the distributed cache and configure its servers. To enable the servers to form a cluster, the console records metadata for all servers and identifies them as having the same store name.
Here’s a screenshot of the console wizard used for deploying ScaleOut StateServer in Google Cloud:
The management console provides centralized, on-premises management for initial deployment, status tracking, and adding or removing servers. It uses Google’s managed instance groups to host servers, and automated scripts use server metadata to ensure that new servers automatically join an existing store. The managed instance groups used by ScaleOut also let users define auto-scaling options based on CPU and memory usage metrics.
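As a rough illustration of metric-driven auto-scaling, the sketch below computes a target server count by scaling the current count in proportion to observed versus target CPU utilization, then clamping the result to configured bounds. The function name, its parameters, and the proportional rule itself are assumptions for illustration; this is not Google Cloud’s autoscaler algorithm or ScaleOut’s configuration.

```python
def recommended_size(current_servers: int, observed_cpu_pct: int, target_cpu_pct: int,
                     min_servers: int = 1, max_servers: int = 10) -> int:
    """Hypothetical proportional scaling rule (illustrative only).

    Chooses a server count so that, if load spread evenly across servers,
    average CPU utilization would land at or below target_cpu_pct.
    """
    if not 0 < target_cpu_pct <= 100:
        raise ValueError("target_cpu_pct must be in (0, 100]")
    # Integer ceiling division avoids floating-point rounding surprises.
    desired = -(-(current_servers * observed_cpu_pct) // target_cpu_pct)
    # Keep the group within its configured minimum and maximum size.
    return max(min_servers, min(max_servers, desired))


print(recommended_size(4, 90, 60))  # 6: scale out under heavy load
print(recommended_size(6, 20, 60))  # 2: scale in once demand subsides
```

Rounding up rather than to the nearest integer is a deliberate choice in this sketch: it errs toward adding a server rather than running hot.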
Instead of using the management console, users can also deploy ScaleOut StateServer to Google Cloud directly with Google’s Deployment Manager using optional templates and configuration files.
Simplifying Connectivity for Applications
On-premises applications typically connect each client instance to a distributed cache using a fixed list of IP addresses for the caching servers. This process works well on premises because the cache’s IP addresses typically are well known and static. However, it is impractical in the cloud since IP addresses change with each deployment or reboot of a caching server.
To avoid this problem, ScaleOut StateServer lets client applications specify a store name and credentials to access a cloud-hosted distributed cache. ScaleOut’s client libraries internally use this store name to discover the IP addresses of caching servers from metadata stored in each server.
The following diagram shows a client application connecting to a ScaleOut StateServer distributed cache hosted in Google Cloud. ScaleOut’s client libraries make use of an internal software component called a “grid mapper,” which acts as a bootstrap mechanism to find all servers belonging to a specified cache by its store name. The grid mapper accesses the metadata for the associated caching servers and returns their IP addresses to the client library. It also handles changes in IP addresses, such as when servers are added or removed for scaling purposes.
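The store-name lookup can be sketched as follows, assuming a hypothetical `ServerMetadata` record and a `GridMapper` that can enumerate the metadata of all running servers (for example, through a cloud API). These class names and the lookup mechanism are illustrative assumptions, not ScaleOut’s client-library API, and credentials are omitted for brevity.

```python
from dataclasses import dataclass


@dataclass
class ServerMetadata:
    """Hypothetical per-server metadata record (illustrative only)."""
    store_name: str
    ip_address: str


class GridMapper:
    """Hypothetical grid mapper: resolves a store name to current server IPs."""

    def __init__(self, metadata_source):
        # metadata_source is any callable returning the current list of
        # ServerMetadata records, e.g. a wrapper over a cloud metadata query.
        self._metadata_source = metadata_source

    def resolve(self, store_name):
        """Return the sorted IP addresses of servers tagged with store_name."""
        return sorted(
            m.ip_address for m in self._metadata_source() if m.store_name == store_name
        )


class CacheClient:
    """Hypothetical client that connects by store name instead of an IP list."""

    def __init__(self, store_name, mapper):
        self.store_name = store_name
        self.mapper = mapper
        self.servers = []

    def refresh(self):
        # Re-resolve on demand so that added or removed servers are picked up.
        self.servers = self.mapper.resolve(self.store_name)
        return self.servers


inventory = [
    ServerMetadata("orders", "10.0.0.5"),
    ServerMetadata("orders", "10.0.0.7"),
    ServerMetadata("sessions", "10.0.1.2"),
]
client = CacheClient("orders", GridMapper(lambda: inventory))
print(client.refresh())  # ['10.0.0.5', '10.0.0.7']
inventory.append(ServerMetadata("orders", "10.0.0.9"))  # a server is added
print(client.refresh())  # ['10.0.0.5', '10.0.0.7', '10.0.0.9']
```

The application only ever supplies "orders"; the IP addresses are discovered, never configured.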
Summing up
Because they provide elastic computing resources and high performance, public clouds, such as Google Cloud, offer an excellent platform for hosting distributed caches. However, the ephemeral nature of their virtual servers introduces challenges for both deploying the cluster and connecting applications. Keeping deployment and management as simple as possible is essential to controlling operational costs. ScaleOut StateServer makes use of centralized management, server metadata, and automatic client connections to address these challenges. It ensures that applications derive the full benefits of the cloud’s elastic resources with maximum ease of use and minimum cost.
The post Simulate at Scale with Digital Twins appeared first on ScaleOut Software.
With the ScaleOut Digital Twin Streaming Service, the digital twin software model has proven its versatility well beyond its roots in product lifecycle management (PLM). This cloud-based service uses digital twins to implement streaming analytics and add important contextual information not possible with other stream-processing architectures. Because each digital twin can hold key information about an individual data source, it can enrich the analysis of incoming telemetry and extract important, actionable insights without delay. Hosting digital twins on a scalable, in-memory computing platform enables the simultaneous tracking of thousands (or even millions) of data sources.
Owing to the digital twin’s object-oriented design, many diverse applications can take advantage of its powerful but easy-to-use software architecture. For example, telematics applications use digital twins to track telemetry from every vehicle in a fleet and immediately identify issues, such as lost or erratic drivers or emerging mechanical problems. Airlines can use digital twins to track the progress of passengers throughout an itinerary and respond to delays and cancellations with proactive remedies that smooth operations and reduce stress. Other applications abound, including health informatics, financial services, logistics, cybersecurity, IoT, smart cities, and crime prevention.
Here’s an example of a telematics application that tracks a large fleet of vehicles. Each vehicle has a corresponding digital twin analyzing telemetry from the vehicle in real time:
Applications like these need to simultaneously track the dynamic behavior of numerous data sources, such as IoT devices, to identify issues (or opportunities) as quickly as possible and give systems managers the best possible situational awareness. To either validate streaming analytics code for a complex physical system or model its behavior, it is useful to simulate the devices and the telemetry that they generate. The ScaleOut Digital Twin Streaming Service now enables digital twins to simplify both tasks.
Digital twins can implement a workload generator that generates telemetry used in validating streaming analytics code. Each digital twin models the behavior of a physical data source, such as a vehicle in a fleet, and the messages it sends and receives. When running in simulation, thousands of digital twins can then generate realistic telemetry for all data sources and feed streaming analytics, such as a telematics application, designed to track and analyze their behavior. In fact, the streaming service enables digital twins to implement both the workload generator and the streaming analytics. Once the analytics code has been validated in this manner, developers can then deploy it to track a live system.
Here’s an example of using a digital twin to simulate the operations of a pump and the telemetry (such as the pump’s temperature and RPM) that it generates. Running in simulation, this simulated pump sends telemetry messages to a corresponding real-time digital twin that analyzes the telemetry to predict impending issues:
Once the simulation has validated the analytics, the real-time digital twin can be deployed to analyze telemetry from an actual pump:
This example illustrates how digital twins can both simulate devices and provide streaming analytics for a live system.
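A minimal, single-process sketch of the pump example: a hypothetical `SimulatedPump` twin generates telemetry with a steadily rising temperature, and a hypothetical `PumpTwin` analyzes each message and raises an alert after a run of consecutive over-limit readings. The class names, thresholds, and heating model are assumptions for illustration, not the streaming service’s API.

```python
from dataclasses import dataclass, field


@dataclass
class PumpTelemetry:
    pump_id: str
    tick: int
    temperature_c: float
    rpm: int


@dataclass
class SimulatedPump:
    """Hypothetical simulation twin: a pump whose temperature climbs steadily."""
    pump_id: str
    start_temp_c: float = 60.0
    heat_rate_c_per_tick: float = 0.5
    rpm: int = 1800

    def step(self, tick: int) -> PumpTelemetry:
        temp = self.start_temp_c + self.heat_rate_c_per_tick * tick
        return PumpTelemetry(self.pump_id, tick, temp, self.rpm)


@dataclass
class PumpTwin:
    """Hypothetical real-time twin: flags sustained overheating for one pump."""
    max_temp_c: float = 70.0
    readings_required: int = 3
    consecutive_hot: int = 0  # state carried between messages
    alert_ticks: list = field(default_factory=list)

    def process(self, msg: PumpTelemetry) -> None:
        if msg.temperature_c > self.max_temp_c:
            self.consecutive_hot += 1
        else:
            self.consecutive_hot = 0
        # Alert once, when the run of hot readings first reaches the limit.
        if self.consecutive_hot == self.readings_required:
            self.alert_ticks.append(msg.tick)


pump = SimulatedPump("pump-1")
twin = PumpTwin()
for tick in range(30):
    twin.process(pump.step(tick))
print(twin.alert_ticks)  # prints [23]
```

Because `PumpTwin` depends only on `PumpTelemetry` messages, the same analysis code could later consume telemetry from an actual pump instead of the simulated one.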
Using digital twins to build a workload generator enables investigation of a wide range of scenarios that might be encountered in typical, real-world use. Developers can implement parameterizable, stateful models of physical data sources and then vary these parameters in simulation to evaluate the ability of streaming analytics to analyze and respond in various situations. For example, digital twins could simulate perimeter devices detecting security intrusions in a large infrastructure to help evaluate how well streaming analytics can identify and classify threats. In addition, the streaming service can capture and record live telemetry and later replay it in simulation.
In addition to using digital twins for analyzing telemetry, the ScaleOut Digital Twin Streaming Service enables digital twins to implement time-driven simulations that model large groups of interacting physical entities. Digital twins can model individual entities within a large system, such as airline passengers, aircraft, airport gates, and air traffic sectors in a comprehensive airline model. These digital twins maintain state information about the physical entities they represent, and they can run code at each time step in the simulation model’s execution to update digital twin state over time. These digital twins also can exchange messages that model interactions.
For example, an airline tracking system can use simulation to model numerous types of weather delays and system outages (such as ground stops) to see how the system manages passenger needs. As the simulation model evolves over time, simulated aircraft can model flight delays and send messages to simulated passengers that react by updating their itineraries. Here is a depiction of an airline tracking simulation:
In contrast to PLM, which typically embodies a complex design within a single digital twin model, the ScaleOut Digital Twin Streaming Service enables large numbers of physical entities and their interactions to be simulated. By doing this, simulations can model intricate behaviors that evolve over time and reveal important insights during system design and optimization. They also can be fed live data and run faster than real time as a tool for making predictions that assist decision-making by managers (such as airline dispatchers).
Digital twins offer a compelling software architecture for implementing time-driven simulations with thousands of entities. In a typical implementation, developers create multiple digital twin models to describe the state information and simulation code representing various physical entities, such as trucks, cargo, and warehouses in a telematics simulation. They create instances of these digital twin models (simply called digital twins) to implement all of the entities being simulated, and the streaming service runs their code at each time step being simulated. During each time step, digital twins can exchange messages that represent simulated interactions between physical entities.
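The shape of such a time-driven simulation can be sketched in a single process: hypothetical `AircraftTwin` and `PassengerTwin` models each expose an `on_time_step` method, and a simple loop runs every twin at each step and delivers messages sent during one step at the start of the next. The class names, message format, and delivery rule are assumptions for illustration; the streaming service’s actual API and its distribution of twins across servers are not shown.

```python
from collections import defaultdict


class AircraftTwin:
    """Hypothetical model: an aircraft that is delayed at a given time step."""

    def __init__(self, twin_id, passenger_ids, delay_at_step=None):
        self.twin_id = twin_id
        self.passenger_ids = passenger_ids
        self.delay_at_step = delay_at_step

    def on_time_step(self, step, inbox, send):
        if step == self.delay_at_step:
            # Model an interaction: notify every booked passenger.
            for pid in self.passenger_ids:
                send(pid, ("flight_delayed", self.twin_id))


class PassengerTwin:
    """Hypothetical model: a passenger who rebooks when notified of a delay."""

    def __init__(self, twin_id):
        self.twin_id = twin_id
        self.rebooked_at = None

    def on_time_step(self, step, inbox, send):
        for kind, _flight in inbox:
            if kind == "flight_delayed" and self.rebooked_at is None:
                self.rebooked_at = step


def run_simulation(twins, num_steps):
    """Run every twin at each step; messages sent in step t arrive in step t + 1."""
    inboxes = defaultdict(list)
    for step in range(num_steps):
        outboxes = defaultdict(list)

        def send(target_id, message):
            outboxes[target_id].append(message)

        for twin in twins.values():
            twin.on_time_step(step, inboxes.pop(twin.twin_id, []), send)
        inboxes = outboxes


twins = {
    "AA100": AircraftTwin("AA100", ["p1", "p2"], delay_at_step=2),
    "p1": PassengerTwin("p1"),
    "p2": PassengerTwin("p2"),
}
run_simulation(twins, num_steps=5)
print(twins["p1"].rebooked_at, twins["p2"].rebooked_at)  # prints 3 3
```

Delivering messages at the next step makes the result independent of the order in which twins run within a step, which is what would allow their code to run in parallel.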
The ScaleOut Digital Twin Streaming Service uses scalable, in-memory computing technology to provide the speed and memory capacity needed to run large simulations with many entities. It stores digital twins in memory and automatically distributes them across a cluster of servers that hosts a simulation. At each time step, each server runs the simulation code for a subset of the digital twins and determines the next time step that the simulation needs to run. The streaming service orchestrates the simulation’s progress on the cluster and advances simulation time at a rate selected by the user.
In this manner, the streaming service can harness as many servers as it needs to host a large simulation and run it with maximum throughput. As illustrated below, the service’s in-memory computing platform can add new servers while a simulation is running, and it can transparently handle server outages should they occur. Users need only focus on building digital twin models and deploying them to the streaming service.
Digital twins have historically been employed as a tool for simulating increasingly detailed behavior of a complex physical entity, like a jet engine. The ScaleOut Digital Twin Streaming Service takes digital twins in a new direction: simulation of large systems. Its highly scalable, in-memory computing architecture enables it to easily simulate many thousands of entities and their interactions. This provides a powerful new tool for extracting insights about complex systems that today’s managers must operate at peak efficiency. Its analytics and predictive capabilities promise to offer a high return on investment in many industries.
The post Steve Smith Review: Simplify Redis Clustering with ScaleOut IMDB appeared first on ScaleOut Software.
Check out the blog post and video from distinguished software architect and .NET guru Steve “ardalis” Smith on the challenges of scaling single-server Redis and how ScaleOut In-Memory Database tackles them with fully automated cluster technology to avoid complex manual configuration steps.
Steve Smith is a well-known entrepreneur and software developer. He is passionate about building quality software and spreading his knowledge through training workshops, speaking at developer conferences, and sharing his experience on his blog and podcast. Steve Smith has also been recognized as a Microsoft MVP for over ten consecutive years.
The post Redis vs ScaleOut: What You Need to Know appeared first on ScaleOut Software.
By William L. Bain and Bryce C. Klinker
Breaking news: ScaleOut Software has announced a community preview of support for Redis clients in ScaleOut StateServer. Learn more here.
Distributed caching technology first hit the market in about 2001 with the introduction of Tangosol Coherence and has been evolving ever since. Designed to help applications scale performance by eliminating bottlenecks in accessing data, this distributed computing technology stores live, fast-changing data in memory across a cluster of inexpensive, commodity servers or virtual machines. The combination of fast, memory-based data storage and throughput scaling with multiple servers results in consistently fast access and update times for growing workloads, such as e-commerce, financial services, IoT device tracking, and other applications.
ScaleOut Software introduced its distributed caching product, ScaleOut StateServer® (SOSS), in 2005 and has made continuous enhancements over the last 16 years. While the single-server version of Redis was released in 2009 by Salvatore Sanfilippo, clustering support was first added in 2015. These two products embody very different design goals. SOSS was designed as an integrated distributed caching architecture incorporating transparent throughput scaling and high availability using data replication, with the goals of maximizing performance, ease of use, and portability across operating systems. In contrast, according to M. Russo, Redis was conceived as a single-server, data-structure store to improve the performance of a real-time data analytics product. (Beyond just storing strings or opaque objects, a data-structure store also implements various data types, such as lists and sorted sets.) Clustering was added to Redis’ single-server architecture several years later to provide a way to scale.
As background for the following discussion, it’s important to review some key concepts. Most distributed caches use a key/value storage model that identifies stored objects using string keys. To distribute objects across multiple servers in a cluster, a distributed cache typically maps keys to hash slots, each of which holds a subset of objects. The cache then distributes hash slots across the servers and moves them between servers as needed to balance the workload; this process is called sharding. A group of hash slots running on a single server (called a node here) can be either a primary or a replica. Clients direct updates to the target hash slot on a primary node, which replicates the update to one or more replica nodes for high availability in case the primary node fails.
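To make the key-to-slot mapping concrete, here is a sketch that follows the scheme documented for Redis Cluster: 16,384 hash slots, with a key’s slot computed as the CRC16 (XMODEM variant) of the key modulo 16384. The even, contiguous assignment of slot ranges to nodes is a simplifying assumption, Redis hash tags are ignored, and SOSS’s internal mapping is not described in this post.

```python
NUM_SLOTS = 16384  # Redis Cluster's slot count


def crc16_xmodem(data: bytes) -> int:
    """Bitwise CRC16/XMODEM: polynomial 0x1021, initial value 0, no reflection."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_slot(key: str) -> int:
    """Map a string key to a hash slot (hash tags are not handled here)."""
    return crc16_xmodem(key.encode("utf-8")) % NUM_SLOTS


def assign_slots(nodes):
    """Split slots into contiguous, nearly equal ranges, one range per node."""
    base, extra = divmod(NUM_SLOTS, len(nodes))
    ranges, start = {}, 0
    for i, node in enumerate(nodes):
        size = base + (1 if i < extra else 0)
        ranges[node] = range(start, start + size)
        start += size
    return ranges


def node_for_key(key, ranges):
    """Find the node whose slot range contains the key's slot."""
    slot = key_slot(key)
    for node, slots in ranges.items():
        if slot in slots:
            return node
    raise KeyError(slot)


ranges = assign_slots(["node0", "node1", "node2", "node3"])
print({node: len(r) for node, r in ranges.items()})  # 4096 slots per node
print(key_slot("user:42"), node_for_key("user:42", ranges))
```

Moving a whole slot between nodes moves all of its keys at once, which is why clusters rebalance in units of slots rather than individual keys.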
Ease of Use
The differences in design goals of the two technologies have led to very different impacts on users. To maximize ease of use, SOSS automatically creates and manages hash slots for the user, including primaries and replicas. Using a built-in load-balancer, each service internally manages a subset of both primary and replica hash slots, as illustrated below. Users just create a single SOSS service process on every node, and these service processes discover each other and distribute the hash slots among themselves to balance the workload. They also automatically handle all aspects of recovery after a node fails.
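The idea of each node holding a balanced mix of primary and replica slots can be illustrated with a hypothetical round-robin rule: slot i’s primary goes to node i mod n and its replica to the next node, so a node never holds both copies of the same slot. This rule is an assumption for illustration only; it is not SOSS’s load-balancing algorithm.

```python
from collections import Counter


def place_slots(nodes, num_slots):
    """Hypothetical round-robin placement: slot -> (primary_node, replica_node)."""
    n = len(nodes)
    placement = {}
    for slot in range(num_slots):
        primary = nodes[slot % n]
        # With a single node there is nowhere else to put a replica.
        replica = nodes[(slot + 1) % n] if n > 1 else None
        placement[slot] = (primary, replica)
    return placement


def roles_per_node(placement):
    """Count how many primary and replica slots each node holds."""
    primaries = Counter(p for p, _ in placement.values())
    replicas = Counter(r for _, r in placement.values() if r is not None)
    return primaries, replicas


placement = place_slots(["node0", "node1", "node2"], 12)
primaries, replicas = roles_per_node(placement)
print(dict(primaries), dict(replicas))  # each node: 4 primaries, 4 replicas
```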
In contrast, Redis users create separate service processes on each node for primary and replica hash slots and must manually distribute the hash slots among the primaries. (Unlike SOSS, a 1-node or 2-node Redis cluster is not allowed.) As we will see below, users must perform a complex set of manual actions when adding and removing nodes and when healing and rebalancing the cluster after a node fails. The following diagram illustrates the difference between Redis and SOSS in the user’s view of the cluster:
Adding a Node to the Cluster Using SOSS
To illustrate how SOSS’s built-in mechanisms for managing hash slots, load-balancing, failure detection, and self-healing simplify cluster management, let’s look at the steps needed to add a node to the cluster. When using SOSS, the user just installs the service on a new node and clicks a button in the management console to join the cluster. Using multicast discovery (or an optional host list if multicast is not available), the service process automatically receives primary and replica hash slots and starts handling its portion of the workload. The following diagram shows the addition of a fourth node to a cluster:
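The slot movement that such automatic rebalancing performs when a node joins can be sketched as follows: each existing node gives up only its surplus slots, so most data stays where it is. The function and its quota rule are hypothetical illustrations, not SOSS’s implementation; with 16,384 slots going from three nodes to four, exactly 4,096 slots move, the same figure that appears in the Redis resharding example later in this post.

```python
from collections import Counter


def add_node(owner, new_node):
    """Hypothetical rebalancing step when a node joins.

    owner maps slot -> node. Returns (new_owner, moved_slots): surplus slots
    move from existing nodes to new_node until every node meets its quota.
    Assumes the existing nodes start out balanced.
    """
    nodes = sorted(set(owner.values())) + [new_node]
    base, extra = divmod(len(owner), len(nodes))
    quota = {node: base + (1 if i < extra else 0) for i, node in enumerate(nodes)}
    counts = Counter(owner.values())
    new_owner = dict(owner)
    moved = []
    for slot in sorted(owner):
        node = owner[slot]
        # Move a slot only if its current node holds more than its quota.
        if counts[node] > quota[node] and len(moved) < quota[new_node]:
            new_owner[slot] = new_node
            counts[node] -= 1
            moved.append(slot)
    return new_owner, moved


owner = {slot: f"node{slot % 3}" for slot in range(16384)}
new_owner, moved = add_node(owner, "node3")
print(len(moved), Counter(new_owner.values()))  # 4096 moved; 4096 slots per node
```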
Adding a Node to the Cluster Using Redis
Because Redis requires the user to manage the creation of primary and replica service processes (sometimes called shards) and the management of hash slots, many more steps must be performed to add a node to the cluster. To accomplish this, the user runs administrative commands that create the new processes, connect the primaries and replicas, move the replicas as necessary, and reallocate the hash slots among the nodes. The required configuration changes are illustrated below:
Here is an example of administrative steps required to make the configuration changes (using node 0’s IP and port as the bootstrap address for the new node):
```shell
# Add a new (already running, cluster-enabled) redis-server instance on node 3 as a replica of primary 2:
redis-cli --cluster add-node host3Ip:replicaPort node0Ip:node0Port --cluster-slave --cluster-master-id primary2NodeID

# Add a new (already running, cluster-enabled) redis-server instance on node 3 as a primary:
redis-cli --cluster add-node host3Ip:primaryPort node0Ip:node0Port

# Connect to replica 2 on node 0 and modify it to replicate primary 3:
redis-cli -h replica2Ip -p replica2Port cluster replicate primary3NodeID

# Reshard the cluster by interactively moving hash slots from existing nodes to node 3:
redis-cli --cluster reshard node0Ip:node0Port
#   How many slots to move? 4096                 (16384 / 4 = 4096)
#   What node to move slots to? primary3NodeID   (primary3NodeID returned by a previous command)
#   What nodes to move slots from? all
```
This process is complex, and it becomes more difficult to keep track of the distribution of hash slots with larger cluster memberships. Removing a node has comparable complexity.
Recovering After a Node Fails (SOSS and Redis)
SOSS’s service processes automatically detect and recover from the loss of a node. They use built-in, scalable, peer-to-peer heart-beating to detect missing node(s) and create a new, coherent cluster membership. Next, they promote replica hash slots to primaries on the surviving nodes, create new replicas for self-healing, and rebalance the workload across the nodes.
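The promote-and-heal steps can be sketched on a slot placement map of the form slot -> (primary, replica): slots whose primary failed promote their replica, and every slot left without a replica gets a new one on a different surviving node. The function is a hypothetical illustration under these assumptions; failure detection and the final rebalancing of primaries are not shown, and this is not SOSS’s recovery algorithm.

```python
def recover(placement, failed):
    """Hypothetical recovery after node `failed` leaves the cluster.

    placement maps slot -> (primary, replica). Returns a new placement in
    which no slot references the failed node.
    """
    survivors = sorted(
        {node for pair in placement.values() for node in pair
         if node is not None and node != failed}
    )
    healed = {}
    for slot, (primary, replica) in placement.items():
        if primary == failed:
            if replica is None:
                raise RuntimeError(f"slot {slot} had no replica; its data is lost")
            primary, replica = replica, None  # promote the replica to primary
        elif replica == failed:
            replica = None  # replica lost; the primary is unaffected
        if replica is None:
            # Self-heal: create a new replica on a different surviving node,
            # spreading new replicas across the candidates by slot number.
            candidates = [n for n in survivors if n != primary]
            if candidates:
                replica = candidates[slot % len(candidates)]
        healed[slot] = (primary, replica)
    return healed


nodes = ["A", "B", "C", "D"]
placement = {s: (nodes[s % 4], nodes[(s + 1) % 4]) for s in range(16)}
healed = recover(placement, "B")
print(healed[1], healed[4])  # slot 1 promoted its replica; slot 4 got a new replica
```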
Redis does not implement a coherent cluster membership and does not provide automatic self-healing and recovery. Each Redis node sends heartbeat messages to random other nodes to detect possible failures, and the cluster uses a gossip mechanism to declare that a node has failed. After that, its replica on a different node promotes itself to a primary so that the hash slots remain available, but Redis does not self-heal by creating a new replica for the hash slots. Also, it does not automatically redistribute the hash slots across the nodes to rebalance the workload. These tasks are left to the system administrator, who needs to sort out the needed configuration changes and implement them to restore a fully redundant, balanced cluster.
Performance Comparison
The different design choices between SOSS and Redis also lead to semantic and performance differences. To maximize ease of use for application developers, SOSS maintains all stored data with full consistency (to be more precise, sequential consistency), ensuring that it only serves the latest updates and never loses data after the failure of a single server (or two servers if multiple replicas are used). This design choice targets enterprise applications that need to ensure that the distributed cache always returns the correct data. To implement data replication across multiple replicas with the highest possible performance, SOSS uses a patented quorum algorithm.
In contrast, Redis employs an eventual consistency model with asynchronous replication. In general, this choice enables higher throughput because updates do not have to wait for replication to complete before responding to the user. It also enables potentially higher read throughput by serving reads from replicas even if they are not guaranteed to serve the latest updates.
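The semantic difference between the two replication styles can be seen in a deliberately simplified single-replica model: a synchronous primary applies each update at the replica before acknowledging it, while an asynchronous primary acknowledges immediately and replicates later. The classes are hypothetical and ignore networking, failure detection, and SOSS’s quorum algorithm.

```python
class Replica:
    def __init__(self):
        self.data = {}

    def apply(self, key, value):
        self.data[key] = value


class Primary:
    """Hypothetical primary with one replica (illustrative only)."""

    def __init__(self, replica, synchronous):
        self.data = {}
        self.replica = replica
        self.synchronous = synchronous
        self.pending = []  # updates acknowledged but not yet at the replica

    def update(self, key, value):
        self.data[key] = value
        if self.synchronous:
            # Simplified full consistency: commit at the replica before acknowledging.
            self.replica.apply(key, value)
        else:
            # Eventual consistency: acknowledge now and replicate later.
            self.pending.append((key, value))
        return "ack"

    def replicate_pending(self):
        for key, value in self.pending:
            self.replica.apply(key, value)
        self.pending.clear()


async_replica = Replica()
async_primary = Primary(async_replica, synchronous=False)
async_primary.update("cart:42", ["book"])
print(async_replica.data.get("cart:42"))  # None: a read from the replica is stale
```

In this model, if the asynchronous primary failed before `replicate_pending` ran, the acknowledged update would be lost; the synchronous version avoids that at the cost of waiting for the replica on every update.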
Given these two design choices, it’s valuable to compare the throughput of the two distributed caches as nodes are added and the workload is simultaneously increased, as illustrated below. This technique evaluates how well the caches can scale their throughput by adding nodes to handle increasing workload; linear throughput scaling ensures consistently fast response times. (For a discussion of throughput scaling in distributed systems, see Gustafson’s Law.)
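For reference, the textbook form of Gustafson’s law gives the scaled speedup on N nodes when the workload grows with the cluster, where s is the fraction of execution time spent in serial (non-scalable) work. This is the general law, not a model fitted to the measurements that follow.

```latex
S(N) = s + (1 - s)\,N
```

As s approaches 0, S(N) approaches N, which corresponds to the linear throughput scaling these tests look for. For example, s = 0.05 and N = 6 give S = 0.05 + 0.95 × 6 = 5.75.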
To perform an apples-to-apples throughput comparison of Redis 6.2 and SOSS 5.10, SOSS was configured to use eventual consistency (“EC”) when updating replicas. The performance of SOSS with full consistency (“FC”) was also measured. Tests were run for 3-, 4-, and 6-node clusters in AWS on m5.xlarge instances with 4 cores at 2.5 GHz and 16 GB of RAM. The clients ran read/update pairs on 100K objects of sizes 2 KB and 20 KB to represent a typical web workload with a 1:1 read/update ratio. The results are as follows:
SOSS provided consistently higher throughput than Redis when eventual consistency was used to perform updates (the blue and gray lines in the charts). Running SOSS with full consistency (the red lines) resulted in lower throughput, as expected, since updates have to be committed at the replica before responding to the client instead of being performed asynchronously. However, both Redis and SOSS with full consistency delivered close to the same throughput for 20KB objects. This may be due to benefits of SOSS’s client-side caching, which eliminated unnecessary data transfers during reads.
Summing Up
Our comparison of SOSS and Redis shows the benefits of ScaleOut’s integrated clustering architecture. A key design goal for SOSS was to simplify the user’s workload by providing a unified, location-transparent data cache with built-in, fully automatic load-balancing and high availability. By hiding the inner workings of hash slots, heart-beating, replica placement, load-balancing, and self-healing, the application developer and systems administrator can focus on simply using the distributed cache instead of configuring its implementation. In our view, Redis’s approach of exposing these complex mechanisms to the user significantly steepens the learning curve and increases the user’s workload.
It might come as a surprise to learn that in the above benchmark testing, SOSS with eventual consistency maintained a consistent performance advantage over Redis. We attribute this to ScaleOut’s approach of designing an integrated cluster architecture from the outset instead of adding clustering to a single-server data store, as Redis did. This approach enabled design freedom at every step to eliminate distributed bottlenecks, and it led to extensive use of multithreading and internal data sharding within each service process to extract maximum performance from multi-core servers.
Lastly, SOSS demonstrates that the CAP theorem doesn’t really prevent the use of full consistency when building a scalable, distributed cache. For many enterprise applications, which demand data integrity at all times, this may be the better choice.
Learn more about how ScaleOut StateServer compares to Redis.
The post Why Use “Real-Time Digital Twins” for Streaming Analytics? appeared first on ScaleOut Software.
What Problems Does Streaming Analytics Solve?
To understand why we need real-time digital twins for streaming analytics, we first need to look at what problems are tackled by popular streaming platforms. Most if not all platforms focus on mining the data within an incoming message stream for patterns of interest. For example, consider a web-based ad-serving platform that selects ads for users and logs messages containing a timestamp, user ID, and ad ID every time an ad is displayed. A streaming analytics platform might count all the ads for each unique ad ID in the latest five-minute window and repeat this every minute to give a running indication of which ads are trending.
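The ad-counting query described above can be sketched as a hopping window (Azure Stream Analytics uses that term): a five-minute window that advances every minute, counting impressions per ad ID. This plain-Python version recomputes each window from scratch for clarity; the function name and event tuple layout are assumptions, and a real streaming platform would compute windows incrementally.

```python
from collections import Counter


def trending_ads(events, window_s=300, slide_s=60):
    """Count impressions per ad ID in hopping windows.

    events: list of (timestamp_s, user_id, ad_id) tuples sorted by timestamp.
    Returns (window_end, Counter) pairs; each window covers
    [window_end - window_s, window_end).
    """
    if not events:
        return []
    first_ts, last_ts = events[0][0], events[-1][0]
    # The first window ends at the first slide boundary after the first event.
    window_end = (first_ts // slide_s + 1) * slide_s
    results = []
    while True:
        start = window_end - window_s
        counts = Counter(ad for ts, _user, ad in events if start <= ts < window_end)
        results.append((window_end, counts))
        if window_end > last_ts:  # this window already includes the last event
            break
        window_end += slide_s
    return results


events = [(10, "u1", "adA"), (70, "u2", "adA"), (130, "u3", "adB"), (400, "u4", "adB")]
for end, counts in trending_ads(events):
    print(end, counts.most_common(1))  # the top ad in each five-minute window
```

Note that nothing here knows anything about the users themselves; the analysis sees only what is in the message stream.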
Based on technology from the Trill research project, Microsoft’s Azure Stream Analytics platform offers an elegant and powerful way to implement applications like this. It views the incoming stream of messages as a columnar database in which the column represents a time-ordered history of messages. It then lets users create SQL-like queries with time-windowing extensions to select and aggregate data within a time window, and it does this at high speed to keep up with incoming data streams.
Other streaming analytics platforms, such as open-source Apache Storm, Flink, and Beam and commercial implementations such as Hazelcast Jet, let applications pass an incoming data stream through a pipeline (or directed graph) of processing steps to extract information of interest, aggregate it over time windows, and create alerts when specific conditions are met. For example, these execution pipelines could process a stock market feed to compute the average stock price for all equities over the previous hour and trigger an alert if an equity moves up or down by a certain percentage. Another application tracking telemetry from gas meters could likewise trigger an alert if any meter’s flow rate deviates from its expected value, which might indicate a leak.
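The pipeline idea can be sketched with chained Python generators standing in for processing steps: parse each tick, attach the symbol’s trailing average, and emit an alert when the price deviates by at least a given percentage. For brevity this sketch uses a trailing window of the last three ticks instead of a one-hour time window; the stage names and message format are assumptions, not the operator APIs of Storm, Flink, Beam, or Jet.

```python
from collections import defaultdict, deque


def parse(lines):
    """Stage 1: turn 'SYMBOL,PRICE' lines into (symbol, price) tuples."""
    for line in lines:
        symbol, price = line.split(",")
        yield symbol.strip(), float(price)


def with_trailing_average(ticks, window=3):
    """Stage 2: attach the average of each symbol's previous `window` prices."""
    history = defaultdict(lambda: deque(maxlen=window))
    for symbol, price in ticks:
        prior = history[symbol]
        average = sum(prior) / len(prior) if prior else None
        yield symbol, price, average
        prior.append(price)


def alerts(ticks, pct=5.0):
    """Stage 3: emit ticks whose price deviates from the trailing average by >= pct percent."""
    for symbol, price, average in ticks:
        if average is not None and abs(price - average) >= average * pct / 100:
            yield symbol, price, average


feed = ["ACME,100", "ACME,101", "ACME,99", "ACME,110", "INIT,50"]
for alert in alerts(with_trailing_average(parse(feed))):
    print("ALERT", alert)  # ACME jumps 10% above its trailing average
```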
What’s key about these stream-processing applications is that they focus on examining and aggregating properties of data communicated in the stream. Other than by observing data in the stream, they do not track the dynamic state of the data sources themselves, and they don’t make inferences about the behavior of the data sources, either individually or in aggregate. So, the streaming analytics platform for the ad server doesn’t know why each user was served certain ads, and the market-tracking application does not know why each equity either maintained its stock price or deviated materially from it. Without knowing the why, it’s much harder to take the most effective action when an interesting situation develops. That’s where real-time digital twins can help.
The Need for Real-Time Digital Twins
Real-time digital twins shift the application’s focus from the incoming data stream to the dynamically evolving state of the data sources. For each individual data source, they let the application incorporate dynamic information about that data source in the analysis of incoming messages, and the application can also update this state over time. The net effect is that the application can develop a significantly deeper understanding about the data source so that it can take effective action when needed. This cannot be achieved by just looking at data within the incoming message stream.
For example, the ad-serving application can use a real-time digital twin for each user to track shopping history and preferences, measure the effectiveness of ads, and guide ad selection. The stock market application can use a real-time digital twin for each company to track financial information, executive changes, and news releases that explain why its stock price starts moving and filter out trades that don’t fit desired criteria.
Also, because real-time digital twins maintain dynamic information about each data source, applications can aggregate this highly curated data instead of just aggregating data in the data stream. This gives users deeper insights into the overall state of all data sources and boosts “situational awareness” that is hard to maintain by just looking at the message stream.
An Example
Consider a trucking fleet that manages thousands of long-haul trucks on routes throughout the U.S. Each truck periodically sends telemetry messages about its location, speed, engine parameters, and cargo status (for example, trailer temperature) to a real-time monitoring application at a central location. With traditional streaming analytics, personnel can detect changes in these parameters, but they can’t assess their significance to take effective, individualized action for each truck. Is a truck stopped because it’s at a rest stop or because it has stalled? Is an out-of-spec engine parameter expected because the engine is scheduled for service or does it indicate that a new issue is emerging? Has the driver been on the road too long? Does the driver appear to be lost or entering a potentially hazardous area?
The use of real-time digital twins provides the context needed for the application to answer these questions as it analyzes incoming messages from each truck. For example, it can keep track of the truck’s route, schedule, cargo, mechanical and service history, and information about the driver. Using this information, it can alert drivers to impending problems, such as road blockages, delays or emerging mechanical issues. It can assist lost drivers, alert them to erratic driving or the need for rest stops, and help when changing conditions require route updates.
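One way to picture a real-time digital twin is as an object that pairs per-truck context with a message handler. The Python sketch below is purely illustrative and is not the ScaleOut API. `TruckTwin`, `Telemetry`, their fields, and the 230°F engine limit are hypothetical. It shows how the same reading, such as a stopped truck or a hot engine, can be judged differently depending on the context the twin holds.

```python
from dataclasses import dataclass, field


@dataclass
class Telemetry:
    # Hypothetical telemetry message sent periodically by a truck.
    truck_id: str
    speed_mph: float
    engine_temp_f: float
    location: str


@dataclass
class TruckTwin:
    # Hypothetical per-truck context held by the twin.
    truck_id: str
    rest_stops: set = field(default_factory=set)  # planned stops on the route
    service_due: bool = False                       # engine already scheduled for service
    max_engine_temp_f: float = 230.0                # assumed out-of-spec limit
    alerts: list = field(default_factory=list)

    def process_message(self, msg: Telemetry) -> list:
        """Analyze one message in the context of this truck's state."""
        new_alerts = []
        if msg.speed_mph == 0 and msg.location not in self.rest_stops:
            # Stopped somewhere other than a planned rest stop: possible stall.
            new_alerts.append("unplanned stop at " + msg.location)
        if msg.engine_temp_f > self.max_engine_temp_f and not self.service_due:
            # Out-of-spec reading not explained by pending service: emerging issue.
            new_alerts.append("engine temperature out of spec")
        self.alerts.extend(new_alerts)
        return new_alerts
```

Because the platform delivers each truck's messages to that truck's twin, the handler never has to filter out traffic from other trucks.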
The following diagram shows a truck communicating with its associated real-time digital twin. (The parallelogram represents application code.) Because the twin holds unique contextual data for each truck, analysis code for incoming messages can provide highly focused feedback that goes well beyond what is possible with traditional streaming analytics:
As illustrated below, the ScaleOut Digital Twin Streaming Service runs as a cloud-hosted service in the Microsoft Azure cloud to provide streaming analytics using real-time digital twins. It can exchange messages with thousands of trucks across the U.S., maintain a real-time digital twin for each truck, and direct messages from that truck to its corresponding twin. It simplifies application code, which only needs to process messages from a given truck and has immediate access to dynamic, contextual information that enhances the analysis. The result is better feedback to drivers and enhanced overall situational awareness for the fleet.
Lower Complexity and Higher Performance
While the functionality implemented by real-time digital twins can be replicated with ad hoc solutions that combine application servers, databases, offline analytics, and visualization, such solutions would require vastly more code, a diverse combination of skill sets, and longer development cycles. They also would encounter performance bottlenecks that require careful analysis to measure and resolve. The real-time digital twin model running on ScaleOut Software’s integrated execution platform sidesteps these obstacles.
Scaling performance to maintain high throughput creates an interesting challenge for traditional streaming analytics platforms because the work performed by their task pipelines does not naturally map to a set of processing cores within multiple servers. Each pipeline stage must be accelerated with parallel execution, and some stages require longer processing time than others, creating bottlenecks in the pipeline.
In contrast, real-time digital twins naturally create a uniformly large set of tasks that can be evenly distributed across servers. To minimize network overhead, this mapping follows the distribution of in-memory objects within ScaleOut’s in-memory data grid, which holds the state information for each twin. This enables the processing of real-time digital twins to scale transparently without adding complexity to either applications or the platform.
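The following sketch illustrates why twin-based work spreads evenly. It assumes simple hash partitioning rather than ScaleOut's actual placement scheme. Each message is routed by its data source's ID to the server holding that twin's partition, so processing runs where the twin's state lives. `NUM_PARTITIONS`, `partition_of`, `assign_partitions`, and `route` are hypothetical names.

```python
import hashlib
from collections import defaultdict

NUM_PARTITIONS = 16  # hypothetical fixed partition count


def partition_of(twin_id):
    """Map a twin ID to a partition with a stable hash.

    (Python's built-in hash() of strings is salted per process, so it is avoided.)
    """
    digest = hashlib.sha256(twin_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PARTITIONS


def assign_partitions(servers):
    """Spread partitions round-robin across the available servers."""
    return {p: servers[p % len(servers)] for p in range(NUM_PARTITIONS)}


def route(messages, partition_map):
    """Group (twin_id, payload) messages by the server hosting each twin."""
    per_server = defaultdict(list)
    for twin_id, payload in messages:
        server = partition_map[partition_of(twin_id)]
        per_server[server].append((twin_id, payload))
    return per_server
```

Messages for the same twin always reach the same server. A large population of twins yields many small, independent tasks to spread across servers.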
Summing Up
Why use real-time digital twins? They solve an important challenge for streaming analytics that is not addressed by other, “pipeline-oriented” platforms, namely, to simultaneously track the state of thousands of data sources. They use contextual information unique to each data source to help interpret incoming messages, analyze their importance, and generate feedback and alerts tailored to that data source.
Traditional streaming analytics finds patterns and trends in the data stream. Real-time digital twins identify and react to important state changes in the data sources themselves. As a result, applications can achieve better situational awareness than previously possible. This new way of implementing streaming analytics can be used in a wide range of applications. We invite you to take a closer look.
The post Why Use “Real-Time Digital Twins” for Streaming Analytics? appeared first on ScaleOut Software.
The post The Amazing Evolution of In-Memory Computing appeared first on ScaleOut Software.
Since the mid-1990s, online systems have seen relentless, explosive growth in usage, driven by ecommerce, mobile applications, and more recently, IoT. The pace of these changes has made it challenging for server-based infrastructures to manage fast-growing populations of users and data sources while maintaining fast response times. For more than two decades, the answer to this challenge has proven to be a technology called in-memory computing.
In general terms, in-memory computing refers to the related concepts of (a) storing fast-changing data in primary memory instead of in secondary storage and (b) employing scalable computing techniques to distribute a workload across a cluster of servers. Assuming bottlenecks are avoided, this enables transparent throughput scaling that matches an increase in workload, which in turn keeps response times low for users. It can also take advantage of the elastic computing resources available in cloud infrastructures to quickly and cost-effectively scale throughput to meet changes in demand.
Harnessing the power of in-memory computing requires software platforms that can make in-memory computing’s scalability readily available to applications using APIs while hiding the complexity of its implementation. Emerging in the early 2000s, the first such platforms provided distributed caching on clustered servers with straightforward APIs for storing and retrieving in-memory objects. When first introduced, distributed caching offered a breakthrough for applications by storing fast-changing data in memory on a server cluster for consistently fast response times, while simultaneously offloading database servers that would otherwise become bottlenecked. For example, ecommerce applications adopted distributed caching to store session-state, shopping carts, product descriptions, and other data that shoppers need to be able to access quickly.
Software platforms for distributed caching, such as ScaleOut StateServer®, which was introduced in 2005, hide internal mechanisms for cluster membership, throughput scaling, and high availability to take full advantage of the cluster’s scalable memory without adding complexity to applications. They transparently distribute stored objects across the cluster’s servers and ensure that data is not lost if a server or network component fails.
As distributed caching has evolved over the last two decades, additional mechanisms for in-memory computing have been incorporated to take advantage of the computing power available in the server cluster. Parallel query enables stored objects on all servers to be scanned simultaneously to retrieve objects with desired properties. Data-parallel computing analyzes objects on the server cluster to extract and report patterns of interest; it scales much better than parallel query by avoiding network bottlenecks and by using the cluster’s computing resources.
Most recently, stream-processing has been implemented with in-memory computing to simultaneously analyze telemetry from thousands or even millions of data sources and track dynamic state information for each data source. ScaleOut Software’s real-time digital twin model provides straightforward APIs for implementing stream-processing applications within its ScaleOut Digital Twin Streaming Service, an Azure-based cloud service, while hiding internal mechanisms, such as distributing incoming messages to in-memory objects, updating state information for each data source, and running aggregate analytics.
The following diagram shows the evolution of in-memory computing from distributed caching to stream-processing with real-time digital twins. Each step in the evolution has built on the previous one to add new capabilities that take advantage of the scalable computing power and fast data access that in-memory computing enables.
For ecommerce applications, this evolution has created new capabilities that dramatically improve the experience for online shoppers. Instead of just passively hosting session-state and shopping carts, online applications now can mine shopping carts for dynamic trends to evaluate the effectiveness of product descriptions and marketing programs (such as flash sales). They can also employ real-time digital twins or similar techniques to track each shopper’s behavior and make recommendations. By analyzing a click-stream of product selections in the context of a shopper’s known preferences and demographics, an ecommerce site can make highly focused recommendations to assist the shopper.
For example, one of ScaleOut Software’s customers recently upgraded from just using distributed caching to scale its ecommerce application. This customer now incorporates stream-processing capabilities using ScaleOut StreamServer® to capture click-streams and score users so that its web site can make more effective real-time offers.
The following diagram illustrates how the evolution of in-memory computing has enhanced the online experience for ecommerce shoppers by providing in-the-moment suggestions:
Starting with its development for parallel supercomputing in the late 1990s and evolving into its latest form as a cloud-based service, in-memory computing has offered powerful, software-based APIs for building applications that serve large populations of users and data sources. It has helped ensure that these applications deliver predictably fast performance and scale to meet the demands of growing workloads. In the next few years, we should see continued innovation from in-memory computing to help ecommerce and other applications maintain their competitive edge.
The post Voluntary & Anonymous Contact “Self”-Tracing at Scale appeared first on ScaleOut Software.
A New Approach: Log Our Own Contacts in Advance
As we all know, getting people back to work will require testing and contact tracing. The latter will need armies of people to track down all contacts of each person who tests positive for coronavirus. Leading software companies are building mobile apps to log the places we have been and determine possible contacts. However, these apps will be complex because they rely on Bluetooth wireless technology to detect proximity, and they raise concerns regarding both accuracy and privacy. Experts have stated that humans need to be in the loop for contact tracing to be effective. Maybe there’s a hybrid approach that offers promise in the near term.
What if we could easily and anonymously let people keep track of encounters that they judge to be potentially risky as they occur, such as with work colleagues, merchants, neighbors, and friends? As people log these contacts using aliases for privacy, cloud software could build a web of connections, often connecting strangers through intermediaries. Later, when a person tests positive, he or she could notify this software. It then could follow the breadcrumbs and anonymously alert, via text message or email, all people in the chain of recent contacts who should be tested and/or self-isolate.
Here’s an example. When a grocery checker with the alias “flash88” tests positive for COVID-19, he can anonymously alert a chain of people who are connected by direct or intermediate contacts who have logged significant, recent encounters, such as haircuts or backyard barbeques:
Although voluntarily “self-tracing” our contacts would require proactive effort for each of us to log individual encounters, it could be done quickly and simply by just entering a person’s anonymous alias into a mobile app or web page. Most people likely would choose to do this only for lengthy interactions, such as with colleagues working closely together, friends at a barbeque, or perhaps a hairdresser or server at a restaurant. Location information could be included if permitted. What emerges from countless individual actions is a massive, continuously evolving web of contacts that can be immediately called upon for alerting when anyone tests positive.
For self-tracing to be effective, people who come into mutual contact need to register aliases and contact information (email or mobile number) with the software. As each person encounters a potentially risky contact and logs that contact’s alias, the system keeps a record of the contact only for the period during which infection is possible, perhaps two weeks. Regular work contacts could be refreshed automatically on a schedule.
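Here is a rough sketch of this record-keeping. It assumes an in-memory dictionary per alias and a 14-day retention window, since the text suggests roughly two weeks. Each logged encounter is stored in both directions, and records older than the window are pruned when read. `ContactLog` and its methods are hypothetical.

```python
import time
from collections import defaultdict

RETENTION_SECONDS = 14 * 24 * 3600  # assumed two-week risk window


class ContactLog:
    """Hypothetical contact store keyed by anonymous alias."""

    def __init__(self, retention=RETENTION_SECONDS):
        self.retention = retention
        # alias -> {other alias: time of the most recent logged encounter}
        self.contacts = defaultdict(dict)

    def log_contact(self, alias, other, when=None):
        """Record an encounter in both directions, since contact is mutual."""
        when = time.time() if when is None else when
        for a, b in ((alias, other), (other, alias)):
            self.contacts[a][b] = max(when, self.contacts[a].get(b, when))

    def recent_contacts(self, alias, now=None):
        """Return aliases contacted within the retention window, pruning stale records."""
        now = time.time() if now is None else now
        entries = self.contacts.get(alias, {})
        for other in [o for o, t in entries.items() if now - t > self.retention]:
            del entries[other]
        return set(entries)
```

Refreshing a regular work contact on a schedule amounts to calling `log_contact` again, which moves its timestamp forward.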
The power of self-tracing is its ability to give us automatic, immediate — and anonymous — notification of exposure risk from a chain of contacts. While this tool does not replace standard contact tracing, it can add important value in its simplicity, timeliness, and incorporation of human judgment. As an alternative to more complicated and invasive Bluetooth-based contact tracing software, it could help accelerate the return to work for colleagues and businesses while offloading some of the work for contact tracers.
Advanced Capabilities
Beyond just tracking contacts, cloud-hosted software can keep track of important statistics, such as when and where (if permitted) a person tested positive or was alerted and how many links in the chain separate a person from the source of an alert. Real-time analytics can continuously evaluate this data to immediately identify important trends, such as the locations where an unusually large number of alerts are occurring. This allows personnel to quickly respond and institute a self-isolation protocol where necessary.
Real-time analytics also can generate other useful statistics, such as the average chain length that results in co-infection and the percentage of connected people who actually become co-infected. If anonymous demographics are maintained and submitted by users, statistics can be computed by gender, age, and other parameters useful to healthcare professionals.
The Tech: In-Memory Cloud Computing Makes Self-Tracing Fast and Scalable
Cloud-hosted software technology called in-memory computing makes this contact tracing technique fast, scalable and quickly deployable. (It also can be used to assist other contact tracing efforts and for asset tracking.) While simultaneously tracking contacts for millions of people, this software can alert all related contacts in a matter of seconds after notification, and it can send out many thousands of alerts every second.
When a person tests positive and notifies the software, a cloud-based system uses the record of contacts kept with each person to send out alerts, and it follows these records from contact to contact until all mutually connected people have been alerted. In addition, human contact tracers could take advantage of this network (if permitted by users) to aid in their investigations.
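Following the records from contact to contact is essentially a breadth-first traversal of the contact graph. This minimal sketch assumes the contacts are available as a dictionary from alias to a set of recently contacted aliases. `send_alert` is a placeholder for the text or email notification. The sketch also records how many links separate each person from the source, one of the statistics mentioned earlier.

```python
from collections import deque


def propagate_alerts(contacts, source, send_alert):
    """Breadth-first traversal of the contact graph from a positive test.

    contacts:   dict mapping alias -> set of recently contacted aliases
    send_alert: callback(alias, hops) standing in for a text or email notice
    Returns a dict mapping each reached alias to its distance from the source.
    """
    hops = {source: 0}
    queue = deque([source])
    while queue:
        current = queue.popleft()
        for other in contacts.get(current, ()):
            if other not in hops:  # alert each person only once
                hops[other] = hops[current] + 1
                send_alert(other, hops[other])
                queue.append(other)
    return hops
```

Alerting each alias at most once keeps the work proportional to the people and contact records reached. Averaging the returned distances gives the average chain length.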
ScaleOut Software has developed an in-memory, cloud-hosted software technology called real-time digital twins, which makes hosting this application easy, fast, and flexible. As illustrated below, cloud software can maintain a real-time digital twin for each user being tracked in the system to keep dynamic data for both alerting and real-time analytics. In addition to processing alerts, this software continuously extracts dynamic data from the real-time digital twins and analyzes it every few seconds. This makes it possible to generate and visualize the statistics described above in real time. Using real-time digital twins enables an application like this to be implemented within just a few days instead of weeks or months.
Note: To the extent possible, ScaleOut Software will make its cloud-based ScaleOut Digital Twin Streaming Service available free of charge (except for fees from the cloud provider) for public institutions needing help tracking data to assist in the COVID-19 crisis.
Summing Up
Anonymous, voluntary contact self-tracing has the potential to fill an important gap as a hybrid approach between manual contact tracing and complex, fully automated, location-based technologies. In work settings, it could be especially useful in protecting colleagues and customers. It offers a powerful tool to help contain the spread of COVID-19, maximize situational awareness for healthcare workers and contact tracers, and minimize risks from exposure for all of us.
The post Use Distributed Caching to Accelerate Online Web Sites appeared first on ScaleOut Software.
In this time of extremely high online usage, web sites and services have quickly become overloaded and clogged trying to manage high volumes of fast-changing data. Most sites maintain a wide variety of this data, including information about logged-in users, e-commerce shopping carts, requested product specifications, and records of partially completed transactions. Maintaining rapidly changing data in back-end databases creates bottlenecks that impact responsiveness. In addition, repeatedly accessing back-end databases to serve up popular items, such as product descriptions and news stories, adds to the bottleneck.
The Solution: Distributed Caching
The solution to this challenge is to use scalable, memory-based data storage for fast-changing data so that web sites can keep up with exploding workloads. A widely used technology called distributed caching meets this need by storing frequently accessed data in memory on a server farm instead of within a database. This speeds up accesses and updates while offloading back-end database servers. Also called in-memory data grids, distributed caches, such as ScaleOut StateServer®, use server farms to both scale storage capacity and accelerate access throughput, thereby maintaining fast data access at all times.
The following diagram illustrates how a distributed cache can store fast-changing data to accelerate online performance and offload a back-end database server:
The Technology Behind Distributed Caching
It’s not enough simply to lash together a set of servers hosting a collection of in-memory caches. To be reliable and easy to use, distributed caches need to incorporate technology that provides important attributes, including ease of integration, location transparency, transparent scaling, and high availability with strong consistency. Let’s take a look at some of these capabilities.
To make distributed caches easy to use and keep them fast, they typically employ a “NoSQL” key/value access model and store values as serialized objects. This enables web applications to store, retrieve, and update instances of their application-defined objects (for example, shopping carts) using a simple key, such as a user’s unique identifier. This object-oriented approach allows distributed caches to be viewed as more of an extension of an application’s in-memory data storage than as a separate storage tier.
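The key/value model can be illustrated with a toy, single-process stand-in for a cache client. This is not the ScaleOut StateServer API. `CacheClient`, `put`, `get`, and `update` are hypothetical. JSON stands in for whatever serializer a real client uses, and a local dictionary stands in for the server farm. The application works only with its own objects and a key, while values are stored as serialized bytes.

```python
import json
from dataclasses import dataclass, field, asdict


@dataclass
class ShoppingCart:
    # Application-defined object stored in the cache.
    user_id: str
    items: list = field(default_factory=list)


class CacheClient:
    """Hypothetical key/value client; a local dict stands in for the server farm."""

    def __init__(self):
        self._store = {}  # key -> serialized bytes

    def put(self, key, obj):
        # Serialize the application object before storing it.
        self._store[key] = json.dumps(asdict(obj)).encode("utf-8")

    def get(self, key, cls):
        # Deserialize back into the application's own type.
        data = self._store.get(key)
        return None if data is None else cls(**json.loads(data))

    def update(self, key, cls, fn):
        """Read-modify-write: fetch the object, apply fn, store the result."""
        obj = fn(self.get(key, cls))
        self.put(key, obj)
        return obj
```

A real distributed cache would also have to coordinate concurrent updates from multiple web servers, which this sketch ignores.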
That said, a web application needs to interact with a distributed cache as a unified whole. It’s just too difficult for the application to keep track of which server within a distributed cache holds a given data object. For this reason, distributed caches handle all the bookkeeping required to keep track of where objects are stored. Applications simply present a key to the distributed cache, and the cache’s client library finds the object, regardless of which server holds it.
It’s also the distributed cache’s responsibility to distribute access requests across its farm of servers and scale throughput as servers are added. Linear scaling keeps access times low as the workload increases. Distributed caches typically use partitioning techniques to accomplish this. ScaleOut StateServer further integrates the cache’s partitioning with its client libraries so that scaling is both transparent to applications and automatic. When a server is added, the cache quietly rebalances the workload across all caching servers and makes the client libraries aware of the changes.
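Partitioning with transparent rebalancing can be sketched under simple assumptions that may not match ScaleOut StateServer's internals. Keys hash to a fixed set of partitions, a partition map assigns partitions to servers, and adding a server moves just enough partitions to even out the load. Clients route each access through the current map, which is why they never need to know which server holds an object. All names here are hypothetical.

```python
import hashlib

NUM_PARTITIONS = 12  # hypothetical; a real cache picks its own partition count


def partition_of(key):
    """Stable hash of a key to a partition number."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS


class PartitionMap:
    """Hypothetical partition-to-server map shared by servers and clients."""

    def __init__(self, servers):
        # Initial round-robin assignment of partitions to servers.
        self.owner = {p: servers[p % len(servers)] for p in range(NUM_PARTITIONS)}

    def server_for(self, key):
        """Client-side lookup on every access: key -> partition -> server."""
        return self.owner[partition_of(key)]

    def _loads(self, servers):
        counts = {s: 0 for s in servers}
        for s in self.owner.values():
            counts[s] += 1
        return counts

    def add_server(self, new_server):
        """Move partitions from the busiest servers until the load is even.

        Returns the partitions that moved; only keys in them change servers.
        """
        servers = set(self.owner.values()) | {new_server}
        target = NUM_PARTITIONS // len(servers)
        moved = []
        while self._loads(servers)[new_server] < target:
            counts = self._loads(servers)
            busiest = max(counts, key=counts.get)
            p = next(p for p, s in self.owner.items() if s == busiest)
            self.owner[p] = new_server
            moved.append(p)
        return moved
```

Only keys in the moved partitions change servers. A real cache would also copy those partitions' objects to the new server and push the updated map to its client libraries.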
To enable their use in mission-critical applications, distributed caches need to be highly available, that is, to ensure that both stored data and access to the distributed cache can survive the failure of one of the servers. To accomplish this, distributed caches typically store each object on two (or more) servers. If a server fails, the cache detects this, removes the server from the farm, and then restores the redundancy of data storage in case another failure occurs.
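Here is a sketch of the recovery step. It assumes each partition has one primary and one replica placed on different servers, which is a simplification; ScaleOut's actual scheme is not described here. When a server fails, the surviving copy of each affected partition becomes the primary, and a new replica is assigned to the least-loaded remaining server. `ReplicatedPartitions` is a hypothetical name.

```python
class ReplicatedPartitions:
    """Hypothetical placement of each partition on a primary and one replica."""

    def __init__(self, servers, num_partitions=8):
        self.servers = list(servers)
        n = len(self.servers)
        # The replica goes on the next server, so the two copies never share a host.
        self.placement = {
            p: [self.servers[p % n], self.servers[(p + 1) % n]]
            for p in range(num_partitions)
        }

    def _load(self):
        # Count copies per live server (copies on the failed server are ignored).
        load = {s: 0 for s in self.servers}
        for copies in self.placement.values():
            for s in copies:
                if s in load:
                    load[s] += 1
        return load

    def fail(self, dead):
        """Drop a failed server, promote surviving copies, and restore redundancy."""
        self.servers.remove(dead)
        for p, copies in self.placement.items():
            survivors = [s for s in copies if s != dead]
            candidates = [s for s in self.servers if s not in survivors]
            if len(survivors) < 2 and candidates:
                # Re-create the lost copy on the least-loaded eligible server.
                load = self._load()
                survivors.append(min(candidates, key=load.get))
            self.placement[p] = survivors  # survivors[0] acts as the primary
```

The sketch tracks only placement. Actually copying the data to the new replica is left out.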
When there are multiple copies of an object stored on different servers, it’s important to keep them consistent. Otherwise, stale data due to a missed update could inadvertently be returned to an application after a server fails. Unlike some distributed caches which use a simpler, “eventual” consistency model prone to this problem, ScaleOut StateServer uses a patented, quorum-based technique which ensures that all stored data is fully consistent.
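Here is a generic majority-quorum sketch with versioned copies, showing why a quorum approach avoids returning stale data. It is a textbook illustration, not ScaleOut's patented technique. A write requires a majority of live replicas. A read consults a majority and returns the highest-versioned value. Because any two majorities overlap, a read always sees the latest completed write, even if one of the copies it consults is stale.

```python
class Replica:
    """One copy of a stored object, tagged with a version number."""

    def __init__(self):
        self.version = 0
        self.value = None
        self.up = True


class QuorumStore:
    """Generic majority-quorum store for one object (illustrative only)."""

    def __init__(self, n=3):
        self.replicas = [Replica() for _ in range(n)]
        self.quorum = n // 2 + 1  # a majority of the n copies

    def _live(self):
        live = [r for r in self.replicas if r.up]
        if len(live) < self.quorum:
            raise RuntimeError("quorum unavailable")
        return live

    def write(self, value):
        live = self._live()
        # Any majority overlaps the last write's majority, so max() sees the latest version.
        version = max(r.version for r in live) + 1
        for r in live:
            r.version, r.value = version, value
        return version

    def read(self):
        live = self._live()
        # The newest version among a majority is the latest completed write.
        return max(live, key=lambda r: r.version).value
```

Real systems must also handle concurrent writers and repair stale copies, which this sketch omits.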
There’s More: Parallel Query and Computing
Because a distributed cache stores memory-based objects on a farm of servers, it can harness the CPU power of the server farm to analyze stored data much faster than would be possible on a single server. For example, instead of just accessing individual objects using keys, it can query the servers in parallel to find all objects with specified properties. With ScaleOut StateServer, applications can use standard query mechanisms, such as Microsoft LINQ, to create parallel queries executed by the distributed cache.
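A parallel query can be pictured as running the same predicate against every server's share of the objects at once and concatenating the matches. ScaleOut StateServer exposes this through mechanisms such as LINQ. The Python below is only a neutral illustration, with each server modeled as a list of dictionaries and a thread pool standing in for the cluster.

```python
from concurrent.futures import ThreadPoolExecutor


def parallel_query(partitions, predicate):
    """Scan every partition concurrently and return all objects matching predicate.

    `partitions` is a list of per-server object lists (a stand-in for the cluster).
    """
    def scan(objects):
        return [obj for obj in objects if predicate(obj)]

    with ThreadPoolExecutor(max_workers=max(1, len(partitions))) as pool:
        # Every match is shipped back to the client, which can become a bottleneck.
        return [obj for part in pool.map(scan, partitions) for obj in part]
```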
Although they are powerful, parallel queries can overload both a requesting client and the network by returning a large number of query results. In many cases, it makes more sense to let the distributed cache perform the client’s work within the cache itself. ScaleOut StateServer provides an API called Parallel Method Invocation (and also a variant of .NET’s Parallel.ForEach called Distributed ForEach) which lets a client application ship code to the cache that processes the results of a parallel query and then returns merged results back to the client. Moving code to where the data lives accelerates processing while minimizing network usage.
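The code-shipping idea can also be sketched with a thread pool standing in for the servers. This is not the Parallel Method Invocation or Distributed ForEach API; `invoke_parallel` and its parameters are hypothetical. Each partition applies an evaluation function to its matching objects and folds them into one partial result. The partials are then merged, so only one small value per server crosses the network instead of every matching object.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def invoke_parallel(partitions, predicate, eval_fn, merge_fn, initial):
    """Run eval_fn on matching objects in every partition, then merge the partials.

    `initial` must be an identity value for merge_fn, since it seeds each
    partition's partial result as well as the final merge.
    """
    def run_on_partition(objects):
        partial = initial
        for obj in objects:
            if predicate(obj):
                partial = merge_fn(partial, eval_fn(obj))
        return partial  # one small value per server

    with ThreadPoolExecutor(max_workers=max(1, len(partitions))) as pool:
        partials = list(pool.map(run_on_partition, partitions))
    return reduce(merge_fn, partials, initial)


# Example: count carts over $100 and sum their totals without shipping the carts.
servers = [[{"total": 30}, {"total": 120}], [{"total": 250}, {"total": 90}], []]
count, total = invoke_parallel(
    servers,
    predicate=lambda cart: cart["total"] > 100,
    eval_fn=lambda cart: (1, cart["total"]),
    merge_fn=lambda a, b: (a[0] + b[0], a[1] + b[1]),
    initial=(0, 0),
)
```

In this example `count` is 2 and `total` is 370.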
Distributed Caches Can Help Now
Online web sites and services are now more vital than ever to keeping our daily activities moving forward. Since almost all large web sites use server farms to handle growing workloads, it’s no surprise that server farms can also offer a powerful and cost-effective hardware platform for hosting a site’s fast-changing data. Distributed caches harness the power of server farms to handle this important task and remove database bottlenecks. Also, with integrated parallel query and computing, distributed caches can now do much more to offload a site’s workload. This might be a good time to take a fresh look at the power of distributed caching.
The post The Next Generation in Logistics Tracking with Real-Time Digital Twins appeared first on ScaleOut Software.
Traditional platforms for streaming analytics don’t offer the combination of granular data tracking and real-time aggregate analysis that logistics applications in operational environments such as these require. It’s not enough just to pick out interesting events from an aggregated data stream and then send them to a database for offline analysis using Spark. What’s needed is the ability to easily track incoming telemetry from each individual store so that issues can be quickly analyzed, prioritized, and handled. At the same time, it’s important to be able to combine data and generate analytics results for all stores in real time so that strategic decisions, such as running flash sales or replenishing hot-selling inventory, can be made without unnecessary delay.
The answer to these challenges is a new software concept called the “real-time digital twin.” This breakthrough approach for streaming analytics lets application developers separately track and analyze incoming telemetry from each individual store while the platform handles the task of filtering out telemetry associated with other stores. This dramatically simplifies application code and automatically scales its use by letting the execution platform run this code simultaneously for all stores. In addition, the platform provides fast, in-memory data storage so that the application can easily and quickly record both telemetry and analytics results for each store. Lastly, built-in, aggregate analysis tools make it easy to immediately roll up these analytics results across all stores and continuously keep track of the big picture.
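The combination of per-store tracking and real-time roll-up can be sketched briefly. In this hypothetical Python example, each store's twin updates its own running figures from that store's messages. A periodic aggregation pass then scans all twins to produce chain-wide totals and a list of stores that need restocking. `StoreTwin`, its fields, the message format, and the low-stock threshold are assumptions, not part of the ScaleOut Digital Twin Streaming Service API.

```python
from dataclasses import dataclass, field


@dataclass
class StoreTwin:
    # Hypothetical per-store state updated from that store's telemetry.
    store_id: str
    sales_today: float = 0.0
    inventory: dict = field(default_factory=dict)  # sku -> units on hand

    def process_message(self, msg):
        """Apply one sale message, e.g. {"sku": "A1", "units": 2, "price": 9.5}."""
        self.sales_today += msg["units"] * msg["price"]
        self.inventory[msg["sku"]] = self.inventory.get(msg["sku"], 0) - msg["units"]


def aggregate(twins, low_stock=5):
    """Roll up results across all stores (intended to run periodically)."""
    total_sales = sum(t.sales_today for t in twins)
    needs_restock = sorted(
        t.store_id for t in twins
        if any(units < low_stock for units in t.inventory.values())
    )
    return {"total_sales": total_sales, "needs_restock": needs_restock}
```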
The ScaleOut Digital Twin Streaming Service, which runs in the Microsoft Azure cloud, hosts real-time digital twins for applications like these that need to track thousands of data sources. It can receive telemetry from retail stores over the Internet using event delivery systems such as Azure IoT Event Hub, AWS, Kafka, and REST, and it can respond back to the retail stores in milliseconds. In addition, the cloud service hosts aggregate analytics that can continuously complete every few seconds, avoiding the need to wait for offline results from a data lake.
The following diagram illustrates a nationwide chain of retail outlets streaming telemetry to their counterpart real-time digital twins running in the cloud service. It also shows real-time aggregate results being fed to displays for immediate consumption by operations managers.
With this ground-breaking technology, operations managers now can easily manage logistics for thousands of retail outlets, immediately identify and respond to issues, and optimize operations in real time. By harnessing the power and simplicity of the real-time digital twin model, application developers can quickly implement and customize streaming analytics to meet these mission-critical requirements. With the real-time digital twin model, the next generation of streaming analytics has arrived.
The post Digital Twins Enable Seamless Use of Edge Computing in IoT appeared first on ScaleOut Software.
We also saw how digital twins can be organized in a hierarchy in which the lowest level twins represent individual devices and higher-level twins represent subsystems, possibly organized at multiple levels, which control these devices. Higher-level twins receive events from lower-level twins in the same manner that the lowest-level twins receive events from physical devices. Likewise, twins at all levels can send messages downwards in the hierarchy for purposes of control, eventually resulting in signals being sent to the devices.
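The hierarchy can be sketched with two small classes. In this hypothetical Python example, the class names, the 0.8 vibration limit, and the message formats are assumptions. A device-level twin forwards a summary event to its parent in the same way a device sends events to it. The parent responds with a control message, which the device twin records as a command for the device.

```python
class DeviceTwin:
    """Lowest-level twin: receives events from one physical device."""

    def __init__(self, device_id, parent=None):
        self.device_id = device_id
        self.parent = parent
        self.commands = []  # signals that would be sent to the device

    def on_event(self, event):
        # Forward a summarized event upward, just as a device sends events to this twin.
        if self.parent is not None and event["vibration"] > 0.8:  # assumed limit
            self.parent.on_event({"source": self.device_id, "alarm": "vibration"})

    def on_control(self, message):
        # Control messages from above become signals to the device.
        self.commands.append(message)


class SubsystemTwin:
    """Higher-level twin: receives events from child twins and controls them."""

    def __init__(self, name):
        self.name = name
        self.children = {}

    def add_child(self, twin):
        twin.parent = self
        self.children[twin.device_id] = twin

    def on_event(self, event):
        # Respond to an alarm by sending a control message back down.
        if event.get("alarm") == "vibration":
            self.children[event["source"]].on_control("reduce_speed")
```

A `SubsystemTwin` could be extended with its own parent to add further levels.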
As described in the blog cited above, the following diagram illustrates the use of a digital twin hierarchy to implement a hypothetical windmill:
Observing that digital twin models cleanly match the semantics of object-oriented programming, we can implement them in a straightforward manner as object instances that correspond to individual devices or higher-level subsystems. Because real-world IoT applications can track thousands of devices or other entities (e.g., medical patients, ecommerce shoppers), distributed, in-memory data grids (IMDGs) with integrated in-memory computing (such as ScaleOut StreamServer) provide a natural platform for hosting these objects and executing their event-handling functions. IMDGs enable transparent performance scaling, which is required to ensure fast event handling, and they typically have built-in high availability. The following diagram depicts a population of devices and their corresponding digital twins running in an IMDG:
As computing power at the edge inexorably grows, it makes increasing sense to provide enhanced intelligence close to the devices. This minimizes event-handling latency and enables better management of local operations, while still providing strategic analysis and control by remote (cloud-based or on-premises) IoT applications. The challenge is to determine how to partition application logic between the cloud and edge, and more specifically, how to easily migrate functionality to the edge. What is required is a software architecture that enables seamless migration without requiring application code to be reimplemented for execution on edge-based platforms.
The digital twin provides a powerful answer to this challenge. We can leverage its data and code encapsulation to transparently migrate low-level event handling functionality to the edge – where the devices live. Instead of re-implementing application code for use at the edge, we can simply migrate the lowest level digital twins to edge-based execution platforms without changing their code or messaging protocols. For example, consider how the windmill’s digital twins can be migrated downwards to the windmill itself while keeping the overall digital twin hierarchy intact, as shown in the following diagram:
The lowest-level digital twins now are hosted at the edge next to their corresponding devices without any changes to the code. Container-based execution can replicate the IMDG’s execution environment so that application code is unaware of the migration other than by observing dramatically lower event-handling latency. The higher-level digital twins continue to run in the cloud or on-premises – wherever the required computing resources are located.
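Here is a rough sketch of why the twins' code need not change. It assumes twins communicate only through a messaging function supplied by their host. The same twin class runs unchanged whether it is deployed on a cloud host or an edge host; only the host's delivery path differs. `Host` and `ForwardingTwin` are hypothetical stand-ins for an IMDG and a container-based edge runtime, not ScaleOut APIs.

```python
class Host:
    """Hypothetical execution environment: a cloud IMDG or an edge container."""

    def __init__(self, location):
        self.location = location
        self.twins = {}
        self.peers = []  # other hosts reachable over the network

    def deploy(self, twin_id, twin):
        # The host injects its own messaging function; twin code never changes.
        twin.send = self.deliver
        self.twins[twin_id] = twin

    def deliver(self, target, msg):
        """Deliver locally if possible, else forward; returns where it was handled."""
        if target in self.twins:
            self.twins[target].on_message(msg)
            return self.location
        for peer in self.peers:
            if target in peer.twins:
                peer.twins[target].on_message(msg)
                return peer.location
        raise KeyError(target)


class ForwardingTwin:
    """Twin logic that only talks through self.send, so it runs on any host."""

    def __init__(self, parent_id=None):
        self.parent_id = parent_id
        self.received = []
        self.send = None  # set by the host at deployment

    def on_message(self, msg):
        self.received.append(msg)
        if self.parent_id is not None:
            self.send(self.parent_id, {"from_child": msg})
```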
Looking beyond this simple example, the use of the digital twin model does not require that all device-specific functionality migrate to the edge. Consider a more complex application in which a device is represented by a pair of digital twins, a low-level twin that directly manages the device’s operations, and a higher-level twin that implements predictive analytics, perhaps using a compute-intensive, machine-learning (ML) algorithm, based on telemetry received from the low-level twin. In this application, it may make sense to migrate the low-level twin to the edge for better responsiveness and uninterrupted operations, while keeping the high-level, “strategic” twin in the cloud where computing resources are available to execute its predictive analytics algorithm. The following diagram illustrates this scenario for the windmill:
Transparent migration of event-handling functionality to the edge represents yet another way the digital twin model adds value to stateful, stream-processing applications. This model takes full advantage of object-oriented concepts to both simplify application design and create new capabilities which would be daunting and complex to implement at the application level or with conventional stream-processing platforms. Our list of digital twin capabilities now includes:
The digital twin model is worth a close look when designing the next generation of IoT applications.
The post Digital Twins Enable Seamless Use of Edge Computing in IoT appeared first on ScaleOut Software.
The post How to Easily Deploy an IMDG in the Cloud appeared first on ScaleOut Software.
In-memory data grids (IMDGs) add tremendous value to cloud-hosted applications by providing a sharable, in-memory repository for an application’s fast-changing state information, such as shopping carts, financial transactions, pending orders, geolocation information, machine state, etc. This information tends to be rapidly updated and often needs to be shared across all application servers. For example, when external requests from a web user are directed to different web servers, the user’s state has to be tracked independently of which server is handling the request.
With their tightly integrated client-side caching, IMDGs typically provide much faster access to this shared data than backing stores, such as blob stores, database servers, and NoSQL stores. They also offer a powerful computing platform for analyzing live data as it changes and generating immediate feedback or “operational intelligence”; for example, see this blog post describing the use of real-time analytics in a retail application.
A key challenge in using an IMDG as part of a cloud-hosted application is to easily deploy, access, and manage the IMDG. To meet the needs of an elastic application, an IMDG must be designed to transparently scale its throughput by adding virtual servers and then automatically rebalance its in-memory storage to keep the workload evenly distributed. Likewise, it must be easy to remove IMDG servers when the workload decreases and creates excess capacity.
Like the applications they serve, IMDGs are deployed as a cluster of cloud-hosted virtual servers that scales as the workload demands. However, an IMDG may need a different number of virtual servers than the application to handle the same workload. To keep things simple, a cloud-hosted application should view the IMDG as a single abstract entity and not be concerned with individual IMDG servers or the data they hold. In particular, the application should not have to manage connections between N application instances and M IMDG servers, especially when N and M (as well as cloud IP addresses) vary over time.
Even though an IMDG comprises several servers, the simplest way to deploy and manage an IMDG in the cloud is to identify it as a single, coherent service. ScaleOut StateServer® (and ScaleOut Analytics Server®, which includes features for operational intelligence) takes this approach by naming a cloud-hosted IMDG with a single “store” name combined with access credentials. This name becomes the basis both for managing the deployed servers and for connecting applications to the IMDG.
For example, ScaleOut StateServer’s management console lets users deploy and manage an IMDG in both Amazon EC2 and Windows Azure by specifying a store name and the initial number of servers, as well as other optional parameters. The console does the rest, interacting with the cloud provider to accomplish several tasks, including starting up the IMDG, configuring its servers so that they can see each other, and recording metadata in the cloud needed to manage the deployment. Here is the console wizard for deploying an IMDG in Amazon EC2:
When the IMDG’s servers start up, they make use of metadata to find and connect to each other and to form a single, scalable, peer-to-peer service. ScaleOut StateServer uses different techniques on EC2 and Azure to make use of available metadata support. Also, the ScaleOut management console lets users specify various security parameters appropriate to the various cloud providers (e.g., security groups and VPC in EC2 and firewall settings in Azure), and the start-up process configures these parameters for all IMDG servers.
The management console also lets users add (or remove) instances as necessary to handle changes in the workload. The IMDG automatically redistributes the workload across the servers as the membership changes.
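One common way to implement this kind of automatic redistribution is sketched below, assuming the grid hashes each key into a fixed number of partitions and moves whole partitions between servers. ScaleOut’s actual algorithm is not described in this post; `NUM_PARTITIONS`, `assign_partitions`, and the server names are hypothetical. The point of the design is that keeping existing assignments where possible limits data motion when membership changes.

```python
import hashlib
from typing import Dict, List, Optional

NUM_PARTITIONS = 64  # hypothetical fixed partition count

def partition_of(key: str) -> int:
    # Stable hash, so every client and server maps a key to the same partition.
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS

def assign_partitions(servers: List[str],
                      previous: Optional[Dict[int, str]] = None) -> Dict[int, str]:
    """Spread partitions evenly, keeping existing placements to minimize data motion."""
    previous = previous or {}
    target = -(-NUM_PARTITIONS // len(servers))   # ceiling: max partitions per server
    load = {s: 0 for s in servers}
    assignment: Dict[int, str] = {}
    # First pass: leave partitions on surviving servers that are under the target.
    for p in range(NUM_PARTITIONS):
        owner = previous.get(p)
        if owner in load and load[owner] < target:
            assignment[p] = owner
            load[owner] += 1
    # Second pass: place the remaining partitions on the least-loaded servers.
    for p in range(NUM_PARTITIONS):
        if p not in assignment:
            server = min(servers, key=lambda name: load[name])
            assignment[p] = server
            load[server] += 1
    return assignment

three = assign_partitions(["s1", "s2", "s3"])
four = assign_partitions(["s1", "s2", "s3", "s4"], previous=three)  # add a server
moved = sum(1 for p in range(NUM_PARTITIONS) if three[p] != four[p])
```

Adding a fourth server moves only 16 of the 64 partitions, which is exactly the share the new server needs.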
The power of managing an IMDG using a single store name becomes apparent when connecting instances of a cloud-based application to the IMDG. On-premises applications typically connect each client instance to an IMDG using a list of IP addresses corresponding to available IMDG servers. This process works well on premises because IP addresses typically are well known and static. However, it is impractical in the cloud since IP addresses change with each deployment or reboot of an IMDG server.
The solution to this problem is to let the application access the IMDG solely by its store name and cloud access credentials and have the IMDG’s client library find the servers. The store name and credentials are stored in a configuration file on each application instance, with the access credentials fully encrypted. At startup time, the IMDG’s client library reads the configuration file and then uses previously stored metadata in the cloud to find the IMDG’s servers and connect to them. Note that this technique works well with both unencrypted and encrypted connections.
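The lookup described above can be sketched as follows. The client reads only a store name and credentials from its configuration and resolves the current server addresses from cloud metadata at connect time. `MetadataService`, `connect_by_store_name`, and the JSON config keys are hypothetical stand-ins. They are not ScaleOut’s grid mapper API or any cloud provider’s SDK, and a real client would decrypt the stored credentials rather than read them in plain text.

```python
import json
from typing import Dict, List

class MetadataService:
    """Hypothetical stand-in for deployment metadata recorded in the cloud."""
    def __init__(self) -> None:
        self._stores: Dict[str, dict] = {}

    def register(self, store: str, secret: str, addresses: List[str]) -> None:
        # Updated by management tooling whenever servers start, stop, or reboot.
        self._stores[store] = {"secret": secret, "addresses": list(addresses)}

    def lookup(self, store: str, secret: str) -> List[str]:
        entry = self._stores.get(store)
        if entry is None or entry["secret"] != secret:
            raise PermissionError(f"unknown store or bad credentials: {store}")
        return list(entry["addresses"])

def connect_by_store_name(config_text: str, metadata: MetadataService) -> List[str]:
    """Resolve the grid's current servers from the store name; no IPs in the config."""
    config = json.loads(config_text)
    return metadata.lookup(config["store"], config["secret"])

metadata = MetadataService()
metadata.register("orders-grid", "s3cret", ["10.0.0.5", "10.0.0.9"])
config = '{"store": "orders-grid", "secret": "s3cret"}'
first = connect_by_store_name(config, metadata)

# After a redeployment the IP addresses change, but the application's config does not.
metadata.register("orders-grid", "s3cret", ["10.0.1.17", "10.0.1.23", "10.0.1.40"])
second = connect_by_store_name(config, metadata)
```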
The following diagram illustrates how application instances automatically connect to the IMDG’s servers using the client library’s “grid mapper” software, which retrieves cloud-based metadata to make connections to ScaleOut Analytics Server:
The application need not be running in the cloud. The same mechanism also allows an on-premises application to access a cloud-based IMDG. It also allows an on-premises IMDG to replicate its data to a cloud-based IMDG or connect to a cloud-based IMDG to form a virtual IMDG spanning both sites. (These features are provided in the ScaleOut GeoServer® product.) The following diagram illustrates connecting an on-premises application to a cloud-based IMDG:
Summing Up
As more and more server-side applications migrate to the cloud to take advantage of its elasticity, the power of IMDGs to unlock scalable performance and operational intelligence becomes increasingly compelling. Keeping IMDG deployment as simple as possible is critical to realizing the potential of this combined solution. Leveraging cloud-based metadata to automate the configuration process lets the application ignore the details of the IMDG’s infrastructure and easily access its scalable storage and computing power.
The post How Do In-Memory Data Grids Differ from Spark? appeared first on ScaleOut Software.
IMDGs host data in memory and distribute it across a cluster of commodity servers. Using an object-oriented data storage model, they provide APIs for updating data objects typically in well under a millisecond (depending on the size of the object). This enables operational systems to use IMDGs for storing, accessing, and updating fast-changing data, while maintaining fast access times even as the storage workload grows. For example, an e-commerce website can store session state and shopping carts within an IMDG, and a financial services application can store stock portfolios. In both cases, stored data must be frequently updated and accessed.
Data storage needs can easily grow as more users store data within an IMDG. IMDGs accommodate this growth by adding servers to the cluster and automatically rebalancing stored data across the servers. This ensures that both capacity and throughput grow linearly with the size of the workload and that access and update times remain low regardless of the workload’s size.
Moreover, IMDGs maintain stored data with high availability using data replication. They typically create one or more replicas of each data object on different servers so that all stored data remains accessible even after a server (or network component) fails; they do not have to pause to recreate data after a failure. IMDGs also self-heal by automatically creating new replicas during recovery. All of this is critically important to operational systems, which must continuously handle access and update requests without delay.
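A simplified sketch of replication and self-healing, assuming one primary and one replica per object on distinct servers. The replica count and the `place` and `heal` helpers are hypothetical. When a server fails, any object whose primary was lost is served immediately from its existing replica, and redundancy is then restored on the surviving servers.

```python
from typing import Dict, List, Tuple

Placement = Dict[str, Tuple[str, str]]   # key -> (primary server, replica server)

def place(keys: List[str], servers: List[str]) -> Placement:
    """Give each key a primary and a replica on two different servers (needs 2+ servers)."""
    placement: Placement = {}
    for i, key in enumerate(sorted(keys)):
        primary = servers[i % len(servers)]
        replica = servers[(i + 1) % len(servers)]   # neighbor, so never the primary
        placement[key] = (primary, replica)
    return placement

def heal(placement: Placement, failed: str, survivors: List[str]) -> Placement:
    """Keep every object reachable after `failed` goes down, then restore redundancy."""
    healed: Placement = {}
    for key, (primary, replica) in placement.items():
        if primary == failed:
            primary, replica = replica, ""   # promote the replica at once; nothing is rebuilt
        elif replica == failed:
            replica = ""
        if not replica:
            # Self-heal: create a new replica on a survivor other than the primary.
            replica = next(s for s in survivors if s != primary)
        healed[key] = (primary, replica)
    return healed

before = place(["cart:1", "cart:2", "cart:3", "cart:4"], ["s1", "s2", "s3"])
after = heal(before, "s2", ["s1", "s3"])
```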
IMDGs Add Data-Parallel Computation for Analytics
Because IMDGs store data in memory distributed across a cluster of servers, they easily can perform data-parallel computations on stored data; they simply make use of the cluster’s processing power to analyze data “in place,” that is, without the need to migrate it to other servers. This enables IMDGs to provide fast results with minimum overhead. For example, a recent demonstration of ScaleOut hServer running a MapReduce calculation for a financial services application generated analysis results in about 330 milliseconds compared to 15+ seconds for Apache Hadoop.
A significant aspect of the IMDG’s architecture for data analytics is that it performs its computations on data hosted in memory – not on an incoming data stream. This memory-based storage is continuously updated by an incoming data stream, so the computation has access to the latest changes to the data. However, the computation also has access to the history of changes as manifested by the state of the data stored in the grid. This gives the computation a much richer data set for performing an analysis than it would have if it could only see the incoming data stream. We call it “stateful” real-time analytics.
Take a look at the following diagram, which illustrates the architecture for ScaleOut Analytics Server and ScaleOut hServer. The diagram shows a stream of incoming changes which are applied to the grid’s memory-based data store using API updates. The real-time analytics engine performs data-parallel computation on the stored data, combines the results across the cluster, and outputs a combined stream of alerts to the operational system.
The power of stateful analytics is that the computation can provide deeper insights than would otherwise be possible. For example, an e-commerce website can analyze not just browser actions but also interpret these actions in light of each customer’s preferences and shopping history to offer feedback. Likewise, a financial services application can analyze market price fluctuations to determine trading strategies for individual portfolios, drawing on each portfolio’s trading history and preferences.
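A small sketch of the e-commerce case. Each incoming event is applied as an update to one stored object, and the analysis then runs over the accumulated state, so it can detect a pattern (repeated views of an item never purchased) that no single event reveals. The `CustomerState` type, the three-view rule, and the dictionary standing in for the grid are hypothetical.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class CustomerState:
    """Hypothetical per-customer object kept up to date in the grid."""
    views: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    purchases: List[str] = field(default_factory=list)

grid: Dict[str, CustomerState] = defaultdict(CustomerState)   # stand-in for an IMDG namespace

def apply_event(customer: str, kind: str, product: str) -> None:
    # Incoming stream: each event is an individual update to one stored object.
    state = grid[customer]
    if kind == "view":
        state.views[product] += 1
    elif kind == "buy":
        state.purchases.append(product)

def analyze() -> List[str]:
    # The analysis reads stored state, so it sees each customer's history,
    # not just the most recent event.
    alerts = []
    for customer, state in grid.items():
        for product, count in state.views.items():
            if count >= 3 and product not in state.purchases:
                alerts.append(f"offer {product} to {customer}")
    return alerts

for event in [("ann", "view", "tent"), ("bob", "view", "stove"), ("ann", "view", "tent"),
              ("ann", "view", "tent"), ("bob", "buy", "stove")]:
    apply_event(*event)
```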
The Berkeley Spark project has developed a data-parallel execution engine designed to accelerate Hadoop MapReduce calculations (and add related operators) by staging data in memory instead of moving it from disk to memory and back for each operator. Using this technique and other optimizations, it has demonstrated impressive performance gains over Hadoop MapReduce. This project’s stated goal (quoting from a tutorial slide deck from U.C. Berkeley’s amplab) is to “extend the MapReduce model to better support two common classes of analytics apps: iterative algorithms (machine learning, graphs) [and] interactive data mining [and] enhance programmability: integrate into Scala programming language.”
A key new mechanism that supports Spark’s programming model is the resilient distributed dataset (RDD), designed to “allow apps to keep working sets in memory for efficient reuse.” RDDs are “immutable, partitioned collections of objects created through parallel transformations.” To support fault tolerance, “RDDs maintain lineage information that can be used to reconstruct lost partitions.”
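To make lineage-based recovery concrete, here is a toy sketch that deliberately does not use Spark’s API. Each derived, immutable dataset records its parent and the transformation that produced it, so a lost partition is recomputed rather than restored from a replica. `ToyDataset` and its methods are hypothetical, and the sketch assumes source partitions can be reloaded from stable storage (not modeled here).

```python
from typing import Callable, List, Optional

class ToyDataset:
    """Immutable, partitioned collection that remembers how it was derived."""
    def __init__(self, partitions: List[Optional[list]],
                 parent: Optional["ToyDataset"] = None,
                 fn: Optional[Callable] = None) -> None:
        self._partitions = partitions
        self._parent = parent   # lineage: the dataset this one came from
        self._fn = fn           # lineage: the transformation applied to the parent
        self.recomputed = 0

    def map(self, fn: Callable) -> "ToyDataset":
        # Transformations never modify data in place; they produce a new dataset.
        parts = [[fn(x) for x in self.partition(i)] for i in range(len(self._partitions))]
        return ToyDataset(parts, parent=self, fn=fn)

    def lose(self, i: int) -> None:
        # Simulate a server failure wiping one in-memory partition.
        self._partitions[i] = None

    def partition(self, i: int) -> list:
        if self._partitions[i] is None:
            # Rebuild from lineage instead of failing over to a replica.
            self._partitions[i] = [self._fn(x) for x in self._parent.partition(i)]
            self.recomputed += 1
        return list(self._partitions[i])

base = ToyDataset([[1, 2], [3, 4]])
squared = base.map(lambda x: x * x)
squared.lose(1)
```

Recovery cost here grows with the length of the lineage chain, which is the tradeoff against IMDG-style replica failover discussed below.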
You can see the key differences between using an IMDG hosting data-parallel computation and Spark to perform MapReduce and similar analyses. IMDGs analyze updatable, highly available, memory-based collections of objects, and this makes them ideal for operational environments in which data is being constantly updated even while analytics computations are ongoing. In contrast, Spark was designed to create, analyze, and transform immutable collections of data hosted in memory. This makes Spark ideal for optimizing the execution of a series of analytics operators.
The following diagram illustrates Spark’s use of memory-hosted RDDs to hold data accessed by its analytics engine:
However, Spark is not well suited to operational environments for two reasons. First, data cannot be updated. In fact, if Spark inputs data from HDFS, changes have to be propagated to HDFS from another data source, since HDFS files can only be appended, not updated. Second, RDDs are not highly available. Their fault tolerance results from reconstructing them from their recorded lineage, which may take substantially more time to complete than server failover by an IMDG. This represents an appropriate tradeoff for Spark because, unlike IMDGs, it focuses on analytics computations on data that does not need to be constantly available.
Even though Spark makes different design tradeoffs than IMDGs to support fast analytics, IMDGs can still deliver comparable speedup over Hadoop. For example, on a 4-server cluster, we measured Apache Spark running the well-known Hadoop “word count” benchmark 9.6X faster than CDH5 Hadoop MapReduce for a 10 GB dataset hosted in HDFS. On this same benchmark, ScaleOut hServer ran 14X faster than Hadoop when executing standard Java MapReduce code.
Spark Streaming extends Spark to handle streams of input data and was motivated by the need to “process large streams of live data and provide results in near-real-time” (quoting from the slide deck referenced above). It “run[s] a streaming computation as a series of very small, deterministic batch jobs” by chopping up an input stream into a sequence of RDDs which it feeds to Spark’s execution engine. “The processed results of the RDD operations are returned in batches.” Computations can create or update other RDDs in memory which hold information regarding the state or history of the stream.
The representation of input and output streams as RDDs can be illustrated as follows:
This model of computation overcomes Spark’s basic limitation of working only on immutable data. Spark Streaming offers stateful operators that enable incoming data to be combined with in-memory state. However, it employs a distinctly stream-oriented approach with parallel operators that does not match the typical, object-oriented usage model of asynchronous, individual updates to memory-based objects implemented by IMDGs for operational environments. It also relies on Spark’s fault-tolerance mechanism, which does not provide high availability for individual objects.
For example, IMDGs apply incoming changes to individual objects within a stateful collection by using straightforward object updates, and they simultaneously run data-parallel operations on the collection as a whole to perform analytics. We theorize that when using Spark Streaming, the same computation would require that each collection of updates represented by an incoming RDD be applied to the appropriate subset of objects within another “stateful” RDD held in memory. This in turn would require that the two RDDs be aligned to perform a parallel operation, which could add complexity to the original algorithm, especially if updates need to be applied to more than one object in the stateful collection. Also, fault-tolerance might require checkpointing to disk since the collection’s lineage could grow lengthy over time.
IMDGs offer a platform for scalable, memory-based storage and data-parallel computation which was specifically designed for use in operational systems, such as the ones we looked at above. Because they incorporate API support for accessing and updating individual data objects with integrated high availability, IMDGs are easily integrated into the business logic of these systems. Although Spark and Spark Streaming, with their use of memory-based storage and accelerated MapReduce execution times, bear a resemblance to IMDGs such as ScaleOut hServer, they were not intended for use in operational systems and do not provide the feature set needed to make this feasible. We will take a look at how IMDGs differ from Storm and CEP in an upcoming blog.
The post How Object-Oriented Programming Simplifies Data-Parallel Analytics appeared first on ScaleOut Software.
The evolution of in-memory data grids (IMDGs) over the past twelve or so years has paved the way for in-memory computing and analytics. IMDGs, such as ScaleOut StateServer, originally were developed to provide scalable, memory-based data storage for operational systems. For example, IMDGs host session-state and shopping carts for e-commerce web server farms and stock trades for financial systems. These applications and their associated IMDGs often run on clusters of physical or virtual servers to maintain fast response times for growing workloads.
Because web server applications, financial applications, and many others typically use object-oriented languages, such as Java, C#, or C++, to implement their business logic, they naturally structure fast-changing state information as collections of objects. For example, consider a website hosting an e-commerce application. This site’s shopping carts can be stored as instances of a “shopping cart” type, enabling business logic to directly manage these objects and leverage the benefits of object-oriented programming (e.g., data encapsulation and inheritance).
It is useful to think of a collection of objects as analogous to the rows of a relational database table, with object properties corresponding to the table’s attributes. However, when objects are read from (or written to) a database or other persistent storage repository, they must be converted to or from an object-oriented representation. Tools such as Hibernate help perform these conversions.
To maximize ease of use, IMDGs are designed to integrate into an application’s object-oriented business logic and store fast-changing application state. They enable the application to seamlessly share data across servers and synchronize access by threads running on different servers. For example, consider a server cluster hosting a web server farm with a load-balancer distributing incoming browser connections to the web servers. If an e-commerce web application stores shopping carts within an IMDG, it can access the carts from any web server. Also, multiple browser connections can coordinate access to the shopping carts by using the IMDG’s APIs for distributed locking.
Note that IMDGs usually are not used for long term, persistent storage; that is the job of database and file servers. That said, IMDGs usually incorporate mechanisms to ensure the high availability of data after server outages so that they can be employed in mission-critical applications.
To enable IMDGs to store business logic state for scaled-out applications, their APIs provide an object-oriented view of data. IMDGs organize stored data as unstructured collections of objects, called “namespaces,” with each collection holding objects of a single type. These collections directly reflect mechanisms provided by object-oriented languages, such as Java maps and C# collections. To make objects globally accessible by the IMDG across a cluster of servers, the application supplies an identifying name, or “key,” for each stored object, usually in the form of a string, a number, or sometimes another object. For this reason, IMDGs are sometimes called “key-value stores.”
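A minimal, in-process sketch of the namespace and key model. Each namespace holds objects of a single type, and each object is addressed by a key. `Namespace` is a hypothetical class; a real IMDG client would serialize each object and send it to the grid servers instead of keeping it in a local dictionary.

```python
from dataclasses import dataclass
from typing import Dict, Generic, Optional, Type, TypeVar

T = TypeVar("T")

class Namespace(Generic[T]):
    """Hypothetical keyed collection that holds objects of a single type."""
    def __init__(self, name: str, item_type: Type[T]) -> None:
        self.name = name
        self.item_type = item_type
        self._items: Dict[str, T] = {}

    def put(self, key: str, obj: T) -> None:
        # Enforce the one-type-per-namespace rule described in the text.
        if not isinstance(obj, self.item_type):
            raise TypeError(f"{self.name} holds only {self.item_type.__name__} objects")
        self._items[key] = obj

    def get(self, key: str) -> Optional[T]:
        return self._items.get(key)

    def remove(self, key: str) -> None:
        self._items.pop(key, None)

@dataclass
class ShoppingCart:
    user: str
    items: list

carts: Namespace[ShoppingCart] = Namespace("carts", ShoppingCart)
carts.put("user-1001", ShoppingCart("user-1001", ["tent"]))
```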
IMDGs automatically distribute objects within each namespace across the IMDG’s cluster of servers to maximize storage capacity and access throughput (by avoiding hot spots on individual servers). Well-designed IMDGs are “elastic” in the sense that they can transparently rebalance the storage workload as servers are added to handle more data or as servers are removed if the workload shrinks. In addition, IMDGs typically maintain redundant copies of stored objects on different servers so that data is not lost if a server fails or becomes unreachable. If such a failure occurs, the IMDG self-heals by restoring full redundancy to stored data on the surviving servers.
Take a look at the following diagram which depicts how an IMDG distributes collections of objects across a cluster of servers:
Because an IMDG’s memory is held in service processes running separately from applications, stored objects must be serialized into byte streams which are sent to the IMDG using inter-process or network communication. IMDGs typically store objects as “blobs” of serialized, uninterpreted data accessed by their keys. However, they also use client-side caches within the address space of applications to hold deserialized copies and thereby speed up retrieval.
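A sketch of this storage path, assuming `pickle` as the serializer and a per-key version number to detect stale client-side cache entries. Both are assumptions for illustration; IMDGs use their own serializers and cache-coherency protocols. The server side holds only opaque bytes, while the client keeps deserialized copies and reuses them until the stored version changes.

```python
import pickle
from typing import Any, Dict, Tuple

class GridServer:
    """Stand-in for a grid service process: stores uninterpreted blobs by key."""
    def __init__(self) -> None:
        self.blobs: Dict[str, Tuple[int, bytes]] = {}

    def write(self, key: str, blob: bytes) -> None:
        version = self.blobs.get(key, (0, b""))[0] + 1
        self.blobs[key] = (version, blob)

    def version(self, key: str) -> int:
        return self.blobs[key][0]

    def read(self, key: str) -> Tuple[int, bytes]:
        return self.blobs[key]

class CachingClient:
    """Client library with a near cache of deserialized objects."""
    def __init__(self, server: GridServer) -> None:
        self.server = server
        self.cache: Dict[str, Tuple[int, Any]] = {}
        self.deserializations = 0

    def put(self, key: str, obj: Any) -> None:
        # Serialize before crossing the process or network boundary.
        self.server.write(key, pickle.dumps(obj))

    def get(self, key: str) -> Any:
        current = self.server.version(key)      # small version check, not the full blob
        cached = self.cache.get(key)
        if cached and cached[0] == current:
            return cached[1]                    # cache hit: no transfer, no deserialization
        version, blob = self.server.read(key)
        obj = pickle.loads(blob)
        self.deserializations += 1
        self.cache[key] = (version, obj)
        return obj

server = GridServer()
app_a, app_b = CachingClient(server), CachingClient(server)
app_a.put("cart:1", {"items": ["tent"]})
```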
Because an IMDG runs on a cluster of servers, it incorporates scalable computing power that grows with storage capacity. IMDGs use this computing power both to scale access throughput and to select data using parallel query. IMDGs can query objects within a namespace by their object properties. Queries take the form of an expression which filters objects based on the values of these properties and generates a collection of keys identifying those objects which match the query criteria.
IMDG queries are similar to database queries except that they operate on object properties instead of relational attributes. For example, if an IMDG holds a collection describing equities in the stock market, a query could select all stocks which are in the high tech sector and have price/earnings ratios greater than 15. The following diagram depicts a parallel query in an IMDG:
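A sketch of the stock query above, run as a parallel query. Each simulated server filters only its local objects by property values and returns matching keys, which are then merged. Threads stand in for grid servers, and `Stock`, `local_query`, and the ticker symbols and P/E values are invented for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, Dict, List

@dataclass
class Stock:
    symbol: str
    sector: str
    pe_ratio: float

# Each dict is the slice of a hypothetical "stocks" namespace held by one server.
partitions: List[Dict[str, Stock]] = [
    {"ACME": Stock("ACME", "tech", 28.0), "OILX": Stock("OILX", "energy", 12.0)},
    {"CHIP": Stock("CHIP", "tech", 11.0), "GPUX": Stock("GPUX", "tech", 60.0)},
    {"REIT": Stock("REIT", "real estate", 40.0)},
]

def local_query(part: Dict[str, Stock], pred: Callable[[Stock], bool]) -> List[str]:
    # Runs on each server against its own objects; only matching keys leave the server.
    return [key for key, stock in part.items() if pred(stock)]

def parallel_query(pred: Callable[[Stock], bool]) -> List[str]:
    with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
        per_server = list(pool.map(lambda part: local_query(part, pred), partitions))
    # Merge the per-server key lists into a single result.
    return sorted(key for keys in per_server for key in keys)

high_tech_pe_over_15 = parallel_query(lambda s: s.sector == "tech" and s.pe_ratio > 15)
```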
ScaleOut StateServer takes advantage of Microsoft’s language integrated query (LINQ) to implement C# queries with a straightforward syntax similar to SQL. For Java, it constructs queries by composing methods that filter property values and combining these filters into Boolean expressions.
Various techniques are available to index objects for fast query so that they do not have to be deserialized to evaluate their properties when a query is performed. The net effect is that IMDGs can take full advantage of the cluster’s computing power to quickly select the objects of interest within a large collection stored in the IMDG.
However, efficiently managing the results of a parallel query without creating a performance bottleneck is a major challenge. If a query matches many objects, a large set of keys must be collected and delivered to the requesting application, and this can consume a great deal of network bandwidth. For example, matching 1M objects with 40-byte keys requires delivering 40MB to an application, which can take half a second on a gigabit network. Moreover, the application has to retrieve all 1M objects to process the selected data, which can take several seconds depending on the amount and complexity of the data.
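The arithmetic behind this estimate can be checked quickly. The sketch uses the raw gigabit line rate of 10^9 bits per second and ignores protocol overhead, so it gives a lower bound.

```python
matches = 1_000_000        # objects selected by the query
key_bytes = 40             # bytes per key
link_bits_per_s = 1e9      # gigabit Ethernet, raw line rate

payload_bytes = matches * key_bytes                  # 40,000,000 bytes, i.e. 40 MB
transfer_s = payload_bytes * 8 / link_bits_per_s     # seconds at full line rate
```

At full line rate the keys alone take about 0.32 seconds, so half a second is a reasonable figure once protocol overhead and contention are included.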
Another issue with parallel queries is that they must be used judiciously to avoid saturating the CPUs and network. Because each query generates work on every server, and the number of application threads issuing queries typically grows along with the cluster, the total amount of work can grow quadratically and overload the servers or network.
While parallel queries provide a powerful means to identify the objects of interest for further analysis, the key to scalable speedup without performance bottlenecks is to analyze objects in place on the grid servers. Instead of shipping the keys and objects to an application thread for analysis, it is far more efficient to ship the analytics code to the IMDG’s servers for parallel analysis. This minimizes data motion, scales the computation, and ensures that results can be returned as quickly as possible.
When performing a data-parallel computation, a parallel query serves to filter the objects that are submitted for analysis on each grid server. This avoids the performance bottleneck caused by shipping keys and objects across the network. The data-parallel computation also reduces the usage of parallel queries by moving the analysis into the IMDG.
The object-oriented data model simplifies the construction of data-parallel analysis in several ways. First, it partitions the data set into a collection of logically related entities (instances of a type) which can be independently analyzed. These objects form the domain decomposition needed for parallel execution, as discussed in the previous blog. Second, it enables the analysis code to be conveniently represented as a method on the type. Lastly, it enables the results of the parallel analysis to be structured as another collection of objects, usually of a different type, which can be combined or fed to another data-parallel operation (as is the case with Hadoop MapReduce).
Because the IMDG distributes the objects within a collection to all grid servers, it automatically ensures that the domain for data-parallel execution is load-balanced across the cluster. This maximizes overall throughput by balancing the analysis workload across the servers.
For example, consider the financial services example we saw in the previous blog in which the IMDG holds a collection of “strategy” objects for a hedge fund, each of which represents a market sector, such as high tech or real estate, and holds the equity positions and rules for that market sector. A data-parallel computation can independently update each strategy object with a snapshot of market price changes and then evaluate the strategy to determine if stock trades are needed. By performing this analysis in parallel across all strategies, results can be generated in milliseconds instead of the several minutes needed by conventional disk-based, sequential analysis.
The following diagram shows how each IMDG server within a cluster can analyze local objects resulting from a parallel query, thereby avoiding data motion across the network. This diagram illustrates the multi-core “parallel method invocation” engine implemented by ScaleOut StateServer Pro. Note that locally selected objects are analyzed using the application’s Analyze method; analysis results then are combined by the Merge method and flow back into the server’s local memory for optional combining across servers.
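A sketch of the Analyze/Merge pattern described above. An `analyze` function runs on each server’s local objects, and an associative `merge` function combines results, first within each server and then across servers. This is not ScaleOut’s parallel method invocation API; the function names, the 5% loss rule, the use of threads as stand-in servers, and the sample positions are assumptions.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
from typing import Dict, List, Tuple

Result = Tuple[int, float]   # (positions flagged, total market value)

def analyze(position: Dict[str, float]) -> Result:
    # Per-object analysis: flag positions whose loss exceeds 5% of cost (illustrative).
    loss = position["cost"] - position["value"]
    return (1 if loss > 0.05 * position["cost"] else 0, position["value"])

def merge(a: Result, b: Result) -> Result:
    # Associative, so results can be combined in any order, locally or globally.
    return (a[0] + b[0], a[1] + b[1])

def run_on_server(local_objects: List[Dict[str, float]]) -> Result:
    # Each server analyzes only the objects it already holds: no data motion.
    return reduce(merge, (analyze(obj) for obj in local_objects), (0, 0.0))

def invoke(servers: List[List[Dict[str, float]]]) -> Result:
    with ThreadPoolExecutor(max_workers=len(servers)) as pool:
        per_server = list(pool.map(run_on_server, servers))
    return reduce(merge, per_server, (0, 0.0))   # combine results across servers

servers = [
    [{"cost": 100.0, "value": 90.0}, {"cost": 50.0, "value": 52.0}],
    [{"cost": 200.0, "value": 195.0}],
]
flagged, total_value = invoke(servers)
```

Only the small per-server results cross the network; the positions themselves never move.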
Each server in the IMDG’s cluster runs a portion of the data-parallel computation on locally selected objects, as illustrated below:
To simplify running data-parallel analyses, some IMDGs, such as ScaleOut StateServer Pro, can ship the application code to all grid servers and initialize the execution environment (i.e., start up JVMs or .NET runtimes). In addition, the IMDG’s use of object collections streamlines multiple data-parallel operations. For example, ScaleOut hServer uses the IMDG to run both the Map and Reduce phases as data-parallel operations, staging intermediate data within the IMDG to minimize overall execution time. Object collections also reduce the complexity and overhead of Hadoop’s input and output formats and enable automatic optimizations (e.g., automatically creating splits and partitions).
Object-oriented programming gives IMDGs an efficient and well understood means to hold business logic state, perform queries, and structure data-parallel computations. This allows IMDGs to be seamlessly integrated into operational systems and perform real-time analytics on “live” data, opening up many new opportunities to add value to these systems.
The post Creating Data-Parallel Computations for Real-Time Analytics appeared first on ScaleOut Software.
Going back to the 1980s, application developers realized that determining how to partition data so that each portion can be analyzed in parallel, a technique often called “domain decomposition,” is the fundamental design decision in data-parallel programming. Once this is accomplished, the domain can be parceled out among the servers within a computational cluster, and the analysis can proceed in parallel on all servers. The cluster speeds up the computation and also allows more servers to be added as the workload (i.e., the domain) grows in size.
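A minimal sketch of a domain decomposition for a physical model: a 2-D grid of cells is split into contiguous bands of rows, one band per server. The row-band scheme and the `decompose` helper are illustrative assumptions; real simulation codes often use 2-D or 3-D blocks to reduce the data exchanged at boundaries.

```python
from typing import Dict, List, Tuple

Cell = Tuple[int, int]

def decompose(rows: int, cols: int, servers: int) -> Dict[int, List[Cell]]:
    """Assign contiguous bands of rows to servers as evenly as possible."""
    base, extra = divmod(rows, servers)
    domain: Dict[int, List[Cell]] = {}
    start = 0
    for s in range(servers):
        count = base + (1 if s < extra else 0)   # the first `extra` servers take one more row
        domain[s] = [(r, c) for r in range(start, start + count) for c in range(cols)]
        start += count
    return domain

domain = decompose(rows=10, cols=4, servers=3)
```

Growing the cluster only requires calling `decompose` again with a larger server count.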
The key to domain decomposition is determining which data within the state of an application to use as the domain for parallel analysis. In many cases, the domain is easy to identify, especially when the application is analyzing a physical entity. For example, in an earlier blog we saw how a climate simulation divides the atmosphere, land, and ocean into a grid of boxes which are analyzed independently at each time interval. Likewise, structural mechanics and fluid dynamics typically use grids that model physical systems.
The process of domain decomposition can be more challenging for applications whose data sets can be partitioned in different ways. Consider the hedge fund example above. Do we analyze the data by partitioning the stock symbols across the servers and then analyzing all strategies which hold a given stock? Alternatively, should we partition the strategies and analyze all positions within the strategy? Is there still another domain decomposition?
The best approach to choose usually is dictated by the need to minimize data motion. If a given decomposition induces data motion at each analysis step, this can kill performance while saturating the network. For example, if the hedge fund analysis partitions stock symbols, it would need to query all strategies affected by the stock symbol to obtain information needed to perform the analysis, and this requires substantial data motion for each stock symbol. (This assumes that the strategies as well as the stock symbols – and, in general, all data sets – are distributed across the cluster of servers performing the data-parallel computation, as is the case for data sets stored within an in-memory data grid (IMDG).)
Instead, the computation could partition the strategies across the servers and analyze all strategies in parallel. In this case, it would need to query the latest stock prices for all positions within each strategy; this also induces data motion. In either case, if the strategies and stock prices are stored in separate data sets, unacceptable data motion will be incurred by the data-parallel computation.
However, if the stock prices for all relevant positions are kept up to date within the strategies instead of storing this data elsewhere, no data motion is needed to perform the data-parallel analysis, and maximum performance is achieved. Bearing in mind that multiple strategies maintain positions in a given stock, how can we efficiently update the strategies as price changes flow in from a market feed? The answer lies in leveraging data-parallel computation to both analyze and update the strategies. We can distribute a snapshot of price changes to all strategies at the start of each analysis and use this information to update each strategy’s positions prior to performing the analysis. This easily can be accomplished by tracking all price changes since the last analysis step and efficiently distributing them to the servers as part of the invocation mechanism for the analysis.
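A sketch of this approach. Each strategy keeps its own copy of the prices it needs; a snapshot of price changes since the last step is shipped with each invocation, applied locally, and then the strategy is evaluated. `Strategy`, the 5% weight-drift rule, `run_step`, and the symbols are hypothetical. In a real deployment `run_step` would execute in parallel on each grid server; it is shown sequentially here.

```python
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class Strategy:
    name: str
    shares: Dict[str, int]              # symbol -> shares held
    target_weights: Dict[str, float]    # symbol -> desired fraction of strategy value
    prices: Dict[str, float] = field(default_factory=dict)   # local copy of needed prices

    def apply(self, snapshot: Dict[str, float]) -> None:
        # Copy in only the symbols this strategy holds; no query to another data set.
        for symbol in self.shares:
            if symbol in snapshot:
                self.prices[symbol] = snapshot[symbol]

    def needs_rebalance(self, tolerance: float = 0.05) -> bool:
        values = {s: n * self.prices[s] for s, n in self.shares.items()}
        total = sum(values.values())
        return any(abs(values[s] / total - self.target_weights[s]) > tolerance
                   for s in self.shares)

def run_step(strategies: List[Strategy], snapshot: Dict[str, float]) -> List[str]:
    alerts = []
    for strategy in strategies:         # parallel across grid servers in practice
        strategy.apply(snapshot)
        if strategy.needs_rebalance():
            alerts.append(strategy.name)
    return alerts

tech = Strategy("tech", {"CHIP": 100, "SOFT": 100}, {"CHIP": 0.5, "SOFT": 0.5},
                prices={"CHIP": 10.0, "SOFT": 10.0})
energy = Strategy("energy", {"OILX": 50}, {"OILX": 1.0}, prices={"OILX": 20.0})
alerts = run_step([tech, energy], {"CHIP": 14.0, "GASY": 3.0})
```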
The following diagram illustrates how an IMDG can host a set of strategies and perform parallel analysis on the strategies while updating them with a live market feed containing snapshots of price changes. The analysis produces a stream of alerts to the trader (or to an automated trading system) for strategies that need rebalancing. In ScaleOut Analytics Server, the data-parallel analysis can be implemented using a feature called parallel method invocation (PMI):
The net effect is that the hedge fund can update its strategies in real time and obtain alerts regarding positions that require rebalancing based on current market conditions. Take a look at this demo of a proof of concept implementation for 2K strategies tracking and analyzing a total of 40K positions using a cluster of four servers. This demo was implemented using a real-time Hadoop MapReduce engine implemented within ScaleOut hServer and delivered alerts within about 330 milliseconds (instead of more than 15 seconds for standard Hadoop).
Consider another example of an e-commerce application that needs to reconcile orders and inventory in real time to avoid a shortfall in inventory. Orders flow into the system for a set of products (“SKUs”), and inventory changes flow into the system from several sources: orders to outside vendors are placed, products arrive at the warehouse, orders from customers are filled, defects are detected, etc. Especially with perishable goods, it’s vital to precisely track order commitments so that orders can be accurately filled (and customers stay happy).
Several different domain decompositions for reconciling orders to inventory could be used. Do we partition based on the orders, the inventory changes, or on some other basis? If we reconcile based on pending orders, we have to query the inventory changes for all items within each order, inducing substantial, performance-killing data motion. Likewise, if we partition the inventory changes, we have to query all orders that include a given inventory item, again inducing data motion.
To solve this problem, we can collect pending orders and inventory changes within their common SKUs and then use the SKUs to form a domain decomposition. As orders come into the system, we update the SKUs for all items within each order to track those orders. Likewise, as inventory changes flow in, we update the SKUs to maintain the latest inventory updates. This continuous process updates the SKUs on a real-time basis in a manner analogous to an ongoing database join operation. Now, a data-parallel analysis of the SKUs can reconcile all relevant orders and inventory changes for each SKU without causing data motion. This enables the cluster to perform a reconciliation across all SKUs in seconds instead of minutes.
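To make the SKU-based decomposition concrete, here is a minimal Python sketch, assuming a plain dictionary in place of an IMDG collection. `SkuRecord`, `SkuStore`, `eval_sku`, `merge`, and the sign convention for inventory deltas are hypothetical names chosen for illustration. Writes do the “join”: each order is recorded in every SKU it touches, so the later reconciliation of a SKU needs only that SKU’s own record.

```python
from dataclasses import dataclass, field
from functools import reduce


@dataclass
class SkuRecord:
    """Hypothetical per-SKU object that co-locates orders and inventory changes."""
    sku: str
    pending: dict = field(default_factory=dict)           # order id -> quantity
    inventory_deltas: list = field(default_factory=list)  # signed stock changes


class SkuStore:
    """Stand-in for an IMDG collection keyed by SKU (here just a dict)."""

    def __init__(self):
        self.records = {}

    def _get(self, sku):
        if sku not in self.records:
            self.records[sku] = SkuRecord(sku)
        return self.records[sku]

    def add_order(self, order_id, lines):
        # An order updates every SKU it contains, so the "join" happens on write.
        for sku, qty in lines.items():
            self._get(sku).pending[order_id] = qty

    def add_inventory_change(self, sku, delta):
        # Assumed sign convention: arrivals positive, defects/shrinkage negative.
        self._get(sku).inventory_deltas.append(delta)


def eval_sku(rec):
    """Eval: reconcile one SKU using only data stored in that SKU's record."""
    on_hand = sum(rec.inventory_deltas)
    committed = sum(rec.pending.values())
    shortfall = committed - on_hand
    return {rec.sku: shortfall} if shortfall > 0 else {}


def merge(a, b):
    """Merge: combine two partial results (SKU keys never collide)."""
    return {**a, **b}


def reconcile(store):
    # In an IMDG, eval_sku would run in parallel on each server's local SKUs.
    return reduce(merge, map(eval_sku, store.records.values()), {})


if __name__ == "__main__":
    store = SkuStore()
    store.add_inventory_change("apple", 10)
    store.add_inventory_change("pear", 3)
    store.add_order("o1", {"apple": 4, "pear": 2})
    store.add_order("o2", {"pear": 5})
    print(reconcile(store))  # {'pear': 4}
```

Because `merge` is associative, the partial results could be combined in any grouping, which is what allows a grid to merge per-server results in parallel.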
The following diagram illustrates an IMDG hosting a collection of data representing SKUs that are being updated by incoming orders and inventory changes. The IMDG performs data-parallel reconciliation (“Eval”) of the SKUs and combines the results (“Merge”) while operations are ongoing:
Note that in this example, we do not use the data-parallel invocation mechanism to perform updates to the application’s state as we did with the hedge fund example. Instead, individual order and inventory updates flow into the application on a continuous basis, and the data-parallel analysis is performed on the state information as it changes. In both cases, the integration of data-parallel analysis within an operational system handling live data demonstrates the value of real-time analytics: analysis results are continuously generated and fed back to the system to optimize its ongoing operations.
Lastly, note that an IMDG serves as an ideal host for both the application’s state and the data-parallel analysis. By storing data in memory and distributing it across a cluster of servers, it can keep up with continuous updates and scale to handle growing workloads (while maintaining high availability) just by adding servers. And it can perform data-parallel analysis on domains of data distributed across the servers, leveraging the IMDG’s automatic load-balancing and avoiding data motion across the network during the analysis.
Interestingly, the object-oriented programming model supported by IMDGs helps us to structure, distribute, identify, query, and analyze domains that we construct for real-time analysis. We will explore that in an upcoming blog.
The post Creating Data-Parallel Computations for Real-Time Analytics appeared first on ScaleOut Software.
The post Scaling Real-Time Analytics with an IMDG appeared first on ScaleOut Software.
Scaling out contrasts with the alternative technique of scaling “up” computing power by adding processors or special-purpose processing hardware, such as a vector processor or GPU, to a single, shared-memory multiprocessor system. This technique originally gained popularity with mainframe systems prior to the advent of parallel supercomputing. However, mainframe architect Gene Amdahl recognized that the slowest element of a computing system constrains its overall throughput when executing a fixed-size problem; this observation became known as Amdahl’s Law. For example, if half of the computation time for a problem is consumed by a vector calculation that could be offloaded to an infinitely fast vector processor, the overall computation runs only 2X faster. Even if 90% of the computation time could be accelerated in this manner, the computation speeds up by no more than 10X.
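The 2X and 10X figures follow from the usual form of Amdahl’s Law: if a fraction p of a fixed-size job is accelerated by a factor s, the overall speedup is 1 / ((1 - p) + p / s), which approaches 1 / (1 - p) as s grows without bound. A small Python helper (the function name is our own) reproduces the numbers in the paragraph above:

```python
import math


def amdahl_speedup(accelerated_fraction: float, factor: float) -> float:
    """Overall speedup of a fixed-size job when part of it runs `factor` times faster.

    accelerated_fraction: share of the original run time that can be accelerated (p).
    factor: how much faster that share runs (s); math.inf models an infinitely fast unit.
    """
    unaccelerated = 1.0 - accelerated_fraction
    return 1.0 / (unaccelerated + accelerated_fraction / factor)


if __name__ == "__main__":
    print(amdahl_speedup(0.5, math.inf))   # 2.0
    print(amdahl_speedup(0.9, math.inf))   # about 10 (floating-point rounding)
    print(amdahl_speedup(0.9, 10.0))       # well under 10 with a finite accelerator
```

The unaccelerated 10% alone takes one tenth of the original time, so no amount of extra hardware applied to the other 90% can push the speedup past 10X.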
Scaling out solves this problem by avoiding it. Scaling out adds servers to a cluster so that it can handle larger problem sizes and continue to increase throughput without increasing computation time (as long as the overheads do not grow faster than the number of servers!). This benefit was first observed independently by Cleve Moler and John Gustafson and codified as Gustafson’s Law of scalable speedup. When scaling out, the goal changes from trying to reduce the computation time for a fixed-size problem to maintaining the same computation time as the workload increases in size. This nicely matches the needs of system architects who must scale their systems to keep up with fast-growing workloads.
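Gustafson’s Law is commonly written as S(N) = (1 - p) + p * N, where N is the number of servers and p is the fraction of the scaled run’s time spent in work that is divided among the servers. Unlike Amdahl’s Law, the problem size grows with N while the run time stays fixed, so the speedup keeps growing linearly. The helper below is an illustrative sketch of that formula; the function name is our own.

```python
def gustafson_speedup(parallel_fraction: float, servers: int) -> float:
    """Scaled speedup when the workload grows with the number of servers.

    parallel_fraction: share of the scaled run's time spent in work that is
    spread across all servers (p); the remainder is serial.
    """
    return (1.0 - parallel_fraction) + parallel_fraction * servers


if __name__ == "__main__":
    for n in (1, 2, 4, 8, 16):
        # Speedup rises by about p for every server added.
        print(n, gustafson_speedup(0.9, n))
```

With p = 0.9, each added server contributes roughly 0.9 of a server’s worth of extra work per unit time, in contrast with the hard 10X ceiling that Amdahl’s Law imposes on a fixed-size problem with the same fraction.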
Scaling out enables in-memory data grids to scale both in-memory data storage capacity and computing power so that they can perform real-time analytics on fast changing data. For example, consider the basic function of an IMDG to let client applications seamlessly access memory-based objects distributed across a cluster of servers. To read an object stored in an IMDG, a client application calls an API which makes a network request to an IMDG server. Assuming the client library can keep track of which server holds the specified object, this request can be satisfied with a single network round trip, consuming a small amount of CPU and NIC bandwidth on the server. As more requests arrive at the server, they consume increasing amounts of CPU and NIC bandwidth until these resources are maxed out.
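One common way for a client library to “keep track of which server holds the specified object” is to hash each key to a fixed partition and keep a small partition-to-server map. The sketch below assumes that scheme; it is not a description of ScaleOut’s client library, and `GridClient`, the SHA-1 hash, the 64-partition count, and the round-robin map are all hypothetical choices. A stable hash (rather than Python’s per-process randomized `hash()`) is used so every client computes the same location for a key.

```python
import hashlib


class GridClient:
    """Hypothetical client that maps each key to a partition, then to a server."""

    def __init__(self, servers, num_partitions=64):
        self.num_partitions = num_partitions
        # Partition map: partition index -> server name (round-robin assignment).
        self.partition_map = [servers[i % len(servers)] for i in range(num_partitions)]

    def partition_of(self, key: str) -> int:
        # Stable hash, so every client process computes the same partition.
        digest = hashlib.sha1(key.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") % self.num_partitions

    def server_for(self, key: str) -> str:
        # A local lookup, so a read needs only one round trip to this server.
        return self.partition_map[self.partition_of(key)]


if __name__ == "__main__":
    client = GridClient(["s1", "s2", "s3", "s4"])
    print(client.server_for("cart:42"))
```

Keeping a layer of partitions between keys and servers means that when servers join or leave, only the small partition map has to change, not the key-to-partition hash.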
This is where scalable speedup takes over. Even though the overall request rate that a single server can handle is constrained by its available resources, adding a second server doubles the request rate the cluster can handle without increasing response time. Likewise, adding more servers to the cluster increases throughput linearly while maintaining fixed response times, until the cluster’s network infrastructure is eventually saturated. Scaling out enables IMDGs to handle growing workloads just by adding servers.
Storing and updating objects in an IMDG requires more network bandwidth than accessing them. To maintain high availability, IMDGs store one or more copies of every object on additional servers so that if a server or network connection fails, data can be accessed from an alternative server within the cluster. So, at a minimum, an update request requires twice as much network bandwidth as an access request. As a result, an update-heavy workload tends to saturate the NICs and network infrastructure faster than an access-heavy workload. However, in both cases, the overhead is still proportional to the request rate and also proportional to the number of servers in the cluster. This means that we can expect linear throughput growth as we add IMDG servers until the network itself is saturated. (At that point, we need a faster network, such as 10 Gbps Ethernet or InfiniBand.)
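The bandwidth argument above can be captured in a deliberately crude capacity model, sketched below in Python. All names and the model itself are assumptions for illustration: a read puts one copy of the object on the wire, an update puts one copy plus one per replica, each server’s NIC supplies a fixed number of bytes per second, and the switch fabric caps the cluster total. It ignores full-duplex links, protocol headers, and replies.

```python
import math


def max_cluster_rate(servers, nic_bytes_per_sec, object_bytes, replicas=1,
                     update_fraction=0.0, network_bytes_per_sec=math.inf):
    """Hypothetical upper bound on requests per second for an IMDG cluster.

    A read moves one copy of the object; an update moves the object to its
    primary server plus one copy for each replica.
    """
    bytes_per_read = object_bytes
    bytes_per_update = object_bytes * (1 + replicas)
    avg_bytes = ((1 - update_fraction) * bytes_per_read
                 + update_fraction * bytes_per_update)
    # Each server's NIC limits its share; the network fabric limits the total.
    per_server = nic_bytes_per_sec / avg_bytes
    return min(servers * per_server, network_bytes_per_sec / avg_bytes)


if __name__ == "__main__":
    print(max_cluster_rate(4, 1000, 10))                           # reads only
    print(max_cluster_rate(4, 1000, 10, update_fraction=1.0))      # updates only
    print(max_cluster_rate(8, 1000, 10, network_bytes_per_sec=3000))  # fabric-bound
```

In this model an update-only workload with one replica reaches half the read-only rate, throughput doubles when servers double, and the `min` captures the point at which the network itself, not the servers, becomes the bottleneck.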
It’s easy to see the importance of the IMDG’s automatic, dynamic load balancing of stored objects in maintaining scalable speedup. This mechanism evenly distributes the workload across all servers within the IMDG’s cluster to make sure that maximum throughput is obtained, avoiding “hot spots” which would direct too many requests to a few servers and limit throughput.
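Load balancing of stored objects can be pictured as rebalancing a partition map whenever the server set changes. The function below is a hypothetical policy sketched for illustration, not ScaleOut’s algorithm: each server ends up owning either floor(P/N) or ceil(P/N) of the P partitions, and partitions are moved only off servers that have left the cluster or hold more than their target share.

```python
from collections import Counter


def rebalance(partition_map, servers):
    """Hypothetical rebalancing: return a new partition -> server list in which
    each server owns floor(P/N) or ceil(P/N) partitions."""
    p, n = len(partition_map), len(servers)
    base, extra = divmod(p, n)
    current = Counter(partition_map)
    # Give the "+1" slots to servers that already hold the most partitions,
    # which tends to reduce how many partitions must move.
    ranked = sorted(servers, key=lambda s: current[s], reverse=True)
    target = {s: base + (1 if i < extra else 0) for i, s in enumerate(ranked)}
    counts = Counter({s: current[s] for s in servers})

    new_map = list(partition_map)
    to_move = []
    for idx, owner in enumerate(new_map):
        if owner not in target:                # owner left the cluster
            to_move.append(idx)
        elif counts[owner] > target[owner]:    # owner holds more than its share
            counts[owner] -= 1
            to_move.append(idx)

    # Hand each displaced partition to a server that is still below its target.
    for idx in to_move:
        dest = next(s for s in servers if counts[s] < target[s])
        new_map[idx] = dest
        counts[dest] += 1
    return new_map


if __name__ == "__main__":
    old = [["a", "b", "c"][i % 3] for i in range(64)]
    new = rebalance(old, ["a", "b", "c", "d"])
    print(Counter(new))
```

When server “d” joins a three-server cluster holding 64 partitions, only the 16 partitions that “d” must own change hands; every other partition stays where it was, which limits data movement while evening out the request load.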
Maintaining Scalable Speedup in Analytics Computations
What happens if we create a workload that induces network overhead which grows faster than the number of servers? This problem was often encountered in parallel supercomputing and gave rise to the term “embarrassingly parallel” for applications that require little or no communication between servers. It is just as relevant today: real-time analytics applications must make sure this problem does not occur and kill scalable speedup as the workload grows.
For example, consider a data analytics computation distributed across a set of clients, one per server in an IMDG cluster with N servers, with the client application running on the grid servers. Also assume each server’s computation must access and analyze N or more objects. This makes the total number of accesses required to perform the computation proportional to N*N. If these objects are randomly distributed within the IMDG, the network overhead to process this workload will grow as N-squared and quickly saturate the network, drastically limiting scalable speedup. A “task-parallel” application which parcels out tasks to the clients (one task per object) to complete this analysis could be expected to exhibit this behavior since it randomly selects objects in the IMDG.
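The order-N-squared growth can be checked with a short calculation. Under the assumptions above (N servers, one client per server, each client analyzing k objects placed uniformly at random), a given object sits on the client’s own server with probability 1/N, so the expected number of network accesses is N * k * (N - 1) / N; with k = N that is N(N - 1). The Python sketch below, with hypothetical function names, computes this expectation and confirms it with a small seeded simulation.

```python
import random


def remote_accesses(num_servers: int, objects_per_client: int, data_parallel: bool) -> float:
    """Expected number of object accesses that must cross the network."""
    if data_parallel:
        return 0.0  # each client analyzes only objects on its own server
    # Task-parallel: an object lands on the client's own server with probability 1/N.
    remote_fraction = (num_servers - 1) / num_servers
    return num_servers * objects_per_client * remote_fraction


def simulate_task_parallel(num_servers, objects_per_client, trials=2000, seed=1):
    """Monte Carlo check: place each accessed object on a random server."""
    rng = random.Random(seed)
    remote = 0
    for _ in range(trials):
        for client in range(num_servers):
            for _ in range(objects_per_client):
                if rng.randrange(num_servers) != client:
                    remote += 1
    return remote / trials


if __name__ == "__main__":
    for n in (2, 4, 8, 16):
        # With k = N objects per client, remote traffic grows as N * (N - 1).
        print(n, remote_accesses(n, n, data_parallel=False))
```

Dividing N(N - 1) by N shows that each server’s share of network traffic grows as N - 1, so adding servers makes every NIC busier rather than less busy; the data-parallel case keeps that share at zero.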
However, if we arrange to perform the same analysis as a “data-parallel” computation in which each client only analyzes the objects which are located on the same servers as the client, we can avoid data motion and thereby eliminate order-N-squared network overhead (although we still have order-N inter-process communication overhead between the IMDG’s service process and clients). This is accomplished by letting the IMDG perform the analysis tasks in parallel across all cluster servers on local objects within each server. (In the same manner, Hadoop MapReduce task trackers perform map operations on co-located “splits” within HDFS.)
ScaleOut StateServer Pro’s “parallel method invocation” (PMI) feature implements a data-parallel computation model. This feature lets the user specify a Java or C# method to analyze an object and another method to combine the results of two analyses. (These are analogous to a Hadoop mapper and combiner except that, unlike Hadoop, PMI automatically combines results across all servers into a single result object.) The user also specifies a parallel query to select the objects to be analyzed. To run a PMI, the IMDG pre-stages the code on all grid servers, performs the parallel query, and then analyzes all selected objects in place without moving them across the network. Lastly, it combines the results on each server and then combines the results across the servers using a binary combining tree which minimizes execution time.
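The eval/merge structure described above can be illustrated with a minimal Python sketch. It is not ScaleOut’s PMI API: `run_data_parallel`, the list-of-lists “servers”, and the thread pool are stand-ins chosen for illustration. Each server filters and analyzes only its own objects, combines those results locally, and then the per-server results are merged pairwise in a binary tree. In this sketch the merges within a tree level run sequentially, but the tree has only about log2(N) levels, and on a real grid each level’s merges could proceed in parallel.

```python
from concurrent.futures import ThreadPoolExecutor


def run_data_parallel(servers, query, eval_fn, merge_fn):
    """Hypothetical PMI-style driver.

    servers:  list of per-server object lists (objects never leave their list).
    query:    predicate selecting the objects to analyze.
    eval_fn:  analyzes one object and returns a partial result.
    merge_fn: combines two partial results (must be associative).
    """
    def local(objects):
        # Analyze selected objects in place and combine results on this server.
        result = None
        for obj in objects:
            if query(obj):
                r = eval_fn(obj)
                result = r if result is None else merge_fn(result, r)
        return result

    with ThreadPoolExecutor() as pool:
        partials = list(pool.map(local, servers))

    # Binary combining tree across servers: pairwise merges, level by level.
    level = [p for p in partials if p is not None]
    while len(level) > 1:
        nxt = [merge_fn(level[i], level[i + 1]) for i in range(0, len(level) - 1, 2)]
        if len(level) % 2:
            nxt.append(level[-1])  # odd one out moves up unchanged
        level = nxt
    return level[0] if level else None


if __name__ == "__main__":
    grid = [[1, 2, 3], [4, 5], [6], [7, 8, 9, 10]]
    # Sum of squares of the even-valued objects.
    print(run_data_parallel(grid, lambda x: x % 2 == 0, lambda x: x * x,
                            lambda a, b: a + b))  # 220
```

As with a Hadoop combiner, correctness depends on `merge_fn` being associative, because the grouping of partial results changes with the number of servers.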
The performance benefits of the data-parallel approach are dramatic. To illustrate this, we captured performance measurements of a risk analysis computation in financial services called “back testing.” This analysis compares a variety of stock trading algorithms using recorded price histories for a collection of equities. Each price history was stored in a single object within the IMDG, and the clients were assigned equities to analyze. (Note that the IMDG’s object-oriented storage also could dynamically update the price histories from a ticker feed to enable real-time feedback to a trading system.)
We compared the task-parallel technique, in which the clients analyzed a random set of equities, to the data-parallel technique, in which the clients only examined equities stored on the same server; the data-parallel technique was implemented using PMI. The following chart shows the throughput obtained for both techniques. Note how the data-parallel approach (red line) maintains linear performance scaling as the workload increases and IMDG servers are added to the cluster. In contrast, the task-parallel approach (blue line) fails to scale because it accesses objects on remote servers, which creates substantial networking overhead.
IMDGs take full advantage of scalable speedup to scale their handling of access requests and to perform data-parallel computations. This enables them to run real-time analytics on fast-changing data held in the IMDG and maintain fast response time for large workloads. Unlike pure streaming systems, they combine the IMDG’s in-memory storage with scalable computation to implement complex applications in real-time analytics. We will explore some of these applications in an upcoming blog.
The post IMDGs: Next Generation Parallel Supercomputers appeared first on ScaleOut Software.
Back in the 1980s, IBM, Intel, and nCube (among others) began commercializing parallel computing (“multicomputing”) technology pioneered by professors Charles Seitz and Geoffrey Fox at Caltech. They recognized that commodity servers could be clustered using a high speed network to run parallel programs which deliver highly scalable performance well beyond the power of shared memory multiprocessing servers. With the development of message passing libraries, these multicomputers were programmed using C and Fortran to implement parallel applications in matrix algebra, structural mechanics, fluid dynamics, distributed simulation, and many other areas.
While this multicomputing architecture had the potential to deliver very high scalability, it introduced several challenges. Chief among them was hiding network overhead and latency, which could easily dominate processing time and impede scalability. Hardware architects developed novel high-speed networks, such as Bill Dally’s pipelined torus and mesh routers, to minimize message-passing latency. (Standard 10 Mbps Ethernet LANs of the 1980s were quickly determined to be too slow for use in multicomputers.)
However, to really deliver scalable performance, Cleve Moler (the creator of Matlab, then working at Intel) and, independently, John Gustafson at Sandia Labs recognized that scaling the size of an application (e.g., the size of a matrix being multiplied) as more servers are added to the cluster helps mask networking overhead and enable linear growth in performance; this is called Gustafson’s Law. At first glance, this insight might seem counter-intuitive, since one expects that adding computing power will speed up processing for a fixed-size application. (See Amdahl’s Law.) But adding servers to a computing cluster to handle larger problem sizes actually is very natural: for example, think about adding web servers to a farm as a site’s incoming web load grows.
The daunting complexity inherent in the creation of parallel programs with message passing posed another big obstacle for multicomputers. It became clear that just adding message passing APIs to “dusty deck” applications could easily lead to frustrating and inscrutable deadlocks. Developers realized that higher level design patterns were needed; two that emerged were the “task parallel” and “data parallel” approaches. Data-parallel programming is by far the simpler of the two, since the developer need not write application-specific synchronization code, which can be complex and error prone. Instead, the multicomputer executes a single, sequential method on a collection of data that has been distributed across the servers in the cluster. This code automatically runs in parallel across all servers to deliver scalable performance. (Of course, message passing may be needed between execution steps to exchange data between parts of the application.)
For example, consider a climate simulation model such as NCAR’s Community Climate Model. Climate models typically partition the atmosphere, land, and oceans into a grid of boxes and model each box independently using a sequential code. They repeatedly simulate each box’s behavior and exchange data between boxes at every time step in the simulation. Using a multicomputer, the boxes all can be held in memory and distributed across the servers in the cluster, thereby avoiding disk I/O which impedes performance. The cluster can be scaled to hold larger models with more boxes to improve resolution and generate more accurate results. The multicomputer provides scalable performance, and it runs data-parallel applications to help keep development as simple as possible.
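The box-per-server pattern can be illustrated with a toy one-dimensional diffusion model in Python. This is a hypothetical sketch of domain decomposition with boundary (“halo”) exchange, not NCAR’s model: `split`, `step`, the diffusion coefficient `alpha`, and the fixed-boundary rule are all illustrative assumptions. Each block runs the same sequential update, and the only communication per time step is one edge value from each neighboring block.

```python
def split(cells, parts):
    """Divide a 1-D row of boxes into contiguous blocks, one per server."""
    size, rem = divmod(len(cells), parts)
    blocks, start = [], 0
    for i in range(parts):
        end = start + size + (1 if i < rem else 0)
        blocks.append(cells[start:end])
        start = end
    return blocks


def step(blocks, alpha=0.25):
    """One time step: exchange boundary values, then update every block locally."""
    new_blocks = []
    for i, block in enumerate(blocks):
        # "Message passing": take one edge value from each adjacent block;
        # the outer boundaries simply repeat the block's own edge value.
        left = blocks[i - 1][-1] if i > 0 else block[0]
        right = blocks[i + 1][0] if i < len(blocks) - 1 else block[-1]
        padded = [left] + block + [right]
        # The same sequential code runs on every block (data parallelism).
        new_blocks.append([
            padded[j] + alpha * (padded[j - 1] - 2 * padded[j] + padded[j + 1])
            for j in range(1, len(padded) - 1)
        ])
    return new_blocks


if __name__ == "__main__":
    cells = [0.0, 0.0, 0.0, 10.0, 0.0, 0.0, 5.0, 0.0, 0.0, 0.0]
    distributed = [x for b in step(split(cells, 4)) for x in b]
    single = step([cells])[0]
    print(distributed == single)  # True: decomposition does not change the answer
```

Every block reads only the previous step’s values, so the decomposed run reproduces the single-block result exactly, and a larger model simply means more blocks spread over more servers.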
So what does all this have to do with in-memory data grids? IMDGs make use of the same parallel computing architecture as multicomputers. They host service processes on a clustered set of servers to hold application data which they spread across the servers. This data is stored as one or more collections of serialized objects, such as instances of Java, C#, or C++ objects, and accessed using simple create/read/update/delete (“CRUD”) APIs. As the data set grows in size, more servers can be added to the cluster to ensure that all data is held in memory and access throughput grows linearly.
By doing all of this, IMDGs keep access times constant, which is exactly the characteristic needed by applications that have to handle growing workloads. For example, consider a website holding shopping carts in an IMDG. As more and more customers are attracted to the site, web servers must be added to handle increasing traffic. Likewise, IMDG servers must be added to hold more shopping carts, scale access throughput, and keep response times low. In a real sense, the IMDG serves as a parallel supercomputer for hosting application data, delivering the same benefits that multicomputers deliver for climate models and other scientific applications.
However, the IMDG’s relationship to parallel supercomputers runs deeper than this. Some IMDGs can host data-parallel applications to update and analyze data stored on the grid’s servers. For example, ScaleOut Analytics Server uses its “parallel method invocation” (PMI) APIs to run Java, C#, or C++ methods on a collection of objects specified by a parallel query. It also uses this mechanism to execute Hadoop MapReduce applications with very low latency. In this way, the IMDG serves as a parallel supercomputer by directly running data-parallel applications. These applications can implement real-time analytics on live data, such as analyzing the effect of market fluctuations on a hedge fund’s financial holdings (more on that in an upcoming blog).
IMDGs bring parallel supercomputing to the next generation in significant ways. Unlike multicomputers, they can be deployed on cloud infrastructures to take full advantage of the cloud’s elasticity. They host an object-oriented data storage model with property-based query that integrates seamlessly into the business logic of object-oriented applications. IMDGs automatically load balance stored data across all grid servers, ensuring scalable speedup and relieving the developer of this burden. They provide built-in high availability to ensure that both data and the results of a parallel computation are not lost if a server or network component fails. Lastly, they can ship code from the developer’s workstation to the grid’s servers and automatically stage the execution environment (e.g., a JVM or .NET runtime on every grid server) to simplify deployment.
Although they share a common heritage, IMDGs are not your parent’s parallel supercomputer. They represent the next generation in parallel computing: easily deployable in the cloud, object-oriented, elastic, highly available, and powerful enough to run data-parallel applications and deliver real-time results.