Composing a workflow from responders

Introduction

This chapter introduces the Microbase notification system and describes how to use it to ‘wire up’ a number of responders to form a workflow.

Responders are event-driven, reusable, independent agents that handle task and data management duties for a single unit of workflow functionality. Some responders are supplied with Microbase and provide generic functions that may be applicable to a wide range of workflows.

Other responders may be responsible for wrapping a single domain-specific analysis tool. Almost any existing command line tool can be incorporated into a Microbase workflow. However, most command line tools are not aware of distributed computing environments and therefore require a responder to perform certain actions such as:

  • Obtain required input files from remote servers.
  • Formulate command lines for running the tool.
  • Upload output files to remote servers.
  • Interpret result data within the context of the workflow.

Responders therefore act as the bridge between existing stand-alone analysis tools and their parallel execution within the distributed computing environment provided by Microbase.

Terminology

Before we start defining responders and workflows, here’s a list of the technical terms used throughout this chapter:

  • Message. Represents an event, for instance, the arrival of a new data item, or the completion of an analysis. Messages are stored permanently and are immutable once published.
  • Topic. The type of a message. Responders can register an interest in messages tagged with a particular topic.
  • Responder. A unit of domain-specific functionality. The term ‘responder’ is overloaded and can mean any of the following, depending on context:
    • Responder logic: a module of code, for example an independent Maven artifact that implements a set of related domain-specific functionality, including but not limited to: formulating command lines; parsing files; and writing to databases.
    • Responder context: a component in a workflow, configured to respond to specific message types. There may be multiple responder contexts within a workflow, and they may use the same or different responder implementations.
    • Responder instance: a configured copy of a responder context that is executed by a Microbase minion operating within a horde. The instance is configured by the responder configuration objects that apply to the current minion’s horde.

Defining a workflow

Responders within a Microbase system work largely independently. However, one responder may trigger another by firing a suitable message. Strictly speaking, then, there is no such thing as a defined ‘workflow’ in Microbase; rather, the set of individual topic-responder subscriptions within the Microbase notification system gives rise to a set of organised computations that resembles a workflow. This approach has a number of advantages:

  • Workflows are not rigid structures. They can expand with your requirements.
  • When appropriately designed, adding new tools to an existing workflow does not require the recomputation of previous results.
  • If you opt to store result data in an Entanglement graph, results from newly added tools can be automatically integrated with existing results.

Registering responder types and message topics

When designing an analysis pipeline, the first step is usually to decide which tasks should be performed in which order. As a workflow engineer, you’ll typically have some idea of which data items must be processed by which tools, and in which order. It is important to understand the tools you require and the data flow dependencies between them before you start using or writing responders.

This chapter describes how to construct a simple workflow that, when triggered, executes a number of independent tasks in the correct order. We will start with 4 tasks, and then show how to expand the workflow in a later section.

Topics define which message types trigger which responder(s). A topic can be configured to trigger zero or more responders. Multiple responders can be chained together by registering the output message type of one responder as the input type of another. Currently, Microbase 3.0 supports responders with one incoming message type; that is, a responder can register an interest in at most one topic. However, a responder can publish as many messages, with as many topics, as required. Whilst publishing multiple messages of different types is useful in some situations, most responders will typically require only three distinct message topics:

  1. an incoming topic;
  2. a ‘result’ topic for outgoing messages;
  3. and a message topic associated with errors.

For a given responder, registering only the incoming topic type is sufficient for a fully functional workflow. However, it is useful for documentation, provenance, and debugging reasons to record these associations. Therefore, Microbase supports all three common responder-topic associations; these are analogous to the well-known Unix concepts of “standard in, standard out, and standard error”.

First, we must register 4 distinct responder types, and their associated message types (topics):
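
The exact syntax of the create-responder command may differ from the sketch below, which assumes that the command takes a responder type name, an implementation class name, and the names of the incoming, result, and error topics (the topic names themselves are placeholders chosen for this tutorial):

    create-responder Task1 HelloWorldResponder task1-in  task1-out task1-err
    create-responder Task2 HelloWorldResponder task1-out task2-out task2-err
    create-responder Task3 HelloWorldResponder task1-out task3-out task3-err
    create-responder Task4 HelloWorldResponder task2-out task4-out task4-err

Note how the chaining is expressed: Task2 and Task3 both register an interest in Task1’s result topic, while Task4 registers an interest in Task2’s result topic.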

Each of the above commands creates a new ResponderType node and, if necessary, appropriate Topic nodes within the notification system graph that you configured in the previous chapter. Responders, topics, and interest subscription information for any given workflow are stored within an Entanglement graph. Since these details are persistent, they only need to be configured once per graph.

After executing the IRC commands listed above, the Microbase graph now contains the appropriate nodes and edges to represent the following conceptual workflow:
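
Sketched with the placeholder topic names used in the registration commands above:

    task1-in ──> Task1 ──> task1-out ──┬──> Task2 ──> task2-out ──> Task4
                                       └──> Task3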

Note that we could have created the topics, responder types, and subscription information independently by using three separate commands. However, in most real-world cases, it is more convenient to use the create-responder command as shown above.

The responder implementation used for all four task types (Task1 – Task4) is the HelloWorldResponder. This is a single-class responder implementation that simply performs some simulated work by blocking a CPU thread and then releasing it after a specified amount of time. The ‘hello world’ responder is simple, but it demonstrates how to do the following operations:

  • Access a property of the responder configuration object
  • Parse an incoming message
  • Act on the content of the message
  • Upload a sample result file to the Microbase filesystem
  • Publish an outgoing message to the notification system

The ‘hello world’ responder has been designed such that the output messages are suitable for use as input messages to other ‘hello world’-based responders. We can therefore daisy-chain different instances of the same implementation together, as shown in the workflow diagram above.

Running a workflow

Creating configurations for responders and running a workflow

Our current configuration contains a number of responders, topics, and subscriptions, which together form a workflow. These items are stored persistently within a database. However, responders also require a transient runtime configuration before they will execute. Responder configuration objects apply to a given horde and are available as long as at least one minion is running.

Responder configuration objects provide Microbase with information about how to run a responder, including any required limits on parallelism such as the number of local or global concurrent jobs. Responder configurations may also override horde configuration properties. Finally, key/value pairs (analogous to environment variables) may be defined to specify implementation-specific configurations.

For example, the following command would create a responder configuration for a responder type named ‘some-task’, to be used when executed by a minion of the horde ‘some-horde’. The default levels of parallelism are used, since no explicit limits are given. Finally, two environment variables, ‘foo’ and ‘bar’, are set. These key/value pairs are made available to responder instances.
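
As a sketch, assuming a hypothetical create-responder-config command that takes a horde name, a responder type name, and a list of key/value pairs (the command name, its syntax, and the values shown are all illustrative):

    create-responder-config some-horde some-task foo=abc bar=xyz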

Different responder configurations can be created for different hordes of minions, allowing responder instances executing within different compute clusters to take advantage of local resources.

Create a configuration object for the first of the four responder types that you defined in the previous section:
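
Again using the hypothetical create-responder-config syntax sketched above, with my-horde as a placeholder for whichever horde you configured in the previous chapter (note that no key/value pairs are set here):

    create-responder-config my-horde Task1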

The first responder (Task1) is now configured. You can verify that this is the case by querying the minion:
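
For example (the command name here is a guess at such a query command):

    list-responder-configs my-horde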

Because responder ‘Task1’ is configured and present on the system, the minion now periodically checks for new work for responder ‘Task1’. Currently, no work is available because we haven’t yet published a message. We’ll do that next.

A word about IRC channels

Up to this point, we’ve been using the sole channel #microbase. By default, minions use this channel to interact with the user. Users type commands here, and the immediate output of the command is returned. There are two other channels that you should join now: #microbase.rt and #microbase.res. The first of these is used for logging the output of minion background processes (job queue populator, and job scheduler). The second channel, #microbase.res, is used for logging the output of running responder instances.

We have separated these types of content into their own channels since some processing operations tend to be quite verbose with their logging. The minion will accept command line input from any channel. The immediate output from the command line will always be sent to the channel on which the command line was issued.

Publishing a message

Responders will remain idle until triggered by a message with the appropriate topic. For simplicity, we will now publish a message that will trigger the responder at the top of the ‘workflow’ (Task1). However, bear in mind that any responder can be triggered at any time with a suitably-crafted message, not just the responder at the ‘top’ of the ‘workflow’. It is possible to construct a number of independent mini processing pipelines that may also be linked together, and to publish a message for any responder at any time, or even for non-existent responders. Responders will only execute if they are: a) installed on the classpath; and b) configured.

Publish a message that triggers Task1:
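
As a sketch, assuming the publish command takes a topic name followed by key/value content properties (task1-in is the placeholder topic from earlier, and the value is arbitrary):

    notification/publish task1-in max_work_time=30000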

The immediate output is as follows (indicating that the message was successfully stored). Microbase messages may contain any number of key/value properties. The publish command supports populating message content in this way from the IRC command line. Above, we set the property max_work_time as a content property of the message.

Now, keep an eye on the #microbase.rt and #microbase.res channels (it may be helpful to open them in separate windows). After a few seconds you should see the following:

This output indicates that the minion found a new message for Task1 and populated the distributed job queue. Then, the message was scheduled for processing. However, something went wrong while processing, and the exit status was a failure. Microbase has now marked this message (with respect to responder Task1) as ‘failed’.

To see why the job failed, we need to examine the responder channel, #microbase.res:

Here, we see a Java exception in red. The message indicates that the responder needs a specific configuration property destination_bucket. The responder requires a location within the Microbase filesystem in order to upload its result file. We haven’t specified this.

Correcting a configuration error

Next, let’s rewrite the responder configuration object to include the required configuration property. This command is the same as before, but this time, we’ve added a configuration property:
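
Using the same hypothetical create-responder-config syntax as before, but with the destination_bucket property added:

    create-responder-config my-horde Task1 destination_bucket=foo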

Re-try the failed job

Since the message failed to process correctly, Microbase has now marked the job as ‘failed’. Whilst it is in the ‘failed’ state, nothing further will happen with it.

For this tutorial, it would be straightforward simply to re-publish a new message using the notification/publish command as above (in fact, you can try this if you wish). However, in the real world, it may be more desirable to re-try the existing (failed) tasks. Microbase stores the entire state history of every job: the previous failure and the subsequent state reset are recorded in the database and can be queried later, if necessary. Execute the following command to reset the message state to READY:
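
As a sketch (the argument form is an assumption), an invocation relying on the default behaviour of alter-message-states might look like this:

    alter-message-states Task1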

Note that the alter-message-states command can be used to set messages from any state to any state. This can be useful when debugging or testing a responder implementation. By default, though, it will simply reset ‘failed’ tasks to ‘ready’. As a slight digression, you can use the IRC help command to find out more about any command and what its default values are. Try the following:
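
For example (assuming help takes a command name as its argument):

    help alter-message-states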

You can use the help command on its own to obtain a list of all the supported commands.

By now, you might have noticed that the job has re-executed, and has been successful this time:

The bullet points in the screenshot from #microbase.res were output from the responder instance as it executed (as indicated by the [Task1|HelloWorldResponder] line prefix). These log outputs clearly show the stages of responder execution:

  1. The responder receives a new message.
  2. The responder is given the opportunity to perform any necessary cleanup operations (for example, from a partially-completed previous execution).
  3. The responder starts processing the message. The processing takes a random amount of time, up to the limit specified in the message content property max_work_time (we set this property when publishing the message).

Observe the generated result file

We mentioned at the start of this tutorial that the responder creates an output file, which is then placed into the Microbase filesystem.

Recall the Microbase filesystem configuration from the previous chapter. In a file explorer utility, navigate to the directory that you specified as the ‘root’ for the Microbase filesystem (e.g., /tmp/microbase for Unix-based systems, or c:\temp\microbase for Windows machines). Within the root directory, you should notice a subdirectory, foo.

The directory foo was specified by the configuration option we set for the responder. Within that directory are further directories whose names are hard-coded into the responder implementation. Finally, the output file contains the following content:

 

Configurations for the responders: Task2 – Task4

At this point, we have a correctly configured responder: Task1. That responder has executed a simulated unit of computational work. Looking back at our workflow sketch, we can see that both Task2 and Task3 take the output of Task1. Finally, Task4 operates on the output of Task2. These responders haven’t executed yet, because they are not yet configured. Create new configuration objects for the remaining responders:
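
Using the same hypothetical create-responder-config syntax as before (the destination_bucket values for Task2 and Task3 are assumptions):

    create-responder-config my-horde Task2 destination_bucket=foo
    create-responder-config my-horde Task3 destination_bucket=foo
    create-responder-config my-horde Task4 destination_bucket=bar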

These commands are very similar to the one we executed to create the configuration for Task1 earlier. Note that we’re using destination_bucket=bar for Task4.

You should notice that Task2 and Task3 execute automatically, as soon as they have been configured. This is because they have detected the ‘completion’ message of Task1 that we ran earlier:

Whilst Task2 and Task3 execute in no particular order (they both have the same trigger), Task4 executes last because it depends on the presence of Task2’s completion message.

Meanwhile, the internal Microbase process log looks like this:

The final filesystem tree should look similar to the following. Note the different placement of Task4’s output file, due to the different responder configuration setting:

Extending the workflow

Let’s add two new responders, Task5 and Task6, such that they form a new mini pipeline that works on the results of Task1.
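
Using the same illustrative create-responder syntax as before: Task5 registers an interest in Task1’s result topic, and Task6 registers an interest in Task5’s result topic:

    create-responder Task5 HelloWorldResponder task1-out task5-out task5-err
    create-responder Task6 HelloWorldResponder task5-out task6-out task6-err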

Also create configurations for the two new responders:
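
Again with the hypothetical create-responder-config syntax (the destination_bucket values are assumptions):

    create-responder-config my-horde Task5 destination_bucket=foo
    create-responder-config my-horde Task6 destination_bucket=foo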

The notification system graph now represents the following workflow:

Notice that simply adding the new responder definitions and configurations causes an immediate execution of Task5, and then Task6. This is because our new mini workflow is triggered by the output of Task1, which has already completed processing a message. Task5 and Task6 therefore ‘catch up’ with the rest of the system:

New result files also appear:

Dynamic reconfiguration: Add more machines

A Microbase cluster can be extended in a straightforward manner.

Start one or more additional nodes, using the method described in the last chapter (replace XX to construct unique minion IDs):

Then, define the same database cluster as the one currently in use with minion01:

Finally, we need some work for each of these machines to process. The publish command introduced earlier can send multiple copies of the same message (distinct messages with identical content). Try this now:
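
As a sketch, assuming the publish command accepts a repeat count (the flag name and count are illustrative):

    notification/publish task1-in --copies 20 max_work_time=30000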

Notice that the new responder(s) simply pick up jobs from the queue and start executing them:

 

Dynamic reconfiguration: Amazon’s S3

You’ve achieved a lot in this tutorial. While you’ve got everything still running, there’s one more part of Microbase functionality to demonstrate – dynamic reconfiguration.

Recall from the previous chapter that we configured a Microbase filesystem using the ‘LOCAL_DIR’ implementation:
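
Sketched in the same hypothetical command style used throughout this chapter (the filesystem name and property name are illustrative; the root path matches the example given earlier in this chapter):

    create-filesystem local-fs LOCAL_DIR root=/tmp/microbase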

We’re now going to dynamically reconfigure the horde to use a new filesystem definition that points towards Amazon’s S3 storage system. For this to work, you’ll need an AWS account and a set of access keys; we assume you have these, with appropriate access permissions. Create an Amazon bucket using the AWS Web interface:

Next, create the new filesystem configuration, substituting the blanks with your keys:
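
Again as a sketch with a hypothetical create-filesystem command; substitute the placeholders with your own AWS credentials:

    create-filesystem s3-fs S3 access_key=<YOUR_ACCESS_KEY> secret_key=<YOUR_SECRET_KEY>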

Then, update the horde configuration to use the new remote filesystem:
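
A sketch with a hypothetical command name and property:

    update-horde-config my-horde filesystem=s3-fs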

Also update your responders to use the ‘bucket’ you created on S3 (substitute microbase.test with whichever bucket you created on S3).
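
For example, re-issuing the hypothetical create-responder-config commands with the new bucket name, as shown here for Task1 (and likewise for Task2 – Task6):

    create-responder-config my-horde Task1 destination_bucket=microbase.test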

Finally, reconfigure any minions:

At this point, we have updated the horde configuration with the new filesystem. Publish a new message to re-trigger the entire workflow. This time, the workflow will execute locally, but store its results on Amazon’s S3 system.

After a while, refresh the AWS bucket page. You should see a number of new directories have been created in the specified bucket:

Navigating these directories reveals the expected result files:

Summary

This tutorial has shown you how to:

  • Get to grips with the IRC command line interface.
  • Specify persistent responder and topic definitions whose emergent behaviour resembles a workflow of tasks.
  • Configure a running Microbase cluster with horde-specific responder configurations.
  • Publish messages that trigger one or more responders.
  • Manipulate message/responder states to enable re-processing of failed jobs.
  • Extend the workflow with new responders, and observe the new responders catching up with the rest of the system.
  • Reconfigure the system to store data on Amazon’s S3.