- The Dempsy Project
- Overview
- Prerequisites
- Simple Example - Word Count
- Terminology
- Developing Applications With Dempsy
- Deployment and Configuration
If you're already familiar with real-time stream based BigData engines, the following list of features will distinguish Dempsy from the others:
- Fine grained "actor model": Dempsy provides for the fine grained distribution and lifecycle management of (potentially) millions of "actors" ("message processors" in Dempsy parlance) across a large cluster of machines allowing developers to write code that concentrates on handling individual data points in a stream without any concern for concurrency.
- Inversion of control programming paradigm: Dempsy allows developers to construct these large-scale processing applications decoupled from all infrastructure concerns providing a means for simple and testable POJO implementations.
- Dynamic Topology Autoconfiguration: Dempsy doesn't require the prior definition of a topology for the stream processing. Not only does it dynamically discover the topology, but an application's topology can morph at runtime. Therefore, there's no topology defining DSL or configuration necessary.
- Integrates with available IAAS and PAAS tools: Dempsy doesn't pretend to be an application server or a batch processing system and therefore the provisioning, deployment and management of nodes in an application are provided by cloud management tools already part of your DevOps infrastructure. It fully integrates with various monitoring solutions including: JMX, Graphite, and Ganglia.
- Fully elastic: Expanding Dempsy's cooperation with existing IAAS/PAAS tools, elasticity allows for the dynamic provisioning and deprovisioning of nodes of the application at runtime. This allows for optimizing the use of computational resources over time (Currently available in the trunk revision).
In a nutshell, Dempsy is a framework that provides for the easy implementation of stream-based, real-time, BigData applications.
Dempsy was originally developed at Nokia for processing streaming GPS data from vehicles in its vehicle traffic products for its "HERE" division. It stands for "Distributed Elastic Message Processing System."
- Dempsy is Distributed. That is to say a dempsy application can run on multiple JVMs on multiple physical machines.
- Dempsy is Elastic. That is, it is relatively simple to scale an application to more (or fewer) nodes. This does not require code or configuration changes but allows the dynamic insertion and removal of processing nodes.
- Dempsy is Message Processing. Dempsy fundamentally works by message passing. It moves messages between Message processors, which act on the messages to perform simple atomic operations such as enrichment, transformation, or other processing. Generally an application is intended to be broken down into more smaller simpler processors rather than fewer large complex processors.
- Dempsy is a Framework. It is not an application container like a J2EE container, nor a simple library. Instead, like the Spring Framework it is a collection of patterns, the libraries to enable those patterns, and the interfaces one must implement to use those libraries to implement the patterns.
Dempsy is not designed to be a general purpose framework, but is intended to solve two specific classes of "stream processing" problems while encouraging the use of the best software development practices. These two classes include:
Dempsy can be used to solve the problem of processing large amounts of "near real time" stream data with the lowest lag possible; problems where latency is more important than "guaranteed delivery." This class of problems includes use cases such as:
- Real time monitoring of large distributed systems
- Processing complete rich streams of social networking data
- Real time analytics on log information generated from widely distributed systems
- Statistical analytics on real-time vehicle traffic information on a global basis
It is meant to provide developers with a tool that allows them to solve these problems in a simple straightforward manner by allowing them to concentrate on the analytics themselves rather than the infrastructure. Dempsy heavily emphasizes "separation of concerns" through "dependency injection" and out of the box supports both Spring and Guice. It does all of this by supporting what can be (almost) described as a "distributed actor model."
In short, Dempsy is a framework to enable decomposing a large class of message processing applications into flows of messages to relatively simple processing units implemented as POJOs.
Important features of Dempsy include:
- Built to support an “inversion of control” programming paradigm, making it extremely easy to use, and resulting in smaller and easier to test application codebases, reducing the total cost of ownership of applications that employ Dempsy.
- Fine grained message processing allows the developer to decompose complex analytics into simple small steps.
- Invisible Scalability. While Dempsy is completely horizontally scalable and multithreaded, the development paradigm supported by Dempsy removes all scalability and threading concerns from the application developer.
- Dynamic topologies. There is no need to hard wire application stages into a configuration or into the code. Topologies (by default) are discovered at run-time.
- Full elasticity (for May). Dempsy can handle the dynamic provisioning and decommissioning of computational resources.
- Fault Tolerant. While Dempsy doesn't manage the application state in the case of a failure, its elasticity provides fault tolerance by automatically rebalancing the load among the remaining available servers. It cooperates with IaaS auto-scaling tools by rebalancing when more servers are added.
Dempsy is intentionally not an “Application Server” and runs in a completely distributed manner relying on Apache ZooKeeper for coordination. In sticking with one of the development team's guiding principles, it doesn’t try to solve problems well solved in other parts of the industry. As DevOps tools become the norm in cloud computing environments, where it’s “easier to re-provision than to repair,” Dempsy defers the management of computational resources to such systems. However, being fully elastic (May 2012), it cooperates with them to produce true dynamic fault tolerance.
Dempsy has been described as a distributed actor framework. While not strictly speaking an actor framework in the sense of Erlang or Akka actors, in that actors typically direct messages directly to other actors, the Message Processors in Dempsy are "actor like POJOs" similar to "Processor Elements" in S4 and less so like Bolts in Storm. Message Processors are similar to actors in that they act on a single message at a time and need not deal with concurrency directly. Unlike actors, Message Processors are also relieved of the need to know the destination(s) for their output messages, as this is handled inside Dempsy itself.
The Actor model is an approach to concurrent programming that has the following features:
- Fine-grained processing
A traditional (linear) programming model processes input sequentially, maintaining whatever state is needed to represent the entire input space. In a "Fine Grained Actor" model, input is divided into messages and distributed to a large number of independent actors. An individual actor maintains only the state needed to process the messages that it receives.
- Shared-Nothing
Each actor maintains its own state, and does not expose that state to any other actor. This eliminates concurrency bottlenecks and the potential for deadlocks. Immutable state may be shared between actors.
- Message-Passing
Actors communicate by sending immutable messages to one-another. Each message has a key, and the framework is responsible for directing the message to the actor responsible for that key.
A distributed actor model takes an additional step, of allowing actors to exist on multiple nodes in a cluster, and supporting communication of messages between nodes. It adds the following complexities to the Actor model:
- Distribution of Messages
A message may or may not be consumed by an actor residing in the same JVM as the actor that sent the message. The required network communication will add delay to processing, and require physical network configuration to support bandwidth requirements and minimize impact to other consumers.
- Load-Balancing
The framework must distribute work evenly between nodes, potentially using different strategies for different message types (eg: regional grouping for map-matcher, simple round-robin for vehicles).
- Node Failure
If a node fails, the workload on that node must be shifted to other nodes. All state maintained by actors on the failed node is presumed lost.
- Network Partition
If the network connection to a node temporarily drops, it will appear as a node failure to other nodes in the cluster. The node must itself recognize that it is no longer part of the cluster, and its actors must stop sending messages (which may conflict with those sent by the cluster's "replacement" node).
- Node Addition
To support elastic scalability (adding nodes on demand to service load, as well as re-integration of a previously failed node), the framework must support redistribution of actors and their state based on changes to the cluster.
Dempsy is not any flavor of "Complex Event Processing" (nor other names CEP may be known as, such as "Event Processing System," etc.). It does not support the ability to query streams with all of the attending bells-and-whistles.
These systems have their place and if your particular use case has need of their features, then Dempsy is not the right tool for you. Dempsy aims at satisfying the particular class of use-cases in stream processing that don't require the complexity of these systems in a manner that makes these applications much easier to build and maintain.
We believe there is a large number of use-cases that Dempsy fits, but there is a large number of use-cases it does not.
Above all, and in many ways, Dempsy is meant to be SIMPLE. It doesn't try to be the solution for every problem. It tries to do one thing well and it is meant to support developers that think this way. Dempsy is built following, and built to encourage, several interrelated principles. These principles are meant to reduce the longer term total cost of ownership of the software written using Dempsy. These include:
- Separation of Concerns (SoC) - Dempsy expects the developer to be able to concentrate on writing the analytics and business logic with (virtually) no consideration for framework or infrastructure.
- Decoupling - SoC provides the means to isolate cross-cutting concerns so that code written for Dempsy will have little to no (with due respect to annotations) dependence on even the framework itself. Developers' code can be easily run separate from the framework and, in the spirit of Dependency Injection, the framework uses the developer's code rather than the developer having to use the framework. This type of decoupling provides for analytics/business code that has no infrastructure concerns: no framework dependencies, no messaging code, no threading code, etc.
- Testability - All of this provides a basis to write code that's more testable.
- "Do one thing well" - Dempsy is written to provide one service: support for the type of "Distributed Actor Model" (with all of the acknowledged caveats) programming paradigm. For this reason it does not pretend to be an Application Server. Nor does it substitute for the lack of an automated provisioning/deployment system.
CEP is really trying to solve a different problem. If you have a large stream of data you want to mine by separating it into subsets and executing different analytics on each subset (which can include ignoring entire subsets), then CEP solutions make sense. If, however, you’re going to do the same thing to every message then you will be underutilizing the power of CEP. Underutilized functionality usually means an increased total cost of ownership, and Dempsy is ALL ABOUT reducing the total cost of ownership for systems that do this type of processing.
There are several pure "Actor Model" frameworks and languages that have been posed as alternatives for Dempsy. Dempsy is not a pure actor model and primarily solves a different problem. As described above Dempsy is primarily a routing mechanism for messages for "fine grained" actors. The reason we still (though loosely) call it an "actor model" is because Dempsy supports concurrency the way a typical Actor Model does.
Dempsy emphasizes reducing the total cost of ownership of real-time analytics applications and as a direct result we feel it has some advantages over the alternatives.
First, as mentioned, Dempsy supports “fine grained” message processing. Because of this, by writing parallel use-cases in Dempsy and alternatives that don't support this programming model, we find that Dempsy leads to a lower code-line count.
Also, because of Dempsy’s emphasis on “Inversion of Control” the resulting applications are more easily testable. With the exception of annotations, Message Processors, which are the atomic unit of work in Dempsy, have no dependency on the framework itself. Every alternative we've found requires that the application be written against and written to use that framework.
Also, in following the adage to never require the system to be told something that it can deduce, the topology of a Dempsy application’s pipeline is discovered at runtime and doesn’t need to be preconfigured. This is primarily a by-product of the fact that Dempsy was designed from the ground up to be “elastic” and as a result, the topology can morph dynamically.
This means that applications with complicated topologies with many branches and merges can be trivially configured since the dependency relationship between stages is discovered by the framework.
You will need Java 11 (or higher).
To build an application against Dempsy you will need to add the Dempsy dependencies to your build. This should be as simple as including the following dependency in your Maven `pom.xml` file (or the Gradle equivalent).
<dependency>
    <groupId>net.dempsy</groupId>
    <artifactId>dempsy-framework.impl</artifactId>
    <version>0.16</version>
</dependency>
The core Dempsy jar files are deployed to the Maven Central Repository. See the section on Understanding the Dempsy Codebase for a description of each of the artifacts.
You can build Dempsy applications and run them within a single VM. Once you want to run your application distributed you will need an installation of Apache ZooKeeper. In a development environment, or if you are just trying out Dempsy, then you can get away with a single ZooKeeper server but we suggest following the recommendations of the ZooKeeper team.
The Dempsy Examples repository has several versions of the WordCount example. You can find the final version of this tutorial's code in the userguide-wordcount sub-project.
In order to understand how to use Dempsy you need to understand how a Dempsy application is structured and the best way to see this is through a simple example. The "Hello World" of the BigData applications seems to be the "word counter." If you're familiar with Hadoop then you probably started with the WordCount example. In this simple example let's suppose we have a source of words from some text. In traditional batch based BigData systems the source for these words would be a file or perhaps already partitioned across some distributed storage. In a Dempsy application we're receiving these words in real-time through a data stream. Maybe we're getting all of the live data from a large social media application and we want to calculate the histogram of word occurrences.
What would the Dempsy application that accomplishes the above described feat look like? Imagine that each word from this hypothetical live stream of text is broken into its own message. Each of these messages is routed to an instance of a class (let's call that instance a message processor) that has the responsibility to keep track of the count for a single word. That is, there is an instance of Word Counting message processor, per distinct word. For example, every time the word "Google" flows through the stream it's routed to the same message processor (the one dedicated to the word "Google"). Every time the word "is" is encountered, it's routed to another message processor instance. And likewise with the word "Skynet."
How easy would it be to write the code for that message processor class? This way of looking at the problem makes the code fairly simple. It could be as simple as this:
class WordCount {
    private long count = 0;

    public void countWord(String word) {
        count++;
    }
}
Notice, we write the message processor in such a way that we assume each instance is responsible for a single word and that in the larger application there will be many instances, each operating on its piece of the stream. These instances can be (and usually are) spread out over a large number of machines.
Of course, what's missing? How does each `word` get to its respective message processor? How are the `WordCount` instances instantiated, deleted, provided their `word` message? Where are they instantiated? What about synchronization? What about sending out messages? All of these are the primary responsibility of Dempsy.
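To make that division of labor concrete, here is a minimal single-JVM sketch of the routing idea, with a plain `HashMap` standing in for everything Dempsy does across a cluster. The `WordRoutingSketch` class, its `route()` method, and the added getter are hypothetical names used only for illustration; none of them are part of Dempsy.

```java
import java.util.HashMap;
import java.util.Map;

// Single-JVM illustration only: a HashMap stands in for the routing and
// lifecycle management that Dempsy performs across a cluster.
class WordRoutingSketch {

    // Same shape as the WordCount class above, plus a getter for the demo.
    public static class WordCount {
        private long count = 0;

        public void countWord(String word) {
            count++;
        }

        public long getCount() {
            return count;
        }
    }

    // One WordCount instance per distinct word, created on first use.
    private final Map<String, WordCount> processors = new HashMap<>();

    // Find (or create) the instance responsible for this word and hand it the message.
    public void route(String word) {
        processors.computeIfAbsent(word, w -> new WordCount()).countWord(word);
    }

    public long countFor(String word) {
        WordCount wc = processors.get(word);
        return wc == null ? 0 : wc.getCount();
    }

    public int processorCount() {
        return processors.size();
    }

    public static void main(String[] args) {
        WordRoutingSketch router = new WordRoutingSketch();
        for (String w : "Google is Skynet and Skynet is Google".split(" ")) {
            router.route(w);
        }
        System.out.println("count for Google: " + router.countFor("Google"));
        System.out.println("processor instances: " + router.processorCount());
    }
}
```

The `WordCount` class itself never sees the map. That separation is the point: the business class stays a POJO while something else decides which instance receives which word.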
At this point we have a nice little piece of POJO functionality completely unmoored from infrastructural concerns. Let's look at how Dempsy handles some of these concerns. As mentioned, one of Dempsy's primary responsibilities is, given a message, find the message processor responsible for handling that message. Now that we have a POJO that accomplishes some business functionality we need to tell Dempsy how they are to be addressed, that is, how Dempsy is to find which message processor is responsible for which messages.
The way Dempsy does this is through the use of a message key. Each message that flows through Dempsy needs to have a message key. Dempsy is (optionally) annotation driven, so classes that represent messages need to identify a means of obtaining the message's `MessageKey` through the use of an annotation. An important concept to grasp here is that a `MessageKey` is essentially the address of an individual message processor instance.
In our example, each word is a message.
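import java.io.Serializable;

import net.dempsy.lifecycle.annotation.MessageKey;
import net.dempsy.lifecycle.annotation.MessageType;

@MessageType
public class Word implements Serializable {
    private final String wordText;

    public Word(final String data) {
        this.wordText = data;
    }

    // The returned value is the "address" of the message processor for this word.
    @MessageKey
    public String getWordText() {
        return this.wordText;
    }
}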
So when Dempsy receives a message of type `Word`, it retrieves the key using the annotated method `getWordText()`. That key will become the address of a message processor somewhere on the system. Dempsy will find the message processor instance (in this case an instance of the class `WordCount`) within a cluster of nodes responsible for running the `WordCount` message processing. In the case that the instance doesn't already exist, Dempsy will `clone()` a `WordCount` instance prototype.
Note: the annotation @MessageType is used to tell Dempsy that the class name is meant to be used as a "message type" in the system. Please see the user guide for a more detailed description.
If you're paying attention you might notice there are two gaps that need to be filled in the `WordCount` implementation. First, how is it that Dempsy understands that the `WordCount` handles the `Word` message, and second, how is a `WordCount` prototype cloned (notice the existing `WordCount` class cannot (yet) simply be `clone()`d).
This requires us to revisit the `WordCount` implementation. We need to do several things to satisfy Dempsy:
- We need to identify the `WordCount` class as an Mp, which is done with a class annotation.
- We need to identify the `WordCount.countWord()` call as the point where the Mp handles the message by annotating it with the `@MessageHandler` annotation.
- We need to make sure `WordCount` is `Cloneable`.
This would be accomplished by the following:
import net.dempsy.lifecycle.annotation.Activation;
import net.dempsy.lifecycle.annotation.MessageHandler;
import net.dempsy.lifecycle.annotation.Mp;
import net.dempsy.lifecycle.annotation.Output;

@Mp
public class WordCount implements Cloneable {
    private long count = 0;

    @MessageHandler
    public void countWord(final Word word) {
        count++;
    }

    @Override
    public Object clone() throws CloneNotSupportedException {
        return super.clone();
    }
}
The framework now has enough information that it can understand:
- How to create instances of the `WordCount` message processor given an already existing instance it will use as a prototype.
- That instances of `WordCount` handle messages of type `Word` using the `WordCount.countWord()` method.
- That the key for a given `Word` message, which represents the "address" of a unique `WordCount` instance to which the `Word` message should be routed, is provided by a call on the message instance, `Word.getWordText()`.
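The three points above can be sketched in plain Java using reflection. This is only an illustration of the idea under stated assumptions, not how Dempsy is implemented: the `@Key` annotation, the `Counter` class, and the `keyOf()` and `deliver()` helpers are hypothetical stand-ins for `@MessageKey`, `WordCount`, and the framework internals.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class KeyLookupSketch {

    // Hypothetical stand-in for Dempsy's @MessageKey annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Key {}

    public static class Word {
        private final String wordText;

        public Word(String wordText) {
            this.wordText = wordText;
        }

        @Key
        public String getWordText() {
            return wordText;
        }
    }

    // Hypothetical stand-in for the WordCount message processor.
    public static class Counter implements Cloneable {
        private long count = 0;

        public void handle(Word word) {
            count++;
        }

        public long getCount() {
            return count;
        }

        @Override
        public Counter clone() {
            try {
                return (Counter) super.clone();
            } catch (CloneNotSupportedException e) {
                throw new AssertionError(e); // cannot happen: Counter is Cloneable
            }
        }
    }

    private final Counter prototype = new Counter();
    private final Map<Object, Counter> instances = new HashMap<>();

    // Read the key by invoking whichever public method carries the @Key annotation.
    public static Object keyOf(Object message) {
        for (Method m : message.getClass().getMethods()) {
            if (m.isAnnotationPresent(Key.class)) {
                try {
                    return m.invoke(message);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Could not read the message key", e);
                }
            }
        }
        throw new IllegalArgumentException("No @Key method on " + message.getClass());
    }

    // Look up the instance addressed by the key, cloning the prototype on first use.
    public Counter deliver(Word message) {
        Counter target = instances.computeIfAbsent(keyOf(message), k -> prototype.clone());
        target.handle(message);
        return target;
    }

    public static void main(String[] args) {
        KeyLookupSketch sketch = new KeyLookupSketch();
        sketch.deliver(new Word("Google"));
        Counter google = sketch.deliver(new Word("Google"));
        Counter is = sketch.deliver(new Word("is"));
        System.out.println("Google: " + google.getCount() + ", is: " + is.getCount());
    }
}
```

The prototype itself never receives a message; it exists only to be cloned, which is why the real `WordCount` has to be `Cloneable`.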
Note: Currently the default implementation for the Serializer is Kryo. This at least requires messages to have a default constructor defined (though Kryo supports private default constructor invocations under the right circumstances; see the Kryo documentation for more information). Also, Dempsy can be configured with the standard Java Serializer (though I'm not sure why anyone would ever want to do this).
a word about the message key
It is critical that the Object that Dempsy obtains as the message key (in the example that would be the result of the call to `Word.getWordText()`) has the appropriate identity semantics. In all cases that means there needs to be a non-default `equals()` and `hashCode()` method. The reason for this is partially very obvious: a "unique" message key corresponds to an instance of a message processor so it's important to get the understanding of "unique" correct. The default `Object` behavior is not adequate. Think of Dempsy as using the key as if it were a key in a `HashMap` that contained all of the current message processor instances. The default implementation of `Object.equals` and `Object.hashCode` wouldn't work given multiple instantiations of the same `Word`.
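The `HashMap` analogy can be tried directly. The sketch below compares a hypothetical composite key that overrides `equals()` and `hashCode()` with one that inherits the `Object` defaults. Both key classes and the `processorsCreated()` helper are invented for illustration and are not part of Dempsy.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class MessageKeySketch {

    // Hypothetical composite key with value-based identity.
    public static final class WordInLanguage {
        private final String language;
        private final String word;

        public WordInLanguage(String language, String word) {
            this.language = language;
            this.word = word;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof WordInLanguage)) return false;
            WordInLanguage other = (WordInLanguage) o;
            return language.equals(other.language) && word.equals(other.word);
        }

        @Override
        public int hashCode() {
            return Objects.hash(language, word);
        }
    }

    // Same fields, but it inherits the identity-based Object.equals/hashCode.
    public static final class IdentityOnlyKey {
        private final String language;
        private final String word;

        public IdentityOnlyKey(String language, String word) {
            this.language = language;
            this.word = word;
        }
    }

    // Treat the map as the set of live message processors and count its entries.
    public static int processorsCreated(Object firstKey, Object secondKey) {
        Map<Object, Long> counters = new HashMap<>();
        counters.merge(firstKey, 1L, Long::sum);
        counters.merge(secondKey, 1L, Long::sum);
        return counters.size();
    }

    public static void main(String[] args) {
        System.out.println("value-based key: " + processorsCreated(
                new WordInLanguage("en", "skynet"), new WordInLanguage("en", "skynet")));
        System.out.println("identity-based key: " + processorsCreated(
                new IdentityOnlyKey("en", "skynet"), new IdentityOnlyKey("en", "skynet")));
    }
}
```

With the value-based key both messages land on one entry; with the identity-based key every new key object looks like a new address, so each message would get its own message processor.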
But this is not all. Given that instances of a message processor are distributed across many nodes, the default routing behavior of Dempsy uses the `hashCode()` as a means of determining which node a particular message processor is running on. Therefore, while strictly speaking most Java applications would work (though very poorly) if, for example, the `hashCode()` method were implemented to simply return `1`, this would cause ALL message processors to be instantiated on the same node of a cluster.
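The effect of a degenerate `hashCode()` on placement can be seen with a toy model. The sketch assumes the simplest possible placement rule, the hash taken modulo the number of nodes; Dempsy's actual routing strategies are more involved, and `nodeFor()`, `distribution()`, and `ConstantHashKey` are hypothetical names.

```java
import java.util.Arrays;

class HashRoutingSketch {

    // Assumed placement rule for illustration: hash modulo node count.
    public static int nodeFor(Object key, int nodeCount) {
        return Math.floorMod(key.hashCode(), nodeCount);
    }

    // Count how many keys land on each node.
    public static int[] distribution(Object[] keys, int nodeCount) {
        int[] perNode = new int[nodeCount];
        for (Object key : keys) {
            perNode[nodeFor(key, nodeCount)]++;
        }
        return perNode;
    }

    // A key whose equals() is correct but whose hashCode() is constant.
    public static final class ConstantHashKey {
        private final String text;

        public ConstantHashKey(String text) {
            this.text = text;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof ConstantHashKey && ((ConstantHashKey) o).text.equals(text);
        }

        @Override
        public int hashCode() {
            return 1;
        }
    }

    public static void main(String[] args) {
        String[] words = {"Google", "is", "Skynet", "and", "the", "stream", "flows", "on"};
        Object[] degenerate = Arrays.stream(words).map(ConstantHashKey::new).toArray();
        System.out.println("String keys:        " + Arrays.toString(distribution(words, 4)));
        System.out.println("constant hash keys: " + Arrays.toString(distribution(degenerate, 4)));
    }
}
```

Under this model every `ConstantHashKey` maps to the same node regardless of its text, while `String` keys can be placed on different nodes depending on their hash values.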
In the example, the `MessageKey` is a `java.lang.String`, which has appropriate identity semantics. Note: The message key is not restricted to the `String` type; any type with proper `equals()` and `hashCode()` implementations can be used.
So where do `Word` messages come from and how do they get to Dempsy in order to be routed to the appropriate `WordCount` message processor? Dempsy provides an interface that needs to be implemented by the application developer in order to adapt sources of stream data to the Dempsy message bus. An `Adaptor` implementation:
- will be given a handle to the Dempsy message bus through an interface called a `Dispatcher`.
- will need to obtain data from an external source and use the `Dispatcher` to send that data onto Dempsy.

The API for an `Adaptor` is very simple so we will extend the Word Count example with the following class:
...
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.atomic.AtomicBoolean;

import net.dempsy.messages.Adaptor;
import net.dempsy.messages.Dispatcher;

public class WordAdaptor implements Adaptor {
    private Dispatcher dempsy;
    private AtomicBoolean running = new AtomicBoolean(false);

    /**
     * This method is called by the framework to provide a handle to the
     * Dempsy message bus. It's called prior to start()
     */
    @Override
    public void setDispatcher(final Dispatcher dispatcher) {
        this.dempsy = dispatcher;
    }

    @Override
    public void start() {
        // ... set up the source for the words.
        running.set(true);
        while (running.get()) {
            // obtain data from an external source
            final String wordString = getNextWordFromSource();
            if (wordString == null)
                running.set(false); // the source is exhausted
            else {
                try {
                    dempsy.dispatchAnnotated(new Word(wordString));
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e); // This will stop the flow of Words from this adaptor.
                                                   // Optimally you'd like to recover and keep going.
                }
            }
        }
    }

    @Override
    public void stop() {
        running.set(false);
    }

    private String getNextWordFromSource() {
        ... // get the next word to put on the processing stream
    }
}
When the `WordAdaptor` is registered with Dempsy, it will be provided a handle to a `Dispatcher`. Then `Adaptor.start()` will be called. The application developer is responsible for creating Dempsy compliant messages using data from an external source. As described above, a message should be serializable by whatever serialization technique is chosen (Kryo by default) and have a means of obtaining a `MessageKey` identified.
Notice the lifecycle. The `start()` method is called by the framework and, in this example, does not return while words keep arriving. If it ever does return, it will not be called again without restarting the node that the `Adaptor` was instantiated in. Note: It's very important that you manage this. You are allowed to return from the `start()` method whenever you want, either because the `Adaptor` is finished (if such a case exists) or because you decided to do the work in another thread (or many other threads), but Dempsy will not re-invoke the `start()` method.
Dempsy will invoke the `stop()` method to shut down the Adaptor when the node shuts down. Well behaved Adaptors must return from `start()` at this time, if they had not done so previously. Not doing so will hang the VM on exit since, by default, the Adaptor is run in a non-daemon thread (though this is a configurable option for ill-behaved Adaptors).
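One way to keep `start()` responsive to `stop()` is to wait on the source with a timeout so the loop re-checks its flag even when no data arrives. The sketch below shows that pattern without depending on Dempsy: `QueueAdaptor` only mimics the shape of an adaptor, a `Consumer<String>` stands in for the `Dispatcher`, and the `runAndStop()` helper plays the role of the framework. All of these names are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class AdaptorLifecycleSketch {

    // Hypothetical stand-in that mimics an adaptor; NOT the Dempsy Adaptor interface.
    public static class QueueAdaptor {
        private final BlockingQueue<String> source;
        private final AtomicBoolean stopRequested = new AtomicBoolean(false);
        private volatile Consumer<String> dispatcher;

        public QueueAdaptor(BlockingQueue<String> source) {
            this.source = source;
        }

        public void setDispatcher(Consumer<String> dispatcher) {
            this.dispatcher = dispatcher;
        }

        // Blocks until stop() is requested, then returns promptly.
        public void start() {
            try {
                while (!stopRequested.get()) {
                    // A bounded wait lets the loop notice stop() even when the source is idle.
                    String word = source.poll(50, TimeUnit.MILLISECONDS);
                    if (word != null) {
                        dispatcher.accept(word);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // treat interruption as a stop request
            }
        }

        public void stop() {
            stopRequested.set(true);
        }
    }

    // Plays the framework's role: start the adaptor on a non-daemon thread, later stop it.
    // Returns true if start() returned after stop() was called.
    public static boolean runAndStop(List<String> words, List<String> received) {
        QueueAdaptor adaptor = new QueueAdaptor(new LinkedBlockingQueue<>(words));
        adaptor.setDispatcher(received::add);
        Thread worker = new Thread(adaptor::start, "adaptor");
        worker.start();
        try {
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (received.size() < words.size() && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            adaptor.stop();
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        List<String> received = new CopyOnWriteArrayList<>();
        boolean exited = runAndStop(List.of("Google", "is", "Skynet"), received);
        System.out.println("received " + received + ", start() returned: " + exited);
    }
}
```

Because the stop flag is only ever set and never cleared, a `stop()` that arrives before `start()` has begun still causes `start()` to return immediately.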
The examples that follow will show how to do the configuration by hand which means the translation to any DI container should be obvious to the users of those containers. We will also include Spring examples.
At this point we should begin to have an understanding of what a Dempsy application is. It's a series of instances of message processors across a number of compute nodes, being routed messages based on their keys, and being supplied message data by `Adaptor`s. The configuration of an application is simply a formalization of these specifics with specific infrastructure selections. To define (configure) the Word Count application we've been walking through, we simply need to lay out the specifics. Doing this programmatically we would have:
import java.util.concurrent.ArrayBlockingQueue;

import net.dempsy.NodeManager;
import net.dempsy.cluster.ClusterInfoException;
import net.dempsy.cluster.local.LocalClusterSessionFactory;
import net.dempsy.config.Cluster;
import net.dempsy.config.Node;
import net.dempsy.lifecycle.annotation.MessageProcessor;
import net.dempsy.monitoring.dummy.DummyNodeStatsCollector;
import net.dempsy.transport.blockingqueue.BlockingQueueReceiver;

public class Main {

    public static void main(final String[] args) throws ClusterInfoException {
        @SuppressWarnings("resource")
        final NodeManager nodeManager = new NodeManager() // 1
            .node(new Node("word-count").setClusters(
                    new Cluster("adaptor")
                        .adaptor(new WordAdaptor()),
                    new Cluster("counter")
                        .mp(new MessageProcessor<WordCount>(new WordCount()))
                        .routing("net.dempsy.router.managed"))
                .receiver(new BlockingQueueReceiver(new ArrayBlockingQueue<>(100000)))
                .nodeStatsCollector(new DummyNodeStatsCollector())) // this will be defaulted in 0.9.1
            .collaborator(new LocalClusterSessionFactory().createSession());

        nodeManager.start();

        System.out.println("Exiting Main");
    }
}
Notice what we are NOT doing here. We are NOT defining the topology of the Dempsy application. The order the `Cluster`s are listed in makes no difference. In the above case we've decided to run both our `Adaptor` and our message processor on the same `Node`.
The above example has all "in process" infrastructure chosen. The `LocalClusterSessionFactory` and the `BlockingQueueReceiver` will not span processes. This can be remedied by selecting the `ZookeeperSessionFactory` and the `NioReceiver` instead.
Note also, once the distributed infrastructure is chosen (`ZookeeperSessionFactory`, `NioReceiver`) we can run the `Adaptor` on one set of nodes and the `WordCount` message processor on another. In that case you'd have two different Main classes (or if you're configuring with Spring, two different Spring contexts): one that starts the `Node` with just the `Adaptor` and one that starts it with just the `WordCount` message processor. The topology is implicit in the business class definitions (`Word`, `WordCount`).
The application, called "word-count," consists of two clusters, "adaptor" and "mp," the first of which contains our Adaptor which, as we have seen, sources `Word` messages. This is followed by a message processor whose prototype is an instance of `WordCount`.
Although messages coming from the `WordAdaptor` flow to the `WordCount` message processor, the order in the definition doesn't actually matter. Dempsy determines where messages are sent based on the type of the message and the type of object that the `MessageHandler` on the `MessageProcessor` takes. In the case of our example, when the `WordAdaptor` adaptor produces a message of type `Word`, Dempsy knows that message can be handled by the `WordCount` message processor because the method `WordCount.countWord()` (which is annotated with the `@MessageHandler` annotation) takes the type `Word`. If there are other message processors that also have handlers that take a `Word`, the messages will be routed to the appropriate message processor within those clusters also.
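The type-based discovery described here can be approximated with reflection. The sketch below is a simplification under stated assumptions: it matches handlers by Java parameter type, whereas Dempsy keys off declared message types, and the `@Handler` annotation, the message and processor classes, and `consumersOf()` are all hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class TopologyDiscoverySketch {

    // Hypothetical stand-in for Dempsy's @MessageHandler annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Handler {}

    // Two hypothetical message types.
    public static class Word {}
    public static class Sentence {}

    // Three hypothetical message processors; two of them consume Word.
    public static class WordCounter {
        @Handler
        public void count(Word word) {}
    }

    public static class WordLogger {
        @Handler
        public void log(Word word) {}
    }

    public static class SentenceSplitter {
        @Handler
        public void split(Sentence sentence) {}
    }

    // Return every processor type that has an annotated handler accepting the message type.
    public static List<Class<?>> consumersOf(Class<?> messageType, Class<?>... processorTypes) {
        List<Class<?>> consumers = new ArrayList<>();
        for (Class<?> processor : processorTypes) {
            for (Method m : processor.getMethods()) {
                if (m.isAnnotationPresent(Handler.class)
                        && m.getParameterCount() == 1
                        && m.getParameterTypes()[0].isAssignableFrom(messageType)) {
                    consumers.add(processor);
                    break; // one matching handler is enough for this processor
                }
            }
        }
        return consumers;
    }

    public static void main(String[] args) {
        List<Class<?>> wordConsumers = consumersOf(
                Word.class, WordCounter.class, WordLogger.class, SentenceSplitter.class);
        for (Class<?> c : wordConsumers) {
            System.out.println("Word -> " + c.getSimpleName());
        }
    }
}
```

Nothing in the sketch lists which stage feeds which; the edges of the topology fall out of the handler signatures, which is why the order of the cluster definitions is irrelevant.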
What do we do with the `ApplicationDefinition`? That depends on the Dependency Injection framework you're using. If using either Spring or Guice you don't need to do much else to run your application. If you're using a different dependency injection container then you'll need to obtain a reference to the `Dempsy` object and give it the `ApplicationDefinition`, but this is a more advanced topic for a later section. Moving forward we will show you how the Spring implementation works.
The above application definition could be defined using Spring as follows:
<beans>
    <bean class="com.nokia.dempsy.config.ApplicationDefinition">
        <constructor-arg value="word-count" />
        <property name="clusterDefinitions">
            <list>
                <bean class="com.nokia.dempsy.config.ClusterDefinition">
                    <constructor-arg value="adaptor"/>
                    <property name="adaptor">
                        <bean class="com.nokia.dempsy.example.userguide.wordcount.WordAdaptor" />
                    </property>
                </bean>
                <bean class="com.nokia.dempsy.config.ClusterDefinition">
                    <constructor-arg value="mp"/>
                    <property name="messageProcessorPrototype">
                        <bean class="com.nokia.dempsy.example.userguide.wordcount.WordCount"/>
                    </property>
                </bean>
            </list>
        </property>
    </bean>
</beans>
By default there are two modes that you can run the Dempsy application in. It can be run all within a local Java VM. Or it can be run distributed on a set of machines. For the purposes of this tutorial we will demonstrate how to run it in a local Java VM. This would be easy to set up in an IDE like Eclipse.
There are several "main" implementations provided depending on what mode you're running a Dempsy application in, as well as which DI framework you're using. As a matter of fact the only place any particular DI container is assumed is in these supplied "main" applications so adding other currently unsupported DI containers is straightforward.
To run your application using the Spring container on the command line you would use:
java -Dapplication=WordCount.xml -cp [classpath] com.nokia.dempsy.spring.RunAppInVm
Your classpath will need to contain all of the main Dempsy artifacts plus the Dempsy Spring library. See the section on the codebase structure for more information.
Having gone through the example we should codify some of the terminology that was developed and will continue to be used throughout this document as well as throughout the Dempsy code.
Term | Definition |
---|---|
message processor | an instance of a cloned message processor prototype responsible for processing every message of a particular type with a particular unique key. |
message processor prototype | an instance used by Dempsy to serve as a template when it needs to create more instances of a message processor. |
message | an object that Dempsy routes to a message processor based on the message's key. |
key | obtained from a message using the method on the message object that's annotated with the `@MessageKey` annotation. Each unique key addresses an individual message processor instance in a cluster. |
cluster | the collection of all message processors or adaptors of a common type in the same stage of processing of a Dempsy application. A cluster contains a complete set of potential message processors keyed by all of the potential keys from a particular message type. That is, a cluster of message processor instances covers the entire key-space of a message. |
node | a subset of a set of clusters containing a portion of each cluster's message processors. Nodes are almost always (except in some possible test situations) the intersection of a cluster and a Java process. That is, the portion of a cluster that's running in a particular process is the cluster's node. |
container | sometimes also referred to as a message processor container, it is the part of the Dempsy infrastructure that manages the lifecycle of message processors within an individual node. That is, there is a one-to-one correspondence between the portion of a cluster running in a particular node and a container. |
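To make the relationship between prototype, key, message processor, and container more concrete, here is a minimal, framework-free sketch. `Counter` and `ToyContainer` are hypothetical names invented for this illustration and are not Dempsy classes; a real container also deals with activation, eviction, and concurrency.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a message processor: counts messages for one key.
class Counter {
    long count = 0;

    void handle(String word) {
        count++;
    }
}

// Hypothetical stand-in for a container: holds one message processor per unique key.
class ToyContainer {
    private final Supplier<Counter> prototypeCloner;
    private final Map<String, Counter> instances = new HashMap<>();

    ToyContainer(Supplier<Counter> prototypeCloner) {
        this.prototypeCloner = prototypeCloner;
    }

    // Route a message by its key, creating the instance the first time the key is seen.
    void dispatch(String key) {
        instances.computeIfAbsent(key, k -> prototypeCloner.get()).handle(key);
    }

    int size() {
        return instances.size();
    }

    long countFor(String key) {
        Counter c = instances.get(key);
        return c == null ? 0 : c.count;
    }
}

class ToyContainerDemo {
    public static void main(String[] args) {
        ToyContainer container = new ToyContainer(Counter::new);
        for (String w : "the quick the lazy the".split(" ")) {
            container.dispatch(w);
        }
        System.out.println(container.size() + " processors; 'the' seen "
                + container.countFor("the") + " times");
    }
}
```

Each distinct word ends up with its own `Counter`, which is the sense in which a unique key "addresses" a single message processor instance.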
Dempsy recognizes three types of developers and the codebase is organized accordingly.
- "Application developers" are those who are only concerned with building real-time stream based analytics.
- However, Dempsy is built on a set of abstractions that allow it to be extended with new transports, routing strategies, monitoring techniques, as well as others. Developers interested in adding new implementations or techniques are "framework developers."
- Any developer changing the existing framework or any of the default implementations of the core abstractions is a "Dempsy contributor."
The Dempsy codebase is broken into `jar` artifacts that reflect this understanding of the developer community. These artifacts are served from the Maven Central Repository and so should be accessible to any modern build tool (maven, gradle, ivy). The core codebase is broken into these three layers:
- The Application Developer Api (lib-dempsyapi). The jar artifact lib-dempsyapi contains all of the annotations, interfaces, exceptions and configuration classes required by the application developer. To use it your build system should refer to:
<groupId>net.dempsy</groupId>
<artifactId>lib-dempsyapi</artifactId>
- The Dempsy Framework Api (lib-dempsycore). The jar artifact lib-dempsycore contains the set of core abstractions that Dempsy itself is built on internally. It's meant as an api for those that want to extend the framework itself. For example, if someone doesn't like the current message transport implementations that come out-of-the-box with Dempsy, they can implement their own and plug it in. This artifact includes `lib-dempsyapi` as a transitive dependency and to use it your build system should refer to:
<groupId>net.dempsy</groupId>
<artifactId>lib-dempsycore</artifactId>
- The Default Dempsy Implementation (lib-dempsyimpl). The jar artifact lib-dempsyimpl contains the default implementations for the framework Api (from lib-dempsycore) as well as a set of required concrete classes. This artifact includes `lib-dempsycore` as a transitive dependency and to use it your build system should refer to:
<groupId>net.dempsy</groupId>
<artifactId>lib-dempsyimpl</artifactId>
Dempsy also currently includes two runtime libraries that each support the startup and configuration using a particular Dependency Injection container.
- Spring based startup and configuration (lib-dempsyspring). This library contains the "main" methods for starting Dempsy using Spring. The code here assumes that the `ApplicationDefinition` will be accessible to the Spring application context. This artifact includes `lib-dempsyimpl` as a transitive dependency and to use it your build system should refer to:
<groupId>net.dempsy</groupId>
<artifactId>lib-dempsyspring</artifactId>
- Guice based startup and configuration (lib-dempsyguice). Note: the Guice support is currently under construction. This library contains the "main" methods for starting Dempsy using Google Guice. The code here assumes that the `ApplicationDefinition` will be accessible to the Guice application module. This artifact includes `lib-dempsyimpl` as a transitive dependency and to use it your build system should refer to:
<groupId>net.dempsy</groupId>
<artifactId>lib-dempsyguice</artifactId>
`lib-dempsyapi` contains all of the classes, interfaces, and annotations that an "application developer" would need. Some of these we've seen already. A table with a brief overview follows. These are expanded upon in more detail in later sections.
Element | Description |
---|---|
@MessageProcessor | This is a class annotation that identifies the class of a message processor prototype |
@MessageHandler | This is a method annotation that identifies a method on a message processor prototype class (which must be annotated with the @MessageProcessor annotation) as one that handles a message. The annotated method must take a single parameter whose class follows the Dempsy message requirements, and it can also return a message if desired. |
@MessageKey | As described above, a Dempsy message class requires that one, and only one, method be annotated as a message key. |
@Activation, @Passivation | When Dempsy instantiates a message processor to handle a particular message, it's first @Activated and provided the message key. Also, when Dempsy removes a message processor it will @Passivate it first. |
@Output | Dempsy can make calls on message processors based on a particular schedule rather than simply based on incoming data. Methods annotated with the @Output annotation will be invoked at these scheduled times, and the data they return, if a compliant Dempsy message, will be passed on. |
@Evictable | Dempsy can remove unused message processors from the container. This behavior is fully controlled by the application developer. |
Adaptor | The Adaptor interface is implemented by the application developer to adapt Dempsy to external sources of data. |
KeySource | Dempsy can pre-create all message processors at start-up rather than simply based on incoming data. The application developer will implement the KeySource interface to provide the domain of the key-space to Dempsy. |
The core of a Dempsy application is the Message Processor. All of the features of Dempsy can be understood by walking through the lifecycle of a message processor. The following diagram is the reference for the rest of the section which will walk through each phase and transition.
First, the message processor prototype is instantiated (constructed) on start-up of the node. You therefore have access to all of the DI framework capabilities for configuring your prototype.
In the diagram the lifecycle stages of the message processor prototype (as opposed to the message processor) are the four stages in the top center of the diagram.
Keep in mind the distinction between a message processor prototype and a message processor. There is a reason this is called a prototype. The instance that's instantiated on start-up by the dependency injection framework serves as a template for future instances. Dempsy will use the `clone()` method to create more instances from this prototype. You need to consider that when you configure and initialize your message processor prototype and when you write the `clone()` method.
Also note, there will be a prototype created within each node that the specific message processor will be running in. However, it is possible that instances within one node were instantiated from cloning a prototype within another node and then that instance was migrated as part of Dempsy's elastic behavior. This could affect you if you rely on static members in your classes or on references between message processors and the message processor prototype.
Dempsy provides an opportunity to initialize the message processor prototypes in a phase of the lifecycle other than construction that will only be invoked in nodes where the prototype will be used. Any method annotated with @Start will be called after construction. You can think of this as a @PostConstruct for the prototype.
Note: we intend to introduce "lazy construction" of message processor prototypes which will remove the need for the @Start annotation and lifecycle phase of the prototype. To keep up with the progress and decisions on this issue, or participate in the discussion, see the "Lazy instantiation of Mp Prototypes" issue.
The method annotated with @Start is allowed to take a `ClusterId`, which the framework passes when it invokes the method. Its value corresponds to the Application Name and Cluster Name where this prototype's @Start method is being invoked.
After the @Start, or after the Construction if there is no method annotated with an @Start, the message processor prototype enters the Prototype Ready phase where it will now serve as a template to create message processors as needed.
From the Prototype Ready state, the "clone()" method can be invoked for one of two reasons.
- When a message comes into the node with a key for a message processor that has not yet been created, `clone()` will be invoked on the prototype.
- There is a means of pre-instantiating the entire key space in a cluster. Please see the section on pre-instantiation of message processors.
The message processor resulting from the `clone()` method call is then activated by having any method annotated with the @Activation annotation invoked. If the @Activation method takes a parameter of the type of the message key then the key is passed to the @Activation method.
The @Activation annotated method is also called on each message processor clone during the pre-instantiation process.
While not shown on this diagram, @Activation will also be called when a message processor is migrated from one node to another. See the section on elasticity and the corresponding diagram.
See the section on Activation and Passivation for more details.
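The clone-then-activate sequence described above can be sketched without the framework. In the sketch below, `@ToyActivation` is a local stand-in annotation defined only for this illustration (it is not the real Dempsy `@Activation`), and `ToyLifecycle` is a hypothetical helper; how Dempsy actually discovers and invokes annotated methods is not shown here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Local stand-in for illustration only; NOT the real Dempsy annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ToyActivation {}

class WordCounter implements Cloneable {
    String word; // set when the instance is activated
    long count;

    @ToyActivation
    public void activate(String key) {
        this.word = key;
    }

    @Override
    public WordCounter clone() {
        try {
            return (WordCounter) super.clone();
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e);
        }
    }
}

// Hypothetical helper that mimics "clone the prototype, then activate the clone".
class ToyLifecycle {
    static WordCounter instantiate(WordCounter prototype, Object key) {
        WordCounter mp = prototype.clone();
        for (Method m : WordCounter.class.getMethods()) {
            if (!m.isAnnotationPresent(ToyActivation.class)) continue;
            try {
                // Pass the key only when the method declares a matching parameter.
                if (m.getParameterCount() == 1 && m.getParameterTypes()[0].isInstance(key)) {
                    m.invoke(mp, key);
                } else if (m.getParameterCount() == 0) {
                    m.invoke(mp);
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("activation failed", e);
            }
        }
        return mp;
    }
}
```

Note that the prototype itself is never activated; only the clone receives the key. This is why state that should be per-key belongs in activation rather than in the prototype's constructor.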
In this state the message processor is available to be used in normal processing. It can now be used to handle messages, @Output directives (see below), or @Evictable checks (see below). If the message processor was created in response to an incoming message, it will immediately be given the message to work on and enter the @MessageHandler phase.
When a message comes in for the message processor then the appropriate method annotated with the @MessageHandler annotation will be invoked and given the message. Any resulting data will be routed on to any cluster capable of receiving the message.
Dempsy will periodically invoke any method on a message processor that's annotated with the @Output annotation. The schedule for @Output calls is configurable as part of the `ClusterDefinition`.
@Output provides a mechanism for message processors to add data to the Dempsy processing chain without it being driven from the stream itself. Any messages returned from the invocation of the message processor @Output method will be routed on to any clusters that handle those messages.
Note: The @Output scheduler is currently supported but will be changed in the future to be "cron"-like rather than using relative time intervals. You can keep up with this development or comment on it by watching the enhancement issue "Cron-like output scheduler."
See the section on Non-stream Driven Message Processor Output for more details.
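As a rough picture of schedule-driven output, the following sketch runs an output pass over a set of message processors at a fixed interval using the JDK's `ScheduledExecutorService`. `RunningTotal` and `ToyOutputScheduler` are hypothetical names, and `output()` merely plays the role of an @Output method; this is not how Dempsy's own scheduler is implemented or configured.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical message processor that accumulates state between output passes.
class RunningTotal {
    private long total;

    synchronized void handle(long value) {
        total += value;
    }

    // Plays the role of an @Output method: emits a message independent of the input stream.
    synchronized String output() {
        return "total=" + total;
    }
}

class ToyOutputScheduler {
    // One output pass: ask every message processor for its output message.
    static List<String> runOnce(Collection<RunningTotal> processors) {
        List<String> out = new ArrayList<>();
        for (RunningTotal mp : processors) {
            out.add(mp.output());
        }
        return out;
    }

    // Repeat the pass at a relative interval and hand results to a downstream consumer.
    static ScheduledExecutorService start(Collection<RunningTotal> processors,
                                          long period, TimeUnit unit,
                                          Consumer<String> downstream) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                () -> runOnce(processors).forEach(downstream), period, period, unit);
        return scheduler;
    }
}
```

The caller owns the returned scheduler and should call `shutdown()` on it when the node stops.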
In some use cases, there may be a desire for message processor instances to be transient. For example, consider a case where message processors are monitoring events from an individual session with a unique id. Once that session closes the message processor will never be invoked again, but will remain in the container.
For such cases Dempsy provides a means for the message processor to let the container know that it can be evicted from the container. If the message processor annotates a method with the @Evictable annotation that returns a `boolean` then the container will periodically invoke that method. If that method ever returns `true`, an indication that the message processor instance can be removed from the container, then that message processor will never have the message handler or @Output method invoked again, and will eventually be cleaned up by garbage collection.
When a message processor is going to be removed from a container, the message processor is notified via any method that is annotated with the @Passivation annotation. This phase of the lifecycle provides the message processor with a clean means of managing its state.
There are two events that will cause a message processor to have the @Passivation method invoked.
- As described above, the message processor is being evicted from the container.
- The @Passivation method is called when a message processor is about to be migrated to another node in the cluster.
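The eviction case can be pictured as a periodic sweep over the container's instances. The sketch below is framework-free: `SessionTracker` and `EvictionSweep` are hypothetical names, `isEvictable()` plays the role of an @Evictable method, and `passivate()` plays the role of a @Passivation method. The ordering shown (passivate, then remove) follows the description above; the real container's bookkeeping is not shown.

```java
import java.util.Iterator;
import java.util.Map;

// Hypothetical message processor that tracks a single session.
class SessionTracker {
    boolean closed = false;
    boolean passivated = false;

    // Plays the role of an @Evictable method: true means "safe to remove me".
    boolean isEvictable() {
        return closed;
    }

    // Plays the role of a @Passivation method: last chance to manage state.
    void passivate() {
        passivated = true;
    }
}

class EvictionSweep {
    // One pass of the periodic check; returns how many instances were evicted.
    static int sweep(Map<String, SessionTracker> instances) {
        int evicted = 0;
        Iterator<Map.Entry<String, SessionTracker>> it = instances.entrySet().iterator();
        while (it.hasNext()) {
            SessionTracker mp = it.next().getValue();
            if (mp.isEvictable()) {
                mp.passivate(); // notify before removal
                it.remove();    // no further handler or output calls for this instance
                evicted++;
            }
        }
        return evicted;
    }
}
```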
Keep in mind that in cases where an application has stateful message processors, Dempsy relies on the application to manage that state. While the @Passivation method can be helpful, it cannot be relied upon as the sole point where stateful applications will persist their state information. In the case of node failures @Passivation is not invoked, but the recreation and @Activation of that message processor can still happen on another node following such a failure.
Obviously, Dempsy is meant to be deployed in a distributed manner. It's designed to make this really easy, yet not take on responsibilities better solved by other infrastructure components. By this we mean that Dempsy does NOT operate as an "Application Server." You do not "deploy" an application to Dempsy. Rather you deploy your application, along with Dempsy.
There is a significant difference between batch oriented big-data systems and stream based big-data systems. In batch oriented big-data (Hadoop) you have data partitioned among nodes and you send the functionality to the data in order to operate on it in parallel. Therefore, in cases like that, the concept of "deploying" an application to an application server makes sense.
Here, however, your functions run, and the job of the system is to get the data to your code. This means there can be a better separation of concerns and well established IAAS cloud based provisioning tools can be used to manage the system.
Dempsy is designed to be run in a cloud based system and rely on the provisioning tools already supporting the cloud. On Amazon, for example, you can automatically provision more nodes in response to load (using Amazon's "CloudWatch").
Dempsy is responsible for the coordination of nodes once they are started, not for starting them. But Dempsy makes this coordination work with almost no effort or configuration. The next sections describe how this works.
In order to distribute a Dempsy application on a set of nodes, you simply start the application using the `com.nokia.dempsy.spring.RunNode` wrapper with a few system parameters on all of the nodes. Some of the parameters are required and some are optional.
java -classpath classpath [options] com.nokia.dempsy.spring.RunNode
options include:
- `-Dappdef` - Required - This identifies the Spring application context with the `ApplicationDefinition` describing the application being started.
- `-Dapplication` - Required - This parameter identifies the application and cluster to start by name. The format is `application:cluster`. Both `application` and `cluster` can be regular expressions. This provides for the ability to start multiple clusters within the same node.
- `-Dzk_connect` - Required - As mentioned in the Prerequisites section, an Apache ZooKeeper install is required to run Dempsy in a distributed manner. The connect string for ZooKeeper needs to be supplied.
- `-Dzk_session_timeout` - Optional - Please refer to the ZooKeeper documentation for the description of the timeout parameter. The value is in milliseconds and the default is 5000.
- `-Dmin_nodes_for_cluster` - Optional - This parameter should be set to the minimum number of nodes it will require to run the application. If the number of nodes falls below this number, then some of the messages being sent to this cluster (or these clusters) will be dropped. The default is 1.
- `-Dtotal_slots_for_cluster` - Optional - This allows the tuning of the number of `shards` being used in the node management. Please see the section on Routing for more details. The default is 100.
That's it. There's no configuration about what is running where as this is discovered dynamically. There's no configuring of a physical topology and even the logical topology (what messages from what processors go to what other processors) is discovered at runtime and can change dynamically (see the sections on Routing and Dynamic Topology for more details).
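To illustrate how an `application:cluster` value with regular expressions can select more than one cluster for a node, here is a small sketch. `ClusterSelector` is a hypothetical class, and splitting at the first `:` is an assumption made for this example; it is not taken from the Dempsy source.

```java
import java.util.regex.Pattern;

// Hypothetical helper that interprets a value of the form "application:cluster".
class ClusterSelector {
    private final Pattern application;
    private final Pattern cluster;

    // Assumption: the value is split at the first ':' and each half is a regular expression.
    ClusterSelector(String spec) {
        int i = spec.indexOf(':');
        if (i < 0) {
            throw new IllegalArgumentException("expected application:cluster but got " + spec);
        }
        application = Pattern.compile(spec.substring(0, i));
        cluster = Pattern.compile(spec.substring(i + 1));
    }

    // True when both names match their respective expressions in full.
    boolean matches(String applicationName, String clusterName) {
        return application.matcher(applicationName).matches()
                && cluster.matcher(clusterName).matches();
    }
}
```

With this reading, a value such as `word-count:.*` would select both the `adaptor` and `mp` clusters of the Word Count example, while `word-count:mp` would select only the message processor cluster.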
As a result of this flexibility Dempsy cooperates with applications that run in a cloud environment where good IAAS tools provision and start nodes automatically and dynamically in response to whatever the current environmental conditions call for.
As an example, suppose we're running our WordCount example distributed and we find that we are overloaded. This can be discovered in any number of ways: by examining Dempsy monitoring points (see the section on Monitoring), by observing back pressure on the source, or by simply looking at the system metrics on the nodes (CPU and memory). As an example of back pressure, say the words are being drawn from a traditional message queuing system and Dempsy is configured not to discard messages when it gets behind. The queue of words would then begin to lengthen, indicating the need for more processing power to keep up.
In response to any or all of these conditions new nodes brought online will automatically join the processing.
As we have seen, Dempsy needs some minimal description of an application. With "Dynamic Topology" this description itself doesn't need to be centralized. In the examples we have seen to date the Application Definition described all of the MessageProcessors that participate in an application. For many, if not most applications, this is a good way to go since everything is defined in one place.
Let's review. In the Word Count example the application definition fully describes the application by listing which message processors participate in the application. Note, this configuration does NOT describe the topology (which message processors send messages to which other message processors). Again, that information is discovered at runtime and so the order of the `ClusterDefinition`s below makes no difference.
<beans>
  <bean class="com.nokia.dempsy.config.ApplicationDefinition">
    <constructor-arg value="word-count" />
    <property name="clusterDefinitions">
      <list>
        <bean class="com.nokia.dempsy.config.ClusterDefinition">
          <constructor-arg value="adaptor"/>
          <property name="adaptor">
            <bean class="com.nokia.dempsy.example.userguide.wordcount.WordAdaptor" />
          </property>
        </bean>
        <bean class="com.nokia.dempsy.config.ClusterDefinition">
          <constructor-arg value="mp"/>
          <property name="messageProcessorPrototype">
            <bean class="com.nokia.dempsy.example.userguide.wordcount.WordCount"/>
          </property>
        </bean>
      </list>
    </property>
  </bean>
</beans>
Notice, an `ApplicationDefinition` contains `ClusterDefinition`s and, remembering our Terminology, a Cluster is the set of all message processors of a particular type that are all participating in the same step of an application.
There are several parameters that can be set for both the `ApplicationDefinition` and the `ClusterDefinition`. All of the `ApplicationDefinition` parameters allow for the defaulting of Cluster specific settings and can be redefined or overridden at the cluster level using the `ClusterDefinition`. The `ApplicationDefinition` parameters include:
- `serializer` - Optional - You can set the serialization scheme for the transferring of messages between message processors. Out of the box, Dempsy comes with two serializers that can be used here. One supports straightforward java serialization and one supports Kryo. Kryo is the default but should be tuned for production applications. See the section on Serialization for how to correctly tune Kryo within Dempsy. Custom serialization can also be defined. See the section on Framework Development and specifically the section on the Serialization Api.
- `statsCollectorFactory` - Optional - This setting provides for either the complete override of the monitoring back-end, or the fine tuning of the main out-of-the-box implementation. The main implementation is very powerful and so the main use of this setting will be to fine tune the monitoring parameters. For information on how to completely replace the monitoring see the section on the Monitoring Api as part of Framework Development. See the section on Monitoring Configuration for details on how to tune the Dempsy statistics collector.
- `executor` - Optional - This may change prior to a 1.0 release. Dempsy provides for the ability to tune, or completely redefine, the default threading model for processing. To understand how to replace the threading model, see the section on the Executor Api as part of Framework Development. To tune the existing implementation see the section on Executor Configuration.
Each of the above parameters can be overridden in the `ClusterDefinition`. In addition, the `ClusterDefinition` provides for the setting of the following:
- `adaptor` - Required (either `adaptor` or `messageProcessorPrototype`) - Setting the Adaptor defines a special type of cluster that defines the source for streaming data. The object provided must be an instance of the `Adaptor` interface. See the Adaptors section of the Simple Example and the api docs for the Adaptor interface for more information.
- `adaptorDaemon` - Optional - This is a boolean property that identifies whether or not the thread that the Adaptor runs in should be a `daemon` thread. The default is `false`. It's really not a good idea to use this parameter as it tells Dempsy to ignore the shutting down of the Adaptor. Ideally an `adaptor` should shut down cleanly in response to a `stop`. This property is ignored if the `ClusterDefinition` doesn't have the `adaptor` set.
- `messageProcessorPrototype` - Required (either `adaptor` or `messageProcessorPrototype`) - This is how a prototype for the message processor is provided to the cluster. The object provided must be annotated with the `@MessageProcessor` annotation. See the Message Processor Prototype section of the Simple Example and the api docs for the @MessageProcessor annotation for more information.
- `evictionTimePeriod` and `evictionTimeUnit` - Optional - When a message processor is evictable these properties allow for the tuning of the frequency of eviction checking. If one is set, they should both be set. The default is 600 seconds (10 minutes). The time unit is an instance of the enum `java.util.concurrent.TimeUnit`. If the message processor isn't evictable then setting these properties has no effect.
- `outputExecutor` - Required if the message processor needs to run @Output cycles - This property is used to set the scheme for invoking an @Output cycle. See the section in the Simple Example on Non-stream Driven Message Processor Output for more information, especially the Advanced Options section. This setting can be used to define new schemes following the details in the section on the Output Executor Api as part of Framework Development.
As was previously mentioned, Dempsy is built on a set of core abstractions. One of those abstractions is Serialization. This means the entire serialization scheme for the framework is pluggable. If you're interested in plugging in custom serialization then see the section on Framework Development and specifically the section on the Serialization Api.
Dempsy comes with two serializer implementations. The first is straightforward java serialization. The second is Kryo, which is the default if you don't specify it in the Application Definition or Cluster Definition.
Note: Currently the default implementation for the Serializer is Kryo. Kryo, when used correctly, can be 8 to 10 times more efficient, in terms of both network traffic and raw serialize/deserialize performance, than straight Java serialization. Please do not use Java serialization for any production systems that require efficiency unless you plan on hand optimizing all of your message serialization. You have been warned.
As the name implies, Java serialization is simply an implementation of the core abstraction that uses standard java serialization. Therefore, in order for it to work, all of your message classes and their attributes need to implement `java.io.Serializable`. If they don't you will get send failures in Dempsy.
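A quick way to check this requirement outside of Dempsy is to round-trip a message through the JDK's object streams. The sketch below uses only standard `java.io` classes; `SerializableWord`, `PlainWord`, and `JavaSerializationCheck` are hypothetical names for this illustration and are not part of the Word Count example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// A message class that satisfies the requirement.
class SerializableWord implements Serializable {
    private static final long serialVersionUID = 1L;
    final String text;

    SerializableWord(String text) {
        this.text = text;
    }
}

// A message class that does not: writing it fails with NotSerializableException.
class PlainWord {
    final String text;

    PlainWord(String text) {
        this.text = text;
    }
}

class JavaSerializationCheck {
    static byte[] toBytes(Object message) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(message);
        }
        return bytes.toByteArray();
    }

    static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // True when the message survives a standard Java serialization round trip.
    static boolean roundTrips(Object message) {
        try {
            fromBytes(toBytes(message));
            return true;
        } catch (IOException | ClassNotFoundException e) {
            return false;
        }
    }
}
```

Running a check like `roundTrips(...)` over your message classes in a unit test catches a missing `Serializable` before it shows up as a send failure at runtime.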
In order to enable Java serialization you simply provide the implementation to the Application Definition (which can be overridden in the Cluster Definition, as described in the Application Definition section). An example follows:
...
<bean class="com.nokia.dempsy.config.ApplicationDefinition">
  <constructor-arg value="ApplicationName"/>
  <property name="serializer">
    <bean class="com.nokia.dempsy.serialization.java.JavaSerializer"/>
  </property>
  ...
If you don't do anything, Kryo will be used for the default serialization. However, Kryo can be made much more efficient with a little bit of configuration.
Note: In order to use the defaults, every class that gets serialized must have a default constructor. Kryo will work with private default constructors as long as the security settings allow for it. For more details on Kryo's requirements for object creation, see the Kryo documentation on object creation.
The easiest way to optimize Kryo is to provide the configuration with an ordered list of all of the classes that will be serialized as part of the application. The order of the list should be most-frequently serialized to least frequently serialized. Note: it is critical that all senders and receivers have the same list in the same order and the best way to make this happen is to make this list part of the Application Definition.
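To see why the order must agree on both sides, consider a simplified model in which each registered class is written on the wire as a small integer ID taken from its position in the list. `ToyRegistry` below is a hypothetical class written for this illustration; that IDs follow list position is an assumption about how the registrations are applied, and real Kryo registration has more to it than this.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of an ordered class registry: position in the list becomes the wire ID.
class ToyRegistry {
    private final Map<String, Integer> idByClass = new HashMap<>();
    private final List<String> classById = new ArrayList<>();

    ToyRegistry(List<String> orderedClassNames) {
        for (String name : orderedClassNames) {
            idByClass.put(name, classById.size());
            classById.add(name);
        }
    }

    // ID a sender would write for the class, or -1 if the class is not registered.
    int idFor(String className) {
        return idByClass.getOrDefault(className, -1);
    }

    // Class a receiver would assume for an ID it reads.
    String classFor(int id) {
        return classById.get(id);
    }
}
```

If a sender is built from the list `[Word, CountedWord]` and a receiver from `[CountedWord, Word]`, the sender writes ID 0 for a `Word` and the receiver interprets ID 0 as a `CountedWord`. Keeping the list in the shared Application Definition avoids this kind of mismatch.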
In order to list the classes you will provide the instance of Dempsy serializer in the Application Definition just like in the case of the Java Serializer except we will include additional parameters. For example, the Application Definition for the Word Count example could look like this:
<bean class="com.nokia.dempsy.config.ApplicationDefinition">
  <constructor-arg value="WordCount"/>
  <property name="serializer">
    <bean class="com.nokia.dempsy.serialization.kryo.KryoSerializer" >
      <constructor-arg>
        <list>
          <bean class="com.nokia.dempsy.serialization.kryo.Registration" >
            <constructor-arg value="com.nokia.dempsy.example.userguide.wordcount.Word" />
          </bean>
          <bean class="com.nokia.dempsy.serialization.kryo.Registration" >
            <constructor-arg value="com.nokia.dempsy.example.userguide.wordcount.CountedWord" />
          </bean>
        </list>
      </constructor-arg>
      ...
Note the `Word` class is listed prior to the `CountedWord` class. It's safe to say, given the nature of the Word Count application, that there will be many more `Word` messages than `CountedWord` messages and therefore, `Word` is listed first.
By default the Kryo serializer will work with unregistered classes. However, there is a way to tell the Kryo serializer that all classes must be registered. Since registered classes are so much more efficient, it is a good idea to set this option during development. Setting this option causes serialization to fail on any unregistered class. This will require the developer to register the class to resolve the problem and thereby ensure that all classes are registered.
To enable this option you set the following property:
...
<bean class="com.nokia.dempsy.serialization.kryo.KryoSerializer" >
  ...
  <property name="kryoRegistrationRequired" value="true" />
  ...
Dempsy provides a hook into the Kryo serializer that allows the application developer to get access to the underlying `Kryo` implementation and thereby opens up all of the underlying Kryo options to the developer. You will need to be familiar with Kryo itself in order to use it but if you want access, you need to implement the `com.nokia.dempsy.serialization.kryo.KryoOptimizer` interface.
The implementation of the `com.nokia.dempsy.serialization.kryo.KryoOptimizer` interface will be given the central `com.esotericsoftware.kryo.Kryo` instance both before any registration happens, and then again afterward. Things to keep in mind:
- If you want to modify or add to the `com.esotericsoftware.kryo.Kryo` instance before any Registrations are applied, implement the `void preRegister(com.esotericsoftware.kryo.Kryo kryo)` method to do that. Otherwise implement it to do nothing.
- If you want to modify or add to the `com.esotericsoftware.kryo.Kryo` instance after all of the Registrations are applied, implement the `void postRegister(com.esotericsoftware.kryo.Kryo kryo)` method to do that. Otherwise implement it to do nothing.
- Dempsy's Kryo based serializer pools `com.esotericsoftware.kryo.Kryo` instances since they are not thread safe. They are created 'as needed' and therefore the `KryoOptimizer` will be used each time a new `com.esotericsoftware.kryo.Kryo` instance is created and pooled.
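The interaction between pooling and the two hooks can be sketched with stand-in types. In the sketch below `ToyKryo`, `ToyOptimizer`, and `ToyKryoPool` are hypothetical classes invented for this illustration; they only mirror the pre-register, register, post-register ordering described above and do not use the real Kryo or Dempsy classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Stand-in for a non-thread-safe serializer instance; records what was done to it.
class ToyKryo {
    final List<String> log = new ArrayList<>();
}

// Stand-in for the optimizer hook with the same two phases as described above.
interface ToyOptimizer {
    void preRegister(ToyKryo kryo);

    void postRegister(ToyKryo kryo);
}

class ToyKryoPool {
    private final ConcurrentLinkedQueue<ToyKryo> idle = new ConcurrentLinkedQueue<>();
    private final List<String> registrations;
    private final ToyOptimizer optimizer;

    ToyKryoPool(List<String> registrations, ToyOptimizer optimizer) {
        this.registrations = registrations;
        this.optimizer = optimizer;
    }

    // Reuse an idle instance if one exists; otherwise build one and run the hooks once.
    ToyKryo borrow() {
        ToyKryo kryo = idle.poll();
        if (kryo != null) {
            return kryo;
        }
        kryo = new ToyKryo();
        optimizer.preRegister(kryo);
        for (String className : registrations) {
            kryo.log.add("register " + className);
        }
        optimizer.postRegister(kryo);
        return kryo;
    }

    // Return an instance so another thread can reuse it later.
    void release(ToyKryo kryo) {
        idle.offer(kryo);
    }
}
```

The practical consequence is that hook implementations should be safe to call many times, since each newly created instance goes through both phases.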
To provide your implementation of the `KryoOptimizer` you set the property on the Serializer as follows:
...
<bean class="com.nokia.dempsy.serialization.kryo.KryoSerializer" >
  ...
  <property name="kryoOptimizer">
    <bean class="com.mycompany.myapplication.MyOptimizer" />
  </property>
  ...
Dempsy monitoring is very flexible, detailed, powerful, and designed to be used in a widely distributed system. Out of the box it supports integration with several different "reporting back ends" including:
- Full integration with Graphite
- Full integration with Ganglia
- It can write all metrics to a set of CSV files
- It can write all metrics to the console
... or any combination of the above. With very little effort you can merge the Dempsy framework metrics with application metrics and piggyback your own application monitoring points onto the same system.
Or, like many of the other core abstractions that Dempsy is built on, you can completely replace the implementation and develop your own monitoring extensions. For more information on this see the section on Framework Development and specifically the section on the Monitoring Api.
Dempsy's default monitoring is built on the Yammer Metrics library by Coda Hale, an incredibly simple, flexible and versatile monitoring abstraction.
The following lists all of the framework metrics that are gathered. Depending on the back-end selected, aggregations of individual metrics can also be supplied. For example, when using Graphite you can see, not only the raw message count, but also 1-minute, 5-minute, and 15-minute average message rates, among others. Graphite will also provide very useful duration/timing aggregates including medians, standard-deviations, and various percentile times (99th, 99.9th, etc) which can be really useful in tracking down application problems. These aggregations will not be listed in the following table, but can be inferred depending on the aggregation capabilities of the reporting back end.
Keep in mind, metrics are specific to a particular cluster within a particular node. Aggregation is provided by the "reporting back end" functionality.
The metrics are broken into three sets. It's useful to keep this distinction in mind:
- Receive side metrics are those that track information about message receiving. They deal with monitoring what happens from when a message comes into a node to when it's routed to a message processor.
- Processing metrics are those that track information about the processing of messages, including timing, counts and failures.
- Send side metrics are those that track information about message sending. They deal with monitoring what happens from when a message is dispatched from an Adaptor or returned from a message processor to when it's sent on to the next step in the processing.
Metric | Description |
---|---|
messages-received | How many messages were successfully received by this node. Note that in Adaptors this will always be zero. |
bytes-received | How many bytes were successfully received by this node. Note that in Adaptors this will always be zero. |
messages-discarded | This metric indicates how many messages were discarded. This metric does not incorporate 'failed' messages (see the entry below) but does include 'collisions' (see the entry below). Dempsy usually discards messages due to load when queues fill up. It will always opt to discard the oldest messages. This metric will always be zero in an Adaptor. |
messages-collisions | Depending on the transport and how it's configured, Dempsy may discard messages if one shows up for a message processor while it's busy working on something else. Transport defaults will queue messages when collisions happen rather than discard them so this metric will always be zero unless the transport configuration is told to 'fail fast'. See the details on the various Transports for more information. This metric is also incorporated into the 'messages-discarded' described above. This metric will always be zero in an Adaptor. |
messages-pending | The total number of queued messages waiting to be processed at any given point in time. This metric is dependent on the underlying transport and isn't supported in all cases. If the transport queues incoming messages, then it should be supplied. This metric will always be zero in an Adaptor. |
Metric | Description |
---|---|
messages-dispatched | This is the total number of messages that have been dispatched to a message processor no matter what the eventual disposition of the processing is. This metric will always be zero in an Adaptor. |
messages-processed | This metric indicates how many messages were successfully processed by a _message processor_. These include all messages provided to a message processor where the message processor's @MessageHandler returned without throwing an exception, whether or not it returned a message to be forwarded on. This metric will always be zero in an Adaptor. |
messages-mp-failed | This metric includes all messages provided to a message processor where the message processor's @MessageHandler threw an exception. This metric will always be zero in an Adaptor. |
messages-dempsy-failed | This metric includes all messages that failed due to a framework/container error when a message was presented to a message processor. These are extremely rare and will be accompanied by an error in the log detailing the problem. This metric will always be zero in an Adaptor. |
message-processors-created | The total number of message processors that have been created. This is not the total number of message processors currently in the container but the number that have been created since the container was started. Therefore the number is always increasing even when eviction is enabled or elasticity moves instances. The current total number of live message processors is 'message-processors-created' minus 'message-processors-deleted' (see below for 'message-processors-deleted'). This metric will always be zero in an Adaptor. |
message-processors-deleted | The total number of message processors that have been @Passivated. There are several reasons that a message processor might be passivated. Please review the section on Passivation for more information. This metric will always be zero in an Adaptor. |
message-processors | A snapshot of the total number of message processors currently active in the container. This metric will always be zero in an Adaptor. |
mp-handle-message-duration | The duration of calls on the message processor's @MessageHandler method. Aggregates of this value can be useful in determining application performance bottlenecks. This metric will always be zero in an Adaptor. |
outputInvoke-duration | The duration of @Output passes. See the section on Non-stream Driven Message Processor Output. Aggregates of this value can be useful in determining application performance bottlenecks in the output processing of the application. This metric will always be zero when the message processor has no @Output method or in an Adaptor. |
evictionInvoke-duration | The duration of Eviction check passes. See the section on Eviction. This metric will always be zero when the message processor has no @Evictable method or in an Adaptor. |
pre-instantiation-duration | The duration of the pre-instantiation pass. This metric will always be zero in an Adaptor. |
Metric | Description |
---|---|
messages-sent | How many messages were successfully sent out of this node. |
bytes-sent | How many bytes were successfully sent out of this node. |
messages-unsent | This metric indicates how many messages Dempsy attempted to send but discarded before they were successfully transmitted. This count is not incorporated in the above 'messages-sent' metric since that only covers successfully transmitted messages. There are several reasons this metric can be triggered. They include: 1) There is no current destination available for the message. 2) The message has no @MessageKey or reflection failed to retrieve it. 3) A shutdown happens while the message is being queued. 4) An exception (likely an I/O exception) happens when attempting to transmit the message. 5) There are too many messages queued up to be sent out to a particular destination. In this case Dempsy will always opt to discard the oldest messages. |
messages-out-pending | The total number of queued messages waiting to be transmitted at any given point in time. This metric is dependent on the underlying transport and isn't supported in all cases. If the transport queues outgoing messages, then it should be supplied. |
The monitoring can be tuned in several different ways, but each of these requires setting the `statsCollectorFactory` property on the Application Definition and setting parameters on the implementation provided. The property is set to an implementation of the interface `com.nokia.dempsy.monitoring.StatsCollectorFactory` (again, see the Monitoring Api section for more details on this interface and how to provide your own implementation). The implementation provided with Dempsy is called `com.nokia.dempsy.monitoring.coda.StatsCollectorFactoryCoda` (named after Coda Hale).
As an example, one that will be the basis of the following sections, you would add the following to your Application Definition for your application:
<beans>
<bean class="com.nokia.dempsy.config.ApplicationDefinition">
<constructor-arg value="myApplication" />
...
<property name="statsCollectorFactory">
<bean class="com.nokia.dempsy.monitoring.coda.StatsCollectorFactoryCoda" />
</property>
...
Of course, without any properties set on the `StatsCollectorFactoryCoda` there's nothing more than the default already gives you. The properties that can be set are described in the following sections:
By default, Dempsy's monitoring implementation isn't configured with any "reporting back-ends" (see the enumerated list above). However, all metrics are automatically exposed via JMX even if no (other) reporting back ends are configured. If you want to use one of the reporting back-ends you need to supply a list of reporting specs (`com.nokia.dempsy.monitoring.coda.MetricsReporterSpec` instances) to the configuration of the `StatsCollectorFactoryCoda` implementation.
To select different reporting back ends you use the `type` property on the `MetricsReporterSpec`. The following shows how to select Graphite:
<beans>
<bean class="com.nokia.dempsy.config.ApplicationDefinition">
<constructor-arg value="myApplication" />
...
<property name="statsCollectorFactory">
<bean class="com.nokia.dempsy.monitoring.coda.StatsCollectorFactoryCoda" >
<property name="reporters">
<list>
<bean class="com.nokia.dempsy.monitoring.coda.MetricsReporterSpec">
<property name="type">
<value type="com.nokia.dempsy.monitoring.coda.MetricsReporterType">GRAPHITE</value>
</property>
...
`MetricsReporterType` is an enum with the values: `GRAPHITE`, `GANGLIA`, `CONSOLE`, `CSV`.
Note: You can enable multiple reporting back ends simultaneously by providing multiple `MetricsReporterSpec` instances in the list for the "reporters" property.
Note: Keep in mind, all metrics are automatically reported through JMX no matter which, or how many, "reporting back-ends" are enabled.
Here is an example of how you configure Graphite as the reporting back end for the monitoring:
<beans>
<bean class="com.nokia.dempsy.config.ApplicationDefinition">
<constructor-arg value="myApplication" />
...
<property name="statsCollectorFactory">
<bean class="com.nokia.dempsy.monitoring.coda.StatsCollectorFactoryCoda" >
<property name="reporters">
<list>
<bean class="com.nokia.dempsy.monitoring.coda.MetricsReporterSpec">
<property name="type">
<value type="com.nokia.dempsy.monitoring.coda.MetricsReporterType">GRAPHITE</value>
</property>
<property name="period" value="1"/>
<property name="unit">
<value type="java.util.concurrent.TimeUnit">MINUTES</value>
</property>
<property name="hostName" value="${graphite.host}"/>
<property name="portNumber" value="${graphite.port}"/>
</bean>
</list>
</property>
</bean>
</property>
...
When configuring the `StatsCollectorFactoryCoda` to report to Graphite you MUST supply:
- the `period` and the `unit`; together these provide the time period for successive outputs to Graphite
- the `hostName` and `portNumber` for where the Graphite server is listening.
Notice that in the above example, the Spring configuration assumes the `PropertyPlaceholderConfigurer` or `-D` command line parameters are being used to supply the Graphite host name and port.
Ganglia configuration is identical to Graphite configuration except the `type` is `GANGLIA`. For example:
<beans>
<bean class="com.nokia.dempsy.config.ApplicationDefinition">
<constructor-arg value="myApplication" />
...
<property name="statsCollectorFactory">
<bean class="com.nokia.dempsy.monitoring.coda.StatsCollectorFactoryCoda" >
<property name="reporters">
<list>
<bean class="com.nokia.dempsy.monitoring.coda.MetricsReporterSpec">
<property name="type">
<value type="com.nokia.dempsy.monitoring.coda.MetricsReporterType">GANGLIA</value>
</property>
<property name="period" value="1"/>
<property name="unit">
<value type="java.util.concurrent.TimeUnit">MINUTES</value>
</property>
<property name="hostName" value="${ganglia.host}"/>
<property name="portNumber" value="${ganglia.port}"/>
</bean>
</list>
</property>
</bean>
</property>
...
Again, as for Graphite, you MUST supply:
- the `period` and the `unit`; together these provide the time period for successive outputs to Ganglia
- the `hostName` and `portNumber` for where the Ganglia server is listening.
For CSV output you just need to supply the output directory. The CSV reporter will periodically append data to metric-specific files in that output directory. An example of how to configure it follows:
<beans>
<bean class="com.nokia.dempsy.config.ApplicationDefinition">
<constructor-arg value="myApplication" />
...
<property name="statsCollectorFactory">
<bean class="com.nokia.dempsy.monitoring.coda.StatsCollectorFactoryCoda" >
<property name="reporters">
<list>
<bean class="com.nokia.dempsy.monitoring.coda.MetricsReporterSpec">
<property name="type">
<value type="com.nokia.dempsy.monitoring.coda.MetricsReporterType">CSV</value>
</property>
<property name="period" value="1"/>
<property name="unit">
<value type="java.util.concurrent.TimeUnit">MINUTES</value>
</property>
<property name="outputDir" >
<bean class="java.io.File">
<constructor-arg value="${csv.outdir}" />
</bean>
</property>
</bean>
</list>
</property>
</bean>
</property>
...
You MUST supply:
- the `period` and the `unit`; together these provide the time period for successive outputs to the individual CSV files
- the `outputDir` where the files are to be written.
You can configure Dempsy to periodically report the metrics to the console (stdout):
<beans>
<bean class="com.nokia.dempsy.config.ApplicationDefinition">
<constructor-arg value="myApplication" />
...
<property name="statsCollectorFactory">
<bean class="com.nokia.dempsy.monitoring.coda.StatsCollectorFactoryCoda" >
<property name="reporters">
<list>
<bean class="com.nokia.dempsy.monitoring.coda.MetricsReporterSpec">
<property name="type">
<value type="com.nokia.dempsy.monitoring.coda.MetricsReporterType">CONSOLE</value>
</property>
<property name="period" value="1"/>
<property name="unit">
<value type="java.util.concurrent.TimeUnit">MINUTES</value>
</property>
</bean>
</list>
</property>
</bean>
</property>
...
You MUST supply the `period` and the `unit`; together these provide the time period for successive outputs to the console.
Metrics are named hierarchically and various reporting back-ends (and JMX) handle these names differently. Metric naming is important for management and aggregation purposes. For example, it helps to be able to separate the 'messages-received' coming from different applications, hosts, or different deployment environments (development, testing, etc.). This is especially important where the metrics are centrally gathered, tracked and managed.
By default, Dempsy provides its metrics using the following naming scheme:
[environment prefix][node name][application name]-[cluster name].Dempsy.[metric name]
Note that the environment prefix and the node name only apply to the Graphite and Ganglia reporting back-ends and do not affect the names of the monitoring points in either the other back-ends or in JMX.
Keep in mind that some back-ends, like Graphite, give special meaning to the '.' character.
The environment prefix provides a means to prepend an indication as to which environment this node is running in. This allows a single Graphite or Ganglia server to monitor multiple environments and yet, at the top of the hierarchy, separate all of the metrics by which environment they're from.
You supply the environment prefix in one of two ways. First, you can set the `environmentPrefix` property directly on the `StatsCollectorFactoryCoda` instance when configuring it as described above. Alternatively you can provide a system property using `-Denvironment.prefix=...`. The system property will take precedence.
The environment prefix doesn't assume a '.' after it by default, so if you want the back-end to acknowledge the environment as a place in the hierarchy of the name, then you should include the '.' when you specify the prefix. For example, in Graphite, there's a difference between `-Denvironment.prefix=test-` and `-Denvironment.prefix=test.`. The latter will allow the collapsing of the hierarchy under the top level environment name, the former will not. This allows the person providing the environment name to determine whether they want it flat or hierarchical.
The environment prefix has no effect on the name within JMX or within reporting back-ends other than Graphite and Ganglia.
The node name is derived from the transport. In most cases it will be the IP Address of the host that the node in question is running on. However, since the IP address has '.' characters in it, this would cause Graphite to create a hierarchy out of the octets of the address so the '.' characters are replaced with '-' characters.
The node name has no effect on the name within JMX or within reporting back-ends other than Graphite and Ganglia.
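The two rules above (the system property wins over the configured prefix, and the '.' characters in an IP address become '-') can be sketched in plain Java. This is only an illustration and not Dempsy's actual code: the class and method names are hypothetical, and falling back to an empty prefix when nothing is configured is an assumption.

```java
/** Hypothetical helper illustrating the prefix and node-name rules; not part of Dempsy. */
final class MetricNameParts {
    /** Replaces '.' with '-' so Graphite doesn't treat each octet as a hierarchy level. */
    static String nodeName(String hostAddress) {
        return hostAddress.replace('.', '-');
    }

    /**
     * Resolves the environment prefix. The -Denvironment.prefix system property,
     * when present, takes precedence over the configured property.
     * Assumption: an empty prefix is used when neither is set.
     */
    static String environmentPrefix(String configuredPrefix) {
        String fromSystem = System.getProperty("environment.prefix");
        if (fromSystem != null) {
            return fromSystem;
        }
        return configuredPrefix == null ? "" : configuredPrefix;
    }

    public static void main(String[] args) {
        System.out.println(environmentPrefix("dev.") + nodeName("10.1.2.3"));
    }
}
```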
Next comes the `[application name]-[cluster name]`. This comes directly from the Application Definition and Cluster Definition that's driving the node in question. The preference here is to flatten out the metrics (not use the '.'). This is a preference based on experience. If you really don't like this you can overload the naming scheme following the directions in the Monitoring Api.
In order to separate framework metrics from application metrics, a point in the hierarchy is inserted using `.Dempsy.` which will cause all of the framework metrics to be grouped under a `Dempsy` subset.
Each metric from the tables above will then be the last part of the name. Below this point in the name hierarchy will be the specific aggregates provided by Yammer Metrics and will depend on the type of metric.
Because Dempsy is built on Yammer Metrics, simply using Yammer Metrics directly in your application will expose those metrics, along with the framework metrics, through whatever back-ends are configured.
Of course, it will help to use a naming scheme within Yammer Metrics that cooperates with the existing Dempsy framework use in order to have all of your metrics organized together with the framework metrics. There is no need to handle the environment name or node name portions of the metric names as those are specific to the reporting back-end and configured through the above configuration techniques. An example follows:
import com.yammer.metrics.core.MetricName;
...
MetricName appMetricName =
new MetricName(applicationName + "-" + clusterName,"application", "my-custom-application-metric");
...
If you want access to the `ClusterId` that your particular message processor is running in, you can use the @Start method on the message processor prototype.
Dempsy allows fine tuning of the number of threads that are used for processing. These threads apply to all processing taking place inside the message processor including message handling and output processing.
Note: In 0.7.2 the output processing has its own executor with its own threads. The intention is to eventually use the same DempsyExecutor instance for both. The documentation here assumes that.
The default is based on the number of cores available on the machine. Often the default will not be appropriate. If the MP is I/O intensive, or the message sizes are large and therefore create an I/O bottleneck in the framework, it could be advantageous to explicitly set or tune the thread pool size.
By default the thread pool is set using the following formula:
number of threads = m * number of cores + additional threads
`m` defaults to 1.25 and `additional threads` defaults to 2.
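As a worked illustration of the formula, the following plain Java sketch computes the pool size from the number of cores. The class and method names are hypothetical, and truncating the fractional part of `m * number of cores` is an assumption; the framework may round differently.

```java
/** Hypothetical sketch of the thread pool sizing formula; not Dempsy's implementation. */
final class ThreadPoolSizing {
    static final double DEFAULT_CORES_FACTOR = 1.25;  // "m" in the formula
    static final int DEFAULT_ADDITIONAL_THREADS = 2;

    /** number of threads = m * number of cores + additional threads (fraction truncated, by assumption). */
    static int threads(int cores, double coresFactor, int additionalThreads) {
        return (int) (coresFactor * cores) + additionalThreads;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("default thread count on this machine: "
                + threads(cores, DEFAULT_CORES_FACTOR, DEFAULT_ADDITIONAL_THREADS));
    }
}
```

Under these assumptions an 8-core machine with the defaults gets 1.25 * 8 + 2 = 12 threads.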
You can set the number of threads through one of two alternative methods. The relative (to the number of cores) method is by setting `m` and `additional threads`. The absolute method is to simply supply the `number of threads` directly.
The setting can be configured at either the `ApplicationDefinition` level, or the `ClusterDefinition` level. When set at the `ApplicationDefinition` level it applies to all `ClusterDefinitions` in the application.
<property name="executor">
<bean class="com.nokia.dempsy.executor.DefaultDempsyExecutor">
<property name="additionalThreads" value="45"/>
<property name="coresFactor" value="2.0"/>
</bean>
</property>
The above results in setting the number of threads to `2 * number of cores + 45`, so if, for example, the number of cores is 8, the total number of threads will be 61.
Alternatively you can simply set the number of threads on the executor directly. Again, the setting can be configured at either the `ApplicationDefinition` level, or the `ClusterDefinition` level. When set at the `ApplicationDefinition` level it applies to all `ClusterDefinitions` in the application.
In the following example we set the number of threads to 45:
<property name="executor">
<bean class="com.nokia.dempsy.executor.DefaultDempsyExecutor">
<constructor-arg value="45"/>
<constructor-arg value="-1"/>
</bean>
</property>
Note: the second constructor argument is the `maxNumberOfQueuedLimitedTasks` discussed below.
The `Executor` is also the place where processing queue limits are handled. For example, if the node is getting behind and there are too many tasks queued up to execute, then the Executor will handle the shedding of messages.
When messages come into a node they are queued for delivery to the appropriate message processors, but by default there are limits set. When the limits are exceeded the oldest queued messages are discarded, and all of the appropriate monitoring bookkeeping is done.
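The "discard the oldest" policy can be pictured with a small bounded queue. The sketch below is a simplified, hypothetical stand-in written for illustration; it is not the `DefaultDempsyExecutor` implementation, and the class name and counter are assumptions.

```java
import java.util.ArrayDeque;

/** Hypothetical bounded queue that sheds its oldest entry when the limit is exceeded. */
final class OldestFirstSheddingQueue<T> {
    private final ArrayDeque<T> queue = new ArrayDeque<>();
    private final int limit;
    private long discarded = 0;  // would feed a metric such as 'messages-discarded'

    OldestFirstSheddingQueue(int limit) {
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
    }

    /** Queues a message; if the limit is exceeded, the oldest queued message is dropped. */
    synchronized void offer(T message) {
        queue.addLast(message);
        if (queue.size() > limit) {
            queue.removeFirst();
            discarded++;
        }
    }

    /** Returns the next message to process, or null when nothing is queued. */
    synchronized T poll() {
        return queue.pollFirst();
    }

    synchronized long discardedCount() {
        return discarded;
    }

    synchronized int size() {
        return queue.size();
    }
}
```

Dropping from the head rather than rejecting the newcomer means a node that has fallen behind keeps working on the freshest data.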
By default the limit is derived from the number of threads and is currently set to `20 * thread pool size`. This can be explicitly set using either:
- The second constructor parameter
- The `maxNumberOfQueuedLimitedTasks` property on the `DefaultDempsyExecutor`.
If this value is set to less than zero (it's set to `-1` in the above constructor argument example) then the formula will be used to calculate the value.
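The fallback rule can be written out in a few lines. This is a hypothetical sketch of the rule as described here, not the framework's code; the class and method names are made up for the example.

```java
/** Hypothetical sketch of how the effective queue limit is chosen. */
final class QueueLimit {
    static final int DEFAULT_MULTIPLIER = 20;  // limit = 20 * thread pool size

    /** A negative configured value means "use the formula"; anything else is taken as given. */
    static long effectiveLimit(long configuredLimit, int threadPoolSize) {
        if (configuredLimit < 0) {
            return (long) DEFAULT_MULTIPLIER * threadPoolSize;
        }
        return configuredLimit;
    }

    public static void main(String[] args) {
        System.out.println(effectiveLimit(-1, 45));
    }
}
```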
It may be advantageous in certain testing situations to force the message processor message handling to either block on message submission, or set the limiting queue to unbounded. It is not recommended that either option be used in a production scenario for what should be obvious reasons.
If `blocking` is set to true then submitting message processing tasks, once the `maxNumWaitingLimitedTasks` is reached, will block until room is available. This will create back pressure on the transport.
To set blocking use:
<property name="blocking" value="true"/>
Setting `unlimited` to true allows for an unbounded queue to handle message processing. This instructs the `DefaultDempsyExecutor` to effectively ignore the `maxNumWaitingLimitedTasks` setting. The default behavior is for the oldest message processing tasks to be rejected when the `maxNumWaitingLimitedTasks` is reached.
Setting `unlimited` effectively causes the Executor to ignore the `blocking` setting.
To set the `DefaultDempsyExecutor` to `unlimited` use:
<property name="unlimited" value="true"/>
# Message Routing and Microsharding
As described in the Simple Example, Dempsy associates each unique @MessageKey value with an individual message processor instance. The main job of Dempsy then is to find the correct message processor to deliver each message to, among a cluster of machines. By default, Dempsy does this through the concept of microsharding.
Conceptually microsharding is elegant and provides several advantages over other solutions.
- It applies a constant amount of computational resources to the message routing problem, independent of the number of potential or actual message processors (the message key space size) or compute nodes. This is as opposed to tracking the location of each individual message processor.
- It provides a uniform distribution of message processors across compute nodes. This is as opposed to algorithms like the "distributed hash table."
- It allows for "stickiness" of message processor node locations when the cluster topology changes. This is as opposed to stateless algorithms like a simple mod operation on the key's hash code over the number of nodes. In this case existing message processors would be reshuffled among the nodes every time a new node entered or left the cluster.
Microsharding is a technique where the key-space for all potential message processors is broken up among a fixed number of shards. Shards are assigned uniquely to the currently available compute nodes. The number of shards `M` is `>>` the number of nodes `N`. A shard is uniquely assigned to an individual node while a node will have many shards. The relationship between which nodes own which shards is managed by ZooKeeper.
In order to locate the correct node that a message processor is on, Dempsy will:
- Get the @MessageKey from the message instance.
- Get the `hashCode` of the @MessageKey.
- Find the shard associated with the message processor to which the message is addressed using `shard = message key's hash code % M`.
- Route the message to the node that owns the computed shard.
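The lookup steps above can be sketched as follows. This is a minimal illustration, not Dempsy's RoutingStrategy: the `ShardRouter` class is hypothetical, the shard-to-node table is a plain array standing in for what ZooKeeper manages, and `Math.floorMod` is used instead of a bare `%` (an assumption made so that negative hash codes still map to a valid shard).

```java
/** Hypothetical sketch of key -> shard -> node routing. */
final class ShardRouter {
    private final String[] shardOwner;  // index = shard number, value = owning node

    ShardRouter(String[] shardOwner) {
        this.shardOwner = shardOwner.clone();
    }

    /** shard = message key's hash code % M, kept non-negative via floorMod. */
    int shardFor(Object messageKey) {
        return Math.floorMod(messageKey.hashCode(), shardOwner.length);
    }

    /** The node that owns the shard the key falls into. */
    String nodeFor(Object messageKey) {
        return shardOwner[shardFor(messageKey)];
    }

    public static void main(String[] args) {
        ShardRouter router = new ShardRouter(new String[] {"node-A", "node-B", "node-A", "node-B"});
        System.out.println(router.nodeFor("some-word"));
    }
}
```

The cost of a lookup is one hash and one array access, regardless of how many message processors or nodes exist.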
Using microsharding it's easy to manage changes to the cluster of nodes. Suppose a new node enters the cluster. An imbalance is introduced where the shards are no longer evenly distributed among the nodes. In this case, using ZooKeeper as the intermediary, the new node will negotiate for shards until the number of shards is evenly distributed again.
As a concrete example, let's say there are 5 nodes and 100 shards. That means each node has 20 shards. If a 6th node enters the cluster then the most any node should have would be 17 and the least any should have would be 16. The new node has zero and must acquire at least 16. The other nodes all have 20 and each needs to give up at least 3. In Dempsy's default RoutingStrategy this is done in a completely decentralized manner using ZooKeeper.
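The bounds in this example are just the floor and ceiling of shards divided by nodes. A small sketch with hypothetical helper names:

```java
/** Hypothetical helpers computing the balanced per-node shard bounds. */
final class ShardBalance {
    /** The least number of shards any node should own once balanced. */
    static int minPerNode(int shards, int nodes) {
        return shards / nodes;
    }

    /** The most shards any node should own once balanced. */
    static int maxPerNode(int shards, int nodes) {
        return (shards + nodes - 1) / nodes;
    }

    /** How many shards a node holding 'held' shards must still acquire to reach the minimum. */
    static int mustAcquire(int held, int shards, int nodes) {
        return Math.max(0, minPerNode(shards, nodes) - held);
    }

    public static void main(String[] args) {
        System.out.println(minPerNode(100, 6) + " to " + maxPerNode(100, 6) + " shards per node");
    }
}
```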
Notice that when the cluster changes, only the minimum number of message processors actually migrate and most of them never move.
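To make the difference concrete, the sketch below counts how many keys change nodes when a sixth node joins, first with a plain `key % N` scheme and then with shards. It is a simplified, centralized simulation written for illustration only: Dempsy negotiates shard ownership in a decentralized way through ZooKeeper, and the class, the integer keys, and the "take from the most loaded node" rule are all assumptions.

```java
/** Hypothetical simulation comparing key movement under modulo routing and microsharding. */
final class StickinessDemo {
    /** Keys (0..keys-1) whose node changes when routing is simply key % nodeCount. */
    static int movedWithModulo(int keys, int oldNodes, int newNodes) {
        int moved = 0;
        for (int key = 0; key < keys; key++) {
            if (key % oldNodes != key % newNodes) {
                moved++;
            }
        }
        return moved;
    }

    /** Keys whose node changes when one node joins and takes only its minimum fair share of shards. */
    static int movedWithShards(int keys, int shards, int oldNodes) {
        int[] owner = new int[shards];
        int[] held = new int[oldNodes + 1];
        for (int s = 0; s < shards; s++) {  // initial even spread over the old nodes
            owner[s] = s % oldNodes;
            held[owner[s]]++;
        }
        int[] before = owner.clone();
        int newNode = oldNodes;
        int target = shards / (oldNodes + 1);
        while (held[newNode] < target) {
            int donor = 0;  // pick the currently most loaded old node
            for (int n = 1; n < oldNodes; n++) {
                if (held[n] > held[donor]) {
                    donor = n;
                }
            }
            for (int s = 0; s < shards; s++) {  // hand over one of the donor's shards
                if (owner[s] == donor) {
                    owner[s] = newNode;
                    held[donor]--;
                    held[newNode]++;
                    break;
                }
            }
        }
        int moved = 0;
        for (int key = 0; key < keys; key++) {
            int shard = key % shards;
            if (owner[shard] != before[shard]) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        System.out.println("modulo: " + movedWithModulo(3000, 5, 6));
        System.out.println("shards: " + movedWithShards(3000, 100, 5));
    }
}
```

In this toy setup with 3000 integer keys, the modulo scheme relocates 2500 of them while the sharded scheme relocates 480, the keys belonging to the 16 shards handed to the new node.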
If one of the nodes then leaves the cluster we're back down to 5. When this happens the remaining nodes negotiate through ZooKeeper to acquire the additional shards. None of the message processors on the nodes that stayed in the cluster need to move.