Flows

Addjson
Parse a JSON object string and add the elements as fields to the data tuple.

Description

Given a Field parameter, parse the referenced field's content in the input tuple as an unnested JSON object string and add its key-value pairs as fields to the output tuple.

Parameters

  • Field (field) – JSON object string identifier

References a field available in the input tuple.

Input

The field referenced by the Field parameter should be available in the input tuple.

Output

The output tuple contains the key-value pairs from the JSON object string as fields.

Example
concept Example {
    flow
        => code[j = '{"a": "a", "b": 1}']
        => addjson['j']
}
Add the fields a and b with values 'a' and 1 to the flow. The output tuple would look as follows:
{ j: {"a": "a", "b": 1}, a: "a", b: 1 }
Remarks
  • JSON object string should be well-formed.
  • JSON object string should not be nested.
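The behavior described above can be sketched in Python (a minimal, illustrative sketch of the semantics, not the platform's implementation):

```python
import json

def addjson(tuple_, field):
    """Sketch of the addjson step: parse the flat JSON object string
    stored in `field` and merge its key-value pairs into the tuple.
    The original field is kept, matching the example output."""
    obj = json.loads(tuple_[field])  # the string must be well-formed JSON
    if not isinstance(obj, dict):
        raise ValueError("expected a JSON object string")
    if any(isinstance(v, (dict, list)) for v in obj.values()):
        raise ValueError("nested JSON objects are not supported")
    out = dict(tuple_)
    out.update(obj)
    return out

print(addjson({"j": '{"a": "a", "b": 1}'}, "j"))
# → {'j': '{"a": "a", "b": 1}', 'a': 'a', 'b': 1}
```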
Aggregate
Run a single aggregation step on the collection of input tuples.

Description

Aggregate takes a set of tuples as input and aggregates the results into a single tuple. Additionally, new fields can be added which apply the functions count, sum or avg to any of the existing fields.

Parameters

  • List of fields – the fields by which the data is grouped

References fields available in the input tuple or extensions on top of them.

Input

The fields referenced by the List of Fields parameter should be available in the input tuple. Furthermore, new fields can be defined which are the result of a function applied to the existing fields.

Output

One output tuple is generated for each unique combination of the values in the List of Fields.

Example
concept Example {
    val url = `document.location.href`

    flow
        => bucket['10s']
        => aggregate['url',views='count url']
}
This will collect the url for each page opened and aggregate the data based on that value. After 10 seconds for each unique url a tuple will be generated which will also contain the views field. This field contains the number of times the url has occurred.
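The single aggregation step can be sketched in Python (an illustrative sketch, not the platform's implementation; the argument shapes are assumptions):

```python
from collections import defaultdict

def aggregate(tuples, group_fields, **aggs):
    """Sketch of one aggregation step: group the collected tuples by
    `group_fields` and add fields computed with count, sum or avg.
    Each agg is given as name=(function, field)."""
    groups = defaultdict(list)
    for t in tuples:
        groups[tuple(t[f] for f in group_fields)].append(t)
    out = []
    for key, members in groups.items():
        row = dict(zip(group_fields, key))
        for name, (fn, field) in aggs.items():
            values = [m[field] for m in members]
            if fn == 'count':
                row[name] = len(values)
            elif fn == 'sum':
                row[name] = sum(values)
            elif fn == 'avg':
                row[name] = sum(values) / len(values)
        out.append(row)
    return out

hits = [{'url': '/home'}, {'url': '/home'}, {'url': '/about'}]
print(aggregate(hits, ['url'], views=('count', 'url')))
# → [{'url': '/home', 'views': 2}, {'url': '/about', 'views': 1}]
```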
Bucket
Convert a stream of tuples that arrive in a predefined time slot to a single collection of tuples.

Description

Convert a stream of tuples that arrive in a predefined time slot to a single collection of tuples. Guarantees the timely delivery of the output collection if at least a single tuple has arrived in the time slot.

Parameters

  • time – Time slot for collecting tuples

The postfix time notation uses ‘ms’ for milliseconds, ‘s’ for seconds, ‘m’ for minutes, ‘h’ for hours and ‘d’ for days.
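The postfix notation can be sketched as a small parser (illustrative only; the platform's own parsing may differ):

```python
import re

# milliseconds per unit of the postfix time notation
UNITS = {'ms': 1, 's': 1000, 'm': 60_000, 'h': 3_600_000, 'd': 86_400_000}

def parse_slot(spec):
    """Parse a time slot such as '10s' or '500ms' into milliseconds."""
    m = re.fullmatch(r'(\d+)(ms|s|m|h|d)', spec)
    if not m:
        raise ValueError(f"bad time spec: {spec!r}")
    return int(m.group(1)) * UNITS[m.group(2)]

print(parse_slot('10s'))   # → 10000
print(parse_slot('2d'))    # → 172800000
```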

Input

No specific requirements

Output

All tuples of the input stream

Example
concept Example {
    val url = `document.location.href`

    flow
        => bucket['10s']
        => aggregate['url',views='count url']
}
This will collect the url for each page opened and aggregate the data based on that value. After 10 seconds for each unique url a tuple will be generated which will also contain the views field. This field contains the number of times the url has occurred.
Buffer
Convert a stream of tuples that arrive in a predefined time slot to a single collection of tuples. The only guarantee is that of eventual delivery of the output collection.

Description

The buffer element takes a single unnamed parameter that defines the minimum time to buffer tuples. The buffer tries to send the data as soon as possible, but makes no guarantees. The data is sent as a single list of tuples and stamped with the current time as packet time. This time can be retrieved using the time element.

Parameters

  • time – Time slot for collecting tuples

The postfix time notation uses ‘ms’ for milliseconds, ‘s’ for seconds, ‘m’ for minutes, ‘h’ for hours and ‘d’ for days.

Input

No specific requirements

Output

All tuples of the input stream

Example
concept Example {
    val url = `document.location.href`

    flow
        => buffer['10s']
        => aggregate['url',views='count url']
}
This will collect the url for each page opened and aggregate the data based on that value. After 10 seconds for each unique url a tuple will be generated which will also contain the views field. This field contains the number of times the url has occurred.
Buffer: aggregate
Perform aggregation as a streaming step on input data that arrives in a specific time slot.

Description

This element combines both the buffer and the aggregate flow elements into one element.

Parameters

  • time – Time slot for collecting tuples
  • List of fields – the fields by which the data is grouped

The postfix time notation uses ‘ms’ for milliseconds, ‘s’ for seconds, ‘m’ for minutes, ‘h’ for hours and ‘d’ for days. The list of fields references fields available in the input tuple or extensions on top of them.

Input

The fields referenced by the List of Fields parameter should be available in the input tuple. Furthermore, new fields can be defined which are the result of a function applied to the existing fields.

Output

One output tuple is generated for each unique combination of the values in the List of Fields.

Example
concept Example {
    val url = `document.location.href`

    flow
        => buffer:aggregate['10s','url',views='count url']
}
This will collect the url for each page opened and aggregate the data based on that value. After 10 seconds for each unique url a tuple will be generated which will also contain the views field. This field contains the number of times the url has occurred.
Buffer: session
Collect input tuples until a user session ends.

Description

This element buffers input tuples, grouping them by a session id. Apart from the session id, a timeout can be defined after which the session ends. The body of this code element can be used to perform additional calculations on the input tuples, such as storing data in the session object. The body is executed at the moment an input tuple reaches the element. If the body returns false, the input tuple is discarded. This is particularly convenient for calculating properties of a session while only storing them at the end of the session, when the session expires. All fields of the session object will be available as fields in the resulting flow.

Parameters

  • time – time of inactivity after which the session ends
  • session id – value which is used to aggregate the data

The postfix time notation uses ‘ms’ for milliseconds, ‘s’ for seconds, ‘m’ for minutes, ‘h’ for hours and ‘d’ for days. The session id references a field in the input tuple whose value is used to group the data.

Input

The field referenced by the session id parameter should be available in the input tuple.

Output

If the body returns false, the input tuples themselves are discarded and a single tuple is emitted when the session expires. All fields stored in the session object are available as fields in that output tuple.

Example
concept Global {
  match '*'

  def guid = `dimml.sha1(+new Date()+Math.random().toString(36).slice(2)+navigator.userAgent).slice(20)`

  val url = `location`
  val sessionId = `sessionStorage.dimmlsid=sessionStorage.dimmlsid||guid()`

  @groovy
  flow
    => buffer:session['sessionId', timeout = '30s', `
      session.startPage = session.startPage?:url
      session.endPage = url
      session.pageCount = (session.pageCount?:0)+1
      false`]
    => console
}
This will collect the url for each page opened and group the data based on the session ID. For each input tuple the code defined in the body is executed; after 30 seconds of inactivity the session expires. Since startPage is stored only for the first tuple (and unchanged after that), it contains the first page of the visit. Similarly, since the endPage parameter is updated for each new tuple, it will contain the URL of the last page of the visit. Finally, the pageCount parameter is incremented with each page view. Since the body returns false, no input tuples will be sent to the next flow element. Only one tuple will be sent, containing all session parameters. If for this DimML application first the URL http://documentation.dimml.io is opened followed by http://documentation.dimml.io/basic-syntax, the output tuple will be

{startPage=http://documentation.dimml.io, endPage=http://documentation.dimml.io/basic-syntax, pageCount=2}
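The session buffering in the example above can be sketched in Python (an illustrative sketch of the semantics with assumed names, not the platform's implementation):

```python
class SessionBuffer:
    """Sketch of buffer:session semantics: tuples are grouped by a
    session id, a per-tuple body updates the session state, and one
    tuple per session is emitted after the inactivity timeout,
    mirroring the example's body that returns false."""
    def __init__(self, timeout_ms):
        self.timeout = timeout_ms
        self.sessions = {}  # session id -> (last seen time, session state)

    def feed(self, sid, url, now_ms):
        _, state = self.sessions.get(sid, (now_ms, {}))
        state.setdefault('startPage', url)             # set on first tuple only
        state['endPage'] = url                         # updated on every tuple
        state['pageCount'] = state.get('pageCount', 0) + 1
        self.sessions[sid] = (now_ms, state)           # reset inactivity clock

    def expire(self, now_ms):
        """Emit the state of every session that has been inactive
        longer than the timeout."""
        ended, alive = [], {}
        for sid, (last, state) in self.sessions.items():
            if now_ms - last >= self.timeout:
                ended.append(state)
            else:
                alive[sid] = (last, state)
        self.sessions = alive
        return ended

buf = SessionBuffer(30_000)
buf.feed('s1', 'http://documentation.dimml.io', 0)
buf.feed('s1', 'http://documentation.dimml.io/basic-syntax', 5_000)
print(buf.expire(40_000))
# → [{'startPage': 'http://documentation.dimml.io', 'endPage': 'http://documentation.dimml.io/basic-syntax', 'pageCount': 2}]
```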
Call
Execute a specified flow on the input tuple

Description

Send the input tuple to a `target` flow and forward the resulting output. The `target` flow can be defined in a separate concept, possibly in a separate file. This effectively sends the data to a flow in another concept, executes that concept, and returns the result.

Parameters

  • target (code) – Description of the concept and flow to be executed

Input

No specific requirements

Output

The output of the called flow given the current input tuple.

Example
concept Example {
    match '*'
    val a = `3.0`@groovy
    val b = `4.0`@groovy

    flow
    => call[`Functions:pythagoras`@dimml]
    => debug

    plugin debug
}

concept Functions {
    flow (pythagoras)
    => code[`Math.sqrt(a.power(2) + b.power(2))`@groovy]
}
This will result in the following tuple:

 {b=4.0, a=3.0, c=5.0}
The called target flow must be available, and thus be included somewhere if it is defined in another file.
Calls to the target flow share the same execution concept. Specifically, this means that calls to the same target flow from different places can be combined.
The call flow element can be used to simulate function calls. This means the function flows can be included in arbitrary flows, so the content of the input tuple is uncertain and performing the desired checks is recommended. For example, in the pythagoras flow you might want to verify that both a and b are present and numeric.
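The kind of defensive check suggested above can be sketched in Python (illustrative only; the field name c follows the example output):

```python
import math

def pythagoras(tuple_):
    """Sketch of a 'function' flow with defensive checks: since the flow
    can be called from arbitrary places, verify the expected fields
    exist and are numeric before computing."""
    a, b = tuple_.get('a'), tuple_.get('b')
    if not all(isinstance(v, (int, float)) for v in (a, b)):
        raise ValueError("pythagoras needs numeric fields 'a' and 'b'")
    return {**tuple_, 'c': math.sqrt(a ** 2 + b ** 2)}

print(pythagoras({'a': 3.0, 'b': 4.0}))
# → {'a': 3.0, 'b': 4.0, 'c': 5.0}
```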

Camel
Use Apache Camel to quickly collect data from or distribute data to external sources

Description

Use Apache Camel to collect data from or distribute data to external sources. The flow element is split into two specific elements: camel:from and camel:to, indicating whether the DimML application should get data from that source or actually send data to it.

Parameters

  • Camel instruction (string) – Camel instruction defining what type and specific instruction to execute
  • List of context and dependencies – Depending on the connection type additional details in native code

Details on possible Camel instructions can be found at camel.apache.org/components.html. Please contact us if a specific Camel integration is required; we can provide the specific code. Additionally, the example below illustrates several uses of the Camel flow element.

Input

No specific requirements

Output

The Camel component is started. Note that for Camel components which run continuously, calling the Camel flow element will result in the Camel component executing continuously as well. For example, calling a Camel flow element which polls an FTP server for files will result in the Camel component running continuously. That means that after completing the initial flow, if additional files are placed on the server two hours later, the flow will be executed with the new data (file) starting from the Camel component.

Example
Note that for reading files from an FTP server it is also possible to read a zip file. To do that, the file can be read as usual, followed by a specific flow element. The code below will unzip the file:

flow
  => camel:from['ftp://..', $type = 'java.io.InputStream']
  => code[body = `def zip=new java.util.zip.ZipInputStream(body);zip.getNextEntry();zip`]

Additionally, most Camel connections generate a burst of events. When opening a (log) file and transforming each row into an event, a large number of events will be processed by the platform. While this is not a problem for the platform, not all endpoints the data is distributed to can handle this. That is why it is best practice to make the data from the Camel flow element available as a stream and use the stream flow element to throttle the events. Below is an example of that:

@groovy

concept TestService {
  match `keepalive`@cron

  flow
    => camel:from['ftp://corona@46.51.177.161/david/input?binary=true&password=...&move=done&passiveMode=true', $type = 'java.io.InputStream']
    => code[body = `def zip=new java.util.zip.ZipInputStream(body);zip.getNextEntry();zip`]
    => stream:in[scanner = '\n']
    => console
}
Code
Add values to the data tuple based on specified script code.

Description

Define new fields based on client side or server side code.

Parameters

  • List of assignments – Scripts to define a value for a field

Each assignment contains an expression which is assigned to a field. This script can either be in Javascript (default, executed client side) or Groovy (executed server side). For the latter, add @groovy after the script text.

Input

No specific requirements

Output

The data tuple is extended with each field defined in the code element.

Example
concept Example {
    match '*'

    val browser = 'Mozilla/5.0 (Windows NT) Firefox/36.0'

    flow
        => code[
            ie = `browser.contains('MSIE')`, 
            firefox = `browser.indexOf('Firefox')>=0`@groovy
        ]
}
This will result in the following tuple:
 {browser=Mozilla/5.0 (Windows NT) Firefox/36.0, ie=false, firefox=true}
Compact
Add a new field to the data tuple that contains a map of the current fields as value.

Description

Add a field called ‘compacted’ to the tuple that contains an (unmodifiable) map view of the tuple. The default field name can be overridden.

Parameters

  • Field – Optional: field which contains the map

Input

No specific requirements

Output

An additional field with a map as specified

Example
concept all {
    match '*'

    val firstName = 'John'
    val lastName = 'Doe'

  flow
  => compact['map']
  => debug

  plugin debug
}
This will result in the debug plugin showing the following text in the console:
{lastName=Doe, firstName=John, map={lastName=Doe, firstName=John}}
CSV
Convert the data tuple to a CSV representation.

Description

Convert the data tuple to a CSV representation.

Parameters

  • Mode – field separator

An alternative mode of operation can be selected using the mode='1' parameter. This will use the semicolon (;) as separator, no double quotes to escape data, and escape any semicolon in the data (using \;).

Input

No specific requirements

Output

This element will output a new tuple with a single field called ‘serialized’. This field is of type String and contains the CSV formatted data of the input fields. Note that all output flow elements use this serialized field by default as input. The delimiter is a comma (,). All data is escaped using double quotes.

Note that there is no predefined sequence in which the values are shown. Should this be required, use the filter element.

Example
concept Example {
    match '*'

    val firstName = 'John'
    val lastName = 'Doe'

    flow
    => filter['firstName','lastName']
    => csv[mode='1']
    => debug

    plugin debug
}
This will result in the debug plugin showing the following text in the console:
John;Doe
Additionally, it is necessary that the separator does not occur in the values of the fields, since otherwise splitting the columns will result in a different number of identified values. E.g. in the previous example the firstName field could contain a semicolon (e.g. John;the one and only). The output would then be

John;the one and only;Doe

Processing this data would cause problems, since the additional semicolon results in three fields being identified (or "the one and only" as last name). Therefore it is advised to filter out any occurrences of the separator in the field values. The code below does that for the semicolon.

concept Example {
    match '*'

    val firstName = 'John;the one and only'
    val lastName = 'Doe'

    flow
    => filter['firstName','lastName']
    //the values of all fields are put in the serialized field, and then every occurrence of the semicolon is replaced
    => compact['serialized'] => code[serialized = `serialized.values()*.replace(';','').join(';')`]
    => debug

    plugin debug
}
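The mode='1' serialization combined with the advised clean-up can be sketched in Python (illustrative only; field ordering is assumed to come from a preceding filter):

```python
def csv_mode1(tuple_, fields):
    """Sketch of the mode='1' CSV step plus separator clean-up: order
    the values by `fields`, strip any semicolons from the values, and
    join with the semicolon separator."""
    return ';'.join(str(tuple_[f]).replace(';', '') for f in fields)

print(csv_mode1({'firstName': 'John;the one and only', 'lastName': 'Doe'},
                ['firstName', 'lastName']))
# → Johnthe one and only;Doe
```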
Debug
Send the data tuple to the Javascript console.

Description

Data will pass through the ‘debug’ element unaltered. This allows the use of debug anywhere in a flow to get a peek of the data passing through. This element should be combined with the ‘debug’ plugin to make its output visible in the Javascript console.

Parameters

None

Input

Uses the ‘serialized’ field which contains a string to be shown

Output

No changes to the data tuple

Example
concept Example {
    match '*'

    val firstName = 'John'
    val lastName = 'Doe'

    flow
    => filter['firstName','lastName']
    => csv
    => debug

    plugin debug
}
This will result in the debug plugin showing the following text in the console:
"John","Doe"
Delay
Delay each input tuple for a specified amount of time

Description

For each input tuple, wait for the amount of time specified by the `delay` parameter before outputting it.

Parameters

  • delay (string) – Amount of time tuples are delayed

Input

No additional requirements

Output

No changes to the data tuple, though the flow is continued later (after the delay)

Example
concept Example {
    match '*'

    val a = 'a'

    flow
        => delay['30s']
        => debug

    plugin debug
}
Output after 30 seconds in the console:
{a=a}
Expand
Interpret the data tuple as a map and include its fields in the current data tuple.

Description

Interpret the data tuple as a map and include its fields in the current data tuple.

Parameters

  • Field – Optional: field which contains the fields as a map

Input

If no field is defined, the field ‘expanded’ will be used. The field that is used has to be of type map. This makes expand the map alternative to addjson, which uses a string as input field.

Output

Each item in the map is added as a field to the data tuple.

Example
concept all {
  match '*'

  flow
  => code[map= `["a":1,"b":1]`@groovy]
  => expand['map']
  => debug

  plugin debug
}
This will result in the debug plugin showing the following text in the console:

{map={a=1, b=1}, a=1, b=1}
Filter
Only include the specified fields in the output tuples.

Description

Filter is used to limit the number of fields in the output tuples and to arrange them in a specific order. The latter is particularly convenient when writing to a file.

Parameters

  • List of Field – Which fields should remain in the output tuples

Input

No specific requirements

Output

The output tuples only contain the fields specified in the element parameter. If one of the specified fields is not in the input tuple, that field is ignored.

Example
concept all {
  match '*'

  val first = "first"
  val second = "second"
  val third = "third"

  flow
  => filter["fourth","second","first"]
  => debug

  plugin debug
}
This will result in the debug plugin showing the following text in the console:
{second=second, first=first}
It is also possible to specify the fields which should be removed (instead of the fields to include). It’s not possible to combine included and excluded fields.
concept all {
  match '*'

  val first = "first"
  val second = "second"
  val third = "third"

  flow
  => filter["-first"]
  => debug

  plugin debug
}
This will result in the debug plugin showing the following text in the console:
{second=second, third=third}
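Both modes of the filter element can be sketched in Python (illustrative sketch of the semantics, not the platform's implementation):

```python
def filter_fields(tuple_, spec):
    """Sketch of filter semantics: keep only the listed fields (missing
    ones are ignored), or, when every entry starts with '-', keep
    everything except the listed fields. Mixing both is not allowed."""
    excludes = [f[1:] for f in spec if f.startswith('-')]
    if excludes and len(excludes) != len(spec):
        raise ValueError("cannot mix included and excluded fields")
    if excludes:
        return {k: v for k, v in tuple_.items() if k not in excludes}
    return {f: tuple_[f] for f in spec if f in tuple_}

data = {'first': 'first', 'second': 'second', 'third': 'third'}
print(filter_fields(data, ['fourth', 'second', 'first']))
# → {'second': 'second', 'first': 'first'}
print(filter_fields(data, ['-first']))
# → {'second': 'second', 'third': 'third'}
```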
FTP
Export the data to a file on an FTP server.

Description

Store flow data on a remote FTP server according to the specified FTP server settings. By default the data is appended to the existing file; data is streamed in real time.

Parameters

  • host (string) – FTP server hostname
  • user (string) – FTP server username
  • password (string) – FTP server password
  • dir (string) – FTP server directory
  • flush (boolean) – Enable flush mode

If flush mode is enabled, the element sends data to the FTP server even when data is still being received for a particular file.

Input

  • ?filename (string) – Filename

If the input tuple contains the ?filename property, its value will be used as the filename to store the data. This property can be set using other elements, like time or log.

When no filename is specified, the default filename output.log will be used.

Output

No output tuple is generated by this element.

Example
concept Example {
    match '*'
    val a = 'a'

    flow
        => time[?filename = "origin:yyyyMMdd'.log'"]
        => csv
        => ftp[
            host = 'example.com', 
            user = 'user', 
            password = 'password', 
            dir = 'directory',
            flush = 'true'
        ]
}

Connect to the example.com FTP server, with username user and password password, to append a CSV representation of the input tuple to the file [yyyyMMdd].log (with yyyyMMdd being the current date) in the directory directory.

In this case the file would contain the value 'a' (which is the CSV format of the entire input tuple).

Remarks
  • The FTP server should allow FTP connections from the DimML server.
  • The FTP server should be in passive mode.
  • The FTP server should allow append uploads.
sFTP
Export the data to a file on an SFTP server.

Description

The sftp element is similar to the ftp element, the only difference being that it connects to a secure server. Therefore the syntax and use are the same as for the ftp element.

HTTP
Perform an asynchronous HTTP request.

Description

Perform an asynchronous HTTP request using the specified method, URL, headers and body, and add the result of the request to the data tuple.

Parameters

  • method – The HTTP method to use
  • url – The URL to use in the HTTP request
  • headers – A set of HTTP headers separated by newlines
  • data – The body of the HTTP request

All parameters can define their respective content in the definition of the ‘http’ element (like method, url and headers do in the example). Alternatively, the parameters url, headers and data can optionally specify a field in the input tuple to use. The field is identified by prepending it with the at sign (@).

Input

No specific input requirements

Output

A field result is added to the data tuple which contains the result of the HTTP request

Important: the header `Content-Type: application/json` is added to the requests automatically. Adding it to the headers parameter will cause an error when the code is executed.

Example

concept Example {
    match '*'

    flow
    => code[apiurl = `"http://53.15.28.16:8080/restapi/getversion"`@groovy]
    => http[
        url = '@apiurl', 
        method = 'GET', 
        headers = 'Accept: application/json\n Authorization: Basic YWRtaRtaW4=',
        data = ''
    ]
    => filter['result']
    => debug

    plugin debug
}
Output (note that the URL does not exist so the code won’t work in a sandbox environment):
{result = v3.4}
If
Continue in the current data stream only if a condition is met.

Description

The if flow element evaluates an expression and continues with the flow if the expression evaluates to true. If the expression evaluates to false, all named flows provided with the if flow element are executed.

Output

The current flow is continued or stopped based on the evaluation of the expression

Example

flow
=> if[`beer == 'Heineken'`] (no-heineken)
=> ftp[..]


flow (no-heineken)
=> sql[..]

The expression in the if flow element in this example evaluates whether the beer parameter is set to a specific value. If this is the case, the flow is continued as defined (and no other flows are initiated); here the FTP element is executed. If the expression is not true, the no-heineken flow is initiated and the current flow stops (nothing is sent by FTP); the SQL element is then executed with all of the current data in the data tuple.
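The routing behavior can be sketched in Python (illustrative only; the list sinks stand in for the ftp and sql elements):

```python
def if_element(tuple_, predicate, continue_flow, named_flows=()):
    """Sketch of if semantics: when the expression holds, the current
    flow continues; otherwise the named flows receive the tuple and
    the current flow stops."""
    if predicate(tuple_):
        return continue_flow(tuple_)
    for flow in named_flows:
        flow(tuple_)

# hypothetical sinks standing in for the ftp and sql elements
sent_by_ftp, sent_by_sql = [], []
for t in [{'beer': 'Heineken'}, {'beer': 'Grolsch'}]:
    if_element(t, lambda t: t['beer'] == 'Heineken',
               sent_by_ftp.append, [sent_by_sql.append])
print(sent_by_ftp, sent_by_sql)
# → [{'beer': 'Heineken'}] [{'beer': 'Grolsch'}]
```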

IP
Add the visitor’s IP address to the data tuple.

Description

The ip flow adds the IP address attached to the current tuple’s original HTTP request as a field named ip to the data tuple. Note that some flow elements might make it impossible to access the original request, notably aggregation flows and the join flow.

Output

  • ip(string) – The IP address of the current tuple’s original HTTP request.

Example

concept Example {
    match '*'

    flow
        => ip
        => debug

    plugin debug
}
Output:
{ip=81.30.41.185}
Remarks

Some flow elements might make it impossible to access the original request, notably aggregation flows and the join flow.

JSON
Convert all fields in the data tuple to a JSON representation.

Description

Takes the input tuple and converts it to JSON. No escaping is performed. By default, the output tuple contains a single field called serialized, containing the JSON representation of the input fields. It is also possible to specify a field name as a parameter, in which case that field is added to the tuple, containing the JSON representation of the input fields.

Parameters

  • field(string) – Field name in which the JSON result will be stored

Output

If json is used without a parameter:

  • serialized(string) – JSON object string of the input tuple’s fields

If the field parameter is used:

  • field(string) – The specified field, containing the JSON object string of the input tuple’s fields

Example 1

concept Example {
    match '*'

    val a = 'a'
    val b = `1`

    flow
        => json
        => debug

    plugin debug
}
Output:
{"b":"1","a":"a"}
Example 2
concept Example {
    match '*'

    val a = 'a'
    val b = `1`

    flow
        => json['c']
        => debug

    plugin debug
}

Output:

{b=1, a=a, c={"b":"1","a":"a"}}
Remarks

  • No escaping is performed.
  • The fields in the JSON object string are in arbitrary order


Mail
Send an email using the available data.

Description
The `mail` element allows the application to send an email using fields that have been processed until that point.

Parameters

  • configuration (json) – SMTP configuration

The configuration parameter is an expression resulting in a Javascript object / Java map that contains the configuration for connecting to an SMTP server. The following fields are used commonly in the SMTP configuration:

  • host (string) – SMTP server host name
  • port (integer) – SMTP server port
  • auth (boolean) – Indicates if SMTP server requires authentication
  • username (string) – SMTP server username
  • password (string) – SMTP server password
  • starttls.enable (boolean) – Encrypted connection
  • from (string) – Sender’s mail address

Input

  • to (string) – Recipient’s mail address
  • subject (string) – Mail subject
  • text (string) – Mail content
  • mime (string) – Used for extending the format to HTML

Output

The output tuple is equal to the input tuple.

Example 

concept Example {
    val Title = `'Email title'`@groovy
    val Body = `'Email body'`@groovy

    flow 
    => code[
      to = `'support@yourdomain.com'`,
      subject = `'DimML Mail services'`,
      mime = `'text/html'`,
      text = `'<html><head><title>'+Title+'</title></head><body style="font-family:verdana;font-size:11px;">'+Body+'</body></html>'`
    ]
    => mail[`EMAIL_OPTIONS`]

    const EMAIL_OPTIONS = `{
        username: 'userXXX',
        password: 'passwordXXXX',
        from: 'mail@dimml.io',
        auth: true,
        'starttls.enable': true,
        host: 'email-smtp.domain.com',
        port: 123
    }`
}
Mongo
Export the data to a Mongo DB.

Description
The `mongo` element allows usage of a MongoDB document store using the provided query and database connection settings. Documents can be inserted or modified based on flow tuple fields.

Parameters

  • uri (string) – MongoDB connection string
  • db (string) – Database name
  • collection (string) – Collection name
  • key (string, optional) – List of fields
  • set (string, optional) – Hash describing key-value pairs of the document
  • inc (string, optional) – Hash describing increment keys and values

For the `uri` parameter you can use the format as described on https://docs.mongodb.com/manual/reference/connection-string/.

The `key` parameter is a comma-separated list of vals used as the filter query for document updates. If it is provided, an upsert query is executed instead of an insert query. The documents in MongoDB to be updated are the ones that match the conjunction of value equality checks for all listed keys in the `key` parameter, where the MongoDB document values are compared to the data tuple vals. If no documents match the filter query, a single document is inserted.

The `set` parameter is a code block returning a map of MongoDB fields and corresponding values to be set. When omitted, all vals in the data tuple will be sent to MongoDB. Vals in the `set` parameter that are also present in the `inc` parameter are excluded.

The `inc` parameter is a code block returning a map specifying the MongoDB fields and corresponding increment values. Each counter field is either updated by the increment value, or added as a field to the matching documents with the increment value as its value.

Example 

concept Example {
    val metric1 = `1`@groovy
    flow
    => mongo[
        uri = 'mongodb://user:password@ds054118.mongolab.com:54118/data',
        db = 'data',
        collection = 'web',
        key = 'href, test',
        set = `[
            test: "bar",
            metric1: metric1 + 5
        ]`@groovy,
        inc = `[
            counter1: metric1,
            counter2: (2 + 3) / metric1
        ]`@groovy
    ]
}
If no `set` parameter is provided, the document consists of all fields in the data tuple. If no `key` parameter is provided, the `inc` parameter will not be used. Keys used in the `inc` parameter are excluded from the keys in the `set` parameter to avoid ambiguity.
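How the parameters map onto a MongoDB upsert can be sketched in Python (an illustrative sketch under the rules above, not the element's actual implementation):

```python
def build_update(tuple_, key_fields, set_map=None, inc_map=None):
    """Sketch of the mongo element's parameter handling: `key` becomes
    the filter query, `set` the $set document (defaulting to all tuple
    fields), `inc` the $inc document; keys that appear in $inc are
    dropped from $set to avoid ambiguity."""
    query = {k: tuple_[k] for k in key_fields}
    set_doc = dict(set_map) if set_map is not None else dict(tuple_)
    inc_doc = dict(inc_map or {})
    for k in inc_doc:
        set_doc.pop(k, None)
    return query, {'$set': set_doc, '$inc': inc_doc}

t = {'href': '/home', 'test': 'x', 'metric1': 1}
print(build_update(t, ['href', 'test'],
                   set_map={'test': 'bar', 'metric1': 6},
                   inc_map={'counter1': 1, 'counter2': 5}))
```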
Nlp:language
Determines the language of the sentence or piece of text at hand
// {text: 'This is a happy message!'}
flow
  => nlp:language
This flow element takes any (tokenized) human-written message and adds a language field. The language is composed of an ISO 639-1 language code followed by an underscore and an ISO 3166 country code (examples: en_UK, de_DE, nl_NL).
Nlp:tokenize
Divides a sentence or piece of text into tokens (“words”).
flow
  => nlp:tokenize
The text is outputted as ["token1", "token2", "token3"]
Nlp:tag
Grammatically disassembles tokens of a message.
flow
  => nlp:tag
Determines the word categories of the words in the sentence by means of Part-Of-Speech (POS) tagging. The output is a list with the same number of elements as there are tokens, where each element specifies the word type of that token. For example ["DT", "NN", "VB"]
Nlp:polarity
Determines the polarity of a piece of text (either ‘negative’, ‘neutral’ or ‘positive’)
flow
  => nlp:polarity
A polarity label (positive, negative or neutral) and a score (in the interval [-1, 1]) is added to the fields based on the tokenization
Nlp:emotion
Add one of four scores of emotions adhering to Plutchik’s wheel of emotions.
concept Test {
 match '*'

 val text = 'What an very good product!!'

 flow
 => select[`text != null`@groovy]
 => nlp:language // {language: 'en_UK', ...}
 => nlp:tokenize // {tokens: ['This', 'is', 'a', 'happy', 'message', '!'], ...}
 => nlp:tag // {tags: ['DT', 'VBZ', 'DT', 'JJ', 'NN', 'SENT'], ...}
 => nlp:polarity // {polarity: 'positive', score: 2.1915518004205867, ...}
 => nlp:emotion // {surpriseAnticipation: 0.0, trustDisgust: 0.0, fearAnger: 0.0, joySadness: 1.0, ...}
 => console
 => debug

 plugin debug
}
//Output:
//{surpriseAnticipation=0.0, trustDisgust=1.0, fearAnger=0.0, joySadness=1.0, polarity=positive, score=4.4324966380895585, tags=[WP, DT, RB, JJ, NN, SENT, SENT], tokens=[What, an, very, good, product, !, !], language=en_UK, text=What an very good product!!}
Emotion detection produces four different scores that indicate emotion as given by Plutchik’s Wheel of Emotions (see http://en.wikipedia.org/wiki/Plutchik%27s_Wheel_of_Emotions). When a given emotion score is positive, the first mentioned emotion occurs; when it is negative, the second mentioned emotion occurs. When the score is 0, neither occurs. As an example, when joySadness has a value of 0.25, there is a score of 0.25 for Joy. Similarly, if it were -0.25, Sadness has a score of 0.25.
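Reading the four signed axes can be sketched in Python (illustrative only; axis and emotion names follow the output above):

```python
# each axis pairs a positive-score emotion with a negative-score one
AXES = {
    'surpriseAnticipation': ('surprise', 'anticipation'),
    'trustDisgust': ('trust', 'disgust'),
    'fearAnger': ('fear', 'anger'),
    'joySadness': ('joy', 'sadness'),
}

def emotions(scores):
    """Sketch of interpreting the axes: a positive score means the
    first emotion of the pair, a negative score the second, and zero
    neither."""
    out = {}
    for axis, (pos, neg) in AXES.items():
        v = scores.get(axis, 0.0)
        if v > 0:
            out[pos] = v
        elif v < 0:
            out[neg] = -v
    return out

print(emotions({'joySadness': 0.25, 'fearAnger': 0.0}))
# → {'joy': 0.25}
```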
Pattern
Recognize patterns of concept access and make the collected information from those concepts available.
=> session['sessionid']
=> pattern[previous = 'Global', current = 'Global']
The pattern element requires the presence of a ‘session’ field in the input tuple to operate. This field should expose a java.util.Map that can be used to store session related data on the server (see the session element). Concept access will be recorded in the session. An output tuple will only be produced when a matching pattern is discovered. The output tuple will contain all the fields of the input tuple plus any named parameters.

In the example an output tuple will be produced when two Global concept accesses occur and the last access is the current request. Since both parameters in the example are ‘named’ (namely ‘previous’ and ‘current’), these fields will be added to the output tuple. They contain a reference to the data collected for these concepts when they reached the pattern element in the past. A more elaborate example:

concept Global {
  val url = `location`
  val sessionid = `dimml.cookies.get('session')`

  flow
    => session['sessionid']
    => pattern[previous = 'Global', current = 'Global']
    => select[`previous.url.contains('/orderform.html')`@groovy]
    => ...
}

This will collect the values ‘url’ and ‘sessionid’ on the client and start flow processing. First a ‘session’ field is added containing a server-side session based on the ‘sessionid’ field. Then pattern matching is executed to detect the occurrence of two ‘Global’ concept accesses. Then only those matches are selected where the ‘previous’ access was for a page that contained ‘/orderform.html’ as part of the URL.
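As a rough illustration of these semantics (a simplified Python sketch, not the actual pattern implementation; the session is modelled as a plain dict and each tuple is assumed to carry a ‘concept’ name):

```python
def pattern(session, tup, previous='Global', current='Global'):
    """Record the concept access in the session; emit an output tuple only
    when the current access matches 'current' and an earlier recorded
    access matches 'previous'."""
    history = session.setdefault('_accesses', [])
    history.append(dict(tup))
    if tup['concept'] != current:
        return None
    # Search backwards for an earlier access matching 'previous'.
    for earlier in reversed(history[:-1]):
        if earlier['concept'] == previous:
            out = dict(tup)
            out['previous'] = earlier      # data collected at the past access
            out['current'] = dict(tup)     # data of the current access
            return out
    return None  # no matching pattern: no output tuple

session = {}
t1 = {'concept': 'Global', 'url': '/orderform.html'}
t2 = {'concept': 'Global', 'url': '/thanks.html'}
assert pattern(session, t1) is None            # only one access so far
out = pattern(session, t2)
assert out['previous']['url'] == '/orderform.html'
```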

Select
Drop tuples when they do not match a specified expression.

Description

Evaluates an expression for every incoming tuple. The input tuple is dropped when the expression evaluates to false. In the expression, all fields in the input tuple can be accessed as local variables.

Parameters

  • expression (code) – The code expression to evaluate for every incoming tuple

Output

The output tuple is equal to the input tuple if the expression evaluates to true. If the expression evaluates to false, the input tuple is dropped and there is no output.

Example 1

concept Example {
    match '*'

    val a = 'a'

    flow
        => select[`a == 'a'`@groovy]
        => debug

    plugin debug
}
Output:
{a=a}
Example 2
concept Example {
    match '*'

    val a = 'a'

    flow
        => select[`a == 'b'`@groovy]
        => debug

    plugin debug
}

Output: Empty

Remarks

  • All fields in the input tuple can be accessed as local variables in the expression.
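The semantics can be sketched as follows (a Python illustration; the Groovy expression is replaced by a Python expression for the sake of the sketch):

```python
def select(expression, tup):
    """Evaluate the expression with the tuple's fields bound as local
    variables; drop the tuple (return None) on a falsy result."""
    return dict(tup) if eval(expression, {}, dict(tup)) else None

assert select("a == 'a'", {'a': 'a'}) == {'a': 'a'}   # tuple passes through
assert select("a == 'b'", {'a': 'a'}) is None          # tuple is dropped
```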
Session
Make a session object available to the data tuple. The session will be linked to a session id that should be present in the data.
=> session['sessionid']

A server-side session is a concurrent map that is made available to subsequent flow elements. Every distinct value of the field specified as a parameter will result in a distinct session. Sessions are guaranteed to remain alive on the server for at least 15 minutes of inactivity. The default name of the session object is ‘session’. This can be overwritten by providing a named parameter:

=> session[sessmap = 'sessionid']
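A simplified model of this behaviour (a Python sketch; the real session store is a server-side concurrent map, and the 15-minute lifetime is a minimum, not an exact expiry):

```python
import time

SESSION_TTL = 15 * 60          # sessions live at least 15 minutes
_sessions = {}                 # sessionid -> (last_access_time, session map)

def session(tup, field='sessionid', name='session'):
    """Attach a per-sessionid map to the tuple under the given field name."""
    sid = tup[field]
    now = time.time()
    # Evict sessions that have been inactive longer than the TTL.
    for key in [k for k, (t, _) in _sessions.items() if now - t > SESSION_TTL]:
        del _sessions[key]
    _, data = _sessions.setdefault(sid, (now, {}))
    _sessions[sid] = (now, data)
    out = dict(tup)
    out[name] = data           # same map for every tuple with this sessionid
    return out

a = session({'sessionid': 'x'})
a['session']['count'] = 1
b = session({'sessionid': 'x'})
assert b['session']['count'] == 1   # shared server-side state
```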
Split
Split the input tuple in multiple output tuples based on a single field that should contain an iterable value.

Description

Use an iterable field in the input tuple to generate multiple similar output tuples. Every output tuple has the same content as the input tuple, but with one specific value of the iterable field.

Parameters

  • field(string) – The field name containing an iterable value

Input

  • field(iterable) – Field in the input tuple containing an iterable value pointed to by the flow parameter

Output

For every item of the iterable field value an output tuple is generated with the same fields as the input tuple, except for the value of field, which contains a single item from the original field iterable.

Example

concept Example {
    match '*'

    val a = 'a'
    val i = `[1, 2, 3]`@groovy

    flow
        => split['i']
        => debug

    plugin debug
}

Output:

{a=a, i=3}
{a=a, i=1}
{a=a, i=2}
Remarks

The output tuples are in arbitrary order.
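The split semantics can be sketched as follows (Python illustration only):

```python
def split(tup, field):
    """Yield one output tuple per item of the iterable field, identical to
    the input tuple except for that field's value."""
    for item in tup[field]:
        out = dict(tup)
        out[field] = item
        yield out

outs = list(split({'a': 'a', 'i': [1, 2, 3]}, 'i'))
assert len(outs) == 3
assert {'a': 'a', 'i': 2} in outs
```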

SQL
Interact with an SQL database.

Description

The sql element allows usage of a relational database using the provided query and database connection settings. The query can be parameterized by referring to flow tuple fields using the colon notation.

Parameters

  • statement (string) – Database query
  • configuration (json) – Database configuration
  • cache (string) – (optional) Specify caching parameters
  • limit (integer) – (optional) Maximum number of results for a selection query
  • fieldName (string) – (optional) Field name for storing selection query results
  • batch (integer) – (optional) Specify the batch size

The statement is the SQL query; using colon notation, it can be parameterized with values from the data tuple. So a field named field in the data tuple is available as :field in the SQL query statement.
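The colon notation can be illustrated as follows (a Python sketch; the placeholder style of DimML’s actual database driver is not documented here, so the ‘%s’ placeholders below are purely for illustration):

```python
import re

def bind(statement, tup):
    """Rewrite ':field' references to driver placeholders and collect the
    corresponding values from the data tuple, in order of appearance."""
    params = []
    def repl(match):
        params.append(tup[match.group(1)])
        return '%s'
    return re.sub(r':(\w+)', repl, statement), params

sql, args = bind("INSERT INTO test (a) VALUES (:a)", {'a': 'a'})
assert sql == "INSERT INTO test (a) VALUES (%s)"
assert args == ['a']
```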

The configuration parameter is an expression resulting in a Javascript object / Java map that contains the configuration for connecting to a datasource. The following fields are used commonly in the datasource configuration:

  • type- Mandatory field to indicate the type of database to connect to. Current options are ‘mysql’, ‘postgresql’ or ‘oracle’.
  • serverName- The host name to use in connecting to the target database.
  • port- The port to use in connecting to the target database.
  • databaseName- The name of the database on the target server.
  • user- (optional) Username when authentication is needed.
  • password- (optional) Password when authentication is needed.

Any field available in configuring the target datasource can be specified using this configuration mechanism. The exact fields depend on the type of database.

Parameter cache applies only to selection queries. It enables caching of results returned by a selection query. When the same query is executed again, results will be returned from the cache. A cache hit is identified by comparing all tuple fields that are used to parameterize the query. The syntax of this parameter is [size]:[time]. The size determines the maximum size of the cache. When there is no more room in the cache the least recently used item is evicted. The time parameter specifies a maximum time for items to remain in the cache. The unit suffix can range from ms for milliseconds to h for hours. The default is 1m.
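The cache behaviour described above can be sketched as follows (a simplified Python model, not the real implementation; the key stands for the tuple fields used to parameterize the query):

```python
import time
from collections import OrderedDict

class QueryCache:
    """Size-bounded cache with a time-to-live, evicting least recently used
    entries when full."""
    def __init__(self, size, ttl_seconds):
        self.size, self.ttl = size, ttl_seconds
        self.entries = OrderedDict()   # key -> (timestamp, result)

    def get(self, key):
        entry = self.entries.get(key)
        if entry is None or time.time() - entry[0] > self.ttl:
            return None                # miss, or entry expired
        self.entries.move_to_end(key)  # mark as recently used
        return entry[1]

    def put(self, key, result):
        self.entries[key] = (time.time(), result)
        self.entries.move_to_end(key)
        if len(self.entries) > self.size:
            self.entries.popitem(last=False)   # evict least recently used

cache = QueryCache(size=2, ttl_seconds=60)
cache.put(('a',), {'views': 10})
assert cache.get(('a',)) == {'views': 10}      # hit on same parameters
cache.put(('b',), {})
cache.put(('c',), {})                          # cache full: ('a',) evicted
assert cache.get(('a',)) is None
```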

Parameter limit applies only to selection queries. It specifies the maximum amount of records to retrieve. By default this limit is set to 1, which is a special mode of operation in that it will add retrieved fields to the current tuple. When a higher limit is set, the data will be stored as a list of maps, where each map contains the data of one row. By default this list is stored in the field named result. This can be overwritten using the fieldName parameter.

Parameter fieldName applies only to selection queries. It overwrites the field name used to store the result of a multi-record selection query as described in the limit parameter explanation.

The batch parameter gives control over how queries are batched. By default all calls to the external datasource are batched. This will improve throughput at the cost of latency. By waiting a small amount of time the sql flow will ensure that most calls to the database benefit from this mechanism. The batch size can be specified using this parameter; the default value is 50.

Sending insert or update queries in batches is usually the best thing to do. For selection queries this might be problematic, especially when the query is part of a flow that produces synchronous output. Furthermore, data that passed through a batched selection query will be bundled together and treated as one set. It will have lost access to the original context that produced the data. When using a selection query in a synchronous flow, always set batch to 1, which effectively disables batching for that query. Tuples passing through the sql flow will then retain their original context, allowing the flow to be used in combination with synchronous flows as used by the output plugin.

Output

For insert and update queries:

  • sqlCount(integer) – The number of affected rows

For selection queries with one result row:

  • List of selected columns- Each selected column is transformed to a key-value pair that is added to the output tuple.

For selection queries with multiple result rows:

  • fieldName- List of maps, where each map contains the data of one row. The field name is defined by the fieldName parameter, which is result by default.

Example

concept Example {
    match '*'

    const dataSource = `{
        type: 'mysql',
        port: 3306,
        serverName: 'localhost',
        databaseName: 'test',
        user: 'test',
        password: 'test'
    }`

    val a = 'a'

    flow
        => sql["INSERT INTO `test` (`a`) VALUES (:a)", `dataSource`]
        => debug

    plugin debug
}
Output:
{a=a, sqlCount=1}

Remarks

  • The database server should be accessible using the provided connection settings.
  • The database user should have the correct privileges to execute the query.
Store
Make the data available in a file using an intermediate file storage server

Description

The store flow element is a more robust implementation of the FTP flow element. It stores data in a file on an FTP or SFTP server. The biggest difference is that the store flow element first stores the file on an intermediate server. This improves robustness, since the file is resent several times if the connection is unavailable. Additionally, the file is transferred from one location/server at the end of the time interval, which requires far fewer (parallel) connections. The only disadvantage is that data is stored in an AWS S3 environment, while the FTP element streams the data directly to the endpoint.

Parameters

  • uri(string) – The uri parameter identifies the target location. These take the form:
    scheme://username:password@host:port/path/to/storage?param1=value1&param2=value2

    The currently supported schemes are ‘ftp’ and ‘sftp’. They both support encrypted credentials by supplying the credentials as a parameter (called ‘credentials’). Depending on the contents of the credentials you can omit one or more of the following: username, password, host. The ‘ftp’ scheme supports tunneling FTP over SSL (called FTPS). To activate add ‘secure’ to the list of parameters. Certificate validation can optionally be disabled by specifying ‘no-validate':

    ftp://user2:...@46.51.177.161/david?secure&no-validate

    The ‘sftp’ scheme supports private key authentication. To use it, enter the URI encoded private key as a parameter named ‘key’.
    All schemes support storing the data in a zip file. To use it, add zip as a query parameter.

  • file (string) – The file ‘pattern’ is a name for files in the target location, where ‘%s’ is replaced with a timestamp in the format ‘yyyyMMddHHmmss’. So %s.log in the example will result in files like 20160628132700.log. The default is ‘%s.log’.
  • window(string) – The ‘window’ parameter specifies the logging window to use. The smallest possible window is 30s (30 seconds). The new store flow can reliably store any window size. The commit to the external system won’t start until the complete file is finished. The default is 5 minutes.
  • separator(string, optional) – The ‘separator’ parameter is used as a separator between data elements when the file is written to the external location.
  • header(string, optional) – The ‘header’ parameter is used to add a fixed line of text at the top of each file
  • footer(string, optional) – The ‘footer’ parameter is used to add a fixed line of text at the bottom of each file
  • data(code, optional) – The ‘data’ parameter specifies a code block that is evaluated every time data arrives at the store flow. Instead of relying on another flow (like ‘csv’) to specify the format of what is stored, the data flow can be used to export to any format desired.
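The file pattern substitution can be sketched as follows (Python illustration of the ‘%s’ timestamp replacement):

```python
from datetime import datetime

def file_name(pattern='%s.log', when=None):
    """Replace '%s' in the file pattern with a 'yyyyMMddHHmmss' timestamp."""
    when = when or datetime.now()
    return pattern.replace('%s', when.strftime('%Y%m%d%H%M%S'))

# Matches the example from the parameter description above.
assert file_name('%s.log', datetime(2016, 6, 28, 13, 27, 0)) == '20160628132700.log'
```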

Output

At the specified time interval, the file with all data is provided at the target location

Example

flow
  => compact
  => store[
    uri = 'ftp://user2:...@46.51.177.161/david',
    window = '1m',
    separator = ',\n ',
    header = '[\n ',
    footer = '\n]',
    data = `JsonOutput.toJson(compacted)`@groovy
  ]
//In this example the 'compact' flow is used to make the tuple available as a map.
//The store flow is configured to create a JSON array containing all tuples as JSON objects.

The above syntax uses a static storage key. The URI and other parameters do not change during the lifetime of the flow. Alternatively, you can employ a dynamic storage key. When the first parameter to the store flow is an unnamed code block, it is interpreted as a map / javascript object that contains all the parameters required to store the data. The example below would store the data from different sections of a website in different files (assuming ‘url’ contains the current URL and the first path element identifies a section). You could also use this mechanism to create files per user session / visitor ID.

flow
  => store[`[
    uri: 'ftp://user2:894zhmkl%40!@46.51.177.161/david',
    window: '1m',
    separator: ',\n',
    pattern: "${url==null?'root':(uri(url).getPath()?.substring(1)?.takeWhile{it!='/'}?:'root')}_%s.log"
  ]`@groovy]
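The section extraction in the Groovy pattern above can be sketched as follows (Python illustration of the same logic):

```python
from urllib.parse import urlparse

def section_pattern(url):
    """Derive a per-section file pattern from the first path element of the
    URL, falling back to 'root' when there is none."""
    path = urlparse(url).path if url else ''
    stripped = path.lstrip('/')
    first = stripped.split('/')[0] if stripped else 'root'
    return f"{first}_%s.log"

assert section_pattern('http://example.com/shop/cart') == 'shop_%s.log'
assert section_pattern(None) == 'root_%s.log'
```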
Stream
Make the data available as HTTP streaming resource

Description

The stream flow element will make the input tuples available as an HTTP streaming resource. This can be used for instance as part of a web page containing a stream of DimML generated data. Also, the sandbox environment is designed to display all data made available for the current sandbox environment. The URL uses the path component of the DimML URI preceded by http://baltar-dev.dimml.io:8080/. Since all sandbox DimML files are located in one folder, the URI to capture a stream from the file example.dimml for sandbox user 1234 will be

http://baltar-dev.dimml.io:8080/sandbox/1234/example.dimml.

Note that it is also possible to add a name and an IP address as parameters. This can be used to call a specific stream from a DimML file and to make streams only available from defined IP addresses.

Parameters

  • name (string) – (optional) Stream name
  • IP address (string) – (optional) IP addresses allowed for the stream

Output

An HTTP streaming resource containing all data tuples

Example

concept Global {
    match `0/5 * * * * ?`@cron

    flow
    => code[url=`'hello'`@groovy]
    => stream

    plugin stream

}

This DimML application can be copied and pasted directly into the sandbox environment. There it will show the text ‘hello’ every 5 seconds in the output stream.

Time
Add date and time information to the data tuple.

Description

When a tuple reaches a flow element there are three timestamps that might be of interest: the original creation time of a tuple, the last modified time of a tuple, and a timestamp for the entire set. These three times are referred to as ‘origin’, ‘modified’, and ‘packet’ time respectively. With the time flow element it is possible to add fields to the tuple with values based on this time information.

Parameters

  • List of fields- The field specifications for the output tuple

Every field has the format field = ‘type:format’, where

  • fieldis the name of the field added to the tuple
  • typespecifies which tuple timestamp is used (‘origin’, ‘modified’, or ‘packet’)
  • formatspecifies how the type timestamp is formatted

Output

The input tuple plus the list of fields with corresponding time values, as defined by the flow element parameters.

Example

concept Example {
    match '*'

    flow
        => time[
            dateStamp = 'origin:yyyy-MM-dd',
            timeStamp = 'modified:HH:mm:00'
        ]
        => debug

    plugin debug
}

Output:

{dateStamp=2015-04-02, timeStamp=15:17:00}
Remarks

The special ?filename property can also be set with the time flow element. This property is used by file-based flow elements to set a time based file name.
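The semantics of the field specifications can be sketched as follows (a Python illustration; DimML uses Java date patterns such as ‘yyyy-MM-dd’, strftime codes are used here instead):

```python
from datetime import datetime

def time_fields(tup, timestamps, **specs):
    """Each spec has the form field='type:format': pick the named tuple
    timestamp ('origin', 'modified' or 'packet') and format it."""
    out = dict(tup)
    for field, spec in specs.items():
        kind, _, fmt = spec.partition(':')
        out[field] = timestamps[kind].strftime(fmt)
    return out

ts = {'origin': datetime(2015, 4, 2, 15, 17),
      'modified': datetime(2015, 4, 2, 15, 17)}
out = time_fields({}, ts, dateStamp='origin:%Y-%m-%d',
                  timeStamp='modified:%H:%M:00')
assert out == {'dateStamp': '2015-04-02', 'timeStamp': '15:17:00'}
```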

Twitter
Retrieve data from the Twitter streaming API

Description

The twitter producer flow emits data whenever it becomes available from Twitter. It should be used in combination with a keepalive service to keep the flow active and ensure that the `twitter` flow element keeps emitting data tuples.

Parameters

  • configuration – JSON field containing the twitter configuration details

The configuration parameter is an expression resulting in a Javascript object / Java map that contains the configuration for connecting to the Twitter streaming data API. The following fields are used commonly in the Twitter configuration:

  • consumerKey (string) – Twitter consumer key
  • consumerSecret (string) – Twitter consumer secret
  • token (string) – Twitter token
  • secret (string) – Twitter secret
  • terms (Array[string]) – List of terms to monitor in the streaming API
  • lang (Array[string]) – Language filter

 

Output

  • message – JSON formatted tweet data returned from the Twitter streaming API

Example

concept Example {
        match `keepalive`@cron

        flow
            => twitter[`TWITTER_OPTIONS`]
            => debug

        const TWITTER_OPTIONS = `{
            consumerKey: '...',
            consumerSecret: '...',
            token: '...',
            secret: '...',
            terms: ['#beer'],
            lang: ['nl','en']
        }`
    }
Output:

Data tuples containing JSON formatted `message` values, for every message placed on Twitter that matches the specified terms and languages.

Remarks

The `twitter` flow element keeps emitting data tuples for each new tweet retrieved from the Twitter API. Normally the flow would be deactivated by the system after a certain period of time. To prevent this, the keepalive mechanism from the example can be used to keep the flow active.

User agent
Make user agent data available server side

Description

The user agent flow element processes the user agent header into a list of properties by parsing the header value server side. Note that the request header can be used to capture the user agent string. Additionally, for web pages the Javascript property navigator.userAgent can be used to capture the user agent string as well.
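As an illustration of the kind of mapping performed (a toy Python parser covering two browsers; this is not the actual library DimML uses, and a real parser handles far more cases):

```python
import re

def parse_user_agent(ua):
    """Extract a few of the documented properties from a user agent string."""
    props = {}
    m = re.search(r'Chrome/([\d.]+)', ua)
    if m:
        props.update(name='Chrome', family='CHROME', type='BROWSER',
                     version=m.group(1))
    m = re.search(r'Windows NT ([\d.]+)', ua)
    if m:
        props.update(os='WINDOWS', osVersion=m.group(1),
                     device='PERSONAL_COMPUTER')
    return props

ua = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
      '(KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36')
parsed = parse_user_agent(ua)
assert parsed['version'] == '54.0.2840.99'
assert parsed['osVersion'] == '10.0'
```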

Parameters

  • input field name – name of the field that contains the user agent string
  • output field name – name of the field that contains the properties as an object. The properties are name, device, family, os, osVersion, type, version

Output

The following fields are added to the output tuple:

  • family – Browser family
  • name – Browser name
  • type – Browser type
  • version – Browser version
  • os – Operating System name
  • osVersion – Operating System version
  • device – Device type

More details on the processing can be found here

Example

@groovy
concept Test {
  match '*'

  request[ua = `headers['User-Agent']`]

  flow
    => useragent[parsed = 'ua']
    => console
}
Output:

{parsed={name=Chrome, device=PERSONAL_COMPUTER, family=CHROME, os=WINDOWS, osVersion=10.0, type=BROWSER, version=54.0.2840.99}, ua=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36}