Siddhi Architecture

Siddhi is a software library that can be utilized in any of the following ways:

  • Run as a server on its own
  • Run within WSO2 SP as a service
  • Embedded into any Java- or Python-based application
  • Run within an Android application

It provides analytical operators, orchestrates data flows, calculates analytics, and detects patterns on event data from multiple disparate live data sources. This allows developers to build applications that collect data, analyze it, and communicate the results in real time.

This section illustrates the architecture of Siddhi and guides you through its key functionality. We hope this article helps developers to better understand Siddhi and its codebase, and also to contribute to improving Siddhi by reporting and fixing bugs.

Main Design Decisions

  • Event-by-event processing of real-time streaming data to achieve low latency.
  • Ease of use, with Streaming SQL providing an intuitive way to express stream processing logic and complex event processing constructs such as Patterns.
  • Achieving high performance by processing all events in-memory and by storing their states in-memory.
  • Optimizing performance by enforcing a strict schema for event streams and by pre-compiling the queries.
  • Optimizing memory consumption by keeping only the absolutely necessary information in-memory and dropping the rest as soon as possible.
  • Supporting multiple extension points to accommodate a diverse set of functionality such as supporting multiple sources, sinks, functions, aggregation operations, windows, etc.

High Level Architecture

Simple Siddhi Overview

At a high level, Siddhi consumes events from various event sources, processes them according to the defined Siddhi application, and publishes the results to the subscribed event sinks. When configured to do so, Siddhi can store and consume events from in-memory tables or from external data stores such as RDBMS, MongoDB, Hazelcast in-memory grid, etc. Siddhi also allows applications and users to query Siddhi via its Store Query API to interactively retrieve data from its in-memory and other stores.

Main Modules in Siddhi

Siddhi comprises four main modules:

  • Siddhi Query API : This allows you to define the execution logic of the Siddhi application as queries and definitions using POJO classes. Internally, Siddhi uses these objects to identify the tasks that it is expected to perform.

  • Siddhi Query Compiler : This allows you to define the Siddhi application using the Siddhi Streaming SQL, and then converts the Streaming SQL script into Siddhi Query API POJOs so that Siddhi can execute them.

  • Siddhi Core : This builds the execution runtime based on the defined Siddhi Application and processes the events as and when they arrive.

  • Siddhi Annotation : This is a helper module that allows all extensions to be annotated so that they can be picked up by Siddhi Core for processing. It also helps Siddhi to generate the extension documentation.

Siddhi Component Architecture

The following diagram illustrates the main components of Siddhi and how they work together.

Siddhi Component Architecture

Here the Siddhi Core module maintains the execution logic. It also interacts with the external environment and systems for consuming and publishing events. To achieve these tasks, it uses the following components:

  • SiddhiManager : This is a key component of Siddhi Core that manages Siddhi Application Runtimes and, via the Siddhi Context, supports them with periodic state persistence, statistics reporting, and extension loading.

  • SiddhiAppRuntime : A Siddhi Application Runtime is generated for each deployed Siddhi Application, and it provides an isolated execution environment for that application. Each runtime is built from the logic of its Siddhi Application, and it consumes events from and publishes events to various external systems and Java or Python programs.

  • SiddhiContext : This is a shared object for all the Siddhi Application Runtimes within a Siddhi manager. It contains references to the persistence store used for periodic persistence, the statistics manager that reports performance statistics of Siddhi Application Runtimes, and the extension holders used for loading Siddhi extensions.
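The ownership relationships among these three components can be pictured with the following minimal Java sketch. SketchManager, SketchContext and SketchAppRuntime are hypothetical stand-ins chosen for illustration, not Siddhi's actual classes or API. The sketch only shows one context shared by every runtime a manager creates, while each runtime keeps its own isolated state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for SiddhiManager, SiddhiContext and SiddhiAppRuntime.
// They model only the ownership relationships described above, not Siddhi's real API.
class SketchContext {
    // One instance shared by every runtime that the same manager creates.
    final Map<String, String> persistenceStore = new ConcurrentHashMap<>();
    final Map<String, Class<?>> extensions = new ConcurrentHashMap<>();
}

class SketchAppRuntime {
    final String appName;
    final SketchContext context;                          // shared reference, not a copy
    final List<Object[]> localState = new ArrayList<>();  // isolated per application

    SketchAppRuntime(String appName, SketchContext context) {
        this.appName = appName;
        this.context = context;
    }

    void persist() {
        // Each runtime writes its own snapshot into the shared persistence store.
        context.persistenceStore.put(appName, "events=" + localState.size());
    }
}

class SketchManager {
    private final SketchContext context = new SketchContext();
    private final Map<String, SketchAppRuntime> runtimes = new ConcurrentHashMap<>();

    SketchAppRuntime createRuntime(String appName) {
        SketchAppRuntime runtime = new SketchAppRuntime(appName, context);
        runtimes.put(appName, runtime);
        return runtime;
    }

    // A real manager would trigger persistence periodically; the sketch calls it directly.
    void persistAll() {
        runtimes.values().forEach(SketchAppRuntime::persist);
    }

    public static void main(String[] args) {
        SketchManager manager = new SketchManager();
        SketchAppRuntime a = manager.createRuntime("AppA");
        SketchAppRuntime b = manager.createRuntime("AppB");
        a.localState.add(new Object[]{"IBM", 100.0});
        manager.persistAll();
        System.out.println(a.context == b.context);   // both runtimes see the same context
        System.out.println(a.context.persistenceStore);
    }
}
```

Keeping the context as a single shared reference is what lets one persistence store and one set of extension holders serve every application, while per-application state never leaks across runtimes.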

Siddhi Application Creation

Execution logic in Siddhi is composed as a Siddhi Application, and this is passed as a string to SiddhiManager to create the SiddhiAppRuntime for execution.

When a Siddhi Application is passed to SiddhiManager.createSiddhiAppRuntime(), it is processed internally by the SiddhiCompiler. Here, the SiddhiApp string is converted into a SiddhiApp object model by the SiddhiQLBaseVisitorImpl class. The model is then passed to the SiddhiAppParser, which creates the SiddhiAppRuntime.
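As a rough illustration of the string to object model to runtime pipeline, the hypothetical Java sketch below "compiles" only define stream statements with a regular expression, whereas Siddhi itself uses a full grammar and the SiddhiQLBaseVisitorImpl class. MiniCompiler, AppModel, StreamDef and MiniRuntime are invented names, and the resulting runtime merely creates one placeholder junction per defined stream.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical mini-pipeline: app string -> compiler -> object model -> parser -> runtime.
// Only "define stream" statements are understood; this is not Siddhi's compiler.
class StreamDef {                          // stand-in for a Query API POJO
    final String name;
    final Map<String, String> attributes = new LinkedHashMap<>();
    StreamDef(String name) { this.name = name; }
}

class AppModel {                           // stand-in for the SiddhiApp object model
    final List<StreamDef> streams = new ArrayList<>();
}

class MiniCompiler {
    private static final Pattern DEFINE =
        Pattern.compile("define\\s+stream\\s+(\\w+)\\s*\\(([^)]*)\\)\\s*;");

    static AppModel compile(String appText) {
        AppModel model = new AppModel();
        Matcher m = DEFINE.matcher(appText);
        while (m.find()) {
            StreamDef def = new StreamDef(m.group(1));
            for (String attr : m.group(2).split(",")) {
                if (attr.trim().isEmpty()) continue;          // tolerate "()"
                String[] parts = attr.trim().split("\\s+");   // "price float" -> name, type
                def.attributes.put(parts[0], parts[1]);
            }
            model.streams.add(def);
        }
        return model;
    }
}

class MiniRuntime {
    // One placeholder junction per defined stream, built from the model (the "parser" step).
    final Map<String, List<Object[]>> junctions = new LinkedHashMap<>();

    static MiniRuntime fromModel(AppModel model) {
        MiniRuntime runtime = new MiniRuntime();
        for (StreamDef def : model.streams) {
            runtime.junctions.put(def.name, new ArrayList<>());
        }
        return runtime;
    }

    public static void main(String[] args) {
        String app = "define stream StockStream (symbol string, price float);"
                   + "define stream OutputStream (symbol string);";
        MiniRuntime runtime = MiniRuntime.fromModel(MiniCompiler.compile(app));
        System.out.println(runtime.junctions.keySet());   // prints [StockStream, OutputStream]
    }
}
```

Separating the compile step (text to model) from the build step (model to runtime) mirrors why Siddhi can also accept applications built directly from Query API objects: the runtime builder only ever sees the model.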

Siddhi App Execution Flow

The following diagram depicts the execution flow within a Siddhi App Runtime.

Execution Flow in Siddhi App

The path taken by events within Siddhi is indicated in blue.

The components that are involved in handling the events are as follows:

  • StreamJunction

    This routes events to various components within the Siddhi App Runtime. A stream junction is generated for each stream defined or inferred in the Siddhi Application. A stream junction by default uses the incoming event's thread for processing subscribed components, but it can also be configured via the @Async annotation to buffer the events and use a different thread for subsequent execution.

  • InputHandler

    An input handler instance is created for each stream junction, and it is used to push Event and Event[] objects into the stream junction from sources and from Java/Python programs.

  • StreamCallback

    This receives Event[]s from the stream junction and passes them either to sinks, which publish them to external endpoints, or to subscribed Java/Python programs for further processing.

  • Queries & Partitions

    These components process events by filtering, transforming, joining, pattern matching, etc. They consume events from one or more stream junctions, process them, and publish the processed events into stream junctions based on the defined query or partition.

  • Source

    Sources consume events from external sources in various data formats, convert them into Siddhi events and pass them to corresponding Stream Junctions via their Input Handlers. A source is generated for each @Source annotation defined above a stream definition. Each source consumes events from an external source in the data format configured for it.

  • SourceMapper

    A source mapper needs to be configured for each source in order to convert the format of each incoming event to that of a Siddhi event. The source mapper type can be configured using the @Map annotation within the @Source annotation. When the @Map annotation is not defined, Siddhi uses the PassThroughSourceMapper, where it assumes that the incoming message is already in the Siddhi Event format, and therefore makes no changes to the event format.

  • Sink

    Sinks convert Siddhi events to various data formats and publish them to external endpoints. A sink is generated for each @Sink annotation defined above a stream definition, and it publishes the events arriving at that stream.

  • SinkMapper

    A sink mapper is configured for each Sink in order to map the Siddhi events to the specified data format so that they can be published via the sink. The sink mapper type can be configured using the @Map annotation within the @Sink annotation. When the @Map annotation is not defined, Siddhi uses PassThroughSinkMapper, where it passes the Siddhi events in the existing format (i.e., the Siddhi event format) to the Sink.

  • Table

    Tables store events. By default, Siddhi uses the InMemoryTable implementation to store events in-memory. When the @Store annotation is used, Siddhi loads the associated Table implementation based on the defined store type. Most table implementations extend the AbstractRecordTable abstract class for ease of development.

  • Window

    Windows store events and determine when events can be considered expired based on the given window constraint. Different types of windows can be implemented by extending the WindowProcessor abstract class.

  • IncrementalAggregation

    This allows you to obtain aggregates in an incremental manner for a specified set of time periods. Incremental aggregation functions can be implemented by extending IncrementalAttributeAggregator.

  • Trigger

    A trigger triggers events at a given interval to the stream junction that has the same name as the trigger.

  • QueryCallback

    A query callback receives notifications when events are emitted from queries. It receives the event occurrence timestamp together with the events, classified into currentEvents and expiredEvents.
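To make the routing role of a stream junction concrete, here is a minimal Java sketch under simplifying assumptions: events are plain Object[] arrays, subscribers (queries, callbacks, sinks) are java.util.function.Consumer instances, and the asynchronous mode hands events to a single-thread executor as a loose analogue of @Async. Junction, Input and JunctionDemo are hypothetical names, not Siddhi classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical sketch of a stream junction fanning events out to its subscribers.
// Names echo the components above, but this is not Siddhi's implementation.
class Junction {
    private final List<Consumer<Object[]>> subscribers = new ArrayList<>();
    private final ExecutorService async;   // null means: deliver on the caller's thread

    Junction(boolean asyncMode) {
        this.async = asyncMode ? Executors.newSingleThreadExecutor() : null;
    }

    void subscribe(Consumer<Object[]> subscriber) {   // queries, callbacks, sinks
        subscribers.add(subscriber);
    }

    void send(Object[] event) {
        if (async == null) {
            deliver(event);                            // default: incoming event's thread
        } else {
            async.submit(() -> deliver(event));        // buffered hand-off to another thread
        }
    }

    private void deliver(Object[] event) {
        for (Consumer<Object[]> s : subscribers) {
            s.accept(event);
        }
    }

    void shutdown() {
        if (async != null) async.shutdown();
    }
}

class Input {                                          // plays the InputHandler role
    private final Junction junction;
    Input(Junction junction) { this.junction = junction; }
    void send(Object... data) { junction.send(data); }
}

class JunctionDemo {
    public static void main(String[] args) {
        Junction stockStream = new Junction(false);
        Junction highPriceStream = new Junction(false);
        // A filter "query" consumes from one junction and publishes into another.
        stockStream.subscribe(e -> { if ((Double) e[1] > 100) highPriceStream.send(e); });
        // A callback-like subscriber prints what reaches the output junction.
        highPriceStream.subscribe(e -> System.out.println("received " + e[0]));
        Input input = new Input(stockStream);
        input.send("IBM", 150.0);
        input.send("WSO2", 50.0);
    }
}
```

The synchronous default keeps latency minimal because no queue sits between producer and consumers; the asynchronous variant trades that for decoupling the sender's thread from slow subscribers.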

Siddhi Query Execution

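Siddhi QueryRuntimes can be categorized into three main types:

  • SingleInputStream query runtimes (filter and window queries)
  • JoinInputStream query runtimes (join queries)
  • StateInputStream query runtimes (pattern and sequence queries)

The following sections explain the internals of each query type.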

SingleInputStream Query Runtime (Filter & Windows)

Single Input Stream Query

A single input stream query runtime is generated for filter and window queries. They consume events from a stream junction or a window and convert the incoming events according to the expected output stream format at the ProcessStreamReceiver by dropping all the unrelated incoming stream attributes.

Then the converted events are passed through a few Processors such as FilterProcessor, StreamProcessor, StreamFunctionProcessor, WindowProcessor and QuerySelector. Here, the StreamProcessor, StreamFunctionProcessor, and WindowProcessor can be extended with various stream processing capabilities. The last processor of the chain of processors must always be a QuerySelector, and each chain can have only one WindowProcessor. When the query runtime consumes events from a window, its chain of processors cannot contain a WindowProcessor.
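The chain described above can be sketched as follows. This is a simplified Java illustration that assumes events are Object[] arrays and uses hypothetical class names (ReceiverStep, FilterStep, LengthWindowStep, SelectorStep, ChainBuilder). It shows attribute projection at the receiver, forwarding along the chain, and the two chain rules (the selector comes last, and there is at most one window), but it is not Siddhi's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical processor chain for a query shaped like
//   from StockStream[price > 100]#window.length(2) select symbol, price insert into OutStream;
// Simplified illustration only; not Siddhi's processor classes.
abstract class Processor {
    Processor next;
    abstract void process(Object[] event);
    void forward(Object[] event) { if (next != null) next.process(event); }
}

class FilterStep extends Processor {
    private final Predicate<Object[]> condition;
    FilterStep(Predicate<Object[]> condition) { this.condition = condition; }
    @Override void process(Object[] event) { if (condition.test(event)) forward(event); }
}

class LengthWindowStep extends Processor {
    private final int length;
    private final List<Object[]> buffer = new ArrayList<>();
    LengthWindowStep(int length) { this.length = length; }
    @Override void process(Object[] event) {
        buffer.add(event);
        if (buffer.size() > length) buffer.remove(0);   // expired-event emission omitted here
        forward(event);
    }
    int size() { return buffer.size(); }
}

class SelectorStep extends Processor {                   // must be the last processor
    final List<Object[]> output = new ArrayList<>();
    @Override void process(Object[] event) { output.add(event); }
}

class ReceiverStep {                                     // ProcessStreamReceiver role
    private final int[] keep;                            // positions of attributes the query uses
    private final Processor first;
    ReceiverStep(int[] keep, Processor first) { this.keep = keep; this.first = first; }
    void receive(Object[] raw) {
        Object[] projected = new Object[keep.length];    // unrelated attributes are dropped
        for (int i = 0; i < keep.length; i++) projected[i] = raw[keep[i]];
        first.process(projected);
    }
}

class ChainBuilder {
    // Links the processors in order and enforces the two chain rules from the text.
    static Processor link(Processor... steps) {
        int windows = 0;
        for (Processor p : steps) if (p instanceof LengthWindowStep) windows++;
        if (windows > 1) throw new IllegalArgumentException("only one window per chain");
        if (!(steps[steps.length - 1] instanceof SelectorStep))
            throw new IllegalArgumentException("chain must end with a selector");
        for (int i = 0; i < steps.length - 1; i++) steps[i].next = steps[i + 1];
        return steps[0];
    }

    public static void main(String[] args) {
        SelectorStep selector = new SelectorStep();
        Processor chain = ChainBuilder.link(
            new FilterStep(e -> (Double) e[1] > 100), new LengthWindowStep(2), selector);
        // Raw StockStream events are (symbol, volume, price); the query keeps symbol and price.
        ReceiverStep receiver = new ReceiverStep(new int[]{0, 2}, chain);
        receiver.receive(new Object[]{"IBM", 10L, 150.0});
        receiver.receive(new Object[]{"WSO2", 20L, 50.0});
        System.out.println(selector.output.size());    // prints 1
    }
}
```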

The FilterProcessor is implemented with an ExpressionExecutor that returns a boolean value. Expressions have a tree structure, and they are evaluated using a depth-first traversal. To achieve high performance, Siddhi currently relies on the user to place the condition that is least likely to succeed on the leftmost side, which increases the chance of detecting a false result early.

The condition expression price >= 100 and ( Symbol == 'IBM' or Symbol == 'MSFT' ) is represented as shown below.

Siddhi Expression

These expressions also support the execution of user defined functions (UDFs), and they can be implemented by extending the FunctionExecutor class.
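A minimal Java sketch of such an expression tree, built for the condition above, follows. The node classes (Attr, Const, GreaterOrEqual, Equal, And, Or, Udf) are hypothetical, not Siddhi's ExpressionExecutor implementations. The point is that evaluation is depth-first and left to right, and that And/Or short-circuit, which is why placing the condition least likely to succeed on the left pays off. Udf is a loose stand-in for a FunctionExecutor-based user-defined function.

```java
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical expression-executor tree (not Siddhi's classes).
// Children are evaluated depth-first, left before right, and AND/OR short-circuit,
// so a left operand that is rarely true lets AND skip the rest of the tree.
interface Exec {
    Object execute(Map<String, Object> event);
}

class Attr implements Exec {                    // variable lookup, e.g. price
    final String name;
    Attr(String name) { this.name = name; }
    public Object execute(Map<String, Object> e) { return e.get(name); }
}

class Const implements Exec {                   // constant, e.g. 100 or 'IBM'
    final Object value;
    Const(Object value) { this.value = value; }
    public Object execute(Map<String, Object> e) { return value; }
}

class GreaterOrEqual implements Exec {
    final Exec left, right;
    GreaterOrEqual(Exec left, Exec right) { this.left = left; this.right = right; }
    public Object execute(Map<String, Object> e) {
        return ((Number) left.execute(e)).doubleValue() >= ((Number) right.execute(e)).doubleValue();
    }
}

class Equal implements Exec {
    final Exec left, right;
    Equal(Exec left, Exec right) { this.left = left; this.right = right; }
    public Object execute(Map<String, Object> e) {
        return Objects.equals(left.execute(e), right.execute(e));
    }
}

class And implements Exec {
    final Exec left, right;
    And(Exec left, Exec right) { this.left = left; this.right = right; }
    public Object execute(Map<String, Object> e) {
        return (Boolean) left.execute(e) && (Boolean) right.execute(e);  // right skipped if left is false
    }
}

class Or implements Exec {
    final Exec left, right;
    Or(Exec left, Exec right) { this.left = left; this.right = right; }
    public Object execute(Map<String, Object> e) {
        return (Boolean) left.execute(e) || (Boolean) right.execute(e);  // right skipped if left is true
    }
}

class Udf implements Exec {                     // plays the role of a FunctionExecutor
    final Function<Object, Object> fn;
    final Exec arg;
    Udf(Function<Object, Object> fn, Exec arg) { this.fn = fn; this.arg = arg; }
    public Object execute(Map<String, Object> e) { return fn.apply(arg.execute(e)); }
}

class ExpressionDemo {
    public static void main(String[] args) {
        // price >= 100 and (symbol == 'IBM' or symbol == 'MSFT')
        Exec condition = new And(
            new GreaterOrEqual(new Attr("price"), new Const(100)),
            new Or(new Equal(new Attr("symbol"), new Const("IBM")),
                   new Equal(new Attr("symbol"), new Const("MSFT"))));
        System.out.println(condition.execute(Map.of("price", 150.0, "symbol", "IBM")));
        System.out.println(condition.execute(Map.of("price", 50.0, "symbol", "IBM")));
    }
}
```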

After passing through the processors, each event reaches the QuerySelector for transformation. At the QuerySelector, events are transformed based on the select clause of the query. If a group by clause is defined, the GroupByKeyGenerator identifies the group-by key, and each AttributeProcessor is then executed based on that key. AttributeProcessors can contain any expression, including constant values, variables, and user-defined functions. They can also contain AttributeAggregators for processing aggregation operations such as sum, count, etc. Here, an AttributeAggregator is generated to keep track of the aggregation state, and it is destroyed when it becomes obsolete. Through this operation, the events are transformed to the expected output format.

After an event is transformed to the output format, it is evaluated against the HavingConditionExecutor if a having clause is provided. Only the events that satisfy the having condition are pushed to the OutputRateLimiter.

At the OutputRateLimiter, the event output is controlled before the events are sent to the stream junction or to the query callback. When the output clause is not defined, the PassThroughOutputRateLimiter is used, which passes all the events through without any rate limiting.
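The select, group-by, having and output steps can be pictured with this small Java sketch for a query along the lines of select symbol, sum(volume) as total group by symbol having total > 100. GroupedSelector and SumAggregator are hypothetical names, not Siddhi's QuerySelector or AttributeAggregator. The sketch keeps one aggregator per group-by key, applies the having check to the transformed row, and uses a plain Consumer as a pass-through rate limiter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical selector for a query shaped like
//   select symbol, sum(volume) as total group by symbol having total > 100
// Not Siddhi's QuerySelector; it only mirrors the steps described above.
class SumAggregator {                              // AttributeAggregator role, one per key
    private long sum;
    long add(long value) { sum += value; return sum; }
}

class GroupedSelector {
    private final Map<String, SumAggregator> perKey = new HashMap<>();
    private final Predicate<Object[]> having;      // HavingConditionExecutor role
    private final Consumer<Object[]> rateLimiter;  // OutputRateLimiter role

    GroupedSelector(Predicate<Object[]> having, Consumer<Object[]> rateLimiter) {
        this.having = having;
        this.rateLimiter = rateLimiter;
    }

    void process(String symbol, long volume) {
        // GroupByKeyGenerator role: the key is simply the symbol here.
        SumAggregator agg = perKey.computeIfAbsent(symbol, k -> new SumAggregator());
        Object[] row = {symbol, agg.add(volume)};   // transformed into the output format
        if (having.test(row)) {
            rateLimiter.accept(row);                 // only rows passing "having" go on
        }
    }

    public static void main(String[] args) {
        List<Object[]> emitted = new ArrayList<>();
        // Pass-through rate limiting: every row that survives "having" is emitted.
        GroupedSelector selector = new GroupedSelector(row -> (Long) row[1] > 100, emitted::add);
        selector.process("IBM", 60);
        selector.process("MSFT", 30);
        selector.process("IBM", 50);    // IBM's total reaches 110 and passes "having"
        for (Object[] row : emitted) System.out.println(row[0] + " total=" + row[1]);
    }
}
```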

Temporal Processing with Windows

Temporal event processing is achieved via Windows and AttributeAggregators.

To achieve temporal processing, Siddhi uses the following four types of events:

  • Current Events: Events that are newly arriving to a query from streams.

  • Expired Events: Events that have expired from a window.

  • Timer Events: Events that inform the query about an update of execution time. These events are usually generated by schedulers.

  • Reset Events: Events that reset the Siddhi query states.

In Siddhi, when an event comes into a WindowProcessor, the processor creates an expired event that corresponds to the incoming current event and carries its expiry timestamp, and stores that expired event in the window. The WindowProcessor also forwards the current event to the next processor for further processing. It uses a scheduler or a similar mechanism to determine when to emit the events it holds in memory, and at that time it emits them as expired events. If it needs to emit all the events in memory at once, it emits a single reset event instead of one expired event per stored event, so that the states can be reset in one go. For each current event emitted from a window, either a corresponding expired event or a common reset event must eventually be emitted.
This is vital because Siddhi relies on these events to calculate aggregations at the QuerySelector. In the QuerySelector, arriving current events increase the aggregation values, expired events decrease them, and reset events reset the aggregation calculation.

For example, the sliding TimeWindow creates a corresponding expired event for each arriving current event, adds it to the window, adds an entry to the scheduler so that a timer event notifies the window when that event needs to be expired, and finally sends the current event to the next processor for subsequent processing.
When the window receives an indication, via a timer event or other means, that the expiry time of the oldest event in the window has arrived, it removes the expired event from the window and passes it to the next processor.

Siddhi Time Window
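The interplay between a sliding time window and an aggregator can be sketched in Java as below, assuming explicit event timestamps and manually injected TIMER events instead of a real scheduler so that the behavior is deterministic. Kind, Ev, RunningSum and TimeWindow are hypothetical names. The sketch pre-builds an expired event for every current event, emits it once its expiry time has passed, and lets the sum increase on CURRENT, decrease on EXPIRED, and clear on RESET.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sliding time window plus a sum aggregator, driven by explicit timestamps
// instead of a real scheduler. Not Siddhi's TimeWindowProcessor.
enum Kind { CURRENT, EXPIRED, TIMER, RESET }

class Ev {
    final Kind kind;
    final long timestamp;
    final double value;
    Ev(Kind kind, long timestamp, double value) {
        this.kind = kind; this.timestamp = timestamp; this.value = value;
    }
}

class RunningSum {                          // stands in for an AttributeAggregator
    double sum;
    void accept(Ev e) {
        switch (e.kind) {
            case CURRENT: sum += e.value; break;   // current events increase the value
            case EXPIRED: sum -= e.value; break;   // expired events decrease it
            case RESET:   sum = 0;        break;   // a reset clears the state in one go
            default:      break;                   // timer events carry no data
        }
    }
}

class TimeWindow {
    private final long lengthMillis;
    private final Deque<Ev> expiredQueue = new ArrayDeque<>();  // pre-built expired events
    private final RunningSum next;

    TimeWindow(long lengthMillis, RunningSum next) {
        this.lengthMillis = lengthMillis;
        this.next = next;
    }

    void onEvent(Ev e) {
        expireUpTo(e.timestamp);            // a TIMER event only triggers this step
        if (e.kind == Kind.CURRENT) {
            // Pre-build the matching expired event, stamped with its expiry time.
            expiredQueue.addLast(new Ev(Kind.EXPIRED, e.timestamp + lengthMillis, e.value));
            next.accept(e);                 // forward the current event downstream
        }
    }

    private void expireUpTo(long now) {
        while (!expiredQueue.isEmpty() && expiredQueue.peekFirst().timestamp <= now) {
            next.accept(expiredQueue.pollFirst());
        }
    }

    void clear() {                          // one RESET instead of many EXPIRED events
        expiredQueue.clear();
        next.accept(new Ev(Kind.RESET, 0, 0));
    }

    public static void main(String[] args) {
        RunningSum sum = new RunningSum();
        TimeWindow window = new TimeWindow(1000, sum);   // 1-second sliding window
        window.onEvent(new Ev(Kind.CURRENT, 0, 10));
        window.onEvent(new Ev(Kind.CURRENT, 500, 5));
        window.onEvent(new Ev(Kind.TIMER, 1000, 0));      // the first event expires here
        System.out.println(sum.sum);                       // prints 5.0
    }
}
```

Because every CURRENT event is eventually balanced by an EXPIRED event (or swept up by a RESET), the aggregator never has to rescan the window, which is what keeps sliding aggregations cheap.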

JoinInputStream Query Runtime (Join)

Join Input Stream Query

A join input stream query runtime is generated for join queries. It can consume events from two stream junctions and perform a join operation as depicted above. It can also consume events from a single stream junction and join that stream against itself, or join it against a table, window, or incremental aggregation. When a join is performed with a table, window, or incremental aggregation, the WindowProcessor in the image is replaced with that table, window, or incremental aggregation, and no basic processors are used on that side.

The joining operation is triggered by the events that arrive from the stream junction. Here, when an event from one stream reaches the pre JoinProcessor, it matches against all the available events of the other stream's WindowProcessor. When a match is found, those matched events are sent to the QuerySelector as current events, and at the same time, the original event is added to the WindowProcessor where it remains until it expires. Similarly, when an event expires from the WindowProcessor, it matches against all the available events of the other stream's WindowProcessor, and when a match is found, those matched events are sent to the QuerySelector as expired events.

Note: Despite the optimizations, a join query is quite expensive when it comes to performance. This is because the WindowProcessor is locked during the matching process to avoid race conditions and to achieve accuracy while joining. Therefore, avoid matching large windows in high volume streams. Based on the scenario, using appropriate window sizes (by time or length) helps to achieve maximum performance.
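The matching flow for a join between two windows can be sketched in Java as follows. WindowJoin and JoinSide are hypothetical names, both sides use simple length windows, and a single lock object stands in for the locking mentioned in the note. Each arrival is matched against the other side's window (joined CURRENT rows), added to its own window, and any event it evicts is matched again (joined EXPIRED rows).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical two-sided window join; not Siddhi's JoinProcessor.
class JoinSide {
    final int length;
    final List<Object[]> window = new ArrayList<>();
    JoinSide(int length) { this.length = length; }
}

class WindowJoin {
    final JoinSide left, right;
    final BiPredicate<Object[], Object[]> on;          // the join condition
    final List<String> output = new ArrayList<>();      // "CURRENT l|r" or "EXPIRED l|r"
    private final Object lock = new Object();

    WindowJoin(int leftLen, int rightLen, BiPredicate<Object[], Object[]> on) {
        this.left = new JoinSide(leftLen);
        this.right = new JoinSide(rightLen);
        this.on = on;
    }

    void fromLeft(Object[] e)  { arrive(e, left, right, true); }
    void fromRight(Object[] e) { arrive(e, right, left, false); }

    private void arrive(Object[] e, JoinSide own, JoinSide other, boolean isLeft) {
        synchronized (lock) {                     // windows are locked while matching
            match("CURRENT", e, other, isLeft);   // match the arrival first
            own.window.add(e);                    // then keep it until it expires
            if (own.window.size() > own.length) {
                Object[] expired = own.window.remove(0);
                match("EXPIRED", expired, other, isLeft);
            }
        }
    }

    private void match(String kind, Object[] e, JoinSide other, boolean isLeft) {
        for (Object[] o : other.window) {         // full scan: cost grows with window size
            Object[] l = isLeft ? e : o;
            Object[] r = isLeft ? o : e;
            if (on.test(l, r)) output.add(kind + " " + l[0] + "|" + r[0]);
        }
    }

    public static void main(String[] args) {
        // Join stock events (left) with tweet events (right) on symbol; windows of length 1.
        WindowJoin join = new WindowJoin(1, 1, (l, r) -> l[0].equals(r[0]));
        join.fromRight(new Object[]{"IBM", "tweet"});
        join.fromLeft(new Object[]{"IBM", 150.0});
        join.fromLeft(new Object[]{"MSFT", 90.0});  // pushes the IBM event out of the left window
        System.out.println(join.output);            // prints [CURRENT IBM|IBM, EXPIRED IBM|IBM]
    }
}
```

The full scan in match() under the lock is exactly why large windows on high-volume streams make joins expensive, as the note above warns.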

StateInputStream Query Runtime (Pattern & Sequence)

State Input Stream Query (Pattern & Sequence)

The state input stream query runtime is generated for pattern and sequence queries. It consumes events from one or more stream junctions via ProcessStreamReceivers. A ProcessStreamReceiver and a set of basic processors are defined in the query for each condition. When a ProcessStreamReceiver consumes an event, it updates the states generated by the previous conditions with the incoming event. If the condition is the first condition in the query, it creates a new state and updates it with the incoming event. The states are then passed to the basic processors to perform filtering and transformation operations. The states that pass the basic processors are consumed by the PostStateProcessor and stored at the PreStateProcessor of the following pattern or sequence condition. When a state reaches the final condition's PostStateProcessor, the output event is generated and emitted by the QuerySelector.
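A much-simplified Java sketch of this state flow for a pattern like every e1=A[price > 20] -> e2=B[price > e1.price] is shown below. PartialState, PatternStep and TwoStepPattern are hypothetical names; each step's pending list loosely plays the PreStateProcessor role, and the hand-off to the next step loosely plays the PostStateProcessor role. It models pattern semantics only: roughly speaking, a sequence would additionally discard pending states when a non-matching event arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical two-step pattern runtime; not Siddhi's state processors.
class PartialState {
    final double[] prices;          // one slot per step, like the StreamEvents in a StateEvent
    PartialState(int steps) { prices = new double[steps]; }
    PartialState copy() {
        PartialState c = new PartialState(prices.length);
        System.arraycopy(prices, 0, c.prices, 0, prices.length);
        return c;
    }
}

class PatternStep {
    final List<PartialState> pending = new ArrayList<>();   // states waiting for this step
    final int index;
    final BiPredicate<PartialState, Double> condition;
    PatternStep next;
    PatternStep(int index, BiPredicate<PartialState, Double> condition) {
        this.index = index; this.condition = condition;
    }
}

class TwoStepPattern {
    final PatternStep first, second;
    final List<double[]> matches = new ArrayList<>();

    TwoStepPattern() {
        first = new PatternStep(0, (s, price) -> price > 20);            // A[price > 20]
        second = new PatternStep(1, (s, price) -> price > s.prices[0]);  // B[price > e1.price]
        first.next = second;
    }

    void onA(double price) { receive(first, price); }
    void onB(double price) { receive(second, price); }

    private void receive(PatternStep step, double price) {
        List<PartialState> candidates = new ArrayList<>(step.pending);
        if (step == first) candidates.add(new PartialState(2));  // "every": start a new state
        for (PartialState s : candidates) {
            if (!step.condition.test(s, price)) continue;         // basic-processor filter
            PartialState updated = s.copy();
            updated.prices[step.index] = price;
            step.pending.remove(s);
            if (step.next == null) matches.add(updated.prices);  // final step: emit output
            else step.next.pending.add(updated);                 // hand over to the next step
        }
    }

    public static void main(String[] args) {
        TwoStepPattern pattern = new TwoStepPattern();
        pattern.onA(25);
        pattern.onA(30);
        pattern.onB(28);   // completes only the partial match started by 25
        pattern.onB(35);   // completes the partial match started by 30
        for (double[] m : pattern.matches) System.out.println(m[0] + " -> " + m[1]);
    }
}
```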

Siddhi Partition Execution

Siddhi Partition

A partition is a wrapper around one or more Siddhi queries and the inner streams that connect them. A partition is implemented in Siddhi as a PartitionRuntime, and each unique partition instance is implemented as a PartitionInstanceRuntime. Each partitioned stream entering the partition goes through the appropriate PartitionStreamReceiver. The PartitionExecutor of the stream receiver evaluates the incoming events to identify the partition key using the RangePartitionExecutor or the ValuePartitionExecutor, and passes the events to the QueryRuntimes of the appropriate PartitionInstanceRuntime based on the partition key. If a PartitionInstanceRuntime is not available for the given partition key, it is generated dynamically together with its QueryRuntimes and InnerStreamJunctions.

Based on the partition definition, the QueryRuntimes in the PartitionInstanceRuntime wire themselves via inner StreamJunctions or via StreamJunctions outside the partition. When a partition query consumes a non-partitioned global stream, the instances of its QueryRuntime in all PartitionInstanceRuntimes receive the same event, as depicted in the above diagram.
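The key-based routing and lazy instance creation can be sketched in Java as follows. ValuePartition and PartitionInstance are hypothetical names. A key function plays the ValuePartitionExecutor role (a range-based key function such as price < 100 ? "low" : "high" would play the RangePartitionExecutor role), and a non-partitioned global event is delivered to every existing instance.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical value partition; not Siddhi's PartitionRuntime.
class PartitionInstance {                  // PartitionInstanceRuntime role
    final String key;
    double runningTotal;                   // per-instance query state, isolated per key
    int globalEventsSeen;
    PartitionInstance(String key) { this.key = key; }
}

class ValuePartition {
    private final Function<Object[], String> keyOf;   // partition executor role
    final Map<String, PartitionInstance> instances = new LinkedHashMap<>();

    ValuePartition(Function<Object[], String> keyOf) { this.keyOf = keyOf; }

    void onPartitionedEvent(Object[] event) {
        String key = keyOf.apply(event);
        // The instance for a new key is created lazily, on its first event.
        PartitionInstance inst = instances.computeIfAbsent(key, PartitionInstance::new);
        inst.runningTotal += (Double) event[1];
    }

    void onGlobalEvent(Object[] event) {
        // A non-partitioned stream reaches the query in every existing instance.
        for (PartitionInstance inst : instances.values()) inst.globalEventsSeen++;
    }

    public static void main(String[] args) {
        ValuePartition bySymbol = new ValuePartition(e -> (String) e[0]);
        bySymbol.onPartitionedEvent(new Object[]{"IBM", 10.0});
        bySymbol.onPartitionedEvent(new Object[]{"MSFT", 5.0});
        bySymbol.onPartitionedEvent(new Object[]{"IBM", 2.5});
        bySymbol.onGlobalEvent(new Object[]{"tick"});
        for (PartitionInstance inst : bySymbol.instances.values()) {
            System.out.println(inst.key + " total=" + inst.runningTotal
                + " global=" + inst.globalEventsSeen);
        }
    }
}
```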

Siddhi Event Formats

Siddhi has three event formats.

  • Event

    This is the format exposed to end users when they send events via an Input Handler and consume events via a Stream Callback or Query Callback. It consists of an Object[] that contains all the values in accordance with the corresponding stream.

  • StreamEvent (Subtype of ComplexEvent)

    This is used within queries. It contains the following three Object[]s:

      - beforeWindowData: This contains values that are only used in processors that are executed before the WindowProcessor.
      - onAfterWindowData: This contains values that are only used by the WindowProcessor and the other processors that follow it, but are not sent as output.
      - outputData: This contains the values that comply with the output stream of the query.

    To optimize the amount of data stored in memory at windows, the content in beforeWindowData is cleared before the event enters the WindowProcessor. StreamEvents can also be chained by linking each other via the next property in them.

  • StateEvent (Subtype of ComplexEvent)

    This is used in Joins, Patterns, and Sequences when events from multiple streams, tables, windows, and aggregations of different types need to be associated together. It holds a collection of StreamEvents representing the different streams, tables, etc. used in the query, and outputData to contain the values needed for the query output. StateEvents can also be chained by linking each other with the next property in them.
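The three-array layout of a StreamEvent and the slot array of a StateEvent can be mirrored with plain Java classes, as in the hypothetical sketch below. SketchStreamEvent and SketchStateEvent are invented names, not Siddhi's classes, and clearing beforeWindowData by setting it to null is only an assumption about one way the clearing could be done.

```java
// Hypothetical mirror of the StreamEvent / StateEvent layouts described above.
class SketchStreamEvent {
    Object[] beforeWindowData;      // used only by processors before the window
    Object[] onAfterWindowData;     // used by the window and later processors, not output
    Object[] outputData;            // values matching the query's output stream
    SketchStreamEvent next;         // stream events can be chained

    SketchStreamEvent(int before, int onAfter, int output) {
        beforeWindowData = new Object[before];
        onAfterWindowData = new Object[onAfter];
        outputData = new Object[output];
    }

    // Called just before the event is stored in a window, so the window holds less data.
    void clearBeforeWindowData() { beforeWindowData = null; }
}

class SketchStateEvent {
    final SketchStreamEvent[] streamEvents;   // one slot per stream/table/window in the query
    Object[] outputData;
    SketchStateEvent next;                    // state events can be chained too

    SketchStateEvent(int slots, int output) {
        streamEvents = new SketchStreamEvent[slots];
        outputData = new Object[output];
    }

    public static void main(String[] args) {
        // e.g. from StockStream[volume > 1000]#window.length(5) select symbol, avg(price) ...
        SketchStreamEvent e = new SketchStreamEvent(1, 1, 1);
        e.beforeWindowData[0] = 5000L;     // volume: needed only by the filter
        e.onAfterWindowData[0] = 101.5;    // price: needed by avg(), not output as-is
        e.outputData[0] = "IBM";           // symbol: part of the output
        e.clearBeforeWindowData();         // done before the event is stored in the window
        SketchStateEvent joined = new SketchStateEvent(2, 3);
        joined.streamEvents[0] = e;        // slot 0: StockStream; slot 1: another stream
        System.out.println(e.beforeWindowData == null);
    }
}
```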

Event Chunks

Event Chunks provide an easier way of manipulating the chain of StreamEvents and StateEvents so that they can be easily iterated, inserted, and removed.
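An event chunk over such a linked chain might look like the following Java sketch. EventChunk and ChainedEvent are hypothetical names, not Siddhi's chunk classes. The chunk supports appending, iteration, and removal of the element most recently returned by the iterator, which is the kind of in-place manipulation that is otherwise awkward on a singly linked chain.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical event chunk over a singly linked chain of events.
class ChainedEvent {
    final Object[] data;
    ChainedEvent next;                       // link to the following event in the chain
    ChainedEvent(Object... data) { this.data = data; }
}

class EventChunk implements Iterable<ChainedEvent> {
    private ChainedEvent first, last;

    void add(ChainedEvent e) {               // append to the end of the chain
        if (first == null) first = e; else last.next = e;
        last = e;
    }

    int size() {
        int n = 0;
        for (ChainedEvent e = first; e != null; e = e.next) n++;
        return n;
    }

    @Override
    public Iterator<ChainedEvent> iterator() {
        return new Iterator<ChainedEvent>() {
            private ChainedEvent previous;       // node before lastReturned, null at the head
            private ChainedEvent lastReturned;   // null before next() or after remove()
            private ChainedEvent cursor = first;

            @Override
            public boolean hasNext() { return cursor != null; }

            @Override
            public ChainedEvent next() {
                if (cursor == null) throw new NoSuchElementException();
                if (lastReturned != null) previous = lastReturned;
                lastReturned = cursor;
                cursor = cursor.next;
                return lastReturned;
            }

            @Override
            public void remove() {               // unlink lastReturned from the chain
                if (lastReturned == null) throw new IllegalStateException();
                if (previous == null) first = cursor; else previous.next = cursor;
                if (lastReturned == last) last = previous;
                lastReturned.next = null;
                lastReturned = null;
            }
        };
    }

    public static void main(String[] args) {
        EventChunk chunk = new EventChunk();
        chunk.add(new ChainedEvent("IBM", 150.0));
        chunk.add(new ChainedEvent("WSO2", 50.0));
        chunk.add(new ChainedEvent("MSFT", 120.0));
        Iterator<ChainedEvent> it = chunk.iterator();
        while (it.hasNext()) {
            if ((Double) it.next().data[1] < 100) it.remove();   // drop cheap events in place
        }
        chunk.add(new ChainedEvent("ORCL", 130.0));
        for (ChainedEvent e : chunk) System.out.println(e.data[0]);
    }
}
```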

Summary

This article describes the architecture of Siddhi and explains the rationale behind its architectural decisions. It also explains the key features of Siddhi.
It should be a good starting point for new developers who want to understand Siddhi and start contributing to it.