fiware-cosmos-orion-spark-connector-examples

This repository contains a few examples for getting started with the fiware-cosmos-orion-spark-connector:

Setup

To run the examples, first clone the repository:

git clone https://github.com/ging/fiware-cosmos-orion-spark-connector-examples
cd fiware-cosmos-orion-spark-connector-examples

Next, download the connector JAR from the connector repository and install it into your local Maven repository:

mvn install:install-file -Dfile=${PATH_DOWNLOAD}/orion.flink.connector-1.2.2.jar -DgroupId=org.fiware.cosmos -DartifactId=orion.flink.connector -Dversion=1.2.2 -Dpackaging=jar

where PATH_DOWNLOAD is the path where you downloaded the JAR.

Example 1: Receive simulated notifications

The first example uses the OrionReceiver to receive notifications from the Orion Context Broker. For simplicity, the notifications in this example are simulated with a curl command. Specifically, the example receives a notification every second reporting that a node changed its temperature, and calculates the minimum temperature within a time window.

Simulating a notification

In order to simulate the notifications coming from the Context Broker you can run the following script (available at files/example1/curl_Notification.sh):

while true
do
    temp=$(shuf -i 18-53 -n 1)
    number=$(shuf -i 1-3113 -n 1)

    curl -v -s -S -X POST http://localhost:9001 \
    --header 'Content-Type: application/json; charset=utf-8' \
    --header 'Accept: application/json' \
    --header 'User-Agent: orion/0.10.0' \
    --header "Fiware-Service: demo" \
    --header "Fiware-ServicePath: /test" \
    -d  '{
         "data": [
             {
                 "id": "R1","type": "Node",
                 "co": {"type": "Float","value": 0,"metadata": {}},
                 "co2": {"type": "Float","value": 0,"metadata": {}},
                 "humidity": {"type": "Float","value": 40,"metadata": {}},
                 "pressure": {"type": "Float","value": '$number',"metadata": {}},
                 "temperature": {"type": "Float","value": '$temp',"metadata": {}},
                 "wind_speed": {"type": "Float","value": 1.06,"metadata": {}}
             }
         ],
         "subscriptionId": "57458eb60962ef754e7c0998"
     }'


    sleep 1
done

Receiving data and performing operations

This is the code of the example which is explained step by step below:

package org.fiware.cosmos.orion.spark.connector.examples.example1


import org.apache.spark._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.fiware.cosmos.orion.spark.connector.OrionReceiver

object Example1{

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[4]").setAppName("Temperature")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Create Orion Source. Receive notifications on port 9001
    val eventStream = ssc.receiverStream(new OrionReceiver(9001))
    // Process event stream
    val processedDataStream = eventStream
      .flatMap(event => event.entities)
      .map(entity => {
        val temp: Float = entity.attrs("temperature").value.asInstanceOf[Number].floatValue()
        (entity.id, temp)
      })
      .reduceByKeyAndWindow(_ min _ ,Seconds(10))
    processedDataStream.print
    ssc.start()
    ssc.awaitTermination()
  }
}

After importing the necessary dependencies, the first step is creating the receiver and registering it with the streaming context.

val eventStream = ssc.receiverStream(new OrionReceiver(9001))

The OrionReceiver accepts a port number as a parameter. The data received from this source is a DStream of NgsiEvent objects. You can check the details of this object in the connector docs.

In the example, the first step of the processing is flat-mapping the entities. This operation is performed in order to put together the entity objects of several NGSI Events.

val processedDataStream = eventStream
  .flatMap(event => event.entities)

Once you have all the entities, you can iterate over them (with map) and extract the desired attributes; in this case, the temperature. In each iteration, you create a tuple with the entity id and the temperature.

// ...
.map(entity => {
    val temp: Float = entity.attrs("temperature").value.asInstanceOf[Number].floatValue()
    (entity.id, temp)
})
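
The flat-map and map steps can be tried on plain Scala collections, without Spark. The following is a minimal sketch: Attribute, Entity and NgsiEvent here are simplified, hypothetical stand-ins that only model the fields used in this README (entities, id, attrs, value); the connector's real classes may carry more fields.

```scala
object NgsiEventSketch {
  // Hypothetical, simplified stand-ins for the connector's classes.
  final case class Attribute(`type`: String, value: Any)
  final case class Entity(id: String, `type`: String, attrs: Map[String, Attribute])
  final case class NgsiEvent(entities: Seq[Entity])

  // Flatten the entities of several events and extract (id, temperature) tuples.
  def temperatures(events: Seq[NgsiEvent]): Seq[(String, Float)] =
    events
      .flatMap(_.entities)
      .map { entity =>
        val temp = entity.attrs("temperature").value.asInstanceOf[Number].floatValue()
        (entity.id, temp)
      }

  // Two sample events, one entity each; the values are made up for illustration.
  val sample: Seq[NgsiEvent] = Seq(
    NgsiEvent(Seq(Entity("R1", "Node", Map("temperature" -> Attribute("Float", 21))))),
    NgsiEvent(Seq(Entity("R2", "Node", Map("temperature" -> Attribute("Float", 35.5)))))
  )
}
```

Casting to Number before calling floatValue() lets the same code handle values that arrive as integers or as decimals.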

Now you can group the tuples by entity id and reduce them within a time window, here keeping the minimum temperature over the last 10 seconds:

// ...
.reduceByKeyAndWindow(_ min _ ,Seconds(10))
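
To see what this reduction yields for a single window, here is a minimal sketch on a plain Scala collection. It only imitates the per-key reduction; the sample tuples are made up, and the windowing itself is left to Spark.

```scala
object WindowMinSketch {
  // (entityId, temperature) tuples that might fall into one 10-second window.
  val window: Seq[(String, Float)] =
    Seq(("R1", 25f), ("R2", 31f), ("R1", 19f), ("R2", 40f), ("R1", 22f))

  // Group by entity id and reduce each group with `min`, which mirrors what
  // reduceByKeyAndWindow(_ min _, ...) computes for the tuples of one window.
  def minByKey(pairs: Seq[(String, Float)]): Map[String, Float] =
    pairs
      .groupBy(_._1)
      .map { case (id, group) => (id, group.map(_._2).reduce(_ min _)) }
}
```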

After the processing, you can print the results on the console:

processedDataStream.print

Or you can persist them using the sink of your choice.

Example 2: Complete Orion Scenario with docker-compose

The second example does the same processing as the previous one, but it writes the processed data back to the Context Broker.

Setting up the scenario

In order to test this feature, we need to have a Context Broker up and running. For this purpose, a docker-compose file is provided under files/example2, which deploys all the necessary containers for this scenario. You just need to run the following command (probably with sudo):

docker-compose up

Once you have the Context Broker and the rest of the containers up and running, you need to create some entities and subscribe to them in order to get a notification whenever their values change. First, let's create a room entity (you can find the script under files/example2/curl_CreateNewEntity.sh):

curl localhost:1026/v2/entities -s -S -H 'Content-Type: application/json' -d @- <<EOF
{
  "id": "Room1",
  "type": "Room",
  "temperature": {
    "value": 23,
    "type": "Float"
  },
  "pressure": {
    "value": 720,
    "type": "Integer"
  },
  "temperature_min": {
    "value": 0,
    "type": "Float"
  }
}
EOF

Now you can subscribe to any changes in the attributes you are interested in. Again, you can find this script under files/example2/curl_SubscribeToEntityNotifications.sh. Do not forget to change $MY_IP to your machine's IP address (it must be accessible from the docker container):

curl -v localhost:1026/v2/subscriptions -s -S -H 'Content-Type: application/json' -d @- <<EOF
{
  "description": "A subscription to get info about Room1",
  "subject": {
    "entities": [
      {
        "id": "Room1",
        "type": "Room"
      }
    ],
    "condition": {
      "attrs": [
        "pressure",
        "temperature"
      ]
    }
  },
  "notification": {
    "http": {
      "url": "http://$MY_IP:9001/notify"
    },
    "attrs": [
      "temperature",
      "pressure"
    ]
  },
  "expires": "2040-01-01T14:00:00.00Z",
  "throttling": 5
}
EOF

You might want to check that you have created it correctly by running:

curl localhost:1026/v2/subscriptions

Triggering notifications

Now you may start performing changes in the entity's attributes. For that, you can use the following script (files/example2/curl_ChangeAttributes.sh):

while true
do
    timestamp=$(shuf -i 1-100000000 -n 1)
    temp=$(shuf -i 18-53 -n 1)
    number=$(shuf -i 1-3113 -n 1)
    # echo

    curl localhost:1026/v2/entities/Room1/attrs -s -S -H 'Content-Type: application/json' -X PATCH -d '{
      "temperature": {
        "value": '$temp',
        "type": "Float"
      },
      "pressure": {
        "value": '$number',
        "type": "Float"
      }
    }'
    sleep 1
done

Receiving data, performing operations and writing back to the Context Broker

Let's take a look at the Example2 code now:

package org.fiware.cosmos.orion.spark.connector.examples.example2

import org.apache.spark._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.fiware.cosmos.orion.spark.connector._

object Example2 {
  final val URL_CB = "http://sparkexample_orion_1:1026/v2/entities/"
  final val CONTENT_TYPE = ContentType.JSON
  final val METHOD = HTTPMethod.POST

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[4]").setAppName("Temperature")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Create Orion Source. Receive notifications on port 9001
    val eventStream = ssc.receiverStream(new OrionReceiver(9001))

    // Process event stream
    val processedDataStream = eventStream
      .flatMap(event => event.entities)
      .map(ent => {
        val temp: Float = ent.attrs("temperature").value.asInstanceOf[Number].floatValue()
        (ent.id, temp)
      })
      .reduceByKeyAndWindow(_ min _ ,Seconds(10))
      .map(a=> new Temp_Node(a._1,a._2))
      .map(tempNode => {
        val url = URL_CB + tempNode.id + "/attrs"
        OrionSinkObject(tempNode.toString, url, CONTENT_TYPE, METHOD)
      })
    // Add Orion Sink
    OrionSink.addSink( processedDataStream )
    // Print the first elements of each batch on the console
    processedDataStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
  case class Temp_Node(id: String, temperature: Float) extends  Serializable {
    override def toString :String = { "{\"temperature_min\": { \"value\":" + temperature + ", \"type\": \"Float\"}}" }
  }
}

As you can see, it is very similar to the previous example. The main difference is that it writes the processed data back to the Context Broker through the OrionSink. After calculating the minimum temperature, the output data needs to be adapted to the format accepted by the OrionSink: a stream of OrionSinkObject instances, each built from 4 mandatory arguments:

  • Message: It is the content of the update that is going to be sent to the Context Broker. It can be a single value converted to string or, more commonly, a stringified JSON containing the new data. The connector does not build the JSON from a Scala object for us; we need to build it ourselves. We may want to override the toString method of the case class we've created in order to facilitate this process, as seen in the example.

  • URL: It is the URL to which the message will be sent. It normally has a common base, and the rest of the path depends on the entity we're receiving data from.

  • Content Type: Whether the message is in JSON format (ContentType.JSON) or in plain text (ContentType.Plain).

  • HTTP Method: The HTTP method used for sending the update. It can be: HTTPMethod.POST, HTTPMethod.PUT or HTTPMethod.PATCH.

In the example, an OrionSinkObject is built from the Temp_Node object converted to JSON. Thus, the specified content type is JSON. The URL is formed with the hostname of the docker container where the Context Broker is running and the id of the specific entity we are receiving data from. It uses the HTTP POST method to send the message to the Context Broker.

// ...
.map(tempNode => {
    val url = URL_CB + tempNode.id + "/attrs"
    OrionSinkObject(tempNode.toString, url, CONTENT_TYPE, METHOD)
})
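
The message and URL that end up in each OrionSinkObject can be checked in isolation. This is a minimal sketch using only string operations: the base URL is copied from Example 2, while the object and function names (SinkPayloadSketch, body, url) are hypothetical helpers and not part of the connector.

```scala
object SinkPayloadSketch {
  // Base URL of the Context Broker, as defined in Example 2.
  val UrlCb = "http://sparkexample_orion_1:1026/v2/entities/"

  // JSON body that updates temperature_min, with the same content as Temp_Node.toString.
  def body(temperature: Float): String =
    s"""{"temperature_min": {"value": $temperature, "type": "Float"}}"""

  // Attribute-update URL for a given entity id.
  def url(entityId: String): String = UrlCb + entityId + "/attrs"
}
```

For the entity created earlier, url("Room1") points at the attributes of Room1, which is the resource the example posts the new temperature_min to.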

Finally, we send the processed stream through the OrionSink:

OrionSink.addSink( processedDataStream )

If you run the example, you will see that the minimum temperature calculated is displayed in the console. You can test that it has been changed in the Context Broker as well by running the following command several times and checking that the temperature_min attribute is constantly changing:

curl localhost:1026/v2/entities/Room1

Example 3: Packaging the code and submitting it to the Spark cluster

In the previous examples, we've seen how to get the connector up and running from an IDE like IntelliJ. In a real-world scenario, we might want to package our code and submit it to a Spark cluster in order to run our operations in parallel.

Follow the Setting up the scenario section if you haven't already in order to deploy the containers needed. After that, we need to make some changes to our code.

Subscribing to notifications

First, we need to change the notification URL of our subscription to point to our Spark node like so (files/example3/curl_SubscribeToEntityNotifications.sh):

curl -v localhost:1026/v2/subscriptions -s -S -H 'Content-Type: application/json' -d @- <<EOF
{
  "description": "A subscription to get info about Room1",
  "subject": {
    "entities": [
      {
        "id": "Room1",
        "type": "Room"
      }
    ],
    "condition": {
      "attrs": [
        "pressure",
        "temperature"
      ]
    }
  },
  "notification": {
    "http": {
      "url": "http://spark-master:9001"
    },
    "attrs": [
      "temperature",
      "pressure"
    ]
  },
  "expires": "2040-01-01T14:00:00.00Z",
  "throttling": 5
}
EOF

Modifying the Context Broker URL

Since the code now runs inside a container of the Spark cluster, it can no longer refer to the Context Broker as "http://localhost:1026". The code in Example 3 differs from the previous example only in the URL specified for the Context Broker: "http://orion:1026".

Packaging the code

Let's build a JAR package of the example. It must include all the dependencies we have used, such as the connector, but exclude the dependencies provided by the environment (Spark, Scala...). This can be done with the mvn package command, with the add-dependencies-for-IDEA profile disabled. The JAR file is built under target/orion.flink.connector.examples-1.2.2.jar.

Submitting the job

Let's submit the Example 3 code to the Spark cluster we have deployed. In order to do this you can use the spark-submit command provided by Spark.

You can check that the value of temperature_min is changing in the Context Broker by running:

curl localhost:1026/v2/entities/Room1

Example 4: Other operations

The previous examples focus on how to get the connector up and running but do not give much importance to the actual operations performed on the data received. In fact, the only operation done is calculating the minimum temperature on a time window. Nevertheless, Spark allows us to perform custom operations such as calculating the average.

For this, the function passed to reduceByKeyAndWindow takes an accumulator and the next value, both tuples holding a sum of temperatures and a count, and adds them component-wise within the given time span:

.reduceByKeyAndWindow((acc:(Float,Long),value:(Float,Long))=>{(acc._1 + value._1, acc._2 + value._2)}, Seconds(10))

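Then we can use map to obtain tuples of entity id and average temperature:

.map((agg :  (String,(Float,Long))) =>  (agg._1, agg._2._1 / agg._2._2))

The complete code of the example is:

import org.apache.spark._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.fiware.cosmos.orion.spark.connector.OrionReceiver

object Example4{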

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("AvgTemperature")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("./output")
    // Create Orion Source. Receive notifications on port 9001
    val eventStream = ssc.receiverStream(new OrionReceiver(9001))

    // Process event stream
    val processedDataStream = eventStream
      .flatMap(event => event.entities)
      .map(ent => ent.id -> {(ent.attrs("temperature").value.asInstanceOf[Number].floatValue(),1L)} )
      .reduceByKeyAndWindow((acc:(Float,Long),value:(Float,Long))=>{(acc._1 + value._1, acc._2 + value._2)}, Seconds(10))
      .map((agg :  (String,(Float,Long))) =>  (agg._1, agg._2._1 / agg._2._2))

    processedDataStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
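
The sum-and-count technique can be verified on a plain Scala collection. The sketch below imitates a single window with made-up tuples and is not taken from the repository.

```scala
object WindowAverageSketch {
  // (entityId, temperature) tuples that might fall into one 10-second window.
  val window: Seq[(String, Float)] =
    Seq(("R1", 20f), ("R2", 30f), ("R1", 30f), ("R2", 40f), ("R1", 40f))

  // Pair every temperature with a count of 1, add sums and counts per key,
  // then divide the sum by the count to get the average.
  def averageByKey(pairs: Seq[(String, Float)]): Map[String, Float] =
    pairs
      .map { case (id, temp) => (id, (temp, 1L)) }
      .groupBy(_._1)
      .map { case (id, group) =>
        val (sum, count) = group
          .map(_._2)
          .reduce((acc: (Float, Long), v: (Float, Long)) => (acc._1 + v._1, acc._2 + v._2))
        (id, sum / count)
      }
}
```

Carrying the count alongside the sum is what makes the reduction associative; averaging partial averages directly would weight the groups incorrectly.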

Example 5: Other operations (2)

In this example, similar to the previous one, we calculate the average of all temperatures received without separating by id.

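import org.apache.spark._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.fiware.cosmos.orion.spark.connector.OrionReceiver

object Example5{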

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("AvgTemperature")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("./output")
    // Create Orion Source. Receive notifications on port 9001
    val eventStream = ssc.receiverStream(new OrionReceiver(9001))

    // Process event stream
    val processedDataStream = eventStream
      .flatMap(event => event.entities)
      .map(ent =>(ent.attrs("temperature").value.asInstanceOf[Number].floatValue(),1L))
      .reduceByWindow((acc:(Float,Long),value:(Float,Long))=>{(acc._1 + value._1, acc._2 + value._2)},Seconds(10), Seconds(10))
      .map((agg :  (Float,Long)) =>( agg._1 / agg._2))

    processedDataStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Example 6: Structured values for attributes

So far, the examples have dealt with simple attribute values such as numbers or strings. Some use cases require more complex attributes, such as objects (https://fiware-orion.readthedocs.io/en/master/user/structured_attribute_valued/index.html). The connector parses these values as Scala Map[String, Any], which makes it easier to iterate through their properties.

Example 6 provides an example in which the data received is a list of bus schedules and their prices. These prices are constantly changing and Spark is used in order to calculate the minimum prices within a time window.

The simulated Orion notification is as follows (available at files/example5/curl_Notification.sh):

while true
do
    bus1=$(shuf -i 10-53 -n 1)
    bus2=$(shuf -i 10-44 -n 1)

    curl -v -s -S -X POST http://localhost:9001 \
    --header 'Content-Type: application/json; charset=utf-8' \
    --header 'Accept: application/json' \
    --header 'User-Agent: orion/0.10.0' \
    --header "Fiware-Service: demo" \
    --header "Fiware-ServicePath: /test" \
    -d  '{
         "data": [
             {
                 "id": "R1",
                 "type": "Node",
                 "information": {
                     "type": "object",
                     "value": {
                        "buses":[
                            {
                                "name": "BusCompany1",
                                "schedule": {
                                    "morning": [7,9,11],
                                    "afternoon": [13,15,17,19],
                                    "night" : [23,1,5]
                                },
                                "price": '$bus1'
                            },
                            {
                                "name": "BusCompany2",
                                "schedule": {
                                    "morning": [8,10,12],
                                    "afternoon": [16,20],
                                    "night" : [23]
                                },
                                "price": '$bus2'
                            }
                        ]
                     },
                     "metadata": {}
                    }
             }
         ],
         "subscriptionId": "57458eb60962ef754e7c0998"
     }'


    sleep 1
done

The code for Example 6 is similar to the previous examples. The only difference is that it is necessary to manually parse every item of the object attribute in order to make use of it.

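import org.apache.spark._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.fiware.cosmos.orion.spark.connector.OrionReceiver

object Example6 {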

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("Temperature")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Create Orion Source. Receive notifications on port 9001
    val eventStream = ssc.receiverStream(new OrionReceiver(9001))

    // Process event stream
    val processedDataStream = eventStream
      .flatMap(event => event.entities)
      .map(ent => {
        ent.attrs("information").value.asInstanceOf[Map[String, Any]]
      })
      .map(list => list("buses").asInstanceOf[List[Map[String, Any]]])
      .flatMap(bus => bus)
      .map(bus => {
        val name = bus("name").asInstanceOf[String]
        val price = bus("price").asInstanceOf[scala.math.BigInt].intValue()
        (name, price)
      })
      .reduceByKeyAndWindow(_ min _, Seconds(10))

    processedDataStream.print
    ssc.start()
    ssc.awaitTermination()
  }
}
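
The casts with asInstanceOf in Example 6 throw an exception if a notification does not have the expected shape. As an alternative, the following minimal sketch extracts the same (name, price) tuples with pattern matching and skips malformed entries. It runs on a plain Map without Spark; the sample data is made up, and it assumes, as the cast in Example 6 suggests, that integer values arrive as BigInt.

```scala
object StructuredValueSketch {
  // A value shaped like the "information" attribute in the notification above.
  val information: Map[String, Any] = Map[String, Any](
    "buses" -> List(
      Map[String, Any]("name" -> "BusCompany1", "price" -> BigInt(25)),
      Map[String, Any]("name" -> "BusCompany2", "price" -> BigInt(14)),
      Map[String, Any]("name" -> "NoPrice") // malformed entry: no price
    )
  )

  // Collect (name, price) tuples, ignoring entries that lack the expected fields.
  def prices(info: Map[String, Any]): List[(String, Int)] =
    info.get("buses") match {
      case Some(buses: List[_]) =>
        buses
          .collect { case bus: Map[_, _] => bus.asInstanceOf[Map[String, Any]] }
          .flatMap { fields =>
            (fields.get("name"), fields.get("price")) match {
              case (Some(name: String), Some(price: BigInt)) => Some((name, price.toInt))
              case _ => None
            }
          }
      case _ => Nil
    }
}
```

Whether to skip or to fail on malformed entries depends on the use case; skipping keeps the streaming job alive when a single notification is incomplete.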