The Producer

In Pronghorn, a producer is responsible for providing data for the rest of a graph. Think of it as (usually) the start of your graph: it continuously checks for data or produces initial data that then flows through pipes to the downstream stages.

In this Hello World example, we create a producer that generates a "Hello World" message, which will then be received by another stage (not this producer). Although not very useful, this demonstrates basic Pronghorn concepts such as how data is passed between stages.

Creating the Producer

First, let's create a new file named "GreeterStage.java" under src/test/java/com.ociweb

Every new Pronghorn stage needs to extend PronghornStage and have the following components:

  • A constructor that calls super() and passes in the graph manager along with the expected input and output pipes.

  • A startup() method that allows you to initialize your objects and values.

  • A run() method that performs the logic of the stage.

  • An optional shutdown() method that lets you clean up the stage.
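
How these pieces fit together may be easier to see in a stripped-down model. The sketch below is plain Java and does not use Pronghorn at all: SketchStage, LifecycleSketch, and drive() are hypothetical names invented for this illustration, and Pronghorn's real schedulers are more involved. It only assumes the call order described above: startup() once, run() repeatedly until the stage requests a shutdown, then shutdown() once.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {

    // Hypothetical single-threaded scheduler loop (an assumption, not
    // Pronghorn's scheduler): startup once, run until the stage asks to
    // stop, then shutdown once.
    static void drive(SketchStage stage) {
        stage.startup();
        while (!stage.isShutdownRequested()) {
            stage.run();
        }
        stage.shutdown();
    }

    // Records the order of lifecycle calls for a stage that runs three times.
    static List<String> trace() {
        List<String> calls = new ArrayList<>();
        drive(new SketchStage() {
            private int remaining;

            @Override public void startup() {
                remaining = 3;              // initialize state here
                calls.add("startup");
            }

            @Override public void run() {
                calls.add("run");
                if (--remaining == 0) {
                    requestShutdown();      // nothing left to produce
                }
            }

            @Override public void shutdown() {
                calls.add("shutdown");      // clean up here
            }
        });
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // [startup, run, run, run, shutdown]
    }
}

// Simplified stand-in for a stage; NOT Pronghorn's PronghornStage.
abstract class SketchStage {
    private boolean shutdownRequested = false;

    public void startup() {}        // one-time initialization
    public abstract void run();     // the stage's logic, called repeatedly
    public void shutdown() {}       // optional cleanup

    protected void requestShutdown() { shutdownRequested = true; }
    boolean isShutdownRequested() { return shutdownRequested; }
}
```

The point of the model is that run() is not a loop you write yourself: it does one unit of work and returns, and the stage signals when it is finished.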

Using those requirements, let's see what our GreeterStage.java looks like:

package com.ociweb;

import com.ociweb.pronghorn.pipe.Pipe;
import com.ociweb.pronghorn.stage.PronghornStage;
import com.ociweb.pronghorn.stage.scheduling.GraphManager;

public class GreeterStage extends PronghornStage {

    private Pipe<HelloWorldSchema> output;
    private String name;

    public GreeterStage(GraphManager graphManager, String name, Pipe<HelloWorldSchema> output) {
        super(graphManager, NONE, output);

        this.name = name;
        this.output = output;
    }

    @Override
    public void startup() {

    }

    @Override
    public void run() {

    }

    @Override
    public void shutdown() {
        
    }
}

Lines 9-16 are interesting:

  • We store a local reference to the output pipe (line 9), which is given to us by whatever code constructs this stage.

  • We create our constructor (lines 12-16) with three parameters:

    • The graphManager, which is a Pronghorn class that manages your stage and registers it with the graph, thus making it discoverable.

    • The name to be greeted.

    • The output pipe, which will be written to during the run() procedure. This will contain our greeting.

  • We call super() on line 13, passing NONE as the input because this producer has no input pipes.

  • We set the name to be greeted on line 15.

  • We set the output pipe locally on line 16.

We can safely ignore the startup() method for now since we are not initializing any other objects or configuring anything.

However, the run() method is important. We want to accomplish the following:

  • Open our output pipe.

  • If the pipe allows us to write, write a simple HelloWorld message.

  • Then, publish the message and send the "poison pill" to let our graph know that the producer's task of writing to the pipe is accomplished.
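
The poison-pill idea in the last step can be sketched on its own in plain Java before looking at the Pronghorn version. This is an analogy, not Pronghorn code: a java.util.concurrent.ArrayBlockingQueue stands in for the pipe, and PoisonPillSketch, EOF, produce(), and consume() are hypothetical names. The "Hello ...!" text is also just an assumed greeting format.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PoisonPillSketch {

    // Sentinel object standing in for the end-of-file marker. It is a
    // distinct instance so it can be recognized by identity.
    static final String EOF = new String("EOF");

    // Producer side: write one greeting, then the poison pill.
    // Returns false (and writes nothing) if the queue lacks room for both.
    static boolean produce(BlockingQueue<String> pipe, String name) {
        if (pipe.remainingCapacity() < 2) {
            return false;
        }
        pipe.offer("Hello " + name + "!");
        pipe.offer(EOF);
        return true;
    }

    // Consumer side: collect messages until the poison pill arrives.
    static List<String> consume(BlockingQueue<String> pipe) {
        List<String> received = new ArrayList<>();
        String msg;
        while ((msg = pipe.poll()) != null) {
            if (msg == EOF) {   // identity check: this is the sentinel
                break;
            }
            received.add(msg);
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> pipe = new ArrayBlockingQueue<>(4);
        produce(pipe, "World");
        System.out.println(consume(pipe)); // [Hello World!]
    }
}
```

The consumer never needs to know how many messages the producer intended to send; it simply stops when the sentinel shows up.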

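Let's take a look at the code. It uses PipeWriter, so GreeterStage.java also needs to import com.ociweb.pronghorn.pipe.PipeWriter: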

@Override
public void run() {
    if(PipeWriter.tryWriteFragment(output, HelloWorldSchema.MSG_HELLOWORLDMESSAGE_1)) {

        PipeWriter.writeUTF8(output, HelloWorldSchema.MSG_HELLOWORLDMESSAGE_1_FIELD_GREETINGNAME_100, name);

        PipeWriter.publishWrites(output);
        PipeWriter.publishEOF(output);
        
        requestShutdown();

    }
}

  • On line 3, we first check that we can write to our pipe using the schema by seeing if tryWriteFragment is true. We use the constant MSG_HELLOWORLDMESSAGE_1 provided to us by our previously generated schema.

  • On line 5, using the PipeWriter, we write a UTF-8 encoded string (the name variable) to the GREETINGNAME field as previously defined in our schema.

  • On lines 7-8, we publish to our pipe and then send an end-of-file message to the pipe to let downstream stages know that we have finished producing values.

  • On line 10, we request the stage to be shut down (since nothing more is being produced).

Done! Ideally, we would let our users know if the pipe is full or if any other error occurred, but that's currently not in the scope of this tutorial.
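
As a rough idea of what reporting a full pipe could look like, here is a plain-Java sketch. It is not Pronghorn code: a bounded ArrayBlockingQueue stands in for the pipe, offer() plays the role of a write attempt that may fail, and FullPipeSketch, run(String), and rejectedCount() are hypothetical names. In a real stage the equivalent would presumably be an else branch on the tryWriteFragment check.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class FullPipeSketch {
    private final BlockingQueue<String> pipe;
    private int rejected = 0;

    FullPipeSketch(BlockingQueue<String> pipe) {
        this.pipe = pipe;
    }

    // One pass of the producer: returns true if the message was written.
    // On a full queue it reports the problem and leaves the retry to the
    // next pass instead of blocking.
    boolean run(String message) {
        if (pipe.offer(message)) {
            return true;
        }
        rejected++;
        System.err.println("pipe full, will retry (" + rejected + " rejected so far)");
        return false;
    }

    int rejectedCount() {
        return rejected;
    }

    public static void main(String[] args) {
        BlockingQueue<String> pipe = new ArrayBlockingQueue<>(1);
        FullPipeSketch producer = new FullPipeSketch(pipe);
        System.out.println(producer.run("first"));   // true
        System.out.println(producer.run("second"));  // false: queue is full
        pipe.poll();                                  // a consumer frees a slot
        System.out.println(producer.run("second"));  // true
    }
}
```

Returning instead of blocking keeps each pass short, which matches the way run() is expected to do a small amount of work and hand control back.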

On the next page, we will create another stage that reads HelloWorldSchema messages from a pipe and dumps them to the console.
