Blocking Behaviors

Blocking Behaviors are a type of behavior that have to perform some tasks that can block threads. Due to the nature of GreenLightning and its scheduling, it is necessary to have blocking behaviors for tasks such as external database calls.

Creating a Custom Blocking Behavior

Blocking Behaviors can be created like normal behaviors. However, they have a special register method that needs to be adhered to.

Below is an example of a blocking behavior inside a HTTP client. Edit this example to add other libraries (such as JDBC-MySQL) that would otherwise block a GreenLightning application.

A blocking behavior requires a unique field association to ensure correct ordering of inputs and outputs. For HTTP, we will use the connectionID as this field association.

ExampleBlockingBehavior
package com.ociweb.gl.example.blocking;

import com.ociweb.gl.api.blocking.BlockingBehavior;
import com.ociweb.pronghorn.pipe.ChannelReader;
import com.ociweb.pronghorn.pipe.ChannelWriter;

public class ExampleBlockingBehavior extends BlockingBehavior {
   // The connection and sequence ID for HTTP to ensure ordering.
   private long connectionId = -1;
   private long sequenceId = -1;

   /** 
   * Called when a message gets passed to this behavior to perform some
   * task. These messages should be defined by you and can be read using 
   * the reader.
   */      
   @Override
   public void begin(ChannelReader reader) {
      assert(-1 == connectionId);
      assert(-1 == sequenceId);
      
      // Read the connection ID passed in.
      connectionId = reader.structured().readLong(Field.CONNECTION_ID);
      sequenceId = reader.structured().readLong(Field.SEQUENCE_ID);   
      
      // Custom properties here
      // example:
      // currentDatabaseAction = reader.structured().readInteger(Field.DB_ACTION);
   }

   @Override
   public void run() throws InterruptedException {
      // **********************************************
      // Here, perform an action that would block the current thread.
      // This could be creating a database connection,
      // querying a table, calling a library or a native call,
      // etc...
      Thread.sleep(1); // You can delete this, it is just an example of blocking
      // **********************************************
   }

   @Override
   public void finish(ChannelWriter writer) {
      // We need to write back our data.
      // This is pretty much the begin() method but in reverse.
      writer.structured().writeLong(Field.CONNECTION_ID, connectionId);
      writer.structured().writeLong(Field.SEQUENCE_ID, sequenceId);
      
      // Finish writing back to the structure, this always comes last!
      writer.structured().selectStruct(Struct.DATA);

      // Reset connectionId and sequenceId
      connectionId = -1;
      sequenceId = -1;
   }

   @Override
   public void timeout(ChannelWriter writer) {
      // If a blocking action takes too long (default timeout is 1 minute).
      System.out.println("This took too long to run..");      
      // NOTE: If you have a payload like mentioned above, you should update it
      // here to represent that a timeout occurred.
   }
}

// Separate files:

public enum Field
{
   CONNECTION_ID, SEQUENCE_ID;
}

public enum Struct {
	DATA;
}

The server app may look something like this:

Inside your declareConfiguration, you will want to create a new structure holding connectionId and sequenceId:

Inside declareParallelBehavior, we want to figure out the connection ID (ideally inside a REST listener) and register the blocking listener. We use declareParallelBehavior because we have multiple tracks.

Last updated