Building Stages

Demonstrating stages and pipes.

In graph-theory terms, stages are vertices (nodes) and pipes are edges. Both concepts are integral to the Actor model. As an actor framework, Pronghorn provides stages that receive messages, make local decisions, process data, and respond by sending new messages. Messages travel through pipes, which is how data is shared between stages.
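The following is a minimal plain-Java sketch of the actor idea described above. It does not use Pronghorn. The hypothetical ToyStage interface and EvenDoubler class play the role of a stage, and java.util.ArrayDeque queues stand in for pipes. The stage reads each message, makes a local decision (keep even numbers, drop odd ones), and responds by emitting a new message downstream.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical demo, not Pronghorn: wires pipeA -> EvenDoubler -> pipeB.
final class ActorDemo {
    // Returns every message that reached the output pipe.
    static List<Integer> runDemo(List<Integer> messages) {
        Queue<Integer> pipeA = new ArrayDeque<>(messages); // edge into the stage
        Queue<Integer> pipeB = new ArrayDeque<>();         // edge out of the stage
        new EvenDoubler(pipeA, pipeB).process();
        return new ArrayList<>(pipeB);
    }

    public static void main(String[] args) {
        System.out.println(runDemo(List.of(1, 2, 3, 4))); // prints [4, 8]
    }
}

// Hypothetical model: a "stage" is a vertex, a "pipe" is a queue (edge) between stages.
interface ToyStage {
    void process(); // consume messages from the input pipe, emit new ones on the output pipe
}

// Doubles every even number it receives and drops odd ones (its local decision).
final class EvenDoubler implements ToyStage {
    private final Queue<Integer> input;
    private final Queue<Integer> output;

    EvenDoubler(Queue<Integer> input, Queue<Integer> output) {
        this.input = input;
        this.output = output;
    }

    @Override
    public void process() {
        Integer msg;
        while ((msg = input.poll()) != null) {
            if (msg % 2 == 0) {
                output.add(msg * 2); // respond with a new message
            }
        }
    }
}
```

In Pronghorn the scheduler, not a direct method call, decides when each stage runs. This sketch shows only the message flow.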

Every stage is defined with a fixed number of inputs and/or outputs. PronghornStage is an abstract class that defines the contract for creating custom stages. Follow the guidelines below to write correct Pronghorn code.

Create your first Stage

Add a new Java class to your existing Pronghorn project (or create a new project). If you prefer, create a package for your stages and place the class there. The example below uses the package com.ociweb.stages. Use the following code to get started:

package com.ociweb.stages;

import com.ociweb.pronghorn.pipe.Pipe;
import com.ociweb.pronghorn.pipe.RawDataSchema;
import com.ociweb.pronghorn.stage.PronghornStage;
import com.ociweb.pronghorn.stage.scheduling.GraphManager;

public class ExampleStage extends PronghornStage {

    private final Pipe<RawDataSchema> input;
    private final Pipe<RawDataSchema> output;

    public ExampleStage(GraphManager gm, Pipe<RawDataSchema> input, Pipe<RawDataSchema> output) {
        super(gm, input, output); // registers this stage and its pipes with the graph
        this.input = input;
        this.output = output;
    }

    // Pronghorn convention: a static factory that simply wraps the constructor
    public static ExampleStage newInstance(GraphManager gm, Pipe<RawDataSchema> input, Pipe<RawDataSchema> output) {
        return new ExampleStage(gm, input, output);
    }

    @Override
    public void startup() {
        // Initialize your objects and values here
    }

    @Override
    public void run() {
        // Put your stage logic here: read from input, write to output,
        // and call requestShutdown() once all work is done
    }

    // Override shutdown() only when cleanup must be performed before the stage stops;
    // otherwise, leave it out
    @Override
    public void shutdown() {
        // Clean up
    }
}

Depending on the functionality of your stage, you may need to change RawDataSchema to the schema that your stage will be processing. More on this can be found on the Schema page.

  • The constructor should save references to your pipes and define any notas (stage annotations such as the schedule rate).

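Every Pronghorn stage must pass the GraphManager to the superclass constructor. Input and output pipes, however, are optional. For a stage with neither, the super() call looks like this:

super(gm, NONE, NONE);

where NONE is the PronghornStage constant for an empty set of pipes. Pass NONE only in place of the side that is missing. A producer with one output pipe, for example, would call super(gm, NONE, output).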

  • The static newInstance method is the Pronghorn convention for instantiating new stages. It is optional and exists primarily for readability.

  • The startup() method is called when your stage is requested to start, before run() is first called.

Allocate and initialize your stage's objects and working variables in startup() rather than in the constructor. Configure every other setting here as well, except notas, which belong in the constructor.

  • The run() method is called repeatedly at the stage's scheduled rate, which can be changed with a nota.

Tuning this rate can significantly improve performance and the stage's ability to handle larger volumes of data. Reading from input pipes, writing to output pipes, and performing the core logic of your stage all belong in run().

Shutting down your stage is another important responsibility of run(). Once processing has finished, you must tell Pronghorn that the work is done and that run() should not be called again. Do this by calling requestShutdown(). Do not call shutdown() yourself. It is the cleanup hook that Pronghorn calls on your behalf.

Important: Always publish your writes to the output pipes before releasing any of the input pipes you read from.
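The lifecycle rules above can be modeled in plain Java to show the order of calls. This is a simplified sketch, not Pronghorn's scheduler or API. ModelStage, UpperCaseStage and ModelScheduler are hypothetical classes, and ArrayDeque queues stand in for pipes. The model scheduler calls startup() once and calls run() until the stage calls requestShutdown(). Only then does it call shutdown(). Inside run(), the stage peeks at its input without removing it, publishes the result to its output, and only afterwards releases (removes) the input message, which mirrors the publish-before-release rule.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model scheduler, not Pronghorn's: startup -> run (repeatedly) -> shutdown.
final class ModelScheduler {
    static void runToCompletion(ModelStage stage) {
        stage.startup();
        while (!stage.isShutdownRequested()) {
            stage.run();
        }
        stage.shutdown(); // the scheduler, not the stage, calls shutdown()
    }

    public static void main(String[] args) {
        ArrayDeque<String> in = new ArrayDeque<>(List.of("a", "b"));
        ArrayDeque<String> out = new ArrayDeque<>();
        UpperCaseStage stage = new UpperCaseStage(in, out);
        runToCompletion(stage);
        System.out.println(out);         // prints [A, B]
        System.out.println(stage.calls); // prints [startup, run, run, run, shutdown]
    }
}

// Hypothetical base class modeling the stage lifecycle.
abstract class ModelStage {
    private boolean shutdownRequested = false;
    final List<String> calls = new ArrayList<>(); // records lifecycle events

    void startup() { }
    abstract void run();
    void shutdown() { }

    // Called by the stage itself from run() once its work is done.
    final void requestShutdown() { shutdownRequested = true; }
    final boolean isShutdownRequested() { return shutdownRequested; }
}

// Upper-cases each string from its input pipe and forwards it to its output pipe.
final class UpperCaseStage extends ModelStage {
    private final ArrayDeque<String> input;  // references saved in the constructor
    private final ArrayDeque<String> output;
    private StringBuilder scratch;           // working object allocated in startup()

    UpperCaseStage(ArrayDeque<String> input, ArrayDeque<String> output) {
        this.input = input;
        this.output = output;
    }

    @Override
    void startup() {
        scratch = new StringBuilder();
        calls.add("startup");
    }

    @Override
    void run() {
        calls.add("run");
        String msg = input.peek(); // read without releasing the input
        if (msg == null) {
            // In this model an empty input means the work is done; a real stage
            // usually cannot assume an empty pipe means no more data will arrive.
            requestShutdown();
            return;
        }
        scratch.setLength(0);
        scratch.append(msg.toUpperCase());
        output.add(scratch.toString()); // 1. publish the output first
        input.poll();                   // 2. then release the input
    }

    @Override
    void shutdown() { calls.add("shutdown"); }
}
```

The final run() call is the one that finds no input and requests shutdown. That is why the trace shows three run() calls for two messages.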

Producers

Producers sit at the start of your graph and supply data to the rest of its stages. A graph can have multiple producers, each feeding a different part of the graph.

An example of a producer is FileBlobReadStage. It opens a file and passes the data through a pipe to the next stage for further processing, for example to parse a certain value from the file or to replicate the data to other stages.
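A producer has outputs but no input pipe. The sketch below models a FileBlobReadStage-style producer in plain Java. It is a simplified illustration and not Pronghorn's implementation. The ChunkProducer class is hypothetical. It reads bytes from an InputStream (a file in practice, an in-memory stream here) and writes fixed-size chunks onto a queue that stands in for its output pipe. Its done flag stands in for requestShutdown(), which it sets once the stream is exhausted.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;

// Hypothetical producer model: no input pipe, one output pipe (a queue of byte chunks).
final class ChunkProducer {
    private final InputStream source;
    private final ArrayDeque<byte[]> output;
    private final int chunkSize;
    private byte[] buffer;       // working memory, allocated in startup()
    private boolean done = false;

    ChunkProducer(InputStream source, ArrayDeque<byte[]> output, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.source = source;
        this.output = output;
        this.chunkSize = chunkSize;
    }

    void startup() {
        buffer = new byte[chunkSize];
    }

    // One scheduled invocation: emits at most one chunk per call. startup() must run first.
    void run() {
        int n;
        try {
            n = source.read(buffer);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (n < 0) {      // end of the source: nothing left to produce
            done = true;  // stands in for requestShutdown()
            return;
        }
        output.add(Arrays.copyOf(buffer, n)); // copy, because buffer is reused
    }

    boolean isDone() { return done; }

    public static void main(String[] args) {
        InputStream file = new ByteArrayInputStream("hello pronghorn".getBytes(StandardCharsets.UTF_8));
        ArrayDeque<byte[]> pipe = new ArrayDeque<>();
        ChunkProducer producer = new ChunkProducer(file, pipe, 6);
        producer.startup();
        while (!producer.isDone()) {
            producer.run();
        }
        for (byte[] chunk : pipe) {
            System.out.println(new String(chunk, StandardCharsets.UTF_8));
        }
    }
}
```

Emitting one chunk per run() call keeps each invocation short. That matters because run() is called repeatedly at the stage's scheduled rate.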

Using your Stage

To use and test your stage, add it to a GraphManager:

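import com.ociweb.pronghorn.pipe.Pipe;
import com.ociweb.pronghorn.pipe.RawDataSchema;
import com.ociweb.pronghorn.stage.scheduling.GraphManager;
import com.ociweb.pronghorn.stage.scheduling.StageScheduler;
import com.ociweb.stages.ExampleStage;

public class CoolPronghornApp {

    public static void main(String[] args) {
        // Created locally: an instance field cannot be used from the static main()
        GraphManager gm = new GraphManager();
        populateGraph(gm);
        // Turn on the free built-in telemetry on port 7777
        gm.enableTelemetry(7777);
        StageScheduler.defaultScheduler(gm).startup();
    }

    private static void populateGraph(GraphManager gm) {
        // Create test pipes using the standard RawDataSchema
        Pipe<RawDataSchema> pipeA = RawDataSchema.instance.newPipe(10, 10_000);
        Pipe<RawDataSchema> pipeB = RawDataSchema.instance.newPipe(20, 20_000);
        // Pronghorn convention:
        ExampleStage.newInstance(gm, pipeA, pipeB);
        // If you prefer standard instantiation:
        // new ExampleStage(gm, pipeA, pipeB);
        // In a complete graph, an upstream stage writes to pipeA and a downstream stage reads pipeB
    }
}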