Blocking Behaviors

Blocking Behaviors are a type of behavior that have to perform some tasks that can block threads. Due to the nature of GreenLightning and its scheduling, it is necessary to have blocking behaviors for tasks such as external database calls.

Creating a Custom Blocking Behavior

Blocking Behaviors can be created like normal behaviors. However, they have a special register method that needs to be adhered to.

Below is an example of a blocking behavior inside a HTTP client. Edit this example to add other libraries (such as JDBC-MySQL) that would otherwise block a GreenLightning application.

A blocking behavior requires a unique field association to ensure correct ordering of inputs and outputs. For HTTP, we will use the connectionID as this field association.

ExampleBlockingBehavior
package com.ociweb.gl.example.blocking;

import com.ociweb.gl.api.blocking.BlockingBehavior;
import com.ociweb.pronghorn.pipe.ChannelReader;
import com.ociweb.pronghorn.pipe.ChannelWriter;

public class ExampleBlockingBehavior extends BlockingBehavior {
   // The connection and sequence ID for HTTP to ensure ordering.
   private long connectionId = -1;
   private long sequenceId = -1;

   /** 
   * Called when a message gets passed to this behavior to perform some
   * task. These messages should be defined by you and can be read using 
   * the reader.
   */      
   @Override
   public void begin(ChannelReader reader) {
      assert(-1 == connectionId);
      assert(-1 == sequenceId);
      
      // Read the connection ID passed in.
      connectionId = reader.structured().readLong(Field.CONNECTION_ID);
      sequenceId = reader.structured().readLong(Field.SEQUENCE_ID);   
      
      // Custom properties here
      // example:
      // currentDatabaseAction = reader.structured().readInteger(Field.DB_ACTION);
   }

   @Override
   public void run() throws InterruptedException {
      // **********************************************
      // Here, perform an action that would block the current thread.
      // This could be creating a database connection,
      // querying a table, calling a library or a native call,
      // etc...
      Thread.sleep(1); // You can delete this, it is just an example of blocking
      // **********************************************
   }

   @Override
   public void finish(ChannelWriter writer) {
      // We need to write back our data.
      // This is pretty much the begin() method but in reverse.
      writer.structured().writeLong(Field.CONNECTION_ID, connectionId);
      writer.structured().writeLong(Field.SEQUENCE_ID, sequenceId);
      
      // Finish writing back to the structure, this always comes last!
      writer.structured().selectStruct(Struct.DATA);

      // Reset connectionId and sequenceId
      connectionId = -1;
      sequenceId = -1;
   }

   @Override
   public void timeout(ChannelWriter writer) {
      // If a blocking action takes too long (default timeout is 1 minute).
      System.out.println("This took too long to run..");      
      // NOTE: If you have a payload like mentioned above, you should update it
      // here to represent that a timeout occurred.
   }
}

// Separate files:

public enum Field
{
   CONNECTION_ID, SEQUENCE_ID;
}

public enum Struct {
	DATA;
}

The server app may look something like this:

Inside your declareConfiguration, you will want to create a new structure holding connectionId and sequenceId:

@Override
   public void declareConfiguration(Builder builder) {     
      ....
      builder.parallelTracks(2); //or any other number
      builder.defineStruct()
            .addField("connectionId", StructType.Long, 0, Field.CONNECTION_ID)
            .addField("sequenceId", StructType.Long, 0, Field.SEQUENCE_ID)     
            .register(Struct.DATA);          
      ....
   }

Inside declareParallelBehavior, we want to figure out the connection ID (ideally inside a REST listener) and register the blocking listener. We use declareParallelBehavior because we have multiple tracks.

@Override
   public void declareParallelBehavior(GreenRuntime runtime) {
      ....
      // You will need to use your own topic to use the code from line 8-15.
      // The REST listener is just for demonstration purposes. However,
      // in the listener in which you want trigger database or blocking operations,
      // make sure to set the CONNECTION_ID as seen below.
      PubSubFixedTopicService pub = runtime.newCommandChannel().newPubSubService("testTopicA");     
      runtime.addRestListener("restListener",(r)->{
         return pub.publishTopic((w)->{
            // Write connection ID and sequence ID.
            w.structured().writeLong(Field.CONNECTION_ID, r.getConnectionId());
            w.structured().writeLong(Field.SEQUENCE_ID, r.getSequenceCode());
            w.structured().selectStruct(Struct.DATA); 
         });
      }).includeRoutesByAssoc(Struct.ROUTE);
      
      // Here, we actually register the blocking listener.
      runtime.registerBlockingListener(
            ()->{
               // !!! Must be created new every time this lambda is called.
               // !!! This lambda is called once per thread on startup.
               return new ExampleBlockingBehavior();
            },
            Field.CONNECTION_ID, "testTopicA", "testTopicB");
      // The result of the blocking behavior will be written to PubSub topic testTopicB.
      ... 
   }

Last updated