Blocking Behaviors are a type of behavior that have to perform some tasks that can block threads. Due to the nature of GreenLightning and its scheduling, it is necessary to have blocking behaviors for tasks such as external database calls.
Creating a Custom Blocking Behavior
Blocking Behaviors can be created like normal behaviors. However, they have a special register method that needs to be adhered to.
Below is an example of a blocking behavior inside a HTTP client. Edit this example to add other libraries (such as JDBC-MySQL) that would otherwise block a GreenLightning application.
A blocking behavior requires a unique field association to ensure correct ordering of inputs and outputs. For HTTP, we will use the connectionID as this field association.
ExampleBlockingBehavior
packagecom.ociweb.gl.example.blocking;importcom.ociweb.gl.api.blocking.BlockingBehavior;importcom.ociweb.pronghorn.pipe.ChannelReader;importcom.ociweb.pronghorn.pipe.ChannelWriter;publicclassExampleBlockingBehaviorextendsBlockingBehavior {// The connection and sequence ID for HTTP to ensure ordering.privatelong connectionId =-1;privatelong sequenceId =-1; /** * Called when a message gets passed to this behavior to perform some * task. These messages should be defined by you and can be read using * the reader. */ @Overridepublicvoidbegin(ChannelReader reader) {assert(-1== connectionId);assert(-1== sequenceId);// Read the connection ID passed in. connectionId =reader.structured().readLong(Field.CONNECTION_ID); sequenceId =reader.structured().readLong(Field.SEQUENCE_ID); // Custom properties here// example:// currentDatabaseAction = reader.structured().readInteger(Field.DB_ACTION); } @Overridepublicvoidrun() throwsInterruptedException {// **********************************************// Here, perform an action that would block the current thread.// This could be creating a database connection,// querying a table, calling a library or a native call,// etc...Thread.sleep(1); // You can delete this, it is just an example of blocking// ********************************************** } @Overridepublicvoidfinish(ChannelWriter writer) {// We need to write back our data.// This is pretty much the begin() method but in reverse.writer.structured().writeLong(Field.CONNECTION_ID, connectionId);writer.structured().writeLong(Field.SEQUENCE_ID, sequenceId);// Finish writing back to the structure, this always comes last!writer.structured().selectStruct(Struct.DATA);// Reset connectionId and sequenceId connectionId =-1; sequenceId =-1; } @Overridepublicvoidtimeout(ChannelWriter writer) {// If a blocking action takes too long (default timeout is 1 minute).System.out.println("This took too long to run.."); // NOTE: If you have a payload like mentioned above, you should update it// here to represent that a timeout occurred. }}// Separate files:publicenumField{ CONNECTION_ID, SEQUENCE_ID;}publicenumStruct { DATA;}
The server app may look something like this:
Inside your declareConfiguration, you will want to create a new structure holding connectionId and sequenceId:
@OverridepublicvoiddeclareConfiguration(Builder builder) { ....builder.parallelTracks(2); //or any other numberbuilder.defineStruct().addField("connectionId",StructType.Long,0,Field.CONNECTION_ID).addField("sequenceId",StructType.Long,0,Field.SEQUENCE_ID) .register(Struct.DATA); .... }
Inside declareParallelBehavior, we want to figure out the connection ID (ideally inside a REST listener) and register the blocking listener. We use declareParallelBehavior because we have multiple tracks.
@OverridepublicvoiddeclareParallelBehavior(GreenRuntime runtime) {....// You will need to use your own topic to use the code from line 8-15.// The REST listener is just for demonstration purposes. However,// in the listener in which you want trigger database or blocking operations,// make sure to set the CONNECTION_ID as seen below.PubSubFixedTopicService pub =runtime.newCommandChannel().newPubSubService("testTopicA"); runtime.addRestListener("restListener",(r)->{returnpub.publishTopic((w)->{// Write connection ID and sequence ID.w.structured().writeLong(Field.CONNECTION_ID,r.getConnectionId());w.structured().writeLong(Field.SEQUENCE_ID,r.getSequenceCode());w.structured().selectStruct(Struct.DATA); }); }).includeRoutesByAssoc(Struct.ROUTE);// Here, we actually register the blocking listener.runtime.registerBlockingListener( ()->{// !!! Must be created new every time this lambda is called.// !!! This lambda is called once per thread on startup.returnnewExampleBlockingBehavior(); },Field.CONNECTION_ID,"testTopicA","testTopicB");// The result of the blocking behavior will be written to PubSub topic testTopicB.... }