# Building Stages

![Demonstrating stages and pipes.](/files/-LEAv4aJ6u-RRSKGht9M)

In graph theory, **stages** are better known as vertices or nodes, while **pipes** are known as edges or lines. These concepts are an integral part in the [Actor model](https://en.wikipedia.org/wiki/Actor_model). As an actor framework, Pronghorn has **stages** that can receive messages, make local decisions, process data, and respond to messages with new messages. Messages are passed through **pipes**, which allow for data to be shared between stages.

Every stage is defined with a fixed number of inputs and/or outputs. The `PronghornStage` class is an abstract class that provides a protocol for creating custom stages. Please follow the guidelines below for writing correct Pronghorn code.

### Create your first Stage <a href="#create-your-first-stage" id="create-your-first-stage"></a>

Add a new Java class to your existing Pronghorn project (or [create a new project](https://oci-pronghorn.gitbook.io/pronghorn/chapter-1-getting-started-with-pronghorn/1.-hello-world-introduction/0.-getting-started)). If you prefer, create a new package in your project called “Stages” and place it there. Use the code below to help you get started:

```java
package com.ociweb.stages;

import com.ociweb.pronghorn.pipe.Pipe;
import com.ociweb.pronghorn.pipe.RawDataSchema;
import com.ociweb.pronghorn.pipe.RawDataSchema;
import com.ociweb.pronghorn.stage.PronghornStage;
import com.ociweb.pronghorn.stage.scheduling.GraphManager;

public class ExampleStage extends PronghornStage {

    Pipe<RawDataSchema> input, output;

    public ExampleStage(GraphManager gm, Pipe<RawDataSchema> input, Pipe<RawDataSchema>  output) {

        super(gm, input, output);

        this.input = input;
        this.output = output;

    }

    public ExampleStage newInstance(GraphManager gm, Pipe<RawDataSchema> input, Pipe<RawDataSchema>  output) {

        return new ExampleStage(gm, input, output);

    }

    @Override
    public void startup() {

        // Initialize your objects and values here

    }

    @Override
    public void run() {

        // Put your stage logic here

    }

    // Only override this when you are certain that clean up needs to be performed before shutdown
    // Otherwise, do not use this
    @Override
    public void shutdown() {

        // Clean up

    }

}
```

Depending on the functionality of your stage, you may need to change `RawDataSchema` to the schema that your stage will be processing. More on this can be found on the [Schema](https://oci-pronghorn.gitbook.io/pronghorn/chapter-2-schemas/defining-schemas) page.

* The constructor should be responsible for saving references to your pipes and defining [notas](https://oci-pronghorn.gitbook.io/pronghorn/chapter-3-stages/notas).

Every Pronghorn stage requires you to set the GraphManager. You are not, however, required to have output or input pipes. If you choose to do so, `super()`will have to look similar to this:

```java
super(gm, NONE, NONE);
```

where `NONE` is a constant declaring an empty pipe.

* The static new instance method is Pronghorn convention for instantiating new stages. You are not required to have this, it exists primarily for readability reasons.
* The `startup()` method gets visited when your stage is requested to start.

Instead of allocating and initializing local objects and variables in the class constructor, do it instead in `startup()`. Any other settings except notas should be configured here as well.

* The `run()` method gets called depending on the current scheduled rate (which can be changed using notas).

Fine-tuning this rate can vastly improve performance and/or its ability to handle larger volume of data. Reading from input pipes, writing to output pipes, and performing the core logic of your stage should be done here.

Shutting down your stage is another important task to do in your `run()` method. Once your processing has finished, you need to let Pronghorn know that work is done and that `run()` should not be executed again. You do this using the `requestShutdown()` method. Do **not** call `shutdown()`.

**Important:** It is important that the `outputPublish` needs to occur before any of the inputs release.

#### Producers <a href="#producers" id="producers"></a>

**Producers** are the start of your graph and are responsible for providing data to the rest of the stages. You can have multiple producers on the same graph for different subsets of your graph.

An example of a producer would be the `FileBlobReadStage`. It opens a file and passes the data onto a pipe into the next stage for further processing, e.g. to parse a certain value from the file or to replicate the file onto other stages.

### Using your Stage <a href="#using-your-stage" id="using-your-stage"></a>

To use and test your stage, it needs to be added to the current GraphManager.

```java
public class CoolPronghornApp {

  final GraphManager gm = new GraphManager();

  public static void main(String[] args) {

     populateGraph(gm);

     //turn on awesome free telemetry
     gm.enableTelemetry(7777);

     StageScheduler.defaultScheduler(gm).startup();

  }

  private static void populateGraph(GraphManager gm) {

     //Create test pipes using the standard RawDataSchema:
     Pipe<RawDataSchema> pipeA = RawDataSchema.instance.newPipe(10, 10_000);
     Pipe<RawDataSchema> pipeB = RawDataSchema.instance.newPipe(20, 20_000);

     //Pronghorn convention:
     ExampleStage.newInstance(gm, pipeA, pipeB);

     //If you chose the standard instantiation:
     //new ExampleStage(gm, pipeA, pipeB);

  }

}
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://oci-pronghorn.gitbook.io/pronghorn/chapter-3-stages/building-stages.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
